Skip to content

Sources

A Source wraps a device, file, or transport behind a uniform interface. Built-in sources live here; custom sources implement the Source protocol below. See Add a custom source for the recipe.

The protocol

Source

Bases: Protocol

Protocol every data source must implement.

Three methods, no inheritance: connect opens the device or file and returns a :class:StreamInfo describing the data, read polls non-blockingly for the next chunk, disconnect releases the device. The framework's :class:Stream wraps any object matching this Protocol and runs it on a daemon acquisition thread.

See Add a custom source for worked examples and the full contract.

connect

connect() -> StreamInfo

Open the device / file / socket. Return a :class:StreamInfo.

Source code in myogestic/stream.py
def connect(self) -> StreamInfo:
    """Open the device / file / socket. Return a :class:`StreamInfo`."""
    ...

read

read() -> tuple[ndarray | None, ndarray | None]

Return (data, ts) where data is sample-major (n_samples, n_channels) and ts is (n_samples,) float64 LSL clock timestamps. Return (None, None) if no new data is available.

Source code in myogestic/stream.py
def read(self) -> tuple[np.ndarray | None, np.ndarray | None]:
    """Return ``(data, ts)`` where ``data`` is sample-major
    ``(n_samples, n_channels)`` and ``ts`` is ``(n_samples,)``
    float64 LSL clock timestamps. Return ``(None, None)`` if no
    new data is available.
    """
    ...

disconnect

disconnect() -> None

Release the device. Idempotent - may be called multiple times during shutdown.

Source code in myogestic/stream.py
def disconnect(self) -> None:
    """Release the device. Idempotent - may be called multiple
    times during shutdown."""
    ...

Built-in sources

LSLSource

LSLSource(stream_name: str)

Pull samples from a Lab Streaming Layer outlet by name.

The default real-time source for MyoGestic: name the outlet you want on your local LSL network, drop the source into a :class:Stream, and the framework's acquisition thread handles the rest. Uses mne_lsl under the hood.

Parameters:

Name Type Description Default
stream_name str

LSL outlet name to subscribe to (e.g. "TestEMG1", "VHI_Control"). Resolved by name only - channel layout and sample rate come from the outlet's own metadata.

required
Example

from myogestic import Stream from myogestic.sources import LSLSource stream = Stream("emg", source=LSLSource("TestEMG1"), ... window_seconds=1.0)

The source is non-blocking: :meth:read pulls whatever is immediately available from the inlet and returns (None, None) when the outlet has produced nothing new since the last call. The acquisition thread paces itself by waiting for the inlet to fill, so a fast spin loop is harmless.

Source code in myogestic/sources/lsl.py
def __init__(self, stream_name: str):
    self._name = stream_name
    self._inlet: StreamInlet | None = None

connect

connect() -> StreamInfo

Resolve the outlet by name and open an inlet.

Returns a :class:StreamInfo whose channel count, sample rate, and dtype come from the outlet's metadata. Blocks up to 10 s waiting for the outlet to appear on the network.

Raises:

Type Description
RuntimeError

if no outlet with stream_name is found. The error message lists every outlet that is currently advertised, to make typos and stream-name mismatches obvious.

Source code in myogestic/sources/lsl.py
def connect(self) -> StreamInfo:
    """Resolve the outlet by name and open an inlet.

    Returns a :class:`StreamInfo` whose channel count, sample rate,
    and dtype come from the outlet's metadata. Blocks up to 10 s
    waiting for the outlet to appear on the network.

    Raises:
        RuntimeError: if no outlet with ``stream_name`` is found.
            The error message lists every outlet that *is* currently
            advertised, to make typos and stream-name mismatches
            obvious.
    """
    streams = resolve_streams(timeout=10.0, name=self._name)
    if not streams:
        available = [s.name for s in resolve_streams(timeout=2.0)]
        raise RuntimeError(
            f"LSL stream '{self._name}' not found. "
            f"Available streams: {available}"
        )
    info = streams[0]
    self._inlet = StreamInlet(info, max_buffered=10)
    return StreamInfo(
        n_channels=info.n_channels,
        fs=info.sfreq,
        dtype=np.dtype(np.float32),
    )

read

