diff --git a/pysoundfile.py b/pysoundfile.py
index 088e960..1414a4d 100644
--- a/pysoundfile.py
+++ b/pysoundfile.py
@@ -246,6 +246,13 @@
     'RF64': 'PCM_16',
 }
 
+_ffi_types = {
+    _np.dtype('float64'): 'double',
+    _np.dtype('float32'): 'float',
+    _np.dtype('int32'): 'int',
+    _np.dtype('int16'): 'short'
+}
+
 _snd = _ffi.dlopen('sndfile')
 
 
@@ -579,7 +586,8 @@ def seek(self, frames, whence=SEEK_SET, which=None):
             raise ValueError("Invalid which: %s" % repr(which))
         return _snd.sf_seek(self._file, frames, whence)
 
-    def read(self, frames=-1, dtype='float64'):
+    def read(self, frames=-1, dtype='float64', always_2d=True,
+             fill_value=None, out=None):
         """Read a number of frames from the file.
 
         Reads the given number of frames in the given data format from
@@ -587,80 +595,181 @@ def read(self, frames=-1, dtype='float64'):
         position by the same number of frames.
         Use frames=-1 to read until the end of the file.
 
-        Returns the read data as a (frames x channels) NumPy array.
+        By default, a two-dimensional array is returned even if the
+        sound file has only one channel. Use always_2d=False to return
+        a one-dimensional array in this case.
+
+        If there is less data left in the file than requested, a
+        shorter array is returned. Use fill_value to always return the
+        given number of frames and fill all remaining frames with
+        fill_value.
 
-        If there is not enough data left in the file to read, a
-        smaller NumPy array will be returned.
+        If out is given as a numpy array, the data is written into
+        that array. If there is not enough data left in the file to
+        fill the array, the rest of the frames are ignored and a
+        smaller view to the array is returned. Use fill_value to fill
+        the rest of the array and always return the full-length array.
 
         """
+        self._check_if_closed()
         if self.mode == 'w':
             raise RuntimeError("Cannot read from file opened in write mode")
-        formats = {
-            _np.float64: 'double[]',
-            _np.float32: 'float[]',
-            _np.int32: 'int[]',
-            _np.int16: 'short[]'
-        }
-        readers = {
-            _np.float64: _snd.sf_readf_double,
-            _np.float32: _snd.sf_readf_float,
-            _np.int32: _snd.sf_readf_int,
-            _np.int16: _snd.sf_readf_short
-        }
-        dtype = _np.dtype(dtype)
-        if dtype.type not in formats:
-            raise ValueError("Can only read int16, int32, float32 and float64")
+
+        remaining_frames = self.frames - self.seek(0, SEEK_CUR, 'r')
+
         if frames < 0:
-            curr = self.seek(0, SEEK_CUR, 'r')
-            frames = self.frames - curr
-        data = _ffi.new(formats[dtype.type], frames*self.channels)
-        read = readers[dtype.type](self._file, data, frames)
+            frames = remaining_frames
+        if frames > remaining_frames and fill_value is None:
+            frames = remaining_frames
+
+        if out is None:
+            if always_2d or self.channels > 1:
+                out = _np.empty((frames, self.channels), dtype)
+            else:
+                out = _np.empty(frames, dtype)
+
+        self._check_frames_and_channels(out)
+
+        frames_to_read = min(len(out), remaining_frames)
+
+        ffi_type = _ffi_types[out.dtype]
+        reader = getattr(_snd, 'sf_readf_' + ffi_type)
+        ptr = _ffi.cast(ffi_type + '*', out.ctypes.data)
+        read_frames = reader(self._file, ptr, frames_to_read)
         self._handle_error()
-        np_data = _np.frombuffer(_ffi.buffer(data), dtype=dtype,
-                                 count=read*self.channels)
-        return _np.reshape(np_data, (read, self.channels))
+        if read_frames != frames_to_read:
+            raise RuntimeError("Read %i frames instead of %i" %
+                               (read_frames, frames_to_read))
+
+        if frames_to_read == len(out):
+            return out
+        elif fill_value is None:
+            return out[:frames_to_read]
+        else:
+            out[frames_to_read:] = fill_value
+            return out
 
     def write(self, data):
         """Write a number of frames to the file.
 
-        Writes a number of frames to the current read position in the
-        file. This also advances the read position by the same number
+        Writes a number of frames to the current write position in the
+        file. This also advances the write position by the same number
         of frames and enlarges the file if necessary.
 
         The data must be provided as a (frames x channels) NumPy
-        array.
+        array or as one-dimensional array for mono signals.
 
         """
         self._check_if_closed()
         if self.mode == 'r':
             raise RuntimeError("Cannot write to file opened in read mode")
