from typing import Optional
from myogen.utils.neo import GridAnalogSignal
try:
import cupy as cp
HAS_CUPY = True
except ImportError:
HAS_CUPY = False
try:
import elephant
import elephant.utils
HAS_ELEPHANT = True
except ImportError:
HAS_ELEPHANT = False
elephant = None # type: ignore
import logging
from copy import deepcopy
import numpy as np
import quantities as pq
from joblib import Parallel, delayed
from neo import Block, Group, Segment
from scipy.signal import resample
from tqdm import tqdm
from myogen import RANDOM_GENERATOR
from myogen.simulator.core.emg.electrodes import SurfaceElectrodeArray
from myogen.simulator.core.emg.surface.simulate_fiber import simulate_fiber_v2
from myogen.simulator.core.muscle import Muscle
from myogen.utils.decorators import beartowertype
from myogen.utils.types import (
Quantity__Hz,
SPIKE_TRAIN__Block,
SURFACE_EMG__Block,
SURFACE_MUAP__Block,
)
[docs]
@beartowertype
class SurfaceEMG:
"""
Surface Electromyography (sEMG) Simulation.
This class provides a simulation framework for generating
surface electromyography signals from the muscle. It implements the
multi-layered cylindrical volume conductor model from Farina et al. 2004 [1]_.
Parameters
----------
muscle_model : Muscle
Pre-computed muscle model (see :class:`myogen.simulator.Muscle`).
electrode_arrays : list[SurfaceElectrodeArray]
List of electrode arrays to use for simulation (see :class:`myogen.simulator.SurfaceElectrodeArray`).
sampling_frequency__Hz : float, default=2048.0
Sampling frequency in Hz. Default is set to 2048 Hz as used by the Quattrocento (OT Bioelettronica, Turin, Italy) system.
sampling_points_in_t_and_z_domains : int, default=256
Spatial and temporal discretization resolution for numerical integration.
Controls the accuracy of the volume conductor calculations but significantly
impacts computational cost (scales quadratically).
Higher values provide better numerical accuracy at the expense of simulation time.
Default is set to 256 samples.
sampling_points_in_theta_domain : int, default=32
Angular discretization for cylindrical coordinate system in degrees.
Higher values provide better spatial resolution but cause numerical overflow in Bessel functions.
Default is set to 32 points to avoid numerical instability.
WARNING: Values >64 cause extreme Bessel function overflow leading to incorrect results.
This is suitable for most EMG studies.
MUs_to_simulate : list[int], optional
Indices of motor units to simulate. If None, all motor units are simulated.
Default is None. For computational efficiency, consider
simulating subsets for initial analysis.
Indices correspond to the recruitment order (0 is recruited first).
internal_sampling_frequency__Hz : Quantity__Hz, optional
Internal sampling frequency for MUAP computation before downsampling.
If None, defaults to 10 kHz. Higher values provide better temporal resolution
but increase computation time. Default is 10 kHz.
iap_kernel_length__mm : float, optional
Physical spatial extent for intracellular action potential (IAP) kernel evaluation in mm.
If None (default), uses individual fiber lengths from muscle model, ensuring
MUAP duration is physiologically accurate and independent of sampling resolution.
**Recommended**: Leave as None to use fiber-specific lengths for most realistic MUAPs.
Alternatively, set to a fixed value (e.g., 80-100 mm) to use the same kernel
extent for all fibers, which can simplify analysis but may be less physiologically
accurate for muscles with variable fiber lengths.
This parameter controls the spatial extent over which the IAP waveform is computed,
directly affecting MUAP duration: duration ≈ iap_kernel_length__mm / (2 * v) ms.
Attributes
----------
muaps__Block : SURFACE_MUAP__Block
Motor Unit Action Potential (MUAP) templates for each electrode array as a neo.Block. Available after simulate_muaps().
surface_emg__Block : SURFACE_EMG__Block
Surface EMG signals for each electrode array as a neo.Block. Available after simulate_surface_emg().
noisy_surface_emg__Block : SURFACE_EMG__Block
Noisy surface EMG signals for each electrode array as a neo.Block. Available after add_noise().
spike_train__Block : SPIKE_TRAIN__Block
Spike train block used for EMG generation signals. Available after simulate_surface_emg().
References
----------
.. [1] Farina, D., Mesin, L., Martina, S., Merletti, R., 2004. A surface EMG generation model with multilayer cylindrical description of the volume conductor. IEEE Transactions on Biomedical Engineering 51, 415–426. https://doi.org/10.1109/TBME.2003.820998
"""
[docs]
def __init__(
self,
muscle_model: Muscle,
electrode_arrays: list[SurfaceElectrodeArray],
sampling_frequency__Hz: Quantity__Hz = 2048.0 * pq.Hz,
sampling_points_in_t_and_z_domains: int = 256,
sampling_points_in_theta_domain: int = 32,
MUs_to_simulate: list[int] | None = None,
internal_sampling_frequency__Hz: Quantity__Hz | None = None,
iap_kernel_length__mm: float | None = None,
):
# Immutable public arguments - never modify these
self.muscle_model = muscle_model
self.electrode_arrays = electrode_arrays
self.sampling_frequency__Hz = sampling_frequency__Hz
self.sampling_points_in_t_and_z_domains = sampling_points_in_t_and_z_domains
self.sampling_points_in_theta_domain = sampling_points_in_theta_domain
self.MUs_to_simulate = MUs_to_simulate
self.iap_kernel_length__mm = iap_kernel_length__mm
# Internal sampling frequency for higher resolution MUAP computation
# If not specified, defaults to 10 kHz for better MUAP resolution
if internal_sampling_frequency__Hz is None:
internal_sampling_frequency__Hz = 10000.0 * pq.Hz
self.internal_sampling_frequency__Hz = internal_sampling_frequency__Hz
# Private copies for internal modifications (extract magnitudes)
self._muscle_model = muscle_model
self._electrode_arrays = electrode_arrays
self._sampling_frequency__Hz = float(sampling_frequency__Hz.rescale(pq.Hz).magnitude)
self._internal_sampling_frequency__Hz = float(
internal_sampling_frequency__Hz.rescale(pq.Hz).magnitude
)
# Calculate upsampling factor and internal sample count
self._upsampling_factor = (
self._internal_sampling_frequency__Hz / self._sampling_frequency__Hz
)
self._internal_sampling_points = int(
np.round(sampling_points_in_t_and_z_domains * self._upsampling_factor)
)
self._sampling_points_in_t_and_z_domains = sampling_points_in_t_and_z_domains
self._sampling_points_in_theta_domain = sampling_points_in_theta_domain
self._MUs_to_simulate = MUs_to_simulate
self._iap_kernel_length__mm = iap_kernel_length__mm
# Derived properties from muscle model - immutable public access
self.mean_conduction_velocity__m_s = self._muscle_model.mean_conduction_velocity__m_s
self.mean_fiber_length__mm = self._muscle_model.mean_fiber_length__mm
self.var_fiber_length__mm = self._muscle_model.var_fiber_length__mm
self.radius_bone__mm = self._muscle_model.radius_bone__mm
self.fat_thickness__mm = self._muscle_model.fat_thickness__mm
self.skin_thickness__mm = self._muscle_model.skin_thickness__mm
self.muscle_conductivity_radial__S_m = self._muscle_model.muscle_conductivity_radial__S_m
self.muscle_conductivity_longitudinal__S_m = (
self._muscle_model.muscle_conductivity_longitudinal__S_m
)
self.fat_conductivity__S_m = self._muscle_model.fat_conductivity__S_m
self.skin_conductivity__S_m = self._muscle_model.skin_conductivity__S_m
# Private copies for internal modifications (extract magnitudes if Quantity objects)
def _extract_value(val):
"""Helper to extract magnitude from Quantity or return float directly."""
if hasattr(val, "magnitude"):
return float(val.magnitude)
return float(val)
self._mean_conduction_velocity__m_s = _extract_value(
self._muscle_model.mean_conduction_velocity__m_s
)
self._mean_fiber_length__mm = _extract_value(self._muscle_model.mean_fiber_length__mm)
self._var_fiber_length__mm = _extract_value(self._muscle_model.var_fiber_length__mm)
self._radius_bone__mm = _extract_value(self._muscle_model.radius_bone__mm)
self._fat_thickness__mm = _extract_value(self._muscle_model.fat_thickness__mm)
self._skin_thickness__mm = _extract_value(self._muscle_model.skin_thickness__mm)
self._muscle_conductivity_radial__S_m = _extract_value(
self._muscle_model.muscle_conductivity_radial__S_m
)
self._muscle_conductivity_longitudinal__S_m = _extract_value(
self._muscle_model.muscle_conductivity_longitudinal__S_m
)
self._fat_conductivity__S_m = _extract_value(self._muscle_model.fat_conductivity__S_m)
self._skin_conductivity__S_m = _extract_value(self._muscle_model.skin_conductivity__S_m)
# Calculate total radius - immutable and private
self._radius_muscle__mm = _extract_value(self._muscle_model.radius__mm)
self.radius_total = (
self._radius_muscle__mm + self._fat_thickness__mm + self._skin_thickness__mm
)
self._radius_total = self.radius_total
# Simulation results - stored privately, accessed via properties
self._muaps__Block: Optional[SURFACE_MUAP__Block] = None
self._surface_emg__Block: Optional[SURFACE_EMG__Block] = None
self._noisy_surface_emg__Block: Optional[SURFACE_EMG__Block] = None
self._spike_train__Block: Optional[SPIKE_TRAIN__Block] = None
[docs]
def simulate_muaps(self, n_jobs: int = -2) -> SURFACE_MUAP__Block:
"""
Simulate MUAPs for all electrode arrays using the provided muscle model.
This method generates Motor Unit Action Potential (MUAP) templates that represent
the electrical signature of each motor unit as recorded by the surface electrodes.
The simulation uses the multi-layered cylindrical volume conductor model with
parallel processing for improved performance.
Parameters
----------
n_jobs : int, optional
Number of parallel workers for motor unit processing. Default is -2.
- n_jobs=-1: Use all CPU cores
- n_jobs=-2: Use all cores except one (recommended, keeps system responsive)
- n_jobs=-3: Use all cores except two
- n_jobs=1: No parallelization
- n_jobs=N: Use exactly N cores
Returns
-------
SURFACE_MUAP__Block
neo.Block of generated MUAP templates for each electrode array.
Results are stored in the `muaps` property after execution.
Notes
-----
This method must be called before simulate_surface_emg(). The generated MUAP
templates are used as basis functions for EMG signal synthesis.
The motor units are processed in parallel using joblib, with each motor unit's
fibers processed sequentially to maintain optimization efficiency.
"""
# Set default MUs to simulate
if self._MUs_to_simulate is None:
self._MUs_to_simulate = list(
range(len(self._muscle_model.resulting_number_of_innervated_fibers))
)
# Calculate innervation zone variance
innervation_zone_variance = (
self._mean_fiber_length__mm * 0.1
) # 10% of the mean fiber length (see Botelho et al. 2019 [6]_)
# Extract fiber counts
number_of_fibers_per_MUs = self._muscle_model.resulting_number_of_innervated_fibers
# Create time array at INTERNAL sampling frequency for higher resolution
t_internal = np.linspace(
0,
(self._internal_sampling_points - 1) / self._internal_sampling_frequency__Hz * 1e-3,
self._internal_sampling_points,
)
# Get total number of motor units
n_motor_units = len(number_of_fibers_per_MUs)
# Pre-calculate innervation zones for all MUs
innervation_zones = RANDOM_GENERATOR.uniform(
low=-innervation_zone_variance / 2,
high=innervation_zone_variance / 2,
size=n_motor_units,
)
# Pre-allocate result shape at INTERNAL resolution (optimization: avoid repeated shape calculations)
# Will be downsampled to output resolution after simulation
internal_result_shape = (
self._electrode_arrays[0].num_rows,
self._electrode_arrays[0].num_cols,
len(t_internal),
)
# Final output shape after downsampling
output_result_shape = (
self._electrode_arrays[0].num_rows,
self._electrode_arrays[0].num_cols,
self._sampling_points_in_t_and_z_domains,
)
# Helper function to process a single motor unit
def _process_single_mu(
MU_index: int,
electrode_array_original: SurfaceElectrodeArray,
) -> tuple[np.ndarray, str]:
"""
Process a single motor unit (all its fibers) in parallel.
Parameters
----------
MU_index : int
Index of the motor unit to process.
electrode_array_original : SurfaceElectrodeArray
Original electrode array (will be deep-copied to avoid threading issues).
Returns
-------
tuple[np.ndarray, str]
Tuple of (array_result, segment_name) where array_result is the accumulated
MUAP signal for this MU and segment_name is the name for the segment.
"""
try:
# Deep copy electrode array to avoid threading issues
electrode_array = deepcopy(electrode_array_original)
# Pre-allocated result array at INTERNAL resolution (optimization: use pre-computed shape)
array_result_internal = np.zeros(internal_result_shape, dtype=np.float64)
number_of_fibers = number_of_fibers_per_MUs[MU_index]
if number_of_fibers == 0:
# Return empty signal (downsampled to output resolution)
array_result_downsampled = np.zeros(output_result_shape, dtype=np.float64)
return array_result_downsampled, f"MUAP_{MU_index}"
# Get fiber positions
position_of_fibers_raw = self._muscle_model.resulting_fiber_assignment(MU_index)
# Extract magnitude if Quantity, otherwise use as-is
if hasattr(position_of_fibers_raw, "magnitude"):
position_of_fibers = position_of_fibers_raw.magnitude
else:
position_of_fibers = position_of_fibers_raw
innervation_zone = innervation_zones[MU_index]
# Batch generate random fiber lengths (optimization: single RNG call)
fiber_length_variations = RANDOM_GENERATOR.uniform(
low=-self._var_fiber_length__mm,
high=self._var_fiber_length__mm,
size=number_of_fibers,
)
# Pre-compute geometric values for all fibers (optimization: vectorized)
R_values = np.sqrt(position_of_fibers[:, 0] ** 2 + position_of_fibers[:, 1] ** 2)
theta_values = np.arctan2(position_of_fibers[:, 1], position_of_fibers[:, 0])
fiber_lengths = self._mean_fiber_length__mm + fiber_length_variations
# Matrix optimization variables (local to this MU)
A_matrix = None
B_incomplete = None
# Process each fiber (inner loop - must remain sequential)
for fiber_number in range(number_of_fibers):
# Use pre-computed values (optimization: vectorized calculations)
R = R_values[fiber_number]
theta = theta_values[fiber_number]
fiber_length__mm = fiber_lengths[fiber_number]
electrode_array._center_point__mm_deg = (
electrode_array._center_point__mm_deg[0],
electrode_array._center_point__mm_deg[1] - np.rad2deg(theta),
)
electrode_array._create_electrode_grid()
# Calculate fiber end positions
L1 = abs(innervation_zone + fiber_length__mm / 2)
L2 = abs(innervation_zone - fiber_length__mm / 2)
# Determine IAP kernel length: use fixed value or scaled mean fiber length
# IMPORTANT: Use scaled mean, not individual fiber_length__mm, to avoid boundary artifacts
if self._iap_kernel_length__mm is not None:
kernel_length = self._iap_kernel_length__mm
else:
# Use scaled mean fiber length (2.5×) for all fibers to prevent boundary truncation
IAP_SCALE_FACTOR = 2.5
kernel_length = self._mean_fiber_length__mm * IAP_SCALE_FACTOR
# Use the new simulate_fiber_v2 function with INTERNAL sampling frequency
phi_temp, A_matrix, B_incomplete = simulate_fiber_v2(
Fs=self._internal_sampling_frequency__Hz * 1e-3,
v=self._mean_conduction_velocity__m_s,
N=self._internal_sampling_points,
M=self._sampling_points_in_theta_domain,
r=self._radius_total,
r_bone=self._radius_bone__mm,
th_fat=self._fat_thickness__mm,
th_skin=self._skin_thickness__mm,
R=R,
L1=L1,
L2=L2,
zi=innervation_zone,
electrode_array=electrode_array,
sig_muscle_rho=self._muscle_conductivity_radial__S_m,
sig_muscle_z=self._muscle_conductivity_longitudinal__S_m,
sig_skin=self._skin_conductivity__S_m,
sig_fat=self._fat_conductivity__S_m,
A_matrix=None if fiber_number == 0 else A_matrix,
B_incomplete=None if fiber_number == 0 else B_incomplete,
fiber_length__mm=kernel_length, # NEW: Use fiber-specific or fixed IAP kernel length
)
array_result_internal += phi_temp
# Downsample from internal resolution to output resolution
# resample operates on the last axis (time), which is axis=2
array_result_downsampled = resample(
array_result_internal, self._sampling_points_in_t_and_z_domains, axis=2
)
return array_result_downsampled, f"MUAP_{MU_index}"
except Exception as e:
# Log error and return empty result to avoid crashing entire parallel job
logging.error(
f"Failed to process MU {MU_index} for electrode array {array_idx}: {e}"
)
# Return empty signal with error marker at output resolution
empty_result = np.zeros(output_result_shape, dtype=np.float64)
return empty_result, f"MUAP_{MU_index}_FAILED"
block = Block()
for array_idx, electrode_array in enumerate(self._electrode_arrays):
group = Group(name=f"ElectrodeArray_{array_idx}")
block.groups.append(group)
# Process only specified motor units in parallel
n_mus_to_compute = len(self._MUs_to_simulate)
logging.info(
f"Processing {n_mus_to_compute}/{n_motor_units} motor units for electrode array {array_idx + 1}/{len(self._electrode_arrays)} using parallel processing..."
)
# Parallel execution of motor units with tqdm progress bar
# batch_size="auto" optimizes task distribution across workers
# Only compute MUs in the subset
results = {} # Use dict to map MU_index -> result
with tqdm(
total=n_mus_to_compute,
desc=f"Electrode Array {array_idx + 1}/{len(self._electrode_arrays)}",
) as pbar:
for array_result, segment_name in Parallel(
n_jobs=n_jobs,
return_as="generator",
verbose=0,
batch_size="auto",
)(
delayed(_process_single_mu)(MU_index, electrode_array)
for MU_index in self._MUs_to_simulate
):
# Extract MU index from segment name "MUAP_{MU_index}"
mu_idx = int(segment_name.split("_")[1].split("_")[0])
results[mu_idx] = (array_result, segment_name)
pbar.update(1)
# Calculate actual MUAP duration based on fiber lengths
# Use iap_kernel_length__mm if specified, otherwise use scaled fiber length from muscle model
if self._iap_kernel_length__mm is not None:
kernel_length_mm = self._iap_kernel_length__mm
else:
# Scale fiber length to avoid boundary truncation of IAP kernel
# The IAP kernel needs ~2.5x the fiber length to fully develop and decay
# This prevents edge artifacts while maintaining proportionality to fiber length
IAP_SCALE_FACTOR = 2.5
kernel_length_mm = self._mean_fiber_length__mm * IAP_SCALE_FACTOR
# Physical duration based on IAP kernel length (after /=2 scaling in simulate_fiber)
# Duration = (kernel_length_mm / 2) / velocity
muap_duration__s = (
kernel_length_mm / 2.0
) / self._mean_conduction_velocity__m_s / 1000.0 # Convert ms to s
# Create custom time array for this duration
times__s = np.linspace(0, muap_duration__s, self._sampling_points_in_t_and_z_domains)
# Calculate effective sampling rate for these times
effective_sampling_rate__Hz = (
self._sampling_points_in_t_and_z_domains - 1
) / muap_duration__s
use_custom_times = True
# Create segments for ALL MUs (maintaining index order)
# Non-computed MUs get empty signals at output resolution
for MU_index in range(n_motor_units):
if MU_index in results:
array_result, segment_name = results[MU_index]
else:
# Create empty MUAP for non-computed MUs at output resolution
array_result = np.zeros(output_result_shape, dtype=np.float64)
segment_name = f"MUAP_{MU_index}"
segment = Segment(name=segment_name)
group.segments.append(segment)
# Use actual physical duration based on fiber lengths
segment.analogsignals.append(
GridAnalogSignal(
signal=np.transpose(array_result, (2, 0, 1)) * pq.mV,
t_start=0 * pq.s,
times=times__s * pq.s,
sampling_rate=effective_sampling_rate__Hz * pq.Hz,
)
)
# Store results privately
self._muaps__Block = block
return block
[docs]
def simulate_surface_emg(self, spike_train__Block: SPIKE_TRAIN__Block) -> SURFACE_EMG__Block:
"""
Generate surface EMG signals for all electrode arrays using the provided spike train block.
This method convolves the pre-computed MUAP templates with the spike trains
to synthesize realistic surface EMG signals. The process includes temporal resampling
to match the spike train timestep and supports both CPU and GPU acceleration.
Parameters
----------
spike_train__Block : SPIKE_TRAIN__Block
Block containing spike trains organized as segments (pools) with spiketrains.
Returns
-------
SURFACE_EMG__Block
Surface EMG signals for each electrode array stored in a neo.Block.
Results are stored in the `surface_emg__tensors` property after execution.
Raises
------
ValueError
If MUAP templates have not been generated. Call simulate_muaps() first.
"""
if self._muaps__Block is None:
raise ValueError("MUAP templates have not been generated. Call simulate_muaps() first.")
if not HAS_ELEPHANT:
raise ImportError(
"Elephant is required for surface EMG simulation. "
"Install with: pip install myogen[elephant]"
)
# Store spike train data privately
self._spike_train__Block = spike_train__Block
# Extract timestep from the first spike train
muap_timestep__ms = float((1 / self._sampling_frequency__Hz) * 1000) * pq.ms
# Convert spike train block to numpy arrays
n_pools = len(spike_train__Block.segments)
n_neurons = len(spike_train__Block.segments[0].spiketrains)
# Extract spike train durations to determine time length
first_spiketrain = spike_train__Block.segments[0].spiketrains[0]
spiketrain_timestep__ms = first_spiketrain.sampling_period.rescale("ms")
# Convert spike trains to binary arrays using Elephant, suppressing rounding error logging
elephant_utils_logger = logging.getLogger(elephant.utils.__file__)
original_level = elephant_utils_logger.level
elephant_utils_logger.setLevel(logging.ERROR)
try:
spike_trains = np.array(
[
elephant.conversion.BinnedSpikeTrain(
segment.spiketrains, bin_size=spiketrain_timestep__ms
)
.to_array()
.astype(bool)
for segment in spike_train__Block.segments
]
)
finally:
elephant_utils_logger.setLevel(original_level)
# Handle MUs to simulate
if self._MUs_to_simulate is None:
MUs_to_simulate = set(range(n_neurons))
else:
MUs_to_simulate = set(self._MUs_to_simulate)
# Create active neuron indices (all neurons are active in each pool for spike train block)
active_neuron_indices = [list(range(n_neurons)) for _ in range(n_pools)]
block = Block()
muap_data_list = [
np.array([seg.analogsignals[0].magnitude for seg in group.segments])
for group in self._muaps__Block.groups
]
for array_idx, muap_array in enumerate(muap_data_list):
emg_group = Group(name=f"ElectrodeArray_{array_idx}")
block.groups.append(emg_group)
muap_array = np.transpose(muap_array, (0, 2, 3, 1))
# Temporal resampling
new_muap_time_length = max(
1,
np.round(
muap_array.shape[3]
/ self._sampling_frequency__Hz
* (1 / spiketrain_timestep__ms.rescale("s"))
).astype(int),
)
muap_shapes = np.zeros(
(
muap_array.shape[0],
muap_array.shape[1],
muap_array.shape[2],
new_muap_time_length,
)
)
for muap_nr in range(muap_shapes.shape[0]):
for row in range(muap_shapes.shape[1]):
for col in range(muap_shapes.shape[2]):
muap_shapes[muap_nr, row, col] = np.interp(
x=np.arange(
start=0,
stop=muap_array.shape[-1] / self._sampling_frequency__Hz,
step=spiketrain_timestep__ms.rescale(pq.s).magnitude,
),
xp=np.arange(
start=0,
stop=muap_array.shape[-1] / self._sampling_frequency__Hz,
step=muap_timestep__ms.rescale(pq.s).magnitude,
),
fp=muap_array[muap_nr, row, col],
)
# n_pools already defined above from spike_train_block
n_rows = muap_shapes.shape[1]
n_cols = muap_shapes.shape[2]
# Initialize result array
sample_conv = np.convolve(spike_trains[0, 0], muap_shapes[0, 0, 0], mode="same")
surface_emg = np.zeros((n_pools, n_rows, n_cols, len(sample_conv)))
# No normalization needed - MUAPs are in absolute units (mV) from biophysical model
# Perform convolution for each pool using GPU acceleration if available
if HAS_CUPY:
# Use GPU acceleration with CuPy
spike_gpu = cp.asarray(spike_trains)
muap_gpu = cp.asarray(muap_shapes)
surface_emg_gpu = cp.zeros((n_pools, n_rows, n_cols, len(sample_conv)))
for pool_idx in tqdm(
range(n_pools),
desc=f"Electrode Array {array_idx + 1}/{len(self._muaps__Block.groups)} Surface EMG (GPU)",
unit="pools",
):
pool_active_neurons = set(active_neuron_indices[pool_idx])
for row_idx in range(n_rows):
for col_idx in range(n_cols):
# Process all active MUs on GPU
convolutions = cp.array(
[
cp.correlate(
spike_gpu[pool_idx, mu_idx],
muap_gpu[mu_idx, row_idx, col_idx],
mode="same",
)
for mu_idx in MUs_to_simulate.intersection(pool_active_neurons)
]
)
# Sum across MUAPs on GPU
if len(convolutions) > 0:
surface_emg_gpu[pool_idx, row_idx, col_idx] = cp.sum(
convolutions, axis=0
)
# Transfer results back to CPU
surface_emg = cp.asnumpy(surface_emg_gpu)
else:
# Fallback to CPU computation with NumPy
for pool_idx in tqdm(
range(n_pools),
desc=f"Electrode Array {array_idx + 1}/{len(self._muaps__Block.groups)} Surface EMG (CPU)",
unit="pools",
):
pool_active_neurons = set(active_neuron_indices[pool_idx])
for row_idx in range(n_rows):
for col_idx in range(n_cols):
# Process all active MUs
convolutions = []
for mu_idx in MUs_to_simulate.intersection(pool_active_neurons):
conv = np.correlate(
spike_trains[pool_idx, mu_idx],
muap_shapes[mu_idx, row_idx, col_idx],
mode="same",
)
convolutions.append(conv)
if convolutions:
surface_emg[pool_idx, row_idx, col_idx] = np.sum(
convolutions, axis=0
)
# Temporal resampling
surface_emg_resampled = np.zeros(
(
n_pools,
n_rows,
n_cols,
int(
surface_emg.shape[-1]
* spiketrain_timestep__ms.rescale(pq.s).magnitude
* self._sampling_frequency__Hz
),
)
)
for pool_idx in range(n_pools):
for row_idx in range(n_rows):
for col_idx in range(n_cols):
surface_emg_resampled[pool_idx, row_idx, col_idx] = np.interp(
x=np.arange(
start=0,
stop=surface_emg.shape[-1]
* spiketrain_timestep__ms.rescale(pq.s).magnitude,
step=1 / self._sampling_frequency__Hz,
),
xp=np.arange(
start=0,
stop=surface_emg.shape[-1]
* spiketrain_timestep__ms.rescale(pq.s).magnitude,
step=spiketrain_timestep__ms.rescale(pq.s).magnitude,
),
fp=surface_emg[pool_idx, row_idx, col_idx],
)
# Create segments for each motor unit pool within this electrode array group
for pool_idx in range(n_pools):
segment = Segment(name=f"Pool_{pool_idx}")
emg_group.segments.append(segment)
# Create GridAnalogSignal for this pool's EMG data
segment.analogsignals.append(
GridAnalogSignal(
signal=np.transpose(surface_emg_resampled[pool_idx], (2, 0, 1)) * pq.mV,
sampling_rate=self._sampling_frequency__Hz * pq.Hz,
)
)
# Store results privately
self._surface_emg__Block = block
return block
[docs]
def add_noise(self, snr__dB: float, noise_type: str = "gaussian") -> SURFACE_EMG__Block:
"""
Add noise to all electrode arrays.
This method adds realistic noise to the simulated surface EMG signals
based on a specified signal-to-noise ratio. The noise is calculated
and applied independently for each electrode channel to ensure that
channels with different signal amplitudes maintain the specified SNR.
Parameters
----------
snr__dB : float
Signal-to-noise ratio in dB. Higher values result in cleaner signals.
Typical physiological EMG has SNR ranging from 10-40 dB.
The SNR is applied independently to each electrode channel.
noise_type : str, default="gaussian"
Type of noise to add. Currently supports "gaussian" for white noise.
Returns
-------
SURFACE_EMG__Block
Noisy EMG signals for each electrode array as a neo.Block.
Results are stored in the `noisy_surface_emg__Block` property after execution.
Raises
------
ValueError
If surface EMG has not been simulated. Call simulate_surface_emg() first.
Notes
-----
The noise is computed per-channel (per electrode) to maintain the specified
SNR independently across all channels. This ensures that electrodes with
different signal amplitudes receive appropriately scaled noise.
"""
if self._surface_emg__Block is None:
raise ValueError(
"Surface EMG has not been simulated. Call simulate_surface_emg() first."
)
noisy_block = Block()
for array_idx, emg_group in enumerate(self._surface_emg__Block.groups):
noisy_group = Group(name=f"ElectrodeArray_{array_idx}")
noisy_block.groups.append(noisy_group)
for pool_idx, segment in enumerate(emg_group.segments):
noisy_segment = Segment(name=f"Pool_{pool_idx}")
noisy_group.segments.append(noisy_segment)
# Get the EMG signal data
emg_signal = segment.analogsignals[0]
emg_array = emg_signal.magnitude # Shape: (time, rows, cols)
# Calculate signal power PER CHANNEL (per electrode)
# Mean along time axis (axis=0) gives power per spatial location
signal_power_per_channel = np.mean(emg_array**2, axis=0) # Shape: (rows, cols)
# Calculate noise power per channel
snr_linear = 10 ** (snr__dB / 10)
noise_power_per_channel = signal_power_per_channel / snr_linear
noise_std_per_channel = np.sqrt(noise_power_per_channel) # Shape: (rows, cols)
# Generate noise
if noise_type.lower() == "gaussian":
# Generate standard normal noise, then scale per channel
noise = RANDOM_GENERATOR.normal(loc=0.0, scale=1.0, size=emg_array.shape)
# Broadcast noise_std_per_channel along time axis
# noise shape: (time, rows, cols)
# noise_std_per_channel shape: (rows, cols)
# Broadcasting: (time, rows, cols) * (1, rows, cols)
noise = noise * noise_std_per_channel[np.newaxis, :, :]
else:
raise ValueError(f"Unsupported noise type: {noise_type}")
# Add noise
noisy_emg = emg_array + noise
# Create new GridAnalogSignal with noise
noisy_segment.analogsignals.append(
GridAnalogSignal(
signal=noisy_emg * emg_signal.units,
t_start=emg_signal.t_start,
sampling_rate=emg_signal.sampling_rate,
)
)
# Store results privately
self._noisy_surface_emg__Block = noisy_block
return noisy_block
# Property accessors for computed results
@property
def muaps__Block(self) -> SURFACE_MUAP__Block:
"""
Motor Unit Action Potential (MUAP) templates for each electrode array.
Returns
-------
list[SURFACE_MUAP_SHAPE__TENSOR]
List of MUAP templates for each electrode array.
Raises
------
ValueError
If MUAP templates have not been computed yet.
"""
if self._muaps__Block is None:
raise ValueError("MUAP templates not computed. Call simulate_muaps() first.")
return self._muaps__Block
@property
def surface_emg__Block(self) -> SURFACE_EMG__Block:
"""
Surface EMG signals for each electrode array stored in a neo.Block.
Returns
-------
SURFACE_EMG__Block
Surface EMG signals for each electrode array stored in a neo.Block.
Raises
------
ValueError
If surface EMG has not been computed yet.
"""
if self._surface_emg__Block is None:
raise ValueError("Surface EMG signals not computed. Call simulate_surface_emg() first.")
return self._surface_emg__Block
@property
def noisy_surface_emg__Block(self) -> SURFACE_EMG__Block:
"""
Noisy surface EMG signals for each electrode array.
Returns
-------
SURFACE_EMG__Block
Noisy surface EMG signals for each electrode array.
Raises
------
ValueError
If noisy surface EMG has not been computed yet.
"""
if self._noisy_surface_emg__Block is None:
raise ValueError("Noisy surface EMG signals not computed. Call add_noise() first.")
return self._noisy_surface_emg__Block
@property
def spike_train__Block(self) -> SPIKE_TRAIN__Block:
"""
Spike train block used for EMG generation.
Returns
-------
SPIKE_TRAIN__Block
The spike train block used in the simulation.
Raises
------
ValueError
If spike train block has not been set yet.
"""
if self._spike_train__Block is None:
raise ValueError("Spike train block not set. Call simulate_surface_emg() first.")
return self._spike_train__Block