Skip to content

Session

open_session_store

open_session_store(path: str | Path) -> Session

Open a saved session folder or .session.zip as a read-only Session.

Folder sessions keep the existing layout and use zarr.open_array(str(path / "<stream>.zarr"), mode="r"). Zip sessions use a zarr.storage.ZipStore and read the same array paths inside the archive.

Source code in myogestic/_session_io.py
def open_session_store(path: str | Path) -> Session:
    """Open a saved session folder or .session.zip as a read-only Session.

    Folder sessions keep the existing layout and use
    `zarr.open_array(str(path / "<stream>.zarr"), mode="r")`. Zip sessions use
    a `zarr.storage.ZipStore` and read the same array paths inside the archive.

    Args:
        path: A session directory, or a file whose name ends in
            ``.session.zip``.

    Returns:
        A ``Session`` created without running ``Session.__init__`` (so no new
        recording folder is made). Its ``stores`` / ``ts_stores`` hold one
        read-only array per stream listed under ``"streams"`` in meta.json.
        For zip sessions the open store is kept on ``session._zip_store``.

    Raises:
        ValueError: If ``path`` is neither a directory nor a ``.session.zip``.
    """
    p = Path(path)
    zip_store = None
    if p.is_dir():
        meta = json.loads((p / "meta.json").read_text())
        labels_file = p / "labels.json"
        labels = json.loads(labels_file.read_text()) if labels_file.exists() else []

        def open_array(name: str) -> zarr.Array:
            return zarr.open_array(str(p / name), mode="r")

    elif p.name.endswith(".session.zip"):
        # Read the JSON sidecars first: if the archive is unreadable or has
        # no meta.json we fail before any ZipStore handle exists to leak.
        with zipfile.ZipFile(p) as zf:
            names = set(zf.namelist())
            meta = json.loads(zf.read("meta.json"))
            labels = json.loads(zf.read("labels.json")) if "labels.json" in names else []
        zip_store = zarr.storage.ZipStore(p, mode="r")

        def open_array(name: str) -> zarr.Array:
            return zarr.open_array(store=zip_store, path=name, mode="r")

    else:
        raise ValueError(f"Unsupported session path: {p}")

    try:
        # Bypass __init__: it would create a fresh timestamped folder.
        session = Session.__new__(Session)
        session.path = p
        session.stores = {}
        session.ts_stores = {}
        # Entries that are not JSON objects are ignored; missing fields fall
        # back to timestamp 0.0 and the -1 class index.
        session.label_track = [
            LabelEvent(
                timestamp=float(label.get("timestamp", 0.0)),
                class_index=int(label.get("class_index", -1)),
            )
            for label in labels
            if isinstance(label, dict)
        ]
        session._streams_info = {}
        session.class_names = list(meta.get("class_names") or [])
        if zip_store is not None:
            # Keep the store reachable so the caller can close it later.
            session._zip_store = zip_store

        for name, info in meta.get("streams", {}).items():
            data = open_array(f"{name}.zarr")
            session.stores[name] = data
            session.ts_stores[name] = open_array(f"{name}_timestamps.zarr")
            # Fields absent from meta.json are derived from the array itself
            # where possible (channel count, dtype).
            session._streams_info[name] = StreamInfo(
                n_channels=int(info.get("n_channels", data.shape[1] if data.ndim > 1 else 1)),
                fs=float(info.get("fs", 0.0)),
                dtype=np.dtype(info.get("dtype", data.dtype)),
                channel_names=info.get("channel_names"),
            )
    except BaseException:
        # A missing or corrupt array must not leave the archive handle open.
        if zip_store is not None:
            zip_store.close()
        raise
    return session

iter_labeled_windows

iter_labeled_windows(paths: list[str] | list[Path], stream_name: str, win_seconds: float, hop_seconds: float, classes: set[int] | None = None) -> Iterator[tuple[ndarray, ndarray, int]]

Yield (window, ts, class_index) triples from labeled segments.

window is channels-first (n_channels, n_samples) — the library's standard signal layout. ts is the matching 1-D timestamp array. Walks each session's label track, finds the time interval each label covers (this label's timestamp to next label's timestamp), and chops that interval into fixed-size windows. Works for folders and .session.zip sessions.

