diff --git a/can/io/asc.py b/can/io/asc.py index c8a2aada3..1ad3acef6 100644 --- a/can/io/asc.py +++ b/can/io/asc.py @@ -6,6 +6,9 @@ - under `test/data/logfile.asc` """ +from typing import cast, Any, Generator, IO, List, Optional, Tuple, Union +from can import typechecking + from datetime import datetime import time import logging @@ -32,7 +35,11 @@ class ASCReader(BaseIOHandler): TODO: turn relative timestamps back to absolute form """ - def __init__(self, file, base="hex"): + def __init__( + self, + file: Union[typechecking.FileLike, typechecking.StringPathLike], + base: str = "hex", + ) -> None: """ :param file: a path-like object or as file-like object to read from If this is a file-like object, is has to opened in text @@ -42,10 +49,13 @@ def __init__(self, file, base="hex"): this value will be overwritten. Default "hex". """ super().__init__(file, mode="r") + + if not self.file: + raise ValueError("The given file cannot be None") self.base = base @staticmethod - def _extract_can_id(str_can_id, base): + def _extract_can_id(str_can_id: str, base: int) -> Tuple[int, bool]: if str_can_id[-1:].lower() == "x": is_extended = True can_id = int(str_can_id[0:-1], base) @@ -55,13 +65,15 @@ def _extract_can_id(str_can_id, base): return can_id, is_extended @staticmethod - def _check_base(base): + def _check_base(base: str) -> int: if base not in ["hex", "dec"]: raise ValueError('base should be either "hex" or "dec"') return BASE_DEC if base == "dec" else BASE_HEX - def __iter__(self): + def __iter__(self) -> Generator[Message, None, None]: base = self._check_base(self.base) + # This is guaranteed to not be None since we raise ValueError in __init__ + self.file = cast(IO[Any], self.file) for line in self.file: # logger.debug("ASCReader: parsing line: '%s'", line.splitlines()[0]) if line.split(" ")[0] == "base": @@ -194,7 +206,11 @@ class ASCWriter(BaseIOHandler, Listener): FORMAT_DATE = "%a %b %d %I:%M:%S.{} %p %Y" FORMAT_EVENT = "{timestamp: 9.6f} {message}\n" - def __init__(self, file, channel=1): + def __init__( + self, + file: Union[typechecking.FileLike, typechecking.StringPathLike], + channel: int = 1, + ) -> None: """ :param file: a path-like object or as file-like object to write to If this is a file-like object, is has to opened in text @@ -203,6 +219,9 @@ def __init__(self, file, channel=1): have a channel set """ super().__init__(file, mode="w") + if not self.file: + raise ValueError("The given file cannot be None") + self.channel = channel # write start of file header @@ -213,24 +232,29 @@ def __init__(self, file, channel=1): # the last part is written with the timestamp of the first message self.header_written = False - self.last_timestamp = None - self.started = None + self.last_timestamp = 0.0 + self.started = 0.0 - def stop(self): + def stop(self) -> None: + # This is guaranteed to not be None since we raise ValueError in __init__ + self.file = cast(IO[Any], self.file) if not self.file.closed: self.file.write("End TriggerBlock\n") super().stop() - def log_event(self, message, timestamp=None): + def log_event(self, message: str, timestamp: Optional[float] = None) -> None: """Add a message to the log file. - :param str message: an arbitrary message - :param float timestamp: the absolute timestamp of the event + :param message: an arbitrary message + :param timestamp: the absolute timestamp of the event """ if not message: # if empty or None logger.debug("ASCWriter: ignoring empty message") return + # This is guaranteed to not be None since we raise ValueError in __init__ + self.file = cast(IO[Any], self.file) + # this is the case for the very first message: if not self.header_written: self.last_timestamp = timestamp or 0.0 @@ -251,14 +275,14 @@ def log_event(self, message, timestamp=None): line = self.FORMAT_EVENT.format(timestamp=timestamp, message=message) self.file.write(line) - def on_message_received(self, msg): + def on_message_received(self, msg: Message) -> None: if msg.is_error_frame: self.log_event("{} ErrorFrame".format(self.channel), msg.timestamp) return if msg.is_remote_frame: dtype = "r" - data = [] + data: List[str] = [] else: dtype = "d {}".format(msg.dlc) data = ["{:02X}".format(byte) for byte in msg.data]