Skip to content

ML pipeline

Pipeline

Pipeline(app: App, predict_hz: float = 50.0)

ML lifecycle + state for an App.

Constructor registers the predict thread + cleanup on the App's hook lists; they fire on app.run() start/exit. Decorators set the callbacks. Transition methods flip app.ctx.state.

Parameters:

Name Type Description Default
app App

The myogestic App.

required
predict_hz float

Maximum predict-loop tick rate. Set to 0 or negative to remove the cap (run at full speed).

50.0
Source code in myogestic/ml/pipeline.py
def __init__(self, app: App, predict_hz: float = 50.0):
    self.app = app
    self.predict_hz = predict_hz
    self.model: Any = None
    self.predictions: dict[str, Any] = {}
    self.train_log: list[str] = []
    self.on_extract: Callable | None = None
    self.on_train: Callable | None = None
    self.on_predict: Callable | None = None
    # Set if you want save/load buttons to do anything; the
    # `myogestic.ml.save_pickle` / `load_pickle` joblib helpers are the
    # obvious default but the library doesn't force them.
    self.save_model: Callable | None = None
    self.load_model: Callable | None = None
    # Set this from inside `@app.ui` to publish what the user picked
    # in `session_manager(...)` to `@pipeline.train`.
    self.training_data: TrainingData | None = None
    self._stop = threading.Event()
    self._thread: threading.Thread | None = None

    app.before_run_hooks.append(self._start_predict_thread)
    app.cleanup_hooks.append(self._cleanup)

extract

extract(fn: Callable) -> Callable

Decorator: register the feature-extraction callback.

The wrapped function receives windows: dict[str, np.ndarray] keyed by stream name — each array is channels-first (n_channels, n_samples). Return whatever shape your model wants to consume. The same function is invoked from inside train() (over recorded windows) and on the predict thread (over live windows), so keep its return type stable.

Source code in myogestic/ml/pipeline.py
def extract(self, fn: Callable) -> Callable:
    """Decorator: register the feature-extraction callback.

    The wrapped function receives ``windows: dict[str, np.ndarray]``
    keyed by stream name — each array is **channels-first**
    ``(n_channels, n_samples)``. Return whatever shape your model
    wants to consume. The same function is invoked from inside
    ``train()`` (over recorded windows) and on the predict thread
    (over live windows), so keep its return type stable.
    """
    self.on_extract = fn
    return fn

train

train(fn: Callable) -> Callable

Decorator: register the training callback.

The wrapped function receives one :class:TrainingData and must return any object — it's stored on pipeline.model and forwarded to every subsequent predict() call. If pipeline.save_model is set, the Save Model button calls it as save_model(pipeline.model, path).

Source code in myogestic/ml/pipeline.py
def train(self, fn: Callable) -> Callable:
    """Decorator: register the training callback.

    The wrapped function receives one :class:`TrainingData` and
    must return any object — it's stored on ``pipeline.model`` and
    forwarded to every subsequent ``predict()`` call. If
    ``pipeline.save_model`` is set, the **Save Model** button calls
    it as ``save_model(pipeline.model, path)``.
    """
    self.on_train = fn
    return fn

predict

predict(fn: Callable) -> Callable

Decorator: register the predict callback.

The wrapped function is called every 1/predict_hz seconds with (model, features) where features is the return value of the extract callback. Must return a dict[str, Any] — non-dict returns are silently dropped (the previous prediction stays in pipeline.predictions).

Source code in myogestic/ml/pipeline.py
def predict(self, fn: Callable) -> Callable:
    """Decorator: register the predict callback.

    The wrapped function is called every ``1/predict_hz`` seconds
    with ``(model, features)`` where ``features`` is the return
    value of the ``extract`` callback. **Must return a
    ``dict[str, Any]``** — non-dict returns are silently dropped
    (the previous prediction stays in ``pipeline.predictions``).
    """
    self.on_predict = fn
    return fn

start_training

start_training() -> None

Run the @pipeline.train callback on a worker thread.

No-op (sets ctx.status_message) unless the state is idle, a train callback is registered, and non-empty :attr:training_data is set. Flips the state to training for the duration and stores the returned object on :attr:model.