Source code in myogestic/_session_windows.py
def iter_labeled_windows(
    paths: list[str] | list[Path],
    stream_name: str,
    win_seconds: float,
    hop_seconds: float,
    classes: set[int] | None = None,
) -> Iterator[tuple[np.ndarray, np.ndarray, int]]:
    """Yield ``(window, ts, class_index)`` triples from labeled segments.

    ``window`` is channels-first ``(n_channels, n_samples)`` — the
    library's standard signal layout. ``ts`` is the matching 1-D
    timestamp array. Walks each session's label track, finds the time
    interval each label covers (this label's timestamp to next label's
    timestamp), and chops that interval into fixed-size windows. Works
    for folders and ``.session.zip`` sessions.
    """
    if win_seconds <= 0:
        raise ValueError(f"win_seconds must be > 0 (got {win_seconds})")
    if hop_seconds <= 0:
        raise ValueError(f"hop_seconds must be > 0 (got {hop_seconds})")

    for path in paths:
        try:
            sess = open_session_store(path)
        except Exception as e:
            log.warning("skipping %s: %s", path, e)
            continue
        if stream_name not in sess.stores:
            log.info("skipping %s: stream %r not present", path, stream_name)
            continue
        info = sess.stream_info(stream_name)
        fs = info.fs
        if fs <= 0:
            log.warning("skipping %s: bad fs=%s for stream %r", path, fs, stream_name)
            continue
        win_samples = int(win_seconds * fs)
        hop_samples = max(1, int(hop_seconds * fs))
        if win_samples < 1:
            continue

        data = np.array(sess.stores[stream_name])
        ts = np.array(sess.ts_stores[stream_name])
        events = sess.label_track
        if len(data) == 0 or not events:
            log.info("skipping %s: empty data or no labels", path)
            continue

        for i, event in enumerate(events):
            if event.class_index < 0:
                continue
            if classes is not None and event.class_index not in classes:
                continue
            idx_start = int(np.argmin(np.abs(ts - event.timestamp)))
            idx_end = (
                int(np.argmin(np.abs(ts - events[i + 1].timestamp)))
                if i + 1 < len(events)
                else len(ts)
            )
            if idx_end - idx_start < win_samples:
                continue
            for start in range(idx_start, idx_end - win_samples + 1, hop_samples):
                stop = start + win_samples
                yield data[start:stop].T, ts[start:stop], event.class_index

iter_aligned_windows

iter_aligned_windows(paths: list[str] | list[Path], primary_stream: str, aligned_streams: list[str], win_seconds: float, hop_seconds: float, align_window_samples: int = 1) -> Iterator[tuple[ndarray, dict[str, ndarray], ndarray]]

Yield (primary_window, aligned, ts) for regression training.

primary_window is channels-first (n_channels, n_samples). For each primary window, find the nearest sample in every aligned stream at the window midpoint and average align_window_samples around that index.

