def _create_empty_array(self, frames, always_2d, dtype):
    """Return an uninitialized, C-ordered array for `frames` frames.

    The shape is (frames, channels) if always_2d is true or the file
    has more than one channel, and (frames,) otherwise.

    """
    if always_2d or self.channels > 1:
        shape = frames, self.channels
    else:
        shape = frames,
    return _np.empty(shape, dtype, order='C')


def blocks(self, blocksize=None, overlap=0, frames=-1, dtype='float64',
           always_2d=True, fill_value=None, out=None):
    """Return a generator for block-wise processing.

    By default, the generator returns blocks of the given blocksize
    until the end of the file is reached, frames can be used to
    stop earlier.

    overlap can be used to rewind a certain number of frames between
    blocks.  It must be smaller than the block size, otherwise the
    read position would never advance and iteration would not end.

    For the arguments dtype, always_2d, fill_value and out see
    SoundFile.read().

    If fill_value is not specified, the last block may be smaller
    than blocksize.

    As this is a generator, the arguments are checked when the first
    block is requested:

    * RuntimeError is raised if the file was opened in write mode.
    * TypeError is raised unless exactly one of blocksize and out is
      given.
    * ValueError is raised if the block size is not positive or if
      overlap is not smaller than the block size.

    """
    if self.mode == 'w':
        raise RuntimeError("blocks() is not allowed in write mode")

    if out is None:
        if blocksize is None:
            raise TypeError("One of {blocksize, out} must be specified")
    else:
        if blocksize is not None:
            raise TypeError(
                "Only one of {blocksize, out} may be specified")
        blocksize = len(out)

    # Each iteration consumes `blocksize` frames and gives `overlap`
    # frames back; unless the net step is positive the loop below would
    # yield the same data forever.
    if blocksize <= 0:
        raise ValueError("blocksize must be positive")
    if overlap >= blocksize:
        raise ValueError("overlap must be smaller than blocksize")

    remaining_frames = self.frames - self.seek(0, SEEK_CUR, 'r')
    if frames < 0 or (fill_value is None and frames > remaining_frames):
        frames = remaining_frames

    while frames > 0:
        if frames < blocksize:
            # Last, partial block: when padding is requested, read into
            # a full-size array so that the caller still gets
            # `blocksize` frames.
            if fill_value is not None and out is None:
                out = self._create_empty_array(blocksize, always_2d, dtype)
            blocksize = frames
        block = self.read(blocksize, dtype, always_2d, fill_value, out)
        frames -= blocksize
        if frames > 0:
            # Rewind so that the next block repeats the last `overlap`
            # frames; those frames have to be read (and counted) again.
            self.seek(-overlap, SEEK_CUR, 'r')
            frames += overlap
        yield block
# ---------------------------------------------------------------------------
# Definitions from pysoundfile.py
# ---------------------------------------------------------------------------


def blocks(file, sample_rate=None, channels=None,
           subtype=None, endian=None, format=None, closefd=True,
           blocksize=None, overlap=0, start=0, stop=None, frames=-1,
           dtype='float64', always_2d=True, fill_value=None, out=None):
    """Open a sound file for reading and yield its contents block by block.

    Example usage:

        import pysoundfile as sf
        for block in sf.blocks('myfile.wav', blocksize=128):
            print(block.max())
            # ... or do something more useful with 'block'

    blocksize, overlap, frames, dtype, always_2d, fill_value and out
    are handed to SoundFile.blocks(); file, sample_rate, channels,
    subtype, endian, format and closefd are handed to open().

    start and stop select the part of the file to iterate over.
    Without frames or stop, iteration runs to the end of the file.
    Giving both frames and stop raises a TypeError.

    The file stays open while the generator is suspended.  If you
    abandon the generator before it is exhausted, call its close()
    method to close the file properly.

    """
    if frames >= 0 and stop is not None:
        raise TypeError("Only one of {frames, stop} may be used")

    with open(file, 'r', sample_rate, channels,
              subtype, endian, format, closefd) as sound_file:
        first_frame, frame_count = _get_read_range(
            start, stop, frames, sound_file.frames)
        sound_file.seek(first_frame, SEEK_SET)
        block_iterator = sound_file.blocks(
            blocksize, overlap, frame_count,
            dtype, always_2d, fill_value, out)
        for chunk in block_iterator:
            yield chunk