-        formats = {
-            _np.float64: 'double*',
-            _np.float32: 'float*',
-            _np.int32: 'int*',
-            _np.int16: 'short*'
-        }
-        writers = {
-            _np.float64: _snd.sf_writef_double,
-            _np.float32: _snd.sf_writef_float,
-            _np.int32: _snd.sf_writef_int,
-            _np.int16: _snd.sf_writef_short
-        }
-        if data.dtype.type not in writers:
-            raise ValueError("Data must be int16, int32, float32 or float64")
-        raw_data = _ffi.new('char[]', data.flatten().tostring())
-        written = writers[data.dtype.type](self._file,
-                                           _ffi.cast(
-                                               formats[data.dtype.type], raw_data),
-                                           len(data))
+
+        data = _np.ascontiguousarray(data)
+
+        self._check_frames_and_channels(data)
+
+        ffi_type = _ffi_types[data.dtype]
+        writer = getattr(_snd, 'sf_writef_' + ffi_type)
+        ptr = _ffi.cast(ffi_type + '*', data.ctypes.data)
+        written = writer(self._file, ptr, len(data))
         self._handle_error()
+        if written != len(data):
+            raise RuntimeError("Wrote %i frames instead of %i" %
+                               (written, len(data)))
 
         curr = self.seek(0, SEEK_CUR, 'w')
         self._info.frames = self.seek(0, SEEK_END, 'w')
         self.seek(curr, SEEK_SET, 'w')
-        return written
 
+    def _check_frames_and_channels(self, data):
+        """Error if data is not compatible with the shape of the sound file.
+
+        """
+        if data.dtype not in _ffi_types:
+            raise ValueError("data.dtype must be one of %s" %
+                             repr([dt.name for dt in _ffi_types]))
+
+        if not data.flags.c_contiguous:
+            raise ValueError("data must be c_contiguous")
+
+        if data.ndim not in (1, 2):
+            raise ValueError("data must be one- or two-dimensional")
+        elif data.ndim == 1 and self.channels != 1:
+            raise ValueError("data must have 2 dimensions for non-mono signals")
+        elif data.ndim == 2 and data.shape[1] != self.channels:
+            raise ValueError("two-dimensional data must have %i columns" %
+                             self.channels)
+
+
+def open(*args, **kwargs):
+    """Return a new SoundFile object.
+
+    Takes the same arguments as SoundFile.__init__().
+
+    """
+    return SoundFile(*args, **kwargs)
+
+
+def read(file, frames=-1, start=None, stop=None, **kwargs):
+    """Read a sound file and return its contents as NumPy array.
+
+    The number of frames to read can be specified with frames, the
+    position to start reading can be specified with start.
+    By default, the whole file is read from the beginning.
+    Alternatively, a range can be specified with start and stop.
+    Both start and stop accept negative indices to specify positions
+    relative to the end of the file.
+
+    The keyword arguments out, dtype, fill_value and always_2d are
+    forwarded to SoundFile.read().
+    All further arguments are forwarded to SoundFile.__init__().
+
+    """
+    try:
+        from inspect import getfullargspec as getargspec
+    except ImportError:  # Python 2 has no getfullargspec
+        from inspect import getargspec
+
+    if frames >= 0 and stop is not None:
+        raise RuntimeError("Only one of (frames, stop) may be used")
+
+    read_kwargs = {}
+    for arg in getargspec(SoundFile.read).args:
+        if arg in kwargs:
+            read_kwargs[arg] = kwargs.pop(arg)
+    with SoundFile(file, 'r', **kwargs) as f:
+        start, stop, _ = slice(start, stop).indices(f.frames)
+        if frames < 0:
+            frames = max(0, stop - start)
+        f.seek(start, SEEK_SET)
+        data = f.read(frames, **read_kwargs)
+    return data, f.sample_rate
+
+
+def write(data, file, sample_rate, *args, **kwargs):
+    """Write data from a NumPy array into a sound file.
+
+    If file exists, it will be overwritten!
+
+    If data is one-dimensional, a mono file is written.
+    For two-dimensional data, the columns are interpreted as channels by
+    default. Use channels_first=False to interpret the rows as channels.
+    All further arguments are forwarded to SoundFile.__init__().
+
+    Example usage:
+
+        import pysoundfile as sf
+        sf.write(myarray, 'myfile.wav', 44100, 'PCM_24')
+
+    """
+    data = _np.asarray(data)
+    if data.ndim == 1:
+        channels = 1
+    else:
+        channels = data.shape[1]
+    with SoundFile(file, 'w', sample_rate, channels, *args, **kwargs) as f:
+        f.write(data)
 
 
 def default_subtype(format):
