Skip to content

Core API

App lifecycle

App

App(name: str, theme: bool = True, docking: bool = False, ui_scale: float | None = None)

Top-level application object.

Owns the GUI loop, the Context, the run-loop lifecycle hooks, and the recording state machine.

Construct one per process. Register streams via app.streams(...), register your UI via @app.ui, then call app.run(). Optional extensions like Pipeline(app) register themselves via app.before_run_hooks / app.cleanup_hooks - user code rarely needs to touch those lists directly.

Parameters:

Name Type Description Default
name str

Window title. Also used for the persisted ImGui state file (.imgui_state/<name>.ini) when docking=True.

required
theme bool

Apply MyoGestic's built-in ImGui theme. Set False to keep the Dear ImGui defaults.

True
docking bool

Experimental - enable ImGui docking + multi-viewport so panels registered via app.popout(...) become tearable DockableWindows. macOS Retina viewport sizing of detached windows can be wrong on initial draw; treat as experimental.

False
ui_scale float | None

Global UI zoom factor - scales the font and imgui's style metrics (padding, spacing, rounding). None uses $MYOGESTIC_UI_SCALE then 1.0. The env var, if set, overrides this - a per-machine display fix beats the example's value. Clamped to [0.5, 2.0]. Has no effect when theme=False.

None
Source code in myogestic/core.py
def __init__(
    self, name: str, theme: bool = True, docking: bool = False, ui_scale: float | None = None
):
    self.name = name
    self.ctx = Context()
    self._ui_fn: Callable[[Context], None] | None = None
    self._theme_enabled = theme
    self._ui_scale = ui_scale
    # Experimental: when True, _gui_loop enables ImGui docking +
    # multi-viewport so widgets wrapped in `popout_panel(...)` become
    # tearable, dock-able windows. macOS Metal/Retina caveats apply
    # - see the README "Status" note.
    self._docking = docking
    self._running = False
    # Extensions register here (e.g. myogestic.ml.attach_pipeline).
    # before_run: after streams.start, before _gui_loop, on main thread.
    # cleanup: in finally - always runs, each wrapped in try/except.
    self.before_run_hooks: list[Callable[[App], None]] = []
    self.cleanup_hooks: list[Callable[[App], None]] = []
    self._popout_specs: list[tuple[str, Callable[[], None], bool, bool, bool | None]] = []

streams

streams(*streams: Stream) -> None

Register one or more streams with the app.

Each stream is keyed by its name into ctx.streams. Acquisition threads start when app.run() is called, not at registration time. Calling this with the same name overwrites the previous registration - typically you call it once at setup.

Parameters:

Name Type Description Default
*streams Stream

One or more :class:Stream instances.

()
Source code in myogestic/core.py
def streams(self, *streams: Stream) -> None:
    """Register one or more streams with the app.

    Each stream is keyed by its ``name`` into ``ctx.streams``.
    Acquisition threads start when ``app.run()`` is called, not at
    registration time. Calling this with the same name overwrites
    the previous registration - typically you call it once at setup.

    Parameters
    ----------
    *streams
        One or more :class:`Stream` instances.
    """
    for s in streams:
        self.ctx.streams[s.name] = s

bridges

bridges(*bridges: Any) -> None

Register one or more Bridge subprocesses with the app.

Bridges run in their own process (webcam, ultrasound, depth camera, …) and publish an LSL clock stream the main app subscribes to. The cockpit's process_launcher widget shows their start/stop state. Mirrors :meth:streams exactly: each bridge is keyed by its .name into ctx.bridges; calling with the same name overwrites the previous registration.

Parameters:

Name Type Description Default
*bridges Any

One or more bridge instances - each must expose a .name attribute and a Bridge-like interface (.start(), .stop()).

()
Source code in myogestic/core.py
def bridges(self, *bridges: Any) -> None:
    """Register one or more Bridge subprocesses with the app.

    Bridges run in their own process (webcam, ultrasound, depth
    camera, …) and publish an LSL clock stream the main app
    subscribes to. The cockpit's ``process_launcher`` widget shows
    their start/stop state. Mirrors :meth:`streams` exactly: each
    bridge is keyed by its ``.name`` into ``ctx.bridges``; calling
    with the same name overwrites the previous registration.

    Parameters
    ----------
    *bridges
        One or more bridge instances - each must expose a
        ``.name`` attribute and a Bridge-like interface
        (``.start()``, ``.stop()``).
    """
    for b in bridges:
        self.ctx.bridges[b.name] = b

ui

ui(fn: Callable[[Context], None]) -> Callable[[Context], None]

Decorator. Register the render callback.

@app.ui def my_ui(ctx): imgui.text(f"State: {ctx.state}")

Source code in myogestic/core.py
def ui(self, fn: Callable[[Context], None]) -> Callable[[Context], None]:
    """Decorator. Register the render callback.

    @app.ui
    def my_ui(ctx):
        imgui.text(f"State: {ctx.state}")
    """
    self._ui_fn = fn
    return fn

popout

popout(title: str, gui_fn: Callable[[], None], *, default_open: bool = True, can_be_closed: bool = True, remember_is_visible: bool | None = None) -> None

Register a dockable window before run().

This is the preferred path for examples/apps using App(docking=True). It gives Hello ImGui the complete DockableWindow list before launch, instead of discovering windows on the first render frame.

Source code in myogestic/core.py
def popout(
    self,
    title: str,
    gui_fn: Callable[[], None],
    *,
    default_open: bool = True,
    can_be_closed: bool = True,
    remember_is_visible: bool | None = None,
) -> None:
    """Register a dockable window before `run()`.

    This is the preferred path for examples/apps using `App(docking=True)`.
    It gives Hello ImGui the complete DockableWindow list before launch,
    instead of discovering windows on the first render frame.
    """
    self._popout_specs = [spec for spec in self._popout_specs if spec[0] != title]
    self._popout_specs.append((title, gui_fn, default_open, can_be_closed, remember_is_visible))

start_recording

start_recording(base_path: str = 'sessions') -> None

Begin recording all connected streams to a new session.

