Skip to content

Core API

App lifecycle

App

App(name: str, theme: bool = True, docking: bool = False, ui_scale: float | None = None)

Top-level application object. Owns the GUI loop, the Context, the run-loop lifecycle hooks, and the recording state machine.

Construct one per process. Register streams via app.streams(...), register your UI via @app.ui, then call app.run(). Optional extensions like Pipeline(app) register themselves via app.before_run_hooks / app.cleanup_hooks - user code rarely needs to touch those lists directly.

Parameters:

Name Type Description Default
name str

Window title. Also used for the persisted ImGui state file (.imgui_state/<name>.ini) when docking=True.

required
theme bool

Apply MyoGestic's built-in ImGui theme. Set False to keep the Dear ImGui defaults.

True
docking bool

Experimental - enable ImGui docking + multi-viewport so panels registered via app.popout(...) become tearable DockableWindows. macOS Retina viewport sizing of detached windows can be wrong on initial draw; treat as experimental.

False
ui_scale float | None

Global UI zoom factor - scales the font and imgui's style metrics (padding, spacing, rounding). None uses $MYOGESTIC_UI_SCALE then 1.0. The env var, if set, overrides this - a per-machine display fix beats the example's value. Clamped to [0.5, 2.0]. Has no effect when theme=False.

None
Source code in myogestic/core.py
def __init__(self, name: str, theme: bool = True, docking: bool = False,
             ui_scale: float | None = None):
    """Create the app with a fresh ``Context`` and nothing registered yet.

    Args:
        name: Stored as ``self.name``.
        theme: Stored as ``self._theme_enabled``.
        docking: Stored as ``self._docking`` (experimental, see below).
        ui_scale: Stored unmodified as ``self._ui_scale``; ``None`` is kept
            as ``None`` here.
    """
    # --- caller-supplied configuration ---------------------------------
    self.name = name
    self._theme_enabled = theme
    self._ui_scale = ui_scale
    # Experimental flag: when set, _gui_loop turns on ImGui docking and
    # multi-viewport so widgets wrapped in `popout_panel(...)` can be torn
    # off and docked. macOS Metal/Retina caveats apply - see the README
    # "Status" note.
    self._docking = docking

    # --- runtime state -------------------------------------------------
    self.ctx = Context()
    self._running = False
    self._ui_fn: Callable[[Context], None] | None = None
    self._popout_specs: list[tuple[str, Callable[[], None], bool, bool, bool | None]] = []

    # --- extension points (e.g. myogestic.ml.attach_pipeline) ----------
    # before_run hooks fire after streams.start and before _gui_loop, on
    # the main thread. cleanup hooks fire during teardown, each one
    # individually wrapped in try/except.
    self.before_run_hooks: list[Callable[[App], None]] = []
    self.cleanup_hooks: list[Callable[[App], None]] = []

streams

streams(*streams: Stream) -> None

Register one or more streams with the app.

Each stream is keyed by its name into ctx.streams. Acquisition threads start when app.run() is called, not at registration time. Calling this with the same name overwrites the previous registration - typically you call it once at setup.

Parameters:

Name Type Description Default
*streams Stream

One or more Stream instances.

()
Source code in myogestic/core.py
def streams(self, *streams: Stream) -> None:
    """Add streams to the app's registry, keyed by stream name.

    Every stream is stored in ``ctx.streams`` under its ``name``. A
    stream registered under an already-used name replaces the earlier
    entry. Registration alone does not start acquisition; that happens
    in ``app.run()``.

    Args:
        *streams: The :class:`Stream` instances to register.
    """
    registry = self.ctx.streams
    for stream in streams:
        registry[stream.name] = stream

bridges

bridges(*bridges: Any) -> None

Register one or more Bridge subprocesses with the app.

Bridges run in their own process (webcam, ultrasound, depth camera, …) and publish an LSL clock stream the main app subscribes to. The cockpit's process_launcher widget shows their start/stop state. Mirrors streams() exactly: each bridge is keyed by its .name into ctx.bridges; calling with the same name overwrites the previous registration.

Parameters:

Name Type Description Default
*bridges Any

One or more bridge instances - each must expose a .name attribute and a Bridge-like interface (.start(), .stop()).

()
Source code in myogestic/core.py
def bridges(self, *bridges: Any) -> None:
    """Add Bridge subprocess handles to the app's registry.

    The counterpart of :meth:`streams` for bridges: every bridge is
    stored in ``ctx.bridges`` under its ``.name``, and registering a
    second bridge with the same name replaces the first.

    Args:
        *bridges: Bridge instances. Each needs a ``.name`` attribute
            and a Bridge-like interface (``.start()``, ``.stop()``).
    """
    registry = self.ctx.bridges
    for bridge in bridges:
        registry[bridge.name] = bridge

ui

ui(fn: Callable[[Context], None]) -> Callable[[Context], None]

Decorator. Register the render callback.

@app.ui
def my_ui(ctx):
    imgui.text(f"State: {ctx.state}")

Source code in myogestic/core.py
def ui(self, fn: Callable[[Context], None]) -> Callable[[Context], None]:
    """Register ``fn`` as the render callback; usable as a decorator.

    The callable is stored and handed back unchanged, so the decorated
    name still refers to the original function. Registering again
    replaces the previously stored callback.

    Example::

        @app.ui
        def my_ui(ctx):
            imgui.text(f"State: {ctx.state}")

    Args:
        fn: Callback taking the app's ``Context``.

    Returns:
        ``fn`` itself.
    """
    self._ui_fn = fn
    return fn

popout

popout(title: str, gui_fn: Callable[[], None], *, default_open: bool = True, can_be_closed: bool = True, remember_is_visible: bool | None = None) -> None

Register a dockable window before run().

This is the preferred path for examples/apps using App(docking=True). It gives Hello ImGui the complete DockableWindow list before launch, instead of discovering windows on the first render frame.

Source code in myogestic/core.py
def popout(
    self,
    title: str,
    gui_fn: Callable[[], None],
    *,
    default_open: bool = True,
    can_be_closed: bool = True,
    remember_is_visible: bool | None = None,
) -> None:
    """Register a dockable window before `run()`.

    This is the preferred path for examples/apps using `App(docking=True)`.
    It gives Hello ImGui the complete DockableWindow list before launch,
    instead of discovering windows on the first render frame.
    """
    self._popout_specs = [spec for spec in self._popout_specs if spec[0] != title]
    self._popout_specs.append(
        (title, gui_fn, default_open, can_be_closed, remember_is_visible)
    )

start_recording

start_recording(base_path: str = 'sessions') -> None

Begin recording all connected streams to a new session.

Creates base_path/<timestamp>/ and starts appending each stream's data + timestamps to per-stream Zarr arrays. Streams whose info is still None (disconnected) are skipped - they won't be retroactively captured if they connect later in the recording. Refuses to start if ctx.state isn't "idle"; updates ctx.status_message with the result.

