Add a custom source¶
A Source is a Protocol - three methods, no inheritance. Implement them, hand the instance to Stream(...), and the framework handles threading, buffering, and recording for you.
The protocol¶
from typing import Protocol
import numpy as np
from myogestic import StreamInfo
class Source(Protocol):
def connect(self) -> StreamInfo: ...
def read(self) -> tuple[np.ndarray | None, np.ndarray | None]: ...
def disconnect(self) -> None: ...
That's the whole interface.
connect() -> StreamInfo¶
Open the device / file / socket. Return a StreamInfo describing the data: n_channels, fs, optional dtype (defaults to np.float32), optional channel_names. Auto-discovery is the goal - the user shouldn't have to pass these manually unless the source genuinely can't self-describe.
If the source can't connect, raise - the framework surfaces the error in the GUI status bar.
read() -> (data, ts) | (None, None)¶
Called repeatedly by the acquisition thread. Return either:
(data, ts)wheredata.shape == (n_samples, n_channels)(sample-major) andts.shape == (n_samples,)(float64 LSL clock timestamps). The framework converts to channels-first atStream.get_window()for you.(None, None)if no new data is available right now. The framework will callread()again. Don't block forever - return promptly so the thread can be stopped cleanly when the app exits.
disconnect() -> None¶
Release the device. Idempotent - may be called multiple times during shutdown.
Worked example: a CSV replay source¶
A toy source that replays a CSV file at the original sample rate:
from pathlib import Path
import numpy as np
import time
from mne_lsl.lsl import local_clock
from myogestic import StreamInfo
class CSVSource:
def __init__(self, path: str, fs: float, channel_names: list[str] | None = None):
self.path = Path(path)
self.fs = fs
self.channel_names = channel_names
self._data: np.ndarray | None = None
self._idx = 0
self._t0 = 0.0
def connect(self) -> StreamInfo:
arr = np.loadtxt(self.path, delimiter=",", dtype=np.float32)
if arr.ndim == 1:
arr = arr[:, None]
self._data = arr
self._idx = 0
self._t0 = local_clock()
return StreamInfo(
n_channels=arr.shape[1],
fs=self.fs,
dtype=np.dtype("float32"),
channel_names=self.channel_names,
)
def read(self) -> tuple[np.ndarray | None, np.ndarray | None]:
if self._data is None or self._idx >= self._data.shape[0]:
return None, None
# How many samples should have arrived by now (real-time pacing)?
elapsed = local_clock() - self._t0
target = int(elapsed * self.fs)
if target <= self._idx:
return None, None
end = min(target, self._data.shape[0])
chunk = self._data[self._idx : end]
ts = (np.arange(self._idx, end) / self.fs + self._t0).astype(np.float64)
self._idx = end
return chunk, ts
def disconnect(self) -> None:
self._data = None
Use it:
from myogestic import App, Stream
app = App("CSV replay")
app.streams(Stream("emg", source=CSVSource("recording.csv", fs=2000.0), window_seconds=1.0))
app.run()
That's the whole integration.
Worked example: bridge a third-party SDK¶
Suppose you have a vendor SDK that emits chunks via a callback. Adapter pattern:
import threading
import queue
import numpy as np
from myogestic import StreamInfo
class VendorSource:
def __init__(self, vendor_handle):
self._vendor = vendor_handle
self._queue: queue.Queue = queue.Queue(maxsize=1000)
def connect(self) -> StreamInfo:
self._vendor.set_callback(self._on_chunk)
self._vendor.start()
info = self._vendor.describe() # whatever the SDK exposes
return StreamInfo(n_channels=info.n_ch, fs=info.fs)
def _on_chunk(self, samples: np.ndarray, ts: np.ndarray) -> None:
try:
self._queue.put_nowait((samples, ts))
except queue.Full:
pass # backpressure: drop on overflow rather than block the SDK
def read(self) -> tuple[np.ndarray | None, np.ndarray | None]:
try:
return self._queue.get_nowait()
except queue.Empty:
return None, None
def disconnect(self) -> None:
self._vendor.stop()
The vendor's callback runs on its own thread; the queue decouples it from the acquisition thread.
Reference implementations¶
| Source | Where | Pattern |
|---|---|---|
LSLSource |
myogestic/sources/lsl.py |
LSL inlet polling with pull_chunk(timeout=0.0) |
ReplaySource |
myogestic/sources/replay.py |
Reads from a .session.zip, paces with real-time clock |
SerialSource |
myogestic/sources/serial_source.py |
pyserial loop with line-based parsing |
Mirror whichever is closest to your transport.
Testing your source without the GUI¶
import time
src = MySource(...)
info = src.connect()
print(f"{info.n_channels} ch @ {info.fs} Hz")
for _ in range(20):
data, ts = src.read()
if data is not None:
print(f"got {data.shape[0]} samples")
time.sleep(0.05)
src.disconnect()
Once that loop produces sane output, the framework will work too.