Skip to content

Outputs

Output

Output(hz: float = 50)

Base class for "send the latest pushed vector at hz" outputs.

Subclass to define a new transport: override :meth:_send with the actual write (LSL push_sample, UDP sendto, serial write, gRPC RPC, ...). The base class handles everything else:

  • A daemon output thread is started in __init__ and runs for the lifetime of the Output. Each tick it reads the latest pushed vector and calls :meth:_send.
  • :meth:push is the caller-facing API: write the latest value to an atomic slot (CPython's GIL guarantees atomic reference assignment). It is latest-wins, not queued - if you push faster than hz, intermediate values are overwritten and never sent. That's the contract.
  • Exceptions raised by :meth:_send are caught, deduplicated per (error class, message) pair, and logged once. A flapping destination logs one line per failure mode and the send thread keeps running.

Subclassing checklist:

  1. Call super().__init__(hz=...) from your __init__ (after opening the underlying socket / serial port / channel - the send thread starts immediately).
  2. Implement _send(self, data: np.ndarray) -> None. Treat data as read-only; validate shape; raise on misuse rather than silently mis-sending.
  3. Override :meth:stop if you need to close a resource (see :class:~myogestic.outputs.UDPOutput for an example).

Outputs are user-owned: instantiate them at module scope, call .push(data) from inside @pipeline.predict. Do not register them with App; the framework does not track them.

Parameters:

Name Type Description Default
hz float

Send rate of the daemon thread in Hz. Default 50. Tune to match your destination's appetite - LSL subscribers handle high rates well, a serial UART or a gRPC server may not.

50
Example

from myogestic.outputs import Output import socket, numpy as np

class MyOutput(Output): ... def init(self, addr, hz=50): ... self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ... self._addr = addr ... super().init(hz=hz) ... def _send(self, data): ... self._sock.sendto(data.astype(np.float32).tobytes(), ... self._addr)

Source code in myogestic/outputs/__init__.py
def __init__(self, hz: float = 50):
    self._latest: np.ndarray | None = None
    self._hz = hz
    self._running = True
    # Dedup key per (exception class name, str(exception)) - log the
    # first occurrence per kind, suppress subsequent ones so a noisy
    # disconnect does not spam the log.
    self._seen_send_errors: set[tuple[str, str]] = set()
    if _IS_BROWSER:
        # Pyodide: no threads, and asyncio tasks don't dispatch while
        # immapp.run blocks Python. Register one step with the
        # per-frame scheduler that the App's GUI callback ticks.
        from myogestic._browser import register
        register(lambda: self._send_step() if self._running else 1.0)
    else:
        self._thread = threading.Thread(target=self._send_loop, daemon=True)
        self._thread.start()

push

push(data: ndarray) -> None

Set the latest-value slot. Atomic; latest-wins; non-blocking.

Call from inside @pipeline.predict. The send thread picks the value up on its next tick. If you push faster than hz, intermediate values are dropped.

Source code in myogestic/outputs/__init__.py
def push(self, data: np.ndarray) -> None:
    """Set the latest-value slot. Atomic; latest-wins; non-blocking.

    Call from inside ``@pipeline.predict``. The send thread picks
    the value up on its next tick. If you push faster than ``hz``,
    intermediate values are dropped.
    """
    self._latest = data  # GIL guarantees atomic ref assignment

stop

stop() -> None

Stop the send thread. Subclasses that hold resources (sockets, serial ports) should override and call super().stop() first.

Source code in myogestic/outputs/__init__.py
def stop(self) -> None:
    """Stop the send thread. Subclasses that hold resources (sockets,
    serial ports) should override and call ``super().stop()`` first."""
    self._running = False

LSLOutlet

LSLOutlet(name: str, n_channels: int, hz: float = 50)

Bases: Output

Publish a 1-D vector to a Lab Streaming Layer outlet.

The dual of :class:~myogestic.sources.LSLSource - call .push(vec) from inside @pipeline.predict, and the framework's daemon output thread re-sends the latest pushed vector at the configured hz. Channel count is locked at construction time so the LSL metadata matches what subscribers see.

Parameters:

Name Type Description Default
name str

Outlet name advertised on the LSL network. Typically the stream name that downstream tools (the Virtual Hand, a recorder, another MyoGestic app) resolve by.

required
n_channels int

Fixed channel count. Push vectors must have this length or :meth:_send raises ValueError instead of silently mis-sending.

required
hz float

Send rate of the daemon thread (Hz). Default 50. Push faster than hz is fine: latest-wins, the slot just gets overwritten.

50
Example

outlet = LSLOutlet("VHI_Hand", n_channels=9, hz=32) @pipeline.predict ... def predict(model, features): ... pose = model.compose_pose(features) ... outlet.push(pose) ... return {"pose": pose}

Source code in myogestic/outputs/lsl.py
def __init__(self, name: str, n_channels: int, hz: float = 50):
    info = StreamInfo(name, "Control", n_channels, hz, "float32", "")
    self._outlet = StreamOutlet(info)
    self._n_channels = int(n_channels)
    super().__init__(hz=hz)

UDPOutput

UDPOutput(host: str, port: int, hz: float = 50)

Bases: Output

Send the latest pushed vector as a UDP datagram to host:port.

Each _send writes one datagram containing the float32 bytes of data (no length header, no framing). The receiver is expected to know the channel count and dtype out-of-band - this is for cases where a downstream process (Unity, ROS, a Max/MSP patch) has its own decoder and you just want the freshest vector with minimal latency.

Parameters:

Name Type Description Default
host str

Destination hostname or IP. "127.0.0.1" for same-machine consumers; a LAN address for the next box over.

required
port int

Destination UDP port.

required
hz float

Send rate of the daemon thread (Hz). Default 50.

50
Example

outlet = UDPOutput("127.0.0.1", 9000, hz=60) @pipeline.predict ... def predict(model, features): ... vec = model.predict(features.reshape(1, -1))[0] ... outlet.push(vec) ... return {"vec": vec}

Source code in myogestic/outputs/udp.py
def __init__(self, host: str, port: int, hz: float = 50):
    self._addr = (host, port)
    self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    super().__init__(hz=hz)

stop

stop() -> None

Stop the daemon thread and close the UDP socket.

Source code in myogestic/outputs/udp.py
def stop(self) -> None:
    """Stop the daemon thread and close the UDP socket."""
    super().stop()
    self._sock.close()

SerialOutput

Opt-in: lives at myogestic.outputs.serial_output.SerialOutput. Direct import only (requires the serial extra for pyserial).

SerialOutput

SerialOutput(port: str, baud: int = 115200, hz: float = 10, fmt: str = 'f')

Bases: Output

Pack the latest vector with struct and write it to a serial port.

Each _send writes one little-endian packed frame: len(data) values of format-char fmt. No framing byte, no checksum - the receiver (an Arduino, a microcontroller, a haptic driver) is expected to read a fixed-size frame matching its known channel count.

Parameters:

Name Type Description Default
port str

Serial port path. Examples: "/dev/cu.usbmodem1101" on macOS, "/dev/ttyACM0" on Linux, "COM3" on Windows.

required
baud int

Baud rate. Default 115200 - matches typical Arduino sketches; tune to your firmware.

115200
hz float

Send rate of the daemon thread (Hz). Default 10. Keep this low: serial UART has a hard ceiling and queue-induced latency stacks fast.

10
fmt str

struct format character per value. "f" for float32 (default), "B" for uint8, "h" for int16, etc. See the Python struct docs.

'f'
Example

haptic = SerialOutput("/dev/ttyACM0", baud=115200, hz=30, ... fmt="B") @pipeline.predict ... def predict(model, features): ... intensities = (model.predict(features.reshape(1, -1))[0] * 255 ... ).clip(0, 255).astype(np.uint8) ... haptic.push(intensities) ... return {"intensities": intensities}

Requires the serial extra (uv sync --extra serial) to pull in pyserial.

Source code in myogestic/outputs/serial_output.py
def __init__(self, port: str, baud: int = 115200, hz: float = 10, fmt: str = "f"):
    import serial  # type: ignore[import-not-found]

    self._ser = serial.Serial(port, baud)
    self._fmt = fmt
    super().__init__(hz=hz)