diff --git a/can/__init__.py b/can/__init__.py index 544d4fe75..618ef347f 100644 --- a/can/__init__.py +++ b/can/__init__.py @@ -12,7 +12,7 @@ log = logging.getLogger("can") -rc: Dict[str, Any] = dict() +rc: Dict[str, Any] = {} from .listener import Listener, BufferedReader, RedirectReader, AsyncBufferedReader diff --git a/can/interfaces/canalystii.py b/can/interfaces/canalystii.py index 256ac582c..f75a06c30 100644 --- a/can/interfaces/canalystii.py +++ b/can/interfaces/canalystii.py @@ -53,7 +53,7 @@ def __init__( elif isinstance(channel, int): self.channels = [channel] else: # Sequence[int] - self.channels = [c for c in channel] + self.channels = list(channel) self.rx_queue = collections.deque( maxlen=rx_queue_size diff --git a/can/interfaces/cantact.py b/can/interfaces/cantact.py index 5eab7b3f3..3c538550c 100644 --- a/can/interfaces/cantact.py +++ b/can/interfaces/cantact.py @@ -39,7 +39,7 @@ def _detect_available_configs(): channels = [] for i in range(0, interface.channel_count()): - channels.append({"interface": "cantact", "channel": "ch:%d" % i}) + channels.append({"interface": "cantact", "channel": f"ch:{i}"}) return channels def __init__( diff --git a/can/interfaces/ics_neovi/neovi_bus.py b/can/interfaces/ics_neovi/neovi_bus.py index ec08e80ef..cdbf2a978 100644 --- a/can/interfaces/ics_neovi/neovi_bus.py +++ b/can/interfaces/ics_neovi/neovi_bus.py @@ -226,8 +226,8 @@ def channel_to_netid(channel_name_or_id): channel = getattr(ics, netid) else: raise ValueError( - "channel must be an integer or " "a valid ICS channel name" - ) + "channel must be an integer or a valid ICS channel name" + ) from None return channel @staticmethod diff --git a/can/interfaces/ixxat/canlib.py b/can/interfaces/ixxat/canlib.py index 13709c71c..ced840ff3 100644 --- a/can/interfaces/ixxat/canlib.py +++ b/can/interfaces/ixxat/canlib.py @@ -604,12 +604,12 @@ def _inWaiting(self): return 1 def flush_tx_buffer(self): - """ Flushes the transmit buffer on the IXXAT """ + """Flushes the transmit buffer on the IXXAT""" # TODO #64: no timeout? _canlib.canChannelWaitTxEvent(self._channel_handle, constants.INFINITE) def _recv_internal(self, timeout): - """ Read a message from IXXAT device. """ + """Read a message from IXXAT device.""" # TODO: handling CAN error messages? data_received = False diff --git a/can/interfaces/ixxat/exceptions.py b/can/interfaces/ixxat/exceptions.py index 3bc0e1111..babe08e3b 100644 --- a/can/interfaces/ixxat/exceptions.py +++ b/can/interfaces/ixxat/exceptions.py @@ -21,15 +21,15 @@ class VCITimeout(CanTimeoutError): - """ Wraps the VCI_E_TIMEOUT error """ + """Wraps the VCI_E_TIMEOUT error""" class VCIError(CanOperationError): - """ Try to display errors that occur within the wrapped C library nicely. """ + """Try to display errors that occur within the wrapped C library nicely.""" class VCIRxQueueEmptyError(VCIError): - """ Wraps the VCI_E_RXQUEUE_EMPTY error """ + """Wraps the VCI_E_RXQUEUE_EMPTY error""" def __init__(self): super().__init__("Receive queue is empty") diff --git a/can/interfaces/kvaser/canlib.py b/can/interfaces/kvaser/canlib.py index 9d6d6b64a..4886c70fb 100644 --- a/can/interfaces/kvaser/canlib.py +++ b/can/interfaces/kvaser/canlib.py @@ -713,11 +713,7 @@ def get_channel_info(channel): ctypes.sizeof(number), ) - return "%s, S/N %d (#%d)" % ( - name.value.decode("ascii"), - serial.value, - number.value + 1, - ) + return f"{name.value.decode('ascii')}, S/N {serial.value} (#{number.value + 1})" init_kvaser_library() diff --git a/can/interfaces/neousys/neousys.py b/can/interfaces/neousys/neousys.py index c996c8298..d4a89fd03 100644 --- a/can/interfaces/neousys/neousys.py +++ b/can/interfaces/neousys/neousys.py @@ -47,7 +47,7 @@ class NeousysCanSetup(Structure): - """ C CAN Setup struct """ + """C CAN Setup struct""" _fields_ = [ ("bitRate", c_uint), @@ -58,7 +58,7 @@ class NeousysCanSetup(Structure): class NeousysCanMsg(Structure): - """ C CAN Message struct """ + """C CAN Message struct""" _fields_ = [ ("id", c_uint), @@ -75,7 +75,7 @@ class NeousysCanMsg(Structure): # valid:1~4, Resynchronization Jump Width in time quanta # valid:1~1023, CAN_CLK divider used to determine time quanta class NeousysCanBitClk(Structure): - """ C CAN BIT Clock struct """ + """C CAN BIT Clock struct""" _fields_ = [ ("syncPropPhase1Seg", c_ushort), diff --git a/can/interfaces/nixnet.py b/can/interfaces/nixnet.py index f50daf4c9..a39884017 100644 --- a/can/interfaces/nixnet.py +++ b/can/interfaces/nixnet.py @@ -30,7 +30,7 @@ ) except ImportError: logger.error("Error, NIXNET python module cannot be loaded.") - raise ImportError() + raise else: logger.error("NI-XNET interface is only available on Windows systems") raise NotImplementedError("NiXNET is not supported on not Win32 platforms") diff --git a/can/interfaces/seeedstudio/seeedstudio.py b/can/interfaces/seeedstudio/seeedstudio.py index 7cfa6d670..f250e95a5 100644 --- a/can/interfaces/seeedstudio/seeedstudio.py +++ b/can/interfaces/seeedstudio/seeedstudio.py @@ -113,7 +113,7 @@ def __init__( "could not create the serial device" ) from error - super(SeeedBus, self).__init__(channel=channel, *args, **kwargs) + super().__init__(channel=channel, *args, **kwargs) self.init_frame() def shutdown(self): diff --git a/can/interfaces/socketcan/utils.py b/can/interfaces/socketcan/utils.py index 1316d153c..b718bb69e 100644 --- a/can/interfaces/socketcan/utils.py +++ b/can/interfaces/socketcan/utils.py @@ -11,7 +11,7 @@ from typing import cast, Iterable, Optional from can.interfaces.socketcan.constants import CAN_EFF_FLAG -import can.typechecking as typechecking +from can import typechecking log = logging.getLogger(__name__) diff --git a/can/interfaces/systec/structures.py b/can/interfaces/systec/structures.py index fbebdcdbd..841474b80 100644 --- a/can/interfaces/systec/structures.py +++ b/can/interfaces/systec/structures.py @@ -268,7 +268,7 @@ class HardwareInfoEx(Structure): ] def __init__(self): - super(HardwareInfoEx, self).__init__(sizeof(HardwareInfoEx)) + super().__init__(sizeof(HardwareInfoEx)) def __eq__(self, other): if not isinstance(other, HardwareInfoEx): diff --git a/can/interfaces/udp_multicast/bus.py b/can/interfaces/udp_multicast/bus.py index 86a78a1ff..df08af880 100644 --- a/can/interfaces/udp_multicast/bus.py +++ b/can/interfaces/udp_multicast/bus.py @@ -121,13 +121,13 @@ def _recv_internal(self, timeout: Optional[float]): return can_message, False - def send(self, message: can.Message, timeout: Optional[float] = None) -> None: - if not self.is_fd and message.is_fd: + def send(self, msg: can.Message, timeout: Optional[float] = None) -> None: + if not self.is_fd and msg.is_fd: raise can.CanOperationError( "cannot send FD message over bus with CAN FD disabled" ) - data = pack_message(message) + data = pack_message(msg) self._multicast.send(data, timeout) def fileno(self) -> int: @@ -186,8 +186,9 @@ def __init__( sock = self._create_socket(address_family) except OSError as error: log.info( - f"could not connect to the multicast IP network of candidate %s; reason: {error}", + "could not connect to the multicast IP network of candidate %s; reason: %s", connection_candidates, + error, ) if sock is not None: self._socket = sock diff --git a/can/interfaces/vector/canlib.py b/can/interfaces/vector/canlib.py index 22d99eda9..e9f299e74 100644 --- a/can/interfaces/vector/canlib.py +++ b/can/interfaces/vector/canlib.py @@ -38,8 +38,6 @@ ) from can.typechecking import AutoDetectedConfig, CanFilters -from .exceptions import VectorError - # Define Module Logger # ==================== LOG = logging.getLogger(__name__) @@ -140,7 +138,8 @@ def __init__( """ if os.name != "nt" and not kwargs.get("_testing", False): raise CanInterfaceNotImplementedError( - f'The Vector interface is only supported on Windows, but you are running "{os.name}"' + f"The Vector interface is only supported on Windows, " + f'but you are running "{os.name}"' ) if xldriver is None: @@ -163,7 +162,7 @@ def __init__( self._app_name = app_name.encode() if app_name is not None else b"" self.channel_info = "Application %s: %s" % ( app_name, - ", ".join("CAN %d" % (ch + 1) for ch in self.channels), + ", ".join(f"CAN {ch + 1}" for ch in self.channels), ) if serial is not None: @@ -358,8 +357,8 @@ def _apply_filters(self, filters) -> None: if can_filter.get("extended") else xldefine.XL_AcceptanceFilter.XL_CAN_STD, ) - except VectorOperationError as exc: - LOG.warning("Could not set filters: %s", exc) + except VectorOperationError as exception: + LOG.warning("Could not set filters: %s", exception) # go to fallback else: self._is_filtered = True @@ -400,8 +399,8 @@ def _recv_internal( else: msg = self._recv_can() - except VectorOperationError as exc: - if exc.error_code != xldefine.XL_Status.XL_ERR_QUEUE_IS_EMPTY: + except VectorOperationError as exception: + if exception.error_code != xldefine.XL_Status.XL_ERR_QUEUE_IS_EMPTY: raise else: if msg: @@ -435,7 +434,7 @@ def _recv_canfd(self) -> Optional[Message]: data_struct = xl_can_rx_event.tagData.canTxOkMsg else: self.handle_canfd_event(xl_can_rx_event) - return + return None msg_id = data_struct.canId dlc = dlc2len(data_struct.dlc) @@ -475,7 +474,7 @@ def _recv_can(self) -> Optional[Message]: if xl_event.tag != xldefine.XL_EventTags.XL_RECEIVE_MSG: self.handle_can_event(xl_event) - return + return None msg_id = xl_event.tagData.msg.id dlc = xl_event.tagData.msg.dlc @@ -520,8 +519,8 @@ def handle_canfd_event(self, event: xlclass.XLcanRxEvent) -> None: when `event.tag` is not `XL_CAN_EV_TAG_RX_OK` or `XL_CAN_EV_TAG_TX_OK`. Subclasses can implement this method. - :param event: `XLcanRxEvent` that could have a `XL_CAN_EV_TAG_RX_ERROR`, `XL_CAN_EV_TAG_TX_ERROR`, - `XL_TIMER` or `XL_CAN_EV_TAG_CHIP_STATE` tag. + :param event: `XLcanRxEvent` that could have a `XL_CAN_EV_TAG_RX_ERROR`, + `XL_CAN_EV_TAG_TX_ERROR`, `XL_TIMER` or `XL_CAN_EV_TAG_CHIP_STATE` tag. """ def send(self, msg: Message, timeout: Optional[float] = None): diff --git a/can/interfaces/virtual.py b/can/interfaces/virtual.py index f6d071723..6c73b7ba8 100644 --- a/can/interfaces/virtual.py +++ b/can/interfaces/virtual.py @@ -67,7 +67,7 @@ def __init__( # the channel identifier may be an arbitrary object self.channel_id = channel - self.channel_info = "Virtual bus channel {}".format(self.channel_id) + self.channel_info = f"Virtual bus channel {self.channel_id}" self.receive_own_messages = receive_own_messages self._open = True diff --git a/can/io/asc.py b/can/io/asc.py index 347ecf3c6..d708de6e7 100644 --- a/can/io/asc.py +++ b/can/io/asc.py @@ -6,7 +6,7 @@ - under `test/data/logfile.asc` """ -from typing import cast, Any, Generator, IO, List, Optional, Union, Dict +from typing import cast, Any, Generator, IO, List, Optional, Dict from datetime import datetime import time @@ -73,8 +73,10 @@ def _extract_header(self): elif lower_case.startswith("base"): try: _, base, _, timestamp_format = line.split() - except ValueError: - raise Exception("Unsupported header string format: {}".format(line)) + except ValueError as exception: + raise Exception( + f"Unsupported header string format: {line}" + ) from exception self.base = base self._converted_base = self._check_base(self.base) self.timestamps_format = timestamp_format @@ -135,8 +137,8 @@ def _process_classic_can_frame( # Error Frame msg_kwargs["is_error_frame"] = True else: - abr_id_str, dir, rest_of_message = line.split(None, 2) - msg_kwargs["is_rx"] = dir == "Rx" + abr_id_str, direction, rest_of_message = line.split(None, 2) + msg_kwargs["is_rx"] = direction == "Rx" self._extract_can_id(abr_id_str, msg_kwargs) if rest_of_message[0].lower() == "r": @@ -164,10 +166,10 @@ def _process_classic_can_frame( return Message(**msg_kwargs) def _process_fd_can_frame(self, line: str, msg_kwargs: Dict[str, Any]) -> Message: - channel, dir, rest_of_message = line.split(None, 2) + channel, direction, rest_of_message = line.split(None, 2) # See ASCWriter msg_kwargs["channel"] = int(channel) - 1 - msg_kwargs["is_rx"] = dir == "Rx" + msg_kwargs["is_rx"] = direction == "Rx" # CAN FD error frame if rest_of_message.strip()[:10].lower() == "errorframe": @@ -291,7 +293,7 @@ def __init__( # write start of file header now = datetime.now().strftime(self.FORMAT_START_OF_FILE_DATE) - self.file.write("date %s\n" % now) + self.file.write(f"date {now}\n") self.file.write("base hex timestamps absolute\n") self.file.write("internal events logged\n") @@ -327,7 +329,7 @@ def log_event(self, message: str, timestamp: Optional[float] = None) -> None: formatted_date = time.strftime( self.FORMAT_DATE.format(mlsec), time.localtime(self.last_timestamp) ) - self.file.write("Begin Triggerblock %s\n" % formatted_date) + self.file.write(f"Begin Triggerblock {formatted_date}\n") self.header_written = True self.log_event("Start of measurement") # caution: this is a recursive call! # Use last known timestamp if unknown @@ -342,15 +344,15 @@ def log_event(self, message: str, timestamp: Optional[float] = None) -> None: def on_message_received(self, msg: Message) -> None: if msg.is_error_frame: - self.log_event("{} ErrorFrame".format(self.channel), msg.timestamp) + self.log_event(f"{self.channel} ErrorFrame", msg.timestamp) return if msg.is_remote_frame: - dtype = "r {:x}".format(msg.dlc) # New after v8.5 + dtype = f"r {msg.dlc:x}" # New after v8.5 data: List[str] = [] else: - dtype = "d {:x}".format(msg.dlc) - data = ["{:02X}".format(byte) for byte in msg.data] - arb_id = "{:X}".format(msg.arbitration_id) + dtype = f"d {msg.dlc:x}" + data = [f"{byte:02X}" for byte in msg.data] + arb_id = f"{msg.arbitration_id:X}" if msg.is_extended_id: arb_id += "x" channel = channel2int(msg.channel) diff --git a/can/io/blf.py b/can/io/blf.py index d53e1296f..9bb54d984 100644 --- a/can/io/blf.py +++ b/can/io/blf.py @@ -228,7 +228,7 @@ def _parse_data(self, data): if pos + 8 > max_pos: # Not enough data in container return - raise BLFParseError("Could not find next object") + raise BLFParseError("Could not find next object") from None header = unpack_obj_header_base(data, pos) # print(header) signature, _, header_version, obj_size, obj_type = header @@ -258,7 +258,7 @@ def _parse_data(self, data): factor = 1e-5 if flags == 1 else 1e-9 timestamp = timestamp * factor + start_timestamp - if obj_type == CAN_MESSAGE or obj_type == CAN_MESSAGE2: + if obj_type in (CAN_MESSAGE, CAN_MESSAGE2): channel, flags, dlc, can_id, can_data = unpack_can_msg(data, pos) yield Message( timestamp=timestamp, diff --git a/can/io/generic.py b/can/io/generic.py index 44caa80c6..1606c444d 100644 --- a/can/io/generic.py +++ b/can/io/generic.py @@ -43,6 +43,7 @@ def __init__( # file is None or some file-like object self.file = cast(Optional[can.typechecking.FileLike], file) else: + # pylint: disable=consider-using-with # file is some path-like object self.file = open(cast(can.typechecking.StringPathLike, file), mode) diff --git a/can/io/logger.py b/can/io/logger.py index ec77d7ac3..1fc2a8487 100644 --- a/can/io/logger.py +++ b/can/io/logger.py @@ -132,7 +132,8 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: self.writer_args = args self.writer_kwargs = kwargs - self._writer: FileIOMessageWriter = None # type: ignore # Expected to be set by the subclass + # Expected to be set by the subclass + self._writer: FileIOMessageWriter = None # type: ignore @property def writer(self) -> FileIOMessageWriter: @@ -209,7 +210,8 @@ def _get_new_writer(self, filename: StringPathLike) -> FileIOMessageWriter: return cast(FileIOMessageWriter, logger) else: raise Exception( - "The Logger corresponding to the arguments is not a FileIOMessageWriter or can.Printer" + "The Logger corresponding to the arguments is not a FileIOMessageWriter or " + "can.Printer" ) def stop(self) -> None: @@ -283,8 +285,8 @@ class SizedRotatingLogger(BaseRotatingLogger): def __init__( self, base_filename: StringPathLike, - max_bytes: int = 0, *args: Any, + max_bytes: int = 0, **kwargs: Any, ) -> None: """ diff --git a/can/io/sqlite.py b/can/io/sqlite.py index c947ab3e3..8d184bce1 100644 --- a/can/io/sqlite.py +++ b/can/io/sqlite.py @@ -46,9 +46,7 @@ def __init__(self, file, table_name="messages"): self.table_name = table_name def __iter__(self): - for frame_data in self._cursor.execute( - "SELECT * FROM {}".format(self.table_name) - ): + for frame_data in self._cursor.execute(f"SELECT * FROM {self.table_name}"): yield SqliteReader._assemble_message(frame_data) @staticmethod @@ -66,7 +64,7 @@ def _assemble_message(frame_data): def __len__(self): # this might not run in constant time - result = self._cursor.execute("SELECT COUNT(*) FROM {}".format(self.table_name)) + result = self._cursor.execute(f"SELECT COUNT(*) FROM {self.table_name}") return int(result.fetchone()[0]) def read_all(self): @@ -74,9 +72,7 @@ def read_all(self): :rtype: Generator[can.Message] """ - result = self._cursor.execute( - "SELECT * FROM {}".format(self.table_name) - ).fetchall() + result = self._cursor.execute(f"SELECT * FROM {self.table_name}").fetchall() return (SqliteReader._assemble_message(frame) for frame in result) def stop(self): @@ -164,20 +160,16 @@ def _create_db(self): # create table structure self._conn.cursor().execute( - """ - CREATE TABLE IF NOT EXISTS {} - ( - ts REAL, - arbitration_id INTEGER, - extended INTEGER, - remote INTEGER, - error INTEGER, - dlc INTEGER, - data BLOB - ) - """.format( - self.table_name - ) + f"""CREATE TABLE IF NOT EXISTS {self.table_name} + ( + ts REAL, + arbitration_id INTEGER, + extended INTEGER, + remote INTEGER, + error INTEGER, + dlc INTEGER, + data BLOB + )""" ) self._conn.commit() @@ -209,9 +201,9 @@ def _db_writer_thread(self): or len(messages) > self.MAX_BUFFER_SIZE_BEFORE_WRITES ): break - else: - # just go on - msg = self.get_message(self.GET_MESSAGE_TIMEOUT) + + # just go on + msg = self.get_message(self.GET_MESSAGE_TIMEOUT) count = len(messages) if count > 0: diff --git a/can/listener.py b/can/listener.py index 815c7ecef..8ed4f7b77 100644 --- a/can/listener.py +++ b/can/listener.py @@ -142,7 +142,7 @@ class AsyncBufferedReader(Listener): def __init__(self, **kwargs: Any) -> None: self.buffer: "asyncio.Queue[Message]" - if "loop" in kwargs.keys(): + if "loop" in kwargs: warnings.warn( "The 'loop' argument is deprecated since python-can 4.0.0 " "and has no effect starting with Python 3.10", diff --git a/can/message.py b/can/message.py index 11df695be..29c97a39b 100644 --- a/can/message.py +++ b/can/message.py @@ -109,11 +109,11 @@ def __init__( # pylint: disable=too-many-locals, too-many-arguments self._check() def __str__(self) -> str: - field_strings = ["Timestamp: {0:>15.6f}".format(self.timestamp)] + field_strings = [f"Timestamp: {self.timestamp:>15.6f}"] if self.is_extended_id: - arbitration_id_string = "ID: {0:08x}".format(self.arbitration_id) + arbitration_id_string = f"ID: {self.arbitration_id:08x}" else: - arbitration_id_string = "ID: {0:04x}".format(self.arbitration_id) + arbitration_id_string = f"ID: {self.arbitration_id:04x}" field_strings.append(arbitration_id_string.rjust(12, " ")) flag_string = " ".join( @@ -130,22 +130,22 @@ def __str__(self) -> str: field_strings.append(flag_string) - field_strings.append("DLC: {0:2d}".format(self.dlc)) + field_strings.append("DLC: {self.dlc:2d}") data_strings = [] if self.data is not None: for index in range(0, min(self.dlc, len(self.data))): - data_strings.append("{0:02x}".format(self.data[index])) + data_strings.append(f"{self.data[index]:02x}") if data_strings: # if not empty field_strings.append(" ".join(data_strings).ljust(24, " ")) else: field_strings.append(" " * 24) if (self.data is not None) and (self.data.isalnum()): - field_strings.append("'{}'".format(self.data.decode("utf-8", "replace"))) + field_strings.append(f"'{self.data.decode('utf-8', 'replace')}'") if self.channel is not None: try: - field_strings.append("Channel: {}".format(self.channel)) + field_strings.append(f"Channel: {self.channel}") except UnicodeEncodeError: pass @@ -160,32 +160,32 @@ def __bool__(self) -> bool: def __repr__(self) -> str: args = [ - "timestamp={}".format(self.timestamp), - "arbitration_id={:#x}".format(self.arbitration_id), - "is_extended_id={}".format(self.is_extended_id), + f"timestamp={self.timestamp}", + f"arbitration_id={self.arbitration_id:#x}", + f"is_extended_id={self.is_extended_id}", ] if not self.is_rx: args.append("is_rx=False") if self.is_remote_frame: - args.append("is_remote_frame={}".format(self.is_remote_frame)) + args.append(f"is_remote_frame={self.is_remote_frame}") if self.is_error_frame: - args.append("is_error_frame={}".format(self.is_error_frame)) + args.append(f"is_error_frame={self.is_error_frame}") if self.channel is not None: - args.append("channel={!r}".format(self.channel)) + args.append(f"channel={self.channel!r}") - data = ["{:#02x}".format(byte) for byte in self.data] - args += ["dlc={}".format(self.dlc), "data=[{}]".format(", ".join(data))] + data = [f"{byte:#02x}" for byte in self.data] + args += [f"dlc={self.dlc}", f"data=[{', '.join(data)}]"] if self.is_fd: args.append("is_fd=True") - args.append("bitrate_switch={}".format(self.bitrate_switch)) - args.append("error_state_indicator={}".format(self.error_state_indicator)) + args.append(f"bitrate_switch={self.bitrate_switch}") + args.append(f"error_state_indicator={self.error_state_indicator}") - return "can.Message({})".format(", ".join(args)) + return f"can.Message({', '.join(args)})" def __format__(self, format_spec: Optional[str]) -> str: if not format_spec: diff --git a/can/notifier.py b/can/notifier.py index 3b80c7461..2554fb4fc 100644 --- a/can/notifier.py +++ b/can/notifier.py @@ -76,7 +76,7 @@ def add_bus(self, bus: BusABC) -> None: reader_thread = threading.Thread( target=self._rx_thread, args=(bus,), - name='can.notifier for bus "{}"'.format(bus.channel_info), + name=f'can.notifier for bus "{bus.channel_info}"', ) reader_thread.daemon = True reader_thread.start() diff --git a/can/util.py b/can/util.py index 57ccbdf55..e43a4d09d 100644 --- a/can/util.py +++ b/can/util.py @@ -339,7 +339,7 @@ def _rename_kwargs( ) kwargs[new] = value else: - warnings.warn("{} is deprecated".format(alias), DeprecationWarning) + warnings.warn(f"{alias} is deprecated", DeprecationWarning) def time_perfcounter_correlation() -> Tuple[float, float]: diff --git a/can/viewer.py b/can/viewer.py index 7e290bc4e..a92d18cdd 100644 --- a/can/viewer.py +++ b/can/viewer.py @@ -167,7 +167,7 @@ def unpack_data(cmd: int, cmd_to_struct: Dict, data: bytes) -> List[float]: return values - raise ValueError("Unknown command: 0x{:02X}".format(cmd)) + raise ValueError(f"Unknown command: 0x{cmd:02X}") def draw_can_bus_message(self, msg, sorting=False): # Use the CAN-Bus ID as the key in the dict @@ -233,12 +233,10 @@ def draw_can_bus_message(self, msg, sorting=False): self.draw_line( self.ids[key]["row"], 8, - "{0:.6f}".format(self.ids[key]["msg"].timestamp - self.start_time), + f"{self.ids[key]['msg'].timestamp - self.start_time:.6f}", color, ) - self.draw_line( - self.ids[key]["row"], 23, "{0:.6f}".format(self.ids[key]["dt"]), color - ) + self.draw_line(self.ids[key]["row"], 23, f"{self.ids[key]['dt']:.6f}", color) self.draw_line(self.ids[key]["row"], 35, arbitration_id_string, color) self.draw_line(self.ids[key]["row"], 47, str(msg.dlc), color) for i, b in enumerate(msg.data): @@ -247,7 +245,7 @@ def draw_can_bus_message(self, msg, sorting=False): # Data does not fit self.draw_line(self.ids[key]["row"], col - 4, "...", color) break - text = "{:02X}".format(b) + text = f"{b:02X}" self.draw_line(self.ids[key]["row"], col, text, color) if self.data_structs: @@ -257,7 +255,7 @@ def draw_can_bus_message(self, msg, sorting=False): msg.arbitration_id, self.data_structs, msg.data ): if isinstance(x, float): - values_list.append("{0:.6f}".format(x)) + values_list.append(f"{x:.6f}") else: values_list.append(str(x)) values_string = " ".join(values_list) @@ -292,8 +290,8 @@ def draw_header(self): def redraw_screen(self): # Trigger a complete redraw self.draw_header() - for key in self.ids: - self.draw_can_bus_message(self.ids[key]["msg"]) + for key, ids in self.ids.items(): + self.draw_can_bus_message(ids["msg"]) class SmartFormatter(argparse.HelpFormatter): @@ -305,12 +303,12 @@ def _format_usage(self, usage, actions, groups, prefix): return super()._format_usage(usage, actions, groups, "Usage: ") def _format_args(self, action, default_metavar): - if action.nargs != argparse.REMAINDER and action.nargs != argparse.ONE_OR_MORE: + if action.nargs not in (argparse.REMAINDER, argparse.ONE_OR_MORE): return super()._format_args(action, default_metavar) # Use the metavar if "REMAINDER" or "ONE_OR_MORE" is set get_metavar = self._metavar_formatter(action, default_metavar) - return "%s" % get_metavar(1) + return str(get_metavar(1)) def _format_action_invocation(self, action): if not action.option_strings or action.nargs == 0: @@ -323,9 +321,9 @@ def _format_action_invocation(self, action): args_string = self._format_args(action, default) for i, option_string in enumerate(action.option_strings): if i == len(action.option_strings) - 1: - parts.append("%s %s" % (option_string, args_string)) + parts.append(f"{option_string} {args_string}") else: - parts.append("%s" % option_string) + parts.append(str(option_string)) return ", ".join(parts) def _split_lines(self, text, width): @@ -371,7 +369,7 @@ def parse_args(args): "--version", action="version", help="Show program's version number and exit", - version="%(prog)s (version {version})".format(version=__version__), + version=f"%(prog)s (version {__version__})", ) # Copied from: can/logger.py @@ -493,7 +491,7 @@ def parse_args(args): ] = {} if parsed_args.decode: if os.path.isfile(parsed_args.decode[0]): - with open(parsed_args.decode[0], "r") as f: + with open(parsed_args.decode[0], "r", encoding="utf-8") as f: structs = f.readlines() else: structs = parsed_args.decode diff --git a/doc/conf.py b/doc/conf.py index 568a5641d..14390d5ad 100755 --- a/doc/conf.py +++ b/doc/conf.py @@ -25,7 +25,7 @@ # built documents. # # The short X.Y version. -version = can.__version__.split("-")[0] +version = can.__version__.split("-", maxsplit=1)[0] release = can.__version__ # General information about the project. diff --git a/examples/serial_com.py b/examples/serial_com.py index 60aeec4ce..1fbc997b2 100755 --- a/examples/serial_com.py +++ b/examples/serial_com.py @@ -42,7 +42,7 @@ def receive(bus, stop_event): while not stop_event.is_set(): rx_msg = bus.recv(1) if rx_msg is not None: - print("rx: {}".format(rx_msg)) + print(f"rx: {rx_msg}") print("Stopped receiving messages") diff --git a/requirements-lint.txt b/requirements-lint.txt index 1c4967679..9aefb7415 100644 --- a/requirements-lint.txt +++ b/requirements-lint.txt @@ -1,4 +1,4 @@ -pylint==2.7.4 -black==20.8b1 -mypy==0.812 +pylint==2.11.1 +black==21.10b0 +mypy==0.910 mypy-extensions==0.4.3 diff --git a/setup.py b/setup.py index 6e2dcd95a..31318ac06 100644 --- a/setup.py +++ b/setup.py @@ -15,12 +15,12 @@ logging.basicConfig(level=logging.WARNING) -with open("can/__init__.py", "r") as fd: +with open("can/__init__.py", "r", encoding="utf-8") as fd: version = re.search( r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE ).group(1) -with open("README.rst", "r") as f: +with open("README.rst", "r", encoding="utf-8") as f: long_description = f.read() # Dependencies diff --git a/test/test_socketcan_loopback.py b/test/test_socketcan_loopback.py index 9dce11dc5..17b83b268 100644 --- a/test/test_socketcan_loopback.py +++ b/test/test_socketcan_loopback.py @@ -13,7 +13,7 @@ @unittest.skipUnless(TEST_INTERFACE_SOCKETCAN, "skip testing of socketcan") class LocalLoopbackSocketCan(unittest.TestCase): - """ test local_loopback functionality""" + """test local_loopback functionality""" BITRATE = 500000 TIMEOUT = 0.1