diff --git a/tests/test_pysoundfile.py b/tests/test_pysoundfile.py
index 93afc1a..fe9e0ca 100644
--- a/tests/test_pysoundfile.py
+++ b/tests/test_pysoundfile.py
@@ -1,5 +1,5 @@
 import unittest
-from pysoundfile import *
+import pysoundfile as sf
 import numpy as np
 import os
 import io
@@ -11,12 +11,14 @@ def setUp(self):
         self.channels = 2
         self.filename = 'test.wav'
         self.data = np.ones((self.sample_rate, self.channels))*0.5
-        with SoundFile(self.filename, 'w', self.sample_rate, self.channels) as f:
+        with sf.SoundFile(self.filename, 'w', self.sample_rate, self.channels) as f:
             f.write(self.data)
 
     def tearDown(self):
         os.remove(self.filename)
 
+
+class TestBasicAttributesOfWaveFile(TestWaveFile):
     def test_file_exists(self):
         """The test file should exist"""
         self.assertTrue(os.path.isfile(self.filename))
@@ -24,46 +26,46 @@ def test_file_exists(self):
     def test_open_file_descriptor(self):
         """Opening a file handle should work"""
         handle = os.open(self.filename, os.O_RDONLY)
-        with SoundFile(handle) as f:
+        with sf.SoundFile(handle) as f:
             self.assertTrue(np.all(self.data == f[:]))
 
     def test_open_virtual_io(self):
         """Opening a file-like object should work"""
         with open(self.filename, 'rb') as bytesio:
-            with SoundFile(bytesio) as f:
+            with sf.SoundFile(bytesio) as f:
                 self.assertTrue(np.all(self.data == f[:]))
 
     def test_read_mode(self):
         """Opening the file in read mode should open in read mode from beginning"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             self.assertEqual(f.mode, 'r')
-            self.assertEqual(f.seek(0, SEEK_CUR), 0)
+            self.assertEqual(f.seek(0, sf.SEEK_CUR), 0)
 
     def test_write_mode(self):
         """Opening the file in write mode should open in write mode from beginning"""
-        with SoundFile(self.filename, 'w', self.sample_rate, self.channels) as f:
+        with sf.SoundFile(self.filename, 'w', self.sample_rate, self.channels) as f:
             self.assertEqual(f.mode, 'w')
-            self.assertEqual(f.seek(0, SEEK_CUR), 0)
+            self.assertEqual(f.seek(0, sf.SEEK_CUR), 0)
 
     def test_rw_mode(self):
         """Opening the file in rw mode should open in rw mode from end"""
-        with SoundFile(self.filename, 'rw') as f:
+        with sf.SoundFile(self.filename, 'rw') as f:
             self.assertEqual(f.mode, 'rw')
-            self.assertEqual(f.seek(0, SEEK_CUR), len(f))
+            self.assertEqual(f.seek(0, sf.SEEK_CUR), len(f))
 
     def test_channels(self):
         """The test file should have the correct number of channels"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             self.assertEqual(f.channels, self.channels)
 
     def test_sample_rate(self):
         """The test file should have the correct number of sample rate"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             self.assertEqual(f.sample_rate, self.sample_rate)
 
     def test_format(self):
         """The test file should be a wave file"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             self.assertEqual(f.format, 'WAV')
             self.assertEqual(f.subtype, 'PCM_16')
             self.assertEqual(f.endian, 'FILE')
