"""
Intramuscular Electromyography (iEMG) Simulation.
This module provides the main simulation framework for generating intramuscular
electromyography signals using needle electrodes. It integrates motor unit
simulation, electrode modeling, and signal generation with realistic noise.
"""
import logging
from copy import deepcopy
from typing import Optional
import numpy as np
import quantities as pq
from joblib import Parallel, delayed
from neo import AnalogSignal, Block, Segment
from tqdm import tqdm
from myogen import get_random_generator
from myogen.simulator.core.emg.electrodes import IntramuscularElectrodeArray
from myogen.simulator.core.muscle import Muscle
from myogen.utils.binning import bin_spike_trains
from myogen.utils.decorators import beartowertype
from myogen.utils.types import (
INTRAMUSCULAR_EMG__Block,
INTRAMUSCULAR_MUAP__Block,
Quantity__Hz,
Quantity__m_per_s,
Quantity__mm,
Quantity__s,
SPIKE_TRAIN__Block,
)
try:
import cupy as cp
HAS_CUPY = cp.cuda.runtime.getDeviceCount() > 0
except (ImportError, Exception):
HAS_CUPY = False
from .motor_unit_sim import MotorUnitSim
[docs]
@beartowertype
class IntramuscularEMG:
"""
Intramuscular Electromyography (iEMG) Simulation.
This class provides a comprehensive simulation framework for generating
intramuscular EMG signals detected by needle electrodes.
Parameters
----------
muscle_model : Muscle
Pre-computed muscle model (see :class:`myogen.simulator.Muscle`).
electrode_array : IntramuscularElectrodeArray
Intramuscular electrode array configuration to use for simulation (see :class:`myogen.simulator.IntramuscularElectrodeArray`).
sampling_frequency__Hz : Quantity__Hz, default=10240.0 * pq.Hz
Sampling frequency in Hz for EMG simulation.
Default is set to 10240 Hz as used by the Quattrocento (OT Bioelettronica, Turin, Italy) system.
spatial_resolution__mm : Quantity__mm, default=0.01 * pq.mm
Spatial resolution for fiber action potential calculation in mm.
Default is set to 0.01 mm.
endplate_center__percent : float, default=50
Percentage of muscle length where the endplate is located.
By default, the endplate is located at the center of the muscle (50% of the muscle length).
nmj_jitter__s : Quantity__s, default=35e-6 * pq.s
Standard deviation of neuromuscular junction jitter in seconds.
Default is set to 35e-6 s as determined by Konstantin et al. 2020 [1]_.
branch_cvs__m_per_s : tuple[Quantity__m_per_s, Quantity__m_per_s], default=(5.0 * pq.m / pq.s, 2.0 * pq.m / pq.s)
Conduction velocities for the two-layer model of the neuromuscular junction in m/s.
Default is set to (5.0, 2.0) m/s as determined by Konstantin et al. 2020 [1]_.
.. note::
The two-layer model is a simplification of the actual arborization pattern, but it is a good approximation for the purposes of this simulation.
Follows the implementation of Konstantin et al. 2020 [1]_.
MUs_to_simulate : list[int], optional
Indices of motor units to simulate. If None, all motor units are simulated.
Default is None. For computational efficiency, consider
simulating subsets for initial analysis.
Indices correspond to the recruitment order (0 is recruited first).
Attributes
----------
muaps__Block : INTRAMUSCULAR_MUAP__Block
Intramuscular MUAP shapes for the electrode array as a neo.Block. Available after simulate_muaps().
intramuscular_emg__Block : INTRAMUSCULAR_EMG__Block
Intramuscular EMG signals for the electrode array as a neo.Block. Available after simulate_intramuscular_emg().
noisy_intramuscular_emg__Block : INTRAMUSCULAR_EMG__Block
Noisy intramuscular EMG signals for the electrode array as a neo.Block. Available after add_noise().
spike_train__Block : SPIKE_TRAIN__Block
Spike train block used for EMG generation. Available after simulate_intramuscular_emg().
References
----------
.. [1] Konstantin, A., Yu, T., Le Carpentier, E., Aoustin, Y., Farina, D., 2020. Simulation of Motor Unit Action Potential Recordings From Intramuscular Multichannel Scanning Electrodes. IEEE Transactions on Biomedical Engineering 67, 2005–2014. https://doi.org/10.1109/TBME.2019.2953680
"""
[docs]
def __init__(
self,
muscle_model: Muscle,
electrode_array: IntramuscularElectrodeArray,
sampling_frequency__Hz: Quantity__Hz = 10240.0 * pq.Hz,
spatial_resolution__mm: Quantity__mm = 0.01 * pq.mm,
endplate_center__percent: float = 50,
nmj_jitter__s: Quantity__s = 35e-6 * pq.s,
branch_cvs__m_per_s: tuple[Quantity__m_per_s, Quantity__m_per_s] = (
5.0 * pq.m / pq.s,
2.0 * pq.m / pq.s,
),
MUs_to_simulate: list[int] | None = None,
):
# Immutable public arguments - never modify these
self.muscle_model = muscle_model
self.electrode_array = electrode_array
self.sampling_frequency__Hz = sampling_frequency__Hz
self.spatial_resolution__mm = spatial_resolution__mm
self.endplate_center__percent = endplate_center__percent
self.nmj_jitter__s = nmj_jitter__s
self.branch_cvs__m_per_s = branch_cvs__m_per_s
self.MUs_to_simulate = MUs_to_simulate
# Private copies for internal modifications (extract magnitudes)
self._muscle_model = muscle_model
self._electrode_array = electrode_array
self._sampling_frequency__Hz = float(sampling_frequency__Hz.rescale(pq.Hz).magnitude)
self._spatial_resolution__mm = float(spatial_resolution__mm.rescale(pq.mm).magnitude)
self._endplate_center__percent = endplate_center__percent
self._nmj_jitter__s = float(nmj_jitter__s.rescale(pq.s).magnitude)
self._branch_cvs__m_per_s = (
float(branch_cvs__m_per_s[0].rescale(pq.m / pq.s).magnitude),
float(branch_cvs__m_per_s[1].rescale(pq.m / pq.s).magnitude),
)
self._MUs_to_simulate = MUs_to_simulate
# Derived parameters - immutable public access
self.endplate_center__mm = self._muscle_model.length__mm * (
self._endplate_center__percent / 100.0
)
# Private copies for internal modifications
self._branch_cvs__mm_per_s: tuple[float, float] = (
self._branch_cvs__m_per_s[0] * 1000.0,
self._branch_cvs__m_per_s[1] * 1000.0,
)
# Extract magnitude for internal use (calculations expect floats)
length_mm = (
float(self._muscle_model.length__mm.rescale(pq.mm).magnitude)
if hasattr(self._muscle_model.length__mm, "magnitude")
else float(self._muscle_model.length__mm)
)
self._endplate_center__mm = length_mm * (self._endplate_center__percent / 100.0)
# Derived parameters - private for internal use
self._dt = 1.0 / self._sampling_frequency__Hz
self._dz = self._spatial_resolution__mm
self._n_motor_units = len(self._muscle_model.recruitment_thresholds)
# Motor unit selection
if self._MUs_to_simulate is None:
self._MUs_to_simulate = list(range(self._n_motor_units))
else:
self._MUs_to_simulate = self._MUs_to_simulate
# Motor unit simulations - private storage
self._motor_units: list[MotorUnitSim] = [] # List of motor unit simulators
self._muaps__Block: Optional[INTRAMUSCULAR_MUAP__Block] = None
self._max_muap_length: int = 0
# Simulation results - stored privately, accessed via properties
self._intramuscular_emg__Block: Optional[INTRAMUSCULAR_EMG__Block] = None
self._noisy_intramuscular_emg__Block: Optional[INTRAMUSCULAR_EMG__Block] = None
self._spike_train__Block: Optional[SPIKE_TRAIN__Block] = None
[docs]
def simulate_muaps(self, n_jobs: int = -2, verbose: bool = True) -> INTRAMUSCULAR_MUAP__Block:
"""
Simulate MUAPs for all electrode arrays using the provided muscle model.
This method generates intramuscular Motor Unit Action Potential (MUAP) templates
by simulating individual motor units with realistic neuromuscular junction
distributions and fiber action potential propagation.
Parameters
----------
n_jobs : int, optional
Number of parallel workers for motor unit processing. Default is -2.
- n_jobs=-1: Use all CPU cores
- n_jobs=-2: Use all cores except one (recommended, keeps system responsive)
- n_jobs=-3: Use all cores except two
- n_jobs=1: No parallelization
- n_jobs=N: Use exactly N cores
verbose : bool, default=True
If True, display progress bars. Set to False to disable.
Returns
-------
INTRAMUSCULAR_MUAP__Block
Intramuscular MUAP shapes for all motor units stored in a neo.Block.
Results are stored in the `muaps__Block` property after execution.
Notes
-----
This method must be called before simulate_intramuscular_emg(). The process
includes: (1) motor unit initialization, (2) neuromuscular junction simulation,
and (3) MUAP calculation with spatial filtering.
"""
self._initialize_motor_units(verbose=verbose)
self._simulate_neuromuscular_junctions(verbose=verbose)
return self._calculate_muaps(n_jobs=n_jobs, verbose=verbose)
def _initialize_motor_units(self, verbose: bool = True) -> None:
"""
Initialize individual motor unit simulators.
This method creates MotorUnitSim objects for each motor unit based on
the muscle model fiber assignments and properties.
"""
if not hasattr(self._muscle_model, "assignment") or self._muscle_model.assignment is None:
raise ValueError(
"Muscle model must have fiber assignments. Call muscle.assign_mfs2mns() first."
)
# Initialize list with None for all MUs (will be filled with simulators)
self._motor_units = [None] * self._n_motor_units
for mu_idx in tqdm(
range(self._n_motor_units),
desc="Creating motor unit simulators",
unit="Simulator",
disable=not verbose,
):
# Get fibers assigned to this motor unit
fiber_mask = self._muscle_model.assignment == mu_idx
if not np.any(fiber_mask):
continue
# Create motor unit simulator at the correct index
# Extract magnitudes from quantities for MotorUnitSim (expects floats/arrays)
def _extract_magnitude(val):
"""Helper to extract magnitude from Quantity or return value as-is."""
if hasattr(val, "magnitude"):
return val.magnitude
return val
self._motor_units[mu_idx] = MotorUnitSim(
muscle_fiber_centers__mm=_extract_magnitude(
self._muscle_model.muscle_fiber_centers__mm[fiber_mask]
),
muscle_length__mm=float(_extract_magnitude(self._muscle_model.length__mm)),
muscle_fiber_diameters__mm=_extract_magnitude(
self._muscle_model.muscle_fiber_diameters__mm[fiber_mask]
),
muscle_fiber_conduction_velocity__mm_per_s=_extract_magnitude(
self._muscle_model.muscle_fiber_conduction_velocities__mm_per_s[fiber_mask]
),
neuromuscular_junction_conduction_velocities__mm_per_s=list(
self._branch_cvs__mm_per_s
),
nominal_center__mm=_extract_magnitude(
self._muscle_model.innervation_center_positions__mm[mu_idx]
),
)
def _simulate_neuromuscular_junctions(self, verbose: bool = True) -> None:
"""
Simulate neuromuscular junction distributions for all motor units.
This implements the logic from s08_cl_init_muaps.m for generating
realistic NMJ branch patterns with size-dependent complexity.
"""
if not self._motor_units:
raise ValueError("Must call _initialize_motor_units() first")
n_branches = 1 + np.round(
np.log(
self._muscle_model.recruitment_thresholds
/ self._muscle_model.recruitment_thresholds[0]
)
)
for mu_idx, mu_sim in enumerate(
tqdm(self._motor_units, desc="Setting up NMJ distributions", unit="Simulator", disable=not verbose)
):
spread_factor = np.sum(self._muscle_model.recruitment_thresholds[:mu_idx]) / np.sum(
self._muscle_model.recruitment_thresholds
)
# Create NMJ distribution
# Branch spread increases with motor unit size
mu_sim.sim_nmj_branches_two_layers(
n_branches=int(n_branches[mu_idx]),
endplate_center=self._endplate_center__mm,
branches_z_std=1.5 + spread_factor * 4.0,
arborization_z_std=0.5 + spread_factor * 1.5,
)
def _calculate_muaps(self, n_jobs: int = -2, verbose: bool = True) -> INTRAMUSCULAR_MUAP__Block:
"""
Pre-calculate motor unit action potentials (MUAPs) using parallel processing.
Parameters
----------
n_jobs : int, optional
Number of parallel workers for motor unit processing. Default is -2.
- n_jobs=-1: Use all CPU cores
- n_jobs=-2: Use all cores except one (recommended, keeps system responsive)
- n_jobs=-3: Use all cores except two
- n_jobs=1: No parallelization
- n_jobs=N: Use exactly N cores
Returns
-------
INTRAMUSCULAR_MUAP__Block
Intramuscular MUAP shapes for all motor units stored in a neo.Block.
"""
if not self._motor_units:
raise ValueError("Must call _initialize_motor_units() first")
# Set default MUs to simulate
if self._MUs_to_simulate is None:
self._MUs_to_simulate = list(range(len(self._motor_units)))
# Helper function to process a single motor unit
def _process_single_mu(
mu_idx: int,
mu_sim: Optional[MotorUnitSim],
dt: float,
dz: float,
electrode_positions: np.ndarray,
) -> tuple[Optional[np.ndarray], int]:
"""
Process a single motor unit (calculate SFAP and MUAP).
Parameters
----------
mu_idx : int
Index of the motor unit to process.
mu_sim : Optional[MotorUnitSim]
Motor unit simulator object (None if MU has no fibers).
dt : float
Temporal resolution.
dz : float
Spatial resolution.
electrode_positions : np.ndarray
Electrode positions array.
Returns
-------
tuple[Optional[np.ndarray], int]
Tuple of (muap_array, muap_length) where muap_array is the MUAP signal
for this MU (or None if MU has no fibers) and muap_length is the length.
"""
try:
if mu_sim is None:
return None, 0
# Deep copy to avoid threading issues
mu_sim_copy = deepcopy(mu_sim)
# Calculate SFAPs
mu_sim_copy.calc_sfaps(
index=mu_idx,
dt=dt,
dz=dz,
electrode_positions=electrode_positions,
)
# Calculate MUAP (no jitter for templates)
muap = mu_sim_copy.calc_muap(jitter_std=0.0)
return muap, muap.shape[0]
except Exception as e:
raise RuntimeError(f"Failed to process MU {mu_idx}") from e
# Process only specified motor units in parallel
n_motor_units = len(self._motor_units)
n_mus_to_compute = len(self._MUs_to_simulate)
logging.info(
f"Processing {n_mus_to_compute}/{n_motor_units} motor units using parallel processing..."
)
# Parallel execution with progress bar
# Only compute MUs in the subset
results = {} # Use dict to map MU_index -> result
max_length = 0
with tqdm(total=n_mus_to_compute, desc="Computing MUAPs", unit="MU", disable=not verbose) as pbar:
for muap, muap_length in Parallel(
n_jobs=n_jobs,
return_as="generator",
verbose=0,
batch_size="auto",
)(
delayed(_process_single_mu)(
mu_idx,
self._motor_units[mu_idx],
self._dt,
self._dz,
self._electrode_array.pts,
)
for mu_idx in self._MUs_to_simulate
):
# Results come in order, map back to original MU index
mu_idx = self._MUs_to_simulate[len(results)]
results[mu_idx] = muap
max_length = max(max_length, muap_length)
pbar.update(1)
# Create neo Block structure
# Create segments for ALL MUs (maintaining index order)
# Non-computed MUs get empty signals
block = Block()
for mu_idx in range(n_motor_units):
muap = results.get(mu_idx)
if muap is None:
# Create empty segment for MUs with no fibers or not selected
block.segments.append(segment := Segment(name=f"MUAP_{mu_idx}"))
segment.analogsignals.append(
AnalogSignal(
np.zeros((1, len(self._electrode_array.pts))) * pq.dimensionless,
sampling_rate=self._sampling_frequency__Hz * pq.Hz,
)
)
else:
block.segments.append(segment := Segment(name=f"MUAP_{mu_idx}"))
segment.analogsignals.append(
AnalogSignal(
muap * pq.dimensionless,
sampling_rate=self._sampling_frequency__Hz * pq.Hz,
)
)
self._muaps__Block = block
return self._muaps__Block
def _analyze_detectable_motor_units(self, verbose: bool = True) -> tuple[np.ndarray, list[int]]:
"""
Analyze which motor units are detectable by the electrode.
This implements s11_cl_get_detectable_mus.m logic for determining
motor unit visibility based on signal-to-noise ratio and contribution.
Returns
-------
tuple[np.ndarray, List[int]]
Boolean array of detectable motor units and their indices
"""
if self._muaps__Block is None:
raise ValueError("Must call simulate_muaps() first")
if verbose:
print("Analyzing motor unit detectability...")
detectable = np.zeros(len(self._motor_units), dtype=bool)
# Prominence criterion: MUAP amplitude vs noise
over_noise_threshold = 2.0 # 2x noise level
for i, mu_sim in enumerate(self._motor_units):
# Get peak MUAP amplitude across all channels
muap_data = self._muaps__Block.segments[i].analogsignals[0].magnitude
muap_amplitudes = np.max(np.abs(muap_data), axis=0)
max_amplitude = np.max(muap_amplitudes)
# Check if MUAP is prominent enough above noise
# Use a simple noise threshold estimate
all_muaps = np.array(
[seg.analogsignals[0].magnitude for seg in self._muaps__Block.segments]
)
noise_estimate = np.std(all_muaps) * 0.1 # Simple noise estimate
is_prominent = max_amplitude > over_noise_threshold * noise_estimate
# Additional criterion: contribution to total signal variance
relative_size = (i + 1) / len(self._motor_units)
min_contribution = 0.05 # Minimum 5% contribution
contributes_enough = relative_size > min_contribution
detectable[i] = is_prominent and contributes_enough
detectable_indices = [i for i, det in enumerate(detectable) if det]
if verbose:
print(f"Found {np.sum(detectable)} detectable motor units out of {len(self._motor_units)}")
return detectable, detectable_indices
[docs]
def simulate_intramuscular_emg(
self,
spike_train__Block: SPIKE_TRAIN__Block,
verbose: bool = True,
) -> INTRAMUSCULAR_EMG__Block:
"""
Generate intramuscular EMG signals using the provided spike train block.
This method convolves the pre-computed MUAP templates with spike trains
to synthesize realistic intramuscular EMG signals. The process includes temporal
resampling and supports both CPU and GPU acceleration for efficient computation.
Parameters
----------
spike_train__Block : SPIKE_TRAIN__Block
Block containing spike trains organized as segments (pools) with spiketrains.
verbose : bool, default=True
If True, display progress bars. Set to False to disable.
Returns
-------
INTRAMUSCULAR_EMG__Block
Intramuscular EMG signals for the electrode array stored in a neo.Block.
Results are stored in the `intramuscular_emg__Block` property after execution.
Raises
------
ValueError
If MUAP templates have not been generated. Call simulate_muaps() first.
"""
if self._muaps__Block is None:
raise ValueError("MUAP templates have not been generated. Call simulate_muaps() first.")
# Store spike train data privately
self._spike_train__Block = spike_train__Block
# Handle MUs to simulate
if self._MUs_to_simulate is None:
MUs_to_simulate = set(
range(len(self._muscle_model.resulting_number_of_innervated_fibers))
)
else:
MUs_to_simulate = set(self._MUs_to_simulate)
# Extract MUAP data from Block and pad to same length
muap_data_list = [seg.analogsignals[0].magnitude for seg in self._muaps__Block.segments]
# Find the maximum length among all MUAPs
max_length = max(muap.shape[0] for muap in muap_data_list)
n_electrodes = muap_data_list[0].shape[1]
# Pad all MUAPs to the same length (centered, pad both sides)
padded_muaps = []
for muap in muap_data_list:
pad_total = max_length - muap.shape[0]
if pad_total > 0:
pad_left = pad_total // 2
pad_right = pad_total - pad_left
pad_width = ((pad_left, pad_right), (0, 0))
padded_muap = np.pad(muap, pad_width, mode="constant", constant_values=0)
else:
padded_muap = muap
padded_muaps.append(padded_muap)
muap_array = np.array(padded_muaps)
# Extract timestep from the first spike train
first_spiketrain = spike_train__Block.segments[0].spiketrains[0]
spiketrain_timestep__ms = first_spiketrain.sampling_period.rescale("ms")
target_length = int(
np.round(
muap_array.shape[1]
/ self._sampling_frequency__Hz
* 1
/ (spiketrain_timestep__ms.rescale("s").magnitude)
)
)
muap_shapes = np.zeros((muap_array.shape[0], muap_array.shape[2], target_length))
for muap_nr in range(muap_shapes.shape[0]):
for electrode_nr in range(muap_shapes.shape[1]):
muap_shapes[muap_nr, electrode_nr] = np.interp(
np.linspace(
0,
muap_array.shape[1] / self._sampling_frequency__Hz,
target_length,
endpoint=False,
),
np.arange(
0,
muap_array.shape[1] / self._sampling_frequency__Hz,
1 / self._sampling_frequency__Hz,
),
muap_array[muap_nr, :, electrode_nr],
)
# Convert spike train block to numpy arrays
n_pools = len(spike_train__Block.segments)
n_neurons = len(spike_train__Block.segments[0].spiketrains)
n_electrodes = muap_shapes.shape[1]
# Bin each pool's spike trains into a boolean occupancy array.
spike_trains = np.array(
[
bin_spike_trains(segment.spiketrains, bin_size=spiketrain_timestep__ms)
for segment in spike_train__Block.segments
]
)
# Create active neuron indices (all neurons are active in each pool for spike train block)
active_neuron_indices = [list(range(n_neurons)) for _ in range(n_pools)]
# Initialize result array
sample_conv = np.convolve(
spike_trains[0, 0],
muap_shapes[0, 0],
mode="same",
)
intramuscular_emg = np.zeros((n_pools, n_electrodes, len(sample_conv)))
# Normalize MUAP shapes before convolution
# Note: Unlike surface EMG, intramuscular MUAP amplitudes from biophysical
# calculations are in arbitrary units and must be normalized to prevent
# numerical overflow during convolution. Final EMG amplitudes are determined
# by the spike train convolution, not the raw MUAP amplitudes.
max_muap_amplitude = np.max(np.abs(muap_shapes))
if max_muap_amplitude > 0:
muap_shapes /= max_muap_amplitude
# Perform convolution for each pool using GPU acceleration if available
if HAS_CUPY:
# Use GPU acceleration with CuPy
spike_gpu = cp.asarray(spike_trains)
muap_gpu = cp.asarray(muap_shapes)
intramuscular_emg_gpu = cp.zeros((n_pools, n_electrodes, len(sample_conv)))
for pool_idx in tqdm(
range(n_pools),
desc="Intramuscular EMG (GPU)",
unit="pool",
disable=not verbose,
):
pool_active_neurons = set(active_neuron_indices[pool_idx])
for e_idx in range(n_electrodes):
# Process all active MUs on GPU
convolutions = []
for mu_idx in MUs_to_simulate.intersection(pool_active_neurons):
# Use mu_idx directly since muap_shapes now contains all MUs
if mu_idx < muap_gpu.shape[0]:
conv = cp.convolve(
spike_gpu[pool_idx, mu_idx],
muap_gpu[mu_idx, e_idx],
mode="same",
)
convolutions.append(conv)
convolutions = cp.array(convolutions) if convolutions else cp.array([])
# Sum across MUAPs on GPU
if len(convolutions) > 0:
intramuscular_emg_gpu[pool_idx, e_idx] = cp.sum(convolutions, axis=0)
# Transfer results back to CPU
intramuscular_emg = cp.asnumpy(intramuscular_emg_gpu)
else:
# Fallback to CPU computation with NumPy
for pool_idx in tqdm(
range(n_pools),
desc="Intramuscular EMG (CPU)",
unit="pool",
disable=not verbose,
):
pool_active_neurons = set(active_neuron_indices[pool_idx])
for e_idx in range(n_electrodes):
# Process all active MUs
convolutions = []
for mu_idx in MUs_to_simulate.intersection(pool_active_neurons):
# Use mu_idx directly since muap_shapes now contains all MUs
if mu_idx < muap_shapes.shape[0]:
conv = np.convolve(
spike_trains[pool_idx, mu_idx],
muap_shapes[mu_idx, e_idx],
mode="same",
)
convolutions.append(conv)
if convolutions:
intramuscular_emg[pool_idx, e_idx] = np.sum(convolutions, axis=0)
# Temporal resampling
intramuscular_emg_resampled = np.zeros(
(
n_pools,
n_electrodes,
int(
intramuscular_emg.shape[-1]
* spiketrain_timestep__ms.rescale("s").magnitude
* self._sampling_frequency__Hz
),
)
)
for pool_idx in range(n_pools):
for e_idx in range(n_electrodes):
intramuscular_emg_resampled[pool_idx, e_idx] = np.interp(
x=np.arange(
start=0,
stop=intramuscular_emg.shape[-1]
* spiketrain_timestep__ms.rescale("s").magnitude,
step=1 / self._sampling_frequency__Hz,
),
xp=np.arange(
start=0,
stop=intramuscular_emg.shape[-1]
* spiketrain_timestep__ms.rescale("s").magnitude,
step=spiketrain_timestep__ms.rescale("s").magnitude,
),
fp=intramuscular_emg[pool_idx, e_idx],
)
# Create neo Block structure
block = Block()
# Create segments for each motor unit pool
for pool_idx in range(n_pools):
segment = Segment(name=f"Pool_{pool_idx}")
block.segments.append(segment)
# Create AnalogSignal for this pool's EMG data
segment.analogsignals.append(
AnalogSignal(
intramuscular_emg_resampled[pool_idx].T * pq.dimensionless,
t_start=0 * pq.ms,
sampling_rate=self._sampling_frequency__Hz * pq.Hz,
)
)
# Store results privately
self._intramuscular_emg__Block = block
return block
[docs]
def add_noise(
self,
snr__dB: float,
noise_type: str = "gaussian",
*,
spectral_slope: float = -0.5,
excess_kurtosis: float = 3.0,
powerline_hz: float = 50.0,
powerline_amplitude: float = 0.1,
powerline_harmonic_ratios: Optional[list[float]] = None,
powerline_frequency_drift_hz: float = 0.3,
powerline_amplitude_modulation_depth: float = 0.15,
peak_hz: float = 1000.0,
analog_hpf_hz: float = 10.0,
baseline_drift_rms_uv: float = 0.0,
baseline_drift_alpha: float = 1.75,
baseline_drift_low_hz: float | None = None,
baseline_drift_high_hz: float = 1.0,
) -> INTRAMUSCULAR_EMG__Block:
"""
Add noise to the electrode array.
Two noise models are available:
* ``"gaussian"`` – white Gaussian noise (legacy default). Each
electrode channel gets independent normal noise scaled to hit
the requested per-channel SNR.
* ``"realistic"`` – spectrally colored noise calibrated against
real bipolar fine-wire iEMG recordings (TEP / Synergy studies).
1/f-like base, mid-band spectral emphasis from
electrode–amplifier bandwidth, heavy tails from cross-talk
artifacts, and additive 50/60 Hz powerline interference with
harmonics. See :mod:`myogen.utils.emg_noise` for the math.
Per-channel SNR is preserved across both modes: each electrode's
noise RMS is computed from that channel's own signal RMS so
electrodes with different amplitudes get appropriately scaled
noise.
Parameters
----------
snr__dB : float
Signal-to-noise ratio in dB. Higher values result in cleaner
signals. Typical intramuscular EMG has SNR ranging from
15–50 dB. Applied independently to each electrode channel.
noise_type : {"gaussian", "realistic"}, default="gaussian"
Noise model to use. ``"realistic"`` activates the colored
noise pipeline; the remaining keyword-only parameters then
apply.
spectral_slope : float, default=-0.5
PSD slope in log–log space for the colored-noise base.
``0`` = white, ``-1`` = pink. Real iEMG: -0.4 to -0.8.
Ignored when ``noise_type="gaussian"``.
excess_kurtosis : float, default=3.0
Target excess kurtosis (``0`` = Gaussian). Controlled
isometric iEMG: 1–6. Dynamic tasks: higher.
Ignored when ``noise_type="gaussian"``.
powerline_hz : float, default=50.0
Powerline interference frequency. Use ``60.0`` for North
America, ``0.0`` to disable.
Ignored when ``noise_type="gaussian"``.
powerline_amplitude : float, default=0.1
Powerline fundamental amplitude as a fraction of noise RMS.
Set to ``0`` (or ``powerline_hz=0``) to disable.
Ignored when ``noise_type="gaussian"``.
powerline_harmonic_ratios : list of float, optional
Per-harmonic amplitude ratios relative to the fundamental.
``None`` uses the default ``[1.0, 0.5, 0.3, 0.15, 0.08]``
(fundamental + 4 harmonics).
Ignored when ``noise_type="gaussian"``.
powerline_frequency_drift_hz : float, default=0.3
Standard deviation (Hz) of the slow random walk of the
mains instantaneous frequency. Broadens each line peak
from a delta into a ~2-5 Hz FWHM bump (typical of real
recordings). Set to 0 for a pure-tone powerline.
Ignored when ``noise_type="gaussian"``.
powerline_amplitude_modulation_depth : float, default=0.15
Fractional AM depth (±15% by default) applied to each
powerline harmonic — adds narrow sidebands within ±2 Hz
of every line. Set to 0 to disable.
Ignored when ``noise_type="gaussian"``.
peak_hz : float, default=1000.0
Center frequency of the mid-band spectral emphasis from
electrode–amplifier bandwidth interaction.
Ignored when ``noise_type="gaussian"``.
baseline_drift_rms_uv : float, default=0.0
Target RMS (post-HPF) of a band-limited 1/f^α baseline
drift representing electrode/interface noise (Huigen 2002,
Gondran 1996). Same units as the EMG block. Set to 0 (the
default) to disable, preserving legacy behaviour.
The spectral form is paper-constrained; the amplitude
defaults are *not* validated for intramuscular EMG —
calibrate against real recordings via
:func:`myogen.utils.calibrate_baseline_drift_profile`.
Broadband movement artifacts (0–20 Hz, De Luca 2010) are
a separate phenomenon out of scope here.
Ignored when ``noise_type="gaussian"``.
baseline_drift_alpha : float, default=1.75
Drift PSD slope α (PSD ∝ 1/f^α). Midpoint of the [1.5, 2.0]
electrode-noise regime. Must be > 0.
Ignored when ``noise_type="gaussian"``.
baseline_drift_low_hz : float or None, default=None
Lower edge of the drift band, in Hz. ``None`` resolves
to the lowest nonzero FFT bin available for the
simulation length.
Ignored when ``noise_type="gaussian"``.
baseline_drift_high_hz : float, default=1.0
Upper edge of the drift band, in Hz. Default keeps the
knob scoped to sub-1 Hz baseline wander.
Ignored when ``noise_type="gaussian"``.
Returns
-------
INTRAMUSCULAR_EMG__Block
Noisy intramuscular EMG signals for the electrode array as a
``neo.Block``. Results are also stored on
``noisy_intramuscular_emg__Block``.
Raises
------
ValueError
If intramuscular EMG has not been simulated (call
:meth:`simulate_intramuscular_emg` first) or ``noise_type``
is unrecognized.
"""
if self._intramuscular_emg__Block is None:
raise ValueError(
"Intramuscular EMG has not been simulated. Call simulate_intramuscular_emg() first."
)
noise_kind = noise_type.lower()
if noise_kind not in ("gaussian", "realistic"):
raise ValueError(f"Unsupported noise type: {noise_type}")
noisy_block = Block()
rng_global = get_random_generator()
for pool_idx, segment in enumerate(self._intramuscular_emg__Block.segments):
noisy_segment = Segment(name=f"Pool_{pool_idx}")
noisy_block.segments.append(noisy_segment)
# Get the EMG signal data
emg_signal = segment.analogsignals[0]
emg_array = emg_signal.magnitude # Shape: (time, n_electrodes)
# Calculate signal power PER CHANNEL (per electrode)
# Mean along time axis (axis=0) gives power per electrode
signal_power_per_channel = np.mean(emg_array**2, axis=0) # Shape: (n_electrodes,)
# Calculate noise power per channel
snr_linear = 10 ** (snr__dB / 10)
noise_power_per_channel = signal_power_per_channel / snr_linear
noise_std_per_channel = np.sqrt(noise_power_per_channel) # Shape: (n_electrodes,)
if noise_kind == "gaussian":
# Generate standard normal noise, then scale per channel
noise = rng_global.normal(loc=0.0, scale=1.0, size=emg_array.shape)
noise = noise * noise_std_per_channel[np.newaxis, :]
else:
# Colored noise (signal-shape preserving SNR semantics).
# Per-channel call so each electrode hits the requested SNR
# individually, matching the legacy gaussian path.
from myogen.utils.emg_noise import generate_realistic_noise
n_samples, n_channels = emg_array.shape
fs_hz = float(self._sampling_frequency__Hz)
noise = np.empty_like(emg_array)
for ch in range(n_channels):
ch_rng = np.random.default_rng(rng_global.integers(0, 2**31 - 1))
noise[:, ch] = generate_realistic_noise(
n_samples,
fs_hz,
noise_rms=float(noise_std_per_channel[ch]),
spectral_slope=spectral_slope,
excess_kurtosis=excess_kurtosis,
powerline_hz=powerline_hz,
powerline_amplitude=powerline_amplitude,
powerline_harmonic_ratios=powerline_harmonic_ratios,
powerline_frequency_drift_hz=powerline_frequency_drift_hz,
powerline_amplitude_modulation_depth=powerline_amplitude_modulation_depth,
peak_hz=peak_hz,
analog_hpf_hz=analog_hpf_hz,
baseline_drift_rms_uv=baseline_drift_rms_uv,
baseline_drift_alpha=baseline_drift_alpha,
baseline_drift_low_hz=baseline_drift_low_hz,
baseline_drift_high_hz=baseline_drift_high_hz,
rng=ch_rng,
)
# Add noise
noisy_emg = emg_array + noise
# Create new AnalogSignal with noise
noisy_segment.analogsignals.append(
AnalogSignal(
noisy_emg * emg_signal.units,
t_start=emg_signal.t_start,
sampling_rate=emg_signal.sampling_rate,
)
)
# Store results privately
self._noisy_intramuscular_emg__Block = noisy_block
return noisy_block
# Property accessors for computed results
@property
def muaps__Block(self) -> INTRAMUSCULAR_MUAP__Block:
"""
Intramuscular MUAP shapes for the electrode array.
Returns
-------
INTRAMUSCULAR_MUAP__Block
Intramuscular MUAP templates for the electrode array as a neo.Block.
Raises
------
ValueError
If MUAP templates have not been computed yet.
"""
if self._muaps__Block is None:
raise ValueError("MUAP templates not computed. Call simulate_muaps() first.")
return self._muaps__Block
@property
def intramuscular_emg__Block(self) -> INTRAMUSCULAR_EMG__Block:
"""
Intramuscular EMG signals for the electrode array.
Returns
-------
INTRAMUSCULAR_EMG__Block
Intramuscular EMG signals for the electrode array as a neo.Block.
Raises
------
ValueError
If intramuscular EMG has not been computed yet.
"""
if self._intramuscular_emg__Block is None:
raise ValueError(
"Intramuscular EMG signals not computed. Call simulate_intramuscular_emg() first."
)
return self._intramuscular_emg__Block
@property
def noisy_intramuscular_emg__Block(self) -> INTRAMUSCULAR_EMG__Block:
"""
Noisy intramuscular EMG signals for the electrode array.
Returns
-------
INTRAMUSCULAR_EMG__Block
Noisy intramuscular EMG signals for the electrode array as a neo.Block.
Raises
------
ValueError
If noisy intramuscular EMG has not been computed yet.
"""
if self._noisy_intramuscular_emg__Block is None:
raise ValueError(
"Noisy intramuscular EMG signals not computed. Call add_noise() first."
)
return self._noisy_intramuscular_emg__Block
@property
def spike_train__Block(self) -> SPIKE_TRAIN__Block:
"""
Spike train block used for EMG generation.
Returns
-------
SPIKE_TRAIN__Block
The spike train block used in the simulation.
Raises
------
ValueError
If spike train block has not been set yet.
"""
if self._spike_train__Block is None:
raise ValueError("Spike train block not set. Call simulate_intramuscular_emg() first.")
return self._spike_train__Block