# Source code for myogestic.gui.protocols.record
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Optional, Tuple
import numpy as np
from PySide6.QtCore import QObject
from PySide6.QtGui import QCloseEvent
from myogestic.gui.widgets.logger import LoggerLevel
from myogestic.gui.widgets.templates.visual_interface import VisualInterface
if TYPE_CHECKING:
from myogestic.gui.myogestic import MyoGestic
# Scale factor applied to the collected/requested sample ratio before it is
# written to the recording progress bar (a ratio of 1.0 maps to this value).
PROGRESS_BAR_MAX = 100
[docs]
class RecordProtocol(QObject):
    """Protocol for recording biosignal (EMG) data.

    Once prepared, the protocol listens to the device widget's
    ``biosignal_data_arrived`` signal, stores every incoming chunk together
    with its arrival time, mirrors the progress on the main window's
    ``recordEMGProgressBar`` and, when enough samples were collected, stops
    listening and notifies the recording UI of the selected visual interface.

    NOTE(review): kinematics are not touched by the code in this class;
    presumably they are handled by the selected visual interface - confirm.

    Parameters
    ----------
    main_window : MyoGestic
        The parent application that manages the recording protocol.

    Attributes
    ----------
    _main_window : MyoGestic
        Reference to the parent application (device widget, logger, UI).
    _selected_visual_interface : Optional[VisualInterface]
        The visual interface used for the recording, if any.
    _sampling_frequency : Optional[int]
        Sampling frequency reported by the biosignal device.
    _total_samples_to_record : int
        Total number of samples to be recorded.
    _biosignal__buffer : list[Tuple[float, np.ndarray]]
        Buffer of ``(arrival timestamp, data chunk)`` pairs.
    is_biosignal_recording_complete : bool
        Indicates if the biosignal recording process has completed.
    recording_start_time : float
        ``time.time()`` value taken when the recording was prepared.
    """
[docs]
def __init__(self, main_window: MyoGestic) -> None:
    """Create the protocol as a Qt child of the main window.

    Parameters
    ----------
    main_window : MyoGestic
        Application window. It is handed to ``QObject.__init__`` as the
        parent and kept to reach the device widget, the logger and the UI.
    """
    super().__init__(main_window)

    self._main_window = main_window

    # Recording configuration; filled in when a recording is prepared.
    self._selected_visual_interface: Optional[VisualInterface] = None
    self._sampling_frequency: Optional[int] = None
    self._total_samples_to_record: int = 0

    # Recording state.
    self.recording_start_time: float = 0.0
    self.is_biosignal_recording_complete: bool = False
    self._biosignal__buffer: list[Tuple[float, np.ndarray]] = []
[docs]
def start_recording_preparation(self, duration: float) -> bool:
    """Prepare for biosignal data recording.

    Checks that the device is streaming, derives the number of samples to
    record from ``duration`` and the device's sampling frequency, resets the
    recording state and starts listening for incoming data.

    Parameters
    ----------
    duration : float
        Duration of the recording in seconds.

    Returns
    -------
    bool
        True if preparation succeeds. False if the device is not streaming
        or if ``duration`` does not amount to at least one sample.
    """
    device_widget = self._main_window.device__widget

    # Private attributes of the device widget are read on purpose here.
    if not device_widget._get_current_widget()._device._is_streaming:  # noqa
        self._main_window.logger.print(
            "Biosignal device is not streaming!", level=LoggerLevel.ERROR
        )
        return False

    self._sampling_frequency = device_widget.get_device_information()[
        "sampling_frequency"
    ]

    total_samples = int(duration * self._sampling_frequency)
    if total_samples <= 0:
        # A target of zero samples would make the progress computation in
        # update_biosignal_buffer divide by zero, so refuse to start.
        self._main_window.logger.print(
            f"Recording duration of {duration} s yields no samples to record!",
            level=LoggerLevel.ERROR,
        )
        return False

    self._total_samples_to_record = total_samples
    self._biosignal__buffer.clear()
    self.is_biosignal_recording_complete = False
    self.recording_start_time = time.time()

    device_widget.biosignal_data_arrived.connect(self.update_biosignal_buffer)
    return True