Parameters:

Name Type Description Default
base_path str

Directory where the per-session subfolder is created. Defaults to "sessions".

'sessions'
Source code in myogestic/core.py
def start_recording(self, base_path: str = "sessions") -> None:
    """Begin recording all connected streams to a new session.

    Creates a ``Session`` under ``base_path`` and attaches it to every
    stream whose ``info`` is set. Streams whose ``info`` is still
    ``None`` (disconnected) are skipped - they won't be retroactively
    captured if they connect later in the recording. Refuses to start
    if the ``ctx.state`` → ``"recording"`` transition is not allowed;
    updates ``ctx.status_message`` with the result.

    When no stream is connected the state still becomes ``"recording"``
    (with an empty session) and the status message says so.

    If creating the session or initialising a stream raises, the
    previous ``ctx.state`` / ``ctx.session`` are restored, any streams
    already attached are detached, and the exception is re-raised, so a
    failed start never leaves the app stuck in ``"recording"``.

    Args:
        base_path: Directory where the per-session subfolder is
            created. Defaults to ``"sessions"``.
    """
    if not can_transition(self.ctx.state, AppState.RECORDING):
        self.ctx.status_message = (
            f"Cannot start recording: state is {self.ctx.state!r}, expected 'idle'."
        )
        return

    # Remembered so a failed start can be undone completely.
    previous_state = self.ctx.state
    previous_session = self.ctx.session
    attached = []

    self.ctx.state = AppState.RECORDING
    try:
        session = Session(base_path=base_path)
        self.ctx.session = session
        # Only record from streams that have connected - a stream with
        # info=None has no zarr schema and would fail on append.
        for name, stream in self.ctx.streams.items():
            if stream.info is None:
                continue
            session.init_stream(name, stream.info)
            stream._session = session
            attached.append(stream)
    except Exception as exc:
        # Undo the partial start before propagating the error.
        for stream in attached:
            stream._session = None
        self.ctx.session = previous_session
        self.ctx.state = previous_state
        self.ctx.status_message = f"Cannot start recording: {exc}"
        raise

    if not attached:
        self.ctx.status_message = "No connected streams to record"
        self.ctx.log("Recording: no connected streams")
    else:
        self.ctx.status_message = f"Recording to {self.ctx.session.path}"
        self.ctx.log(f"Recording → {self.ctx.session.path}")

stop_recording

stop_recording() -> None

Stop the active recording and pack the session to a .session.zip.

Finalises the per-stream Zarr arrays, writes the label track to labels.json, and kicks off a daemon thread that packs the session folder into a single <timestamp>.session.zip archive (the original folder is kept until the pack succeeds). Refuses to stop if ctx.state isn't "recording".

Source code in myogestic/core.py
def stop_recording(self) -> None:
    """Stop the active recording and pack the session to a ``.session.zip``.

    Switches ``ctx.state`` back to idle, detaches the session from every
    stream, saves the session metadata via ``session.save_meta`` (the
    label track goes to ``labels.json``), and kicks off a daemon thread
    that packs the session folder into a single
    ``<timestamp>.session.zip`` archive (the original folder is kept
    until the pack succeeds). Refuses to stop if ``ctx.state`` isn't
    ``"recording"``.

    Packing and registering with the session manager are handled
    separately: only a packing failure is reported as "Pack failed" (and
    falls back to registering the folder). A registration failure is
    logged and never reported as a packing failure.
    """
    if not can_transition(self.ctx.state, AppState.IDLE):
        self.ctx.status_message = (
            f"Cannot stop recording: state is {self.ctx.state!r}, expected 'recording'."
        )
        return
    self.ctx.state = AppState.IDLE
    for stream in self.ctx.streams.values():
        stream._session = None

    session = self.ctx.session
    if session is None:
        return

    session.save_meta(self.name, class_names=self.ctx.class_names or None)
    n = len(session.label_track)
    self.ctx.status_message = f"Saved {n} labels - finalizing…"

    # Pack session folder → .session.zip in a daemon thread so the
    # UI stays responsive during finalization. Register with
    # session_manager only after pack succeeds.
    import threading

    def _register(location: str) -> None:
        # Best-effort: a registration problem must not hide the result
        # of the pack itself, but it should still reach the log.
        try:
            from myogestic.widgets.session_manager import add_recorded_session
            add_recorded_session(location)
        except Exception as e:
            log.exception("could not register session %s: %s", location, e)

    def _finalize() -> None:
        try:
            zip_path = session.pack_to_zip()
        except Exception as e:
            log.exception("pack_to_zip failed: %s", e)
            self.ctx.status_message = f"Pack failed: {e} - folder kept"
            self.ctx.log(f"Pack failed: {e} - folder kept at {session.path}")
            # Fall back: register the folder so user doesn't lose it
            _register(str(session.path))
            return
        self.ctx.status_message = f"Saved {n} labels"
        self.ctx.log(f"Session saved → {zip_path}")
        _register(str(zip_path))
        log.info("packed session to %s", zip_path)

    threading.Thread(target=_finalize, daemon=True).start()

run

run(mode: str = 'gui', window_size: tuple[int, int] = (1280, 800), fullscreen: bool = False) -> None

Blocking entry point.

Call tree (top → bottom = runtime order):

App.run()
├─ 1. Stream.start()          per stream → daemon acquire thread
├─ 2. before_run_hooks(app)  extensions register here
│    └─ e.g. myogestic.ml.attach_pipeline → starts predict thread
├─ 3. self._gui_loop()  ← main thread, BLOCKS
│    └─ immapp.run → per frame: self._ui_fn(self.ctx)  (your @app.ui)
└─ 4. [finally] cleanup - always runs, even on startup failure
     ├─ cleanup_hooks(app)   each wrapped in try/except
     ├─ Stream.stop()         per stream
     ├─ Bridge.stop()         per bridge
     └─ process_launcher._cleanup_all()

Core has only idle ↔ recording. myogestic.ml.attach_pipeline(app) adds training/predicting states + their transition methods.