@@ -72,121 +74,245 @@ def test_format(self):
 
     def test_context_manager(self):
         """The context manager should close the file"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             pass
         self.assertTrue(f.closed)
 
     def test_closing(self):
         """Closing a file should close it"""
-        f = SoundFile(self.filename)
+        f = sf.SoundFile(self.filename)
         self.assertFalse(f.closed)
         f.close()
         self.assertTrue(f.closed)
 
     def test_file_length(self):
         """The file should have the correct length"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             self.assertEqual(len(f), self.sample_rate)
 
     def test_file_contents(self):
         """The file should contain the correct data"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             self.assertTrue(np.all(self.data == f[:]))
 
+    def test_file_attributes(self):
+        """Changing a file attribute should save it on disk"""
+        with sf.SoundFile(self.filename, 'rw') as f:
+            f.title = 'testing'
+        with sf.SoundFile(self.filename) as f:
+            self.assertEqual(f.title, 'testing')
+
+    def test_non_file_attributes(self):
+        """Changing a non-file attribute should not save to disk"""
+        with sf.SoundFile(self.filename, 'rw') as f:
+            f.foobar = 'testing'
+        with sf.SoundFile(self.filename) as f:
+            with self.assertRaises(AttributeError):
+                f.foobar
+
+
+class TestSeekWaveFile(TestWaveFile):
     def test_seek(self):
         """Seeking should advance the read/write pointer"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             self.assertEqual(f.seek(100), 100)
 
     def test_seek_cur(self):
         """seeking multiple times should advance the read/write pointer"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             f.seek(100)
-            self.assertEqual(f.seek(100, whence=SEEK_CUR), 200)
+            self.assertEqual(f.seek(100, whence=sf.SEEK_CUR), 200)
 
     def test_seek_end(self):
         """seeking from end should advance the read/write pointer"""
-        with SoundFile(self.filename) as f:
-            self.assertEqual(f.seek(-100, whence=SEEK_END), self.sample_rate-100)
+        with sf.SoundFile(self.filename) as f:
+            self.assertEqual(f.seek(-100, whence=sf.SEEK_END), self.sample_rate-100)
 
     def test_seek_read(self):
         """Read-seeking should advance the read pointer"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             self.assertEqual(f.seek(100, which='r'), 100)
 
     def test_seek_write(self):
         """write-seeking should advance the write pointer"""
-        with SoundFile(self.filename, 'rw') as f:
+        with sf.SoundFile(self.filename, 'rw') as f:
             self.assertEqual(f.seek(100, which='w'), 100)
 
     def test_flush(self):
         """After flushing, data should be written to disk"""
-        with SoundFile(self.filename, 'rw') as f:
+        with sf.SoundFile(self.filename, 'rw') as f:
             size = os.path.getsize(self.filename)
             f.write(np.zeros((10,2)))
             f.flush()
             self.assertEqual(os.path.getsize(self.filename), size+40)
 
+
+class TestReadWaveFile(TestWaveFile):
     def test_read(self):
         """read should read data and advance the read pointer"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             data = f.read(100)
             self.assertTrue(np.all(data == self.data[:100]))
-            self.assertEqual(100, f.seek(0, SEEK_CUR))
+            self.assertEqual(100, f.seek(0, sf.SEEK_CUR))
 
     def test_read_write_only(self):
         """reading a write-only file should not work"""
-        with SoundFile(self.filename, 'w', self.sample_rate, self.channels) as f:
+        with sf.SoundFile(self.filename, 'w', self.sample_rate, self.channels) as f:
             with self.assertRaises(RuntimeError) as err:
                 f.read(100)
 
     def test_default_read_format(self):
         """By default, np.float64 should be read"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             self.assertEqual(f[:].dtype, np.float64)
 
     def test_read_int16(self):
         """reading 16 bit integers should read np.int16"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             data = f.read(100, dtype='int16')
             self.assertEqual(data.dtype, np.int16)
 
     def test_read_int32(self):
         """reading 32 bit integers should read np.int32"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             data = f.read(100, dtype='int32')
             self.assertEqual(data.dtype, np.int32)
 
     def test_read_float32(self):
         """reading 32 bit floats should read np.float32"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             data = f.read(100, dtype='float32')
             self.assertEqual(data.dtype, np.float32)
 
     def test_read_indexing(self):
         """Reading using indexing should read but not advance read pointer"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             self.assertTrue(np.all(f[:100] == self.data[:100]))
