1
1
#!/usr/bin/env python
2
2
3
- import unittest
4
- from typing import cast
3
+ import unittest . mock
4
+ from typing import cast , Optional
5
5
6
- import serial
6
+ from serial . serialutil import SerialBase
7
7
8
8
import can .interfaces .slcan
9
9
21
21
TIMEOUT = 0.5 if IS_PYPY else 0.01 # 0.001 is the default set in slcanBus
22
22
23
23
24
+ class SerialMock (SerialBase ):
25
+ def __init__ (self , * args , ** kwargs ) -> None :
26
+ super ().__init__ (* args , ** kwargs )
27
+
28
+ self ._input_buffer = b""
29
+ self ._output_buffer = b""
30
+
31
+ def open (self ) -> None :
32
+ self .is_open = True
33
+
34
+ def close (self ) -> None :
35
+ self .is_open = False
36
+ self ._input_buffer = b""
37
+ self ._output_buffer = b""
38
+
39
+ def read (self , size : int = - 1 , / ) -> bytes :
40
+ if size > 0 :
41
+ data = self ._input_buffer [:size ]
42
+ self ._input_buffer = self ._input_buffer [size :]
43
+ return data
44
+ return b""
45
+
46
+ def write (self , b : bytes , / ) -> Optional [int ]:
47
+ self ._output_buffer = b
48
+ if b == b"N\r " :
49
+ self .set_input_buffer (b"NA123\r " )
50
+ elif b == b"V\r " :
51
+ self .set_input_buffer (b"V1013\r " )
52
+ return len (b )
53
+
54
+ def set_input_buffer (self , expected : bytes ) -> None :
55
+ self ._input_buffer = expected
56
+
57
+ def get_output_buffer (self ) -> bytes :
58
+ return self ._output_buffer
59
+
60
+ def reset_input_buffer (self ) -> None :
61
+ self ._input_buffer = b""
62
+
63
+ @property
64
+ def in_waiting (self ) -> int :
65
+ return len (self ._input_buffer )
66
+
67
+ @classmethod
68
+ def serial_for_url (cls , * args , ** kwargs ) -> SerialBase :
69
+ return cls (* args , ** kwargs )
70
+
71
+
24
72
class slcanTestCase (unittest .TestCase ):
73
+ @unittest .mock .patch ("serial.serial_for_url" , SerialMock .serial_for_url )
25
74
def setUp (self ):
26
75
self .bus = cast (
27
76
can .interfaces .slcan .slcanBus ,
28
77
can .Bus ("loop://" , interface = "slcan" , sleep_after_open = 0 , timeout = TIMEOUT ),
29
78
)
30
- self .serial = cast (serial . Serial , self .bus .serialPortOrig )
79
+ self .serial = cast (SerialMock , self .bus .serialPortOrig )
31
80
self .serial .reset_input_buffer ()
32
81
33
82
def tearDown (self ):
34
83
self .bus .shutdown ()
35
84
36
85
def test_recv_extended (self ):
37
- self .serial .write (b"T12ABCDEF2AA55\r " )
86
+ self .serial .set_input_buffer (b"T12ABCDEF2AA55\r " )
38
87
msg = self .bus .recv (TIMEOUT )
39
88
self .assertIsNotNone (msg )
40
89
self .assertEqual (msg .arbitration_id , 0x12ABCDEF )
@@ -44,7 +93,7 @@ def test_recv_extended(self):
44
93
self .assertSequenceEqual (msg .data , [0xAA , 0x55 ])
45
94
46
95
# Ewert Energy Systems CANDapter specific
47
- self .serial .write (b"x12ABCDEF2AA55\r " )
96
+ self .serial .set_input_buffer (b"x12ABCDEF2AA55\r " )
48
97
msg = self .bus .recv (TIMEOUT )
49
98
self .assertIsNotNone (msg )
50
99
self .assertEqual (msg .arbitration_id , 0x12ABCDEF )
@@ -54,15 +103,19 @@ def test_recv_extended(self):
54
103
self .assertSequenceEqual (msg .data , [0xAA , 0x55 ])
55
104
56
105
def test_send_extended (self ):
106
+ payload = b"T12ABCDEF2AA55\r "
57
107
msg = can .Message (
58
108
arbitration_id = 0x12ABCDEF , is_extended_id = True , data = [0xAA , 0x55 ]
59
109
)
60
110
self .bus .send (msg )
111
+ self .assertEqual (payload , self .serial .get_output_buffer ())
112
+
113
+ self .serial .set_input_buffer (payload )
61
114
rx_msg = self .bus .recv (TIMEOUT )
62
115
self .assertTrue (msg .equals (rx_msg , timestamp_delta = None ))
63
116
64
117
def test_recv_standard (self ):
65
- self .serial .write (b"t4563112233\r " )
118
+ self .serial .set_input_buffer (b"t4563112233\r " )
66
119
msg = self .bus .recv (TIMEOUT )
67
120
self .assertIsNotNone (msg )
68
121
self .assertEqual (msg .arbitration_id , 0x456 )
@@ -72,15 +125,19 @@ def test_recv_standard(self):
72
125
self .assertSequenceEqual (msg .data , [0x11 , 0x22 , 0x33 ])
73
126
74
127
def test_send_standard (self ):
128
+ payload = b"t4563112233\r "
75
129
msg = can .Message (
76
130
arbitration_id = 0x456 , is_extended_id = False , data = [0x11 , 0x22 , 0x33 ]
77
131
)
78
132
self .bus .send (msg )
133
+ self .assertEqual (payload , self .serial .get_output_buffer ())
134
+
135
+ self .serial .set_input_buffer (payload )
79
136
rx_msg = self .bus .recv (TIMEOUT )
80
137
self .assertTrue (msg .equals (rx_msg , timestamp_delta = None ))
81
138
82
139
def test_recv_standard_remote (self ):
83
- self .serial .write (b"r1238\r " )
140
+ self .serial .set_input_buffer (b"r1238\r " )
84
141
msg = self .bus .recv (TIMEOUT )
85
142
self .assertIsNotNone (msg )
86
143
self .assertEqual (msg .arbitration_id , 0x123 )
@@ -89,15 +146,19 @@ def test_recv_standard_remote(self):
89
146
self .assertEqual (msg .dlc , 8 )
90
147
91
148
def test_send_standard_remote (self ):
149
+ payload = b"r1238\r "
92
150
msg = can .Message (
93
151
arbitration_id = 0x123 , is_extended_id = False , is_remote_frame = True , dlc = 8
94
152
)
95
153
self .bus .send (msg )
154
+ self .assertEqual (payload , self .serial .get_output_buffer ())
155
+
156
+ self .serial .set_input_buffer (payload )
96
157
rx_msg = self .bus .recv (TIMEOUT )
97
158
self .assertTrue (msg .equals (rx_msg , timestamp_delta = None ))
98
159
99
160
def test_recv_extended_remote (self ):
100
- self .serial .write (b"R12ABCDEF6\r " )
161
+ self .serial .set_input_buffer (b"R12ABCDEF6\r " )
101
162
msg = self .bus .recv (TIMEOUT )
102
163
self .assertIsNotNone (msg )
103
164
self .assertEqual (msg .arbitration_id , 0x12ABCDEF )
@@ -106,19 +167,23 @@ def test_recv_extended_remote(self):
106
167
self .assertEqual (msg .dlc , 6 )
107
168
108
169
def test_send_extended_remote (self ):
170
+ payload = b"R12ABCDEF6\r "
109
171
msg = can .Message (
110
172
arbitration_id = 0x12ABCDEF , is_extended_id = True , is_remote_frame = True , dlc = 6
111
173
)
112
174
self .bus .send (msg )
175
+ self .assertEqual (payload , self .serial .get_output_buffer ())
176
+
177
+ self .serial .set_input_buffer (payload )
113
178
rx_msg = self .bus .recv (TIMEOUT )
114
179
self .assertTrue (msg .equals (rx_msg , timestamp_delta = None ))
115
180
116
181
def test_partial_recv (self ):
117
- self .serial .write (b"T12ABCDEF" )
182
+ self .serial .set_input_buffer (b"T12ABCDEF" )
118
183
msg = self .bus .recv (TIMEOUT )
119
184
self .assertIsNone (msg )
120
185
121
- self .serial .write (b"2AA55\r T12" )
186
+ self .serial .set_input_buffer (b"2AA55\r T12" )
122
187
msg = self .bus .recv (TIMEOUT )
123
188
self .assertIsNotNone (msg )
124
189
self .assertEqual (msg .arbitration_id , 0x12ABCDEF )
@@ -130,28 +195,21 @@ def test_partial_recv(self):
130
195
msg = self .bus .recv (TIMEOUT )
131
196
self .assertIsNone (msg )
132
197
133
- self .serial .write (b"ABCDEF2AA55\r " )
198
+ self .serial .set_input_buffer (b"ABCDEF2AA55\r " )
134
199
msg = self .bus .recv (TIMEOUT )
135
200
self .assertIsNotNone (msg )
136
201
137
202
def test_version (self ):
138
- self .serial .write (b"V1013\r " )
139
203
hw_ver , sw_ver = self .bus .get_version (0 )
204
+ self .assertEqual (b"V\r " , self .serial .get_output_buffer ())
140
205
self .assertEqual (hw_ver , 10 )
141
206
self .assertEqual (sw_ver , 13 )
142
207
143
- hw_ver , sw_ver = self .bus .get_version (0 )
144
- self .assertIsNone (hw_ver )
145
- self .assertIsNone (sw_ver )
146
-
147
208
def test_serial_number (self ):
148
- self .serial .write (b"NA123\r " )
149
209
sn = self .bus .get_serial_number (0 )
210
+ self .assertEqual (b"N\r " , self .serial .get_output_buffer ())
150
211
self .assertEqual (sn , "A123" )
151
212
152
- sn = self .bus .get_serial_number (0 )
153
- self .assertIsNone (sn )
154
-
155
213
156
214
if __name__ == "__main__" :
157
215
unittest .main ()
0 commit comments