Skip to content

Sources

A Source wraps a device, file, or transport behind a uniform interface. Built-in sources live here; custom sources implement the Source protocol below. See Add a custom source for the recipe.

The protocol

Source

Bases: Protocol

Protocol every data source must implement.

Three methods, no inheritance: connect opens the device or file and returns a :class:StreamInfo describing the data, read polls non-blockingly for the next chunk, disconnect releases the device. The framework's :class:Stream wraps any object matching this Protocol and runs it on a daemon acquisition thread.

See Add a custom source for worked examples and the full contract.

connect

connect() -> StreamInfo

Open the device / file / socket. Return a :class:StreamInfo.

Source code in myogestic/stream.py
def connect(self) -> StreamInfo:
    """Open the device / file / socket. Return a :class:`StreamInfo`."""
    ...

read

read() -> tuple[ndarray | None, ndarray | None]

Poll for the next chunk of samples.

Return (data, ts) where data is sample-major (n_samples, n_channels) and ts is (n_samples,) float64 LSL clock timestamps. Return (None, None) if no new data is available.

Source code in myogestic/stream.py
def read(self) -> tuple[np.ndarray | None, np.ndarray | None]:
    """Poll for the next chunk of samples.

    Return ``(data, ts)`` where ``data`` is sample-major
    ``(n_samples, n_channels)`` and ``ts`` is ``(n_samples,)``
    float64 LSL clock timestamps. Return ``(None, None)`` if no
    new data is available.
    """
    ...

disconnect

disconnect() -> None

Release the device.

Idempotent - may be called multiple times during shutdown.

Source code in myogestic/stream.py
def disconnect(self) -> None:
    """Release the device.

    Idempotent - may be called multiple times during shutdown.
    """
    ...

Built-in sources

LSLSource

LSLSource(stream_name: str, dtype: DTypeLike | None = 'float32')

Pull samples from a Lab Streaming Layer outlet by name.

The default real-time source for MyoGestic: name the outlet you want on your local LSL network, drop the source into a :class:Stream, and the framework's acquisition thread handles the rest. Uses mne_lsl under the hood.

Parameters:

Name Type Description Default
stream_name str

LSL outlet name to subscribe to (e.g. "TestEMG1", "VHI_Control"). Resolved by name only - channel layout and sample rate come from the outlet's own metadata.

required
dtype DTypeLike | None

Dtype the samples are stored as (one of :data:~myogestic.stream.SUPPORTED_DTYPES). Default "float32". Incoming samples are cast to this dtype, so a compact choice (e.g. "int16") halves ring-buffer and recording size. None keeps the outlet's native wire format (lossless for int amps). Note: the window passed to @pipeline.extract is always upcast to float32 regardless of this choice.

'float32'

Examples:

>>> from myogestic import Stream
>>> from myogestic.sources import LSLSource
>>> stream = Stream("emg", source=LSLSource("TestEMG1"),
...                 window_ms=1000)
>>> # keep a 16-bit amp's native format to halve memory / disk:
>>> raw = LSLSource("TestEMG1", dtype=None)

The source is non-blocking: :meth:read pulls whatever is immediately available from the inlet and returns (None, None) when the outlet has produced nothing new since the last call. The acquisition thread paces itself by waiting for the inlet to fill, so a fast spin loop is harmless.

Source code in myogestic/sources/lsl.py
def __init__(self, stream_name: str, dtype: npt.DTypeLike | None = "float32"):
    self._name = stream_name
    # None -> honour the outlet's native wire format (resolved in connect);
    # otherwise cast incoming samples to this dtype.
    self._requested_dtype = None if dtype is None else np.dtype(dtype)
    self._dtype = np.dtype(np.float32)  # resolved in connect()
    self._inlet: StreamInlet | None = None

connect

connect() -> StreamInfo

Resolve the outlet by name and open an inlet.

Returns a :class:StreamInfo whose channel count and sample rate come from the outlet's metadata. dtype is the value requested at construction (default float32), or the outlet's native wire format when constructed with dtype=None. Blocks up to 10 s waiting for the outlet to appear on the network.

Raises:

Type Description
RuntimeError

if no outlet with stream_name is found. The error message lists every outlet that is currently advertised, to make typos and stream-name mismatches obvious.

