2 changes: 1 addition & 1 deletion sigmf/__init__.py
@@ -5,7 +5,7 @@
# SPDX-License-Identifier: LGPL-3.0-or-later

# version of this python module
__version__ = "1.4.0"
__version__ = "1.5.0"
# matching version of the SigMF specification
__specification__ = "1.2.6"

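For reference, both module-level strings shown above are importable at runtime, so downstream code can log or pin the library version and the matching SigMF specification version. A trivial sketch (the printed values simply reflect the constants in this diff):

import sigmf

print(sigmf.__version__)        # "1.5.0" after this change
print(sigmf.__specification__)  # SigMF specification version, still "1.2.6"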
32 changes: 23 additions & 9 deletions sigmf/sigmffile.py
@@ -182,7 +182,9 @@ class SigMFFile(SigMFMetafile):
]
VALID_KEYS = {GLOBAL_KEY: VALID_GLOBAL_KEYS, CAPTURE_KEY: VALID_CAPTURE_KEYS, ANNOTATION_KEY: VALID_ANNOTATION_KEYS}

def __init__(self, metadata=None, data_file=None, global_info=None, skip_checksum=False, map_readonly=True, autoscale=True):
def __init__(
self, metadata=None, data_file=None, global_info=None, skip_checksum=False, map_readonly=True, autoscale=True
):
"""
API for SigMF I/O

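The hunk above only re-wraps the constructor's argument list; the keyword arguments themselves are unchanged. A hedged usage sketch of that signature (the data file path and the global metadata values are placeholders, not part of this PR):

from sigmf import SigMFFile

# file name and datatype below are hypothetical examples
meta = SigMFFile(
    data_file="example.sigmf-data",            # placeholder dataset path
    global_info={"core:datatype": "cf32_le"},  # minimal global metadata
    skip_checksum=True,                        # defer hashing while editing metadata
)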
@@ -339,7 +341,7 @@ def __getitem__(self, sli):
if self.is_complex_data:
data = data.view(np.complex64)
# for single-channel complex data, flatten the last dimension
if data.ndim > 1 and self.get_num_channels() == 1:
if data.ndim > 1 and self.num_channels == 1:
data = data.flatten()
return data[0] if isinstance(sli, int) else data
else:
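The `self.num_channels == 1` check above means that slicing a single-channel complex recording yields a flat one-dimensional complex64 array rather than an (N, 1) array. A small sketch of that behavior, assuming `meta` is a SigMFFile describing one channel of cf32_le data (the slice bounds are illustrative):

import numpy as np

samples = meta[0:1024]            # goes through the __getitem__ path shown above
assert samples.dtype == np.complex64
assert samples.ndim == 1          # flattened because num_channels == 1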
@@ -509,7 +511,7 @@ def get_capture_start(self, index):
raise SigMFAccessError("Capture {} does not have required {} key".format(index, self.START_INDEX_KEY))
return start

def get_capture_byte_boundarys(self, index):
def get_capture_byte_boundaries(self, index):
"""
Returns a tuple of the file byte range in a dataset of a given SigMF
capture of the form [start, stop). This function works on either
@@ -531,7 +533,13 @@ def get_capture_byte_boundarys(self, index):

end_byte = start_byte
if index == len(self.get_captures()) - 1: # last captures...data is the rest of the file
end_byte = self.data_file.stat().st_size - self.get_global_field(self.TRAILING_BYTES_KEY, 0)
if self.data_file is not None:
file_size = self.data_file.stat().st_size
elif self.data_buffer is not None:
file_size = len(self.data_buffer.getbuffer())
else:
raise SigMFFileError("Neither data_file nor data_buffer is available")
end_byte = file_size - self.get_global_field(self.TRAILING_BYTES_KEY, 0)
else:
end_byte += (
(self.get_capture_start(index + 1) - self.get_capture_start(index))
)
return (start_byte, end_byte)

def get_capture_byte_boundarys(self, index):
warnings.warn(
"get_capture_byte_boundarys() is deprecated and will be removed in a future version of sigmf. "
"Use get_capture_byte_boundaries() instead.",
DeprecationWarning,
stacklevel=2,
)
return self.get_capture_byte_boundaries(index)

def add_annotation(self, start_index, length=None, metadata=None):
"""
Insert annotation at start_index with length (if != None).
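Taken together, the two changes above route everything through the new spelling while keeping the old method name working as a thin wrapper that emits a DeprecationWarning. That warning can be surfaced explicitly; a minimal sketch, assuming `meta` is an existing SigMFFile with at least one capture:

import warnings

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    bounds = meta.get_capture_byte_boundarys(0)   # old name, now deprecated
assert any(issubclass(w.category, DeprecationWarning) for w in caught)
assert bounds == meta.get_capture_byte_boundaries(0)  # same result via the new name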
@@ -789,7 +806,7 @@ def read_samples_in_capture(self, index=0):
data : ndarray
Samples are returned as an array of float or complex, with number of dimensions equal to NUM_CHANNELS_KEY.
"""
cb = self.get_capture_byte_boundarys(index)
cb = self.get_capture_byte_boundaries(index)
if (cb[1] - cb[0]) % (self.get_sample_size() * self.num_channels):
warnings.warn(
f"Capture `{index}` in `{self.data_file}` does not contain "
@@ -826,10 +843,7 @@ def read_samples(self, start_index=0, count=-1):
else:
raise SigMFFileError("No signal data file has been associated with the metadata.")
first_byte = start_index * self.get_sample_size() * self.num_channels

if not self._is_conforming_dataset():
warnings.warn(f"Recording dataset appears non-compliant, resulting data may be erroneous")
return self._read_datafile(first_byte, count * self.get_num_channels())
return self._read_datafile(first_byte, count * self.num_channels)

def _read_datafile(self, first_byte, nitems):
"""
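The read_samples hunk above drops the non-conforming-dataset warning and switches from get_num_channels() to the num_channels property. A usage sketch for the reader-facing calls touched by this file (the recording path and sample counts are placeholders):

import sigmf

meta = sigmf.fromfile("example.sigmf-meta")             # placeholder recording
block = meta.read_samples(start_index=0, count=1000)    # bounded read from the dataset
capture0 = meta.read_samples_in_capture(0)              # uses get_capture_byte_boundaries() internally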
45 changes: 28 additions & 17 deletions tests/test_sigmffile.py
@@ -254,8 +254,8 @@ def test_000(self) -> None:
meta = self.prepare(TEST_U8_DATA0, TEST_U8_META0, np.uint8, autoscale=False)
self.assertEqual(256, meta._count_samples())
self.assertTrue(meta._is_conforming_dataset())
self.assertTrue((0, 0), meta.get_capture_byte_boundarys(0))
self.assertTrue((0, 256), meta.get_capture_byte_boundarys(1))
self.assertTrue((0, 0), meta.get_capture_byte_boundaries(0))
self.assertTrue((0, 256), meta.get_capture_byte_boundaries(1))
self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples()))
self.assertTrue(np.array_equal(np.array([]), meta.read_samples_in_capture(0)))
self.assertTrue(np.array_equal(TEST_U8_DATA0, meta.read_samples_in_capture(1)))
@@ -265,8 +265,8 @@ def test_001(self) -> None:
meta = self.prepare(TEST_U8_DATA1, TEST_U8_META1, np.uint8, autoscale=False)
self.assertEqual(192, meta._count_samples())
self.assertFalse(meta._is_conforming_dataset())
self.assertTrue((32, 160), meta.get_capture_byte_boundarys(0))
self.assertTrue((160, 224), meta.get_capture_byte_boundarys(1))
self.assertTrue((32, 160), meta.get_capture_byte_boundaries(0))
self.assertTrue((160, 224), meta.get_capture_byte_boundaries(1))
self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0)))
self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1)))

@@ -275,8 +275,8 @@ def test_002(self) -> None:
meta = self.prepare(TEST_U8_DATA2, TEST_U8_META2, np.uint8, autoscale=False)
self.assertEqual(192, meta._count_samples())
self.assertFalse(meta._is_conforming_dataset())
self.assertTrue((32, 160), meta.get_capture_byte_boundarys(0))
self.assertTrue((160, 224), meta.get_capture_byte_boundarys(1))
self.assertTrue((32, 160), meta.get_capture_byte_boundaries(0))
self.assertTrue((160, 224), meta.get_capture_byte_boundaries(1))
self.assertTrue(np.array_equal(np.arange(128), meta.read_samples_in_capture(0)))
self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(1)))

@@ -285,9 +285,9 @@ def test_003(self) -> None:
meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8, autoscale=False)
self.assertEqual(192, meta._count_samples())
self.assertFalse(meta._is_conforming_dataset())
self.assertTrue((32, 64), meta.get_capture_byte_boundarys(0))
self.assertTrue((64, 160), meta.get_capture_byte_boundarys(1))
self.assertTrue((160, 224), meta.get_capture_byte_boundarys(2))
self.assertTrue((32, 64), meta.get_capture_byte_boundaries(0))
self.assertTrue((64, 160), meta.get_capture_byte_boundaries(1))
self.assertTrue((160, 224), meta.get_capture_byte_boundaries(2))
self.assertTrue(np.array_equal(np.arange(32), meta.read_samples_in_capture(0)))
self.assertTrue(np.array_equal(np.arange(32, 128), meta.read_samples_in_capture(1)))
self.assertTrue(np.array_equal(np.arange(128, 192), meta.read_samples_in_capture(2)))
@@ -297,8 +297,8 @@ def test_004(self) -> None:
meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False)
self.assertEqual(96, meta._count_samples())
self.assertFalse(meta._is_conforming_dataset())
self.assertTrue((32, 96), meta.get_capture_byte_boundarys(0))
self.assertTrue((96, 160), meta.get_capture_byte_boundarys(1))
self.assertTrue((32, 96), meta.get_capture_byte_boundaries(0))
self.assertTrue((96, 160), meta.get_capture_byte_boundaries(1))
self.assertTrue(np.array_equal(np.arange(64).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(0)))
self.assertTrue(np.array_equal(np.arange(64, 96).repeat(2).reshape(-1, 2), meta.read_samples_in_capture(1)))

@@ -317,13 +317,24 @@ def test_slicing_rf32(self) -> None:

def test_slicing_multiple_channels(self) -> None:
"""slice multiple channels"""
meta_raw = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False)
meta_scaled = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False) # use raw data for this test

meta = self.prepare(TEST_U8_DATA4, TEST_U8_META4, np.uint8, autoscale=False)
channelized = np.array(TEST_U8_DATA4).reshape((-1, 2))
self.assertTrue(np.array_equal(meta_scaled[:][:], channelized))
self.assertTrue(np.array_equal(meta_raw[10:20, 0], meta_raw.read_samples()[10:20, 0]))
self.assertTrue(np.array_equal(meta_scaled[0], channelized[0]))
self.assertTrue(np.array_equal(meta_scaled[1, :], channelized[1]))
self.assertTrue(np.array_equal(meta[:][:], channelized))
self.assertTrue(np.array_equal(meta[10:20, 0], meta.read_samples()[10:20, 0]))
self.assertTrue(np.array_equal(meta[0], channelized[0]))
self.assertTrue(np.array_equal(meta[1, :], channelized[1]))

def test_boundaries(self) -> None:
"""capture byte boundaries from pairs & archives"""
# get a meta pair and archive
meta = self.prepare(TEST_U8_DATA3, TEST_U8_META3, np.uint8)
arc_path = self.temp_dir / "arc.sigmf"
meta.tofile(arc_path, toarchive=True)
arc = sigmf.fromfile(arc_path)
for bdx in range(3):
self.assertEqual(meta.get_capture_byte_boundaries(bdx), arc.get_capture_byte_boundaries(bdx))
self.assertTrue(np.array_equal(meta.read_samples_in_capture(bdx), arc.read_samples_in_capture(bdx)))


def simulate_capture(sigmf_md, n, capture_len):
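The new test_boundaries case above exercises the renamed method across both a metadata/data pair and a .sigmf archive. The same round-trip can be sketched outside the test harness; here `meta` is assumed to be an existing SigMFFile and the archive path is a placeholder:

import sigmf

meta.tofile("capture.sigmf", toarchive=True)   # write a single-file SigMF archive
arc = sigmf.fromfile("capture.sigmf")          # read the archive back
for idx in range(len(arc.get_captures())):
    assert arc.get_capture_byte_boundaries(idx) == meta.get_capture_byte_boundaries(idx)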