11
11
import os
12
12
from datetime import datetime , timedelta , timezone
13
13
from enum import Enum
14
- from typing import Any , Callable , Dict , Generator , List , Optional , TextIO , Union
14
+ from typing import Any , Callable , Generator , Optional , TextIO , Union , Tuple , Dict
15
15
16
16
from ..message import Message
17
17
from ..typechecking import StringPathLike
18
- from ..util import channel2int , dlc2len , len2dlc
18
+ from ..util import channel2int , len2dlc
19
19
from .generic import (
20
20
TextIOMessageReader ,
21
21
TextIOMessageWriter ,
@@ -58,13 +58,14 @@ def __init__(
58
58
"""
59
59
super ().__init__ (file , mode = "r" )
60
60
self .file_version = TRCFileVersion .UNKNOWN
61
- self .start_time : Optional [ datetime ] = None
61
+ self .start_time : float = 0
62
62
self .columns : Dict [str , int ] = {}
63
+ self .num_columns = - 1
63
64
64
65
if not self .file :
65
66
raise ValueError ("The given file cannot be None" )
66
67
67
- self ._parse_cols : Callable [[List [str ]], Optional [Message ]] = lambda x : None
68
+ self ._parse_cols : Callable [[Tuple [str , ... ]], Optional [Message ]] = lambda x : None
68
69
69
70
def _extract_header (self ):
70
71
line = ""
@@ -89,16 +90,17 @@ def _extract_header(self):
89
90
elif line .startswith (";$STARTTIME" ):
90
91
logger .debug ("TRCReader: Found start time '%s'" , line )
91
92
try :
92
- self .start_time = datetime (
93
+ self .start_time = ( datetime (
93
94
1899 , 12 , 30 , tzinfo = timezone .utc
94
- ) + timedelta (days = float (line .split ("=" )[1 ]))
95
+ ) + timedelta (days = float (line .split ("=" )[1 ]))). timestamp ()
95
96
except IndexError :
96
97
logger .debug ("TRCReader: Failed to parse start time" )
97
98
elif line .startswith (";$COLUMNS" ):
98
99
logger .debug ("TRCReader: Found columns '%s'" , line )
99
100
try :
100
101
columns = line .split ("=" )[1 ].split ("," )
101
102
self .columns = {column : columns .index (column ) for column in columns }
103
+ self .num_columns = len (columns ) - 1
102
104
except IndexError :
103
105
logger .debug ("TRCReader: Failed to parse columns" )
104
106
elif line .startswith (";" ):
@@ -132,7 +134,7 @@ def _extract_header(self):
132
134
133
135
return line
134
136
135
- def _parse_msg_v1_0 (self , cols : List [str ]) -> Optional [Message ]:
137
+ def _parse_msg_v1_0 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
136
138
arbit_id = cols [2 ]
137
139
if arbit_id == "FFFFFFFF" :
138
140
logger .info ("TRCReader: Dropping bus info line" )
@@ -147,16 +149,11 @@ def _parse_msg_v1_0(self, cols: List[str]) -> Optional[Message]:
147
149
msg .data = bytearray ([int (cols [i + 4 ], 16 ) for i in range (msg .dlc )])
148
150
return msg
149
151
150
- def _parse_msg_v1_1 (self , cols : List [str ]) -> Optional [Message ]:
152
+ def _parse_msg_v1_1 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
151
153
arbit_id = cols [3 ]
152
154
153
155
msg = Message ()
154
- if isinstance (self .start_time , datetime ):
155
- msg .timestamp = (
156
- self .start_time + timedelta (milliseconds = float (cols [1 ]))
157
- ).timestamp ()
158
- else :
159
- msg .timestamp = float (cols [1 ]) / 1000
156
+ msg .timestamp = float (cols [1 ]) / 1000 + self .start_time
160
157
msg .arbitration_id = int (arbit_id , 16 )
161
158
msg .is_extended_id = len (arbit_id ) > 4
162
159
msg .channel = 1
@@ -165,16 +162,11 @@ def _parse_msg_v1_1(self, cols: List[str]) -> Optional[Message]:
165
162
msg .is_rx = cols [2 ] == "Rx"
166
163
return msg
167
164
168
- def _parse_msg_v1_3 (self , cols : List [str ]) -> Optional [Message ]:
165
+ def _parse_msg_v1_3 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
169
166
arbit_id = cols [4 ]
170
167
171
168
msg = Message ()
172
- if isinstance (self .start_time , datetime ):
173
- msg .timestamp = (
174
- self .start_time + timedelta (milliseconds = float (cols [1 ]))
175
- ).timestamp ()
176
- else :
177
- msg .timestamp = float (cols [1 ]) / 1000
169
+ msg .timestamp = float (cols [1 ]) / 1000 + self .start_time
178
170
msg .arbitration_id = int (arbit_id , 16 )
179
171
msg .is_extended_id = len (arbit_id ) > 4
180
172
msg .channel = int (cols [2 ])
@@ -183,7 +175,7 @@ def _parse_msg_v1_3(self, cols: List[str]) -> Optional[Message]:
183
175
msg .is_rx = cols [3 ] == "Rx"
184
176
return msg
185
177
186
- def _parse_msg_v2_x (self , cols : List [str ]) -> Optional [Message ]:
178
+ def _parse_msg_v2_x (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
187
179
type_ = cols [self .columns ["T" ]]
188
180
bus = self .columns .get ("B" , None )
189
181
@@ -192,50 +184,43 @@ def _parse_msg_v2_x(self, cols: List[str]) -> Optional[Message]:
192
184
dlc = len2dlc (length )
193
185
elif "L" in self .columns :
194
186
dlc = int (cols [self .columns ["L" ]])
195
- length = dlc2len (dlc )
196
187
else :
197
188
raise ValueError ("No length/dlc columns present." )
198
189
199
190
msg = Message ()
200
- if isinstance (self .start_time , datetime ):
201
- msg .timestamp = (
202
- self .start_time + timedelta (milliseconds = float (cols [self .columns ["O" ]]))
203
- ).timestamp ()
204
- else :
205
- msg .timestamp = float (cols [1 ]) / 1000
191
+ msg .timestamp = float (cols [self .columns ["O" ]]) / 1000 + self .start_time
206
192
msg .arbitration_id = int (cols [self .columns ["I" ]], 16 )
207
193
msg .is_extended_id = len (cols [self .columns ["I" ]]) > 4
208
194
msg .channel = int (cols [bus ]) if bus is not None else 1
209
195
msg .dlc = dlc
210
- msg .data = bytearray (
211
- [int (cols [i + self .columns ["D" ]], 16 ) for i in range (length )]
212
- )
196
+ if dlc :
197
+ msg .data = bytearray .fromhex (cols [self .columns ["D" ]])
213
198
msg .is_rx = cols [self .columns ["d" ]] == "Rx"
214
- msg .is_fd = type_ in [ "FD" , "FB" , "FE" , "BI" ]
215
- msg .bitrate_switch = type_ in [ "FB" , " FE" ]
216
- msg .error_state_indicator = type_ in [ "FE" , "BI" ]
199
+ msg .is_fd = type_ in { "FD" , "FB" , "FE" , "BI" }
200
+ msg .bitrate_switch = type_ in { "FB" , "FE" }
201
+ msg .error_state_indicator = type_ in { "FE" , "BI" }
217
202
218
203
return msg
219
204
220
- def _parse_cols_v1_1 (self , cols : List [str ]) -> Optional [Message ]:
205
+ def _parse_cols_v1_1 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
221
206
dtype = cols [2 ]
222
207
if dtype in ("Tx" , "Rx" ):
223
208
return self ._parse_msg_v1_1 (cols )
224
209
else :
225
210
logger .info ("TRCReader: Unsupported type '%s'" , dtype )
226
211
return None
227
212
228
- def _parse_cols_v1_3 (self , cols : List [str ]) -> Optional [Message ]:
213
+ def _parse_cols_v1_3 (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
229
214
dtype = cols [3 ]
230
215
if dtype in ("Tx" , "Rx" ):
231
216
return self ._parse_msg_v1_3 (cols )
232
217
else :
233
218
logger .info ("TRCReader: Unsupported type '%s'" , dtype )
234
219
return None
235
220
236
- def _parse_cols_v2_x (self , cols : List [str ]) -> Optional [Message ]:
221
+ def _parse_cols_v2_x (self , cols : Tuple [str , ... ]) -> Optional [Message ]:
237
222
dtype = cols [self .columns ["T" ]]
238
- if dtype in [ "DT" , "FD" , "FB" ] :
223
+ if dtype in { "DT" , "FD" , "FB" , "FE" , "BI" } :
239
224
return self ._parse_msg_v2_x (cols )
240
225
else :
241
226
logger .info ("TRCReader: Unsupported type '%s'" , dtype )
@@ -244,7 +229,7 @@ def _parse_cols_v2_x(self, cols: List[str]) -> Optional[Message]:
244
229
def _parse_line (self , line : str ) -> Optional [Message ]:
245
230
logger .debug ("TRCReader: Parse '%s'" , line )
246
231
try :
247
- cols = line .split ()
232
+ cols = tuple ( line .split (maxsplit = self . num_columns ) )
248
233
return self ._parse_cols (cols )
249
234
except IndexError :
250
235
logger .warning ("TRCReader: Failed to parse message '%s'" , line )
0 commit comments