Source code in myogestic/sources/lsl.py
def connect(self) -> StreamInfo:
    """Resolve the outlet by name and open an inlet.

    Returns a :class:`StreamInfo` whose channel count and sample rate
    come from the outlet's metadata. ``dtype`` is the value requested
    at construction (default float32), or the outlet's **native** wire
    format when constructed with ``dtype=None``. Blocks up to 10 s
    waiting for the outlet to appear on the network.

    Raises
    ------
    RuntimeError
        if no outlet with ``stream_name`` is found.
        The error message lists every outlet that *is* currently
        advertised, to make typos and stream-name mismatches
        obvious.
    """
    streams = resolve_streams(timeout=10.0, name=self._name)
    if not streams:
        available = [s.name for s in resolve_streams(timeout=2.0)]
        raise RuntimeError(
            f"LSL stream '{self._name}' not found. Available streams: {available}"
        )
    info = streams[0]
    self._inlet = StreamInlet(info, max_buffered=10)
    # mne_lsl exposes the outlet's native wire dtype as info.dtype.
    native = np.dtype(info.dtype)
    self._dtype = self._requested_dtype if self._requested_dtype is not None else native
    return StreamInfo(n_channels=info.n_channels, fs=info.sfreq, dtype=self._dtype)

read

read() -> tuple[ndarray | None, ndarray | None]

Pull whatever samples are immediately available.

Non-blocking. Returns (data, timestamps) where data is (n_samples, n_channels) in the configured dtype (default float32) and timestamps is a 1-D float64 array of LSL clock seconds. Returns (None, None) if the inlet hasn't been opened or no new samples are pending.

Source code in myogestic/sources/lsl.py
def read(self) -> tuple[np.ndarray | None, np.ndarray | None]:
    """Pull whatever samples are immediately available.

    Non-blocking. Returns ``(data, timestamps)`` where ``data`` is
    ``(n_samples, n_channels)`` in the configured ``dtype`` (default
    float32) and ``timestamps`` is a 1-D float64 array of LSL clock
    seconds. Returns ``(None, None)`` if the inlet hasn't been opened
    or no new samples are pending.
    """
    if self._inlet is None:
        return None, None
    data, timestamps = self._inlet.pull_chunk(timeout=0.0)
    if timestamps is None or len(timestamps) == 0:
        return None, None
    return (np.asarray(data, dtype=self._dtype), np.asarray(timestamps, dtype=np.float64)
    )

disconnect

disconnect() -> None

Close the inlet. Safe to call multiple times.

Source code in myogestic/sources/lsl.py
def disconnect(self) -> None:
    """Close the inlet. Safe to call multiple times."""
    if self._inlet:
        self._inlet.close_stream()
        self._inlet = None

discover

discover() -> list[dict[str, str]]

Scan for available LSL streams on the network.

Source code in myogestic/sources/lsl.py
def discover(self) -> list[dict[str, str]]:
    """Scan for available LSL streams on the network."""
    found = resolve_streams(timeout=2.0)
    return [
        {"name": s.name, "info": f"{s.n_channels}ch {s.sfreq:.0f}Hz {s.stype}"} for s in found
    ]

reconnect

reconnect(target: str | None = None) -> StreamInfo

Reconnect to the same or a different LSL stream.

Source code in myogestic/sources/lsl.py
def reconnect(self, target: str | None = None) -> StreamInfo:
    """Reconnect to the same or a different LSL stream."""
    self.disconnect()
    if target is not None:
        self._name = target
    return self.connect()

ReplaySource

ReplaySource(session_path: str, stream_name: str, speed: float = 1.0)

Replays a recorded session as if it were a live stream.

Accepts either a folder-format session or a .session.zip archive — delegates to open_session_store so both layouts work transparently.

Source code in myogestic/sources/replay.py
def __init__(self, session_path: str, stream_name: str, speed: float = 1.0):
    self._path = Path(session_path)
    self._stream_name = stream_name
    self._speed = speed
    self._pos = 0
    self._last_read_time: float | None = None
    self._chunk_size = 64
    self._session: Session | None = None

connect

connect() -> StreamInfo

Open the recorded session and return its :class:StreamInfo.

Raises :class:ValueError if the requested stream name is not present in the session.

Source code in myogestic/sources/replay.py
def connect(self) -> StreamInfo:
    """Open the recorded session and return its :class:`StreamInfo`.

    Raises :class:`ValueError` if the requested stream name is not
    present in the session.
    """
    sess = open_session_store(self._path)
    # Hold the session: reads are lazy off its zarr/ZipStore, so it must stay
    # open until disconnect() — and we must close it there, or the
    # .session.zip stays locked on Windows.
    self._session = sess
    if self._stream_name not in sess.stores:
        sess.close()
        self._session = None
        raise ValueError(
            f"Stream {self._stream_name!r} not in session "
            f"{self._path} (have: {list(sess.stores)})"
        )
    self._data = sess.stores[self._stream_name]
    self._ts = sess.ts_stores[self._stream_name]
    info = sess.stream_info(self._stream_name)
    self._fs = info.fs
    return info

read

read() -> tuple[ndarray | None, ndarray | None]

