35 changes: 23 additions & 12 deletions .gitignore
@@ -1,12 +1,23 @@
# Environment files
.vscode
.venv

# Python temp files
__pycache__
*.pyc
*.egg-info
dist

# Data files
/data
# Byte-compiled / optimized / DLL files
__pycache__/

# Distribution / packaging
dist/
_version.py
*.egg-info/
*.egg

# IDE
.vscode/*

# misc
log*.txt
scratch/
scratch*.py

# Test
.coverage

# Environment
.venv/
uv.lock
134 changes: 130 additions & 4 deletions harp/io.py
@@ -24,8 +24,9 @@ class MessageType(IntEnum):


_SECONDS_PER_TICK = 32e-6
_PAYLOAD_TIMESTAMP_MASK = 0x10
_messagetypes = [type.name for type in MessageType]
_payloadtypes = {
_dtypefrompayloadtype = {
1: np.dtype(np.uint8),
2: np.dtype(np.uint16),
4: np.dtype(np.uint32),
@@ -36,6 +37,7 @@ class MessageType(IntEnum):
136: np.dtype(np.int64),
68: np.dtype(np.float32),
}
_payloadtypefromdtype = {v: k for k, v in _dtypefrompayloadtype.items()}


def read(
@@ -140,20 +142,20 @@ def _fromraw(
nrows = len(data) // stride
payloadtype = data[4]
payloadoffset = 5
if payloadtype & 0x10 != 0:
if payloadtype & _PAYLOAD_TIMESTAMP_MASK != 0:
seconds = np.ndarray(nrows, dtype=np.uint32, buffer=data, offset=payloadoffset, strides=stride)
payloadoffset += 4
micros = np.ndarray(nrows, dtype=np.uint16, buffer=data, offset=payloadoffset, strides=stride)
payloadoffset += 2
time = micros * _SECONDS_PER_TICK + seconds
payloadtype = payloadtype & ~np.uint8(0x10)
payloadtype = payloadtype & ~np.uint8(_PAYLOAD_TIMESTAMP_MASK)
if epoch is not None:
time = epoch + pd.to_timedelta(time, "s") # type: ignore
index = pd.Series(time)
index.name = "Time"

payloadsize = stride - payloadoffset - 1
payloadtype = _payloadtypes[payloadtype]
payloadtype = _dtypefrompayloadtype[payloadtype]
if dtype is not None and dtype != payloadtype:
raise ValueError(f"expected payload type {dtype} but got {payloadtype}")

@@ -176,3 +178,127 @@ def _fromraw(
msgtype = pd.Categorical.from_codes(msgtype, categories=_messagetypes) # type: ignore
result[MessageType.__name__] = msgtype
return result


def write(
file: Union[str, bytes, PathLike[Any], BinaryIO],
data: pd.DataFrame,
address: int,
dtype: Optional[np.dtype] = None,
port: Optional[int] = None,
epoch: Optional[datetime] = None,
message_type: Optional[MessageType] = None,
):
"""Write single-register Harp data to the specified file.

Parameters
----------
file
Open file object or filename in which to store the binary data
for a single device register.
data
Pandas data frame containing message payload.
address
Register address used to identify all formatted Harp messages.
dtype
Data type of the register payload. If specified, all data will
be converted before formatting the binary payload.
port
Optional port value used for all formatted Harp messages.
epoch
Reference datetime at which time zero begins. If specified,
the input data frame must have a datetime index.
message_type
Optional message type used for all formatted Harp messages.
If not specified, data must contain a MessageType column.
"""
buffer = format(data, address, dtype, port, epoch, message_type)
buffer.tofile(file)


def format(
data: pd.DataFrame,
address: int,
dtype: Optional[np.dtype] = None,
port: Optional[int] = None,
epoch: Optional[datetime] = None,
message_type: Optional[MessageType] = None,
) -> npt.NDArray[np.uint8]:
"""Format single-register Harp data as a flat binary buffer.

Parameters
----------
data
Pandas data frame containing message payload.
address
Register address used to identify all formatted Harp messages.
dtype
Data type of the register payload. If specified, all data will
be converted before formatting the binary payload.
port
Optional port value used for all formatted Harp messages.
epoch
Reference datetime at which time zero begins. If specified,
the input data frame must have a datetime index.
message_type
Optional message type used for all formatted Harp messages.
If not specified, data must contain a MessageType column.

Returns
-------
An array object containing message data formatted according
to the Harp binary protocol.
"""
if len(data) == 0:
return np.empty(0, dtype=np.uint8)

if "MessageType" in data.columns:
msgtype = data["MessageType"].cat.codes
payload = data[data.columns.drop("MessageType")].values
elif message_type is not None:
msgtype = message_type
payload = data.values
else:
raise ValueError(f"message type must be specified either in the data or as argument")

time = data.index
is_timestamped = True
if epoch is not None:
if not isinstance(time, pd.DatetimeIndex):
raise ValueError(f"expected datetime index to encode with epoch but got {time.inferred_type}")
time = (time - epoch).total_seconds()
elif isinstance(time, pd.RangeIndex):
is_timestamped = False

if dtype is not None:
payload = payload.astype(dtype, copy=False)

if port is None:
port = 255

payloadtype = _payloadtypefromdtype[payload.dtype]
payloadlength = payload.shape[1] * payload.dtype.itemsize
stride = payloadlength + 6
if is_timestamped:
payloadtype |= _PAYLOAD_TIMESTAMP_MASK
stride += 6

nrows = len(data)
buffer = np.empty((nrows, stride), dtype=np.uint8)
buffer[:, 0] = msgtype
buffer[:, 1:5] = [stride - 2, address, port, payloadtype]

payloadoffset = 5
if is_timestamped:
seconds = time.astype(np.uint32)
micros = np.around(((time - seconds) / _SECONDS_PER_TICK).values).astype(np.uint16)
buffer[:, 5:9] = np.ndarray((nrows, 4), dtype=np.uint8, buffer=seconds.values)
buffer[:, 9:11] = np.ndarray((nrows, 2), dtype=np.uint8, buffer=micros)
payloadoffset += 6

payloadstop = payloadoffset + payloadlength
buffer[:, payloadoffset:payloadstop] = np.ndarray(
(nrows, payloadlength), dtype=np.uint8, buffer=np.ascontiguousarray(payload)
)
buffer[:, -1] = np.sum(buffer[:, 0:-1], axis=1, dtype=np.uint8)
return buffer.reshape(-1)
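

The new format/write API above can be exercised roughly as follows. This is a minimal usage sketch, not code from this PR: the register address (32), the float32 payload values, the timestamps, the output filename, and the MessageType.EVENT member are illustrative assumptions.

# Minimal usage sketch for the format/write functions added above.
# Address 32, the float32 payload, the timestamps, the output path, and
# MessageType.EVENT are assumptions for illustration only.
import numpy as np
import pandas as pd

from harp.io import MessageType, format, write

# One payload column per register element, one row per message; a float index
# is treated as the Harp timestamp in seconds (a plain RangeIndex would yield
# untimestamped messages).
data = pd.DataFrame(
    {"Value": np.array([0.5, 1.5], dtype=np.float32)},
    index=pd.Index([1.000032, 2.000064], name="Time"),
)

# Format in memory...
buffer = format(data, address=32, message_type=MessageType.EVENT)
# ...or write straight to a file.
write("register_32.bin", data, address=32, message_type=MessageType.EVENT)
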
3 changes: 3 additions & 0 deletions harp/schema.py
@@ -8,6 +8,9 @@


def _read_common_registers() -> Registers:
if __package__ is None:
raise ValueError("__package__ is None: unable to read common registers")

file = resources.files(__package__) / "common.yml"
with file.open("r") as fileIO:
return parse_yaml_raw_as(Registers, fileIO.read())
25 changes: 24 additions & 1 deletion tests/test_io.py
@@ -4,7 +4,7 @@
import pytest
from pytest import mark

from harp.io import MessageType, parse, read
from harp.io import MessageType, format, parse, read
from tests.params import DataFileParam

testdata = [
@@ -63,3 +63,26 @@ def test_read(dataFile: DataFileParam):
if dataFile.expected_cols:
for col in dataFile.expected_cols:
assert col in data.columns


writedata = [
DataFileParam(path="data/device_0.bin", expected_rows=1, expected_address=0, keep_type=True),
]


@mark.parametrize("dataFile", writedata)
def test_write(dataFile: DataFileParam):
if dataFile.expected_address is None:
raise AssertionError("expected address must be defined for all write tests")

buffer = np.fromfile(dataFile.path, np.uint8)
data = parse(
buffer,
address=dataFile.expected_address,
dtype=dataFile.expected_dtype,
length=dataFile.expected_length,
keep_type=dataFile.keep_type,
)
assert len(data) == dataFile.expected_rows
write_buffer = format(data, address=dataFile.expected_address)
assert np.array_equal(buffer, write_buffer)
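

A possible companion test (a sketch only, not part of this diff) could exercise write() end-to-end through a temporary file, reusing the writedata parameters above; it assumes pytest's built-in tmp_path fixture and that write is added to the harp.io import at the top of the file.

# Sketch of a follow-up test, not included in this PR: round-trip through
# write() and a temporary file instead of format() alone.
from harp.io import write  # would join the existing harp.io import above


@mark.parametrize("dataFile", writedata)
def test_write_file_roundtrip(dataFile: DataFileParam, tmp_path):
    assert dataFile.expected_address is not None
    buffer = np.fromfile(dataFile.path, np.uint8)
    data = parse(
        buffer,
        address=dataFile.expected_address,
        dtype=dataFile.expected_dtype,
        length=dataFile.expected_length,
        keep_type=dataFile.keep_type,
    )
    out = tmp_path / "roundtrip.bin"
    write(out, data, address=dataFile.expected_address)
    assert np.array_equal(buffer, np.fromfile(out, np.uint8))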