Creates base_path/<timestamp>/ and starts appending each stream's data + timestamps to per-stream Zarr arrays. Streams whose info is still None (disconnected) are skipped - they won't be retroactively captured if they connect later in the recording. Refuses to start if ctx.state isn't "idle"; updates ctx.status_message with the result.

Parameters:

Name Type Description Default
base_path str

Directory where the per-session subfolder is created. Defaults to "sessions".

'sessions'
Source code in myogestic/core.py
def start_recording(self, base_path: str = "sessions") -> None:
    """Begin recording all connected streams to a new session.

    Creates ``base_path/<timestamp>/`` and starts appending each
    stream's data + timestamps to per-stream Zarr arrays. Streams
    whose ``info`` is still ``None`` (disconnected) are skipped -
    they won't be retroactively captured if they connect later in
    the recording. Refuses to start if ``ctx.state`` isn't
    ``"idle"``; updates ``ctx.status_message`` with the result.

    Parameters
    ----------
    base_path
        Directory where the per-session subfolder is
        created. Defaults to ``"sessions"``.
    """
    if not can_transition(self.ctx.state, AppState.RECORDING):
        self.ctx.status_message = (
            f"Cannot start recording: state is {self.ctx.state!r}, expected 'idle'."
        )
        return
    # Only record from streams that have connected - a stream with info=None
    # has no zarr schema and would fail on append. Disconnected streams are
    # skipped; if they connect later they won't be retroactively captured.
    self.ctx.state = AppState.RECORDING
    self.ctx.session = Session(base_path=base_path)
    n_ready = 0
    for name, stream in self.ctx.streams.items():
        if stream.info is None:
            continue
        self.ctx.session.init_stream(name, stream.info)
        stream.attach_session(self.ctx.session)
        n_ready += 1
    if n_ready == 0:
        self.ctx.status_message = "No connected streams to record"
        self.ctx.log("Recording: no connected streams")
    else:
        self.ctx.status_message = f"Recording to {self.ctx.session.path}"
        self.ctx.log(f"Recording → {self.ctx.session.path}")

stop_recording

stop_recording() -> None

Stop the active recording and pack the session to a .session.zip.

Finalises the per-stream Zarr arrays, writes the label track to labels.json, and kicks off a daemon thread that packs the session folder into a single <timestamp>.session.zip archive (the original folder is kept until the pack succeeds). Refuses to stop if ctx.state isn't "recording".

Source code in myogestic/core.py
def stop_recording(self) -> None:
    """Stop the active recording and pack the session to a ``.session.zip``.

    Finalises the per-stream Zarr arrays, writes the label track to
    ``labels.json``, and kicks off a daemon thread that packs the
    session folder into a single ``<timestamp>.session.zip`` archive
    (the original folder is kept until the pack succeeds). Refuses
    to stop if ``ctx.state`` isn't ``"recording"``.
    """
    if not can_transition(self.ctx.state, AppState.IDLE):
        self.ctx.status_message = (
            f"Cannot stop recording: state is {self.ctx.state!r}, expected 'recording'."
        )
        return
    self.ctx.state = AppState.IDLE
    # Detach every stream *before* finalising/packing the session.
    # detach_session() waits for any in-flight append on the acquire
    # thread, so the daemon pack thread below can clear the Zarr stores
    # without racing the acquire loop (was: KeyError mid-append).
    for stream in self.ctx.streams.values():
        stream.detach_session()
    if self.ctx.session is not None:
        session = self.ctx.session
        session.save_meta(self.name, class_names=self.ctx.class_names or None)
        n = len(session.label_track)
        self.ctx.status_message = f"Saved {n} labels - finalizing…"

        # Pack session folder → .session.zip in a daemon thread so the
        # UI stays responsive during finalization. Register with
        # session_manager only after pack succeeds.
        import threading

        def _finalize() -> None:
            try:
                zip_path = session.pack_to_zip()
                self.ctx.status_message = f"Saved {n} labels"
                self.ctx.log(f"Session saved → {zip_path}")
                from myogestic.widgets.training.session_manager import (
                    add_recorded_session,
                )

                add_recorded_session(str(zip_path))
                log.info("packed session to %s", zip_path)
            except Exception as e:
                log.exception("pack_to_zip failed: %s", e)
                self.ctx.status_message = f"Pack failed: {e} - folder kept"
                self.ctx.log(f"Pack failed: {e} - folder kept at {session.path}")
                # Fall back: register the folder so user doesn't lose it
                try:
                    from myogestic.widgets.training.session_manager import (
                        add_recorded_session,
                    )

                    add_recorded_session(str(session.path))
                except Exception:
                    pass

        threading.Thread(target=_finalize, daemon=True).start()

run

run(mode: str = 'gui', window_size: tuple[int, int] = (1280, 800), fullscreen: bool = False) -> None

Blocking entry point.

Call tree (top → bottom = runtime order):

App.run()
├─ 1. Stream.start()          per stream → daemon acquire thread
├─ 2. before_run_hooks(app)  extensions register here
│    └─ e.g. myogestic.ml.attach_pipeline → starts predict thread
├─ 3. self._gui_loop()  ← main thread, BLOCKS
│    └─ immapp.run → per frame: self._ui_fn(self.ctx)  (your @app.ui)
└─ 4. [finally] cleanup - always runs, even on startup failure
     ├─ cleanup_hooks(app)   each wrapped in try/except
     ├─ Stream.stop()         per stream
     ├─ Bridge.stop()         per bridge
     └─ process_launcher._cleanup_all()

Core has only idle ↔ recording. myogestic.ml.attach_pipeline(app) adds training/predicting states + their transition methods.

