|
17 | 17 | import can
|
18 | 18 | from can import Bus, BusState, Logger, SizedRotatingLogger
|
19 | 19 | from can.typechecking import TAdditionalCliArgs
|
20 |
| -from can.util import cast_from_string |
| 20 | +from can.util import _dict2timing, cast_from_string |
21 | 21 |
|
22 | 22 | if TYPE_CHECKING:
|
23 | 23 | from can.io import BaseRotatingLogger
|
@@ -58,6 +58,19 @@ def _create_base_argument_parser(parser: argparse.ArgumentParser) -> None:
|
58 | 58 | help="Bitrate to use for the data phase in case of CAN-FD.",
|
59 | 59 | )
|
60 | 60 |
|
| 61 | + parser.add_argument( |
| 62 | + "--timing", |
| 63 | + action=_BitTimingAction, |
| 64 | + nargs=argparse.ONE_OR_MORE, |
| 65 | + help="Configure bit rate and bit timing. For example, use " |
| 66 | + "`--timing f_clock=8_000_000 tseg1=5 tseg2=2 sjw=2 brp=2 nof_samples=1` for classical CAN " |
| 67 | + "or `--timing f_clock=80_000_000 nom_tseg1=119 nom_tseg2=40 nom_sjw=40 nom_brp=1 " |
| 68 | + "data_tseg1=29 data_tseg2=10 data_sjw=10 data_brp=1` for CAN FD. " |
| 69 | + "Check the python-can documentation to verify whether your " |
| 70 | + "CAN interface supports the `timing` argument.", |
| 71 | + metavar="TIMING_ARG", |
| 72 | + ) |
| 73 | + |
61 | 74 | parser.add_argument(
|
62 | 75 | "extra_args",
|
63 | 76 | nargs=argparse.REMAINDER,
|
@@ -109,6 +122,8 @@ def _create_bus(parsed_args: argparse.Namespace, **kwargs: Any) -> can.BusABC:
|
109 | 122 | config["data_bitrate"] = parsed_args.data_bitrate
|
110 | 123 | if getattr(parsed_args, "can_filters", None):
|
111 | 124 | config["can_filters"] = parsed_args.can_filters
|
| 125 | + if parsed_args.timing: |
| 126 | + config["timing"] = parsed_args.timing |
112 | 127 |
|
113 | 128 | return Bus(parsed_args.channel, **config)
|
114 | 129 |
|
@@ -143,6 +158,36 @@ def __call__(
|
143 | 158 | setattr(namespace, self.dest, can_filters)
|
144 | 159 |
|
145 | 160 |
|
| 161 | +class _BitTimingAction(argparse.Action): |
| 162 | + def __call__( |
| 163 | + self, |
| 164 | + parser: argparse.ArgumentParser, |
| 165 | + namespace: argparse.Namespace, |
| 166 | + values: Union[str, Sequence[Any], None], |
| 167 | + option_string: Optional[str] = None, |
| 168 | + ) -> None: |
| 169 | + if not isinstance(values, list): |
| 170 | + raise argparse.ArgumentError(None, "Invalid --timing argument") |
| 171 | + |
| 172 | + timing_dict: Dict[str, int] = {} |
| 173 | + for arg in values: |
| 174 | + try: |
| 175 | + key, value_string = arg.split("=") |
| 176 | + value = int(value_string) |
| 177 | + timing_dict[key] = value |
| 178 | + except ValueError: |
| 179 | + raise argparse.ArgumentError( |
| 180 | + None, f"Invalid timing argument: {arg}" |
| 181 | + ) from None |
| 182 | + |
| 183 | + if not (timing := _dict2timing(timing_dict)): |
| 184 | + err_msg = "Invalid --timing argument. Incomplete parameters." |
| 185 | + raise argparse.ArgumentError(None, err_msg) |
| 186 | + |
| 187 | + setattr(namespace, self.dest, timing) |
| 188 | + print(timing) |
| 189 | + |
| 190 | + |
146 | 191 | def _parse_additional_config(unknown_args: Sequence[str]) -> TAdditionalCliArgs:
|
147 | 192 | for arg in unknown_args:
|
148 | 193 | if not re.match(r"^--[a-zA-Z][a-zA-Z0-9\-]*=\S*?$", arg):
|
|
0 commit comments