Source code for biosignal_device_interface.devices.otb.otb_muovi
"""
Device class for real-time interfacing the Muovi device.
Developer: Dominik I. Braun
Contact: dome.braun@fau.de
Last Update: 2024-06-05
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Union, Dict
from PySide6.QtNetwork import QTcpSocket, QTcpServer, QHostAddress
import numpy as np
# Local Libraries
from biosignal_device_interface.devices.core.base_device import BaseDevice
from biosignal_device_interface.constants.devices.core.base_device_constants import (
DeviceType,
DeviceChannelTypes,
)
# Constants
from biosignal_device_interface.constants.devices.otb.otb_muovi_constants import (
MUOVI_CONVERSION_FACTOR_DICT,
MuoviWorkingMode,
MuoviDetectionMode,
MUOVI_WORKING_MODE_CHARACTERISTICS_DICT,
MUOVI_SAMPLES_PER_FRAME_DICT,
MUOVI_AVAILABLE_CHANNELS_DICT,
)
if TYPE_CHECKING:
from PySide6.QtWidgets import QMainWindow, QWidget
from aenum import Enum
[docs]
class OTBMuovi(BaseDevice):
"""
Muovi device class derived from BaseDevice class.
Args:
parent (Union[QMainWindow, QWidget], optional):
Parent widget to which the device is assigned to.
Defaults to None.
is_muovi_plus (bool):
True if the device is a Muovi Plus, False if not.
The Muovi class is using a TCP/IP protocol to communicate with the device.
"""
def __init__(
self,
parent: Union[QMainWindow, QWidget] = None,
is_muovi_plus: bool = False,
) -> None:
"""
Initialize the Muovi device.
Args:
parent (Union[QMainWindow, QWidget], optional): Parent widget. Defaults to None.
is_muovi_plus (bool, optional): Boolean to initialize the Muovi device as Muovi+ (64 biosignal channels) or Muovi (32 biosignal channels). Defaults to False (Muovi).
"""
super().__init__(parent)
# Device Parameters
self._device_type: DeviceType = (
DeviceType.OTB_MUOVI_PLUS if is_muovi_plus else DeviceType.OTB_MUOVI
)
# Connection Parameters
self._interface: QTcpServer = None
self._client_socket: QTcpSocket | None = None
# Configuration Parameters
self._working_mode: MuoviWorkingMode = MuoviWorkingMode.NONE
self._detection_mode: MuoviDetectionMode = MuoviDetectionMode.NONE
self._configuration_command: int | None = None
[docs]
def _connect_to_device(self) -> bool:
super()._connect_to_device()
self._interface = QTcpServer(self)
self._received_bytes: bytearray = bytearray()
if not self._interface.listen(
QHostAddress(self._connection_settings[0]), self._connection_settings[1]
):
return False
self._interface.newConnection.connect(self._make_request)
self._connection_timeout_timer.start()
return True
[docs]
def _make_request(self) -> bool:
super()._make_request()
self._client_socket = self._interface.nextPendingConnection()
if self._client_socket:
self._client_socket.readyRead.connect(self._read_data)
if not self.is_connected:
self.is_connected = True
self.connect_toggled.emit(self.is_connected)
self._connection_timeout_timer.stop()
return True
elif not self._is_configured:
self._is_configured = True
self.configure_toggled.emit(self._is_configured)
return True
[docs]
def _disconnect_from_device(self) -> bool:
super()._disconnect_from_device()
if self._client_socket is not None:
self._client_socket.readyRead.disconnect(self._read_data)
self._client_socket.disconnectFromHost()
self._client_socket.close()
if self._interface is not None:
self._interface.close()
return True
def _send_configuration_to_device(self) -> None:
configuration_bytes = int(self._configuration_command).to_bytes(
1, byteorder="big"
)
success = self._client_socket.write(configuration_bytes)
if success == -1:
self._disconnect_from_device()
def _configure_command(self) -> None:
self._configuration_command = self._working_mode.value << 2
self._configuration_command += self._detection_mode.value
[docs]
def _start_streaming(self) -> None:
super()._start_streaming()
if self._configuration_command is None:
return
self._configuration_command += 1
self._send_configuration_to_device()
[docs]
def _stop_streaming(self) -> None:
super()._stop_streaming()
if self._configuration_command is None:
return
self._configuration_command -= 1
self._send_configuration_to_device()
[docs]
def clear_socket(self) -> None:
if self._client_socket is not None:
self._client_socket.readAll()
self._received_bytes = bytearray()
[docs]
def _read_data(self) -> None:
super()._read_data()
if not self._is_streaming:
self.clear_socket()
return
while self._client_socket.bytesAvailable() > self._buffer_size:
packet = self._client_socket.read(self._buffer_size)
if not packet:
continue
self._received_bytes.extend(packet)
while len(self._received_bytes) >= self._buffer_size:
data_to_process = self._received_bytes[: self._buffer_size]
self._process_data(data_to_process)
self._received_bytes = self._received_bytes[self._buffer_size :]
[docs]
def _process_data(self, input: bytearray) -> None:
super()._process_data(input)
decoded_data = self._bytes_to_integers(input)
processed_data = decoded_data.reshape(
self._number_of_channels, -1, order="F"
).astype(np.float32)
# Emit the data
self.data_available.emit(processed_data)
self.biosignal_data_available.emit(self._extract_biosignal_data(processed_data))
self.auxiliary_data_available.emit(self._extract_auxiliary_data(processed_data))
# Convert channels from bytes to integers
def _bytes_to_integers(
self,
data: bytearray,
) -> np.ndarray:
channel_values = []
# Separate channels from byte-string. One channel has
# "bytes_in_sample" many bytes in it.
for channel_index in range(len(data) // 2):
channel_start = channel_index * self._bytes_per_sample
channel_end = (channel_index + 1) * self._bytes_per_sample
channel = data[channel_start:channel_end]
# Convert channel's byte value to integer
match self._working_mode:
case MuoviWorkingMode.EMG:
value = self._decode_int16(channel)
case MuoviWorkingMode.EEG:
value = self._decode_int24(channel)
channel_values.append(value)
return np.array(channel_values)
def _decode_int16(self, bytes_value: bytearray) -> int:
value = None
# Combine 2 bytes to a 16 bit integer value
value = bytes_value[0] * 2**8 + bytes_value[1]
# See if the value is negative and make the two's complement
if value >= 2**15:
value -= 2**16
return value
# Convert byte-array value to an integer value and apply two's complement
def _decode_int24(self, bytes_value: bytearray) -> int:
value = None
# Combine 3 bytes to a 24 bit integer value
value = bytes_value[0] * 2**16 + bytes_value[1] * 2**8 + bytes_value[2]
# See if the value is negative and make the two's complement
if value >= 2**23:
value -= 2**24
return value