Source code in myogestic/ml/pipeline.py
def start_training(self) -> None:
    """Run the ``@pipeline.train`` callback on a worker thread.

    No-op (sets ``ctx.status_message``) unless the state is ``idle``,
    a train callback is registered, and non-empty
    :attr:`training_data` is set. Flips the state to ``training`` for
    the duration and stores the returned object on :attr:`model`.
    """
    ctx = self.app.ctx
    if ctx.state != "idle":
        ctx.status_message = f"Cannot start training: state is {ctx.state!r}, expected 'idle'."
        return
    if self.on_train is None:
        ctx.status_message = "No train callback set (use @pipeline.train)"
        return
    data = self.training_data
    if data is None or data.is_empty:
        ctx.status_message = (
            "No training data — set "
            "pipeline.training_data = session_manager(...) inside @app.ui."
        )
        return

    ctx.state = PipelineState.TRAINING
    ctx.status_message = "Training..."
    ctx.log("Training started")
    on_train = self.on_train

    def _worker() -> None:
        try:
            self.model = on_train(data)
            ctx.status_message = "Training complete"
            self.train_log.append("Training complete")
            ctx.log("Training complete")
        except Exception as e:
            tb = traceback.format_exc()
            ctx.status_message = f"Training failed: {e}"
            self.train_log.append(f"FAILED: {e}")
            self.train_log.append(tb)
            ctx.log(f"Training failed: {e}")
        finally:
            ctx.state = "idle"

    if _IS_BROWSER:
        # Pyodide: no threads. Run synchronously on the UI frame
        # that triggered the click. Blocks that frame for the
        # duration of training - acceptable for the small models
        # the playground demos. Heavy models would need an explicit
        # split-step trainer, out of scope here.
        _worker()
    else:
        threading.Thread(target=_worker, daemon=True).start()

start_predicting

start_predicting() -> None

Flip the state to predicting so the predict thread runs.

No-op (sets ctx.status_message) unless the state is idle and a :attr:model is loaded.

Source code in myogestic/ml/pipeline.py
def start_predicting(self) -> None:
    """Flip the state to ``predicting`` so the predict thread runs.

    No-op (sets ``ctx.status_message``) unless the state is ``idle``
    and a :attr:`model` is loaded.
    """
    ctx = self.app.ctx
    if ctx.state != "idle":
        ctx.status_message = (
            f"Cannot start predicting: state is {ctx.state!r}, expected 'idle'."
        )
        return
    if self.model is None:
        ctx.status_message = "No model loaded — train one first or load from disk."
        return
    ctx.state = PipelineState.PREDICTING
    ctx.status_message = "Predicting..."

stop_predicting

stop_predicting() -> None

Return to idle, pausing the predict loop.

No-op (sets ctx.status_message) unless the state is currently predicting.

Source code in myogestic/ml/pipeline.py
def stop_predicting(self) -> None:
    """Return to ``idle``, pausing the predict loop.

    No-op (sets ``ctx.status_message``) unless the state is currently
    ``predicting``.
    """
    ctx = self.app.ctx
    if ctx.state != PipelineState.PREDICTING:
        ctx.status_message = (
            f"Cannot stop predicting: state is {ctx.state!r}, expected 'predicting'."
        )
        return
    ctx.state = "idle"
    ctx.status_message = "Prediction stopped"

PipelineState

Bases: StrEnum

ML-side extension of :class:~myogestic.AppState.

