Pipeline
Pipeline(app, predict_hz=50) is the optional ML layer. It adds a predict thread, the training and predicting states to ctx.state, and three function decorators that make up your entire ML surface: no base classes, no registration, no config.
```python
import numpy as np
from catboost import CatBoostClassifier
from myogestic.ml import Pipeline, save_pickle, load_pickle

# `app`, TrainingData and iter_labeled_windows are provided by your app setup
# and by myogestic; their imports are not shown here.
pipeline = Pipeline(app, predict_hz=20)
pipeline.save_model = save_pickle
pipeline.load_model = load_pickle


@pipeline.extract
def extract(windows: dict[str, np.ndarray]):
    return windows["emg"]  # whatever shape downstream wants


@pipeline.train
def train(data: TrainingData):
    rows = iter_labeled_windows(data.paths, "emg", win_s=0.2, hop_s=0.1)
    X, y = ...  # placeholder: build the feature matrix and labels from `rows`
    return CatBoostClassifier().fit(X, y)


@pipeline.predict
def predict(model, features):
    pred = model.predict(features.reshape(1, -1))[0]
    return {"class": pred}
```
That's the whole protocol.
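To make the protocol concrete without relying on myogestic internals, here is a hypothetical `ToyPipeline` that mimics the three-decorator surface. It is a sketch under stated assumptions, not the library's implementation. Each decorator simply records the function. `run_training()` stores whatever `train()` returns as the model. `tick()` performs one extract-then-predict step. The real Pipeline runs that step on its own thread at `predict_hz`.

```python
from typing import Any, Callable, Optional


class ToyPipeline:
    """Hypothetical stand-in for the three-decorator surface (not myogestic code)."""

    def __init__(self) -> None:
        self._extract: Optional[Callable[..., Any]] = None
        self._train: Optional[Callable[..., Any]] = None
        self._predict: Optional[Callable[..., Any]] = None
        self.model: Any = None

    # Each decorator records the function and returns it unchanged,
    # so the decorated name is still a plain, directly callable function.
    def extract(self, fn: Callable[..., Any]) -> Callable[..., Any]:
        self._extract = fn
        return fn

    def train(self, fn: Callable[..., Any]) -> Callable[..., Any]:
        self._train = fn
        return fn

    def predict(self, fn: Callable[..., Any]) -> Callable[..., Any]:
        self._predict = fn
        return fn

    def run_training(self, data: Any) -> None:
        # Whatever train() returns becomes the model.
        self.model = self._train(data)

    def tick(self, windows: dict) -> Any:
        # One predict-thread iteration: extract, then predict.
        features = self._extract(windows)
        return self._predict(self.model, features)


pipe = ToyPipeline()


@pipe.extract
def extract(windows):
    # Mean of each channel of the "emg" stream (channels-first nested lists).
    return [sum(ch) / len(ch) for ch in windows["emg"]]


@pipe.train
def train(data):
    # A "model" can be any object; here it is just a threshold.
    return 0.5


@pipe.predict
def predict(model, features):
    return {"class": int(features[0] > model)}


pipe.run_training(data=None)
print(pipe.tick({"emg": [[0.9, 1.0], [0.1, 0.2]]}))  # {'class': 1}
```

Because each decorator returns the original function, `extract`, `train` and `predict` remain directly callable. This is handy when `train()` reuses `extract()` to build its feature matrix.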
The three decorators
@pipeline.extract
Runs on the predict thread every 1/predict_hz seconds. Receives a dict[str, np.ndarray] keyed by stream name; each value is channels-first, with shape (n_channels, n_samples). Return whatever your model wants to consume: a feature vector, a tuple, the raw window, or any other object (it does not need to be pickleable).
The same callable also runs during training, but there it is invoked by your own code inside train(), not by the framework. (See the worked example in examples/synthetic/emg_classification.py.)
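As an illustration of an `extract()` that returns a compact feature vector instead of the raw window, here is a minimal sketch that computes per-channel RMS. It assumes the stream is called "emg" and uses nested lists instead of NumPy so it runs standalone. The function name `rms_features` is hypothetical.

```python
import math


def rms_features(windows: dict) -> list:
    """Hypothetical extract(): per-channel RMS of the "emg" window.

    windows["emg"] is channels-first: one inner sequence of samples per channel.
    """
    emg = windows["emg"]
    return [math.sqrt(sum(x * x for x in ch) / len(ch)) for ch in emg]


window = {"emg": [[3.0, -3.0, 3.0, -3.0], [0.0, 0.0, 0.0, 0.0]]}
print(rms_features(window))  # [3.0, 0.0]
```

With a NumPy array of shape (n_channels, n_samples), the same computation is `np.sqrt(np.mean(emg ** 2, axis=1))`. Calling the same function inside `train()` keeps training and live features identical.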
@pipeline.train
Runs on a one-shot training thread when the user clicks Train. Receives one TrainingData instance:
```python
from dataclasses import dataclass


@dataclass
class TrainingData:
    paths: list[str]        # selected session paths (folders or .session.zip)
    class_names: list[str]  # ordered class labels
    classes: set[int]       # subset selected in the UI
```
Return any object: it is stored on pipeline.model and passed to every subsequent predict() call. If pipeline.save_model is set, the Save Model button calls it as save_model(pipeline.model, path). The save_pickle / load_pickle helpers from myogestic.ml are the simplest sensible default for persistence.
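Here is a rough picture of what pickle-based persistence amounts to. The sketch below shows two hypothetical functions, `save_pickle_sketch` and `load_pickle_sketch`. They are not myogestic's helpers. The save side follows the documented call shape `save_model(pipeline.model, path)`. The load side's shape `load_model(path) -> model` is an assumption.

```python
import os
import pickle
import tempfile
from typing import Any


def save_pickle_sketch(model: Any, path: str) -> None:
    # Matches the documented call shape: save_model(pipeline.model, path).
    with open(path, "wb") as f:
        pickle.dump(model, f)


def load_pickle_sketch(path: str) -> Any:
    # Assumed call shape load_model(path) -> model; not confirmed by these docs.
    with open(path, "rb") as f:
        return pickle.load(f)


model = {"weights": [0.1, 0.2], "classes": ["rest", "fist"]}
path = os.path.join(tempfile.mkdtemp(), "model.pkl")
save_pickle_sketch(model, path)
print(load_pickle_sketch(path) == model)  # True
```

Unpickling can execute arbitrary code, so only load model files you trust.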
pipeline.training_data is set externally, usually by session_manager inside @app.ui:
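```python
from pathlib import Path

# Inside your @app.ui function; grid, session_manager and CLASSES are assumed to be in scope.
with grid[2, 0]:
    pipeline.training_data = session_manager(str(Path("sessions")), class_names=CLASSES)
```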
This separation lets the Train button stay disabled until at least one session is ticked.
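The gating rule can be expressed as a small predicate over `TrainingData`. The sketch below uses a local mirror of the dataclass with defaults added for brevity. `train_enabled` and `selected_class_names` are hypothetical helpers. The sketch also assumes that `classes` holds indices into `class_names`.

```python
from dataclasses import dataclass, field


@dataclass
class TrainingData:
    # Local mirror of the documented fields; defaults added only for brevity.
    paths: list[str] = field(default_factory=list)
    class_names: list[str] = field(default_factory=list)
    classes: set[int] = field(default_factory=set)


def train_enabled(data) -> bool:
    # Hypothetical rule: Train is enabled only once at least one session is ticked.
    return data is not None and len(data.paths) > 0


def selected_class_names(data: TrainingData) -> list[str]:
    # Assumes `classes` holds indices into `class_names`; keeps their order.
    return [data.class_names[i] for i in sorted(data.classes)]


data = TrainingData(paths=["sessions/s01"], class_names=["rest", "fist", "pinch"], classes={2, 0})
print(train_enabled(None))            # False
print(train_enabled(TrainingData()))  # False
print(train_enabled(data))            # True
print(selected_class_names(data))     # ['rest', 'pinch']
```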
@pipeline.predict
Runs on the predict thread once a model is loaded and Predict is clicked. Receives (model, features), where features is the return value of extract(). Must return a dict[str, Any]; non-dict returns are silently dropped, and the previous prediction stays in pipeline.predictions.
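The "non-dict is dropped" rule behaves roughly like the hypothetical `PredictionSlot` below. This is an illustration of the described semantics, not the framework's code. A dict replaces the stored prediction; anything else is ignored.

```python
from typing import Any


class PredictionSlot:
    """Hypothetical holder mimicking how pipeline.predictions is updated."""

    def __init__(self) -> None:
        self.predictions: dict = {}

    def accept(self, result: Any) -> None:
        # Non-dict results are ignored; the previous prediction is kept.
        if isinstance(result, dict):
            self.predictions = result


slot = PredictionSlot()
slot.accept({"class": 2})
slot.accept(2)  # dropped: not a dict
print(slot.predictions)  # {'class': 2}
```

A common cause is returning a bare label from `predict()`, such as `return pred` instead of `return {"class": pred}`. In that case, the UI keeps showing a stale value.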
Inside predict() you typically also push the result to your outputs:
```python
import time


@pipeline.predict
def predict(model, features):
    pose = model.predict(features)
    pose_smooth = pose_filter(pose, t=time.monotonic())  # smoothing filter defined elsewhere
    vhi_outlet.push(pose_smooth)  # output stream defined elsewhere
    return {"pose": pose_smooth}
```
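The docs do not define `pose_filter`. One plausible shape for a filter called as `pose_filter(pose, t=...)` is a time-aware exponential moving average. The hypothetical `TimeAwareEMA` below weights each new sample by `1 - exp(-dt / tau)`, so smoothing stays consistent when ticks arrive irregularly. It is a plain EMA, not any specific filter myogestic ships.

```python
import math
from typing import Optional, Sequence


class TimeAwareEMA:
    """Hypothetical pose_filter: exponential smoothing with time constant tau (seconds).

    The weight of each new sample depends on the elapsed time dt, so the
    amount of smoothing does not depend on how regularly predict ticks arrive.
    """

    def __init__(self, tau: float = 0.1) -> None:
        self.tau = tau
        self._state: Optional[list] = None
        self._t: Optional[float] = None

    def __call__(self, pose: Sequence[float], t: float) -> list:
        if self._state is None:
            # First sample initialises the filter.
            self._state = list(pose)
        else:
            dt = max(t - self._t, 0.0)
            alpha = 1.0 - math.exp(-dt / self.tau)  # dt = 0 -> alpha = 0 -> hold
            self._state = [s + alpha * (p - s) for s, p in zip(self._state, pose)]
        self._t = t
        return list(self._state)


pose_filter = TimeAwareEMA(tau=0.1)
print(pose_filter([0.0, 0.0], t=0.0))   # [0.0, 0.0]
print(pose_filter([1.0, 2.0], t=10.0))  # ≈ [1.0, 2.0]: dt >> tau, so it jumps to the input
```

Passing `t=time.monotonic()`, as in the `predict()` above, gives the filter a clock that never goes backwards.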
State machine
```mermaid
stateDiagram-v2
    [*] --> idle
    idle --> recording: app.start_recording()
    recording --> idle: app.stop_recording()
    idle --> training: pipeline.start_training()
    training --> idle: train() returns
    idle --> predicting: pipeline.start_predicting()
    predicting --> idle: pipeline.stop_predicting()
    note right of training
        Training and predicting are
        mutually exclusive - clicking
        Train while predicting is on
        stops predict first.
    end note
```
The pipeline is in exactly one state at a time, so training and prediction never run concurrently and there is no GPU contention to juggle. If the user clicks Train while predict is running, predict stops first; when training completes, the user clicks Predict again to resume.
The user-facing UI layer (pipeline_panel, train_button, predict_button) handles transitions; user code rarely calls start_training() / stop_predicting() directly.
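The diagram maps onto a small transition table. The hypothetical `StateMachine` below is a sketch of those rules, not myogestic's code. Every active state returns to idle. `start_training()` first drops out of predicting, as the note describes. Any other move out of a non-idle state raises an error.

```python
import threading

# Allowed transitions from the diagram: every active state returns to idle.
TRANSITIONS = {
    ("idle", "recording"),
    ("recording", "idle"),
    ("idle", "training"),
    ("training", "idle"),
    ("idle", "predicting"),
    ("predicting", "idle"),
}


class StateMachine:
    """Hypothetical sketch of the pipeline states; not myogestic's code."""

    def __init__(self) -> None:
        self.state = "idle"
        self._lock = threading.Lock()

    def _go(self, new: str) -> None:
        # Caller must hold self._lock.
        if (self.state, new) not in TRANSITIONS:
            raise RuntimeError(f"illegal transition {self.state} -> {new}")
        self.state = new

    def start_predicting(self) -> None:
        with self._lock:
            self._go("predicting")

    def stop_predicting(self) -> None:
        with self._lock:
            self._go("idle")

    def start_training(self) -> None:
        with self._lock:
            # Clicking Train while predicting stops predict first.
            if self.state == "predicting":
                self._go("idle")
            self._go("training")

    def finish_training(self) -> None:
        # Called when train() returns.
        with self._lock:
            self._go("idle")


sm = StateMachine()
sm.start_predicting()
sm.start_training()  # predicting -> idle -> training
print(sm.state)      # training
sm.finish_training()
print(sm.state)      # idle
```

Holding one lock across the two-step predicting, idle, training move keeps another thread from observing or entering a state in between.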
Stale-tick guard
The predict thread wakes every 1/predict_hz seconds, but the acquisition thread might not have delivered new data since the previous tick: the network or device could be slow. Stateful models (state machines, sequence-aware classifiers) need to know when a tick repeats data. The convention is to return the newest sample timestamp from extract() alongside the window:
```python
@pipeline.extract
def extract(windows):
    # emg_stream is defined elsewhere; get_window() returns samples and their timestamps.
    emg, ts = emg_stream.get_window()
    last_ts = float(ts[-1]) if ts.size > 0 else None
    return (emg, last_ts)


@pipeline.predict
def predict(model, features):
    emg, last_ts = features
    return model.step(emg, last_ts=last_ts)  # model decides if the tick is stale
```
A stateful model can then implement step() so that it returns its previous prediction whenever last_ts has not advanced since the last call. Because predict() returns step()'s result directly, step() must itself return a dict.
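Here is a minimal sketch of such a model. `StaleAwareModel` is hypothetical, and its "inference" is only a counter of fresh ticks. A tick is treated as stale when the window was empty (`last_ts is None`) or its newest timestamp is not greater than the last one seen. In that case the previous dict is repeated.

```python
from typing import Any, Optional


class StaleAwareModel:
    """Hypothetical stateful model that only updates on fresh data."""

    def __init__(self) -> None:
        self._last_ts: Optional[float] = None
        self._last_out: dict = {"fresh_ticks": 0}
        self.fresh_ticks = 0

    def step(self, emg: Any, last_ts: Optional[float]) -> dict:
        # Stale if the window was empty or its newest timestamp has not advanced.
        stale = last_ts is None or (
            self._last_ts is not None and last_ts <= self._last_ts
        )
        if stale:
            return dict(self._last_out)  # repeat the previous prediction
        self._last_ts = last_ts
        self.fresh_ticks += 1
        # Placeholder for real inference that advances internal state.
        self._last_out = {"fresh_ticks": self.fresh_ticks}
        return dict(self._last_out)


model = StaleAwareModel()
print(model.step([0.1, 0.2], last_ts=1.00))  # {'fresh_ticks': 1}
print(model.step([0.1, 0.2], last_ts=1.00))  # {'fresh_ticks': 1}  (stale tick)
print(model.step([0.3, 0.4], last_ts=1.02))  # {'fresh_ticks': 2}
print(model.step([], last_ts=None))          # {'fresh_ticks': 2}  (empty window)
```

Returning a copy keeps callers from mutating the model's cached output by accident.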
Lifecycle hooks
Pipeline registers itself via app.before_run_hooks and app.cleanup_hooks. The predict thread starts at app.run() and runs for the lifetime of the app, but it only does work while ctx.state == "predicting". When the user clicks Predict, the state flips and the thread starts calling extract() and predict(); clicking Predict again or closing the window flips it back to idle, and the loop short-circuits. On cleanup, the pipeline signals the thread through a threading.Event and joins it with a short timeout. You don't manage threads yourself.
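The lifecycle described above follows a common pattern: a long-lived daemon thread gated on a state flag and stopped through a `threading.Event`. The hypothetical `PredictLoop` below sketches that pattern. It is not myogestic's implementation. The `tick` callable stands in for extract() followed by predict(). The `state` attribute stands in for `ctx.state`.

```python
import threading
import time
from typing import Callable


class PredictLoop:
    """Hypothetical lifetime predict thread gated on a state flag."""

    def __init__(self, tick: Callable[[], None], predict_hz: float = 50.0) -> None:
        self._tick = tick                # stands in for extract() -> predict()
        self._period = 1.0 / predict_hz
        self._stop = threading.Event()
        self.state = "idle"              # stands in for ctx.state
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        # Analogous to a before-run hook: start once, keep running.
        self._thread.start()

    def _run(self) -> None:
        # Event.wait() is both the inter-tick sleep and the shutdown signal:
        # it returns True as soon as cleanup() sets the event.
        while not self._stop.wait(self._period):
            if self.state != "predicting":
                continue                 # short-circuit while not predicting
            self._tick()

    def cleanup(self, timeout: float = 1.0) -> None:
        # Analogous to a cleanup hook: signal, then join with a short timeout.
        self._stop.set()
        self._thread.join(timeout=timeout)

    def is_alive(self) -> bool:
        return self._thread.is_alive()


calls = []
loop = PredictLoop(lambda: calls.append(time.monotonic()), predict_hz=100)
loop.start()
time.sleep(0.05)           # state is idle: no ticks yet
idle_calls = len(calls)
loop.state = "predicting"  # like clicking Predict
time.sleep(0.1)
loop.cleanup()
print(idle_calls, len(calls) > 0, loop.is_alive())
```

Waiting a fixed period after each tick means the effective rate falls slightly below `predict_hz` when `tick()` is slow. A deadline-based schedule would avoid that drift, at the cost of a little more code.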
Common mistakes
See also: full Troubleshooting index, organised by symptom across every subsystem.
- Returning a non-dict from predict(). It is silently dropped. Always return {"key": value}.
- Mutating pipeline.training_data outside @app.ui. It can be set anywhere, but most experiments let session_manager write it from the UI.
- Heavy work inside extract() or predict(). They run on the predict thread at predict_hz. Keep CPU work bounded; offload long jobs to the training thread.
- Forgetting pipeline.save_model = save_pickle. The save/load buttons render but do nothing without it.