Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 16 additions & 10 deletions nixnet/_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,28 @@

from nixnet import _cconsts
from nixnet import _cfuncs
from nixnet import _ctypedefs
from nixnet import errors


def check_for_error(error_code):
if error_code & _cconsts.NX_STATUS_ERROR:
buffer_size = 2048
error_buffer = ctypes.create_string_buffer(buffer_size)
raise_xnet_error(error_code)
elif error_code != _cconsts.NX_SUCCESS:
status = status_to_string(error_code)
warnings.warn(errors.XnetWarning(status, error_code))

_cfuncs.lib.nx_status_to_string(error_code, buffer_size, error_buffer)

raise errors.XnetError(error_buffer.value.decode("ascii"), error_code)
elif error_code != _cconsts.NX_SUCCESS:
buffer_size = 2048
error_buffer = ctypes.create_string_buffer(buffer_size)
def raise_xnet_error(error_code):
status = status_to_string(error_code)
raise errors.XnetError(status, error_code)

_cfuncs.lib.nx_status_to_string(error_code, buffer_size, error_buffer)

warnings.warn(errors.XnetWarning(
error_buffer.value.decode("ascii"), error_code))
def status_to_string(status_code):
buffer_size = 2048
buffer_size_ctypes = _ctypedefs.u32(buffer_size)
buffer_ctypes = ctypes.create_string_buffer(buffer_size)
status_code_ctypes = _ctypedefs.nxStatus_t(status_code)
_cfuncs.lib.nx_status_to_string(status_code_ctypes, buffer_size_ctypes, buffer_ctypes)
status_string = buffer_ctypes.value.decode("ascii")
return status_string
2 changes: 1 addition & 1 deletion nixnet/_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ def nx_read_state(
state_size, # type: int
state_value_ctypes_ptr, # type: typing.Any
):
# type: (...) -> typing.Tuple[typing.Any, int]
# type: (...) -> int
session_ref_ctypes = _ctypedefs.nxSessionRef_t(session_ref)
state_id_ctypes = _ctypedefs.u32(state_id.value)
state_size_ctypes = _ctypedefs.u32(state_size)
Expand Down
12 changes: 5 additions & 7 deletions nixnet/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@


def flatten_items(list):
# (typing.Union[typing.Text, typing.List[typing.Text]]) -> typing.Text
# type: (typing.Union[typing.Text, typing.List[typing.Text]]) -> typing.Text
"""Flatten an item list to a string

>>> str(flatten_items('Item'))
Expand All @@ -28,23 +28,21 @@ def flatten_items(list):
# For FRAME_IN_QUEUED / FRAME_OUT_QUEUED
# Convenience for everything else
if ',' in list:
# A bit of an abuse of an error code
_errors.check_for_error(_cconsts.NX_ERR_INVALID_PROPERTY_VALUE)
_errors.raise_xnet_error(_cconsts.NX_ERR_INVALID_PROPERTY_VALUE)
flattened = list
elif isinstance(list, collections.Iterable):
flattened = ",".join(list)
elif list is None:
# For FRAME_IN_STREAM / FRAME_OUT_STREAM
flattened = ''
else:
# A bit of an abuse of an error code
_errors.check_for_error(_cconsts.NX_ERR_INVALID_PROPERTY_VALUE)
_errors.raise_xnet_error(_cconsts.NX_ERR_INVALID_PROPERTY_VALUE)

return flattened


def parse_can_comm_bitfield(bitfield):
# (int) -> types.CanComm
# type: (int) -> types.CanComm
"""Parse a CAN Comm bitfield."""
state = constants.CanCommState(bitfield & 0x0F)
tcvr_err = ((bitfield >> 4) & 0x01) != 0
Expand All @@ -56,7 +54,7 @@ def parse_can_comm_bitfield(bitfield):


def parse_lin_comm_bitfield(first, second):
# (int) -> types.CanComm
# type: (int, int) -> types.LinComm

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

"""Parse a LIN Comm first."""
sleep = ((first >> 1) & 0x01) != 0
state = constants.LinCommState((first >> 2) & 0x03)
Expand Down
36 changes: 18 additions & 18 deletions nixnet/database/_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import typing # NOQA: F401

from nixnet import _cconsts
from nixnet import _errors
from nixnet import _funcs
from nixnet import _props
from nixnet import constants
Expand Down Expand Up @@ -53,6 +54,23 @@ def __hash__(self):
def __repr__(self):
return '{}(handle={})'.format(type(self).__name__, self._handle)

def check_config_status(self):
# type: () -> None
"""Check this cluster's configuration status.

