Skip to content

Session

open_session_store

open_session_store(path: str | Path) -> Session

Open a saved session folder or .session.zip as a read-only Session.

Folder sessions keep the existing layout and use zarr.open_array(str(path / "<stream>.zarr"), mode="r"). Zip sessions use a zarr.storage.ZipStore and read the same array paths inside the archive.

Source code in myogestic/session/_io.py
def open_session_store(path: str | Path) -> Session:
    """Open a saved session folder or .session.zip as a read-only Session.

    Folder sessions keep the existing layout and use
    `zarr.open_array(str(path / "<stream>.zarr"), mode="r")`. Zip sessions use
    a `zarr.storage.ZipStore` and read the same array paths inside the archive.

    Parameters
    ----------
    path
        A session directory, or a file whose name ends in ``.session.zip``.

    Returns
    -------
    Session
        A session built with ``Session.__new__`` (``__init__`` is bypassed)
        whose ``stores`` / ``ts_stores`` hold arrays opened with ``mode="r"``.
        For zip sessions the backing ``ZipStore`` is kept on
        ``session._zip_store``.

    Raises
    ------
    ValueError
        If ``path`` is neither a directory nor named ``*.session.zip``.

    Any error raised while reading ``meta.json`` / ``labels.json`` or while
    opening an array propagates to the caller; a ``ZipStore`` created for
    this call is closed first so the archive is not left with an open handle.
    """
    p = Path(path)
    zip_store = None
    if p.is_dir():
        meta = json.loads((p / "meta.json").read_text())
        labels_file = p / "labels.json"
        labels = json.loads(labels_file.read_text()) if labels_file.exists() else []

        def open_array(name: str) -> zarr.Array:
            return zarr.open_array(str(p / name), mode="r")

    elif p.name.endswith(".session.zip"):
        # Parse the JSON side-files first: if the archive is unreadable or
        # meta.json is missing we fail before any ZipStore exists.
        with zipfile.ZipFile(p) as zf:
            names = set(zf.namelist())
            meta = json.loads(zf.read("meta.json"))
            labels = json.loads(zf.read("labels.json")) if "labels.json" in names else []
        zip_store = zarr.storage.ZipStore(p, mode="r")

        def open_array(name: str) -> zarr.Array:
            return zarr.open_array(store=zip_store, path=name, mode="r")

    else:
        raise ValueError(f"Unsupported session path: {p}")

    try:
        session = Session.__new__(Session)
        session.path = p
        session.stores = {}
        session.ts_stores = {}
        session.label_track = [
            LabelEvent(
                timestamp=float(label.get("timestamp", 0.0)),
                class_index=int(label.get("class_index", -1)),
            )
            for label in labels
            if isinstance(label, dict)
        ]
        session._streams_info = {}
        session.class_names = list(meta.get("class_names") or [])
        # Always set (None for folder sessions): keeps Session.close() safe even
        # though open_session_store bypasses __init__ via Session.__new__.
        session._zip_store = zip_store

        for name, info in meta.get("streams", {}).items():
            data = open_array(f"{name}.zarr")
            session.stores[name] = data
            session.ts_stores[name] = open_array(f"{name}_timestamps.zarr")
            session._streams_info[name] = StreamInfo(
                # Fall back to the array's own shape/dtype when meta.json
                # does not carry the field.
                n_channels=int(info.get("n_channels", data.shape[1] if data.ndim > 1 else 1)),
                fs=float(info.get("fs", 0.0)),
                dtype=np.dtype(info.get("dtype", data.dtype)),
                channel_names=info.get("channel_names"),
            )
        return session
    except BaseException:
        # The caller never receives a Session to close(), so release the
        # archive handle here before propagating the original error.
        if zip_store is not None:
            try:
                zip_store.close()
            except Exception:  # noqa: BLE001 - best-effort; original error wins
                pass
        raise

iter_labeled_windows

