Source code for biosignal_device_interface.devices.otb.otb_quattrocento_light
"""1) Quattrocento Light class for real-time interface to Quattrocento using OT Biolab Light.2) Quattrocento class for direct real-time interface to Quattrocento without using OT Biolab Light.Developer: Dominik I. BraunContact: dome.braun@fau.deLast Update: 2023-06-05"""# Python Librariesfrom__future__importannotationsfromtypingimportTYPE_CHECKING,Union,DictfromPySide6.QtNetworkimportQTcpSocket,QHostAddressfromPySide6.QtCoreimportQIODeviceimportnumpyasnpfrombiosignal_device_interface.devices.core.base_deviceimportBaseDevicefrombiosignal_device_interface.constants.devices.core.base_device_constantsimport(DeviceType,)frombiosignal_device_interface.constants.devices.otb.otb_quattrocento_light_constantsimport(COMMAND_START_STREAMING,COMMAND_STOP_STREAMING,CONNECTION_RESPONSE,QUATTROCENTO_LIGHT_STREAMING_FREQUENCY_DICT,QUATTROCENTO_SAMPLING_FREQUENCY_DICT,QuattrocentoLightSamplingFrequency,QuattrocentoLightStreamingFrequency,)ifTYPE_CHECKING:# Python LibrariesfromPySide6.QtWidgetsimportQMainWindow,QWidgetfromaenumimportEnum
class OTBQuattrocentoLight(BaseDevice):
    """
    QuattrocentoLight device class derived from BaseDevice class.

    The QuattrocentoLight is using a TCP/IP protocol to communicate with the
    device.

    This class directly interfaces with the OT Biolab Light software from
    OT Bioelettronica. The configured settings of the device have to match
    the settings from the OT Biolab Light software!
    """

    def __init__(
        self,
        parent: Union[QMainWindow, QWidget, None] = None,
    ) -> None:
        """
        Set up the fixed device description and create the TCP socket.

        Args:
            parent: Optional parent object that is forwarded unchanged to
                the BaseDevice constructor.
        """
        super().__init__(parent)

        # Device Parameters
        self._device_type: DeviceType = DeviceType.OTB_QUATTROCENTO_LIGHT

        # Device Information
        self._number_of_channels: int = 408  # Fix value
        self._auxiliary_channel_start_index: int = 384  # Fix value
        self._number_of_auxiliary_channels: int = 16  # Fix value
        self._conversion_factor_biosignal: float = 5 / (2**16) / 150 * 1000  # in mV
        self._conversion_factor_auxiliary: float = 5 / (2**16) / 0.5  # in mV
        self._bytes_per_sample: int = 2  # Fix value

        # Quattrocento unique parameters
        self._streaming_frequency: int | None = None

        # Connection Parameters
        self._interface: QTcpSocket = QTcpSocket()

        # Configuration Parameters
        self._grids: list[int] | None = None
        self._grid_size: int = 64  # TODO: This is only valid for the big electrodes
        self._streaming_frequency_mode: QuattrocentoLightStreamingFrequency | None = (
            None
        )
        self._sampling_frequency_mode: QuattrocentoLightSamplingFrequency | None = None

    def _make_request(self) -> bool:
        """
        Open the TCP connection and hook up the data-reading slot.

        The host and port are taken from ``self._connection_settings[0]`` and
        ``self._connection_settings[1]``. The call blocks for at most
        1000 ms while waiting for the socket to connect.

        Returns:
            bool: True if the socket connected and ``readyRead`` was wired to
            ``_read_data``; False if the connection timed out (in which case
            ``_disconnect_from_device`` has been called).
        """
        super()._make_request()

        # Signal self.connect_toggled is emitted in _read_data
        self._interface.connectToHost(
            QHostAddress(self._connection_settings[0]),
            self._connection_settings[1],
            QIODevice.ReadWrite,
        )

        if not self._interface.waitForConnected(1000):
            self._disconnect_from_device()
            return False

        self._interface.readyRead.connect(self._read_data)
        return True

    def _read_data(self) -> None:
        """
        Handle incoming bytes on the socket.

        Connected to the socket's ``readyRead`` signal in ``_make_request``.
        Three situations are handled in order:

        1. Not yet connected and the pending bytes are exactly
           ``CONNECTION_RESPONSE``: mark the device as connected and emit
           ``connect_toggled(True)``.
        2. Not streaming: call ``clear_socket()`` and return.
           NOTE(review): presumably this discards the pending bytes -
           confirm against the ``clear_socket`` implementation.
        3. Streaming: read the socket in chunks of ``self._buffer_size``
           bytes, accumulate them in ``self._received_bytes`` and pass every
           complete chunk of ``self._buffer_size`` bytes to
           ``_process_data``.
        """
        super()._read_data()

        # Wait for connection response
        if not self._is_connected and (
            self._interface.bytesAvailable() == len(CONNECTION_RESPONSE)
            and self._interface.readAll() == CONNECTION_RESPONSE
        ):
            self._is_connected = True
            self.connect_toggled.emit(True)
            return

        if not self._is_streaming:
            self.clear_socket()
            return

        # ">=" (not ">") so that a socket holding exactly one complete frame
        # is drained right away, consistent with the ">=" used for the
        # accumulation buffer below.
        while self._interface.bytesAvailable() >= self._buffer_size:
            packet = self._interface.read(self._buffer_size)
            if not packet:
                # Nothing could be read although bytes were reported as
                # available. Retrying immediately would make no progress and
                # would spin forever inside this slot, so leave the loop and
                # wait for the next readyRead notification instead.
                break

            self._received_bytes.extend(packet)

            while len(self._received_bytes) >= self._buffer_size:
                data_to_process = self._received_bytes[: self._buffer_size]
                self._process_data(data_to_process)
                # Keep only the bytes that have not been processed yet.
                self._received_bytes = self._received_bytes[self._buffer_size :]

    def _process_data(self, input: bytearray) -> None:
        """
        Decode one raw frame and publish it through the data signals.

        The bytes are interpreted as 16-bit signed integers and reshaped in
        column-major order into an array with ``self._number_of_channels``
        rows (one row per channel, one column per sample), then converted to
        float32.

        Args:
            input: Raw bytes of one frame. Its length must be a multiple of
                ``2 * self._number_of_channels`` bytes, otherwise the reshape
                raises a ValueError.

        Emits:
            data_available: the full decoded array.
            biosignal_data_available: result of ``_extract_biosignal_data``.
            auxiliary_data_available: result of ``_extract_auxiliary_data``.
        """
        super()._process_data(input)

        # Decode the data
        decoded_data = np.frombuffer(input, dtype=np.int16)

        # Reshape it to the correct format
        processed_data = decoded_data.reshape(
            self._number_of_channels, -1, order="F"
        ).astype(np.float32)

        # Emit the data
        self.data_available.emit(processed_data)

        biosignal_data = self._extract_biosignal_data(processed_data)
        self.biosignal_data_available.emit(biosignal_data)

        auxiliary_data = self._extract_auxiliary_data(processed_data)
        self.auxiliary_data_available.emit(auxiliary_data)