diff --git a/harp/__init__.py b/harp/__init__.py
index 31ec97f..ba6211c 100644
--- a/harp/__init__.py
+++ b/harp/__init__.py
@@ -1,5 +1,5 @@
-from harp.io import REFERENCE_EPOCH, MessageType, parse, read
+from harp.io import REFERENCE_EPOCH, MessageType, read, to_buffer, to_file
 from harp.reader import create_reader
 from harp.schema import read_schema
 
-__all__ = ["REFERENCE_EPOCH", "MessageType", "parse", "read", "create_reader", "read_schema"]
+__all__ = ["REFERENCE_EPOCH", "MessageType", "read", "to_buffer", "to_file", "create_reader", "read_schema"]
diff --git a/harp/io.py b/harp/io.py
index 28ffe1f..5b9091b 100644
--- a/harp/io.py
+++ b/harp/io.py
@@ -8,7 +8,7 @@
 import pandas as pd
 from pandas._typing import Axes
 
-from harp.typing import BufferLike
+from harp.typing import _BufferLike, _FileLike
 
 REFERENCE_EPOCH = datetime(1904, 1, 1)
 """The reference epoch for UTC harp time."""
@@ -41,7 +41,7 @@ class MessageType(IntEnum):
 
 
 def read(
-    file: Union[str, bytes, PathLike[Any], BinaryIO],
+    file_or_buf: Union[_FileLike, _BufferLike],
     address: Optional[int] = None,
     dtype: Optional[np.dtype] = None,
     length: Optional[int] = None,
@@ -49,22 +49,22 @@ def read(
     epoch: Optional[datetime] = None,
     keep_type: bool = False,
 ):
-    """Read single-register Harp data from the specified file.
+    """Read single-register Harp data from the specified file or buffer.
 
     Parameters
     ----------
-    file
-        Open file object or filename containing binary data from
+    file_or_buf
+        File path, open file object, or buffer containing binary data from
         a single device register.
     address
         Expected register address. If specified, the address of
-        the first message in the file is used for validation.
+        the first message is used for validation.
     dtype
         Expected data type of the register payload. If specified, the
-        payload type of the first message in the file is used for validation.
+        payload type of the first message is used for validation.
     length
         Expected number of elements in register payload. If specified, the
-        payload length of the first message in the file is used for validation.
+        payload length of the first message is used for validation.
     columns
         The optional column labels to use for the data values.
     epoch
@@ -77,60 +77,13 @@
     -------
     A pandas data frame containing message data, sorted by time.
     """
-    data = np.fromfile(file, dtype=np.uint8)
-    return _fromraw(data, address, dtype, length, columns, epoch, keep_type)
-
-
-def parse(
-    buffer: BufferLike,
-    address: Optional[int] = None,
-    dtype: Optional[np.dtype] = None,
-    length: Optional[int] = None,
-    columns: Optional[Axes] = None,
-    epoch: Optional[datetime] = None,
-    keep_type: bool = False,
-):
-    """Parse single-register Harp data from the specified buffer.
-
-    Parameters
-    ----------
-    buffer
-        An object that exposes a buffer interface containing binary data from
-        a single device register.
-    address
-        Expected register address. If specified, the address of
-        the first message in the buffer is used for validation.
-    dtype
-        Expected data type of the register payload. If specified, the
-        payload type of the first message in the buffer is used for validation.
-    length
-        Expected number of elements in register payload. If specified, the
-        payload length of the first message in the buffer is used for validation.
-    columns
-        The optional column labels to use for the data values.
-    epoch
-        Reference datetime at which time zero begins. If specified,
-        the result data frame will have a datetime index.
-    keep_type
-        Specifies whether to include a column with the message type.
-
-    Returns
-    -------
-    A pandas data frame containing message data, sorted by time.
-    """
-    data = np.frombuffer(buffer, dtype=np.uint8)
-    return _fromraw(data, address, dtype, length, columns, epoch, keep_type)
-
+    if isinstance(file_or_buf, (str, PathLike, BinaryIO)) or hasattr(file_or_buf, "readinto"):
+        # TODO: in the below we ignore the type as otherwise
+        # we have no way to runtime check _IOProtocol
+        data = np.fromfile(file_or_buf, dtype=np.uint8)  # type: ignore
+    else:
+        data = np.frombuffer(file_or_buf, dtype=np.uint8)
 
-def _fromraw(
-    data: npt.NDArray[np.uint8],
-    address: Optional[int] = None,
-    dtype: Optional[np.dtype] = None,
-    length: Optional[int] = None,
-    columns: Optional[Axes] = None,
-    epoch: Optional[datetime] = None,
-    keep_type: bool = False,
-):
     if len(data) == 0:
         return pd.DataFrame(
             columns=columns,
@@ -185,11 +138,12 @@
     return result
 
 
-def write(
-    file: Union[str, bytes, PathLike[Any], BinaryIO],
+def to_file(
     data: pd.DataFrame,
+    file: _FileLike,
     address: int,
     dtype: Optional[np.dtype] = None,
+    length: Optional[int] = None,
     port: Optional[int] = None,
     epoch: Optional[datetime] = None,
     message_type: Optional[MessageType] = None,
@@ -198,16 +152,19 @@
 
     Parameters
     ----------
-    file
-        Open file object or filename where to store binary data from
-        a single device register.
     data
         Pandas data frame containing message payload.
+    file
+        File path or open file object in which to store binary data from
+        a single device register.
     address
         Register address used to identify all formatted Harp messages.
     dtype
         Data type of the register payload. If specified, all data will be
         converted before formatting the binary payload.
+    length
+        Expected number of elements in register payload. If specified, the
+        number of columns in the input data frame is validated.
     port
         Optional port value used for all formatted Harp messages.
     epoch
@@ -217,19 +174,20 @@
         Optional message type used for all formatted Harp messages.
         If not specified, data must contain a MessageType column.
     """
-    buffer = format(data, address, dtype, port, epoch, message_type)
+    buffer = to_buffer(data, address, dtype, length, port, epoch, message_type)
     buffer.tofile(file)
 
 
-def format(
+def to_buffer(
     data: pd.DataFrame,
     address: int,
     dtype: Optional[np.dtype] = None,
+    length: Optional[int] = None,
     port: Optional[int] = None,
     epoch: Optional[datetime] = None,
     message_type: Optional[MessageType] = None,
 ) -> npt.NDArray[np.uint8]:
-    """Format single-register Harp data as a flat binary buffer.
+    """Convert single-register Harp data to a flat binary buffer.
 
     Parameters
     ----------
@@ -240,6 +198,9 @@
     dtype
         Data type of the register payload. If specified, all data will be
         converted before formatting the binary payload.
+    length
+        Expected number of elements in register payload. If specified, the
+        number of columns in the input data frame is validated.
     port
         Optional port value used for all formatted Harp messages.
     epoch
@@ -254,12 +215,13 @@
     An array object containing message data formatted according to
     the Harp binary protocol.
""" - if len(data) == 0: + nrows = len(data) + if nrows == 0: return np.empty(0, dtype=np.uint8) - if "MessageType" in data.columns: - msgtype = data["MessageType"].cat.codes - payload = data[data.columns.drop("MessageType")].values + if MessageType.__name__ in data.columns: + msgtype = data[MessageType.__name__].cat.codes + payload = data[data.columns.drop(MessageType.__name__)].values elif message_type is not None: msgtype = message_type payload = data.values @@ -278,17 +240,20 @@ def format( if dtype is not None: payload = payload.astype(dtype, copy=False) + ncols = payload.shape[1] + if length is not None and ncols != length: + raise ValueError(f"expected payload length {length} but got {ncols}") + if port is None: port = 255 payloadtype = _payloadtypefromdtype[payload.dtype] - payloadlength = payload.shape[1] * payload.dtype.itemsize + payloadlength = ncols * payload.dtype.itemsize stride = payloadlength + 6 if is_timestamped: payloadtype |= _PAYLOAD_TIMESTAMP_MASK stride += 6 - nrows = len(data) buffer = np.empty((nrows, stride), dtype=np.uint8) buffer[:, 0] = msgtype buffer[:, 1:5] = [stride - 2, address, port, payloadtype] diff --git a/harp/reader.py b/harp/reader.py index 3954892..590164c 100644 --- a/harp/reader.py +++ b/harp/reader.py @@ -6,21 +6,21 @@ from math import log2 from os import PathLike from pathlib import Path -from typing import Any, BinaryIO, Callable, Iterable, Mapping, Optional, Protocol, Union +from typing import Callable, Iterable, Mapping, Optional, Protocol, Union from numpy import dtype from pandas import DataFrame, Series from pandas._typing import Axes -from harp.io import MessageType, parse, read +from harp.io import MessageType, read from harp.model import BitMask, GroupMask, Model, PayloadMember, Register from harp.schema import read_schema -from harp.typing import BufferLike +from harp.typing import _BufferLike, _FileLike @dataclass class _ReaderParams: - path: Path + base_path: Path epoch: Optional[datetime] = None keep_type: bool = False @@ -28,16 +28,7 @@ class _ReaderParams: class _ReadRegister(Protocol): def __call__( self, - file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None, - epoch: Optional[datetime] = None, - keep_type: bool = False, - ) -> DataFrame: ... - - -class _ParseRegister(Protocol): - def __call__( - self, - buffer: BufferLike, + file_or_buf: Optional[Union[_FileLike, _BufferLike]] = None, epoch: Optional[datetime] = None, keep_type: bool = False, ) -> DataFrame: ... 
@@ -46,17 +37,14 @@ def __call__(
 class RegisterReader:
     register: Register
     read: _ReadRegister
-    parse: _ParseRegister
 
     def __init__(
         self,
         register: Register,
         read: _ReadRegister,
-        parse: _ParseRegister,
     ) -> None:
         self.register = register
         self.read = read
-        self.parse = parse
 
 
 class RegisterMap(UserDict[str, RegisterReader]):
@@ -180,16 +168,16 @@ def parser(df: DataFrame):
 
 def _create_register_reader(register: Register, params: _ReaderParams):
     def reader(
-        file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None,
+        file_or_buf: Optional[Union[_FileLike, _BufferLike]] = None,
         columns: Optional[Axes] = None,
         epoch: Optional[datetime] = params.epoch,
         keep_type: bool = params.keep_type,
     ):
-        if file is None:
-            file = f"{params.path}_{register.address}.bin"
+        if file_or_buf is None:
+            file_or_buf = f"{params.base_path}_{register.address}.bin"
 
         data = read(
-            file,
+            file_or_buf,
             address=register.address,
             dtype=dtype(register.type),
             length=register.length,
@@ -202,30 +190,9 @@ def reader(
     return reader
 
 
-def _create_register_parser(register: Register, params: _ReaderParams):
-    def parser(
-        buffer: BufferLike,
-        columns: Optional[Axes] = None,
-        epoch: Optional[datetime] = params.epoch,
-        keep_type: bool = params.keep_type,
-    ):
-        return parse(
-            buffer,
-            address=register.address,
-            dtype=dtype(register.type),
-            length=register.length,
-            columns=columns,
-            epoch=epoch,
-            keep_type=keep_type,
-        )
-
-    return parser
-
-
 def _create_register_handler(device: Model, name: str, params: _ReaderParams):
     register = device.registers[name]
     reader = _create_register_reader(register, params)
-    parser = _create_register_parser(register, params)
 
     if register.maskType is not None:
         key = register.maskType.root
@@ -233,15 +200,13 @@
         bitMask = None if device.bitMasks is None else device.bitMasks.get(key)
         if bitMask is not None:
             bitmask_parser = _create_bitmask_parser(bitMask)
             reader = _compose_parser(bitmask_parser, reader, params)
-            parser = _compose_parser(bitmask_parser, parser, params)
-            return RegisterReader(register, reader, parser)
+            return RegisterReader(register, reader)
 
         groupMask = None if device.groupMasks is None else device.groupMasks.get(key)
         if groupMask is not None:
             groupmask_parser = _create_groupmask_parser(name, groupMask)
             reader = _compose_parser(groupmask_parser, reader, params)
-            parser = _compose_parser(groupmask_parser, parser, params)
-            return RegisterReader(register, reader, parser)
+            return RegisterReader(register, reader)
 
     if register.payloadSpec is not None:
         member_parsers = [
@@ -253,8 +218,7 @@ def payload_parser(df: DataFrame):
            return DataFrame({n: f(df) for n, f in member_parsers}, index=df.index)
 
         reader = _compose_parser(payload_parser, reader, params)
-        parser = _compose_parser(payload_parser, parser, params)
-        return RegisterReader(register, reader, parser)
+        return RegisterReader(register, reader)
 
     columns = (
         [name]
@@ -262,8 +226,7 @@
         else [f"{name}_{i}" for i in range(register.length)]
     )
     reader = partial(reader, columns=columns)
-    parser = partial(parser, columns=columns)
-    return RegisterReader(register, reader, parser)
+    return RegisterReader(register, reader)
 
 
 def create_reader(
diff --git a/harp/typing.py b/harp/typing.py
index e462ecd..1738f70 100644
--- a/harp/typing.py
+++ b/harp/typing.py
@@ -1,10 +1,13 @@
 import mmap
 import sys
-from typing import Any, Union
+from os import PathLike
+from typing import Any, BinaryIO, Union
 
 from numpy.typing import NDArray
 
 if sys.version_info >= (3, 12):
-    from collections.abc import Buffer as BufferLike
+    from collections.abc import Buffer as _BufferLike
 else:
-    BufferLike = Union[bytes, bytearray, memoryview, mmap.mmap, NDArray[Any]]
+    _BufferLike = Union[bytes, bytearray, memoryview, mmap.mmap, NDArray[Any]]
+
+_FileLike = Union[str, PathLike[str], BinaryIO]
diff --git a/tests/test_io.py b/tests/test_io.py
index 74536b3..3a94632 100644
--- a/tests/test_io.py
+++ b/tests/test_io.py
@@ -5,7 +5,7 @@
 import pytest
 from pytest import mark
 
-from harp.io import REFERENCE_EPOCH, MessageType, format, parse, read
+from harp.io import REFERENCE_EPOCH, MessageType, read, to_buffer
 from tests.params import DataFileParam
 
 testdata = [
@@ -41,27 +41,20 @@
 def test_read(dataFile: DataFileParam):
     context = pytest.raises if dataFile.expected_error else nullcontext
     with context(dataFile.expected_error):  # type: ignore
-        path = dataFile.path
+        file_or_buf = dataFile.path
         if dataFile.repeat_data:
-            with open(path, "rb") as f:
-                buffer = f.read() * dataFile.repeat_data
-            data = parse(
-                buffer,
-                address=dataFile.expected_address,
-                dtype=dataFile.expected_dtype,
-                length=dataFile.expected_length,
-                epoch=dataFile.epoch,
-                keep_type=dataFile.keep_type,
-            )
-        else:
-            data = read(
-                path,
-                address=dataFile.expected_address,
-                dtype=dataFile.expected_dtype,
-                length=dataFile.expected_length,
-                epoch=dataFile.epoch,
-                keep_type=dataFile.keep_type,
-            )
+            with open(file_or_buf, "rb") as f:
+                file_or_buf = f.read() * dataFile.repeat_data
+
+        data = read(
+            file_or_buf,
+            address=dataFile.expected_address,
+            dtype=dataFile.expected_dtype,
+            length=dataFile.expected_length,
+            epoch=dataFile.epoch,
+            keep_type=dataFile.keep_type,
+        )
+
         assert len(data) == dataFile.expected_rows
         assert isinstance(data.index, pd.DatetimeIndex if dataFile.epoch else pd.Index)
         if dataFile.keep_type:
@@ -83,7 +76,7 @@ def test_write(dataFile: DataFileParam):
         raise AssertionError("expected address must be defined for all write tests")
 
     buffer = np.fromfile(dataFile.path, np.uint8)
-    data = parse(
+    data = read(
         buffer,
         address=dataFile.expected_address,
         dtype=dataFile.expected_dtype,
@@ -91,5 +84,5 @@
         keep_type=dataFile.keep_type,
     )
     assert len(data) == dataFile.expected_rows
-    write_buffer = format(data, address=dataFile.expected_address)
+    write_buffer = to_buffer(data, address=dataFile.expected_address, length=dataFile.expected_length)
    assert np.array_equal(buffer, write_buffer)
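
Usage sketch of the consolidated API after this change, mirroring the round-trip in test_write: read now accepts either a file path/open file object or an in-memory buffer, and to_buffer/to_file take over from format/write. This is only a sketch under assumptions, not part of the patch: the register address (32) and the file names below are hypothetical placeholders, not files shipped with the repository.

import numpy as np

import harp

# Read the same register data from a file path and from an in-memory buffer.
# keep_type=True retains the MessageType column so the data frame can be
# converted back to binary afterwards.
data = harp.read("Behavior_32.bin", address=32, keep_type=True)
with open("Behavior_32.bin", "rb") as f:
    same_data = harp.read(f.read(), address=32, keep_type=True)
assert data.equals(same_data)

# Round-trip: the flat buffer produced by to_buffer should match the raw file
# bytes, and to_file writes the same binary representation to a new file.
raw = np.fromfile("Behavior_32.bin", dtype=np.uint8)
assert np.array_equal(raw, harp.to_buffer(data, address=32))
harp.to_file(data, "Behavior_32_copy.bin", address=32)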