3 changes: 3 additions & 0 deletions .github/workflows/docs.yml
python -m pip install --upgrade pip
pip install -e ".[docs]"

- name: Verify generated API reference is in sync
run: python docs/gen_ref_pages.py --check

- name: Build docs
run: zensical build -f docs/mkdocs.yml

5 changes: 5 additions & 0 deletions .gitignore
marimo/_static/
marimo/_lsp/
__marimo__/

# Internal-only docs (not published)
/docs/critique.md
/docs/plans/
/docs/TODO
45 changes: 13 additions & 32 deletions docs/components/discovery.md
> [`cortex.discovery.client`](../reference/discovery/client.md),
> [`cortex.discovery.protocol`](../reference/discovery/protocol.md)

Discovery is Cortex's control plane: a single long-lived process mapping topic names to ZMQ endpoints. It sits off the data path — once a subscriber has an endpoint, messages flow publisher → subscriber directly.

## Moving parts


Both sides agree on the wire format via `protocol.py`. The daemon runs a single-threaded REP loop; publishers and subscribers speak REQ.

## Daemon

[`DiscoveryDaemon`][cortex.discovery.daemon.DiscoveryDaemon]:

- Binds `zmq.REP` at `ipc:///tmp/cortex/discovery.sock` by default.
- Maintains `_topics: dict[str, TopicInfo]` — one publisher per topic.
- `RCVTIMEO=1000` on the socket so the loop can check `_running` for a clean Ctrl-C. One request at a time — a slow client blocks others (see the sketch below).
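A minimal sketch of that receive loop in plain pyzmq. This is illustrative only — `handle` and the reply encoding are assumptions, not the actual daemon code:

```python
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.REP)
sock.setsockopt(zmq.RCVTIMEO, 1000)           # 1 s receive timeout
sock.bind("ipc:///tmp/cortex/discovery.sock")

running = True
while running:
    try:
        request = sock.recv()                 # blocks at most 1 s
    except zmq.Again:
        continue                              # timeout: re-check the running flag
    reply = handle(request)                   # hypothetical dispatch on the decoded command
    sock.send(reply)                          # REP must answer before the next recv
```

The timeout exists only so the flag check runs; while `handle` executes, every other client waits.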

### State transitions


## Client

[`DiscoveryClient`][cortex.discovery.client.DiscoveryClient] is a thin REQ wrapper. One operational detail matters: **REQ sockets stick after a timeout** — they block subsequent sends waiting for a reply that never came. The client closes and recreates the socket on every timeout (`_reconnect`); callers don't see it.

### REQ timeout recovery

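The same recovery, sketched with plain pyzmq (the real client wraps this in `_reconnect`; endpoint and timeout values are illustrative):

```python
import zmq

ENDPOINT = "ipc:///tmp/cortex/discovery.sock"
ctx = zmq.Context()

def fresh_socket() -> zmq.Socket:
    sock = ctx.socket(zmq.REQ)
    sock.setsockopt(zmq.RCVTIMEO, 1000)
    sock.connect(ENDPOINT)
    return sock

sock = fresh_socket()
sock.send(b"request")
try:
    reply = sock.recv()
except zmq.Again:
    # The REQ socket is now stuck in its "awaiting reply" state and will
    # refuse further sends. Throw it away and start over.
    sock.close(linger=0)
    sock = fresh_socket()
```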

## Protocol

[`cortex.discovery.protocol`](../reference/discovery/protocol.md) defines the message types:

| Type | Purpose |
| -------------------------------------------------------------------- | ----------------------------------------- |
| [`DiscoveryRequest`][cortex.discovery.protocol.DiscoveryRequest] | command + optional topic_info / topic_name |
| [`DiscoveryResponse`][cortex.discovery.protocol.DiscoveryResponse] | status, message, topic_info, topics |

All payloads are msgpack. `TopicInfo` is nested as a packed sub-blob so responses stay flat.
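What that nesting looks like with `msgpack` directly. Field names here are illustrative — the real schemas live in `protocol.py`:

```python
import msgpack

# Hypothetical field layout, not the real protocol schema.
topic_info = msgpack.packb({"name": "camera/rgb", "endpoint": "tcp://127.0.0.1:5556"})
response = msgpack.packb({
    "status": "ok",
    "message": "",
    "topic_info": topic_info,                 # nested as an opaque packed sub-blob
})

outer = msgpack.unpackb(response)             # stays flat: one level of keys
inner = msgpack.unpackb(outer["topic_info"])  # unpack the sub-blob only when needed
```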

## Limitations

- One publisher per topic.
- No heartbeats or leases — a crashed publisher leaves a stale entry.
- Single-threaded REP — a slow client blocks others.
- Daemon state is lost on restart; publishers don't auto-re-register.

## See also

- [Concepts → Discovery protocol](../concepts/discovery-protocol.md)
- [Getting started → Running the discovery daemon](../getting-started/discovery-daemon.md)
18 changes: 5 additions & 13 deletions docs/components/messages.md
> **Source:** [`cortex.messages.base`](../reference/messages/base.md),
> [`cortex.messages.standard`](../reference/messages/standard.md)

A message is a `@dataclass` that inherits from [`Message`][cortex.messages.base.Message]. Registration, fingerprinting, and serialization are automatic.

## Anatomy of a message

```python
@dataclass
class JointTrajectory(Message):
    # ... trajectory fields elided in this diff view ...
    frame_id: str = ""
```

The class registers into [`MessageType._registry`][cortex.messages.base.MessageType] by fingerprint at import time and gains:

- `JointTrajectory.fingerprint()` — 64-bit ID.
- `msg.to_frames()` / `JointTrajectory.from_frames(frames)` — transport path.
- `msg.to_bytes()` / `JointTrajectory.from_bytes(data)` — legacy single-blob path.
- `Message.decode(blob)` — class dispatch via fingerprint registry.
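A round-trip sketch of those methods, assuming the `JointTrajectory` class above (unset fields keep their defaults):

```python
msg = JointTrajectory(frame_id="base_link")

frames = msg.to_frames()                      # multipart payload for the ZMQ path
same = JointTrajectory.from_frames(frames)

blob = msg.to_bytes()                         # legacy single-blob path
decoded = Message.decode(blob)                # looks up the class by fingerprint
assert isinstance(decoded, JointTrajectory)
```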

## Sequence numbering

Each Publisher keeps its own monotonic counter and stamps it into every message's header. A class-level fallback counter on `Message._sequence_counter` covers direct `to_bytes`/`to_frames` use outside a Publisher (tests, ad-hoc serialization).

## Built-in messages

71 changes: 12 additions & 59 deletions docs/components/node-and-executors.md
> **Source:** [`cortex.core.node`](../reference/core/node.md),
> [`cortex.core.executor`](../reference/core/executor.md)

A [`Node`][cortex.core.node.Node] owns a ZMQ async context, a set of publishers and subscribers, and a list of timers. Executors are the loops that drive timers and subscriber receive paths.

## Responsibilities


One node ≈ one process. You can run several in the same process via `asyncio.gather([n.run() for n in nodes])` ([`examples/multi_node_system.py`](https://github.com/sudoRicheek/cortex/blob/main/examples/multi_node_system.py)) — but they share the event loop, so a slow callback in one blocks the others.
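A minimal sketch of that pattern. Node names are illustrative and setup of publishers and timers is omitted; the import path follows the source links above:

```python
import asyncio

from cortex.core.node import Node

async def main() -> None:
    nodes = [Node("camera"), Node("planner")]
    # One event loop for both: a slow callback in either node stalls the other.
    await asyncio.gather(*(n.run() for n in nodes))

asyncio.run(main())
```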

## Lifecycle


`node.run()` spawns one asyncio task per timer and one per callback-bearing subscriber, then `asyncio.gather`s them. It returns when all tasks complete or the node is stopped.

`node.close()` stops executors, cancels outstanding tasks, closes every publisher and subscriber socket, and terminates the shared ZMQ context. Idempotent.

```python
async with Node("my_node") as node:
    # ... create publishers, subscribers, and timers, then await node.run() ...
# __aexit__ calls close() automatically
```


## Executors

Two subclasses of `BaseExecutor`:

```mermaid
classDiagram
    BaseExecutor <|-- AsyncExecutor
    BaseExecutor <|-- RateExecutor
```

### `AsyncExecutor`

"Run this coroutine as fast as possible, yielding between iterations."

```mermaid
flowchart LR
Start --> Check{running?}
Check -- no --> End
Check -- yes --> Call[await func]
Call -- exception --> Log[log error]
Log --> Sleep
Call --> Sleep[await sleep 0]
Sleep --> Check
```

Used by `Subscriber.run` to drive the receive-dispatch loop.
Tight loop: `await func(); await asyncio.sleep(0)`. Used by `Subscriber.run` for receive-dispatch.

### `RateExecutor`

"Run this coroutine at a constant rate, catching up on overruns."
Fixed-grid timer at `rate_hz`. `next_exec_time` is initialized once, then advances by exactly one `interval` per callback invocation — never reset to "now."

```mermaid
flowchart TD
Loop --> Now
Now --> Due{now >= next?}
Due -- yes --> Call[await func]
Call --> Advance[next += interval]
Advance --> Wait
Due -- no --> Wait[await sleep max 0, next - now]
Wait --> Loop
```

**Missed ticks are not skipped.** If a 100 Hz callback overruns by 20 ms, the next two ticks fire back-to-back with zero-length sleeps until the clock catches up. The grid is preserved; no tick is silently dropped.
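A sketch of the loop's core logic — names are illustrative; the real implementation lives in `RateExecutor`:

```python
import asyncio
import time

async def rate_loop(func, rate_hz: float, is_running) -> None:
    interval = 1.0 / rate_hz
    next_exec = time.monotonic() + interval   # initialized once, never reset
    while is_running():
        if time.monotonic() >= next_exec:
            await func()
            next_exec += interval             # advance by exactly one interval
        # Negative gaps clamp to 0, so overdue ticks fire back-to-back.
        await asyncio.sleep(max(0.0, next_exec - time.monotonic()))
```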

## Timer usage

Expand All @@ -145,21 +108,11 @@ node.create_timer(1.0 / 30, self.publish_frame) # 30 Hz
node.create_timer(1.0, self.log_stats) # 1 Hz
```

Timers are plain async functions — no decorator, no magic. They share the event loop with subscriber callbacks, so the same head-of-line caveat applies.

## Shared ZMQ context

Every publisher and subscriber created through a node reuses the node's `zmq.asyncio.Context`: socket creation is cheap, I/O threads are shared across all sockets, and terminating the context cleanly shuts everything down. Don't create your own context inside callbacks; you'll leak resources and defeat the shared-I/O-thread optimization.

## Minimal complete node
