diff --git a/docs/convert.rst b/docs/convert.rst new file mode 100644 index 00000000..c0378caf --- /dev/null +++ b/docs/convert.rst @@ -0,0 +1,14 @@ +nixnet.convert +============== + +.. automodule:: nixnet.convert + :members: + :show-inheritance: + :inherited-members: + +.. toctree:: + :maxdepth: 3 + :caption: API Reference: + + signals + j1939 diff --git a/docs/index.rst b/docs/index.rst index f189ecda..d5939651 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -13,6 +13,7 @@ NI-XNET Python Documentation :caption: API Reference: session + convert constants types errors diff --git a/nixnet/_funcs.py b/nixnet/_funcs.py index 2b4484a0..ef7f0ad3 100644 --- a/nixnet/_funcs.py +++ b/nixnet/_funcs.py @@ -246,17 +246,16 @@ def nx_write_signal_xy( def nx_convert_frames_to_signals_single_point( session_ref, # type: int frame_buffer, # type: bytes - value_buffer, # type: typing.List[float] - timestamp_buffer, # type: typing.List[int] + num_signals, # type: int ): - # type: (...) -> None + # type: (...) -> typing.Tuple[typing.List[_ctypedefs.nxTimestamp_t], typing.List[_ctypedefs.f64]] session_ref_ctypes = _ctypedefs.nxSessionRef_t(session_ref) frame_buffer_ctypes = (_ctypedefs.byte * len(frame_buffer))(*frame_buffer) # type: ignore size_of_frame_buffer_ctypes = _ctypedefs.u32(len(frame_buffer) * _ctypedefs.byte.BYTES) - value_buffer_ctypes = (_ctypedefs.f64 * len(value_buffer))(*value_buffer) # type: ignore - size_of_value_buffer_ctypes = _ctypedefs.u32(len(value_buffer) * _ctypedefs.f64.BYTES) - timestamp_buffer_ctypes = (_ctypedefs.nxTimestamp_t * len(timestamp_buffer))(*timestamp_buffer) # type: ignore - size_of_timestamp_buffer_ctypes = _ctypedefs.u32(len(timestamp_buffer) * _ctypedefs.nxTimestamp_t.BYTES) + value_buffer_ctypes = (_ctypedefs.f64 * num_signals)() # type: ignore + size_of_value_buffer_ctypes = _ctypedefs.u32(_ctypedefs.f64.BYTES * num_signals) + timestamp_buffer_ctypes = (_ctypedefs.nxTimestamp_t * num_signals)() # type: ignore + size_of_timestamp_buffer_ctypes = _ctypedefs.u32(_ctypedefs.nxTimestamp_t.BYTES * num_signals) result = _cfuncs.lib.nx_convert_frames_to_signals_single_point( session_ref_ctypes, frame_buffer_ctypes, @@ -267,19 +266,20 @@ def nx_convert_frames_to_signals_single_point( size_of_timestamp_buffer_ctypes, ) _errors.check_for_error(result.value) + return timestamp_buffer_ctypes, value_buffer_ctypes def nx_convert_signals_to_frames_single_point( session_ref, # type: int value_buffer, # type: typing.List[float] - buffer, # type: bytes + bytes_to_read, # type: int ): - # type: (...) -> int + # type: (...) -> typing.Tuple[bytes, int] session_ref_ctypes = _ctypedefs.nxSessionRef_t(session_ref) value_buffer_ctypes = (_ctypedefs.f64 * len(value_buffer))(*value_buffer) # type: ignore size_of_value_buffer_ctypes = _ctypedefs.u32(len(value_buffer) * _ctypedefs.f64.BYTES) - buffer_ctypes = (_ctypedefs.byte * len(buffer))(*buffer) # type: ignore - size_of_buffer_ctypes = _ctypedefs.u32(len(buffer) * _ctypedefs.byte.BYTES) + buffer_ctypes = (_ctypedefs.byte * bytes_to_read)() # type: ignore + size_of_buffer_ctypes = _ctypedefs.u32(_ctypedefs.byte.BYTES * bytes_to_read) number_of_bytes_returned_ctypes = _ctypedefs.u32() result = _cfuncs.lib.nx_convert_signals_to_frames_single_point( session_ref_ctypes, @@ -290,7 +290,7 @@ def nx_convert_signals_to_frames_single_point( ctypes.pointer(number_of_bytes_returned_ctypes), ) _errors.check_for_error(result.value) - return number_of_bytes_returned_ctypes.value + return buffer_ctypes.raw, number_of_bytes_returned_ctypes.value def nx_blink( diff --git a/nixnet/convert.py b/nixnet/convert.py new file mode 100644 index 00000000..b1242066 --- /dev/null +++ b/nixnet/convert.py @@ -0,0 +1,252 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import itertools +import typing # NOQA: F401 +import warnings + +from nixnet import _frames +from nixnet import _funcs +from nixnet import _props +from nixnet import _utils +from nixnet import constants +from nixnet import errors +from nixnet import types + +from nixnet._session import j1939 as session_j1939 +from nixnet._session import signals as session_signals + + +__all__ = [ + "SignalConversionSinglePointSession"] + + +class SignalConversionSinglePointSession(object): + """Convert NI-XNET signal data to frame data or vice versa. + + Conversion works similar to Single-Point mode. You specify a set of signals + that can span multiple frames. Signal to frame conversion reads a set of + values for the signals specified and writes them to the respective + frame(s). Frame to signal conversion parses a set of frames and returns the + latest signal value read from a corresponding frame. + """ + + def __init__( + self, + database_name, # type: typing.Text + cluster_name, # type: typing.Text + signals, # type: typing.Union[typing.Text, typing.List[typing.Text]] + ): + # type: (...) -> None + """Create an XNET session at run time using named references to database objects. + + Args: + database_name(str): XNET database name to use for + interface configuration. The database name must use the + or syntax (refer to Databases). + cluster_name(str): XNET cluster name to use for + interface configuration. The name must specify a cluster from + the database given in the database_name parameter. If it is left + blank, the cluster is extracted from the ``signals`` parameter. + signals(list of str): Strings describing signals for the session. The + list syntax is as follows: + + ``signals`` contains one or more XNET Signal names. Each name must + be one of the following options, whichever uniquely + identifies a signal within the database given: + + - ```` + - ``.`` + - ``..`` + - ``.`` + - ``..`` + + ``signals`` may also contain one or more trigger signals. For + information about trigger signals, refer to Signal Output + Single-Point Mode or Signal Input Single-Point Mode. + """ + flattened_list = _utils.flatten_items(signals) + + self._handle = None # To satisfy `__del__` in case nx_create_session throws + self._handle = _funcs.nx_create_session( + database_name, + cluster_name, + flattened_list, + "", + constants.CreateSessionMode.SIGNAL_CONVERSION_SINGLE_POINT) + self._j1939 = session_j1939.J1939(self._handle) + self._signals = session_signals.Signals(self._handle) + + def __del__(self): + if self._handle is not None: + warnings.warn( + 'Session was not explicitly closed before it was destructed. ' + 'Resources on the device may still be reserved.', + errors.XnetResourceWarning) + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + def __eq__(self, other): + if isinstance(other, self.__class__): + return self._handle == typing.cast(SignalConversionSinglePointSession, other._handle) + else: + return NotImplemented + + def __ne__(self, other): + result = self.__eq__(other) + if result is NotImplemented: + return result + else: + return not result + + def __hash__(self): + return hash(self._handle) + + def __repr__(self): + # type: () -> typing.Text + return 'Session(handle={0})'.format(self._handle) + + def close(self): + # type: () -> None + """Close (clear) the XNET session.""" + if self._handle is None: + warnings.warn( + 'Attempting to close NI-XNET session but session was already ' + 'closed', errors.XnetResourceWarning) + return + + _funcs.nx_clear(self._handle) + + self._handle = None + + @property + def signals(self): + # type: () -> session_signals.Signals + """:any:`nixnet._session.signals.Signals`: Operate on session's signals""" + return self._signals + + @property + def j1939(self): + # type: () -> session_j1939.J1939 + """:any:`nixnet._session.j1939.J1939`: Returns the J1939 configuration object for the session.""" + return self._j1939 + + @property + def application_protocol(self): + # type: () -> constants.AppProtocol + """:any:`nixnet._enums.AppProtocol`: This property returns the application protocol that the session uses. + + The database used with the session determines the application protocol. + """ + return constants.AppProtocol(_props.get_session_application_protocol(self._handle)) + + @property + def cluster_name(self): + # type: () -> typing.Text + """str: This property returns the cluster (network) name used with the session.""" + return _props.get_session_cluster_name(self._handle) + + @property + def database_name(self): + # type: () -> typing.Text + """str: This property returns the database name used with the session.""" + return _props.get_session_database_name(self._handle) + + @property + def mode(self): + # type: () -> constants.CreateSessionMode + """:any:`nixnet._enums.CreateSessionMode`: This property returns the mode associated with the session. + + For more information, refer to :any:`nixnet._enums.CreateSessionMode`. + """ + return constants.CreateSessionMode(_props.get_session_mode(self._handle)) + + @property + def protocol(self): + # type: () -> constants.Protocol + """:any:`nixnet._enums.Protocol`: This property returns the protocol that the interface in the session uses.""" + return constants.Protocol(_props.get_session_protocol(self._handle)) + + def _convert_bytes_to_signals(self, bytes): + # type: (bytes) -> typing.Iterable[typing.Tuple[int, float]] + num_signals = len(self.signals) + timestamps, values = _funcs.nx_convert_frames_to_signals_single_point(self._handle, bytes, num_signals) + for timestamp, value in zip(timestamps, values): + yield timestamp.value, value.value + + def convert_frames_to_signals(self, frames): + # type: (typing.Iterable[types.Frame]) -> typing.Iterable[typing.Tuple[int, float]] + """Convert Frames to signals. + + The frames passed into the ``frames`` array are read one by one, and + the signal values found are written to internal buffers for each + signal. Frames are identified by their identifier (FlexRay: slot) + field. After all frames in ``frames`` array are processed, the internal + signal buffers' status is returned with the corresponding timestamps + from the frames where a signal value was found. The signal internal + buffers' status is being preserved over multiple calls to this + function. + + This way, for example, data returned from multiple calls of nxFrameRead + for a Frame Input Stream Mode session (or any other Frame Input + session) can be passed to this function directly. + + .. note:: Frames unknown to the session are silently ignored. + """ + units = itertools.chain.from_iterable( + _frames.serialize_frame(frame.to_raw()) + for frame in frames) + bytes = b"".join(units) + return self._convert_bytes_to_signals(bytes) + + def _convert_signals_to_bytes(self, signals, num_bytes): + # type: (typing.Iterable[float], int) -> bytes + buffer, number_of_bytes_returned = _funcs.nx_convert_signals_to_frames_single_point( + self._handle, + list(signals), + num_bytes) + return buffer[0:number_of_bytes_returned] + + def convert_signals_to_frames(self, signals, frame_type=types.XnetFrame): + # type: (typing.Iterable[float], typing.Type[types.FrameFactory]) -> typing.Iterable[types.Frame] + """Convert signals to frames. + + The signal values written to the ``signals`` array are written to a raw + frame buffer array. For each frame included in the session, one frame + is generated in the array that contains the signal values. Signals not + present in the session are written as their respective default values; + empty space in the frames that signals do not occupy is written with + the frame's default payload. + + The frame header values are filled with appropriate values so that this + function's output can be directly written to a Frame Output session. + + Args: + signals(list of float): Values corresponding to signals configured + in this session. + frame_type(:any:`nixnet.types.FrameFactory`): A factory for the + desired frame formats. + + Yields: + :any:`nixnet.types.Frame` + """ + from_raw = typing.cast(typing.Callable[[types.RawFrame], types.Frame], frame_type.from_raw) + # Unlike some session reads, this should be safe from asking to read too much. + num_frames_to_read = 5 + while True: + try: + num_bytes_to_read = num_frames_to_read * _frames.nxFrameFixed_t.size + buffer = self._convert_signals_to_bytes(signals, num_bytes_to_read) + break + except errors.XnetError as e: + if e.error_type == constants.Err.BUFFER_TOO_SMALL: + num_bytes_to_read *= 2 + else: + raise + for frame in _frames.iterate_frames(buffer): + yield from_raw(frame) diff --git a/nixnet/session.py b/nixnet/session.py index 6631a818..06001605 100644 --- a/nixnet/session.py +++ b/nixnet/session.py @@ -670,24 +670,3 @@ def write_signal_xy( timestamp_buffer, num_pairs_buffer): _funcs.nx_write_signal_xy(session_ref, timeout, value_buffer, timestamp_buffer, num_pairs_buffer) - - -def convert_frames_to_signals_single_point( - session_ref, - frame_buffer, - number_of_bytes_for_frames, - value_buffer, - size_of_value_buffer, - timestamp_buffer, - size_of_timestamp_buffer): - raise NotImplementedError("Placeholder") - - -def convert_signals_to_frames_single_point( - session_ref, - value_buffer, - size_of_value_buffer, - buffer, - size_of_buffer, - number_of_bytes_returned): - raise NotImplementedError("Placeholder") diff --git a/nixnet_examples/can_signal_conversion.py b/nixnet_examples/can_signal_conversion.py new file mode 100644 index 00000000..23785779 --- /dev/null +++ b/nixnet_examples/can_signal_conversion.py @@ -0,0 +1,47 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pprint +import six + +from nixnet import convert + + +pp = pprint.PrettyPrinter(indent=4) + + +def main(): + database_name = 'NIXNET_example' + cluster_name = 'CAN_Cluster' + signal_names = ['CANEventSignal1', 'CANEventSignal2'] + + with convert.SignalConversionSinglePointSession( + database_name, + cluster_name, + signal_names) as session: + + user_value = six.moves.input('Enter {} signal values [float, float]: '.format(len(signal_names))) + try: + expected_signals = [float(x.strip()) for x in user_value.split(",")] + except ValueError: + expected_signals = [24.5343, 77.0129] + print('Unrecognized input ({}). Setting data buffer to {}'.format(user_value, expected_signals)) + + if len(expected_signals) != len(signal_names): + expected_signals = [24.5343, 77.0129] + print('Invalid number of signal values entered. Setting data buffer to {}'.format(expected_signals)) + + frames = session.convert_signals_to_frames(expected_signals) + print('Frames:') + for frame in frames: + print(' {}'.format(pp.pformat(frame))) + + converted_signals = session.convert_frames_to_signals(frames) + print('Signals:') + for expected, (_, converted) in zip(expected_signals, converted_signals): + print(' {} {}'.format(expected, converted)) + + +if __name__ == '__main__': + main() diff --git a/tests/test_convert.py b/tests/test_convert.py new file mode 100644 index 00000000..88c3278d --- /dev/null +++ b/tests/test_convert.py @@ -0,0 +1,84 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pytest # type: ignore + +from nixnet import constants +from nixnet import convert +from nixnet import errors + + +def raise_code(code): + raise errors.XnetError("", code) + + +@pytest.mark.integration +def test_session_container(): + database_name = 'NIXNET_example' + cluster_name = 'CAN_Cluster' + signals = ['CANEventSignal1', 'CANEventSignal2'] + + with convert.SignalConversionSinglePointSession( + database_name, + cluster_name, + signals) as session: + with convert.SignalConversionSinglePointSession( + database_name, + cluster_name, + signals) as dup_session: + assert session == session + assert not (session == dup_session) + assert not (session == 1) + + assert not (session != session) + assert session != dup_session + assert session != 1 + + set([session]) # Testing `__hash__` + + print(repr(session)) + + with pytest.warns(errors.XnetResourceWarning): + session.close() + + +@pytest.mark.integration +def test_session_properties(): + """Verify Session properties. + + Ideally, mutable properties would be set to multiple values and we'd test + for the intended side-effect. That'll be a massive undertaking. For now, + ensure they are settable and getttable. + """ + database_name = 'NIXNET_example' + cluster_name = 'CAN_Cluster' + signals = ['CANEventSignal1', 'CANEventSignal2'] + + with convert.SignalConversionSinglePointSession( + database_name, + cluster_name, + signals) as session: + assert session.database_name == database_name + assert session.cluster_name == cluster_name + assert session.mode == constants.CreateSessionMode.SIGNAL_CONVERSION_SINGLE_POINT + assert session.application_protocol == constants.AppProtocol.NONE + assert session.protocol == constants.Protocol.CAN + + +@pytest.mark.integration +def test_conversion_roundtrip(): + database_name = 'NIXNET_example' + cluster_name = 'CAN_Cluster' + signal_names = ['CANEventSignal1', 'CANEventSignal2'] + + with convert.SignalConversionSinglePointSession( + database_name, + cluster_name, + signal_names) as session: + + expected_signals = [2, 3] + frames = session.convert_signals_to_frames(expected_signals) + converted_signals = session.convert_frames_to_signals(frames) + for expected, (_, converted) in zip(expected_signals, converted_signals): + assert pytest.approx(expected) == converted diff --git a/tests/test_examples.py b/tests/test_examples.py index eb45fb15..6fe94006 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -11,6 +11,7 @@ from nixnet_examples import can_frame_queued_io from nixnet_examples import can_frame_stream_io +from nixnet_examples import can_signal_conversion from nixnet_examples import can_signal_single_point_io @@ -23,6 +24,8 @@ MockXnetLibrary.nx_read_frame.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_write_signal_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_read_signal_single_point.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nx_convert_frames_to_signals_single_point.return_value = _ctypedefs.u32(0) +MockXnetLibrary.nx_convert_signals_to_frames_single_point.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_stop.return_value = _ctypedefs.u32(0) MockXnetLibrary.nx_clear.return_value = _ctypedefs.u32(0) @@ -79,3 +82,15 @@ def test_can_frame_stream_empty_session(input_values): def test_can_signal_single_point_empty_session(input_values): with mock.patch('six.moves.input', six_input(input_values)): can_signal_single_point_io.main() + + +@pytest.mark.parametrize("input_values", [ + ['1, 2'], + ['1'], + [''], +]) +@mock.patch('nixnet._cfuncs.lib', MockXnetLibrary) +@mock.patch('time.sleep', lambda time: None) +def test_can_signal_conversion_empty_session(input_values): + with mock.patch('six.moves.input', six_input(input_values)): + can_signal_conversion.main()