Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion can/typechecking.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
StringPathLike = typing.Union[str, "os.PathLike[str]"]
AcceptedIOType = typing.Optional[typing.Union[FileLike, StringPathLike]]

BusConfig = typing.NewType("BusConfig", dict)
BusConfig = typing.NewType("BusConfig", typing.Dict[str, typing.Any])

AutoDetectedConfig = mypy_extensions.TypedDict(
"AutoDetectedConfig", {"interface": str, "channel": Channel}
Expand Down
89 changes: 43 additions & 46 deletions can/util.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
"""
Utilities and configuration file parsing.
"""

import functools
import warnings
from typing import Dict, Optional, Union
from typing import Any, Callable, cast, Dict, Iterable, Tuple, Optional, Union
from time import time, perf_counter, get_clock_info

from can import typechecking

import json
import os
import os.path
Expand All @@ -18,6 +16,7 @@

import can
from can.interfaces import VALID_INTERFACES
from can import typechecking

log = logging.getLogger("can.util")

Expand Down Expand Up @@ -89,9 +88,8 @@ def load_environment_config(context: Optional[str] = None) -> Dict[str, str]:
"bitrate": "CAN_BITRATE",
}

context_suffix = "_{}".format(context) if context else ""

can_config_key = "CAN_CONFIG" + context_suffix
context_suffix = f"_{context}" if context else ""
can_config_key = f"CAN_CONFIG{context_suffix}"
config: Dict[str, str] = json.loads(os.environ.get(can_config_key, "{}"))

for key, val in mapper.items():
Expand All @@ -104,7 +102,7 @@ def load_environment_config(context: Optional[str] = None) -> Dict[str, str]:

def load_config(
path: Optional[typechecking.AcceptedIOType] = None,
config=None,
config: Optional[Dict[str, Any]] = None,
context: Optional[str] = None,
) -> typechecking.BusConfig:
"""
Expand Down Expand Up @@ -158,16 +156,19 @@ def load_config(
config = {}

# use the given dict for default values
config_sources = [
given_config,
can.rc,
lambda _context: load_environment_config( # pylint: disable=unnecessary-lambda
_context
),
lambda _context: load_environment_config(),
lambda _context: load_file_config(path, _context),
lambda _context: load_file_config(path),
]
config_sources = cast(
Iterable[Union[Dict[str, Any], Callable[[Any], Dict[str, Any]]]],
[
given_config,
can.rc,
lambda _context: load_environment_config( # pylint: disable=unnecessary-lambda
_context
),
lambda _context: load_environment_config(),
lambda _context: load_file_config(path, _context),
lambda _context: load_file_config(path),
],
)

# Slightly complex here to only search for the file config if required
for cfg in config_sources:
Expand All @@ -189,9 +190,7 @@ def load_config(
config[key] = None

if config["interface"] not in VALID_INTERFACES:
raise NotImplementedError(
"Invalid CAN Bus Type - {}".format(config["interface"])
)
raise NotImplementedError(f'Invalid CAN Bus Type "{config["interface"]}"')

if "bitrate" in config:
config["bitrate"] = int(config["bitrate"])
Expand All @@ -216,30 +215,33 @@ def load_config(
timing_conf[key] = int(config[key], base=0)
del config[key]
if timing_conf:
timing_conf["bitrate"] = config.get("bitrate")
timing_conf["bitrate"] = config["bitrate"]
config["timing"] = can.BitTiming(**timing_conf)

can.log.debug("can config: {}".format(config))
return config
can.log.debug("can config: %s", config)

return cast(typechecking.BusConfig, config)


def set_logging_level(level_name: str) -> None:
"""Set the logging level for the `"can"` logger.

def set_logging_level(level_name: Optional[str] = None):
"""Set the logging level for the "can" logger.
Expects one of: 'critical', 'error', 'warning', 'info', 'debug', 'subdebug'
:param level_name: One of: `'critical'`, `'error'`, `'warning'`, `'info'`,
`'debug'`, `'subdebug'`, or the value `None` (=default). Defaults to `'debug'`.
"""
can_logger = logging.getLogger("can")

