Outputs¶
Output ¶
Output(hz: float = 50)
Base class for "send the latest pushed vector at hz" outputs.
Subclass to define a new transport: override `_send` with the
actual write (LSL push_sample, UDP sendto, serial write, gRPC
RPC, ...). The base class handles everything else:
- A daemon output thread is started in `__init__` and runs for the lifetime of the Output. Each tick it reads the latest pushed vector and calls `_send`.
- `push` is the caller-facing API: it writes the latest value to an atomic slot (CPython's GIL guarantees atomic reference assignment). It is latest-wins, not queued - if you push faster than `hz`, intermediate values are overwritten and never sent. That's the contract.
- Exceptions raised by `_send` are caught, deduplicated per `(error class, message)` pair, and logged once. A flapping destination logs one line per failure mode and the send thread keeps running.
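The three behaviours above (a thread started in `__init__`, a latest-wins slot, and deduplicated error logging) can be sketched in plain Python. This is an illustration of the pattern only, not the source of `Output`: the class name `LatestWinsSender`, the `sent` list, and the logger name are hypothetical, and the real base class may differ in detail.

```python
import logging
import threading

log = logging.getLogger("latest_wins_sketch")  # hypothetical logger name


class LatestWinsSender:
    """Illustrative stand-in for the pattern above; not myogestic's source."""

    def __init__(self, hz: float = 50.0):
        self._period = 1.0 / hz
        self._latest = None        # the slot: one reference, overwritten by push
        self._seen_errors = set()  # (error class, message) pairs already logged
        self._stop_event = threading.Event()
        self.sent = []             # record of what _send received, for inspection
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()       # started in __init__, as described above

    def push(self, data):
        # A single reference assignment: non-blocking, and the newest value wins.
        self._latest = data

    def _send(self, data):
        # Transport hook; a real subclass would write to a socket or port here.
        self.sent.append(data)

    def _tick(self):
        data = self._latest
        if data is None:           # nothing pushed yet
            return
        try:
            self._send(data)
        except Exception as exc:
            key = (type(exc), str(exc))
            if key not in self._seen_errors:  # log each failure mode once
                self._seen_errors.add(key)
                log.warning("send failed: %s: %s", type(exc).__name__, exc)

    def _run(self):
        # Event.wait returns False on timeout and True once stop() sets the event.
        while not self._stop_event.wait(self._period):
            self._tick()

    def stop(self):
        self._stop_event.set()
        self._thread.join(timeout=1.0)
```

Because each tick re-reads the slot, five pushes between two ticks result in only the fifth value being sent, and that value is sent again on every later tick until a new one is pushed.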
Subclassing checklist:
- Call `super().__init__(hz=...)` from your `__init__` (after opening the underlying socket / serial port / channel - the send thread starts immediately).
- Implement `_send(self, data: np.ndarray) -> None`. Treat `data` as read-only; validate shape; raise on misuse rather than silently mis-sending.
- Override `stop` if you need to close a resource (see `myogestic.outputs.UDPOutput` for an example).
Outputs are user-owned: instantiate them at module scope, call
.push(data) from inside @pipeline.predict. Do not register
them with App; the framework does not track them.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| hz | float | Send rate of the daemon thread in Hz. Default 50. Tune to match your destination's appetite - LSL subscribers handle high rates well, a serial UART or a gRPC server may not. | 50 |
Example
    import socket

    import numpy as np

    from myogestic.outputs import Output


    class MyOutput(Output):
        def __init__(self, addr, hz=50):
            # Open the socket before super().__init__: the send thread starts immediately.
            self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self._addr = addr
            super().__init__(hz=hz)

        def _send(self, data):
            # One datagram per tick: the float32 bytes of the latest vector.
            self._sock.sendto(data.astype(np.float32).tobytes(), self._addr)
Source code in myogestic/outputs/__init__.py
push ¶
push(data: ndarray) -> None
Set the latest-value slot. Atomic; latest-wins; non-blocking.
Call from inside @pipeline.predict. The send thread picks
the value up on its next tick. If you push faster than hz,
intermediate values are dropped.
Source code in myogestic/outputs/__init__.py
stop ¶
Stop the send thread. Subclasses that hold resources (sockets,
serial ports) should override and call super().stop() first.
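The ordering in a `stop` override matters: stopping the thread first means `_send` can no longer run when the resource is closed. The sketch below shows that ordering. So that it runs without the library installed, it defines a small stand-in named `Output` that only mimics the thread start and `stop`; that stand-in and the subclass name `ClosingUDPOutput` are hypothetical, and in real code you would import `Output` from `myogestic.outputs` instead.

```python
import socket
import threading


class Output:
    """Stand-in for myogestic.outputs.Output (assumption: thread start and stop only)."""

    def __init__(self, hz: float = 50):
        self._stop_event = threading.Event()
        self._thread = threading.Thread(
            target=self._run, args=(1.0 / hz,), daemon=True
        )
        self._thread.start()

    def _run(self, period):
        while not self._stop_event.wait(period):
            pass  # the real base class would call self._send(latest) here

    def stop(self):
        self._stop_event.set()
        self._thread.join(timeout=1.0)


class ClosingUDPOutput(Output):
    def __init__(self, addr, hz=50):
        # Open the resource first: the base class starts its thread in __init__.
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self._addr = addr
        super().__init__(hz=hz)

    def _send(self, data):
        self._sock.sendto(bytes(data), self._addr)

    def stop(self):
        super().stop()      # 1. stop the send thread
        self._sock.close()  # 2. only then release the socket
```

Reversing the two lines in `stop` would leave a window in which the thread calls `_send` on a closed socket.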
LSLOutlet ¶
Bases: Output
Publish a 1-D vector to a Lab Streaming Layer outlet.
The dual of `myogestic.sources.LSLSource`: call `.push(vec)`
from inside `@pipeline.predict`, and the framework's daemon output
thread re-sends the latest pushed vector at the configured `hz`.
Channel count is locked at construction time so the LSL metadata
matches what subscribers see.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Outlet name advertised on the LSL network. Typically the stream name that downstream tools (the Virtual Hand, a recorder, another MyoGestic app) resolve by. | required |
| n_channels | int | Fixed channel count. Push vectors must have this length or :meth: | required |
| hz | float | Send rate of the daemon thread (Hz). Default 50. Push faster than | 50 |
Example
    outlet = LSLOutlet("VHI_Hand", n_channels=9, hz=32)


    @pipeline.predict
    def predict(model, features):
        pose = model.compose_pose(features)
        outlet.push(pose)
        return {"pose": pose}
Source code in myogestic/outputs/lsl.py
UDPOutput ¶
Bases: Output
Send the latest pushed vector as a UDP datagram to host:port.
Each _send writes one datagram containing the float32 bytes of
data (no length header, no framing). The receiver is expected
to know the channel count and dtype out-of-band - this is for cases
where a downstream process (Unity, ROS, a Max/MSP patch) has its
own decoder and you just want the freshest vector with minimal
latency.
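On the receiving side, the datagram can be decoded with nothing more than the channel count. The following is a minimal receiver sketch, assuming the float32 values arrive in little-endian byte order (the native order on most desktop hardware; the byte order is not stated above). The function names `decode_datagram` and `receive_one` are hypothetical helpers, not part of the library.

```python
import socket
import struct


def decode_datagram(payload: bytes, n_channels: int) -> tuple:
    """Decode one datagram of n_channels float32 values (little-endian assumed)."""
    expected = 4 * n_channels  # float32 is 4 bytes per value
    if len(payload) != expected:
        raise ValueError(f"expected {expected} bytes, got {len(payload)}")
    return struct.unpack(f"<{n_channels}f", payload)


def receive_one(port: int, n_channels: int, timeout_s: float = 1.0) -> tuple:
    """Wait for one datagram on 127.0.0.1:port and decode it."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.settimeout(timeout_s)
        sock.bind(("127.0.0.1", port))
        payload, _sender = sock.recvfrom(65535)
    return decode_datagram(payload, n_channels)
```

With no length header in the datagram, the size check is the only guard against a sender and receiver that disagree on the channel count.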
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Destination hostname or IP. | required |
| port | int | Destination UDP port. | required |
| hz | float | Send rate of the daemon thread (Hz). Default 50. | 50 |
Example
    outlet = UDPOutput("127.0.0.1", 9000, hz=60)


    @pipeline.predict
    def predict(model, features):
        vec = model.predict(features.reshape(1, -1))[0]
        outlet.push(vec)
        return {"vec": vec}
Source code in myogestic/outputs/udp.py
SerialOutput¶
Opt-in: lives at myogestic.outputs.serial_output.SerialOutput. Direct import only (requires the serial extra for pyserial).
SerialOutput ¶
Bases: Output
Pack the latest vector with struct and write it to a serial port.
Each _send writes one little-endian packed frame: len(data)
values of format-char fmt. No framing byte, no checksum - the
receiver (an Arduino, a microcontroller, a haptic driver) is
expected to read a fixed-size frame matching its known channel
count.
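The frame layout described above maps directly onto the standard `struct` module. As a rough sketch of what goes over the wire (the helper names `pack_frame` and `unpack_frame` are hypothetical, and the library's own packing code may be written differently):

```python
import struct


def pack_frame(values, fmt: str = "f") -> bytes:
    """Pack values as one little-endian frame: len(values) items of format-char fmt."""
    return struct.pack(f"<{len(values)}{fmt}", *values)


def unpack_frame(frame: bytes, n_values: int, fmt: str = "f") -> tuple:
    """Receiver-side inverse; the channel count must be known in advance."""
    return struct.unpack(f"<{n_values}{fmt}", frame)
```

With `fmt="B"`, three intensities occupy three bytes; with the default `fmt="f"`, each value occupies four. Since there is no framing byte, a receiver that loses sync has no marker to realign on, so the fixed frame size has to match on both ends.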
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| port | str | Serial port path. Examples: | required |
| baud | int | Baud rate. Default 115200 - matches typical Arduino sketches; tune to your firmware. | 115200 |
| hz | float | Send rate of the daemon thread (Hz). Default 10. Keep this low: serial UART has a hard ceiling and queue-induced latency stacks fast. | 10 |
| fmt | str | | 'f' |
Example
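    import numpy as np

    from myogestic.outputs.serial_output import SerialOutput

    haptic = SerialOutput("/dev/ttyACM0", baud=115200, hz=30, fmt="B")


    @pipeline.predict
    def predict(model, features):
        # Multiply by 255, clip to 0-255, and cast to uint8 so each value
        # fits fmt="B" (one unsigned byte).
        intensities = (
            model.predict(features.reshape(1, -1))[0] * 255
        ).clip(0, 255).astype(np.uint8)
        haptic.push(intensities)
        return {"intensities": intensities}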
Requires the serial extra (uv sync --extra serial) to pull
in pyserial.
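The UART ceiling mentioned for `hz` can be estimated before picking a rate. The sketch below assumes 8N1 framing (one start bit, eight data bits, one stop bit, so 10 bits on the wire per byte); whether `SerialOutput` opens the port with those settings is an assumption, and `max_frames_per_second` is a hypothetical helper.

```python
import struct


def max_frames_per_second(baud: int, n_values: int, fmt: str = "f",
                          bits_per_byte: int = 10) -> float:
    """Upper bound on frames per second for a given baud rate and frame size."""
    frame_bytes = struct.calcsize(f"<{n_values}{fmt}")
    return baud / (bits_per_byte * frame_bytes)
```

At 115200 baud, a frame of nine float32 values (36 bytes) gives a bound of 320 frames per second, and five `B` values (5 bytes) give 2304. These are theoretical limits for the wire alone; the firmware's read loop is often the tighter constraint, so staying well below them is the safer choice.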