Extracting Data from Neo Blocks#

This example demonstrates how to extract and work with data from the different Neo Block types used in MyoGen:

Each section shows practical code for accessing and analyzing the data.

Import Libraries#

from pathlib import Path

import elephant.statistics
import joblib
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

from myogen.utils.types import (
    INTRAMUSCULAR_EMG__Block,
    INTRAMUSCULAR_MUAP__Block,
    SPIKE_TRAIN__Block,
    SURFACE_EMG__Block,
    SURFACE_MUAP__Block,
)

plt.style.use("fivethirtyeight")
sns.set_palette("husl")

Setup Paths#

save_path = Path("./results")

# Check which files exist
files_to_check = {
    "spike_trains": save_path / "trapezoid_dd_spike_trains.pkl",
    "surface_muaps": save_path / "surface_muaps.pkl",
    "surface_emg": save_path / "surface_emg_signals.pkl",
    "intramuscular_muaps": save_path / "intramuscular_muaps.pkl",
    "intramuscular_emg": save_path / "intramuscular_emg.pkl",
}

available_files = {name: path for name, path in files_to_check.items() if path.exists()}

print("Available data files:")
for name, path in available_files.items():
    print(f"  ✓ {name}: {path.name}")

print("\nMissing data files (run respective examples first):")
for name, path in files_to_check.items():
    if name not in available_files:
        print(f"  ✗ {name}: {path.name}")
Available data files:
  ✓ spike_trains: trapezoid_dd_spike_trains.pkl
  ✓ surface_emg: surface_emg_signals.pkl

Missing data files (run respective examples first):
  ✗ surface_muaps: surface_muaps.pkl
  ✗ intramuscular_muaps: intramuscular_muaps.pkl
  ✗ intramuscular_emg: intramuscular_emg.pkl

Example 1: Extracting Spike Train Data#

if "spike_trains" in available_files:
    print("\n" + "=" * 70)
    print("EXAMPLE 1: SPIKE_TRAIN__Block")
    print("=" * 70)

    # Load spike train block
    spike_train__Block: SPIKE_TRAIN__Block = joblib.load(available_files["spike_trains"])

    print(f"\nBlock name: {spike_train__Block.name}")
    print(f"Number of segments (motor pools): {len(spike_train__Block.segments)}")

    # Access first motor pool
    motor_pool = spike_train__Block.segments[0]
    print(f"\nMotor pool: {motor_pool.name}")
    print(f"Number of neurons: {len(motor_pool.spiketrains)}")

    # Analyze individual neuron
    neuron_idx = 0
    spiketrain = motor_pool.spiketrains[neuron_idx]
    print(f"\nNeuron {neuron_idx}:")
    print(f"\tName: {spiketrain.name}")
    print(f"  Number of spikes: {len(spiketrain)}")
    print(f"  Sampling rate: {spiketrain.sampling_rate}")
    print(f"  Duration: {spiketrain.t_stop}")

    if len(spiketrain) > 0:
        spike_times__s = spiketrain.magnitude
        print(f"  First spike: {spike_times__s[0]:.4f} s")
        print(f"  Last spike: {spike_times__s[-1]:.4f} s")

        # Calculate firing rate
        firing_rate = elephant.statistics.mean_firing_rate(spiketrain)
        print(f"  Mean firing rate: {firing_rate:.2f}")

    # Calculate statistics across all neurons
    firing_rates = []
    active_neurons = 0

    for st in motor_pool.spiketrains:
        if len(st) > 0:
            rate = elephant.statistics.mean_firing_rate(st)
            firing_rates.append(rate.magnitude)
            active_neurons += 1

    if firing_rates:
        print("\nMotor pool statistics:")
        print(f"  Active neurons: {active_neurons}/{len(motor_pool.spiketrains)}")
        print(f"  Mean firing rate: {np.mean(firing_rates):.2f} Hz")
        print(f"  Std firing rate: {np.std(firing_rates):.2f} Hz")
        print(f"  Range: {np.min(firing_rates):.2f} - {np.max(firing_rates):.2f} Hz")

    # Plot raster plot
    fig, ax = plt.subplots(figsize=(12, 6))

    for neuron_idx, st in enumerate(motor_pool.spiketrains[:20]):  # First 20 neurons
        if len(st) > 0:
            spike_times = st.magnitude
            ax.scatter(spike_times, [neuron_idx] * len(spike_times), s=1, c="black")

    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Neuron Index")
    ax.set_title("Spike Raster Plot (First 20 Neurons)")
    ax.set_ylim(-0.5, 19.5)
    plt.tight_layout()
    plt.savefig(save_path / "spike_raster.png", dpi=150)
    print(f"\nSaved raster plot to: {save_path / 'spike_raster.png'}")