def _get_read_range(start, stop, frames, total_frames):
    # Translate slice-like start/stop values into (first frame, length).
    # An explicitly given (non-negative) frames value is passed through.
    first, last, _ = slice(start, stop).indices(total_frames)
    if frames < 0:
        # An "inverted" range (stop before start) has length zero.
        frames = max(last, first) - first
    return first, frames


def default_subtype(format):
    """Look up the default subtype for the given format (None if unknown)."""
    key = str(format).upper()
    return _default_subtypes.get(key)


# ---------------------------------------------------------------------------
# Definitions from tests/test_argspec.py
# ---------------------------------------------------------------------------


def defaults(spec):
    # Map argument names to default values.  Defaults belong to the
    # trailing arguments, hence both sequences are paired back to front.
    names_from_end = spec.args[::-1]
    values_from_end = spec.defaults[::-1]
    return dict(zip(names_from_end, values_from_end))


def remove_items(collection, subset):
    """Return a copy of collection without the entries listed in subset.

    Every entry of subset must be present in collection with an equal
    value.

    """
    remaining = collection.copy()
    for name in subset:
        expected = subset[name]
        assert (name, remaining[name]) == (name, expected)
        remaining.pop(name)
    return remaining


def test_read_defaults():
    from_function = defaults(read_function)
    from_method = defaults(read_method)
    from_open = defaults(open)

    # 'mode' has no meaning for the read() function.
    from_open.pop('mode')

    # These two exist only in the read() function.
    for name in ('start', 'stop'):
        from_function.pop(name)

    # Everything else must have the same default as in open() and
    # SoundFile.read().
    from_function = remove_items(from_function, from_open)
    from_function = remove_items(from_function, from_method)

    # No more arguments should be left.
    assert not from_function


def test_write_defaults():
    from_write = defaults(write_function)
    from_open = defaults(open)

    # write() must use the same default values as open().
    from_open = remove_items(from_open, from_write)

    # What is left over has no default in write():
    # 'mode' is always 'w', 'channels' is inferred from the data and
    # 'sample_rate' is obligatory.
    for name in ('mode', 'channels', 'sample_rate'):
        from_open.pop(name)

    # No more arguments should be left.
    assert not from_open


def test_if_blocks_function_and_method_have_same_defaults():
    from_function = defaults(blocks_function)
    from_method = defaults(blocks_method)
    from_open = defaults(open)

    for name in ('start', 'stop'):
        from_function.pop(name)
    from_open.pop('mode')

    from_function = remove_items(from_function, from_open)
    from_function = remove_items(from_function, from_method)

    assert not from_function


def test_order_of_blocks_arguments():
    method_args = blocks_method.args[1:]  # everything after 'self'
    # The function takes 'start' and 'stop' after the method's first
    # two arguments.
    expected_tail = method_args[:2] + ['start', 'stop'] + method_args[2:]
    expected_head = list(open.args)
    expected_head.remove('mode')
    assert blocks_function.args == expected_head + expected_tail
# -----------------------------------------------------------------------------
# Test blocks() function
# -----------------------------------------------------------------------------


def assert_equal_list_of_arrays(list1, list2):
    """Helper function to assert equality of all list items.

    Both lists must contain the same number of items and the items
    must be pairwise equal.

    """
    # zip() stops at the shorter list, so without this check a missing
    # or an additional block would go unnoticed.
    assert len(list1) == len(list2)
    for item1, item2 in zip(list1, list2):
        assert np.all(item1 == item2)


def test_blocks_without_blocksize():
    with pytest.raises(TypeError):
        list(sf.blocks(filename_stereo))


def test_blocks_full_last_block():
    blocks = list(sf.blocks(filename_stereo, blocksize=2))
    assert_equal_list_of_arrays(blocks, [data_stereo[0:2], data_stereo[2:4]])


def test_blocks_partial_last_block():
    blocks = list(sf.blocks(filename_stereo, blocksize=3))
    assert_equal_list_of_arrays(blocks, [data_stereo[0:3], data_stereo[3:4]])


