Source code for myoverse.datasets.filters.emg_augmentations

import numba
import numpy as np
import pywt
from scipy import interpolate

from myoverse.datasets.filters._template import EMGAugmentation


@numba.njit(nogil=True, fastmath=True)
def _gaussian_noise(input_array: np.ndarray, target_snr__db: float) -> np.ndarray:
    """Add zero-mean Gaussian noise to each row of a 2-D array at a target SNR.

    The mean power of every row is measured, a per-row noise standard
    deviation is derived from it and ``target_snr__db``, and noise drawn with
    that standard deviation is added to the row.

    Parameters
    ----------
    input_array : np.ndarray
        Array indexed as ``[row, sample]``; rows are called "electrodes" in
        this function. The array is modified in place.
    target_snr__db : float
        Desired signal-to-noise ratio in decibels.

    Returns
    -------
    np.ndarray
        The same ``input_array`` object, after the noise has been added.
    """
    # Per-row mean power: accumulate the squared samples column by column and
    # divide by the number of samples.
    squared_chunk = np.square(input_array.astype(np.float64))
    mean_squared_chunk = np.zeros((input_array.shape[0]))
    for i in range(input_array.shape[1]):
        mean_squared_chunk += squared_chunk[:, i]
    mean_squared_chunk /= input_array.shape[1]

    # Signal power in dB minus the target SNR gives the noise power in dB;
    # converting back to linear scale and taking the square root yields the
    # noise standard deviation per row (despite the "__watts" suffix this is a
    # standard deviation, not a power). The 1e-15 offset keeps log10 finite
    # for all-zero rows.
    noise_avg_per_channel_and_electrode__watts = np.sqrt(
        np.power(
            10, ((10 * np.log10(mean_squared_chunk + 1e-15)) - target_snr__db) / 10
        )
    )

    # NOTE(review): the noise is added in place, so the caller's array is
    # mutated. The noise is also cast to int16 first, which drops its
    # fractional part; this presumably assumes integer (int16) samples --
    # confirm, since for small-amplitude float input most noise would become 0.
    for electrode_index in range(input_array.shape[0]):
        input_array[electrode_index] += np.random.normal(
            loc=0.0,
            scale=noise_avg_per_channel_and_electrode__watts[electrode_index],
            size=input_array.shape[1],
        ).astype(np.int16)

    return input_array


class GaussianNoise(EMGAugmentation):
    """Augment EMG data by adding Gaussian noise at a target SNR.

    The noise itself is generated and added by ``_gaussian_noise``; this class
    only stores the configuration and forwards the input array.

    This augmentation is based on the paper [1]_

    Parameters
    ----------
    target_snr__db : float, optional
        The target signal-to-noise ratio in decibels, by default 5.0.
    input_is_chunked : bool
        Whether the input is chunked or not.
    is_output : bool
        Whether the filter is an output filter. If True, the resulting signal
        will be outputted by any dataset pipeline.

    Methods
    -------
    __call__(chunk: numpy.ndarray) -> numpy.ndarray
        Augments the chunk.

    Notes
    -----
    .. [1] Tsinganos, P., Cornelis, B., Cornelis, J., Jansen, B., Skodras, A.,
       2020. Data Augmentation of Surface Electromyography for Hand Gesture
       Recognition. Sensors 20, 4892. https://doi.org/10/grc7ph
    """

    def __init__(
        self,
        target_snr__db: float = 5.0,
        input_is_chunked: bool = None,
        is_output: bool = False,
    ):
        super().__init__(input_is_chunked=input_is_chunked, is_output=is_output)

        self.target_snr__db = target_snr__db

    def _filter(self, input_array: np.ndarray) -> np.ndarray:
        """Return ``input_array`` with Gaussian noise added at the configured SNR."""
        snr_db = self.target_snr__db
        noisy = _gaussian_noise(input_array=input_array, target_snr__db=snr_db)
        return noisy