-            self.assertEqual(0, f.seek(0, SEEK_CUR))
+            self.assertEqual(0, f.seek(0, sf.SEEK_CUR))
+
+    def test_read_number_of_frames(self):
+        """Reading N frames should return N frames"""
+        with sf.SoundFile(self.filename) as f:
+            data = f.read(100)
+            self.assertEqual(len(data), 100)
+
+    def test_read_all_frames(self):
+        """Reading should return all remaining frames"""
+        with sf.SoundFile(self.filename) as f:
+            f.seek(-100, sf.SEEK_END)
+            data = f.read()
+            self.assertEqual(len(data), 100)
+
+    def test_read_number_of_frames_over_end(self):
+        """Reading N frames at EOF should return only remaining frames"""
+        with sf.SoundFile(self.filename) as f:
+            f.seek(-50, sf.SEEK_END)
+            data = f.read(100)
+            self.assertEqual(len(data), 50)
+
+    def test_read_number_of_frames_over_end_with_fill(self):
+        """Reading N frames with fill at EOF should return N frames"""
+        with sf.SoundFile(self.filename) as f:
+            f.seek(-50, sf.SEEK_END)
+            data = f.read(100, fill_value=0)
+            self.assertEqual(len(data), 100)
+            self.assertTrue(np.all(data[50:] == 0))
+
+    def test_read_into_out(self):
+        """Reading into out should return data and write into out"""
+        with sf.SoundFile(self.filename) as f:
+            data = np.empty((100, f.channels), dtype='float64')
+            out_data = f.read(out=data)
+            self.assertTrue(np.all(data == out_data))
+
+    def test_read_mono_into_out(self):
+        """Reading mono signal into out should return data and write into out"""
+        # create a dummy mono wave file
+        self.sample_rate = 44100
+        self.channels = 1
+        self.filename = 'test.wav'
+        self.data = np.ones((self.sample_rate, self.channels))*0.5
+        with sf.SoundFile(self.filename, 'w', self.sample_rate, self.channels) as f:
+            f.write(self.data)
+
+        with sf.SoundFile(self.filename) as f:
+            data = np.empty((100, f.channels), dtype='float64')
+            out_data = f.read(out=data)
+            self.assertTrue(np.all(data == out_data))
+
+    def test_read_into_out_with_too_many_channels(self):
+        """Reading into malformed out should throw an error"""
+        with sf.SoundFile(self.filename) as f:
+            data = np.empty((100, f.channels+1), dtype='float64')
+            with self.assertRaises(ValueError) as err:
+                out_data = f.read(out=data)
+
+    def test_read_into_out_with_too_many_dimensions(self):
+        """Reading into malformed out should throw an error"""
+        with sf.SoundFile(self.filename) as f:
+            data = np.empty((100, f.channels, 1), dtype='float64')
+            with self.assertRaises(ValueError) as err:
+                out_data = f.read(out=data)
+
+    def test_read_into_zero_len_out(self):
+        """Reading into a zero-length out should not read anything"""
+        with sf.SoundFile(self.filename) as f:
+            data = np.empty((0, f.channels), dtype='float64')
+            out_data = f.read(out=data)
+            self.assertTrue(np.all(data == out_data))
+
+    def test_read_into_out_over_end(self):
+        """Reading into out over end should return shorter data and write into out"""
+        with sf.SoundFile(self.filename) as f:
+            data = np.empty((100, f.channels), dtype='float64')
+            f.seek(-50, sf.SEEK_END)
+            out_data = f.read(out=data)
+            self.assertTrue(np.all(data[:50] == out_data[:50]))
+            self.assertEqual(out_data.shape, (50,2))
+            self.assertEqual(data.shape, (100,2))
+
+    def test_read_into_out_over_end_with_fill(self):
+        """Reading into out over end with fill should return padded data and write into out"""
+        with sf.SoundFile(self.filename) as f:
+            data = np.empty((100, f.channels), dtype='float64')
+            f.seek(-50, sf.SEEK_END)
+            out_data = f.read(out=data, fill_value=0)
+            self.assertTrue(np.all(data == out_data))
+            self.assertTrue(np.all(data[50:] == 0))
+
+    def test_read_mono_as_array(self):
+        """Reading with always_2d=False should return array"""
+        # create a dummy mono wave file
+        self.sample_rate = 44100
+        self.channels = 1
+        self.filename = 'test.wav'
+        self.data = np.ones((self.sample_rate, self.channels))*0.5
+        with sf.SoundFile(self.filename, 'w', self.sample_rate, self.channels) as f:
+            f.write(self.data)
+
+        with sf.SoundFile(self.filename) as f:
+            data = f.read(100, always_2d=False)
+            self.assertEqual(data.shape, (100,))
 
