diff --git a/mne/filter.py b/mne/filter.py index c2f4d73bdf9..1b50fe05596 100644 --- a/mne/filter.py +++ b/mne/filter.py @@ -3,13 +3,180 @@ from scipy.fftpack import fft, ifft -def band_pass_filter(x, Fs, Fp1, Fp2): +def _overlap_add_filter(x, h, n_fft=None, zero_phase=True): + """ Filter using overlap-add FFTs. + + Filters the signal x using a filter with the impulse response h. + If zero_phase==True, the amplitude response is scaled and the filter is + applied in forward and backward direction, resulting in a zero-phase + filter. + + Parameters + ---------- + x : 1d array + Signal to filter + h : 1d array + Filter impule response (FIR filter coefficients) + n_fft : int + Length of the FFT. If None, the best size is determined automatically. + zero_phase : bool + If True: the filter is applied in forward and backward direction, + resulting in a zero-phase filter + + Returns + ------- + xf : 1d array + x filtered + """ + n_h = len(h) + + # Extend the signal by mirroring the edges to reduce transient filter + # response + n_edge = min(n_h, len(x)) + + x_ext = np.r_[2 * x[0] - x[n_edge - 1:0:-1],\ + x, 2 * x[-1] - x[-2:-n_edge - 1:-1]] + + n_x = len(x_ext) + + # Determine FFT length to use + if n_fft == None: + if n_x > n_h: + n_tot = 2 * n_x if zero_phase else n_x + + N = 2 ** np.arange(np.ceil(np.log2(n_h)), + np.floor(np.log2(n_tot))) + cost = np.ceil(n_tot / (N - n_h + 1)) * N * (np.log2(N) + 1) + n_fft = N[np.argmin(cost)] + else: + # Use only a single block + n_fft = 2 ** np.ceil(np.log2(n_x + n_h - 1)) + + if n_fft <= 0: + raise ValueError('N_fft is too short, has to be at least len(h)') + + # Filter in frequency domain + h_fft = fft(np.r_[h, np.zeros(n_fft - n_h)]) + + if zero_phase: + # We will apply the filter in forward and backward direction: Scale + # frequency response of the filter so that the shape of the amplitude + # response stays the same when it is applied twice + + # be careful not to divide by too small numbers + idx = np.where(np.abs(h_fft) > 1e-6) + h_fft[idx] = h_fft[idx] / np.sqrt(np.abs(h_fft[idx])) + + x_filtered = np.zeros_like(x_ext) + + # Segment length for signal x + n_seg = n_fft - n_h + 1 + + # Number of segements (including fractional segments) + n_segments = int(np.ceil(n_x / float(n_seg))) + + filter_input = x_ext + + for pass_no in range(2) if zero_phase else range(1): + + if pass_no == 1: + # second pass: flip signal + filter_input = np.flipud(x_filtered) + x_filtered = np.zeros_like(x_ext) + + for seg_idx in range(n_segments): + seg = filter_input[seg_idx * n_seg:(seg_idx + 1) * n_seg] + seg_fft = fft(np.r_[seg, np.zeros(n_fft - len(seg))]) + + if seg_idx * n_seg + n_fft < n_x: + x_filtered[seg_idx * n_seg:seg_idx * n_seg + n_fft]\ + += np.real(ifft(h_fft * seg_fft)) + else: + # Last segment + x_filtered[seg_idx * n_seg:] \ + += np.real(ifft(h_fft * seg_fft))[:n_x - seg_idx * n_seg] + + # Remove mirrored edges that we added + x_filtered = x_filtered[n_edge - 1:-n_edge + 1] + + if zero_phase: + # flip signal back + x_filtered = np.flipud(x_filtered) + + return x_filtered + + +def _filter(x, Fs, freq, gain, filter_length=None): + """ Filter signal using gain control points in the frequency domain. + + The filter impulse response is constructed from a Hamming window (window + used in "firwin2" function) to avoid ripples in the frequency reponse + (windowing is a smoothing in frequency domain). The filter is zero-phase. + + Parameters + ---------- + x : 1d array + Signal to filter + Fs : float + Sampling rate in Hz + freq : 1d array + Frequency sampling points in Hz + gain : 1d array + Filter gain at frequency sampling points + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). + + Returns + ------- + xf : 1d array + x filtered + """ + assert x.ndim == 1 + + # normalize frequencies + freq = [f / (Fs / 2) for f in freq] + + if filter_length == None or len(x) <= filter_length: + # Use direct FFT filtering for short signals + + Norig = len(x) + + if (gain[-1] == 0.0 and Norig % 2 == 1) \ + or (gain[-1] == 1.0 and Norig % 2 != 1): + # Gain at Nyquist freq: 1: make x EVEN, 0: make x ODD + x = np.r_[x, x[-1]] + + N = len(x) + + H = signal.firwin2(N, freq, gain) + + # Make zero-phase filter function + B = np.abs(fft(H)) + + xf = np.real(ifft(fft(x) * B)) + xf = xf[:Norig] + x = x[:Norig] + else: + # Use overlap-add filter with a fixed length + N = filter_length + + if (gain[-1] == 0.0 and N % 2 == 1) \ + or (gain[-1] == 1.0 and N % 2 != 1): + # Gain at Nyquist freq: 1: make N EVEN, 0: make N ODD + N += 1 + + H = signal.firwin2(N, freq, gain) + xf = _overlap_add_filter(x, H, zero_phase=True) + + return xf + + +def band_pass_filter(x, Fs, Fp1, Fp2, filter_length=None): """Bandpass filter for the signal x. - An acausal fft algorithm is applied (i.e. no phase shift). The filter - functions is constructed from a Hamming window (window used in "firwin2" - function) to avoid ripples in the frequency reponse (windowing is a - smoothing in frequency domain) + Applies a zero-phase bandpass filter to the signal x. Parameters ---------- @@ -21,6 +188,10 @@ def band_pass_filter(x, Fs, Fp1, Fp2): low cut-off frequency Fp2 : float high cut-off frequency + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -43,6 +214,7 @@ def band_pass_filter(x, Fs, Fp1, Fp2): Fs1 = Fp1 - 0.5 in Hz Fs2 = Fp2 + 0.5 in Hz """ + Fs = float(Fs) Fp1 = float(Fp1) Fp2 = float(Fp2) @@ -50,41 +222,16 @@ def band_pass_filter(x, Fs, Fp1, Fp2): Fs1 = Fp1 - 0.5 Fs2 = Fp2 + 0.5 - assert x.ndim == 1 - - # Make x EVEN - Norig = len(x) - if Norig % 2 == 1: - x = np.r_[x, 1] - - # Normalize frequencies - Ns1 = Fs1 / (Fs / 2) - Ns2 = Fs2 / (Fs / 2) - Np1 = Fp1 / (Fs / 2) - Np2 = Fp2 / (Fs / 2) - - # Construct the filter function H(f) - N = len(x) - - B = signal.firwin2(N, [0, Ns1, Np1, Np2, Ns2, 1], [0, 0, 1, 1, 0, 0]) - - # Make zero-phase filter function - H = np.abs(fft(B)) - - xf = np.real(ifft(fft(x) * H)) - xf = xf[:Norig] - x = x[:Norig] + xf = _filter(x, Fs, [0, Fs1, Fp1, Fp2, Fs2, Fs / 2], [0, 0, 1, 1, 0, 0], + filter_length) return xf -def low_pass_filter(x, Fs, Fp): +def low_pass_filter(x, Fs, Fp, filter_length=None): """Lowpass filter for the signal x. - An acausal fft algorithm is applied (i.e. no phase shift). The filter - functions is constructed from a Hamming window (window used in "firwin2" - function) to avoid ripples in the frequency reponse (windowing is a - smoothing in frequency domain) + Applies a zero-phase lowpass filter to the signal x. Parameters ---------- @@ -94,6 +241,10 @@ def low_pass_filter(x, Fs, Fp): sampling rate Fp : float cut-off frequency + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -113,41 +264,20 @@ def low_pass_filter(x, Fs, Fp): Fp Fp+0.5 """ + Fs = float(Fs) Fp = float(Fp) - assert x.ndim == 1 - - # Make x EVEN - Norig = len(x) - if Norig % 2 == 1: - x = np.r_[x, 1] - - # Normalize frequencies - Ns = (Fp + 0.5) / (Fs / 2) - Np = Fp / (Fs / 2) - - # Construct the filter function H(f) - N = len(x) + Fstop = Fp + 0.5 - B = signal.firwin2(N, [0, Np, Ns, 1], [1, 1, 0, 0]) - - # Make zero-phase filter function - H = np.abs(fft(B)) - - xf = np.real(ifft(fft(x) * H)) - xf = xf[:Norig] - x = x[:Norig] + xf = _filter(x, Fs, [0, Fp, Fstop, Fs / 2], [1, 1, 0, 0], filter_length) return xf -def high_pass_filter(x, Fs, Fp): +def high_pass_filter(x, Fs, Fp, filter_length=None): """Highpass filter for the signal x. - An acausal fft algorithm is applied (i.e. no phase shift). The filter - functions is constructed from a Hamming window (window used in "firwin2" - function) to avoid ripples in the frequency reponse (windowing is a - smoothing in frequency domain) + Applies a zero-phase highpass filter to the signal x. Parameters ---------- @@ -157,6 +287,10 @@ def high_pass_filter(x, Fs, Fp): sampling rate Fp : float cut-off frequency + filter_length : int (default: None) + Length of the filter to use. If None or "len(x) < filter_length", the + filter length used is len(x). Otherwise, overlap-add filtering with a + filter of the specified length is used (faster for long signals). Returns ------- @@ -176,29 +310,12 @@ def high_pass_filter(x, Fs, Fp): Fp-0.5 Fp """ - Fp = float(Fp) - - assert x.ndim == 1 - # Make x ODD - Norig = len(x) - if Norig % 2 == 0: - x = np.r_[x, 1] - - # Normalize frequencies - Ns = (Fp - 0.5) / (Fs / 2) - Np = Fp / (Fs / 2) - - # Construct the filter function H(f) - N = len(x) - - B = signal.firwin2(N, [0, Ns, Np, 1], [0, 0, 1, 1]) + Fs = float(Fs) + Fp = float(Fp) - # Make zero-phase filter function - H = np.abs(fft(B)) + Fstop = Fp - 0.5 - xf = np.real(ifft(fft(x) * H)) - xf = xf[:Norig] - x = x[:Norig] + xf = _filter(x, Fs, [0, Fstop, Fp, Fs / 2], [0, 0, 1, 1], filter_length) - return xf + return xf \ No newline at end of file diff --git a/mne/tests/test_filter.py b/mne/tests/test_filter.py index 98f5ff1736f..83d5bab4578 100644 --- a/mne/tests/test_filter.py +++ b/mne/tests/test_filter.py @@ -3,10 +3,29 @@ from ..filter import band_pass_filter, high_pass_filter, low_pass_filter + def test_filters(): - a = np.random.randn(1000) - Fs = 1000 + Fs = 500 + sig_len_secs = 60 + + # Filtering of short signals (filter length = len(a)) + a = np.random.randn(sig_len_secs * Fs) bp = band_pass_filter(a, Fs, 4, 8) lp = low_pass_filter(a, Fs, 8) hp = high_pass_filter(lp, Fs, 4) assert_array_almost_equal(hp, bp, 2) + + # Overlap-add filtering with a fixed filter length + filter_length = 8192 + bp_oa = band_pass_filter(a, Fs, 4, 8, filter_length) + lp_oa = low_pass_filter(a, Fs, 8, filter_length) + hp_oa = high_pass_filter(lp_oa, Fs, 4, filter_length) + assert_array_almost_equal(hp_oa, bp_oa, 2) + + # The two methods should give the same result + # As filtering for short signals uses a circular convolution (FFT) and + # the overlap-add filter implements a linear convolution, the signal + # boundary will be slightly different and we ignore it + n_edge_ignore = 1000 + assert_array_almost_equal(hp[n_edge_ignore:-n_edge_ignore], + hp_oa[n_edge_ignore:-n_edge_ignore], 2) \ No newline at end of file