From 24f0a7cb0e02ce9ab653cb58f3c26cd662e3c778 Mon Sep 17 00:00:00 2001 From: Nicolas Barascud <10333715+nbara@users.noreply.github.com> Date: Thu, 25 Jan 2024 15:20:35 +0100 Subject: [PATCH 1/8] implement resonant and non-resonant oscillators --- meegkit/utils/buffer.py | 135 +++++++++ meegkit/utils/filters.py | 580 +++++++++++++++++++++++++++++++++++++++ tests/test_buffer.py | 72 +++++ tests/test_filters.py | 157 +++++++++++ 4 files changed, 944 insertions(+) create mode 100644 meegkit/utils/buffer.py create mode 100644 meegkit/utils/filters.py create mode 100644 tests/test_buffer.py create mode 100644 tests/test_filters.py diff --git a/meegkit/utils/buffer.py b/meegkit/utils/buffer.py new file mode 100644 index 00000000..1cb2733f --- /dev/null +++ b/meegkit/utils/buffer.py @@ -0,0 +1,135 @@ +"""Buffer class for real-time processing.""" +import warnings + +import numpy as np + + +class Buffer: + """Circular buffer for real-time processing. + + Parameters + ---------- + size : int + The number of samples of the buffer. + n_channels : int + The number of channels of the buffer. + + Attributes + ---------- + size : int + The number of samples of the buffer. + n_channels : int + The number of channels of the buffer. + counter : int + The number of samples in the buffer. + head : int + The index of the most recent sample in the buffer. + tail : int + The index of the most recent read sample in the buffer. + _data : ndarray, shape (size, n_channels) + Data buffer. + + + """ + + def __init__(self, size, n_channels=1): + self.size = size + self.n_channels = n_channels + self.reset() + + def reset(self): + """Reset the buffer.""" + self.counter = 0 # number of samples in the buffer + self._head = 0 # most recent sample in the buffer + self._tail = 0 # most recent read sample in the buffer + self._data = np.zeros((self.size, self.n_channels)) + + def push(self, X): + """Update the buffer with new data. + + Parameters + ---------- + X : ndarray, shape (n_samples, n_channels) + The input signal to be transformed. + """ + # Check input + if X.ndim == 0: + # X = np.array([X]) + n_samples = 1 + elif X.ndim == 1: + X = X[:, None] + n_samples = X.shape[0] + elif X.ndim > 2: + raise ValueError("X must be 1- or 2-dimensional") + + # Check size + if n_samples > self.size: + warnings.warn("Buffer overflow: some old data has been lost") + X = X[-self.size:] # keep only the last self.size samples + n_samples = X.shape[0] + + # Warn when data has not been read for a long time + if self.counter + n_samples - self._tail > self.size: + warnings.warn("Buffer overflow: some old data has been discarded") + self._tail = self._head + n_samples - self.size + + # Update buffer + end = self.head + n_samples + if end <= self.size: + # If the new data fits in the remaining space + self._data[self.head:end] = X + else: + # If the new data wraps around to the beginning of the buffer + overflow = end - self.size + self._data[self.head:] = X[:-overflow] + self._data[:overflow] = X[-overflow:] + + self._head += n_samples + self.counter += n_samples + + def get_new_samples(self, n_samples=None): + """Consume n_samples.""" + if n_samples is None: + n_samples = min(self.counter, self.size) + elif n_samples > self.counter: + raise ValueError("Not enough samples in the buffer") + elif n_samples > self.size: + raise ValueError("Requested more samples than buffer size") + + start = self.tail + end = (self.tail + n_samples) % self.size + + if start < end: + samples = self._data[start:end] + else: + samples = np.concatenate((self._data[start:], self._data[:end])) + + self._tail += n_samples + return samples + + def view(self, n_samples): + """View the last n_samples, without consuming them.""" + start = (self.head - n_samples) % self.size + end = self.head + if start < end: + return self._data[start:end] + else: + return np.concatenate((self._data[start:], self._data[:end])) + + def __repr__(self) -> str: + """Return complete string representation.""" + repr = f"Buffer({self.size}, {self.n_channels})\n" + repr += f"> counter: {self.counter}\n" + repr += f"> head: {self.head}\n" + repr += f"> tail: {self.tail}\n" + return repr + + @property + def tail(self): + """Get the index of the tail.""" + return self._tail % self.size + + @property + def head(self): + """Get the index of the head.""" + return self._head % self.size diff --git a/meegkit/utils/filters.py b/meegkit/utils/filters.py new file mode 100644 index 00000000..fe199271 --- /dev/null +++ b/meegkit/utils/filters.py @@ -0,0 +1,580 @@ + +"""Real-time phase and amplitude estimation using resonant oscillators. + +.. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation + of phase and amplitude with application to neural data. Sci Rep 11, 18037 + (2021). https://doi.org/10.1038/s41598-021-97560-5 +""" +import numpy as np + +from .buffer import Buffer + + +class Device: + """Measurement device. + + This class implements a linear oscillator with a given frequency and damping. + + Parameters + ---------- + om0 : float + Oscillator frequency (estimation). + dt : float + Sampling interval. + damping : float + Damping parameter. + type : str in {"integrator", "oscillator"} + Type of the device. + + """ + + def __init__(self, om0, dt, damping, type, mu=500): + self.om0 = om0 + self.dt = dt + self.damping = damping + self.type = type + self.init_coefs(om0, dt, damping) + + # For the integrator + if type == "integrator": + self.mu = mu + self.edelmu = np.exp(-self.dt / self.mu) + + self.state = dict(x=0, dx=0) + + + def init_coefs(self, om0, dt, damping): + """Initialize coefficients for solving oscillator's equations.""" + self.c1, self.c2, self.c3, self.enuDel, self.ealDel, self.eta = \ + init_coefs(om0, dt, damping) + self.om0 = om0 + + def step(self, sprev, s, snew): + """Perform one step of the oscillator equations. + + Parameters + ---------- + x : float + State variable x. + y : float + State variable y. + sprev : float + Previous measurement. + s : float + Current measurement. + snew : float + New measurement. + + Returns + ------- + x : float + Updated state variable x. + y : float + Updated state variable y. + """ + if self.type == "oscillator": + x = self.state["x"] + y = self.state["dx"] + x, dx = one_step_oscillator(x, y, self.damping, self.eta, self.enuDel, + self.ealDel, self.c1, self.c2, self.c3, + sprev, s, snew) + self.state["x"] = x + self.state["dx"] = dx + + elif self.type == "integrator": + x = self.state["x"] + x = one_step_integrator(x, self.edelmu, self.mu, self.dt, + sprev, s, snew) + self.state["x"] = x + + return self + + +class NonResOscillator: + """Real-time measurement of phase and amplitude using non-resonant oscillator. + + This estimator relies on the resonance effect. The measuring “device” consists + of two linear damped oscillators. The oscillators' frequency is much larger + than the frequency of the signal, i.e., the system is far from resonance. + We choose the damping parameters to ensure that (i) the phase of the first + linear oscillator equals that of the input and that (ii) amplitude of the + second one and the input relate by a known constant multiplicator. The + technique yields both phase and amplitude of the input signal. + + This estimator includes an automated frequency-tuning algorithm to adjust + to the a priori unknown signal frequency. + + For a detailed description, refer to [1]_. + + Parameters + ---------- + fs : float + Sampling frequency in Hz. + nu : float + Rough estimate of the main frequency of interest. + + References + ---------- + .. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation + of phase and amplitude with application to neural data. Sci Rep 11, 18037 + (2021). https://doi.org/10.1038/s41598-021-97560-5 + """ + + def __init__(self, fs=250, nu=1.1): + + # Parameters of the measurement "devices" + self.dt = 1 / fs # Sampling interval + self.nu = nu # Rough estimate of the tremor frequency + self.om0 = 5 * nu # Oscillator frequency (estimation) + self.alpha_a = 2.5 * self.om0 # Damping parameter for the "amplitude device" + self.gamma_a = self.alpha_a / 2 + self.alpha_p = 0.2 # Damping parameter for the "phase device" + self.gamma_p = self.alpha_p / 2 + self.factor = np.sqrt((self.om0 ** 2 - nu ** 2) ** 2 + (self.alpha_a * nu) ** 2) + + # Set up the phase amplitude "devices" + self.ampl_device = Device(self.om0, self.dt, self.gamma_a, "oscillator") + self.phase_device = Device(self.om0, self.dt, self.gamma_p, "oscillator") + + # Update parameters + update_factor = 5 + self.memory = round(2 * np.pi / self.om0 / self.dt) + self.update_point = 2 * self.memory + self.update_step = round(self.memory / update_factor) + self._tbuf = np.arange(self.memory) * self.dt + self._Sx = np.sum(self._tbuf) + self._denom = self.memory * np.sum(self._tbuf ** 2) - self._Sx ** 2 + + # Buffer to store past input values + self.n_channels = None + self.buffer = None + + def transform(self, X): + """Transform the input signal into phase and amplitude estimates. + + Parameters + ---------- + X : ndarray, shape (n_samples, n_channels) + The input signal to be transformed. + + Returns + ------- + phase : float + Current phase estimate. + ampl : float + Current amplitude estimate. + """ + n_samples = X.shape[0] + phase = np.zeros(n_samples) + ampl = np.zeros(n_samples) + + if self.buffer is None: + n_channels = 1 # X.shape[1] + self.buffer = Buffer(self.memory, n_channels) + + for k in range(n_samples): + self.buffer.push(X[k]) + + if self.buffer.counter < 3: + continue # Skip the first two samples + + # Amplitude estimation + spp, sp, s = self.buffer.view(3) + self.ampl_device.step(spp, sp, s) + z = self.ampl_device.state["dx"] / self.nu + ampl[k] = self.factor * np.sqrt(z ** 2 + self.ampl_device.state["x"] ** 2) + + # Phase estimation + self.phase_device.step(spp, sp, s) + z = self.phase_device.state["dx"] / self.nu + phase[k] = np.arctan2(-z, self.phase_device.state["x"]) + + if k > self.update_point: + tmp = np.unwrap(phase[k - self.memory:k]) + self.nu = (self.memory * np.sum(self._tbuf * tmp) - + self._Sx * np.sum(tmp)) / self._denom + self.update_point += self.update_step + + + return phase, ampl + + +class ResOscillator: + """Real-time measurement of phase and amplitude using a resonant oscillator. + + In this version, the corresponding “device” consists of a linear oscillator + in resonance with the measured signal and of an integrating unit. It also + provides both phase and amplitude of the input signal. The method exploits + the known relation between the resonant oscillator's phase and amplitude + and those of the input. Additionally, the resonant oscillator acts as a + bandpass filter for experimental data. + + This filter also includes a frequency-adaptation algorithm, and removes + low-frequency trend (baseline fluctuations). For a detailed description, + refer to [1]_. + + Parameters + ---------- + npt : int + Number of points to be measured and processed + fs : float + Sampling frequency in Hz. + nu : float + Rough estimate of the tremor frequency. + + Returns + ------- + Dphase : numpy.ndarray + Array containing the phase values. + Dampl : numpy.ndarray + Array containing the amplitude values. + + References + ---------- + .. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation + of phase and amplitude with application to neural data. Sci Rep 11, 18037 + (2021). https://doi.org/10.1038/s41598-021-97560-5 + """ + + def __init__(self, fs=1000, nu=4.5): + + # Parameters of the measurement "device" + self.dt = 1 / fs # Sampling interval + self.om0 = 5 * nu # Angular frequency + self.alpha = 0.3 * self.om0 + self.gamma = self.alpha / 2 + + # Parameters of adaptation algorithm + nperiods = 2 # Number of previous periods for frequency correction + npt_period = round(2 * np.pi / self.om0 / self.dt) # Number of points per period + self.memory = nperiods * npt_period # M points for frequency correction buffer + self.update_factor = 5 # Number of frequency updates per period + self.update_step = round(npt_period / self.update_factor) + self.updatepoint = 2 * self.memory + + # Precomputed quantities for linear fit for frequency adaptation + self._tbuf = np.arange(self.memory) * self.dt + self._Sx = np.sum(self._tbuf) + self._denom = self.memory * np.sum(self._tbuf ** 2) - self._Sx ** 2 + + # Initialize amplitude and phase devices + self.oscillator = Device(self.om0, self.dt, self.gamma, "oscillator") + self.integrator = Device(self.om0, self.dt, self.gamma, "integrator") + + # Buffer to store past input values + self.n_channels = None + self.buffer = None + self.runav = 0. # Initial guess for the dc-component + + def transform(self, X): + """Transform the input signal into phase and amplitude estimates. + + Parameters + ---------- + X : ndarray, shape (n_samples, n_channels) + The input signal to be transformed. + + Returns + ------- + phase : float + Current phase estimate. + ampl : float + Current amplitude estimate. + """ + n_samples = X.shape[0] + phase = np.zeros(n_samples) + ampl = np.zeros(n_samples) + sdemean = np.copy(X) + + if self.buffer is None: + n_channels = 1 # X.shape[1] + self.buffer = Buffer(self.memory, n_channels) + + for k in range(n_samples): + self.buffer.push(X[k]) + sdemean[k] = self.buffer.view(1) - self.runav # Baseline correction + + if self.buffer.counter < 3: + continue # Skip the first two samples + + # Perform one step of the oscillator equations + self.oscillator.step(sdemean[k - 2], sdemean[k - 1], sdemean[k]) + self.integrator.step(sdemean[k - 2], sdemean[k - 1], sdemean[k]) + + # New phase and amplitude values + v = self.integrator.mu * self.integrator.state["x"] * self.om0 + phase[k] = np.arctan2(v, self.oscillator.state["dx"]) + ampl[k] = self.alpha * np.sqrt(self.oscillator.state["dx"] ** 2 + v ** 2) + + if k > self.updatepoint: + # Buffer for frequency estimation + tmp = np.unwrap(phase[k - self.memory:k]) + + # Frequency via linear fit of the phases in the buffer + self.om0 = (self.memory * np.sum(self._tbuf * tmp) - + self._Sx * np.sum(tmp)) / self._denom + + # Recompute integration parameters for the new frequency + self.integrator.om0 = self.om0 + self.oscillator.om0 = self.om0 + self.oscillator.init_coefs(self.om0, self.dt, self.gamma) + + # Update running average + self.runav = (self.runav + np.mean(self.buffer.view(self.memory))) / 2 + self.updatepoint += self.update_step # Point for the next update + + return phase, ampl + + +def locking_based_phase(s, dt, npt): + """Compute the locking-based phase. + + This technique exploits the ideas from the synchronization theory. It is + well-known that a force s(t) acting on a limit-cycle oscillator can entrain + it. It means that the oscillator's frequency becomes equal to that of the + force, and their phases differ by a constant. Thus, the phase of the locked + limit-cycle oscillator will correspond to the phase of the signal. For our + purposes, it is helpful to use the simplest oscillator model, the so-called + phase oscillator. To ensure the phase-locking to the force, we have to + adjust the oscillator's frequency to the signal's frequency. We assume that + we do not know the latter a priori, but can only roughly estimate it. We + propose a simple approach that starts with this estimate and automatically + tunes the “device's” frequency to ensure the locking and thus provides the + instantaneous phase. + + Note that the amplitude is not determined with this approach. Technically, + the algorithm reduces to solving differential equation incorporating + measured data given at discrete time points; for details, see [1]_. + + Parameters + ---------- + s: array-like + Input signal. + dt: float + Time step. + npt: int + Number of points. + + Returns + ------- + lb_phase: ndarray + Locking-based phase. + + References + ---------- + .. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation + of phase and amplitude with application to neural data. Sci Rep 11, 18037 + (2021). https://doi.org/10.1038/s41598-021-97560-5 + """ + lb_phase = np.zeros(npt) + + # Parameters of the measurement device + epsilon = 0.8 # Amplitude of the oscillator + K = 0.5 # Frequency adaptation coefficient + n_rk_steps = 5 # Number of Runge-Kutta steps per time step + + # Parameters of the frequency adaptation algorithm + nu_est = 1.1 # initial guess for the frequency + update_factor = 20 # number of frequency updates per period + + # Buffer for frequency estimation + npbuf = round(2 * np.pi / nu_est / dt) # number of samples for frequency estimation + update_point = 2 * npbuf + update_step = round(npbuf / update_factor) + tbuf = np.arange(npbuf) * dt + sx = np.sum(tbuf) + denom = npbuf * np.sum(tbuf ** 2) - sx ** 2 + + for k in range(2, npt): + lb_phase[k] = rk(lb_phase[k - 1], dt, n_rk_steps, nu_est, epsilon, + s[k], s[k - 1], s[k]) + + if k > update_point: + buffer = lb_phase[k - npbuf:k] + nu_quasi = (npbuf * np.sum(tbuf * buffer) - sx * np.sum(buffer)) / denom + nu_est = nu_est + K * (nu_quasi - nu_est) + update_point += update_step + + return lb_phase + + +def rk(phi, dt, n_steps, omega, epsilon, s_prev, s, s_new): + """Runge-Kutta method for phase calculation. + + Parameters + ---------- + phi : float + Initial phase value. + dt : float + Time step size. + n_steps : int + Number of steps to iterate. + omega : float + Angular frequency. + epsilon : float + Amplitude of the oscillation. + s_prev : float + Previous phase value. + s : float + Current phase value. + s_new : float + New phase value. + + Returns + ------- + phi : float + The calculated phase value. + + """ + h = dt / n_steps + b = (s_new - s_prev) / dt / 2 + c = (s_prev - 2 * s + s_new) / dt ** 2 + hh = h / 2. + h6 = h / 6. + t = 0 + + for _ in range(n_steps): + th = t + hh + dy0 = omega - epsilon * (s + b * t + c * t ** 2) * np.sin(phi) + phit = phi + hh * dy0 + dyt = omega - epsilon * (s + b * th + c * th ** 2) * np.sin(phit) + phit = phi + hh * dyt + dym = omega - epsilon * (s + b * th + c * th ** 2) * np.sin(phit) + phit = phi + h * dym + dym = dym + dyt + t = t + h + dyt = omega - epsilon * (s + b * t + c * t ** 2) * np.sin(phit) + phi = phi + h6 * (dy0 + 2 * dym + dyt) + + return phi + + +def init_coefs(om0, dt, alpha): + """Compute coefficients for solving oscillator's equations. + + Parameters + ---------- + om0 : float + Oscillator frequency (estimation). + dt : float + Sampling interval. + alpha : float + Half of the damping. + + Returns + ------- + C1 : float + Coefficient C1. + C2 : float + Coefficient C2. + C3 : float + Coefficient C3. + eetadel : complex + Exponential term for amplitude device. + ealdel : float + Exponential term for amplitude device. + eta : float + Square root of the difference of oscillator frequency squared and + damping squared. + + """ + # alpha is the half of the damping: x'' + 2 * alpha * x' + om0^2 * x = input + eta2 = om0**2 - alpha**2 + eta = np.sqrt(eta2) if eta2 > 0 else np.sqrt(-eta2) # replicate Matlab behavior + eetadel = np.exp(1j * eta * dt) + a = 1. / eetadel + ealdel = np.exp(alpha * dt) + + I1 = 1j * (a - 1) / eta + I2 = (a * (1 + 1j * eta * dt) - 1) / eta2 + I3 = (a * (dt * eta * (2 + 1j * dt * eta) - 2 * 1j) + 2 * 1j) / eta2 / eta + + C1 = (I3 - I2 * dt) / (2 * dt**2 * ealdel) + C2 = I1 - I3 / dt**2 + C3 = ealdel * (I2 * dt + I3) / (2 * dt**2) + + return C1, C2, C3, eetadel, ealdel, eta + + +def one_step_oscillator(x, xd, alpha, eta, edel_mu, eal_del, C1, C2, C3, spp, + sp, s): + """Perform one step of the oscillator equations. + + Parameters + ---------- + x : float + State variable x. + xd : float + Derivative of state variable x. + alpha : float + Damping parameter. + eta : float + Square root of the difference of oscillator frequency squared and + damping squared. + edel_mu : complex + Exponential term for time step. + eal_del : float + Exponential term for time step. + C1 : float + Coefficient C1. + C2 : float + Coefficient C2. + C3 : float + Coefficient C3. + spp : float + Second orevious measurement. + sp : float + Previous measurement. + s : float + Current measurement. + + Returns + ------- + x : float + Updated state variable x. + xd : float + Updated derivative of state variable x. + """ + A = x - 1j * (xd + alpha * x) / eta + A = A - 1j * (C1 * spp + C2 * sp + C3 * s) / eta + d = A * edel_mu + y = np.real(d) + yd = 1j * eta * (d - np.conj(d)) / 2 + x = y / eal_del + xd = np.real((yd - alpha * y) / edel_mu) + + return x, xd + + +def one_step_integrator(z, edelmu, mu, dt, spp, sp, s): + """Perform one step of the integrator in the oscillator equations. + + Parameters + ---------- + z : float + State variable z. + edelmu : float + Exponential term for integrator. + mu : float + Integrator parameter. + dt : float + Sampling interval. + spp : float + Second previous measurement. + sp : float + Previous measurement. + s : float + Current measurement. + + Returns + ------- + z : float + Updated state variable z. + """ + a = sp + b = (s - spp) / (2 * dt) + c = (spp - 2 * sp + s) / (2 * dt ** 2) + d = -a + b * mu - 2 * c * mu ** 2 + C0 = z + d + z = C0 * edelmu - d + b * dt - 2 * c * mu * dt + c * dt ** 2 + return z diff --git a/tests/test_buffer.py b/tests/test_buffer.py new file mode 100644 index 00000000..f2fbf030 --- /dev/null +++ b/tests/test_buffer.py @@ -0,0 +1,72 @@ +import numpy as np +import pytest + +from meegkit.utils.buffer import Buffer + + +def test_push(): + buf = Buffer(32, 1) + data = np.arange(1000) + buf.push(data[:10]) + assert buf.counter == 10 + assert buf.head == 10 + assert buf.tail == 0 + +def test_get_new_samples(): + buf = Buffer(32, 1) + data = np.arange(1000) + buf.push(data[:10]) + samples = buf.get_new_samples(5) + np.testing.assert_array_equal(samples.flatten(), data[:5]) + assert buf.tail == 5 + samples = buf.get_new_samples(5) + np.testing.assert_array_equal(samples.flatten(), data[5:10]) + +def test_reset(): + buf = Buffer(32, 1) + data = np.arange(1000) + buf.push(data[:10]) + buf.reset() + assert buf.counter == 0 + assert buf.head == 0 + assert buf.tail == 0 + +def test_repr(): + buf = Buffer(32, 1) + repr = buf.__repr__() + assert repr == "Buffer(32, 1)\n> counter: 0\n> head: 0\n> tail: 0\n" + +def test_overflow_warning(): + buf = Buffer(32, 1) + data = np.arange(1000) + with pytest.warns(UserWarning, match="Buffer overflow: some old data has been lost"): + buf.push(data) + +def test_repeated_push(): + buf = Buffer(32, 1) + data = np.arange(1000) + + print(buf) + buf.push(data[:10]) + print(buf) + buf.push(data[10:20]) + print(buf) + + with pytest.warns(UserWarning, match="Buffer overflow: some old data has been discarded"): + buf.push(data[20:40]) + print(buf) + assert buf.counter == 40 + assert buf.head == 8 + assert buf._head == 40 + assert buf.tail == 8 + assert buf._tail == 8 + + out = buf.get_new_samples() + print(out.flatten()) + np.testing.assert_array_equal(out.flatten(), data[8:40]) # first 8 samples are lost + + +if __name__ == "__main__": + # test_get_new_samples() + # test_repeated_push() + pytest.main([__file__]) \ No newline at end of file diff --git a/tests/test_filters.py b/tests/test_filters.py new file mode 100644 index 00000000..9c927dae --- /dev/null +++ b/tests/test_filters.py @@ -0,0 +1,157 @@ +import matplotlib.pyplot as plt +import numpy as np +from scipy.signal import hilbert + +from meegkit.utils.filters import ( + NonResOscillator, + ResOscillator, + locking_based_phase, +) + + +def test_all_alg(show=True): + """Replicate the figure from the paper.""" + # Model data, all three algorithms + npt = 40000 + fs = 100 + s = generate_multi_comp_data_fr_mod(npt, fs) # Generate test data + dt = 1 / fs + time = np.arange(npt) * dt + + # Plot the test signal's Fourier spectrum + # f, ax = plt.subplots(1, 1) + # ax.psd(s, Fs=fs, NFFT=2018*4, noverlap=fs) + # ax.set_title("Test signal's Fourier spectrum") + # plt.show() + + + # The test signal s and its Hilbert amplitude ah (red); one can see that + # ah does not represent a good envelope for s. On the contrary, the + # Hilbert-based phase estimation yields good results, and therefore we take + # it for the ground truth. + ht_ampl = np.abs(hilbert(s)) # Hilbert amplitude + ht_phase = np.angle(hilbert(s)) # Hilbert phase + + lb_phase = locking_based_phase(s, dt, npt) + lb_phi_dif = phase_difference(ht_phase, lb_phase) + + osc = NonResOscillator(fs, 1) + nr_phase, nr_ampl = osc.transform(s) + nr_phi_dif = phase_difference(ht_phase, nr_phase) + + osc = ResOscillator(fs, 1.1) + r_phase, r_ampl = osc.transform(s) + r_phi_dif = phase_difference(ht_phase, r_phase) + + # Panels (b, c, d) show the difference between the + # Hilbert phase and causally estimated phases (phi_L, phi_N, phi_R) are + # obtained by means of the locking-based technique, non-resonant and + # resonant oscillator, respectively). These panels demonstrate that the + # output of the developed causal algorithms is very close to the HT-phase. + # Notice that in (c) we show phi_H - phi_N modulo 2pi, since the phase + # difference is not bounded : within the first 20 time units the phase + # difference decreases to -14pi, until it saturates and oscillates around + # (phi_H - phi_N) = 0. + f, ax = plt.subplots(4, 2, sharex=True, figsize=(12, 8)) + ax[0, 0].plot(time, s, time, ht_phase) + ax[0, 0].set_ylabel(r"$s,\phi_H$") + # ax[0, 0].set_ylim([-2, 3]) + ax[0, 0].set_title("Signal and its Hilbert phase") + + ax[1, 0].plot(time, lb_phi_dif) + ax[1, 0].set_ylabel(r"$\phi_H - \phi_L$") + ax[1, 0].set_ylim([-np.pi, np.pi]) + ax[1, 0].set_title("Phase locking approach") + + ax[2, 0].plot(time, nr_phi_dif) + ax[2, 0].set_ylabel(r"$\phi_H - \phi_N$") + ax[2, 0].set_ylim([-np.pi, np.pi]) + ax[2, 0].set_title("Nonresonant oscillator") + + ax[3, 0].plot(time, r_phi_dif) + ax[3, 0].set_ylim([-np.pi, np.pi]) + ax[3, 0].set_ylabel("$\phi_H - \phi_R$") + ax[3, 0].set_xlabel("Time") + ax[3, 0].set_title("Resonant oscillator") + + + ax[0, 1].plot(time, s, time, ht_ampl) + ax[0, 1].set_ylabel(r"$s,a_H$") + ax[0, 1].set_ylim([-2, 3]) + ax[0, 1].set_title("Signal and its Hilbert amplitude") + ax[1, 1].axis("off") + ax[2, 1].plot(time, s, time, nr_ampl) + ax[2, 1].set_ylabel(r"$s,a_N$") + ax[2, 1].set_title("Amplitudes") + ax[2, 1].set_ylim([-2, 4]) + ax[2, 1].set_title("Nonresonant oscillator") + ax[3, 1].plot(time, s, time, r_ampl) + ax[3, 1].set_xlabel("Time") + ax[3, 1].set_ylabel(r"$s,a_R$") + ax[3, 1].set_title("Resonant oscillator") + plt.suptitle("Amplitude (right) and phase (left) estimation algorithms") + plt.tight_layout() + plt.show() + +def phase_difference(phi1, phi2): + cos_phi_dif = np.cos(phi1) * np.cos(phi2) + np.sin(phi1) * np.sin(phi2) + sin_phi_dif = np.sin(phi1) * np.cos(phi2) - np.cos(phi1) * np.sin(phi2) + phi_dif = np.arctan2(sin_phi_dif, cos_phi_dif) + + return phi_dif + +def generate_multi_comp_data_fr_mod(npt=40000, fs=100): + """Generate multi-component data with frequency modulation. + + Returns + ------- + s : np.ndarray + Generated data. + dt : float + Time step. + npt : int + Number of data points. + + """ + dt = 1 / fs + t = np.arange(npt) * dt + omega1 = np.sqrt(2) / 30 # Frequency of the first component (slow) + omega2 = np.sqrt(5) / 60 # Frequency of the second component (fast) + amp = 1 + 0.95 * np.cos(omega1 * t) + p = t + 5 * np.sin(omega2 * t) + s = np.cos(p) + 0.2 * np.cos(2 * p + np.pi / 6) + 0.1 * np.cos(3 * p + np.pi / 3) + s *= amp + + return s + + +def generate_simple_signal(npt=40000, fs=100, f1=1.2, f2=10): + """Generate multi-component data with frequency modulation. + + Returns + ------- + s : np.ndarray + Generated data. + dt : float + Time step. + npt : int + Number of data points. + + """ + dt = 1 / fs + t = np.arange(npt) * dt + omega1 = 2 * np.pi * f1 # Frequency of the first component (slow) + omega2 = 2 * np.pi * f2 # Frequency of the second component (fast) + amp = 1 + 0.95 * np.cos(omega1 * t) + p = t + np.sin(omega2 * t) + s = np.cos(p) + s += 0.2 * np.cos(2 * p + np.pi / 6) + s += 0.1 * np.cos(3 * p + np.pi / 3) + s *= amp + + return s + + +if __name__ == "__main__": + # Run the model_data_all_alg function + test_all_alg() From ede00e06d4a7abe171915bb5fcab76bf721c24de Mon Sep 17 00:00:00 2001 From: Nicolas Barascud <10333715+nbara@users.noreply.github.com> Date: Mon, 29 Jan 2024 14:08:57 +0100 Subject: [PATCH 2/8] replicate paper results perfectly --- meegkit/utils/filters.py | 68 ++++++++++++++++++++++------------------ tests/test_filters.py | 54 +++++++++++++------------------ 2 files changed, 59 insertions(+), 63 deletions(-) diff --git a/meegkit/utils/filters.py b/meegkit/utils/filters.py index fe199271..f8d72b3a 100644 --- a/meegkit/utils/filters.py +++ b/meegkit/utils/filters.py @@ -40,7 +40,7 @@ def __init__(self, om0, dt, damping, type, mu=500): self.mu = mu self.edelmu = np.exp(-self.dt / self.mu) - self.state = dict(x=0, dx=0) + self.state = dict(x=0, y=0) def init_coefs(self, om0, dt, damping): @@ -74,12 +74,12 @@ def step(self, sprev, s, snew): """ if self.type == "oscillator": x = self.state["x"] - y = self.state["dx"] - x, dx = one_step_oscillator(x, y, self.damping, self.eta, self.enuDel, + y = self.state["y"] + x, y = one_step_oscillator(x, y, self.damping, self.eta, self.enuDel, self.ealDel, self.c1, self.c2, self.c3, sprev, s, snew) self.state["x"] = x - self.state["dx"] = dx + self.state["y"] = y elif self.type == "integrator": x = self.state["x"] @@ -126,7 +126,7 @@ def __init__(self, fs=250, nu=1.1): self.dt = 1 / fs # Sampling interval self.nu = nu # Rough estimate of the tremor frequency self.om0 = 5 * nu # Oscillator frequency (estimation) - self.alpha_a = 2.5 * self.om0 # Damping parameter for the "amplitude device" + self.alpha_a = 6.0 # Damping parameter for the "amplitude device" self.gamma_a = self.alpha_a / 2 self.alpha_p = 0.2 # Damping parameter for the "phase device" self.gamma_p = self.alpha_p / 2 @@ -181,12 +181,12 @@ def transform(self, X): # Amplitude estimation spp, sp, s = self.buffer.view(3) self.ampl_device.step(spp, sp, s) - z = self.ampl_device.state["dx"] / self.nu + z = self.ampl_device.state["y"] / self.nu ampl[k] = self.factor * np.sqrt(z ** 2 + self.ampl_device.state["x"] ** 2) # Phase estimation self.phase_device.step(spp, sp, s) - z = self.phase_device.state["dx"] / self.nu + z = self.phase_device.state["y"] / self.nu phase[k] = np.arctan2(-z, self.phase_device.state["x"]) if k > self.update_point: @@ -240,12 +240,12 @@ def __init__(self, fs=1000, nu=4.5): # Parameters of the measurement "device" self.dt = 1 / fs # Sampling interval - self.om0 = 5 * nu # Angular frequency + self.om0 = 1.1 # Angular frequency self.alpha = 0.3 * self.om0 self.gamma = self.alpha / 2 # Parameters of adaptation algorithm - nperiods = 2 # Number of previous periods for frequency correction + nperiods = 1 # Number of previous periods for frequency correction npt_period = round(2 * np.pi / self.om0 / self.dt) # Number of points per period self.memory = nperiods * npt_period # M points for frequency correction buffer self.update_factor = 5 # Number of frequency updates per period @@ -253,7 +253,7 @@ def __init__(self, fs=1000, nu=4.5): self.updatepoint = 2 * self.memory # Precomputed quantities for linear fit for frequency adaptation - self._tbuf = np.arange(self.memory) * self.dt + self._tbuf = np.arange(1, self.memory + 1) * self.dt self._Sx = np.sum(self._tbuf) self._denom = self.memory * np.sum(self._tbuf ** 2) - self._Sx ** 2 @@ -284,27 +284,34 @@ def transform(self, X): n_samples = X.shape[0] phase = np.zeros(n_samples) ampl = np.zeros(n_samples) - sdemean = np.copy(X) if self.buffer is None: n_channels = 1 # X.shape[1] self.buffer = Buffer(self.memory, n_channels) + self._demean = np.zeros((3, n_channels)) # Buffer for the demeaned signal + self._oscbuf = np.zeros((3, n_channels)) # Buffer for the integrator for k in range(n_samples): self.buffer.push(X[k]) - sdemean[k] = self.buffer.view(1) - self.runav # Baseline correction + self._demean = np.roll(self._demean, -1, axis=0) + self._demean[-1] = self.buffer.view(1) - self.runav # Baseline correction if self.buffer.counter < 3: continue # Skip the first two samples # Perform one step of the oscillator equations - self.oscillator.step(sdemean[k - 2], sdemean[k - 1], sdemean[k]) - self.integrator.step(sdemean[k - 2], sdemean[k - 1], sdemean[k]) + self.oscillator.step(self._demean[-3], self._demean[-2], self._demean[-1]) + + self._oscbuf = np.roll(self._oscbuf, -1, axis=0) + self._oscbuf[-1] = self.oscillator.state["y"] + + self.integrator.step(self._oscbuf[-3], self._oscbuf[-2], self._oscbuf[-1]) # New phase and amplitude values v = self.integrator.mu * self.integrator.state["x"] * self.om0 - phase[k] = np.arctan2(v, self.oscillator.state["dx"]) - ampl[k] = self.alpha * np.sqrt(self.oscillator.state["dx"] ** 2 + v ** 2) + + phase[k] = np.arctan2(v, self.oscillator.state["y"]) + ampl[k] = self.alpha * np.sqrt(self.oscillator.state["y"] ** 2 + v ** 2) if k > self.updatepoint: # Buffer for frequency estimation @@ -315,12 +322,10 @@ def transform(self, X): self._Sx * np.sum(tmp)) / self._denom # Recompute integration parameters for the new frequency - self.integrator.om0 = self.om0 - self.oscillator.om0 = self.om0 self.oscillator.init_coefs(self.om0, self.dt, self.gamma) # Update running average - self.runav = (self.runav + np.mean(self.buffer.view(self.memory))) / 2 + self.runav = np.mean(self.buffer.view(self.memory)) self.updatepoint += self.update_step # Point for the next update return phase, ampl @@ -480,7 +485,8 @@ def init_coefs(om0, dt, alpha): """ # alpha is the half of the damping: x'' + 2 * alpha * x' + om0^2 * x = input eta2 = om0**2 - alpha**2 - eta = np.sqrt(eta2) if eta2 > 0 else np.sqrt(-eta2) # replicate Matlab behavior + eta2 = complex(eta2) if eta2 < 0 else eta2 + eta = np.sqrt(eta2) # replicate Matlab behavior eetadel = np.exp(1j * eta * dt) a = 1. / eetadel ealdel = np.exp(alpha * dt) @@ -496,8 +502,7 @@ def init_coefs(om0, dt, alpha): return C1, C2, C3, eetadel, ealdel, eta -def one_step_oscillator(x, xd, alpha, eta, edel_mu, eal_del, C1, C2, C3, spp, - sp, s): +def one_step_oscillator(x, xd, alpha, eta, eeta_del, eal_del, C1, C2, C3, spp, sp, s): """Perform one step of the oscillator equations. Parameters @@ -511,7 +516,7 @@ def one_step_oscillator(x, xd, alpha, eta, edel_mu, eal_del, C1, C2, C3, spp, eta : float Square root of the difference of oscillator frequency squared and damping squared. - edel_mu : complex + eeta_del : complex Exponential term for time step. eal_del : float Exponential term for time step. @@ -535,13 +540,14 @@ def one_step_oscillator(x, xd, alpha, eta, edel_mu, eal_del, C1, C2, C3, spp, xd : float Updated derivative of state variable x. """ - A = x - 1j * (xd + alpha * x) / eta - A = A - 1j * (C1 * spp + C2 * sp + C3 * s) / eta - d = A * edel_mu + # A = x - 1j * (xd + alpha * x) / eta + # A = A - 1j * (C1 * spp + C2 * sp + C3 * s) / eta + A = x - 1j * (xd + alpha * x + C1 * spp + C2 * sp + C3 * s) / eta + d = A * eeta_del y = np.real(d) - yd = 1j * eta * (d - np.conj(d)) / 2 + yd = - eta * np.imag(d) x = y / eal_del - xd = np.real((yd - alpha * y) / edel_mu) + xd = (yd - alpha * y) / eal_del return x, xd @@ -572,9 +578,9 @@ def one_step_integrator(z, edelmu, mu, dt, spp, sp, s): Updated state variable z. """ a = sp - b = (s - spp) / (2 * dt) - c = (spp - 2 * sp + s) / (2 * dt ** 2) - d = -a + b * mu - 2 * c * mu ** 2 + b = (s - spp) / 2 / dt + c = (spp - 2 * sp + s) / 2 / dt ** 2 + d = -a + b * mu - 2 * c * (mu ** 2) C0 = z + d z = C0 * edelmu - d + b * dt - 2 * c * mu * dt + c * dt ** 2 return z diff --git a/tests/test_filters.py b/tests/test_filters.py index 9c927dae..225ab50d 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -12,7 +12,7 @@ def test_all_alg(show=True): """Replicate the figure from the paper.""" # Model data, all three algorithms - npt = 40000 + npt = 100000 fs = 100 s = generate_multi_comp_data_fr_mod(npt, fs) # Generate test data dt = 1 / fs @@ -35,7 +35,7 @@ def test_all_alg(show=True): lb_phase = locking_based_phase(s, dt, npt) lb_phi_dif = phase_difference(ht_phase, lb_phase) - osc = NonResOscillator(fs, 1) + osc = NonResOscillator(fs, 1.1) nr_phase, nr_ampl = osc.transform(s) nr_phi_dif = phase_difference(ht_phase, nr_phase) @@ -52,40 +52,43 @@ def test_all_alg(show=True): # difference is not bounded : within the first 20 time units the phase # difference decreases to -14pi, until it saturates and oscillates around # (phi_H - phi_N) = 0. - f, ax = plt.subplots(4, 2, sharex=True, figsize=(12, 8)) - ax[0, 0].plot(time, s, time, ht_phase) + f, ax = plt.subplots(4, 2, sharex=True, sharey=True, figsize=(12, 8)) + ax[0, 0].plot(time, s, time, ht_phase, lw=.75) ax[0, 0].set_ylabel(r"$s,\phi_H$") # ax[0, 0].set_ylim([-2, 3]) ax[0, 0].set_title("Signal and its Hilbert phase") - ax[1, 0].plot(time, lb_phi_dif) + ax[1, 0].plot(time, lb_phi_dif, lw=.75) + ax[1, 0].axhline(0, color="k", ls=":", zorder=-1) ax[1, 0].set_ylabel(r"$\phi_H - \phi_L$") ax[1, 0].set_ylim([-np.pi, np.pi]) ax[1, 0].set_title("Phase locking approach") - ax[2, 0].plot(time, nr_phi_dif) + ax[2, 0].plot(time, nr_phi_dif, lw=.75) + ax[2, 0].axhline(0, color="k", ls=":", zorder=-1) ax[2, 0].set_ylabel(r"$\phi_H - \phi_N$") ax[2, 0].set_ylim([-np.pi, np.pi]) ax[2, 0].set_title("Nonresonant oscillator") - ax[3, 0].plot(time, r_phi_dif) + ax[3, 0].plot(time, r_phi_dif, lw=.75) + ax[3, 0].axhline(0, color="k", ls=":", zorder=-1) ax[3, 0].set_ylim([-np.pi, np.pi]) ax[3, 0].set_ylabel("$\phi_H - \phi_R$") ax[3, 0].set_xlabel("Time") ax[3, 0].set_title("Resonant oscillator") - - ax[0, 1].plot(time, s, time, ht_ampl) + ax[0, 1].plot(time, s, time, ht_ampl, lw=.75) ax[0, 1].set_ylabel(r"$s,a_H$") - ax[0, 1].set_ylim([-2, 3]) ax[0, 1].set_title("Signal and its Hilbert amplitude") + ax[1, 1].axis("off") - ax[2, 1].plot(time, s, time, nr_ampl) + + ax[2, 1].plot(time, s, time, nr_ampl, lw=.75) ax[2, 1].set_ylabel(r"$s,a_N$") ax[2, 1].set_title("Amplitudes") - ax[2, 1].set_ylim([-2, 4]) ax[2, 1].set_title("Nonresonant oscillator") - ax[3, 1].plot(time, s, time, r_ampl) + + ax[3, 1].plot(time, s, time, r_ampl, lw=.75) ax[3, 1].set_xlabel("Time") ax[3, 1].set_ylabel(r"$s,a_R$") ax[3, 1].set_title("Resonant oscillator") @@ -107,14 +110,10 @@ def generate_multi_comp_data_fr_mod(npt=40000, fs=100): ------- s : np.ndarray Generated data. - dt : float - Time step. - npt : int - Number of data points. """ dt = 1 / fs - t = np.arange(npt) * dt + t = np.arange(1, npt + 1) * dt omega1 = np.sqrt(2) / 30 # Frequency of the first component (slow) omega2 = np.sqrt(5) / 60 # Frequency of the second component (fast) amp = 1 + 0.95 * np.cos(omega1 * t) @@ -125,29 +124,20 @@ def generate_multi_comp_data_fr_mod(npt=40000, fs=100): return s -def generate_simple_signal(npt=40000, fs=100, f1=1.2, f2=10): +def generate_noisy_signal(npt=40000, fs=100, noise=0.1): """Generate multi-component data with frequency modulation. Returns ------- s : np.ndarray Generated data. - dt : float - Time step. - npt : int - Number of data points. """ + rng = np.default_rng(42) dt = 1 / fs - t = np.arange(npt) * dt - omega1 = 2 * np.pi * f1 # Frequency of the first component (slow) - omega2 = 2 * np.pi * f2 # Frequency of the second component (fast) - amp = 1 + 0.95 * np.cos(omega1 * t) - p = t + np.sin(omega2 * t) - s = np.cos(p) - s += 0.2 * np.cos(2 * p + np.pi / 6) - s += 0.1 * np.cos(3 * p + np.pi / 3) - s *= amp + # t = np.arange(1, npt + 1) * dt + s = generate_multi_comp_data_fr_mod(npt, fs) + s += rng.random(npt) * 0.1 return s From 2f1aa8e1dae7652497a0c5a45cd993afffb0d568 Mon Sep 17 00:00:00 2001 From: Nicolas Barascud <10333715+nbara@users.noreply.github.com> Date: Tue, 30 Jan 2024 16:54:55 +0100 Subject: [PATCH 3/8] support multichannel data --- meegkit/utils/filters.py | 154 ++++++++++++++++++++++++--------------- tests/test_filters.py | 85 ++++++++++++++++++++- 2 files changed, 176 insertions(+), 63 deletions(-) diff --git a/meegkit/utils/filters.py b/meegkit/utils/filters.py index f8d72b3a..4eaf25c6 100644 --- a/meegkit/utils/filters.py +++ b/meegkit/utils/filters.py @@ -1,6 +1,14 @@ """Real-time phase and amplitude estimation using resonant oscillators. +Notes +----- +While the original paper [1]_ only describes the algorithm for a single +channel, this implementation can handle multiple channels. However, the +algorithm is not optimized for this purpose. Therefore, this code is likely to +be slow for large input arrays (n_channels >> 10), since an individual +oscillator is instantiated for each channel. + .. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation of phase and amplitude with application to neural data. Sci Rep 11, 18037 (2021). https://doi.org/10.1038/s41598-021-97560-5 @@ -13,7 +21,7 @@ class Device: """Measurement device. - This class implements a linear oscillator with a given frequency and damping. + This class implements a measurement device, as describes by Rosenblum et al. Parameters ---------- @@ -25,6 +33,9 @@ class Device: Damping parameter. type : str in {"integrator", "oscillator"} Type of the device. + mu : float + Integrator parameter (only for type="integrator"). Should be >> target + frequency of interest. """ @@ -132,11 +143,7 @@ def __init__(self, fs=250, nu=1.1): self.gamma_p = self.alpha_p / 2 self.factor = np.sqrt((self.om0 ** 2 - nu ** 2) ** 2 + (self.alpha_a * nu) ** 2) - # Set up the phase amplitude "devices" - self.ampl_device = Device(self.om0, self.dt, self.gamma_a, "oscillator") - self.phase_device = Device(self.om0, self.dt, self.gamma_p, "oscillator") - - # Update parameters + # Update parameters, and precomputed quantities update_factor = 5 self.memory = round(2 * np.pi / self.om0 / self.dt) self.update_point = 2 * self.memory @@ -149,28 +156,38 @@ def __init__(self, fs=250, nu=1.1): self.n_channels = None self.buffer = None + def _set_devices(self, n_channels): + # Set up the phase and amplitude "devices" + self.adevice = [Device(self.om0, self.dt, self.gamma_a, "oscillator") + for _ in range(n_channels)] + self.pdevice = [Device(self.om0, self.dt, self.gamma_p, "oscillator") + for _ in range(n_channels)] + + def transform(self, X): """Transform the input signal into phase and amplitude estimates. Parameters ---------- - X : ndarray, shape (n_samples, n_channels) + X : ndarray, shape=(n_samples, n_channels) The input signal to be transformed. Returns ------- - phase : float + phase : ndarray, shape=(n_samples, n_channels) Current phase estimate. - ampl : float + ampl : ndarray, shape=(n_samples, n_channels) Current amplitude estimate. """ - n_samples = X.shape[0] - phase = np.zeros(n_samples) - ampl = np.zeros(n_samples) - if self.buffer is None: - n_channels = 1 # X.shape[1] - self.buffer = Buffer(self.memory, n_channels) + self.n_channels = 1 if X.ndim < 2 else X.shape[1] + self.buffer = Buffer(self.memory, self.n_channels) + self.phase_buffer = Buffer(self.memory, self.n_channels) + self._set_devices(self.n_channels) + + n_samples = X.shape[0] + phase = np.zeros((n_samples, self.n_channels)) + amp = np.zeros((n_samples, self.n_channels)) for k in range(n_samples): self.buffer.push(X[k]) @@ -180,23 +197,28 @@ def transform(self, X): # Amplitude estimation spp, sp, s = self.buffer.view(3) - self.ampl_device.step(spp, sp, s) - z = self.ampl_device.state["y"] / self.nu - ampl[k] = self.factor * np.sqrt(z ** 2 + self.ampl_device.state["x"] ** 2) - # Phase estimation - self.phase_device.step(spp, sp, s) - z = self.phase_device.state["y"] / self.nu - phase[k] = np.arctan2(-z, self.phase_device.state["x"]) + for ch in range(self.n_channels): + self.adevice[ch].step(spp, sp, s) + z = self.adevice[ch].state["y"] / self.nu + amp[k, ch] = self.factor * np.sqrt(z ** 2 + self.adevice[ch].state["x"] ** 2) + + # Phase estimation + self.pdevice[ch].step(spp, sp, s) + z = self.pdevice[ch].state["y"] / self.nu + phase[k, ch] = np.arctan2(-z, self.pdevice[ch].state["x"]) + + self.phase_buffer.push(phase[k]) if k > self.update_point: - tmp = np.unwrap(phase[k - self.memory:k]) - self.nu = (self.memory * np.sum(self._tbuf * tmp) - - self._Sx * np.sum(tmp)) / self._denom + tmpphase = self.phase_buffer.view(self.memory) + for ch in range(self.n_channels): + tmp = np.unwrap(tmpphase[:, ch]) + self.nu = (self.memory * np.sum(self._tbuf * tmp) - + self._Sx * np.sum(tmp)) / self._denom self.update_point += self.update_step - - return phase, ampl + return phase, amp class ResOscillator: @@ -221,6 +243,8 @@ class ResOscillator: Sampling frequency in Hz. nu : float Rough estimate of the tremor frequency. + freq_adaptation : bool + Whether to use the frequency adaptation algorithm (default=True). Returns ------- @@ -236,7 +260,7 @@ class ResOscillator: (2021). https://doi.org/10.1038/s41598-021-97560-5 """ - def __init__(self, fs=1000, nu=4.5): + def __init__(self, fs=1000, nu=4.5, freq_adaptation=True): # Parameters of the measurement "device" self.dt = 1 / fs # Sampling interval @@ -257,14 +281,18 @@ def __init__(self, fs=1000, nu=4.5): self._Sx = np.sum(self._tbuf) self._denom = self.memory * np.sum(self._tbuf ** 2) - self._Sx ** 2 - # Initialize amplitude and phase devices - self.oscillator = Device(self.om0, self.dt, self.gamma, "oscillator") - self.integrator = Device(self.om0, self.dt, self.gamma, "integrator") - # Buffer to store past input values self.n_channels = None self.buffer = None self.runav = 0. # Initial guess for the dc-component + self.freq_adaptation = freq_adaptation + + def _set_devices(self, n_channels): + # Set up the phase and amplitude "devices" + self.osc = [Device(self.om0, self.dt, self.gamma, "oscillator") + for _ in range(n_channels)] + self.int = [Device(self.om0, self.dt, self.gamma, "integrator") + for _ in range(n_channels)] def transform(self, X): """Transform the input signal into phase and amplitude estimates. @@ -281,15 +309,17 @@ def transform(self, X): ampl : float Current amplitude estimate. """ - n_samples = X.shape[0] - phase = np.zeros(n_samples) - ampl = np.zeros(n_samples) - if self.buffer is None: - n_channels = 1 # X.shape[1] - self.buffer = Buffer(self.memory, n_channels) - self._demean = np.zeros((3, n_channels)) # Buffer for the demeaned signal - self._oscbuf = np.zeros((3, n_channels)) # Buffer for the integrator + self.n_channels = 1 if X.ndim < 2 else X.shape[1] + self.buffer = Buffer(self.memory, self.n_channels) + self.phase_buffer = Buffer(self.memory, self.n_channels) + self._demean = np.zeros((3, self.n_channels)) # past demeaned signal values + self._oscbuf = np.zeros((3, self.n_channels)) # past integrator values + self._set_devices(self.n_channels) + + n_samples = X.shape[0] + phase = np.zeros((n_samples, self.n_channels)) + ampl = np.zeros((n_samples, self.n_channels)) for k in range(n_samples): self.buffer.push(X[k]) @@ -299,33 +329,39 @@ def transform(self, X): if self.buffer.counter < 3: continue # Skip the first two samples - # Perform one step of the oscillator equations - self.oscillator.step(self._demean[-3], self._demean[-2], self._demean[-1]) - - self._oscbuf = np.roll(self._oscbuf, -1, axis=0) - self._oscbuf[-1] = self.oscillator.state["y"] + for ch in range(self.n_channels): + # Perform one step of the oscillator equations + self.osc[ch].step(self._demean[-3, ch], + self._demean[-2, ch], + self._demean[-1, ch]) - self.integrator.step(self._oscbuf[-3], self._oscbuf[-2], self._oscbuf[-1]) + self._oscbuf[:, ch] = np.roll(self._oscbuf[:, ch], -1, axis=0) + self._oscbuf[-1, ch] = self.osc[ch].state["y"] - # New phase and amplitude values - v = self.integrator.mu * self.integrator.state["x"] * self.om0 + self.int[ch].step(self._oscbuf[-3, ch], + self._oscbuf[-2, ch], + self._oscbuf[-1, ch]) - phase[k] = np.arctan2(v, self.oscillator.state["y"]) - ampl[k] = self.alpha * np.sqrt(self.oscillator.state["y"] ** 2 + v ** 2) + # New phase and amplitude values + v = self.int[ch].mu * self.int[ch].state["x"] * self.osc[ch].om0 - if k > self.updatepoint: - # Buffer for frequency estimation - tmp = np.unwrap(phase[k - self.memory:k]) + phase[k, ch] = np.arctan2(v, np.real(self.osc[ch].state["y"])) + ampl[k, ch] = self.alpha * np.sqrt(self.osc[ch].state["y"] ** 2 + v ** 2) - # Frequency via linear fit of the phases in the buffer - self.om0 = (self.memory * np.sum(self._tbuf * tmp) - - self._Sx * np.sum(tmp)) / self._denom + self.phase_buffer.push(phase[k]) + if self.freq_adaptation and k > self.updatepoint: + tmpphase = self.phase_buffer.view(self.memory) # Recompute integration parameters for the new frequency - self.oscillator.init_coefs(self.om0, self.dt, self.gamma) + for ch in range(self.n_channels): + tmp = np.unwrap(tmpphase[:, ch]) + # Frequency via linear fit of the phases in the buffer + om0 = (self.memory * np.sum(self._tbuf * tmp) - + self._Sx * np.sum(tmp)) / self._denom + self.osc[ch].init_coefs(om0, self.dt, self.gamma) # Update running average - self.runav = np.mean(self.buffer.view(self.memory)) + self.runav = np.mean(self.buffer.view(self.memory), axis=0, keepdims=True) self.updatepoint += self.update_step # Point for the next update return phase, ampl @@ -540,8 +576,6 @@ def one_step_oscillator(x, xd, alpha, eta, eeta_del, eal_del, C1, C2, C3, spp, s xd : float Updated derivative of state variable x. """ - # A = x - 1j * (xd + alpha * x) / eta - # A = A - 1j * (C1 * spp + C2 * sp + C3 * s) / eta A = x - 1j * (xd + alpha * x + C1 * spp + C2 * sp + C3 * s) / eta d = A * eeta_del y = np.real(d) diff --git a/tests/test_filters.py b/tests/test_filters.py index 225ab50d..a5817b53 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -9,6 +9,82 @@ ) +def test_noisy_signal(show=True): + """Replicate the figure from the paper.""" + # Model data, all three algorithms + npt = 100000 + fs = 100 + s0 = generate_multi_comp_data_fr_mod(npt, fs) # Generate test data + s = generate_noisy_signal(npt, fs, noise=0.5) + dt = 1 / fs + time = np.arange(npt) * dt + + # Plot the test signal's Fourier spectrum + # f, ax = plt.subplots(1, 1) + # ax.psd(s, Fs=fs, NFFT=2018*4, noverlap=fs) + # ax.set_title("Test signal's Fourier spectrum") + # plt.show() + + # The test signal s and its Hilbert amplitude ah (red); one can see that + # ah does not represent a good envelope for s. On the contrary, the + # Hilbert-based phase estimation yields good results, and therefore we take + # it for the ground truth. + gta = np.abs(hilbert(s0)) # ground truth amplitude + gtp = np.angle(hilbert(s0)) # ground truth phase + + hta = np.abs(hilbert(s)) # Hilbert amplitude + htp = np.angle(hilbert(s)) # Hilbert phase + + osc = NonResOscillator(fs, 1.1) + nrp, nra = osc.transform(s) + + osc = ResOscillator(fs, 1.1) + rp, ra = osc.transform(s) + + f, ax = plt.subplots(3, 2, sharex="col", sharey=True, figsize=(12, 8)) + ax[0, 0].plot(time, gtp, lw=.75, label="Ground truth") + ax[0, 0].plot(time, htp, lw=.75, label=r"$\phi_H$") + ax[0, 0].set_ylabel(r"$\phi_H$") + ax[0, 0].set_title("Signal and its Hilbert phase") + + ax[1, 0].plot(time, gtp, lw=.75, label="Ground truth") + ax[1, 0].plot(time, nrp, lw=.75, label=r"$\phi_N$") + ax[1, 0].set_ylabel(r"$\phi_N$") + ax[1, 0].set_ylim([-np.pi, np.pi]) + ax[1, 0].set_title("Nonresonant oscillator") + + ax[2, 0].plot(time, gtp, lw=.75, label="Ground truth") + ax[2, 0].plot(time, rp, lw=.75, label=r"$\phi_N$") + ax[2, 0].set_ylim([-np.pi, np.pi]) + ax[2, 0].set_ylabel("$\phi_H - \phi_R$") + ax[2, 0].set_xlabel("Time") + ax[2, 0].set_title("Resonant oscillator") + + ax[0, 1].plot(time, gta, lw=.75, label="Ground truth") + ax[0, 1].plot(time, hta, lw=.75, label=r"$a_H$") + ax[0, 1].plot(time, s, lw=.75, label="Signal", color="grey", alpha=.5, zorder=0) + ax[0, 1].set_ylabel(r"$a_H$") + ax[0, 1].set_title("Signal and its Hilbert amplitude") + + ax[1, 1].plot(time, gta, lw=.75, label="Ground truth") + ax[1, 1].plot(time, nra, lw=.75, label=r"$a_N$") + ax[1, 1].set_ylabel(r"$a_N$") + ax[1, 1].set_title("Amplitudes") + ax[1, 1].set_title("Nonresonant oscillator") + + ax[2, 1].plot(time, gta, lw=.75, label="Ground truth") + ax[2, 1].plot(time, ra, lw=.75, label=r"$a_R$") + ax[2, 1].set_xlabel("Time") + ax[2, 1].set_ylabel(r"$a_R$") + ax[2, 1].set_title("Resonant oscillator") + plt.suptitle("Amplitude (right) and phase (left) - noisy signal") + + ax[2, 0].set_xlim([0, 40]) + ax[2, 1].set_xlim([0, 1000]) + plt.tight_layout() + plt.show() + + def test_all_alg(show=True): """Replicate the figure from the paper.""" # Model data, all three algorithms @@ -24,7 +100,6 @@ def test_all_alg(show=True): # ax.set_title("Test signal's Fourier spectrum") # plt.show() - # The test signal s and its Hilbert amplitude ah (red); one can see that # ah does not represent a good envelope for s. On the contrary, the # Hilbert-based phase estimation yields good results, and therefore we take @@ -37,10 +112,12 @@ def test_all_alg(show=True): osc = NonResOscillator(fs, 1.1) nr_phase, nr_ampl = osc.transform(s) + nr_phase = nr_phase[:, 0] nr_phi_dif = phase_difference(ht_phase, nr_phase) osc = ResOscillator(fs, 1.1) r_phase, r_ampl = osc.transform(s) + r_phase = r_phase[:, 0] r_phi_dif = phase_difference(ht_phase, r_phase) # Panels (b, c, d) show the difference between the @@ -96,6 +173,7 @@ def test_all_alg(show=True): plt.tight_layout() plt.show() + def phase_difference(phi1, phi2): cos_phi_dif = np.cos(phi1) * np.cos(phi2) + np.sin(phi1) * np.sin(phi2) sin_phi_dif = np.sin(phi1) * np.cos(phi2) - np.cos(phi1) * np.sin(phi2) @@ -133,8 +211,8 @@ def generate_noisy_signal(npt=40000, fs=100, noise=0.1): Generated data. """ - rng = np.default_rng(42) - dt = 1 / fs + rng = np.random.default_rng(1) + # dt = 1 / fs # t = np.arange(1, npt + 1) * dt s = generate_multi_comp_data_fr_mod(npt, fs) s += rng.random(npt) * 0.1 @@ -145,3 +223,4 @@ def generate_noisy_signal(npt=40000, fs=100, noise=0.1): if __name__ == "__main__": # Run the model_data_all_alg function test_all_alg() + # test_noisy_signal() \ No newline at end of file From 27841f5624a58f7c1c17e64cd2a677f30846c7f8 Mon Sep 17 00:00:00 2001 From: Nicolas Barascud <10333715+nbara@users.noreply.github.com> Date: Tue, 30 Jan 2024 16:55:20 +0100 Subject: [PATCH 4/8] pep --- meegkit/utils/filters.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/meegkit/utils/filters.py b/meegkit/utils/filters.py index 4eaf25c6..031f7c34 100644 --- a/meegkit/utils/filters.py +++ b/meegkit/utils/filters.py @@ -201,7 +201,8 @@ def transform(self, X): for ch in range(self.n_channels): self.adevice[ch].step(spp, sp, s) z = self.adevice[ch].state["y"] / self.nu - amp[k, ch] = self.factor * np.sqrt(z ** 2 + self.adevice[ch].state["x"] ** 2) + amp[k, ch] = self.factor * \ + np.sqrt(z ** 2 + self.adevice[ch].state["x"] ** 2) # Phase estimation self.pdevice[ch].step(spp, sp, s) From 36ec5a5dfc7dca7597a5489a6a251be9d582d959 Mon Sep 17 00:00:00 2001 From: Nicolas Barascud <10333715+nbara@users.noreply.github.com> Date: Wed, 31 Jan 2024 10:45:44 +0100 Subject: [PATCH 5/8] add example --- examples/example_phase_estimation.ipynb | 122 +++++++++++++++++++++ examples/example_phase_estimation.py | 137 ++++++++++++++++++++++++ meegkit/{utils/filters.py => phase.py} | 2 +- tests/test_filters.py | 95 ++++++++-------- 4 files changed, 308 insertions(+), 48 deletions(-) create mode 100644 examples/example_phase_estimation.ipynb create mode 100644 examples/example_phase_estimation.py rename meegkit/{utils/filters.py => phase.py} (99%) diff --git a/examples/example_phase_estimation.ipynb b/examples/example_phase_estimation.ipynb new file mode 100644 index 00000000..43d8cc99 --- /dev/null +++ b/examples/example_phase_estimation.ipynb @@ -0,0 +1,122 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n# Causal phase estimation example\n\nThis example shows how to causally estimate the phase of a signal using two\noscillator models, as described in [1]_.\n\nUses `meegkit.phase.ResOscillator()` and `meegkit.phase.NonResOscillator()`.\n\n## References\n.. [1] Rosenblum, M., Pikovsky, A., K\u00fchn, A.A. et al. Real-time estimation\n of phase and amplitude with application to neural data. Sci Rep 11, 18037\n (2021). https://doi.org/10.1038/s41598-021-97560-5\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "import os\nimport sys\n\nimport matplotlib.pyplot as plt\nimport numpy as np\nfrom scipy.signal import hilbert\n\nfrom meegkit.phase import NonResOscillator, ResOscillator, locking_based_phase\n\nsys.path.append(os.path.join(\"..\", \"tests\"))\n\nfrom test_filters import generate_multi_comp_data, phase_difference # noqa:E402\n\nrng = np.random.default_rng(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Build data\nFirst, we generate a multi-component signal with amplitude and phase\nmodulations, as described in the paper [1]_.\n\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "npt = 100000\nfs = 100\ns = generate_multi_comp_data(npt, fs) # Generate test data\ndt = 1 / fs\ntime = np.arange(npt) * dt" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Vizualize signal\nPlot the test signal's Fourier spectrum\n\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "f, ax = plt.subplots(2, 1)\nax[0].plot(time, s)\nax[0].set_xlabel(\"Time (s)\")\nax[0].set_title(\"Test signal\")\nax[1].psd(s, Fs=fs, NFFT=2048*4, noverlap=fs)\nax[1].set_title(\"Test signal's Fourier spectrum\")\nplt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compute phase and amplitude\nWe compute the Hilbert phase and amplitude, as well as the phase and\namplitude obtained by the locking-based technique, non-resonant and\nresonant oscillator.\n\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "ht_ampl = np.abs(hilbert(s)) # Hilbert amplitude\nht_phase = np.angle(hilbert(s)) # Hilbert phase\n\nlb_phase = locking_based_phase(s, dt, npt)\nlb_phi_dif = phase_difference(ht_phase, lb_phase)\n\nosc = NonResOscillator(fs, 1.1)\nnr_phase, nr_ampl = osc.transform(s)\nnr_phase = nr_phase[:, 0]\nnr_phi_dif = phase_difference(ht_phase, nr_phase)\n\nosc = ResOscillator(fs, 1.1)\nr_phase, r_ampl = osc.transform(s)\nr_phase = r_phase[:, 0]\nr_phi_dif = phase_difference(ht_phase, r_phase)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Results\nHere we reproduce figure 1 from the original paper [1]_.\n\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The first row shows the test signal $s$ and its Hilbert amplitude $a_H$ ; one\ncan see that ah does not represent a good envelope for $s$. On the contrary,\nthe Hilbert-based phase estimation yields good results, and therefore we take\nit for the ground truth.\nRows 2-4 show the difference between the Hilbert phase and causally\nestimated phases ($\\phi_L$, $\\phi_N$, $\\phi_R$) are obtained by means of the\nlocking-based technique, non-resonant and resonant oscillator, respectively).\nThese panels demonstrate that the output of the developed causal algorithms\nis very close to the HT-phase. Notice that we show $\\phi_H - \\phi_N$\nmodulo $2\\pi$, since the phase difference is not bounded.\n\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "f, ax = plt.subplots(4, 2, sharex=True, sharey=True, figsize=(12, 8))\nax[0, 0].plot(time, s, time, ht_phase, lw=.75)\nax[0, 0].set_ylabel(r\"$s,\\phi_H$\")\nax[0, 0].set_title(\"Signal and its Hilbert phase\")\n\nax[1, 0].plot(time, lb_phi_dif, lw=.75)\nax[1, 0].axhline(0, color=\"k\", ls=\":\", zorder=-1)\nax[1, 0].set_ylabel(r\"$\\phi_H - \\phi_L$\")\nax[1, 0].set_ylim([-np.pi, np.pi])\nax[1, 0].set_title(\"Phase locking approach\")\n\nax[2, 0].plot(time, nr_phi_dif, lw=.75)\nax[2, 0].axhline(0, color=\"k\", ls=\":\", zorder=-1)\nax[2, 0].set_ylabel(r\"$\\phi_H - \\phi_N$\")\nax[2, 0].set_ylim([-np.pi, np.pi])\nax[2, 0].set_title(\"Nonresonant oscillator\")\n\nax[3, 0].plot(time, r_phi_dif, lw=.75)\nax[3, 0].axhline(0, color=\"k\", ls=\":\", zorder=-1)\nax[3, 0].set_ylim([-np.pi, np.pi])\nax[3, 0].set_ylabel(\"$\\phi_H - \\phi_R$\")\nax[3, 0].set_xlabel(\"Time\")\nax[3, 0].set_title(\"Resonant oscillator\")\n\nax[0, 1].plot(time, s, time, ht_ampl, lw=.75)\nax[0, 1].set_ylabel(r\"$s,a_H$\")\nax[0, 1].set_title(\"Signal and its Hilbert amplitude\")\n\nax[1, 1].axis(\"off\")\n\nax[2, 1].plot(time, s, time, nr_ampl, lw=.75)\nax[2, 1].set_ylabel(r\"$s,a_N$\")\nax[2, 1].set_title(\"Amplitudes\")\nax[2, 1].set_title(\"Nonresonant oscillator\")\n\nax[3, 1].plot(time, s, time, r_ampl, lw=.75)\nax[3, 1].set_xlabel(\"Time\")\nax[3, 1].set_ylabel(r\"$s,a_R$\")\nax[3, 1].set_title(\"Resonant oscillator\")\nplt.suptitle(\"Amplitude (right) and phase (left) estimation algorithms\")\nplt.tight_layout()\nplt.show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} \ No newline at end of file diff --git a/examples/example_phase_estimation.py b/examples/example_phase_estimation.py new file mode 100644 index 00000000..8e5e19b7 --- /dev/null +++ b/examples/example_phase_estimation.py @@ -0,0 +1,137 @@ +""" +Causal phase estimation example +=============================== + +This example shows how to causally estimate the phase of a signal using two +oscillator models, as described in [1]_. + +Uses `meegkit.phase.ResOscillator()` and `meegkit.phase.NonResOscillator()`. + +References +---------- +.. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation + of phase and amplitude with application to neural data. Sci Rep 11, 18037 + (2021). https://doi.org/10.1038/s41598-021-97560-5 + +""" +import os +import sys + +import matplotlib.pyplot as plt +import numpy as np +from scipy.signal import hilbert + +from meegkit.phase import NonResOscillator, ResOscillator, locking_based_phase + +sys.path.append(os.path.join("..", "tests")) + +from test_filters import generate_multi_comp_data, phase_difference # noqa:E402 + +rng = np.random.default_rng(5) + +############################################################################### +# Build data +# ----------------------------------------------------------------------------- +# First, we generate a multi-component signal with amplitude and phase +# modulations, as described in the paper [1]_. + +############################################################################### +npt = 100000 +fs = 100 +s = generate_multi_comp_data(npt, fs) # Generate test data +dt = 1 / fs +time = np.arange(npt) * dt + +############################################################################### +# Vizualize signal +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Plot the test signal's Fourier spectrum +f, ax = plt.subplots(2, 1) +ax[0].plot(time, s) +ax[0].set_xlabel("Time (s)") +ax[0].set_title("Test signal") +ax[1].psd(s, Fs=fs, NFFT=2048*4, noverlap=fs) +ax[1].set_title("Test signal's Fourier spectrum") +plt.tight_layout() + +############################################################################### +# Compute phase and amplitude +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# We compute the Hilbert phase and amplitude, as well as the phase and +# amplitude obtained by the locking-based technique, non-resonant and +# resonant oscillator. +ht_ampl = np.abs(hilbert(s)) # Hilbert amplitude +ht_phase = np.angle(hilbert(s)) # Hilbert phase + +lb_phase = locking_based_phase(s, dt, npt) +lb_phi_dif = phase_difference(ht_phase, lb_phase) + +osc = NonResOscillator(fs, 1.1) +nr_phase, nr_ampl = osc.transform(s) +nr_phase = nr_phase[:, 0] +nr_phi_dif = phase_difference(ht_phase, nr_phase) + +osc = ResOscillator(fs, 1.1) +r_phase, r_ampl = osc.transform(s) +r_phase = r_phase[:, 0] +r_phi_dif = phase_difference(ht_phase, r_phase) + + +############################################################################### +# Results +# ----------------------------------------------------------------------------- +# Here we reproduce figure 1 from the original paper [1]_. + +############################################################################### +# The first row shows the test signal $s$ and its Hilbert amplitude $a_H$ ; one +# can see that ah does not represent a good envelope for $s$. On the contrary, +# the Hilbert-based phase estimation yields good results, and therefore we take +# it for the ground truth. +# Rows 2-4 show the difference between the Hilbert phase and causally +# estimated phases ($\phi_L$, $\phi_N$, $\phi_R$) are obtained by means of the +# locking-based technique, non-resonant and resonant oscillator, respectively). +# These panels demonstrate that the output of the developed causal algorithms +# is very close to the HT-phase. Notice that we show $\phi_H - \phi_N$ +# modulo $2\pi$, since the phase difference is not bounded. +f, ax = plt.subplots(4, 2, sharex=True, sharey=True, figsize=(12, 8)) +ax[0, 0].plot(time, s, time, ht_phase, lw=.75) +ax[0, 0].set_ylabel(r"$s,\phi_H$") +ax[0, 0].set_title("Signal and its Hilbert phase") + +ax[1, 0].plot(time, lb_phi_dif, lw=.75) +ax[1, 0].axhline(0, color="k", ls=":", zorder=-1) +ax[1, 0].set_ylabel(r"$\phi_H - \phi_L$") +ax[1, 0].set_ylim([-np.pi, np.pi]) +ax[1, 0].set_title("Phase locking approach") + +ax[2, 0].plot(time, nr_phi_dif, lw=.75) +ax[2, 0].axhline(0, color="k", ls=":", zorder=-1) +ax[2, 0].set_ylabel(r"$\phi_H - \phi_N$") +ax[2, 0].set_ylim([-np.pi, np.pi]) +ax[2, 0].set_title("Nonresonant oscillator") + +ax[3, 0].plot(time, r_phi_dif, lw=.75) +ax[3, 0].axhline(0, color="k", ls=":", zorder=-1) +ax[3, 0].set_ylim([-np.pi, np.pi]) +ax[3, 0].set_ylabel("$\phi_H - \phi_R$") +ax[3, 0].set_xlabel("Time") +ax[3, 0].set_title("Resonant oscillator") + +ax[0, 1].plot(time, s, time, ht_ampl, lw=.75) +ax[0, 1].set_ylabel(r"$s,a_H$") +ax[0, 1].set_title("Signal and its Hilbert amplitude") + +ax[1, 1].axis("off") + +ax[2, 1].plot(time, s, time, nr_ampl, lw=.75) +ax[2, 1].set_ylabel(r"$s,a_N$") +ax[2, 1].set_title("Amplitudes") +ax[2, 1].set_title("Nonresonant oscillator") + +ax[3, 1].plot(time, s, time, r_ampl, lw=.75) +ax[3, 1].set_xlabel("Time") +ax[3, 1].set_ylabel(r"$s,a_R$") +ax[3, 1].set_title("Resonant oscillator") +plt.suptitle("Amplitude (right) and phase (left) estimation algorithms") +plt.tight_layout() +plt.show() \ No newline at end of file diff --git a/meegkit/utils/filters.py b/meegkit/phase.py similarity index 99% rename from meegkit/utils/filters.py rename to meegkit/phase.py index 031f7c34..8e3cc87f 100644 --- a/meegkit/utils/filters.py +++ b/meegkit/phase.py @@ -15,7 +15,7 @@ """ import numpy as np -from .buffer import Buffer +from meegkit.utils.buffer import Buffer class Device: diff --git a/tests/test_filters.py b/tests/test_filters.py index a5817b53..c9e6d990 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -2,7 +2,7 @@ import numpy as np from scipy.signal import hilbert -from meegkit.utils.filters import ( +from meegkit.phase import ( NonResOscillator, ResOscillator, locking_based_phase, @@ -14,7 +14,7 @@ def test_noisy_signal(show=True): # Model data, all three algorithms npt = 100000 fs = 100 - s0 = generate_multi_comp_data_fr_mod(npt, fs) # Generate test data + s0 = generate_multi_comp_data(npt, fs) # Generate test data s = generate_noisy_signal(npt, fs, noise=0.5) dt = 1 / fs time = np.arange(npt) * dt @@ -41,48 +41,49 @@ def test_noisy_signal(show=True): osc = ResOscillator(fs, 1.1) rp, ra = osc.transform(s) - f, ax = plt.subplots(3, 2, sharex="col", sharey=True, figsize=(12, 8)) - ax[0, 0].plot(time, gtp, lw=.75, label="Ground truth") - ax[0, 0].plot(time, htp, lw=.75, label=r"$\phi_H$") - ax[0, 0].set_ylabel(r"$\phi_H$") - ax[0, 0].set_title("Signal and its Hilbert phase") - - ax[1, 0].plot(time, gtp, lw=.75, label="Ground truth") - ax[1, 0].plot(time, nrp, lw=.75, label=r"$\phi_N$") - ax[1, 0].set_ylabel(r"$\phi_N$") - ax[1, 0].set_ylim([-np.pi, np.pi]) - ax[1, 0].set_title("Nonresonant oscillator") - - ax[2, 0].plot(time, gtp, lw=.75, label="Ground truth") - ax[2, 0].plot(time, rp, lw=.75, label=r"$\phi_N$") - ax[2, 0].set_ylim([-np.pi, np.pi]) - ax[2, 0].set_ylabel("$\phi_H - \phi_R$") - ax[2, 0].set_xlabel("Time") - ax[2, 0].set_title("Resonant oscillator") - - ax[0, 1].plot(time, gta, lw=.75, label="Ground truth") - ax[0, 1].plot(time, hta, lw=.75, label=r"$a_H$") - ax[0, 1].plot(time, s, lw=.75, label="Signal", color="grey", alpha=.5, zorder=0) - ax[0, 1].set_ylabel(r"$a_H$") - ax[0, 1].set_title("Signal and its Hilbert amplitude") - - ax[1, 1].plot(time, gta, lw=.75, label="Ground truth") - ax[1, 1].plot(time, nra, lw=.75, label=r"$a_N$") - ax[1, 1].set_ylabel(r"$a_N$") - ax[1, 1].set_title("Amplitudes") - ax[1, 1].set_title("Nonresonant oscillator") - - ax[2, 1].plot(time, gta, lw=.75, label="Ground truth") - ax[2, 1].plot(time, ra, lw=.75, label=r"$a_R$") - ax[2, 1].set_xlabel("Time") - ax[2, 1].set_ylabel(r"$a_R$") - ax[2, 1].set_title("Resonant oscillator") - plt.suptitle("Amplitude (right) and phase (left) - noisy signal") - - ax[2, 0].set_xlim([0, 40]) - ax[2, 1].set_xlim([0, 1000]) - plt.tight_layout() - plt.show() + if show: + f, ax = plt.subplots(3, 2, sharex="col", sharey=True, figsize=(12, 8)) + ax[0, 0].plot(time, gtp, lw=.75, label="Ground truth") + ax[0, 0].plot(time, htp, lw=.75, label=r"$\phi_H$") + ax[0, 0].set_ylabel(r"$\phi_H$") + ax[0, 0].set_title("Signal and its Hilbert phase") + + ax[1, 0].plot(time, gtp, lw=.75, label="Ground truth") + ax[1, 0].plot(time, nrp, lw=.75, label=r"$\phi_N$") + ax[1, 0].set_ylabel(r"$\phi_N$") + ax[1, 0].set_ylim([-np.pi, np.pi]) + ax[1, 0].set_title("Nonresonant oscillator") + + ax[2, 0].plot(time, gtp, lw=.75, label="Ground truth") + ax[2, 0].plot(time, rp, lw=.75, label=r"$\phi_N$") + ax[2, 0].set_ylim([-np.pi, np.pi]) + ax[2, 0].set_ylabel("$\phi_H - \phi_R$") + ax[2, 0].set_xlabel("Time") + ax[2, 0].set_title("Resonant oscillator") + + ax[0, 1].plot(time, gta, lw=.75, label="Ground truth") + ax[0, 1].plot(time, hta, lw=.75, label=r"$a_H$") + ax[0, 1].plot(time, s, lw=.75, label="Signal", color="grey", alpha=.5, zorder=0) + ax[0, 1].set_ylabel(r"$a_H$") + ax[0, 1].set_title("Signal and its Hilbert amplitude") + + ax[1, 1].plot(time, gta, lw=.75, label="Ground truth") + ax[1, 1].plot(time, nra, lw=.75, label=r"$a_N$") + ax[1, 1].set_ylabel(r"$a_N$") + ax[1, 1].set_title("Amplitudes") + ax[1, 1].set_title("Nonresonant oscillator") + + ax[2, 1].plot(time, gta, lw=.75, label="Ground truth") + ax[2, 1].plot(time, ra, lw=.75, label=r"$a_R$") + ax[2, 1].set_xlabel("Time") + ax[2, 1].set_ylabel(r"$a_R$") + ax[2, 1].set_title("Resonant oscillator") + plt.suptitle("Amplitude (right) and phase (left) - noisy signal") + + ax[2, 0].set_xlim([0, 40]) + ax[2, 1].set_xlim([0, 1000]) + plt.tight_layout() + plt.show() def test_all_alg(show=True): @@ -90,7 +91,7 @@ def test_all_alg(show=True): # Model data, all three algorithms npt = 100000 fs = 100 - s = generate_multi_comp_data_fr_mod(npt, fs) # Generate test data + s = generate_multi_comp_data(npt, fs) # Generate test data dt = 1 / fs time = np.arange(npt) * dt @@ -181,7 +182,7 @@ def phase_difference(phi1, phi2): return phi_dif -def generate_multi_comp_data_fr_mod(npt=40000, fs=100): +def generate_multi_comp_data(npt=40000, fs=100): """Generate multi-component data with frequency modulation. Returns @@ -214,7 +215,7 @@ def generate_noisy_signal(npt=40000, fs=100, noise=0.1): rng = np.random.default_rng(1) # dt = 1 / fs # t = np.arange(1, npt + 1) * dt - s = generate_multi_comp_data_fr_mod(npt, fs) + s = generate_multi_comp_data(npt, fs) s += rng.random(npt) * 0.1 return s From ff321287234b75e397dd5550402808b6cd20239a Mon Sep 17 00:00:00 2001 From: Nicolas Barascud <10333715+nbara@users.noreply.github.com> Date: Wed, 31 Jan 2024 10:51:07 +0100 Subject: [PATCH 6/8] Update README.md --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index 809e023e..3a120144 100644 --- a/README.md +++ b/README.md @@ -153,3 +153,12 @@ and was adapted to python by [Giuseppe Ferraro](mailto:giuseppe.ferraro@isae-sup EEG Bad Channel Detection Using Local Outlier Factor (LOF). Sensors (Basel). 2022 Sep 27;22(19):7314. https://doi.org/10.3390/s22197314. ``` + +### 6. Real-Time Phase Estimation + +This code is based on the Matlab implementation from [Michael Rosenblum](http://www.stat.physik.uni-potsdam.de/~mros), and its corresponding paper [1]. + +```sql +[1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation of phase and amplitude with application to neural data. Sci Rep 11, 18037 (2021). https://doi.org/10.1038/s41598-021-97560-5 + +``` From 53452149a5da204dc918b319bb1658b7765b0fe5 Mon Sep 17 00:00:00 2001 From: Nicolas Barascud <10333715+nbara@users.noreply.github.com> Date: Wed, 31 Jan 2024 11:14:16 +0100 Subject: [PATCH 7/8] Update test_filters.py --- tests/test_filters.py | 127 ++++++++++++++++++++---------------------- 1 file changed, 60 insertions(+), 67 deletions(-) diff --git a/tests/test_filters.py b/tests/test_filters.py index c9e6d990..93f42341 100644 --- a/tests/test_filters.py +++ b/tests/test_filters.py @@ -15,16 +15,10 @@ def test_noisy_signal(show=True): npt = 100000 fs = 100 s0 = generate_multi_comp_data(npt, fs) # Generate test data - s = generate_noisy_signal(npt, fs, noise=0.5) + s = generate_noisy_signal(npt, fs, noise=0.8) dt = 1 / fs time = np.arange(npt) * dt - # Plot the test signal's Fourier spectrum - # f, ax = plt.subplots(1, 1) - # ax.psd(s, Fs=fs, NFFT=2018*4, noverlap=fs) - # ax.set_title("Test signal's Fourier spectrum") - # plt.show() - # The test signal s and its Hilbert amplitude ah (red); one can see that # ah does not represent a good envelope for s. On the contrary, the # Hilbert-based phase estimation yields good results, and therefore we take @@ -73,6 +67,7 @@ def test_noisy_signal(show=True): ax[1, 1].set_title("Amplitudes") ax[1, 1].set_title("Nonresonant oscillator") + # The resonant oscillator should be much more robust to noise ax[2, 1].plot(time, gta, lw=.75, label="Ground truth") ax[2, 1].plot(time, ra, lw=.75, label=r"$a_R$") ax[2, 1].set_xlabel("Time") @@ -86,7 +81,7 @@ def test_noisy_signal(show=True): plt.show() -def test_all_alg(show=True): +def test_all_alg(show=False): """Replicate the figure from the paper.""" # Model data, all three algorithms npt = 100000 @@ -95,12 +90,6 @@ def test_all_alg(show=True): dt = 1 / fs time = np.arange(npt) * dt - # Plot the test signal's Fourier spectrum - # f, ax = plt.subplots(1, 1) - # ax.psd(s, Fs=fs, NFFT=2018*4, noverlap=fs) - # ax.set_title("Test signal's Fourier spectrum") - # plt.show() - # The test signal s and its Hilbert amplitude ah (red); one can see that # ah does not represent a good envelope for s. On the contrary, the # Hilbert-based phase estimation yields good results, and therefore we take @@ -121,58 +110,62 @@ def test_all_alg(show=True): r_phase = r_phase[:, 0] r_phi_dif = phase_difference(ht_phase, r_phase) - # Panels (b, c, d) show the difference between the - # Hilbert phase and causally estimated phases (phi_L, phi_N, phi_R) are - # obtained by means of the locking-based technique, non-resonant and - # resonant oscillator, respectively). These panels demonstrate that the - # output of the developed causal algorithms is very close to the HT-phase. - # Notice that in (c) we show phi_H - phi_N modulo 2pi, since the phase - # difference is not bounded : within the first 20 time units the phase - # difference decreases to -14pi, until it saturates and oscillates around - # (phi_H - phi_N) = 0. - f, ax = plt.subplots(4, 2, sharex=True, sharey=True, figsize=(12, 8)) - ax[0, 0].plot(time, s, time, ht_phase, lw=.75) - ax[0, 0].set_ylabel(r"$s,\phi_H$") - # ax[0, 0].set_ylim([-2, 3]) - ax[0, 0].set_title("Signal and its Hilbert phase") - - ax[1, 0].plot(time, lb_phi_dif, lw=.75) - ax[1, 0].axhline(0, color="k", ls=":", zorder=-1) - ax[1, 0].set_ylabel(r"$\phi_H - \phi_L$") - ax[1, 0].set_ylim([-np.pi, np.pi]) - ax[1, 0].set_title("Phase locking approach") - - ax[2, 0].plot(time, nr_phi_dif, lw=.75) - ax[2, 0].axhline(0, color="k", ls=":", zorder=-1) - ax[2, 0].set_ylabel(r"$\phi_H - \phi_N$") - ax[2, 0].set_ylim([-np.pi, np.pi]) - ax[2, 0].set_title("Nonresonant oscillator") - - ax[3, 0].plot(time, r_phi_dif, lw=.75) - ax[3, 0].axhline(0, color="k", ls=":", zorder=-1) - ax[3, 0].set_ylim([-np.pi, np.pi]) - ax[3, 0].set_ylabel("$\phi_H - \phi_R$") - ax[3, 0].set_xlabel("Time") - ax[3, 0].set_title("Resonant oscillator") - - ax[0, 1].plot(time, s, time, ht_ampl, lw=.75) - ax[0, 1].set_ylabel(r"$s,a_H$") - ax[0, 1].set_title("Signal and its Hilbert amplitude") - - ax[1, 1].axis("off") - - ax[2, 1].plot(time, s, time, nr_ampl, lw=.75) - ax[2, 1].set_ylabel(r"$s,a_N$") - ax[2, 1].set_title("Amplitudes") - ax[2, 1].set_title("Nonresonant oscillator") - - ax[3, 1].plot(time, s, time, r_ampl, lw=.75) - ax[3, 1].set_xlabel("Time") - ax[3, 1].set_ylabel(r"$s,a_R$") - ax[3, 1].set_title("Resonant oscillator") - plt.suptitle("Amplitude (right) and phase (left) estimation algorithms") - plt.tight_layout() - plt.show() + if show: + f, ax = plt.subplots(4, 2, sharex=True, sharey=True, figsize=(12, 8)) + ax[0, 0].plot(time, s, time, ht_phase, lw=.75) + ax[0, 0].set_ylabel(r"$s,\phi_H$") + # ax[0, 0].set_ylim([-2, 3]) + ax[0, 0].set_title("Signal and its Hilbert phase") + + ax[1, 0].plot(time, lb_phi_dif, lw=.75) + ax[1, 0].axhline(0, color="k", ls=":", zorder=-1) + ax[1, 0].set_ylabel(r"$\phi_H - \phi_L$") + ax[1, 0].set_ylim([-np.pi, np.pi]) + ax[1, 0].set_title("Phase locking approach") + + ax[2, 0].plot(time, nr_phi_dif, lw=.75) + ax[2, 0].axhline(0, color="k", ls=":", zorder=-1) + ax[2, 0].set_ylabel(r"$\phi_H - \phi_N$") + ax[2, 0].set_ylim([-np.pi, np.pi]) + ax[2, 0].set_title("Nonresonant oscillator") + + ax[3, 0].plot(time, r_phi_dif, lw=.75) + ax[3, 0].axhline(0, color="k", ls=":", zorder=-1) + ax[3, 0].set_ylim([-np.pi, np.pi]) + ax[3, 0].set_ylabel("$\phi_H - \phi_R$") + ax[3, 0].set_xlabel("Time") + ax[3, 0].set_title("Resonant oscillator") + + ax[0, 1].plot(time, s, time, ht_ampl, lw=.75) + ax[0, 1].set_ylabel(r"$s,a_H$") + ax[0, 1].set_title("Signal and its Hilbert amplitude") + + ax[1, 1].axis("off") + + ax[2, 1].plot(time, s, time, nr_ampl, lw=.75) + ax[2, 1].set_ylabel(r"$s,a_N$") + ax[2, 1].set_title("Amplitudes") + ax[2, 1].set_title("Nonresonant oscillator") + + ax[3, 1].plot(time, s, time, r_ampl, lw=.75) + ax[3, 1].set_xlabel("Time") + ax[3, 1].set_ylabel(r"$s,a_R$") + ax[3, 1].set_title("Resonant oscillator") + plt.suptitle("Amplitude (right) and phase (left) estimation algorithms") + plt.tight_layout() + plt.show() + + # Assert that the phase difference between the Hilbert phase and the + # phase estimated by the locking-based technique is small + assert np.mean(np.abs(lb_phi_dif)) < 0.27 + + # Assert that the phase difference between the Hilbert phase and the + # phase estimated by the non-resonant oscillator is small + assert np.mean(np.abs(nr_phi_dif)) < 0.2 + + # Assert that the phase difference between the Hilbert phase and the + # phase estimated by the resonant oscillator is small + assert np.mean(np.abs(r_phi_dif)) < 0.21 def phase_difference(phi1, phi2): @@ -216,7 +209,7 @@ def generate_noisy_signal(npt=40000, fs=100, noise=0.1): # dt = 1 / fs # t = np.arange(1, npt + 1) * dt s = generate_multi_comp_data(npt, fs) - s += rng.random(npt) * 0.1 + s += rng.random(npt) * noise return s From 3006332e97cdff4fc53d1eb391d8ec0704264a82 Mon Sep 17 00:00:00 2001 From: Nicolas Barascud <10333715+nbara@users.noreply.github.com> Date: Wed, 31 Jan 2024 11:19:34 +0100 Subject: [PATCH 8/8] Update index.rst --- doc/index.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/index.rst b/doc/index.rst index a88c893e..fd2fe078 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -38,6 +38,7 @@ Here is a list of the methods and techniques available in ``meegkit``: ~meegkit.dss ~meegkit.detrend ~meegkit.lof + ~meegkit.phase ~meegkit.ress ~meegkit.sns ~meegkit.star