|
| 1 | +import argparse |
| 2 | +import re |
| 3 | +from collections.abc import Sequence |
| 4 | +from typing import Any, Optional, Union |
| 5 | + |
| 6 | +import can |
| 7 | +from can.typechecking import CanFilter, TAdditionalCliArgs |
| 8 | +from can.util import _dict2timing, cast_from_string |
| 9 | + |
| 10 | + |
| 11 | +def add_bus_arguments( |
| 12 | + parser: argparse.ArgumentParser, |
| 13 | + *, |
| 14 | + filter_arg: bool = False, |
| 15 | + prefix: Optional[str] = None, |
| 16 | + group_title: Optional[str] = None, |
| 17 | +) -> None: |
| 18 | + """Adds CAN bus configuration options to an argument parser. |
| 19 | +
|
| 20 | + :param parser: |
| 21 | + The argument parser to which the options will be added. |
| 22 | + :param filter_arg: |
| 23 | + Whether to include the filter argument. |
| 24 | + :param prefix: |
| 25 | + An optional prefix for the argument names, allowing configuration of multiple buses. |
| 26 | + :param group_title: |
| 27 | + The title of the argument group. If not provided, a default title will be generated |
| 28 | + based on the prefix. For example, "bus arguments (prefix)" if a prefix is specified, |
| 29 | + or "bus arguments" otherwise. |
| 30 | + """ |
| 31 | + if group_title is None: |
| 32 | + group_title = f"bus arguments ({prefix})" if prefix else "bus arguments" |
| 33 | + |
| 34 | + group = parser.add_argument_group(group_title) |
| 35 | + |
| 36 | + flags = [f"--{prefix}-channel"] if prefix else ["-c", "--channel"] |
| 37 | + dest = f"{prefix}_channel" if prefix else "channel" |
| 38 | + group.add_argument( |
| 39 | + *flags, |
| 40 | + dest=dest, |
| 41 | + default=argparse.SUPPRESS, |
| 42 | + metavar="CHANNEL", |
| 43 | + help=r"Most backend interfaces require some sort of channel. For " |
| 44 | + r"example with the serial interface the channel might be a rfcomm" |
| 45 | + r' device: "/dev/rfcomm0". With the socketcan interface valid ' |
| 46 | + r'channel examples include: "can0", "vcan0".', |
| 47 | + ) |
| 48 | + |
| 49 | + flags = [f"--{prefix}-interface"] if prefix else ["-i", "--interface"] |
| 50 | + dest = f"{prefix}_interface" if prefix else "interface" |
| 51 | + group.add_argument( |
| 52 | + *flags, |
| 53 | + dest=dest, |
| 54 | + default=argparse.SUPPRESS, |
| 55 | + choices=sorted(can.VALID_INTERFACES), |
| 56 | + help="""Specify the backend CAN interface to use. If left blank, |
| 57 | + fall back to reading from configuration files.""", |
| 58 | + ) |
| 59 | + |
| 60 | + flags = [f"--{prefix}-bitrate"] if prefix else ["-b", "--bitrate"] |
| 61 | + dest = f"{prefix}_bitrate" if prefix else "bitrate" |
| 62 | + group.add_argument( |
| 63 | + *flags, |
| 64 | + dest=dest, |
| 65 | + type=int, |
| 66 | + default=argparse.SUPPRESS, |
| 67 | + metavar="BITRATE", |
| 68 | + help="Bitrate to use for the CAN bus.", |
| 69 | + ) |
| 70 | + |
| 71 | + flags = [f"--{prefix}-fd"] if prefix else ["--fd"] |
| 72 | + dest = f"{prefix}_fd" if prefix else "fd" |
| 73 | + group.add_argument( |
| 74 | + *flags, |
| 75 | + dest=dest, |
| 76 | + default=argparse.SUPPRESS, |
| 77 | + action="store_true", |
| 78 | + help="Activate CAN-FD support", |
| 79 | + ) |
| 80 | + |
| 81 | + flags = [f"--{prefix}-data-bitrate"] if prefix else ["--data-bitrate"] |
| 82 | + dest = f"{prefix}_data_bitrate" if prefix else "data_bitrate" |
| 83 | + group.add_argument( |
| 84 | + *flags, |
| 85 | + dest=dest, |
| 86 | + type=int, |
| 87 | + default=argparse.SUPPRESS, |
| 88 | + metavar="DATA_BITRATE", |
| 89 | + help="Bitrate to use for the data phase in case of CAN-FD.", |
| 90 | + ) |
| 91 | + |
| 92 | + flags = [f"--{prefix}-timing"] if prefix else ["--timing"] |
| 93 | + dest = f"{prefix}_timing" if prefix else "timing" |
| 94 | + group.add_argument( |
| 95 | + *flags, |
| 96 | + dest=dest, |
| 97 | + action=_BitTimingAction, |
| 98 | + nargs=argparse.ONE_OR_MORE, |
| 99 | + default=argparse.SUPPRESS, |
| 100 | + metavar="TIMING_ARG", |
| 101 | + help="Configure bit rate and bit timing. For example, use " |
| 102 | + "`--timing f_clock=8_000_000 tseg1=5 tseg2=2 sjw=2 brp=2 nof_samples=1` for classical CAN " |
| 103 | + "or `--timing f_clock=80_000_000 nom_tseg1=119 nom_tseg2=40 nom_sjw=40 nom_brp=1 " |
| 104 | + "data_tseg1=29 data_tseg2=10 data_sjw=10 data_brp=1` for CAN FD. " |
| 105 | + "Check the python-can documentation to verify whether your " |
| 106 | + "CAN interface supports the `timing` argument.", |
| 107 | + ) |
| 108 | + |
| 109 | + if filter_arg: |
| 110 | + flags = [f"--{prefix}-filter"] if prefix else ["--filter"] |
| 111 | + dest = f"{prefix}_can_filters" if prefix else "can_filters" |
| 112 | + group.add_argument( |
| 113 | + *flags, |
| 114 | + dest=dest, |
| 115 | + nargs=argparse.ONE_OR_MORE, |
| 116 | + action=_CanFilterAction, |
| 117 | + default=argparse.SUPPRESS, |
| 118 | + metavar="{<can_id>:<can_mask>,<can_id>~<can_mask>}", |
| 119 | + help="R|Space separated CAN filters for the given CAN interface:" |
| 120 | + "\n <can_id>:<can_mask> (matches when <received_can_id> & mask ==" |
| 121 | + " can_id & mask)" |
| 122 | + "\n <can_id>~<can_mask> (matches when <received_can_id> & mask !=" |
| 123 | + " can_id & mask)" |
| 124 | + "\nFx to show only frames with ID 0x100 to 0x103 and 0x200 to 0x20F:" |
| 125 | + "\n python -m can.viewer --filter 100:7FC 200:7F0" |
| 126 | + "\nNote that the ID and mask are always interpreted as hex values", |
| 127 | + ) |
| 128 | + |
| 129 | + flags = [f"--{prefix}-bus-kwargs"] if prefix else ["--bus-kwargs"] |
| 130 | + dest = f"{prefix}_bus_kwargs" if prefix else "bus_kwargs" |
| 131 | + group.add_argument( |
| 132 | + *flags, |
| 133 | + dest=dest, |
| 134 | + action=_BusKwargsAction, |
| 135 | + nargs=argparse.ONE_OR_MORE, |
| 136 | + default=argparse.SUPPRESS, |
| 137 | + metavar="BUS_KWARG", |
| 138 | + help="Pass keyword arguments down to the instantiation of the bus class. " |
| 139 | + "For example, `-i vector -c 1 --bus-kwargs app_name=MyCanApp serial=1234` is equivalent " |
| 140 | + "to opening the bus with `can.Bus('vector', channel=1, app_name='MyCanApp', serial=1234)", |
| 141 | + ) |
| 142 | + |
| 143 | + |
| 144 | +def create_bus_from_namespace( |
| 145 | + namespace: argparse.Namespace, |
| 146 | + *, |
| 147 | + prefix: Optional[str] = None, |
| 148 | + **kwargs: Any, |
| 149 | +) -> can.BusABC: |
| 150 | + """Creates and returns a CAN bus instance based on the provided namespace and arguments. |
| 151 | +
|
| 152 | + :param namespace: |
| 153 | + The namespace containing parsed arguments. |
| 154 | + :param prefix: |
| 155 | + An optional prefix for the argument names, enabling support for multiple buses. |
| 156 | + :param kwargs: |
| 157 | + Additional keyword arguments to configure the bus. |
| 158 | + :return: |
| 159 | + A CAN bus instance. |
| 160 | + """ |
| 161 | + config: dict[str, Any] = {"single_handle": True, **kwargs} |
| 162 | + |
| 163 | + for keyword in ( |
| 164 | + "channel", |
| 165 | + "interface", |
| 166 | + "bitrate", |
| 167 | + "fd", |
| 168 | + "data_bitrate", |
| 169 | + "can_filters", |
| 170 | + "timing", |
| 171 | + "bus_kwargs", |
| 172 | + ): |
| 173 | + prefixed_keyword = f"{prefix}_{keyword}" if prefix else keyword |
| 174 | + |
| 175 | + if prefixed_keyword in namespace: |
| 176 | + value = getattr(namespace, prefixed_keyword) |
| 177 | + |
| 178 | + if keyword == "bus_kwargs": |
| 179 | + config.update(value) |
| 180 | + else: |
| 181 | + config[keyword] = value |
| 182 | + |
| 183 | + try: |
| 184 | + return can.Bus(**config) |
| 185 | + except Exception as exc: |
| 186 | + err_msg = f"Unable to instantiate bus from arguments {vars(namespace)}." |
| 187 | + raise argparse.ArgumentError(None, err_msg) from exc |
| 188 | + |
| 189 | + |
| 190 | +class _CanFilterAction(argparse.Action): |
| 191 | + def __call__( |
| 192 | + self, |
| 193 | + parser: argparse.ArgumentParser, |
| 194 | + namespace: argparse.Namespace, |
| 195 | + values: Union[str, Sequence[Any], None], |
| 196 | + option_string: Optional[str] = None, |
| 197 | + ) -> None: |
| 198 | + if not isinstance(values, list): |
| 199 | + raise argparse.ArgumentError(self, "Invalid filter argument") |
| 200 | + |
| 201 | + print(f"Adding filter(s): {values}") |
| 202 | + can_filters: list[CanFilter] = [] |
| 203 | + |
| 204 | + for filt in values: |
| 205 | + if ":" in filt: |
| 206 | + parts = filt.split(":") |
| 207 | + can_id = int(parts[0], base=16) |
| 208 | + can_mask = int(parts[1], base=16) |
| 209 | + elif "~" in filt: |
| 210 | + parts = filt.split("~") |
| 211 | + can_id = int(parts[0], base=16) | 0x20000000 # CAN_INV_FILTER |
| 212 | + can_mask = int(parts[1], base=16) & 0x20000000 # socket.CAN_ERR_FLAG |
| 213 | + else: |
| 214 | + raise argparse.ArgumentError(self, "Invalid filter argument") |
| 215 | + can_filters.append({"can_id": can_id, "can_mask": can_mask}) |
| 216 | + |
| 217 | + setattr(namespace, self.dest, can_filters) |
| 218 | + |
| 219 | + |
| 220 | +class _BitTimingAction(argparse.Action): |
| 221 | + def __call__( |
| 222 | + self, |
| 223 | + parser: argparse.ArgumentParser, |
| 224 | + namespace: argparse.Namespace, |
| 225 | + values: Union[str, Sequence[Any], None], |
| 226 | + option_string: Optional[str] = None, |
| 227 | + ) -> None: |
| 228 | + if not isinstance(values, list): |
| 229 | + raise argparse.ArgumentError(self, "Invalid --timing argument") |
| 230 | + |
| 231 | + timing_dict: dict[str, int] = {} |
| 232 | + for arg in values: |
| 233 | + try: |
| 234 | + key, value_string = arg.split("=") |
| 235 | + value = int(value_string) |
| 236 | + timing_dict[key] = value |
| 237 | + except ValueError: |
| 238 | + raise argparse.ArgumentError( |
| 239 | + self, f"Invalid timing argument: {arg}" |
| 240 | + ) from None |
| 241 | + |
| 242 | + if not (timing := _dict2timing(timing_dict)): |
| 243 | + err_msg = "Invalid --timing argument. Incomplete parameters." |
| 244 | + raise argparse.ArgumentError(self, err_msg) |
| 245 | + |
| 246 | + setattr(namespace, self.dest, timing) |
| 247 | + print(timing) |
| 248 | + |
| 249 | + |
| 250 | +class _BusKwargsAction(argparse.Action): |
| 251 | + def __call__( |
| 252 | + self, |
| 253 | + parser: argparse.ArgumentParser, |
| 254 | + namespace: argparse.Namespace, |
| 255 | + values: Union[str, Sequence[Any], None], |
| 256 | + option_string: Optional[str] = None, |
| 257 | + ) -> None: |
| 258 | + if not isinstance(values, list): |
| 259 | + raise argparse.ArgumentError(self, "Invalid --bus-kwargs argument") |
| 260 | + |
| 261 | + bus_kwargs: dict[str, Union[str, int, float, bool]] = {} |
| 262 | + |
| 263 | + for arg in values: |
| 264 | + try: |
| 265 | + match = re.match( |
| 266 | + r"^(?P<name>[_a-zA-Z][_a-zA-Z0-9]*)=(?P<value>\S*?)$", |
| 267 | + arg, |
| 268 | + ) |
| 269 | + if not match: |
| 270 | + raise ValueError |
| 271 | + key = match["name"].replace("-", "_") |
| 272 | + string_val = match["value"] |
| 273 | + bus_kwargs[key] = cast_from_string(string_val) |
| 274 | + except ValueError: |
| 275 | + raise argparse.ArgumentError( |
| 276 | + self, |
| 277 | + f"Unable to parse bus keyword argument '{arg}'", |
| 278 | + ) from None |
| 279 | + |
| 280 | + setattr(namespace, self.dest, bus_kwargs) |
| 281 | + |
| 282 | + |
| 283 | +def _add_extra_args( |
| 284 | + parser: Union[argparse.ArgumentParser, argparse._ArgumentGroup], |
| 285 | +) -> None: |
| 286 | + parser.add_argument( |
| 287 | + "extra_args", |
| 288 | + nargs=argparse.REMAINDER, |
| 289 | + help="The remaining arguments will be used for logger/player initialisation. " |
| 290 | + "For example, `can_logger -i virtual -c test -f logfile.blf --compression-level=9` " |
| 291 | + "passes the keyword argument `compression_level=9` to the BlfWriter.", |
| 292 | + ) |
| 293 | + |
| 294 | + |
| 295 | +def _parse_additional_config(unknown_args: Sequence[str]) -> TAdditionalCliArgs: |
| 296 | + for arg in unknown_args: |
| 297 | + if not re.match(r"^--[a-zA-Z][a-zA-Z0-9\-]*=\S*?$", arg): |
| 298 | + raise ValueError(f"Parsing argument {arg} failed") |
| 299 | + |
| 300 | + def _split_arg(_arg: str) -> tuple[str, str]: |
| 301 | + left, right = _arg.split("=", 1) |
| 302 | + return left.lstrip("-").replace("-", "_"), right |
| 303 | + |
| 304 | + args: dict[str, Union[str, int, float, bool]] = {} |
| 305 | + for key, string_val in map(_split_arg, unknown_args): |
| 306 | + args[key] = cast_from_string(string_val) |
| 307 | + return args |
| 308 | + |
| 309 | + |
| 310 | +def _set_logging_level_from_namespace(namespace: argparse.Namespace) -> None: |
| 311 | + if "verbosity" in namespace: |
| 312 | + logging_level_names = [ |
| 313 | + "critical", |
| 314 | + "error", |
| 315 | + "warning", |
| 316 | + "info", |
| 317 | + "debug", |
| 318 | + "subdebug", |
| 319 | + ] |
| 320 | + can.set_logging_level(logging_level_names[min(5, namespace.verbosity)]) |
0 commit comments