Return the next recorded chunk, paced to wall-clock time x speed.

Returns (None, None) when no samples are due yet; loops back to the start of the recording once the end is reached.

Source code in myogestic/sources/replay.py
def read(self) -> tuple[np.ndarray | None, np.ndarray | None]:
    """Return the next recorded chunk, paced to wall-clock time x ``speed``.

    Returns ``(None, None)`` when no samples are due yet; loops back
    to the start of the recording once the end is reached.
    """
    now = time.perf_counter()
    if self._last_read_time is not None:
        elapsed = (now - self._last_read_time) * self._speed
        samples_due = int(elapsed * self._fs)
        if samples_due < 1:
            return None, None
    else:
        samples_due = self._chunk_size

    self._last_read_time = now
    total = self._data.shape[0]
    end = min(self._pos + samples_due, total)
    if self._pos >= end:
        self._pos = 0  # loop
        return None, None

    data = np.array(self._data[self._pos : end])
    ts = np.array(self._ts[self._pos : end])
    self._pos = end
    return data, ts

disconnect

disconnect() -> None

Close the session and rewind the replay position.

Closing releases the ZipStore — on Windows an open handle keeps the .session.zip locked, so the file couldn't be deleted or re-recorded until the source was garbage-collected.

Source code in myogestic/sources/replay.py
def disconnect(self) -> None:
    """Close the session and rewind the replay position.

    Closing releases the ``ZipStore`` — on Windows an open handle keeps the
    ``.session.zip`` locked, so the file couldn't be deleted or re-recorded
    until the source was garbage-collected.
    """
    if self._session is not None:
        self._session.close()
        self._session = None
    self._pos = 0

SerialSource

Opt-in: lives at myogestic.sources.serial_source.SerialSource. Direct import only (requires the serial extra for pyserial).

SerialSource

SerialSource(port: str, baud: int, n_channels: int, fs: float)

Reads fixed-width binary frames from a serial port.

Each frame is n_channels float32 values (little-endian). The source self-paces via serial blocking reads; timestamps are stamped on arrival with mne_lsl.lsl.local_clock().

Source code in myogestic/sources/serial_source.py
def __init__(self, port: str, baud: int, n_channels: int, fs: float):
    self._port = port
    self._baud = baud
    self._n_channels = n_channels
    self._fs = fs
    self._ser: serial.Serial | None = None
    self._frame_bytes = n_channels * 4  # float32 = 4 bytes

connect

connect() -> StreamInfo

Open the serial port and return the configured :class:StreamInfo.

Source code in myogestic/sources/serial_source.py
def connect(self) -> StreamInfo:
    """Open the serial port and return the configured :class:`StreamInfo`."""
    import serial  # type: ignore

    self._ser = serial.Serial(self._port, self._baud, timeout=1.0)
    return StreamInfo(
        n_channels=self._n_channels,
        fs=self._fs,
        dtype=np.dtype(np.float32),
    )

read

read() -> tuple[ndarray | None, ndarray | None]

Read one n_channels-float32 frame, timestamped on arrival.

Returns (None, None) when the port is closed or a short read yields fewer than n_channels values.

Source code in myogestic/sources/serial_source.py
def read(self) -> tuple[np.ndarray | None, np.ndarray | None]:
    """Read one ``n_channels``-float32 frame, timestamped on arrival.

    Returns ``(None, None)`` when the port is closed or a short read
    yields fewer than ``n_channels`` values.
    """
    if self._ser is None:
        return None, None
    raw = self._ser.read(self._frame_bytes)
    if len(raw) < self._frame_bytes:
        return None, None
    values = struct.unpack(f"<{self._n_channels}f", raw)
    data = np.array([values], dtype=np.float32)
    ts = np.array([local_clock()], dtype=np.float64)
    return data, ts

disconnect

disconnect() -> None

Close the serial port if open.

Source code in myogestic/sources/serial_source.py
def disconnect(self) -> None:
    """Close the serial port if open."""
    if self._ser is not None:
        self._ser.close()
        self._ser = None

discover

discover() -> list[dict[str, str]]

List available serial ports.

Source code in myogestic/sources/serial_source.py
def discover(self) -> list[dict[str, str]]:
    """List available serial ports."""
    import serial.tools.list_ports  # type: ignore

    return [
        {"name": p.device, "info": p.description or p.device}
        for p in serial.tools.list_ports.comports()
    ]

reconnect

reconnect(target: str | None = None) -> StreamInfo

Reconnect to the same or a different serial port.

Source code in myogestic/sources/serial_source.py
def reconnect(self, target: str | None = None) -> StreamInfo:
    """Reconnect to the same or a different serial port."""
    self.disconnect()
    if target is not None:
        self._port = target
    return self.connect()