Source code in myogestic/core.py
def run(
    self,
    mode: str = "gui",
    window_size: tuple[int, int] = (1280, 800),
    fullscreen: bool = False,
) -> None:
    """Start everything, block in the main loop, then tear down.

    Runtime order:

    1. ``stream.start()`` for every registered stream.
    2. Every ``before_run_hooks`` entry is called with the app.
    3. ``self._gui_loop(...)`` (``mode="gui"``) or
       ``self._headless_loop()`` (``mode="headless"``) runs on the
       calling thread.
    4. Teardown: ``cleanup_hooks``, then ``stream.stop()`` per stream,
       then ``bridge.stop()`` per bridge, then
       ``process_launcher._cleanup_all()``. Each step is wrapped in its
       own try/except so one failure does not skip the rest. Teardown
       also runs when steps 1-3 raise, after which the exception is
       re-raised.

    In the browser build (``IS_BROWSER``) teardown is skipped on a
    normal return so the app stays alive after this function returns.

    Core has only idle ↔ recording. `myogestic.ml.attach_pipeline(app)` adds
    training/predicting states + their transition methods.

    Raises:
        RuntimeError: if the app is already running.
        ValueError: if ``mode`` is neither ``"gui"`` nor ``"headless"``.
    """
    if self._running:
        raise RuntimeError("App.run() is not re-entrant")
    supported_modes = ("gui", "headless")
    if mode not in supported_modes:
        raise ValueError(
            f"App.run(mode={mode!r}) - unknown mode. "
            "Supported: 'gui', 'headless'."
        )
    self._running = True

    log.info("run() - streams=%s mode=%s", list(self.ctx.streams), mode)

    from myogestic._browser import IS_BROWSER

    def _teardown() -> None:
        # Every step is isolated so a failing hook/stream/bridge cannot
        # prevent the remaining resources from being released.
        log.info("run() - cleanup")
        for cleanup_hook in self.cleanup_hooks:
            try:
                cleanup_hook(self)
            except Exception as exc:
                log.exception("cleanup hook %r failed: %s", cleanup_hook, exc)
        for running_stream in self.ctx.streams.values():
            try:
                running_stream.stop()
            except Exception as exc:
                log.exception("stream stop failed: %s", exc)
        for running_bridge in self.ctx.bridges.values():
            try:
                running_bridge.stop()
            except Exception as exc:
                log.exception("bridge stop failed: %s", exc)
        try:
            from myogestic.widgets.process_launcher import _cleanup_all
            _cleanup_all()
        except Exception as exc:
            log.exception("process cleanup failed: %s", exc)
        self._running = False

    try:
        for stream_name, stream in self.ctx.streams.items():
            log.info("  start stream %r", stream_name)
            stream.start()

        for start_hook in self.before_run_hooks:
            log.info(
                "  before_run hook: %s",
                getattr(start_hook, "__qualname__", start_hook),
            )
            start_hook(self)

        log.info("  enter %s loop", mode)
        if mode == "gui":
            self._gui_loop(window_size=window_size, fullscreen=fullscreen)
        else:
            # mode was validated above, so this is "headless".
            self._headless_loop()
    except BaseException:
        # A half-started app must never leak (orphan threads on
        # desktop, orphan scheduler entries in browser), so tear down
        # before letting the error propagate.
        _teardown()
        raise

    # In Pyodide, immapp.run returns immediately and the browser's
    # requestAnimationFrame drives the GUI from here. Skipping cleanup
    # keeps the app alive after this function returns; tab unload is the
    # implicit teardown.
    if not IS_BROWSER:
        _teardown()

AppState

Bases: StrEnum

Core app-state values. Extensions (e.g. myogestic.ml.PipelineState) add more.

Context.state is a bare str so extensions can introduce their own states without subclassing. Each module validates transitions within its own namespace only.

Context dataclass

Context(streams: dict[str, Stream] = dict(), bridges: dict[str, Any] = dict(), state: str = IDLE, session: Session | None = None, class_names: list[str] = list(), current_label: int = -1, status_message: str = '', logs: list[str] = list())

Shared state that all threads read and write. Extensions may add their own fields dynamically on the owning App, but Context itself is core-only.

log

log(message: str, max_lines: int = 500) -> None

Append a one-line app event for the log_panel widget.

Bounded to max_lines (oldest dropped). Use for high-level events - recording saved, training start/done, model load - not per-frame chatter. Safe to call from any thread (list.append/pop are GIL-atomic).

Source code in myogestic/core.py
def log(self, message: str, max_lines: int = 500) -> None:
    """Record a timestamped one-line event for the `log_panel` widget.

    The line is stored as ``[HH:MM:SS] message`` at the end of
    `self.logs`. Once the list grows past `max_lines`, the oldest
    entries are removed so that at most `max_lines` remain. Meant for
    high-level events (recording saved, training start/done, model
    load), not per-frame chatter.

    No explicit lock is taken; the method is intended to be callable
    from any thread and relies on the individual list operations being
    atomic under CPython's GIL.
    """
    from time import strftime

    self.logs.append(f"[{strftime('%H:%M:%S')}] {message}")
    overflow = len(self.logs) - max_lines
    if overflow > 0:
        del self.logs[:overflow]

Stream

Stream(name: str, source: Source, window_seconds: float, buffer_seconds: float = 10)

A named ring-buffered live stream backed by a Source.

The framework's central data primitive: pair a name ("emg") with a source (LSLSource("TestEMG1")) and a window duration, register the stream with app.streams(...), and the rest of the framework can pull windows (for ML), display snapshots (for the signal viewer), or recorded chunks (for the session) by stream name.

Architecture:

  • One daemon acquisition thread is started per Stream when App.run() begins. It loops source.read(), appends to the ring buffer, refreshes the display snapshot, and (if a recording session is active) appends to the session's Zarr store.
  • Two consumer surfaces are then available concurrently: get_window() (channels-first, exact window-seconds long, consumed by @pipeline.extract) and get_display() (min/max envelope decimated for 60 fps rendering, consumed by signal_viewer).
  • The ring buffer holds the last buffer_seconds of samples so transient consumers (slow extract, momentary GUI hitches) don't lose data.
Example

from myogestic import App, Stream
from myogestic.sources import LSLSource
app = App("hello")
app.streams(
    Stream("emg", source=LSLSource("TestEMG1"),
           window_seconds=1.0, buffer_seconds=10),
)

See Streams concept for the buffer + decimation model in depth, and Add a custom source for the matching source-side contract.

Live ring-buffered stream with display decimation.

Parameters:

Name Type Description Default
name str

Stream label (also used as the recorded zarr stream key).

required
source Source

Anything implementing the Source protocol.

required
window_seconds float

Duration in seconds of the window returned by get_window().

required
buffer_seconds float

Ring-buffer depth in seconds. Defaults to 10.