iter_labeled_windows(paths: list[str] | list[Path], stream_name: str, window_ms: float, hop_ms: float, classes: set[int] | None = None) -> Iterator[tuple[ndarray, ndarray, int]]

Yield (window, ts, class_index) triples from labeled segments.

window is channels-first (n_channels, n_samples) — the library's standard signal layout. ts is the matching 1-D timestamp array. Walks each session's label track, finds the time interval each label covers (this label's timestamp to next label's timestamp), and chops that interval into fixed-size windows. Works for folders and .session.zip sessions.

Source code in myogestic/session/_windows.py
def iter_labeled_windows(
    paths: list[str] | list[Path],
    stream_name: str,
    window_ms: float,
    hop_ms: float,
    classes: set[int] | None = None,
) -> Iterator[tuple[np.ndarray, np.ndarray, int]]:
    """Yield ``(window, ts, class_index)`` triples from labeled segments.

    ``window`` is channels-first ``(n_channels, n_samples)`` — the
    library's standard signal layout. ``ts`` is the matching 1-D
    timestamp array. Walks each session's label track, finds the time
    interval each label covers (this label's timestamp to next label's
    timestamp), and chops that interval into fixed-size windows. Works
    for folders and ``.session.zip`` sessions.
    """
    if window_ms <= 0:
        raise ValueError(f"window_ms must be > 0 (got {window_ms})")
    if hop_ms <= 0:
        raise ValueError(f"hop_ms must be > 0 (got {hop_ms})")

    for path in paths:
        try:
            sess = open_session_store(path)
        except Exception as e:
            log.warning("skipping %s: %s", path, e)
            continue
        # finally: close the session each iteration so its zarr/ZipStore handles
        # are released — on Windows an open ZipStore locks the .session.zip, so a
        # leaked handle blocks deleting/moving the file after training.
        try:
            if stream_name not in sess.stores:
                log.info("skipping %s: stream %r not present", path, stream_name)
                continue
            info = sess.stream_info(stream_name)
            fs = info.fs
            if fs <= 0:
                log.warning("skipping %s: bad fs=%s for stream %r", path, fs, stream_name)
                continue
            win_samples = int(window_ms / 1000 * fs)
            hop_samples = max(1, int(hop_ms / 1000 * fs))
            if win_samples < 1:
                continue

            data = np.array(sess.stores[stream_name]).astype(np.float32, copy=False)
            ts = np.array(sess.ts_stores[stream_name])
            events = sess.label_track
            if len(data) == 0 or not events:
                log.info("skipping %s: empty data or no labels", path)
                continue

            for i, event in enumerate(events):
                if event.class_index < 0:
                    continue
                if classes is not None and event.class_index not in classes:
                    continue
                idx_start = int(np.argmin(np.abs(ts - event.timestamp)))
                idx_end = (
                    int(np.argmin(np.abs(ts - events[i + 1].timestamp)))
                    if i + 1 < len(events)
                    else len(ts)
                )
                if idx_end - idx_start < win_samples:
                    continue
                for start in range(idx_start, idx_end - win_samples + 1, hop_samples):
                    stop = start + win_samples
                    yield data[start:stop].T, ts[start:stop], event.class_index
        finally:
            sess.close()

iter_aligned_windows

iter_aligned_windows(paths: list[str] | list[Path], primary_stream_name: str, aligned_stream_names: list[str], window_ms: float, hop_ms: float, n_alignment_samples: int = 1) -> Iterator[tuple[ndarray, dict[str, ndarray], ndarray]]

Yield (primary_window, aligned, ts) for regression training.

primary_window is channels-first (n_channels, n_samples). For each primary window, find the nearest sample in every aligned stream at the window midpoint and average n_alignment_samples around that index.

