Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions harp/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from harp.io import REFERENCE_EPOCH, MessageType, parse, read
from harp.io import REFERENCE_EPOCH, MessageType, read, to_buffer, to_file
from harp.reader import create_reader
from harp.schema import read_schema

__all__ = ["REFERENCE_EPOCH", "MessageType", "parse", "read", "create_reader", "read_schema"]
__all__ = ["REFERENCE_EPOCH", "MessageType", "read", "to_buffer", "to_file", "create_reader", "read_schema"]
115 changes: 40 additions & 75 deletions harp/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pandas as pd
from pandas._typing import Axes

from harp.typing import BufferLike
from harp.typing import _BufferLike, _FileLike

REFERENCE_EPOCH = datetime(1904, 1, 1)
"""The reference epoch for UTC harp time."""
Expand Down Expand Up @@ -41,30 +41,30 @@ class MessageType(IntEnum):


def read(
file: Union[str, bytes, PathLike[Any], BinaryIO],
file_or_buf: Union[_FileLike, _BufferLike],
address: Optional[int] = None,
dtype: Optional[np.dtype] = None,
length: Optional[int] = None,
columns: Optional[Axes] = None,
epoch: Optional[datetime] = None,
keep_type: bool = False,
):
"""Read single-register Harp data from the specified file.
"""Read single-register Harp data from the specified file or buffer.

