Outputs¶
Output ¶
Output(hz: float = 50)
Base class for "send the latest pushed vector at hz" outputs.
Subclass to define a new transport: override _send with the
actual write (LSL push_sample, UDP sendto, serial write, gRPC
RPC, ...). The base class handles everything else:
- A daemon output thread is started in __init__ and runs for the lifetime of the Output. Each tick it reads the latest pushed vector and calls _send.
- push is the caller-facing API: write the latest value to an atomic slot (CPython's GIL guarantees atomic reference assignment). It is latest-wins, not queued: if you push faster than hz, intermediate values are overwritten and never sent. That's the contract.
- Exceptions raised by _send are caught, deduplicated per (error class, message) pair, and logged once. A flapping destination logs one line per failure mode and the send thread keeps running.
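The three behaviours above (latest-value slot, periodic tick, deduplicated error logging) can be sketched in a few lines. This is a minimal illustration, not the myogestic source: the class name LatestWinsSender, the autostart flag, and the _tick helper are all hypothetical and exist only so the loop can be stepped by hand.

```python
import logging
import threading

log = logging.getLogger("latest_wins_demo")


class LatestWinsSender:
    """Hypothetical stand-in for the base class, not the myogestic source."""

    def __init__(self, hz=50.0, autostart=True):
        self._period = 1.0 / hz
        self._latest = None          # the single latest-value slot
        self._seen_errors = set()    # (error class, message) pairs already logged
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        if autostart:
            self._thread.start()

    def push(self, data):
        # One reference assignment: overwrites the previous value, never blocks.
        self._latest = data

    def _run(self):
        # Event.wait returns False on timeout (one tick) and True after stop().
        while not self._stop_event.wait(self._period):
            self._tick()

    def _tick(self):
        data = self._latest
        if data is None:             # nothing pushed yet
            return
        try:
            self._send(data)
        except Exception as exc:
            key = (type(exc).__name__, str(exc))
            if key not in self._seen_errors:
                self._seen_errors.add(key)
                log.warning("send failed: %s: %s", *key)

    def _send(self, data):
        raise NotImplementedError

    def stop(self):
        self._stop_event.set()
        if self._thread.is_alive():
            self._thread.join(timeout=1.0)
```

Pushing three values between two ticks results in only the last one being sent, and it is sent again on every later tick until a newer value arrives.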
Subclassing checklist:
- Call super().__init__(hz=...) from your __init__ (after opening the underlying socket / serial port / channel, because the send thread starts immediately).
- Implement _send(self, data: np.ndarray) -> None. Treat data as read-only; validate shape; raise on misuse rather than silently mis-sending.
- Override stop if you need to close a resource (see myogestic.outputs.UDPOutput for an example).
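The "validate shape, treat data as read-only" item might look like the helper below, called at the top of a subclass's _send. The function name check_vector and its n_channels argument are assumptions made for this sketch; they are not part of the documented API.

```python
import numpy as np


def check_vector(data, n_channels):
    """Return a float32 copy of a 1-D vector, or raise ValueError on misuse."""
    arr = np.asarray(data)
    if arr.ndim != 1 or arr.shape[0] != n_channels:
        raise ValueError(f"expected shape ({n_channels},), got {arr.shape}")
    # astype copies by default, so the caller's array is never modified.
    return arr.astype(np.float32)
```

Raising here surfaces a wrong-sized vector as a logged error instead of a malformed packet on the wire.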
Outputs are user-owned: instantiate them at module scope, call
.push(data) from inside @pipeline.predict. Do not register
them with App; the framework does not track them.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| hz | float | Send rate of the daemon thread in Hz. Default 50. Tune to match your destination's appetite - LSL subscribers handle high rates well, a serial UART or a gRPC server may not. | 50 |
Examples:
>>> from myogestic.outputs import Output
>>> import socket, numpy as np
>>>
>>> class MyOutput(Output):
...     def __init__(self, addr, hz=50):
...         self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
...         self._addr = addr
...         super().__init__(hz=hz)
...     def _send(self, data):
...         self._sock.sendto(data.astype(np.float32).tobytes(),
...                           self._addr)
Source code in myogestic/outputs/base.py
push ¶
push(data: ndarray) -> None
Set the latest-value slot. Atomic; latest-wins; non-blocking.
Call from inside @pipeline.predict. The send thread picks
the value up on its next tick. If you push faster than hz,
intermediate values are dropped.
Source code in myogestic/outputs/base.py
stop ¶
Stop the send thread.
Subclasses that hold resources (sockets, serial ports) should
override and call super().stop() first.
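The ordering matters because the send thread may still be inside _send when stop is called. The sketch below shows the pattern with an in-memory buffer standing in for a socket or serial port. BaseOutput and BufferOutput are hypothetical stand-ins written for this example, and the assumption that stop waits for the thread to exit is not confirmed by the text above.

```python
import io
import threading


class BaseOutput:
    """Hypothetical stand-in for Output: only the send-thread lifecycle."""

    def __init__(self, hz=50.0):
        self._period = 1.0 / hz
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()         # the thread starts immediately

    def _run(self):
        while not self._stop_event.wait(self._period):
            self._send(b"\x00")

    def _send(self, payload):
        raise NotImplementedError

    def stop(self):
        self._stop_event.set()
        self._thread.join(timeout=1.0)


class BufferOutput(BaseOutput):
    def __init__(self, hz=50.0):
        self._buf = io.BytesIO()     # open the resource first ...
        super().__init__(hz=hz)      # ... then start the send thread

    def _send(self, payload):
        self._buf.write(payload)     # would raise ValueError on a closed buffer

    def stop(self):
        super().stop()               # 1. the send thread exits
        self._buf.close()            # 2. only now is closing safe
```

Closing the resource before calling super().stop() would leave a window in which the thread writes to an already-closed handle.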
LSLOutlet ¶
Bases: Output
Publish a 1-D vector to a Lab Streaming Layer outlet.
The dual of myogestic.sources.LSLSource: call .push(vec)
from inside @pipeline.predict, and the framework's daemon output
thread re-sends the latest pushed vector at the configured hz.
Channel count is locked at construction time so the LSL metadata
matches what subscribers see.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| name | str | Outlet name advertised on the LSL network. Typically the stream name that downstream tools (the Virtual Hand, a recorder, another MyoGestic app) resolve by. | required |
| n_channels | int | Fixed channel count. Push vectors must have this length or :meth: | required |
| hz | float | Send rate of the daemon thread (Hz). Default 50. Push faster than | 50 |
Examples:
>>> outlet = LSLOutlet("VHI_Hand", n_channels=9, hz=32)
>>> @pipeline.predict
... def predict(model, features):
...     pose = model.compose_pose(features)
...     outlet.push(pose)
...     return {"pose": pose}
Source code in myogestic/outputs/lsl.py
UDPOutput ¶
Bases: Output
Send the latest pushed vector as a UDP datagram to host:port.
Each _send writes one datagram containing the float32 bytes of
data (no length header, no framing). The receiver is expected
to know the channel count and dtype out-of-band - this is for cases
where a downstream process (Unity, ROS, a Max/MSP patch) has its
own decoder and you just want the freshest vector with minimal
latency.
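On the receiving side, a decoder for such a datagram can be as small as the sketch below. It assumes little-endian float32 values (the native byte order on common x86 and ARM hosts) and a channel count agreed in advance; decode_datagram is a hypothetical helper, not part of myogestic.

```python
import struct


def decode_datagram(payload: bytes, n_channels: int) -> tuple:
    """Unpack one unframed datagram of little-endian float32 values."""
    expected = 4 * n_channels        # float32 is 4 bytes per channel
    if len(payload) != expected:
        raise ValueError(f"expected {expected} bytes, got {len(payload)}")
    return struct.unpack(f"<{n_channels}f", payload)
```

Because there is no header, the length check is the only guard against a sender and receiver that disagree on the channel count.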
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| host | str | Destination hostname or IP. | required |
| port | int | Destination UDP port. | required |
| hz | float | Send rate of the daemon thread (Hz). Default 50. | 50 |
Examples:
>>> outlet = UDPOutput("127.0.0.1", 9000, hz=60)
>>> @pipeline.predict
... def predict(model, features):
...     vec = model.predict(features.reshape(1, -1))[0]
...     outlet.push(vec)
...     return {"vec": vec}
Source code in myogestic/outputs/udp.py
SerialOutput¶
Opt-in: lives at myogestic.outputs.serial_output.SerialOutput. Direct import only (requires the serial extra for pyserial).
SerialOutput ¶
Bases: Output
Pack the latest vector with struct and write it to a serial port.
Each _send writes one little-endian packed frame: len(data)
values of format-char fmt. No framing byte, no checksum - the
receiver (an Arduino, a microcontroller, a haptic driver) is
expected to read a fixed-size frame matching its known channel
count.
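The frame layout described above corresponds to a single struct.pack call with a little-endian prefix and the format character repeated once per value. The helpers pack_frame and frame_size below are illustrative names for this sketch and may differ from what SerialOutput does internally.

```python
import struct


def pack_frame(values, fmt="f"):
    """Pack values as one little-endian frame, one fmt item per value."""
    return struct.pack(f"<{len(values)}{fmt}", *values)


def frame_size(n_channels, fmt="f"):
    """Number of bytes the receiver must read for one complete frame."""
    return struct.calcsize(f"<{n_channels}{fmt}")
```

With fmt="B" each channel occupies one byte, so a three-channel frame is three bytes; with the default "f" a nine-channel frame is 36 bytes.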
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| port | str | Serial port path. Examples: | required |
| baud | int | Baud rate. Default 115200 - matches typical Arduino sketches; tune to your firmware. | 115200 |
| hz | float | Send rate of the daemon thread (Hz). Default 10. Keep this low: serial UART has a hard ceiling and queue-induced latency stacks fast. | 10 |
| fmt | str | | 'f' |
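To judge whether a given hz fits the link, it can help to estimate the fraction of the UART's capacity a configuration would use. The sketch below assumes 8N1 framing (10 bits on the wire per byte, which is pyserial's default port configuration); serial_load is a hypothetical helper, not part of myogestic.

```python
import struct


def serial_load(n_channels, fmt="f", hz=10.0, baud=115200, bits_per_byte=10):
    """Fraction of link capacity used; values near or above 1.0 will back up."""
    frame_bytes = struct.calcsize(f"<{n_channels}{fmt}")
    capacity = baud / bits_per_byte  # bytes per second the UART can carry
    return frame_bytes * hz / capacity
```

Nine float channels at 10 Hz and 115200 baud use about 3% of the link, while 64 float channels at 200 Hz on a 9600 baud link would need more than 50 times its capacity.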
Examples:
>>> haptic = SerialOutput("/dev/ttyACM0", baud=115200, hz=30,
...                       fmt="B")
>>> @pipeline.predict
... def predict(model, features):
...     intensities = (model.predict(features.reshape(1, -1))[0] * 255
...                    ).clip(0, 255).astype(np.uint8)
...     haptic.push(intensities)
...     return {"intensities": intensities}
Requires the serial extra (uv sync --extra serial) to pull
in pyserial.