Source code in myogestic/session/_windows.py
def iter_aligned_windows(
    paths: list[str] | list[Path],
    primary_stream_name: str,
    aligned_stream_names: list[str],
    window_ms: float,
    hop_ms: float,
    n_alignment_samples: int = 1,
) -> Iterator[tuple[np.ndarray, dict[str, np.ndarray], np.ndarray]]:
    """Yield ``(primary_window, aligned, ts)`` for regression training.

    ``primary_window`` is channels-first ``(n_channels, n_samples)``.
    For each primary window, find the nearest sample in every aligned
    stream at the window midpoint and average ``n_alignment_samples``
    around that index.
    """
    if window_ms <= 0:
        raise ValueError(f"window_ms must be > 0 (got {window_ms})")
    if hop_ms <= 0:
        raise ValueError(f"hop_ms must be > 0 (got {hop_ms})")
    if n_alignment_samples < 1:
        raise ValueError(f"n_alignment_samples must be >= 1 (got {n_alignment_samples})")

    n_left = n_alignment_samples // 2
    n_right = n_alignment_samples - n_left

    for path in paths:
        try:
            sess = open_session_store(path)
        except Exception as e:
            log.warning("skipping %s: %s", path, e)
            continue
        # finally: release the session's handles each iteration (see
        # iter_labeled_windows) — a leaked ZipStore locks the .session.zip on
        # Windows.
        try:
            if primary_stream_name not in sess.stores:
                log.info("skipping %s: primary stream %r missing", path, primary_stream_name)
                continue
            missing = [s for s in aligned_stream_names if s not in sess.stores]
            if missing:
                log.info("skipping %s: aligned streams missing: %s", path, missing)
                continue

            info = sess.stream_info(primary_stream_name)
            fs = info.fs
            if fs <= 0:
                log.warning("skipping %s: bad fs=%s on %r", path, fs, primary_stream_name)
                continue
            win_samples = int(window_ms / 1000 * fs)
            hop_samples = max(1, int(hop_ms / 1000 * fs))
            if win_samples < 1:
                continue

            primary_data = np.array(sess.stores[primary_stream_name]).astype(np.float32, copy=False)
            primary_ts = np.array(sess.ts_stores[primary_stream_name])
            aligned_data = {
                name: np.array(sess.stores[name]).astype(np.float32, copy=False)
                for name in aligned_stream_names
            }
            aligned_ts = {name: np.array(sess.ts_stores[name]) for name in aligned_stream_names}

            if (
                len(primary_data) == 0
                or len(primary_ts) == 0
                or any(len(t) == 0 for t in aligned_ts.values())
            ):
                log.info("skipping %s: empty stream data", path)
                continue

            n = len(primary_data)
            if n < win_samples:
                continue

            for start in range(0, n - win_samples + 1, hop_samples):
                stop = start + win_samples
                mid_t = primary_ts[start + win_samples // 2]
                aligned_vals: dict[str, np.ndarray] = {}
                ok = True
                for name in aligned_stream_names:
                    a_ts = aligned_ts[name]
                    a_data = aligned_data[name]
                    idx = int(np.argmin(np.abs(a_ts - mid_t)))
                    lo = max(0, idx - n_left)
                    hi = min(len(a_data), idx + n_right)
                    if hi <= lo:
                        ok = False
                        break
                    aligned_vals[name] = np.mean(a_data[lo:hi], axis=0)
                if not ok:
                    continue
                yield primary_data[start:stop].T, aligned_vals, primary_ts[start:stop]
        finally:
            sess.close()

Session

Session(base_path: str = 'sessions')

One recording session on disk: per-stream Zarr arrays + a label track.

Created when the user clicks Record, finalised when they click Stop. While active, every acquisition thread that has its stream registered appends to the session's Zarr stores; UI label clicks emit `LabelEvent` entries onto the label track. Closing the session writes meta.json and labels.json alongside the Zarr folders, and optionally packs the whole tree into a portable .session.zip.

Layout on disk (one folder per recording, named with the start timestamp)::

sessions/2026-05-17_14-23-05/
    emg.zarr/                  # shape (n_samples, n_channels)
    emg_timestamps.zarr/       # shape (n_samples,) float64
    vhi_control.zarr/          # any additional stream
    vhi_control_timestamps.zarr/
    meta.json                  # streams_info, app_name, class_names
    labels.json                # the LabelEvent list

Read sessions back with `myogestic.session.open_session_store`, which transparently handles both folders and .session.zip archives.

Parameters:

Name Type Description Default
base_path str

Parent directory; the session creates a timestamp-named subdirectory inside. Default "sessions" (created if missing).

'sessions'
Source code in myogestic/session/_core.py
def __init__(self, base_path: str = "sessions"):
    """Create a fresh, uniquely named session folder under ``base_path``."""
    stamp = time.strftime("%Y-%m-%d_%H-%M-%S")
    # The wall-clock stamp alone is only unique per second. A Stop followed
    # immediately by Record could otherwise reuse the folder name, and since
    # pack_to_zip() removes its own folder once the archive is written, the
    # previous session's packing could delete the new recording (and both
    # would target the same <name>.session.zip). A short random suffix keeps
    # the names distinct.
    suffix = uuid.uuid4().hex[:8]
    self.path = Path(base_path) / f"{stamp}_{suffix}"
    self.path.mkdir(parents=True, exist_ok=True)

    # Per-stream bookkeeping; entries are added by init_stream().
    self._streams_info: dict[str, StreamInfo] = {}
    self.stores: dict[str, zarr.Array] = {}
    self.ts_stores: dict[str, zarr.Array] = {}

    # Label events in the order they were added.
    self.label_track: list[LabelEvent] = []
    # Filled in by save_meta / open_session_store.
    self.class_names: list[str] = []

    # Only non-None for sessions opened from a .session.zip by
    # open_session_store: arrays are read from the archive through this
    # store, so it has to stay alive as long as the Session does. close()
    # closes it and resets the attribute to None.
    self._zip_store: zarr.storage.ZipStore | None = None

init_stream

init_stream(stream_name: str, info: StreamInfo) -> None

Called once per stream when recording starts.

Source code in myogestic/session/_core.py
def init_stream(self, stream_name: str, info: StreamInfo) -> None:
    """Create empty, growable arrays for one stream when recording starts.

    Registers ``info`` under ``stream_name`` and opens two arrays in write
    mode inside the session folder: ``<stream_name>.zarr`` with shape
    ``(0, n_channels)`` and ``<stream_name>_timestamps.zarr`` with shape
    ``(0,)`` (float64). Both start empty.

    Parameters
    ----------
    stream_name
        Key used for ``stores`` / ``ts_stores`` and for the on-disk names.
    info
        Supplies ``n_channels``, ``fs`` and ``dtype`` for the data array.
    """
    # One chunk spans about one second of samples. int(info.fs) is 0 for any
    # rate below 1 Hz (including a 0.0 placeholder rate), and a chunk length
    # of 0 cannot hold any samples, so never go below one row per chunk.
    rows_per_chunk = max(1, int(info.fs))

    self._streams_info[stream_name] = info
    self.stores[stream_name] = zarr.open_array(
        str(self.path / f"{stream_name}.zarr"),
        mode="w",
        shape=(0, info.n_channels),
        chunks=(rows_per_chunk, info.n_channels),
        dtype=info.dtype,
    )
    self.ts_stores[stream_name] = zarr.open_array(
        str(self.path / f"{stream_name}_timestamps.zarr"),
        mode="w",
        shape=(0,),
        chunks=(rows_per_chunk,),
        dtype=np.float64,
    )

append

append(stream_name: str, data: ndarray, timestamps: ndarray) -> None

Called from acquire loop when recording. data: (n_samples, n_channels).

Source code in myogestic/session/_core.py
def append(self, stream_name: str, data: np.ndarray, timestamps: np.ndarray) -> None:
    """Append one batch of samples and their timestamps to a stream.

    Invoked by the acquisition loop during recording; ``data`` is laid out
    as ``(n_samples, n_channels)``. If the stream has no open arrays the
    batch is dropped with a debug log instead of raising.
    """
    data_array = self.stores.get(stream_name)
    time_array = self.ts_stores.get(stream_name)

    if data_array is not None and time_array is not None:
        data_array.append(data)
        time_array.append(timestamps)
        return

    # Reaching this point means the arrays are gone, i.e. pack_to_zip() has
    # already cleared them. Stream.detach_session() (under its lock) is the
    # real protection against an append racing that clear(); this fallback
    # only stops a stray late call from crashing the caller's thread with a
    # KeyError. It is logged so that a regression which drops samples shows
    # up in the logs rather than passing unnoticed.
    log.debug("dropping late append for finalised stream %r", stream_name)

add_label

add_label(class_index: int, timestamp: float | None = None) -> None

Append a label event to the session's label track.

Parameters:

Name Type Description Default
class_index int

Class index for the event; -1 marks a rest / no-class boundary.

required
timestamp float | None

Event time in seconds (mne_lsl clock). Defaults to the current local_clock() when omitted.

None
Source code in myogestic/session/_core.py
def add_label(self, class_index: int, timestamp: float | None = None) -> None:
    """Record a new label event at the end of the label track.

    Parameters
    ----------
    class_index
        Class index of the event. ``-1`` denotes a rest / no-class boundary.
    timestamp
        Time of the event in seconds on the ``mne_lsl`` clock. When ``None``,
        the value of ``local_clock()`` at call time is used.
    """
    from mne_lsl.lsl import local_clock

    if timestamp is None:
        timestamp = local_clock()
    event = LabelEvent(timestamp=timestamp, class_index=class_index)
    self.label_track.append(event)

save_meta

save_meta(app_name: str, class_names: list[str] | None = None) -> None

Write meta.json + labels.json to the session folder.

Parameters:

Name Type Description Default
app_name str

Identifier for the producing app.

required
class_names list[str] | None

Optional human-readable names for label class indices. Persisting them makes old sessions self-describing: readers can render labels without an external lookup.

None
Source code in myogestic/session/_core.py
def save_meta(self, app_name: str, class_names: list[str] | None = None) -> None:
    """Write meta.json + labels.json to the session folder.

    Parameters
    ----------
    app_name
        Identifier for the producing app.
    class_names
        Optional human-readable names for label class indices.
        Persisting them makes old sessions self-describing: readers can
        render labels without an external lookup.
    """
    meta: dict[str, object] = {
        "app_name": app_name,
        "created": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "streams": {
            name: {
                "n_channels": info.n_channels,
                "fs": info.fs,
                "dtype": str(info.dtype),
            }
            for name, info in self._streams_info.items()
        },
    }
    if class_names is not None:
        meta["class_names"] = list(class_names)
    (self.path / "meta.json").write_text(json.dumps(meta, indent=2))
    labels = [
        {"timestamp": e.timestamp, "class_index": e.class_index} for e in self.label_track
    ]
    (self.path / "labels.json").write_text(json.dumps(labels, indent=2))

pack_to_zip

pack_to_zip() -> Path

Pack the session folder into a single <name>.session.zip file.

Uses ZIP_STORED (no compression). Zarr chunks are already compressed internally; an outer compression layer would add CPU for little gain.

Source code in myogestic/session/_core.py
def pack_to_zip(self) -> Path:
    """Pack the session folder into a single `<name>.session.zip` file.

    Uses ZIP_STORED (no compression). Zarr chunks are already compressed
    internally; an outer compression layer would add CPU for little gain.

    The archive is first written to ``<name>.session.zip.tmp`` and checked
    (``meta.json`` present, every stream's data and timestamp array opens
    through a ``ZipStore``). Only then is it moved into place and the source
    folder removed; ``self.path`` is updated to the archive.

    Returns
    -------
    Path
        Path of the final ``.session.zip``.

    Raises
    ------
    RuntimeError
        If ``meta.json`` is missing from the packed archive.

    Any other error raised while writing or verifying the archive propagates
    after the temporary file has been removed; the session folder is left
    untouched in that case.
    """
    # zarr v3 arrays expose no explicit close, so drop refs and force a GC
    # to release the underlying chunk-file handles. On POSIX this is belt-
    # and-braces (open files can be unlinked anyway); on Windows it is
    # required - the later rmtree/replace fail with WinError 32 if any
    # handle into the folder is still open.
    self.stores.clear()
    self.ts_stores.clear()
    gc.collect()

    zip_path = self.path.with_name(self.path.name + ".session.zip")
    tmp_path = zip_path.with_suffix(zip_path.suffix + ".tmp")
    # Clear out a stale temp file from an earlier, interrupted pack.
    tmp_path.unlink(missing_ok=True)

    try:
        # Writing is inside the cleanup scope too: a failure part-way
        # through (e.g. disk full) must not leave a partial archive behind.
        with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_STORED) as zf:
            for f in self.path.rglob("*"):
                if f.is_file():
                    zf.write(f, arcname=str(f.relative_to(self.path)))

        with zipfile.ZipFile(tmp_path) as zf:
            names = zf.namelist()
            if "meta.json" not in names:
                raise RuntimeError("meta.json missing in packed zip")
        store = zarr.storage.ZipStore(tmp_path, mode="r")
        try:
            # Verify everything a reader needs per stream — data AND
            # timestamps — because the source folder is deleted right after.
            for name in self._streams_info:
                zarr.open_array(store=store, path=f"{name}.zarr", mode="r")
                zarr.open_array(store=store, path=f"{name}_timestamps.zarr", mode="r")
        finally:
            store.close()
    except Exception:
        tmp_path.unlink(missing_ok=True)
        raise

    # os.replace (not Path.rename): atomic overwrite on both OSes - Windows
    # rename raises if the destination already exists.
    os.replace(tmp_path, zip_path)
    _robust_rmtree(self.path)
    self.path = zip_path
    return zip_path

