diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 115929b..c2ea3b5 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -16,7 +16,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, windows-latest, macos-latest] - python-version: [3.9, 3.11] + python-version: [3.9, 3.12] fail-fast: false steps: - name: Checkout diff --git a/harp/__init__.py b/harp/__init__.py index d3581a5..31ec97f 100644 --- a/harp/__init__.py +++ b/harp/__init__.py @@ -1,5 +1,5 @@ -from harp.io import REFERENCE_EPOCH, MessageType, read +from harp.io import REFERENCE_EPOCH, MessageType, parse, read from harp.reader import create_reader from harp.schema import read_schema -__all__ = ["REFERENCE_EPOCH", "MessageType", "read", "create_reader", "read_schema"] +__all__ = ["REFERENCE_EPOCH", "MessageType", "parse", "read", "create_reader", "read_schema"] diff --git a/harp/io.py b/harp/io.py index 8c4bf96..aefa514 100644 --- a/harp/io.py +++ b/harp/io.py @@ -4,9 +4,12 @@ from typing import Any, BinaryIO, Optional, Union import numpy as np +import numpy.typing as npt import pandas as pd from pandas._typing import Axes +from harp.typing import BufferLike + REFERENCE_EPOCH = datetime(1904, 1, 1) """The reference epoch for UTC harp time.""" @@ -73,6 +76,59 @@ def read( A pandas data frame containing message data, sorted by time. """ data = np.fromfile(file, dtype=np.uint8) + return _fromraw(data, address, dtype, length, columns, epoch, keep_type) + + +def parse( + buffer: BufferLike, + address: Optional[int] = None, + dtype: Optional[np.dtype] = None, + length: Optional[int] = None, + columns: Optional[Axes] = None, + epoch: Optional[datetime] = None, + keep_type: bool = False, +): + """Parse single-register Harp data from the specified buffer. + + Parameters + ---------- + buffer + An object that exposes a buffer interface containing binary data from + a single device register. + address + Expected register address. If specified, the address of + the first message in the buffer is used for validation. + dtype + Expected data type of the register payload. If specified, the + payload type of the first message in the buffer is used for validation. + length + Expected number of elements in register payload. If specified, the + payload length of the first message in the buffer is used for validation. + columns + The optional column labels to use for the data values. + epoch + Reference datetime at which time zero begins. If specified, + the result data frame will have a datetime index. + keep_type + Specifies whether to include a column with the message type. + + Returns + ------- + A pandas data frame containing message data, sorted by time. + """ + data = np.frombuffer(buffer, dtype=np.uint8) + return _fromraw(data, address, dtype, length, columns, epoch, keep_type) + + +def _fromraw( + data: npt.NDArray[np.uint8], + address: Optional[int] = None, + dtype: Optional[np.dtype] = None, + length: Optional[int] = None, + columns: Optional[Axes] = None, + epoch: Optional[datetime] = None, + keep_type: bool = False, +): if len(data) == 0: return pd.DataFrame(columns=columns, index=pd.Index([], dtype=np.float64, name="Time")) diff --git a/harp/model.py b/harp/model.py index b6f5031..09f30c2 100644 --- a/harp/model.py +++ b/harp/model.py @@ -7,14 +7,7 @@ from enum import Enum from typing import Annotated, Dict, List, Optional, Union -from pydantic import ( - BaseModel, - BeforeValidator, - ConfigDict, - Field, - RootModel, - field_serializer, -) +from pydantic import BaseModel, BeforeValidator, ConfigDict, Field, RootModel, field_serializer class PayloadType(str, Enum): diff --git a/harp/reader.py b/harp/reader.py index b1b4961..138cf2e 100644 --- a/harp/reader.py +++ b/harp/reader.py @@ -12,9 +12,10 @@ from pandas import DataFrame, Series from pandas._typing import Axes -from harp.io import MessageType, read +from harp.io import MessageType, parse, read from harp.model import BitMask, GroupMask, Model, PayloadMember, Register from harp.schema import read_schema +from harp.typing import BufferLike @dataclass @@ -33,17 +34,29 @@ def __call__( ) -> DataFrame: ... +class _ParseRegister(Protocol): + def __call__( + self, + buffer: BufferLike, + epoch: Optional[datetime] = None, + keep_type: bool = False, + ) -> DataFrame: ... + + class RegisterReader: register: Register read: _ReadRegister + parse: _ParseRegister def __init__( self, register: Register, read: _ReadRegister, + parse: _ParseRegister, ) -> None: self.register = register self.read = read + self.parse = parse class RegisterMap(UserDict[str, RegisterReader]): @@ -81,12 +94,12 @@ def _compose_parser( params: _ReaderParams, ) -> Callable[..., DataFrame]: def parser( - file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None, + data, columns: Optional[Axes] = None, epoch: Optional[datetime] = params.epoch, keep_type: bool = params.keep_type, ): - df = g(file, columns, epoch, keep_type) + df = g(data, columns, epoch, keep_type) result = f(df) type_col = df.get(MessageType.__name__) if type_col is not None: @@ -189,38 +202,63 @@ def reader( return reader -def _create_register_parser(device: Model, name: str, params: _ReaderParams): +def _create_register_parser(register: Register, params: _ReaderParams): + def parser( + buffer: BufferLike, + columns: Optional[Axes] = None, + epoch: Optional[datetime] = params.epoch, + keep_type: bool = params.keep_type, + ): + return parse( + buffer, + address=register.address, + dtype=dtype(register.type), + length=register.length, + columns=columns, + epoch=epoch, + keep_type=keep_type, + ) + + return parser + + +def _create_register_handler(device: Model, name: str, params: _ReaderParams): register = device.registers[name] reader = _create_register_reader(register, params) + parser = _create_register_parser(register, params) if register.maskType is not None: key = register.maskType.root bitMask = None if device.bitMasks is None else device.bitMasks.get(key) if bitMask is not None: - parser = _create_bitmask_parser(bitMask) - reader = _compose_parser(parser, reader, params) - return RegisterReader(register, reader) + bitmask_parser = _create_bitmask_parser(bitMask) + reader = _compose_parser(bitmask_parser, reader, params) + parser = _compose_parser(bitmask_parser, parser, params) + return RegisterReader(register, reader, parser) groupMask = None if device.groupMasks is None else device.groupMasks.get(key) if groupMask is not None: - parser = _create_groupmask_parser(name, groupMask) - reader = _compose_parser(parser, reader, params) - return RegisterReader(register, reader) + groupmask_parser = _create_groupmask_parser(name, groupMask) + reader = _compose_parser(groupmask_parser, reader, params) + parser = _compose_parser(groupmask_parser, parser, params) + return RegisterReader(register, reader, parser) if register.payloadSpec is not None: - payload_parsers = [ + member_parsers = [ (key, _create_payloadmember_parser(device, member)) for key, member in register.payloadSpec.items() ] - def parser(df: DataFrame): - return DataFrame({n: f(df) for n, f in payload_parsers}, index=df.index) + def payload_parser(df: DataFrame): + return DataFrame({n: f(df) for n, f in member_parsers}, index=df.index) - reader = _compose_parser(parser, reader, params) - return RegisterReader(register, reader) + reader = _compose_parser(payload_parser, reader, params) + parser = _compose_parser(payload_parser, parser, params) + return RegisterReader(register, reader, parser) reader = partial(reader, columns=[name]) - return RegisterReader(register, reader) + parser = partial(parser, columns=[name]) + return RegisterReader(register, reader, parser) def create_reader( @@ -265,7 +303,7 @@ def create_reader( base_path = path / device.device if is_dir else path.parent / device.device reg_readers = { - name: _create_register_parser(device, name, _ReaderParams(base_path, epoch, keep_type)) + name: _create_register_handler(device, name, _ReaderParams(base_path, epoch, keep_type)) for name in device.registers.keys() } return DeviceReader(device, reg_readers) diff --git a/harp/typing.py b/harp/typing.py new file mode 100644 index 0000000..e462ecd --- /dev/null +++ b/harp/typing.py @@ -0,0 +1,10 @@ +import mmap +import sys +from typing import Any, Union + +from numpy.typing import NDArray + +if sys.version_info >= (3, 12): + from collections.abc import Buffer as BufferLike +else: + BufferLike = Union[bytes, bytearray, memoryview, mmap.mmap, NDArray[Any]] diff --git a/pyproject.toml b/pyproject.toml index d038094..f8264b0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,9 @@ exclude = [ "reflex-generator" ] +[tool.ruff.lint] +select = ["I"] + [tool.pyright] venvPath = "." venv = ".venv" diff --git a/tests/params.py b/tests/params.py index 9040a26..7baa254 100644 --- a/tests/params.py +++ b/tests/params.py @@ -19,6 +19,7 @@ class DataFileParam: expected_dtype: Optional[np.dtype] = None expected_length: Optional[int] = None expected_error: Optional[Type[BaseException]] = None + repeat_data: Optional[int] = None keep_type: bool = False def __post_init__(self): diff --git a/tests/test_io.py b/tests/test_io.py index 25c5401..d38e90c 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -4,7 +4,7 @@ import pytest from pytest import mark -from harp.io import MessageType, read +from harp.io import MessageType, parse, read from tests.params import DataFileParam testdata = [ @@ -29,6 +29,7 @@ ), DataFileParam(path="data/write_0.bin", expected_address=0, expected_rows=4), DataFileParam(path="data/write_0.bin", expected_address=0, expected_rows=4, keep_type=True), + DataFileParam(path="data/device_0.bin", expected_rows=300, repeat_data=300), ] @@ -36,13 +37,25 @@ def test_read(dataFile: DataFileParam): context = pytest.raises if dataFile.expected_error else nullcontext with context(dataFile.expected_error): # type: ignore - data = read( - dataFile.path, - address=dataFile.expected_address, - dtype=dataFile.expected_dtype, - length=dataFile.expected_length, - keep_type=dataFile.keep_type, - ) + path = dataFile.path + if dataFile.repeat_data: + with open(path, "rb") as f: + buffer = f.read() * dataFile.repeat_data + data = parse( + buffer, + address=dataFile.expected_address, + dtype=dataFile.expected_dtype, + length=dataFile.expected_length, + keep_type=dataFile.keep_type, + ) + else: + data = read( + path, + address=dataFile.expected_address, + dtype=dataFile.expected_dtype, + length=dataFile.expected_length, + keep_type=dataFile.keep_type, + ) assert len(data) == dataFile.expected_rows if dataFile.keep_type: assert MessageType.__name__ in data.columns and data[MessageType.__name__].dtype == "category"