10
Source code in myogestic/stream.py
def __init__(
    self,
    name: str,
    source: Source,
    window_seconds: float,
    buffer_seconds: float = 10,
):
    """Create a disconnected stream with empty placeholder buffers.

    No connection is made and no ring buffer is sized here; the
    buffer-related attributes below are placeholders until the source
    connects.

    Args:
        name: Stream label (also used as the recorded zarr stream key).
        source: Anything implementing the :class:`Source` protocol.
        window_seconds: Length in **seconds** of the window handed out
            by :meth:`get_window`; stored as a float.
        buffer_seconds: Ring-buffer depth in seconds. Defaults to 10.
    """
    # -- caller-supplied configuration ----------------------------------
    self.name = name
    self._source = source
    self._window = float(window_seconds)
    self._buffer_seconds = buffer_seconds

    # -- lifecycle / connection state -----------------------------------
    self._running = False
    self._session: Session | None = None
    self.status = "disconnected"
    self.last_error = ""
    self.info: StreamInfo | None = None
    self._connected = False

    # -- ring buffers (real values are set up in _connect()) ------------
    self._cap: int = 0
    self._data: RingBuffer | None = None
    self._timestamps: RingBuffer | None = None
    self._lock = threading.Lock()

    # -- raw display snapshot -------------------------------------------
    self._display_d = np.empty(0)
    self._display_t = np.empty(0)
    self._display_n: int = 0
    self._snap_interval: int = 1
    self._samples_since_snap: int = 0

    # -- M4-decimated display output ------------------------------------
    self._m4_n_pixels: int = 2000
    self._m4_t = np.empty(0, dtype=np.float64)
    self._m4_d = np.empty(0)
    self._m4_n: int = 0
    # M4 scratch space is kept per stream: it used to live in module
    # globals, which was not thread-safe across streams.
    self._m4_downsampler: object | None = None
    self._m4_work_col: np.ndarray | None = None
    self._m4_work_idx: np.ndarray | None = None
    self._m4_work_d: np.ndarray | None = None
    self._m4_work_t: np.ndarray | None = None

    # -- reusable output buffers for get_window() -----------------------
    self._win_d = np.empty(0)
    self._win_t = np.empty(0)

reconnect

reconnect(target: str | None = None) -> bool

Reconnect source. Optionally switch to a different target.

If the source implements reconnect(), uses that (preserves source-specific logic like LSL resolve or serial port open). Otherwise falls back to disconnect + connect. Either way the source is connected ONCE — buffers are then (re)allocated from the returned StreamInfo.

Holds self._lock for the whole swap to prevent the acquire loop from reading through a half-torn state.

Source code in myogestic/stream.py
def reconnect(self, target: str | None = None) -> bool:
    """Reconnect source. Optionally switch to a different target.

    If the source implements reconnect(), uses that (preserves source-specific
    logic like LSL resolve or serial port open). Otherwise falls back to
    disconnect + connect. Either way the source is connected ONCE — buffers
    are then (re)allocated from the returned StreamInfo.

    Holds `self._lock` for the whole swap to prevent the acquire loop
    from reading through a half-torn state.
    """
    with self._lock:
        self._connected = False
        self._display_n = 0
        self._m4_n = 0
        self.status = "disconnected"

        try:
            if hasattr(self._source, "reconnect"):
                self.info = self._source.reconnect(target)
            else:
                self._source.disconnect()
                self.info = self._source.connect()
        except Exception as e:
            self.last_error = str(e)
            return False

        # Re-init buffers for potentially different channel count/fs
        # (no double-connect — source was already (re)connected above).
        self._allocate_buffers()
        return True

get_window

get_window() -> tuple[ndarray, ndarray]

Return the most recent window_seconds as (data, ts).

data is channels-first (n_channels, n_samples) — the same convention used everywhere user code touches signal data. Both arrays are views into a reusable per-stream buffer; copy explicitly if you need to retain them past the next call.

Source code in myogestic/stream.py
def get_window(self) -> tuple[np.ndarray, np.ndarray]:
    """Fetch the latest ``window_seconds`` of samples as ``(data, ts)``.

    ``data`` is **channels-first** ``(n_channels, n_samples)``, the
    convention used wherever user code touches signal data. Both
    arrays are views into a per-stream buffer that is reused on every
    call, so copy them explicitly if they must outlive the next call.

    Before the stream has buffers and ``info``, an empty ``(0, 0)``
    float32 array and an empty float64 timestamp array are returned.
    While fewer samples than a full window are buffered, everything
    available is returned.
    """
    not_ready = any(
        part is None for part in (self._data, self._timestamps, self.info)
    )
    if not_ready:
        no_data = np.empty((0, 0), dtype=np.float32)
        no_ts = np.empty(0, dtype=np.float64)
        return no_data, no_ts

    # Copy the ring contents out under the lock; slicing happens after.
    with self._lock:
        available = _unwrap_ring_into(self._data, self._win_d, self._cap)
        _unwrap_ring_into(self._timestamps, self._win_t, self._cap)

    if available == 0:
        return self._win_d[:0].T, self._win_t[:0]

    wanted = int(self._window * self.info.fs)
    # Clamp so a partially filled buffer yields everything it has.
    first = max(available - wanted, 0)
    return self._win_d[first:available].T, self._win_t[first:available]

get_display

get_display(n_pixels: int = 800) -> tuple[ndarray, ndarray] | None

Read pre-computed M4 result. Zero work on render thread.

The acquire thread computes M4 in _update_display_snapshot. This just reads the result via atomic ref (GIL-safe).

Source code in myogestic/stream.py
def get_display(
    self, n_pixels: int = 800
) -> tuple[np.ndarray, np.ndarray] | None:
    """Read pre-computed M4 result. Zero work on render thread.

    The acquire thread computes M4 in _update_display_snapshot.
    This just reads the result via atomic ref (GIL-safe).
    """
    # Update target resolution for next snapshot
    self._m4_n_pixels = n_pixels
    n = self._m4_n
    if n < 2:
        return None
    return self._m4_t[:n], self._m4_d[:n]

get_raw_snapshot

get_raw_snapshot() -> tuple[ndarray, ndarray] | None

Lock-free read of the display snapshot.

Source code in myogestic/stream.py
def get_raw_snapshot(self) -> tuple[np.ndarray, np.ndarray] | None:
    """Lock-free read of the display snapshot."""
    n = self._display_n
    if n < 2:
        return None
    return self._display_t[:n], self._display_d[:n]

last_timestamp

last_timestamp() -> float | None

Most recent sample timestamp, or None if no samples yet.

Holds _lock while reading _display_t[_display_n-1] so a concurrent reconnect() (which zeroes _display_n and reallocates _display_t) cannot strand the read on a torn buffer.

Source code in myogestic/stream.py
def last_timestamp(self) -> float | None:
    """Most recent sample timestamp, or None if no samples yet.

    Holds `_lock` while reading `_display_t[_display_n-1]` so a concurrent
    `reconnect()` (which zeroes `_display_n` and reallocates `_display_t`)
    cannot strand the read on a torn buffer.
    """
    with self._lock:
        n = self._display_n
        if n < 1 or self._display_t.shape[0] < n:
            return None
        ts = float(self._display_t[n - 1])
    return ts if ts > 0.0 else None

StreamInfo dataclass

StreamInfo(n_channels: int, fs: float, dtype: dtype = dtype(float32), channel_names: list[str] | None = None)