get_trials

get_trials(stream_name: str, pre_s: float = 0.0, post_s: float = 0.0, class_names: list[str] | None = None) -> list[Recording]

Extract discrete labeled windows for classification training.

Source code in myogestic/session/_core.py
def get_trials(
    self,
    stream_name: str,
    pre_s: float = 0.0,
    post_s: float = 0.0,
    class_names: list[str] | None = None,
) -> list[Recording]:
    """Extract discrete labeled windows for classification training.

    Each label with a non-negative ``class_index`` becomes one trial that
    runs from the sample nearest ``timestamp - pre_s`` up to (excluding) the
    sample nearest the next label's timestamp. For the last label the trial
    ends at the sample nearest ``timestamp + post_s`` when ``post_s > 0``,
    otherwise at the end of the recording.

    Parameters
    ----------
    stream_name
        Stream to cut trials from.
    pre_s
        Seconds to include before each label's timestamp.
    post_s
        Seconds after the *last* label's timestamp at which its trial ends;
        ``0`` means "until the end of the stream".
    class_names
        Optional names indexed by ``class_index``. When omitted, or when an
        index has no entry, the index itself is used as the name.

    Returns
    -------
    list[Recording]
        One entry per usable label; empty if the stream has no timestamps
        or the label track is empty.
    """
    data = np.array(self.stores[stream_name])
    ts = np.array(self.ts_stores[stream_name])
    events = self.label_track
    if len(ts) == 0 or len(events) == 0:
        return []

    trials = []
    for i, event in enumerate(events):
        # Negative indices mark unlabeled spans (iter_labeled_windows skips
        # them the same way). Letting one through would also make
        # class_names[index] count from the end and pick a wrong name.
        if event.class_index < 0:
            continue

        idx_start = int(np.argmin(np.abs(ts - (event.timestamp - pre_s))))
        if i + 1 < len(events):
            idx_end = int(np.argmin(np.abs(ts - events[i + 1].timestamp)))
        elif post_s > 0:
            idx_end = int(np.argmin(np.abs(ts - (event.timestamp + post_s))))
        else:
            idx_end = len(ts)

        if idx_end <= idx_start:
            continue

        # Fall back to the numeric index when no name is available for it
        # instead of failing the whole extraction with an IndexError.
        if class_names and event.class_index < len(class_names):
            name = class_names[event.class_index]
        else:
            name = str(event.class_index)
        trials.append(
            Recording(
                class_index=event.class_index,
                class_name=name,
                data=data[idx_start:idx_end],
                timestamps=ts[idx_start:idx_end],
            )
        )
    return trials