def test_blocks_fill_last_block():
    blocks = list(sf.blocks(filename_stereo, blocksize=3, fill_value=0))
    # The single remaining frame is padded to a full block.
    last_block = np.vstack((data_stereo[3:4], np.zeros((2, 2))))
    assert_equal_list_of_arrays(blocks, [data_stereo[0:3], last_block])


def test_blocks_with_overlap():
    blocks = list(sf.blocks(filename_stereo, blocksize=3, overlap=2))
    assert_equal_list_of_arrays(blocks, [data_stereo[0:3], data_stereo[1:4]])


def test_blocks_with_start():
    blocks = list(sf.blocks(filename_stereo, blocksize=2, start=2))
    assert_equal_list_of_arrays(blocks, [data_stereo[2:4]])


def test_blocks_with_stop():
    blocks = list(sf.blocks(filename_stereo, blocksize=2, stop=2))
    assert_equal_list_of_arrays(blocks, [data_stereo[0:2]])

    with pytest.raises(TypeError):
        list(sf.blocks(filename_stereo, blocksize=2, frames=2, stop=2))


def test_blocks_with_too_large_start():
    # A start position beyond the end of the file leaves zero frames to
    # read, so the generator does not yield any block at all.
    blocks = list(sf.blocks(filename_stereo, blocksize=2, start=666))
    assert blocks == []


def test_blocks_with_too_large_stop():
    blocks = list(sf.blocks(filename_stereo, blocksize=3, stop=666))
    assert_equal_list_of_arrays(blocks, [data_stereo[0:3], data_stereo[3:4]])


def test_blocks_with_negative_start_and_stop():
    blocks = list(sf.blocks(filename_stereo, blocksize=2, start=-2, stop=-1))
    assert_equal_list_of_arrays(blocks, [data_stereo[-2:-1]])


def test_blocks_with_stop_smaller_than_start():
    blocks = list(sf.blocks(filename_stereo, blocksize=2, start=2, stop=1))
    assert blocks == []


def test_blocks_with_frames():
    blocks = list(sf.blocks(filename_stereo, blocksize=2, frames=3))
    assert_equal_list_of_arrays(blocks, [data_stereo[0:2], data_stereo[2:3]])


def test_blocks_with_frames_and_fill_value():
    blocks = list(
        sf.blocks(filename_stereo, blocksize=2, frames=3, fill_value=0))
    last_block = np.vstack((data_stereo[2:3], np.zeros((1, 2))))
    assert_equal_list_of_arrays(blocks, [data_stereo[0:2], last_block])


def test_blocks_with_out():
    out = np.empty((3, 2))
    blocks = list(sf.blocks(filename_stereo, out=out))
    assert blocks[0] is out
    # First frame was overwritten by second block:
    assert np.all(blocks[0] == [[0.25, -0.25], [0.75, -0.75], [0.5, -0.5]])
    assert blocks[1].base is out
    assert np.all(blocks[1] == [[0.25, -0.25]])

    with pytest.raises(TypeError):
        list(sf.blocks(filename_stereo, blocksize=3, out=out))


def test_blocks_mono():
    blocks = list(sf.blocks(filename_mono, blocksize=3, dtype='int16',
                            always_2d=False, fill_value=0))
    assert_equal_list_of_arrays(blocks, [[0, 1, 2], [-2, -1, 0]])


def test_blocks_rw_existing(sf_stereo_rw_existing):
    blocks = list(sf_stereo_rw_existing.blocks(blocksize=2))
    assert_equal_list_of_arrays(blocks, [data_stereo[0:2], data_stereo[2:4]])


def test_blocks_rw_new(sf_stereo_rw_new):
    """There is nothing to yield in a new 'rw' file."""
    blocks = list(sf_stereo_rw_new.blocks(blocksize=2, frames=666))
    assert blocks == []


def test_blocks_write(sf_stereo_w):
    with pytest.raises(RuntimeError):
        list(sf_stereo_w.blocks(blocksize=2))


# -----------------------------------------------------------------------------
# Test file metadata
# -----------------------------------------------------------------------------