try:
can_logger.setLevel(getattr(logging, level_name.upper())) # type: ignore
can_logger.setLevel(getattr(logging, level_name.upper()))
except AttributeError:
can_logger.setLevel(logging.DEBUG)
log.debug("Logging set to {}".format(level_name))
log.debug("Logging set to %s", level_name)


def len2dlc(length: int) -> int:
"""Calculate the DLC from data length.

:param int length: Length in number of bytes (0-64)
:param length: Length in number of bytes (0-64)

:returns: DLC (0-15)
"""
Expand All @@ -261,20 +263,17 @@ def dlc2len(dlc: int) -> int:
return CAN_FD_DLC[dlc] if dlc <= 15 else 64


def channel2int(channel: Optional[Union[typechecking.Channel]]) -> Optional[int]:
def channel2int(channel: Optional[typechecking.Channel]) -> Optional[int]:
"""Try to convert the channel to an integer.

:param channel:
Channel string (e.g. can0, CAN1) or integer
Channel string (e.g. `"can0"`, `"CAN1"`) or an integer

:returns: Channel integer or `None` if unsuccessful
:returns: Channel integer or ``None`` if unsuccessful
"""
if channel is None:
return None
if isinstance(channel, int):
return channel
# String and byte objects have a lower() method
if hasattr(channel, "lower"):
if isinstance(channel, str):
match = re.match(r".*(\d+)$", channel)
if match:
return int(match.group(1))
Expand All @@ -299,35 +298,33 @@ def library_function(new_arg):
def deco(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
rename_kwargs(f.__name__, kwargs, aliases)
_rename_kwargs(f.__name__, kwargs, aliases)
return f(*args, **kwargs)

return wrapper

return deco


def rename_kwargs(func_name, kwargs, aliases):
def _rename_kwargs(
func_name: str, kwargs: Dict[str, str], aliases: Dict[str, str]
) -> None:
"""Helper function for `deprecated_args_alias`"""
for alias, new in aliases.items():
if alias in kwargs:
value = kwargs.pop(alias)
if new is not None:
warnings.warn(
"{} is deprecated; use {}".format(alias, new), DeprecationWarning
)
warnings.warn(f"{alias} is deprecated; use {new}", DeprecationWarning)
if new in kwargs:
raise TypeError(
"{} received both {} (deprecated) and {}".format(
func_name, alias, new
)
f"{func_name} received both {alias} (deprecated) and {new}"
)
kwargs[new] = value
else:
warnings.warn("{} is deprecated".format(alias), DeprecationWarning)


def time_perfcounter_correlation():
def time_perfcounter_correlation() -> Tuple[float, float]:
"""Get the `perf_counter` value nearest to when time.time() is updated

Computed if the default timer used by `time.time` on this platform has a resolution
Expand Down
6 changes: 3 additions & 3 deletions test/test_util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import unittest
import warnings

from can.util import rename_kwargs
from can.util import _rename_kwargs


class RenameKwargsTest(unittest.TestCase):
Expand All @@ -11,7 +11,7 @@ def _test(self, kwargs, aliases):

# Test that we do get the DeprecationWarning when called with deprecated kwargs
with self.assertWarnsRegex(DeprecationWarning, "is deprecated"):
rename_kwargs("unit_test", kwargs, aliases)
_rename_kwargs("unit_test", kwargs, aliases)

# Test that the aliases contains the deprecated values and
# the obsolete kwargs have been removed
Expand All @@ -23,7 +23,7 @@ def _test(self, kwargs, aliases):
# Cause all warnings to always be triggered.
warnings.simplefilter("error", DeprecationWarning)
try:
rename_kwargs("unit_test", kwargs, aliases)
_rename_kwargs("unit_test", kwargs, aliases)
finally:
warnings.resetwarnings()

Expand Down