class MagnitudeWarping(EMGAugmentation):
    """Magnitude warping augmentation.

    Every electrode grid is multiplied by its own smooth random curve. The
    curve is a cubic interpolation through knots drawn from a Gaussian
    distribution, so the signal amplitude is scaled slowly over time.

    This augmentation is based on the paper [2]_

    Parameters
    ----------
    nr_of_point_for_spline : int, optional
        The number of points (knots) to use for the spline, by default 6.
    gaussian_mean : float, optional
        The mean of the Gaussian distribution, by default 1.0.
    gaussian_std : float, optional
        The standard deviation of the Gaussian distribution, by default 0.35.
    nr_of_grids : int
        The number of grids to use. Must be given; the ``None`` default only
        exists so that a missing value raises a clear error.
    input_is_chunked : bool
        Whether the input is chunked or not.
    is_output : bool
        Whether the filter is an output filter. If True, the resulting signal
        will be outputted by any dataset pipeline.

    Raises
    ------
    ValueError
        If ``nr_of_grids`` is None.

    Methods
    -------
    __call__(chunk: numpy.ndarray) -> numpy.ndarray
        Augments the chunk.

    Notes
    -----
    .. [2] Tsinganos, P., Cornelis, B., Cornelis, J., Jansen, B., Skodras, A.,
       2020. Data Augmentation of Surface Electromyography for Hand Gesture
       Recognition. Sensors 20, 4892. https://doi.org/10/grc7ph
    """

    def __init__(
        self,
        nr_of_point_for_spline: int = 6,
        gaussian_mean: float = 1.0,
        gaussian_std: float = 0.35,
        nr_of_grids: int = None,
        input_is_chunked: bool = None,
        is_output: bool = False,
    ):
        super().__init__(input_is_chunked=input_is_chunked, is_output=is_output)

        self.nr_of_point_for_spline = nr_of_point_for_spline
        self.gaussian_mean = gaussian_mean
        self.gaussian_std = gaussian_std
        self.nr_of_grids = nr_of_grids

        if self.nr_of_grids is None:
            raise ValueError("nr_of_grids must be specified.")

    def _filter(self, input_array: np.ndarray) -> np.ndarray:
        """Scale each grid of ``input_array`` by its own random smooth curve.

        Rows (axis 0) are divided among ``nr_of_grids`` grids and the last
        axis is the one the curve runs along. The result is float64.
        """
        nr_of_channels = input_array.shape[0]
        nr_of_samples = input_array.shape[-1]

        # One row of random knot values per grid, all from a single generator.
        knot_positions = np.linspace(
            start=0, stop=nr_of_samples, num=self.nr_of_point_for_spline
        )
        knot_values = np.random.default_rng().normal(
            loc=self.gaussian_mean,
            scale=self.gaussian_std,
            size=(self.nr_of_grids, self.nr_of_point_for_spline),
        )

        # interp1d interpolates along the last axis, giving one curve per grid
        # with shape (nr_of_grids, nr_of_samples).
        warping_curves = interpolate.interp1d(
            knot_positions, knot_values, kind="cubic", axis=-1
        )(np.arange(nr_of_samples))

        # Use the same partition as np.array_split so channel counts that are
        # not a multiple of nr_of_grids still get exactly one curve per row.
        # A plain ``nr_of_channels // nr_of_grids`` repeat would produce too
        # few rows in that case.
        channels_per_grid = [
            len(part)
            for part in np.array_split(np.arange(nr_of_channels), self.nr_of_grids)
        ]

        return np.multiply(
            input_array.astype(np.float64),
            np.repeat(warping_curves, repeats=channels_per_grid, axis=0),
        )
class WaveletDecomposition(EMGAugmentation):
    """Wavelet decomposition augmentation.

    The signal is decomposed with a multilevel discrete wavelet transform,
    every detail coefficient is multiplied by ``b`` (the approximation
    coefficients are left untouched), and the signal is reconstructed.

    This augmentation is based on the paper [3]_

    Parameters
    ----------
    b : float, optional
        The scaling factor applied to the detail coefficients, by default 0.25.
    wavelet : str, optional
        The wavelet to use, by default "db7".
    level : int, optional
        The level of decomposition, by default 5.
    nr_of_grids : int
        The number of grids to use. Must be given; the ``None`` default only
        exists so that a missing value raises a clear error.
    input_is_chunked : bool
        Whether the input is chunked or not.
    is_output : bool
        Whether the filter is an output filter. If True, the resulting signal
        will be outputted by any dataset pipeline.

    Raises
    ------
    ValueError
        If ``nr_of_grids`` is None.

    Methods
    -------
    __call__(chunk: numpy.ndarray) -> numpy.ndarray
        Augments the chunk.

    Notes
    -----
    .. [3] Tsinganos, P., Cornelis, B., Cornelis, J., Jansen, B., Skodras, A.,
       2020. Data Augmentation of Surface Electromyography for Hand Gesture
       Recognition. Sensors 20, 4892. https://doi.org/10/grc7ph
    """

    def __init__(
        self,
        b: float = 0.25,
        wavelet: str = "db7",
        level: int = 5,
        nr_of_grids: int = None,
        input_is_chunked: bool = None,
        is_output: bool = False,
    ):
        super().__init__(input_is_chunked=input_is_chunked, is_output=is_output)

        self.b = b
        self.wavelet = wavelet
        self.level = level
        self.nr_of_grids = nr_of_grids

        if self.nr_of_grids is None:
            raise ValueError("nr_of_grids must be specified.")

    def _filter(self, input_array: np.ndarray) -> np.ndarray:
        """Return ``input_array`` with its wavelet detail coefficients scaled by ``b``.

        The input is split into ``nr_of_grids`` parts along axis 0 and the
        transform runs along axis 1. The result is float64 and has the same
        length along axis 1 as the input.
        """
        nr_of_samples = input_array.shape[1]

        reconstructed_grids = []
        for grid in np.array_split(input_array, self.nr_of_grids, axis=0):
            # wavedec returns [approximation, coarsest detail, ..., finest detail].
            coefficients = pywt.wavedec(
                grid.astype(np.float64),
                wavelet=self.wavelet,
                level=self.level,
                mode="reflect",
                axis=1,
            )
            scaled_coefficients = [coefficients[0]] + [
                detail * self.b for detail in coefficients[1:]
            ]
            reconstructed = pywt.waverec(
                scaled_coefficients, wavelet=self.wavelet, axis=1
            )
            # The reconstruction can be one sample longer than the input when
            # the input length is odd; trim so the shape is preserved.
            reconstructed_grids.append(reconstructed[:, :nr_of_samples])

        return np.concatenate(reconstructed_grids, axis=0)