Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-latest]
python-version: [3.9, 3.11]
python-version: [3.9, 3.12]
fail-fast: false
steps:
- name: Checkout
Expand Down
4 changes: 2 additions & 2 deletions harp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from harp.io import REFERENCE_EPOCH, MessageType, read
from harp.io import REFERENCE_EPOCH, MessageType, parse, read
from harp.reader import create_reader
from harp.schema import read_schema

__all__ = ["REFERENCE_EPOCH", "MessageType", "read", "create_reader", "read_schema"]
__all__ = ["REFERENCE_EPOCH", "MessageType", "parse", "read", "create_reader", "read_schema"]
56 changes: 56 additions & 0 deletions harp/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
from typing import Any, BinaryIO, Optional, Union

import numpy as np
import numpy.typing as npt
import pandas as pd
from pandas._typing import Axes

from harp.typing import BufferLike

REFERENCE_EPOCH = datetime(1904, 1, 1)
"""The reference epoch for UTC harp time."""

Expand Down Expand Up @@ -73,6 +76,59 @@ def read(
A pandas data frame containing message data, sorted by time.
"""
data = np.fromfile(file, dtype=np.uint8)
return _fromraw(data, address, dtype, length, columns, epoch, keep_type)


def parse(
    buffer: BufferLike,
    address: Optional[int] = None,
    dtype: Optional[np.dtype] = None,
    length: Optional[int] = None,
    columns: Optional[Axes] = None,
    epoch: Optional[datetime] = None,
    keep_type: bool = False,
):
    """Decode single-register Harp messages held in an in-memory buffer.

    The buffer is interpreted as a flat array of unsigned bytes and handed to
    the same internal routine that `read` uses for file contents.

    Parameters
    ----------
    buffer
        Any object exposing the buffer interface whose bytes hold the binary
        data of a single device register.
    address
        Register address the data is expected to carry. When given, it is
        validated against the address of the first message in the buffer.
    dtype
        Data type the register payload is expected to have. When given, it is
        validated against the payload type of the first message in the buffer.
    length
        Number of elements expected in the register payload. When given, it is
        validated against the payload length of the first message in the buffer.
    columns
        Optional labels to use for the data value columns.
    epoch
        Reference datetime at which time zero begins. When given, the
        resulting data frame has a datetime index.
    keep_type
        Whether to include a column holding the message type.

    Returns
    -------
    A pandas data frame containing message data, sorted by time.
    """
    raw_bytes = np.frombuffer(buffer, dtype=np.uint8)
    return _fromraw(
        raw_bytes,
        address=address,
        dtype=dtype,
        length=length,
        columns=columns,
        epoch=epoch,
        keep_type=keep_type,
    )


def _fromraw(
data: npt.NDArray[np.uint8],
address: Optional[int] = None,
dtype: Optional[np.dtype] = None,
length: Optional[int] = None,
columns: Optional[Axes] = None,
epoch: Optional[datetime] = None,
keep_type: bool = False,
):
if len(data) == 0:
return pd.DataFrame(columns=columns, index=pd.Index([], dtype=np.float64, name="Time"))

Expand Down
9 changes: 1 addition & 8 deletions harp/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,7 @@
from enum import Enum
from typing import Annotated, Dict, List, Optional, Union

from pydantic import (
BaseModel,
BeforeValidator,
ConfigDict,
Field,
RootModel,
field_serializer,
)
from pydantic import BaseModel, BeforeValidator, ConfigDict, Field, RootModel, field_serializer


class PayloadType(str, Enum):
Expand Down
72 changes: 55 additions & 17 deletions harp/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
from pandas import DataFrame, Series
from pandas._typing import Axes

from harp.io import MessageType, read
from harp.io import MessageType, parse, read
from harp.model import BitMask, GroupMask, Model, PayloadMember, Register
from harp.schema import read_schema
from harp.typing import BufferLike


@dataclass
Expand All @@ -33,17 +34,29 @@ def __call__(
) -> DataFrame: ...


class _ParseRegister(Protocol):
    """Structural type for callables that parse one register from a buffer."""

    def __call__(
        self, buffer: BufferLike, epoch: Optional[datetime] = None, keep_type: bool = False
    ) -> DataFrame:
        """Return a data frame built from the contents of `buffer`."""


class RegisterReader:
    """Bundle a register definition with the callables that load its data."""

    # Register definition this reader is bound to.
    register: Register
    # Callable conforming to `_ReadRegister`.
    read: _ReadRegister
    # Callable conforming to `_ParseRegister` (takes a buffer, returns a data frame).
    parse: _ParseRegister

    def __init__(self, register: Register, read: _ReadRegister, parse: _ParseRegister) -> None:
        """Store the register and its `read` / `parse` callables unchanged."""
        self.register, self.read, self.parse = register, read, parse


class RegisterMap(UserDict[str, RegisterReader]):
Expand Down Expand Up @@ -81,12 +94,12 @@ def _compose_parser(
params: _ReaderParams,
) -> Callable[..., DataFrame]:
def parser(
file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None,
data,
columns: Optional[Axes] = None,
epoch: Optional[datetime] = params.epoch,
keep_type: bool = params.keep_type,
):
df = g(file, columns, epoch, keep_type)
df = g(data, columns, epoch, keep_type)
result = f(df)
type_col = df.get(MessageType.__name__)
if type_col is not None:
Expand Down Expand Up @@ -189,38 +202,63 @@ def reader(
return reader


def _create_register_parser(device: Model, name: str, params: _ReaderParams):
def _create_register_parser(register: Register, params: _ReaderParams):
    """Build a buffer parser bound to a single register.

    The returned callable forwards to `parse`, supplying the register's
    address, payload dtype and length as the expected values. `params`
    provides the default `epoch` and `keep_type`, captured when this
    factory runs.
    """

    def parser(
        buffer: BufferLike,
        columns: Optional[Axes] = None,
        epoch: Optional[datetime] = params.epoch,
        keep_type: bool = params.keep_type,
    ):
        # Expected message description, derived from the register on every call.
        expected = {
            "address": register.address,
            "dtype": dtype(register.type),
            "length": register.length,
        }
        return parse(buffer, columns=columns, epoch=epoch, keep_type=keep_type, **expected)

    return parser


def _create_register_handler(device: Model, name: str, params: _ReaderParams):
register = device.registers[name]
reader = _create_register_reader(register, params)
parser = _create_register_parser(register, params)

if register.maskType is not None:
key = register.maskType.root
bitMask = None if device.bitMasks is None else device.bitMasks.get(key)
if bitMask is not None:
parser = _create_bitmask_parser(bitMask)
reader = _compose_parser(parser, reader, params)
return RegisterReader(register, reader)
bitmask_parser = _create_bitmask_parser(bitMask)
reader = _compose_parser(bitmask_parser, reader, params)
parser = _compose_parser(bitmask_parser, parser, params)
return RegisterReader(register, reader, parser)

groupMask = None if device.groupMasks is None else device.groupMasks.get(key)
if groupMask is not None:
parser = _create_groupmask_parser(name, groupMask)
reader = _compose_parser(parser, reader, params)
return RegisterReader(register, reader)
groupmask_parser = _create_groupmask_parser(name, groupMask)
reader = _compose_parser(groupmask_parser, reader, params)
parser = _compose_parser(groupmask_parser, parser, params)
return RegisterReader(register, reader, parser)

if register.payloadSpec is not None:
payload_parsers = [
member_parsers = [
(key, _create_payloadmember_parser(device, member))
for key, member in register.payloadSpec.items()
]

def parser(df: DataFrame):
return DataFrame({n: f(df) for n, f in payload_parsers}, index=df.index)
def payload_parser(df: DataFrame):
return DataFrame({n: f(df) for n, f in member_parsers}, index=df.index)

reader = _compose_parser(parser, reader, params)
return RegisterReader(register, reader)
reader = _compose_parser(payload_parser, reader, params)
parser = _compose_parser(payload_parser, parser, params)
return RegisterReader(register, reader, parser)

reader = partial(reader, columns=[name])
return RegisterReader(register, reader)
parser = partial(parser, columns=[name])
return RegisterReader(register, reader, parser)


def create_reader(
Expand Down Expand Up @@ -265,7 +303,7 @@ def create_reader(
base_path = path / device.device if is_dir else path.parent / device.device

reg_readers = {
name: _create_register_parser(device, name, _ReaderParams(base_path, epoch, keep_type))
name: _create_register_handler(device, name, _ReaderParams(base_path, epoch, keep_type))
for name in device.registers.keys()
}
return DeviceReader(device, reg_readers)
10 changes: 10 additions & 0 deletions harp/typing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import mmap
import sys
from typing import Any, Union

from numpy.typing import NDArray

# `collections.abc.Buffer` is only available from Python 3.12 onwards; older
# interpreters fall back to an explicit union of buffer-providing types.
if sys.version_info < (3, 12):
    BufferLike = Union[bytes, bytearray, memoryview, mmap.mmap, NDArray[Any]]
else:
    from collections.abc import Buffer as BufferLike
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ exclude = [
"reflex-generator"
]

[tool.ruff.lint]
select = ["I"]

[tool.pyright]
venvPath = "."
venv = ".venv"
Expand Down
1 change: 1 addition & 0 deletions tests/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class DataFileParam:
expected_dtype: Optional[np.dtype] = None
expected_length: Optional[int] = None
expected_error: Optional[Type[BaseException]] = None
repeat_data: Optional[int] = None
keep_type: bool = False

def __post_init__(self):
Expand Down
29 changes: 21 additions & 8 deletions tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pytest
from pytest import mark

from harp.io import MessageType, read
from harp.io import MessageType, parse, read
from tests.params import DataFileParam

testdata = [
Expand All @@ -29,20 +29,33 @@
),
DataFileParam(path="data/write_0.bin", expected_address=0, expected_rows=4),
DataFileParam(path="data/write_0.bin", expected_address=0, expected_rows=4, keep_type=True),
DataFileParam(path="data/device_0.bin", expected_rows=300, repeat_data=300),
]


@mark.parametrize("dataFile", testdata)
def test_read(dataFile: DataFileParam):
context = pytest.raises if dataFile.expected_error else nullcontext
with context(dataFile.expected_error): # type: ignore
data = read(
dataFile.path,
address=dataFile.expected_address,
dtype=dataFile.expected_dtype,
length=dataFile.expected_length,
keep_type=dataFile.keep_type,
)
path = dataFile.path
if dataFile.repeat_data:
with open(path, "rb") as f:
buffer = f.read() * dataFile.repeat_data
data = parse(
buffer,
address=dataFile.expected_address,
dtype=dataFile.expected_dtype,
length=dataFile.expected_length,
keep_type=dataFile.keep_type,
)
else:
data = read(
path,
address=dataFile.expected_address,
dtype=dataFile.expected_dtype,
length=dataFile.expected_length,
keep_type=dataFile.keep_type,
)
assert len(data) == dataFile.expected_rows
if dataFile.keep_type:
assert MessageType.__name__ in data.columns and data[MessageType.__name__].dtype == "category"
Expand Down