Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions nixnet/_session/intf.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,13 @@ def out_strm_list_by_id(self):
in that it provides a list of frames for replay filtering. This
property provides an alternate format for you to specify the frames by
their CAN arbitration ID or LIN unprotected ID. The property's data
type is an array of unsigned 32-bit integer (U32). Each integer
represents a CAN or LIN frame's identifier, using the same encoding as
the Raw Frame Format.

Within each CAN frame ID value, bit 29 (hex 20000000) indicates the CAN
identifier format (set for extended, clear for standard). If bit 29 is
clear, the lower 11 bits (0-10) contain the CAN frame identifier. If
bit 29 is set, the lower 29 bits (0-28) contain the CAN frame
identifier. LIN frame ID values may be within the range of possible LIN
type is an array of integers. Each integer represents a CAN or LIN
frame's identifier, using the same encoding as :any:`nixnet.types.RawFrame`.

For CAN Frames, see :any:`nixnet.types.CanIdentifier` for parsing and
generating raw identifiers.

LIN frame ID values may be within the range of possible LIN
IDs (0-63).

See also :any:`Interface.out_strm_list`.
Expand Down
135 changes: 107 additions & 28 deletions nixnet/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
__all__ = [
'DriverVersion',
'CanComm',
'CanIdentifier',
'FrameFactory',
'Frame',
'RawFrame',
Expand Down Expand Up @@ -72,6 +73,96 @@ class CanComm(CanComm_):
pass


class CanIdentifier(object):
"""CAN frame arbitration identifier.

Attributes:
identifier(int): CAN frame arbitration identifier
extended(bool): If the identifier is extended
"""

_FRAME_ID_MASK = 0x000003FF
_EXTENDED_FRAME_ID_MASK = 0x1FFFFFFF

def __init__(self, identifier, extended=False):
# type: (int, bool) -> None
self.identifier = identifier
self.extended = extended

@classmethod
def from_raw(cls, raw):
# type: (int) -> CanIdentifier
"""Parse a raw frame identifier into a CanIdentifier

Args:
raw(int): A raw frame identifier

Returns:
CanIdentifier: parsed value

>>> CanIdentifier.from_raw(0x1)
CanIdentifier(0x1)
>>> CanIdentifier.from_raw(0x20000001)
CanIdentifier(0x1, extended=True)
"""
extended = bool(raw & _cconsts.NX_FRAME_ID_CAN_IS_EXTENDED)
if extended:
identifier = raw & cls._EXTENDED_FRAME_ID_MASK
else:
identifier = raw & cls._FRAME_ID_MASK
return cls(identifier, extended)

def __int__(self):
"""Convert CanIdentifier into a raw frame identifier

>>> hex(int(CanIdentifier(1)))
'0x1'
>>> hex(int(CanIdentifier(1, True)))
'0x20000001'
"""
identifier = self.identifier
if self.extended:
if identifier != (identifier & self._EXTENDED_FRAME_ID_MASK):
_errors.check_for_error(_cconsts.NX_ERR_UNDEFINED_FRAME_ID)
identifier |= _cconsts.NX_FRAME_ID_CAN_IS_EXTENDED
else:
if identifier != (identifier & self._FRAME_ID_MASK):
_errors.check_for_error(_cconsts.NX_ERR_UNDEFINED_FRAME_ID)
return identifier

def __eq__(self, other):
if isinstance(other, CanIdentifier):
other_id = typing.cast(CanIdentifier, other)
return all((
self.identifier == other_id.identifier,
self.extended == other_id.extended))
else:
return NotImplemented

def __ne__(self, other):
result = self.__eq__(other)
if result is NotImplemented:
return result
else:
return not result

def __repr__(self):
"""CanIdentifier debug representation.

>>> CanIdentifier(1)
CanIdentifier(0x1)
>>> CanIdentifier(1, True)
CanIdentifier(0x1, extended=True)
"""
if self.extended:
return "CanIdentifier(0x{:x}, extended={})".format(
self.identifier,
self.extended)
else:
return "CanIdentifier(0x{:x})".format(
self.identifier)


@six.add_metaclass(abc.ABCMeta)
class FrameFactory(object):
"""ABC for creating :any:`nixnet.types.Frame` objects."""
Expand Down Expand Up @@ -125,7 +216,7 @@ class RawFrame(Frame):

Attributes:
timestamp(int): Absolute time the XNET interface received the end-of-frame.
identifier(int): CAN frame arbitration identifier.
identifier(int): Frame identifier.
type(:any:`nixnet._enums.FrameType`): Frame type.
flags(int): Flags that qualify the type.
info(int): Info that qualify the type.
Expand Down Expand Up @@ -194,8 +285,7 @@ class CanFrame(Frame):
"""CAN Frame.

Attributes:
identifier(int): CAN frame arbitration identifier.
extended(bool): If the identifier uses an extended format.
identifier(:any:`nixnet.types.CanIdentifier`): CAN frame arbitration identifier.
echo(bool): If the frame is an echo of a successful
transmit rather than being received from the network.
type(:any:`nixnet._enums.FrameType`): Frame type.
Expand All @@ -205,7 +295,6 @@ class CanFrame(Frame):

__slots__ = [
"identifier",
"extended",
"echo",
"_type",
"timestamp",
Expand All @@ -214,10 +303,12 @@ class CanFrame(Frame):
_FRAME_ID_MASK = 0x000003FF
_EXTENDED_FRAME_ID_MASK = 0x1FFFFFFF

def __init__(self, identifier, extended, type, payload=b""):
# type: (int, bool, constants.FrameType, bytes) -> None
self.identifier = identifier
self.extended = extended
def __init__(self, identifier, type, payload=b""):
# type: (typing.Union[CanIdentifier, int], constants.FrameType, bytes) -> None
if isinstance(identifier, int):
self.identifier = CanIdentifier(identifier)
else:
self.identifier = identifier
self.echo = False # Used only for Read
self._type = type
self.timestamp = 0 # Used only for Read
Expand All @@ -229,32 +320,21 @@ def from_raw(cls, frame):

>>> raw = RawFrame(5, 0x20000001, constants.FrameType.CAN_DATA, _cconsts.NX_FRAME_FLAGS_TRANSMIT_ECHO, 0, b'')
>>> CanFrame.from_raw(raw)
CanFrame(identifier=0x1, echo=True, type=FrameType.CAN_DATA, timestamp=0x5, payload=...)
CanFrame(CanIdentifier(0x1, extended=True), echo=True, type=FrameType.CAN_DATA, timestamp=0x5, payload=...)
"""
extended = bool(frame.identifier & _cconsts.NX_FRAME_ID_CAN_IS_EXTENDED)
if extended:
identifier = frame.identifier & cls._EXTENDED_FRAME_ID_MASK
else:
identifier = frame.identifier & cls._FRAME_ID_MASK
can_frame = CanFrame(identifier, extended, constants.FrameType(frame.type), frame.payload)
identifier = CanIdentifier.from_raw(frame.identifier)
can_frame = CanFrame(identifier, constants.FrameType(frame.type), frame.payload)
can_frame.timestamp = frame.timestamp
can_frame.echo = bool(frame.flags & _cconsts.NX_FRAME_FLAGS_TRANSMIT_ECHO)
return can_frame

def to_raw(self):
"""Convert to RawFrame.

>>> CanFrame(1, True, constants.FrameType.CAN_DATA).to_raw()
>>> CanFrame(CanIdentifier(1, True), constants.FrameType.CAN_DATA).to_raw()
RawFrame(timestamp=0x0, identifier=0x20000001, type=FrameType.CAN_DATA, flags=0x0, info=0x0, payload=...)
"""
identifier = self.identifier
if self.extended:
if identifier != (identifier & self._EXTENDED_FRAME_ID_MASK):
_errors.check_for_error(_cconsts.NX_ERR_UNDEFINED_FRAME_ID)
identifier |= _cconsts.NX_FRAME_ID_CAN_IS_EXTENDED
else:
if identifier != (identifier & self._FRAME_ID_MASK):
_errors.check_for_error(_cconsts.NX_ERR_UNDEFINED_FRAME_ID)
identifier = int(self.identifier)
flags = 0
if self.echo:
flags |= _cconsts.NX_FRAME_FLAGS_TRANSMIT_ECHO
Expand All @@ -269,7 +349,6 @@ def __eq__(self, other):
other_frame = typing.cast(CanFrame, other)
return all((
self.identifier == other_frame.identifier,
self.extended == other_frame.extended,
self.echo == other_frame.echo,
self.type == other_frame.type,
self.timestamp == other_frame.timestamp,
Expand All @@ -281,10 +360,10 @@ def __repr__(self):
# type: () -> typing.Text
"""CanFrame debug representation.

>>> CanFrame(1, True, constants.FrameType.CAN_DATA)
CanFrame(identifier=0x1, echo=False, type=FrameType.CAN_DATA, timestamp=0x0, payload=...)
>>> CanFrame(1, constants.FrameType.CAN_DATA)
CanFrame(CanIdentifier(0x1), echo=False, type=FrameType.CAN_DATA, timestamp=0x0, payload=...)
"""
return "CanFrame(identifier=0x{:x}, echo={}, type={}, timestamp=0x{:x}, payload=...)".format(
return "CanFrame({}, echo={}, type={}, timestamp=0x{:x}, payload=...)".format(
self.identifier,
self.echo,
self.type,
Expand Down
5 changes: 2 additions & 3 deletions nixnet_examples/can_frame_queued_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,9 @@ def main():
payload_list = [2, 4, 8, 16]
print('Unrecognized input ({}). Setting data buffer to {}', user_value, payload_list)

id = 0
extended = False
id = types.CanIdentifier(0)
payload = bytearray(payload_list)
frame = types.CanFrame(id, extended, constants.FrameType.CAN_DATA, payload)
frame = types.CanFrame(id, constants.FrameType.CAN_DATA, payload)

i = 0
while True:
Expand Down
5 changes: 2 additions & 3 deletions nixnet_examples/can_frame_stream_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ def main():
payload_list = [2, 4, 8, 16]
print('Unrecognized input ({}). Setting data buffer to {}', user_value, payload_list)

id = 0
extended = False
id = types.CanIdentifier(0)
payload = bytearray(payload_list)
frame = types.CanFrame(id, extended, constants.FrameType.CAN_DATA, payload)
frame = types.CanFrame(id, constants.FrameType.CAN_DATA, payload)

print('The same values should be received. Press q to quit')
i = 0
Expand Down
9 changes: 4 additions & 5 deletions tests/test_frames.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ def test_serialize_frame_with_base_payload():

def assert_can_frame(index, sent, received):
assert sent.identifier == received.identifier
assert sent.extended == received.extended
assert sent.echo == received.echo
assert sent.type == received.type
assert sent.payload == received.payload
Expand All @@ -100,7 +99,7 @@ def test_stream_loopback(nixnet_in_interface, nixnet_out_interface):

payload_list = [2, 4, 8, 16]
expected_frames = [
types.CanFrame(0, False, constants.FrameType.CAN_DATA, bytes(bytearray(payload_list)))]
types.CanFrame(0, constants.FrameType.CAN_DATA, bytes(bytearray(payload_list)))]
output_session.frames.write(expected_frames)

# Wait 1 s and then read the received values.
Expand Down Expand Up @@ -135,7 +134,7 @@ def test_queued_loopback(nixnet_in_interface, nixnet_out_interface):

payload_list = [2, 4, 8, 16]
expected_frames = [
types.CanFrame(66, False, constants.FrameType.CAN_DATA, bytes(bytearray(payload_list)))]
types.CanFrame(66, constants.FrameType.CAN_DATA, bytes(bytearray(payload_list)))]
output_session.frames.write(expected_frames)

# Wait 1 s and then read the received values.
Expand Down Expand Up @@ -171,8 +170,8 @@ def test_singlepoint_loopback(nixnet_in_interface, nixnet_out_interface):
first_payload_list = [2, 4, 8, 16]
second_payload_list = [1, 3]
expected_frames = [
types.CanFrame(66, False, constants.FrameType.CAN_DATA, bytes(bytearray(first_payload_list))),
types.CanFrame(67, False, constants.FrameType.CAN_DATA, bytes(bytearray(second_payload_list)))]
types.CanFrame(66, constants.FrameType.CAN_DATA, bytes(bytearray(first_payload_list))),
types.CanFrame(67, constants.FrameType.CAN_DATA, bytes(bytearray(second_payload_list)))]
output_session.frames.write(expected_frames)

# Wait 1 s and then read the received values.
Expand Down