Parameters
----------
file
Open file object or filename containing binary data from
file_or_buf
File path, open file object, or buffer containing binary data from
a single device register.
address
Expected register address. If specified, the address of
the first message in the file is used for validation.
the first message is used for validation.
dtype
Expected data type of the register payload. If specified, the
payload type of the first message in the file is used for validation.
payload type of the first message is used for validation.
length
Expected number of elements in register payload. If specified, the
payload length of the first message in the file is used for validation.
payload length of the first message is used for validation.
columns
The optional column labels to use for the data values.
epoch
Expand All @@ -77,60 +77,13 @@ def read(
-------
A pandas data frame containing message data, sorted by time.
"""
data = np.fromfile(file, dtype=np.uint8)
return _fromraw(data, address, dtype, length, columns, epoch, keep_type)


def parse(
buffer: BufferLike,
address: Optional[int] = None,
dtype: Optional[np.dtype] = None,
length: Optional[int] = None,
columns: Optional[Axes] = None,
epoch: Optional[datetime] = None,
keep_type: bool = False,
):
"""Parse single-register Harp data from the specified buffer.

Parameters
----------
buffer
An object that exposes a buffer interface containing binary data from
a single device register.
address
Expected register address. If specified, the address of
the first message in the buffer is used for validation.
dtype
Expected data type of the register payload. If specified, the
payload type of the first message in the buffer is used for validation.
length
Expected number of elements in register payload. If specified, the
payload length of the first message in the buffer is used for validation.
columns
The optional column labels to use for the data values.
epoch
Reference datetime at which time zero begins. If specified,
the result data frame will have a datetime index.
keep_type
Specifies whether to include a column with the message type.

Returns
-------
A pandas data frame containing message data, sorted by time.
"""
data = np.frombuffer(buffer, dtype=np.uint8)
return _fromraw(data, address, dtype, length, columns, epoch, keep_type)

if isinstance(file_or_buf, (str, PathLike, BinaryIO)) or hasattr(file_or_buf, "readinto"):
# TODO: in the below we ignore the type as otherwise
# we have no way to runtime check _IOProtocol
data = np.fromfile(file_or_buf, dtype=np.uint8) # type: ignore
else:
data = np.frombuffer(file_or_buf, dtype=np.uint8)

def _fromraw(
data: npt.NDArray[np.uint8],
address: Optional[int] = None,
dtype: Optional[np.dtype] = None,
length: Optional[int] = None,
columns: Optional[Axes] = None,
epoch: Optional[datetime] = None,
keep_type: bool = False,
):
if len(data) == 0:
return pd.DataFrame(
columns=columns,
Expand Down Expand Up @@ -185,11 +138,12 @@ def _fromraw(
return result


def write(
file: Union[str, bytes, PathLike[Any], BinaryIO],
def to_file(
data: pd.DataFrame,
file: _FileLike,
address: int,
dtype: Optional[np.dtype] = None,
length: Optional[int] = None,
port: Optional[int] = None,
epoch: Optional[datetime] = None,
message_type: Optional[MessageType] = None,
Expand All @@ -198,16 +152,19 @@ def write(

Parameters
----------
file
Open file object or filename where to store binary data from
a single device register.
data
Pandas data frame containing message payload.
file
File path, or open file object in which to store binary data from
a single device register.
address
Register address used to identify all formatted Harp messages.
dtype
Data type of the register payload. If specified, all data will
be converted before formatting the binary payload.
length
Expected number of elements in register payload. If specified, the
number of columns in the input data frame is validated.
port
Optional port value used for all formatted Harp messages.
epoch
Expand All @@ -217,19 +174,20 @@ def write(
Optional message type used for all formatted Harp messages.
If not specified, data must contain a MessageType column.
"""
buffer = format(data, address, dtype, port, epoch, message_type)
# Bind the optional arguments by keyword: to_buffer declares `length` before
# `port`, and keywords keep each value attached to the parameter it belongs to.
buffer = to_buffer(data, address, dtype, length=length, port=port, epoch=epoch, message_type=message_type)
buffer.tofile(file)


def format(
def to_buffer(
data: pd.DataFrame,
address: int,
dtype: Optional[np.dtype] = None,
length: Optional[int] = None,
port: Optional[int] = None,
epoch: Optional[datetime] = None,
message_type: Optional[MessageType] = None,
) -> npt.NDArray[np.uint8]:
"""Format single-register Harp data as a flat binary buffer.
"""Convert single-register Harp data to a flat binary buffer.

Parameters
----------
Expand All @@ -240,6 +198,9 @@ def format(
dtype
Data type of the register payload. If specified, all data will
be converted before formatting the binary payload.
length
Expected number of elements in register payload. If specified, the
number of columns in the input data frame is validated.
port
Optional port value used for all formatted Harp messages.
epoch
Expand All @@ -254,12 +215,13 @@ def format(
An array object containing message data formatted according
to the Harp binary protocol.
"""
if len(data) == 0:
nrows = len(data)
if nrows == 0:
return np.empty(0, dtype=np.uint8)

if "MessageType" in data.columns:
msgtype = data["MessageType"].cat.codes
payload = data[data.columns.drop("MessageType")].values
if MessageType.__name__ in data.columns:
msgtype = data[MessageType.__name__].cat.codes
payload = data[data.columns.drop(MessageType.__name__)].values
elif message_type is not None:
msgtype = message_type
payload = data.values
Expand All @@ -278,17 +240,20 @@ def format(
if dtype is not None:
payload = payload.astype(dtype, copy=False)

ncols = payload.shape[1]
if length is not None and ncols != length:
raise ValueError(f"expected payload length {length} but got {ncols}")

if port is None:
port = 255

payloadtype = _payloadtypefromdtype[payload.dtype]
payloadlength = payload.shape[1] * payload.dtype.itemsize
payloadlength = ncols * payload.dtype.itemsize
stride = payloadlength + 6
if is_timestamped:
payloadtype |= _PAYLOAD_TIMESTAMP_MASK
stride += 6

nrows = len(data)
buffer = np.empty((nrows, stride), dtype=np.uint8)
buffer[:, 0] = msgtype
buffer[:, 1:5] = [stride - 2, address, port, payloadtype]
Expand Down
63 changes: 13 additions & 50 deletions harp/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,29 @@
from math import log2
from os import PathLike
from pathlib import Path
from typing import Any, BinaryIO, Callable, Iterable, Mapping, Optional, Protocol, Union
from typing import Callable, Iterable, Mapping, Optional, Protocol, Union

from numpy import dtype
from pandas import DataFrame, Series
from pandas._typing import Axes

from harp.io import MessageType, parse, read
from harp.io import MessageType, read
from harp.model import BitMask, GroupMask, Model, PayloadMember, Register
from harp.schema import read_schema
from harp.typing import BufferLike
from harp.typing import _BufferLike, _FileLike


@dataclass
class _ReaderParams:
path: Path
base_path: Path
epoch: Optional[datetime] = None
keep_type: bool = False


class _ReadRegister(Protocol):
def __call__(
self,
file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None,
epoch: Optional[datetime] = None,
keep_type: bool = False,
) -> DataFrame: ...


class _ParseRegister(Protocol):
def __call__(
self,
buffer: BufferLike,
file_or_buf: Optional[Union[_FileLike, _BufferLike]] = None,
epoch: Optional[datetime] = None,
keep_type: bool = False,
) -> DataFrame: ...
Expand All @@ -46,17 +37,14 @@ def __call__(
class RegisterReader:
register: Register
read: _ReadRegister
parse: _ParseRegister

def __init__(
self,
register: Register,
read: _ReadRegister,
parse: _ParseRegister,
) -> None:
self.register = register
self.read = read
self.parse = parse


class RegisterMap(UserDict[str, RegisterReader]):
Expand Down Expand Up @@ -180,16 +168,16 @@ def parser(df: DataFrame):

def _create_register_reader(register: Register, params: _ReaderParams):
def reader(
file: Optional[Union[str, bytes, PathLike[Any], BinaryIO]] = None,
file_or_buf: Optional[Union[_FileLike, _BufferLike]] = None,
columns: Optional[Axes] = None,
epoch: Optional[datetime] = params.epoch,
keep_type: bool = params.keep_type,
):
if file is None:
file = f"{params.path}_{register.address}.bin"
if file_or_buf is None:
file_or_buf = f"{params.base_path}_{register.address}.bin"

data = read(
file,
file_or_buf,
address=register.address,
dtype=dtype(register.type),
length=register.length,
Expand All @@ -202,46 +190,23 @@ def reader(
return reader


def _create_register_parser(register: Register, params: _ReaderParams):
def parser(
buffer: BufferLike,
columns: Optional[Axes] = None,
epoch: Optional[datetime] = params.epoch,
keep_type: bool = params.keep_type,
):
return parse(
buffer,
address=register.address,
dtype=dtype(register.type),
length=register.length,
columns=columns,
epoch=epoch,
keep_type=keep_type,
)

return parser


def _create_register_handler(device: Model, name: str, params: _ReaderParams):
register = device.registers[name]
reader = _create_register_reader(register, params)
parser = _create_register_parser(register, params)

if register.maskType is not None:
key = register.maskType.root
bitMask = None if device.bitMasks is None else device.bitMasks.get(key)
if bitMask is not None:
bitmask_parser = _create_bitmask_parser(bitMask)
reader = _compose_parser(bitmask_parser, reader, params)
parser = _compose_parser(bitmask_parser, parser, params)
return RegisterReader(register, reader, parser)
return RegisterReader(register, reader)

groupMask = None if device.groupMasks is None else device.groupMasks.get(key)
if groupMask is not None:
groupmask_parser = _create_groupmask_parser(name, groupMask)
reader = _compose_parser(groupmask_parser, reader, params)
parser = _compose_parser(groupmask_parser, parser, params)
return RegisterReader(register, reader, parser)
return RegisterReader(register, reader)

if register.payloadSpec is not None:
member_parsers = [
Expand All @@ -253,17 +218,15 @@ def payload_parser(df: DataFrame):
return DataFrame({n: f(df) for n, f in member_parsers}, index=df.index)

reader = _compose_parser(payload_parser, reader, params)
parser = _compose_parser(payload_parser, parser, params)
return RegisterReader(register, reader, parser)
return RegisterReader(register, reader)

columns = (
[name]
if register.length is None or register.length == 1
else [f"{name}_{i}" for i in range(register.length)]
)
reader = partial(reader, columns=columns)
parser = partial(parser, columns=columns)
return RegisterReader(register, reader, parser)
return RegisterReader(register, reader)


def create_reader(
Expand Down
Loading