Describes the shape and dtype of a Source's data.

Returned by Source.connect(). The framework uses it to size the ring buffer, lay out the signal viewer, and decide how to serialise the stream when recording.

Attributes:

Name Type Description
n_channels int

Channel count. Fixed for the life of the source.

fs float

Sample rate in Hz. Used to convert window_seconds / buffer_seconds into sample counts.

dtype dtype

NumPy dtype of each sample. Defaults to float32; most signal-processing widgets assume this.

channel_names list[str] | None

Optional per-channel labels for the signal viewer legend. None (default) renders as ch0, ch1, ...

TrainingData dataclass

TrainingData(paths: list[str] = list(), class_names: list[str] = list(), classes: set[int] = set())

Inputs delivered to the user's @pipeline.train callback.

Built by session_manager() and assigned by the user to pipeline.training_data from inside @app.ui:

@app.ui
def ui(ctx):
    pipeline.training_data = session_manager(...)

Attributes:

Name Type Description
paths list[str]

Session locations (folders or .session.zip archives).

class_names list[str]

Human-readable labels — same list passed to recording_controls / session_manager.

classes set[int]

Active class indices to include. Pass as the classes= arg to iter_labeled_windows / iter_aligned_windows.

Layout

Grid

Grid(rows: int, cols: int, row_height: list[Track] | None = None, col_width: list[Track] | None = None)

Grid layout manager. Index with [row, col] or [row, col_start:col_end].

Both axes accept the same Px/Fr track specs. See module docstring for examples.

Parameters:

Name Type Description Default
rows int

Number of rows.

required
cols int

Number of columns.

required
row_height list[Track] | None

Per-row track specs (length must equal rows). Default None → all rows share equally ([Fr(1)] * rows).

None
col_width list[Track] | None

Per-column track specs (length must equal cols). Default None → all columns share equally ([Fr(1)] * cols).

None

Raises:

Type Description
ValueError

if a list length doesn't match rows / cols, or if any track value is non-finite or negative.

TypeError

if a track entry isn't Px, Fr, or a number.

Source code in myogestic/grid.py
def __init__(
    self,
    rows: int,
    cols: int,
    row_height: list[Track] | None = None,
    col_width: list[Track] | None = None,
):
    """Validate the grid dimensions and normalise the track specs.

    Args:
        rows: Number of rows; must be a positive ``int`` (``bool`` is
            rejected).
        cols: Number of columns; same rule as ``rows``.
        row_height: Per-row track specs, passed to ``self._normalize``
            together with ``rows``.
        col_width: Per-column track specs, passed to ``self._normalize``
            together with ``cols``.

    Raises:
        ValueError: if ``rows`` or ``cols`` is not a positive int.
            ``self._normalize`` may raise for invalid track lists.
    """
    # bool is a subclass of int, so it has to be excluded explicitly.
    rows_ok = isinstance(rows, int) and not isinstance(rows, bool) and rows > 0
    if not rows_ok:
        raise ValueError(f"rows must be a positive int, got {rows!r}")
    cols_ok = isinstance(cols, int) and not isinstance(cols, bool) and cols > 0
    if not cols_ok:
        raise ValueError(f"cols must be a positive int, got {cols!r}")

    self.rows = rows
    self.cols = cols

    normalize = self._normalize
    self._row_tracks: list[Track] = normalize(row_height, rows, "row_height")
    self._col_tracks: list[Track] = normalize(col_width, cols, "col_width")

    # Per-frame layout state: one height slot per row, and -1 as the
    # initial "last frame" value.
    self._scaled_heights: list[float] = [0.0] * rows
    self._last_frame: int = -1

end_frame

end_frame() -> None

No-op. Frame reset is handled automatically via frame counter.

Source code in myogestic/grid.py
def end_frame(self) -> None:
    """Do nothing; the frame counter already resets per-frame state."""
    return None

Px dataclass

Px(value: float)

Fixed pixel size. Px(300) means "exactly 300 px wide/tall".

Fr dataclass

Fr(value: float)

Fractional unit (CSS-grid fr). Fr(1) means "1 share of the space remaining after Px tracks are subtracted". Multiple Fr entries split the remainder proportionally to their values, so [Fr(1), Fr(2)] splits leftover space 1:2.

Track module-attribute

Track = Union[Px, Fr]

Event helpers

EdgeTrigger

EdgeTrigger(callback: Callable[[T], None])

Bases: Generic[T]

Calls callback(value) only when value differs from the last fire.

Thread-safety: the typical pattern is "one writer (predict thread) + occasional rebase() from the UI thread". Both assignments to self._last are atomic under CPython's GIL, so no explicit lock is needed; the cost is that a race between the two callers can result in one extra suppressed-or-fired callback, which is harmless for the intended use cases (RPC dedup, audio cue gating, etc.).

Source code in myogestic/edge_trigger.py
def __init__(self, callback: Callable[[T], None]) -> None:
    """Store ``callback`` and start with no previously fired value."""
    # None doubles as the "nothing fired yet" marker.
    self._last: T | None = None
    self._callback = callback

last property

last: T | None

The most recently fired (or rebased) value; None before first fire.

fire_if_changed

fire_if_changed(value: T) -> bool

Fire iff value differs from the last fired value.

Returns True when the callback ran, False when suppressed.

Source code in myogestic/edge_trigger.py
def fire_if_changed(self, value: T) -> bool:
    """Invoke the callback only when ``value`` is not equal to the last one.

    The new value is recorded as the baseline *before* the callback
    runs.

    Returns:
        ``True`` if the callback was invoked, ``False`` if the value
        compared equal to the previous one and was suppressed.
    """
    changed = not (value == self._last)
    if changed:
        self._last = value
        self._callback(value)
    return changed

rebase

rebase(value: T) -> None

Set the "last fired" value without firing.

Use when another code path already performed the equivalent action and the trigger should treat that as the new baseline.

Source code in myogestic/edge_trigger.py
def rebase(self, value: T) -> None:
    """Adopt ``value`` as the baseline without invoking the callback.

    For cases where some other code path has already carried out the
    equivalent action, so the next ``fire_if_changed(value)`` should be
    treated as "no change".
    """
    self._last = value

Built-in features

features

Classic time-domain EMG features — the starter set every example used to copy-paste.

Use as-is, mix with your own, or replace entirely:

from myogestic.contrib.features import rms, mav, wl
from myogestic.widgets import FeatureSelector

feats = FeatureSelector(
    {"RMS": rms, "MAV": mav, "WL": wl, "MyCustom": my_custom_fn},
    default=["RMS", "MAV"],
)

All take an EMG window of shape (n_channels, n_samples) and return a per-channel scalar vector (n_channels,) of dtype float32.

rms

rms(emg: ndarray) -> ndarray

Root mean square per channel.