Source code in myogestic/_session_windows.py
def iter_aligned_windows(
    paths: list[str] | list[Path],
    primary_stream: str,
    aligned_streams: list[str],
    win_seconds: float,
    hop_seconds: float,
    align_window_samples: int = 1,
) -> Iterator[tuple[np.ndarray, dict[str, np.ndarray], np.ndarray]]:
    """Yield ``(primary_window, aligned, ts)`` for regression training.

    ``primary_window`` is channels-first ``(n_channels, n_samples)``.
    For each primary window, find the nearest sample in every aligned
    stream at the window midpoint and average ``align_window_samples``
    around that index.
    """
    if win_seconds <= 0:
        raise ValueError(f"win_seconds must be > 0 (got {win_seconds})")
    if hop_seconds <= 0:
        raise ValueError(f"hop_seconds must be > 0 (got {hop_seconds})")
    if align_window_samples < 1:
        raise ValueError(
            f"align_window_samples must be >= 1 (got {align_window_samples})"
        )

    n_left = align_window_samples // 2
    n_right = align_window_samples - n_left

    for path in paths:
        try:
            sess = open_session_store(path)
        except Exception as e:
            log.warning("skipping %s: %s", path, e)
            continue
        if primary_stream not in sess.stores:
            log.info("skipping %s: primary stream %r missing", path, primary_stream)
            continue
        missing = [s for s in aligned_streams if s not in sess.stores]
        if missing:
            log.info("skipping %s: aligned streams missing: %s", path, missing)
            continue

        info = sess.stream_info(primary_stream)
        fs = info.fs
        if fs <= 0:
            log.warning("skipping %s: bad fs=%s on %r", path, fs, primary_stream)
            continue
        win_samples = int(win_seconds * fs)
        hop_samples = max(1, int(hop_seconds * fs))
        if win_samples < 1:
            continue

        primary_data = np.array(sess.stores[primary_stream])
        primary_ts = np.array(sess.ts_stores[primary_stream])
        aligned_data = {name: np.array(sess.stores[name]) for name in aligned_streams}
        aligned_ts = {name: np.array(sess.ts_stores[name]) for name in aligned_streams}

        if (
            len(primary_data) == 0
            or len(primary_ts) == 0
            or any(len(t) == 0 for t in aligned_ts.values())
        ):
            log.info("skipping %s: empty stream data", path)
            continue

        n = len(primary_data)
        if n < win_samples:
            continue

        for start in range(0, n - win_samples + 1, hop_samples):
            stop = start + win_samples
            mid_t = primary_ts[start + win_samples // 2]
            aligned_vals: dict[str, np.ndarray] = {}
            ok = True
            for name in aligned_streams:
                a_ts = aligned_ts[name]
                a_data = aligned_data[name]
                idx = int(np.argmin(np.abs(a_ts - mid_t)))
                lo = max(0, idx - n_left)
                hi = min(len(a_data), idx + n_right)
                if hi <= lo:
                    ok = False
                    break
                aligned_vals[name] = np.mean(a_data[lo:hi], axis=0)
            if not ok:
                continue
            yield primary_data[start:stop].T, aligned_vals, primary_ts[start:stop]

Session

Session(base_path: str = 'sessions')

One recording session on disk: per-stream Zarr arrays + a label track.

Created when the user clicks Record, finalised when they click Stop. While active, every acquisition thread that has its stream registered appends to the session's Zarr stores; UI label clicks emit :class:LabelEvent entries onto the label track. Closing the session writes meta.json and labels.json alongside the Zarr folders, and optionally packs the whole tree into a portable .session.zip.

Layout on disk (one folder per recording, named with the start timestamp)::

sessions/2026-05-17_14-23-05/
    emg.zarr/                  # shape (n_samples, n_channels)
    emg_timestamps.zarr/       # shape (n_samples,) float64
    vhi_control.zarr/          # any additional stream
    vhi_control_timestamps.zarr/
    meta.json                  # app_name, created, streams, class_names (optional)
    labels.json                # the LabelEvent list

Read sessions back with :func:~myogestic.session.open_session_store, which transparently handles both folders and .session.zip archives.

Parameters:

Name Type Description Default
base_path str

Parent directory; the session creates a timestamp-named subdirectory inside. Default "sessions" (created if missing).

'sessions'
Source code in myogestic/_session_core.py
def __init__(self, base_path: str = "sessions"):
    """Create the timestamp-named session folder and empty bookkeeping."""
    # The folder name is the local start time, second resolution.
    folder_name = time.strftime("%Y-%m-%d_%H-%M-%S")
    self.path = Path(base_path) / folder_name
    self.path.mkdir(parents=True, exist_ok=True)

    # Per-stream state, keyed by stream name; filled in by init_stream.
    self.stores: dict[str, zarr.Array] = {}
    self.ts_stores: dict[str, zarr.Array] = {}

    # Label events collected during the recording.
    self.label_track: list[LabelEvent] = []
    # Set by open_session_store when a saved session is read back.
    self.class_names: list[str] = []
    self._streams_info: dict[str, StreamInfo] = {}

init_stream

init_stream(name: str, info: StreamInfo) -> None

Called once per stream when recording starts.

Source code in myogestic/_session_core.py
def init_stream(self, name: str, info: StreamInfo) -> None:
    """Create the empty, growable Zarr arrays for one stream.

    Called once per stream when recording starts. Registers ``info`` and
    opens two arrays in write mode inside the session folder:
    ``<name>.zarr`` with shape ``(0, info.n_channels)`` and
    ``<name>_timestamps.zarr`` with shape ``(0,)`` and dtype float64.

    Args:
        name: Stream name; used as the key in ``stores`` / ``ts_stores``
            and as the array folder prefix.
        info: Channel count, sampling rate and dtype of the stream.
    """
    self._streams_info[name] = info
    # One chunk holds about one second of samples. int(info.fs) is 0 for
    # rates below 1 Hz (or a 0.0 rate), which would give a zero-length
    # chunk shape, so clamp to at least one row per chunk.
    chunk_rows = max(1, int(info.fs))
    self.stores[name] = zarr.open_array(
        str(self.path / f"{name}.zarr"),
        mode="w",
        shape=(0, info.n_channels),
        chunks=(chunk_rows, info.n_channels),
        dtype=info.dtype,
    )
    self.ts_stores[name] = zarr.open_array(
        str(self.path / f"{name}_timestamps.zarr"),
        mode="w",
        shape=(0,),
        chunks=(chunk_rows,),
        dtype=np.float64,
    )

append

append(name: str, data: ndarray, timestamps: ndarray) -> None

Called from acquire loop when recording. data: (n_samples, n_channels).

Source code in myogestic/_session_core.py
def append(self, name: str, data: np.ndarray, timestamps: np.ndarray) -> None:
    """Append one batch of samples and its timestamps to stream ``name``.

    Called from the acquire loop while recording. ``data`` is laid out as
    ``(n_samples, n_channels)``.
    """
    # Samples first, then timestamps.
    sample_store = self.stores[name]
    sample_store.append(data)
    time_store = self.ts_stores[name]
    time_store.append(timestamps)

save_meta

save_meta(app_name: str, class_names: list[str] | None = None) -> None

Write meta.json + labels.json to the session folder.

Parameters:

Name Type Description Default
app_name str

Identifier for the producing app.

required
class_names list[str] | None

Optional human-readable names for label class indices. Persisting them makes old sessions self-describing: readers can render labels without an external lookup.

None
Source code in myogestic/_session_core.py
def save_meta(self, app_name: str, class_names: list[str] | None = None) -> None:
    """Write meta.json + labels.json to the session folder.

    Args:
        app_name: Identifier for the producing app.
        class_names: Optional human-readable names for label class indices.
            Persisting them makes old sessions self-describing: readers can
            render labels without an external lookup.
    """
    meta: dict[str, object] = {
        "app_name": app_name,
        "created": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "streams": {
            name: {
                "n_channels": info.n_channels,
                "fs": info.fs,
                "dtype": str(info.dtype),
            }
            for name, info in self._streams_info.items()
        },
    }
    if class_names is not None:
        meta["class_names"] = list(class_names)
    (self.path / "meta.json").write_text(json.dumps(meta, indent=2))
    labels = [
        {"timestamp": e.timestamp, "class_index": e.class_index}
        for e in self.label_track
    ]
    (self.path / "labels.json").write_text(json.dumps(labels, indent=2))

pack_to_zip

pack_to_zip() -> Path

Pack the session folder into a single <name>.session.zip file.

Uses ZIP_STORED (no compression). Zarr chunks are already compressed internally; an outer compression layer would add CPU for little gain.

Source code in myogestic/_session_core.py
def pack_to_zip(self) -> Path:
    """Pack the session folder into a single `<name>.session.zip` file.

    Uses ZIP_STORED (no compression). Zarr chunks are already compressed
    internally; an outer compression layer would add CPU for little gain.

    The archive is written to a ``.tmp`` sibling, verified (meta.json is
    present and every registered stream's data and timestamp arrays open
    from the zip), then moved into place. Only after that is the original
    folder deleted and ``self.path`` repointed at the zip.

    Returns:
        Path of the finished ``.session.zip``.

    Raises:
        RuntimeError: If meta.json is missing from the packed archive.
        Exception: Any error from writing or verifying the archive is
            re-raised after the temporary file is removed; the session
            folder is left untouched in that case.
    """
    # zarr v3 arrays do not expose explicit close, but DirectoryStore
    # flushes on append. Dropping refs before zip verification is enough.
    self.stores.clear()
    self.ts_stores.clear()

    zip_path = self.path.with_name(self.path.name + ".session.zip")
    tmp_path = zip_path.with_suffix(zip_path.suffix + ".tmp")
    # Remove any leftover from an earlier failed attempt.
    tmp_path.unlink(missing_ok=True)

    try:
        with zipfile.ZipFile(tmp_path, "w", compression=zipfile.ZIP_STORED) as zf:
            for f in self.path.rglob("*"):
                if f.is_file():
                    # Archive member names always use forward slashes.
                    zf.write(f, arcname=f.relative_to(self.path).as_posix())

        with zipfile.ZipFile(tmp_path) as zf:
            if "meta.json" not in zf.namelist():
                raise RuntimeError("meta.json missing in packed zip")
        store = zarr.storage.ZipStore(tmp_path, mode="r")
        try:
            # Readers need both arrays of every stream, so check both
            # before the source folder is deleted.
            for name in self._streams_info:
                zarr.open_array(store=store, path=f"{name}.zarr", mode="r")
                zarr.open_array(store=store, path=f"{name}_timestamps.zarr", mode="r")
        finally:
            store.close()
    except Exception:
        # Covers failures while writing as well as while verifying.
        tmp_path.unlink(missing_ok=True)
        raise

    # replace() overwrites an existing target on every platform, whereas
    # rename() raises on Windows when the destination already exists.
    tmp_path.replace(zip_path)
    shutil.rmtree(self.path)
    self.path = zip_path
    return zip_path

get_trials

get_trials(stream_name: str, pre: float = 0.0, post: float = 0.0, class_names: list[str] | None = None) -> list[Recording]

Extract discrete labeled windows for classification training.

Source code in myogestic/_session_core.py
def get_trials(
    self,
    stream_name: str,
    pre: float = 0.0,
    post: float = 0.0,
    class_names: list[str] | None = None,
) -> list[Recording]:
    """Extract discrete labeled windows for classification training.

    Each label with a non-negative class index becomes one trial that runs
    from the sample nearest ``event.timestamp - pre`` to the sample nearest
    the next label's timestamp. For the last label the trial ends at the
    sample nearest ``event.timestamp + post`` when ``post > 0``, otherwise
    at the end of the stream.

    Args:
        stream_name: Stream to cut trials from.
        pre: Seconds before the label timestamp to include.
        post: Seconds after the last label's timestamp to include; only
            used for the final label.
        class_names: Optional names indexed by class index. When omitted,
            or when an index has no entry, the index itself is used as
            the name.

    Returns:
        One ``Recording`` per usable label, in label-track order. Empty if
        the stream has no timestamps or there are no labels.
    """
    data = np.array(self.stores[stream_name])
    ts = np.array(self.ts_stores[stream_name])
    if len(ts) == 0 or len(self.label_track) == 0:
        return []

    trials = []
    for i, event in enumerate(self.label_track):
        # Negative indices mark unlabeled spans. Treat every negative value
        # that way (not just -1) so one can never wrap around class_names.
        if event.class_index < 0:
            continue

        idx_start = int(np.argmin(np.abs(ts - (event.timestamp - pre))))
        if i + 1 < len(self.label_track):
            idx_end = int(np.argmin(np.abs(ts - self.label_track[i + 1].timestamp)))
        elif post > 0:
            idx_end = int(np.argmin(np.abs(ts - (event.timestamp + post))))
        else:
            idx_end = len(ts)

        if idx_end <= idx_start:
            continue

        # Fall back to the numeric index when no name is available instead
        # of raising IndexError on a too-short class_names list.
        if class_names and event.class_index < len(class_names):
            name = class_names[event.class_index]
        else:
            name = str(event.class_index)
        trials.append(
            Recording(
                class_index=event.class_index,
                class_name=name,
                data=data[idx_start:idx_end],
                timestamps=ts[idx_start:idx_end],
            )
        )
    return trials

get_continuous

get_continuous(stream_name: str) -> tuple[ndarray, ndarray]

Return full stream data + timestamps for regression training.

Source code in myogestic/_session_core.py
def get_continuous(self, stream_name: str) -> tuple[np.ndarray, np.ndarray]:
    """Return the whole stream as ``(data, timestamps)`` NumPy arrays.

    Intended for regression training, where the full recording is used.
    """
    samples = np.array(self.stores[stream_name])
    stamps = np.array(self.ts_stores[stream_name])
    return samples, stamps

stream_info

stream_info(stream_name: str) -> StreamInfo

Public accessor for a stream's StreamInfo.

Source code in myogestic/_session_core.py
def stream_info(self, stream_name: str) -> StreamInfo:
    """Look up the StreamInfo registered under ``stream_name``.

    Raises ``KeyError`` when no such stream is registered.
    """
    registry = self._streams_info
    return registry[stream_name]

LabelEvent dataclass

LabelEvent(timestamp: float, class_index: int)

One entry in a session's label track: "at LSL time T, the user picked class N".

Recorded whenever the user clicks a class button in :func:~myogestic.widgets.recording_controls. The label track is a chronological list of these events; the label-driven readers (:func:~myogestic.session.iter_labeled_windows and Session.get_trials) walk the track to decide which sample range gets which class index. :func:~myogestic.session.iter_aligned_windows windows the whole primary stream and does not consult the label track.

Attributes:

Name Type Description
timestamp float

LSL clock time (seconds) when the label was emitted. Use mne_lsl.lsl.local_clock() if you ever need to mint one by hand.

class_index int

Index into the session's class_names list. -1 is the unlabeled sentinel (the iterators skip it).

Recording dataclass

Recording(class_index: int, class_name: str, data: ndarray, timestamps: ndarray)

A single labeled trial, extracted from a Session.