Source code for myogestic.gui.widgets.visual_interfaces.virtual_cursor_interface.setup_interface

import ast
import sys
import time
from pathlib import Path
import os

import numpy as np
from PySide6.QtCore import QByteArray, QMetaObject, QProcess, QTimer, Qt, Signal
from PySide6.QtGui import QCloseEvent
from PySide6.QtNetwork import QHostAddress, QUdpSocket
from PySide6.QtWidgets import QCheckBox, QPushButton, QWidget, QApplication

from myogestic.gui.widgets.logger import LoggerLevel
from myogestic.gui.widgets.templates.visual_interface import SetupInterfaceTemplate
from myogestic.gui.widgets.visual_interfaces.virtual_cursor_interface.ui import Ui_SetupVirtualCursorInterface
from myogestic.utils.constants import MYOGESTIC_UDP_PORT

# Stylesheets
NOT_CONNECTED_STYLESHEET = "background-color: red; border-radius: 5px;"
CONNECTED_STYLESHEET = "background-color: green; border-radius: 5px;"

# Constants
STREAMING_FREQUENCY = 60
TIME_BETWEEN_MESSAGES = 1 / STREAMING_FREQUENCY

SOCKET_IP = "127.0.0.1"
STATUS_REQUEST = "status"
STATUS_RESPONSE = "active"


# Ports
# on this port the VCI listens for incoming messages from MyoGestic
VCI__UDP_PORT = 1236

# on this port the VCI sends the status check
VCI_STATUS__UDP_PORT = 1235

# on this port the VCI sends the currently displayed predicted cursor
VCI_PREDICTION__UDP_PORT = 1234