+class TestWriteWaveFile(TestWaveFile):
     def test_write(self):
         """write should write data and advance the write pointer"""
-        with SoundFile(self.filename, 'rw') as f:
+        with sf.SoundFile(self.filename, 'rw') as f:
             data = np.zeros((100,2))
-            position = f.seek(0, SEEK_CUR)
+            position = f.seek(0, sf.SEEK_CUR)
             f.write(data)
             self.assertTrue(np.all(f[-100:] == data))
-            self.assertEqual(100, f.seek(0, SEEK_CUR)-position)
+            self.assertEqual(100, f.seek(0, sf.SEEK_CUR)-position)
 
     def test_write_read_only(self):
         """writing to a read-only file should not work"""
-        with SoundFile(self.filename) as f:
+        with sf.SoundFile(self.filename) as f:
             with self.assertRaises(RuntimeError) as err:
                 f.write(np.ones((100,2)))
 
     def test_write_float_precision(self):
         """Written float data should be written at most 2**-15 off"""
-        with SoundFile(self.filename, 'rw') as f:
+        with sf.SoundFile(self.filename, 'rw') as f:
             data = np.ones((100,2))
             f.write(data)
             written_data = f[-100:]
@@ -194,34 +320,26 @@ def test_write_float_precision(self):
 
     def test_write_int_precision(self):
         """Written int data should be written"""
-        with SoundFile(self.filename, 'rw') as f:
+        with sf.SoundFile(self.filename, 'rw') as f:
             data = np.zeros((100,2)) + 2**15-1 # full scale int16
             data = np.array(data, dtype='int16')
             f.write(data)
-            f.seek(-100, SEEK_CUR)
+            f.seek(-100, sf.SEEK_CUR)
             written_data = f.read(dtype='int16')
             self.assertTrue(np.all(data == written_data))
 
     def test_write_indexing(self):
         """Writing using indexing should write but not advance write pointer"""
-        with SoundFile(self.filename, 'rw') as f:
-            position = f.seek(0, SEEK_CUR)
+        with sf.SoundFile(self.filename, 'rw') as f:
+            position = f.seek(0, sf.SEEK_CUR)
             data = np.zeros((100,2))
             f[:100] = data
-            self.assertEqual(position, f.seek(0, SEEK_CUR))
+            self.assertEqual(position, f.seek(0, sf.SEEK_CUR))
             self.assertTrue(np.all(data == f[:100]))
 
-    def test_file_attributes(self):
-        """Changing a file attribute should save it on disk"""
-        with SoundFile(self.filename, 'rw') as f:
-            f.title = 'testing'
-        with SoundFile(self.filename) as f:
-            self.assertEqual(f.title, 'testing')
-
-    def test_non_file_attributes(self):
-        """Changing a non-file attribute should not save to disk"""
-        with SoundFile(self.filename, 'rw') as f:
-            f.foobar = 'testing'
-        with SoundFile(self.filename) as f:
-            with self.assertRaises(AttributeError):
-                f.foobar
+class TestWriteFunctions(TestWaveFile):
+    def test_write(self):
+        """write should write data"""
+        data = np.ones((100,2))
+        sf.write(data, self.filename, self.sample_rate)
+        self.assertTrue(np.allclose(sf.read(self.filename)[0], data, atol=2**-15))