From 02cb383a5fe7d40fd65374589289ef5cfede4d75 Mon Sep 17 00:00:00 2001 From: Richeek Das Date: Tue, 28 Apr 2026 02:18:35 -0400 Subject: [PATCH 1/8] PING addded to Discovery, RateExecutor now catches up, Top level imports removed --- examples/publisher_dict.py | 3 ++- examples/publisher_numpy.py | 3 ++- examples/publisher_tensor.py | 3 ++- examples/subscriber_dict.py | 3 ++- examples/subscriber_numpy.py | 3 ++- examples/subscriber_tensor.py | 3 ++- src/cortex/__init__.py | 22 +++++----------------- src/cortex/core/executor.py | 20 ++++++++++---------- src/cortex/discovery/client.py | 15 +++++++++++++++ src/cortex/discovery/daemon.py | 6 ++++++ src/cortex/discovery/protocol.py | 1 + tests/test_discovery.py | 16 ++++++++++++++++ 12 files changed, 65 insertions(+), 33 deletions(-) diff --git a/examples/publisher_dict.py b/examples/publisher_dict.py index 0552b2f..ff43d8f 100644 --- a/examples/publisher_dict.py +++ b/examples/publisher_dict.py @@ -23,7 +23,8 @@ import numpy as np import cortex -from cortex import DictMessage, Node +from cortex import Node +from cortex.messages.standard import DictMessage class DictPublisherNode(Node): diff --git a/examples/publisher_numpy.py b/examples/publisher_numpy.py index bd14a25..33d1fdb 100644 --- a/examples/publisher_numpy.py +++ b/examples/publisher_numpy.py @@ -19,7 +19,8 @@ import numpy as np import cortex -from cortex import ArrayMessage, Node +from cortex import Node +from cortex.messages.standard import ArrayMessage class ArrayPublisherNode(Node): diff --git a/examples/publisher_tensor.py b/examples/publisher_tensor.py index e17362e..5773d29 100644 --- a/examples/publisher_tensor.py +++ b/examples/publisher_tensor.py @@ -23,7 +23,8 @@ exit(1) import cortex -from cortex import Node, TensorMessage +from cortex import Node +from cortex.messages.standard import TensorMessage class TensorPublisherNode(Node): diff --git a/examples/subscriber_dict.py b/examples/subscriber_dict.py index e977310..717a7d3 100644 --- 
a/examples/subscriber_dict.py +++ b/examples/subscriber_dict.py @@ -20,8 +20,9 @@ import contextlib import cortex -from cortex import DictMessage, Node +from cortex import Node from cortex.messages.base import MessageHeader +from cortex.messages.standard import DictMessage class DictSubscriberNode(Node): diff --git a/examples/subscriber_numpy.py b/examples/subscriber_numpy.py index 369d19c..b8a2c24 100644 --- a/examples/subscriber_numpy.py +++ b/examples/subscriber_numpy.py @@ -17,8 +17,9 @@ """ import cortex -from cortex import ArrayMessage, Node +from cortex import Node from cortex.messages.base import MessageHeader +from cortex.messages.standard import ArrayMessage async def on_array_received(msg: ArrayMessage, header: MessageHeader): diff --git a/examples/subscriber_tensor.py b/examples/subscriber_tensor.py index 82a806b..b093f87 100644 --- a/examples/subscriber_tensor.py +++ b/examples/subscriber_tensor.py @@ -23,8 +23,9 @@ exit(1) import cortex -from cortex import Node, TensorMessage +from cortex import Node from cortex.messages.base import MessageHeader +from cortex.messages.standard import TensorMessage async def on_tensor_received(msg: TensorMessage, header: MessageHeader): diff --git a/src/cortex/__init__.py b/src/cortex/__init__.py index f680d7c..371d4ca 100644 --- a/src/cortex/__init__.py +++ b/src/cortex/__init__.py @@ -7,21 +7,17 @@ - Support for numpy arrays, torch tensors, and Python dicts - 64-bit fingerprint hashing for message type identification - Asyncio-based architecture for cooperative multitasking (with uvloop on Unix) + +The top-level package exposes only the core API: ``Node``, ``Publisher``, +``Subscriber``, ``run``, ``Message``, and ``MessageType``. Standard message +implementations live in :mod:`cortex.messages.standard`; executors in +:mod:`cortex.core.executor`; discovery in :mod:`cortex.discovery`. 
""" -from cortex.core.executor import AsyncExecutor, RateExecutor from cortex.core.node import Node from cortex.core.publisher import Publisher from cortex.core.subscriber import Subscriber from cortex.messages.base import Message, MessageType -from cortex.messages.standard import ( - ArrayMessage, - DictMessage, - FloatMessage, - IntMessage, - StringMessage, - TensorMessage, -) from cortex.utils.loop import run __version__ = "0.1.0" @@ -29,15 +25,7 @@ "Node", "Publisher", "Subscriber", - "AsyncExecutor", - "RateExecutor", "Message", "MessageType", - "ArrayMessage", - "TensorMessage", - "DictMessage", - "StringMessage", - "FloatMessage", - "IntMessage", "run", ] diff --git a/src/cortex/core/executor.py b/src/cortex/core/executor.py index 668d272..d9e687a 100644 --- a/src/cortex/core/executor.py +++ b/src/cortex/core/executor.py @@ -94,10 +94,12 @@ async def _run_impl(self, *args, **kwargs) -> None: class RateExecutor(BaseExecutor): """Runs an async callable at a target rate in Hz. - Uses ``time.perf_counter`` for scheduling and catches up on overruns by - advancing ``next_exec_time`` instead of firing back-to-back. Dropped - ticks are **not** reported — suitable for telemetry and periodic I/O, - but not for hard real-time control without external monitoring. + Uses ``time.perf_counter`` for scheduling. If a callback overruns the + nominal period, ``next_exec_time`` stays on the fixed grid (only + ``+ interval`` per invocation); the loop then sleeps 0 until the clock + catches up, so **missed ticks are not skipped**. This matches the + historical neurosim ``ZMQNODE`` constant-rate executor behavior and is + appropriate for simulation stepping. Example: ```python @@ -123,9 +125,11 @@ def __init__(self, func: AsyncCallback, rate_hz: float): async def _run_impl(self, *args, **kwargs) -> None: """ - Run a function at constant rate with precise timing. + Run a function on a fixed ``perf_counter`` grid at ``rate_hz``. 
- Executions happen at exact intervals regardless of execution time. + When the callback is slow, ticks are not skipped: ``next_exec_time`` + advances by one interval per invocation and the loop yields until + the clock catches up (zero-length sleeps while behind). """ next_exec_time = time.perf_counter() @@ -137,10 +141,6 @@ async def _run_impl(self, *args, **kwargs) -> None: await self.func(*args, **kwargs) next_exec_time += self.interval - # If we've fallen behind, catch up - if next_exec_time < current_time: - next_exec_time = current_time + self.interval - await asyncio.sleep(0) # Yield to event loop except asyncio.CancelledError: break diff --git a/src/cortex/discovery/client.py b/src/cortex/discovery/client.py index e07fcdd..3ddd012 100644 --- a/src/cortex/discovery/client.py +++ b/src/cortex/discovery/client.py @@ -249,6 +249,21 @@ async def wait_for_topic_async( return None + def ping(self) -> bool: + """Check whether the discovery daemon is reachable. + + Returns: + True if the daemon responded with OK within the configured + timeout/retries, False otherwise. + """ + request = DiscoveryRequest(command=DiscoveryCommand.PING) + try: + response = self._send_request(request) + except Exception as e: + logger.debug(f"Ping failed: {e}") + return False + return response.status == DiscoveryStatus.OK + def list_topics(self) -> list[TopicInfo]: """ List all registered topics. 
diff --git a/src/cortex/discovery/daemon.py b/src/cortex/discovery/daemon.py index c1a9aca..132a6f9 100644 --- a/src/cortex/discovery/daemon.py +++ b/src/cortex/discovery/daemon.py @@ -152,6 +152,8 @@ def _handle_request(self, request_bytes: bytes) -> DiscoveryResponse: return self._handle_lookup(request) elif request.command == DiscoveryCommand.LIST_TOPICS: return self._handle_list() + elif request.command == DiscoveryCommand.PING: + return self._handle_ping() elif request.command == DiscoveryCommand.SHUTDOWN: return self._handle_shutdown() else: @@ -251,6 +253,10 @@ def _handle_list(self) -> DiscoveryResponse: return DiscoveryResponse(status=DiscoveryStatus.OK, topics=topics) + def _handle_ping(self) -> DiscoveryResponse: + """Handle ping request — used by clients to verify daemon liveness.""" + return DiscoveryResponse(status=DiscoveryStatus.OK, message="pong") + def _handle_shutdown(self) -> DiscoveryResponse: """Handle shutdown request.""" self._running = False diff --git a/src/cortex/discovery/protocol.py b/src/cortex/discovery/protocol.py index a13eeda..004b91c 100644 --- a/src/cortex/discovery/protocol.py +++ b/src/cortex/discovery/protocol.py @@ -17,6 +17,7 @@ class DiscoveryCommand(IntEnum): UNREGISTER_TOPIC = 2 LOOKUP_TOPIC = 3 LIST_TOPICS = 4 + PING = 5 SHUTDOWN = 99 diff --git a/tests/test_discovery.py b/tests/test_discovery.py index 97ac9aa..d45cd92 100644 --- a/tests/test_discovery.py +++ b/tests/test_discovery.py @@ -129,6 +129,22 @@ def test_register_and_lookup(self, discovery_daemon, discovery_address): client.close() + def test_ping_alive(self, discovery_daemon, discovery_address): + """Ping should return True when the daemon is reachable.""" + client = DiscoveryClient(discovery_address=discovery_address) + assert client.ping() is True + client.close() + + def test_ping_dead(self): + """Ping should return False when no daemon is listening.""" + client = DiscoveryClient( + discovery_address="ipc:///tmp/cortex/discovery_no_daemon.sock", + 
timeout_ms=200, + retries=1, + ) + assert client.ping() is False + client.close() + def test_lookup_nonexistent(self, discovery_daemon, discovery_address): """Lookup of nonexistent topic should return None.""" client = DiscoveryClient(discovery_address=discovery_address) From f4d0ee9f68cee65cd405e51aec74ec7ee2acf507 Mon Sep 17 00:00:00 2001 From: Richeek Das Date: Sun, 3 May 2026 19:26:50 -0400 Subject: [PATCH 2/8] add some comments on header and counter handling --- src/cortex/messages/base.py | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/cortex/messages/base.py b/src/cortex/messages/base.py index 91d8dfa..41227db 100644 --- a/src/cortex/messages/base.py +++ b/src/cortex/messages/base.py @@ -114,7 +114,10 @@ class PointCloud(Message): intensity: float = 1.0 """ - # Class-level sequence counter + # Class-level sequence counter, kept as a fallback for callers that + # serialize a message directly via ``to_bytes``/``to_frames`` without + # going through ``Publisher``. Real per-publisher gap detection lives + # on :class:`cortex.core.publisher.Publisher` (one counter per topic). _sequence_counter: ClassVar[int] = 0 _field_names_cache: ClassVar[tuple[str, ...] | None] = None @@ -132,7 +135,11 @@ def fingerprint(cls) -> int: @classmethod def _next_sequence(cls) -> int: - """Get the next sequence number.""" + """Get the next sequence number on the class-level counter. + + Used as a fallback when no explicit ``sequence`` is supplied to + :meth:`to_bytes` / :meth:`to_frames`. + """ seq = cls._sequence_counter cls._sequence_counter += 1 return seq @@ -160,15 +167,22 @@ def _build_instance(cls: type[T], values: list[object]) -> T: ) return cls(**dict(zip(field_names, values, strict=True))) - def _build_header(self) -> MessageHeader: - """Create a message header for the current instance.""" + def _build_header(self, sequence: int | None = None) -> MessageHeader: + """Create a message header for the current instance. 
+ + Args: + sequence: Explicit sequence number (typically supplied by the + owning :class:`Publisher`). When ``None``, falls back to + the class-level counter so direct ``to_bytes`` / ``to_frames`` + calls keep working in tests and ad-hoc serialization. + """ return MessageHeader( fingerprint=self.fingerprint(), timestamp_ns=time.time_ns(), - sequence=self._next_sequence(), + sequence=self._next_sequence() if sequence is None else sequence, ) - def to_bytes(self) -> bytes: + def to_bytes(self, sequence: int | None = None) -> bytes: """ Serialize the message to bytes. @@ -176,18 +190,18 @@ def to_bytes(self) -> bytes: - 24 bytes: header (fingerprint, timestamp, sequence) - remaining: serialized field data """ - header_bytes = self._build_header().to_bytes() + header_bytes = self._build_header(sequence).to_bytes() data_bytes = serialize_message_values(self._field_values()) return header_bytes + data_bytes - def to_frames(self) -> list[object]: + def to_frames(self, sequence: int | None = None) -> list[object]: """Serialize the message into transport frames. The first frame is always the fixed-size header. The second frame holds packed metadata, and any remaining frames are raw out-of-band buffers. """ return [ - self._build_header().to_bytes(), + self._build_header(sequence).to_bytes(), *serialize_message_frames(self._field_values()), ] From 4f63f0bfbee7affaf9137f493a73f51b28742a32 Mon Sep 17 00:00:00 2001 From: Richeek Das Date: Sun, 3 May 2026 19:27:25 -0400 Subject: [PATCH 3/8] detect free threaded runtime --- src/cortex/utils/runtime.py | 101 ++++++++++++++++++++++++++++++++++++ 1 file changed, 101 insertions(+) create mode 100644 src/cortex/utils/runtime.py diff --git a/src/cortex/utils/runtime.py b/src/cortex/utils/runtime.py new file mode 100644 index 0000000..9703db3 --- /dev/null +++ b/src/cortex/utils/runtime.py @@ -0,0 +1,101 @@ +""" +Runtime introspection helpers for Cortex. + +The synchronous subscriber path relies on dedicated OS threads. 
Its +latency floor is dramatically tighter on a free-threaded build of CPython +(PEP 779, available as ``python3.14t``) because the receive thread does +not contend with the asyncio thread for the GIL. + +These helpers let the rest of the framework probe the running interpreter +once and adapt — defaults, log lines, and capability checks live here so +hot paths don't re-discover the environment per call. +""" + +import os +import platform +import sys +import sysconfig +from dataclasses import dataclass +from functools import cache + + +@dataclass(frozen=True, slots=True) +class RuntimeInfo: + """Snapshot of the Python runtime relevant to Cortex tuning.""" + + python_version: tuple[int, int, int] + implementation: str + free_threaded: bool + """True when the GIL is disabled at runtime (PEP 779).""" + free_threaded_build: bool + """True when running on a ``python3.14t``-style build that *can* disable + the GIL — even if a C extension has re-enabled it at runtime.""" + gil_supported: bool + """True when the interpreter exposes ``sys._is_gil_enabled`` (CPython 3.13+).""" + cpu_count: int + """Number of CPUs available to this process (``os.process_cpu_count`` on 3.13+).""" + + +@cache +def runtime_info() -> RuntimeInfo: + """Return an immutable snapshot of the running interpreter.""" + version = sys.version_info[:3] + implementation = platform.python_implementation() + + gil_probe = getattr(sys, "_is_gil_enabled", None) + gil_supported = gil_probe is not None + free_threaded = gil_supported and not gil_probe() # type: ignore[misc] + free_threaded_build = bool(sysconfig.get_config_var("Py_GIL_DISABLED")) + + process_cpu_count = getattr(os, "process_cpu_count", None) + if process_cpu_count is not None: + cpu_count = process_cpu_count() or os.cpu_count() or 1 + else: + cpu_count = os.cpu_count() or 1 + + return RuntimeInfo( + python_version=version, + implementation=implementation, + free_threaded=free_threaded, + free_threaded_build=free_threaded_build, + 
gil_supported=gil_supported, + cpu_count=cpu_count, + ) + + +def is_free_threaded() -> bool: + """Convenience accessor: ``True`` on free-threaded CPython.""" + return runtime_info().free_threaded + + +def low_latency_advisory() -> str | None: + """Return a one-line hint when a control-loop subscriber would benefit + from a free-threaded interpreter. + + Returns ``None`` if the current runtime is already optimal, otherwise a + short suggestion string suitable for a single ``logger.info`` call at + subscriber start. + """ + info = runtime_info() + if info.free_threaded: + return None + if info.free_threaded_build: + # python3.14t binary, but a C extension (commonly msgpack) re-enabled + # the GIL on import. The workaround is documented and safe enough + # for benchmarks. + return ( + "Sync subscriber running on a free-threaded Python build with " + "the GIL re-enabled (likely a C extension that has not declared " + "free-thread safety). Set PYTHON_GIL=0 to override and unlock " + "free-threaded behavior." + ) + if info.python_version < (3, 14): + return ( + "Sync subscriber running on Python " + f"{info.python_version[0]}.{info.python_version[1]} with GIL enabled; " + "for tighter tail latency use python3.14t (free-threaded build)." + ) + return ( + "Sync subscriber running on Python 3.14 with GIL enabled; " + "switch to python3.14t (free-threaded) for tighter tail latency." + ) From 4ae885f9490f3808957abf4960b0e522eb8e5952 Mon Sep 17 00:00:00 2001 From: Richeek Das Date: Sun, 3 May 2026 19:27:44 -0400 Subject: [PATCH 4/8] add latency tracing in cortex --- src/cortex/utils/tracing.py | 89 +++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 src/cortex/utils/tracing.py diff --git a/src/cortex/utils/tracing.py b/src/cortex/utils/tracing.py new file mode 100644 index 0000000..ecdff4e --- /dev/null +++ b/src/cortex/utils/tracing.py @@ -0,0 +1,89 @@ +""" +Per-stage latency tracing for the Cortex receive path. 
+ +Tracing is **off by default**: when ``CORTEX_TRACE_LATENCY`` is unset (or +``0``), :func:`stage` is a near-zero-cost context manager that does not +allocate. When the env var is set to a positive integer, that many of the +most recent samples per stage are kept in a ring buffer that benchmarks +and tests can read out via :func:`snapshot`. + +This is intentionally tiny — the whole point of measuring sub-100 µs +paths is that the instrumentation itself can't be on the order of the +thing being measured. We use ``time.perf_counter_ns`` for monotonic +nanoseconds and avoid any logging or string formatting in the hot path. +""" + +import os +import threading +from collections import deque +from collections.abc import Iterator +from contextlib import contextmanager +from time import perf_counter_ns + + +def _trace_capacity() -> int: + raw = os.environ.get("CORTEX_TRACE_LATENCY", "0") + try: + n = int(raw) + except ValueError: + return 0 + return max(n, 0) + + +_CAPACITY = _trace_capacity() +_ENABLED = _CAPACITY > 0 + +_lock = threading.Lock() +_samples: dict[str, deque[int]] = {} + + +def enabled() -> bool: + """Return True when latency tracing is active for this process.""" + return _ENABLED + + +@contextmanager +def stage(name: str) -> Iterator[None]: + """Time a code region and record the elapsed nanoseconds under ``name``. + + No-op (and zero allocations beyond the generator object) when tracing + is disabled. 
+ """ + if not _ENABLED: + yield + return + start = perf_counter_ns() + try: + yield + finally: + elapsed = perf_counter_ns() - start + with _lock: + buf = _samples.get(name) + if buf is None: + buf = deque(maxlen=_CAPACITY) + _samples[name] = buf + buf.append(elapsed) + + +def record(name: str, nanoseconds: int) -> None: + """Record a pre-computed elapsed time for ``name`` (no-op when off).""" + if not _ENABLED: + return + with _lock: + buf = _samples.get(name) + if buf is None: + buf = deque(maxlen=_CAPACITY) + _samples[name] = buf + buf.append(int(nanoseconds)) + + +def snapshot() -> dict[str, list[int]]: + """Return a copy of all collected samples in nanoseconds.""" + with _lock: + return {k: list(v) for k, v in _samples.items()} + + +def reset() -> None: + """Clear all recorded samples.""" + with _lock: + _samples.clear() From d17fbe068040ccdf610b52802207253ac3fdd812 Mon Sep 17 00:00:00 2001 From: Richeek Das Date: Sun, 3 May 2026 19:28:38 -0400 Subject: [PATCH 5/8] add synchronous pub sub --- src/cortex/core/__init__.py | 14 +- src/cortex/core/node.py | 422 ++++++++++++++++++++++++++--- src/cortex/core/publisher.py | 14 +- src/cortex/core/subscriber.py | 272 ++++++------------- src/cortex/core/subscriber_base.py | 201 ++++++++++++++ src/cortex/core/sync_subscriber.py | 321 ++++++++++++++++++++++ 6 files changed, 1012 insertions(+), 232 deletions(-) create mode 100644 src/cortex/core/subscriber_base.py create mode 100644 src/cortex/core/sync_subscriber.py diff --git a/src/cortex/core/__init__.py b/src/cortex/core/__init__.py index 3661048..4c73d31 100644 --- a/src/cortex/core/__init__.py +++ b/src/cortex/core/__init__.py @@ -7,15 +7,27 @@ ) from cortex.core.node import Node from cortex.core.publisher import Publisher -from cortex.core.subscriber import Subscriber +from cortex.core.subscriber import AsyncSubscriber, Subscriber +from cortex.core.subscriber_base import ( + MessageFingerprintError, + SubscriberBase, + SubscriberStats, +) +from 
cortex.core.sync_subscriber import SyncMessageCallback, ThreadedSubscriber from cortex.core.types import AsyncCallback, MessageCallback __all__ = [ "Node", "Publisher", "Subscriber", + "AsyncSubscriber", + "ThreadedSubscriber", + "SubscriberBase", + "SubscriberStats", + "MessageFingerprintError", "AsyncCallback", "MessageCallback", + "SyncMessageCallback", "BaseExecutor", "AsyncExecutor", "RateExecutor", diff --git a/src/cortex/core/node.py b/src/cortex/core/node.py index 3bef724..609c6df 100644 --- a/src/cortex/core/node.py +++ b/src/cortex/core/node.py @@ -7,6 +7,9 @@ import asyncio import logging +import threading +from collections.abc import Callable +from typing import Literal import zmq import zmq.asyncio @@ -14,10 +17,14 @@ from cortex.core.executor import RateExecutor from cortex.core.publisher import Publisher from cortex.core.subscriber import Subscriber +from cortex.core.sync_subscriber import SyncMessageCallback, ThreadedSubscriber from cortex.core.types import AsyncCallback, MessageCallback from cortex.discovery.daemon import DEFAULT_DISCOVERY_ADDRESS from cortex.messages.base import Message +SubscriberMode = Literal["async", "sync"] +PublisherMode = Literal["async", "sync"] + logger = logging.getLogger("cortex.node") @@ -69,21 +76,35 @@ def __init__( # ZMQ async context self._context = zmq.asyncio.Context() - # Publishers and subscribers + # Publishers and subscribers (async and sync share one keyed dict) self._publishers: dict[str, Publisher] = {} - self._subscribers: dict[str, Subscriber] = {} + self._subscribers: dict[str, Subscriber | ThreadedSubscriber] = {} # Timer executors: (period, callback, RateExecutor) self._timers: list[tuple[float, AsyncCallback, RateExecutor]] = [] - # Subscribers with callbacks (need to run receive loops) + # Async subscribers with callbacks need their receive loop scheduled + # as an asyncio task; sync subscribers run on their own OS thread and + # are tracked separately so close() can join them deterministically. 
self._active_subscribers: list[Subscriber] = [] + self._sync_subscribers: list[ThreadedSubscriber] = [] + + # Independent zmq contexts created for sync-mode publishers; we + # own their lifecycle and term them on close(). + self._owned_pub_contexts: list[zmq.Context] = [] + + # Sync-side worker threads spawned via ``spawn_thread``. They share + # one ``threading.Event`` for shutdown so ``stop()`` can signal all + # of them at once and ``close()`` can join them deterministically. + self._sync_stop_event = threading.Event() + self._spawned_threads: list[threading.Thread] = [] # Tasks self._tasks: list[asyncio.Task] = [] # State self._running = False + self._stop_event: asyncio.Event | None = None logger.info(f"Created node: {name}") @@ -92,14 +113,23 @@ def create_publisher( topic_name: str, message_type: type[Message], queue_size: int = 10, + mode: PublisherMode = "async", ) -> Publisher: """ Create a publisher for a topic. Args: - topic_name: Name of the topic - message_type: Type of messages to publish - queue_size: Output queue size + topic_name: Name of the topic. + message_type: Type of messages to publish. + queue_size: Output queue size. + mode: ``'async'`` (default) shares the node's + :class:`zmq.asyncio.Context` (with a sync shadow). ``'sync'`` + gives the publisher its own independent + :class:`zmq.Context` so ``publish()`` does not bounce + through asyncio's IO threads — recommended for control-loop + publishers calling ``publish()`` from a non-asyncio thread. + Note that :class:`zmq.PUB` sockets are not thread-safe; + only call ``publish()`` from one thread per Publisher. 
Returns: Publisher instance @@ -108,17 +138,25 @@ def create_publisher( logger.warning(f"Publisher for {topic_name} already exists") return self._publishers[topic_name] + if mode == "async": + pub_context = self._context + elif mode == "sync": + pub_context = zmq.Context() + self._owned_pub_contexts.append(pub_context) + else: + raise ValueError(f"Unknown publisher mode: {mode!r}") + pub = Publisher( topic_name=topic_name, message_type=message_type, node_name=self.name, discovery_address=self.discovery_address, queue_size=queue_size, - context=self._context, + context=pub_context, ) self._publishers[topic_name] = pub - logger.info(f"Created publisher for {topic_name}") + logger.info("Created %s publisher for %s", mode, topic_name) return pub @@ -126,49 +164,176 @@ def create_subscriber( self, topic_name: str, message_type: type[Message], - callback: MessageCallback | None = None, + callback: MessageCallback | SyncMessageCallback | None = None, queue_size: int = 10, wait_for_topic: bool = True, topic_timeout: float = 30.0, - ) -> Subscriber: + mode: SubscriberMode = "async", + strict_fingerprint: bool | None = None, + cpu_affinity: list[int] | None = None, + sched_priority: int | None = None, + ) -> Subscriber | ThreadedSubscriber: """ Create a subscriber for a topic. Args: - topic_name: Name of the topic - message_type: Type of messages expected - callback: Async function to call when messages are received - queue_size: Input queue size - wait_for_topic: Whether to wait for the topic to be available - topic_timeout: Timeout for waiting for topic + topic_name: Name of the topic. + message_type: Type of messages expected. + callback: Function to call when messages arrive. ``mode='async'`` + expects an async callback; ``mode='sync'`` expects a plain + synchronous callable and rejects coroutine functions. + queue_size: Input queue size (ignored when ``conflate=True`` in + sync mode). + wait_for_topic: Whether to wait for the topic to be available. 
+ topic_timeout: Timeout for waiting for topic, in seconds. + mode: ``'async'`` (default) routes through asyncio. ``'sync'`` + runs a dedicated OS thread with synchronous zmq + Poller — + use for control loops needing tight p99 latency. In sync + mode the default ``queue_size`` of ``1`` gives latest-wins + semantics suitable for control commands. + strict_fingerprint: When True, a fingerprint mismatch between + the topic and ``message_type`` raises ``MessageFingerprintError`` + instead of logging a warning. Default behavior is mode- + dependent: ``True`` in sync mode, ``False`` in async mode + (kept lax for backward compatibility). Pass ``True`` + explicitly on async control topics where silent type + confusion would corrupt downstream state. + cpu_affinity: Sync mode only. Pin the receive thread to the + given CPU set (Linux only; ignored elsewhere). + sched_priority: Sync mode only. Run the receive thread under + ``SCHED_FIFO`` at the given priority (Linux only; requires + ``CAP_SYS_NICE``). Failure is logged and the thread falls + back to the default scheduler. Returns: - Subscriber instance + ``Subscriber`` for ``mode='async'``, ``ThreadedSubscriber`` for + ``mode='sync'``. """ if topic_name in self._subscribers: logger.warning(f"Subscriber for {topic_name} already exists") return self._subscribers[topic_name] - sub = Subscriber( - topic_name=topic_name, - message_type=message_type, - callback=callback, - node_name=self.name, - discovery_address=self.discovery_address, - queue_size=queue_size, - wait_for_topic=wait_for_topic, - topic_timeout=topic_timeout, - context=self._context, - ) + if mode == "async": + # Async default: lax (logs and continues) for compatibility. + # Callers opt into strict via strict_fingerprint=True. 
+ async_strict = False if strict_fingerprint is None else strict_fingerprint + sub: Subscriber | ThreadedSubscriber = Subscriber( + topic_name=topic_name, + message_type=message_type, + callback=callback, + node_name=self.name, + discovery_address=self.discovery_address, + queue_size=queue_size, + wait_for_topic=wait_for_topic, + topic_timeout=topic_timeout, + context=self._context, + strict_fingerprint=async_strict, + ) + if callback is not None: + self._active_subscribers.append(sub) + elif mode == "sync": + if callback is None: + raise ValueError("Sync subscribers require a callback") + if strict_fingerprint is False: + # Allow callers to relax sync mode if they really mean it, + # but the ThreadedSubscriber currently hard-codes strict. + # Surface the override expectation as a clear log line so + # the future relaxation is discoverable. + logger.info( + "strict_fingerprint=False ignored for sync subscriber " + "%s; sync mode is always strict.", + topic_name, + ) + sub = ThreadedSubscriber( + topic_name=topic_name, + message_type=message_type, + callback=callback, # type: ignore[arg-type] + node_name=self.name, + discovery_address=self.discovery_address, + queue_size=queue_size, + wait_for_topic=wait_for_topic, + topic_timeout=topic_timeout, + cpu_affinity=cpu_affinity, + sched_priority=sched_priority, + ) + self._sync_subscribers.append(sub) + else: + raise ValueError(f"Unknown subscriber mode: {mode!r}") self._subscribers[topic_name] = sub - logger.info(f"Created subscriber for {topic_name}") + logger.info("Created %s subscriber for %s", mode, topic_name) + return sub + + @property + def stop_event(self) -> threading.Event: + """Shared ``threading.Event`` set when the node is stopping. + + Sync code that opts into the node's lifecycle (publisher threads, + I/O loops, anything spawned via :meth:`spawn_thread`) should poll + ``node.stop_event.is_set()`` and exit promptly when it goes True. 
+ Async code should not need this — it gets cancellation through the + normal asyncio task lifecycle. + """ + return self._sync_stop_event - # Add subscriber to active list if it has a callback - if callback is not None: - self._active_subscribers.append(sub) + def spawn_thread( + self, + target: Callable[..., None], + *args, + name: str | None = None, + **kwargs, + ) -> threading.Thread: + """Start an OS thread owned by this node. + + ``target`` is invoked as ``target(stop_event, *args, **kwargs)`` — + the first positional argument is always the node's shared + ``threading.Event``. The thread is started immediately, registered + for ``run()`` keepalive (so the asyncio side won't fall through), + and joined deterministically by :meth:`close`. + + This is the canonical way to drive sync-mode publishers, custom + polling loops, or any blocking I/O the node should manage. - return sub + Args: + target: The thread body. Must accept the stop event as its + first positional arg. + *args: Forwarded to ``target`` after the stop event. + name: Thread name; defaults to ``"-thread-"``. + **kwargs: Forwarded to ``target``. + + Returns: + The :class:`threading.Thread` instance, already running. + + Example: + ```python + def control_loop(stop, pub, rate_hz): + interval = 1.0 / rate_hz + next_t = time.perf_counter() + while not stop.is_set(): + ... 
+ pub.publish(WheelCommand(...)) + next_t += interval + time.sleep(max(0, next_t - time.perf_counter())) + + pub = node.create_publisher(..., mode="sync") + node.spawn_thread(control_loop, pub, 1000.0) + await node.run() # blocks until Ctrl+C; close() joins the thread + ``` + """ + thread_name = name or f"{self.name}-thread-{len(self._spawned_threads)}" + stop = self._sync_stop_event + + def _runner() -> None: + try: + target(stop, *args, **kwargs) + except Exception: + logger.exception("Spawned thread %s crashed", thread_name) + + thread = threading.Thread(target=_runner, name=thread_name, daemon=False) + thread.start() + self._spawned_threads.append(thread) + logger.info("Spawned thread %s", thread_name) + return thread def create_timer( self, @@ -192,19 +357,40 @@ async def run(self) -> None: """ Run the node, processing messages and timers. - This is the main async entry point for the node. + This is the main async entry point for the node. Sync subscribers + are started on their own OS threads and run independently of the + asyncio event loop. """ self._running = True + # Start sync subscribers first — they don't depend on the loop and + # we want them receiving as early as possible. + for sub in self._sync_subscribers: + sub.start() + # Start all timer executors for _period, _callback, executor in self._timers: self._tasks.append(asyncio.create_task(executor.run())) - # Start all subscriber receive loops + # Start all async subscriber receive loops for sub in self._active_subscribers: self._tasks.append(asyncio.create_task(sub.run())) - logger.info(f"Node {self.name} running with {len(self._tasks)} tasks") + # If the node has no async work but does have sync work to manage + # (sync subscribers and/or threads spawned via spawn_thread), keep + # run() alive so the asyncio side does not fall through and trip + # the finally block. Released by stop() / close(). 
+ has_sync_work = bool(self._sync_subscribers) or bool(self._spawned_threads) + if not self._tasks and has_sync_work: + self._stop_event = asyncio.Event() + self._tasks.append(asyncio.create_task(self._stop_event.wait())) + + logger.info( + "Node %s running with %d async tasks, %d sync threads", + self.name, + len(self._tasks), + len(self._sync_subscribers), + ) try: await asyncio.gather(*self._tasks, return_exceptions=True) @@ -217,6 +403,8 @@ async def run(self) -> None: executor.stop() for sub in self._active_subscribers: sub.stop() + for sub in self._sync_subscribers: + sub.stop() def stop(self) -> None: """Stop the node.""" @@ -228,6 +416,15 @@ def stop(self) -> None: executor.stop() for sub in self._active_subscribers: sub.stop() + for sub in self._sync_subscribers: + sub.stop() + + # Signal all spawned sync threads to wind down. + self._sync_stop_event.set() + + # Release the keepalive task (if any) so run() can return cleanly. + if self._stop_event is not None and not self._stop_event.is_set(): + self._stop_event.set() # Cancel all tasks for task in self._tasks: @@ -250,16 +447,33 @@ async def close(self) -> None: pub.close() self._publishers.clear() - # Close all subscribers + # Close all subscribers (joins sync receive threads) for sub in self._subscribers.values(): sub.close() self._subscribers.clear() + # Join spawned sync worker threads — stop() already set the event. + for thread in self._spawned_threads: + thread.join(timeout=2.0) + if thread.is_alive(): + logger.warning( + "Spawned thread %s did not exit within 2.0s", thread.name + ) + self._spawned_threads.clear() + self._timers.clear() self._active_subscribers.clear() + self._sync_subscribers.clear() - # Terminate ZMQ context + # Terminate ZMQ contexts: shared async first, then any sync contexts + # created for sync-mode publishers. 
self._context.term() + for ctx in self._owned_pub_contexts: + try: + ctx.term() + except Exception as exc: + logger.debug("Error terming sync publisher context: %s", exc) + self._owned_pub_contexts.clear() logger.info(f"Node {self.name} closed") @@ -286,6 +500,138 @@ def is_running(self) -> bool: """Check if the node is running.""" return self._running + # ------------------------------------------------------------------ + # Sync entry points — for nodes that only own sync work + # ------------------------------------------------------------------ + + def _has_async_work(self) -> bool: + """True if the node has anything that needs an asyncio loop.""" + return bool(self._timers) or bool(self._active_subscribers) + + def spin(self, timeout: float | None = None) -> None: + """Block the calling thread until the node is stopped. + + Sync counterpart to :meth:`run`. Use this when the node owns only + sync work — sync subscribers, threads spawned via + :meth:`spawn_thread`, or nothing more than a publisher driven from + the calling thread itself. No asyncio loop is created. + + Raises ``RuntimeError`` if the node has async timers or async + subscribers, since those need :meth:`run` to be scheduled. ``Ctrl+C`` + is delivered as :class:`KeyboardInterrupt` and propagates so the + caller can decide whether to swallow it. + + Args: + timeout: Optional cap (seconds) on how long to block. ``None`` + means "wait forever, until :meth:`stop` is called". + + Example: + ```python + node = Node("controller") + pub = node.create_publisher("/cmd", WheelCommand, mode="sync") + node.spawn_thread(control_loop, pub, 1000.0) + try: + node.spin() # blocks until Ctrl+C + except KeyboardInterrupt: + pass + finally: + node.close_sync() + ``` + """ + if self._has_async_work(): + raise RuntimeError( + "Node.spin() does not start an asyncio loop, but this node " + "has async timers/subscribers. Use `await node.run()` instead, " + "or remove the async work." 
+ ) + + self._running = True + for sub in self._sync_subscribers: + sub.start() + + logger.info( + "Node %s spinning with %d sync subscribers and %d threads", + self.name, + len(self._sync_subscribers), + len(self._spawned_threads), + ) + try: + # ``Event.wait`` is interruptible by Ctrl+C on the main thread. + self._sync_stop_event.wait(timeout=timeout) + finally: + self._running = False + for sub in self._sync_subscribers: + sub.stop() + + def close_sync(self) -> None: + """Sync counterpart to :meth:`close`. + + Tears down sockets, joins spawned threads, and terms zmq contexts + without ever entering an asyncio loop. Safe to call from a plain + ``def main()`` — including from inside ``__exit__`` when the node + is used as a regular ``with`` context manager. + + Will refuse to run if the node has async timers/subscribers; for + those, use ``await node.close()``. + """ + if self._has_async_work(): + raise RuntimeError( + "Node.close_sync() cannot tear down async timers/subscribers. " + "Use `await node.close()` instead." + ) + + logger.info("Closing node %s (sync)", self.name) + + # Signal everyone, then synchronously join. + self._sync_stop_event.set() + for sub in self._sync_subscribers: + sub.stop() + self._running = False + + # Close publishers and (sync) subscribers. + for pub in self._publishers.values(): + pub.close() + self._publishers.clear() + for sub in self._subscribers.values(): + sub.close() + self._subscribers.clear() + + # Join spawned worker threads. + for thread in self._spawned_threads: + thread.join(timeout=2.0) + if thread.is_alive(): + logger.warning( + "Spawned thread %s did not exit within 2.0s", thread.name + ) + self._spawned_threads.clear() + + self._sync_subscribers.clear() + + # Term zmq contexts. The shared async context is never used by a + # purely-sync node, but term it anyway so leaks don't accumulate. 
+ try: + self._context.term() + except Exception as exc: + logger.debug("Error terming async context: %s", exc) + for ctx in self._owned_pub_contexts: + try: + ctx.term() + except Exception as exc: + logger.debug("Error terming sync publisher context: %s", exc) + self._owned_pub_contexts.clear() + + logger.info("Node %s closed", self.name) + + # ------------------------------------------------------------------ + # Context managers + # ------------------------------------------------------------------ + + def __enter__(self) -> "Node": + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close_sync() + async def __aenter__(self) -> "Node": return self diff --git a/src/cortex/core/publisher.py b/src/cortex/core/publisher.py index 8a2123d..78b86b6 100644 --- a/src/cortex/core/publisher.py +++ b/src/cortex/core/publisher.py @@ -121,6 +121,12 @@ def __init__( self._publish_count = 0 self._last_publish_time: float | None = None + # Per-publisher monotonic sequence counter. Subscribers infer drops + # by tracking gaps in this number per ``(publisher_node, fingerprint)`` + # pair, so it must be one-counter-per-publisher rather than the + # class-level counter that used to live on ``Message``. + self._sequence: int = 0 + # Initialize self._setup_socket() if auto_register: @@ -197,9 +203,13 @@ def publish(self, message: Message, flags: int = zmq.NOBLOCK) -> bool: try: # Send with topic name as first frame for filtering. # Message payload uses frame-aware transport to keep large buffers - # out of the metadata blob. + # out of the metadata blob. Sequence numbers come from this + # publisher (not the class-level fallback) so receivers can + # detect drops per-source. 
+ sequence = self._sequence + self._sequence += 1 self._socket.send_multipart( - [self._topic_bytes, *message.to_frames()], + [self._topic_bytes, *message.to_frames(sequence=sequence)], flags=flags, ) diff --git a/src/cortex/core/subscriber.py b/src/cortex/core/subscriber.py index a50fae6..fe1c8ec 100644 --- a/src/cortex/core/subscriber.py +++ b/src/cortex/core/subscriber.py @@ -1,36 +1,45 @@ """ -Subscriber implementation for Cortex. +Asynchronous subscriber implementation. -Provides a ZeroMQ-based subscriber that queries the discovery daemon -and subscribes to topics using IPC sockets with asyncio. +Builds on :class:`cortex.core.subscriber_base.SubscriberBase` and pulls +frames off the wire through ``zmq.asyncio``. Use this for the common +case — telemetry, dashboards, anything that lives inside an asyncio +event loop. For control-loop topics that need <100 µs p99, see +:class:`cortex.core.sync_subscriber.ThreadedSubscriber`. """ import asyncio import contextlib import logging import time +from time import perf_counter_ns from typing import Any import zmq import zmq.asyncio from cortex.core.executor import AsyncExecutor +from cortex.core.subscriber_base import ( + MessageFingerprintError, + SubscriberBase, + decode_frames, + update_stats_for_header, +) from cortex.core.types import MessageCallback -from cortex.discovery.client import DiscoveryClient from cortex.discovery.daemon import DEFAULT_DISCOVERY_ADDRESS -from cortex.discovery.protocol import TopicInfo from cortex.messages.base import Message, MessageHeader +from cortex.utils import tracing logger = logging.getLogger("cortex.subscriber") -class Subscriber: - """Receives typed messages on a topic from a ZMQ SUB socket. +class Subscriber(SubscriberBase): + """Async subscriber: receives typed messages on a topic from a ZMQ SUB socket. On construction, the subscriber performs a non-blocking lookup against the discovery daemon. 
If the topic already has a publisher it connects - immediately; otherwise it defers and retries with a polling wait inside - :meth:`run`. + immediately; otherwise it defers and retries with an async polling + wait inside :meth:`run`. When constructed with a ``callback`` the subscriber drives its own receive loop (one task, one callback at a time — see @@ -38,16 +47,6 @@ class Subscriber: subscriber is passive and the caller polls via :meth:`receive`. Always create via :meth:`Node.create_subscriber`. - - Example: - ```python - async def callback(msg, header): - print(f"Received: {msg}") - - async with Node("my_node") as node: - node.create_subscriber("/topic", MyMsg, callback) - await node.run() - ``` """ def __init__( @@ -61,266 +60,157 @@ def __init__( wait_for_topic: bool = True, topic_timeout: float = 600.0, context: zmq.asyncio.Context | None = None, + strict_fingerprint: bool = False, ): - """ - Initialize the subscriber. - - Args: - topic_name: Name of the topic to subscribe to - message_type: Type of message expected - callback: Async callback function for received messages - node_name: Name of the node creating this subscriber - discovery_address: Address of the discovery daemon - queue_size: High-water mark for incoming messages - wait_for_topic: Whether to wait for topic to be available - topic_timeout: Timeout for waiting for topic (seconds) - context: Shared ZMQ async context from Node - """ - self.topic_name = topic_name - self.message_type = message_type + super().__init__( + topic_name=topic_name, + message_type=message_type, + node_name=node_name, + discovery_address=discovery_address, + topic_timeout=topic_timeout, + wait_for_topic=wait_for_topic, + strict_fingerprint=strict_fingerprint, + ) self._callback = callback - self.node_name = node_name - self.discovery_address = discovery_address self.queue_size = queue_size - self.topic_timeout = topic_timeout - self._wait_for_topic = wait_for_topic - - # Connection info - self._topic_info: TopicInfo | 
None = None - self._connected = False - # ZMQ setup - context provided by Node self._context: zmq.asyncio.Context = context or zmq.asyncio.Context() self._socket: zmq.asyncio.Socket | None = None - # Discovery client - self._discovery_client: DiscoveryClient | None = DiscoveryClient( - discovery_address=self.discovery_address - ) - - # Statistics - self._receive_count = 0 + # Compatibility shim: legacy code reads ``_last_receive_time`` directly. self._last_receive_time: float | None = None - # Executor for receive loop self._executor: AsyncExecutor | None = None # Try non-blocking connect (will succeed if topic already exists) - self._connect() - - def _connect(self) -> bool: - """ - Connect to the topic (non-blocking lookup only). - - Returns: - True if connected successfully - """ - try: - # Non-blocking lookup only - self._topic_info = self._discovery_client.lookup_topic(self.topic_name) - return self._finalize_connection() - - except Exception as e: - logger.error(f"Failed to connect to topic: {e}") - return False + if self._lookup_nonblocking(): + self._setup_socket(self._topic_info.address) + self._connected = True + logger.info( + "Connected to topic %s at %s", self.topic_name, self._topic_info.address + ) + else: + logger.warning( + "Topic %s not found yet, will retry in run()", self.topic_name + ) async def _async_connect(self) -> bool: - """ - Async connect to the topic, waiting if necessary. - - Uses DiscoveryClient.wait_for_topic_async for non-blocking wait. 
- - Returns: - True if connected successfully - """ + """Async wait for the topic and connect once available.""" if self._connected: return True - try: if self._wait_for_topic: - logger.info(f"Waiting for topic {self.topic_name}...") + logger.info("Waiting for topic %s...", self.topic_name) self._topic_info = await self._discovery_client.wait_for_topic_async( self.topic_name, timeout=self.topic_timeout ) else: self._topic_info = self._discovery_client.lookup_topic(self.topic_name) - - return self._finalize_connection() - - except Exception as e: - logger.error(f"Failed to connect to topic: {e}") + except Exception as exc: + logger.error("Failed to connect to topic: %s", exc) return False - def _finalize_connection(self) -> bool: - """ - Finalize connection after topic info is obtained. - - Returns: - True if connected successfully - """ - if self._topic_info: - # Verify message type - if self._topic_info.fingerprint != self.message_type.fingerprint(): - logger.warning( - f"Message type mismatch for {self.topic_name}: " - f"expected {self.message_type.__name__}, " - f"got {self._topic_info.message_type}" - ) - - # Connect to the publisher - self._setup_socket(self._topic_info.address) - self._connected = True - logger.info( - f"Connected to topic {self.topic_name} at {self._topic_info.address}" - ) - return True - else: - logger.warning( - f"Topic {self.topic_name} not found yet, will retry in run()" - ) + if self._topic_info is None: return False + try: + self._validate_fingerprint(self._topic_info) + except MessageFingerprintError: + raise + + self._setup_socket(self._topic_info.address) + self._connected = True + logger.info( + "Connected to topic %s at %s", self.topic_name, self._topic_info.address + ) + return True def _setup_socket(self, address: str) -> None: - """Set up the ZMQ subscriber socket.""" + """Create the SUB socket, set HWM/topic filter, and connect.""" self._socket = self._context.socket(zmq.SUB) - - # Set high-water mark 
self._socket.setsockopt(zmq.RCVHWM, self.queue_size) - - # Set linger to 0 for immediate shutdown self._socket.setsockopt(zmq.LINGER, 0) - - # Subscribe to topic self._socket.setsockopt_string(zmq.SUBSCRIBE, self.topic_name) - - # Connect to publisher self._socket.connect(address) - - logger.debug(f"Subscriber socket connected to {address}") + logger.debug("Subscriber socket connected to %s", address) async def receive(self) -> tuple[Message, MessageHeader] | None: - """ - Receive a single message (async). - - Returns: - Tuple of (message, header) or None if not connected - """ if not self._connected or self._socket is None: return None try: - # Receive multipart message [topic, header, metadata, *buffers] - frames = await self._socket.recv_multipart(copy=False) + with tracing.stage("async.recv_multipart"): + frames = await self._socket.recv_multipart(copy=False) - if len(frames) < 2: - logger.warning(f"Unexpected frame count: {len(frames)}") + with tracing.stage("async.decode"): + decoded = decode_frames(self.message_type, frames) + if decoded is None: return None + message, header = decoded - payload_frames = frames[1:] - if len(payload_frames) == 1: - raw_payload = ( - memoryview(payload_frames[0].buffer) - if hasattr(payload_frames[0], "buffer") - else payload_frames[0] - ) - message, header = self.message_type.from_bytes(raw_payload) - else: - message, header = self.message_type.from_frames(payload_frames) - - self._receive_count += 1 + update_stats_for_header(self.stats, header, perf_counter_ns()) self._last_receive_time = time.time() - return message, header except asyncio.CancelledError: raise - except Exception as e: - logger.error(f"Failed to receive message: {e}") + except Exception as exc: + logger.error("Failed to receive message: %s", exc) return None async def _receive_and_callback(self) -> Any: - """Receive a message and invoke the callback.""" result = await self.receive() - if result: - message, header = result + if result is None: + return None + 
message, header = result + with tracing.stage("async.callback"): return await self._callback(message, header) def start(self) -> None: - """Start the subscriber receive loop.""" if self._executor: self._executor.start() def stop(self) -> None: - """Stop the subscriber receive loop.""" if self._executor: self._executor.stop() @property def running(self) -> bool: - """Check if the subscriber is running.""" return self._executor.running if self._executor else False async def run(self) -> None: - """ - Run the subscriber's async receive loop. - - Continuously receives messages and calls the callback. - Uses AsyncExecutor for consistent execution pattern. - """ if self._callback is None: - logger.warning(f"No callback set for subscriber {self.topic_name}") + logger.warning("No callback set for subscriber %s", self.topic_name) return if not self._connected and not await self._async_connect(): - logger.error(f"Failed to connect subscriber for {self.topic_name}") + logger.error("Failed to connect subscriber for %s", self.topic_name) return - logger.info(f"Subscriber for {self.topic_name} running") - + logger.info("Subscriber for %s running", self.topic_name) self._executor = AsyncExecutor(self._receive_and_callback) await self._executor.run() - - logger.info(f"Subscriber for {self.topic_name} stopped") - - @property - def is_connected(self) -> bool: - """Check if subscriber is connected to a publisher.""" - return self._connected - - @property - def topic_info(self) -> TopicInfo | None: - """Get information about the connected topic.""" - return self._topic_info - - @property - def receive_count(self) -> int: - """Get the number of messages received.""" - return self._receive_count + logger.info("Subscriber for %s stopped", self.topic_name) @property def last_receive_time(self) -> float | None: - """Get the timestamp of the last received message.""" return self._last_receive_time def close(self) -> None: - """Close the subscriber and release resources.""" - 
logger.info(f"Closing subscriber for {self.topic_name}") - - # Stop the executor + logger.info("Closing subscriber for %s", self.topic_name) if self._executor: self._executor.stop() self._executor = None - # Close discovery client (best effort - daemon may be gone) - if self._discovery_client: - with contextlib.suppress(Exception): - self._discovery_client.close() - self._discovery_client = None + self._close_discovery() - # Close socket if self._socket: with contextlib.suppress(Exception): self._socket.close() self._socket = None self._connected = False + + +# Public alias — callers that opt in to the explicit naming get it; the +# default ``Subscriber`` import path stays where it has always been. +AsyncSubscriber = Subscriber diff --git a/src/cortex/core/subscriber_base.py b/src/cortex/core/subscriber_base.py new file mode 100644 index 0000000..7c590a2 --- /dev/null +++ b/src/cortex/core/subscriber_base.py @@ -0,0 +1,201 @@ +""" +Shared subscriber primitives. + +The async subscriber (``cortex.core.subscriber.Subscriber``) and the +threaded subscriber (``cortex.core.sync_subscriber.ThreadedSubscriber``) +diverge in *how* they pull frames off the wire — one ``await``-s a +``zmq.asyncio`` socket, the other blocks an OS thread on a +``zmq.Poller``. Everything **around** that — discovery lookup, type +fingerprint validation, frame decoding, stats, and per-publisher +sequence-gap detection — is identical and lives here. + +This module owns no zmq sockets. It is pure dataflow + bookkeeping. 
+""" + +import logging +from dataclasses import dataclass, field + +from cortex.discovery.client import DiscoveryClient +from cortex.discovery.daemon import DEFAULT_DISCOVERY_ADDRESS +from cortex.discovery.protocol import TopicInfo +from cortex.messages.base import Message, MessageHeader + +logger = logging.getLogger("cortex.subscriber") + + +class MessageFingerprintError(RuntimeError): + """Raised when an incoming topic's fingerprint doesn't match the expected type.""" + + +@dataclass +class SubscriberStats: + """Per-subscriber counters; updated by the receive loop.""" + + received: int = 0 + dropped_estimated: int = 0 + last_recv_perf_ns: int | None = None + last_sequence_by_publisher: dict[int, int] = field(default_factory=dict) + + +def decode_frames( + message_type: type[Message], frames: list[object] +) -> tuple[Message, MessageHeader] | None: + """Decode ``[topic, header, metadata, *buffers]`` into a typed message. + + Returns ``None`` and logs a warning on malformed input rather than + raising — the receive loop should not die on a single bad frame. + """ + if len(frames) < 2: + logger.warning("Unexpected frame count: %d", len(frames)) + return None + + payload_frames = frames[1:] + try: + if len(payload_frames) == 1: + raw = ( + memoryview(payload_frames[0].buffer) + if hasattr(payload_frames[0], "buffer") + else payload_frames[0] + ) + return message_type.from_bytes(raw) + return message_type.from_frames(payload_frames) + except Exception as exc: + logger.error("Decode failed: %s", exc) + return None + + +def update_stats_for_header( + stats: SubscriberStats, header: MessageHeader, now_perf_ns: int +) -> int: + """Bump receive counters and infer dropped messages from sequence gaps. + + Each ``Subscriber`` connects to exactly one topic, and each topic has a + single publisher (today), so keying by ``fingerprint`` is effectively + keying by ``(publisher, type)``. 
When multi-publisher fan-in lands we + will extend the key to ``(publisher_node, fingerprint)``. + + Returns the number of dropped messages inferred from this header. + """ + stats.received += 1 + stats.last_recv_perf_ns = now_perf_ns + + last = stats.last_sequence_by_publisher.get(header.fingerprint) + stats.last_sequence_by_publisher[header.fingerprint] = header.sequence + if last is None: + return 0 + gap = header.sequence - last - 1 + if gap > 0: + stats.dropped_estimated += gap + return gap + return 0 + + +class SubscriberBase: + """Discovery + connection scaffolding shared by all subscriber implementations. + + Subclasses are responsible only for the I/O loop. They set + :attr:`_topic_info` via :meth:`_lookup_blocking` (or the async variant + used by the asyncio subscriber) and then open whatever socket they + prefer against :attr:`_topic_info.address`. + """ + + def __init__( + self, + topic_name: str, + message_type: type[Message], + node_name: str = "anonymous", + discovery_address: str = DEFAULT_DISCOVERY_ADDRESS, + topic_timeout: float = 600.0, + wait_for_topic: bool = True, + strict_fingerprint: bool = False, + ): + self.topic_name = topic_name + self.message_type = message_type + self.node_name = node_name + self.discovery_address = discovery_address + self.topic_timeout = topic_timeout + self._wait_for_topic = wait_for_topic + self._strict_fingerprint = strict_fingerprint + + self._topic_info: TopicInfo | None = None + self._connected = False + self._discovery_client: DiscoveryClient | None = DiscoveryClient( + discovery_address=self.discovery_address + ) + self.stats = SubscriberStats() + + # ------------------------------------------------------------------ discovery + + def _validate_fingerprint(self, info: TopicInfo) -> None: + """Refuse or warn on type mismatch. + + Strict mode raises; lax mode preserves historical + warning-and-continue behavior (kept until callers opt in). 
+ """ + expected = self.message_type.fingerprint() + if info.fingerprint == expected: + return + msg = ( + f"Message type mismatch for {self.topic_name}: " + f"expected {self.message_type.__name__} (fp={expected:#018x}), " + f"got {info.message_type} (fp={info.fingerprint:#018x})" + ) + if self._strict_fingerprint: + raise MessageFingerprintError(msg) + logger.warning(msg) + + def _lookup_nonblocking(self) -> bool: + """One-shot lookup. Returns True on success.""" + try: + self._topic_info = self._discovery_client.lookup_topic(self.topic_name) + except Exception as exc: + logger.error("Failed to lookup topic: %s", exc) + return False + if self._topic_info is None: + return False + self._validate_fingerprint(self._topic_info) + return True + + def _lookup_blocking(self, poll_interval: float = 0.5) -> bool: + """Block-and-poll for the topic up to :attr:`topic_timeout`.""" + try: + self._topic_info = self._discovery_client.wait_for_topic( + self.topic_name, + timeout=self.topic_timeout, + poll_interval=poll_interval, + ) + except Exception as exc: + logger.error("Failed to wait for topic: %s", exc) + return False + if self._topic_info is None: + return False + self._validate_fingerprint(self._topic_info) + return True + + # ------------------------------------------------------------------ properties + + @property + def is_connected(self) -> bool: + return self._connected + + @property + def topic_info(self) -> TopicInfo | None: + return self._topic_info + + @property + def receive_count(self) -> int: + return self.stats.received + + @property + def dropped_count(self) -> int: + return self.stats.dropped_estimated + + # ------------------------------------------------------------------ shutdown + + def _close_discovery(self) -> None: + if self._discovery_client is not None: + try: + self._discovery_client.close() + except Exception as exc: # best-effort + logger.debug("Discovery close error: %s", exc) + self._discovery_client = None diff --git 
a/src/cortex/core/sync_subscriber.py b/src/cortex/core/sync_subscriber.py new file mode 100644 index 0000000..3e24485 --- /dev/null +++ b/src/cortex/core/sync_subscriber.py @@ -0,0 +1,321 @@ +""" +Thread-backed synchronous subscriber for low-latency control topics. + +The async subscriber goes through ``zmq.asyncio`` and the asyncio event +loop — that costs ~4 ``await`` boundaries per message and limits p99 to +roughly 1 ms even on inproc/IPC. For control loops at >100 Hz where +jitter matters, this implementation runs a dedicated OS thread that +pulls frames synchronously through a ``zmq.Poller`` and dispatches to a +**sync** user callback inline. + +On a free-threaded build of CPython (``python3.14t``, PEP 779) the +receive thread does not contend with the asyncio thread for the GIL, +which is what makes the <100 µs p99 target reachable. The class works +on stock CPython too — just with a higher floor — and emits a one-line +runtime hint when it detects a GIL-enabled interpreter so users know +the upgrade exists. + +Public API mirrors :class:`cortex.core.subscriber.Subscriber` where it +makes sense (topic name, message type, callback, queue size, discovery +plumbing) but diverges in two important ways: + +1. The callback **must be synchronous**. Awaiting on a worker thread + would re-introduce the per-await scheduling cost we're trying to + escape. A clear :class:`TypeError` is raised at construction time if + a coroutine function is passed. +2. By default ``queue_size=1``. For control commands you want the latest + message, never a queued backlog. Note that ZMQ's ``CONFLATE`` socket + option *cannot* be used here — it strips multipart messages, and + Cortex publishers always send multipart frames. ``RCVHWM=1`` gives + the equivalent "drop old, keep newest" effect on the receiver while + preserving the wire format. 
+""" + +import contextlib +import inspect +import logging +import os +import threading +from collections.abc import Callable +from time import perf_counter_ns + +import zmq + +from cortex.core.subscriber_base import ( + SubscriberBase, + decode_frames, + update_stats_for_header, +) +from cortex.discovery.daemon import DEFAULT_DISCOVERY_ADDRESS +from cortex.messages.base import Message, MessageHeader +from cortex.utils import tracing +from cortex.utils.runtime import is_free_threaded, low_latency_advisory + +logger = logging.getLogger("cortex.subscriber.sync") + +SyncMessageCallback = Callable[[Message, MessageHeader], None] +"""A blocking callback invoked on the receive thread — must not return a coroutine.""" + + +class ThreadedSubscriber(SubscriberBase): + """Synchronous SUB-side receive loop running on a dedicated OS thread. + + Lifecycle: + + * Construction blocks on a discovery lookup (with optional wait), opens + a fresh sync ``zmq.Context``, and connects the SUB socket. + Construction does **not** start the worker thread. + * :meth:`start` spins up the thread; the thread blocks in + ``poller.poll(timeout_ms)`` between messages so shutdown is prompt. + * :meth:`stop` signals the thread and joins it (with a 1 s default + grace period); :meth:`close` calls :meth:`stop` and tears down zmq. + + The class is reentrant-safe in the trivial sense that ``start`` / + ``stop`` / ``close`` are idempotent. ``zmq.SUB`` itself is single- + threaded; do not call :meth:`receive` from another thread while the + worker is running. 
+ """ + + _POLL_TIMEOUT_MS = 50 # bound on shutdown latency + _JOIN_TIMEOUT_S = 1.0 + + def __init__( + self, + topic_name: str, + message_type: type[Message], + callback: SyncMessageCallback, + node_name: str = "anonymous", + discovery_address: str = DEFAULT_DISCOVERY_ADDRESS, + queue_size: int = 1, + wait_for_topic: bool = True, + topic_timeout: float = 30.0, + cpu_affinity: list[int] | None = None, + sched_priority: int | None = None, + ): + # Strict fingerprint by default in sync mode: callers picked sync + # for predictability, so silent type-confusion is unacceptable. + super().__init__( + topic_name=topic_name, + message_type=message_type, + node_name=node_name, + discovery_address=discovery_address, + topic_timeout=topic_timeout, + wait_for_topic=wait_for_topic, + strict_fingerprint=True, + ) + + if inspect.iscoroutinefunction(callback): + raise TypeError( + "ThreadedSubscriber requires a *synchronous* callback. " + "Pass an async callback through Node.create_subscriber(mode='async') " + "instead." + ) + + self._callback = callback + self._queue_size = queue_size + self._cpu_affinity = cpu_affinity + self._sched_priority = sched_priority + + self._context = zmq.Context() + self._socket: zmq.Socket | None = None + self._poller: zmq.Poller | None = None + + self._thread: threading.Thread | None = None + self._stop_event = threading.Event() + self._started = False + + # Resolve the topic and open the socket up front, so construction + # failures surface to the caller (not the worker thread). 
+ if not self._lookup_blocking(): + raise TimeoutError( + f"Topic {self.topic_name} not registered with discovery within " + f"{self.topic_timeout}s" + ) + self._setup_socket(self._topic_info.address) + self._connected = True + + advisory = low_latency_advisory() + if advisory: + logger.info(advisory) + + # ------------------------------------------------------------------ socket + + def _setup_socket(self, address: str) -> None: + sock = self._context.socket(zmq.SUB) + # RCVHWM gives "drop old, keep newest" semantics on overflow — the + # right default for control topics. We deliberately do NOT use + # ZMQ_CONFLATE; it is incompatible with multipart messages and + # would silently strip every frame except the last. + sock.setsockopt(zmq.RCVHWM, max(self._queue_size, 1)) + sock.setsockopt(zmq.LINGER, 0) + sock.setsockopt_string(zmq.SUBSCRIBE, self.topic_name) + sock.connect(address) + self._socket = sock + + poller = zmq.Poller() + poller.register(sock, zmq.POLLIN) + self._poller = poller + + # ------------------------------------------------------------------ thread + + def start(self) -> None: + """Spin up the receive thread (idempotent).""" + if self._started: + return + if not self._connected: + raise RuntimeError( + f"Subscriber {self.topic_name} is not connected; cannot start" + ) + self._stop_event.clear() + self._thread = threading.Thread( + target=self._run, + name=f"cortex-sub-{self.topic_name}", + daemon=False, + ) + self._thread.start() + self._started = True + + def stop(self, timeout: float | None = None) -> None: + """Signal the worker and join it (idempotent).""" + if not self._started: + return + self._stop_event.set() + if self._thread is not None: + self._thread.join(timeout if timeout is not None else self._JOIN_TIMEOUT_S) + if self._thread.is_alive(): + logger.warning( + "ThreadedSubscriber for %s did not stop within %.1fs", + self.topic_name, + timeout if timeout is not None else self._JOIN_TIMEOUT_S, + ) + self._thread = None + self._started 
= False + + @property + def running(self) -> bool: + return self._started and self._thread is not None and self._thread.is_alive() + + # ------------------------------------------------------------------ loop + + def _apply_thread_tuning(self) -> None: + """Apply CPU affinity and (if requested) real-time scheduling. + + Both are best-effort: we log a warning and continue on any failure + (missing capability, non-Linux platform, EPERM). The receive loop + works without either knob — they only buy lower jitter. + """ + if self._cpu_affinity is not None: + sched_setaffinity = getattr(os, "sched_setaffinity", None) + if sched_setaffinity is None: + logger.warning( + "CPU affinity requested but not supported on this platform" + ) + else: + try: + sched_setaffinity(0, set(self._cpu_affinity)) + logger.info( + "Pinned receive thread to CPUs %s", + sorted(self._cpu_affinity), + ) + except OSError as exc: + logger.warning("Failed to set CPU affinity: %s", exc) + + if self._sched_priority is not None: + sched_setscheduler = getattr(os, "sched_setscheduler", None) + sched_param_cls = getattr(os, "sched_param", None) + sched_fifo = getattr(os, "SCHED_FIFO", None) + if ( + sched_setscheduler is None + or sched_param_cls is None + or sched_fifo is None + ): + logger.warning( + "SCHED_FIFO requested but not supported on this platform" + ) + return + try: + sched_setscheduler(0, sched_fifo, sched_param_cls(self._sched_priority)) + logger.info( + "Receive thread set to SCHED_FIFO at priority %d", + self._sched_priority, + ) + except (OSError, PermissionError) as exc: + # Most common failure mode: missing CAP_SYS_NICE. Don't bail — + # the receive loop still works on the default scheduler. 
+ logger.warning( + "Failed to set SCHED_FIFO priority %d (need CAP_SYS_NICE): %s", + self._sched_priority, + exc, + ) + + def _run(self) -> None: + """Worker thread entry point.""" + self._apply_thread_tuning() + if is_free_threaded(): + logger.debug( + "Sync subscriber %s on free-threaded interpreter (no GIL contention)", + self.topic_name, + ) + + sock = self._socket + poller = self._poller + assert sock is not None and poller is not None + + timeout_ms = self._POLL_TIMEOUT_MS + try: + while not self._stop_event.is_set(): + events = dict(poller.poll(timeout=timeout_ms)) + if sock not in events: + continue + + with tracing.stage("sync.recv_multipart"): + try: + frames = sock.recv_multipart(copy=False, flags=zmq.NOBLOCK) + except zmq.Again: + continue + + with tracing.stage("sync.decode"): + decoded = decode_frames(self.message_type, frames) + if decoded is None: + continue + message, header = decoded + + update_stats_for_header(self.stats, header, perf_counter_ns()) + + with tracing.stage("sync.callback"): + try: + self._callback(message, header) + except Exception as exc: + # Don't kill the receive thread on a user error. 
+ logger.exception( + "Callback raised on topic %s: %s", self.topic_name, exc + ) + except Exception: + logger.exception( + "ThreadedSubscriber receive loop crashed for %s", self.topic_name + ) + + # ------------------------------------------------------------------ shutdown + + def close(self) -> None: + """Stop the worker and tear down zmq state (idempotent).""" + logger.info("Closing sync subscriber for %s", self.topic_name) + self.stop() + + self._close_discovery() + + if self._socket is not None: + with contextlib.suppress(Exception): + self._socket.close() + self._socket = None + self._poller = None + + with contextlib.suppress(Exception): + self._context.term() + self._connected = False + + # ------------------------------------------------------------------ stats + + @property + def is_running(self) -> bool: + return self.running From 1cee7352ebd7cee346092bc00f76e047824f13e5 Mon Sep 17 00:00:00 2001 From: Richeek Das Date: Sun, 3 May 2026 19:29:03 -0400 Subject: [PATCH 6/8] tests for sync and inproc transfers --- tests/test_pubsub.py | 32 ++ tests/test_sync_subscriber.py | 539 ++++++++++++++++++++++++++++++++++ 2 files changed, 571 insertions(+) create mode 100644 tests/test_sync_subscriber.py diff --git a/tests/test_pubsub.py b/tests/test_pubsub.py index e97afed..4449f23 100644 --- a/tests/test_pubsub.py +++ b/tests/test_pubsub.py @@ -77,6 +77,38 @@ def test_publisher_publishes_messages(self, discovery_daemon, discovery_address) pub.close() + def test_per_publisher_sequence_counter(self, discovery_daemon, discovery_address): + """Two publishers of the same message type must not interleave sequences. + + Each Publisher owns its own counter; the counter starts at 0 and + advances by one per successful publish, independently of any other + Publisher in the process. 
+ """ + pub_a = Publisher( + topic_name="/test/seq_a", + message_type=SampleMessage, + node_name="seq_node_a", + discovery_address=discovery_address, + ) + pub_b = Publisher( + topic_name="/test/seq_b", + message_type=SampleMessage, + node_name="seq_node_b", + discovery_address=discovery_address, + ) + + for i in range(5): + assert pub_a.publish(SampleMessage(value=i, name="a")) + for i in range(3): + assert pub_b.publish(SampleMessage(value=i, name="b")) + + # ``_sequence`` is the next-to-emit value, so after N publishes it == N. + assert pub_a._sequence == 5 + assert pub_b._sequence == 3 + + pub_a.close() + pub_b.close() + def test_publisher_type_checking(self, discovery_daemon, discovery_address): """Publisher should reject wrong message types.""" pub = Publisher( diff --git a/tests/test_sync_subscriber.py b/tests/test_sync_subscriber.py new file mode 100644 index 0000000..fb65e5d --- /dev/null +++ b/tests/test_sync_subscriber.py @@ -0,0 +1,539 @@ +""" +Tests for ThreadedSubscriber — the synchronous, OS-thread-backed +low-latency subscriber path. + +These tests share the existing ``discovery_daemon`` fixture from +``conftest.py`` but spin up subscribers and publishers directly without +going through ``Node`` so we can isolate the threaded receive logic from +asyncio. There is one separate test (``test_node_mode_dispatch``) that +exercises the ``Node.create_subscriber(mode='sync')`` plumbing. 
+""" + +import asyncio +import threading +import time +from dataclasses import dataclass + +import pytest + +from cortex.core.publisher import Publisher +from cortex.core.subscriber_base import MessageFingerprintError +from cortex.core.sync_subscriber import ThreadedSubscriber +from cortex.messages.base import Message + + +@dataclass +class CmdMessage(Message): + """Tiny control-style payload.""" + + seq: int + value: float + + +@dataclass +class OtherMessage(Message): + """A second type with a different fingerprint.""" + + payload: str + + +def _wait_for(condition, timeout: float = 5.0, interval: float = 0.01) -> bool: + """Spin until ``condition()`` is truthy or ``timeout`` elapses.""" + end = time.monotonic() + timeout + while time.monotonic() < end: + if condition(): + return True + time.sleep(interval) + return False + + +# --------------------------------------------------------------------------- +# Construction-time contract checks (no network needed) +# --------------------------------------------------------------------------- + + +def test_rejects_async_callback(discovery_daemon, discovery_address): + """Sync mode must refuse coroutine callbacks loudly at construction.""" + pub = Publisher( + topic_name="/test/sync_async_cb", + message_type=CmdMessage, + node_name="pub_async_cb", + discovery_address=discovery_address, + ) + time.sleep(0.1) + + async def coro_cb(_msg, _hdr): + pass + + try: + with pytest.raises(TypeError, match="synchronous"): + ThreadedSubscriber( + topic_name="/test/sync_async_cb", + message_type=CmdMessage, + callback=coro_cb, + discovery_address=discovery_address, + topic_timeout=2.0, + ) + finally: + pub.close() + + +def test_topic_timeout_raises(discovery_daemon, discovery_address): + """If the topic never registers, construction should raise TimeoutError.""" + with pytest.raises(TimeoutError): + ThreadedSubscriber( + topic_name="/test/sync_never_registered", + message_type=CmdMessage, + callback=lambda _m, _h: None, + 
discovery_address=discovery_address, + topic_timeout=0.5, + ) + + +def test_fingerprint_mismatch_is_fatal(discovery_daemon, discovery_address): + """Sync mode is strict about types — register one type, subscribe to another.""" + pub = Publisher( + topic_name="/test/sync_fp_mismatch", + message_type=OtherMessage, + node_name="pub_fp_mismatch", + discovery_address=discovery_address, + ) + time.sleep(0.2) + + try: + with pytest.raises(MessageFingerprintError): + ThreadedSubscriber( + topic_name="/test/sync_fp_mismatch", + message_type=CmdMessage, + callback=lambda _m, _h: None, + discovery_address=discovery_address, + topic_timeout=2.0, + ) + finally: + pub.close() + + +# --------------------------------------------------------------------------- +# Receive path +# --------------------------------------------------------------------------- + + +def _warmup(pub: Publisher, sub: ThreadedSubscriber, timeout: float = 3.0) -> None: + """Pump sentinel messages until the SUB-PUB handshake completes. + + ZMQ SUB has slow-joiner semantics; messages published before the + subscriber's filter has reached the publisher are silently dropped. + Tests use sequence numbers >= 100000 for the warmup so callers can + skip them. 
+ """ + deadline = time.monotonic() + timeout + seq = 100000 + initial = sub.receive_count + while sub.receive_count == initial and time.monotonic() < deadline: + pub.publish(CmdMessage(seq=seq, value=0.0)) + seq += 1 + time.sleep(0.01) + if sub.receive_count == initial: + raise RuntimeError("SUB-PUB handshake never completed") + + +def test_roundtrip(discovery_daemon, discovery_address): + """End-to-end: sync subscriber receives messages from a sync publisher.""" + pub = Publisher( + topic_name="/test/sync_roundtrip", + message_type=CmdMessage, + node_name="pub_roundtrip", + discovery_address=discovery_address, + ) + time.sleep(0.2) + + received: list[CmdMessage] = [] + received_lock = threading.Lock() + + def cb(msg, _hdr): + with received_lock: + received.append(msg) + + sub = ThreadedSubscriber( + topic_name="/test/sync_roundtrip", + message_type=CmdMessage, + callback=cb, + discovery_address=discovery_address, + # Plenty of headroom so we don't lose intermediate messages on bursts. 
+ queue_size=64, + topic_timeout=2.0, + ) + sub.start() + + try: + _warmup(pub, sub) + baseline = len(received) + for i in range(5): + assert pub.publish(CmdMessage(seq=i, value=float(i))) + time.sleep(0.05) + + assert _wait_for(lambda: len(received) - baseline >= 5, timeout=3.0), ( + f"only got {len(received) - baseline} of 5 after warmup" + ) + + # Filter out warmup sentinels (seq >= 100000) + post_warmup = [m for m in received[baseline:] if m.seq < 100000] + seqs = sorted(m.seq for m in post_warmup[:5]) + assert seqs == [0, 1, 2, 3, 4] + assert sub.receive_count >= 5 + finally: + sub.close() + pub.close() + + +def test_clean_shutdown_when_idle(discovery_daemon, discovery_address): + """No messages, no problem: stop() should join inside the timeout.""" + pub = Publisher( + topic_name="/test/sync_idle", + message_type=CmdMessage, + node_name="pub_idle", + discovery_address=discovery_address, + ) + time.sleep(0.2) + + sub = ThreadedSubscriber( + topic_name="/test/sync_idle", + message_type=CmdMessage, + callback=lambda _m, _h: None, + discovery_address=discovery_address, + topic_timeout=2.0, + ) + sub.start() + assert sub.running + + t0 = time.monotonic() + sub.close() + elapsed = time.monotonic() - t0 + # Poll timeout is 50ms + some slack; 1s is comfortable. 
+ assert elapsed < 1.0, f"shutdown took {elapsed:.2f}s" + assert not sub.running + + pub.close() + + +def test_callback_exception_does_not_kill_thread(discovery_daemon, discovery_address): + """A throwing callback should be logged but the receive loop must continue.""" + pub = Publisher( + topic_name="/test/sync_throwing_cb", + message_type=CmdMessage, + node_name="pub_throwing", + discovery_address=discovery_address, + ) + time.sleep(0.2) + + seen: list[int] = [] + seen_lock = threading.Lock() + + def cb(msg, _hdr): + with seen_lock: + seen.append(msg.seq) + if msg.seq == 0: + raise RuntimeError("synthetic failure") + + sub = ThreadedSubscriber( + topic_name="/test/sync_throwing_cb", + message_type=CmdMessage, + callback=cb, + discovery_address=discovery_address, + queue_size=64, + topic_timeout=2.0, + ) + sub.start() + + try: + # Warmup using the same sentinel scheme — but the cb here only + # raises on seq==0, so warmup sentinels (>=100000) are safe. + _warmup(pub, sub) + baseline = len(seen) + + for i in range(3): + assert pub.publish(CmdMessage(seq=i, value=float(i))) + time.sleep(0.05) + + assert _wait_for(lambda: len(seen) - baseline >= 3, timeout=3.0), ( + f"only got {len(seen) - baseline} of 3 after warmup" + ) + assert sub.running, "receive thread died on user exception" + finally: + sub.close() + pub.close() + + +def test_small_queue_drops_intermediate(discovery_daemon, discovery_address): + """With ``queue_size=1`` and a slow callback, the publisher's burst is + dropped on the receive side: the subscriber sees far fewer messages + than were sent, and the dropped counter reflects the gap.""" + pub = Publisher( + topic_name="/test/sync_small_queue", + message_type=CmdMessage, + node_name="pub_small_queue", + discovery_address=discovery_address, + ) + time.sleep(0.2) + + received: list[int] = [] + received_lock = threading.Lock() + + def cb(msg, _hdr): + # Slow callback so the publisher can outrun the receiver. 
+ time.sleep(0.02) + with received_lock: + received.append(msg.seq) + + sub = ThreadedSubscriber( + topic_name="/test/sync_small_queue", + message_type=CmdMessage, + callback=cb, + discovery_address=discovery_address, + queue_size=1, + topic_timeout=2.0, + ) + sub.start() + + try: + _warmup(pub, sub) + baseline_received = sub.receive_count + + # Burst 200 messages as fast as possible. + for i in range(200): + pub.publish(CmdMessage(seq=i, value=float(i))) + time.sleep(2.0) # give the slow callback time to drain what survives + + post = sub.receive_count - baseline_received + # We should have seen *some* messages but far fewer than 200. + assert post > 0 + assert post < 200, f"expected drops but got all {post} messages" + # Drop counter should reflect the gap. + assert sub.dropped_count > 0 + finally: + sub.close() + pub.close() + + +def test_node_spin_pure_sync(discovery_daemon, discovery_address): + """Node with only sync work runs end-to-end without entering asyncio.""" + from cortex.core.node import Node + + received: list[int] = [] + received_lock = threading.Lock() + + def cb(msg, _hdr): + with received_lock: + received.append(msg.seq) + + def producer(stop, pub): + seq = 0 + while not stop.is_set() and seq < 1000: + pub.publish(CmdMessage(seq=seq, value=float(seq))) + seq += 1 + time.sleep(0.001) + + pub_node = Node(name="pure_sync_pub", discovery_address=discovery_address) + sub_node = Node(name="pure_sync_sub", discovery_address=discovery_address) + + try: + pub = pub_node.create_publisher( + topic_name="/test/spin_sync", + message_type=CmdMessage, + mode="sync", + ) + time.sleep(0.1) + + sub_node.create_subscriber( + topic_name="/test/spin_sync", + message_type=CmdMessage, + callback=cb, + mode="sync", + queue_size=64, + topic_timeout=2.0, + ) + + # Spawn a producer on the publisher node and spin until it's done. 
+ pub_node.spawn_thread(producer, pub, name="producer") + + # Run the subscriber spin in another thread so we can also spin + # the publisher — both need a thread to host their `spin()`. + sub_thread = threading.Thread( + target=lambda: sub_node.spin(timeout=3.0), + name="sub-spin", + ) + sub_thread.start() + + # Spin publisher node until producer finishes (stop_event is auto-set + # once close_sync runs; we just give it time). + pub_node.spin(timeout=2.0) + + sub_thread.join(timeout=2.0) + assert sub_node._running is False + assert pub_node._running is False + assert len(received) > 100, f"only got {len(received)} messages" + finally: + sub_node.close_sync() + pub_node.close_sync() + + +def test_node_spin_rejects_async_work(discovery_daemon, discovery_address): + """spin() must refuse if the node has async timers/subscribers.""" + from cortex.core.node import Node + + node = Node(name="mixed_node", discovery_address=discovery_address) + try: + + async def tick(): + pass + + node.create_timer(0.1, tick) + with pytest.raises(RuntimeError, match="async"): + node.spin() + finally: + # Have to use async close — node has an async timer + asyncio.run(node.close()) + + +def test_async_strict_fingerprint(discovery_daemon, discovery_address): + """Async subscriber with strict_fingerprint=True must raise on mismatch.""" + from cortex.core.subscriber import Subscriber + + pub = Publisher( + topic_name="/test/async_fp_strict", + message_type=OtherMessage, + node_name="pub_async_fp", + discovery_address=discovery_address, + ) + time.sleep(0.2) + + try: + # Construction does the fingerprint check via _lookup_nonblocking. 
+ with pytest.raises(MessageFingerprintError): + Subscriber( + topic_name="/test/async_fp_strict", + message_type=CmdMessage, + callback=None, + discovery_address=discovery_address, + wait_for_topic=False, + strict_fingerprint=True, + ) + finally: + pub.close() + + +# --------------------------------------------------------------------------- +# Node integration +# --------------------------------------------------------------------------- + + +def test_node_sync_publisher_mode(discovery_daemon, discovery_address): + """``Node.create_publisher(mode='sync')`` returns a Publisher with its + own zmq.Context, suitable for being driven from a non-asyncio thread.""" + import zmq + + from cortex.core.node import Node + + async def main() -> None: + node = Node(name="sync_pub_node", discovery_address=discovery_address) + try: + pub = node.create_publisher( + topic_name="/test/sync_publisher", + message_type=CmdMessage, + mode="sync", + ) + # Sync mode should NOT have wrapped the node's async context. 
+ assert not isinstance(pub._context, zmq.asyncio.Context) + assert pub.publish(CmdMessage(seq=0, value=0.0)) + assert pub._sequence == 1 + finally: + await node.close() + + asyncio.run(main()) + + +def test_node_mode_dispatch(discovery_daemon, discovery_address): + """Node.create_subscriber(mode='sync') returns a ThreadedSubscriber and + Node.run starts/stops its thread cleanly.""" + from cortex.core.node import Node + + async def main() -> None: + node_pub = Node(name="pub_node", discovery_address=discovery_address) + node_sub = Node(name="sub_node", discovery_address=discovery_address) + + try: + pub = node_pub.create_publisher( + topic_name="/test/node_sync", + message_type=CmdMessage, + ) + await asyncio.sleep(0.2) + + received: list[int] = [] + received_lock = threading.Lock() + + def cb(msg, _hdr): + with received_lock: + received.append(msg.seq) + + sub = node_sub.create_subscriber( + topic_name="/test/node_sync", + message_type=CmdMessage, + callback=cb, + mode="sync", + queue_size=64, + topic_timeout=2.0, + ) + assert isinstance(sub, ThreadedSubscriber) + + run_task = asyncio.create_task(node_sub.run()) + # Give the node a tick to start the worker thread. + await asyncio.sleep(0.1) + assert sub.running + + # Warmup: pump sentinels (seq >= 100000) until the SUB filter + # propagates and we see at least one delivery. 
+ warmup_deadline = time.monotonic() + 3.0 + wseq = 100000 + while time.monotonic() < warmup_deadline: + with received_lock: + if received: + break + pub.publish(CmdMessage(seq=wseq, value=0.0)) + wseq += 1 + await asyncio.sleep(0.01) + with received_lock: + assert received, "SUB-PUB handshake never completed" + baseline = len(received) + + for i in range(3): + assert pub.publish(CmdMessage(seq=i, value=float(i))) + await asyncio.sleep(0.05) + + for _ in range(60): + with received_lock: + if len(received) - baseline >= 3: + break + await asyncio.sleep(0.05) + with received_lock: + post = [s for s in received[baseline:] if s < 100000] + assert sorted(post[:3]) == [0, 1, 2] + + node_sub.stop() + run_task.cancel() + with contextlib_suppress(): + await run_task + finally: + await node_pub.close() + await node_sub.close() + + asyncio.run(main()) + + +# Tiny helper to keep the asyncio test readable without a top-level import +# of contextlib.suppress (the rest of this file is sync). +class contextlib_suppress: + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return exc_type is asyncio.CancelledError From 4b5642831d73ff2d03674cb3cfdcbba67ffe8459 Mon Sep 17 00:00:00 2001 From: Richeek Das Date: Sun, 3 May 2026 19:29:19 -0400 Subject: [PATCH 7/8] sync and inproc benchmarks --- benchmarks/bench_latency_inproc.py | 301 +++++++++++++++++++++++++++++ benchmarks/bench_latency_sync.py | 275 ++++++++++++++++++++++++++ 2 files changed, 576 insertions(+) create mode 100644 benchmarks/bench_latency_inproc.py create mode 100644 benchmarks/bench_latency_sync.py diff --git a/benchmarks/bench_latency_inproc.py b/benchmarks/bench_latency_inproc.py new file mode 100644 index 0000000..9881bf3 --- /dev/null +++ b/benchmarks/bench_latency_inproc.py @@ -0,0 +1,301 @@ +#!/usr/bin/env python3 +""" +Same-process latency benchmark. 
+ +The other latency benchmarks split publisher and subscriber into separate +OS processes, which means each side has its own GIL and the +free-threaded build of CPython buys nothing on those numbers. + +In a real robotics node you typically have an asyncio loop running +timers, telemetry subscribers, and CPU-bound housekeeping (state +estimation, logging, planning) **alongside** the low-latency control +subscriber. They share a process. On stock CPython the asyncio thread +holds the GIL between sleeps; the sync receive thread spends time +*waiting* for that GIL even when the kernel has woken it. On +``python3.14t`` (PEP 779) there is no GIL and the receive thread is +free to run on a separate core without contention. + +This benchmark stresses exactly that scenario: + +* main thread runs an asyncio loop with a periodic CPU-bound job +* a publisher publishes to an IPC topic at a fixed rate from another + thread +* a ``ThreadedSubscriber`` receives them on a third thread + +The reported latency is the wall-clock difference between +``send_time_ns`` (set by the publisher) and the moment the sync +callback runs. +""" + +import argparse +import asyncio +import contextlib +import statistics +import sys +import threading +import time +from dataclasses import dataclass +from pathlib import Path + +import numpy as np + +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) + +from cortex.core.publisher import Publisher # noqa: E402 +from cortex.core.sync_subscriber import ThreadedSubscriber # noqa: E402 +from cortex.discovery.daemon import DiscoveryDaemon # noqa: E402 +from cortex.messages.base import Message # noqa: E402 +from cortex.utils.runtime import runtime_info # noqa: E402 + + +@dataclass +class TickMessage(Message): + """Carries publisher-side perf_counter_ns timestamp + optional payload. + + The ``payload`` is deliberately ``bytes`` (not numpy) so the wire + cost is dominated by the value the caller picks via ``--payload-size``. 
+ Header + two ``int`` fields contribute ~40 bytes of fixed overhead. + """ + + send_time_ns: int + sequence: int + payload: bytes + + +def _cpu_burn_ms(target_ms: float) -> None: + """Burn ~``target_ms`` of CPU time. Used to keep the asyncio thread busy.""" + end = time.perf_counter() + target_ms / 1000.0 + x = 0 + while time.perf_counter() < end: + # Trivial work the JIT can't elide; exercises the GIL on stock CPython. + x = (x * 1664525 + 1013904223) & 0xFFFFFFFF + + +async def _async_busy_loop(period_s: float, burn_ms: float, stop: asyncio.Event): + """Periodic CPU-bound asyncio task — emulates planner / state estimator.""" + while not stop.is_set(): + _cpu_burn_ms(burn_ms) + with contextlib.suppress(asyncio.TimeoutError): + await asyncio.wait_for(stop.wait(), timeout=period_s) + + +def _publisher_thread( + pub: Publisher, + num_messages: int, + rate_hz: float, + payload: bytes, + ready: threading.Event, +) -> None: + """Sync publisher in a dedicated thread so it never fights the loop.""" + interval = 1.0 / rate_hz + next_t = time.perf_counter() + ready.set() + for i in range(num_messages): + next_t += interval + sleep = next_t - time.perf_counter() + if sleep > 0: + time.sleep(sleep) + pub.publish( + TickMessage( + send_time_ns=time.perf_counter_ns(), + sequence=i, + payload=payload, + ) + ) + + +async def run_benchmark( + num_messages: int, + rate_hz: float, + burn_ms: float, + busy_period_s: float, + payload_size: int, +) -> dict: + """Run pub + sync sub + busy asyncio task all inside one process.""" + daemon_thread = threading.Thread( + target=DiscoveryDaemon().start, name="bench-discovery", daemon=True + ) + daemon_thread.start() + await asyncio.sleep(0.5) + + topic = "/bench/inproc" + pub = Publisher( + topic_name=topic, + message_type=TickMessage, + node_name="inproc_pub", + ) + await asyncio.sleep(0.2) + + payload = b"\x00" * payload_size + latencies_ns: list[int] = [] + seen_lock = threading.Lock() + + def cb(msg: TickMessage, _hdr) -> None: + recv = 
time.perf_counter_ns() + with seen_lock: + latencies_ns.append(recv - msg.send_time_ns) + + sub = ThreadedSubscriber( + topic_name=topic, + message_type=TickMessage, + callback=cb, + queue_size=64, + topic_timeout=5.0, + ) + sub.start() + + # Warmup until SUB filter has propagated. + deadline = time.monotonic() + 3.0 + seq = -1 + while time.monotonic() < deadline: + with seen_lock: + if latencies_ns: + break + pub.publish( + TickMessage( + send_time_ns=time.perf_counter_ns(), + sequence=seq, + payload=payload, + ) + ) + seq -= 1 + await asyncio.sleep(0.01) + with seen_lock: + latencies_ns.clear() + + # Start the asyncio CPU-burn loop alongside the receiver. + stop_event = asyncio.Event() + busy_task = asyncio.create_task( + _async_busy_loop(busy_period_s, burn_ms, stop_event) + ) + + pub_ready = threading.Event() + pub_thread = threading.Thread( + target=_publisher_thread, + args=(pub, num_messages, rate_hz, payload, pub_ready), + name="bench-pub", + ) + pub_thread.start() + pub_ready.wait(timeout=2.0) + + bench_start = time.perf_counter() + pub_thread.join(timeout=120) + # Allow last-message delivery to settle. 
+ await asyncio.sleep(0.2) + bench_end = time.perf_counter() + + stop_event.set() + await busy_task + + sub.close() + pub.close() + + if not latencies_ns: + return {"error": "no messages received"} + + duration = bench_end - bench_start + arr_us = [ns / 1000.0 for ns in latencies_ns] + received = len(arr_us) + return { + "received": received, + "duration_s": duration, + "payload_size": payload_size, + "throughput_msg_per_s": received / duration if duration else 0, + "throughput_bytes_per_s": (received * payload_size / duration) + if duration + else 0, + "latency_min_us": min(arr_us), + "latency_p50_us": float(np.percentile(arr_us, 50)), + "latency_mean_us": statistics.mean(arr_us), + "latency_p90_us": float(np.percentile(arr_us, 90)), + "latency_p99_us": float(np.percentile(arr_us, 99)), + "latency_p999_us": float(np.percentile(arr_us, 99.9)), + "latency_max_us": max(arr_us), + "dropped": sub.dropped_count, + } + + +def _human_byte_rate(bytes_per_s: float) -> str: + """Format bytes/sec as KB/s, MB/s, or GB/s.""" + if bytes_per_s >= 1024**3: + return f"{bytes_per_s / 1024**3:,.2f} GB/s" + if bytes_per_s >= 1024**2: + return f"{bytes_per_s / 1024**2:,.2f} MB/s" + if bytes_per_s >= 1024: + return f"{bytes_per_s / 1024:,.2f} KB/s" + return f"{bytes_per_s:,.0f} B/s" + + +def print_results(results: dict, num_messages: int, rate_hz: float, burn_ms: float): + info = runtime_info() + label = "free-threaded" if info.free_threaded else "GIL" + print("\n" + "=" * 60) + print( + f"INPROC LATENCY (CPython {info.python_version[0]}.{info.python_version[1]} {label})" + ) + print("=" * 60) + print( + f"Pub rate: {rate_hz} Hz, msgs: {num_messages}, asyncio burn: {burn_ms} ms/period" + ) + if "error" in results: + print(f"ERROR: {results['error']}") + return + print( + f"Received: {results['received']:,} / {num_messages:,} Drops: {results['dropped']}" + ) + print(f"Duration: {results['duration_s']:.2f}s") + print( + f"Throughput: {results['throughput_msg_per_s']:,.0f} msg/s " + 
f"({_human_byte_rate(results['throughput_bytes_per_s'])} payload, " + f"{results['payload_size']:,} B/msg)" + ) + print("\nLatency (µs):") + print(f" min: {results['latency_min_us']:>9.1f}") + print(f" p50: {results['latency_p50_us']:>9.1f}") + print(f" mean: {results['latency_mean_us']:>9.1f}") + print(f" p90: {results['latency_p90_us']:>9.1f}") + print(f" p99: {results['latency_p99_us']:>9.1f}") + print(f" p99.9: {results['latency_p999_us']:>9.1f}") + print(f" max: {results['latency_max_us']:>9.1f}") + print("=" * 60 + "\n") + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("-n", "--num-messages", type=int, default=2000) + parser.add_argument("-r", "--rate", type=float, default=500.0) + parser.add_argument( + "-s", + "--payload-size", + type=int, + default=256, + help="Bytes of payload per message (default: 256)", + ) + parser.add_argument( + "--burn-ms", + type=float, + default=2.0, + help="CPU-burn time per asyncio busy iteration (ms)", + ) + parser.add_argument( + "--busy-period", + type=float, + default=0.005, + help="Period between asyncio busy iterations (s)", + ) + args = parser.parse_args() + + results = asyncio.run( + run_benchmark( + num_messages=args.num_messages, + rate_hz=args.rate, + burn_ms=args.burn_ms, + busy_period_s=args.busy_period, + payload_size=args.payload_size, + ) + ) + print_results(results, args.num_messages, args.rate, args.burn_ms) + + +if __name__ == "__main__": + main() diff --git a/benchmarks/bench_latency_sync.py b/benchmarks/bench_latency_sync.py new file mode 100644 index 0000000..2d26e98 --- /dev/null +++ b/benchmarks/bench_latency_sync.py @@ -0,0 +1,275 @@ +#!/usr/bin/env python3 +""" +Synchronous-path latency benchmark for Cortex. + +Establishes the latency floor achievable on this hardware when the +subscriber side runs on a dedicated OS thread with synchronous zmq + +``zmq.Poller``, bypassing the asyncio scheduler entirely. + +Companion to ``bench_latency.py``. 
The publisher is identical (sync zmq), +so any difference in p50/p99 between the two scripts is purely the cost +of ``zmq.asyncio`` + ``asyncio`` + ``await user_callback`` on the +subscriber side. + +Use: + python benchmarks/bench_latency_sync.py -n 5000 -s 1024 -r 1000 + +On a free-threaded build (``python3.14t``) the same script will report +markedly tighter p99 because the receive thread does not contend with +any asyncio loop for the GIL. +""" + +import argparse +import multiprocessing as mp +import statistics +import sys +import time +from dataclasses import dataclass +from pathlib import Path + +import numpy as np +import zmq + +sys.path.insert(0, str(Path(__file__).parent.parent / "src")) + +from cortex.core.publisher import Publisher # noqa: E402 +from cortex.discovery.daemon import DiscoveryDaemon # noqa: E402 +from cortex.messages.base import Message # noqa: E402 +from cortex.utils.runtime import runtime_info # noqa: E402 + + +@dataclass +class LatencyMessage(Message): + """Message with a wall-clock send timestamp for one-way latency.""" + + send_time_ns: int + sequence: int + payload: bytes + + +def _run_discovery_daemon() -> None: + DiscoveryDaemon().start() + + +def _run_publisher( + topic: str, + num_messages: int, + payload_size: int, + rate_hz: float, + ready_event, + start_event, +) -> None: + time.sleep(0.5) + + pub = Publisher( + topic_name=topic, + message_type=LatencyMessage, + node_name="latency_publisher_sync", + ) + ready_event.set() + start_event.wait() + + payload = b"\x00" * payload_size + interval = 1.0 / rate_hz if rate_hz > 0 else 0.0 + + for i in range(num_messages): + msg = LatencyMessage( + send_time_ns=time.time_ns(), + sequence=i, + payload=payload, + ) + pub.publish(msg) + if interval > 0.0: + time.sleep(interval) + + time.sleep(0.1) + pub.close() + + +def _run_sync_subscriber( + topic: str, + num_messages: int, + ready_event, + start_event, + results_queue, +) -> None: + """Pure-sync receive loop: zmq.Context + zmq.Poller, no 
asyncio.""" + from cortex.discovery.client import DiscoveryClient # local import + + discovery = DiscoveryClient() + info = discovery.wait_for_topic(topic, timeout=30.0) + if info is None: + results_queue.put({"received": 0, "latencies": [], "error": "topic_timeout"}) + discovery.close() + return + discovery.close() + + ctx = zmq.Context.instance() + sock = ctx.socket(zmq.SUB) + sock.setsockopt(zmq.RCVHWM, 10) + sock.setsockopt(zmq.LINGER, 0) + sock.setsockopt_string(zmq.SUBSCRIBE, topic) + sock.connect(info.address) + + poller = zmq.Poller() + poller.register(sock, zmq.POLLIN) + + ready_event.set() + start_event.wait() + + latencies: list[float] = [] + deadline = time.monotonic() + 60.0 + + while len(latencies) < num_messages and time.monotonic() < deadline: + events = dict(poller.poll(timeout=1000)) + if sock not in events: + continue + frames = sock.recv_multipart(copy=False, flags=zmq.NOBLOCK) + if len(frames) < 2: + continue + payload_frames = frames[1:] + if len(payload_frames) == 1: + raw = memoryview(payload_frames[0].buffer) + msg, _hdr = LatencyMessage.from_bytes(raw) + else: + msg, _hdr = LatencyMessage.from_frames(payload_frames) + recv_ns = time.time_ns() + latencies.append((recv_ns - msg.send_time_ns) / 1000.0) + + sock.close() + ctx.term() + results_queue.put({"received": len(latencies), "latencies": latencies}) + + +def run_sync_benchmark( + num_messages: int = 1000, + payload_size: int = 1024, + rate_hz: float = 1000.0, +) -> dict: + topic = "/benchmark/latency_sync" + + discovery_proc = mp.Process(target=_run_discovery_daemon, daemon=True) + discovery_proc.start() + time.sleep(1.0) + + pub_ready = mp.Event() + sub_ready = mp.Event() + start_event = mp.Event() + results_queue: mp.Queue = mp.Queue() + + sub_proc = mp.Process( + target=_run_sync_subscriber, + args=(topic, num_messages, sub_ready, start_event, results_queue), + ) + sub_proc.start() + + pub_proc = mp.Process( + target=_run_publisher, + args=(topic, num_messages, payload_size, rate_hz, 
pub_ready, start_event), + ) + pub_proc.start() + + pub_ready.wait(timeout=10) + sub_ready.wait(timeout=15) + time.sleep(0.2) + + bench_start = time.time() + start_event.set() + pub_proc.join(timeout=120) + sub_proc.join(timeout=120) + duration = time.time() - bench_start + + results = results_queue.get(timeout=5) + discovery_proc.terminate() + discovery_proc.join(timeout=2) + + latencies = results["latencies"] + if not latencies: + return {"error": results.get("error", "no messages"), "received": 0} + + return { + "num_messages": num_messages, + "payload_size": payload_size, + "rate_hz": rate_hz, + "received": results["received"], + "loss_rate": (num_messages - results["received"]) / num_messages * 100, + "duration_s": duration, + "latency_min_us": min(latencies), + "latency_max_us": max(latencies), + "latency_mean_us": statistics.mean(latencies), + "latency_median_us": statistics.median(latencies), + "latency_std_us": statistics.stdev(latencies) if len(latencies) > 1 else 0.0, + "latency_p50_us": float(np.percentile(latencies, 50)), + "latency_p90_us": float(np.percentile(latencies, 90)), + "latency_p99_us": float(np.percentile(latencies, 99)), + "throughput_msg_per_s": results["received"] / duration if duration else 0, + # Byte throughput approximates only the user-supplied payload, not + # the 24-byte header or msgpack metadata. Good enough to compare + # benches at different ``payload_size`` settings. 
+ "throughput_bytes_per_s": ( + results["received"] * payload_size / duration if duration else 0 + ), + } + + +def _human_byte_rate(bytes_per_s: float) -> str: + """Format a bytes/sec rate as KB/s, MB/s, or GB/s.""" + if bytes_per_s >= 1024**3: + return f"{bytes_per_s / 1024**3:,.2f} GB/s" + if bytes_per_s >= 1024**2: + return f"{bytes_per_s / 1024**2:,.2f} MB/s" + if bytes_per_s >= 1024: + return f"{bytes_per_s / 1024:,.2f} KB/s" + return f"{bytes_per_s:,.0f} B/s" + + +def print_results(results: dict) -> None: + info = runtime_info() + print("\n" + "=" * 60) + print("SYNC LATENCY BENCHMARK (raw zmq.Poller, no asyncio)") + print("=" * 60) + print( + f"Runtime: {info.implementation} " + f"{info.python_version[0]}.{info.python_version[1]}.{info.python_version[2]} " + f"(GIL {'disabled' if info.free_threaded else 'enabled'})" + ) + + if "error" in results: + print(f"ERROR: {results['error']}") + return + + print("\nDelivery:") + print(f" Received: {results['received']:,} / {results['num_messages']:,}") + print(f" Loss rate: {results['loss_rate']:.2f}%") + print(f" Payload size: {results['payload_size']:,} bytes/msg") + print(f" Throughput: {results['throughput_msg_per_s']:,.0f} msg/s") + print(f" Bytes/s: {_human_byte_rate(results['throughput_bytes_per_s'])}") + + print("\nLatency (microseconds):") + print(f" Min: {results['latency_min_us']:,.1f}") + print(f" Median (p50): {results['latency_p50_us']:,.1f}") + print(f" Mean: {results['latency_mean_us']:,.1f}") + print(f" P90: {results['latency_p90_us']:,.1f}") + print(f" P99: {results['latency_p99_us']:,.1f}") + print(f" Max: {results['latency_max_us']:,.1f}") + print("=" * 60 + "\n") + + +def main() -> None: + parser = argparse.ArgumentParser(description="Cortex sync latency benchmark") + parser.add_argument("-n", "--num-messages", type=int, default=1000) + parser.add_argument("-s", "--payload-size", type=int, default=1024) + parser.add_argument("-r", "--rate", type=float, default=1000.0) + args = parser.parse_args() 
+ + print_results( + run_sync_benchmark( + num_messages=args.num_messages, + payload_size=args.payload_size, + rate_hz=args.rate, + ) + ) + + +if __name__ == "__main__": + main() From 3ef8c054f00ce58774ec4d1e5c6a7909567e4e1c Mon Sep 17 00:00:00 2001 From: Richeek Das Date: Sun, 3 May 2026 19:29:38 -0400 Subject: [PATCH 8/8] sync and async examples for pub sub controls --- examples/async_publisher_control_loop.py | 102 +++++++++++++++++++ examples/async_subscriber_control_loop.py | 111 +++++++++++++++++++++ examples/sync_publisher_control_loop.py | 100 +++++++++++++++++++ examples/sync_subscriber_control_loop.py | 113 ++++++++++++++++++++++ 4 files changed, 426 insertions(+) create mode 100644 examples/async_publisher_control_loop.py create mode 100644 examples/async_subscriber_control_loop.py create mode 100644 examples/sync_publisher_control_loop.py create mode 100644 examples/sync_subscriber_control_loop.py diff --git a/examples/async_publisher_control_loop.py b/examples/async_publisher_control_loop.py new file mode 100644 index 0000000..8d0e306 --- /dev/null +++ b/examples/async_publisher_control_loop.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +""" +Example: async publisher driving the same control topic as the sync example. + +Uses ``Node.create_publisher`` (default ``mode='async'``) and a periodic +``Node.create_timer`` to publish wheel commands at 1 kHz through the +asyncio scheduler. Pair with either ``async_subscriber_control_loop.py`` +or ``sync_subscriber_control_loop.py`` — they all share the same topic +and message, so any pub/sub combination works on the wire. + +Compare the printed jitter against ``sync_publisher_control_loop.py``: +the asyncio loop's scheduling resolution is what limits how tightly we +can hit a high-rate target. On most machines this caps out around +~500-1000 Hz with notable variance; the sync example sits cleanly at +1 kHz on the same hardware.
+ +Usage: + # Terminal 1 + python -m cortex.discovery.daemon + + # Terminal 2 + python examples/async_publisher_control_loop.py + + # Terminal 3 (pick either) + python examples/async_subscriber_control_loop.py + python examples/sync_subscriber_control_loop.py +""" + +import math +import time +from dataclasses import dataclass + +import cortex +from cortex import Message, Node + + +@dataclass +class WheelCommand(Message): + """Identical layout to the sync examples — the fingerprint must match.""" + + left_rad_s: float + right_rad_s: float + issued_at_ns: int + + +class AsyncControlPublisherNode(Node): + """Publishes /cmd/wheel_velocity through the asyncio scheduler.""" + + def __init__(self, rate_hz: float = 1000.0) -> None: + super().__init__(name="async_control_pub") + + self._rate_hz = rate_hz + self._t0 = time.perf_counter() + self._sent = 0 + + # Default mode is "async": shares the node's zmq.asyncio.Context. + self.pub = self.create_publisher( + topic_name="/cmd/wheel_velocity", + message_type=WheelCommand, + ) + + # Periodic timer scheduled by the asyncio event loop. + self.create_timer(1.0 / rate_hz, self._tick) + + print(f"Publishing /cmd/wheel_velocity at ~{rate_hz:.0f} Hz (async)") + print("Press Ctrl+C to stop") + + async def _tick(self) -> None: + t = time.perf_counter() - self._t0 + left = 1.5 * math.sin(2 * math.pi * 0.25 * t) + right = 1.5 * math.sin(2 * math.pi * 0.25 * t + math.pi / 8) + + # ``publish`` itself is sync (zmq.PUB.send_multipart) — only the + # scheduling around it goes through asyncio. 
+ self.pub.publish( + WheelCommand( + left_rad_s=left, + right_rad_s=right, + issued_at_ns=time.time_ns(), + ) + ) + self._sent += 1 + + if self._sent % 1000 == 0: + elapsed = time.perf_counter() - self._t0 + print( + f" sent {self._sent} cmds in {elapsed:.1f}s " + f"(actual {self._sent / elapsed:.0f} Hz, target {self._rate_hz:.0f} Hz)" + ) + + +async def main() -> None: + print("Starting async control-loop publisher...") + async with AsyncControlPublisherNode() as node: + try: + await node.run() + except KeyboardInterrupt: + print("\nShutting down...") + + +if __name__ == "__main__": + cortex.run(main()) diff --git a/examples/async_subscriber_control_loop.py b/examples/async_subscriber_control_loop.py new file mode 100644 index 0000000..3742b49 --- /dev/null +++ b/examples/async_subscriber_control_loop.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python3 +""" +Example: async subscriber for the same /cmd/wheel_velocity control topic. + +Uses ``Node.create_subscriber`` (default ``mode='async'``) with an +``async def`` callback driven by the asyncio event loop. Pair with +either publisher example — the wire format is identical. 
+ +Compare the printed end-to-end latency to +``sync_subscriber_control_loop.py``: + +* on stock CPython the async path adds ~3-4 ``await`` hops, so p99 + sits around 1-1.5 ms and the worst case stretches into many ms when + the asyncio thread is busy +* the sync path stays bounded at hundreds of microseconds even under + load + +Usage: + # Terminal 1 + python -m cortex.discovery.daemon + + # Terminal 2 (pick either) + python examples/async_publisher_control_loop.py + python examples/sync_publisher_control_loop.py + + # Terminal 3 + python examples/async_subscriber_control_loop.py +""" + +import asyncio +import time +from dataclasses import dataclass + +import cortex +from cortex import Message, Node + + +@dataclass +class WheelCommand(Message): + """Identical layout to the publisher / sync examples.""" + + left_rad_s: float + right_rad_s: float + issued_at_ns: int + + +class AsyncControlSubscriberNode(Node): + """Subscribes to /cmd/wheel_velocity and prints rolling latency stats.""" + + def __init__(self) -> None: + super().__init__(name="async_control_sub") + + self.cmd_count = 0 + self.latency_sum_us = 0.0 + self.last_cmd: WheelCommand | None = None + + # Default mode is "async": one asyncio task drives recv → decode → cb. + self.create_subscriber( + topic_name="/cmd/wheel_velocity", + message_type=WheelCommand, + callback=self.on_wheel_command, # async def, see below + queue_size=64, + ) + + # An async telemetry timer running in the same node — exactly the + # pattern the sync example shows, but here everything is async. 
+ self.create_timer(1 / 50.0, self.publish_telemetry) + + print("Subscribed to /cmd/wheel_velocity in async mode") + print("Async telemetry running at 50 Hz") + print("Press Ctrl+C to stop") + + async def on_wheel_command(self, msg: WheelCommand, header) -> None: + """Async callback — runs as part of the asyncio event loop.""" + latency_us = (time.time_ns() - msg.issued_at_ns) / 1000.0 + self.last_cmd = msg + self.cmd_count += 1 + self.latency_sum_us += latency_us + + if self.cmd_count % 100 == 0: + avg_us = self.latency_sum_us / self.cmd_count + print( + f"[async] {self.cmd_count} cmds " + f"L={msg.left_rad_s:+.2f} R={msg.right_rad_s:+.2f} " + f"latency last={latency_us:.0f}µs avg={avg_us:.0f}µs " + f"seq={header.sequence}" + ) + + # Yielding here is a no-op for this example, but real callbacks + # often await downstream IO (DB, HTTP, another publish) — that's + # the kind of work async mode composes well with. + await asyncio.sleep(0) + + async def publish_telemetry(self) -> None: + if self.last_cmd is None: + return + if self.cmd_count and self.cmd_count % 250 == 0: + print(f"[async] heartbeat — {self.cmd_count} cmds applied so far") + + +async def main() -> None: + print("Starting async control-loop subscriber...") + async with AsyncControlSubscriberNode() as node: + try: + await node.run() + except KeyboardInterrupt: + print("\nShutting down...") + + +if __name__ == "__main__": + cortex.run(main()) diff --git a/examples/sync_publisher_control_loop.py b/examples/sync_publisher_control_loop.py new file mode 100644 index 0000000..90a70e0 --- /dev/null +++ b/examples/sync_publisher_control_loop.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +""" +Example: pure-sync publisher driving a tight control loop. + +Demonstrates that a node which owns only sync work needs no asyncio at +all: no ``async def``, no ``await``, no ``cortex.run`` — just plain +Python. The node provides ``spawn_thread`` for tracked sync workers and +``spin`` / ``close_sync`` for the lifecycle. 
+ +Pairs with ``examples/sync_subscriber_control_loop.py``. + +Usage: + # Terminal 1 + python -m cortex.discovery.daemon + + # Terminal 2 + python examples/sync_publisher_control_loop.py + + # Terminal 3 + python examples/sync_subscriber_control_loop.py +""" + +import math +import threading +import time +from dataclasses import dataclass + +from cortex import Message, Node + + +@dataclass +class WheelCommand(Message): + """Must match the dataclass in sync_subscriber_control_loop.py exactly + so the fingerprint is identical.""" + + left_rad_s: float + right_rad_s: float + issued_at_ns: int + + +def control_loop(stop: threading.Event, pub, rate_hz: float) -> None: + """Publish a sinusoidal wheel command at ``rate_hz`` until ``stop`` fires. + + The first argument is supplied by ``Node.spawn_thread`` — the node's + shared stop event. Polling it on every iteration is what makes the + thread shut down promptly when the node closes. + """ + interval = 1.0 / rate_hz + next_t = time.perf_counter() + sent = 0 + t0 = time.perf_counter() + + print(f"Publishing /cmd/wheel_velocity at {rate_hz:.0f} Hz from a sync thread") + + while not stop.is_set(): + next_t += interval + sleep = next_t - time.perf_counter() + if sleep > 0: + # Cap at 50 ms so we stay responsive to ``stop`` even if the + # caller drops the rate way below 20 Hz. 
+ time.sleep(min(sleep, 0.05)) + + t = time.perf_counter() - t0 + left = 1.5 * math.sin(2 * math.pi * 0.25 * t) + right = 1.5 * math.sin(2 * math.pi * 0.25 * t + math.pi / 8) + pub.publish( + WheelCommand( + left_rad_s=left, + right_rad_s=right, + issued_at_ns=time.time_ns(), + ) + ) + sent += 1 + if sent % 1000 == 0: + elapsed = time.perf_counter() - t0 + print(f" sent {sent} cmds in {elapsed:.1f}s ({sent / elapsed:.0f} Hz)") + + +def main() -> None: + print("Starting sync-mode control-loop publisher...") + with Node(name="control_loop_publisher") as node: + pub = node.create_publisher( + topic_name="/cmd/wheel_velocity", + message_type=WheelCommand, + mode="sync", # independent zmq.Context, no asyncio shadow + ) + + # The node now owns the publisher thread: it gets the shared stop + # event, ``spin()`` stays alive while it's running, and the + # context manager joins it on exit. + node.spawn_thread(control_loop, pub, 1000.0, name="cmd-publisher") + + try: + node.spin() # blocks until Ctrl+C or node.stop() + except KeyboardInterrupt: + print("\nShutting down...") + + +if __name__ == "__main__": + main() diff --git a/examples/sync_subscriber_control_loop.py b/examples/sync_subscriber_control_loop.py new file mode 100644 index 0000000..b8667fc --- /dev/null +++ b/examples/sync_subscriber_control_loop.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 +""" +Example: synchronous low-latency subscriber for a control topic. + +Pairs with ``examples/sync_publisher_control_loop.py``. Demonstrates the +``mode='sync'`` opt-in: the receive loop runs on a dedicated OS thread +with a synchronous ``zmq.Poller``, the callback is a plain function (no +``await``), and ``queue_size=1`` gives latest-wins semantics suitable +for control commands. + +The node also runs an async telemetry timer at 50 Hz so you can see the +two paths coexisting in the same process — sync for the hot loop, async +for everything else. 
+ +Usage: + # Terminal 1 + python -m cortex.discovery.daemon + + # Terminal 2 + python examples/sync_publisher_control_loop.py + + # Terminal 3 + python examples/sync_subscriber_control_loop.py +""" + +import time +from dataclasses import dataclass + +import cortex +from cortex import Message, Node + + +@dataclass +class WheelCommand(Message): + """Wheel-velocity command. Tiny payload — control-loop shape.""" + + left_rad_s: float + right_rad_s: float + issued_at_ns: int + + +class ControlLoopNode(Node): + """Subscribes to wheel commands on a sync thread and applies them. + + A real robot would forward each command to the motor driver. Here we + just measure end-to-end latency from publish to callback so you can + eyeball the difference between sync and async modes. + """ + + def __init__(self) -> None: + super().__init__(name="control_loop_node") + + self.last_cmd: WheelCommand | None = None + self.cmd_count = 0 + self.latency_sum_us = 0.0 + + # ---- sync receive on a dedicated thread ------------------------ + self.create_subscriber( + topic_name="/cmd/wheel_velocity", + message_type=WheelCommand, + callback=self.on_wheel_command, # plain def, NOT async def + mode="sync", + queue_size=1, # latest-wins + # cpu_affinity=[3], # uncomment on Linux to pin + # sched_priority=20, # uncomment if you have CAP_SYS_NICE + ) + + # ---- async telemetry timer in the same process ---------------- + self.create_timer(1 / 50.0, self.publish_telemetry) + + print("Subscribed to /cmd/wheel_velocity in sync mode") + print("Async telemetry running at 50 Hz") + print("Press Ctrl+C to stop") + + # callback runs on the sync receive thread + def on_wheel_command(self, msg: WheelCommand, header) -> None: + latency_us = (time.time_ns() - msg.issued_at_ns) / 1000.0 + self.last_cmd = msg + self.cmd_count += 1 + self.latency_sum_us += latency_us + + if self.cmd_count % 100 == 0: + avg_us = self.latency_sum_us / self.cmd_count + print( + f"[sync] {self.cmd_count} cmds " + 
f"L={msg.left_rad_s:+.2f} R={msg.right_rad_s:+.2f} " + f"latency last={latency_us:.0f}µs avg={avg_us:.0f}µs " + f"seq={header.sequence}" + ) + + # async timer body — coexists happily with the sync subscriber + async def publish_telemetry(self) -> None: + if self.last_cmd is None: + return + # In a real node this would publish wheel encoder readings, IMU, + # or whatever you want at lower-priority. We just print a heartbeat. + if self.cmd_count and self.cmd_count % 250 == 0: + print(f"[async] heartbeat — {self.cmd_count} cmds applied so far") + + +async def main() -> None: + print("Starting sync-mode control-loop subscriber...") + node = ControlLoopNode() + try: + await node.run() + except KeyboardInterrupt: + print("\nShutting down...") + finally: + await node.close() + + +if __name__ == "__main__": + cortex.run(main())