Skip to content

Commit 3deb3d3

Browse files
committed
[ot] python/qemu/ot: refactor gpiodev.py as modules
Signed-off-by: Emmanuel Blot <eblot@rivosinc.com>
1 parent 232f60d commit 3deb3d3

File tree

8 files changed

+576
-257
lines changed

8 files changed

+576
-257
lines changed

python/qemu/ot/dtm/dtm.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ def __init__(self, engine: JtagEngine, ir_length: int):
234234
self._regs = self._load_registers()
235235
self._sticky: Optional[DMIError] = None
236236

237-
def __getitem__(self, name: str):
237+
def __getitem__(self, name: str) -> DTMRegister:
238238
try:
239239
return self._regs[name]
240240
except KeyError as exc:
@@ -258,11 +258,13 @@ def build_error(cls, code: int) -> Optional[DMIError]:
258258

259259
def read(self, address: int, length: int) -> BitSequence:
260260
"""Read a bit sequence value."""
261+
self._log.debug("read addr: 0x%x len: %d", address, length)
261262
self._engine.write_ir(BitSequence(address, self._ir_length))
262263
return self._engine.read_dr(length)
263264

264265
def write(self, address: int, bseq: BitSequence) -> None:
265266
"""Write a bit sequence value."""
267+
self._log.debug("write addr: 0x%x len: %d", address, len(bseq))
266268
self._engine.write_ir(BitSequence(address, self._ir_length))
267269
self._engine.write_dr(bseq)
268270
self._engine.run()

python/qemu/ot/gpio/__init__.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Copyright (c) 2024 Rivos, Inc.
2+
# All rights reserved.
3+
4+
"""GPIO utilities.
5+
6+
:author: Emmanuel Blot <eblot@rivosinc.com>
7+
"""

