Source code for myogen.simulator.core.emg.surface.surface_emg

import warnings
from typing import Optional

try:
    import cupy as cp

    HAS_CUPY = True
except ImportError:
    HAS_CUPY = False

import numpy as np
from tqdm import tqdm

from myogen import RANDOM_GENERATOR
from myogen.simulator.core.muscle import Muscle
from myogen.simulator.core.spike_train import MotorNeuronPool
from myogen.simulator.core.emg.surface.simulate_fiber import simulate_fiber_v2
from myogen.simulator.core.emg.electrodes import SurfaceElectrodeArray
from myogen.utils.types import (
    SURFACE_MUAP_SHAPE__TENSOR,
    SURFACE_EMG__TENSOR,
    beartowertype,
)

# Suppress warnings for cleaner output during batch simulations
warnings.filterwarnings("ignore")


[docs] @beartowertype class SurfaceEMG: """ Surface Electromyography (sEMG) Simulation. This class provides a simulation framework for generating surface electromyography signals from the muscle. It implements the multi-layered cylindrical volume conductor model from Farina et al. 2004 [1]_. Parameters ---------- muscle_model : Muscle Pre-computed muscle model (see :class:`myogen.simulator.Muscle`). electrode_arrays : list[SurfaceElectrodeArray] List of electrode arrays to use for simulation (see :class:`myogen.simulator.SurfaceElectrodeArray`). sampling_frequency__Hz : float, default=2048.0 Sampling frequency in Hz. Default is set to 2048 Hz as used by the Quattrocento (OT Bioelettronica, Turin, Italy) system. sampling_points_in_t_and_z_domains : int, default=256 Spatial and temporal discretization resolution for numerical integration. Controls the accuracy of the volume conductor calculations but significantly impacts computational cost (scales quadratically). Higher values provide better numerical accuracy at the expense of simulation time. Default is set to 256 samples. sampling_points_in_theta_domain : int, default=180 Angular discretization for cylindrical coordinate system in degrees. Higher values provide better spatial resolution for circumferential electrode placement studies. Default is set to 180 points, which provides 2° angular resolution. This is suitable for most EMG studies. MUs_to_simulate : list[int], optional Indices of motor units to simulate. If None, all motor units are simulated. Default is None. For computational efficiency, consider simulating subsets for initial analysis. Indices correspond to the recruitment order (0 is recruited first). Attributes ---------- muaps : list[SURFACE_MUAP_SHAPE__TENSOR] Motor Unit Action Potential (MUAP) templates for each electrode array. Available after simulate_muaps(). surface_emg__tensors : list[SURFACE_EMG__TENSOR] Surface EMG signals for each electrode array. Available after simulate_surface_emg(). noisy_surface_emg__tensors : list[SURFACE_EMG__TENSOR] Noisy surface EMG signals for each electrode array. Available after add_noise(). motor_neuron_pool : MotorNeuronPool Motor neuron pool used for EMG generation. Available after simulate_surface_emg(). References ---------- .. [1] Farina, D., Mesin, L., Martina, S., Merletti, R., 2004. A surface EMG generation model with multilayer cylindrical description of the volume conductor. IEEE Transactions on Biomedical Engineering 51, 415–426. https://doi.org/10.1109/TBME.2003.820998 """
[docs] def __init__( self, muscle_model: Muscle, electrode_arrays: list[SurfaceElectrodeArray], sampling_frequency__Hz: float = 2048.0, sampling_points_in_t_and_z_domains: int = 256, sampling_points_in_theta_domain: int = 180, MUs_to_simulate: list[int] | None = None, ): # Immutable public arguments - never modify these self.muscle_model = muscle_model self.electrode_arrays = electrode_arrays self.sampling_frequency__Hz = sampling_frequency__Hz self.sampling_points_in_t_and_z_domains = sampling_points_in_t_and_z_domains self.sampling_points_in_theta_domain = sampling_points_in_theta_domain self.MUs_to_simulate = MUs_to_simulate # Private copies for internal modifications self._muscle_model = muscle_model self._electrode_arrays = electrode_arrays self._sampling_frequency__Hz = sampling_frequency__Hz self._sampling_points_in_t_and_z_domains = sampling_points_in_t_and_z_domains self._sampling_points_in_theta_domain = sampling_points_in_theta_domain self._MUs_to_simulate = MUs_to_simulate # Derived properties from muscle model - immutable public access self.mean_conduction_velocity__m_s = ( self._muscle_model.mean_conduction_velocity__m_s ) self.mean_fiber_length__mm = self._muscle_model.mean_fiber_length__mm self.var_fiber_length__mm = self._muscle_model.var_fiber_length__mm self.radius_bone__mm = self._muscle_model.radius_bone__mm self.fat_thickness__mm = self._muscle_model.fat_thickness__mm self.skin_thickness__mm = self._muscle_model.skin_thickness__mm self.muscle_conductivity_radial__S_m = ( self._muscle_model.muscle_conductivity_radial__S_m ) self.muscle_conductivity_longitudinal__S_m = ( self._muscle_model.muscle_conductivity_longitudinal__S_m ) self.fat_conductivity__S_m = self._muscle_model.fat_conductivity__S_m self.skin_conductivity__S_m = self._muscle_model.skin_conductivity__S_m # Private copies for internal modifications self._mean_conduction_velocity__m_s = ( self._muscle_model.mean_conduction_velocity__m_s ) self._mean_fiber_length__mm = self._muscle_model.mean_fiber_length__mm self._var_fiber_length__mm = self._muscle_model.var_fiber_length__mm self._radius_bone__mm = self._muscle_model.radius_bone__mm self._fat_thickness__mm = self._muscle_model.fat_thickness__mm self._skin_thickness__mm = self._muscle_model.skin_thickness__mm self._muscle_conductivity_radial__S_m = ( self._muscle_model.muscle_conductivity_radial__S_m ) self._muscle_conductivity_longitudinal__S_m = ( self._muscle_model.muscle_conductivity_longitudinal__S_m ) self._fat_conductivity__S_m = self._muscle_model.fat_conductivity__S_m self._skin_conductivity__S_m = self._muscle_model.skin_conductivity__S_m # Calculate total radius - immutable and private self.radius_total = ( self._muscle_model.radius__mm + self._fat_thickness__mm + self._skin_thickness__mm ) self._radius_total = self.radius_total # Simulation results - stored privately, accessed via properties self._muaps: Optional[list[SURFACE_MUAP_SHAPE__TENSOR]] = None self._surface_emg__tensors: Optional[list[SURFACE_EMG__TENSOR]] = None self._noisy_surface_emg__tensors: Optional[list[SURFACE_EMG__TENSOR]] = None self._motor_neuron_pool: Optional[MotorNeuronPool] = None
[docs] def simulate_muaps(self) -> list[SURFACE_MUAP_SHAPE__TENSOR]: """ Simulate MUAPs for all electrode arrays using the provided muscle model. This method generates Motor Unit Action Potential (MUAP) templates that represent the electrical signature of each motor unit as recorded by the surface electrodes. The simulation uses the multi-layered cylindrical volume conductor model. Returns ------- list[SURFACE_MUAP_SHAPE__TENSOR] List of generated MUAP templates for each electrode array. Results are stored in the `muaps` property after execution. Notes ----- This method must be called before simulate_surface_emg(). The generated MUAP templates are used as basis functions for EMG signal synthesis. """ # Set default MUs to simulate if self._MUs_to_simulate is None: self._MUs_to_simulate = list( range(len(self._muscle_model.resulting_number_of_innervated_fibers)) ) # Calculate innervation zone variance innervation_zone_variance = ( self._mean_fiber_length__mm * 0.1 ) # 10% of the mean fiber length (see Botelho et al. 2019 [6]_) # Extract fiber counts number_of_fibers_per_MUs = ( self._muscle_model.resulting_number_of_innervated_fibers ) # Create time array t = np.linspace( 0, (self._sampling_points_in_t_and_z_domains - 1) / self._sampling_frequency__Hz * 1e-3, self._sampling_points_in_t_and_z_domains, ) # Pre-calculate innervation zones innervation_zones = RANDOM_GENERATOR.uniform( low=-innervation_zone_variance / 2, high=innervation_zone_variance / 2, size=len(self._MUs_to_simulate), ) # Initialize output arrays for each electrode array # Each electrode array can have different dimensions electrode_array_results = [] for array_idx, electrode_array in enumerate(self._electrode_arrays): # Initialize array for this electrode configuration array_result = np.zeros( ( len(self._MUs_to_simulate), electrode_array.num_rows, electrode_array.num_cols, len(t), ) ) # Matrix optimization variables A_matrix = None B_incomplete = None # Process each motor unit for MU_number, MU_index in enumerate(self._MUs_to_simulate): number_of_fibers = number_of_fibers_per_MUs[MU_index] if number_of_fibers == 0: continue # Get fiber positions position_of_fibers = self._muscle_model.resulting_fiber_assignment( MU_index ) innervation_zone = innervation_zones[MU_number] # Process each fiber for fiber_number in tqdm( range(number_of_fibers), desc=f"Electrode Array {array_idx + 1}/{len(self._electrode_arrays)} MU {MU_number + 1}/{len(self._MUs_to_simulate)}", unit="fiber(s)", ): fiber_position = position_of_fibers[fiber_number] # Calculate fiber distance from center R = np.sqrt(fiber_position[0] ** 2 + fiber_position[1] ** 2) # Generate fiber length fiber_length__mm = ( self._mean_fiber_length__mm + RANDOM_GENERATOR.uniform( low=-self._var_fiber_length__mm, high=self._var_fiber_length__mm, ) ) # Calculate fiber end positions L2 = abs(innervation_zone - fiber_length__mm / 2) L1 = abs(innervation_zone + fiber_length__mm / 2) # Use the new simulate_fiber_v2 function if fiber_number == 0 or A_matrix is None: phi_temp, A_matrix, B_incomplete = simulate_fiber_v2( Fs=self._sampling_frequency__Hz * 1e-3, v=self._mean_conduction_velocity__m_s, N=self._sampling_points_in_t_and_z_domains, M=self._sampling_points_in_theta_domain, r=self._radius_total, r_bone=self._radius_bone__mm, th_fat=self._fat_thickness__mm, th_skin=self._skin_thickness__mm, R=R, L1=L1, L2=L2, zi=innervation_zone, electrode_array=electrode_array, sig_muscle_rho=self._muscle_conductivity_radial__S_m, sig_muscle_z=self._muscle_conductivity_longitudinal__S_m, sig_skin=self._skin_conductivity__S_m, sig_fat=self._fat_conductivity__S_m, ) else: phi_temp, _, _ = simulate_fiber_v2( Fs=self._sampling_frequency__Hz * 1e-3, v=self._mean_conduction_velocity__m_s, N=self._sampling_points_in_t_and_z_domains, M=self._sampling_points_in_theta_domain, r=self._radius_total, r_bone=self._radius_bone__mm, th_fat=self._fat_thickness__mm, th_skin=self._skin_thickness__mm, R=R, L1=L1, L2=L2, zi=innervation_zone, electrode_array=electrode_array, sig_muscle_rho=self._muscle_conductivity_radial__S_m, sig_muscle_z=self._muscle_conductivity_longitudinal__S_m, sig_skin=self._skin_conductivity__S_m, sig_fat=self._fat_conductivity__S_m, A_matrix=A_matrix, B_incomplete=B_incomplete, ) array_result[MU_number] += phi_temp electrode_array_results.append(array_result) # Store results privately self._muaps = electrode_array_results return electrode_array_results
[docs] def simulate_surface_emg( self, motor_neuron_pool: MotorNeuronPool ) -> list[SURFACE_EMG__TENSOR]: """ Generate surface EMG signals for all electrode arrays using the provided motor neuron pool. This method convolves the pre-computed MUAP templates with the motor neuron spike trains to synthesize realistic surface EMG signals. The process includes temporal resampling to match the motor neuron pool timestep and supports both CPU and GPU acceleration. Parameters ---------- motor_neuron_pool : MotorNeuronPool Motor neuron pool with spike trains computed (see :class:`myogen.simulator.MotorNeuronPool`). Returns ------- list[SURFACE_EMG__TENSOR] Surface EMG signals for each electrode array. Results are stored in the `surface_emg__tensors` property after execution. Raises ------ ValueError If MUAP templates have not been generated. Call simulate_muaps() first. """ if self._muaps is None: raise ValueError( "MUAP templates have not been generated. Call simulate_muaps() first." ) # Store motor neuron pool privately self._motor_neuron_pool = motor_neuron_pool # Handle MUs to simulate if self._MUs_to_simulate is None: MUs_to_simulate = set( range(len(self._muscle_model.resulting_number_of_innervated_fibers)) ) else: MUs_to_simulate = set(self._MUs_to_simulate) emg_results = [] for array_idx, muap_array in enumerate(self._muaps): # Temporal resampling muap_shapes = np.zeros( ( muap_array.shape[0], muap_array.shape[1], muap_array.shape[2], int( muap_array.shape[3] / self._sampling_frequency__Hz * 1 / self._motor_neuron_pool.timestep__ms * 1000 ), ) ) for muap_nr in range(muap_shapes.shape[0]): for row in range(muap_shapes.shape[1]): for col in range(muap_shapes.shape[2]): muap_shapes[muap_nr, row, col] = np.interp( np.arange( 0, muap_array.shape[-1] / self._sampling_frequency__Hz, self._motor_neuron_pool.timestep__ms / 1000, ), np.arange( 0, muap_array.shape[-1] / self._sampling_frequency__Hz, 1 / self._sampling_frequency__Hz, ), muap_array[muap_nr, row, col], ) n_pools = self._motor_neuron_pool.spike_trains.shape[0] n_rows = muap_shapes.shape[1] n_cols = muap_shapes.shape[2] # Initialize result array sample_conv = np.convolve( self._motor_neuron_pool.spike_trains[0, 0], muap_shapes[0, 0, 0], mode="same", ) surface_emg = np.zeros((n_pools, n_rows, n_cols, len(sample_conv))) muap_shapes /= np.max(np.abs(muap_shapes)) # Normalize MUAP shapes # Perform convolution for each pool using GPU acceleration if available if HAS_CUPY: # Use GPU acceleration with CuPy spike_gpu = cp.asarray(self._motor_neuron_pool.spike_trains) muap_gpu = cp.asarray(muap_shapes) surface_emg_gpu = cp.zeros((n_pools, n_rows, n_cols, len(sample_conv))) for pool_idx in tqdm( range(n_pools), desc=f"Electrode Array {array_idx + 1}/{len(self._muaps)} Surface EMG (GPU)", unit="pools", ): active_neuron_indices = set( self._motor_neuron_pool.active_neuron_indices[pool_idx] ) for row_idx in range(n_rows): for col_idx in range(n_cols): # Process all active MUs on GPU convolutions = cp.array( [ cp.correlate( spike_gpu[pool_idx, mu_idx], muap_gpu[i, row_idx, col_idx], mode="same", ) for i, mu_idx in enumerate( MUs_to_simulate.intersection( active_neuron_indices ) ) ] ) # Sum across MUAPs on GPU if len(convolutions) > 0: surface_emg_gpu[pool_idx, row_idx, col_idx] = cp.sum( convolutions, axis=0 ) # Transfer results back to CPU surface_emg = cp.asnumpy(surface_emg_gpu) else: # Fallback to CPU computation with NumPy for pool_idx in tqdm( range(n_pools), desc=f"Electrode Array {array_idx + 1}/{len(self._muaps)} Surface EMG (CPU)", unit="pools", ): active_neuron_indices = set( self._motor_neuron_pool.active_neuron_indices[pool_idx] ) for row_idx in range(n_rows): for col_idx in range(n_cols): # Process all active MUs convolutions = [] for i, mu_idx in enumerate( MUs_to_simulate.intersection(active_neuron_indices) ): conv = np.correlate( self._motor_neuron_pool.spike_trains[ pool_idx, mu_idx ], muap_shapes[i, row_idx, col_idx], mode="same", ) convolutions.append(conv) if convolutions: surface_emg[pool_idx, row_idx, col_idx] = np.sum( convolutions, axis=0 ) surface_emg_resampled = np.zeros( ( n_pools, n_rows, n_cols, int( surface_emg.shape[-1] / (1 / self._motor_neuron_pool.timestep__ms * 1000) * self._sampling_frequency__Hz ), ) ) for pool_idx in range(n_pools): for row_idx in range(n_rows): for col_idx in range(n_cols): surface_emg_resampled[pool_idx, row_idx, col_idx] = np.interp( np.arange( 0, surface_emg.shape[-1] * (self._motor_neuron_pool.timestep__ms / 1000), 1 / self._sampling_frequency__Hz, ), np.arange( 0, surface_emg.shape[-1] * (self._motor_neuron_pool.timestep__ms / 1000), self._motor_neuron_pool.timestep__ms / 1000, ), surface_emg[pool_idx, row_idx, col_idx], ) emg_results.append(surface_emg_resampled) # Store results privately self._surface_emg__tensors = emg_results return emg_results
[docs] def add_noise( self, snr_db: float, noise_type: str = "gaussian" ) -> list[SURFACE_EMG__TENSOR]: """ Add noise to all electrode arrays. This method adds realistic noise to the simulated surface EMG signals based on a specified signal-to-noise ratio. The noise characteristics can be customized to match different recording conditions. Parameters ---------- snr_db : float Signal-to-noise ratio in dB. Higher values result in cleaner signals. Typical physiological EMG has SNR ranging from 10-40 dB. noise_type : str, default="gaussian" Type of noise to add. Currently supports "gaussian" for white noise. Returns ------- list[SURFACE_EMG__TENSOR] Noisy EMG signals for each electrode array. Results are stored in the `noisy_surface_emg__tensors` property after execution. Raises ------ ValueError If surface EMG has not been simulated. Call simulate_surface_emg() first. """ if self._surface_emg__tensors is None: raise ValueError( "Surface EMG has not been simulated. Call simulate_surface_emg() first." ) noisy_emg_results = [] for _, emg_array in enumerate(self._surface_emg__tensors): # Calculate signal power signal_power = np.mean(emg_array**2) # Calculate noise power snr_linear = 10 ** (snr_db / 10) noise_power = signal_power / snr_linear # Generate noise if noise_type.lower() == "gaussian": noise_std = np.sqrt(noise_power) noise = RANDOM_GENERATOR.normal( loc=0.0, scale=noise_std, size=emg_array.shape ) else: raise ValueError(f"Unsupported noise type: {noise_type}") # Add noise noisy_emg = emg_array + noise noisy_emg_results.append(noisy_emg) # Store results privately self._noisy_surface_emg__tensors = noisy_emg_results return noisy_emg_results
# Property accessors for computed results @property def muaps(self) -> list[SURFACE_MUAP_SHAPE__TENSOR]: """ Motor Unit Action Potential (MUAP) templates for each electrode array. Returns ------- list[SURFACE_MUAP_SHAPE__TENSOR] List of MUAP templates for each electrode array. Raises ------ ValueError If MUAP templates have not been computed yet. """ if self._muaps is None: raise ValueError( "MUAP templates not computed. Call simulate_muaps() first." ) return self._muaps @property def surface_emg__tensors(self) -> list[SURFACE_EMG__TENSOR]: """ Surface EMG signals for each electrode array. Returns ------- list[SURFACE_EMG__TENSOR] Surface EMG signals for each electrode array. Raises ------ ValueError If surface EMG has not been computed yet. """ if self._surface_emg__tensors is None: raise ValueError( "Surface EMG signals not computed. Call simulate_surface_emg() first." ) return self._surface_emg__tensors @property def noisy_surface_emg__tensors(self) -> list[SURFACE_EMG__TENSOR]: """ Noisy surface EMG signals for each electrode array. Returns ------- list[SURFACE_EMG__TENSOR] Noisy surface EMG signals for each electrode array. Raises ------ ValueError If noisy surface EMG has not been computed yet. """ if self._noisy_surface_emg__tensors is None: raise ValueError( "Noisy surface EMG signals not computed. Call add_noise() first." ) return self._noisy_surface_emg__tensors @property def motor_neuron_pool(self) -> MotorNeuronPool: """ Motor neuron pool used for EMG generation. Returns ------- MotorNeuronPool The motor neuron pool used in the simulation. Raises ------ ValueError If motor neuron pool has not been set yet. """ if self._motor_neuron_pool is None: raise ValueError( "Motor neuron pool not set. Call simulate_surface_emg() first." ) return self._motor_neuron_pool