"""
Intramuscular Electromyography (iEMG) Simulation.
This module provides the main simulation framework for generating intramuscular
electromyography signals using needle electrodes. It integrates motor unit
simulation, electrode modeling, and signal generation with realistic noise.
"""
import logging
import warnings
from copy import deepcopy
from typing import Optional
try:
import elephant
import elephant.utils
HAS_ELEPHANT = True
except ImportError:
HAS_ELEPHANT = False
elephant = None # type: ignore
import numpy as np
import quantities as pq
from joblib import Parallel, delayed
from neo import AnalogSignal, Block, Segment
from tqdm import tqdm
from myogen import RANDOM_GENERATOR
from myogen.simulator.core.emg.electrodes import IntramuscularElectrodeArray
from myogen.simulator.core.muscle import Muscle
from myogen.utils.decorators import beartowertype
from myogen.utils.types import (
INTRAMUSCULAR_EMG__Block,
INTRAMUSCULAR_MUAP__Block,
Quantity__Hz,
Quantity__m_per_s,
Quantity__mm,
Quantity__s,
SPIKE_TRAIN__Block,
)
try:
import cupy as cp
HAS_CUPY = True
except ImportError:
HAS_CUPY = False
from .motor_unit_sim import MotorUnitSim
# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")
[docs]
@beartowertype
class IntramuscularEMG:
"""
Intramuscular Electromyography (iEMG) Simulation.
This class provides a comprehensive simulation framework for generating
intramuscular EMG signals detected by needle electrodes.
Parameters
----------
muscle_model : Muscle
Pre-computed muscle model (see :class:`myogen.simulator.Muscle`).
electrode_array : IntramuscularElectrodeArray
Intramuscular electrode array configuration to use for simulation (see :class:`myogen.simulator.IntramuscularElectrodeArray`).
sampling_frequency__Hz : Quantity__Hz, default=10240.0 * pq.Hz
Sampling frequency in Hz for EMG simulation.
Default is set to 10240 Hz as used by the Quattrocento (OT Bioelettronica, Turin, Italy) system.
spatial_resolution__mm : Quantity__mm, default=0.01 * pq.mm
Spatial resolution for fiber action potential calculation in mm.
Default is set to 0.01 mm.
endplate_center__percent : float, default=50
Percentage of muscle length where the endplate is located.
By default, the endplate is located at the center of the muscle (50% of the muscle length).
nmj_jitter__s : Quantity__s, default=35e-6 * pq.s
Standard deviation of neuromuscular junction jitter in seconds.
Default is set to 35e-6 s as determined by Konstantin et al. 2020 [1]_.
branch_cvs__m_per_s : tuple[Quantity__m_per_s, Quantity__m_per_s], default=(5.0 * pq.m / pq.s, 2.0 * pq.m / pq.s)
Conduction velocities for the two-layer model of the neuromuscular junction in m/s.
Default is set to (5.0, 2.0) m/s as determined by Konstantin et al. 2020 [1]_.
.. note::
The two-layer model is a simplification of the actual arborization pattern, but it is a good approximation for the purposes of this simulation.
Follows the implementation of Kontos et al. 2020 [1]_.
MUs_to_simulate : list[int], optional
Indices of motor units to simulate. If None, all motor units are simulated.
Default is None. For computational efficiency, consider
simulating subsets for initial analysis.
Indices correspond to the recruitment order (0 is recruited first).
Attributes
----------
muaps__Block : INTRAMUSCULAR_MUAP__Block
Intramuscular MUAP shapes for the electrode array as a neo.Block. Available after simulate_muaps().
intramuscular_emg__Block : INTRAMUSCULAR_EMG__Block
Intramuscular EMG signals for the electrode array as a neo.Block. Available after simulate_intramuscular_emg().
noisy_intramuscular_emg__Block : INTRAMUSCULAR_EMG__Block
Noisy intramuscular EMG signals for the electrode array as a neo.Block. Available after add_noise().
spike_train__Block : SPIKE_TRAIN__Block
Spike train block used for EMG generation. Available after simulate_intramuscular_emg().
References
----------
.. [1] Konstantin, A., Yu, T., Le Carpentier, E., Aoustin, Y., Farina, D., 2020. Simulation of Motor Unit Action Potential Recordings From Intramuscular Multichannel Scanning Electrodes. IEEE Transactions on Biomedical Engineering 67, 2005–2014. https://doi.org/10.1109/TBME.2019.2953680
"""
[docs]
def __init__(
self,
muscle_model: Muscle,
electrode_array: IntramuscularElectrodeArray,
sampling_frequency__Hz: Quantity__Hz = 10240.0 * pq.Hz,
spatial_resolution__mm: Quantity__mm = 0.01 * pq.mm,
endplate_center__percent: float = 50,
nmj_jitter__s: Quantity__s = 35e-6 * pq.s,
branch_cvs__m_per_s: tuple[Quantity__m_per_s, Quantity__m_per_s] = (
5.0 * pq.m / pq.s,
2.0 * pq.m / pq.s,
),
MUs_to_simulate: list[int] | None = None,
):
# Immutable public arguments - never modify these
self.muscle_model = muscle_model
self.electrode_array = electrode_array
self.sampling_frequency__Hz = sampling_frequency__Hz
self.spatial_resolution__mm = spatial_resolution__mm
self.endplate_center__percent = endplate_center__percent
self.nmj_jitter__s = nmj_jitter__s
self.branch_cvs__m_per_s = branch_cvs__m_per_s
self.MUs_to_simulate = MUs_to_simulate
# Private copies for internal modifications (extract magnitudes)
self._muscle_model = muscle_model
self._electrode_array = electrode_array
self._sampling_frequency__Hz = float(sampling_frequency__Hz.rescale(pq.Hz).magnitude)
self._spatial_resolution__mm = float(spatial_resolution__mm.rescale(pq.mm).magnitude)
self._endplate_center__percent = endplate_center__percent
self._nmj_jitter__s = float(nmj_jitter__s.rescale(pq.s).magnitude)
self._branch_cvs__m_per_s = (
float(branch_cvs__m_per_s[0].rescale(pq.m / pq.s).magnitude),
float(branch_cvs__m_per_s[1].rescale(pq.m / pq.s).magnitude),
)
self._MUs_to_simulate = MUs_to_simulate
# Derived parameters - immutable public access
self.branch_cvs__mm_per_s = (
float(branch_cvs__m_per_s[0].rescale(pq.mm / pq.s).magnitude),
float(branch_cvs__m_per_s[1].rescale(pq.mm / pq.s).magnitude),
)
self.endplate_center__mm = self._muscle_model.length__mm * (
self._endplate_center__percent / 100.0
)
# Private copies for internal modifications
self._branch_cvs__mm_per_s: tuple[float, float] = (
self._branch_cvs__m_per_s[0] * 1000.0,
self._branch_cvs__m_per_s[1] * 1000.0,
)
# Extract magnitude for internal use (calculations expect floats)
length_mm = (
float(self._muscle_model.length__mm.rescale(pq.mm).magnitude)
if hasattr(self._muscle_model.length__mm, "magnitude")
else float(self._muscle_model.length__mm)
)
self._endplate_center__mm = length_mm * (self._endplate_center__percent / 100.0)
# Derived parameters - private for internal use
self._dt = 1.0 / self._sampling_frequency__Hz
self._dz = self._spatial_resolution__mm
self._n_motor_units = len(self._muscle_model.recruitment_thresholds)
# Motor unit selection
if self._MUs_to_simulate is None:
self._MUs_to_simulate = list(range(self._n_motor_units))
else:
self._MUs_to_simulate = self._MUs_to_simulate
# Motor unit simulations - private storage
self._motor_units: list[MotorUnitSim] = [] # List of motor unit simulators
self._muaps__Block: Optional[INTRAMUSCULAR_MUAP__Block] = None
self._max_muap_length: int = 0
# Simulation results - stored privately, accessed via properties
self._intramuscular_emg__Block: Optional[INTRAMUSCULAR_EMG__Block] = None
self._noisy_intramuscular_emg__Block: Optional[INTRAMUSCULAR_EMG__Block] = None
self._spike_train__Block: Optional[SPIKE_TRAIN__Block] = None
[docs]
def simulate_muaps(self, n_jobs: int = -2) -> INTRAMUSCULAR_MUAP__Block:
"""
Simulate MUAPs for all electrode arrays using the provided muscle model.
This method generates intramuscular Motor Unit Action Potential (MUAP) templates
by simulating individual motor units with realistic neuromuscular junction
distributions and fiber action potential propagation.
Parameters
----------
n_jobs : int, optional
Number of parallel workers for motor unit processing. Default is -2.
- n_jobs=-1: Use all CPU cores
- n_jobs=-2: Use all cores except one (recommended, keeps system responsive)
- n_jobs=-3: Use all cores except two
- n_jobs=1: No parallelization
- n_jobs=N: Use exactly N cores
Returns
-------
INTRAMUSCULAR_MUAP_SHAPE__TENSOR
Intramuscular MUAP shapes for all electrode arrays.
Results are stored in the `muaps` property after execution.
Notes
-----
This method must be called before simulate_intramuscular_emg(). The process
includes: (1) motor unit initialization, (2) neuromuscular junction simulation,
and (3) MUAP calculation with spatial filtering.
"""
self._initialize_motor_units()
self._simulate_neuromuscular_junctions()
return self._calculate_muaps(n_jobs=n_jobs)
def _initialize_motor_units(self) -> None:
"""
Initialize individual motor unit simulators.
This method creates MotorUnitSim objects for each motor unit based on
the muscle model fiber assignments and properties.
"""
if not hasattr(self._muscle_model, "assignment") or self._muscle_model.assignment is None:
raise ValueError(
"Muscle model must have fiber assignments. Call muscle.assign_mfs2mns() first."
)
# Initialize list with None for all MUs (will be filled with simulators)
self._motor_units = [None] * self._n_motor_units
for mu_idx in tqdm(
range(self._n_motor_units),
desc="Creating motor unit simulators",
unit="Simulator",
):
# Get fibers assigned to this motor unit
fiber_mask = self._muscle_model.assignment == mu_idx
if not np.any(fiber_mask):
continue
# Create motor unit simulator at the correct index
# Extract magnitudes from quantities for MotorUnitSim (expects floats/arrays)
def _extract_magnitude(val):
"""Helper to extract magnitude from Quantity or return value as-is."""
if hasattr(val, "magnitude"):
return val.magnitude
return val
self._motor_units[mu_idx] = MotorUnitSim(
muscle_fiber_centers__mm=_extract_magnitude(
self._muscle_model.muscle_fiber_centers__mm[fiber_mask]
),
muscle_length__mm=float(_extract_magnitude(self._muscle_model.length__mm)),
muscle_fiber_diameters__mm=_extract_magnitude(
self._muscle_model.muscle_fiber_diameters__mm[fiber_mask]
),
muscle_fiber_conduction_velocity__mm_per_s=_extract_magnitude(
self._muscle_model.muscle_fiber_conduction_velocities__mm_per_s[fiber_mask]
),
neuromuscular_junction_conduction_velocities__mm_per_s=list(
self._branch_cvs__mm_per_s
),
nominal_center__mm=_extract_magnitude(
self._muscle_model.innervation_center_positions__mm[mu_idx]
),
)
def _simulate_neuromuscular_junctions(self) -> None:
"""
Simulate neuromuscular junction distributions for all motor units.
This implements the logic from s08_cl_init_muaps.m for generating
realistic NMJ branch patterns with size-dependent complexity.
"""
if not self._motor_units:
raise ValueError("Must call _initialize_motor_units() first")
n_branches = 1 + np.round(
np.log(
self._muscle_model.recruitment_thresholds
/ self._muscle_model.recruitment_thresholds[0]
)
)
for mu_idx, mu_sim in enumerate(
tqdm(self._motor_units, desc="Setting up NMJ distributions", unit="Simulator")
):
spread_factor = np.sum(self._muscle_model.recruitment_thresholds[:mu_idx]) / np.sum(
self._muscle_model.recruitment_thresholds
)
# Create NMJ distribution
# Branch spread increases with motor unit size
mu_sim.sim_nmj_branches_two_layers(
n_branches=int(n_branches[mu_idx]),
endplate_center=self._endplate_center__mm,
branches_z_std=1.5 + spread_factor * 4.0,
arborization_z_std=0.5 + spread_factor * 1.5,
)
def _calculate_muaps(self, n_jobs: int = -2) -> INTRAMUSCULAR_MUAP__Block:
"""
Pre-calculate motor unit action potentials (MUAPs) using parallel processing.
Parameters
----------
n_jobs : int, optional
Number of parallel workers for motor unit processing. Default is -2.
- n_jobs=-1: Use all CPU cores
- n_jobs=-2: Use all cores except one (recommended, keeps system responsive)
- n_jobs=-3: Use all cores except two
- n_jobs=1: No parallelization
- n_jobs=N: Use exactly N cores
Returns
-------
INTRAMUSCULAR_MUAP_SHAPE__TENSOR
Intramuscular MUAP shapes for all electrode arrays.
"""
if not self._motor_units:
raise ValueError("Must call _initialize_motor_units() first")
# Set default MUs to simulate
if self._MUs_to_simulate is None:
self._MUs_to_simulate = list(range(len(self._motor_units)))
# Helper function to process a single motor unit
def _process_single_mu(
mu_idx: int,
mu_sim: Optional[MotorUnitSim],
dt: float,
dz: float,
electrode_positions: np.ndarray,
) -> tuple[Optional[np.ndarray], int]:
"""
Process a single motor unit (calculate SFAP and MUAP).
Parameters
----------
mu_idx : int
Index of the motor unit to process.
mu_sim : Optional[MotorUnitSim]
Motor unit simulator object (None if MU has no fibers).
dt : float
Temporal resolution.
dz : float
Spatial resolution.
electrode_positions : np.ndarray
Electrode positions array.
Returns
-------
tuple[Optional[np.ndarray], int]
Tuple of (muap_array, muap_length) where muap_array is the MUAP signal
for this MU (or None if MU has no fibers) and muap_length is the length.
"""
try:
if mu_sim is None:
return None, 0
# Deep copy to avoid threading issues
mu_sim_copy = deepcopy(mu_sim)
# Calculate SFAPs
mu_sim_copy.calc_sfaps(
index=mu_idx,
dt=dt,
dz=dz,
electrode_positions=electrode_positions,
)
# Calculate MUAP (no jitter for templates)
muap = mu_sim_copy.calc_muap(jitter_std=0.0)
return muap, muap.shape[0]
except Exception as e:
# Log error and return None to avoid crashing entire parallel job
logging.error(f"Failed to process MU {mu_idx}: {e}")
return None, 0
# Process only specified motor units in parallel
n_motor_units = len(self._motor_units)
n_mus_to_compute = len(self._MUs_to_simulate)
logging.info(
f"Processing {n_mus_to_compute}/{n_motor_units} motor units using parallel processing..."
)
# Parallel execution with progress bar
# Only compute MUs in the subset
results = {} # Use dict to map MU_index -> result
max_length = 0
with tqdm(total=n_mus_to_compute, desc="Computing MUAPs", unit="MU") as pbar:
for muap, muap_length in Parallel(
n_jobs=n_jobs,
return_as="generator",
verbose=0,
batch_size="auto",
)(
delayed(_process_single_mu)(
mu_idx,
self._motor_units[mu_idx],
self._dt,
self._dz,
self._electrode_array.pts,
)
for mu_idx in self._MUs_to_simulate
):
# Results come in order, map back to original MU index
mu_idx = self._MUs_to_simulate[len(results)]
results[mu_idx] = muap
max_length = max(max_length, muap_length)
pbar.update(1)
# Create neo Block structure
# Create segments for ALL MUs (maintaining index order)
# Non-computed MUs get empty signals
block = Block()
for mu_idx in range(n_motor_units):
muap = results.get(mu_idx)
if muap is None:
# Create empty segment for MUs with no fibers or not selected
block.segments.append(segment := Segment(name="MUAP_None"))
segment.analogsignals.append(
AnalogSignal(
np.zeros((1, len(self._electrode_array.pts))) * pq.dimensionless,
sampling_rate=self._sampling_frequency__Hz * pq.Hz,
)
)
else:
block.segments.append(segment := Segment(name=f"MUAP_{muap.shape[0]}"))
segment.analogsignals.append(
AnalogSignal(
muap * pq.dimensionless,
sampling_rate=self._sampling_frequency__Hz * pq.Hz,
)
)
self._muaps__Block = block
return self._muaps__Block
def _analyze_detectable_motor_units(self) -> tuple[np.ndarray, list[int]]:
"""
Analyze which motor units are detectable by the electrode.
This implements s11_cl_get_detectable_mus.m logic for determining
motor unit visibility based on signal-to-noise ratio and contribution.
Returns
-------
tuple[np.ndarray, List[int]]
Boolean array of detectable motor units and their indices
"""
if self._muaps__Block is None:
raise ValueError("Must call simulate_muaps() first")
print("Analyzing motor unit detectability...")
detectable = np.zeros(len(self._motor_units), dtype=bool)
# Prominence criterion: MUAP amplitude vs noise
over_noise_threshold = 2.0 # 2x noise level
for i, mu_sim in enumerate(self._motor_units):
# Get peak MUAP amplitude across all channels
muap_data = self._muaps__Block.segments[i].analogsignals[0].magnitude
muap_amplitudes = np.max(np.abs(muap_data), axis=0)
max_amplitude = np.max(muap_amplitudes)
# Check if MUAP is prominent enough above noise
# Use a simple noise threshold estimate
all_muaps = np.array(
[seg.analogsignals[0].magnitude for seg in self._muaps__Block.segments]
)
noise_estimate = np.std(all_muaps) * 0.1 # Simple noise estimate
is_prominent = max_amplitude > over_noise_threshold * noise_estimate
# Additional criterion: contribution to total signal variance
relative_size = (i + 1) / len(self._motor_units)
min_contribution = 0.05 # Minimum 5% contribution
contributes_enough = relative_size > min_contribution
detectable[i] = is_prominent and contributes_enough
detectable_indices = [i for i, det in enumerate(detectable) if det]
print(f"Found {np.sum(detectable)} detectable motor units out of {len(self._motor_units)}")
return detectable, detectable_indices
[docs]
def simulate_intramuscular_emg(
self,
spike_train__Block: SPIKE_TRAIN__Block,
) -> INTRAMUSCULAR_EMG__Block:
"""
Generate intramuscular EMG signals using the provided spike train block.
This method convolves the pre-computed MUAP templates with spike trains
to synthesize realistic intramuscular EMG signals. The process includes temporal
resampling and supports both CPU and GPU acceleration for efficient computation.
Parameters
----------
spike_train__Block : SPIKE_TRAIN__Block
Block containing spike trains organized as segments (pools) with spiketrains.
Returns
-------
INTRAMUSCULAR_EMG__Block
Intramuscular EMG signals for the electrode array stored in a neo.Block.
Results are stored in the `intramuscular_emg__Block` property after execution.
Raises
------
ValueError
If MUAP templates have not been generated. Call simulate_muaps() first.
"""
if self._muaps__Block is None:
raise ValueError("MUAP templates have not been generated. Call simulate_muaps() first.")
if not HAS_ELEPHANT:
raise ImportError(
"Elephant is required for intramuscular EMG simulation. "
"Install with: pip install myogen[elephant]"
)
# Store spike train data privately
self._spike_train__Block = spike_train__Block
# Handle MUs to simulate
if self._MUs_to_simulate is None:
MUs_to_simulate = set(
range(len(self._muscle_model.resulting_number_of_innervated_fibers))
)
else:
MUs_to_simulate = set(self._MUs_to_simulate)
# Extract MUAP data from Block and pad to same length
muap_data_list = [seg.analogsignals[0].magnitude for seg in self._muaps__Block.segments]
# Find the maximum length among all MUAPs
max_length = max(muap.shape[0] for muap in muap_data_list)
n_electrodes = muap_data_list[0].shape[1]
# Pad all MUAPs to the same length (centered, pad both sides)
padded_muaps = []
for muap in muap_data_list:
pad_total = max_length - muap.shape[0]
if pad_total > 0:
pad_left = pad_total // 2
pad_right = pad_total - pad_left
pad_width = ((pad_left, pad_right), (0, 0))
padded_muap = np.pad(muap, pad_width, mode="constant", constant_values=0)
else:
padded_muap = muap
padded_muaps.append(padded_muap)
muap_array = np.array(padded_muaps)
# Extract timestep from the first spike train
first_spiketrain = spike_train__Block.segments[0].spiketrains[0]
spiketrain_timestep__ms = first_spiketrain.sampling_period.rescale("ms")
target_length = int(
np.round(
muap_array.shape[1]
/ self._sampling_frequency__Hz
* 1
/ (spiketrain_timestep__ms.rescale("s").magnitude)
)
)
muap_shapes = np.zeros((muap_array.shape[0], muap_array.shape[2], target_length))
for muap_nr in range(muap_shapes.shape[0]):
for electrode_nr in range(muap_shapes.shape[1]):
muap_shapes[muap_nr, electrode_nr] = np.interp(
np.linspace(
0,
muap_array.shape[1] / self._sampling_frequency__Hz,
target_length,
endpoint=False,
),
np.arange(
0,
muap_array.shape[1] / self._sampling_frequency__Hz,
1 / self._sampling_frequency__Hz,
),
muap_array[muap_nr, :, electrode_nr],
)
# Convert spike train block to numpy arrays
n_pools = len(spike_train__Block.segments)
n_neurons = len(spike_train__Block.segments[0].spiketrains)
n_electrodes = muap_shapes.shape[1]
# Convert spike trains to binary arrays using Elephant, suppressing rounding error logging
elephant_utils_logger = logging.getLogger(elephant.utils.__file__)
original_level = elephant_utils_logger.level
elephant_utils_logger.setLevel(logging.ERROR)
try:
spike_trains = np.array(
[
elephant.conversion.BinnedSpikeTrain(
segment.spiketrains, bin_size=spiketrain_timestep__ms
)
.to_array()
.astype(bool)
for segment in spike_train__Block.segments
]
)
finally:
elephant_utils_logger.setLevel(original_level)
# Create active neuron indices (all neurons are active in each pool for spike train block)
active_neuron_indices = [list(range(n_neurons)) for _ in range(n_pools)]
# Initialize result array
sample_conv = np.convolve(
spike_trains[0, 0],
muap_shapes[0, 0],
mode="same",
)
intramuscular_emg = np.zeros((n_pools, n_electrodes, len(sample_conv)))
# Normalize MUAP shapes before convolution
# Note: Unlike surface EMG, intramuscular MUAP amplitudes from biophysical
# calculations are in arbitrary units and must be normalized to prevent
# numerical overflow during convolution. Final EMG amplitudes are determined
# by the spike train convolution, not the raw MUAP amplitudes.
muap_shapes /= np.max(np.abs(muap_shapes))
# Perform convolution for each pool using GPU acceleration if available
if HAS_CUPY:
# Use GPU acceleration with CuPy
spike_gpu = cp.asarray(spike_trains)
muap_gpu = cp.asarray(muap_shapes)
intramuscular_emg_gpu = cp.zeros((n_pools, n_electrodes, len(sample_conv)))
for pool_idx in tqdm(
range(n_pools),
desc="Intramuscular EMG (GPU)",
unit="pool",
):
pool_active_neurons = set(active_neuron_indices[pool_idx])
for e_idx in range(n_electrodes):
# Process all active MUs on GPU
convolutions = []
for mu_idx in MUs_to_simulate.intersection(pool_active_neurons):
# Use mu_idx directly since muap_shapes now contains all MUs
if mu_idx < muap_gpu.shape[0]:
conv = cp.correlate(
spike_gpu[pool_idx, mu_idx],
muap_gpu[mu_idx, e_idx],
mode="same",
)
convolutions.append(conv)
convolutions = cp.array(convolutions) if convolutions else cp.array([])
# Sum across MUAPs on GPU
if len(convolutions) > 0:
intramuscular_emg_gpu[pool_idx, e_idx] = cp.sum(convolutions, axis=0)
# Transfer results back to CPU
intramuscular_emg = cp.asnumpy(intramuscular_emg_gpu)
else:
# Fallback to CPU computation with NumPy
for pool_idx in tqdm(
range(n_pools),
desc="Intramuscular EMG (CPU)",
unit="pool",
):
pool_active_neurons = set(active_neuron_indices[pool_idx])
for e_idx in range(n_electrodes):
# Process all active MUs
convolutions = []
for mu_idx in MUs_to_simulate.intersection(pool_active_neurons):
# Use mu_idx directly since muap_shapes now contains all MUs
if mu_idx < muap_shapes.shape[0]:
conv = np.correlate(
spike_trains[pool_idx, mu_idx],
muap_shapes[mu_idx, e_idx],
mode="same",
)
convolutions.append(conv)
if convolutions:
intramuscular_emg[pool_idx, e_idx] = np.sum(convolutions, axis=0)
# Temporal resampling
intramuscular_emg_resampled = np.zeros(
(
n_pools,
n_electrodes,
int(
intramuscular_emg.shape[-1]
* spiketrain_timestep__ms.rescale("s").magnitude
* self._sampling_frequency__Hz
),
)
)
for pool_idx in range(n_pools):
for e_idx in range(n_electrodes):
intramuscular_emg_resampled[pool_idx, e_idx] = np.interp(
x=np.arange(
start=0,
stop=intramuscular_emg.shape[-1]
* spiketrain_timestep__ms.rescale("s").magnitude,
step=1 / self._sampling_frequency__Hz,
),
xp=np.arange(
start=0,
stop=intramuscular_emg.shape[-1]
* spiketrain_timestep__ms.rescale("s").magnitude,
step=spiketrain_timestep__ms.rescale("s").magnitude,
),
fp=intramuscular_emg[pool_idx, e_idx],
)
# Create neo Block structure
block = Block()
# Create segments for each motor unit pool
for pool_idx in range(n_pools):
segment = Segment(name=f"Pool_{pool_idx}")
block.segments.append(segment)
# Create AnalogSignal for this pool's EMG data
segment.analogsignals.append(
AnalogSignal(
intramuscular_emg_resampled[pool_idx].T * pq.dimensionless,
t_start=0 * pq.ms,
sampling_rate=self._sampling_frequency__Hz * pq.Hz,
)
)
# Store results privately
self._intramuscular_emg__Block = block
return block
[docs]
def add_noise(self, snr__dB: float, noise_type: str = "gaussian") -> INTRAMUSCULAR_EMG__Block:
"""
Add noise to the electrode array.
This method adds realistic noise to the simulated intramuscular EMG signals
based on a specified signal-to-noise ratio. The noise is calculated
and applied independently for each electrode channel to ensure that
channels with different signal amplitudes maintain the specified SNR.
Parameters
----------
snr__dB : float
Signal-to-noise ratio in dB. Higher values result in cleaner signals.
Typical intramuscular EMG has SNR ranging from 15-50 dB.
The SNR is applied independently to each electrode channel.
noise_type : str, default="gaussian"
Type of noise to add. Currently supports "gaussian" for white noise.
Returns
-------
INTRAMUSCULAR_EMG__Block
Noisy intramuscular EMG signals for the electrode array as a neo.Block.
Results are stored in the `noisy_intramuscular_emg__Block` property after execution.
Raises
------
ValueError
If intramuscular EMG has not been simulated. Call simulate_intramuscular_emg() first.
Notes
-----
The noise is computed per-channel (per electrode) to maintain the specified
SNR independently across all channels. This ensures that electrodes with
different signal amplitudes receive appropriately scaled noise.
"""
if self._intramuscular_emg__Block is None:
raise ValueError(
"Intramuscular EMG has not been simulated. Call simulate_intramuscular_emg() first."
)
noisy_block = Block()
for pool_idx, segment in enumerate(self._intramuscular_emg__Block.segments):
noisy_segment = Segment(name=f"Pool_{pool_idx}")
noisy_block.segments.append(noisy_segment)
# Get the EMG signal data
emg_signal = segment.analogsignals[0]
emg_array = emg_signal.magnitude # Shape: (time, n_electrodes)
# Calculate signal power PER CHANNEL (per electrode)
# Mean along time axis (axis=0) gives power per electrode
signal_power_per_channel = np.mean(emg_array**2, axis=0) # Shape: (n_electrodes,)
# Calculate noise power per channel
snr_linear = 10 ** (snr__dB / 10)
noise_power_per_channel = signal_power_per_channel / snr_linear
noise_std_per_channel = np.sqrt(noise_power_per_channel) # Shape: (n_electrodes,)
# Generate noise
if noise_type.lower() == "gaussian":
# Generate standard normal noise, then scale per channel
noise = RANDOM_GENERATOR.normal(loc=0.0, scale=1.0, size=emg_array.shape)
# Broadcast noise_std_per_channel along time axis
# noise shape: (time, n_electrodes)
# noise_std_per_channel shape: (n_electrodes,)
# Broadcasting: (time, n_electrodes) * (1, n_electrodes)
noise = noise * noise_std_per_channel[np.newaxis, :]
else:
raise ValueError(f"Unsupported noise type: {noise_type}")
# Add noise
noisy_emg = emg_array + noise
# Create new AnalogSignal with noise
noisy_segment.analogsignals.append(
AnalogSignal(
noisy_emg * emg_signal.units,
t_start=emg_signal.t_start,
sampling_rate=emg_signal.sampling_rate,
)
)
# Store results privately
self._noisy_intramuscular_emg__Block = noisy_block
return noisy_block
# Property accessors for computed results
@property
def muaps__Block(self) -> INTRAMUSCULAR_MUAP__Block:
"""
Intramuscular MUAP shapes for the electrode array.
Returns
-------
INTRAMUSCULAR_MUAP__Block
Intramuscular MUAP templates for the electrode array as a neo.Block.
Raises
------
ValueError
If MUAP templates have not been computed yet.
"""
if self._muaps__Block is None:
raise ValueError("MUAP templates not computed. Call simulate_muaps() first.")
return self._muaps__Block
@property
def intramuscular_emg__Block(self) -> INTRAMUSCULAR_EMG__Block:
"""
Intramuscular EMG signals for the electrode array.
Returns
-------
INTRAMUSCULAR_EMG__Block
Intramuscular EMG signals for the electrode array as a neo.Block.
Raises
------
ValueError
If intramuscular EMG has not been computed yet.
"""
if self._intramuscular_emg__Block is None:
raise ValueError(
"Intramuscular EMG signals not computed. Call simulate_intramuscular_emg() first."
)
return self._intramuscular_emg__Block
@property
def noisy_intramuscular_emg__Block(self) -> INTRAMUSCULAR_EMG__Block:
"""
Noisy intramuscular EMG signals for the electrode array.
Returns
-------
INTRAMUSCULAR_EMG__Block
Noisy intramuscular EMG signals for the electrode array as a neo.Block.
Raises
------
ValueError
If noisy intramuscular EMG has not been computed yet.
"""
if self._noisy_intramuscular_emg__Block is None:
raise ValueError(
"Noisy intramuscular EMG signals not computed. Call add_noise() first."
)
return self._noisy_intramuscular_emg__Block
@property
def spike_train__Block(self) -> SPIKE_TRAIN__Block:
"""
Spike train block used for EMG generation.
Returns
-------
SPIKE_TRAIN__Block
The spike train block used in the simulation.
Raises
------
ValueError
If spike train block has not been set yet.
"""
if self._spike_train__Block is None:
raise ValueError("Spike train block not set. Call simulate_intramuscular_emg() first.")
return self._spike_train__Block