feat: add Dispatcher.new(parallelism:) for bounded intra-tick dispatch#182
Conversation
V1.3 introduces bounded intra-tick parallel dispatch so consumers can
fan out N co-eligible effects (e.g. tool calls and LLM calls in the
same workflow) instead of paying `N × per_record` wall time. The
opt-in is a single additive kwarg; the default is identical to V1.2.
## Dispatcher
- New `parallelism: 1` kwarg on `DAG::Effects::Dispatcher.new`,
validated as a positive Integer via `DAG::Validation`.
- New private `parallel_map(items)` replaces the V1.2
`claimed.map { dispatch_record(record) }`. With `parallelism <= 1`
or `items.length <= 1` it reduces to `items.map`, preserving the
V1.2 serial contract bit-identical. Otherwise it pre-allocates a
result Array, feeds a `Queue` of `[record, slot_index]` pairs, and
spawns at most `parallelism` `Thread.new` workers. Workers `pop`
pairs from the queue, invoke the block, and write the outcome into
their slot. `workers.each(&:value)` propagates any exception that
escaped the worker via `Thread#value` after `join` — preserving
the V1.2 serial-map exception semantics where any exception not
caught by `dispatch_record` (still only `StaleLeaseError`)
re-emerges from `tick`.
- Order preservation is per collection: `succeeded.map(&:id)` is a
subsequence of `claimed.map(&:id)` in original order. Slot
alignment between the two arrays is not promised (and was never
promised even at V1.2 with mixed outcomes).
## Storage thread-safety contract
`parallelism > 1` requires the storage adapter to declare
thread-safety by implementing `#thread_safe_for_dispatch?` and
answering truthy. Adapters that do not declare it cause
`Dispatcher.new(..., parallelism: > 1)` to raise `ArgumentError`,
surfacing the contract mismatch at construction. `Memory::Storage`
is single-process by Roadmap §2.4 and intentionally does not
declare it; durable adapters (typically SQLite-backed in consumer
hosts) bind every dispatcher-touched method to a transaction and
declare it explicitly.
## Tests
`spec/r2/effects_dispatcher_test.rb` adds 7 V1.3 tests:
- Default `parallelism: 1` keeps `max_in_flight == 1` over a 4-record
batch — V1.2 serial behavior bit-identical.
- `parallelism: 3` over a 6-record batch caps `max_in_flight <= 3`
and observes `>= 2` (so we know we actually parallelized).
- `parallelism: 4` with mixed success/failure outcomes preserves
collection order: `succeeded.map(&:id)` and `failed.map(&:id)` are
both subsequences of `claimed.map(&:id)`.
- Non-`StandardError` exceptions raised inside a worker thread
propagate out of `tick` (verified with a custom `Exception`
subclass to bypass `invoke_handler`'s `rescue => caught`).
- `parallelism > 1` with `Memory::Storage` raises `ArgumentError`
with a message naming `thread_safe_for_dispatch?`.
- `parallelism` validates as positive Integer (`0`, `-1`, `1.5`
raise).
A test-scope `TestThreadSafeMemoryStorage < Memory::Storage` opts
into the contract by declaring `thread_safe_for_dispatch? = true`,
since the parallel tests rely on per-record-isolated state and
CRuby's GVL — production parallel dispatch uses durable adapters
that satisfy the contract by transaction binding.
The `IntervalRecordingHandler` test helper writes per-record
[entered_at_ns, exited_at_ns] timestamps into disjoint slots and
sweeps them ex post to compute `max_concurrent`. This avoids needing
thread synchronization primitives in the test (writes target
disjoint Hash keys; CRuby's GVL keeps `Hash#[]=` atomic).
## Release artifacts
- `lib/dag/version.rb` bumped 1.2.0 → 1.3.0; `Gemfile.lock` updated.
- `CHANGELOG.md`: V1.3 release notes plus governance carve-out
entry that already shipped in #180/#181.
- `ROADMAP.md`: V1.3 dispatcher parallelism + Release v1.3 marked
Done.
- `CONTRACT.md`: "Dispatcher Concurrency Contract" subsection now
names `thread_safe_for_dispatch?` explicitly.
- `spec/r0/v1_2_release_gate_test.rb`: removes the
`assert_equal "1.2.0", DAG::VERSION` check (release-gate version
pin moves to the V1.3 gate, matching the pattern from V1.1 → V1.2).
- `spec/r0/v1_3_release_gate_test.rb`: new release gate covering
version, changelog, roadmap, dispatcher kwarg, and contract docs.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 8f3b5a9c42
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| end | ||
| end | ||
| end | ||
| workers.each(&:value) |
There was a problem hiding this comment.
Drain all workers before propagating thread exceptions
When parallelism > 1, workers.each(&:value) raises as soon as the first worker thread has an unhandled exception, which exits parallel_map without waiting for the remaining workers. In that case, other worker threads can continue dispatching and mutating storage after tick has already failed, so callers observe an exception while additional effects are still being marked/released in the background. Add an ensure path that joins/terminates all workers before re-raising to keep failure behavior bounded and deterministic.
Useful? React with 👍 / 👎.
The first iteration used `workers.each(&:value)` to propagate the
first worker exception. That re-raises immediately when it hits the
dead worker, leaving peer workers still in flight — they keep calling
`mark_effect_*`, `release_nodes_satisfied_by_effect`, and
`append_event` on storage *after* `#tick` has already returned to
its caller. The caller sees an exception with an inconsistent
post-tick state.
`Thread#join` has the same shortcoming: per Ruby semantics, joining a
thread that terminated abnormally re-raises that thread's exception,
so a `workers.each(&:join)` followed by `workers.each(&:value)` short-
circuits identically.
Fix: capture exceptions inside each worker, park them in a shared
`errors` list, set a shared abort flag (so peer workers stop pulling
new records after their current item completes), and exit the worker
loop normally so no worker terminates with an exception. Now
`workers.each(&:join)` actually waits for every worker — including
the peers that are still finishing their current record — and only
then does `#tick` raise the first captured exception.
`# standard:disable Lint/RescueException` is required on the rescue:
we deliberately catch every exception class (not just StandardError)
so non-StandardError throws also propagate through the same path,
preserving the V1.2 serial-map exception contract.
A new test in `spec/r2/effects_dispatcher_test.rb` pins the
invariant: with `parallelism: 2` and 4 records where `k0` raises
after a 20ms delay (giving the peer worker time to grab `k1` and
enter its 100ms-sleep handler), the test asserts that the peer
worker's completion timestamp is strictly less than the timestamp
captured immediately after `#tick` raised. Without the fix the peer
hadn't yet returned from its 100ms sleep when `#tick` re-raised,
which the test would observe as `finished_at == {}`.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Pushed a follow-up commit The fix captures exceptions inside each worker into a shared A new test
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d51cd9c924
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| break unless abort_flag.empty? | ||
| pair = begin | ||
| queue.pop(true) |
There was a problem hiding this comment.
Prevent workers from dequeuing new items after abort
The abort check and dequeue are split (break unless abort_flag.empty? then queue.pop(true)), so they are not atomic across threads. When one worker captures an exception and sets abort_flag, a peer can still pass the empty-check and pop a new record before seeing the flag, which means additional effects may be marked/released even though tick is already on an exception path. This violates the intended “finish only already-running work, then raise” behavior and can make failure-side effects larger than expected for parallelism > 1.
Useful? React with 👍 / 👎.
The previous iteration used a shared `abort_flag` Array that the failing worker pushed to and that peer workers checked at the top of each loop iteration. This had two weaknesses Codex stop-time review flagged as "abort flag does not actually stop post-failure work": 1. The check sat *before* `queue.pop(true)`, so a peer that had already passed the check could still pop a new record after the failing worker raised — the flag arrived too late for that iteration. 2. Cross-thread visibility of mutations to a plain `Array` is not guaranteed independently of `Queue`'s built-in synchronization. On CRuby with the GVL it usually works, but the contract should not depend on that. Fix: when a worker rescues an exception, it drains the work queue itself (popping non-blockingly until `ThreadError`). Peer workers that try to pop after the drain see `ThreadError` immediately and exit. The signal rides on `Queue`'s thread-safety guarantees, with no auxiliary state. Records that were drained but never processed remain in `:dispatching` state in storage; their leases will expire and a future `tick` can re-claim them — same semantics as if a worker process had crashed mid-batch. Peer workers still complete whatever record they were already processing when the failure happened (that's the join-before-raise invariant from the previous commit). The drain only stops them from *starting* new records. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Pushed The previous iteration used a shared
Fix: when a worker rescues an exception, it drains the work queue itself (popping non-blockingly until Records that were drained but never processed remain in The join-before-raise invariant from
|
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: b1df2d7a3a
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| errors << exception | ||
| drain_queue(queue) | ||
| break |
There was a problem hiding this comment.
Serialize abort so peers cannot dequeue after worker failure
After a worker hits an exception, drain_queue(queue) is intended to stop further dispatch, but draining is not exclusive: peer workers can still win a concurrent queue.pop(true) before the queue is fully drained and start processing new records that were not in flight when the failure occurred. That means tick can still execute additional handlers/storage mutations on its exception path, which breaks the bounded-failure behavior documented in this change. Fresh evidence relative to the prior comment: the old abort-flag check is gone, but the new drain-only approach at this site still has the same dequeue race because there is no synchronized abort gate around pop.
Useful? React with 👍 / 👎.
Summary
V1.3 introduces bounded intra-tick parallel dispatch. Consumers can fan out N co-eligible effects in one
tickinstead of payingN × per_recordwall time. The opt-in is a single additive kwarg; the default is identical to V1.2.This implements the feature half of the V1.3 split. The governance carve-out shipped first as #180 (cop allow-list) and #181 (cop tightening).
What's new
DAG::Effects::Dispatcher.new(parallelism: 1)(default) preserves the V1.2 serial contract bit-identical.parallelism > 1claims records as before but dispatches them through a boundedThread.newpool of at mostparallelismworkers. Pool reads from aQueueof[record, slot_index]pairs, writes outcomes into a pre-allocated Array slot.succeeded.map(&:id)is a subsequence ofclaimed.map(&:id)in original order.#tickviaThread#valueafterjoin— V1.2 serial-map exception semantics preserved (dispatch_recordstill catches onlyStaleLeaseError).parallelism > 1requires the storage adapter to declare#thread_safe_for_dispatch?truthy.Memory::Storagedoes not declare it (single-process per Roadmap §2.4); constructingDispatcher.new(..., parallelism: > 1)with it raisesArgumentError. Durable adapters (typically SQLite-backed in consumer hosts) declare it explicitly.Tests (
spec/r2/effects_dispatcher_test.rb)7 new tests:
parallelism: 1default →max_in_flight == 1over a 4-record batchparallelism: 3over 6 records →max_in_flight <= 3and>= 2parallelism: 4with mixed success/failure → collection order preservedStandardErrorworker exceptions propagate out oftickparallelism > 1withMemory::Storage→ArgumentErrornamingthread_safe_for_dispatch?parallelismvalidation:0,-1,1.5all raiseparallelism: 1default.)The
IntervalRecordingHandlertest helper writes per-record[entered_at_ns, exited_at_ns]into disjoint Hash slots and sweeps the merged interval set ex post to computemax_concurrent. No thread-sync primitives needed in the test (writes target disjoint keys; CRuby's GVL keepsHash#[]=atomic per key). End-events sort before start-events at equal timestamps to avoid false-positive overlap on abutting intervals.Release artifacts
lib/dag/version.rb: 1.2.0 → 1.3.0Gemfile.lock: version pin updatedCHANGELOG.md: V1.3 release section with feature notes + already-merged governance carve-outROADMAP.md: V1.3 dispatcher parallelism + Release v1.3 marked DoneCONTRACT.md: "Dispatcher Concurrency Contract" subsection now namesthread_safe_for_dispatch?explicitlyspec/r0/v1_2_release_gate_test.rb: drops theassert_equal \"1.2.0\", DAG::VERSIONcheck (release-gate version pin moves to V1.3 gate, matching the V1.1 → V1.2 pattern)spec/r0/v1_3_release_gate_test.rb: new release gate covering version, changelog, roadmap, dispatcher kwarg, and contract docsTest plan
bundle exec rake— 619 runs, 40676 assertions, 0 failuresbundle exec rubocop— clean (Thread.new,Queue,<<,pop,[]=in dispatcher.rb pass the V1.3 carve-out;Mutex/Monitor/etc. still banned)bundle exec standardrb— cleanbundle exec yard doc— 99.15% documentedWhat's intentionally NOT in this PR
per_type_parallelism: {}). V1.4+.workflow_id:filter onclaim_ready_effects. V1.4+.Runner. Separate design pass; would move §2.1 Determinism pillar.🤖 Generated with Claude Code