get_continuous

get_continuous(stream_name: str) -> tuple[ndarray, ndarray]

Return full stream data + timestamps for regression training.

Source code in myogestic/session/_core.py
def get_continuous(self, stream_name: str) -> tuple[np.ndarray, np.ndarray]:
    """Load one stream completely, for regression training.

    Returns a ``(data, timestamps)`` pair of NumPy arrays copied out of the
    stream's data store and timestamp store.
    """
    samples = np.array(self.stores[stream_name])
    stamps = np.array(self.ts_stores[stream_name])
    return samples, stamps

stream_info

stream_info(stream_name: str) -> StreamInfo

Public accessor for a stream's StreamInfo.

Source code in myogestic/session/_core.py
def stream_info(self, stream_name: str) -> StreamInfo:
    """Return the StreamInfo registered for ``stream_name``.

    Raises ``KeyError`` if no stream of that name is registered.
    """
    registered = self._streams_info
    return registered[stream_name]

close

close() -> None

Release file handles held by this session.

Closes the ZipStore opened by `open_session_store` for a .session.zip and drops the array references. Safe to call more than once. On Windows an open ZipStore keeps the archive locked, so close the session before moving or deleting the .session.zip. Use as a context manager (`with open_session_store(p) as s: ...`) to do this automatically.

