
Anatomy of a MyoGestic app

Read this first. The deep-dive concept pages (Streams, Pipeline, Recording, Widgets) explain each part of the framework in detail; this page explains how the parts fit together in the order you write them.

By the end, the 35-line script below will read like prose. Open it in your editor and read alongside.

from myogestic import App, Stream, TrainingData
from myogestic.ml import Pipeline, save_pickle, load_pickle
from myogestic.ml.widgets import pipeline_panel
from myogestic.session import iter_labeled_windows
from myogestic.sources import LSLSource
from myogestic.widgets import recording_controls, session_manager, signal_viewer
from sklearn.linear_model import LogisticRegression
import numpy as np

app = App("EMG Demo")
app.streams(Stream("emg", source=LSLSource("EMG"), window_seconds=0.2))

pipeline = Pipeline(app, predict_hz=20)
pipeline.save_model = save_pickle
pipeline.load_model = load_pickle


@pipeline.extract
def extract(windows):
    return windows["emg"].mean(axis=1)


@pipeline.train
def train(data: TrainingData):
    X, y = [], []
    for sw, _ts, c in iter_labeled_windows(data.paths, "emg", 0.2, 0.1, classes=data.classes):
        X.append(extract({"emg": sw.data}))
        y.append(c)
    return LogisticRegression().fit(np.array(X), np.array(y))


@pipeline.predict
def predict(model, features):
    return {"class": int(model.predict(features.reshape(1, -1))[0])}


@app.ui
def ui(ctx):
    signal_viewer(ctx, "emg")
    recording_controls(
        ctx, ["Rest", "Fist"], on_record=app.start_recording, on_stop=app.stop_recording
    )
    pipeline.training_data = session_manager("sessions", class_names=["Rest", "Fist"])
    pipeline_panel(pipeline)


app.run()

The shape

A MyoGestic app is one App object with streams flowing through it, optionally a Pipeline that adds an ML lifecycle, and an @app.ui callback that draws widgets every frame. app.run() starts the threads and blocks on the GUI loop. That is the whole skeleton; everything else is variation.

flowchart LR
    A["App('Demo')"]
    S["Stream('emg', ...)"]
    P["Pipeline(app)"]
    UI["@app.ui ui(ctx)"]
    R["app.run()"]

    A --> S
    A --> P
    A --> UI
    A --> R
    S -.flows into.-> UI
    P -.runs predict thread on.-> S

Imports, in three layers

from myogestic import App, Stream, TrainingData            # the orchestrator + data primitives
from myogestic.ml import Pipeline, save_pickle, load_pickle # ML lifecycle layer (opt-in)
from myogestic.session import iter_labeled_windows         # session reading helpers
from myogestic.sources import LSLSource                    # one of the built-in sources
from myogestic.widgets import recording_controls, ...      # stateless ImGui widgets
from sklearn.linear_model import LogisticRegression        # YOUR ML library

Three layers, from framework-owned to user-owned:

  1. Framework primitives (App, Stream, Pipeline, TrainingData) - what you compose.
  2. Built-in capabilities (sources, widgets, ml, session) - what you wire in.
  3. Your domain code (sklearn, MyoVerse, scipy, PyTorch, ...) - what you bring.

The framework deliberately ships nothing at layer 3 - DSP, ML models, feature extraction are user concerns. See Design principles.

App - the orchestrator

app = App("EMG Demo")

One per process. App owns the GUI loop, the shared Context, the run-loop lifecycle hooks, and the recording state machine. Construct it, then attach things (streams, widgets, optional pipeline), then call app.run().

App is not a base class to subclass. There is no class MyApp(App):. You compose by attaching, not inheriting.

→ Deep dive: Architecture, Design principles.

Stream - the data plumbing

app.streams(Stream("emg", source=LSLSource("EMG"), window_seconds=0.2))

A Stream wraps a Source plus a fixed-memory ring buffer. Each stream owns one daemon acquisition thread that polls the source, appends to the buffer, refreshes the display snapshot, and (when app.start_recording() is active) writes to a Zarr array.

Two reads come out the other side:

  • stream.get_window() returns the most recent window_seconds of data, channels-first (n_channels, n_samples). This is what feature extractors and ML models consume.
  • stream.get_display(n_pixels) returns a min/max envelope decimated for rendering. This is what signal_viewer consumes.

app.streams(*streams) registers one or more. The same method takes any Source implementation, so swapping LSLSource for ReplaySource (offline replay) or your own custom source changes one line.
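
To make the two reads concrete, here is a minimal sketch of a fixed-memory, channels-first ring buffer and a min/max envelope decimator. It is not the framework's implementation: the RingBuffer class and the minmax_envelope function are hypothetical names, and the real Stream may handle locking, timestamps, and edge cases differently.

```python
import numpy as np


class RingBuffer:
    """Fixed-memory, channels-first ring buffer (illustrative only)."""

    def __init__(self, n_channels: int, capacity: int):
        self._buf = np.zeros((n_channels, capacity), dtype=np.float32)
        self._capacity = capacity
        self._write = 0    # next column to overwrite
        self._filled = 0   # number of valid samples, capped at capacity

    def append(self, chunk: np.ndarray) -> None:
        """Append a (n_channels, n_new) chunk, overwriting the oldest samples."""
        chunk = chunk[:, -self._capacity:]  # keep only what can fit
        n_new = chunk.shape[1]
        idx = (self._write + np.arange(n_new)) % self._capacity
        self._buf[:, idx] = chunk
        self._write = (self._write + n_new) % self._capacity
        self._filled = min(self._capacity, self._filled + n_new)

    def get_window(self, n_samples: int) -> np.ndarray:
        """Return the most recent samples as (n_channels, n), oldest first."""
        n = min(n_samples, self._filled)
        idx = (self._write - n + np.arange(n)) % self._capacity
        return self._buf[:, idx]


def minmax_envelope(window: np.ndarray, n_pixels: int):
    """Decimate to one (min, max) pair per pixel column for each channel."""
    n_channels, n_samples = window.shape
    if n_samples < n_pixels:
        # Fewer samples than pixels: nothing to decimate.
        return window.copy(), window.copy()
    usable = (n_samples // n_pixels) * n_pixels  # drop the ragged remainder
    bins = window[:, n_samples - usable:].reshape(n_channels, n_pixels, -1)
    return bins.min(axis=2), bins.max(axis=2)
```

Memory stays constant however long the stream runs, and the envelope keeps short spikes visible that plain subsampling would skip.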

→ Deep dive: Streams.

Pipeline - the optional ML lifecycle

pipeline = Pipeline(app, predict_hz=20)
pipeline.save_model = save_pickle
pipeline.load_model = load_pickle

Pipeline(app) adds:

  • A predict thread that fires every 1/predict_hz seconds.
  • Two new states (training, predicting) joined to the app's idle ↔ recording state machine.
  • Three function-decorator slots (extract, train, predict) for your code.

If your app is just recording and visualisation, skip the Pipeline. If it does ML, attach one.
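
How the framework schedules its predict thread is not shown on this page. As a rough mental model, a fixed-rate worker can be sketched with the standard library alone; run_at_rate is a hypothetical helper, not part of MyoGestic.

```python
import threading
import time


def run_at_rate(fn, hz, stop):
    """Call fn() roughly every 1/hz seconds until stop is set."""
    period = 1.0 / hz
    next_tick = time.monotonic()
    while not stop.is_set():
        fn()
        # Schedule against absolute deadlines so fn()'s runtime
        # does not accumulate as drift.
        next_tick += period
        stop.wait(max(0.0, next_tick - time.monotonic()))


ticks = []
stop = threading.Event()
worker = threading.Thread(
    target=run_at_rate,
    args=(lambda: ticks.append(time.monotonic()), 20, stop),
    daemon=True,
)
worker.start()
time.sleep(0.12)   # let a few 20 Hz ticks fire
stop.set()         # wakes the worker immediately, no need to wait out the period
worker.join(timeout=1.0)
```

Waiting on the Event rather than calling time.sleep() lets the thread stop promptly when the app shuts down.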

save_model / load_model are plain attribute slots. Setting them turns on the Save Model / Load Model buttons in the ML widgets. The built-in save_pickle / load_pickle helpers round-trip anything picklable.
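
If pickle does not suit your model, you can assign your own functions to the slots. The sketch below shows the general shape of such a pair. The (model, path) and (path) signatures are an assumption, since this page does not show what the slots are called with; check the Pipeline deep dive before relying on them.

```python
import pickle
import tempfile
from pathlib import Path


def save_model(model, path):
    """Serialise any picklable object to path (assumed signature)."""
    with open(path, "wb") as f:
        pickle.dump(model, f, protocol=pickle.HIGHEST_PROTOCOL)


def load_model(path):
    """Load the object back. Only unpickle files you trust."""
    with open(path, "rb") as f:
        return pickle.load(f)


# Round-trip a stand-in "model" through a temporary file.
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "model.pkl"
    original = {"weights": [0.1, 0.2], "classes": ["Rest", "Fist"]}
    save_model(original, target)
    restored = load_model(target)
```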

→ Deep dive: Pipeline.

@pipeline.extract - feature extraction

@pipeline.extract
def extract(windows):
    return windows["emg"].mean(axis=1)

Receives a dict keyed by stream name. Each value is channels-first (n_channels, n_samples). Return whatever your model wants to consume - a feature vector, a tuple, the raw window. Whatever you return becomes the features argument to predict().

The same function runs from inside train() over recorded windows, and on the predict thread over live windows. Same code, two call sites.
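
The per-channel mean in the skeleton is a placeholder. As one possible alternative with the same input and output shapes, here is a root-mean-square extractor run on a synthetic window. extract_rms is a hypothetical name; in an app you would decorate it with @pipeline.extract.

```python
import numpy as np


def extract_rms(windows):
    """Per-channel RMS of a channels-first (n_channels, n_samples) window."""
    emg = windows["emg"]
    return np.sqrt(np.mean(emg ** 2, axis=1))


# Synthetic two-channel window: channel 0 oscillates, channel 1 is silent.
window = np.array([
    [3.0, -3.0, 3.0, -3.0],
    [0.0, 0.0, 0.0, 0.0],
])
features = extract_rms({"emg": window})  # one value per channel, shape (2,)
```

Channel 0 averages to zero but has an RMS of 3.0, which is why amplitude features are a common choice for zero-centred signals such as EMG.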

@pipeline.train - training

@pipeline.train
def train(data: TrainingData):
    X, y = [], []
    for sw, _ts, c in iter_labeled_windows(data.paths, "emg", 0.2, 0.1, classes=data.classes):
        X.append(extract({"emg": sw.data}))
        y.append(c)
    return LogisticRegression().fit(np.array(X), np.array(y))

Runs on a one-shot training thread when the user clicks Train. Receives TrainingData (with paths, class_names, classes) populated by session_manager. Return any object - it lands on pipeline.model and stays there.

iter_labeled_windows does the heavy lifting of opening sessions (folder or .session.zip), walking the label track, and slicing trials into overlapping windows. You only write the feature extraction.
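
"Overlapping windows" can be pictured with a few lines of index arithmetic. The sketch assumes that the 0.2 and 0.1 arguments in the call above are a window length and a step in seconds, and it uses a made-up 2000 Hz sample rate. window_spans is a hypothetical helper, not the framework's slicing code.

```python
def window_spans(n_samples, sample_rate, window_s, step_s):
    """Return (start, stop) sample indices of every full window in a trial."""
    win = round(window_s * sample_rate)
    step = round(step_s * sample_rate)
    # Only full windows are produced; a trailing partial window is dropped.
    return [(s, s + win) for s in range(0, n_samples - win + 1, step)]


# A 0.5 s trial at 2000 Hz, cut into 0.2 s windows every 0.1 s.
spans = window_spans(n_samples=1000, sample_rate=2000, window_s=0.2, step_s=0.1)
# spans == [(0, 400), (200, 600), (400, 800), (600, 1000)]
```

With a step of half the window length, each window shares half its samples with its neighbour, so a short trial still yields several training examples.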

→ Deep dive: Pipeline, Recording.

@pipeline.predict - live prediction

@pipeline.predict
def predict(model, features):
    return {"class": int(model.predict(features.reshape(1, -1))[0])}

Fires once every 1/predict_hz seconds (every 50 ms at predict_hz=20) while the user is in Predict mode. Must return a dict[str, Any]; the result lands in pipeline.predictions for widgets to read.

If you push to outputs (LSL, robots, VHI), do it inside predict() before returning. The output thread takes the latest pushed value and drains it at its own rate.
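
"Takes the latest pushed value" describes a single-slot mailbox rather than a queue: a slow consumer never builds a backlog, it just skips stale values. A minimal sketch of that idea follows. LatestValue is a hypothetical class, and the framework's outputs may be implemented differently.

```python
import threading


class LatestValue:
    """Single-slot mailbox: push() overwrites, take() returns the newest value once."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = None
        self._fresh = False

    def push(self, value):
        """Called by the producer (for example from predict())."""
        with self._lock:
            self._value = value
            self._fresh = True

    def take(self):
        """Return (value, True) if something new was pushed, else (None, False)."""
        with self._lock:
            if not self._fresh:
                return None, False
            self._fresh = False
            return self._value, True


slot = LatestValue()
slot.push({"class": 0})
slot.push({"class": 1})   # overwrites the unread value
first = slot.take()       # ({"class": 1}, True)
second = slot.take()      # (None, False): nothing new since the last take
```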

@app.ui - the rendering surface

@app.ui
def ui(ctx):
    signal_viewer(ctx, "emg")
    recording_controls(
        ctx, ["Rest", "Fist"], on_record=app.start_recording, on_stop=app.stop_recording
    )
    pipeline.training_data = session_manager("sessions", class_names=["Rest", "Fist"])
    pipeline_panel(pipeline)

The @app.ui callback runs on the main thread every frame. The function body is a flat sequence of widget calls; each widget is a stateless function that issues ImGui draw commands based on ctx and its own arguments.

Three things live here:

  1. Display widgets (signal_viewer) - render data from ctx.streams.
  2. Control widgets (recording_controls, pipeline_panel) - fire callbacks to mutate state (start recording, train, predict).
  3. Bridge widgets (session_manager) - return a TrainingData you assign to pipeline.training_data. This is how UI selection feeds into ML training.
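
The three roles can be mimicked without any GUI library. In the toy sketch below every name (FakeContext, level_meter, button, class_picker) is hypothetical, and a draw_log list stands in for ImGui calls; it only illustrates where state lives in the stateless-function pattern.

```python
from dataclasses import dataclass, field


@dataclass
class FakeContext:
    """Stand-in for the shared ctx: everything widgets read lives here."""
    streams: dict = field(default_factory=dict)
    clicked: set = field(default_factory=set)    # buttons pressed this frame
    draw_log: list = field(default_factory=list)


def level_meter(ctx, stream_name):
    """Display widget: reads ctx, draws, returns nothing."""
    ctx.draw_log.append(f"meter:{stream_name}={max(ctx.streams[stream_name])}")


def button(ctx, label, on_click):
    """Control widget: fires a callback when pressed this frame."""
    ctx.draw_log.append(f"button:{label}")
    if label in ctx.clicked:
        on_click()


def class_picker(ctx, options, selected_index):
    """Bridge widget: returns a value that the caller assigns somewhere."""
    ctx.draw_log.append(f"picker:{options[selected_index]}")
    return options[selected_index]


def ui(ctx, state):
    """One frame: a flat sequence of widget calls, as in @app.ui."""
    level_meter(ctx, "emg")
    button(ctx, "Record", on_click=lambda: state.update(recording=True))
    state["selected"] = class_picker(ctx, ["Rest", "Fist"], selected_index=1)


ctx = FakeContext(streams={"emg": [0.1, 0.7, 0.3]}, clicked={"Record"})
state = {"recording": False, "selected": None}
ui(ctx, state)
```

The widgets keep nothing between frames: state lives in ctx or in objects the caller owns.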

For grid layouts use Grid and with grid[r, c]:. For pop-out windows use App(docking=True) + app.popout(...).

→ Deep dive: Widgets.

app.run() - the blocking event loop

app.run()

Last line of every script. In order:

  1. Starts each Stream's acquisition thread.
  2. Fires before_run_hooks (where Pipeline registers its predict thread).
  3. Hands control to the GUI event loop on the main thread - blocks here.
  4. On window close, runs cleanup_hooks, then stops all streams and bridges.

app.run(mode="gui") is the default; mode="headless" skips the GUI and runs the threads only (useful for unattended recording).
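
The ordering above can be traced with a toy stand-in. MiniApp is hypothetical and runs a fixed number of "frames" instead of a real GUI loop; the hook list names mirror the ones mentioned in the steps, but how App stores and calls them internally is an assumption.

```python
class MiniApp:
    """Toy model of the run() sequence: start, hooks, loop, cleanup, stop."""

    def __init__(self):
        self.before_run_hooks = []
        self.cleanup_hooks = []
        self.log = []

    def run(self, frames=3):
        self.log.append("start streams")
        for hook in self.before_run_hooks:
            hook()
        try:
            for i in range(frames):  # stand-in for the blocking GUI loop
                self.log.append(f"frame {i}")
        finally:
            # Cleanup runs even if the loop raises.
            for hook in self.cleanup_hooks:
                hook()
            self.log.append("stop streams")


app = MiniApp()
app.before_run_hooks.append(lambda: app.log.append("before_run"))
app.cleanup_hooks.append(lambda: app.log.append("cleanup"))
app.run(frames=2)
```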

→ Deep dive: Architecture, Threading.

What's not in this script

  • Recording. Streams flow whether you record or not. Recording starts when app.start_recording() is called (typically from the recording_controls widget's Record button). See Recording.
  • Outputs. This skeleton predicts a class index but doesn't push to anything physical. Add an LSLOutlet, UDPOutput, SerialOutput, or your own subclass and call outlet.push(value) from predict(). See Add a custom output.
  • Multi-stream. Add a second Stream("imu", ...) and your extract() sees windows["imu"] too. For paired primary/target streams, follow the iter_aligned_windows pattern in train(). See Recording.
  • Bridges. Heavy-data sources (webcam, ultrasound) run as subprocesses via app.bridges(...). See Threading.
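
As an illustration of the multi-stream case, an extract() over two streams might concatenate per-stream features into one vector. The sketch below is an assumption-laden example: the "imu" stream, the channel counts, and the window lengths are invented, and extract_multi is a hypothetical name.

```python
import numpy as np


def extract_multi(windows):
    """Combine per-channel features from two channels-first windows."""
    emg_rms = np.sqrt(np.mean(windows["emg"] ** 2, axis=1))
    imu_mean = windows["imu"].mean(axis=1)
    # Streams may differ in sample count, so reduce over time first,
    # then join along the feature axis.
    return np.concatenate([emg_rms, imu_mean])


features = extract_multi({
    "emg": np.ones((8, 400)),   # 8 channels, 400 samples
    "imu": np.zeros((6, 20)),   # 6 channels, 20 samples
})
# features has shape (14,): 8 EMG values followed by 6 IMU values
```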

You now have the skeleton. The deep-dive concept pages add the meat:

  • Streams - ring buffer geometry, the channels-first contract, get_display decimation.
  • Pipeline - state machine, decorator semantics, stale-tick guards.
  • Recording - sessions, label tracks, the .session.zip archive.
  • Widgets - the stateless-function pattern, Grid layout, pop-out windows.
  • Architecture - the runtime architecture (which thread runs what, the data flow diagram).
  • Threading - GIL release, GPU contention rule, bridge subprocesses.
  • Design principles - the eight rules the framework keeps to.

Or jump straight into a hands-on walkthrough: the EMG classification tutorial builds on this skeleton with real features, real data, and a real model.