Source code for doc_octopy.datasets.supervised

import pickle
from pathlib import Path
from typing import Optional, Sequence, Tuple

import numpy as np
import zarr
from tqdm import tqdm

from doc_octopy.datasets.filters._template import FilterBaseClass, EMGAugmentation
from doc_octopy.datasets.filters.generic import ChunkizeDataFilter
from doc_octopy.datatypes import DATA_TYPES_MAP, _Data


def _split_data(data: np.ndarray, split_ratio: float) -> tuple[np.ndarray, np.ndarray]:
    """Split data along the first axis by carving a centred block out of it.

    Returns the samples outside the centred block (first element) and the centred
    block itself (second element), which holds roughly ``split_ratio`` of the samples.
    """
    split_amount = int(data.shape[0] * split_ratio / 2)
    middle_index = data.shape[0] // 2

    mask = np.ones(data.shape[0], dtype=bool)
    mask[middle_index - split_amount : middle_index + split_amount] = False

    return data[mask], data[~mask]
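
# Example (illustrative): with 10 chunks and a split ratio of 0.4, the centred
# 4 chunks form the held-out set and the surrounding 6 chunks are kept.
#
#     >>> kept, held_out = _split_data(np.arange(10)[:, None], 0.4)
#     >>> kept.shape, held_out.shape
#     ((6, 1), (4, 1))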


def _add_to_dataset(group: zarr.Group, data: Optional[np.ndarray], name: str):
    """Append ``data`` to ``group[name]``, creating the zarr array on first use."""
    if data is None:
        return

    try:
        group[name].append(data)
    except KeyError:
        group.create_dataset(
            name, data=data, shape=data.shape, chunks=(1, *data.shape[1:])
        )
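
# Example (illustrative) of the append-or-create behaviour above: the first call
# creates the zarr array, later calls grow it along the first axis.
#
#     >>> root = zarr.open("/tmp/example.zarr", mode="w")
#     >>> _add_to_dataset(root, np.zeros((4, 8)), "emg")  # creates "emg", shape (4, 8)
#     >>> _add_to_dataset(root, np.ones((2, 8)), "emg")   # appends, shape (6, 8)
#     >>> root["emg"].shape
#     (6, 8)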


class EMGDataset:
    """Class for creating a dataset from EMG and ground truth data.

    See the usage sketch at the end of this module for a minimal example.

    Parameters
    ----------
    emg_data_path : pathlib.Path
        Path to the EMG data file. It should be a pickle file containing a dictionary with the keys being the
        task number and the values being a numpy array of shape (n_channels, n_samples).
    emg_data : dict[str, np.ndarray]
        EMG data to use instead of loading it from ``emg_data_path``. The keys should be the task numbers and
        the values numpy arrays of shape (n_channels, n_samples).
    ground_truth_data_path : pathlib.Path
        Path to the ground truth data file. It should be a pickle file containing a dictionary with the keys
        being the task number and the values being a numpy array of custom shape (..., n_samples). The custom
        shape can be anything, but the last dimension should be the same as the EMG data.
    ground_truth_data : dict[str, np.ndarray]
        Ground truth data to use instead of loading it from ``ground_truth_data_path``.
    ground_truth_data_type : str
        Type of the ground truth data. Used as the key into DATA_TYPES_MAP, e.g. "kinematics".
    sampling_frequency : float
        Sampling frequency of the data.
    tasks_to_use : Sequence[str]
        Sequence of strings containing the task numbers to use. If empty, all tasks will be used.
    save_path : pathlib.Path
        Path to save the dataset to. It should be a zarr file.
    emg_filter_pipeline_before_chunking : list[list[FilterBaseClass]]
        Sequence of filters to apply to the EMG data before chunking. The filters should inherit from
        FilterBaseClass.
    emg_representations_to_filter_before_chunking : list[str]
        Representations of the EMG data to filter before chunking.
    emg_filter_pipeline_after_chunking : list[list[FilterBaseClass]]
        Sequence of filters to apply to the EMG data after chunking. The filters should inherit from
        FilterBaseClass.
    emg_representations_to_filter_after_chunking : list[str]
        Representations of the EMG data to filter after chunking.
    ground_truth_filter_pipeline_before_chunking : list[list[FilterBaseClass]]
        Sequence of filters to apply to the ground truth data before chunking. The filters should inherit from
        FilterBaseClass.
    ground_truth_representations_to_filter_before_chunking : list[str]
        Representations of the ground truth data to filter before chunking.
    ground_truth_filter_pipeline_after_chunking : list[list[FilterBaseClass]]
        Sequence of filters to apply to the ground truth data after chunking. The filters should inherit from
        FilterBaseClass.
    ground_truth_representations_to_filter_after_chunking : list[str]
        Representations of the ground truth data to filter after chunking.
    chunk_size : int
        Size of the chunks to create from the data.
    chunk_shift : int
        Shift between the chunks.
    testing_split_ratio : float
        Ratio of the data to use for testing. The test samples are taken from the middle of each task
        recording; the remaining samples are used for training. If 0, no data will be used for testing.
    validation_split_ratio : float
        Ratio of the testing data to use for validation. The validation samples are taken from the middle of
        the testing split. If 0, no data will be used for validation.
    augmentation_pipelines : list[list[EMGAugmentation]]
        Sequence of augmentation pipelines to apply to the training data. Each augmentation should inherit
        from EMGAugmentation.
    amount_of_chunks_to_augment_at_once : int
        Number of chunks to augment at once. Larger values speed up the process.
    debug : bool
        If True, print and plot intermediate representations while creating the dataset.

    Methods
    -------
    create_dataset()
        Creates the dataset.
""" def __init__( self, emg_data_path: Path = Path("REPLACE ME"), emg_data: dict[str, np.ndarray] = {}, ground_truth_data_path: Path = Path("REPLACE ME"), ground_truth_data: dict[str, np.ndarray] = {}, ground_truth_data_type: str = "kinematics", sampling_frequency: float = 0.0, tasks_to_use: Sequence[str] = (), save_path: Path = Path("REPLACE ME"), emg_filter_pipeline_before_chunking: list[list[FilterBaseClass]] = (), emg_representations_to_filter_before_chunking: list[str] = (), emg_filter_pipeline_after_chunking: list[list[FilterBaseClass]] = (), emg_representations_to_filter_after_chunking: list[str] = (), ground_truth_filter_pipeline_before_chunking: list[list[FilterBaseClass]] = (), ground_truth_representations_to_filter_before_chunking: list[str] = (), ground_truth_filter_pipeline_after_chunking: list[list[FilterBaseClass]] = (), ground_truth_representations_to_filter_after_chunking: list[str] = (), chunk_size: int = 192, chunk_shift: int = 64, testing_split_ratio: float = 0.2, validation_split_ratio: float = 0.2, augmentation_pipelines: list[list[EMGAugmentation]] = (), amount_of_chunks_to_augment_at_once: int = 250, debug: bool = False, ): self.emg_data_path = emg_data_path self.emg_data = emg_data self.ground_truth_data_path = ground_truth_data_path self.ground_truth_data = ground_truth_data # check if at least one of the data sources is provided if not self.emg_data and not self.emg_data_path: raise ValueError("At least one of the EMG data sources should be provided.") if not self.ground_truth_data and not self.ground_truth_data_path: raise ValueError( "At least one of the ground truth data sources should be provided." ) self.ground_truth_data_type = ground_truth_data_type self.sampling_frequency = sampling_frequency self.tasks_to_use = tasks_to_use self.save_path = save_path self.emg_filter_pipeline_before_chunking = emg_filter_pipeline_before_chunking self.emg_representations_to_filter_before_chunking = ( emg_representations_to_filter_before_chunking ) self.ground_truth_filter_pipeline_before_chunking = ( ground_truth_filter_pipeline_before_chunking ) self.ground_truth_representations_to_filter_before_chunking = ( ground_truth_representations_to_filter_before_chunking ) self.emg_filter_pipeline_after_chunking = emg_filter_pipeline_after_chunking self.emg_representations_to_filter_after_chunking = ( emg_representations_to_filter_after_chunking ) self.ground_truth_filter_pipeline_after_chunking = ( ground_truth_filter_pipeline_after_chunking ) self.ground_truth_representations_to_filter_after_chunking = ( ground_truth_representations_to_filter_after_chunking ) self.chunk_size = chunk_size self.chunk_shift = chunk_shift self.testing_split_ratio = testing_split_ratio self.validation_split_ratio = validation_split_ratio self.augmentation_pipelines = augmentation_pipelines self.amount_of_chunks_to_augment_at_once = amount_of_chunks_to_augment_at_once self.debug = debug self.__tasks_string_length = 0 def __add_data_to_dataset( self, data: _Data, groups: list[zarr.Group] ) -> Tuple[list[int], list[int], list[int]]: training_data_sizes, testing_data_sizes, validation_data_sizes = [], [], [] for k, v in data.output_representations.items(): validation_data_from_task = None if self.testing_split_ratio > 0: training_data_from_task, testing_data_from_task = _split_data( v, self.testing_split_ratio ) if self.validation_split_ratio > 0: testing_data_from_task, validation_data_from_task = _split_data( testing_data_from_task, self.validation_split_ratio ) else: training_data_from_task = v 
testing_data_from_task = None for g, data_from_task in zip( groups, ( training_data_from_task, testing_data_from_task, validation_data_from_task, ), ): _add_to_dataset(g, data_from_task, k) training_data_sizes.append(training_data_from_task.shape[0]) testing_data_sizes.append( testing_data_from_task.shape[0] if testing_data_from_task is not None else 0 ) validation_data_sizes.append( validation_data_from_task.shape[0] if validation_data_from_task is not None else 0 ) return training_data_sizes, testing_data_sizes, validation_data_sizes
    def create_dataset(self):
        emg_data = self.emg_data or pickle.load(self.emg_data_path.open("rb"))
        ground_truth_data = self.ground_truth_data or pickle.load(
            self.ground_truth_data_path.open("rb")
        )

        self.save_path.mkdir(parents=True, exist_ok=True)
        dataset = zarr.open(str(self.save_path), mode="w")

        training_group = dataset.create_group("training")
        testing_group = dataset.create_group("testing")
        validation_group = dataset.create_group("validation")

        # need this to know the saving dtype for the labels
        self.__tasks_string_length = len(max(self.tasks_to_use, key=len))

        for task in tqdm(self.tasks_to_use, desc="Filtering and splitting data"):
            emg_data_from_task = emg_data[task]
            ground_truth_data_from_task = ground_truth_data[task]

            min_length = min(
                emg_data_from_task.shape[-1], ground_truth_data_from_task.shape[-1]
            )
            emg_data_from_task = emg_data_from_task[..., :min_length]
            ground_truth_data_from_task = ground_truth_data_from_task[..., :min_length]

            emg_data_from_task = DATA_TYPES_MAP["emg"](
                input_data=emg_data_from_task,
                sampling_frequency=self.sampling_frequency,
            )
            ground_truth_data_from_task = DATA_TYPES_MAP[self.ground_truth_data_type](
                input_data=ground_truth_data_from_task,
                sampling_frequency=self.sampling_frequency,
            )

            if emg_data_from_task.is_chunked != ground_truth_data_from_task.is_chunked:
                raise ValueError(
                    f"The EMG and ground truth data should have the same chunking status, but the EMG data is "
                    f"{'' if emg_data_from_task.is_chunked else 'not '}chunked and the ground truth data is "
                    f"{'' if ground_truth_data_from_task.is_chunked else 'not '}chunked."
                )

            if self.debug:
                print("After loading:")
                print(emg_data_from_task)
                emg_data_from_task.plot_graph()
                print(ground_truth_data_from_task)
                ground_truth_data_from_task.plot_graph()

            if not emg_data_from_task.is_chunked["Input"]:
                emg_data_from_task.apply_filter_pipeline(
                    filter_pipeline=self.emg_filter_pipeline_before_chunking,
                    representations_to_filter=self.emg_representations_to_filter_before_chunking,
                )
                ground_truth_data_from_task.apply_filter_pipeline(
                    filter_pipeline=self.ground_truth_filter_pipeline_before_chunking,
                    representations_to_filter=self.ground_truth_representations_to_filter_before_chunking,
                )

                emg_data_from_task.apply_filter(
                    filter=ChunkizeDataFilter(
                        chunk_size=self.chunk_size,
                        chunk_shift=self.chunk_shift,
                        is_output=len(self.emg_filter_pipeline_after_chunking) == 0,
                        name="EMG_Chunkizer",
                    ),
                    representation_to_filter="Last",
                )
                chunked_emg_data_from_task = emg_data_from_task

                ground_truth_data_from_task.apply_filter(
                    filter=ChunkizeDataFilter(
                        chunk_size=self.chunk_size,
                        chunk_shift=self.chunk_shift,
                        is_output=len(self.ground_truth_filter_pipeline_after_chunking) == 0,
                    ),
                    representation_to_filter="Last",
                )
                chunked_ground_truth_data_from_task = ground_truth_data_from_task

                if self.debug:
                    print("After chunking:")
                    print(chunked_emg_data_from_task)
                    chunked_emg_data_from_task.plot_graph()
                    print(chunked_ground_truth_data_from_task)
                    chunked_ground_truth_data_from_task.plot_graph()
            else:
                chunked_emg_data_from_task = emg_data_from_task  # [:min_length]

                i = 0
                temp = []
                while (
                    i + self.amount_of_chunks_to_augment_at_once
                    <= chunked_emg_data_from_task.shape[0]
                ):
                    temp.append(
                        np.concatenate(
                            chunked_emg_data_from_task[
                                i : i + self.amount_of_chunks_to_augment_at_once
                            ],
                            axis=-1,
                        )
                    )
                    i += self.amount_of_chunks_to_augment_at_once
                chunked_emg_data_from_task = np.stack(temp, axis=1)

                chunked_ground_truth_data_from_task = (
                    ground_truth_data_from_task  # [:min_length]
                )

            chunked_emg_data_from_task.apply_filter_pipeline(
                filter_pipeline=self.emg_filter_pipeline_after_chunking,
                representations_to_filter=self.emg_representations_to_filter_after_chunking,
            )
            chunked_ground_truth_data_from_task.apply_filter_pipeline(
                filter_pipeline=self.ground_truth_filter_pipeline_after_chunking,
                representations_to_filter=self.ground_truth_representations_to_filter_after_chunking,
            )

            if self.debug:
                print("After filtering the chunked data:")
                print(emg_data_from_task)
                chunked_emg_data_from_task.plot_graph()
                print(ground_truth_data_from_task)
                chunked_ground_truth_data_from_task.plot_graph()

            for group_name, chunked_data_from_task in zip(
                ["emg", "ground_truth"],
                [chunked_emg_data_from_task, chunked_ground_truth_data_from_task],
            ):
                # assumption is made that the emg and ground truth data have the same amount of chunks
                (
                    training_sizes,
                    testing_sizes,
                    validation_sizes,
                ) = self.__add_data_to_dataset(
                    chunked_data_from_task,
                    [
                        (
                            g.create_group(group_name)
                            if group_name not in list(g.group_keys())
                            else g[group_name]
                        )
                        for g in (training_group, testing_group, validation_group)
                    ],
                )

            data_length = list(
                chunked_emg_data_from_task.output_representations.values()
            )[-1].shape[0]
            data_length_ground_truth = list(
                chunked_ground_truth_data_from_task.output_representations.values()
            )[-1].shape[0]

            # check that all values in training_sizes, testing_sizes, and validation_sizes are the same
            assert len(set(training_sizes)) == 1, "The training sizes are not the same."
            assert len(set(testing_sizes)) == 1, "The testing sizes are not the same."
            assert len(set(validation_sizes)) == 1, "The validation sizes are not the same."
            assert (
                data_length == data_length_ground_truth
            ), "The data lengths of the EMG and ground truth data should be the same. For task {}, the EMG data has length {} and the ground truth data has length {}.".format(
                task, data_length, data_length_ground_truth
            )

            for g, size in zip(
                (training_group, testing_group, validation_group),
                (training_sizes[0], testing_sizes[0], validation_sizes[0]),
                # assumption is made that all output representations have the same length
            ):
                _add_to_dataset(
                    g,
                    np.array([task] * size)[..., None].astype(
                        f"<U{self.__tasks_string_length}"
                    ),
                    "label",
                )
                _add_to_dataset(
                    g,
                    np.array([self.tasks_to_use.index(task)] * size)[..., None].astype(
                        np.int8
                    ),
                    "class",
                )
                _add_to_dataset(
                    g,
                    np.repeat(
                        np.array(
                            [
                                np.eye(len(self.tasks_to_use))[
                                    self.tasks_to_use.index(task)
                                ]
                            ]
                        ),
                        size,
                        axis=0,
                    ).astype(np.int8),
                    "one_hot_class",
                )

            for augmentation_pipeline in self.augmentation_pipelines:
                emg_to_append = {k: [] for k in dataset["training/emg"]}
                ground_truth_to_append = {k: [] for k in dataset["training/ground_truth"]}
                label_to_append = []
                class_to_append = []
                one_hot_class_to_append = []

                for i in tqdm(
                    range(
                        list(chunked_emg_data_from_task.output_representations.values())[
                            -1
                        ].shape[0]
                    ),
                    desc=f"Augmenting with {str(augmentation_pipeline)}",
                ):
                    for k in dataset["training/emg"]:
                        temp = DATA_TYPES_MAP["emg"](
                            input_data=dataset["training/emg"][k][i].astype(np.float32),
                            sampling_frequency=self.sampling_frequency,
                        )
                        temp.apply_filter_pipeline(
                            filter_pipeline=[augmentation_pipeline],
                            representations_to_filter=["Last"],
                        )
                        emg_to_append[k].append(temp["Last"])

                    for k in dataset["training/ground_truth"]:
                        ground_truth_to_append[k].append(
                            dataset["training/ground_truth"][k][i]
                        )

                    label_to_append.append(dataset["training/label"][i])
                    class_to_append.append(dataset["training/class"][i])
                    one_hot_class_to_append.append(dataset["training/one_hot_class"][i])

                    if i % self.amount_of_chunks_to_augment_at_once == 0:
                        for k, v in emg_to_append.items():
                            _add_to_dataset(training_group["emg"], np.array(v), name=k)
                        for k, v in ground_truth_to_append.items():
                            _add_to_dataset(
                                training_group["ground_truth"], np.array(v), name=k
                            )
                        _add_to_dataset(
                            training_group, np.array(label_to_append), name="label"
                        )
                        _add_to_dataset(
                            training_group, np.array(class_to_append), name="class"
                        )
                        _add_to_dataset(
                            training_group,
                            np.array(one_hot_class_to_append),
                            name="one_hot_class",
                        )

                        emg_to_append = {k: [] for k in dataset["training/emg"]}
                        ground_truth_to_append = {
                            k: [] for k in dataset["training/ground_truth"]
                        }
                        label_to_append = []
                        class_to_append = []
                        one_hot_class_to_append = []

                if len(list(emg_to_append.values())[0]) > 0:
                    for k, v in emg_to_append.items():
                        _add_to_dataset(training_group["emg"], np.array(v), name=k)
                    for k, v in ground_truth_to_append.items():
                        _add_to_dataset(training_group["ground_truth"], np.array(v), name=k)
                    _add_to_dataset(training_group, np.array(label_to_append), name="label")
                    _add_to_dataset(
                        training_group, np.array(class_to_append), name="class"
                    )
                    _add_to_dataset(
                        training_group,
                        np.array(one_hot_class_to_append),
                        name="one_hot_class",
                    )
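
# Usage sketch (illustrative; the file paths, sampling frequency, and task IDs below
# are placeholders, not values shipped with doc_octopy):
#
#     dataset = EMGDataset(
#         emg_data_path=Path("emg.pkl"),
#         ground_truth_data_path=Path("kinematics.pkl"),
#         ground_truth_data_type="kinematics",
#         sampling_frequency=2048.0,
#         tasks_to_use=("1", "2", "3"),
#         save_path=Path("dataset.zarr"),
#         chunk_size=192,
#         chunk_shift=64,
#         testing_split_ratio=0.2,
#         validation_split_ratio=0.2,
#     )
#     dataset.create_dataset()
#
# create_dataset() writes a zarr store with "training", "testing" and "validation"
# groups; each holds per-representation "emg" and "ground_truth" arrays plus
# "label", "class" and "one_hot_class" arrays.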