Spike Raster Plot (First 20 Neurons)
======================================================================
EXAMPLE 1: SPIKE_TRAIN__Block
======================================================================

Block name: Trapezoidal DD Spike Trains
Number of segments (motor pools): 1

Motor pool: Motor Neurons
Number of neurons: 100

Neuron 0:
        Name: MN_0
  Number of spikes: 214
  Sampling rate: 10000.0 Hz
  Duration: 15.0 s
  First spike: 1.2382 s
  Last spike: 11.7640 s
  Mean firing rate: 14.27 1/s

Motor pool statistics:
  Active neurons: 76/100
  Mean firing rate: 12.04 Hz
  Std firing rate: 3.08 Hz
  Range: 0.13 - 16.53 Hz

Saved raster plot to: results/spike_raster.png

Example 2: Extracting Surface MUAP Data#

if "surface_muaps" in available_files:
    print("\n" + "=" * 70)
    print("EXAMPLE 2: SURFACE_MUAP__Block")
    print("=" * 70)

    # Load MUAP block
    muaps__Block: SURFACE_MUAP__Block = joblib.load(available_files["surface_muaps"])

    print(f"\nNumber of electrode arrays: {len(muaps__Block.groups)}")

    # Access first electrode array
    electrode_array = muaps__Block.groups[0]
    print(f"\nElectrode array: {electrode_array.name}")
    print(f"Number of MUAPs: {len(electrode_array.segments)}")

    # Access a specific MUAP
    muap_idx = min(5, len(electrode_array.segments) - 1)
    muap_segment = electrode_array.segments[muap_idx]
    muap_signal = muap_segment.analogsignals[0]

    print(f"\nMUAP {muap_idx} ({muap_segment.name}):")
    print(f"  Shape: {muap_signal.shape}")
    print(f"  Time samples: {muap_signal.shape[0]}")
    print(f"  Electrode grid: {muap_signal.shape[1]} rows × {muap_signal.shape[2]} columns")
    print(f"  Sampling rate: {muap_signal.sampling_rate}")
    print(f"  Duration: {muap_signal.t_stop}")
    print(f"  Units: {muap_signal.units}")

    # Extract MUAP from center electrode
    center_row = muap_signal.shape[1] // 2
    center_col = muap_signal.shape[2] // 2
    center_muap = muap_signal[:, center_row, center_col]

    print(f"\nMUAP at center electrode ({center_row}, {center_col}):")
    print(f"  Peak-to-peak: {np.ptp(center_muap.magnitude):.3f} {center_muap.units}")
    print(f"  Max amplitude: {np.max(np.abs(center_muap.magnitude)):.3f} {center_muap.units}")

    # Find peak time and create spatial map
    peak_time_idx = np.argmax(np.abs(muap_signal.magnitude))
    peak_time_idx = np.unravel_index(peak_time_idx, muap_signal.shape)[0]
    spatial_map = muap_signal[peak_time_idx, :, :].magnitude

    # Plot MUAP waveform and spatial distribution
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

    # Waveform at center electrode
    time__s = muap_signal.times.magnitude
    ax1.plot(time__s * 1000, center_muap.magnitude, linewidth=2)  # Convert to ms
    ax1.set_xlabel("Time (ms)")
    ax1.set_ylabel(f"Amplitude ({muap_signal.units})")
    ax1.set_title(f"MUAP {muap_idx} at Center Electrode")
    ax1.grid(True, alpha=0.3)

    # Spatial distribution at peak time
    im = ax2.imshow(spatial_map, cmap="seismic", aspect="auto")
    ax2.set_xlabel("Electrode Column")
    ax2.set_ylabel("Electrode Row")
    ax2.set_title(f"Spatial Distribution at t={time__s[peak_time_idx] * 1000:.1f}ms")
    plt.colorbar(im, ax=ax2, label=f"Amplitude ({muap_signal.units})")

    plt.tight_layout()
    plt.savefig(save_path / "surface_muap_analysis.png", dpi=150)
    print(f"\nSaved MUAP analysis to: {save_path / 'surface_muap_analysis.png'}")

Example 3: Extracting Surface EMG Data#