By default, incorrectly configured clusters in the database are not returned from
:any:`Database.clusters` because they cannot be used in the bus communication.
You can change this behavior by setting :any:`Database.show_invalid_from_open` to `True`.
When a cluster configuration status becomes invalid after the database is opened,
the cluster still is returned from :any:`Database.clusters`
even if :any:`Database.show_invalid_from_open` is `False`.

Raises:
XnetError: The cluster is incorrectly configured.
"""
status_code = _props.get_cluster_config_status(self._handle)
_errors.check_for_error(status_code)

def export(self, db_filepath):
# type: (typing.Text) -> None
"""Exports this cluster to a CANdb++ or LIN database file format.
Expand Down Expand Up @@ -180,24 +198,6 @@ def comment(self, value):
# type: (typing.Text) -> None
_props.set_cluster_comment(self._handle, value)

@property
def config_status(self):
# type: () -> int
"""int: Returns the cluster object configuration status.

Configuration Status returns an NI-XNET error code.
You can pass the value to the `nxStatusToString` function to
convert the value to a text description of the configuration problem.

By default, incorrectly configured clusters in the database are not returned from
:any:`Database.clusters` because they cannot be used in the bus communication.
You can change this behavior by setting :any:`Database.show_invalid_from_open` to ``True``.
When the configuration status of a cluster becomes invalid after the database has been opened,
the cluster still is returned from :any:`Database.clusters` even if
:any:`Database.show_invalid_from_open` to ``False``.
"""
return _props.get_cluster_config_status(self._handle)

@property
def database_ref(self):
# type: () -> int
Expand Down
36 changes: 18 additions & 18 deletions nixnet/database/_ecu.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import typing # NOQA: F401

from nixnet import _errors
from nixnet import _props
from nixnet import constants

Expand Down Expand Up @@ -39,6 +40,23 @@ def __hash__(self):
def __repr__(self):
return '{}(handle={})'.format(type(self).__name__, self._handle)

def check_config_status(self):
# type: () -> None
"""Check this ECU's configuration status.

By default, incorrectly configured ECUs in the database are not returned from
:any:`Cluster.ecus` because they cannot be used in the bus communication.
You can change this behavior by setting :any:`Database.show_invalid_from_open` to `True`.
When an ECU configuration status becomes invalid after the database is opened,
the ECU still is returned from :any:`Cluster.ecus`
even if :any:`Database.show_invalid_from_open` is `False`.

Raises:
XnetError: The ECU is incorrectly configured.
"""
status_code = _props.get_ecu_config_status(self._handle)
_errors.check_for_error(status_code)

@property
def clst(self):
# type: () -> _cluster.Cluster
Expand All @@ -64,24 +82,6 @@ def comment(self, value):
# type: (typing.Text) -> None
_props.set_ecu_comment(self._handle, value)

@property
def config_status(self):
# type: () -> int
"""int: Returns the ECU object configuration status.

Configuration Status returns an NI-XNET error code.
You can pass the value to the `nxStatusToString` function to
convert the value to a text description of the configuration problem.

By default, incorrectly configured ECUs in the database are not returned from
:any:`Cluster.ecus` because they cannot be used in the bus communication.
You can change this behavior by setting :any:`Database.show_invalid_from_open` to ``True``.
When the configuration status of a ECU becomes invalid after opening the database,
the ECU still is returned from :any:`Cluster.ecus`
even if :any:`Database.show_invalid_from_open` is ``False``.
"""
return _props.get_ecu_config_status(self._handle)

@property
def dbc_attributes(self):
# type: () -> _dbc_attributes.DbcAttributeCollection
Expand Down
41 changes: 20 additions & 21 deletions nixnet/database/_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,23 @@ def __hash__(self):
def __repr__(self):
return '{}(handle={})'.format(type(self).__name__, self._handle)

def check_config_status(self):
# type: () -> None
"""Check this frame's configuration status.

By default, incorrectly configured frames in the database are not returned from
:any:`Cluster.frames` because they cannot be used in the bus communication.
You can change this behavior by setting :any:`Database.show_invalid_from_open` to `True`.
When a frame configuration status becomes invalid after the database is opened,
the frame still is returned from :any:`Cluster.frames`
even if :any:`Database.show_invalid_from_open` is `False`.

Raises:
XnetError: The frame is incorrectly configured.
"""
status_code = _props.get_frame_config_status(self._handle)
_errors.check_for_error(status_code)

@property
def application_protocol(self):
# type: () -> constants.AppProtocol
Expand Down Expand Up @@ -82,24 +99,6 @@ def comment(self, value):
# type: (typing.Text) -> None
_props.set_frame_comment(self._handle, value)