Source code in myogestic/core.py
def run(
    self,
    mode: str = "gui",
    window_size: tuple[int, int] = (1280, 800),
    fullscreen: bool = False,
) -> None:
    """Blocking entry point.

    Call tree (top → bottom = runtime order):

        App.run()
        ├─ 1. Stream.start()          per stream → daemon acquire thread
        ├─ 2. before_run_hooks(app)  extensions register here
        │    └─ e.g. myogestic.ml.attach_pipeline → starts predict thread
        ├─ 3. self._gui_loop()  ← main thread, BLOCKS
        │    └─ immapp.run → per frame: self._ui_fn(self.ctx)  (your @app.ui)
        └─ 4. [finally] cleanup - always runs, even on startup failure
             ├─ cleanup_hooks(app)   each wrapped in try/except
             ├─ Stream.stop()         per stream
             ├─ Bridge.stop()         per bridge
             └─ process_launcher._cleanup_all()

    Core has only idle ↔ recording. `myogestic.ml.attach_pipeline(app)` adds
    training/predicting states + their transition methods.
    """
    if self._running:
        raise RuntimeError("App.run() is not re-entrant")
    if mode not in ("gui", "headless"):
        raise ValueError(
            f"App.run(mode={mode!r}) - unknown mode. Supported: 'gui', 'headless'."
        )
    self._running = True

    log.info("run() - streams=%s mode=%s", list(self.ctx.streams), mode)

    from myogestic._browser import IS_BROWSER

    def _do_cleanup() -> None:
        log.info("run() - cleanup")
        for hook in self.cleanup_hooks:
            try:
                hook(self)
            except Exception as e:
                log.exception("cleanup hook %r failed: %s", hook, e)
        for stream in self.ctx.streams.values():
            try:
                stream.stop()
            except Exception as e:
                log.exception("stream stop failed: %s", e)
        for bridge in self.ctx.bridges.values():
            try:
                bridge.stop()
            except Exception as e:
                log.exception("bridge stop failed: %s", e)
        try:
            from myogestic.widgets.panels.process_launcher import _cleanup_all

            _cleanup_all()
        except Exception as e:
            log.exception("process cleanup failed: %s", e)
        self._running = False

    try:
        for name, stream in self.ctx.streams.items():
            log.info("  start stream %r", name)
            stream.start()

        for hook in self.before_run_hooks:
            log.info("  before_run hook: %s", getattr(hook, "__qualname__", hook))
            hook(self)

        log.info("  enter %s loop", mode)
        if mode == "gui":
            self._gui_loop(window_size=window_size, fullscreen=fullscreen)
        elif mode == "headless":
            self._headless_loop()
    except BaseException:
        # Always tear down on error - half-started state must not
        # leak (orphan threads on desktop, orphan scheduler entries
        # in browser).
        _do_cleanup()
        raise

    if IS_BROWSER:
        # In Pyodide, immapp.run returns immediately and the
        # browser's requestAnimationFrame drives the GUI from here.
        # Skipping cleanup keeps the app alive after this function
        # returns; tab unload is the implicit teardown.
        return

    _do_cleanup()

AppState

Bases: StrEnum

Core app-state values. Extensions (e.g. myogestic.ml.PipelineState) add more.

Context.state is a bare str so extensions can introduce their own states without subclassing. Each module validates transitions within its own namespace only.

Context dataclass

Context(streams: dict[str, Stream] = dict(), bridges: dict[str, Any] = dict(), state: str = IDLE, session: Session | None = None, class_names: list[str] = list(), current_label: int = -1, status_message: str = '', logs: list[str] = list())

Shared state all threads read/write.

Extensions may add own fields dynamically on the owning App, but Context itself is core-only.

log

log(message: str, max_lines: int = 500) -> None

Append a one-line app event for the log_panel widget.

Bounded to max_lines (oldest dropped). Use for high-level events - recording saved, training start/done, model load - not per-frame chatter. Safe to call from any thread (list.append/pop are GIL-atomic).

Source code in myogestic/core.py
def log(self, message: str, max_lines: int = 500) -> None:
    """Append a one-line app event for the `log_panel` widget.

    Bounded to `max_lines` (oldest dropped). Use for high-level events -
    recording saved, training start/done, model load - not per-frame
    chatter. Safe to call from any thread (list.append/pop are GIL-atomic).
    """
    from time import strftime

    line = f"[{strftime('%H:%M:%S')}] {message}"
    self.logs.append(line)
    if len(self.logs) > max_lines:
        del self.logs[0 : len(self.logs) - max_lines]

Stream

Stream(name: str, source: Source, window_ms: float, buffer_ms: float = 10000)

A named ring-buffered live stream backed by a :class:Source.

The framework's central data primitive: pair a name ("emg") with a source (LSLSource("TestEMG1")) and a window duration, register the stream with app.streams(...), and the rest of the framework can pull windows (for ML), display snapshots (for the signal viewer), or recorded chunks (for the session) by stream name.

Architecture:

  • One daemon acquisition thread is started per Stream when App.run() begins. It loops source.read(), appends to the ring buffer, refreshes the display snapshot, and (if a recording session is active) appends to the session's Zarr store.
  • Two consumer surfaces are then available concurrently: :meth:get_window (channels-first, exact window-seconds long, consumed by @pipeline.extract) and :meth:get_display (min/max envelope decimated for 60 fps rendering, consumed by signal_viewer).
  • The ring buffer holds the last buffer_ms of samples so transient consumers (slow extract, momentary GUI hitches) don't lose data.

Examples:

>>> from myogestic import App, Stream
>>> from myogestic.sources import LSLSource
>>> app = App("hello")
>>> app.streams(
...     Stream("emg", source=LSLSource("TestEMG1"),
...            window_ms=1000, buffer_ms=10000),
... )

See Streams concept for the buffer + decimation model in depth, and Add a custom source for the matching source-side contract.

Live ring-buffered stream with display decimation.

Parameters:

Name Type Description Default
name str

Stream label (also used as the recorded zarr stream key).

required
source Source

Anything implementing the :class:Source protocol.

required
window_ms float

Duration in milliseconds of the window returned by :meth:get_window.

required
buffer_ms float

Ring-buffer depth in milliseconds. Defaults to 10000 (10 s).

