# Source code for biosignal_device_interface.devices.otb.otb_quattrocento_light
"""
Quattrocento Light class for real-time interface to
Quattrocento using OT Biolab Light.
Developer: Dominik I. Braun
Contact: dome.braun@fau.de
Last Update: 2023-06-05
"""
# Python Libraries
from __future__ import annotations
from typing import TYPE_CHECKING, Union, Dict
from PySide6.QtNetwork import QTcpSocket, QHostAddress
from PySide6.QtCore import QIODevice
import numpy as np
from biosignal_device_interface.devices.core.base_device import BaseDevice
from biosignal_device_interface.constants.devices.core.base_device_constants import (
DeviceType,
)
from biosignal_device_interface.constants.devices.otb.otb_quattrocento_light_constants import (
COMMAND_START_STREAMING,
COMMAND_STOP_STREAMING,
CONNECTION_RESPONSE,
QUATTROCENTO_LIGHT_STREAMING_FREQUENCY_DICT,
QUATTROCENTO_SAMPLING_FREQUENCY_DICT,
QuattrocentoLightSamplingFrequency,
QuattrocentoLightStreamingFrequency,
)
if TYPE_CHECKING:
# Python Libraries
from PySide6.QtWidgets import QMainWindow, QWidget
from aenum import Enum
[docs]
class OTBQuattrocentoLight(BaseDevice):
    """
    QuattrocentoLight device class derived from BaseDevice class.

    The QuattrocentoLight is using a TCP/IP protocol to communicate with the device.

    This class directly interfaces with the OT Biolab Light software from
    OT Bioelettronica. The configured settings of the device have to
    match the settings from the OT Biolab Light software!
    """

    def __init__(
        self,
        parent: Union[QMainWindow, QWidget, None] = None,
    ) -> None:
        """
        Set the fixed device parameters and create the TCP socket.

        Args:
            parent: Optional Qt parent that is forwarded to BaseDevice.
        """
        super().__init__(parent)

        # Device Parameters
        self._device_type: DeviceType = DeviceType.OTB_QUATTROCENTO_LIGHT

        # Device Information
        self._number_of_channels: int = 408  # Fix value
        self._auxiliary_channel_start_index: int = 384  # Fix value
        self._number_of_auxiliary_channels: int = 16  # Fix value
        self._conversion_factor_biosignal: float = 5 / (2**16) / 150 * 1000  # in mV
        self._conversion_factor_auxiliary: float = 5 / (2**16) / 0.5  # in V
        self._bytes_per_sample: int = 2  # Fix value

        # Quattrocento unique parameters
        self._streaming_frequency: int | None = None

        # Connection Parameters
        self._interface: QTcpSocket = QTcpSocket()
        # Bytes read from the socket that have not been processed yet.
        self._received_bytes: bytearray = bytearray()
        # Tracks whether readyRead is currently wired to _read_data, so the
        # slot is never connected twice and never disconnected while absent.
        self._is_read_slot_connected: bool = False

        # Configuration Parameters
        self._grids: list[int] | None = None
        self._grid_size: int = 64  # TODO: This is only valid for the big electrodes
        self._streaming_frequency_mode: QuattrocentoLightStreamingFrequency | None = (
            None
        )
        self._sampling_frequency_mode: QuattrocentoLightSamplingFrequency | None = None

    def _connect_to_device(self) -> bool:
        """
        Reset the receive buffer and open the connection.

        Returns:
            The result of ``_make_request``.
        """
        super()._connect_to_device()

        self._received_bytes = bytearray()
        return self._make_request()

    def _make_request(self) -> bool:
        """
        Open the TCP connection and attach the read handler.

        Host and port are taken from ``self._connection_settings[0]`` and
        ``self._connection_settings[1]``. The call waits up to 1000 ms for
        the socket to connect.

        Returns:
            True if the socket connected within the timeout, False otherwise.
            On failure ``_disconnect_from_device`` is called before returning.
        """
        super()._make_request()

        # Signal self.connect_toggled is emitted in _read_data
        self._interface.connectToHost(
            QHostAddress(self._connection_settings[0]),
            self._connection_settings[1],
            QIODevice.ReadWrite,
        )

        if not self._interface.waitForConnected(1000):
            self._disconnect_from_device()
            return False

        if not self._is_read_slot_connected:
            self._interface.readyRead.connect(self._read_data)
            self._is_read_slot_connected = True
        return True

    def _disconnect_from_device(self) -> None:
        """
        Close the TCP connection and detach the read handler.

        This is also reached from ``_make_request`` when the connection
        attempt failed, i.e. before the read handler was attached, so the
        handler is only disconnected when it is actually connected.
        """
        super()._disconnect_from_device()

        self._interface.disconnectFromHost()
        if self._is_read_slot_connected:
            self._interface.readyRead.disconnect(self._read_data)
            self._is_read_slot_connected = False
        self._interface.close()

    def _start_streaming(self) -> None:
        """Send the start-streaming command over the socket."""
        super()._start_streaming()

        self._interface.write(COMMAND_START_STREAMING)

    def _stop_streaming(self) -> None:
        """
        Send the stop-streaming command over the socket.

        Waits up to 1000 ms for the command to be written.
        """
        super()._stop_streaming()

        self._interface.write(COMMAND_STOP_STREAMING)
        self._interface.waitForBytesWritten(1000)

    def clear_socket(self) -> None:
        """Read and discard everything currently pending on the socket."""
        super().clear_socket()

        self._interface.readAll()

    def _read_data(self) -> None:
        """
        Handle incoming socket data (slot for the socket's readyRead signal).

        The handling depends on the current state:

        * Not connected yet: if exactly ``len(CONNECTION_RESPONSE)`` bytes are
          pending and they equal ``CONNECTION_RESPONSE``, the device is marked
          as connected and ``connect_toggled`` is emitted with True.
        * Not streaming: pending data is discarded.
        * Streaming: data is read in chunks of ``self._buffer_size`` bytes and
          every complete chunk is passed to ``_process_data``.
        """
        super()._read_data()

        # Wait for connection response
        if not self.is_connected and (
            self._interface.bytesAvailable() == len(CONNECTION_RESPONSE)
            and self._interface.readAll() == CONNECTION_RESPONSE
        ):
            self.is_connected = True
            self.connect_toggled.emit(True)
            return

        if not self._is_streaming:
            self.clear_socket()
            return

        frame_size = self._buffer_size
        if not frame_size or frame_size < 0:
            # Without a positive frame size the loops below cannot terminate.
            return

        # ">=" so that a frame of exactly frame_size bytes is picked up right
        # away instead of waiting for the next readyRead.
        while self._interface.bytesAvailable() >= frame_size:
            packet = self._interface.read(frame_size)
            if not packet:
                # Nothing was read; looping again would not change that.
                break
            self._received_bytes.extend(packet)

        while len(self._received_bytes) >= frame_size:
            frame = self._received_bytes[:frame_size]
            self._process_data(frame)
            # Drop the processed frame in place.
            del self._received_bytes[:frame_size]

    def _process_data(self, input: bytearray) -> None:
        """
        Decode one frame of raw bytes and emit it through the data signals.

        The bytes are interpreted as int16 values, arranged column-major into
        an array with ``self._number_of_channels`` rows and converted to
        float32. The full array is emitted via ``data_available``; the results
        of ``_extract_biosignal_data`` and ``_extract_auxiliary_data`` are
        emitted via ``biosignal_data_available`` and
        ``auxiliary_data_available``.

        Args:
            input: Raw frame bytes. The name shadows the builtin but is kept
                for compatibility with the existing method signature.
        """
        super()._process_data(input)

        # Decode the data
        decoded_data = np.frombuffer(input, dtype=np.int16)

        # Reshape it to the correct format
        processed_data = decoded_data.reshape(
            self._number_of_channels, -1, order="F"
        ).astype(np.float32)

        # Emit the data
        self.data_available.emit(processed_data)

        biosignal_data = self._extract_biosignal_data(processed_data)
        self.biosignal_data_available.emit(biosignal_data)

        auxiliary_data = self._extract_auxiliary_data(processed_data)
        self.auxiliary_data_available.emit(auxiliary_data)