The core app only knows about "idle" and "recording"; attaching a :class:Pipeline (via Pipeline(app)) adds two more states for the ML lifecycle. Mutually exclusive with each other and with the core states: a Pipeline cannot be predicting and training at the same time, by design (the train pause exists so the GPU isn't fought over).

The enum is a StrEnum so it compares cleanly against the raw string written to app.ctx.state by the transition methods.

Attributes:

Name Type Description
TRAINING

train() is running on a background thread. Predict ticks short-circuit so they don't fight for GPU.

PREDICTING

The predict thread is calling extract + predict each tick at predict_hz and writing the result to pipeline.predictions.

save_pickle

save_pickle(model: Any, path: str | Path) -> str

Persist model to path via joblib, creating parent dirs as needed.

Returns the path as a string.

Source code in myogestic/ml/persistence.py
def save_pickle(model: Any, path: str | Path) -> str:
    """Persist ``model`` to ``path`` via joblib, creating parent dirs as needed.

    Returns the path as a string.
    """
    p = Path(path)
    p.parent.mkdir(parents=True, exist_ok=True)
    joblib.dump(model, str(p))
    return str(p)

load_pickle

load_pickle(path: str | Path) -> Any

Inverse of :func:save_pickle — load a joblib-saved model.

Source code in myogestic/ml/persistence.py
def load_pickle(path: str | Path) -> Any:
    """Inverse of :func:`save_pickle` — load a joblib-saved model."""
    return joblib.load(str(path))

Widgets

train_button

train_button(pipeline: Pipeline, size: tuple[float, float] = (80, 0)) -> None

Draw a Train button that calls :meth:Pipeline.start_training on click.

Source code in myogestic/ml/widgets.py
def train_button(pipeline: Pipeline, size: tuple[float, float] = (80, 0)) -> None:
    """Draw a Train button that calls :meth:`Pipeline.start_training` on click."""
    if imgui.button(f"{fa.ICON_FA_GEARS}  Train##ml_train", imgui.ImVec2(*size)):
        pipeline.start_training()

predict_button

predict_button(pipeline: Pipeline, size: tuple[float, float] = (92, 0)) -> None

Draw a Predict/Stop toggle button reflecting the pipeline's predict state.

Enabled to start only when the state is idle, a model is loaded, and both the extract and predict callbacks are wired; shows a Stop button while predicting and is disabled otherwise.

Source code in myogestic/ml/widgets.py
def predict_button(pipeline: Pipeline, size: tuple[float, float] = (92, 0)) -> None:
    """Draw a Predict/Stop toggle button reflecting the pipeline's predict state.

    Enabled to start only when the state is ``idle``, a model is loaded,
    and both the extract and predict callbacks are wired; shows a Stop
    button while predicting and is disabled otherwise.
    """
    state = pipeline.app.ctx.state
    # Predict needs three things together: the state must be idle, a model
    # must be loaded, AND both extract + predict callbacks must be wired.
    # A model alone (e.g. just-loaded from disk) is not sufficient.
    can_start = (
        state == "idle"
        and pipeline.model is not None
        and pipeline.on_extract is not None
        and pipeline.on_predict is not None
    )
    if can_start:
        if imgui.button(f"{fa.ICON_FA_PLAY}  Predict##ml_pred", imgui.ImVec2(*size)):
            pipeline.start_predicting()
    elif state == PipelineState.PREDICTING:
        if imgui.button(f"{fa.ICON_FA_STOP}  Predict##ml_pred", imgui.ImVec2(*size)):
            pipeline.stop_predicting()
    else:
        imgui.begin_disabled()
        imgui.button(f"{fa.ICON_FA_PLAY}  Predict##ml_pred", imgui.ImVec2(*size))
        imgui.end_disabled()

training_log

training_log(pipeline: Pipeline, height: float = 100.0, *, widget_id: str = 'ml') -> None

Read-only view of pipeline.train_log with smart autoscroll.

Uses the same scrollable-child + tail-follow renderer as process_launcher. The autoscroll/popout toggles aren't drawn here — they're rendered as part of pipeline_panel's control row so they sit next to Train/Predict.

Source code in myogestic/ml/widgets.py
def training_log(pipeline: Pipeline, height: float = 100.0, *, widget_id: str = "ml") -> None:
    """Read-only view of ``pipeline.train_log`` with smart autoscroll.

    Uses the same scrollable-child + tail-follow renderer as
    ``process_launcher``. The autoscroll/popout *toggles* aren't drawn
    here — they're rendered as part of ``pipeline_panel``'s control row
    so they sit next to Train/Predict.
    """
    if not pipeline.train_log:
        return
    autoscroll = _autoscroll.setdefault(widget_id, True)
    render_log(widget_id, pipeline.train_log, height=height, autoscroll=autoscroll)

save_model_button

save_model_button(pipeline: Pipeline, path: str, size: tuple[float, float] = (100, 0)) -> None

Draw a Save button that writes the model to path via pipeline.save_model.

Disabled unless both pipeline.save_model and pipeline.model are set; on click calls save_model(model, path) and reports the result through the status message and app log.

Source code in myogestic/ml/widgets.py
def save_model_button(pipeline: Pipeline, path: str, size: tuple[float, float] = (100, 0)) -> None:
    """Draw a Save button that writes the model to ``path`` via ``pipeline.save_model``.

    Disabled unless both ``pipeline.save_model`` and ``pipeline.model``
    are set; on click calls ``save_model(model, path)`` and reports the
    result through the status message and app log.
    """
    if pipeline.save_model is None or pipeline.model is None:
        imgui.begin_disabled()
        imgui.button(f"{fa.ICON_FA_FLOPPY_DISK}  Save##ml_save", imgui.ImVec2(*size))
        imgui.end_disabled()
        return
    if imgui.button(f"{fa.ICON_FA_FLOPPY_DISK}  Save##ml_save", imgui.ImVec2(*size)):
        try:
            pipeline.save_model(pipeline.model, path)
            pipeline.app.ctx.status_message = f"Saved model to {path}"
            pipeline.app.ctx.log(f"Model saved → {path}")
        except Exception as e:
            pipeline.app.ctx.status_message = f"Save failed: {e}"
            pipeline.app.ctx.log(f"Model save failed: {e}")

load_model_button

load_model_button(pipeline: Pipeline, path: str, size: tuple[float, float] = (100, 0)) -> None

Draw a Load button that reads a model from path via pipeline.load_model.

Disabled unless pipeline.load_model is set; on click stores the returned object on pipeline.model and reports the result through the status message and app log.

Source code in myogestic/ml/widgets.py
def load_model_button(pipeline: Pipeline, path: str, size: tuple[float, float] = (100, 0)) -> None:
    """Draw a Load button that reads a model from ``path`` via ``pipeline.load_model``.

    Disabled unless ``pipeline.load_model`` is set; on click stores the
    returned object on ``pipeline.model`` and reports the result through
    the status message and app log.
    """
    if pipeline.load_model is None:
        imgui.begin_disabled()
        imgui.button(f"{fa.ICON_FA_FOLDER_OPEN}  Load##ml_load", imgui.ImVec2(*size))
        imgui.end_disabled()
        return
    if imgui.button(f"{fa.ICON_FA_FOLDER_OPEN}  Load##ml_load", imgui.ImVec2(*size)):
        try:
            pipeline.model = pipeline.load_model(path)
            pipeline.app.ctx.status_message = f"Loaded model from {path}"
            pipeline.app.ctx.log(f"Model loaded ← {path}")
        except Exception as e:
            pipeline.app.ctx.status_message = f"Load failed: {e}"
            pipeline.app.ctx.log(f"Model load failed: {e}")

pipeline_panel

pipeline_panel(pipeline: Pipeline, *, log_height: float = 80.0, widget_id: str = 'ml') -> None

Train + Predict + log as a single titled panel.

Matches the visual style of recording_controls, session_manager, and FilterControl.

The log inherits the same autoscroll + popout UX as the process launcher's log: a double-chevron-down icon toggles auto-tail-follow, a box-out icon detaches the log into a floating ImGui window that survives across selection / frame churn.

Source code in myogestic/ml/widgets.py
def pipeline_panel(
    pipeline: Pipeline,
    *,
    log_height: float = 80.0,
    widget_id: str = "ml",
) -> None:
    """Train + Predict + log as a single titled panel.

    Matches the visual style of `recording_controls`, `session_manager`,
    and `FilterControl`.

    The log inherits the same autoscroll + popout UX as the process
    launcher's log: a double-chevron-down icon toggles auto-tail-follow,
    a box-out icon detaches the log into a floating ImGui window that
    survives across selection / frame churn.
    """
    # Render any open popout first so it survives frames even when the
    # surrounding panel scrolls out of view (same pattern as
    # process_launcher._render_open_popouts).
    if _popout_open.get(widget_id, False):
        autoscroll = _autoscroll.setdefault(widget_id, True)
        still_open = render_log_popout(
            widget_id,
            pipeline.train_log,
            title="Model training log",
            autoscroll=autoscroll,
        )
        if not still_open:
            _popout_open[widget_id] = False

    panel_header("MODEL", fa.ICON_FA_BRAIN)
    train_button(pipeline)
    imgui.same_line()
    predict_button(pipeline)

    # Autoscroll + popout toggles, same look as the process launcher.
    imgui.same_line()
    autoscroll = _autoscroll.setdefault(widget_id, True)
    popped = _popout_open.get(widget_id, False)
    autoscroll, popped = render_log_buttons(widget_id, autoscroll=autoscroll, popped_out=popped)
    _autoscroll[widget_id] = autoscroll
    _popout_open[widget_id] = popped

    if popped:
        imgui.text_disabled("(log popped out — see 'Model training log' window)")
    elif pipeline.train_log:
        render_log(
            widget_id,
            pipeline.train_log,
            height=log_height,
            autoscroll=autoscroll,
        )

pipeline_panel