@property
def config_status(self):
# type: () -> int
"""int: Returns the frame object configuration status.

Configuration Status returns an NI-XNET error code.
You can pass the value to the `nxStatusToString` function to
convert the value to a text description of the configuration problem.

By default, incorrectly configured frames in the database are not returned from
:any:`Cluster.frames` because they cannot be used in the bus communication.
You can change this behavior by setting :any:`Database.show_invalid_from_open` to ``True``.
When the configuration status of a frames becomes invalid after opening the database,
the frame still is returned from :any:`Cluster.frames`
even if :any:`Database.show_invalid_from_open` is ``False``.
"""
return _props.get_frame_config_status(self._handle)

@property
def default_payload(self):
# type: () -> typing.Iterable[int]
Expand Down Expand Up @@ -522,8 +521,8 @@ def mux_data_mux_sig(self):
"""
ref = _props.get_frame_mux_data_mux_sig_ref(self._handle)
if ref == 0:
# A bit of an abuse of errors
_errors.check_for_error(_cconsts.NX_ERR_SIGNAL_NOT_FOUND)
_errors.raise_xnet_error(_cconsts.NX_ERR_SIGNAL_NOT_FOUND)

return _signal.Signal(ref)

@property
Expand Down Expand Up @@ -583,7 +582,7 @@ def pdus(self):

For CAN and LIN, NI-XNET supports only a one-to-one relationship between frames and PDUs.
For those interfaces, advanced PDU configuration returns
an error from the :any:`Frame.config_status` property and when creating a session.
raises an exception when calling :any:`Frame.check_config_status` and when creating a session.
If you do not use advanced PDU configuration,
you can avoid using PDUs in the database API
and create signals and subframes directly on a frame.
Expand Down
39 changes: 18 additions & 21 deletions nixnet/database/_lin_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import typing # NOQA: F401

from nixnet import _cconsts
from nixnet import _errors
from nixnet import _props
from nixnet import constants

Expand Down Expand Up @@ -44,6 +45,23 @@ def __hash__(self):
def __repr__(self):
return '{}(handle={})'.format(type(self).__name__, self._handle)

def check_config_status(self):
# type: () -> None
"""Check this LIN schedule's configuration status.

By default, incorrectly configured schedules in the database are not returned from
:any:`Cluster.lin_schedules` because they cannot be used in the bus communication.
You can change this behavior by setting :any:`Database.show_invalid_from_open` to `True`.
When a schedule configuration status becomes invalid after the database is opened,
the schedule still is returned from :any:`Cluster.lin_schedules`
even if :any:`Database.show_invalid_from_open` is `False`.

Raises:
XnetError: The LIN schedule is incorrectly configured.
"""
status_code = _props.get_lin_sched_config_status(self._handle)
_errors.check_for_error(status_code)

@property
def clst(self):
# type: () -> _cluster.Cluster
Expand All @@ -68,27 +86,6 @@ def comment(self, value):
# type: (typing.Text) -> None
_props.set_lin_sched_comment(self._handle, value)

@property
def config_status(self):
# type: () -> int
"""int: Returns the LIN schedule object configuration status.

Configuration Status returns an NI-XNET error code.
You can pass the value to the `nxStatusToString` function to
convert the value to a text description of the configuration problem.

By default, incorrectly configured schedules in the database are not returned from
:any:`Cluster.lin_schedules` because they cannot be used in the bus communication.
You can change this behavior by setting :any:`Database.show_invalid_from_open` to ``True``.
When the configuration status of a schedule becomes invalid after opening the database,
the schedule still is returned from :any:`Cluster.lin_schedules`
even if :any:`Database.show_invalid_from_open` is ``False``.

An example of invalid schedule configuration is when a required schedule property has not been defined.
For example, a schedule entry within this schedule has an undefined delay time.
"""
return _props.get_lin_sched_config_status(self._handle)

@property
def entries(self):
# type: () -> _collection.DbCollection
Expand Down
4 changes: 2 additions & 2 deletions nixnet/database/_lin_sched_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ def collision_res_sched(self):
"""
handle = _props.get_lin_sched_entry_collision_res_sched(self._handle)
if handle == 0:
# A bit of an abuse of errors
_errors.check_for_error(_cconsts.NX_ERR_DATABASE_OBJECT_NOT_FOUND)
_errors.raise_xnet_error(_cconsts.NX_ERR_DATABASE_OBJECT_NOT_FOUND)

return _lin_sched.LinSched(handle)

@collision_res_sched.setter
Expand Down
Loading