[docs]
def update_biosignal_buffer(self, data: np.ndarray) -> None:
    """Store an incoming data chunk and update the recording progress.

    The chunk is appended to the buffer together with its arrival time.
    Progress is the number of collected samples relative to the requested
    total; the size of axis 1 of every chunk is counted as its number of
    samples. When the target is reached the recording is finalized.

    Parameters
    ----------
    data : np.ndarray
        New data chunk; must have at least two dimensions.
    """
    self._biosignal__buffer.append((time.time(), data))

    collected_samples = sum(
        chunk.shape[1] for _, chunk in self._biosignal__buffer
    )
    target_samples = self._total_samples_to_record

    if target_samples > 0:
        # The last chunk usually overshoots the target. Clamp the value:
        # the progress bar is presumably a QProgressBar, which ignores
        # out-of-range values and would therefore never show completion.
        progress = min(
            int((collected_samples / target_samples) * PROGRESS_BAR_MAX),
            PROGRESS_BAR_MAX,
        )
    else:
        # Nothing was requested: avoid dividing by zero and report the
        # recording as fully done.
        progress = PROGRESS_BAR_MAX
    self._main_window.ui.recordEMGProgressBar.setValue(progress)

    if collected_samples >= target_samples:
        self._complete_recording_process()
[docs]
def _complete_recording_process(self) -> None:
    """Finalize the recording process.

    Logs the elapsed time since ``recording_start_time``, marks the
    recording as complete, stops listening for incoming data and, if a
    visual interface is selected, asks its recording UI to check whether
    the recording is complete.
    """
    elapsed_seconds = round(time.time() - self.recording_start_time, 2)
    self._main_window.logger.print(
        f"EMG recording finished in {elapsed_seconds} seconds."
    )

    self.is_biosignal_recording_complete = True

    # Stop receiving chunks so the buffer is not extended any further.
    data_signal = self._main_window.device__widget.biosignal_data_arrived
    data_signal.disconnect(self.update_biosignal_buffer)

    visual_interface = self._selected_visual_interface
    if not visual_interface:
        return
    visual_interface.recording_interface_ui.check_recording_completion()
[docs]
def retrieve_recorded_data(self) -> Tuple[np.ndarray, np.ndarray]:
    """Return the buffered data chunks and their arrival timestamps.

    The chunks are stacked along a new last axis, so they must all share
    the same shape. The stacked array is then cut to at most
    ``_total_samples_to_record`` entries along that last (chunk) axis.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray]
        The stacked data and a 1-D array with one timestamp per buffered
        chunk.

    Raises
    ------
    ValueError
        Raised by ``np.stack`` if the buffer is empty or the chunk shapes
        differ.
    """
    buffered = self._biosignal__buffer
    chunks = [chunk for _, chunk in buffered]
    arrival_times = [arrival for arrival, _ in buffered]

    stacked = np.stack(chunks, axis=-1)
    limit = self._total_samples_to_record
    return stacked[..., :limit], np.array(arrival_times)
[docs]
def _reset_recording_ui(self) -> None:
    """Set the recording progress bar back to zero and empty the buffer."""
    progress_bar = self._main_window.ui.recordEMGProgressBar
    progress_bar.setValue(0)

    # Cleared in place: the same list object stays bound to the attribute.
    self._biosignal__buffer.clear()
[docs]
def close_event(self, _: QCloseEvent) -> None:
    """Handle the close event for the recording protocol.

    Resets the recording UI and buffer, marks the recording as not
    complete and logs that the protocol was closed.

    Parameters
    ----------
    _ : QCloseEvent
        The close event; it is not used.
    """
    self._reset_recording_ui()
    self.is_biosignal_recording_complete = False

    logger = self._main_window.logger
    logger.print("Recording protocol closed.")