Source code in myogestic/contrib/features.py
def rms(emg: np.ndarray) -> np.ndarray:
    """Root mean square per channel.

    Args:
        emg: EMG window of shape ``(n_channels, n_samples)``. Integer
            windows (e.g. raw ADC counts) are accepted.

    Returns:
        Vector of shape ``(n_channels,)`` with dtype ``float32``.
    """
    samples = np.asarray(emg)
    # Squaring an integer array stays in that integer dtype and silently
    # wraps around (int16 overflows for |x| > 181), so promote integer
    # windows to float64 first. Floating-point input is left untouched.
    if np.issubdtype(samples.dtype, np.integer):
        samples = samples.astype(np.float64)
    return np.sqrt(np.mean(samples ** 2, axis=1)).astype(np.float32)

mav

mav(emg: ndarray) -> ndarray

Mean absolute value per channel.

Source code in myogestic/contrib/features.py
def mav(emg: np.ndarray) -> np.ndarray:
    """Mean absolute value per channel.

    Args:
        emg: EMG window of shape ``(n_channels, n_samples)``. Integer
            windows (e.g. raw ADC counts) are accepted.

    Returns:
        Vector of shape ``(n_channels,)`` with dtype ``float32``.
    """
    samples = np.asarray(emg)
    # ``np.abs`` on a signed integer array cannot represent the magnitude of
    # the most negative value (abs(int16(-32768)) wraps back to -32768), which
    # is exactly what a signal clipped at the negative rail contains. Promote
    # signed integers to float64 before taking the absolute value.
    if np.issubdtype(samples.dtype, np.signedinteger):
        samples = samples.astype(np.float64)
    return np.mean(np.abs(samples), axis=1).astype(np.float32)

wl

wl(emg: ndarray) -> ndarray

Waveform length per channel — sum of absolute first differences.

Source code in myogestic/contrib/features.py
def wl(emg: np.ndarray) -> np.ndarray:
    """Waveform length per channel — sum of absolute first differences.

    Args:
        emg: EMG window of shape ``(n_channels, n_samples)``. Integer
            windows (e.g. raw ADC counts) are accepted.

    Returns:
        Vector of shape ``(n_channels,)`` with dtype ``float32``.
    """
    samples = np.asarray(emg)
    # ``np.diff`` keeps the input dtype, so integer windows wrap around:
    # any falling edge in an unsigned array, or a swing larger than the
    # dtype's range in a signed one. Promote integers to float64 first.
    if np.issubdtype(samples.dtype, np.integer):
        samples = samples.astype(np.float64)
    return np.sum(np.abs(np.diff(samples, axis=1)), axis=1).astype(np.float32)

var

var(emg: ndarray) -> ndarray

Variance per channel.

Source code in myogestic/contrib/features.py
def var(emg: np.ndarray) -> np.ndarray:
    """Variance per channel.

    Args:
        emg: EMG window of shape ``(n_channels, n_samples)``.

    Returns:
        Vector of shape ``(n_channels,)`` with dtype ``float32``, computed
        along the sample axis with ``np.var``'s default settings.
    """
    per_channel = np.var(emg, axis=1)
    return per_channel.astype(np.float32)

zc

zc(emg: ndarray) -> ndarray

Zero-crossing count per channel.

Source code in myogestic/contrib/features.py
def zc(emg: np.ndarray) -> np.ndarray:
    """Zero-crossing count per channel.

    A crossing is counted wherever two consecutive samples along the
    sample axis have different sign bits.

    Args:
        emg: EMG window of shape ``(n_channels, n_samples)``.

    Returns:
        Vector of shape ``(n_channels,)`` with dtype ``float32``.
    """
    negative = np.signbit(emg)
    # Compare each sample's sign bit with its predecessor's.
    flips = negative[:, 1:] != negative[:, :-1]
    return flips.sum(axis=1).astype(np.float32)

External interfaces

virtual_hand

virtual_hand(godot_bin: str | None = None, vhi_path: str | None = None, grpc_host: str | None = None, grpc_port: int | None = None, mode: str | None = None) -> InterfaceSpec

The MyoGestic Virtual Hand Interface (VHI).

Parameters:

Name Type Description Default
godot_bin str | None

Path to the Godot binary, for source-mode launch. Falls back to $GODOT_BIN, then which("godot4") / which("godot"), then platform GUI defaults.

None
vhi_path str | None

Directory containing VHI (binary install OR Godot project). Falls back to $VHI_PATH, then the default install root — <repo>/tools/MyoGestic-VHI in a git checkout, otherwise <user_data>/myogestic/vhi.

None
grpc_host str | None

VHI gRPC host. Falls back to $VHI_GRPC_HOST then 127.0.0.1.

None
grpc_port int | None

VHI gRPC port. Falls back to $VHI_GRPC_PORT then 50051.

None
mode str | None

Launch mode — "binary", "godot", or "auto" (default). Also reads $VHI_LAUNCH_MODE. Explicit mode always wins.

None

Returns: An InterfaceSpec with the resolved argv, ready to wire into process_launcher(). If VHI isn't installed yet, launcher() raises a FileNotFoundError pointing at install_vhi.

Source code in myogestic/interfaces.py
def virtual_hand(
    godot_bin: str | None = None,
    vhi_path: str | None = None,
    grpc_host: str | None = None,
    grpc_port: int | None = None,
    mode: str | None = None,
) -> InterfaceSpec:
    """The MyoGestic Virtual Hand Interface (VHI).

    Environment variables that are set but empty are treated as unset, so
    the documented defaults still apply.

    Args:
        godot_bin: Path to the Godot binary, for source-mode launch. Falls
            back to ``$GODOT_BIN``, then ``which("godot4")`` /
            ``which("godot")``, then platform GUI defaults.
        vhi_path: Directory containing VHI (binary install OR Godot project).
            Falls back to ``$VHI_PATH``, then the default install root —
            ``<repo>/tools/MyoGestic-VHI`` in a git checkout, otherwise
            ``<user_data>/myogestic/vhi``.
        grpc_host: VHI gRPC host. Falls back to ``$VHI_GRPC_HOST`` then
            ``127.0.0.1``.
        grpc_port: VHI gRPC port. Falls back to ``$VHI_GRPC_PORT`` then
            ``50051``.
        mode: Launch mode — ``"binary"``, ``"godot"``, or ``"auto"`` (default).
            Also reads ``$VHI_LAUNCH_MODE``. Explicit ``mode`` always wins.

    Returns:
        An ``InterfaceSpec`` with the resolved argv, ready to wire into
        ``process_launcher()``. If VHI isn't installed yet, ``launcher()``
        raises a ``FileNotFoundError`` pointing at ``install_vhi``.

    Raises:
        ValueError: If the resolved ``mode`` is not ``"auto"``, ``"binary"``
            or ``"godot"``, or if ``$VHI_GRPC_PORT`` is not an integer.
    """
    install_root = Path(
        vhi_path or os.environ.get("VHI_PATH") or _default_install_root()
    )
    # Same ``or`` chain as the install root above: an empty env var falls
    # through to the default instead of becoming the value.
    mode = mode or os.environ.get("VHI_LAUNCH_MODE") or "auto"
    if mode not in ("auto", "binary", "godot"):
        raise ValueError(
            f"mode must be 'auto', 'binary', or 'godot'; got {mode!r}"
        )
    grpc_host = grpc_host or os.environ.get("VHI_GRPC_HOST") or "127.0.0.1"
    if grpc_port is None:
        raw_port = os.environ.get("VHI_GRPC_PORT") or "50051"
        try:
            grpc_port = int(raw_port)
        except ValueError:
            # Name the variable so the user knows what to fix; the bare
            # int() message only quotes the offending literal.
            raise ValueError(
                f"$VHI_GRPC_PORT must be an integer; got {raw_port!r}"
            ) from None

    process = _resolve_vhi_launch(install_root, godot_bin, mode)

    return InterfaceSpec(
        name="VHI Hand",
        process=process,
        output_stream="MyoGestic_Output",
        output_channels=9,
        output_hz=32.0,
        control_stream="VHI_Control",
        control_channels=9,
        control_pose_stream="MyoGestic_ControlPose",
        control_pose_channels=9,
        control_pose_hz=32.0,
        grpc_host=grpc_host,
        grpc_port=grpc_port,
        install_root=install_root,
    )

