diff --git a/CHANGELOG.md b/CHANGELOG.md index 8111c06a..4008903e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +## [0.2.0] - 2025-07-26 + +### Added +- **Cortical Input Module**: Comprehensive cortical input generation functionality with multiple waveform types: + - `create_sinusoidal_cortical_input()` - Sinusoidal cortical inputs with configurable amplitude, frequency, offset, and phase + - `create_sawtooth_cortical_input()` - Sawtooth waveform inputs with adjustable width and phase + - `create_step_cortical_input()` - Step function inputs with configurable duration and height + - `create_ramp_cortical_input()` - Linear ramp inputs between start and end firing rates + - `create_trapezoid_cortical_input()` - Trapezoidal inputs with configurable rise, plateau, and fall times +- New example script `07_simulate_cortical_input.py` demonstrating cortical input simulation +- Enhanced spike train plotting functionality with improved visualization tools + +### Changed +- **API Simplification**: Removed explicit `load_nmodl_files()` calls from example scripts for cleaner user experience +- Improved code readability and organization in spike train classes +- Enhanced error handling and callback mechanisms in spike train simulation + +### Fixed +- Fixed callback errors in spike train simulation for improved stability +- Improved code structure and readability across multiple modules + ## [0.1.1] - 2025-07-26 ### Added diff --git a/examples/00_simulate_recruitment_thresholds.py b/examples/00_simulate_recruitment_thresholds.py index 6fd23db1..b5b5528d 100644 --- a/examples/00_simulate_recruitment_thresholds.py +++ b/examples/00_simulate_recruitment_thresholds.py @@ -25,6 +25,7 @@ import joblib + from myogen import simulator ############################################################################## diff --git a/examples/01_simulate_spike_trains.py b/examples/01_simulate_spike_trains.py index a422f28a..d3b5e86c 100644 --- a/examples/01_simulate_spike_trains.py +++ b/examples/01_simulate_spike_trains.py @@ -38,7 +38,7 @@ from matplotlib import pyplot as plt from myogen import simulator, RANDOM_GENERATOR -from myogen.utils import load_nmodl_files + from myogen.utils.currents import create_trapezoid_current from myogen.utils.plotting import plot_spike_trains diff --git a/examples/07_simulate_cortical_input.py b/examples/07_simulate_cortical_input.py new file mode 100644 index 00000000..a62c95a3 --- /dev/null +++ b/examples/07_simulate_cortical_input.py @@ -0,0 +1,242 @@ +""" +Cortical Inputs and Motor Unit Spike Trains +================================== + +Instead of using injected currents, we can simulate the spike trains of the motor units by using cortical inputs. + +.. note:: + The spike trains are simulated using the **NEURON simulator** wrapped by **PyNN**. + This way we can simulate accurately the biophysical properties of the motor units. +""" + +############################################################################## +# Import Libraries +# ---------------- +# +# .. important:: +# In **MyoGen** all **random number generation** is handled by the ``RANDOM_GENERATOR`` object. +# +# This object is a wrapper around the ``numpy.random`` module and is used to generate random numbers. +# +# It is intended to be used with the following API: +# +# .. code-block:: python +# +# from myogen import simulator, RANDOM_GENERATOR +# +# To change the default seed, use ``set_random_seed``: +# +# .. code-block:: python +# +# from myogen import set_random_seed +# set_random_seed(42) + +from pathlib import Path + +import joblib +from myogen.utils.currents import create_trapezoid_current +import numpy as np +from matplotlib import pyplot as plt + +from myogen import simulator, RANDOM_GENERATOR +from myogen.utils import load_nmodl_files +from myogen.utils.cortical_inputs import create_sinusoidal_cortical_input +from myogen.utils.plotting import plot_spike_trains + +############################################################################## +# Define Parameters +# ----------------- +# In this example we will simulate a **motor pool** using the **recruitment thresholds** generated in the previous example. +# +# This motor pool will have **two different randomly generated trapezoidal ramp currents** injected into the motor units. +# +# The parameters of the input current are: +# +# - ``n_pools``: Number of distinct motor neuron pools +# - ``timestep``: Simulation timestep in ms (high resolution) +# - ``simulation_time``: Total simulation duration in ms +# +# To simulate realistic spike trains, we will also add a **common noise current source** to each neuron. +# The parameters of the noise current are: +# +# - ``noise_mean``: Mean noise current in nA +# - ``noise_stdev``: Standard deviation of noise current in nA + +n_pools = 2 # Number of distinct motor neuron pools + +timestep = 0.05 # Simulation timestep in ms (high resolution) +simulation_time = 2000 # Total simulation duration in ms + +noise_mean = 26 # Mean noise current in nA +noise_stdev = 20 # Standard deviation of noise current in nA + +############################################################################## +# Create Cortical Inputs +# ------------------------------ +# +# To drive the motor units, we use **cortical inputs**. +# +# In this example, we use a **sinusoidal input firing rate** which is generated using the ``create_sinusoidal_cortical_input`` function. +# +# .. note:: +# More convenient functions for generating input current profiles are available in the ``myogen.utils.cortical_inputs`` module. + +# Calculate number of time points +t_points = int(simulation_time / timestep) + +# Generate random parameters for each pool's cortical input +amplitude_range = list(RANDOM_GENERATOR.uniform(20, 80, size=n_pools)) +offsets = list(RANDOM_GENERATOR.uniform(50, 100, size=n_pools)) +frequencies = list(RANDOM_GENERATOR.uniform(0.5, 10, size=n_pools)) +phases = list(RANDOM_GENERATOR.uniform(0, 2 * np.pi, size=n_pools)) + +CST_number = 400 +connection_prob = 0.3 + + +print(f"\nCortical inputs parameters:") +for i in range(n_pools): + print( + f" Pool {i + 1}: amplitude={amplitude_range[i]:.1f} pps, " + f"offset={offsets[i]:.1f} pps, " + f"frequency={frequencies[i]:.1f} Hz, " + f"phase={phases[i]:.1f} rad" + ) + +# Create the cortical input matrix +cortical_input__matrix = create_sinusoidal_cortical_input( + n_pools, + t_points, + timestep, + amplitudes__pps=amplitude_range, + frequencies__Hz=frequencies, + offsets__pps=offsets, + phases__rad=phases, +) + +print( + f"\nCortical input matrix shape: {cortical_input__matrix.shape}\n amplitude={amplitude_range[i]:.1f} pps \n offset={offsets[i]:.1f} pps\n frequency={frequencies[i]:.1f} Hz\nphase={phases[i]:.1f} rad" +) + + +############################################################################## +# Create Motor Neuron Pools +# ------------------------- +# +# Since the **recruitment thresholds** are already generated, we can load them from the previous example using ``joblib``. +# +# Afterwards the custom files made for the ``NEURON`` simulator must be loaded. +# +# .. note:: +# This step is required as ``NEURON`` does not support the simulation of motor units directly. +# +# This is done using the ``load_nmodl_files`` function. +# +# Finally, the **motor neuron pools** are created using the ``MotorNeuronPool`` object. +# +# .. note:: +# The **MotorNeuronPool** object handles the simulation of the motor units. +# + +save_path = Path("./results") + +# Load recruitment thresholds +recruitment_thresholds = joblib.load(save_path / "thresholds.pkl") + + +# Load NEURON mechanism +load_nmodl_files() + +# Create motor neuron pool +motor_neuron_pool = simulator.MotorNeuronPool(recruitment_thresholds) + +# Compute MVC current threshold +mvc_current_threshold = motor_neuron_pool.mvc_current_threshold + +print(f"\nMVC current threshold: {mvc_current_threshold:.1f} nA") + +############################################################################## +# Simulate Motor Unit Spike Trains +# --------------------------------- +# +# The **motor unit spike trains** are simulated using the ``generate_spike_trains`` method of the ``MotorNeuronPool`` object. + +spike_trains_matrix, active_neuron_indices, data = ( + motor_neuron_pool.generate_spike_trains( + cortical_input__matrix=cortical_input__matrix, + timestep__ms=timestep, + CST_number=CST_number, + connection_prob=connection_prob, + ) +) + + +# Save motor neuron pool for later analysis +joblib.dump(motor_neuron_pool, save_path / "motor_neuron_pool.pkl") + +print(f"\nSpike trains shape: {spike_trains_matrix.shape}") +print(f" - {spike_trains_matrix.shape[0]} pools") +print(f" - {spike_trains_matrix.shape[1]} neurons per pool") +print(f" - {spike_trains_matrix.shape[2]} time points\n") + +############################################################################## +# Calculate and Display Statistics +# --------------------------------- +# +# It might be of interest to calculate the **firing rates** of the motor units. +# +# .. note:: +# The **firing rates** are calculated as the number of spikes divided by the simulation time. +# The simulation time is in milliseconds, so we need to convert it to seconds. + +# Calculate firing rates for each pool +firing_rates = np.zeros((n_pools, len(motor_neuron_pool.recruitment_thresholds))) +for pool_idx in range(n_pools): + for neuron_idx in range(len(motor_neuron_pool.recruitment_thresholds)): + spike_count = np.sum(spike_trains_matrix[pool_idx, neuron_idx, :]) + firing_rates[pool_idx, neuron_idx] = spike_count / (simulation_time / 1000.0) + +print(f"\nFiring rate statistics:") +for pool_idx in range(n_pools): + active_neurons = np.sum(firing_rates[pool_idx, :] > 0) + mean_rate = np.mean(firing_rates[pool_idx, firing_rates[pool_idx, :] > 0]) + max_rate = np.max(firing_rates[pool_idx, :]) + + print( + f" Pool {pool_idx + 1}: {active_neurons}/{len(motor_neuron_pool.recruitment_thresholds)} active neurons, " + f"mean rate: {mean_rate:.1f} Hz, max rate: {max_rate:.1f} Hz" + ) + +############################################################################## +# Visualize Spike Trains +# ---------------------- +# +# The **spike trains** can be visualized using the ``plot_spike_trains`` function. +# +# .. note:: +# **Plotting helper functions** are available in the ``myogen.utils.plotting`` module. +# +# .. code-block:: python +# +# from myogen.utils.plotting import plot_spike_trains + +# Suppress font warnings to keep output clean +import warnings +import logging + +warnings.filterwarnings("ignore", message=".*Font family.*not found.*") +warnings.filterwarnings("ignore", message=".*findfont.*") +logging.getLogger("matplotlib.font_manager").setLevel(logging.ERROR) + +print("Plotting spike trains...") +with plt.xkcd(): + _, ax = plt.subplots(figsize=(10, 6)) + plot_spike_trains( + spike_trains__matrix=spike_trains_matrix, + timestep__ms=timestep, + axs=[ax], + cortical_input__matrix=cortical_input__matrix, + pool_to_plot=[0], + ) +plt.tight_layout() +plt.show() diff --git a/myogen/simulator/core/spike_train/classes.py b/myogen/simulator/core/spike_train/classes.py index 7f558d1d..ab739874 100644 --- a/myogen/simulator/core/spike_train/classes.py +++ b/myogen/simulator/core/spike_train/classes.py @@ -204,47 +204,49 @@ class SpikeSourceGammaStart(SpikeSourceGamma): # Classe SetRate para alterar a taxa de disparo -class SetRate(object): +class SetRateFromArray(object): """ A callback which changes the firing rate of a population of poisson processes at a fixed interval, based on the forces of the muscle units. - """ - - def __init__( - self, population_source, population_neuron, force_objects, interval=20.0 - ): - self.population_source = population_source - self.population_neuron = population_neuron - self.force_objects = force_objects - self.interval = interval - - def __call__(self, t): - total_force = sum(force.F for force in self.force_objects.values()) - rate = 83 + ((200 - total_force) * 0.01) - self.population_source.set(beta=rate) - return t + self.interval - - -class SetRate(object): - """ - A callback which changes the firing rate of a population of poisson - processes at a fixed interval, based on the forces of the muscle units. + Parameters + ---------- + population_source : pyNN.Population + The population of poisson processes to change the firing rate of. + population_neuron : pyNN.Population + The population of neurons to change the firing rate of. + firing_rate : numpy.ndarray + The firing rate of the poisson processes. + timestep__ms : float, optional + The timestep in milliseconds. It is used to calculate the index of the firing rate array. Default is 0.05. + interval : float, optional + The interval in milliseconds. This is the time between updates of the firing rate. Default is 20.0. + + Returns + ------- + t : float + The time in milliseconds. """ def __init__( - self, population_source, population_neuron, force_objects, interval=20.0, ref=0 + self, + population_source, + population_neuron, + firing_rate, + timestep__ms=0.05, + interval=20.0, ): self.population_source = population_source self.population_neuron = population_neuron - self.force_objects = force_objects + self.firing_rate = firing_rate + self.timestep__ms = timestep__ms self.interval = interval - self.ref = ref - print(f"valor: {self.ref}") def __call__(self, t): - total_force = sum(force.F for force in self.force_objects.values()) - rate = 83 + ((self.ref - total_force) * 0.01) + # Ensure index doesn't exceed array bounds + rate = self.firing_rate[ + min(int(t / self.timestep__ms), len(self.firing_rate) - 1) + ] self.population_source.set(beta=rate) return t + self.interval diff --git a/myogen/simulator/core/spike_train/spike_train.py b/myogen/simulator/core/spike_train/spike_train.py index 8bd52428..e28b8411 100644 --- a/myogen/simulator/core/spike_train/spike_train.py +++ b/myogen/simulator/core/spike_train/spike_train.py @@ -1,13 +1,19 @@ +from operator import call from typing import Any, Literal import numpy as np import pyNN.neuron as sim from pyNN.neuron.morphology import uniform, centre from pyNN.parameters import IonicSpecies +from pyNN.random import NumpyRNG import neo from myogen import RANDOM_GENERATOR from myogen.simulator.core.spike_train import functions, classes -from myogen.utils.types import SPIKE_TRAIN__MATRIX, INPUT_CURRENT__MATRIX +from myogen.utils.types import ( + SPIKE_TRAIN__MATRIX, + INPUT_CURRENT__MATRIX, + CORTICAL_INPUT__MATRIX, +) class MotorNeuronPool: @@ -155,10 +161,13 @@ def _create_motor_neuron_pool(self) -> classes.cell_class: def generate_spike_trains( self, - input_current__matrix: INPUT_CURRENT__MATRIX, + input_current__matrix: INPUT_CURRENT__MATRIX | None = None, + cortical_input__matrix: CORTICAL_INPUT__MATRIX | None = None, timestep__ms: float = 0.05, noise_mean__nA: float = 30, # noqa N803 noise_stdev__nA: float = 30, # noqa N803 + CST_number: int = 400, + connection_prob: float = 0.3, what_to_record: list[ dict[Literal["variables", "to_file", "sampling_interval", "locations"], Any] ] = [ @@ -173,15 +182,22 @@ def generate_spike_trains( Parameters ---------- - input_current__matrix : INPUT_CURRENT__MATRIX + input_current__matrix : INPUT_CURRENT__MATRIX, optional Matrix of shape (n_pools, t_points) containing current values Each row represents the current for one pool + cortical_input__matrix : CORTICAL_INPUT__MATRIX, optional + Matrix of shape (n_pools, t_points) containing cortical input values + Each row represents the cortical input for one pool timestep__ms : float Simulation timestep__ms in ms noise_mean__nA : float Mean of the noise current in nA noise_stdev__nA : float Standard deviation of the noise current in nA + CST_number : int + Number of neurons in the cortical input population. Only used if cortical_input__matrix is provided. Default is 400. + connection_prob : float + Probability of a connection between a cortical input neuron and a motor neuron. Only used if cortical_input__matrix is provided. Default is 0.3. what_to_record: WhatToRecord List of dictionaries specifying what to record. @@ -210,6 +226,17 @@ def generate_spike_trains( self.timestep__ms = timestep__ms sim.setup(timestep=self.timestep__ms) + if input_current__matrix is not None: + n_pools = input_current__matrix.shape[0] + simulation_time_points = input_current__matrix.shape[-1] + elif cortical_input__matrix is not None: + n_pools = cortical_input__matrix.shape[0] + simulation_time_points = cortical_input__matrix.shape[-1] + else: + raise ValueError( + "Either 'input_current__matrix' or 'cortical_input__matrix' must be provided." + ) + # Create motor neuron pools pools: list[sim.Population] = [ sim.Population( @@ -217,27 +244,56 @@ def generate_spike_trains( self._create_motor_neuron_pool(), initial_values={"v": -70}, ) - for _ in range(input_current__matrix.shape[0]) + for _ in range(n_pools) ] # Inject currents into each pool - one current per pool - self.times = np.arange(input_current__matrix.shape[-1]) * self.timestep__ms - for input_current, pool in zip(input_current__matrix, pools): - current_source = sim.StepCurrentSource( - times=self.times, amplitudes=input_current - ) - current_source.inject_into(pool, location="soma") - - # Add Gaussian noise current to each neuron in the pool - for neuron in pool: - noise_source = sim.NoisyCurrentSource( - mean=noise_mean__nA, - stdev=noise_stdev__nA, - start=0, - stop=self.times[-1] + self.timestep__ms, - dt=self.timestep__ms, + if input_current__matrix is not None: + self.times = np.arange(input_current__matrix.shape[-1]) * self.timestep__ms + for input_current, pool in zip(input_current__matrix, pools): + current_source = sim.StepCurrentSource( + times=self.times, amplitudes=input_current + ) + current_source.inject_into(pool, location="soma") + + # Add Gaussian noise current to each neuron in the pool + for neuron in pool: + noise_source = sim.NoisyCurrentSource( + mean=noise_mean__nA, + stdev=noise_stdev__nA, + start=0, + stop=self.times[-1] + self.timestep__ms, + dt=self.timestep__ms, + ) + noise_source.inject_into([neuron], location="soma") + + callbacks = [] + projections = [] + # Create cortical input + if cortical_input__matrix is not None: + for FR, pool in zip(cortical_input__matrix, pools): + spike_source = sim.Population( + CST_number, classes.SpikeSourceGammaStart(alpha=1) + ) + synapse = sim.StaticSynapse(weight=0.6, delay=0.2) + projection = sim.Projection( + spike_source, + pool, + sim.FixedProbabilityConnector( + connection_prob, location_selector="dendrite", rng=NumpyRNG() + ), + synapse, + receptor_type="syn", + ) + projections.append(projection) + callbacks.append( + classes.SetRateFromArray( + population_source=spike_source, + population_neuron=pool, + firing_rate=FR, + timestep__ms=self.timestep__ms, + ) ) - noise_source.inject_into([neuron], location="soma") # Set up recording for pool in pools: @@ -246,13 +302,13 @@ def generate_spike_trains( pool.record(**record) # Run simulation - sim.run(input_current__matrix.shape[-1] * self.timestep__ms) + sim.run(simulation_time_points * self.timestep__ms, callbacks=callbacks) sim.end() self.data = [pool.get_data().segments[0] for pool in pools] # Convert spike times to binary arrays and save - self.time_indices = np.arange(input_current__matrix.shape[-1]) + self.time_indices = np.arange(simulation_time_points) self.spike_trains = np.array( [ [ diff --git a/myogen/utils/cortical_inputs.py b/myogen/utils/cortical_inputs.py new file mode 100644 index 00000000..eeaa3272 --- /dev/null +++ b/myogen/utils/cortical_inputs.py @@ -0,0 +1,601 @@ +from typing import cast + +import numpy as np +from beartype import beartype + +from myogen.utils.types import CORTICAL_INPUT__MATRIX + + +@beartype +def create_sinusoidal_cortical_input( + n_pools: int, + t_points: int, + timestep__ms: float, + amplitudes__pps: float | list[float], + frequencies__Hz: float | list[float], + offsets__pps: float | list[float], + phases__rad: float | list[float] = 0.0, +) -> CORTICAL_INPUT__MATRIX: + """Create a matrix of sinusoidal cortical input for multiple pools. + + Parameters + ---------- + n_pools : int + Number of current pools to generate + t_points : int + Number of time points + timestep__ms : float + Time step in milliseconds + amplitudes__pps : float | list[float] + Amplitude(s) of the sinusoidal current(s) in pulses per second. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + frequencies__Hz : float | list[float] + Frequency(s) of the sinusoidal current(s) in Hertz. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + offsets__pps : float | list[float] + DC offset(s) to add to the sinusoidal current(s) in pulses per second. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + phases__rad : float | list[float] + Phase(s) of the sinusoidal current(s) in radians. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + Raises + ------ + ValueError + If the amplitudes, frequencies, offsets, or phases are lists and the length of the parameters does not match n_pools + + Returns + ------- + CORTICAL_INPUT__MATRIX + Matrix of shape (n_pools, t_points) containing sinusoidal currents + """ + t = np.arange(0, t_points * timestep__ms, timestep__ms) + + # Convert parameters to lists + amplitudes_list = cast( + list[float], + [amplitudes__pps] * n_pools + if isinstance(amplitudes__pps, float) + else amplitudes__pps, + ) + frequencies_list = cast( + list[float], + [frequencies__Hz] * n_pools + if isinstance(frequencies__Hz, float) + else frequencies__Hz, + ) + offsets_list = cast( + list[float], + [offsets__pps] * n_pools if isinstance(offsets__pps, float) else offsets__pps, + ) + phases_list = cast( + list[float], + [phases__rad] * n_pools if isinstance(phases__rad, float) else phases__rad, + ) + + if len(amplitudes_list) != n_pools: + raise ValueError( + f"Length of amplitudes__pps ({len(amplitudes_list)}) must match n_pools ({n_pools})" + ) + if len(frequencies_list) != n_pools: + raise ValueError( + f"Length of frequencies__Hz ({len(frequencies_list)}) must match n_pools ({n_pools})" + ) + if len(offsets_list) != n_pools: + raise ValueError( + f"Length of offsets__pps ({len(offsets_list)}) must match n_pools ({n_pools})" + ) + if len(phases_list) != n_pools: + raise ValueError( + f"Length of phases__rad ({len(phases_list)}) must match n_pools ({n_pools})" + ) + + return np.array( + [ + ( + amplitudes_list[i] + * np.sin(2 * np.pi * frequencies_list[i] * t / 1000 + phases_list[i]) + + offsets_list[i] + ) + for i in range(n_pools) + ] + ) + + +@beartype +def create_sawtooth_cortical_input( + n_pools: int, + t_points: int, + timestep_ms: float, + amplitudes__pps: float | list[float], + frequencies__Hz: float | list[float], + offsets__pps: float | list[float] = 0.0, + widths: float | list[float] = 0.5, + phases__rad: float | list[float] = 0.0, +) -> CORTICAL_INPUT__MATRIX: + """Create a matrix of sawtooth currents for multiple pools. + + Parameters + ---------- + n_pools : int + Number of current pools to generate + t_points : int + Number of time points + timestep_ms : float + Time step in milliseconds + amplitudes__muV : float | list[float] + Amplitude(s) of the sawtooth current(s) in microvolts. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + frequencies__Hz : float | list[float] + Frequency(s) of the sawtooth current(s) in Hertz. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + offsets__muV : float | list[float] + DC offset(s) to add to the sawtooth current(s) in microvolts. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + widths : float | list[float] + Width(s) of the rising edge as proportion of period (0 to 1). + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + phases__rad : float | list[float] + Phase(s) of the sawtooth current(s) in radians. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + Raises + ------ + ValueError + If the parameters are lists and the length of the parameters does not match n_pools + + Returns + ------- + CORTICAL_INPUT__MATRIX + Matrix of shape (n_pools, t_points) containing sawtooth currents + """ + t = np.arange(0, t_points * timestep_ms, timestep_ms) + + # Convert parameters to lists + amplitudes_list = cast( + list[float], + [amplitudes__pps] * n_pools + if isinstance(amplitudes__pps, float) + else amplitudes__pps, + ) + frequencies_list = cast( + list[float], + [frequencies__Hz] * n_pools + if isinstance(frequencies__Hz, float) + else frequencies__Hz, + ) + offsets_list = cast( + list[float], + [offsets__pps] * n_pools if isinstance(offsets__pps, float) else offsets__pps, + ) + widths_list = cast( + list[float], [widths] * n_pools if isinstance(widths, float) else widths + ) + phases_list = cast( + list[float], + [phases__rad] * n_pools if isinstance(phases__rad, float) else phases__rad, + ) + + if len(amplitudes_list) != n_pools: + raise ValueError( + f"Length of amplitudes__pps ({len(amplitudes_list)}) must match n_pools ({n_pools})" + ) + if len(frequencies_list) != n_pools: + raise ValueError( + f"Length of frequencies__Hz ({len(frequencies_list)}) must match n_pools ({n_pools})" + ) + if len(offsets_list) != n_pools: + raise ValueError( + f"Length of offsets__pps ({len(offsets_list)}) must match n_pools ({n_pools})" + ) + if len(widths_list) != n_pools: + raise ValueError( + f"Length of widths ({len(widths_list)}) must match n_pools ({n_pools})" + ) + if len(phases_list) != n_pools: + raise ValueError( + f"Length of phases__rad ({len(phases_list)}) must match n_pools ({n_pools})" + ) + + cortical_input__matrix = np.zeros((n_pools, t_points)) + + for i in range(n_pools): + phase_t = 2 * np.pi * frequencies_list[i] * t / 1000 + phases_list[i] + sawtooth = (phase_t / (2 * np.pi)) % 1 + sawtooth = np.where( + sawtooth < widths_list[i], + sawtooth / widths_list[i], + (1 - sawtooth) / (1 - widths_list[i]), + ) + cortical_input__matrix[i] = amplitudes_list[i] * sawtooth + offsets_list[i] + + return cortical_input__matrix + + +@beartype +def create_step_cortical_input( + n_pools: int, + t_points: int, + timestep_ms: float, + step_heights__pps: float | list[float], + step_durations__ms: float | list[float], + offsets__pps: float | list[float] = 0.0, +) -> CORTICAL_INPUT__MATRIX: + """Create a matrix of step currents for multiple pools. + + Parameters + ---------- + n_pools : int + Number of current pools to generate + t_points : int + Number of time points + timestep_ms : float + Time step in milliseconds. + step_heights__muV : float | list[float] + Step height(s) for the current(s) in microvolts. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + step_durations__ms : float | list[float] + Step duration(s) in milliseconds. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + offsets__muV : float | list[float] + DC offset(s) to add to the step current(s) in microvolts. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + Raises + ------ + ValueError + If the parameters are lists and the length of the parameters does not match n_pools + + Returns + ------- + INPUT_CURRENT__MATRIX + Matrix of shape (n_pools, t_points) containing step currents + """ + # Convert parameters to lists + step_heights_list = cast( + list[float], + [step_heights__pps] * n_pools + if isinstance(step_heights__pps, float) + else step_heights__pps, + ) + step_durations_list = cast( + list[float], + [step_durations__ms] * n_pools + if isinstance(step_durations__ms, float) + else step_durations__ms, + ) + offsets_list = cast( + list[float], + [offsets__pps] * n_pools if isinstance(offsets__pps, float) else offsets__pps, + ) + + if len(step_heights_list) != n_pools: + raise ValueError( + f"Length of step_heights__pps ({len(step_heights_list)}) must match n_pools ({n_pools})" + ) + if len(step_durations_list) != n_pools: + raise ValueError( + f"Length of step_durations__ms ({len(step_durations_list)}) must match n_pools ({n_pools})" + ) + if len(offsets_list) != n_pools: + raise ValueError( + f"Length of offsets__pps ({len(offsets_list)}) must match n_pools ({n_pools})" + ) + + cortical_input__matrix = np.zeros((n_pools, t_points)) + + for i in range(n_pools): + current = np.zeros(t_points) + + # Create step: constant value for duration, then back to zero + duration_points = int(step_durations_list[i] / timestep_ms) + if duration_points > 0: + end_idx = min(duration_points, t_points) + current[:end_idx] = step_heights_list[i] + + cortical_input__matrix[i] = current + offsets_list[i] + + return cortical_input__matrix + + +@beartype +def create_ramp_cortical_input( + n_pools: int, + t_points: int, + start_fr__pps: float | list[float], + end_fr__pps: float | list[float], + offsets__pps: float | list[float] = 0.0, +) -> CORTICAL_INPUT__MATRIX: + """Create a matrix of ramp currents for multiple pools. + + Parameters + ---------- + n_pools : int + Number of current pools to generate + t_points : int + Number of time points + start_currents__muV : float | list[float] + Starting current(s) for the ramp in microvolts. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + end_currents__muV : float | list[float] + Ending current(s) for the ramp in microvolts. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + offsets__muV : float | list[float] + DC offset(s) to add to the ramp current(s) in microvolts. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + + Raises + ------ + ValueError + If the parameters are lists and the length of the parameters does not match n_pools + + Returns + ------- + CORTICAL_INPUT__MATRIX + Matrix of shape (n_pools, t_points) containing ramp firing rates + """ + # Convert parameters to lists + start_fr_list = cast( + list[float], + [start_fr__pps] * n_pools + if isinstance(start_fr__pps, float) + else start_fr__pps, + ) + end_fr_list = cast( + list[float], + [end_fr__pps] * n_pools if isinstance(end_fr__pps, float) else end_fr__pps, + ) + offsets_list = cast( + list[float], + [offsets__pps] * n_pools if isinstance(offsets__pps, float) else offsets__pps, + ) + + if len(start_fr_list) != n_pools: + raise ValueError( + f"Length of start_fr__pps ({len(start_fr_list)}) must match n_pools ({n_pools})" + ) + if len(end_fr_list) != n_pools: + raise ValueError( + f"Length of end_fr__pps ({len(end_fr_list)}) must match n_pools ({n_pools})" + ) + if len(offsets_list) != n_pools: + raise ValueError( + f"Length of offsets__pps ({len(offsets_list)}) must match n_pools ({n_pools})" + ) + + cortical_input__matrix = np.zeros((n_pools, t_points)) + + for i in range(n_pools): + ramp = np.linspace(start_fr_list[i], end_fr_list[i], t_points) + cortical_input__matrix[i] = ramp + offsets_list[i] + + return cortical_input__matrix + + +@beartype +def create_trapezoid_cortical_input( + n_pools: int, + t_points: int, + timestep_ms: float, + amplitudes__pps: float | list[float], + rise_times__ms: float | list[float] = 100.0, + plateau_times__ms: float | list[float] = 200.0, + fall_times__ms: float | list[float] = 100.0, + offsets__pps: float | list[float] = 0.0, + delays__ms: float | list[float] = 0.0, +) -> CORTICAL_INPUT__MATRIX: + """Create a matrix of trapezoidal firing rates for multiple pools. + + Parameters + ---------- + n_pools : int + Number of current pools to generate + t_points : int + Number of time points + timestep_ms : float + Time step in milliseconds + amplitudes__pps : float | list[float] + Amplitude(s) of the trapezoidal current(s) in pps. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + rise_times__ms : float | list[float] + Duration(s) of the rising phase in milliseconds. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + plateau_times__ms : float | list[float] + Duration(s) of the plateau phase in milliseconds. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + fall_times__ms : float | list[float] + Duration(s) of the falling phase in milliseconds. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + offsets__pps : float | list[float] + DC offset(s) to add to the trapezoidal current(s) in pulses per second. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + delays__ms : float | list[float] + Delay(s) before starting the trapezoid in milliseconds. + + Must be: + - Single float: used for all pools + - List of floats: must match n_pools + + Raises + ------ + ValueError + If the parameters are lists and the length of the parameters does not match n_pools + + Returns + ------- + CORTICAL_INPUT__MATRIX + Matrix of shape (n_pools, t_points) containing trapezoidal firing rates + """ + # Convert parameters to lists + amplitudes_list = cast( + list[float], + [amplitudes__pps] * n_pools + if isinstance(amplitudes__pps, float) + else amplitudes__pps, + ) + rise_times_list = cast( + list[float], + [rise_times__ms] * n_pools + if isinstance(rise_times__ms, float) + else rise_times__ms, + ) + plateau_times_list = cast( + list[float], + [plateau_times__ms] * n_pools + if isinstance(plateau_times__ms, float) + else plateau_times__ms, + ) + fall_times_list = cast( + list[float], + [fall_times__ms] * n_pools + if isinstance(fall_times__ms, float) + else fall_times__ms, + ) + offsets_list = cast( + list[float], + [offsets__pps] * n_pools if isinstance(offsets__pps, float) else offsets__pps, + ) + delays_list = cast( + list[float], + [delays__ms] * n_pools if isinstance(delays__ms, float) else delays__ms, + ) + + if len(amplitudes_list) != n_pools: + raise ValueError( + f"Length of amplitudes__pps ({len(amplitudes_list)}) must match n_pools ({n_pools})" + ) + if len(rise_times_list) != n_pools: + raise ValueError( + f"Length of rise_times__ms ({len(rise_times_list)}) must match n_pools ({n_pools})" + ) + if len(plateau_times_list) != n_pools: + raise ValueError( + f"Length of plateau_times__ms ({len(plateau_times_list)}) must match n_pools ({n_pools})" + ) + if len(fall_times_list) != n_pools: + raise ValueError( + f"Length of fall_times__ms ({len(fall_times_list)}) must match n_pools ({n_pools})" + ) + if len(offsets_list) != n_pools: + raise ValueError( + f"Length of offsets__pps ({len(offsets_list)}) must match n_pools ({n_pools})" + ) + if len(delays_list) != n_pools: + raise ValueError( + f"Length of delays__ms ({len(delays_list)}) must match n_pools ({n_pools})" + ) + + cortical_input__matrix = np.zeros((n_pools, t_points)) + + for i in range(n_pools): + # Calculate indices for each phase + delay_points = int(delays_list[i] / timestep_ms) + rise_points = int(rise_times_list[i] / timestep_ms) + plateau_points = int(plateau_times_list[i] / timestep_ms) + fall_points = int(fall_times_list[i] / timestep_ms) + + # Create the base trapezoid shape + trapezoid = np.zeros(t_points) + + # Calculate start indices for each phase + rise_start = delay_points + plateau_start = rise_start + rise_points + fall_start = plateau_start + plateau_points + end_idx = fall_start + fall_points + + # Ensure we don't exceed array bounds + if rise_start < t_points: + # Rising phase (linear ramp up) + rise_end = min(plateau_start, t_points) + if rise_end > rise_start: + points_to_fill = rise_end - rise_start + trapezoid[rise_start:rise_end] = np.linspace(0, 1, points_to_fill) + + # Plateau phase (constant) + if plateau_start < t_points: + plateau_end = min(fall_start, t_points) + if plateau_end > plateau_start: + trapezoid[plateau_start:plateau_end] = 1 + + # Falling phase (linear ramp down) + if fall_start < t_points: + fall_end = min(end_idx, t_points) + if fall_end > fall_start: + points_to_fill = fall_end - fall_start + trapezoid[fall_start:fall_end] = np.linspace( + 1, 0, points_to_fill + ) + + cortical_input__matrix[i] = amplitudes_list[i] * trapezoid + offsets_list[i] + + return cortical_input__matrix diff --git a/myogen/utils/plotting/spikes.py b/myogen/utils/plotting/spikes.py index 40d2f3bf..cc9f5ab4 100644 --- a/myogen/utils/plotting/spikes.py +++ b/myogen/utils/plotting/spikes.py @@ -11,7 +11,11 @@ from beartype import beartype from beartype.cave import IterableType -from myogen.utils.types import INPUT_CURRENT__MATRIX, SPIKE_TRAIN__MATRIX +from myogen.utils.types import ( + INPUT_CURRENT__MATRIX, + SPIKE_TRAIN__MATRIX, + CORTICAL_INPUT__MATRIX, +) # Configure multiple sources to suppress font warnings logging.getLogger("matplotlib.font_manager").setLevel(logging.ERROR) @@ -27,6 +31,7 @@ def plot_spike_trains( timestep__ms: float, axs: IterableType[Axes], pool_current__matrix: INPUT_CURRENT__MATRIX | None = None, + cortical_input__matrix: CORTICAL_INPUT__MATRIX | None = None, pool_to_plot: list[int] | None = None, apply_default_formatting: bool = True, **kwargs: Any, @@ -130,6 +135,45 @@ def plot_spike_trains( pc_min = 0 pc_max = 1 + if cortical_input__matrix is not None: + pc = cortical_input__matrix[_pool_to_plot[spike_pool_idx]] + + pc_min = np.min(pc) + pc_max = np.max(pc) + print(pc_min, pc_max) + pc_normalized = (pc - pc_min) / (pc_max - pc_min) + pc_normalized = pc_normalized * (index) + + ax.plot( + np.arange(0, len(pc)) * timestep__ms / 1000, + pc_normalized, + linestyle="--", + linewidth=1, + alpha=1, + zorder=0, + color="black", + label=f"Cortical\nInput Firing Rate", + ) + + if apply_default_formatting: + ax.legend(frameon=False) + + ax2 = ax.twinx() + ax2.spines["right"].set_color("black") + print("index", index) + ax2.set_ylim(0, index + 1) + ax2.set_yticks(np.linspace(0, index + 1, 10)) + ax2.set_yticklabels( + np.round( + np.linspace(0, index + 1, 10) * (pc_max - pc_min) / (index + 1) + + pc_min + ) + ) + ax2.set_ylabel("Firing rate (pps)") + + ax2.tick_params(axis="y", colors="black") + ax2.yaxis.label.set_color("black") + if pool_current__matrix is not None: pc = pool_current__matrix[_pool_to_plot[spike_pool_idx]] diff --git a/myogen/utils/types.py b/myogen/utils/types.py index a253e79b..96e76d3a 100644 --- a/myogen/utils/types.py +++ b/myogen/utils/types.py @@ -40,3 +40,9 @@ npt.NDArray[np.floating], Is[lambda x: x.ndim == 3], ] + +# Cortical input matrix: (mu_pools, time_points) +CORTICAL_INPUT__MATRIX = Annotated[ + npt.NDArray[np.floating], + Is[lambda x: x.ndim == 2], +] diff --git a/pyproject.toml b/pyproject.toml index cbf552a0..70a9ea47 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "MyoGen" -version = "0.1.1" +version = "0.2.0" description = "Surface and Intramuscular EMG simulation toolkit" readme = "README.md" requires-python = ">=3.12"