10000
Source code in myogestic/stream.py
def __init__(
    self,
    name: str,
    source: Source,
    window_ms: float,
    buffer_ms: float = 10000,
):
    """Live ring-buffered stream with display decimation.

    Parameters
    ----------
    name
        Stream label (also used as the recorded zarr stream key).
    source
        Anything implementing the :class:`Source` protocol.
    window_ms
        Duration in **milliseconds** of the window returned
        by :meth:`get_window`.
    buffer_ms
        Ring-buffer depth in milliseconds. Defaults to 10000 (10 s).
    """
    self.name = name
    self._source = source
    self._window = window_ms / 1000.0
    self._buffer_seconds = buffer_ms / 1000.0
    self._running = False
    self._session: Session | None = None
    # Guards `_session` against the acquire loop. `stop_recording()` tears
    # the session down (and a daemon thread clears its Zarr stores) while
    # this stream's acquire thread may still be inside `session.append()`.
    # Holding this lock around both the append and attach/detach makes the
    # teardown wait for any in-flight append, so the stores are never
    # cleared mid-append. Deliberately separate from `_lock` (which guards
    # the display buffers and is read by the GUI thread) so we never block
    # rendering on Zarr disk I/O — only the rare Stop path ever waits here.
    self._session_lock = threading.Lock()
    self.status = "disconnected"
    self.last_error = ""
    self.info: StreamInfo | None = None
    self._connected = False

    # These are initialized in _connect()
    self._cap: int = 0
    self._data: RingBuffer | None = None
    self._timestamps: RingBuffer | None = None
    self._lock = threading.Lock()
    self._display_d = np.empty(0)
    self._display_t = np.empty(0)
    self._display_n: int = 0
    self._snap_interval: int = 1
    self._samples_since_snap: int = 0
    self._m4_n_pixels: int = 2000
    self._m4_t = np.empty(0, dtype=np.float64)
    self._m4_d = np.empty(0)
    self._m4_n: int = 0
    # Per-stream M4 scratch (was module globals — not thread-safe across streams)
    self._m4_downsampler: M4Downsampler | None = None
    self._m4_work_col: np.ndarray | None = None
    self._m4_work_idx: np.ndarray | None = None
    self._m4_work_d: np.ndarray | None = None
    self._m4_work_t: np.ndarray | None = None
    self._win_d = np.empty(0)
    self._win_t = np.empty(0)

reconnect

reconnect(target: str | None = None) -> bool

Reconnect source. Optionally switch to a different target.

If the source implements reconnect(), uses that (preserves source-specific logic like LSL resolve or serial port open). Otherwise falls back to disconnect + connect. Either way the source is connected ONCE — buffers are then (re)allocated from the returned StreamInfo.

Holds self._lock for the whole swap to prevent the acquire loop from reading through a half-torn state.

Source code in myogestic/stream.py
def reconnect(self, target: str | None = None) -> bool:
    """Reconnect source. Optionally switch to a different target.

    If the source implements reconnect(), uses that (preserves source-specific
    logic like LSL resolve or serial port open). Otherwise falls back to
    disconnect + connect. Either way the source is connected ONCE — buffers
    are then (re)allocated from the returned StreamInfo.

    Holds `self._lock` for the whole swap to prevent the acquire loop
    from reading through a half-torn state.
    """
    with self._lock:
        self._connected = False
        self._display_n = 0
        self._m4_n = 0
        self.status = "disconnected"

        try:
            if hasattr(self._source, "reconnect"):
                # `reconnect` is an optional Source extension (not in the
                # Protocol); the hasattr guard makes the call safe.
                self.info = self._source.reconnect(target)  # type: ignore
            else:
                self._source.disconnect()
                self.info = self._source.connect()
        except Exception as e:
            self.last_error = str(e)
            return False

        # Re-init buffers for potentially different channel count/fs
        # (no double-connect — source was already (re)connected above).
        self._allocate_buffers()
        return True

start

start() -> None

Start the acquisition loop (a daemon thread, or a per-frame task in the browser).

Source code in myogestic/stream.py
def start(self) -> None:
    """Start the acquisition loop (a daemon thread, or a per-frame task in the browser)."""
    self._running = True
    if _IS_BROWSER:
        # Pyodide: no OS threads, and asyncio tasks scheduled here
        # don't dispatch (immapp.run blocks Python inside the
        # Emscripten main loop). Register one step with the
        # per-frame scheduler instead - the App's GUI callback
        # ticks it every frame.
        from myogestic._browser import register

        register(lambda: self._acquire_step() if self._running else 1.0)
    else:
        self._thread = threading.Thread(target=self._acquire_loop, daemon=True)
        self._thread.start()

stop

stop() -> None

Stop the acquisition loop and disconnect the source (errors suppressed).

Source code in myogestic/stream.py
def stop(self) -> None:
    """Stop the acquisition loop and disconnect the source (errors suppressed)."""
    self._running = False
    try:
        self._source.disconnect()
    except Exception:
        pass

attach_session

attach_session(session: Session) -> None

Begin recording this stream into session.

Called by :meth:App.start_recording. Set under _session_lock so the acquire loop sees a fully-attached session atomically.

Source code in myogestic/stream.py
def attach_session(self, session: Session) -> None:
    """Begin recording this stream into ``session``.

    Called by :meth:`App.start_recording`. Set under ``_session_lock``
    so the acquire loop sees a fully-attached session atomically.
    """
    with self._session_lock:
        self._session = session

detach_session

detach_session() -> None

Stop recording this stream (called by :meth:App.stop_recording).

Holding _session_lock makes this wait for any append currently in flight on the acquire thread and guarantees no further append can start. Once this returns, the caller may safely finalise/clear the session's Zarr stores without racing the acquire loop.

Source code in myogestic/stream.py
def detach_session(self) -> None:
    """Stop recording this stream (called by :meth:`App.stop_recording`).

    Holding ``_session_lock`` makes this *wait for* any append currently
    in flight on the acquire thread and guarantees no further append can
    start. Once this returns, the caller may safely finalise/clear the
    session's Zarr stores without racing the acquire loop.
    """
    with self._session_lock:
        self._session = None

get_window

get_window() -> tuple[ndarray, ndarray]

Return the most recent window_ms as (data, ts).

data is channels-first (n_channels, n_samples) — the same convention used everywhere user code touches signal data — and always float32, whatever dtype the stream buffers in, so @pipeline.extract never has to branch on dtype. ts is a view into a reusable per-stream buffer; for a float32 stream data is a view too, otherwise a fresh float32 copy. Copy explicitly if you need to retain either past the next call.

