Skip to content

Outputs

Output

Output(hz: float = 50)

Base class for "send the latest pushed vector at hz" outputs.

Subclass to define a new transport: override :meth:_send with the actual write (LSL push_sample, UDP sendto, serial write, gRPC RPC, ...). The base class handles everything else:

  • A daemon output thread is started in __init__ and runs for the lifetime of the Output. Each tick it reads the latest pushed vector and calls :meth:_send.
  • :meth:push is the caller-facing API: write the latest value to an atomic slot (CPython's GIL guarantees atomic reference assignment). It is latest-wins, not queued - if you push faster than hz, intermediate values are overwritten and never sent. That's the contract.
  • Exceptions raised by :meth:_send are caught, deduplicated per (error class, message) pair, and logged once. A flapping destination logs one line per failure mode and the send thread keeps running.

Subclassing checklist:

  1. Call super().__init__(hz=...) from your __init__ (after opening the underlying socket / serial port / channel - the send thread starts immediately).
  2. Implement _send(self, data: np.ndarray) -> None. Treat data as read-only; validate shape; raise on misuse rather than silently mis-sending.
  3. Override :meth:stop if you need to close a resource (see :class:~myogestic.outputs.UDPOutput for an example).

Outputs are user-owned: instantiate them at module scope, call .push(data) from inside @pipeline.predict. Do not register them with App; the framework does not track them.

Parameters:

Name Type Description Default
hz float

Send rate of the daemon thread in Hz. Default 50. Tune to match your destination's appetite - LSL subscribers handle high rates well, a serial UART or a gRPC server may not.

50

Examples:

>>> from myogestic.outputs import Output
>>> import socket, numpy as np
>>>
>>> class MyOutput(Output):
...     def __init__(self, addr, hz=50):
...         self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
...         self._addr = addr
...         super().__init__(hz=hz)
...     def _send(self, data):
...         self._sock.sendto(data.astype(np.float32).tobytes(),
...                           self._addr)
Source code in myogestic/outputs/base.py
def __init__(self, hz: float = 50):
    self._latest: np.ndarray | None = None
    self._hz = hz
    self._running = True
    # Dedup key per (exception class name, str(exception)) - log the
    # first occurrence per kind, suppress subsequent ones so a noisy
    # disconnect does not spam the log.
    self._seen_send_errors: set[tuple[str, str]] = set()
    if _IS_BROWSER:
        # Pyodide: no threads, and asyncio tasks don't dispatch while
        # immapp.run blocks Python. Register one step with the
        # per-frame scheduler that the App's GUI callback ticks.
        from myogestic._browser import register

        register(lambda: self._send_step() if self._running else 1.0)
    else:
        self._thread = threading.Thread(target=self._send_loop, daemon=True)
        self._thread.start()

push

push(data: ndarray) -> None

Set the latest-value slot. Atomic; latest-wins; non-blocking.

Call from inside @pipeline.predict. The send thread picks the value up on its next tick. If you push faster than hz, intermediate values are dropped.

Source code in myogestic/outputs/base.py
def push(self, data: np.ndarray) -> None:
    """Set the latest-value slot. Atomic; latest-wins; non-blocking.

    Call from inside ``@pipeline.predict``. The send thread picks
    the value up on its next tick. If you push faster than ``hz``,
    intermediate values are dropped.
    """
    self._latest = data  # GIL guarantees atomic ref assignment

stop

stop() -> None

Stop the send thread.

Subclasses that hold resources (sockets, serial ports) should override and call super().stop() first.

Source code in myogestic/outputs/base.py
def stop(self) -> None:
    """Stop the send thread.

    Subclasses that hold resources (sockets, serial ports) should
    override and call ``super().stop()`` first.
    """
    self._running = False

LSLOutlet

LSLOutlet(name: str, n_channels: int, hz: float = 50)

Bases: Output

Publish a 1-D vector to a Lab Streaming Layer outlet.

The dual of :class:~myogestic.sources.LSLSource - call .push(vec) from inside @pipeline.predict, and the framework's daemon output thread re-sends the latest pushed vector at the configured hz. Channel count is locked at construction time so the LSL metadata matches what subscribers see.

Parameters:

Name Type Description Default
name str

Outlet name advertised on the LSL network. Typically the stream name that downstream tools (the Virtual Hand, a recorder, another MyoGestic app) resolve by.

required
n_channels int

Fixed channel count. Push vectors must have this length or :meth:_send raises ValueError instead of silently mis-sending.

required
hz float

Send rate of the daemon thread (Hz). Default 50. Push faster than hz is fine: latest-wins, the slot just gets overwritten.

50

Examples:

>>> outlet = LSLOutlet("VHI_Hand", n_channels=9, hz=32)
>>> @pipeline.predict
... def predict(model, features):
...     pose = model.compose_pose(features)
...     outlet.push(pose)
...     return {"pose": pose}
Source code in myogestic/outputs/lsl.py
def __init__(self, name: str, n_channels: int, hz: float = 50):
    info = StreamInfo(name, "Control", n_channels, hz, "float32", "")
    self._outlet = StreamOutlet(info)
    self._n_channels = int(n_channels)
    super().__init__(hz=hz)

UDPOutput

UDPOutput(host: str, port: int, hz: float = 50)

Bases: Output

Send the latest pushed vector as a UDP datagram to host:port.

Each _send writes one datagram containing the float32 bytes of data (no length header, no framing). The receiver is expected to know the channel count and dtype out-of-band - this is for cases where a downstream process (Unity, ROS, a Max/MSP patch) has its own decoder and you just want the freshest vector with minimal latency.

Parameters:

Name Type Description Default
host str

Destination hostname or IP. "127.0.0.1" for same-machine consumers; a LAN address for the next box over.

required
port int

Destination UDP port.

required
hz float

Send rate of the daemon thread (Hz). Default 50.

50

Examples:

>>> outlet = UDPOutput("127.0.0.1", 9000, hz=60)
>>> @pipeline.predict
... def predict(model, features):
...     vec = model.predict(features.reshape(1, -1))[0]
...     outlet.push(vec)
...     return {"vec": vec}
Source code in myogestic/outputs/udp.py
def __init__(self, host: str, port: int, hz: float = 50):
    self._addr = (host, port)
    self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    super().__init__(hz=hz)

stop

stop() -> None

Stop the daemon thread and close the UDP socket.

Source code in myogestic/outputs/udp.py
def stop(self) -> None:
    """Stop the daemon thread and close the UDP socket."""
    super().stop()
    self._sock.close()

SerialOutput

Opt-in: lives at myogestic.outputs.serial_output.SerialOutput. Direct import only (requires the serial extra for pyserial).

SerialOutput

SerialOutput(port: str, baud: int = 115200, hz: float = 10, fmt: str = 'f')

Bases: Output

Pack the latest vector with struct and write it to a serial port.

Each _send writes one little-endian packed frame: len(data) values of format-char fmt. No framing byte, no checksum - the receiver (an Arduino, a microcontroller, a haptic driver) is expected to read a fixed-size frame matching its known channel count.

Parameters:

Name Type Description Default
port str

Serial port path. Examples: "/dev/cu.usbmodem1101" on macOS, "/dev/ttyACM0" on Linux, "COM3" on Windows.

required
baud int

Baud rate. Default 115200 - matches typical Arduino sketches; tune to your firmware.

115200
hz float

Send rate of the daemon thread (Hz). Default 10. Keep this low: serial UART has a hard ceiling and queue-induced latency stacks fast.

10
fmt str

struct format character per value. "f" for float32 (default), "B" for uint8, "h" for int16, etc. See the Python struct docs.

'f'

Examples:

>>> haptic = SerialOutput("/dev/ttyACM0", baud=115200, hz=30,
...                       fmt="B")
>>> @pipeline.predict
... def predict(model, features):
...     intensities = (model.predict(features.reshape(1, -1))[0] * 255
...                    ).clip(0, 255).astype(np.uint8)
...     haptic.push(intensities)
...     return {"intensities": intensities}

Requires the serial extra (uv sync --extra serial) to pull in pyserial.

Source code in myogestic/outputs/serial_output.py
def __init__(self, port: str, baud: int = 115200, hz: float = 10, fmt: str = "f"):
    import serial  # type: ignore

    self._ser = serial.Serial(port, baud)
    self._fmt = fmt
    super().__init__(hz=hz)