python/qemu/ot/gpio/device.py

Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
# Copyright (c) 2024 Rivos, Inc.
2+
# All rights reserved.
3+
4+
"""GPIO device tiny simulator.
5+
6+
:author: Emmanuel Blot <eblot@rivosinc.com>
7+
"""
8+
9+
from logging import getLogger
10+
from socket import create_server, socket, SHUT_RDWR
11+
from time import sleep
12+
from typing import Optional, TextIO
13+
import re
14+
15+
16+
# pylint: disable-msg=missing-function-docstring,missing-class-docstring
17+
18+
class GpioChecker:
19+
20+
CHK_RE = r'^\s*(\d+)?\s*([@]):([0-9a-fA-F]{8})$'
21+
"""Handler either log file or `uniq -c` post-processed file."""
22+
23+
def __init__(self):
24+
self._log = getLogger('gpio.check')
25+
self._seq = []
26+
27+
def load(self, lfp: TextIO) -> None:
28+
commands = ''.join(GpioDevice.INPUT_CMD_MAP.keys())
29+
chk_re = self.CHK_RE.replace('@', commands)
30+
error = 0
31+
for lno, line in enumerate(lfp, start=1):
32+
line = line.strip()
33+
cmo = re.match(chk_re, line)
34+
if not cmo:
35+
self._log.error('Unknown check line @ %d: %s', lno, line)
36+
error += 1
37+
continue
38+
repeat, command, value = cmo.groups()
39+
irepeat = int(repeat) if repeat else 1
40+
ivalue = int(value, 16)
41+
while irepeat:
42+
self._seq.append((command, ivalue))
43+
irepeat -= 1
44+
if error:
45+
raise RuntimeError('Cannot use checker file {lfp.name}')
46+
47+
class Iterator:
48+
49+
def __init__(self, parent: 'GpioChecker'):
50+
self._log = getLogger('gpio.dev.it')
51+
self._iter = enumerate(parent._seq)
52+
self._len = len(parent._seq)
53+
self._pos = 0
54+
55+
def __iter__(self):
56+
return self
57+
58+
def __next__(self) -> tuple[str, int]:
59+
self._pos, value = next(self._iter)
60+
return value
61+
62+
@property
63+
def last(self) -> bool:
64+
self._log.debug("pos: %d len: %d", self._pos, self._len)
65+
return self._pos + 1 >= self._len
66+
67+
def __iter__(self) -> Iterator:
68+
return self.Iterator(self)
69+
70+
71+
class GpioDevice:
72+
73+
INPUT_CMD_MAP = {
74+
'C': 'clear',
75+
'D': 'direction',
76+
'O': 'output',
77+
'P': 'pull',
78+
'Q': 'query',
79+
'Y': 'ynput',
80+
'Z': 'hi_z',
81+
}
82+
83+
OUTPUT_CMD_MAP = {
84+
'input': 'I',
85+
'mask': 'M',
86+
'repeat': 'R',
87+
}
88+
89+
def __init__(self):
90+
self._log = getLogger('gpio.dev')
91+
self._socket = None
92+
self._checker: Optional[TextIO] = None
93+
self._resume = False
94+
self._oe = 0 # output enable (1: out, 0: in)
95+
self._out = 0 # output value (from peer)
96+
self._hiz = 0xffff_ffff # high-Z
97+
self._wpud = 0 # weak pull (1: up 0: down)
98+
self._inact = 0xffff_ffff # input activated
99+
self._in = 0 # input value (to peer)
100+
self._yn = 0 # mirror input value handled by QEMU
101+
self._error_count = 0
102+
self._record = []
103+
104+
def load(self, lfp: TextIO) -> None:
105+
self._checker = GpioChecker()
106+
self._checker.load(lfp)
107+
108+
def save(self, sfp: TextIO) -> None:
109+
for cmd, value in self._record:
110+
print(f'{cmd}:{value:08x}', end='\r\n', file=sfp)
111+
sfp.close()
112+
113+
def run(self, port: int, single_run: bool, fatal: bool,
114+
end: Optional[int]) -> bool:
115+
self._socket = create_server(('localhost', port), reuse_port=True)
116+
fail = False
117+
self._socket.listen(0)
118+
while True:
119+
resume = True
120+
peer, addr = self._socket.accept()
121+
with peer:
122+
self._log.info('Connection from %s:%d', *addr)
123+
# rewind on each connection
124+
it_cmd = iter(self._checker) if self._checker else None
125+
self._error_count = 0
126+
buf = bytearray()
127+
while resume:
128+
if end is not None:
129+
if it_cmd and it_cmd.last:
130+
self._terminate(peer, end)
131+
resume = False
132+
break
133+
data = peer.recv(1024)
134+
if not data:
135+
break
136+
buf.extend(data)
137+
try:
138+
buf = self._process(peer, it_cmd, buf)
139+
except ValueError:
140+
fail = True
141+
resume = False
142+
break
143+
try:
144+
self._log.info('Disconnect from %s:%d', *addr)
145+
peer.close()
146+
peer.shutdown(SHUT_RDWR)
147+
except OSError:
148+
pass
149+
if single_run:
150+
break
151+
if fail and fatal:
152+
break
153+
try:
154+
self._socket.close()
155+
self._socket.shutdown(SHUT_RDWR)
156+
except OSError:
157+
pass
158+
self._socket = None
159+
return not fail
160+
161+
def _process(self, peer: socket, it_cmd: Optional[GpioChecker.Iterator],
162+
buf: bytearray) -> bytearray:
163+
while True:
164+
eol = buf.find(b'\n')
165+
if eol < 0:
166+
return buf
167+
line, buf = buf[:eol], buf[eol+1:]
168+
line = line.strip()
169+
sline = line.decode('utf8')
170+
self._log.debug('in %s', sline)
171+
resp = self._inject(sline, it_cmd)
172+
if resp is not None:
173+
for oline in resp.split('\n'):
174+
if oline:
175+
self._log.info('send %s', oline.strip())
176+
out = resp.encode('utf8')
177+
peer.send(out)
178+
179+
def _terminate(self, peer: socket, end: int) -> None:
180+
resp = self._build_reply(mask=~end, input=end)
181+
for oline in resp.split('\n'):
182+
if oline:
183+
self._log.info('send %s', oline.strip())
184+
out = resp.encode('utf8')
185+
peer.send(out)
186+
sleep(0.1)
187+
188+
def _inject(self, line: str, it_cmd: Optional[GpioChecker.Iterator]) -> \
189+
Optional[str]:
190+
try:
191+
cmd, value = line.split(':', 1)
192+
except ValueError:
193+
self._log.error('Unsupported line: %s', line)
194+
return None
195+
try:
196+
word = int(value, 16)
197+
except ValueError:
198+
self._log.error('Unsupported value: %s', value)
199+
return None
200+
try:
201+
command = self.INPUT_CMD_MAP[cmd]
202+
except KeyError:
203+
self._log.error('Unsupported command: %s', cmd)
204+
return None
205+
handler = getattr(self, f'_inject_{command}', None)
206+
if handler is None:
207+
self._log.warning('Unimplemented handler for %s', command)
208+
return None
209+
self._log.info('recv %s: 0x%08x', command, word)
210+
self._record.append((cmd, word))
211+
# pylint: disable=not-callable
212+
out = handler(word)
213+
if it_cmd:
214+
try:
215+
refcmd, refword = next(it_cmd)
216+
except StopIteration as exc:
217+
self._log.warning('End of checker')
218+
raise ValueError('Unexpected command') from exc
219+
self._log.debug('ck %c:%08x', refcmd, refword)
220+
if cmd != refcmd:
221+
self._log.error('Received command %s differs from expected %s',
222+
cmd, refcmd)
223+
raise ValueError('Command mismatch')
224+
if word != refword:
225+
self._log.error('Received word 0x%08x differs from expected '
226+
'0x%08x', word, refword)
227+
raise ValueError('Value mismatch')
228+
return out
229+
230+
def _inject_clear(self, _) -> None:
231+
self._oe = 0
232+
self._out = 0
233+
self._hiz = 0xfffffff
234+
self._wpud = 0
235+
self._inact = 0
236+
self._in = 0
237+
238+
def _inject_output(self, value) -> None:
239+
self._out = value
240+
241+
def _inject_direction(self, value) -> None:
242+
self._oe = value
243+
244+
def _inject_pull(self, value) -> None:
245+
self._wpud = value
246+
247+
def _inject_query(self, _) -> str:
248+
return self._build_reply(mask=self._inact, input=self._in)
249+
250+
def _inject_hi_z(self, value) -> None:
251+
self._hiz = value
252+
253+
def _inject_ynput(self, value) -> None:
254+
self._yn = value
255+
256+
@classmethod
257+
def _build_reply(cls, **kwargs) -> str:
258+
lines = []
259+
for cmd, value in kwargs.items():
260+
value &= (1 << 32) - 1
261+
lines.append(f'{cls.OUTPUT_CMD_MAP[cmd]}:{value:08x}\r\n')
262+
return ''.join(lines)

0 commit comments

Comments
 (0)