Source code in myogestic/stream.py
def get_window(self) -> tuple[np.ndarray, np.ndarray]:
    """Return the most recent ``window_ms`` as ``(data, ts)``.

    ``data`` is **channels-first** ``(n_channels, n_samples)`` — the
    same convention used everywhere user code touches signal data —
    and **always float32**, whatever dtype the stream buffers in, so
    ``@pipeline.extract`` never has to branch on dtype. ``ts`` is a
    view into a reusable per-stream buffer; for a float32 stream
    ``data`` is a view too, otherwise a fresh float32 copy. Copy
    explicitly if you need to retain either past the next call.
    """
    if self._data is None or self._timestamps is None or self.info is None:
        return (
            np.empty((0, 0), dtype=np.float32),
            np.empty(0, dtype=np.float64),
        )
    with self._lock:
        nd = _unwrap_ring_into(self._data, self._win_d, self._cap)
        _unwrap_ring_into(self._timestamps, self._win_t, self._cap)
    if nd == 0:
        return self._win_d[:0].T.astype(np.float32, copy=False), self._win_t[:0]
    n = int(self._window * self.info.fs)
    if nd < n:
        return self._win_d[:nd].T.astype(np.float32, copy=False), self._win_t[:nd]
    return (
        self._win_d[nd - n : nd].T.astype(np.float32, copy=False),
        self._win_t[nd - n : nd],
    )

get_display

get_display(n_pixels: int = 800) -> tuple[ndarray, ndarray] | None

Read pre-computed M4 result. Zero work on render thread.

The acquire thread computes M4 in _update_display_snapshot. This just reads the result via atomic ref (GIL-safe).

Source code in myogestic/stream.py
def get_display(self, n_pixels: int = 800) -> tuple[np.ndarray, np.ndarray] | None:
    """Read pre-computed M4 result. Zero work on render thread.

    The acquire thread computes M4 in _update_display_snapshot.
    This just reads the result via atomic ref (GIL-safe).
    """
    # Update target resolution for next snapshot
    self._m4_n_pixels = n_pixels
    n = self._m4_n
    if n < 2:
        return None
    return self._m4_t[:n], self._m4_d[:n]

get_raw_snapshot

get_raw_snapshot() -> tuple[ndarray, ndarray] | None

Lock-free read of the display snapshot.

Source code in myogestic/stream.py
def get_raw_snapshot(self) -> tuple[np.ndarray, np.ndarray] | None:
    """Lock-free read of the display snapshot."""
    n = self._display_n
    if n < 2:
        return None
    return self._display_t[:n], self._display_d[:n]

last_timestamp

last_timestamp() -> float | None

Most recent sample timestamp, or None if no samples yet.

Holds _lock while reading _display_t[_display_n-1] so a concurrent reconnect() (which zeroes _display_n and reallocates _display_t) cannot strand the read on a torn buffer.

Source code in myogestic/stream.py
def last_timestamp(self) -> float | None:
    """Most recent sample timestamp, or None if no samples yet.

    Holds `_lock` while reading `_display_t[_display_n-1]` so a concurrent
    `reconnect()` (which zeroes `_display_n` and reallocates `_display_t`)
    cannot strand the read on a torn buffer.
    """
    with self._lock:
        n = self._display_n
        if n < 1 or self._display_t.shape[0] < n:
            return None
        ts = float(self._display_t[n - 1])
    return ts if ts > 0.0 else None

StreamInfo dataclass

StreamInfo(n_channels: int, fs: float, dtype: dtype = dtype(float32), channel_names: list[str] | None = None)

Describes the shape and dtype of a :class:Source's data.

Returned by :meth:Source.connect. The framework uses it to size the ring buffer, lay out the signal viewer, and decide how to serialise the stream when recording.

Attributes:

Name Type Description
n_channels int

Channel count. Fixed for the life of the source.

fs float

Sample rate in Hz. Used to convert window_ms / buffer_ms into sample counts.

dtype dtype

NumPy dtype of each sample, one of :data:SUPPORTED_DTYPES. Defaults to float32. Accepts anything :func:numpy.dtype understands ("int16", np.int16, np.dtype("int16")) and normalises it. Picking a compact dtype (e.g. int16) keeps the ring buffer and Zarr recording small; the window handed to @pipeline.extract is always upcast to float32 regardless, so feature/ML code never has to branch on dtype.

channel_names list[str] | None

Optional per-channel labels for the signal viewer legend. None (default) renders as ch0, ch1, ...

TrainingData dataclass

TrainingData(paths: list[str] = list(), class_names: list[str] = list(), classes: set[int] = set())

Inputs delivered to the user's @pipeline.train callback.

Built by session_manager() and assigned by the user to pipeline.training_data from inside @app.ui::

@app.ui
def ui(ctx):
    pipeline.training_data = session_manager(...)

Attributes:

Name Type Description
paths list[str]

Session locations (folders or .session.zip archives).

class_names list[str]

Human-readable labels — same list passed to recording_controls / session_manager.

classes set[int]

Active class indices to include. Pass as the classes= arg to iter_labeled_windows / iter_aligned_windows.

is_empty property

is_empty: bool

True when no session paths have been assigned yet.

Layout

Grid

Grid(rows: int, cols: int, row_height: list[Track] | None = None, col_width: list[Track] | None = None)

Grid layout manager. Index with [row, col] or [row, col_start:col_end].

Both axes accept the same Px/Fr track specs. See module docstring for examples.

Parameters:

Name Type Description Default
rows int

Number of rows.

required
cols int

Number of columns.

required
row_height list[Track] | None

Per-row track specs (length must equal rows). Default None → all rows share equally ([Fr(1)] * rows).

None
col_width list[Track] | None

Per-column track specs (length must equal cols). Default None → all columns share equally ([Fr(1)] * cols).

None

Raises:

Type Description
ValueError

if a list length doesn't match rows / cols, or if any track value is non-finite or negative.

TypeError

if a track entry isn't Px, Fr, or a number.

