Skip to content

Commit 11dbd6e

Browse files
author
luoja
committed
Improve serial port performance by threading
1 parent 9a766ce commit 11dbd6e

File tree

1 file changed

+99
-75
lines changed

1 file changed

+99
-75
lines changed

can/interfaces/slcan.py

Lines changed: 99 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
import io
66
import logging
77
import time
8-
from typing import Any, Optional, Tuple
9-
8+
from typing import Any, Optional, Tuple, List
9+
from queue import Queue, Empty
10+
import threading
11+
from serial.tools import list_ports
1012
from can import BusABC, CanProtocol, Message, typechecking
1113

1214
from ..exceptions import (
@@ -22,8 +24,7 @@
2224
import serial
2325
except ImportError:
2426
logger.warning(
25-
"You won't be able to use the slcan can backend without "
26-
"the serial module installed!"
27+
"You won't be able to use the slcan can backend without " "the serial module installed!"
2728
)
2829
serial = None
2930

@@ -132,6 +133,10 @@ def __init__(
132133
rtscts=False,
133134
**kwargs,
134135
)
136+
self._timestamp_offset = time.time() - time.perf_counter()
137+
self.queue_read = Queue()
138+
self.event_read = threading.Event()
139+
threading.Thread(None, target=self._read_can_thread, args=(self.event_read,)).start()
135140

136141
def set_bitrate(self, bitrate: int) -> None:
137142
"""
@@ -164,33 +169,9 @@ def _write(self, string: str) -> None:
164169
self.serialPortOrig.write(string.encode() + self.LINE_TERMINATOR)
165170
self.serialPortOrig.flush()
166171

167-
def _read(self, timeout: Optional[float]) -> Optional[str]:
168-
_timeout = serial.Timeout(timeout)
169-
170-
with error_check("Could not read from serial device"):
171-
while True:
172-
# Due to accessing `serialPortOrig.in_waiting` too often will reduce the performance.
173-
# We read the `serialPortOrig.in_waiting` only once here.
174-
in_waiting = self.serialPortOrig.in_waiting
175-
for _ in range(max(1, in_waiting)):
176-
new_byte = self.serialPortOrig.read(size=1)
177-
if new_byte:
178-
self._buffer.extend(new_byte)
179-
else:
180-
break
181-
182-
if new_byte in (self._ERROR, self._OK):
183-
string = self._buffer.decode()
184-
self._buffer.clear()
185-
return string
186-
187-
if _timeout.expired():
188-
break
189-
190-
return None
191-
192172
def flush(self) -> None:
193173
self._buffer.clear()
174+
self.queue_read = Queue()
194175
with error_check("Could not flush"):
195176
self.serialPortOrig.reset_input_buffer()
196177

@@ -203,55 +184,81 @@ def open(self) -> None:
203184
def close(self) -> None:
204185
self._write("C")
205186

206-
def _recv_internal(
207-
self, timeout: Optional[float]
208-
) -> Tuple[Optional[Message], bool]:
187+
def _recv_internal(self, timeout: Optional[float]):
188+
try:
189+
return self.queue_read.get(timeout=timeout), False
190+
except Empty:
191+
return None, False
192+
193+
def _read_can_thread(self, event_recv_send_batch_zlg):
194+
while not event_recv_send_batch_zlg.is_set():
195+
msgs = self._read_can()
196+
for i in msgs:
197+
self.queue_read.put(i)
198+
time.sleep(0.005)
199+
200+
def _read_can(self) -> List[Message]:
209201
canId = None
210202
remote = False
211203
extended = False
212204
data = None
205+
msgs = []
213206

214-
string = self._read(timeout)
207+
with error_check("Could not read from serial device"):
208+
# Due to accessing `serialPortOrig.in_waiting` too often will reduce the performance.
209+
# We read the `serialPortOrig.in_waiting` only once here.
210+
in_waiting = self.serialPortOrig.in_waiting
211+
for _ in range(in_waiting):
212+
new_byte = self.serialPortOrig.read(size=1)
213+
if new_byte:
214+
self._buffer.extend(new_byte)
215+
else:
216+
break
215217

216-
if not string:
217-
pass
218-
elif string[0] in (
219-
"T",
220-
"x", # x is an alternative extended message identifier for CANDapter
221-
):
222-
# extended frame
223-
canId = int(string[1:9], 16)
224-
dlc = int(string[9])
225-
extended = True
226-
data = bytearray.fromhex(string[10 : 10 + dlc * 2])
227-
elif string[0] == "t":
228-
# normal frame
229-
canId = int(string[1:4], 16)
230-
dlc = int(string[4])
231-
data = bytearray.fromhex(string[5 : 5 + dlc * 2])
232-
elif string[0] == "r":
233-
# remote frame
234-
canId = int(string[1:4], 16)
235-
dlc = int(string[4])
236-
remote = True
237-
elif string[0] == "R":
238-
# remote extended frame
239-
canId = int(string[1:9], 16)
240-
dlc = int(string[9])
241-
extended = True
242-
remote = True
243-
244-
if canId is not None:
245-
msg = Message(
246-
arbitration_id=canId,
247-
is_extended_id=extended,
248-
timestamp=time.time(), # Better than nothing...
249-
is_remote_frame=remote,
250-
dlc=dlc,
251-
data=data,
252-
)
253-
return msg, False
254-
return None, False
218+
if new_byte in (self._ERROR, self._OK):
219+
string = self._buffer.decode()
220+
self._buffer.clear()
221+
222+
if not string:
223+
pass
224+
elif string[0] in (
225+
"T",
226+
"x", # x is an alternative extended message identifier for CANDapter
227+
):
228+
# extended frame
229+
canId = int(string[1:9], 16)
230+
dlc = int(string[9])
231+
extended = True
232+
data = bytearray.fromhex(string[10 : 10 + dlc * 2])
233+
elif string[0] == "t":
234+
# normal frame
235+
canId = int(string[1:4], 16)
236+
dlc = int(string[4])
237+
data = bytearray.fromhex(string[5 : 5 + dlc * 2])
238+
elif string[0] == "r":
239+
# remote frame
240+
canId = int(string[1:4], 16)
241+
dlc = int(string[4])
242+
remote = True
243+
elif string[0] == "R":
244+
# remote extended frame
245+
canId = int(string[1:9], 16)
246+
dlc = int(string[9])
247+
extended = True
248+
remote = True
249+
250+
if canId is not None:
251+
msg = Message(
252+
arbitration_id=canId,
253+
is_extended_id=extended,
254+
timestamp=self._timestamp_offset
255+
+ time.perf_counter(), # Better than nothing...
256+
is_remote_frame=remote,
257+
dlc=dlc,
258+
data=data,
259+
)
260+
msgs.append(msg)
261+
return msgs
255262

256263
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
257264
if timeout != self.serialPortOrig.write_timeout:
@@ -271,6 +278,7 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None:
271278

272279
def shutdown(self) -> None:
273280
super().shutdown()
281+
self.event_read.set()
274282
self.close()
275283
with error_check("Could not close serial socket"):
276284
self.serialPortOrig.close()
@@ -285,9 +293,7 @@ def fileno(self) -> int:
285293
except Exception as exception:
286294
raise CanOperationError("Cannot fetch fileno") from exception
287295

288-
def get_version(
289-
self, timeout: Optional[float]
290-
) -> Tuple[Optional[int], Optional[int]]:
296+
def get_version(self, timeout: Optional[float]) -> Tuple[Optional[int], Optional[int]]:
291297
"""Get HW and SW version of the slcan interface.
292298
293299
:param timeout:
@@ -334,3 +340,21 @@ def get_serial_number(self, timeout: Optional[float]) -> Optional[str]:
334340
return serial_number
335341

336342
return None
343+
344+
@staticmethod
345+
def _detect_available_configs():
346+
"""
347+
Identify slcan devices
348+
"""
349+
ports = []
350+
351+
for p in list_ports.comports():
352+
ports.append((p.device, p.description))
353+
return [
354+
{
355+
"interface": "slcan",
356+
"channel": port,
357+
"name": des,
358+
}
359+
for port, des in ports
360+
]

0 commit comments

Comments
 (0)