[docs] class VirtualCursorInterface_SetupInterface(SetupInterfaceTemplate): """ Setup interface for the Virtual Cursor Interface. This class is responsible for setting up the Virtual Cursor Interface. Attributes ---------- predicted_cursor__signal : Signal Signal that emits the predicted cursor data. incoming_message_signal : Signal Signal that emits both an array and a string. Parameters ---------- main_window : QMainWindow The main window of the application. name : str The name of the interface. Default is "VirtualCursorInterface". .. important:: The name of the interface must be unique. """ predicted_cursor__signal = Signal(np.ndarray) incoming_message_signal = Signal(np.ndarray)
[docs] def __init__(self, main_window, name="VirtualCursorInterface"): super().__init__(main_window, name, ui=Ui_SetupVirtualCursorInterface()) self._cursor_process = QProcess() self._cursor_process.setProgram(sys.executable) self._cursor_process.setArguments([self._get_cursor_py_executable()]) self._cursor_process.started.connect(lambda: self._main_window.toggle_selected_visual_interface(self.name)) self._cursor_process.finished.connect(self.interface_was_killed) self._cursor_process.finished.connect(lambda: self._main_window.toggle_selected_visual_interface(self.name)) self._setup_timers() self._last_message_time = time.time() self._is_connected: bool = False self._streaming__udp_socket: QUdpSocket | None = None # Custom Stuff self._predicted_cursor__udp_socket: QUdpSocket | None = None self._predicted_cursor_recording__buffer: list[(float, np.ndarray)] = [] # Initialize Virtual Cursor Interface UI self.initialize_ui_logic()
[docs] def _get_cursor_py_executable(self) -> str: """Get the path to the Python main script for the cursor application.""" python_executable = sys.executable script_dir = os.path.dirname(__file__) base_gui_path = os.path.abspath(os.path.join(script_dir, "..", "..", "..")) main_cursor_path = os.path.join(base_gui_path, "cursor_interface", "main_cursor.py") return main_cursor_path
[docs] def _setup_timers(self): """Setup the timers for the Virtual Cursor Interface.""" self._status_request__timer = QTimer(self) self._status_request__timer.setInterval(2000) self._status_request__timer.timeout.connect(self.write_status_message) self._status_request_timeout__timer = QTimer(self) self._status_request_timeout__timer.setSingleShot(True) self._status_request_timeout__timer.setInterval(1000) self._status_request_timeout__timer.timeout.connect(self._update_status)
[docs] def initialize_ui_logic(self): """Initialize the UI logic for the Virtual Cursor Interface.""" self._main_window.ui.visualInterfacesVerticalLayout.addWidget(self.ui.groupBox) self._toggle_virtual_cursor_interface__push_button: QPushButton = self.ui.toggleVirtualCursorInterfacePushButton self._toggle_virtual_cursor_interface__push_button.clicked.connect(self.toggle_virtual_cursor_interface) self._virtual_cursor_interface__status_widget: QWidget = self.ui.virtualCursorInterfaceStatusWidget self._virtual_cursor_interface__status_widget.setStyleSheet(NOT_CONNECTED_STYLESHEET) self._virtual_cursor_interface__status_widget.setToolTip( "Connection status: Red = Not connected, Green = Connected" ) self._use_external_virtual_cursor_interface__check_box: QCheckBox = ( self.ui.useExternalVirtualCursorInterfaceCheckBox ) self._use_external_virtual_cursor_interface__check_box.setToolTip( "Enable this to use an externally running Virtual Cursor Interface instead of launching the built-in one" )
[docs] def start_interface(self): """Start the Virtual Cursor Interface.""" if not self._use_external_virtual_cursor_interface__check_box.isChecked(): # Connect to process finished signal before starting self._cursor_process.finished.connect(self.stop_interface) self._cursor_process.start() self._cursor_process.waitForStarted() self._status_request__timer.start() self.toggle_streaming()
[docs] def stop_interface(self): """Stop the Virtual Cursor Interface.""" if not self._use_external_virtual_cursor_interface__check_box.isChecked(): # Disconnect from process finished signal try: self._cursor_process.finished.disconnect(self.stop_interface) except (TypeError, RuntimeError): pass # Signal was not connected if self._cursor_process.state() != QProcess.NotRunning: self._cursor_process.kill() self._cursor_process.waitForFinished() QMetaObject.invokeMethod(self._status_request__timer, "stop", Qt.QueuedConnection) self.toggle_streaming()
[docs] def interface_was_killed(self) -> None: """Handle the case when the Virtual Cursor Interface was killed or finished.""" self._toggle_virtual_cursor_interface__push_button.setChecked(False) self._toggle_virtual_cursor_interface__push_button.setText("Open") self._use_external_virtual_cursor_interface__check_box.setEnabled(True) self._virtual_cursor_interface__status_widget.setStyleSheet(NOT_CONNECTED_STYLESHEET) self._is_connected = False
[docs] def close_event(self, _: QCloseEvent) -> None: """Handle the close event of the Virtual Cursor Interface.""" try: if self._streaming__udp_socket: self._streaming__udp_socket.close() if self._predicted_cursor__udp_socket: self._predicted_cursor__udp_socket.close() if self._cursor_process.state() != QProcess.NotRunning: self._cursor_process.kill() self._cursor_process.waitForFinished() except Exception as e: self._main_window.logger.print(f"Error during cleanup: {e}", level=LoggerLevel.ERROR)
[docs] def _update_status(self) -> None: """Update the status of the Virtual Cursor Interface.""" self._is_connected = False self._virtual_cursor_interface__status_widget.setStyleSheet(NOT_CONNECTED_STYLESHEET)
[docs] def toggle_virtual_cursor_interface(self): """Toggle the Virtual Cursor Interface.""" if self._toggle_virtual_cursor_interface__push_button.isChecked(): print("Opening Virtual Cursor Interface") self.start_interface() self._use_external_virtual_cursor_interface__check_box.setEnabled(False) self._toggle_virtual_cursor_interface__push_button.setText("Close") else: print("Closing Virtual Cursor Interface") self.stop_interface() self._use_external_virtual_cursor_interface__check_box.setEnabled(True) self._toggle_virtual_cursor_interface__push_button.setText("Open")
[docs] def toggle_streaming(self) -> None: """Toggle the streaming of the Virtual Cursor Interface.""" if self._toggle_virtual_cursor_interface__push_button.isChecked(): self._streaming__udp_socket = QUdpSocket(self) self._streaming__udp_socket.readyRead.connect(self.read_message) self.outgoing_message_signal.connect(self.write_message) self._streaming__udp_socket.bind(QHostAddress(SOCKET_IP), MYOGESTIC_UDP_PORT) self._predicted_cursor__udp_socket = QUdpSocket(self) self._predicted_cursor__udp_socket.bind(QHostAddress(SOCKET_IP), VCI_PREDICTION__UDP_PORT) self._predicted_cursor__udp_socket.readyRead.connect(self.read_predicted_cursor) self._last_message_time = time.time() else: try: self._streaming__udp_socket.close() self._predicted_cursor__udp_socket.close() except AttributeError: pass self._streaming__udp_socket = None self._predicted_cursor__udp_socket = None self._is_connected = False self._virtual_cursor_interface__status_widget.setStyleSheet(NOT_CONNECTED_STYLESHEET)
[docs] def read_predicted_cursor(self) -> None: """Read the predicted cursor data from the Virtual Cursor Interface.""" while self._predicted_cursor__udp_socket.hasPendingDatagrams(): datagram, _, _ = self._predicted_cursor__udp_socket.readDatagram( self._predicted_cursor__udp_socket.pendingDatagramSize() ) data = datagram.data().decode("utf-8") if not data: return self.predicted_cursor__signal.emit(np.array(ast.literal_eval(data)))
[docs] def write_message(self, message: QByteArray) -> None: """Write a message to the Virtual Cursor Interface.""" if self._is_connected and (time.time() - self._last_message_time >= TIME_BETWEEN_MESSAGES): self._last_message_time = time.time() output_bytes = self._streaming__udp_socket.writeDatagram(message, QHostAddress(SOCKET_IP), VCI__UDP_PORT) if output_bytes == -1: self._main_window.logger.print( "Error in sending message to Virtual Cursor Interface!", level=LoggerLevel.ERROR, )
[docs] def read_message(self) -> None: """Read a message from the Virtual Cursor Interface.""" if self._toggle_virtual_cursor_interface__push_button.isChecked(): while self._streaming__udp_socket.hasPendingDatagrams(): datagram, _, _ = self._streaming__udp_socket.readDatagram( self._streaming__udp_socket.pendingDatagramSize() ) try: data_str = datagram.data().decode("utf-8") if not data_str: return if data_str == STATUS_RESPONSE: self._is_connected = True self._virtual_cursor_interface__status_widget.setStyleSheet(CONNECTED_STYLESHEET) self._status_request_timeout__timer.stop() return # Default values task_label = "" coord_data_str = data_str # Check for underscore to separate coordinates and task label if "_" in data_str: parts = data_str.rsplit("_", 1) if len(parts) == 2: coord_data_str = parts[0] task_label = parts[1] try: # Convert the coordinate part to a numpy array coord_array = np.array(ast.literal_eval(coord_data_str)) self.incoming_message_signal.emit(coord_array) except (SyntaxError, ValueError) as e: self._main_window.logger.print( f'Error parsing coordinate data "{coord_data_str}" from message "{data_str}": {e}', level=LoggerLevel.WARNING, ) except UnicodeDecodeError: self._main_window.logger.print( f"Failed to decode UDP message: {datagram.data()[:50]}...", level=LoggerLevel.WARNING ) # Log part of the message except Exception as e: # Catch any other unexpected errors during message processing self._main_window.logger.print( f"Unexpected error processing message in read_message: {e}", level=LoggerLevel.ERROR )
[docs] def write_status_message(self) -> None: """Write a status message to the Virtual Cursor Interface.""" if self._toggle_virtual_cursor_interface__push_button.isChecked(): output_bytes = self._streaming__udp_socket.writeDatagram( STATUS_REQUEST.encode("utf-8"), QHostAddress(SOCKET_IP), VCI_STATUS__UDP_PORT, ) if output_bytes == -1: self._main_window.logger.print( "Error in sending status message to Virtual Cursor Interface!", level=LoggerLevel.ERROR, ) return self._status_request_timeout__timer.start()
[docs] def connect_custom_signals(self) -> None: """Connect custom signals for the Virtual Cursor Interface.""" self.predicted_cursor__signal.connect(self.online_predicted_cursor_update)
[docs] def disconnect_custom_signals(self) -> None: """Disconnect custom signals for the Virtual Cursor Interface.""" self.predicted_cursor__signal.disconnect(self.online_predicted_cursor_update)
[docs] def get_custom_save_data(self) -> dict: """Get custom save data for the Virtual Cursor Interface.""" return { "predicted_cursor": np.vstack( [data for _, data in self._predicted_cursor_recording__buffer], ).T, "predicted_cursor_timings": np.array( [time for time, _ in self._predicted_cursor_recording__buffer], ), }
[docs] def clear_custom_signal_buffers(self) -> None: """Clear custom signal buffers for the Virtual Cursor Interface.""" self._predicted_cursor_recording__buffer = []
[docs] def online_predicted_cursor_update(self, data: np.ndarray) -> None: """Update the predicted cursor data for the online protocol.""" if self._online_protocol.online_record_toggle_push_button.isChecked(): self._predicted_cursor_recording__buffer.append( (time.time() - self._online_protocol.recording_start_time, data) )