read() -> tuple[ndarray | None, ndarray | None]

Pull whatever samples are immediately available.

Non-blocking. Returns (data, timestamps) where data is (n_samples, n_channels) float32 and timestamps is a 1-D float64 array of LSL clock seconds. Returns (None, None) if the inlet hasn't been opened or no new samples are pending.

Source code in myogestic/sources/lsl.py
def read(self) -> tuple[np.ndarray | None, np.ndarray | None]:
    """Pull whatever samples are immediately available.

    Non-blocking. Returns ``(data, timestamps)`` where ``data`` is
    ``(n_samples, n_channels)`` float32 and ``timestamps`` is a 1-D
    float64 array of LSL clock seconds. Returns ``(None, None)`` if
    the inlet hasn't been opened or no new samples are pending.
    """
    if self._inlet is None:
        return None, None
    data, timestamps = self._inlet.pull_chunk(timeout=0.0)
    if timestamps is None or len(timestamps) == 0:
        return None, None
    return (
        np.asarray(data, dtype=np.float32),
        np.asarray(timestamps, dtype=np.float64),
    )

disconnect

disconnect() -> None

Close the inlet. Safe to call multiple times.

Source code in myogestic/sources/lsl.py
def disconnect(self) -> None:
    """Close the inlet. Safe to call multiple times."""
    if self._inlet:
        self._inlet.close_stream()
        self._inlet = None

discover

discover() -> list[dict[str, str]]

Scan for available LSL streams on the network.

Source code in myogestic/sources/lsl.py
def discover(self) -> list[dict[str, str]]:
    """Scan for available LSL streams on the network."""
    found = resolve_streams(timeout=2.0)
    return [
        {"name": s.name, "info": f"{s.n_channels}ch {s.sfreq:.0f}Hz {s.stype}"}
        for s in found
    ]

reconnect

reconnect(target: str | None = None) -> StreamInfo

Reconnect to the same or a different LSL stream.

Source code in myogestic/sources/lsl.py
def reconnect(self, target: str | None = None) -> StreamInfo:
    """Reconnect to the same or a different LSL stream."""
    self.disconnect()
    if target is not None:
        self._name = target
    return self.connect()

ReplaySource

ReplaySource(session_path: str, stream_name: str, speed: float = 1.0)

Replays a recorded session as if it were a live stream.

Accepts either a folder-format session or a .session.zip archive — delegates to open_session_store so both layouts work transparently.

Source code in myogestic/sources/replay.py
def __init__(self, session_path: str, stream_name: str, speed: float = 1.0):
    self._path = Path(session_path)
    self._stream_name = stream_name
    self._speed = speed
    self._pos = 0
    self._last_read_time: float | None = None
    self._chunk_size = 64

SerialSource

Opt-in: lives at myogestic.sources.serial_source.SerialSource. Direct import only (requires the serial extra for pyserial).

SerialSource

SerialSource(port: str, baud: int, n_channels: int, fs: float)

Reads fixed-width binary frames from a serial port.

Each frame is n_channels float32 values (little-endian). The source self-paces via serial blocking reads; timestamps are stamped on arrival with mne_lsl.lsl.local_clock().

Source code in myogestic/sources/serial_source.py
def __init__(self, port: str, baud: int, n_channels: int, fs: float):
    self._port = port
    self._baud = baud
    self._n_channels = n_channels
    self._fs = fs
    self._ser: serial.Serial | None = None
    self._frame_bytes = n_channels * 4  # float32 = 4 bytes

discover

discover() -> list[dict[str, str]]

List available serial ports.

Source code in myogestic/sources/serial_source.py
def discover(self) -> list[dict[str, str]]:
    """List available serial ports."""
    import serial.tools.list_ports  # type: ignore[import-not-found]

    return [
        {"name": p.device, "info": p.description or p.device}
        for p in serial.tools.list_ports.comports()
    ]

reconnect

reconnect(target: str | None = None) -> StreamInfo

Reconnect to the same or a different serial port.

Source code in myogestic/sources/serial_source.py
def reconnect(self, target: str | None = None) -> StreamInfo:
    """Reconnect to the same or a different serial port."""
    self.disconnect()
    if target is not None:
        self._port = target
    return self.connect()