Source code for myogen.simulator.core.emg.intramuscular.intramuscular_emg

"""
Intramuscular Electromyography (iEMG) Simulation.

This module provides the main simulation framework for generating intramuscular
electromyography signals using needle electrodes. It integrates motor unit
simulation, electrode modeling, and signal generation with realistic noise.
"""

import warnings
from typing import Optional

import numpy as np
from tqdm import tqdm

from myogen import RANDOM_GENERATOR
from myogen.simulator.core.emg.electrodes import IntramuscularElectrodeArray
from myogen.simulator.core.muscle import Muscle
from myogen.simulator.core.spike_train import MotorNeuronPool
from myogen.utils.types import (
    INTRAMUSCULAR_EMG__TENSOR,
    INTRAMUSCULAR_MUAP_SHAPE__TENSOR,
    beartowertype,
)

try:
    import cupy as cp

    HAS_CUPY = True
except ImportError:
    HAS_CUPY = False

from .motor_unit_sim import MotorUnitSim

# Suppress warnings for cleaner output
warnings.filterwarnings("ignore")


[docs] @beartowertype class IntramuscularEMG: """ Intramuscular Electromyography (iEMG) Simulation. This class provides a comprehensive simulation framework for generating intramuscular EMG signals detected by needle electrodes. Parameters ---------- muscle_model : Muscle Pre-computed muscle model (see :class:`myogen.simulator.Muscle`). electrode_array : IntramuscularElectrodeArray Intramuscular electrode array configuration to use for simulation (see :class:`myogen.simulator.IntramuscularElectrodeArray`). sampling_frequency__Hz : float, default=10240.0 Sampling frequency in Hz for EMG simulation. Default is set to 10240 Hz as used by the Quattrocento (OT Bioelettronica, Turin, Italy) system. spatial_resolution__mm : float, default=0.01 Spatial resolution for fiber action potential calculation in mm. Default is set to 0.01 mm. endplate_center__percent : float, default=50 Percentage of muscle length where the endplate is located. By default, the endplate is located at the center of the muscle (50% of the muscle length). nmj_jitter__s : float, default=35e-6 Standard deviation of neuromuscular junction jitter in seconds. Default is set to 35e-6 s as determined by Konstantin et al. 2020 [1]_. branch_cvs__m_per_s : tuple[float, float], default=(5.0, 2.0) Conduction velocities for the two-layer model of the neuromuscular junction in m/s. Default is set to (5.0, 2.0) m/s as determined by Konstantin et al. 2020 [1]_. .. note:: The two-layer model is a simplification of the actual arborization pattern, but it is a good approximation for the purposes of this simulation. Follows the implementation of Kontos et al. 2020 [1]_. MUs_to_simulate : list[int], optional Indices of motor units to simulate. If None, all motor units are simulated. Default is None. For computational efficiency, consider simulating subsets for initial analysis. Indices correspond to the recruitment order (0 is recruited first). Attributes ---------- muaps : INTRAMUSCULAR_MUAP_SHAPE__TENSOR Intramuscular MUAP shapes for all electrode arrays. Available after simulate_muaps(). intramuscular_emg__tensor : INTRAMUSCULAR_EMG__TENSOR Intramuscular EMG signals for all electrode arrays. Available after simulate_intramuscular_emg(). noisy_intramuscular_emg__tensor : INTRAMUSCULAR_EMG__TENSOR Noisy intramuscular EMG signals for all electrode arrays. Available after add_noise(). motor_neuron_pool : MotorNeuronPool Motor neuron pool used for EMG generation. Available after simulate_intramuscular_emg(). References ---------- .. [1] Konstantin, A., Yu, T., Le Carpentier, E., Aoustin, Y., Farina, D., 2020. Simulation of Motor Unit Action Potential Recordings From Intramuscular Multichannel Scanning Electrodes. IEEE Transactions on Biomedical Engineering 67, 2005–2014. https://doi.org/10.1109/TBME.2019.2953680 """
[docs] def __init__( self, muscle_model: Muscle, electrode_array: IntramuscularElectrodeArray, sampling_frequency__Hz: float = 10240.0, spatial_resolution__mm: float = 0.01, endplate_center__percent: float = 50, nmj_jitter__s: float = 35e-6, branch_cvs__m_per_s: tuple[float, float] = (5.0, 2.0), MUs_to_simulate: list[int] | None = None, ): # Immutable public arguments - never modify these self.muscle_model = muscle_model self.electrode_array = electrode_array self.sampling_frequency__Hz = sampling_frequency__Hz self.spatial_resolution__mm = spatial_resolution__mm self.endplate_center__percent = endplate_center__percent self.nmj_jitter__s = nmj_jitter__s self.branch_cvs__m_per_s = branch_cvs__m_per_s self.MUs_to_simulate = MUs_to_simulate # Private copies for internal modifications self._muscle_model = muscle_model self._electrode_array = electrode_array self._sampling_frequency__Hz = sampling_frequency__Hz self._spatial_resolution__mm = spatial_resolution__mm self._endplate_center__percent = endplate_center__percent self._nmj_jitter__s = nmj_jitter__s self._branch_cvs__m_per_s = branch_cvs__m_per_s self._MUs_to_simulate = MUs_to_simulate # Derived parameters - immutable public access self.branch_cvs__mm_per_s = list( ( self._branch_cvs__m_per_s[0] * 1000.0, self._branch_cvs__m_per_s[1] * 1000.0, ) ) self.endplate_center__mm = self._muscle_model.length__mm * ( self._endplate_center__percent / 100.0 ) # Private copies for internal modifications self._branch_cvs__mm_per_s: list[float] = list( ( self._branch_cvs__m_per_s[0] * 1000.0, self._branch_cvs__m_per_s[1] * 1000.0, ) ) self._endplate_center__mm = self._muscle_model.length__mm * ( self._endplate_center__percent / 100.0 ) # Derived parameters - private for internal use self._dt = 1.0 / self._sampling_frequency__Hz self._dz = self._spatial_resolution__mm self._n_motor_units = len(self._muscle_model.recruitment_thresholds) # Motor unit selection if self._MUs_to_simulate is None: self._MUs_to_simulate = list(range(self._n_motor_units)) else: self._MUs_to_simulate = self._MUs_to_simulate # Motor unit simulations - private storage self._motor_units: list[MotorUnitSim] = [] # List of motor unit simulators self._muaps: Optional[INTRAMUSCULAR_MUAP_SHAPE__TENSOR] = None self._max_muap_length: int = 0 # Simulation results - stored privately, accessed via properties self._intramuscular_emg__tensor: Optional[INTRAMUSCULAR_EMG__TENSOR] = None self._noisy_intramuscular_emg__tensor: Optional[INTRAMUSCULAR_EMG__TENSOR] = ( None ) self._motor_neuron_pool: Optional[MotorNeuronPool] = None
[docs] def simulate_muaps(self) -> INTRAMUSCULAR_MUAP_SHAPE__TENSOR: """ Simulate MUAPs for all electrode arrays using the provided muscle model. This method generates intramuscular Motor Unit Action Potential (MUAP) templates by simulating individual motor units with realistic neuromuscular junction distributions and fiber action potential propagation. Returns ------- INTRAMUSCULAR_MUAP_SHAPE__TENSOR Intramuscular MUAP shapes for all electrode arrays. Results are stored in the `muaps` property after execution. Notes ----- This method must be called before simulate_intramuscular_emg(). The process includes: (1) motor unit initialization, (2) neuromuscular junction simulation, and (3) MUAP calculation with spatial filtering. """ self._initialize_motor_units() self._simulate_neuromuscular_junctions() return self._calculate_muaps()
def _initialize_motor_units(self) -> None: """ Initialize individual motor unit simulators. This method creates MotorUnitSim objects for each motor unit based on the muscle model fiber assignments and properties. """ if ( not hasattr(self._muscle_model, "assignment") or self._muscle_model.assignment is None ): raise ValueError( "Muscle model must have fiber assignments. Call muscle.assign_mfs2mns() first." ) self._motor_units = [] for mu_idx in tqdm( self._MUs_to_simulate, desc="Creating motor unit simulators", unit="Simulator", ): # Get fibers assigned to this motor unit fiber_mask = self._muscle_model.assignment == mu_idx if not np.any(fiber_mask): continue # Create motor unit simulator self._motor_units.append( MotorUnitSim( muscle_fiber_centers__mm=self._muscle_model.muscle_fiber_centers__mm[ fiber_mask ], muscle_length__mm=self._muscle_model.length__mm, muscle_fiber_diameters__mm=self._muscle_model.muscle_fiber_diameters__mm[ fiber_mask ], muscle_fiber_conduction_velocity__mm_per_s=self._muscle_model.muscle_fiber_conduction_velocities__mm_per_s[ fiber_mask ], neuromuscular_junction_conduction_velocities__mm_per_s=self._branch_cvs__mm_per_s, nominal_center__mm=self._muscle_model.innervation_center_positions__mm[ mu_idx ], ) ) def _simulate_neuromuscular_junctions(self) -> None: """ Simulate neuromuscular junction distributions for all motor units. This implements the logic from s08_cl_init_muaps.m for generating realistic NMJ branch patterns with size-dependent complexity. """ if not self._motor_units: raise ValueError("Must call _initialize_motor_units() first") n_branches = 1 + np.round( np.log( self._muscle_model.recruitment_thresholds / self._muscle_model.recruitment_thresholds[0] ) ) for mu_idx, mu_sim in enumerate( tqdm( self._motor_units, desc="Setting up NMJ distributions", unit="Simulator" ) ): spread_factor = np.sum( self._muscle_model.recruitment_thresholds[:mu_idx] ) / np.sum(self._muscle_model.recruitment_thresholds) # Create NMJ distribution # Branch spread increases with motor unit size mu_sim.sim_nmj_branches_two_layers( n_branches=int(n_branches[mu_idx]), endplate_center=self._endplate_center__mm, branches_z_std=1.5 + spread_factor * 4.0, arborization_z_std=0.5 + spread_factor * 1.5, ) def _calculate_muaps(self) -> INTRAMUSCULAR_MUAP_SHAPE__TENSOR: """ Pre-calculate motor unit action potentials (MUAPs). Returns ------- INTRAMUSCULAR_MUAP_SHAPE__TENSOR Intramuscular MUAP shapes for all electrode arrays. """ if not self._motor_units: raise ValueError("Must call _initialize_motor_units() first") # Calculate SFAPs for each motor unit for i, mu_sim in enumerate(self._motor_units): mu_sim.calc_sfaps( index=i, dt=self._dt, dz=self._dz, electrode_positions=self._electrode_array.pts, ) # Calculate MUAPs (no jitter for templates) muaps_list: list[np.ndarray] = [] max_length = 0 for mu_sim in tqdm(self._motor_units, desc="Computing MUAPs", unit="MU"): muap = mu_sim.calc_muap(jitter_std=0.0) # No jitter for templates muaps_list.append(muap) max_length = max(max_length, muap.shape[0]) self._muaps: INTRAMUSCULAR_MUAP_SHAPE__TENSOR = np.zeros( (len(self._motor_units), self._electrode_array.pts.shape[0], max_length) ) for i, muap in enumerate(muaps_list): self._muaps[i, :, : muap.shape[0]] = muap.T return self._muaps def _analyze_detectable_motor_units(self) -> tuple[np.ndarray, list[int]]: """ Analyze which motor units are detectable by the electrode. This implements s11_cl_get_detectable_mus.m logic for determining motor unit visibility based on signal-to-noise ratio and contribution. Returns ------- tuple[np.ndarray, List[int]] Boolean array of detectable motor units and their indices """ if self._muaps is None: raise ValueError("Must call simulate_muaps() first") print("Analyzing motor unit detectability...") detectable = np.zeros(len(self._motor_units), dtype=bool) # Prominence criterion: MUAP amplitude vs noise over_noise_threshold = 2.0 # 2x noise level for i, mu_sim in enumerate(self._motor_units): # Get peak MUAP amplitude across all channels muap_amplitudes = np.max(np.abs(self._muaps[i]), axis=0) max_amplitude = np.max(muap_amplitudes) # Check if MUAP is prominent enough above noise # Use a simple noise threshold estimate noise_estimate = np.std(self._muaps) * 0.1 # Simple noise estimate is_prominent = max_amplitude > over_noise_threshold * noise_estimate # Additional criterion: contribution to total signal variance relative_size = (i + 1) / len(self._motor_units) min_contribution = 0.05 # Minimum 5% contribution contributes_enough = relative_size > min_contribution detectable[i] = is_prominent and contributes_enough detectable_indices = [i for i, det in enumerate(detectable) if det] print( f"Found {np.sum(detectable)} detectable motor units out of {len(self._motor_units)}" ) return detectable, detectable_indices
[docs] def simulate_intramuscular_emg( self, motor_neuron_pool: MotorNeuronPool, ) -> INTRAMUSCULAR_EMG__TENSOR: """ Generate intramuscular EMG signals for all electrode arrays using the provided motor neuron pool. This method convolves the pre-computed MUAP templates with motor neuron spike trains to synthesize realistic intramuscular EMG signals. The process includes temporal resampling and supports both CPU and GPU acceleration for efficient computation. Parameters ---------- motor_neuron_pool : MotorNeuronPool Motor neuron pool with spike trains computed (see :class:`myogen.simulator.MotorNeuronPool`). Returns ------- INTRAMUSCULAR_EMG__TENSOR Intramuscular EMG signals for all electrode arrays. Results are stored in the `intramuscular_emg__tensor` property after execution. Raises ------ ValueError If MUAP templates have not been generated. Call simulate_muaps() first. """ if self._muaps is None: raise ValueError( "MUAP templates have not been generated. Call simulate_muaps() first." ) # Store motor neuron pool privately self._motor_neuron_pool = motor_neuron_pool # Handle MUs to simulate if self._MUs_to_simulate is None: MUs_to_simulate = set( range(len(self._muscle_model.resulting_number_of_innervated_fibers)) ) else: MUs_to_simulate = set(self._MUs_to_simulate) muap_array = self._muaps.copy() target_length = int( np.round( muap_array.shape[2] / self._sampling_frequency__Hz * 1 / self._motor_neuron_pool.timestep__ms * 1000 ) ) muap_shapes = np.zeros( (muap_array.shape[0], muap_array.shape[1], target_length) ) for muap_nr in range(muap_shapes.shape[0]): for electrode_nr in range(muap_shapes.shape[1]): muap_shapes[muap_nr, electrode_nr] = np.interp( np.linspace( 0, muap_array.shape[-1] / self._sampling_frequency__Hz, target_length, endpoint=False, ), np.arange( 0, muap_array.shape[-1] / self._sampling_frequency__Hz, 1 / self._sampling_frequency__Hz, ), muap_array[muap_nr, electrode_nr], ) n_pools = self._motor_neuron_pool.spike_trains.shape[0] n_electrodes = muap_shapes.shape[1] # Initialize result array sample_conv = np.convolve( self._motor_neuron_pool.spike_trains[0, 0], muap_shapes[0, 0], mode="same", ) intramuscular_emg = np.zeros((n_pools, n_electrodes, len(sample_conv))) muap_shapes /= np.max(np.abs(muap_shapes)) # Normalize MUAP shapes # Perform convolution for each pool using GPU acceleration if available if HAS_CUPY: # Use GPU acceleration with CuPy spike_gpu = cp.asarray(self._motor_neuron_pool.spike_trains) muap_gpu = cp.asarray(muap_shapes) intramuscular_emg_gpu = cp.zeros((n_pools, n_electrodes, len(sample_conv))) for pool_idx in tqdm( range(n_pools), desc=f"Intramuscular EMG (GPU)", unit="pool", ): active_neuron_indices = set( self._motor_neuron_pool.active_neuron_indices[pool_idx] ) for e_idx in range(n_electrodes): # Process all active MUs on GPU convolutions = cp.array( [ cp.correlate( spike_gpu[pool_idx, mu_idx], muap_gpu[i, e_idx], mode="same", ) for i, mu_idx in enumerate( MUs_to_simulate.intersection(active_neuron_indices) ) ] ) # Sum across MUAPs on GPU if len(convolutions) > 0: intramuscular_emg_gpu[pool_idx, e_idx] = cp.sum( convolutions, axis=0 ) # Transfer results back to CPU intramuscular_emg = cp.asnumpy(intramuscular_emg_gpu) else: # Fallback to CPU computation with NumPy for pool_idx in tqdm( range(n_pools), desc=f"Intramuscular EMG (CPU)", unit="pool", ): active_neuron_indices = set( self._motor_neuron_pool.active_neuron_indices[pool_idx] ) for e_idx in range(n_electrodes): # Process all active MUs convolutions = [] for i, mu_idx in enumerate( MUs_to_simulate.intersection(active_neuron_indices) ): conv = np.correlate( self._motor_neuron_pool.spike_trains[pool_idx, mu_idx], muap_shapes[i, e_idx], mode="same", ) convolutions.append(conv) if convolutions: intramuscular_emg[pool_idx, e_idx] = np.sum( convolutions, axis=0 ) intramuscular_emg_resampled = np.zeros( ( n_pools, n_electrodes, int( intramuscular_emg.shape[-1] / (1 / self._motor_neuron_pool.timestep__ms * 1000) * self._sampling_frequency__Hz ), ) ) for pool_idx in range(n_pools): for e_idx in range(n_electrodes): intramuscular_emg_resampled[pool_idx, e_idx] = np.interp( np.arange( 0, intramuscular_emg.shape[-1] * (self._motor_neuron_pool.timestep__ms / 1000), 1 / self._sampling_frequency__Hz, ), np.arange( 0, intramuscular_emg.shape[-1] * (self._motor_neuron_pool.timestep__ms / 1000), self._motor_neuron_pool.timestep__ms / 1000, ), intramuscular_emg[pool_idx, e_idx], ) # Store results privately self._intramuscular_emg__tensor = intramuscular_emg_resampled return intramuscular_emg_resampled
[docs] def add_noise( self, snr_db: float, noise_type: str = "gaussian" ) -> INTRAMUSCULAR_EMG__TENSOR: """ Add noise to all electrode arrays. This method adds realistic noise to the simulated intramuscular EMG signals based on a specified signal-to-noise ratio. The noise characteristics can be customized to match different recording conditions. Parameters ---------- snr_db : float Signal-to-noise ratio in dB. Higher values result in cleaner signals. Typical intramuscular EMG has SNR ranging from 15-50 dB. noise_type : str, default="gaussian" Type of noise to add. Currently supports "gaussian" for white noise. Returns ------- INTRAMUSCULAR_EMG__TENSOR Noisy intramuscular EMG signals for all electrode arrays. Results are stored in the `noisy_intramuscular_emg__tensor` property after execution. Raises ------ ValueError If intramuscular EMG has not been simulated. Call simulate_intramuscular_emg() first. """ if self._intramuscular_emg__tensor is None: raise ValueError( "Intramuscular EMG has not been simulated. Call simulate_intramuscular_emg() first." ) # Calculate signal power signal_power = np.mean(self._intramuscular_emg__tensor**2) # Calculate noise power snr_linear = 10 ** (snr_db / 10) noise_power = signal_power / snr_linear # Generate noise if noise_type.lower() == "gaussian": noise_std = np.sqrt(noise_power) noise = RANDOM_GENERATOR.normal( loc=0.0, scale=noise_std, size=self._intramuscular_emg__tensor.shape ) else: raise ValueError(f"Unsupported noise type: {noise_type}") # Add noise noisy_emg = self._intramuscular_emg__tensor + noise # Store results privately self._noisy_intramuscular_emg__tensor = noisy_emg return noisy_emg
# Property accessors for computed results @property def muaps(self) -> INTRAMUSCULAR_MUAP_SHAPE__TENSOR: """ Intramuscular MUAP shapes for all electrode arrays. Returns ------- INTRAMUSCULAR_MUAP_SHAPE__TENSOR Intramuscular MUAP templates for the electrode array. Raises ------ ValueError If MUAP templates have not been computed yet. """ if self._muaps is None: raise ValueError( "MUAP templates not computed. Call simulate_muaps() first." ) return self._muaps @property def intramuscular_emg__tensor(self) -> INTRAMUSCULAR_EMG__TENSOR: """ Intramuscular EMG signals for all electrode arrays. Returns ------- INTRAMUSCULAR_EMG__TENSOR Intramuscular EMG signals for the electrode array. Raises ------ ValueError If intramuscular EMG has not been computed yet. """ if self._intramuscular_emg__tensor is None: raise ValueError( "Intramuscular EMG signals not computed. Call simulate_intramuscular_emg() first." ) return self._intramuscular_emg__tensor @property def noisy_intramuscular_emg__tensor(self) -> INTRAMUSCULAR_EMG__TENSOR: """ Noisy intramuscular EMG signals for all electrode arrays. Returns ------- INTRAMUSCULAR_EMG__TENSOR Noisy intramuscular EMG signals for the electrode array. Raises ------ ValueError If noisy intramuscular EMG has not been computed yet. """ if self._noisy_intramuscular_emg__tensor is None: raise ValueError( "Noisy intramuscular EMG signals not computed. Call add_noise() first." ) return self._noisy_intramuscular_emg__tensor @property def motor_neuron_pool(self) -> MotorNeuronPool: """ Motor neuron pool used for EMG generation. Returns ------- MotorNeuronPool The motor neuron pool used in the simulation. Raises ------ ValueError If motor neuron pool has not been set yet. """ if self._motor_neuron_pool is None: raise ValueError( "Motor neuron pool not set. Call simulate_intramuscular_emg() first." ) return self._motor_neuron_pool