diff --git a/README.md b/README.md index 809e023e..3a120144 100644 --- a/README.md +++ b/README.md @@ -153,3 +153,12 @@ and was adapted to python by [Giuseppe Ferraro](mailto:giuseppe.ferraro@isae-sup EEG Bad Channel Detection Using Local Outlier Factor (LOF). Sensors (Basel). 2022 Sep 27;22(19):7314. https://doi.org/10.3390/s22197314. ``` + +### 6. Real-Time Phase Estimation + +This code is based on the Matlab implementation from [Michael Rosenblum](http://www.stat.physik.uni-potsdam.de/~mros), and its corresponding paper [1]. + +```sql +[1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation of phase and amplitude with application to neural data. Sci Rep 11, 18037 (2021). https://doi.org/10.1038/s41598-021-97560-5 + +``` diff --git a/doc/index.rst b/doc/index.rst index a88c893e..fd2fe078 100644 --- a/doc/index.rst +++ b/doc/index.rst @@ -38,6 +38,7 @@ Here is a list of the methods and techniques available in ``meegkit``: ~meegkit.dss ~meegkit.detrend ~meegkit.lof + ~meegkit.phase ~meegkit.ress ~meegkit.sns ~meegkit.star diff --git a/examples/example_phase_estimation.ipynb b/examples/example_phase_estimation.ipynb new file mode 100644 index 00000000..43d8cc99 --- /dev/null +++ b/examples/example_phase_estimation.ipynb @@ -0,0 +1,122 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "\n# Causal phase estimation example\n\nThis example shows how to causally estimate the phase of a signal using two\noscillator models, as described in [1]_.\n\nUses `meegkit.phase.ResOscillator()` and `meegkit.phase.NonResOscillator()`.\n\n## References\n.. [1] Rosenblum, M., Pikovsky, A., K\u00fchn, A.A. et al. Real-time estimation\n of phase and amplitude with application to neural data. Sci Rep 11, 18037\n (2021). https://doi.org/10.1038/s41598-021-97560-5\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "import os\nimport sys\n\nimport matplotlib.pyplot as plt\nimport numpy as np\nfrom scipy.signal import hilbert\n\nfrom meegkit.phase import NonResOscillator, ResOscillator, locking_based_phase\n\nsys.path.append(os.path.join(\"..\", \"tests\"))\n\nfrom test_filters import generate_multi_comp_data, phase_difference # noqa:E402\n\nrng = np.random.default_rng(5)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Build data\nFirst, we generate a multi-component signal with amplitude and phase\nmodulations, as described in the paper [1]_.\n\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "npt = 100000\nfs = 100\ns = generate_multi_comp_data(npt, fs) # Generate test data\ndt = 1 / fs\ntime = np.arange(npt) * dt" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Vizualize signal\nPlot the test signal's Fourier spectrum\n\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "f, ax = plt.subplots(2, 1)\nax[0].plot(time, s)\nax[0].set_xlabel(\"Time (s)\")\nax[0].set_title(\"Test signal\")\nax[1].psd(s, Fs=fs, NFFT=2048*4, noverlap=fs)\nax[1].set_title(\"Test signal's Fourier spectrum\")\nplt.tight_layout()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Compute phase and amplitude\nWe compute the Hilbert phase and amplitude, as well as the phase and\namplitude obtained by the locking-based technique, non-resonant and\nresonant oscillator.\n\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "ht_ampl = np.abs(hilbert(s)) # Hilbert amplitude\nht_phase = np.angle(hilbert(s)) # Hilbert phase\n\nlb_phase = locking_based_phase(s, dt, npt)\nlb_phi_dif = phase_difference(ht_phase, lb_phase)\n\nosc = NonResOscillator(fs, 1.1)\nnr_phase, nr_ampl = osc.transform(s)\nnr_phase = nr_phase[:, 0]\nnr_phi_dif = phase_difference(ht_phase, nr_phase)\n\nosc = ResOscillator(fs, 1.1)\nr_phase, r_ampl = osc.transform(s)\nr_phase = r_phase[:, 0]\nr_phi_dif = phase_difference(ht_phase, r_phase)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Results\nHere we reproduce figure 1 from the original paper [1]_.\n\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The first row shows the test signal $s$ and its Hilbert amplitude $a_H$ ; one\ncan see that ah does not represent a good envelope for $s$. On the contrary,\nthe Hilbert-based phase estimation yields good results, and therefore we take\nit for the ground truth.\nRows 2-4 show the difference between the Hilbert phase and causally\nestimated phases ($\\phi_L$, $\\phi_N$, $\\phi_R$) are obtained by means of the\nlocking-based technique, non-resonant and resonant oscillator, respectively).\nThese panels demonstrate that the output of the developed causal algorithms\nis very close to the HT-phase. Notice that we show $\\phi_H - \\phi_N$\nmodulo $2\\pi$, since the phase difference is not bounded.\n\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": false + }, + "outputs": [], + "source": [ + "f, ax = plt.subplots(4, 2, sharex=True, sharey=True, figsize=(12, 8))\nax[0, 0].plot(time, s, time, ht_phase, lw=.75)\nax[0, 0].set_ylabel(r\"$s,\\phi_H$\")\nax[0, 0].set_title(\"Signal and its Hilbert phase\")\n\nax[1, 0].plot(time, lb_phi_dif, lw=.75)\nax[1, 0].axhline(0, color=\"k\", ls=\":\", zorder=-1)\nax[1, 0].set_ylabel(r\"$\\phi_H - \\phi_L$\")\nax[1, 0].set_ylim([-np.pi, np.pi])\nax[1, 0].set_title(\"Phase locking approach\")\n\nax[2, 0].plot(time, nr_phi_dif, lw=.75)\nax[2, 0].axhline(0, color=\"k\", ls=\":\", zorder=-1)\nax[2, 0].set_ylabel(r\"$\\phi_H - \\phi_N$\")\nax[2, 0].set_ylim([-np.pi, np.pi])\nax[2, 0].set_title(\"Nonresonant oscillator\")\n\nax[3, 0].plot(time, r_phi_dif, lw=.75)\nax[3, 0].axhline(0, color=\"k\", ls=\":\", zorder=-1)\nax[3, 0].set_ylim([-np.pi, np.pi])\nax[3, 0].set_ylabel(\"$\\phi_H - \\phi_R$\")\nax[3, 0].set_xlabel(\"Time\")\nax[3, 0].set_title(\"Resonant oscillator\")\n\nax[0, 1].plot(time, s, time, ht_ampl, lw=.75)\nax[0, 1].set_ylabel(r\"$s,a_H$\")\nax[0, 1].set_title(\"Signal and its Hilbert amplitude\")\n\nax[1, 1].axis(\"off\")\n\nax[2, 1].plot(time, s, time, nr_ampl, lw=.75)\nax[2, 1].set_ylabel(r\"$s,a_N$\")\nax[2, 1].set_title(\"Amplitudes\")\nax[2, 1].set_title(\"Nonresonant oscillator\")\n\nax[3, 1].plot(time, s, time, r_ampl, lw=.75)\nax[3, 1].set_xlabel(\"Time\")\nax[3, 1].set_ylabel(r\"$s,a_R$\")\nax[3, 1].set_title(\"Resonant oscillator\")\nplt.suptitle(\"Amplitude (right) and phase (left) estimation algorithms\")\nplt.tight_layout()\nplt.show()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.6" + } + }, + "nbformat": 4, + "nbformat_minor": 0 +} \ No newline at end of file diff --git a/examples/example_phase_estimation.py b/examples/example_phase_estimation.py new file mode 100644 index 00000000..8e5e19b7 --- /dev/null +++ b/examples/example_phase_estimation.py @@ -0,0 +1,137 @@ +""" +Causal phase estimation example +=============================== + +This example shows how to causally estimate the phase of a signal using two +oscillator models, as described in [1]_. + +Uses `meegkit.phase.ResOscillator()` and `meegkit.phase.NonResOscillator()`. + +References +---------- +.. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation + of phase and amplitude with application to neural data. Sci Rep 11, 18037 + (2021). https://doi.org/10.1038/s41598-021-97560-5 + +""" +import os +import sys + +import matplotlib.pyplot as plt +import numpy as np +from scipy.signal import hilbert + +from meegkit.phase import NonResOscillator, ResOscillator, locking_based_phase + +sys.path.append(os.path.join("..", "tests")) + +from test_filters import generate_multi_comp_data, phase_difference # noqa:E402 + +rng = np.random.default_rng(5) + +############################################################################### +# Build data +# ----------------------------------------------------------------------------- +# First, we generate a multi-component signal with amplitude and phase +# modulations, as described in the paper [1]_. + +############################################################################### +npt = 100000 +fs = 100 +s = generate_multi_comp_data(npt, fs) # Generate test data +dt = 1 / fs +time = np.arange(npt) * dt + +############################################################################### +# Vizualize signal +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# Plot the test signal's Fourier spectrum +f, ax = plt.subplots(2, 1) +ax[0].plot(time, s) +ax[0].set_xlabel("Time (s)") +ax[0].set_title("Test signal") +ax[1].psd(s, Fs=fs, NFFT=2048*4, noverlap=fs) +ax[1].set_title("Test signal's Fourier spectrum") +plt.tight_layout() + +############################################################################### +# Compute phase and amplitude +# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +# We compute the Hilbert phase and amplitude, as well as the phase and +# amplitude obtained by the locking-based technique, non-resonant and +# resonant oscillator. +ht_ampl = np.abs(hilbert(s)) # Hilbert amplitude +ht_phase = np.angle(hilbert(s)) # Hilbert phase + +lb_phase = locking_based_phase(s, dt, npt) +lb_phi_dif = phase_difference(ht_phase, lb_phase) + +osc = NonResOscillator(fs, 1.1) +nr_phase, nr_ampl = osc.transform(s) +nr_phase = nr_phase[:, 0] +nr_phi_dif = phase_difference(ht_phase, nr_phase) + +osc = ResOscillator(fs, 1.1) +r_phase, r_ampl = osc.transform(s) +r_phase = r_phase[:, 0] +r_phi_dif = phase_difference(ht_phase, r_phase) + + +############################################################################### +# Results +# ----------------------------------------------------------------------------- +# Here we reproduce figure 1 from the original paper [1]_. + +############################################################################### +# The first row shows the test signal $s$ and its Hilbert amplitude $a_H$ ; one +# can see that ah does not represent a good envelope for $s$. On the contrary, +# the Hilbert-based phase estimation yields good results, and therefore we take +# it for the ground truth. +# Rows 2-4 show the difference between the Hilbert phase and causally +# estimated phases ($\phi_L$, $\phi_N$, $\phi_R$) are obtained by means of the +# locking-based technique, non-resonant and resonant oscillator, respectively). +# These panels demonstrate that the output of the developed causal algorithms +# is very close to the HT-phase. Notice that we show $\phi_H - \phi_N$ +# modulo $2\pi$, since the phase difference is not bounded. +f, ax = plt.subplots(4, 2, sharex=True, sharey=True, figsize=(12, 8)) +ax[0, 0].plot(time, s, time, ht_phase, lw=.75) +ax[0, 0].set_ylabel(r"$s,\phi_H$") +ax[0, 0].set_title("Signal and its Hilbert phase") + +ax[1, 0].plot(time, lb_phi_dif, lw=.75) +ax[1, 0].axhline(0, color="k", ls=":", zorder=-1) +ax[1, 0].set_ylabel(r"$\phi_H - \phi_L$") +ax[1, 0].set_ylim([-np.pi, np.pi]) +ax[1, 0].set_title("Phase locking approach") + +ax[2, 0].plot(time, nr_phi_dif, lw=.75) +ax[2, 0].axhline(0, color="k", ls=":", zorder=-1) +ax[2, 0].set_ylabel(r"$\phi_H - \phi_N$") +ax[2, 0].set_ylim([-np.pi, np.pi]) +ax[2, 0].set_title("Nonresonant oscillator") + +ax[3, 0].plot(time, r_phi_dif, lw=.75) +ax[3, 0].axhline(0, color="k", ls=":", zorder=-1) +ax[3, 0].set_ylim([-np.pi, np.pi]) +ax[3, 0].set_ylabel("$\phi_H - \phi_R$") +ax[3, 0].set_xlabel("Time") +ax[3, 0].set_title("Resonant oscillator") + +ax[0, 1].plot(time, s, time, ht_ampl, lw=.75) +ax[0, 1].set_ylabel(r"$s,a_H$") +ax[0, 1].set_title("Signal and its Hilbert amplitude") + +ax[1, 1].axis("off") + +ax[2, 1].plot(time, s, time, nr_ampl, lw=.75) +ax[2, 1].set_ylabel(r"$s,a_N$") +ax[2, 1].set_title("Amplitudes") +ax[2, 1].set_title("Nonresonant oscillator") + +ax[3, 1].plot(time, s, time, r_ampl, lw=.75) +ax[3, 1].set_xlabel("Time") +ax[3, 1].set_ylabel(r"$s,a_R$") +ax[3, 1].set_title("Resonant oscillator") +plt.suptitle("Amplitude (right) and phase (left) estimation algorithms") +plt.tight_layout() +plt.show() \ No newline at end of file diff --git a/meegkit/phase.py b/meegkit/phase.py new file mode 100644 index 00000000..8e3cc87f --- /dev/null +++ b/meegkit/phase.py @@ -0,0 +1,621 @@ + +"""Real-time phase and amplitude estimation using resonant oscillators. + +Notes +----- +While the original paper [1]_ only describes the algorithm for a single +channel, this implementation can handle multiple channels. However, the +algorithm is not optimized for this purpose. Therefore, this code is likely to +be slow for large input arrays (n_channels >> 10), since an individual +oscillator is instantiated for each channel. + +.. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation + of phase and amplitude with application to neural data. Sci Rep 11, 18037 + (2021). https://doi.org/10.1038/s41598-021-97560-5 +""" +import numpy as np + +from meegkit.utils.buffer import Buffer + + +class Device: + """Measurement device. + + This class implements a measurement device, as describes by Rosenblum et al. + + Parameters + ---------- + om0 : float + Oscillator frequency (estimation). + dt : float + Sampling interval. + damping : float + Damping parameter. + type : str in {"integrator", "oscillator"} + Type of the device. + mu : float + Integrator parameter (only for type="integrator"). Should be >> target + frequency of interest. + + """ + + def __init__(self, om0, dt, damping, type, mu=500): + self.om0 = om0 + self.dt = dt + self.damping = damping + self.type = type + self.init_coefs(om0, dt, damping) + + # For the integrator + if type == "integrator": + self.mu = mu + self.edelmu = np.exp(-self.dt / self.mu) + + self.state = dict(x=0, y=0) + + + def init_coefs(self, om0, dt, damping): + """Initialize coefficients for solving oscillator's equations.""" + self.c1, self.c2, self.c3, self.enuDel, self.ealDel, self.eta = \ + init_coefs(om0, dt, damping) + self.om0 = om0 + + def step(self, sprev, s, snew): + """Perform one step of the oscillator equations. + + Parameters + ---------- + x : float + State variable x. + y : float + State variable y. + sprev : float + Previous measurement. + s : float + Current measurement. + snew : float + New measurement. + + Returns + ------- + x : float + Updated state variable x. + y : float + Updated state variable y. + """ + if self.type == "oscillator": + x = self.state["x"] + y = self.state["y"] + x, y = one_step_oscillator(x, y, self.damping, self.eta, self.enuDel, + self.ealDel, self.c1, self.c2, self.c3, + sprev, s, snew) + self.state["x"] = x + self.state["y"] = y + + elif self.type == "integrator": + x = self.state["x"] + x = one_step_integrator(x, self.edelmu, self.mu, self.dt, + sprev, s, snew) + self.state["x"] = x + + return self + + +class NonResOscillator: + """Real-time measurement of phase and amplitude using non-resonant oscillator. + + This estimator relies on the resonance effect. The measuring “device” consists + of two linear damped oscillators. The oscillators' frequency is much larger + than the frequency of the signal, i.e., the system is far from resonance. + We choose the damping parameters to ensure that (i) the phase of the first + linear oscillator equals that of the input and that (ii) amplitude of the + second one and the input relate by a known constant multiplicator. The + technique yields both phase and amplitude of the input signal. + + This estimator includes an automated frequency-tuning algorithm to adjust + to the a priori unknown signal frequency. + + For a detailed description, refer to [1]_. + + Parameters + ---------- + fs : float + Sampling frequency in Hz. + nu : float + Rough estimate of the main frequency of interest. + + References + ---------- + .. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation + of phase and amplitude with application to neural data. Sci Rep 11, 18037 + (2021). https://doi.org/10.1038/s41598-021-97560-5 + """ + + def __init__(self, fs=250, nu=1.1): + + # Parameters of the measurement "devices" + self.dt = 1 / fs # Sampling interval + self.nu = nu # Rough estimate of the tremor frequency + self.om0 = 5 * nu # Oscillator frequency (estimation) + self.alpha_a = 6.0 # Damping parameter for the "amplitude device" + self.gamma_a = self.alpha_a / 2 + self.alpha_p = 0.2 # Damping parameter for the "phase device" + self.gamma_p = self.alpha_p / 2 + self.factor = np.sqrt((self.om0 ** 2 - nu ** 2) ** 2 + (self.alpha_a * nu) ** 2) + + # Update parameters, and precomputed quantities + update_factor = 5 + self.memory = round(2 * np.pi / self.om0 / self.dt) + self.update_point = 2 * self.memory + self.update_step = round(self.memory / update_factor) + self._tbuf = np.arange(self.memory) * self.dt + self._Sx = np.sum(self._tbuf) + self._denom = self.memory * np.sum(self._tbuf ** 2) - self._Sx ** 2 + + # Buffer to store past input values + self.n_channels = None + self.buffer = None + + def _set_devices(self, n_channels): + # Set up the phase and amplitude "devices" + self.adevice = [Device(self.om0, self.dt, self.gamma_a, "oscillator") + for _ in range(n_channels)] + self.pdevice = [Device(self.om0, self.dt, self.gamma_p, "oscillator") + for _ in range(n_channels)] + + + def transform(self, X): + """Transform the input signal into phase and amplitude estimates. + + Parameters + ---------- + X : ndarray, shape=(n_samples, n_channels) + The input signal to be transformed. + + Returns + ------- + phase : ndarray, shape=(n_samples, n_channels) + Current phase estimate. + ampl : ndarray, shape=(n_samples, n_channels) + Current amplitude estimate. + """ + if self.buffer is None: + self.n_channels = 1 if X.ndim < 2 else X.shape[1] + self.buffer = Buffer(self.memory, self.n_channels) + self.phase_buffer = Buffer(self.memory, self.n_channels) + self._set_devices(self.n_channels) + + n_samples = X.shape[0] + phase = np.zeros((n_samples, self.n_channels)) + amp = np.zeros((n_samples, self.n_channels)) + + for k in range(n_samples): + self.buffer.push(X[k]) + + if self.buffer.counter < 3: + continue # Skip the first two samples + + # Amplitude estimation + spp, sp, s = self.buffer.view(3) + + for ch in range(self.n_channels): + self.adevice[ch].step(spp, sp, s) + z = self.adevice[ch].state["y"] / self.nu + amp[k, ch] = self.factor * \ + np.sqrt(z ** 2 + self.adevice[ch].state["x"] ** 2) + + # Phase estimation + self.pdevice[ch].step(spp, sp, s) + z = self.pdevice[ch].state["y"] / self.nu + phase[k, ch] = np.arctan2(-z, self.pdevice[ch].state["x"]) + + self.phase_buffer.push(phase[k]) + + if k > self.update_point: + tmpphase = self.phase_buffer.view(self.memory) + for ch in range(self.n_channels): + tmp = np.unwrap(tmpphase[:, ch]) + self.nu = (self.memory * np.sum(self._tbuf * tmp) - + self._Sx * np.sum(tmp)) / self._denom + self.update_point += self.update_step + + return phase, amp + + +class ResOscillator: + """Real-time measurement of phase and amplitude using a resonant oscillator. + + In this version, the corresponding “device” consists of a linear oscillator + in resonance with the measured signal and of an integrating unit. It also + provides both phase and amplitude of the input signal. The method exploits + the known relation between the resonant oscillator's phase and amplitude + and those of the input. Additionally, the resonant oscillator acts as a + bandpass filter for experimental data. + + This filter also includes a frequency-adaptation algorithm, and removes + low-frequency trend (baseline fluctuations). For a detailed description, + refer to [1]_. + + Parameters + ---------- + npt : int + Number of points to be measured and processed + fs : float + Sampling frequency in Hz. + nu : float + Rough estimate of the tremor frequency. + freq_adaptation : bool + Whether to use the frequency adaptation algorithm (default=True). + + Returns + ------- + Dphase : numpy.ndarray + Array containing the phase values. + Dampl : numpy.ndarray + Array containing the amplitude values. + + References + ---------- + .. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation + of phase and amplitude with application to neural data. Sci Rep 11, 18037 + (2021). https://doi.org/10.1038/s41598-021-97560-5 + """ + + def __init__(self, fs=1000, nu=4.5, freq_adaptation=True): + + # Parameters of the measurement "device" + self.dt = 1 / fs # Sampling interval + self.om0 = 1.1 # Angular frequency + self.alpha = 0.3 * self.om0 + self.gamma = self.alpha / 2 + + # Parameters of adaptation algorithm + nperiods = 1 # Number of previous periods for frequency correction + npt_period = round(2 * np.pi / self.om0 / self.dt) # Number of points per period + self.memory = nperiods * npt_period # M points for frequency correction buffer + self.update_factor = 5 # Number of frequency updates per period + self.update_step = round(npt_period / self.update_factor) + self.updatepoint = 2 * self.memory + + # Precomputed quantities for linear fit for frequency adaptation + self._tbuf = np.arange(1, self.memory + 1) * self.dt + self._Sx = np.sum(self._tbuf) + self._denom = self.memory * np.sum(self._tbuf ** 2) - self._Sx ** 2 + + # Buffer to store past input values + self.n_channels = None + self.buffer = None + self.runav = 0. # Initial guess for the dc-component + self.freq_adaptation = freq_adaptation + + def _set_devices(self, n_channels): + # Set up the phase and amplitude "devices" + self.osc = [Device(self.om0, self.dt, self.gamma, "oscillator") + for _ in range(n_channels)] + self.int = [Device(self.om0, self.dt, self.gamma, "integrator") + for _ in range(n_channels)] + + def transform(self, X): + """Transform the input signal into phase and amplitude estimates. + + Parameters + ---------- + X : ndarray, shape (n_samples, n_channels) + The input signal to be transformed. + + Returns + ------- + phase : float + Current phase estimate. + ampl : float + Current amplitude estimate. + """ + if self.buffer is None: + self.n_channels = 1 if X.ndim < 2 else X.shape[1] + self.buffer = Buffer(self.memory, self.n_channels) + self.phase_buffer = Buffer(self.memory, self.n_channels) + self._demean = np.zeros((3, self.n_channels)) # past demeaned signal values + self._oscbuf = np.zeros((3, self.n_channels)) # past integrator values + self._set_devices(self.n_channels) + + n_samples = X.shape[0] + phase = np.zeros((n_samples, self.n_channels)) + ampl = np.zeros((n_samples, self.n_channels)) + + for k in range(n_samples): + self.buffer.push(X[k]) + self._demean = np.roll(self._demean, -1, axis=0) + self._demean[-1] = self.buffer.view(1) - self.runav # Baseline correction + + if self.buffer.counter < 3: + continue # Skip the first two samples + + for ch in range(self.n_channels): + # Perform one step of the oscillator equations + self.osc[ch].step(self._demean[-3, ch], + self._demean[-2, ch], + self._demean[-1, ch]) + + self._oscbuf[:, ch] = np.roll(self._oscbuf[:, ch], -1, axis=0) + self._oscbuf[-1, ch] = self.osc[ch].state["y"] + + self.int[ch].step(self._oscbuf[-3, ch], + self._oscbuf[-2, ch], + self._oscbuf[-1, ch]) + + # New phase and amplitude values + v = self.int[ch].mu * self.int[ch].state["x"] * self.osc[ch].om0 + + phase[k, ch] = np.arctan2(v, np.real(self.osc[ch].state["y"])) + ampl[k, ch] = self.alpha * np.sqrt(self.osc[ch].state["y"] ** 2 + v ** 2) + + self.phase_buffer.push(phase[k]) + + if self.freq_adaptation and k > self.updatepoint: + tmpphase = self.phase_buffer.view(self.memory) + # Recompute integration parameters for the new frequency + for ch in range(self.n_channels): + tmp = np.unwrap(tmpphase[:, ch]) + # Frequency via linear fit of the phases in the buffer + om0 = (self.memory * np.sum(self._tbuf * tmp) - + self._Sx * np.sum(tmp)) / self._denom + self.osc[ch].init_coefs(om0, self.dt, self.gamma) + + # Update running average + self.runav = np.mean(self.buffer.view(self.memory), axis=0, keepdims=True) + self.updatepoint += self.update_step # Point for the next update + + return phase, ampl + + +def locking_based_phase(s, dt, npt): + """Compute the locking-based phase. + + This technique exploits the ideas from the synchronization theory. It is + well-known that a force s(t) acting on a limit-cycle oscillator can entrain + it. It means that the oscillator's frequency becomes equal to that of the + force, and their phases differ by a constant. Thus, the phase of the locked + limit-cycle oscillator will correspond to the phase of the signal. For our + purposes, it is helpful to use the simplest oscillator model, the so-called + phase oscillator. To ensure the phase-locking to the force, we have to + adjust the oscillator's frequency to the signal's frequency. We assume that + we do not know the latter a priori, but can only roughly estimate it. We + propose a simple approach that starts with this estimate and automatically + tunes the “device's” frequency to ensure the locking and thus provides the + instantaneous phase. + + Note that the amplitude is not determined with this approach. Technically, + the algorithm reduces to solving differential equation incorporating + measured data given at discrete time points; for details, see [1]_. + + Parameters + ---------- + s: array-like + Input signal. + dt: float + Time step. + npt: int + Number of points. + + Returns + ------- + lb_phase: ndarray + Locking-based phase. + + References + ---------- + .. [1] Rosenblum, M., Pikovsky, A., Kühn, A.A. et al. Real-time estimation + of phase and amplitude with application to neural data. Sci Rep 11, 18037 + (2021). https://doi.org/10.1038/s41598-021-97560-5 + """ + lb_phase = np.zeros(npt) + + # Parameters of the measurement device + epsilon = 0.8 # Amplitude of the oscillator + K = 0.5 # Frequency adaptation coefficient + n_rk_steps = 5 # Number of Runge-Kutta steps per time step + + # Parameters of the frequency adaptation algorithm + nu_est = 1.1 # initial guess for the frequency + update_factor = 20 # number of frequency updates per period + + # Buffer for frequency estimation + npbuf = round(2 * np.pi / nu_est / dt) # number of samples for frequency estimation + update_point = 2 * npbuf + update_step = round(npbuf / update_factor) + tbuf = np.arange(npbuf) * dt + sx = np.sum(tbuf) + denom = npbuf * np.sum(tbuf ** 2) - sx ** 2 + + for k in range(2, npt): + lb_phase[k] = rk(lb_phase[k - 1], dt, n_rk_steps, nu_est, epsilon, + s[k], s[k - 1], s[k]) + + if k > update_point: + buffer = lb_phase[k - npbuf:k] + nu_quasi = (npbuf * np.sum(tbuf * buffer) - sx * np.sum(buffer)) / denom + nu_est = nu_est + K * (nu_quasi - nu_est) + update_point += update_step + + return lb_phase + + +def rk(phi, dt, n_steps, omega, epsilon, s_prev, s, s_new): + """Runge-Kutta method for phase calculation. + + Parameters + ---------- + phi : float + Initial phase value. + dt : float + Time step size. + n_steps : int + Number of steps to iterate. + omega : float + Angular frequency. + epsilon : float + Amplitude of the oscillation. + s_prev : float + Previous phase value. + s : float + Current phase value. + s_new : float + New phase value. + + Returns + ------- + phi : float + The calculated phase value. + + """ + h = dt / n_steps + b = (s_new - s_prev) / dt / 2 + c = (s_prev - 2 * s + s_new) / dt ** 2 + hh = h / 2. + h6 = h / 6. + t = 0 + + for _ in range(n_steps): + th = t + hh + dy0 = omega - epsilon * (s + b * t + c * t ** 2) * np.sin(phi) + phit = phi + hh * dy0 + dyt = omega - epsilon * (s + b * th + c * th ** 2) * np.sin(phit) + phit = phi + hh * dyt + dym = omega - epsilon * (s + b * th + c * th ** 2) * np.sin(phit) + phit = phi + h * dym + dym = dym + dyt + t = t + h + dyt = omega - epsilon * (s + b * t + c * t ** 2) * np.sin(phit) + phi = phi + h6 * (dy0 + 2 * dym + dyt) + + return phi + + +def init_coefs(om0, dt, alpha): + """Compute coefficients for solving oscillator's equations. + + Parameters + ---------- + om0 : float + Oscillator frequency (estimation). + dt : float + Sampling interval. + alpha : float + Half of the damping. + + Returns + ------- + C1 : float + Coefficient C1. + C2 : float + Coefficient C2. + C3 : float + Coefficient C3. + eetadel : complex + Exponential term for amplitude device. + ealdel : float + Exponential term for amplitude device. + eta : float + Square root of the difference of oscillator frequency squared and + damping squared. + + """ + # alpha is the half of the damping: x'' + 2 * alpha * x' + om0^2 * x = input + eta2 = om0**2 - alpha**2 + eta2 = complex(eta2) if eta2 < 0 else eta2 + eta = np.sqrt(eta2) # replicate Matlab behavior + eetadel = np.exp(1j * eta * dt) + a = 1. / eetadel + ealdel = np.exp(alpha * dt) + + I1 = 1j * (a - 1) / eta + I2 = (a * (1 + 1j * eta * dt) - 1) / eta2 + I3 = (a * (dt * eta * (2 + 1j * dt * eta) - 2 * 1j) + 2 * 1j) / eta2 / eta + + C1 = (I3 - I2 * dt) / (2 * dt**2 * ealdel) + C2 = I1 - I3 / dt**2 + C3 = ealdel * (I2 * dt + I3) / (2 * dt**2) + + return C1, C2, C3, eetadel, ealdel, eta + + +def one_step_oscillator(x, xd, alpha, eta, eeta_del, eal_del, C1, C2, C3, spp, sp, s): + """Perform one step of the oscillator equations. + + Parameters + ---------- + x : float + State variable x. + xd : float + Derivative of state variable x. + alpha : float + Damping parameter. + eta : float + Square root of the difference of oscillator frequency squared and + damping squared. + eeta_del : complex + Exponential term for time step. + eal_del : float + Exponential term for time step. + C1 : float + Coefficient C1. + C2 : float + Coefficient C2. + C3 : float + Coefficient C3. + spp : float + Second orevious measurement. + sp : float + Previous measurement. + s : float + Current measurement. + + Returns + ------- + x : float + Updated state variable x. + xd : float + Updated derivative of state variable x. + """ + A = x - 1j * (xd + alpha * x + C1 * spp + C2 * sp + C3 * s) / eta + d = A * eeta_del + y = np.real(d) + yd = - eta * np.imag(d) + x = y / eal_del + xd = (yd - alpha * y) / eal_del + + return x, xd + + +def one_step_integrator(z, edelmu, mu, dt, spp, sp, s): + """Perform one step of the integrator in the oscillator equations. + + Parameters + ---------- + z : float + State variable z. + edelmu : float + Exponential term for integrator. + mu : float + Integrator parameter. + dt : float + Sampling interval. + spp : float + Second previous measurement. + sp : float + Previous measurement. + s : float + Current measurement. + + Returns + ------- + z : float + Updated state variable z. + """ + a = sp + b = (s - spp) / 2 / dt + c = (spp - 2 * sp + s) / 2 / dt ** 2 + d = -a + b * mu - 2 * c * (mu ** 2) + C0 = z + d + z = C0 * edelmu - d + b * dt - 2 * c * mu * dt + c * dt ** 2 + return z diff --git a/meegkit/utils/buffer.py b/meegkit/utils/buffer.py new file mode 100644 index 00000000..1cb2733f --- /dev/null +++ b/meegkit/utils/buffer.py @@ -0,0 +1,135 @@ +"""Buffer class for real-time processing.""" +import warnings + +import numpy as np + + +class Buffer: + """Circular buffer for real-time processing. + + Parameters + ---------- + size : int + The number of samples of the buffer. + n_channels : int + The number of channels of the buffer. + + Attributes + ---------- + size : int + The number of samples of the buffer. + n_channels : int + The number of channels of the buffer. + counter : int + The number of samples in the buffer. + head : int + The index of the most recent sample in the buffer. + tail : int + The index of the most recent read sample in the buffer. + _data : ndarray, shape (size, n_channels) + Data buffer. + + + """ + + def __init__(self, size, n_channels=1): + self.size = size + self.n_channels = n_channels + self.reset() + + def reset(self): + """Reset the buffer.""" + self.counter = 0 # number of samples in the buffer + self._head = 0 # most recent sample in the buffer + self._tail = 0 # most recent read sample in the buffer + self._data = np.zeros((self.size, self.n_channels)) + + def push(self, X): + """Update the buffer with new data. + + Parameters + ---------- + X : ndarray, shape (n_samples, n_channels) + The input signal to be transformed. + """ + # Check input + if X.ndim == 0: + # X = np.array([X]) + n_samples = 1 + elif X.ndim == 1: + X = X[:, None] + n_samples = X.shape[0] + elif X.ndim > 2: + raise ValueError("X must be 1- or 2-dimensional") + + # Check size + if n_samples > self.size: + warnings.warn("Buffer overflow: some old data has been lost") + X = X[-self.size:] # keep only the last self.size samples + n_samples = X.shape[0] + + # Warn when data has not been read for a long time + if self.counter + n_samples - self._tail > self.size: + warnings.warn("Buffer overflow: some old data has been discarded") + self._tail = self._head + n_samples - self.size + + # Update buffer + end = self.head + n_samples + if end <= self.size: + # If the new data fits in the remaining space + self._data[self.head:end] = X + else: + # If the new data wraps around to the beginning of the buffer + overflow = end - self.size + self._data[self.head:] = X[:-overflow] + self._data[:overflow] = X[-overflow:] + + self._head += n_samples + self.counter += n_samples + + def get_new_samples(self, n_samples=None): + """Consume n_samples.""" + if n_samples is None: + n_samples = min(self.counter, self.size) + elif n_samples > self.counter: + raise ValueError("Not enough samples in the buffer") + elif n_samples > self.size: + raise ValueError("Requested more samples than buffer size") + + start = self.tail + end = (self.tail + n_samples) % self.size + + if start < end: + samples = self._data[start:end] + else: + samples = np.concatenate((self._data[start:], self._data[:end])) + + self._tail += n_samples + return samples + + def view(self, n_samples): + """View the last n_samples, without consuming them.""" + start = (self.head - n_samples) % self.size + end = self.head + if start < end: + return self._data[start:end] + else: + return np.concatenate((self._data[start:], self._data[:end])) + + def __repr__(self) -> str: + """Return complete string representation.""" + repr = f"Buffer({self.size}, {self.n_channels})\n" + repr += f"> counter: {self.counter}\n" + repr += f"> head: {self.head}\n" + repr += f"> tail: {self.tail}\n" + return repr + + @property + def tail(self): + """Get the index of the tail.""" + return self._tail % self.size + + @property + def head(self): + """Get the index of the head.""" + return self._head % self.size diff --git a/tests/test_buffer.py b/tests/test_buffer.py new file mode 100644 index 00000000..f2fbf030 --- /dev/null +++ b/tests/test_buffer.py @@ -0,0 +1,72 @@ +import numpy as np +import pytest + +from meegkit.utils.buffer import Buffer + + +def test_push(): + buf = Buffer(32, 1) + data = np.arange(1000) + buf.push(data[:10]) + assert buf.counter == 10 + assert buf.head == 10 + assert buf.tail == 0 + +def test_get_new_samples(): + buf = Buffer(32, 1) + data = np.arange(1000) + buf.push(data[:10]) + samples = buf.get_new_samples(5) + np.testing.assert_array_equal(samples.flatten(), data[:5]) + assert buf.tail == 5 + samples = buf.get_new_samples(5) + np.testing.assert_array_equal(samples.flatten(), data[5:10]) + +def test_reset(): + buf = Buffer(32, 1) + data = np.arange(1000) + buf.push(data[:10]) + buf.reset() + assert buf.counter == 0 + assert buf.head == 0 + assert buf.tail == 0 + +def test_repr(): + buf = Buffer(32, 1) + repr = buf.__repr__() + assert repr == "Buffer(32, 1)\n> counter: 0\n> head: 0\n> tail: 0\n" + +def test_overflow_warning(): + buf = Buffer(32, 1) + data = np.arange(1000) + with pytest.warns(UserWarning, match="Buffer overflow: some old data has been lost"): + buf.push(data) + +def test_repeated_push(): + buf = Buffer(32, 1) + data = np.arange(1000) + + print(buf) + buf.push(data[:10]) + print(buf) + buf.push(data[10:20]) + print(buf) + + with pytest.warns(UserWarning, match="Buffer overflow: some old data has been discarded"): + buf.push(data[20:40]) + print(buf) + assert buf.counter == 40 + assert buf.head == 8 + assert buf._head == 40 + assert buf.tail == 8 + assert buf._tail == 8 + + out = buf.get_new_samples() + print(out.flatten()) + np.testing.assert_array_equal(out.flatten(), data[8:40]) # first 8 samples are lost + + +if __name__ == "__main__": + # test_get_new_samples() + # test_repeated_push() + pytest.main([__file__]) \ No newline at end of file diff --git a/tests/test_filters.py b/tests/test_filters.py new file mode 100644 index 00000000..93f42341 --- /dev/null +++ b/tests/test_filters.py @@ -0,0 +1,220 @@ +import matplotlib.pyplot as plt +import numpy as np +from scipy.signal import hilbert + +from meegkit.phase import ( + NonResOscillator, + ResOscillator, + locking_based_phase, +) + + +def test_noisy_signal(show=True): + """Replicate the figure from the paper.""" + # Model data, all three algorithms + npt = 100000 + fs = 100 + s0 = generate_multi_comp_data(npt, fs) # Generate test data + s = generate_noisy_signal(npt, fs, noise=0.8) + dt = 1 / fs + time = np.arange(npt) * dt + + # The test signal s and its Hilbert amplitude ah (red); one can see that + # ah does not represent a good envelope for s. On the contrary, the + # Hilbert-based phase estimation yields good results, and therefore we take + # it for the ground truth. + gta = np.abs(hilbert(s0)) # ground truth amplitude + gtp = np.angle(hilbert(s0)) # ground truth phase + + hta = np.abs(hilbert(s)) # Hilbert amplitude + htp = np.angle(hilbert(s)) # Hilbert phase + + osc = NonResOscillator(fs, 1.1) + nrp, nra = osc.transform(s) + + osc = ResOscillator(fs, 1.1) + rp, ra = osc.transform(s) + + if show: + f, ax = plt.subplots(3, 2, sharex="col", sharey=True, figsize=(12, 8)) + ax[0, 0].plot(time, gtp, lw=.75, label="Ground truth") + ax[0, 0].plot(time, htp, lw=.75, label=r"$\phi_H$") + ax[0, 0].set_ylabel(r"$\phi_H$") + ax[0, 0].set_title("Signal and its Hilbert phase") + + ax[1, 0].plot(time, gtp, lw=.75, label="Ground truth") + ax[1, 0].plot(time, nrp, lw=.75, label=r"$\phi_N$") + ax[1, 0].set_ylabel(r"$\phi_N$") + ax[1, 0].set_ylim([-np.pi, np.pi]) + ax[1, 0].set_title("Nonresonant oscillator") + + ax[2, 0].plot(time, gtp, lw=.75, label="Ground truth") + ax[2, 0].plot(time, rp, lw=.75, label=r"$\phi_N$") + ax[2, 0].set_ylim([-np.pi, np.pi]) + ax[2, 0].set_ylabel("$\phi_H - \phi_R$") + ax[2, 0].set_xlabel("Time") + ax[2, 0].set_title("Resonant oscillator") + + ax[0, 1].plot(time, gta, lw=.75, label="Ground truth") + ax[0, 1].plot(time, hta, lw=.75, label=r"$a_H$") + ax[0, 1].plot(time, s, lw=.75, label="Signal", color="grey", alpha=.5, zorder=0) + ax[0, 1].set_ylabel(r"$a_H$") + ax[0, 1].set_title("Signal and its Hilbert amplitude") + + ax[1, 1].plot(time, gta, lw=.75, label="Ground truth") + ax[1, 1].plot(time, nra, lw=.75, label=r"$a_N$") + ax[1, 1].set_ylabel(r"$a_N$") + ax[1, 1].set_title("Amplitudes") + ax[1, 1].set_title("Nonresonant oscillator") + + # The resonant oscillator should be much more robust to noise + ax[2, 1].plot(time, gta, lw=.75, label="Ground truth") + ax[2, 1].plot(time, ra, lw=.75, label=r"$a_R$") + ax[2, 1].set_xlabel("Time") + ax[2, 1].set_ylabel(r"$a_R$") + ax[2, 1].set_title("Resonant oscillator") + plt.suptitle("Amplitude (right) and phase (left) - noisy signal") + + ax[2, 0].set_xlim([0, 40]) + ax[2, 1].set_xlim([0, 1000]) + plt.tight_layout() + plt.show() + + +def test_all_alg(show=False): + """Replicate the figure from the paper.""" + # Model data, all three algorithms + npt = 100000 + fs = 100 + s = generate_multi_comp_data(npt, fs) # Generate test data + dt = 1 / fs + time = np.arange(npt) * dt + + # The test signal s and its Hilbert amplitude ah (red); one can see that + # ah does not represent a good envelope for s. On the contrary, the + # Hilbert-based phase estimation yields good results, and therefore we take + # it for the ground truth. + ht_ampl = np.abs(hilbert(s)) # Hilbert amplitude + ht_phase = np.angle(hilbert(s)) # Hilbert phase + + lb_phase = locking_based_phase(s, dt, npt) + lb_phi_dif = phase_difference(ht_phase, lb_phase) + + osc = NonResOscillator(fs, 1.1) + nr_phase, nr_ampl = osc.transform(s) + nr_phase = nr_phase[:, 0] + nr_phi_dif = phase_difference(ht_phase, nr_phase) + + osc = ResOscillator(fs, 1.1) + r_phase, r_ampl = osc.transform(s) + r_phase = r_phase[:, 0] + r_phi_dif = phase_difference(ht_phase, r_phase) + + if show: + f, ax = plt.subplots(4, 2, sharex=True, sharey=True, figsize=(12, 8)) + ax[0, 0].plot(time, s, time, ht_phase, lw=.75) + ax[0, 0].set_ylabel(r"$s,\phi_H$") + # ax[0, 0].set_ylim([-2, 3]) + ax[0, 0].set_title("Signal and its Hilbert phase") + + ax[1, 0].plot(time, lb_phi_dif, lw=.75) + ax[1, 0].axhline(0, color="k", ls=":", zorder=-1) + ax[1, 0].set_ylabel(r"$\phi_H - \phi_L$") + ax[1, 0].set_ylim([-np.pi, np.pi]) + ax[1, 0].set_title("Phase locking approach") + + ax[2, 0].plot(time, nr_phi_dif, lw=.75) + ax[2, 0].axhline(0, color="k", ls=":", zorder=-1) + ax[2, 0].set_ylabel(r"$\phi_H - \phi_N$") + ax[2, 0].set_ylim([-np.pi, np.pi]) + ax[2, 0].set_title("Nonresonant oscillator") + + ax[3, 0].plot(time, r_phi_dif, lw=.75) + ax[3, 0].axhline(0, color="k", ls=":", zorder=-1) + ax[3, 0].set_ylim([-np.pi, np.pi]) + ax[3, 0].set_ylabel("$\phi_H - \phi_R$") + ax[3, 0].set_xlabel("Time") + ax[3, 0].set_title("Resonant oscillator") + + ax[0, 1].plot(time, s, time, ht_ampl, lw=.75) + ax[0, 1].set_ylabel(r"$s,a_H$") + ax[0, 1].set_title("Signal and its Hilbert amplitude") + + ax[1, 1].axis("off") + + ax[2, 1].plot(time, s, time, nr_ampl, lw=.75) + ax[2, 1].set_ylabel(r"$s,a_N$") + ax[2, 1].set_title("Amplitudes") + ax[2, 1].set_title("Nonresonant oscillator") + + ax[3, 1].plot(time, s, time, r_ampl, lw=.75) + ax[3, 1].set_xlabel("Time") + ax[3, 1].set_ylabel(r"$s,a_R$") + ax[3, 1].set_title("Resonant oscillator") + plt.suptitle("Amplitude (right) and phase (left) estimation algorithms") + plt.tight_layout() + plt.show() + + # Assert that the phase difference between the Hilbert phase and the + # phase estimated by the locking-based technique is small + assert np.mean(np.abs(lb_phi_dif)) < 0.27 + + # Assert that the phase difference between the Hilbert phase and the + # phase estimated by the non-resonant oscillator is small + assert np.mean(np.abs(nr_phi_dif)) < 0.2 + + # Assert that the phase difference between the Hilbert phase and the + # phase estimated by the resonant oscillator is small + assert np.mean(np.abs(r_phi_dif)) < 0.21 + + +def phase_difference(phi1, phi2): + cos_phi_dif = np.cos(phi1) * np.cos(phi2) + np.sin(phi1) * np.sin(phi2) + sin_phi_dif = np.sin(phi1) * np.cos(phi2) - np.cos(phi1) * np.sin(phi2) + phi_dif = np.arctan2(sin_phi_dif, cos_phi_dif) + + return phi_dif + +def generate_multi_comp_data(npt=40000, fs=100): + """Generate multi-component data with frequency modulation. + + Returns + ------- + s : np.ndarray + Generated data. + + """ + dt = 1 / fs + t = np.arange(1, npt + 1) * dt + omega1 = np.sqrt(2) / 30 # Frequency of the first component (slow) + omega2 = np.sqrt(5) / 60 # Frequency of the second component (fast) + amp = 1 + 0.95 * np.cos(omega1 * t) + p = t + 5 * np.sin(omega2 * t) + s = np.cos(p) + 0.2 * np.cos(2 * p + np.pi / 6) + 0.1 * np.cos(3 * p + np.pi / 3) + s *= amp + + return s + + +def generate_noisy_signal(npt=40000, fs=100, noise=0.1): + """Generate multi-component data with frequency modulation. + + Returns + ------- + s : np.ndarray + Generated data. + + """ + rng = np.random.default_rng(1) + # dt = 1 / fs + # t = np.arange(1, npt + 1) * dt + s = generate_multi_comp_data(npt, fs) + s += rng.random(npt) * noise + + return s + + +if __name__ == "__main__": + # Run the model_data_all_alg function + test_all_alg() + # test_noisy_signal() \ No newline at end of file