if "surface_emg" in available_files:
    print("\n" + "=" * 70)
    print("EXAMPLE 3: SURFACE_EMG__Block")
    print("=" * 70)

    # Load surface EMG block
    surface_emg__Block: SURFACE_EMG__Block = joblib.load(available_files["surface_emg"])

    print(f"\nNumber of electrode arrays: {len(surface_emg__Block.groups)}")

    # Access first electrode array
    electrode_array = surface_emg__Block.groups[0]
    print(f"\nElectrode array: {electrode_array.name}")
    print(f"Number of motor pool segments: {len(electrode_array.segments)}")

    # Access first motor pool
    motor_pool = electrode_array.segments[0]
    emg_signal = motor_pool.analogsignals[0]

    print(f"\nMotor pool: {motor_pool.name}")
    print(f"  EMG shape: {emg_signal.shape}")
    print(f"  Time samples: {emg_signal.shape[0]}")
    print(f"  Electrode grid: {emg_signal.shape[1]} rows × {emg_signal.shape[2]} columns")
    print(f"  Sampling rate: {emg_signal.sampling_rate}")
    print(f"  Duration: {emg_signal.t_stop}")

    # Extract EMG from specific electrode
    row, col = 2, 2
    electrode_emg = emg_signal[:, row, col]
    time__s = emg_signal.times.magnitude

    print(f"\nEMG at electrode ({row}, {col}):")
    print(
        f"  RMS amplitude: {np.sqrt(np.mean(electrode_emg.magnitude**2)):.3f} {electrode_emg.units}"
    )
    print(f"  Peak-to-peak: {np.ptp(electrode_emg.magnitude):.3f} {electrode_emg.units}")

    # Calculate RMS envelope with moving window
    window_size = int(0.1 * emg_signal.sampling_rate.magnitude)  # 100ms window
    emg_magnitude = np.squeeze(electrode_emg.magnitude)  # Ensure 1D array
    rms_envelope = np.array(
        [
            np.sqrt(
                np.mean(emg_magnitude[max(0, i - window_size // 2) : i + window_size // 2] ** 2)
            )
            for i in range(len(emg_magnitude))
        ]
    )

    # Plot EMG signal and RMS envelope
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True)

    # Raw EMG
    ax1.plot(time__s, emg_magnitude, linewidth=0.5, alpha=0.7, label="Raw EMG")
    ax1.plot(time__s, rms_envelope, linewidth=2, color="red", label="RMS Envelope")
    ax1.set_ylabel(f"Amplitude ({emg_signal.units})")
    ax1.set_title(f"Surface EMG at Electrode ({row}, {col})")
    ax1.legend()
    ax1.grid(True, alpha=0.3)

    # Spectrogram
    from scipy import signal as scipy_signal

    f, t, Sxx = scipy_signal.spectrogram(
        emg_magnitude,
        fs=emg_signal.sampling_rate.magnitude,
        nperseg=min(256, len(emg_magnitude) // 4),
    )

    ax2.pcolormesh(t, f, 10 * np.log10(Sxx), shading="gouraud", cmap="viridis")
    ax2.set_ylabel("Frequency (Hz)")
    ax2.set_xlabel("Time (s)")
    ax2.set_title("EMG Spectrogram")
    ax2.set_ylim(0, 500)  # Focus on relevant frequency range

    plt.tight_layout()
    plt.savefig(save_path / "surface_emg_analysis.png", dpi=150)
    print(f"\nSaved EMG analysis to: {save_path / 'surface_emg_analysis.png'}")
Surface EMG at Electrode (2, 2), EMG Spectrogram
======================================================================
EXAMPLE 3: SURFACE_EMG__Block
======================================================================

Number of electrode arrays: 1

Electrode array: ElectrodeArray_0
Number of motor pool segments: 1

Motor pool: Pool_0
  EMG shape: (30720, 5, 5)
  Time samples: 30720
  Electrode grid: 5 rows × 5 columns
  Sampling rate: 2048.0 Hz
  Duration: 15.0 s

EMG at electrode (2, 2):
  RMS amplitude: 0.081 1.0 mV
  Peak-to-peak: 0.474 1.0 mV

Saved EMG analysis to: results/surface_emg_analysis.png

Example 4: Extracting Intramuscular MUAP Data#

if "intramuscular_muaps" in available_files:
    print("\n" + "=" * 70)
    print("EXAMPLE 4: INTRAMUSCULAR_MUAP__Block")
    print("=" * 70)

    # Load intramuscular MUAP block
    im_muaps__Block: INTRAMUSCULAR_MUAP__Block = joblib.load(available_files["intramuscular_muaps"])

    print(f"\nNumber of MUAPs: {len(im_muaps__Block.segments)}")

    # Access a specific MUAP
    muap_idx = min(3, len(im_muaps__Block.segments) - 1)
    muap_segment = im_muaps__Block.segments[muap_idx]
    muap_signal = muap_segment.analogsignals[0]

    print(f"\nMUAP {muap_idx} ({muap_segment.name}):")
    print(f"  Shape: {muap_signal.shape}")
    print(f"  Time samples: {muap_signal.shape[0]}")
    print(f"  Number of electrodes: {muap_signal.shape[1]}")
    print(f"  Sampling rate: {muap_signal.sampling_rate}")
    print(f"  Duration: {muap_signal.t_stop}")

    # Extract MUAP from all electrodes
    n_electrodes = muap_signal.shape[1]

    # Plot MUAPs from all electrodes
    fig, axes_raw = plt.subplots(n_electrodes, 1, figsize=(10, 2 * n_electrodes), sharex=True)

    # Convert to list for consistent indexing (matplotlib returns Axes or array of Axes)
    axes_to_use = [axes_raw] if n_electrodes == 1 else list(axes_raw)  # type: ignore

    time__s = muap_signal.times.magnitude

    for electrode_idx in range(n_electrodes):
        electrode_muap = muap_signal[:, electrode_idx]
        peak_amplitude = np.max(np.abs(electrode_muap.magnitude))

        ax = axes_to_use[electrode_idx]
        ax.plot(time__s * 1000, electrode_muap.magnitude, linewidth=2)
        ax.set_ylabel(f"E{electrode_idx}\n({muap_signal.units})")
        ax.grid(True, alpha=0.3)
        ax.set_title(
            f"Electrode {electrode_idx} (Peak: {peak_amplitude:.2f} {muap_signal.units})",
            fontsize=10,
        )

    axes_to_use[-1].set_xlabel("Time (ms)")
    plt.suptitle(f"Intramuscular MUAP {muap_idx} - All Electrodes")
    plt.tight_layout()
    plt.savefig(save_path / "intramuscular_muap_analysis.png", dpi=150)
    print(f"\nSaved MUAP analysis to: {save_path / 'intramuscular_muap_analysis.png'}")

Example 5: Extracting Intramuscular EMG Data#

if "intramuscular_emg" in available_files:
    print("\n" + "=" * 70)
    print("EXAMPLE 5: INTRAMUSCULAR_EMG__Block")
    print("=" * 70)

    # Load intramuscular EMG block
    im_emg__Block: INTRAMUSCULAR_EMG__Block = joblib.load(available_files["intramuscular_emg"])

    print(f"\nNumber of motor pool segments: {len(im_emg__Block.segments)}")

    # Access first motor pool
    motor_pool = im_emg__Block.segments[0]
    emg_signal = motor_pool.analogsignals[0]

    print(f"\nMotor pool: {motor_pool.name}")
    print(f"  EMG shape: {emg_signal.shape}")
    print(f"  Time samples: {emg_signal.shape[0]}")
    print(f"  Number of electrodes: {emg_signal.shape[1]}")
    print(f"  Sampling rate: {emg_signal.sampling_rate}")
    print(f"  Duration: {emg_signal.t_stop}")

    # Extract EMG from first electrode
    electrode_idx = 0
    electrode_emg = emg_signal[:, electrode_idx]

    print(f"\nEMG at electrode {electrode_idx}:")
    print(
        f"  RMS amplitude: {np.sqrt(np.mean(electrode_emg.magnitude**2)):.3f} {electrode_emg.units}"
    )
    print(f"  Peak-to-peak: {np.ptp(electrode_emg.magnitude):.3f} {electrode_emg.units}")

    # Calculate single differential EMG between adjacent contacts
    n_electrodes = emg_signal.shape[1]
    if n_electrodes > 1:
        differential_emg = np.zeros((emg_signal.shape[0], n_electrodes - 1))

        for i in range(n_electrodes - 1):
            differential_emg[:, i] = (emg_signal[:, i + 1] - emg_signal[:, i]).magnitude

        print(f"\nDifferential EMG calculated ({n_electrodes - 1} channels)")
        print(
            f"  Differential channel 0 RMS: {np.sqrt(np.mean(differential_emg[:, 0] ** 2)):.3f} {emg_signal.units}"
        )

        # Plot monopolar and differential EMG
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True)

        time__s = emg_signal.times.magnitude

        # Monopolar EMG
        ax1.plot(time__s, electrode_emg.magnitude, linewidth=0.5)
        ax1.set_ylabel(f"Amplitude ({emg_signal.units})")
        ax1.set_title(f"Monopolar EMG - Electrode {electrode_idx}")
        ax1.grid(True, alpha=0.3)

        # Differential EMG
        ax2.plot(time__s, differential_emg[:, 0], linewidth=0.5, color="orange")
        ax2.set_ylabel(f"Amplitude ({emg_signal.units})")
        ax2.set_xlabel("Time (s)")
        ax2.set_title(f"Differential EMG - Electrodes {electrode_idx} to {electrode_idx + 1}")
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.savefig(save_path / "intramuscular_emg_analysis.png", dpi=150)
        print(f"\nSaved EMG analysis to: {save_path / 'intramuscular_emg_analysis.png'}")

Total running time of the script: (0 minutes 1.635 seconds)

Gallery generated by Sphinx-Gallery