diff --git a/sigmf/__init__.py b/sigmf/__init__.py index 060a9e5..243d5ed 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -5,7 +5,7 @@ # SPDX-License-Identifier: LGPL-3.0-or-later # version of this python module -__version__ = "1.2.7" +__version__ = "1.2.8" # matching version of the SigMF specification __specification__ = "1.2.3" diff --git a/sigmf/archive.py b/sigmf/archive.py index 895e3d1..e3abec9 100644 --- a/sigmf/archive.py +++ b/sigmf/archive.py @@ -129,10 +129,10 @@ def _resolve(self, name, fileobj): arcname = path.stem else: arcname = name - except io.UnsupportedOperation: - raise SigMFFileError(f"fileobj {fileobj} is not byte-writable.") - except AttributeError: - raise SigMFFileError(f"fileobj {fileobj} is invalid.") + except io.UnsupportedOperation as exc: + raise SigMFFileError(f"fileobj {fileobj} is not byte-writable.") from exc + except AttributeError as exc: + raise SigMFFileError(f"fileobj {fileobj} is invalid.") from exc elif name: path = Path(name) # ensure name has correct suffix if it exists @@ -146,8 +146,8 @@ def _resolve(self, name, fileobj): try: fileobj = open(path, "wb") - except (OSError, IOError): - raise SigMFFileError(f"Can't open {name} for writing.") + except (OSError, IOError) as exc: + raise SigMFFileError(f"Can't open {name} for writing.") from exc else: raise SigMFFileError("Either `name` or `fileobj` needs to be defined.") diff --git a/sigmf/archivereader.py b/sigmf/archivereader.py index 015036b..7f4c1d3 100644 --- a/sigmf/archivereader.py +++ b/sigmf/archivereader.py @@ -7,27 +7,18 @@ """Access SigMF archives without extracting them.""" import io -import os -import shutil import tarfile -import tempfile from pathlib import Path from . 
import __version__ -from .archive import ( - SIGMF_ARCHIVE_EXT, - SIGMF_DATASET_EXT, - SIGMF_METADATA_EXT, - SigMFArchive, -) +from .archive import SIGMF_ARCHIVE_EXT, SIGMF_DATASET_EXT, SIGMF_METADATA_EXT from .error import SigMFFileError from .sigmffile import SigMFFile -from .utils import dict_merge class SigMFArchiveReader: """ - Access data within SigMF archive `tar` in-place without extracting. + Access data within SigMF archive tarball in-place without extracting. Parameters ---------- @@ -44,6 +35,10 @@ class SigMFArchiveReader: ------ SigMFError Archive file does not exist or is improperly formatted. + ValueError + If invalid arguments. + ValidationError + If metadata is invalid. """ def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_buffer=None): @@ -96,7 +91,7 @@ def __init__(self, name=None, skip_checksum=False, map_readonly=True, archive_bu raise SigMFFileError("No .sigmf-data file found in archive!") self.sigmffile = SigMFFile(metadata=json_contents) - valid_md = self.sigmffile.validate() + self.sigmffile.validate() self.sigmffile.set_data_file( data_buffer=data_buffer, diff --git a/sigmf/schema.py b/sigmf/schema.py index 6660ef7..300f610 100644 --- a/sigmf/schema.py +++ b/sigmf/schema.py @@ -10,7 +10,6 @@ from pathlib import Path from . import __version__ as toolversion -from . 
import utils SCHEMA_META = "schema-meta.json" SCHEMA_COLLECTION = "schema-collection.json" diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 272dcc3..940b775 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -9,8 +9,6 @@ import codecs import io import json -import tarfile -import tempfile import warnings from collections import OrderedDict from pathlib import Path @@ -229,7 +227,7 @@ def __getitem__(self, sli): ray = mem[:, :, 0].astype(self._return_type) + 1.0j * mem[:, :, 1].astype(self._return_type) else: raise ValueError("unhandled ndim in SigMFFile.__getitem__(); this shouldn't happen") - return ray[0] if type(sli) is int else ray # return element instead of 1-element array + return ray[0] if isinstance(sli, int) else ray # return element instead of 1-element array def _get_start_offset(self): """ diff --git a/sigmf/utils.py b/sigmf/utils.py index 7d805c3..571a5e4 100644 --- a/sigmf/utils.py +++ b/sigmf/utils.py @@ -10,11 +10,10 @@ import sys from copy import deepcopy from datetime import datetime, timezone -from pathlib import Path import numpy as np -from . import error +from .error import SigMFError SIGMF_DATETIME_ISO8601_FMT = "%Y-%m-%dT%H:%M:%S.%fZ" @@ -75,7 +74,7 @@ def dict_merge(a_dict: dict, b_dict: dict) -> dict: def get_endian_str(ray: np.ndarray) -> str: """Return SigMF compatible endianness string for a numpy array""" if not isinstance(ray, np.ndarray): - raise error.SigMFError("Argument must be a numpy array") + raise SigMFError("Argument must be a numpy array") atype = ray.dtype if atype.byteorder == "<": @@ -94,10 +93,10 @@ def get_data_type_str(ray: np.ndarray) -> str: integer types are not supported. 
""" if not isinstance(ray, np.ndarray): - raise error.SigMFError("Argument must be a numpy array") + raise SigMFError("Argument must be a numpy array") atype = ray.dtype if atype.kind not in ("u", "i", "f", "c"): - raise error.SigMFError("Unsupported data type:", atype) + raise SigMFError("Unsupported data type:", atype) data_type_str = "" if atype.kind == "c": data_type_str += "cf" diff --git a/sigmf/validate.py b/sigmf/validate.py index 22755c2..0694332 100644 --- a/sigmf/validate.py +++ b/sigmf/validate.py @@ -136,10 +136,10 @@ def main(arg_tuple: Optional[Tuple[str, ...]] = None) -> None: log.error("No paths to validate.") sys.exit(1) elif n_completed != n_total: - log.info(f"Validated {n_completed} of {n_total} files OK") + log.info("Validated %d of %d files OK", n_completed, n_total) sys.exit(1) else: - log.info(f"Validated all {n_total} files OK!") + log.info("Validated all %d files OK!", n_total) if __name__ == "__main__": diff --git a/tests/test_archivereader.py b/tests/test_archivereader.py index bc04476..0426342 100644 --- a/tests/test_archivereader.py +++ b/tests/test_archivereader.py @@ -6,8 +6,8 @@ """Tests for SigMFArchiveReader""" -import tempfile import unittest +from tempfile import NamedTemporaryFile import numpy as np @@ -33,29 +33,28 @@ def setUp(self): def test_access_data_without_untar(self): """iterate through datatypes and verify IO is correct""" - _, temp_path = tempfile.mkstemp() - _, temp_archive = tempfile.mkstemp(suffix=".sigmf") + temp_data = NamedTemporaryFile() + temp_archive = NamedTemporaryFile(suffix=".sigmf") for key, dtype in self.lut.items(): # for each type of storage temp_samples = np.arange(self.raw_count, dtype=dtype) - temp_samples.tofile(temp_path) + temp_samples.tofile(temp_data.name) for num_channels in [1, 4, 8]: # for single or 8 channel for complex_prefix in ["r", "c"]: # for real or complex target_count = self.raw_count temp_meta = SigMFFile( - data_file=temp_path, + data_file=temp_data.name, global_info={ 
SigMFFile.DATATYPE_KEY: f"{complex_prefix}{key}_le", SigMFFile.NUM_CHANNELS_KEY: num_channels, - SigMFFile.VERSION_KEY: __specification__, }, ) - temp_meta.tofile(temp_archive, toarchive=True) + temp_meta.tofile(temp_archive.name, toarchive=True) - readback = SigMFArchiveReader(temp_archive) + readback = SigMFArchiveReader(temp_archive.name) readback_samples = readback[:] if complex_prefix == "c": @@ -84,10 +83,10 @@ def test_access_data_without_untar(self): def test_archiveread_data_file_unchanged(test_sigmffile): - with tempfile.NamedTemporaryFile(suffix=".sigmf") as temp: + with NamedTemporaryFile(suffix=".sigmf") as temp_file: input_samples = test_sigmffile.read_samples() - test_sigmffile.archive(temp.name) - arc = sigmf.sigmffile.fromfile(temp.name) + test_sigmffile.archive(temp_file.name) + arc = sigmf.sigmffile.fromfile(temp_file.name) output_samples = arc.read_samples() assert np.array_equal(input_samples, output_samples) diff --git a/tests/test_sigmffile.py b/tests/test_sigmffile.py index dccfcb6..0b792e4 100644 --- a/tests/test_sigmffile.py +++ b/tests/test_sigmffile.py @@ -122,55 +122,6 @@ def test_set_data_file_with_annotations(self): self.assertTrue(len(samples) == 16) -def simulate_capture(sigmf_md, n, capture_len): - start_index = capture_len * n - - capture_md = {"core:datetime": utils.get_sigmf_iso8601_datetime_now()} - - sigmf_md.add_capture(start_index=start_index, metadata=capture_md) - - annotation_md = { - "core:latitude": 40.0 + 0.0001 * n, - "core:longitude": -105.0 + 0.0001 * n, - } - - sigmf_md.add_annotation(start_index=start_index, length=capture_len, metadata=annotation_md) - - -def test_default_constructor(): - SigMFFile() - - -def test_set_non_required_global_field(): - sigf = SigMFFile() - sigf.set_global_field("this_is:not_in_the_schema", None) - - -def test_add_capture(): - sigf = SigMFFile() - sigf.add_capture(start_index=0, metadata={}) - - -def test_add_annotation(): - sigf = SigMFFile() - sigf.add_capture(start_index=0) - meta 
= {"latitude": 40.0, "longitude": -105.0} - sigf.add_annotation(start_index=0, length=128, metadata=meta) - - -def test_fromarchive(test_sigmffile): - _, tf = tempfile.mkstemp() - archive_path = test_sigmffile.archive(name=tf) - result = sigmffile.fromarchive(archive_path=archive_path) - assert result._metadata == test_sigmffile._metadata == TEST_METADATA - - -def test_add_multiple_captures_and_annotations(): - sigf = SigMFFile() - for idx in range(3): - simulate_capture(sigf, idx, 1024) - - class TestMultichannel(unittest.TestCase): def setUp(self): # in order to check shapes we need some positive number of samples to work with @@ -186,20 +137,25 @@ def setUp(self): "f32": np.float32, "f64": np.float64, } + self.temp_file = tempfile.NamedTemporaryFile() + self.temp_path = Path(self.temp_file.name) + + def tearDown(self): + """clean-up temporary files""" + self.temp_file.close() def test_multichannel_types(self): """check that real & complex for all types is reading multiple channels correctly""" - _, temp_path = tempfile.mkstemp() for key, dtype in self.lut.items(): # for each type of storage - np.arange(self.raw_count, dtype=dtype).tofile(temp_path) + np.arange(self.raw_count, dtype=dtype).tofile(self.temp_path) for num_channels in [1, 4, 8]: # for single or 8 channel for complex_prefix in ["r", "c"]: # for real or complex check_count = self.raw_count temp_signal = SigMFFile( - data_file=temp_path, + data_file=self.temp_path, global_info={ SigMFFile.DATATYPE_KEY: f"{complex_prefix}{key}_le", SigMFFile.NUM_CHANNELS_KEY: num_channels, @@ -218,12 +174,11 @@ def test_multichannel_types(self): self.assertEqual(check_count, temp_signal._count_samples()) def test_multichannel_seek(self): - """assure that seeking is working correctly with multichannel files""" - _, temp_path = tempfile.mkstemp() + """ensure that seeking is working correctly with multichannel files""" # write some dummy data and read back - np.arange(18, dtype=np.uint16).tofile(temp_path) + np.arange(18, 
dtype=np.uint16).tofile(self.temp_path) temp_signal = SigMFFile( - data_file=temp_path, + data_file=self.temp_path, global_info={ SigMFFile.DATATYPE_KEY: "cu16_le", SigMFFile.NUM_CHANNELS_KEY: 3, @@ -231,17 +186,17 @@ def test_multichannel_seek(self): ) # read after the first sample temp_samples = temp_signal.read_samples(start_index=1, autoscale=False) - # assure samples are in the order we expect + # ensure samples are in the order we expect self.assertTrue(np.all(temp_samples[:, 0] == np.array([6 + 7j, 12 + 13j]))) def test_key_validity(): - """assure the keys in test metadata are valid""" + """ensure the keys in test metadata are valid""" for top_key, top_val in TEST_METADATA.items(): - if type(top_val) is dict: + if isinstance(top_val, dict): for core_key in top_val.keys(): assert core_key in vars(SigMFFile)[f"VALID_{top_key.upper()}_KEYS"] - elif type(top_val) is list: + elif isinstance(top_val, list): # annotations are in a list for annot in top_val: for core_key in annot.keys(): @@ -258,92 +213,151 @@ def test_ordered_metadata(): assert kdx == top_sort_order.index(key) -def test_captures_checking(): - """ - these tests make sure the various captures access tools work properly - """ - np.array(TEST_U8_DATA0, dtype=np.uint8).tofile("/tmp/d0.sigmf-data") - with open("/tmp/d0.sigmf-meta", "w") as f0: - json.dump(TEST_U8_META0, f0) - np.array(TEST_U8_DATA1, dtype=np.uint8).tofile("/tmp/d1.sigmf-data") - with open("/tmp/d1.sigmf-meta", "w") as f1: - json.dump(TEST_U8_META1, f1) - np.array(TEST_U8_DATA2, dtype=np.uint8).tofile("/tmp/d2.sigmf-data") - with open("/tmp/d2.sigmf-meta", "w") as f2: - json.dump(TEST_U8_META2, f2) - np.array(TEST_U8_DATA3, dtype=np.uint8).tofile("/tmp/d3.sigmf-data") - with open("/tmp/d3.sigmf-meta", "w") as f3: - json.dump(TEST_U8_META3, f3) - np.array(TEST_U8_DATA4, dtype=np.uint8).tofile("/tmp/d4.sigmf-data") - with open("/tmp/d4.sigmf-meta", "w") as f4: - json.dump(TEST_U8_META4, f4) - - sigmf0 = 
sigmffile.fromfile("/tmp/d0.sigmf-meta", skip_checksum=True) - sigmf1 = sigmffile.fromfile("/tmp/d1.sigmf-meta", skip_checksum=True) - sigmf2 = sigmffile.fromfile("/tmp/d2.sigmf-meta", skip_checksum=True) - sigmf3 = sigmffile.fromfile("/tmp/d3.sigmf-meta", skip_checksum=True) - sigmf4 = sigmffile.fromfile("/tmp/d4.sigmf-meta", skip_checksum=True) - - assert sigmf0._count_samples() == 256 - assert sigmf0._is_conforming_dataset() - assert (0, 0) == sigmf0.get_capture_byte_boundarys(0) - assert (0, 256) == sigmf0.get_capture_byte_boundarys(1) - assert np.array_equal(TEST_U8_DATA0, sigmf0.read_samples(autoscale=False)) - assert np.array_equal(np.array([]), sigmf0.read_samples_in_capture(0)) - assert np.array_equal(TEST_U8_DATA0, sigmf0.read_samples_in_capture(1, autoscale=False)) - - assert sigmf1._count_samples() == 192 - assert not sigmf1._is_conforming_dataset() - assert (32, 160) == sigmf1.get_capture_byte_boundarys(0) - assert (160, 224) == sigmf1.get_capture_byte_boundarys(1) - assert np.array_equal(np.array(range(128)), sigmf1.read_samples_in_capture(0, autoscale=False)) - assert np.array_equal(np.array(range(128, 192)), sigmf1.read_samples_in_capture(1, autoscale=False)) - - assert sigmf2._count_samples() == 192 - assert not sigmf2._is_conforming_dataset() - assert (32, 160) == sigmf2.get_capture_byte_boundarys(0) - assert (176, 240) == sigmf2.get_capture_byte_boundarys(1) - assert np.array_equal(np.array(range(128)), sigmf2.read_samples_in_capture(0, autoscale=False)) - assert np.array_equal(np.array(range(128, 192)), sigmf2.read_samples_in_capture(1, autoscale=False)) - - assert sigmf3._count_samples() == 192 - assert not sigmf3._is_conforming_dataset() - assert (32, 64) == sigmf3.get_capture_byte_boundarys(0) - assert (64, 160) == sigmf3.get_capture_byte_boundarys(1) - assert (192, 256) == sigmf3.get_capture_byte_boundarys(2) - assert np.array_equal(np.array(range(32)), sigmf3.read_samples_in_capture(0, autoscale=False)) - assert 
np.array_equal(np.array(range(32, 128)), sigmf3.read_samples_in_capture(1, autoscale=False)) - assert np.array_equal(np.array(range(128, 192)), sigmf3.read_samples_in_capture(2, autoscale=False)) - - assert sigmf4._count_samples() == 96 - assert not sigmf4._is_conforming_dataset() - assert (32, 160) == sigmf4.get_capture_byte_boundarys(0) - assert (160, 224) == sigmf4.get_capture_byte_boundarys(1) - assert np.array_equal(np.array(range(64)), sigmf4.read_samples_in_capture(0, autoscale=False)[:, 0]) - assert np.array_equal(np.array(range(64, 96)), sigmf4.read_samples_in_capture(1, autoscale=False)[:, 1]) - - -def test_slicing(): - """Test __getitem___ builtin for sigmffile, make sure slicing and indexing works as expected.""" - _, temp_data0 = tempfile.mkstemp() - np.array(TEST_U8_DATA0, dtype=np.uint8).tofile(temp_data0) - sigmf0 = SigMFFile(metadata=TEST_U8_META0, data_file=temp_data0) - assert np.array_equal(TEST_U8_DATA0, sigmf0[:]) - assert TEST_U8_DATA0[6] == sigmf0[6] - - # test float32 - _, temp_data1 = tempfile.mkstemp() - np.array(TEST_FLOAT32_DATA, dtype=np.float32).tofile(temp_data1) - sigmf1 = SigMFFile(metadata=TEST_METADATA, data_file=temp_data1) - assert np.array_equal(TEST_FLOAT32_DATA, sigmf1[:]) - assert sigmf1[10] == TEST_FLOAT32_DATA[10] - - # test multiple channels - _, temp_data2 = tempfile.mkstemp() - np.array(TEST_U8_DATA4, dtype=np.uint8).tofile(temp_data2) - sigmf2 = SigMFFile(TEST_U8_META4, data_file=temp_data2) - channelized = np.array(TEST_U8_DATA4).reshape((128, 2)) - assert np.array_equal(channelized, sigmf2[:][:]) - assert np.array_equal(sigmf2[10:20, 91:112], sigmf2.read_samples(autoscale=False)[10:20, 91:112]) - assert np.array_equal(sigmf2[0], channelized[0]) - assert np.array_equal(sigmf2[1, :], channelized[1, :]) +class TestCaptures(unittest.TestCase): + """ensure capture access tools work properly""" + + def setUp(self) -> None: + """ensure tests have a valid SigMF object to work with""" + self.temp_dir = 
Path(tempfile.mkdtemp()) + self.temp_path_data = self.temp_dir / "trash.sigmf-data" + self.temp_path_meta = self.temp_dir / "trash.sigmf-meta" + + def tearDown(self) -> None: + """remove temporary dir""" + shutil.rmtree(self.temp_dir) + + def prepare(self, data: list, meta: dict, dtype: type) -> SigMFFile: + """write some data and metadata to temporary paths""" + np.array(data, dtype=dtype).tofile(self.temp_path_data) + with open(self.temp_path_meta, "w") as handle: + json.dump(meta, handle) + meta = sigmffile.fromfile(self.temp_path_meta, skip_checksum=True) + return meta + + def test_000(self) -> None: + """compliant two-capture recording""" + meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8) + self.assertEqual(256, meta._count_samples()) + self.assertTrue(meta._is_conforming_dataset()) + self.assertEqual((0, 0), meta.get_capture_byte_boundarys(0)) + self.assertEqual((0, 256), meta.get_capture_byte_boundarys(1)) + self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples(autoscale=False))) + self.assertTrue(np.array_equal(np.array([]), meta.read_samples_in_capture(0))) + self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples_in_capture(1, autoscale=False))) + + def test_001(self) -> None: + """two capture recording with header_bytes and trailing_bytes set""" + meta = self.prepare(TEST_U8_DATA1, TEST_U8_META1, np.uint8) + self.assertEqual(192, meta._count_samples()) + self.assertFalse(meta._is_conforming_dataset()) + self.assertEqual((32, 160), meta.get_capture_byte_boundarys(0)) + self.assertEqual((160, 224), meta.get_capture_byte_boundarys(1)) + self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0, autoscale=False))) + self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1, autoscale=False))) + + def test_002(self) -> None: + """two capture recording with multiple header_bytes set""" + meta = self.prepare(TEST_U8_DATA2, TEST_U8_META2, np.uint8) + self.assertEqual(192, 
meta._count_samples()) + self.assertFalse(meta._is_conforming_dataset()) + self.assertEqual((32, 160), meta.get_capture_byte_boundarys(0)) + self.assertEqual((176, 240), meta.get_capture_byte_boundarys(1)) + self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0, autoscale=False))) + self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1, autoscale=False))) + + def test_003(self) -> None: + """three capture recording with multiple header_bytes set""" + meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8) + self.assertEqual(192, meta._count_samples()) + self.assertFalse(meta._is_conforming_dataset()) + self.assertEqual((32, 64), meta.get_capture_byte_boundarys(0)) + self.assertEqual((64, 160), meta.get_capture_byte_boundarys(1)) + self.assertEqual((192, 256), meta.get_capture_byte_boundarys(2)) + self.assertTrue(np.array_equal(np.arange(32), meta.read_samples_in_capture(0, autoscale=False))) + self.assertTrue(np.array_equal(np.arange(32, 128), meta.read_samples_in_capture(1, autoscale=False))) + self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(2, autoscale=False))) + + def test_004(self) -> None: + """two channel version of 000""" + meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8) + self.assertEqual(96, meta._count_samples()) + self.assertFalse(meta._is_conforming_dataset()) + self.assertEqual((32, 160), meta.get_capture_byte_boundarys(0)) + self.assertEqual((160, 224), meta.get_capture_byte_boundarys(1)) + self.assertTrue( + np.array_equal(np.arange(64).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(0, autoscale=False)) + ) + self.assertTrue( + np.array_equal(np.arange(64, 96).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(1, autoscale=False)) + ) + + def test_slicing_ru8(self) -> None: + """slice real uint8""" + meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8) + self.assertTrue(np.array_equal(meta[:], TEST_U8_DATA0)) + 
self.assertTrue(np.array_equal(meta[6], TEST_U8_DATA0[6])) + self.assertTrue(np.array_equal(meta[1:-1], TEST_U8_DATA0[1:-1])) + + def test_slicing_rf32(self) -> None: + """slice real float32""" + meta = self.prepare(TEST_FLOAT32_DATA, TEST_METADATA, np.float32) + self.assertTrue(np.array_equal(meta[:], TEST_FLOAT32_DATA)) + self.assertTrue(np.array_equal(meta[9], TEST_FLOAT32_DATA[9])) + + def test_slicing_multiple_channels(self) -> None: + """slice multiple channels""" + meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8) + channelized = np.array(TEST_U8_DATA4).reshape((-1, 2)) + self.assertTrue(np.array_equal(meta[:][:], channelized)) + self.assertTrue(np.array_equal(meta[10:20, 0], meta.read_samples(autoscale=False)[10:20, 0])) + self.assertTrue(np.array_equal(meta[0], channelized[0])) + self.assertTrue(np.array_equal(meta[1, :], channelized[1])) + + +def simulate_capture(sigmf_md, n, capture_len): + start_index = capture_len * n + + capture_md = {"core:datetime": utils.get_sigmf_iso8601_datetime_now()} + + sigmf_md.add_capture(start_index=start_index, metadata=capture_md) + + annotation_md = { + "core:latitude": 40.0 + 0.0001 * n, + "core:longitude": -105.0 + 0.0001 * n, + } + + sigmf_md.add_annotation(start_index=start_index, length=capture_len, metadata=annotation_md) + + +def test_default_constructor(): + SigMFFile() + + +def test_set_non_required_global_field(): + sigf = SigMFFile() + sigf.set_global_field("this_is:not_in_the_schema", None) + + +def test_add_capture(): + sigf = SigMFFile() + sigf.add_capture(start_index=0, metadata={}) + + +def test_add_annotation(): + sigf = SigMFFile() + sigf.add_capture(start_index=0) + meta = {"latitude": 40.0, "longitude": -105.0} + sigf.add_annotation(start_index=0, length=128, metadata=meta) + + +def test_fromarchive(test_sigmffile): + with tempfile.NamedTemporaryFile(suffix=".sigmf") as temp_file: + archive_path = test_sigmffile.archive(name=temp_file.name) + result = 
sigmffile.fromarchive(archive_path=archive_path) + assert result._metadata == test_sigmffile._metadata == TEST_METADATA + + +def test_add_multiple_captures_and_annotations(): + sigf = SigMFFile() + for idx in range(3): + simulate_capture(sigf, idx, 1024) diff --git a/tests/test_validation.py b/tests/test_validation.py index b3b0841..8f961d8 100644 --- a/tests/test_validation.py +++ b/tests/test_validation.py @@ -123,8 +123,9 @@ def test_annotation_without_sample_count(self): SigMFFile(self.metadata).validate() def test_invalid_hash(self): - _, temp_path = tempfile.mkstemp() - TEST_FLOAT32_DATA.tofile(temp_path) - self.metadata[SigMFFile.GLOBAL_KEY][SigMFFile.HASH_KEY] = "derp" - with self.assertRaises(sigmf.error.SigMFFileError): - SigMFFile(metadata=self.metadata, data_file=temp_path) + """wrong hash raises error on creation""" + with tempfile.NamedTemporaryFile() as temp_file: + TEST_FLOAT32_DATA.tofile(temp_file.name) + self.metadata[SigMFFile.GLOBAL_KEY][SigMFFile.HASH_KEY] = "derp" + with self.assertRaises(sigmf.error.SigMFFileError): + SigMFFile(metadata=self.metadata, data_file=temp_file.name)