InterfaceSpec dataclass

InterfaceSpec(name: str, process: list[str], output_stream: str, output_channels: int, output_hz: float, control_stream: str | None = None, control_channels: int | None = None, control_pose_stream: str | None = None, control_pose_channels: int | None = None, control_pose_hz: float | None = None, grpc_host: str = '127.0.0.1', grpc_port: int = 50051, install_root: Path | None = None)

Description of an external visual-feedback interface (e.g. VHI).

Parameters:

Name Type Description Default
name str

Human label, used as the process_launcher row title.

required
process list[str]

argv to spawn the interface (passed to subprocess.Popen). An empty list means "VHI not installed" — launcher() surfaces a friendly error pointing at install_vhi rather than letting Popen fail mysteriously.

required
output_stream str

LSL outlet name the interface listens on.

required
output_channels int

Number of channels in the output vector.

required
output_hz float

Outlet send rate.

required
control_stream str | None

LSL inlet name the interface publishes when the user drives it manually (used for regression targets). May be None.

None
control_channels int | None

Channel count of the control stream, if known.

None
control_pose_stream str | None

LSL outlet name for streaming a continuous pose TO the interface's control hand (opt-in; consumed only when VHI is in STREAM control mode). Opposite direction to control_stream.

None
control_pose_channels int | None

Channel count of the control-pose outlet.

None
control_pose_hz float | None

Send rate of the control-pose outlet.

None
grpc_host str

VHI gRPC control-server host.

'127.0.0.1'
grpc_port int

VHI gRPC control-server port.

50051
install_root Path | None

The directory we resolved process from. Carried so the "not installed" error can quote it.

None

outlet

outlet() -> LSLOutlet

Construct an LSLOutlet matching this interface's output stream.

Source code in myogestic/interfaces.py
def outlet(self) -> LSLOutlet:
    """Construct an LSLOutlet matching this interface's output stream.

    Returns:
        An ``LSLOutlet`` built from this spec's ``output_stream``,
        ``output_channels`` and ``output_hz``.
    """
    return LSLOutlet(
        name=self.output_stream,
        n_channels=self.output_channels,
        hz=self.output_hz,
    )

control_client

control_client() -> VhiControlClient

Construct a gRPC control client for this interface.

Imported lazily so a plain install (no [grpc] extra) can still use outlet() / launcher() without grpcio present.

Source code in myogestic/interfaces.py
def control_client(self) -> VhiControlClient:
    """Construct a gRPC control client for this interface.

    Imported lazily so a plain install (no ``[grpc]`` extra) can still use
    ``outlet()`` / ``launcher()`` without grpcio present.

    Returns:
        A ``VhiControlClient`` pointed at this spec's ``grpc_host`` and
        ``grpc_port``.
    """
    # Deferred import: only callers that actually ask for a control client
    # need the gRPC dependency to be importable.
    from myogestic._vhi_client import VhiControlClient

    return VhiControlClient(host=self.grpc_host, port=self.grpc_port)

control_outlet

control_outlet() -> LSLOutlet

Construct an LSLOutlet for streaming a continuous pose to the control hand. Opt-in: only consumed when VHI is put in STREAM control mode via control_client().set_control_mode("STREAM"). Raises if this interface has no control-pose stream configured.

Source code in myogestic/interfaces.py
def control_outlet(self) -> LSLOutlet:
    """Construct an LSLOutlet for streaming a continuous pose to the control
    hand.

    Opt-in: the stream is only consumed when VHI is put in STREAM control
    mode via ``control_client().set_control_mode("STREAM")``.

    Returns:
        An ``LSLOutlet`` named after ``control_pose_stream``. Channel count
        and rate come from ``control_pose_channels`` / ``control_pose_hz``,
        falling back to ``output_channels`` / ``output_hz``.

    Raises:
        ValueError: If this interface has no control-pose stream configured.
    """
    stream = self.control_pose_stream
    if stream is None:
        raise ValueError(f"{self.name}: no control_pose_stream configured")
    # NOTE(review): ``or`` falls back on any falsy value (0 / 0.0), not just
    # ``None`` — confirm that an explicit zero should be overridden.
    channels = self.control_pose_channels or self.output_channels
    rate = self.control_pose_hz or self.output_hz
    return LSLOutlet(name=stream, n_channels=channels, hz=rate)

launcher

launcher() -> list[tuple[str, list[str]]]

Return the (name, argv) tuple list expected by process_launcher.

Raises FileNotFoundError with an install_vhi hint when VHI is not installed at the resolved location — better than a silent Popen failure on first run.

Source code in myogestic/interfaces.py
def launcher(self) -> list[tuple[str, list[str]]]:
    """Return the (name, argv) tuple list expected by `process_launcher`.

    Returns:
        A one-element list holding ``(self.name, argv)``, where ``argv`` is
        a copy of ``self.process``.

    Raises:
        FileNotFoundError: When ``self.process`` is empty, i.e. VHI is not
            installed at the resolved location. The message carries an
            ``install_vhi`` hint — better than a silent ``Popen`` failure
            on first run.
    """
    if self.process:
        # list(...) hands back a copy of the argv rather than the spec's own.
        return [(self.name, list(self.process))]

    where = f" at {self.install_root}" if self.install_root else ""
    message = (
        f"{self.name}: not installed{where}.\n"
        f"  Run `python -m myogestic.tools.install_vhi` to fetch the "
        f"latest release for this platform.\n"
        f"  Or set $VHI_PATH to an existing VHI Godot project and "
        f"$GODOT_BIN to a Godot 4.x binary for source-mode."
    )
    raise FileNotFoundError(message)