Source code in myogestic/grid.py
def __init__(
    self,
    rows: int,
    cols: int,
    row_height: list[Track] | None = None,
    col_width: list[Track] | None = None,
):
    if not isinstance(rows, int) or isinstance(rows, bool) or rows <= 0:
        raise ValueError(f"rows must be a positive int, got {rows!r}")
    if not isinstance(cols, int) or isinstance(cols, bool) or cols <= 0:
        raise ValueError(f"cols must be a positive int, got {cols!r}")
    self.rows = rows
    self.cols = cols
    self._row_tracks: list[Track] = self._normalize(row_height, rows, "row_height")
    self._col_tracks: list[Track] = self._normalize(col_width, cols, "col_width")
    self._scaled_heights: list[float] = [0.0] * rows
    self._last_frame: int = -1

end_frame

end_frame() -> None

No-op. Frame reset is handled automatically via frame counter.

Source code in myogestic/grid.py
def end_frame(self) -> None:
    """No-op. Frame reset is handled automatically via frame counter."""

Px dataclass

Px(value: float)

Fixed pixel size. Px(300) means "exactly 300 px wide/tall".

Fr dataclass

Fr(value: float)

Fractional unit (CSS-grid fr).

Fr(1) means "1 share of the space remaining after :class:Px tracks are subtracted". Multiple Fr entries split the remainder proportionally to their values, so [Fr(1), Fr(2)] splits leftover space 1:2.

Track module-attribute

Track = Px | Fr

Event helpers

EdgeTrigger

EdgeTrigger(callback: Callable[[T], None], *, n_stable_ticks: int = 1)

Calls callback(value) only when value differs from the last fire.

Parameters:

Name Type Description Default
callback Callable[[T], None]

Invoked with the new value when an edge fires.

required
n_stable_ticks int

