Source code for biosignal_device_interface.devices.otb.otb_muovi
# Python Libraries"""Device class for real-time interfacing the Muovi device.Developer: Dominik I. BraunContact: dome.braun@fau.deLast Update: 2024-06-05"""from__future__importannotationsfromtypingimportTYPE_CHECKING,Union,DictfromPySide6.QtNetworkimportQTcpSocket,QTcpServer,QHostAddressimportnumpyasnp# Local Librariesfrombiosignal_device_interface.devices.core.base_deviceimportBaseDevicefrombiosignal_device_interface.constants.devices.core.base_device_constantsimport(DeviceType,DeviceChannelTypes,)# Constantsfrombiosignal_device_interface.constants.devices.otb.otb_muovi_constantsimport(MUOVI_CONVERSION_FACTOR_DICT,MuoviWorkingMode,MuoviDetectionMode,MUOVI_WORKING_MODE_CHARACTERISTICS_DICT,MUOVI_SAMPLES_PER_FRAME_DICT,MUOVI_AVAILABLE_CHANNELS_DICT,)ifTYPE_CHECKING:fromPySide6.QtWidgetsimportQMainWindow,QWidgetfromaenumimportEnum
[docs]classOTBMuovi(BaseDevice):""" Muovi device class derived from BaseDevice class. Args: parent (Union[QMainWindow, QWidget], optional): Parent widget to which the device is assigned to. Defaults to None. is_muovi_plus (bool): True if the device is a Muovi Plus, False if not. The Muovi class is using a TCP/IP protocol to communicate with the device. """def__init__(self,parent:Union[QMainWindow,QWidget]=None,is_muovi_plus:bool=False,)->None:""" Initialize the Muovi device. Args: parent (Union[QMainWindow, QWidget], optional): Parent widget. Defaults to None. is_muovi_plus (bool, optional): Boolean to initialize the Muovi device as Muovi+ (64 biosignal channels) or Muovi (32 biosignal channels). Defaults to False (Muovi). """super().__init__(parent)# Device Parametersself._device_type:DeviceType=(DeviceType.OTB_MUOVI_PLUSifis_muovi_pluselseDeviceType.OTB_MUOVI)# Connection Parametersself._interface:QTcpServer=Noneself._client_socket:QTcpSocket|None=None# Configuration Parametersself._working_mode:MuoviWorkingMode=MuoviWorkingMode.NONEself._detection_mode:MuoviDetectionMode=MuoviDetectionMode.NONEself._configuration_command:int|None=None
[docs]defconfigure_device(self,params:Dict[str,Union[Enum,Dict[str,Enum]]]# type: ignore)->None:super().configure_device(params)ifnotself._is_connectedorself._client_socketisNone:return# Check if detection mode is valid for working mode (Case EEG -> MONOPOLAR_GAIN_4 => MONOPOLAR_GAIN_8)ifself._working_mode==MuoviWorkingMode.EEG:ifself._detection_mode==MuoviDetectionMode.MONOPOLAR_GAIN_4:self._detection_mode=MuoviDetectionMode.MONOPOLAR_GAIN_8self._conversion_factor_biosignal=MUOVI_CONVERSION_FACTOR_DICT[self._detection_mode]self._conversion_factor_auxiliary=self._conversion_factor_biosignal# Set configuration parameters for data transferworking_mode_characteristics=MUOVI_WORKING_MODE_CHARACTERISTICS_DICT[self._working_mode]self._sampling_frequency=working_mode_characteristics["sampling_frequency"]self._bytes_per_sample=working_mode_characteristics["bytes_per_sample"]self._samples_per_frame=MUOVI_SAMPLES_PER_FRAME_DICT[self._device_type][self._working_mode]self._number_of_channels=MUOVI_AVAILABLE_CHANNELS_DICT[self._device_type][DeviceChannelTypes.ALL]self._number_of_biosignal_channels=MUOVI_AVAILABLE_CHANNELS_DICT[self._device_type][DeviceChannelTypes.BIOSIGNAL]self._biosignal_channel_indices=np.arange(self._number_of_biosignal_channels)self._number_of_auxiliary_channels=MUOVI_AVAILABLE_CHANNELS_DICT[self._device_type][DeviceChannelTypes.AUXILIARY]self._auxiliary_channel_indices=np.arange(self._number_of_biosignal_channels,self._number_of_biosignal_channels+self._number_of_auxiliary_channels,)self._buffer_size=(self._number_of_channels*self._samples_per_frame*self._bytes_per_sample)self._received_bytes=bytearray()self._configure_command()self._send_configuration_to_device()
[docs]def_process_data(self,input:bytearray)->None:super()._process_data(input)decoded_data=self._bytes_to_integers(input)processed_data=decoded_data.reshape(self._number_of_channels,-1,order="F").astype(np.float32)# Emit the dataself.data_available.emit(processed_data)self.biosignal_data_available.emit(self._extract_biosignal_data(processed_data))self.auxiliary_data_available.emit(self._extract_auxiliary_data(processed_data))
# Convert channels from bytes to integersdef_bytes_to_integers(self,data:bytearray,)->np.ndarray:channel_values=[]# Separate channels from byte-string. One channel has# "bytes_in_sample" many bytes in it.forchannel_indexinrange(len(data)//2):channel_start=channel_index*self._bytes_per_samplechannel_end=(channel_index+1)*self._bytes_per_samplechannel=data[channel_start:channel_end]# Convert channel's byte value to integermatchself._working_mode:caseMuoviWorkingMode.EMG:value=self._decode_int16(channel)caseMuoviWorkingMode.EEG:value=self._decode_int24(channel)channel_values.append(value)returnnp.array(channel_values)def_decode_int16(self,bytes_value:bytearray)->int:value=None# Combine 2 bytes to a 16 bit integer valuevalue=bytes_value[0]*2**8+bytes_value[1]# See if the value is negative and make the two's complementifvalue>=2**15:value-=2**16returnvalue# Convert byte-array value to an integer value and apply two's complementdef_decode_int24(self,bytes_value:bytearray)->int:value=None# Combine 3 bytes to a 24 bit integer valuevalue=bytes_value[0]*2**16+bytes_value[1]*2**8+bytes_value[2]# See if the value is negative and make the two's complementifvalue>=2**23:value-=2**24returnvalue