Tools

control_outlet

control_outlet(name: str = DEFAULT_CONTROL_STREAM) -> StreamOutlet

LSL outlet for steering the EMG generator from another script.

The generator listens on a stream named name for a single float (one channel) that selects the next gesture amplitude — typically 0.0 (rest) … 1.0 (full). Push samples like:

from myogestic.tools.emg_generator import control_outlet
out = control_outlet()
out.push_sample(np.array([0.0], dtype=np.float32))  # rest
out.push_sample(np.array([1.0], dtype=np.float32))  # fist

Matches the protocol the --control flag on python -m myogestic.tools.emg_generator listens for.

Source code in myogestic/tools/emg_generator.py
def control_outlet(name: str = DEFAULT_CONTROL_STREAM) -> StreamOutlet:
    """Create the LSL outlet used to steer the EMG generator remotely.

    The generator listens on a stream named ``name`` for a single float
    (one channel) that selects the next gesture amplitude — typically
    ``0.0`` (rest) … ``1.0`` (full). Example::

        from myogestic.tools.emg_generator import control_outlet
        out = control_outlet()
        out.push_sample(np.array([0.0], dtype=np.float32))  # rest
        out.push_sample(np.array([1.0], dtype=np.float32))  # fist

    This is the protocol the ``--control`` flag on
    ``python -m myogestic.tools.emg_generator`` listens for.

    Args:
        name: Name of the LSL control stream to publish.

    Returns:
        A ``StreamOutlet`` for a one-channel ``float32`` stream of type
        ``"Control"``, declared with ``sfreq=0`` and source id ``"ctrl"``.
    """
    info = StreamInfo(
        name=name,
        stype="Control",
        n_channels=1,
        sfreq=0,
        dtype="float32",
        source_id="ctrl",
    )
    return StreamOutlet(sinfo=info)

myogestic.tools.install_vhi

Install the Virtual Hand Interface release binary for this platform.

VHI ships pre-built artifacts on every release at https://github.com/NsquaredLab/MyoGestic-VHI/releases. This CLI picks the right asset for the host OS/arch, downloads it, unpacks it into the location virtual_hand() looks at, and drops a vhi-version.txt marker so a later install knows what's already there.

Usage

python -m myogestic.tools.install_vhi                      # latest, default dest
python -m myogestic.tools.install_vhi --tag v1.0.0         # pinned version
python -m myogestic.tools.install_vhi --dest /custom/path
python -m myogestic.tools.install_vhi --force              # reinstall over existing

Or after pip install myogestic: myogestic-install-vhi

Pin --tag in production: latest is convenient for a fresh checkout but not reproducible — a downstream rebuild months later may pick up a different VHI version with subtly different behaviour.

main

main(argv: list[str] | None = None) -> int
Source code in myogestic/tools/install_vhi.py
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: download, verify and install a VHI release.

    Args:
        argv: Argument list to parse. ``None`` lets ``argparse`` read the
            process command line.

    Returns:
        Exit code: ``0`` after a successful install, ``1`` when the
        destination is non-empty and ``--force`` was not given.
    """
    parser = argparse.ArgumentParser(
        prog="myogestic-install-vhi",
        description="Install a MyoGestic-VHI release binary for this platform.",
    )
    parser.add_argument(
        "--tag",
        default="latest",
        help="Release tag, e.g. 'v1.0.0' (default: 'latest'). Pin in "
        "production for reproducible installs.",
    )
    parser.add_argument(
        "--dest",
        type=Path,
        default=None,
        help="Install directory (default: matches `virtual_hand()`'s install "
        "root — repo `tools/MyoGestic-VHI` in a git checkout, else a per-user "
        "data dir).",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Reinstall over an existing VHI install. Refuses by default to "
        "avoid clobbering an unrelated directory.",
    )
    parser.add_argument(
        "--no-verify",
        action="store_true",
        help="Skip SHA-256 verification against the GitHub release digest. "
        "Default is to verify; disable only if you know what you're doing.",
    )
    args = parser.parse_args(argv)

    dest = args.dest or _default_dest()
    asset = _resolve_asset()
    url = _download_url(args.tag, asset)

    # Keep a separator between tag and path; without it the message reads
    # as one run-together token (e.g. "latest/home/user/...").
    print(f"Installing VHI {args.tag} → {dest}")

    if dest.exists() and any(dest.iterdir()):
        looks_like_previous_install = any(
            (dest / m).exists() for m in INSTALL_MARKERS
        )
        if not args.force:
            hint = (
                "Use --force to reinstall."
                if looks_like_previous_install
                else "It does not look like a previous VHI install; refusing "
                "to clobber. Use --force or pass --dest to a fresh directory."
            )
            print(f"{dest} is non-empty. {hint}", file=sys.stderr)
            return 1
        if not looks_like_previous_install:
            print(
                f"WARNING: --force on a directory that does not look like a "
                f"previous VHI install ({dest}).",
                file=sys.stderr,
            )

    # Fetch the expected digest before download so a missing-tag-on-API error
    # surfaces immediately, not after a 150 MB download.
    expected_digest = None if args.no_verify else _fetch_release_digest(args.tag, asset)

    # Staged install: download, verify, unpack and validate in a temp dir,
    # then swap with dest. A failed download or malformed archive never
    # leaves a half-installed dest behind. Checksum verification happens
    # before unpack — a tampered archive never reaches the file extraction
    # step.
    with tempfile.TemporaryDirectory(prefix="myogestic-vhi-") as tmp:
        tmp_path = Path(tmp)
        archive = tmp_path / asset
        _download(url, archive)
        if not args.no_verify:
            _verify(archive, expected_digest)
        staging = tmp_path / "staging"
        _unpack(archive, staging)
        _validate(staging)
        _restore_exec_bits(staging)
        _write_marker(staging, args.tag, asset)

        # NOTE(review): the swap is rmtree followed by move, so a failure
        # between the two leaves dest missing rather than restored.
        if dest.exists():
            shutil.rmtree(dest)
        dest.parent.mkdir(parents=True, exist_ok=True)
        # `shutil.move` falls back to copy+rmtree across filesystems where
        # plain rename() fails with EXDEV — temp dirs are often on a
        # different mount than the install target.
        shutil.move(str(staging), str(dest))

    _strip_quarantine(dest)
    print(f"✓ VHI {args.tag} installed at {dest}")
    _macos_gatekeeper_note(dest)
    return 0