Debounce: the new value must hold for this many consecutive :meth:fire_if_changed calls before firing. 1 (default) fires immediately on the first changed value — i.e. dedupe only. Use >1 to swallow tick-to-tick flicker (e.g. a classifier's argmax oscillating during a sliding-window transition) so the side effect isn't re-fired on every flip. It counts calls, not time — convert a duration with the loop rate: n_stable_ticks=math.ceil(seconds * predict_hz).

1
Notes

Thread-safety: the typical pattern is "one writer (predict thread) + occasional rebase() from the UI thread". The whole (last, candidate, count) state is held in one tuple replaced in a single assignment, so under CPython's GIL no lock is needed; a race between the two callers can at worst cost one extra suppressed-or-fired callback — harmless for the intended uses (RPC dedup, audio-cue gating, robot-movement commands).

Source code in myogestic/outputs/edge_trigger.py
def __init__(self, callback: Callable[[T], None], *, n_stable_ticks: int = 1) -> None:
    if n_stable_ticks < 1:
        raise ValueError(f"n_stable_ticks must be >= 1 (got {n_stable_ticks})")
    self._callback = callback
    self._n_stable_ticks = n_stable_ticks
    # (last_fired, pending_candidate, candidate_count) — replaced atomically.
    self._state: tuple[T | None, T | None, int] = (None, None, 0)

last property

last: T | None

The most recently fired (or rebased) value; None before first fire.

fire_if_changed

fire_if_changed(value: T) -> bool

Fire the callback when value becomes a new, stable value.

Fires iff value differs from the last fired value and (when n_stable_ticks > 1) has held for n_stable_ticks consecutive calls. Returns True when the callback ran, False when suppressed.

Source code in myogestic/outputs/edge_trigger.py
def fire_if_changed(self, value: T) -> bool:
    """Fire the callback when ``value`` becomes a new, stable value.

    Fires iff ``value`` differs from the last fired value and (when
    ``n_stable_ticks > 1``) has held for ``n_stable_ticks``
    consecutive calls. Returns ``True`` when the callback ran,
    ``False`` when suppressed.
    """
    last, candidate, count = self._state
    if value == last:
        # Back to the current value — drop any half-formed candidate.
        if candidate is not None:
            self._state = (last, None, 0)
        return False
    count = count + 1 if value == candidate else 1
    if count >= self._n_stable_ticks:
        self._state = (value, None, 0)
        self._callback(value)
        return True
    self._state = (last, value, count)
    return False

rebase

rebase(value: T) -> None

Set the "last fired" value without firing.

Discards any pending debounce candidate. Use when another code path already performed the equivalent action; the next different value must then earn the full n_stable_ticks count, so a flicker candidate in progress can't complete on top of the manual one.

Source code in myogestic/outputs/edge_trigger.py
def rebase(self, value: T) -> None:
    """Set the "last fired" value without firing.

    Discards any pending debounce candidate. Use when another code
    path already performed the equivalent action; the
    next *different* value must then earn the full ``n_stable_ticks`` count, so
    a flicker candidate in progress can't complete on top of the manual one.
    """
    self._state = (value, None, 0)

Built-in features

features

Classic time-domain EMG features — the starter set every example used to copy-paste.

Use as-is, mix with your own, or replace entirely::

from myogestic.recipes.features import rms, mav, wl
from myogestic.widgets import FeatureSelector

feats = FeatureSelector(
    {"RMS": rms, "MAV": mav, "WL": wl, "MyCustom": my_custom_fn},
    default=["RMS", "MAV"],
)

All take an EMG window of shape (n_channels, n_samples) and return a per-channel scalar vector (n_channels,) of dtype float32.

rms

rms(emg: ndarray) -> ndarray

Root mean square per channel.

Source code in myogestic/recipes/features.py
def rms(emg: np.ndarray) -> np.ndarray:
    """Root mean square per channel."""
    return np.sqrt(np.mean(emg**2, axis=1)).astype(np.float32)

mav

mav(emg: ndarray) -> ndarray

Mean absolute value per channel.

Source code in myogestic/recipes/features.py
def mav(emg: np.ndarray) -> np.ndarray:
    """Mean absolute value per channel."""
    return np.mean(np.abs(emg), axis=1).astype(np.float32)

wl

wl(emg: ndarray) -> ndarray

Waveform length per channel — sum of absolute first differences.

Source code in myogestic/recipes/features.py
def wl(emg: np.ndarray) -> np.ndarray:
    """Waveform length per channel — sum of absolute first differences."""
    return np.sum(np.abs(np.diff(emg, axis=1)), axis=1).astype(np.float32)

var

var(emg: ndarray) -> ndarray

Variance per channel.

Source code in myogestic/recipes/features.py
def var(emg: np.ndarray) -> np.ndarray:
    """Variance per channel."""
    return np.var(emg, axis=1).astype(np.float32)

zc

zc(emg: ndarray) -> ndarray

Zero-crossing count per channel.

Source code in myogestic/recipes/features.py
def zc(emg: np.ndarray) -> np.ndarray:
    """Zero-crossing count per channel."""
    sign_flips = np.diff(np.signbit(emg), axis=1)
    return np.sum(sign_flips, axis=1).astype(np.float32)

External interfaces

virtual_hand

virtual_hand(godot_bin: str | None = None, vhi_path: str | None = None, grpc_host: str | None = None, grpc_port: int | None = None, launch_mode: str | None = None) -> InterfaceSpec

The MyoGestic Virtual Hand Interface (VHI).

Parameters:

Name Type Description Default
godot_bin str | None

Path to the Godot binary, for source-mode launch. Falls back to $GODOT_BIN, then which("godot4") / which("godot"), then platform GUI defaults.

None
vhi_path str | None

Directory containing VHI (binary install OR Godot project). Falls back to $VHI_PATH, then the default install root — <repo>/tools/MyoGestic-VHI in a git checkout, otherwise <user_data>/myogestic/vhi.

None
grpc_host str | None

VHI gRPC host. Falls back to $VHI_GRPC_HOST then 127.0.0.1.

None
grpc_port int | None

VHI gRPC port. Falls back to $VHI_GRPC_PORT then 50051.

None
launch_mode str | None

Launch mode — "binary", "godot", or "auto" (default). Also reads $VHI_LAUNCH_MODE. Explicit launch_mode always wins.

None

Returns:

Type Description
An ``InterfaceSpec`` with the resolved argv, ready to wire into
``process_launcher()``. If VHI isn't installed yet, ``launcher()`` raises
a ``FileNotFoundError`` pointing at ``install_vhi``.
Source code in myogestic/vhi/interfaces.py
def virtual_hand(
    godot_bin: str | None = None,
    vhi_path: str | None = None,
    grpc_host: str | None = None,
    grpc_port: int | None = None,
    launch_mode: str | None = None,
) -> InterfaceSpec:
    """The MyoGestic Virtual Hand Interface (VHI).

    Parameters
    ----------
    godot_bin
        Path to the Godot binary, for source-mode launch. Falls
        back to ``$GODOT_BIN``, then ``which("godot4")`` /
        ``which("godot")``, then platform GUI defaults.
    vhi_path
        Directory containing VHI (binary install OR Godot project).
        Falls back to ``$VHI_PATH``, then the default install root —
        ``<repo>/tools/MyoGestic-VHI`` in a git checkout, otherwise
        ``<user_data>/myogestic/vhi``.
    grpc_host
        VHI gRPC host. Falls back to ``$VHI_GRPC_HOST`` then
        ``127.0.0.1``.
    grpc_port
        VHI gRPC port. Falls back to ``$VHI_GRPC_PORT`` then
        ``50051``.
    launch_mode
        Launch mode — ``"binary"``, ``"godot"``, or ``"auto"`` (default).
        Also reads ``$VHI_LAUNCH_MODE``. Explicit ``launch_mode`` always wins.

    Returns
    -------
    An ``InterfaceSpec`` with the resolved argv, ready to wire into
    ``process_launcher()``. If VHI isn't installed yet, ``launcher()`` raises
    a ``FileNotFoundError`` pointing at ``install_vhi``.
    """
    install_root = Path(vhi_path or os.environ.get("VHI_PATH") or _default_install_root())
    launch_mode = launch_mode or os.environ.get("VHI_LAUNCH_MODE", "auto")
    if launch_mode not in ("auto", "binary", "godot"):
        raise ValueError(f"launch_mode must be 'auto', 'binary', or 'godot'; got {launch_mode!r}")
    grpc_host = grpc_host or os.environ.get("VHI_GRPC_HOST", "127.0.0.1")
    if grpc_port is None:
        grpc_port = int(os.environ.get("VHI_GRPC_PORT", "50051"))

    process = _resolve_vhi_launch(install_root, godot_bin, launch_mode)

    return InterfaceSpec(
        name="VHI Hand",
        process=process,
        output_stream_name="MyoGestic_Output",
        n_output_channels=9,
        output_hz=32.0,
        control_stream_name="VHI_Control",
        n_control_channels=9,
        control_pose_stream_name="MyoGestic_ControlPose",
        n_control_pose_channels=9,
        control_pose_hz=32.0,
        grpc_host=grpc_host,
        grpc_port=grpc_port,
        install_root=install_root,
    )

InterfaceSpec dataclass

InterfaceSpec(name: str, process: list[str], output_stream_name: str, n_output_channels: int, output_hz: float, control_stream_name: str | None = None, n_control_channels: int | None = None, control_pose_stream_name: str | None = None, n_control_pose_channels: int | None = None, control_pose_hz: float | None = None, grpc_host: str = '127.0.0.1', grpc_port: int = 50051, install_root: Path | None = None)

Description of an external visual-feedback interface (e.g. VHI).

Attributes:

Name Type Description
name str

Human label, used as the process_launcher row title.

process list[str]

argv to spawn the interface (passed to subprocess.Popen). An empty list means "VHI not installed" — launcher() surfaces a friendly error pointing at install_vhi rather than letting Popen fail mysteriously.

output_stream_name str

LSL outlet name the interface listens on.

n_output_channels int

Number of channels in the output vector.

output_hz float

Outlet send rate.

control_stream_name str | None

LSL inlet name the interface publishes when the user drives it manually (used for regression targets). May be None.

n_control_channels int | None

Channel count of the control stream, if known.

control_pose_stream_name str | None

LSL outlet name for streaming a continuous pose TO the interface's control hand (opt-in; consumed only when VHI is in STREAM control mode). Opposite direction to control_stream_name.

n_control_pose_channels int | None

Channel count of the control-pose outlet.

control_pose_hz float | None

Send rate of the control-pose outlet.

grpc_host str

VHI gRPC control-server host.

grpc_port int

VHI gRPC control-server port.

install_root Path | None

The directory we resolved process from. Carried so the "not installed" error can quote it.

outlet

outlet() -> LSLOutlet

Construct an LSLOutlet matching this interface's output stream.

Source code in myogestic/vhi/interfaces.py
def outlet(self) -> LSLOutlet:
    """Construct an LSLOutlet matching this interface's output stream."""
    return LSLOutlet(
        name=self.output_stream_name,
        n_channels=self.n_output_channels,
        hz=self.output_hz,
    )

control_client

control_client() -> VhiControlClient

Construct a gRPC control client for this interface.

Imported lazily so a plain install (no [grpc] extra) can still use outlet() / launcher() without grpcio present.

Source code in myogestic/vhi/interfaces.py
def control_client(self) -> VhiControlClient:
    """Construct a gRPC control client for this interface.

    Imported lazily so a plain install (no ``[grpc]`` extra) can still use
    ``outlet()`` / ``launcher()`` without grpcio present.
    """
    from myogestic.vhi._client import VhiControlClient

    return VhiControlClient(host=self.grpc_host, port=self.grpc_port)

control_outlet

control_outlet() -> LSLOutlet

Construct an :class:LSLOutlet for streaming a continuous pose to the control hand.

Opt-in: only consumed when VHI is put in STREAM control mode via control_client().set_control_mode("STREAM"). Raises :class:ValueError if this interface has no control-pose stream configured.

Source code in myogestic/vhi/interfaces.py
def control_outlet(self) -> LSLOutlet:
    """Construct an :class:`LSLOutlet` for streaming a continuous pose to the control hand.

    Opt-in: only consumed when VHI is put in STREAM control mode via
    ``control_client().set_control_mode("STREAM")``. Raises
    :class:`ValueError` if this interface has no control-pose stream
    configured.
    """
    if self.control_pose_stream_name is None:
        raise ValueError(f"{self.name}: no control_pose_stream_name configured")
    return LSLOutlet(
        name=self.control_pose_stream_name,
        n_channels=self.n_control_pose_channels or self.n_output_channels,
        hz=self.control_pose_hz or self.output_hz,
    )

launcher

launcher() -> list[tuple[str, list[str]]]

Return the (name, argv) tuple list expected by process_launcher.

Raises FileNotFoundError with an install_vhi hint when VHI is not installed at the resolved location — better than a silent Popen failure on first run.

Source code in myogestic/vhi/interfaces.py
def launcher(self) -> list[tuple[str, list[str]]]:
    """Return the (name, argv) tuple list expected by `process_launcher`.

    Raises ``FileNotFoundError`` with an ``install_vhi`` hint when VHI
    is not installed at the resolved location — better than a silent
    ``Popen`` failure on first run.
    """
    if not self.process:
        location = f" at {self.install_root}" if self.install_root else ""
        raise FileNotFoundError(
            f"{self.name}: not installed{location}.\n"
            f"  Run `python -m myogestic.tools.install_vhi` to fetch the "
            f"latest release for this platform.\n"
            f"  Or set $VHI_PATH to an existing VHI Godot project and "
            f"$GODOT_BIN to a Godot 4.x binary for source-mode."
        )
    return [(self.name, list(self.process))]

Tools

control_outlet

control_outlet(name: str = DEFAULT_CONTROL_STREAM) -> StreamOutlet

LSL outlet for steering the EMG generator from another script.

The generator listens on a stream named name for a single float (channel = 1) that selects the next gesture amplitude — typically 0.0 (rest) … 1.0 (full). Push samples like::

from myogestic.tools.emg_generator import control_outlet
out = control_outlet()
out.push_sample(np.array([0.0], dtype=np.float32))  # rest
out.push_sample(np.array([1.0], dtype=np.float32))  # fist

Matches the protocol the --control flag on python -m myogestic.tools.emg_generator listens for.

Source code in myogestic/tools/emg_generator.py
def control_outlet(name: str = DEFAULT_CONTROL_STREAM) -> StreamOutlet:
    """LSL outlet for steering the EMG generator from another script.

    The generator listens on a stream named ``name`` for a single float
    (channel = 1) that selects the next gesture amplitude — typically
    ``0.0`` (rest) … ``1.0`` (full). Push samples like::

        from myogestic.tools.emg_generator import control_outlet
        out = control_outlet()
        out.push_sample(np.array([0.0], dtype=np.float32))  # rest
        out.push_sample(np.array([1.0], dtype=np.float32))  # fist

    Matches the protocol the ``--control`` flag on
    ``python -m myogestic.tools.emg_generator`` listens for.
    """
    return StreamOutlet(
        sinfo=StreamInfo(
            name=name,
            stype="Control",
            n_channels=1,
            sfreq=0,
            dtype="float32",
            source_id="ctrl",
        )
    )

myogestic.tools.install_vhi

Install the Virtual Hand Interface release binary for this platform.

VHI ships pre-built artifacts on every release at https://github.com/NsquaredLab/MyoGestic-VHI/releases. This CLI picks the right asset for the host OS/arch, downloads it, unpacks it into the location virtual_hand() looks at, and drops a vhi-version.txt marker so a later install knows what's already there.

Usage: python -m myogestic.tools.install_vhi # latest, default dest python -m myogestic.tools.install_vhi --tag v1.0.0 # pinned version python -m myogestic.tools.install_vhi --dest /custom/path python -m myogestic.tools.install_vhi --force # reinstall over existing

Or after pip install myogestic: myogestic-install-vhi

Pin --tag in production: latest is convenient for a fresh checkout but not reproducible — a downstream rebuild months later may pick up a different VHI version with subtly different behaviour.

main

main() -> None

Console-script entry point (myogestic-install-vhi).

Source code in myogestic/tools/install_vhi.py
def main() -> None:
    """Console-script entry point (``myogestic-install-vhi``)."""
    typer.run(_install)