Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 106 additions & 12 deletions pysoundfile.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import numpy as _np
from cffi import FFI as _FFI
from contextlib import closing as _closing
from os import SEEK_SET, SEEK_CUR, SEEK_END

__version__ = "0.5.0"
Expand Down Expand Up @@ -624,6 +625,14 @@ def _check_array(self, array):
raise ValueError("dtype must be one of %s" %
repr([dt.name for dt in _ffi_types]))

def _create_empty_array(self, frames, always_2d, dtype):
# Create an empty array with appropriate shape
if always_2d or self.channels > 1:
shape = frames, self.channels
else:
shape = frames,
return _np.empty(shape, dtype, order='C')

def _read_or_write(self, funcname, array, frames):
# Call into libsndfile
ffi_type = _ffi_types[array.dtype]
Expand Down Expand Up @@ -672,11 +681,7 @@ def read(self, frames=-1, dtype='float64', always_2d=True,
if frames < 0 or (frames > remaining_frames and
fill_value is None):
frames = remaining_frames
if always_2d or self.channels > 1:
shape = frames, self.channels
else:
shape = frames,
out = _np.empty(shape, dtype, order='C')
out = self._create_empty_array(frames, always_2d, dtype)
else:
if frames < 0 or frames > len(out):
frames = len(out)
Expand Down Expand Up @@ -720,6 +725,52 @@ def write(self, data):
self._info.frames = self.seek(0, SEEK_END, 'w')
self.seek(curr, SEEK_SET, 'w')

def blocks(self, blocksize=None, overlap=0, frames=-1, dtype='float64',
always_2d=True, fill_value=None, out=None):
"""Return a generator for block-wise processing.

By default, the generator returns blocks of the given blocksize
until the end of the file is reached, frames can be used to
stop earlier.

overlap can be used to rewind a certain number of frames between
blocks.

For the arguments dtype, always_2d, fill_value and out see
SoundFile.read().

If fill_value is not specified, the last block may be smaller
than blocksize.

"""
if self.mode == 'w':
raise RuntimeError("blocks() is not allowed in write mode")

if out is None:
if blocksize is None:
raise TypeError("One of {blocksize, out} must be specified")
else:
if blocksize is not None:
raise TypeError(
"Only one of {blocksize, out} may be specified")
blocksize = len(out)

remaining_frames = self.frames - self.seek(0, SEEK_CUR, 'r')
if frames < 0 or (fill_value is None and frames > remaining_frames):
frames = remaining_frames

while frames > 0:
if frames < blocksize:
if fill_value is not None and out is None:
out = self._create_empty_array(blocksize, always_2d, dtype)
blocksize = frames
block = self.read(blocksize, dtype, always_2d, fill_value, out)
frames -= blocksize
if frames > 0:
self.seek(-overlap, SEEK_CUR, 'r')
frames += overlap
yield block


def open(file, mode='r', sample_rate=None, channels=None,
subtype=None, endian=None, format=None, closefd=True):
Expand All @@ -730,7 +781,7 @@ def open(file, mode='r', sample_rate=None, channels=None,


def read(file, sample_rate=None, channels=None, subtype=None, endian=None,
format=None, closefd=True, start=None, stop=None, frames=-1,
format=None, closefd=True, start=0, stop=None, frames=-1,
dtype='float64', always_2d=True, fill_value=None, out=None):
"""Read a sound file and return its contents as NumPy array.

Expand Down Expand Up @@ -762,15 +813,11 @@ def read(file, sample_rate=None, channels=None, subtype=None, endian=None,

"""
if frames >= 0 and stop is not None:
raise RuntimeError("Only one of {frames, stop} may be used")
raise TypeError("Only one of {frames, stop} may be used")

with SoundFile(file, 'r', sample_rate, channels,
subtype, endian, format, closefd) as f:
start, stop, _ = slice(start, stop).indices(f.frames)
if stop < start:
stop = start
if frames < 0:
frames = stop - start
start, frames = _get_read_range(start, stop, frames, f.frames)
f.seek(start, SEEK_SET)
data = f.read(frames, dtype, always_2d, fill_value, out)
return data, f.sample_rate
Expand Down Expand Up @@ -803,6 +850,53 @@ def write(data, file, sample_rate,
f.write(data)


def blocks(file, sample_rate=None, channels=None,
subtype=None, endian=None, format=None, closefd=True,
blocksize=None, overlap=0, start=0, stop=None, frames=-1,
dtype='float64', always_2d=True, fill_value=None, out=None):
"""Return a generator for block-wise processing.

Example usage:

import pysoundfile as sf
for block in sf.blocks('myfile.wav', blocksize=128):
print(block.max())
# ... or do something more useful with 'block'

All keyword arguments of SoundFile.blocks() are allowed.
All further arguments are forwarded to open().

By default, iteration stops at the end of the file. Use frames or
stop to stop earlier.

If you stop iterating over the generator before it's exhausted, the
sound file is not closed. This is normally not a problem because
the file is opened in read-only mode. To close the file properly,
the generator's close() method can be called.

"""
if frames >= 0 and stop is not None:
raise TypeError("Only one of {frames, stop} may be used")

with open(file, 'r', sample_rate, channels,
subtype, endian, format, closefd) as f:
start, frames = _get_read_range(start, stop, frames, f.frames)
f.seek(start, SEEK_SET)
for block in f.blocks(blocksize, overlap, frames,
dtype, always_2d, fill_value, out):
yield block


def _get_read_range(start, stop, frames, total_frames):
# Calculate start frame and length
start, stop, _ = slice(start, stop).indices(total_frames)
if stop < start:
stop = start
if frames < 0:
frames = stop - start
return start, frames


def default_subtype(format):
"""Return default subtype for given format."""
return _default_subtypes.get(str(format).upper())
Expand Down
55 changes: 41 additions & 14 deletions tests/test_argspec.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Make sure that arguments of open/read/write don't diverge"""
"""Make sure that arguments of open/read/write don't diverge."""

import pysoundfile as sf
from inspect import getargspec
Expand All @@ -9,51 +9,78 @@
read_function = getargspec(sf.read)
read_method = getargspec(sf.SoundFile.read)
write_function = getargspec(sf.write)
blocks_function = getargspec(sf.blocks)
blocks_method = getargspec(sf.SoundFile.blocks)


def defaults(spec):
return dict(zip(reversed(spec.args), reversed(spec.defaults)))


def remove_items(collection, subset):
"""From a collection of defaults, remove a subset and return the rest."""
the_rest = collection.copy()
for arg, default in subset.items():
assert (arg, the_rest[arg]) == (arg, default)
del the_rest[arg]
return the_rest


def test_if_open_is_identical_to_init():
assert ['self'] + open.args == init.args
assert open.varargs == init.varargs
assert open.keywords == init.keywords
assert open.defaults == init.defaults


def test_read_function():
def test_read_defaults():
func_defaults = defaults(read_function)
meth_defaults = defaults(read_method)
open_defaults = defaults(open)

# Not meaningful in read() function:
del open_defaults['mode']
del open_defaults['mode'] # Not meaningful in read() function:

# Only in read() function:
del func_defaults['start']
del func_defaults['stop']

# Same default values as open() and SoundFile.read():
for spec in open_defaults, meth_defaults:
for arg, default in spec.items():
assert (arg, func_defaults[arg]) == (arg, default)
del func_defaults[arg]
func_defaults = remove_items(func_defaults, spec)

assert not func_defaults # No more arguments should be left


def test_write_function():
def test_write_defaults():
write_defaults = defaults(write_function)
open_defaults = defaults(open)

# Same default values as open():
for arg, default in write_defaults.items():
assert (arg, open_defaults[arg]) == (arg, default)
del open_defaults[arg]
# Same default values as open()
open_defaults = remove_items(open_defaults, write_defaults)

del open_defaults['mode'] # mode is always 'w'
del open_defaults['channels'] # Inferred from data
del open_defaults['sample_rate'] # Obligatory in write()

assert not open_defaults # No more arguments should be left


def test_if_blocks_function_and_method_have_same_defaults():
func_defaults = defaults(blocks_function)
meth_defaults = defaults(blocks_method)
open_defaults = defaults(open)

del func_defaults['start']
del func_defaults['stop']
del open_defaults['mode']

for spec in open_defaults, meth_defaults:
func_defaults = remove_items(func_defaults, spec)

assert not func_defaults


def test_order_of_blocks_arguments():
meth_args = blocks_method.args[1:] # remove 'self'
meth_args[2:2] = ['start', 'stop']
open_args = open.args[:]
open_args.remove('mode')
assert blocks_function.args == open_args + meth_args
117 changes: 117 additions & 0 deletions tests/test_pysoundfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,123 @@ def test_write_function(file_w):
assert np.all(data == data_mono)


# -----------------------------------------------------------------------------
# Test blocks() function
# -----------------------------------------------------------------------------


def assert_equal_list_of_arrays(list1, list2):
"""Helper function to assert equality of all list items."""
for item1, item2 in zip(list1, list2):
assert np.all(item1 == item2)


def test_blocks_without_blocksize():
with pytest.raises(TypeError):
list(sf.blocks(filename_stereo))


def test_blocks_full_last_block():
blocks = list(sf.blocks(filename_stereo, blocksize=2))
assert_equal_list_of_arrays(blocks, [data_stereo[0:2], data_stereo[2:4]])


def test_blocks_partial_last_block():
blocks = list(sf.blocks(filename_stereo, blocksize=3))
assert_equal_list_of_arrays(blocks, [data_stereo[0:3], data_stereo[3:4]])


def test_blocks_fill_last_block():
blocks = list(sf.blocks(filename_stereo, blocksize=3, fill_value=0))
last_block = np.row_stack((data_stereo[3:4], np.zeros((2, 2))))
assert_equal_list_of_arrays(blocks, [data_stereo[0:3], last_block])


def test_blocks_with_overlap():
blocks = list(sf.blocks(filename_stereo, blocksize=3, overlap=2))
assert_equal_list_of_arrays(blocks, [data_stereo[0:3], data_stereo[1:4]])


def test_blocks_with_start():
blocks = list(sf.blocks(filename_stereo, blocksize=2, start=2))
assert_equal_list_of_arrays(blocks, [data_stereo[2:4]])


def test_blocks_with_stop():
blocks = list(sf.blocks(filename_stereo, blocksize=2, stop=2))
assert_equal_list_of_arrays(blocks, [data_stereo[0:2]])

with pytest.raises(TypeError):
list(sf.blocks(filename_stereo, blocksize=2, frames=2, stop=2))


def test_blocks_with_too_large_start():
blocks = list(sf.blocks(filename_stereo, blocksize=2, start=666))
assert_equal_list_of_arrays(blocks, [[]])


def test_blocks_with_too_large_stop():
blocks = list(sf.blocks(filename_stereo, blocksize=3, stop=666))
assert_equal_list_of_arrays(blocks, [data_stereo[0:3], data_stereo[3:4]])


def test_blocks_with_negative_start_and_stop():
blocks = list(sf.blocks(filename_stereo, blocksize=2, start=-2, stop=-1))
assert_equal_list_of_arrays(blocks, [data_stereo[-2:-1]])


def test_blocks_with_stop_smaller_than_start():
blocks = list(sf.blocks(filename_stereo, blocksize=2, start=2, stop=1))
assert blocks == []


def test_blocks_with_frames():
blocks = list(sf.blocks(filename_stereo, blocksize=2, frames=3))
assert_equal_list_of_arrays(blocks, [data_stereo[0:2], data_stereo[2:3]])


def test_blocks_with_frames_and_fill_value():
blocks = list(
sf.blocks(filename_stereo, blocksize=2, frames=3, fill_value=0))
last_block = np.row_stack((data_stereo[2:3], np.zeros((1, 2))))
assert_equal_list_of_arrays(blocks, [data_stereo[0:2], last_block])


def test_blocks_with_out():
out = np.empty((3, 2))
blocks = list(sf.blocks(filename_stereo, out=out))
assert blocks[0] is out
# First frame was overwritten by second block:
assert np.all(blocks[0] == [[0.25, -0.25], [0.75, -0.75], [0.5, -0.5]])
assert blocks[1].base is out
assert np.all(blocks[1] == [[0.25, -0.25]])

with pytest.raises(TypeError):
list(sf.blocks(filename_stereo, blocksize=3, out=out))


def test_blocks_mono():
blocks = list(sf.blocks(filename_mono, blocksize=3, dtype='int16',
always_2d=False, fill_value=0))
assert_equal_list_of_arrays(blocks, [[0, 1, 2], [-2, -1, 0]])


def test_blocks_rw_existing(sf_stereo_rw_existing):
blocks = list(sf_stereo_rw_existing.blocks(blocksize=2))
assert_equal_list_of_arrays(blocks, [data_stereo[0:2], data_stereo[2:4]])


def test_blocks_rw_new(sf_stereo_rw_new):
"""There is nothing to yield in a new 'rw' file."""
blocks = list(sf_stereo_rw_new.blocks(blocksize=2, frames=666))
assert blocks == []


def test_blocks_write(sf_stereo_w):
with pytest.raises(RuntimeError):
list(sf_stereo_w.blocks(blocksize=2))


# -----------------------------------------------------------------------------
# Test file metadata
# -----------------------------------------------------------------------------
Expand Down