Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions can/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
:class:`ValueError`. This should always be documented for the function at hand.
"""


from contextlib import contextmanager

from typing import Optional
from typing import Type


class CanError(Exception):
Expand Down Expand Up @@ -96,3 +100,18 @@ class CanTimeoutError(CanError, TimeoutError):
- Some message could not be sent after the timeout elapsed
- No message was read within the given time
"""


@contextmanager
def error_check(
error_message: Optional[str] = None,
exception_type: Type[CanError] = CanOperationError,
) -> None:
"""Catches any exceptions and turns them into the new type while preserving the stack trace."""
try:
yield
except Exception as error:
if error_message is None:
raise exception_type(str(error)) from error
else:
raise exception_type(error_message) from error
Comment on lines +105 to +117
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

108 changes: 63 additions & 45 deletions can/interfaces/cantact.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,20 @@
from unittest.mock import Mock

from can import BusABC, Message
from ..exceptions import (
CanInitializationError,
CanInterfaceNotImplementedError,
error_check,
)

logger = logging.getLogger(__name__)

try:
import cantact
except ImportError:
cantact = None
logger.warning(
"The CANtact module is not installed. Install it using `python3 -m pip install cantact`"
"The CANtact module is not installed. Install it using `python -m pip install cantact`"
)


Expand All @@ -25,8 +31,10 @@ class CantactBus(BusABC):
def _detect_available_configs():
try:
interface = cantact.Interface()
except (NameError, SystemError):
# couldn't import cantact, so no configurations are available
except (NameError, SystemError, AttributeError):
logger.debug(
"Could not import or instantiate cantact, so no configurations are available"
)
return []

channels = []
Expand All @@ -42,7 +50,7 @@ def __init__(
monitor=False,
bit_timing=None,
_testing=False,
**kwargs
**kwargs,
):
"""
:param int channel:
Expand All @@ -58,36 +66,45 @@ def __init__(
if _testing:
self.interface = MockInterface()
else:
self.interface = cantact.Interface()
if cantact is None:
raise CanInterfaceNotImplementedError(
"The CANtact module is not installed. Install it using `python -m pip install cantact`"
)
with error_check(
"Cannot create the cantact.Interface", CanInitializationError
):
self.interface = cantact.Interface()

self.channel = int(channel)
self.channel_info = "CANtact: ch:%s" % channel

# configure the interface
if bit_timing is None:
# use bitrate
self.interface.set_bitrate(int(channel), int(bitrate))
else:
# use custom bit timing
self.interface.set_bit_timing(
int(channel),
int(bit_timing.brp),
int(bit_timing.tseg1),
int(bit_timing.tseg2),
int(bit_timing.sjw),
)
self.interface.set_enabled(int(channel), True)
self.interface.set_monitor(int(channel), monitor)
self.interface.start()
self.channel_info = f"CANtact: ch:{channel}"

# Configure the interface
with error_check("Cannot setup the cantact.Interface", CanInitializationError):
if bit_timing is None:
# use bitrate
self.interface.set_bitrate(int(channel), int(bitrate))
else:
# use custom bit timing
self.interface.set_bit_timing(
int(channel),
int(bit_timing.brp),
int(bit_timing.tseg1),
int(bit_timing.tseg2),
int(bit_timing.sjw),
)
self.interface.set_enabled(int(channel), True)
self.interface.set_monitor(int(channel), monitor)
self.interface.start()

super().__init__(
channel=channel, bitrate=bitrate, poll_interval=poll_interval, **kwargs
)

def _recv_internal(self, timeout):
frame = self.interface.recv(int(timeout * 1000))
with error_check("Cannot receive message"):
frame = self.interface.recv(int(timeout * 1000))
if frame is None:
# timeout occured
# timeout occurred
return None, False

msg = Message(
Expand All @@ -103,31 +120,33 @@ def _recv_internal(self, timeout):
return msg, False

def send(self, msg, timeout=None):
self.interface.send(
self.channel,
msg.arbitration_id,
bool(msg.is_extended_id),
bool(msg.is_remote_frame),
msg.dlc,
msg.data,
)
with error_check("Cannot send message"):
self.interface.send(
self.channel,
msg.arbitration_id,
bool(msg.is_extended_id),
bool(msg.is_remote_frame),
msg.dlc,
msg.data,
)

def shutdown(self):
self.interface.stop()
with error_check("Cannot shutdown interface"):
self.interface.stop()


def mock_recv(timeout):
if timeout > 0:
frame = {}
frame["id"] = 0x123
frame["extended"] = False
frame["timestamp"] = time.time()
frame["loopback"] = False
frame["rtr"] = False
frame["dlc"] = 8
frame["data"] = [1, 2, 3, 4, 5, 6, 7, 8]
frame["channel"] = 0
return frame
return {
"id": 0x123,
"extended": False,
"timestamp": time.time(),
"loopback": False,
"rtr": False,
"dlc": 8,
"data": [1, 2, 3, 4, 5, 6, 7, 8],
"channel": 0,
}
else:
# simulate timeout when timeout = 0
return None
Expand All @@ -144,7 +163,6 @@ class MockInterface:
set_bit_timing = Mock()
set_enabled = Mock()
set_monitor = Mock()
start = Mock()
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was duplicated

stop = Mock()
send = Mock()
channel_count = Mock(return_value=1)
Expand Down
10 changes: 1 addition & 9 deletions can/interfaces/usb2can/usb2canabstractionlayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from ctypes import *
from enum import IntEnum
import logging
from contextlib import contextmanager

import can
from ...exceptions import error_check

log = logging.getLogger("can.usb2can")

Expand Down Expand Up @@ -102,14 +102,6 @@ class CanalMsg(Structure):
]


@contextmanager
def error_check(error_message: str) -> None:
try:
yield
except Exception as error:
raise can.CanOperationError(error_message) from error


class Usb2CanAbstractionLayer:
"""A low level wrapper around the usb2can library.

Expand Down