Anatomy of a MyoGestic app¶
Read this first. The deep-dive concept pages (Streams, Pipeline, Recording, Widgets) explain each part of the framework in detail; this page explains how the parts fit together in the order you write them.
By the end, the 35-line script below will read like prose. Open it in your editor and read alongside.
from myogestic import App, Stream, TrainingData
from myogestic.ml import Pipeline, save_pickle, load_pickle
from myogestic.ml.widgets import pipeline_panel
from myogestic.session import iter_labeled_windows
from myogestic.sources import LSLSource
from myogestic.widgets import recording_controls, session_manager, signal_viewer
from sklearn.linear_model import LogisticRegression
import numpy as np
app = App("EMG Demo")
app.streams(Stream("emg", source=LSLSource("EMG"), window_seconds=0.2))
pipeline = Pipeline(app, predict_hz=20)
pipeline.save_model = save_pickle
pipeline.load_model = load_pickle
@pipeline.extract
def extract(windows):
return windows["emg"].mean(axis=1)
@pipeline.train
def train(data: TrainingData):
X, y = [], []
for sw, _ts, c in iter_labeled_windows(data.paths, "emg", 0.2, 0.1, classes=data.classes):
X.append(extract({"emg": sw.data}))
y.append(c)
return LogisticRegression().fit(np.array(X), np.array(y))
@pipeline.predict
def predict(model, features):
return {"class": int(model.predict(features.reshape(1, -1))[0])}
@app.ui
def ui(ctx):
signal_viewer(ctx, "emg")
recording_controls(
ctx, ["Rest", "Fist"], on_record=app.start_recording, on_stop=app.stop_recording
)
pipeline.training_data = session_manager("sessions", class_names=["Rest", "Fist"])
pipeline_panel(pipeline)
app.run()
The shape¶
A MyoGestic app is one App object, with streams flowing through it, optionally a Pipeline that adds ML lifecycle, and a @app.ui callback that draws widgets every frame. app.run() boots threads and blocks on the GUI loop. That is the whole skeleton; everything else is variation.
flowchart LR
A["App('Demo')"]
S["Stream('emg', ...)"]
P["Pipeline(app)"]
UI["@app.ui ui(ctx)"]
R["app.run()"]
A --> S
A --> P
A --> UI
A --> R
S -.flows into.-> UI
P -.runs predict thread on.-> S
Imports, in three layers¶
from myogestic import App, Stream, TrainingData # the orchestrator + data primitives
from myogestic.ml import Pipeline, save_pickle, load_pickle # ML lifecycle layer (opt-in)
from myogestic.session import iter_labeled_windows # session reading helpers
from myogestic.sources import LSLSource # one of the built-in sources
from myogestic.widgets import recording_controls, ... # stateless ImGui widgets
from sklearn.linear_model import LogisticRegression # YOUR ML library
Three layers, from framework-owned to user-owned:
- Framework primitives (
App,Stream,Pipeline,TrainingData) - what you compose. - Built-in capabilities (
sources,widgets,ml,session) - what you wire in. - Your domain code (sklearn, MyoVerse, scipy, PyTorch, ...) - what you bring.
The framework deliberately ships nothing at layer 3 - DSP, ML models, feature extraction are user concerns. See Design principles.
App - the orchestrator¶
One per process. App owns the GUI loop, the shared Context, the run-loop lifecycle hooks, and the recording state machine. Construct it, then attach things (streams, widgets, optional pipeline), then call app.run().
App is not a base class to subclass. There is no class MyApp(App):. You compose by attaching, not inheriting.
→ Deep dive: Architecture, Design principles.
Stream - the data plumbing¶
A Stream wraps a Source plus a fixed-memory ring buffer. Each stream owns one daemon acquisition thread that polls the source, appends to the buffer, refreshes the display snapshot, and (when app.start_recording() is active) writes to a Zarr array.
Two reads come out the other side:
stream.get_window()returns the most recentwindow_secondsof data, channels-first(n_channels, n_samples). This is what feature extractors and ML models consume.stream.get_display(n_pixels)returns a min/max envelope decimated for rendering. This is whatsignal_viewerconsumes.
app.streams(*streams) registers one or more. The same method takes any Source implementation, so swapping LSLSource for ReplaySource (offline replay) or your own custom source changes one line.
→ Deep dive: Streams.
Pipeline - the optional ML lifecycle¶
pipeline = Pipeline(app, predict_hz=20)
pipeline.save_model = save_pickle
pipeline.load_model = load_pickle
Pipeline(app) adds:
- A predict thread that fires every
1/predict_hzseconds. - Two new states (
training,predicting) joined to the app'sidle ↔ recordingstate machine. - Three function-decorator slots (
extract,train,predict) for your code.
If your app is just recording and visualisation, skip the Pipeline. If it does ML, attach one.
save_model / load_model are plain attribute slots. Setting them turns on the Save Model / Load Model buttons in the ML widgets. The default save_pickle / load_pickle round-trip anything pickleable.
→ Deep dive: Pipeline.
@pipeline.extract - feature extraction¶
Receives a dict keyed by stream name. Each value is channels-first (n_channels, n_samples). Return whatever your model wants to consume - a feature vector, a tuple, the raw window. Whatever you return becomes the features argument to predict().
The same function runs from inside train() over recorded windows, and on the predict thread over live windows. Same code, two call sites.
@pipeline.train - training¶
@pipeline.train
def train(data: TrainingData):
X, y = [], []
for sw, _ts, c in iter_labeled_windows(data.paths, "emg", 0.2, 0.1, classes=data.classes):
X.append(extract({"emg": sw.data}))
y.append(c)
return LogisticRegression().fit(np.array(X), np.array(y))
Runs on a one-shot training thread when the user clicks Train. Receives TrainingData (with paths, class_names, classes) populated by session_manager. Return any object - it lands on pipeline.model and stays there.
iter_labeled_windows does the heavy lifting of opening sessions (folder or .session.zip), walking the label track, and slicing trials into overlapping windows. You only write the feature extraction.
→ Deep dive: Pipeline, Recording.
@pipeline.predict - live prediction¶
@pipeline.predict
def predict(model, features):
return {"class": int(model.predict(features.reshape(1, -1))[0])}
Fires once per 1/predict_hz while the user is in Predict mode. Must return a dict[str, Any]; the result lands in pipeline.predictions for widgets to read.
If you push to outputs (LSL, robots, VHI), do it inside predict() before returning. The output thread takes the latest pushed value and drains it at its own rate.
@app.ui - the rendering surface¶
@app.ui
def ui(ctx):
signal_viewer(ctx, "emg")
recording_controls(
ctx, ["Rest", "Fist"], on_record=app.start_recording, on_stop=app.stop_recording
)
pipeline.training_data = session_manager("sessions", class_names=["Rest", "Fist"])
pipeline_panel(pipeline)
The @app.ui callback runs on the main thread every frame. The function body is a flat sequence of widget calls; each widget is a stateless function that draws ImGui commands from ctx (and arguments).
Three things live here:
- Display widgets (
signal_viewer) - render data fromctx.streams. - Control widgets (
recording_controls,pipeline_panel) - fire callbacks to mutate state (start recording, train, predict). - Bridge widgets (
session_manager) - return aTrainingDatayou assign topipeline.training_data. This is how UI selection feeds into ML training.
For grid layouts use Grid and with grid[r, c]:. For pop-out windows use App(docking=True) + app.popout(...).
→ Deep dive: Widgets.
app.run() - the blocking event loop¶
Last line of every script. In order:
- Starts each
Stream's acquisition thread. - Fires
before_run_hooks(wherePipelineregisters its predict thread). - Hands control to the GUI event loop on the main thread - blocks here.
- On window close, runs
cleanup_hooks, then stops all streams and bridges.
app.run(mode="gui") is the default; mode="headless" skips the GUI and runs the threads only (useful for unattended recording).
→ Deep dive: Architecture, Threading.
What's not in this script¶
- Recording. Streams flow whether you record or not. Recording starts when
app.start_recording()is called (typically from therecording_controlswidget's Record button). See Recording. - Outputs. This skeleton predicts a class index but doesn't push to anything physical. Add an
LSLOutlet,UDPOutput,SerialOutput, or your own subclass and calloutlet.push(value)frompredict(). See Add a custom output. - Multi-stream. A second
Stream("imu", ...)and yourextract()seeswindows["imu"]too. Match theiter_aligned_windowspattern intrain()for paired primary/target streams. See Recording. - Bridges. Heavy-data sources (webcam, ultrasound) run as subprocesses via
app.bridges(...). See Threading.
Where to read next¶
You now have the skeleton. The deep-dive concept pages add the meat:
- Streams - ring buffer geometry, the channels-first contract,
get_displaydecimation. - Pipeline - state machine, decorator semantics, stale-tick guards.
- Recording - sessions, label tracks, the
.session.ziparchive. - Widgets - the stateless-function pattern,
Gridlayout, pop-out windows. - Architecture - the runtime architecture (which thread runs what, the data flow diagram).
- Threading - GIL release, GPU contention rule, bridge subprocesses.
- Design principles - the eight rules the framework keeps to.
Or jump straight into a hands-on walkthrough: the EMG classification tutorial builds on this skeleton with real features, real data, and a real model.