From 87055fe0770f4bbb1bdfce83577e8dfbb3ba2f4d Mon Sep 17 00:00:00 2001 From: Teque5 Date: Tue, 6 Jan 2026 10:02:49 -0800 Subject: [PATCH] fix capture byte boundaries for archives --- sigmf/__init__.py | 2 +- sigmf/sigmffile.py | 32 ++++++++++++++++++++--------- tests/test_sigmffile.py | 45 +++++++++++++++++++++++++---------------- 3 files changed, 52 insertions(+), 27 deletions(-) diff --git a/sigmf/__init__.py b/sigmf/__init__.py index 7db8e80..4bbd62b 100644 --- a/sigmf/__init__.py +++ b/sigmf/__init__.py @@ -5,7 +5,7 @@ # SPDX-License-Identifier: LGPL-3.0-or-later # version of this python module -__version__ = "1.4.0" +__version__ = "1.5.0" # matching version of the SigMF specification __specification__ = "1.2.6" diff --git a/sigmf/sigmffile.py b/sigmf/sigmffile.py index 4de20d5..c40e7e8 100644 --- a/sigmf/sigmffile.py +++ b/sigmf/sigmffile.py @@ -182,7 +182,9 @@ class SigMFFile(SigMFMetafile): ] VALID_KEYS = {GLOBAL_KEY: VALID_GLOBAL_KEYS, CAPTURE_KEY: VALID_CAPTURE_KEYS, ANNOTATION_KEY: VALID_ANNOTATION_KEYS} - def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksum=False, map_readonly=True, autoscale=True): + def __init__( + self, metadata=None, data_file=None, global_info=None, skip_checksum=False, map_readonly=True, autoscale=True + ): """ API for SigMF I/O @@ -339,7 +341,7 @@ def __getitem__(self, sli): if self.is_complex_data: data = data.view(np.complex64) # for single-channel complex data, flatten the last dimension - if data.ndim > 1 and self.get_num_channels() == 1: + if data.ndim > 1 and self.num_channels == 1: data = data.flatten() return data[0] if isinstance(sli, int) else data else: @@ -509,7 +511,7 @@ def get_capture_start(self, index): raise SigMFAccessError("Capture {} does not have required {} key".format(index, self.START_INDEX_KEY)) return start - def get_capture_byte_boundarys(self, index): + def get_capture_byte_boundaries(self, index): """ Returns a tuple of the file byte range in a dataset of a given SigMF capture of the form [start, stop). This function works on either @@ -531,7 +533,13 @@ def get_capture_byte_boundarys(self, index): end_byte = start_byte if index == len(self.get_captures()) - 1: # last captures...data is the rest of the file - end_byte = self.data_file.stat().st_size - self.get_global_field(self.TRAILING_BYTES_KEY, 0) + if self.data_file is not None: + file_size = self.data_file.stat().st_size + elif self.data_buffer is not None: + file_size = len(self.data_buffer.getbuffer()) + else: + raise SigMFFileError("Neither data_file nor data_buffer is available") + end_byte = file_size - self.get_global_field(self.TRAILING_BYTES_KEY, 0) else: end_byte += ( (self.get_capture_start(index + 1) - self.get_capture_start(index)) @@ -540,6 +548,15 @@ def get_capture_byte_boundarys(self, index): ) return (start_byte, end_byte) + def get_capture_byte_boundarys(self, index): + warnings.warn( + "get_capture_byte_boundarys() is deprecated and will be removed in a future version of sigmf. " + "Use get_capture_byte_boundaries() instead.", + DeprecationWarning, + stacklevel=2, + ) + return self.get_capture_byte_boundaries(index) + def add_annotation(self, start_index, length=None, metadata=None): """ Insert annotation at start_index with length (if != None). @@ -789,7 +806,7 @@ def read_samples_in_capture(self, index=0): data : ndarray Samples are returned as an array of float or complex, with number of dimensions equal to NUM_CHANNELS_KEY. """ - cb = self.get_capture_byte_boundarys(index) + cb = self.get_capture_byte_boundaries(index) if (cb[1] - cb[0]) % (self.get_sample_size() * self.num_channels): warnings.warn( f"Capture `{index}` in `{self.data_file}` does not contain " @@ -826,10 +843,7 @@ def read_samples(self, start_index=0, count=-1): else: raise SigMFFileError("No signal data file has been associated with the metadata.") first_byte = start_index * self.get_sample_size() * self.num_channels - - if not self._is_conforming_dataset(): - warnings.warn(f"Recording dataset appears non-compliant, resulting data may be erroneous") - return self._read_datafile(first_byte, count * self.get_num_channels()) + return self._read_datafile(first_byte, count * self.num_channels) def _read_datafile(self, first_byte, nitems): """ diff --git a/tests/test_sigmffile.py b/tests/test_sigmffile.py index f2171ae..c809e0b 100644 --- a/tests/test_sigmffile.py +++ b/tests/test_sigmffile.py @@ -254,8 +254,8 @@ def test_000(self) -> None: meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8, autoscale=False) self.assertEqual(256, meta._count_samples()) self.assertTrue(meta._is_conforming_dataset()) - self.assertTrue((0, 0), meta.get_capture_byte_boundarys(0)) - self.assertTrue((0, 256), meta.get_capture_byte_boundarys(1)) + self.assertTrue((0, 0), meta.get_capture_byte_boundaries(0)) + self.assertTrue((0, 256), meta.get_capture_byte_boundaries(1)) self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples())) self.assertTrue(np.array_equal(np.array([]), meta.read_samples_in_capture(0))) self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples_in_capture(1))) @@ -265,8 +265,8 @@ def test_001(self) -> None: meta = self.prepare(TEST_U8_DATA1, TEST_U8_META1, np.uint8, autoscale=False) self.assertEqual(192, meta._count_samples()) self.assertFalse(meta._is_conforming_dataset()) - self.assertTrue((32, 160), meta.get_capture_byte_boundarys(0)) - self.assertTrue((160, 224), meta.get_capture_byte_boundarys(1)) + self.assertTrue((32, 160), meta.get_capture_byte_boundaries(0)) + self.assertTrue((160, 224), meta.get_capture_byte_boundaries(1)) self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0))) self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1))) @@ -275,8 +275,8 @@ def test_002(self) -> None: meta = self.prepare(TEST_U8_DATA2, TEST_U8_META2, np.uint8, autoscale=False) self.assertEqual(192, meta._count_samples()) self.assertFalse(meta._is_conforming_dataset()) - self.assertTrue((32, 160), meta.get_capture_byte_boundarys(0)) - self.assertTrue((160, 224), meta.get_capture_byte_boundarys(1)) + self.assertTrue((32, 160), meta.get_capture_byte_boundaries(0)) + self.assertTrue((160, 224), meta.get_capture_byte_boundaries(1)) self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0))) self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1))) @@ -285,9 +285,9 @@ def test_003(self) -> None: meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8, autoscale=False) self.assertEqual(192, meta._count_samples()) self.assertFalse(meta._is_conforming_dataset()) - self.assertTrue((32, 64), meta.get_capture_byte_boundarys(0)) - self.assertTrue((64, 160), meta.get_capture_byte_boundarys(1)) - self.assertTrue((160, 224), meta.get_capture_byte_boundarys(2)) + self.assertTrue((32, 64), meta.get_capture_byte_boundaries(0)) + self.assertTrue((64, 160), meta.get_capture_byte_boundaries(1)) + self.assertTrue((160, 224), meta.get_capture_byte_boundaries(2)) self.assertTrue(np.array_equal(np.arange(32), meta.read_samples_in_capture(0))) self.assertTrue(np.array_equal(np.arange(32, 128), meta.read_samples_in_capture(1))) self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(2))) @@ -297,8 +297,8 @@ def test_004(self) -> None: meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False) self.assertEqual(96, meta._count_samples()) self.assertFalse(meta._is_conforming_dataset()) - self.assertTrue((32, 96), meta.get_capture_byte_boundarys(0)) - self.assertTrue((96, 160), meta.get_capture_byte_boundarys(1)) + self.assertTrue((32, 96), meta.get_capture_byte_boundaries(0)) + self.assertTrue((96, 160), meta.get_capture_byte_boundaries(1)) self.assertTrue(np.array_equal(np.arange(64).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(0))) self.assertTrue(np.array_equal(np.arange(64, 96).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(1))) @@ -317,13 +317,24 @@ def test_slicing_rf32(self) -> None: def test_slicing_multiple_channels(self) -> None: """slice multiple channels""" - meta_raw = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False) - meta_scaled = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False) # use raw data for this test + + meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False) channelized = np.array(TEST_U8_DATA4).reshape((-1, 2)) - self.assertTrue(np.array_equal(meta_scaled[:][:], channelized)) - self.assertTrue(np.array_equal(meta_raw[10:20, 0], meta_raw.read_samples()[10:20, 0])) - self.assertTrue(np.array_equal(meta_scaled[0], channelized[0])) - self.assertTrue(np.array_equal(meta_scaled[1, :], channelized[1])) + self.assertTrue(np.array_equal(meta[:][:], channelized)) + self.assertTrue(np.array_equal(meta[10:20, 0], meta.read_samples()[10:20, 0])) + self.assertTrue(np.array_equal(meta[0], channelized[0])) + self.assertTrue(np.array_equal(meta[1, :], channelized[1])) + + def test_boundaries(self) -> None: + """capture byte boundaries from pairs & archives""" + # get a meta pair and archive + meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8) + arc_path = self.temp_dir / "arc.sigmf" + meta.tofile(arc_path, toarchive=True) + arc = sigmf.fromfile(arc_path) + for bdx in range(3): + self.assertEqual(meta.get_capture_byte_boundaries(bdx), arc.get_capture_byte_boundaries(bdx)) + self.assertTrue(np.array_equal(meta.read_samples_in_capture(bdx), arc.read_samples_in_capture(bdx))) def simulate_capture(sigmf_md, n, capture_len):