diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index e688093..8aaf0e5 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -22,7 +22,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - pip install .[test,apps] + pip install .[test] - name: Test with pytest run: | coverage run diff --git a/.gitignore b/.gitignore index 4cb6761..d496d21 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ __pycache__/ *.swp *.py[cod] .cache +.vscode # packaging related dist/ diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 9ee2227..e0fcec7 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -18,7 +18,6 @@ python: path: . extra_requirements: - test - - apps - requirements: docs/requirements.txt # Build documentation in the "docs/" directory with Sphinx diff --git a/README.md b/README.md index 2dca188..dea36bb 100644 --- a/README.md +++ b/README.md @@ -12,10 +12,26 @@ freely under the terms GNU Lesser GPL v3 License. This module follows the SigMF specification [html](https://sigmf.org/)/[pdf](https://sigmf.github.io/SigMF/sigmf-spec.pdf) from the [spec repository](https://github.com/sigmf/SigMF). -To install the latest PyPI release, install from pip: +### Install ```bash pip install sigmf ``` -**[Please visit the documentation for examples & more info.](https://sigmf.readthedocs.io/en/latest/)** +### Read SigMF + +```python +import sigmf + +# read SigMF recording +meta = sigmf.fromfile("recording.sigmf-meta") +samples = meta[0:1024] # get first 1024 samples + +# read other formats containing RF time series as SigMF +meta = sigmf.fromfile("recording.wav") # WAV +meta = sigmf.fromfile("recording.cdif") # BLUE / Platinum +``` + +### Docs + +**[Please visit our documentation for full API reference and more info.](https://sigmf.readthedocs.io/en/latest/)** diff --git a/docs/Makefile b/docs/Makefile index d0c3cbf..9071ea0 100644 --- a/docs/Makefile +++ b/docs/Makefile @@ -12,7 +12,11 @@ BUILDDIR = build help: @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) -.PHONY: help Makefile +clean: + rm -rf "$(BUILDDIR)" + rm -rf "$(SOURCEDIR)/_autosummary" + +.PHONY: help clean Makefile # Catch-all target: route all unknown targets to Sphinx using the new # "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). diff --git a/docs/source/api.rst b/docs/source/api.rst index 2c3bddb..ae52a37 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -7,12 +7,13 @@ SigMF API :template: custom-module-template.rst :recursive: - sigmf.apps.convert_wav sigmf.archive sigmf.archivereader + sigmf.convert.blue + sigmf.convert.wav sigmf.error sigmf.schema - sigmf.sigmf_hash + sigmf.hashing sigmf.sigmffile sigmf.utils sigmf.validate diff --git a/docs/source/converters.rst b/docs/source/converters.rst new file mode 100644 index 0000000..7dd9dfb --- /dev/null +++ b/docs/source/converters.rst @@ -0,0 +1,151 @@ +========== +Converters +========== + +The SigMF Python library includes converters to import data from various RF recording formats into SigMF. +Converters can create standard SigMF file pairs or Non-Conforming Datasets (NCDs) that reference the original files. + +Overview +-------- + +Conversion is available for: + +* **BLUE files** - MIDAS Blue and Platinum BLUE RF recordings (usually ``.cdif``) +* **WAV files** - Audio recordings (``.wav``) + +All converters return a :class:`~sigmf.SigMFFile` object with converted metadata. 
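+
+Whether a conversion produced a standard recording or an NCD can be checked
+from the returned object: these converters set the ``core:dataset`` global
+only for NCD output. A minimal sketch:
+
+.. code-block:: python
+
+    import sigmf
+
+    meta = sigmf.fromfile("recording.cdif")  # defaults to NCD
+    if meta.get_global_field("core:dataset"):
+        print("NCD referencing", meta.get_global_field("core:dataset"))
+    else:
+        print("standard (or metadata-only) SigMF recording")
+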
+ + +Fromfile Auto-Detection +~~~~~~~~~~~~~~~~~~~~~~~ + +The :func:`~sigmf.sigmffile.fromfile` function automatically detects file formats and creates Non-Conforming Datasets: + +.. code-block:: python + + import sigmf + + # auto-detect and create NCD for any supported format + meta = sigmf.fromfile("recording.cdif") # BLUE file + meta = sigmf.fromfile("recording.wav") # WAV file + meta = sigmf.fromfile("recording.sigmf") # SigMF archive + + all_samples = meta.read_samples() + sample_rate = meta.sample_rate + + +Python API +~~~~~~~~~~~ + +For programmatic access, use the individual converter functions directly: + +.. code-block:: python + + from sigmf.convert.wav import wav_to_sigmf + from sigmf.convert.blue import blue_to_sigmf + + # convert WAV to SigMF archive + _ = wav_to_sigmf(wav_path="recording.wav", out_path="recording", create_archive=True) + + # convert BLUE to SigMF pair and return metadata for new files + meta = blue_to_sigmf(blue_path="recording.cdif", out_path="recording") + + +Command Line Usage +~~~~~~~~~~~~~~~~~~ + +Converters are accessed through a unified command-line interface that automatically detects file formats: + +.. code-block:: bash + + # unified converter + sigmf_convert input_file output_file + + # examples + sigmf_convert recording.cdif recording.sigmf + sigmf_convert recording.wav recording.sigmf + +The converter uses magic byte detection to automatically identify BLUE and WAV file formats. +No need to remember format-specific commands! + + +Output Options +~~~~~~~~~~~~~~ + +The unified converter supports multiple output modes: + +.. code-block:: bash + + # standard conversion (creates out.sigmf-data and out.sigmf-meta files) + sigmf_convert in.wav out + + # archive mode (creates single out.sigmf archive) + sigmf_convert in.wav out --archive + + # non-conforming dataset (creates out.sigmf-meta only, references original file) + sigmf_convert in.wav out --ncd + + # extra verbose output + sigmf_convert in.wav out -vv + +**Important**: When using ``--ncd``, the input and output files must be in the same directory. +This ensures proper relative path references in the metadata. + + +BLUE Converter +-------------- + +The BLUE converter handles CDIF (.cdif) recordings while placing BLUE header information into the following global fields: + +* ``blue:fixed`` - Fixed header information (at start of file). +* ``blue:adjunct`` - Adjunct header information (after fixed header). +* ``blue:extended`` - Extended header information (at end of file). Note any duplicate fields will have a suffix like ``_1``, ``_2``, etc appended. +* ``blue:keywords`` - User-defined key-value pairs. + +.. autofunction:: sigmf.convert.blue.blue_to_sigmf + +Examples +~~~~~~~~ + +.. code-block:: python + + from sigmf.convert.blue import blue_to_sigmf + + # standard conversion + meta = blue_to_sigmf(blue_path="recording.cdif", out_path="recording") + + # create NCD automatically (metadata-only, references original file) but don't save any output file + meta = blue_to_sigmf(blue_path="recording.cdif") + + # access standard SigMF data & metadata + all_samples = meta.read_samples() + sample_rate = meta.sample_rate + + # access BLUE-specific metadata + blue_type = meta.get_global_field("blue:fixed")["type"] # e.g., 1000 + blue_version = meta.get_global_field("blue:keywords")["IO"] # e.g., "X-Midas" + + +WAV Converter +------------- + +Converts WAV audio recordings to SigMF format. + +.. autofunction:: sigmf.convert.wav.wav_to_sigmf + +Examples +~~~~~~~~ + +.. 
code-block:: python + + from sigmf.convert.wav import wav_to_sigmf + + # standard conversion + meta = wav_to_sigmf(wav_path="recording.wav", out_path="recording") + + # create NCD automatically (metadata-only, references original file) + meta = wav_to_sigmf(wav_path="recording.wav") + + # access standard SigMF data & metadata + all_samples = meta.read_samples() + sample_rate_hz = meta.sample_rate \ No newline at end of file diff --git a/docs/source/developers.rst b/docs/source/developers.rst index fd323ed..268c713 100644 --- a/docs/source/developers.rst +++ b/docs/source/developers.rst @@ -60,9 +60,9 @@ To build the docs and host locally: .. code-block:: console $ cd docs + $ make clean $ make html - $ cd build/html/ - $ python3 -m http.server + $ python3 -m http.server --directory build/html/ -------------- Find an Issue? diff --git a/docs/source/index.rst b/docs/source/index.rst index f845252..d541a70 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -8,11 +8,12 @@ It offers a *simple* and *intuitive* API for Python developers. .. Note: The toolversion & specversion below are replaced dynamically during build. + The root __init__.py file is used as the sole source of truth for these values. This documentation is for version |toolversion| of the library, which is compatible with version |specversion| of the SigMF specification. -To get started, see the :doc:`quickstart` section or learn how to :ref:`install` the library. +To get started, see `quickstart`. ----- @@ -23,6 +24,7 @@ To get started, see the :doc:`quickstart` section or learn how to :ref:`install` quickstart advanced + converters developers .. toctree:: diff --git a/pyproject.toml b/pyproject.toml index cac61a3..09f4566 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,7 +33,7 @@ dependencies = [ [project.scripts] sigmf_validate = "sigmf.validate:main" - sigmf_convert_wav = "sigmf.apps.convert_wav:main [apps]" + sigmf_convert = "sigmf.convert.__main__:main" [project.optional-dependencies] test = [ "pylint", @@ -41,9 +41,6 @@ dependencies = [ "pytest-cov", "hypothesis", # next-gen testing framework ] - apps = [ - "scipy", # for wav i/o - ] [tool.setuptools] packages = ["sigmf"] @@ -106,6 +103,6 @@ legacy_tox_ini = ''' [testenv] usedevelop = True - deps = .[test,apps] + deps = .[test] commands = coverage run ''' diff --git a/sigmf/__init__.py b/sigmf/__init__.py index 1987443..9050252 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -5,7 +5,7 @@ # SPDX-License-Identifier: LGPL-3.0-or-later # version of this python module -__version__ = "1.5.1" +__version__ = "1.6.0" # matching version of the SigMF specification __specification__ = "1.2.6" diff --git a/sigmf/apps/convert_wav.py b/sigmf/apps/convert_wav.py deleted file mode 100755 index c2f1f2e..0000000 --- a/sigmf/apps/convert_wav.py +++ /dev/null @@ -1,103 +0,0 @@ -# Copyright: Multiple Authors -# -# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python -# -# SPDX-License-Identifier: LGPL-3.0-or-later - -"""converter for wav containers""" - -import argparse -import getpass -import logging -import tempfile -from datetime import datetime, timezone -from os import PathLike -from pathlib import Path -from typing import Optional - -from scipy.io import wavfile - -from .. import SigMFFile -from .. 
import __version__ as toolversion -from ..sigmffile import get_sigmf_filenames -from ..utils import SIGMF_DATETIME_ISO8601_FMT, get_data_type_str - -log = logging.getLogger() - - -def convert_wav( - wav_path: str, - out_path: Optional[str] = None, - author: Optional[str] = None, -) -> PathLike: - """ - Read a wav and write a sigmf archive. - """ - wav_path = Path(wav_path) - wav_stem = wav_path.stem - samp_rate, wav_data = wavfile.read(wav_path) - - global_info = { - SigMFFile.AUTHOR_KEY: getpass.getuser() if author is None else author, - SigMFFile.DATATYPE_KEY: get_data_type_str(wav_data), - SigMFFile.DESCRIPTION_KEY: f"converted from {wav_path.name}", - SigMFFile.NUM_CHANNELS_KEY: 1 if len(wav_data.shape) < 2 else wav_data.shape[1], - SigMFFile.RECORDER_KEY: "Official SigMF wav converter", - SigMFFile.SAMPLE_RATE_KEY: samp_rate, - } - - modify_time = wav_path.lstat().st_mtime - wav_datetime = datetime.fromtimestamp(modify_time, tz=timezone.utc) - - capture_info = { - SigMFFile.START_INDEX_KEY: 0, - SigMFFile.DATETIME_KEY: wav_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), - } - - temp_dir = Path(tempfile.mkdtemp()) - if out_path is None: - # extension will be changed - out_path = Path(wav_stem) - else: - out_path = Path(out_path) - filenames = get_sigmf_filenames(out_path) - - data_path = temp_dir / filenames["data_fn"] - wav_data.tofile(data_path) - - meta = SigMFFile(data_file=data_path, global_info=global_info) - meta.add_capture(0, metadata=capture_info) - log.debug("created %r", meta) - - arc_path = filenames["archive_fn"] - meta.tofile(arc_path, toarchive=True) - log.info("wrote %s", arc_path) - return arc_path - - -def main() -> None: - """ - entry-point for sigmf_convert_wav - """ - parser = argparse.ArgumentParser(description="Convert wav to sigmf archive.") - parser.add_argument("input", type=str, help="wav path") - parser.add_argument("--author", type=str, default=None, help=f"set {SigMFFile.AUTHOR_KEY} metadata") - parser.add_argument("-v", "--verbose", action="count", default=0) - parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}") - args = parser.parse_args() - - level_lut = { - 0: logging.WARNING, - 1: logging.INFO, - 2: logging.DEBUG, - } - logging.basicConfig(level=level_lut[min(args.verbose, 2)]) - - _ = convert_wav( - wav_path=args.input, - author=args.author, - ) - - -if __name__ == "__main__": - main() diff --git a/sigmf/apps/__init__.py b/sigmf/convert/__init__.py similarity index 100% rename from sigmf/apps/__init__.py rename to sigmf/convert/__init__.py diff --git a/sigmf/convert/__main__.py b/sigmf/convert/__main__.py new file mode 100644 index 0000000..937c2b3 --- /dev/null +++ b/sigmf/convert/__main__.py @@ -0,0 +1,106 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Unified converter for non-SigMF file formats""" + +import argparse +import logging +import textwrap +from pathlib import Path + +from .. import __version__ as toolversion +from ..error import SigMFConversionError +from ..utils import get_magic_bytes +from .blue import blue_to_sigmf +from .wav import wav_to_sigmf + + +def main() -> None: + """ + Unified entry-point for SigMF conversion of non-SigMF recordings. + + This command-line interface converts various non-SigMF file formats into SigMF-compliant datasets. + It currently supports WAV and BLUE/Platinum file formats. 
+    The converter detects the file type based on magic bytes and invokes the appropriate conversion function.
+
+    By default it outputs a SigMF pair (.sigmf-meta and .sigmf-data).
+
+    Converter Processing Pattern
+    ----------------------------
+    Both converters follow the same high-level flow:
+
+    1. If out_path is None, default to Non-Conforming Dataset (NCD) mode.
+    2. In NCD mode, build a metadata-only SigMFFile referencing the original
+       file, write a .sigmf-meta if out_path was given, and return it.
+    3. If create_archive, write the dataset to a temporary directory and
+       package everything as a single .sigmf archive.
+    4. Otherwise, write a .sigmf-meta / .sigmf-data pair.
+    5. Return the resulting SigMFFile.
+    """
+    parser = argparse.ArgumentParser(
+        description=textwrap.dedent(main.__doc__),
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+        prog="sigmf_convert",
+    )
+    parser.add_argument("input", type=str, help="Input recording path")
+    parser.add_argument("output", type=str, help="Output SigMF path (no extension)")
+    parser.add_argument("-v", "--verbose", action="count", default=0, help="Increase verbosity level")
+    exclusive_group = parser.add_mutually_exclusive_group()
+    exclusive_group.add_argument("-a", "--archive", action="store_true", help="Output .sigmf archive only")
+    exclusive_group.add_argument(
+        "--ncd", action="store_true", help="Output .sigmf-meta only and process as a Non-Conforming Dataset (NCD)"
+    )
+    parser.add_argument("--version", action="version", version=f"%(prog)s v{toolversion}")
+    args = parser.parse_args()
+
+    level_lut = {
+        0: logging.WARNING,
+        1: logging.INFO,
+        2: logging.DEBUG,
+    }
+    logging.basicConfig(level=level_lut[min(args.verbose, 2)])
+
+    input_path = Path(args.input)
+    output_path = Path(args.output)
+
+    # for NCD output, the input & output files must be in the same directory
+    if args.ncd and input_path.parent.resolve() != output_path.parent.resolve():
+        raise SigMFConversionError(
+            f"NCD files must be in the same directory as input file. "
+            f"Input: {input_path.parent.resolve()}, Output: {output_path.parent.resolve()}"
+        )
+
+    # check that the output path is a file and not a directory
+    if output_path.is_dir():
+        raise SigMFConversionError(f"Output path must be a filename, not a directory: {output_path}")
+
+    # detect file type using magic bytes (same logic as fromfile())
+    magic_bytes = get_magic_bytes(input_path, count=4, offset=0)
+
+    if magic_bytes == b"RIFF":
+        # WAV file
+        _ = wav_to_sigmf(wav_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)
+
+    elif magic_bytes == b"BLUE":
+        # BLUE file
+        _ = blue_to_sigmf(blue_path=input_path, out_path=output_path, create_archive=args.archive, create_ncd=args.ncd)
+
+    else:
+        raise SigMFConversionError(
+            f"Unsupported file format. Magic bytes: {magic_bytes}. "
+            f"Supported formats for conversion are WAV and BLUE/Platinum."
+        )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/sigmf/convert/blue.py b/sigmf/convert/blue.py
new file mode 100644
index 0000000..8c4173d
--- /dev/null
+++ b/sigmf/convert/blue.py
@@ -0,0 +1,852 @@
+# Copyright: Multiple Authors
+#
+# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python
+#
+# SPDX-License-Identifier: LGPL-3.0-or-later
+
+"""
+X-Midas BLUE file converter.
+
+This module reads and parses the HCB (Header Control Block) and Extended
+Headers, supports the common BLUE file types, and converts the extracted
+metadata into SigMF format.
+"""
+
+import base64
+import getpass
+import io
+import logging
+import struct
+import tempfile
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Optional, Tuple
+
+import numpy as np
+from packaging.version import InvalidVersion, Version
+
+from ..
import __version__ as toolversion +from ..error import SigMFConversionError +from ..sigmffile import SigMFFile, fromfile, get_sigmf_filenames +from ..utils import SIGMF_DATETIME_ISO8601_FMT + +log = logging.getLogger() + +# fmt: off +FIXED_LAYOUT = [ + # Fixed Header definitions: (key, offset, size, fmt, description) up to adjunct + ("version", 0, 4, "4s", "Header version"), + ("head_rep", 4, 4, "4s", "Header representation"), + ("data_rep", 8, 4, "4s", "Data representation"), + ("detached", 12, 4, "i", "Detached header"), + ("protected", 16, 4, "i", "Protected from overwrite"), + ("pipe", 20, 4, "i", "Pipe mode (N/A)"), + ("ext_start", 24, 4, "i", "Extended header start (512-byte blocks)"), + ("ext_size", 28, 4, "i", "Extended header size in bytes"), + ("data_start",32, 8, "d", "Data start in bytes"), + ("data_size", 40, 8, "d", "Data size in bytes"), + ("type", 48, 4, "i", "File type code"), + ("format", 52, 2, "2s", "2 Letter data format code"), + ("flagmask", 54, 2, "h", "16-bit flagmask"), + ("timecode", 56, 8, "d", "Time code field"), + ("inlet", 64, 2, "h", "Inlet owner"), + ("outlets", 66, 2, "h", "Number of outlets"), + ("outmask", 68, 4, "i", "Outlet async mask"), + ("pipeloc", 72, 4, "i", "Pipe location"), + ("pipesize", 76, 4, "i", "Pipe size in bytes"), + ("in_byte", 80, 8, "d", "Next input byte"), + ("out_byte", 88, 8, "d", "Next out byte (cumulative)"), + ("outbytes", 96, 64, "8d", "Next out byte (each outlet)"), + ("keylength", 160, 4, "i", "Length of keyword string"), + ("keywords", 164, 92, "92s", "User defined keyword string"), + # Adjunct starts at byte 256 after this +] +# fmt: on + +HEADER_SIZE_BYTES = 512 +BLOCK_SIZE_BYTES = 512 + +TYPE_MAP = { + # BLUE code to numpy dtype + "A": np.dtype("S1"), # ASCII character + "B": np.int8, + "I": np.int16, + "L": np.int32, + "X": np.int64, + "F": np.float32, + "D": np.float64, + # unsupported codes + # "P" : packed bits + # "N" : 4-bit integer +} + + +def blue_to_sigmf_type_str(h_fixed): + """ + Convert BLUE format code to SigMF datatype string. + + Parameters + ---------- + h_fixed : dict + Fixed Header dictionary containing 'format' and 'data_rep' fields. + + Returns + ------- + str + SigMF datatype string (e.g., 'ci16_le', 'rf32_be'). + """ + # extract format code and endianness from header + format_code = h_fixed.get("format") + endianness = h_fixed.get("data_rep") + + # parse format code components + is_complex = format_code[0] == "C" + numpy_dtype = TYPE_MAP[format_code[1]] + + # compute everything from numpy dtype + dtype_obj = np.dtype(numpy_dtype) + bits = dtype_obj.itemsize * 8 # bytes to bits + + # infer sigmf type from numpy kind + sigmf_type = "i" if dtype_obj.kind in ("i", "u") else "f" + + # build datatype string + prefix = "c" if is_complex else "r" + datatype = f"{prefix}{sigmf_type}{bits}" + + # add endianness for types > 8 bits + if bits > 8: + endian_suffix = "_le" if endianness == "EEEI" else "_be" + datatype += endian_suffix + + return datatype + + +def detect_endian(data): + """ + Detect endianness of a Bluefile header. + + Parameters + ---------- + data : bytes + Raw header data. + + Returns + ------- + str + "<" for little-endian or ">" for big-endian. + + Raises + ------ + SigMFConversionError + If the endianness is unexpected. 
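+
+    Examples
+    --------
+    Only the first 12 header bytes (version, head_rep, data_rep fields) are
+    needed; the data_rep field at bytes 8:12 is examined:
+
+    >>> detect_endian(b"BLUEEEEIEEEI")
+    '<'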
+ """ + endianness = data[8:12].decode("ascii") + if endianness == "EEEI": + return "<" + elif endianness == "IEEE": + return ">" + else: + raise SigMFConversionError(f"Unsupported endianness: {endianness}") + + +def read_hcb(file_path): + """ + Read Header Control Block (HCB) from BLUE file. + + First 256 bytes contains fixed header, followed by 256 bytes of adjunct header. + + Parameters + ---------- + file_path : str + Path to the Blue file. + + Returns + ------- + h_fixed : dict + Fixed Header + h_keywords : dict + Custom User Keywords + h_adjunct : dict + Adjunct Header + + Raises + ------ + SigMFConversionError + If header cannot be parsed. + """ + with open(file_path, "rb") as handle: + header_bytes = handle.read(256) + + endian = detect_endian(header_bytes) + + # fixed header fields + h_fixed = {} + for key, offset, size, fmt, _ in FIXED_LAYOUT: + raw = header_bytes[offset : offset + size] + try: + val = struct.unpack(endian + fmt, raw)[0] + except struct.error: + raise SigMFConversionError(f"Failed to unpack field {key} with endian {endian}") + if isinstance(val, bytes): + val = val.decode("ascii", errors="replace") + h_fixed[key] = val + + # parse user keywords & decode standard keywords + h_keywords = {} + + for field in h_fixed["keywords"].split("\x00"): + if "=" in field: + key, value = field.split("=", 1) + h_keywords[key] = value + + # variable (adjunct) header parsing + if h_fixed["type"] in (1000, 1001): + h_adjunct = { + "xstart": struct.unpack(f"{endian}d", handle.read(8))[0], + "xdelta": struct.unpack(f"{endian}d", handle.read(8))[0], + "xunits": struct.unpack(f"{endian}i", handle.read(4))[0], + } + elif h_fixed["type"] == 2000: + h_adjunct = { + "xstart": struct.unpack(f"{endian}d", handle.read(8))[0], + "xdelta": struct.unpack(f"{endian}d", handle.read(8))[0], + "xunits": struct.unpack(f"{endian}i", handle.read(4))[0], + "subsize": struct.unpack(f"{endian}i", handle.read(4))[0], + "ystart": struct.unpack(f"{endian}d", handle.read(8))[0], + "ydelta": struct.unpack(f"{endian}d", handle.read(8))[0], + "yunits": struct.unpack(f"{endian}i", handle.read(4))[0], + } + else: + # read raw adjunct header as bytes and convert to base64 for JSON serialization + log.warning(f"Unknown BLUE file type {h_fixed['type']}, encoding adjunct header in metadata as base64.") + raw_adjunct = handle.read(256) + h_adjunct = {"raw_base64": base64.b64encode(raw_adjunct).decode("ascii")} + + try: + spec_str = "Unknown" + version = Version(h_keywords.get("VER", "0.0")) + if version.major == 1: + spec_str = f"BLUE {version}" + elif version.major == 2: + spec_str = f"Platinum {version}" + except InvalidVersion: + log.debug("Could not parse BLUE specification from VER keyword.") + pass + # h_fixed will contain number e.g. 1000, 1001, 2000, 2001 + log.info(f"Read {h_fixed['version']} type {h_fixed['type']} using {spec_str} specification.") + + validate_fixed(h_fixed) + validate_adjunct(h_adjunct) + + return h_fixed, h_keywords, h_adjunct + + +def read_extended_header(file_path, h_fixed): + """ + Read Extended Header from a BLUE file. + + Parameters + ---------- + file_path : str + Path to the BLUE file. + h_fixed : dict + Fixed Header containing 'ext_size' and 'ext_start'. + + Returns + ------- + list of dict + List of dictionaries containing parsed records. + + Raises + ------ + SigMFConversionError + If the extended header cannot be parsed. 
+ """ + entries = [] + if h_fixed["ext_size"] <= 0: + return entries + endian = "<" if h_fixed.get("head_rep") == "EEEI" else ">" + with open(file_path, "rb") as handle: + handle.seek(int(h_fixed["ext_start"]) * BLOCK_SIZE_BYTES) + bytes_remaining = int(h_fixed["ext_size"]) + while bytes_remaining > 0: + lkey = struct.unpack(f"{endian}i", handle.read(4))[0] + lext = struct.unpack(f"{endian}h", handle.read(2))[0] + ltag = struct.unpack(f"{endian}b", handle.read(1))[0] + type_char = handle.read(1).decode("ascii", errors="replace") + + # get dtype and compute bytes per element + if type_char in TYPE_MAP: + dtype = TYPE_MAP[type_char] + bytes_per_element = np.dtype(dtype).itemsize + else: + # fallback for unknown types + dtype = np.dtype("S1") + bytes_per_element = 1 + + val_len = lkey - lext + val_count = val_len // bytes_per_element if bytes_per_element else 0 + + if type_char == "A": + raw = handle.read(val_len) + if len(raw) < val_len: + raise SigMFConversionError("Unexpected end of extended header") + value = raw.rstrip(b"\x00").decode("ascii", errors="replace") + else: + value = np.frombuffer(handle.read(val_len), dtype=dtype, count=val_count) + if value.size == 1: + val_item = value[0] + # handle bytes first (numpy.bytes_ is also np.generic) + if isinstance(val_item, bytes): + # handle bytes from S1 dtype - convert to base64 for JSON + value = base64.b64encode(val_item).decode("ascii") + elif isinstance(val_item, np.generic): + # convert numpy scalar to native python type + value = val_item.item() + else: + value = val_item + else: + value = value.tolist() + + tag = handle.read(ltag).decode("ascii", errors="replace") if ltag > 0 else "" + + total = 4 + 2 + 1 + 1 + val_len + ltag + pad = (8 - (total % 8)) % 8 + if pad: + handle.read(pad) + + entries.append({"tag": tag, "type": type_char, "value": value, "lkey": lkey, "lext": lext, "ltag": ltag}) + bytes_remaining -= lkey + + validate_extended_header(entries) + + return entries + + +def data_loopback(blue_path: Path, data_path: Path, h_fixed: dict) -> None: + """ + Write SigMF data file from BLUE file samples. + + Parameters + ---------- + blue_path : Path + Path to the BLUE file. + data_path : Path + Destination path for the SigMF dataset (.sigmf-data). + h_fixed : dict + Header Control Block dictionary. + + Returns + ------- + numpy.ndarray + Parsed samples. Empty array for zero-sample files. 
+ """ + log.debug("parsing BLUE file data values") + + # use header data_size field instead of file size calculation + data_size_bytes = int(h_fixed.get("data_size", 0)) + fmt = h_fixed.get("format") + + log.debug(f"format: {fmt}, data_size from header: {data_size_bytes} bytes") + + # parse format code components + is_complex = fmt[0] == "C" + np_dtype = TYPE_MAP[fmt[1]] + + # calculate element size and count + elem_size = np.dtype(np_dtype).itemsize + elem_count = data_size_bytes // elem_size + + log.debug(f"elem_size: {elem_size}, elem_count: {elem_count}, is_complex: {is_complex}") + + # check for zero-sample file (metadata-only) + if elem_count == 0: + log.info("detected zero-sample BLUE file, creating metadata-only SigMF") + return + + # read raw samples + raw_samples = np.fromfile(blue_path, dtype=np_dtype, offset=HEADER_SIZE_BYTES, count=elem_count) + + if is_complex: + # check if data is already complex or needs deinterleaving + if np.iscomplexobj(raw_samples): + # already complex, no reassembly needed + samples = raw_samples + else: + # reassemble interleaved IQ samples + samples = raw_samples[::2] + 1j * raw_samples[1::2] + else: + # scalar data + samples = raw_samples + + # save out as SigMF IQ data file + samples.tofile(data_path) + log.info("wrote SigMF dataset to %s", data_path) + + +def _build_common_metadata( + h_fixed: dict, + h_keywords: dict, + h_adjunct: dict, + h_extended: list, + is_ncd: bool = False, + blue_file_name: str = None, + trailing_bytes: int = 0, +) -> Tuple[dict, dict]: + """ + Build common global_info and capture_info metadata for both standard and NCD SigMF files. + + Parameters + ---------- + h_fixed : dict + Fixed Header + h_keywords : dict + Custom User Keywords + h_adjunct : dict + Adjunct Header + h_extended : list of dict + Parsed extended header entries. + is_ncd : bool, optional + If True, adds NCD-specific fields. + blue_file_name : str, optional + Original BLUE file name (for NCD). + trailing_bytes : int, optional + Number of trailing bytes (for NCD). + + Returns + ------- + tuple[dict, dict] + (global_info, capture_info) dictionaries. + + Raises + ------ + SigMFConversionError + If SigMF spec compliance is violated. 
+ """ + # helper to look up extended header values by tag + def get_tag(tag): + for entry in h_extended: + if entry["tag"] == tag: + return entry["value"] + return None + + # get sigmf datatype from blue format and endianness + datatype = blue_to_sigmf_type_str(h_fixed) + log.info(f"using SigMF datatype {datatype} for BLUE format {h_fixed['format']} {h_fixed['data_rep']}") + + # sample rate: prefer adjunct.xdelta, else extended header SAMPLE_RATE + if "xdelta" in h_adjunct: + sample_rate_hz = 1 / h_adjunct["xdelta"] + else: + sample_rate_hz = float(get_tag("SAMPLE_RATE")) + + if "outlets" in h_fixed and h_fixed["outlets"] > 0: + num_channels = int(h_fixed["outlets"]) + else: + num_channels = 1 + + # base global metadata + global_info = { + "core:author": getpass.getuser(), + SigMFFile.DATATYPE_KEY: datatype, + SigMFFile.RECORDER_KEY: f"Official SigMF BLUE converter", + SigMFFile.NUM_CHANNELS_KEY: num_channels, + SigMFFile.SAMPLE_RATE_KEY: sample_rate_hz, + SigMFFile.EXTENSIONS_KEY: [{"name": "blue", "version": "0.0.1", "optional": True}], + } + + # add NCD-specific fields + if is_ncd: + global_info[SigMFFile.DATASET_KEY] = blue_file_name + global_info[SigMFFile.TRAILING_BYTES_KEY] = trailing_bytes + + # merge HCB values into metadata + global_info["blue:fixed"] = h_fixed + global_info["blue:keywords"] = h_keywords + global_info["blue:adjunct"] = h_adjunct + + # merge extended header fields, handling duplicate keys + if h_extended: + extended = {} + tag_counts = {} + for entry in h_extended: + tag = entry.get("tag") + value = entry.get("value") + if hasattr(value, "item"): + value = value.item() + + # handle duplicate tags by numbering them + if tag in extended: + tag_counts[tag] = tag_counts.get(tag, 0) + 1 + numbered_tag = f"{tag}_{tag_counts[tag]}" + extended[numbered_tag] = value + else: + extended[tag] = value + global_info["blue:extended"] = extended + + # calculate blue start time + blue_start_time = float(h_fixed.get("timecode", 0)) + blue_start_time += h_adjunct.get("xstart", 0) + blue_start_time += float(h_keywords.get("TC_PREC", 0)) + + capture_info = {} + + if blue_start_time == 0: + log.warning("BLUE timecode is zero or missing; capture datetime metadata will be absent.") + else: + # timecode uses 1950-01-01 as epoch, datetime uses 1970-01-01 + blue_epoch = blue_start_time - 631152000 # seconds between 1950 and 1970 + blue_datetime = datetime.fromtimestamp(blue_epoch, tz=timezone.utc) + capture_info[SigMFFile.DATETIME_KEY] = blue_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT) + + if get_tag("RF_FREQ") is not None: + # it's possible other keys indicate tune frequency, but RF_FREQ is standard + capture_info[SigMFFile.FREQUENCY_KEY] = float(get_tag("RF_FREQ")) + + # validate SigMF spec compliance: metadata_only and dataset fields are mutually exclusive + if SigMFFile.METADATA_ONLY_KEY in global_info and SigMFFile.DATASET_KEY in global_info: + raise SigMFConversionError( + "SigMF spec violation: core:metadata_only MAY NOT be used in conjunction with " + "Non-Conforming Datasets or the core:dataset field" + ) + + return global_info, capture_info + + +def validate_file(blue_path: Path) -> None: + """ + Basic validation of the BLUE file. + + Parameters + ---------- + blue_path : Path + Path to the BLUE file. + + Raises + ------ + SigMFConversionError + If the file is abnormal. 
+ """ + if blue_path.stat().st_size < 512: + raise SigMFConversionError("BLUE file is too small to contain required headers.") + + +def validate_fixed(h_fixed: dict) -> None: + """ + Check that Fixed Header contains minimum required fields. + + Parameters + ---------- + h_fixed : dict + Fixed Header dictionary. + + Raises + ------ + SigMFConversionError + If required fields are missing or invalid. + """ + required = ["version", "data_start", "data_size", "data_rep", "head_rep", "detached", "format", "type"] + for field in required: + if field not in h_fixed: + raise SigMFConversionError(f"Missing required Fixed Header field: {field}") + for rep_field in ["data_rep", "head_rep"]: + if h_fixed[rep_field] not in ("EEEI", "IEEE"): + raise SigMFConversionError(f"Invalid value for {rep_field}: {h_fixed[rep_field]}") + if h_fixed["data_size"] < 0: + raise SigMFConversionError(f"Invalid data_size: {h_fixed['data_size']} (must be >= 0)") + if len(h_fixed["format"]) != 2 or h_fixed["format"][0] not in "SC" or h_fixed["format"][1] not in TYPE_MAP: + raise SigMFConversionError(f"Unsupported data format: {h_fixed['format']}") + + +def validate_adjunct(adjunct: dict) -> None: + """ + Check that the Adjunct header contains minimum required fields. + """ + # validate xdelta (1 / samp_rate) if present + if "xdelta" in adjunct: + xdelta = adjunct["xdelta"] + if xdelta <= 0: + raise SigMFConversionError(f"Invalid adjunct xdelta time interval: {xdelta}") + + +def validate_extended_header(entries: list) -> None: + """ + Check that BLUE Extended Header contains minimum required fields. + + Parameters + ---------- + entries : list of dict + List of extended header entries. + + Raises + ------ + SigMFConversionError + If required fields are missing or invalid. + """ + # check for SAMPLE_RATE if present + for entry in entries: + if entry["tag"] == "SAMPLE_RATE": + sample_rate = float(entry["value"]) + if sample_rate <= 0: + raise SigMFConversionError(f"Invalid SAMPLE_RATE in extended header: {sample_rate}") + + +def construct_sigmf( + filenames: dict, + h_fixed: dict, + h_keywords: dict, + h_adjunct: dict, + h_extended: list, + is_metadata_only: bool = False, + create_archive: bool = False, +) -> SigMFFile: + """ + Built & write a SigMF object from BLUE metadata. + + Parameters + ---------- + filenames : dict + Mapping returned by get_sigmf_filenames containing destination paths. + h_fixed : dict + Fixed Header + h_keywords : dict + Custom User Keywords + h_adjunct : dict + Adjunct Header + h_extended : list of dict + Parsed extended header entries from read_extended_header(). + is_metadata_only : bool, optional + If True, creates a metadata-only SigMF file. + create_archive : bool, optional + When True, package output as SigMF archive instead of a meta/data pair. + + Returns + ------- + SigMFFile + SigMF object. + """ + # use shared helper to build common metadata + global_info, capture_info = _build_common_metadata(h_fixed, h_keywords, h_adjunct, h_extended) + + # set metadata-only flag for zero-sample files (only for non-NCD files) + if is_metadata_only: + # ensure we're not accidentally setting metadata_only for an NCD + if SigMFFile.DATASET_KEY in global_info: + raise ValueError( + "Cannot set metadata_only=True for Non-Conforming Dataset files. " + "Per SigMF spec, metadata_only MAY NOT be used with core:dataset field." 
+ ) + global_info[SigMFFile.METADATA_ONLY_KEY] = True + + # for metadata-only files, don't specify data_file and skip checksum + if is_metadata_only: + meta = SigMFFile( + data_file=None, + global_info=global_info, + skip_checksum=True, + ) + meta.data_buffer = io.BytesIO() + else: + meta = SigMFFile( + data_file=filenames["data_fn"], + global_info=global_info, + ) + meta.add_capture(0, metadata=capture_info) + + if create_archive: + meta.tofile(filenames["archive_fn"], toarchive=True) + log.info("wrote SigMF archive to %s", filenames["archive_fn"]) + # metadata returned should be for this archive + meta = fromfile(filenames["archive_fn"]) + else: + meta.tofile(filenames["meta_fn"], toarchive=False) + log.info("wrote SigMF metadata to %s", filenames["meta_fn"]) + + log.debug("created %r", meta) + return meta + + +def construct_sigmf_ncd( + blue_path: Path, + h_fixed: dict, + h_keywords: dict, + h_adjunct: dict, + h_extended: list, + header_bytes: int, + trailing_bytes: int, +) -> SigMFFile: + """ + Construct Non-Conforming Dataset SigMF metadata for BLUE file. + + Parameters + ---------- + blue_path : Path + Path to the original BLUE file. + h_fixed : dict + Fixed Header + h_keywords : dict + Custom User Keywords + h_adjunct : dict + Adjunct Header + h_extended : list of dict + Parsed extended header entries from read_extended_header(). + header_bytes : int + Number of header bytes to skip. + trailing_bytes : int + Number of trailing bytes to ignore. + + Returns + ------- + SigMFFile + NCD SigMF object pointing to original BLUE file. + """ + # use shared helper to build common metadata, with NCD-specific additions + global_info, capture_info = _build_common_metadata( + h_fixed, + h_keywords, + h_adjunct, + h_extended, + is_ncd=True, + blue_file_name=blue_path.name, + trailing_bytes=trailing_bytes, + ) + + # add NCD-specific capture info + capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes + + # create NCD metadata-only SigMF pointing to original file + meta = SigMFFile(global_info=global_info, skip_checksum=True) + meta.set_data_file(data_file=blue_path, offset=header_bytes, skip_checksum=True) + meta.data_buffer = io.BytesIO() + meta.add_capture(0, metadata=capture_info) + log.debug("created NCD SigMF: %r", meta) + + return meta + + +def blue_to_sigmf( + blue_path: str, + out_path: Optional[str] = None, + create_archive: bool = False, + create_ncd: bool = False, +) -> SigMFFile: + """ + Read a MIDAS Bluefile, optionally write SigMF, return associated SigMF object. + + Parameters + ---------- + blue_path : str + Path to the Blue file. + out_path : str, optional + Path to the output SigMF metadata file. + create_archive : bool, optional + When True, package output as a .sigmf archive. + create_ncd : bool, optional + When True, create Non-Conforming Dataset with header_bytes and trailing_bytes. + + Returns + ------- + SigMFFile + SigMF object, potentially as Non-Conforming Dataset. 
+ """ + log.debug(f"read {blue_path}") + + # auto-enable NCD when no output path is specified + if out_path is None: + create_ncd = True + + blue_path = Path(blue_path) + if out_path is None: + base_path = blue_path + else: + base_path = Path(out_path) + + filenames = get_sigmf_filenames(base_path) + + # ensure output directory exists + filenames["base_fn"].parent.mkdir(parents=True, exist_ok=True) + + validate_file(blue_path) + + # read Header control block (HCB) to determine how to process the rest of the file + h_fixed, h_keywords, h_adjunct = read_hcb(blue_path) + + # read extended header + h_extended = read_extended_header(blue_path, h_fixed) + + # calculate NCD byte boundaries if requested + if create_ncd: + header_bytes = HEADER_SIZE_BYTES + int(h_fixed.get("ext_size", 0)) + + # for NCD, trailing_bytes = file_size - header_bytes - actual_data_size + file_size = blue_path.stat().st_size + actual_data_size = file_size - header_bytes + trailing_bytes = 0 # assume no trailing bytes for NCD unless file is smaller than expected + + log.debug( + f"BLUE NCD: file_size={file_size}, header_bytes={header_bytes}, actual_data_size={actual_data_size}, trailing_bytes={trailing_bytes}" + ) + + # check if this is a zero-sample (metadata-only) file + data_size_bytes = int(h_fixed.get("data_size", 0)) + metadata_only = data_size_bytes == 0 + + # handle NCD case + if create_ncd: + # create metadata-only SigMF for NCD pointing to original file + ncd_meta = construct_sigmf_ncd( + blue_path=blue_path, + h_fixed=h_fixed, + h_keywords=h_keywords, + h_adjunct=h_adjunct, + h_extended=h_extended, + header_bytes=header_bytes, + trailing_bytes=trailing_bytes, + ) + + # write NCD metadata to specified output path if provided + if out_path is not None: + ncd_meta.tofile(filenames["meta_fn"]) + log.info("wrote SigMF non-conforming metadata to %s", filenames["meta_fn"]) + + return ncd_meta + + with tempfile.TemporaryDirectory() as temp_dir: + if not metadata_only: + if create_archive: + # for archives, write data to a temporary file that will be cleaned up + data_path = Path(temp_dir) / filenames["data_fn"].name + filenames["data_fn"] = data_path # update path for construct_sigmf + else: + # for file pairs, write to the final destination + data_path = filenames["data_fn"] + data_loopback(blue_path, data_path, h_fixed) + else: + log.info("skipping data file creation for zero-sample BLUE file") + + # call the SigMF conversion for metadata generation + meta = construct_sigmf( + filenames=filenames, + h_fixed=h_fixed, + h_keywords=h_keywords, + h_adjunct=h_adjunct, + h_extended=h_extended, + is_metadata_only=metadata_only, + create_archive=create_archive, + ) + + log.debug(">>>>>>>>> Fixed Header") + for key, _, _, _, desc in FIXED_LAYOUT: + log.debug(f"{key:10s}: {h_fixed[key]!r} # {desc}") + + log.debug(">>>>>>>>> User Keywords") + log.debug(h_keywords) + + log.debug(">>>>>>>>> Adjunct Header") + log.debug(h_adjunct) + + log.debug(">>>>>>>>> Extended Header") + for entry in h_extended: + log.debug(f"{entry['tag']:20s}:{entry['value']}") + + return meta diff --git a/sigmf/convert/wav.py b/sigmf/convert/wav.py new file mode 100644 index 0000000..5d7d082 --- /dev/null +++ b/sigmf/convert/wav.py @@ -0,0 +1,217 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. 
https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""converter for wav containers""" + +import io +import logging +import tempfile +import wave +from datetime import datetime, timezone +from pathlib import Path +from typing import Optional + +import numpy as np + +from .. import SigMFFile +from .. import __version__ as toolversion +from .. import fromfile +from ..sigmffile import get_sigmf_filenames +from ..utils import SIGMF_DATETIME_ISO8601_FMT, get_data_type_str + +log = logging.getLogger() + + +def _calculate_wav_ncd_bytes(wav_path: Path) -> tuple: + """ + Calculate header_bytes and trailing_bytes for WAV NCD. + + Returns + ------- + tuple + (header_bytes, trailing_bytes) + """ + # use wave module to get basic info + with wave.open(str(wav_path), "rb") as wav_reader: + n_channels = wav_reader.getnchannels() + samp_width = wav_reader.getsampwidth() + n_frames = wav_reader.getnframes() + + # calculate sample data size in bytes + sample_bytes = n_frames * n_channels * samp_width + file_size = wav_path.stat().st_size + + # parse WAV file structure to find data chunk + with open(wav_path, "rb") as handle: + # skip RIFF header (12 bytes: 'RIFF' + size + 'WAVE') + handle.seek(12) + header_bytes = 12 + + # search for 'data' chunk + while header_bytes < file_size: + chunk_id = handle.read(4) + if len(chunk_id) != 4: + break + chunk_size = int.from_bytes(handle.read(4), "little") + + if chunk_id == b"data": + # found data chunk, header ends here + header_bytes += 8 # include chunk_id and chunk_size + break + + # skip this chunk + header_bytes += 8 + chunk_size + # ensure even byte boundary (WAV chunks are word-aligned) + if chunk_size % 2: + header_bytes += 1 + handle.seek(header_bytes) + + trailing_bytes = max(0, file_size - header_bytes - sample_bytes) + return header_bytes, trailing_bytes + + +def wav_to_sigmf( + wav_path: str, + out_path: Optional[str] = None, + create_archive: bool = False, + create_ncd: bool = False, +) -> SigMFFile: + """ + Read a wav, optionally write sigmf, return associated SigMF object. + + Parameters + ---------- + wav_path : str + Path to the WAV file. + out_path : str, optional + Path to the output SigMF metadata file. + create_archive : bool, optional + When True, package output as a .sigmf archive. + create_ncd : bool, optional + When True, create Non-Conforming Dataset with header_bytes and trailing_bytes. + + Returns + ------- + SigMFFile + SigMF object, potentially as Non-Conforming Dataset. + + Raises + ------ + wave.Error + If the wav file cannot be read. 
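+
+    Examples
+    --------
+    Typical use with hypothetical paths (skipped under doctest):
+
+    >>> meta = wav_to_sigmf("speech.wav", out_path="speech")  # doctest: +SKIP
+    >>> rate = meta.sample_rate  # doctest: +SKIP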
+ """ + wav_path = Path(wav_path) + out_path = None if out_path is None else Path(out_path) + + # auto-enable NCD when no output path is specified + if out_path is None: + create_ncd = True + + # use built-in wave module exclusively for precise sample boundary detection + with wave.open(str(wav_path), "rb") as wav_reader: + n_channels = wav_reader.getnchannels() + samp_width = wav_reader.getsampwidth() + samp_rate = wav_reader.getframerate() + n_frames = wav_reader.getnframes() + np_dtype = f"int{samp_width * 8}" + + # for NCD support, calculate precise byte boundaries + if create_ncd: + header_bytes, trailing_bytes = _calculate_wav_ncd_bytes(wav_path) + log.debug(f"WAV NCD: header_bytes={header_bytes}, trailing_bytes={trailing_bytes}") + + # only read audio data if we're not creating NCD metadata-only + wav_data = None + if create_ncd: + # for NCD metadata-only, create dummy sample to get datatype + dummy_sample = np.array([0], dtype=np_dtype) + datatype_str = get_data_type_str(dummy_sample) + # don't read any wav_data + else: + # normal conversion: read the audio data + raw_data = wav_reader.readframes(n_frames) + wav_data = ( + np.frombuffer(raw_data, dtype=np_dtype).reshape(-1, n_channels) + if n_channels > 1 + else np.frombuffer(raw_data, dtype=np_dtype) + ) + datatype_str = get_data_type_str(wav_data) + + global_info = { + SigMFFile.DATATYPE_KEY: datatype_str, + SigMFFile.DESCRIPTION_KEY: f"converted from {wav_path.name}", + SigMFFile.NUM_CHANNELS_KEY: n_channels, + SigMFFile.RECORDER_KEY: "Official SigMF WAV converter", + SigMFFile.SAMPLE_RATE_KEY: samp_rate, + } + + modify_time = wav_path.lstat().st_mtime + wav_datetime = datetime.fromtimestamp(modify_time, tz=timezone.utc) + + capture_info = { + SigMFFile.DATETIME_KEY: wav_datetime.strftime(SIGMF_DATETIME_ISO8601_FMT), + } + + if create_ncd: + # NCD requires extra fields + global_info[SigMFFile.TRAILING_BYTES_KEY] = trailing_bytes + global_info[SigMFFile.DATASET_KEY] = wav_path.name + capture_info[SigMFFile.HEADER_BYTES_KEY] = header_bytes + + # create metadata-only SigMF for NCD pointing to original file + meta = SigMFFile(global_info=global_info) + meta.set_data_file(data_file=wav_path, offset=header_bytes) + meta.data_buffer = io.BytesIO() + meta.add_capture(0, metadata=capture_info) + + # write metadata file if output path specified + if out_path is not None: + filenames = get_sigmf_filenames(out_path) + output_dir = filenames["meta_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + meta.tofile(filenames["meta_fn"], toarchive=False) + log.info("wrote SigMF non-conforming metadata to %s", filenames["meta_fn"]) + + log.debug("created %r", meta) + return meta + + if out_path is None: + base_path = wav_path.with_suffix(".sigmf") + else: + base_path = Path(out_path) + + filenames = get_sigmf_filenames(base_path) + + output_dir = filenames["meta_fn"].parent + output_dir.mkdir(parents=True, exist_ok=True) + + if create_archive: + # use temporary directory for data file when creating archive + with tempfile.TemporaryDirectory() as temp_dir: + data_path = Path(temp_dir) / filenames["data_fn"].name + wav_data.tofile(data_path) + + meta = SigMFFile(data_file=data_path, global_info=global_info) + meta.add_capture(0, metadata=capture_info) + + meta.tofile(filenames["archive_fn"], toarchive=True) + log.info("wrote SigMF archive to %s", filenames["archive_fn"]) + # metadata returned should be for this archive + meta = fromfile(filenames["archive_fn"]) + else: + # write separate meta and data files + data_path = 
filenames["data_fn"] + wav_data.tofile(data_path) + log.info("wrote SigMF dataset to %s", data_path) + + meta = SigMFFile(data_file=data_path, global_info=global_info) + meta.add_capture(0, metadata=capture_info) + + meta.tofile(filenames["meta_fn"], toarchive=False) + log.info("wrote SigMF metadata to %s", filenames["meta_fn"]) + + log.debug("created %r", meta) + return meta diff --git a/sigmf/error.py b/sigmf/error.py index 9f2564c..9a1ca5f 100644 --- a/sigmf/error.py +++ b/sigmf/error.py @@ -22,3 +22,7 @@ class SigMFAccessError(SigMFError): class SigMFFileError(SigMFError): """Exceptions related to reading or writing SigMF files or archives.""" + + +class SigMFConversionError(SigMFError): + """Exceptions related to converting to SigMF format.""" diff --git a/sigmf/hashing.py b/sigmf/hashing.py new file mode 100644 index 0000000..3874729 --- /dev/null +++ b/sigmf/hashing.py @@ -0,0 +1,62 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Hashing Functions""" + +import hashlib +from pathlib import Path + + +def calculate_sha512(filename=None, fileobj=None): + """ + Calculate SHA512 hash of a dataset for integrity verification. + + The entire recording file should be hashed according to the SigMF specification. + + Parameters + ---------- + filename : str or Path, optional + Path to the file to hash. If provided, the file will be opened and hashed. + Cannot be used together with fileobj. + fileobj : file-like object, optional + An open file-like object (e.g., BytesIO) to hash. Must have read() and + seek() methods. Cannot be used together with filename. + + Returns + ------- + str + 128 character hex digest (512 bits). + + Raises + ------ + ValueError + If neither filename nor fileobj is provided. + """ + the_hash = hashlib.sha512() + bytes_read = 0 + + if filename is not None: + fileobj = open(filename, "rb") + bytes_to_hash = Path(filename).stat().st_size + elif fileobj is not None: + current_pos = fileobj.tell() + # seek to end + fileobj.seek(0, 2) + bytes_to_hash = fileobj.tell() + # reset to original position + fileobj.seek(current_pos) + else: + raise ValueError("Either filename or fileobj must be provided") + + while bytes_read < bytes_to_hash: + buff = fileobj.read(min(4096, (bytes_to_hash - bytes_read))) + the_hash.update(buff) + bytes_read += len(buff) + + if filename is not None: + fileobj.close() + + return the_hash.hexdigest() diff --git a/sigmf/sigmf_hash.py b/sigmf/sigmf_hash.py deleted file mode 100644 index 9482c35..0000000 --- a/sigmf/sigmf_hash.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright: Multiple Authors -# -# This file is part of sigmf-python. https://github.com/sigmf/sigmf-python -# -# SPDX-License-Identifier: LGPL-3.0-or-later - -"""Hashing Functions""" - -import hashlib -from pathlib import Path - - -def calculate_sha512(filename=None, fileobj=None, offset=None, size=None): - """ - Return sha512 of file or fileobj. 
- """ - the_hash = hashlib.sha512() - bytes_to_hash = size - bytes_read = 0 - - if filename is not None: - fileobj = open(filename, "rb") - if size is None: - bytes_to_hash = Path(filename).stat().st_size - else: - fileobj.seek(offset) - - while bytes_read < bytes_to_hash: - buff = fileobj.read(min(4096, (bytes_to_hash - bytes_read))) - the_hash.update(buff) - bytes_read += len(buff) - - if filename is not None: - fileobj.close() - - return the_hash.hexdigest() diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index a72043e..56b9deb 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -15,7 +15,7 @@ import numpy as np -from . import __specification__, __version__, schema, sigmf_hash, validate +from . import __specification__, __version__, hashing, schema, validate from .archive import ( SIGMF_ARCHIVE_EXT, SIGMF_COLLECTION_EXT, @@ -23,8 +23,8 @@ SIGMF_METADATA_EXT, SigMFArchive, ) -from .error import SigMFAccessError, SigMFError, SigMFFileError -from .utils import dict_merge +from .error import SigMFAccessError, SigMFConversionError, SigMFError, SigMFFileError +from .utils import dict_merge, get_magic_bytes class SigMFMetafile: @@ -216,7 +216,8 @@ def __init__( if global_info is not None: self.set_global_info(global_info) if data_file is not None: - self.set_data_file(data_file, skip_checksum=skip_checksum, map_readonly=map_readonly) + offset = self._get_ncd_offset() + self.set_data_file(data_file, skip_checksum=skip_checksum, map_readonly=map_readonly, offset=offset) def __len__(self): return self._memmap.shape[0] @@ -401,6 +402,30 @@ def _is_conforming_dataset(self): # if we get here, the file exists and is conforming return True + def _get_ncd_offset(self): + """ + Detect Non-Conforming Dataset files and return the appropriate header offset. + + For NCD files that reference external non-SigMF files (e.g., WAV), the + core:header_bytes field indicates how many bytes to skip to reach the + actual sample data. + + Returns + ------- + int + Byte offset to apply when reading the dataset file. 0 for conforming datasets. + """ + if self._is_conforming_dataset(): + return 0 + + # check if this is an NCD with core:dataset and header_bytes + captures = self.get_captures() + dataset_field = self.get_global_field(self.DATASET_KEY) + if dataset_field and captures and self.HEADER_BYTES_KEY in captures[0]: + return captures[0][self.HEADER_BYTES_KEY] + + return 0 + def get_schema(self): """ Return a schema object valid for the current metadata @@ -633,6 +658,7 @@ def _count_samples(self): If there is no data file but there are annotations, use the sample_count from the annotation with the highest end index. If there are no annotations, use 0. + For complex data, a 'sample' includes both the real and imaginary part. 
""" if self.data_file is None and self.data_buffer is None: @@ -684,17 +710,9 @@ def calculate_hash(self): """ old_hash = self.get_global_field(self.HASH_KEY) if self.data_file is not None: - new_hash = sigmf_hash.calculate_sha512( - filename=self.data_file, - offset=self.data_offset, - size=self.data_size_bytes, - ) + new_hash = hashing.calculate_sha512(filename=self.data_file) else: - new_hash = sigmf_hash.calculate_sha512( - fileobj=self.data_buffer, - offset=self.data_offset, - size=self.data_size_bytes, - ) + new_hash = hashing.calculate_sha512(fileobj=self.data_buffer) if old_hash is not None: if old_hash != new_hash: raise SigMFFileError("Calculated file hash does not match associated metadata.") @@ -843,6 +861,8 @@ def read_samples(self, start_index=0, count=-1): """ if count == 0: raise IOError("Number of samples must be greater than zero, or -1 for all samples.") + elif count == -1: + count = self.sample_count - start_index elif start_index + count > self.sample_count: raise IOError("Cannot read beyond EOF.") if self.data_file is None and not isinstance(self.data_buffer, io.BytesIO): @@ -863,7 +883,6 @@ def _read_datafile(self, first_byte, nitems): is_fixedpoint_data = dtype["is_fixedpoint"] is_unsigned_data = dtype["is_unsigned"] data_type_in = dtype["sample_dtype"] - component_type_in = dtype["component_dtype"] component_size = dtype["component_size"] data_type_out = np.dtype("f4") if not self.is_complex_data else np.dtype("f4, f4") @@ -871,7 +890,10 @@ def _read_datafile(self, first_byte, nitems): if self.data_file is not None: fp = open(self.data_file, "rb") - fp.seek(first_byte, 0) + # account for data_offset when seeking (important for NCDs) + seek_position = first_byte + getattr(self, "data_offset", 0) + fp.seek(seek_position, 0) + data = np.fromfile(fp, dtype=data_type_in, count=nitems) elif self.data_buffer is not None: # handle offset for data_buffer like we do for data_file @@ -990,7 +1012,7 @@ def verify_stream_hashes(self) -> None: metafile_name = get_sigmf_filenames(stream.get("name"))["meta_fn"] metafile_path = self.base_path / metafile_name if Path.is_file(metafile_path): - new_hash = sigmf_hash.calculate_sha512(filename=metafile_path) + new_hash = hashing.calculate_sha512(filename=metafile_path) if old_hash != new_hash: raise SigMFFileError( f"Calculated file hash for {metafile_path} does not match collection metadata." @@ -1008,7 +1030,7 @@ def set_streams(self, metafiles) -> None: stream = { # name must be string here to be serializable later "name": str(get_sigmf_filenames(metafile)["base_fn"]), - "hash": sigmf_hash.calculate_sha512(filename=metafile_path), + "hash": hashing.calculate_sha512(filename=metafile_path), } streams.append(stream) else: @@ -1222,13 +1244,14 @@ def fromarchive(archive_path, dir=None, skip_checksum=False, autoscale=True): def fromfile(filename, skip_checksum=False, autoscale=True): """ - Creates and returns a SigMFFile or SigMFCollection instance with metadata loaded from the specified file. + Read a file as a SigMFFile or SigMFCollection. 
The file can be one of: - * A SigMF Metadata file (.sigmf-meta) - * A SigMF Dataset file (.sigmf-data) - * A SigMF Collection file (.sigmf-collection) - * A SigMF Archive file (.sigmf-archive) + * a SigMF Archive (.sigmf) + * a SigMF Metadata file (.sigmf-meta) + * a SigMF Dataset file (.sigmf-data) + * a SigMF Collection file (.sigmf-collection) + * a non-SigMF RF recording that can be converted (.wav, .cdif) Parameters ---------- @@ -1241,22 +1264,34 @@ def fromfile(filename, skip_checksum=False, autoscale=True): Returns ------- - object - SigMFFile with dataset & metadata or a SigMFCollection depending on file type. + SigMFFile | SigMFCollection + A SigMFFile or a SigMFCollection depending on file type. + + Raises + ------ + SigMFFileError + If the file cannot be read as any supported format. + SigMFConversionError + If auto-detection conversion fails. """ + file_path = Path(filename) fns = get_sigmf_filenames(filename) meta_fn = fns["meta_fn"] archive_fn = fns["archive_fn"] collection_fn = fns["collection_fn"] - # extract the extension to check whether we are dealing with an archive, collection, etc. - file_path = Path(filename) - ext = file_path.suffix + # extract the extension to check file type + ext = file_path.suffix.lower() + + # group SigMF extensions for cleaner checking + sigmf_extensions = (SIGMF_METADATA_EXT, SIGMF_DATASET_EXT, SIGMF_COLLECTION_EXT, SIGMF_ARCHIVE_EXT) - if (ext.lower().endswith(SIGMF_ARCHIVE_EXT) or not Path.is_file(meta_fn)) and Path.is_file(archive_fn): + # try SigMF archive + if (ext.endswith(SIGMF_ARCHIVE_EXT) or not Path.is_file(meta_fn)) and Path.is_file(archive_fn): return fromarchive(archive_fn, skip_checksum=skip_checksum, autoscale=autoscale) - if (ext.lower().endswith(SIGMF_COLLECTION_EXT) or not Path.is_file(meta_fn)) and Path.is_file(collection_fn): + # try SigMF collection + if (ext.endswith(SIGMF_COLLECTION_EXT) or not Path.is_file(meta_fn)) and Path.is_file(collection_fn): collection_fp = open(collection_fn, "rb") bytestream_reader = codecs.getreader("utf-8") mdfile_reader = bytestream_reader(collection_fp) @@ -1266,7 +1301,8 @@ def fromfile(filename, skip_checksum=False, autoscale=True): dir_path = meta_fn.parent return SigMFCollection(metadata=metadata, base_path=dir_path, skip_checksums=skip_checksum) - else: + # try standard SigMF metadata file + if Path.is_file(meta_fn): meta_fp = open(meta_fn, "rb") bytestream_reader = codecs.getreader("utf-8") mdfile_reader = bytestream_reader(meta_fp) @@ -1276,6 +1312,26 @@ def fromfile(filename, skip_checksum=False, autoscale=True): data_fn = get_dataset_filename_from_metadata(meta_fn, metadata) return SigMFFile(metadata=metadata, data_file=data_fn, skip_checksum=skip_checksum, autoscale=autoscale) + # try auto-detection for non-SigMF files only + if Path.is_file(file_path) and not ext.endswith(sigmf_extensions): + if not autoscale: + # TODO: allow autoscale=False for converters + warnings.warn("non-SigMF auto-detection conversion only supports autoscale=True; ignoring autoscale=False") + magic_bytes = get_magic_bytes(file_path, count=4, offset=0) + + if magic_bytes == b"RIFF": + from .convert.wav import wav_to_sigmf + + return wav_to_sigmf(file_path, create_ncd=True) + + elif magic_bytes == b"BLUE": + from .convert.blue import blue_to_sigmf + + return blue_to_sigmf(file_path, create_ncd=True) + + # if file doesn't exist at all or no valid files found, raise original error + raise SigMFFileError(f"Cannot read {filename} as SigMF or supported non-SigMF format.") + def get_sigmf_filenames(filename): 
""" @@ -1288,7 +1344,7 @@ def get_sigmf_filenames(filename): Returns ------- - dict with 'data_fn', 'meta_fn', and 'archive_fn' as keys. + dict with filename keys. """ stem_path = Path(filename) # If the path has a sigmf suffix, remove it. Otherwise do not remove the diff --git a/sigmf/utils.py b/sigmf/utils.py index 571a5e4..3c325c3 100644 --- a/sigmf/utils.py +++ b/sigmf/utils.py @@ -10,10 +10,11 @@ import sys from copy import deepcopy from datetime import datetime, timezone +from pathlib import Path import numpy as np -from .error import SigMFError +from .error import SigMFConversionError, SigMFError SIGMF_DATETIME_ISO8601_FMT = "%Y-%m-%dT%H:%M:%S.%fZ" @@ -112,3 +113,37 @@ def get_data_type_str(ray: np.ndarray) -> str: # only append endianness for types over 8 bits data_type_str += get_endian_str(ray) return data_type_str + + +def get_magic_bytes(file_path: Path, count: int = 4, offset: int = 0) -> bytes: + """ + Get magic bytes from a file to help identify file type. + + Parameters + ---------- + file_path : Path + Path to the file to read magic bytes from. + count : int, optional + Number of bytes to read. Default is 4. + offset : int, optional + Byte offset to start reading from. Default is 0. + + Returns + ------- + bytes + Magic bytes from the file. + + Raises + ------ + SigMFConversionError + If file cannot be read or is too small. + """ + try: + with open(file_path, "rb") as handle: + handle.seek(offset) + magic_bytes = handle.read(count) + if len(magic_bytes) < count: + raise SigMFConversionError(f"File {file_path} too small to read {count} magic bytes at offset {offset}") + return magic_bytes + except (IOError, OSError) as err: + raise SigMFConversionError(f"Cannot read magic bytes from {file_path}: {err}") diff --git a/tests/test_convert_blue.py b/tests/test_convert_blue.py new file mode 100644 index 0000000..ae4fb17 --- /dev/null +++ b/tests/test_convert_blue.py @@ -0,0 +1,81 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. 
https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Tests for BLUE Converter""" + +import tempfile +import unittest +from pathlib import Path + +import numpy as np + +import sigmf +from sigmf.convert.blue import blue_to_sigmf + +from .test_convert_wav import _validate_ncd +from .testdata import NONSIGMF_ENV, NONSIGMF_REPO + + +class TestBlueWithNonSigMFRepo(unittest.TestCase): + """BLUE converter tests using external files""" + + def setUp(self) -> None: + """setup paths to blue files""" + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + if not NONSIGMF_REPO: + # skip test if environment variable not set + self.skipTest(f"Set {NONSIGMF_ENV} environment variable to path with BLUE files to run test.") + + # glob all files in blue/ directory + blue_dir = NONSIGMF_REPO / "blue" + self.blue_paths = [] + if blue_dir.exists(): + for ext in ["*.cdif", "*.tmp"]: + self.blue_paths.extend(blue_dir.glob(f"**/{ext}")) + if not self.blue_paths: + self.fail(f"No BLUE files (*.cdif, *.tmp) found in {blue_dir}.") + + def tearDown(self) -> None: + """clean up temporary directory""" + self.tmp_dir.cleanup() + + def test_sigmf_pair(self): + """test standard blue to sigmf conversion with file pairs""" + for blue_path in self.blue_paths: + print(blue_path) + sigmf_path = self.tmp_path / blue_path.stem + meta = blue_to_sigmf(blue_path=blue_path, out_path=sigmf_path) + self.assertIsInstance(meta, sigmf.SigMFFile) + if not meta.get_global_field("core:metadata_only"): + # check sample read consistency + np.testing.assert_allclose(meta.read_samples(count=10), meta[0:10], atol=1e-6) + + def test_sigmf_archive(self): + """test blue to sigmf conversion with archive output""" + for blue_path in self.blue_paths: + sigmf_path = self.tmp_path / f"{blue_path.stem}_archive" + meta = blue_to_sigmf(blue_path=blue_path, out_path=sigmf_path, create_archive=True) + self.assertIsInstance(meta, sigmf.SigMFFile) + if not meta.get_global_field("core:metadata_only"): + # check sample read consistency + np.testing.assert_allclose(meta.read_samples(count=10), meta[0:10], atol=1e-6) + + def test_create_ncd(self): + """test direct NCD conversion""" + for blue_path in self.blue_paths: + meta = blue_to_sigmf(blue_path=blue_path) + _validate_ncd(self, meta, blue_path) + + # test that data can be read if not metadata-only + if not meta.get_global_field("core:metadata_only"): + _ = meta.read_samples(count=10) + + def test_autodetect_ncd(self): + """test automatic NCD conversion""" + for blue_path in self.blue_paths: + meta = sigmf.fromfile(blue_path) + _validate_ncd(self, meta, blue_path) diff --git a/tests/test_convert_wav.py b/tests/test_convert_wav.py index 11ef52a..7c502ec 100644 --- a/tests/test_convert_wav.py +++ b/tests/test_convert_wav.py @@ -4,43 +4,156 @@ # # SPDX-License-Identifier: LGPL-3.0-or-later -"""Tests wav formatted audio conversion""" +"""Tests for WAV Converter""" -import os import tempfile +import unittest +import wave +from pathlib import Path import numpy as np -import pytest -from scipy.io import wavfile - -from sigmf.apps.convert_wav import convert_wav - - -def test_wav_to_sigmf_basic(): - """Basic smoke-test: convert a tiny WAV → SIGMF, assert file created.""" - fs = 48_000 - t = np.linspace(0, 0.1, int(fs * 0.1)) # 0.1 s - sine = np.sin(2 * np.pi * 1000 * t) - sine_int = (sine * 32767).astype(np.int16) - - # Create temp file and close it before use - with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_wav: - 
tmp_wav_path = tmp_wav.name - - # Write to the closed file - wavfile.write(tmp_wav_path, fs, sine_int) - tmp_sigmf = tmp_wav_path.replace(".wav", ".sigmf") - - try: - # Run converter - convert_wav(tmp_wav_path, tmp_sigmf) - - # Assert SIGMF file exists and non-zero - assert os.path.exists(tmp_sigmf), "SIGMF file not created" - assert os.path.getsize(tmp_sigmf) > 0, "SIGMF file is empty" - finally: - # Clean up both files - if os.path.exists(tmp_wav_path): - os.remove(tmp_wav_path) - if os.path.exists(tmp_sigmf): - os.remove(tmp_sigmf) + +import sigmf +from sigmf.convert.wav import wav_to_sigmf + +from .testdata import NONSIGMF_ENV, NONSIGMF_REPO + + +def _validate_ncd(test, meta, target_path): + """non-conforming dataset has a specific structure""" + test.assertEqual(str(meta.data_file), str(target_path), "Auto-detected NCD should point to original file") + test.assertIsInstance(meta, sigmf.SigMFFile) + + global_info = meta.get_global_info() + capture_info = meta.get_captures() + + # validate NCD SigMF spec compliance + test.assertGreater(len(capture_info), 0, "Should have at least one capture") + test.assertIn("core:header_bytes", capture_info[0]) + test.assertGreater(capture_info[0]["core:header_bytes"], 0, "Should have non-zero core:header_bytes field") + test.assertIn("core:trailing_bytes", global_info, "Should have core:trailing_bytes field.") + test.assertIn("core:dataset", global_info, "Should have core:dataset field.") + test.assertNotIn("core:metadata_only", global_info, "Should NOT have core:metadata_only field.") + + +class TestWAVConverter(unittest.TestCase): + """Create a realistic WAV file and test conversion methods.""" + + def setUp(self) -> None: + """temp WAV file with tone for testing""" + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + self.wav_path = self.tmp_path / "foo.wav" + samp_rate = 48000 + duration_s = 0.1 + ttt = np.linspace(0, duration_s, int(samp_rate * duration_s), endpoint=False) + freq = 440 # A4 note + self.audio_data = 0.5 * np.sin(2 * np.pi * freq * ttt) + # convert float audio to 16-bit PCM integer format + audio_int16 = (self.audio_data * 32767).astype(np.int16) + + # write wav file using built-in wave module + with wave.open(str(self.wav_path), "wb") as wav_file: + wav_file.setnchannels(1) # mono + wav_file.setsampwidth(2) # 16-bit = 2 bytes + wav_file.setframerate(samp_rate) + wav_file.writeframes(audio_int16.tobytes()) + + def tearDown(self) -> None: + """clean up temporary directory""" + self.tmp_dir.cleanup() + + def test_wav_to_sigmf_pair(self): + """test standard wav to sigmf conversion with file pairs""" + sigmf_path = self.tmp_path / "bar.tmp" + meta = wav_to_sigmf(wav_path=str(self.wav_path), out_path=str(sigmf_path)) + filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["data_fn"].exists(), "dataset path missing") + self.assertTrue(filenames["meta_fn"].exists(), "metadata path missing") + # verify data + data = meta.read_samples() + self.assertGreater(len(data), 0, "Should read some samples") + # allow numerical differences due to PCM quantization + self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + + def test_wav_to_sigmf_archive(self): + """test wav to sigmf conversion with archive output""" + sigmf_path = self.tmp_path / "baz.ext" + meta = wav_to_sigmf(wav_path=str(self.wav_path), out_path=str(sigmf_path), create_archive=True) + filenames = sigmf.sigmffile.get_sigmf_filenames(sigmf_path) + self.assertTrue(filenames["archive_fn"].exists(), "archive 
path missing") + # verify data + data = meta.read_samples() + self.assertGreater(len(data), 0, "Should read some samples") + # allow numerical differences due to PCM quantization + self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + + def test_wav_to_sigmf_ncd(self): + """test wav to sigmf conversion as Non-Conforming Dataset""" + meta = wav_to_sigmf(wav_path=str(self.wav_path), create_ncd=True) + _validate_ncd(self, meta, self.wav_path) + + # verify data + data = meta.read_samples() + # allow numerical differences due to PCM quantization + self.assertGreater(len(data), 0, "Should read some samples") + self.assertTrue(np.allclose(self.audio_data, data, atol=1e-4)) + + +class TestWAVWithNonSigMFRepo(unittest.TestCase): + """Test WAV converter with real example files if available""" + + def setUp(self) -> None: + """setup paths to example wav files""" + self.tmp_dir = tempfile.TemporaryDirectory() + self.tmp_path = Path(self.tmp_dir.name) + if not NONSIGMF_REPO: + # skip test if environment variable not set + self.skipTest(f"Set {NONSIGMF_ENV} environment variable to path with WAV files to run test.") + + # glob all files in wav/ directory + wav_dir = NONSIGMF_REPO / "wav" + self.wav_paths = [] + if wav_dir.exists(): + self.wav_paths = list(wav_dir.glob("*.wav")) + if not self.wav_paths: + self.fail(f"No WAV files (*.wav) found in {wav_dir}.") + + def tearDown(self) -> None: + """clean up temporary directory""" + self.tmp_dir.cleanup() + + def test_sigmf_pair(self): + """test standard wav to sigmf conversion with file pairs""" + for wav_path in self.wav_paths: + sigmf_path = self.tmp_path / wav_path.stem + meta = wav_to_sigmf(wav_path=wav_path, out_path=sigmf_path) + self.assertIsInstance(meta, sigmf.SigMFFile) + if not meta.get_global_field("core:metadata_only"): + # check sample read consistency + np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_sigmf_archive(self): + """test wav to sigmf conversion with archive output""" + for wav_path in self.wav_paths: + sigmf_path = self.tmp_path / f"{wav_path.stem}_archive" + meta = wav_to_sigmf(wav_path=wav_path, out_path=sigmf_path, create_archive=True) + self.assertIsInstance(meta, sigmf.SigMFFile) + if not meta.get_global_field("core:metadata_only"): + # check sample read consistency + np.testing.assert_array_equal(meta.read_samples(count=10), meta[0:10]) + + def test_create_ncd(self): + """test direct NCD conversion""" + for wav_path in self.wav_paths: + meta = wav_to_sigmf(wav_path=wav_path) + _validate_ncd(self, meta, wav_path) + + # test file read + _ = meta.read_samples(count=10) + + def test_autodetect_ncd(self): + """test automatic NCD conversion""" + for wav_path in self.wav_paths: + meta = sigmf.fromfile(wav_path) + _validate_ncd(self, meta, wav_path) diff --git a/tests/test_hashing.py b/tests/test_hashing.py new file mode 100644 index 0000000..c0c225b --- /dev/null +++ b/tests/test_hashing.py @@ -0,0 +1,75 @@ +# Copyright: Multiple Authors +# +# This file is part of sigmf-python. 
https://github.com/sigmf/sigmf-python +# +# SPDX-License-Identifier: LGPL-3.0-or-later + +"""Hashing Tests""" + +import io +import shutil +import tempfile +import unittest +from copy import deepcopy +from hashlib import sha512 +from pathlib import Path + +import numpy as np + +from sigmf import SigMFFile, hashing + +from .testdata import TEST_FLOAT32_DATA, TEST_METADATA + + +class TestHashCalculation(unittest.TestCase): + """Test hash calculation consistency across different SigMF formats.""" + + def setUp(self): + """Set up temporary directory for tests.""" + self.temp_dir = Path(tempfile.mkdtemp()) + + def tearDown(self): + """Clean up temporary directory.""" + shutil.rmtree(self.temp_dir) + + def test_ncd_hash_covers_entire_file(self): + """Test that non-conforming datasets hash the entire file including headers.""" + data_path = self.temp_dir / "ncd.bin" + with open(data_path, "wb") as handle: + # Create NCD file with header, data, and trailer + handle.write(b"\x00" * 64) # header + handle.write(TEST_FLOAT32_DATA.tobytes()) # sample data + handle.write(b"\xFF" * 32) # trailer + + # Create SigMF metadata for NCD + ncd_metadata = deepcopy(TEST_METADATA) + del ncd_metadata["global"][SigMFFile.HASH_KEY] + ncd_metadata["global"][SigMFFile.TRAILING_BYTES_KEY] = 32 + meta = SigMFFile(metadata=ncd_metadata) + meta.set_data_file(data_path, offset=64) + + file_hash = hashing.calculate_sha512(filename=data_path) + sigmf_hash = meta.get_global_field(SigMFFile.HASH_KEY) + self.assertEqual(file_hash, sigmf_hash) + + def test_edge_cases(self): + """Test edge cases in hash calculation function.""" + # empty file + empty_file = self.temp_dir / "empty.dat" + empty_file.touch() + empty_hash = hashing.calculate_sha512(filename=empty_file) + empty_hash_expected = sha512(b"").hexdigest() + self.assertEqual(empty_hash, empty_hash_expected) + + # small file (less than 4096 bytes) + small_data = np.random.bytes(128) + small_hash_expected = sha512(small_data).hexdigest() + small_file = self.temp_dir / "small.dat" + small_file.write_bytes(small_data) + small_hash = hashing.calculate_sha512(filename=small_file) + self.assertEqual(small_hash, small_hash_expected) + + # BytesIO + buffer = io.BytesIO(small_data) + buffer_hash = hashing.calculate_sha512(fileobj=buffer) + self.assertEqual(buffer_hash, small_hash_expected) diff --git a/tests/testdata.py b/tests/testdata.py index d773d69..fceb2ba 100644 --- a/tests/testdata.py +++ b/tests/testdata.py @@ -6,12 +6,21 @@ """Shared test data for tests.""" +import os +from pathlib import Path + import numpy as np from sigmf import SigMFFile, __specification__, __version__ -TEST_FLOAT32_DATA = np.arange(16, dtype=np.float32) +# detection for https://github.com/sigmf/example_nonsigmf_recordings +NONSIGMF_ENV = "EXAMPLE_NONSIGMF_RECORDINGS_PATH" +NONSIGMF_REPO = None +_recordings_path = Path(os.getenv(NONSIGMF_ENV, "nopath")) +if _recordings_path.is_dir(): + NONSIGMF_REPO = Path(_recordings_path) +TEST_FLOAT32_DATA = np.arange(16, dtype=np.float32) TEST_METADATA = { SigMFFile.ANNOTATION_KEY: [{SigMFFile.LENGTH_INDEX_KEY: 16, SigMFFile.START_INDEX_KEY: 0}], SigMFFile.CAPTURE_KEY: [{SigMFFile.START_INDEX_KEY: 0}],
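    # sketch: the opt-in converter suites above discover recordings through the
    # env var defined here, e.g. (path is an example; it should contain blue/
    # and wav/ subdirectories):
    #   os.environ["EXAMPLE_NONSIGMF_RECORDINGS_PATH"] = "/data/example_nonsigmf_recordings"
    # set it before pytest imports this module, since NONSIGMF_REPO is evaluated at import time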