Source code in myogestic/session/_core.py
def close(self) -> None:
    """Release the file handles this session is holding.

    If the session was opened from a ``.session.zip`` by
    :func:`open_session_store`, its ``ZipStore`` is closed; in every case the
    array references are dropped. Calling it repeatedly is harmless. An open
    ``ZipStore`` keeps the archive **locked** on Windows, so close the
    session before moving or deleting the ``.session.zip``. Using the session
    as a context manager (``with open_session_store(p) as s: ...``) takes
    care of this automatically.
    """
    # getattr with a default: the attribute may be absent on an instance
    # that was not fully initialised.
    zip_handle = getattr(self, "_zip_store", None)
    if zip_handle is not None:
        try:
            zip_handle.close()
        except Exception:  # noqa: BLE001 - best-effort release on teardown
            log.debug("ZipStore close failed for %s", self.path, exc_info=True)
        self._zip_store = None

    for arrays in (self.stores, self.ts_stores):
        arrays.clear()

LabelEvent dataclass

LabelEvent(timestamp: float, class_index: int)

One entry in a session's label track: "at LSL time T, the user picked class N".

Recorded whenever the user clicks a class button in `myogestic.widgets.panels.recording.recording_controls`. The label track is a chronological list of these events; `myogestic.session.iter_labeled_windows` walks the track to decide which sample range gets which class index (`myogestic.session.iter_aligned_windows` windows the whole primary stream and does not consult the label track).

Attributes:

Name Type Description
timestamp float

LSL clock time (seconds) when the label was emitted. Use mne_lsl.lsl.local_clock() if you ever need to mint one by hand.

class_index int

Index into the session's class_names list. -1 is the unlabeled sentinel (the iterators skip it).

Recording dataclass

Recording(class_index: int, class_name: str, data: ndarray, timestamps: ndarray)

A single labeled trial, extracted from a Session.