chore: tighten dispatcher carve-out to Thread.new only #181
The first governance PR (#180) carved out `Thread` for the dispatcher by skipping the entire `FORBIDDEN_THREAD_SENDS = %i[new start fork]` check whenever the file path matched. That was broader than the documented exception, which says "Thread.new for the worker pool": `Thread.start` and `Thread.fork` were silently allowed too.

Tighten to the documented scope:

- New `DISPATCHER_RELAXED_THREAD_SENDS = %i[new]` constant.
- The cop only skips the offense when the method matches that list.
- `Thread.start` and `Thread.fork` are still flagged in the dispatcher.

Bounded `parallel_map` does not need start or fork; keeping them blocked closes the gap between the documented carve-out and the cop allow-list.

Tests in `spec/r0/rubocop_cops_test.rb` add explicit pins: `Thread.new` allowed in dispatcher.rb (pre-existing); `Thread.start` and `Thread.fork` flagged in dispatcher.rb (new).

Roadmap §2.4 (carve-out paragraph + concurrency table row), §9.1 (split rows for `Thread.new` vs `Thread.start`/`Thread.fork`), Appendix F (cop description), CLAUDE.md, and CHANGELOG.md updated to state the documented surface explicitly.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 6df2fd7f84
      if receiver&.const_type? && %w[Thread Ractor].include?(receiver.const_name)
    -   return if dispatcher_relaxed_file? && receiver.const_name == "Thread"
    +   if dispatcher_relaxed_file? && receiver.const_name == "Thread" &&
    +       DISPATCHER_RELAXED_THREAD_SENDS.include?(method_name)
    +     return
    +   end

        add_offense(node) if FORBIDDEN_THREAD_SENDS.include?(method_name)
Ban non-`Thread.new` sends in dispatcher carve-out
The new carve-out is documented as allowing only `Thread.new` in `lib/dag/effects/dispatcher.rb`, but this branch still permits any other `Thread` send that is not in `FORBIDDEN_THREAD_SENDS` (for example `Thread.current` or `Thread.abort_on_exception=`), because `Thread` constants are exempted in that file and only `new`/`start`/`fork` are checked. That leaves the cop weaker than the stated policy and allows thread-global behavior changes to slip through without offenses.
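To make the reviewer's point concrete, here is a minimal pure-Ruby model of the branch shown in the diff, plus one possible stricter variant. It is a sketch under assumptions: the real cop operates on RuboCop AST nodes, and the helper names `offense?` and `strict_offense?`, the `DISPATCHER_PATH` constant, and the path-matching rule are hypothetical stand-ins, not code from this repository.

```ruby
FORBIDDEN_THREAD_SENDS = %i[new start fork].freeze
DISPATCHER_RELAXED_THREAD_SENDS = %i[new].freeze
# Assumption: the carve-out is keyed on this path, as named in the review.
DISPATCHER_PATH = "lib/dag/effects/dispatcher.rb"

# Hypothetical model of the branch in the diff: only new/start/fork are ever
# checked, and Thread.new is skipped inside the dispatcher file.
def offense?(path, const_name, method_name)
  return false unless %w[Thread Ractor].include?(const_name)

  relaxed = path.end_with?(DISPATCHER_PATH) && const_name == "Thread"
  return false if relaxed && DISPATCHER_RELAXED_THREAD_SENDS.include?(method_name)

  FORBIDDEN_THREAD_SENDS.include?(method_name)
end

# Hypothetical stricter variant: inside the dispatcher file, every Thread send
# outside the allow-list is an offense, not just start/fork.
def strict_offense?(path, const_name, method_name)
  return false unless %w[Thread Ractor].include?(const_name)

  if path.end_with?(DISPATCHER_PATH) && const_name == "Thread"
    return !DISPATCHER_RELAXED_THREAD_SENDS.include?(method_name)
  end

  FORBIDDEN_THREAD_SENDS.include?(method_name)
end
```

In this model `offense?(DISPATCHER_PATH, "Thread", :current)` is false while `strict_offense?` returns true for the same call, which is the gap the review describes. Whether sends such as `Thread.current` should be banned outside the dispatcher as well is a policy question this sketch does not settle.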
#182)

* feat: add Dispatcher.new(parallelism:) for bounded intra-tick dispatch

V1.3 introduces bounded intra-tick parallel dispatch so consumers can fan out N co-eligible effects (e.g. tool calls and LLM calls in the same workflow) instead of paying `N × per_record` wall time. The opt-in is a single additive kwarg; the default is identical to V1.2.

## Dispatcher

- New `parallelism: 1` kwarg on `DAG::Effects::Dispatcher.new`, validated as a positive Integer via `DAG::Validation`.
- New private `parallel_map(items)` replaces the V1.2 `claimed.map { dispatch_record(record) }`. With `parallelism <= 1` or `items.length <= 1` it reduces to `items.map`, preserving the V1.2 serial contract bit-identical. Otherwise it pre-allocates a result Array, feeds a `Queue` of `[record, slot_index]` pairs, and spawns at most `parallelism` `Thread.new` workers. Workers `pop` pairs from the queue, invoke the block, and write the outcome into their slot. `workers.each(&:value)` propagates any exception that escaped the worker via `Thread#value` after `join`, preserving the V1.2 serial-map exception semantics where any exception not caught by `dispatch_record` (still only `StaleLeaseError`) re-emerges from `tick`.
- Order preservation is per collection: `succeeded.map(&:id)` is a subsequence of `claimed.map(&:id)` in original order. Slot alignment between the two arrays is not promised (and was never promised even at V1.2 with mixed outcomes).

## Storage thread-safety contract

`parallelism > 1` requires the storage adapter to declare thread-safety by implementing `#thread_safe_for_dispatch?` and answering truthy. Adapters that do not declare it cause `Dispatcher.new(..., parallelism: > 1)` to raise `ArgumentError`, surfacing the contract mismatch at construction.

`Memory::Storage` is single-process by Roadmap §2.4 and intentionally does not declare it; durable adapters (typically SQLite-backed in consumer hosts) bind every dispatcher-touched method to a transaction and declare it explicitly.

## Tests

`spec/r2/effects_dispatcher_test.rb` adds 7 V1.3 tests:

- Default `parallelism: 1` keeps `max_in_flight == 1` over a 4-record batch: V1.2 serial behavior bit-identical.
- `parallelism: 3` over a 6-record batch caps `max_in_flight <= 3` and observes `>= 2` (so we know we actually parallelized).
- `parallelism: 4` with mixed success/failure outcomes preserves collection order: `succeeded.map(&:id)` and `failed.map(&:id)` are both subsequences of `claimed.map(&:id)`.
- Non-`StandardError` exceptions raised inside a worker thread propagate out of `tick` (verified with a custom `Exception` subclass to bypass `invoke_handler`'s `rescue => caught`).
- `parallelism > 1` with `Memory::Storage` raises `ArgumentError` with a message naming `thread_safe_for_dispatch?`.
- `parallelism` validates as positive Integer (`0`, `-1`, `1.5` raise).

A test-scope `TestThreadSafeMemoryStorage < Memory::Storage` opts into the contract by declaring `thread_safe_for_dispatch? = true`, since the parallel tests rely on per-record-isolated state and CRuby's GVL; production parallel dispatch uses durable adapters that satisfy the contract by transaction binding.

The `IntervalRecordingHandler` test helper writes per-record `[entered_at_ns, exited_at_ns]` timestamps into disjoint slots and sweeps them ex post to compute `max_concurrent`. This avoids needing thread synchronization primitives in the test (writes target disjoint Hash keys; CRuby's GVL keeps `Hash#[]=` atomic).

## Release artifacts

- `lib/dag/version.rb` bumped 1.2.0 → 1.3.0; `Gemfile.lock` updated.
- `CHANGELOG.md`: V1.3 release notes plus governance carve-out entry that already shipped in #180/#181.
- `ROADMAP.md`: V1.3 dispatcher parallelism + Release v1.3 marked Done.
- `CONTRACT.md`: "Dispatcher Concurrency Contract" subsection now names `thread_safe_for_dispatch?` explicitly.
- `spec/r0/v1_2_release_gate_test.rb`: removes the `assert_equal "1.2.0", DAG::VERSION` check (release-gate version pin moves to the V1.3 gate, matching the pattern from V1.1 → V1.2).
- `spec/r0/v1_3_release_gate_test.rb`: new release gate covering version, changelog, roadmap, dispatcher kwarg, and contract docs.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: join all workers before propagating worker exceptions

The first iteration used `workers.each(&:value)` to propagate the first worker exception. That re-raises immediately when it hits the dead worker, leaving peer workers still in flight: they keep calling `mark_effect_*`, `release_nodes_satisfied_by_effect`, and `append_event` on storage *after* `#tick` has already returned to its caller. The caller sees an exception with an inconsistent post-tick state.

`Thread#join` has the same shortcoming: per Ruby semantics, joining a thread that terminated abnormally re-raises that thread's exception, so a `workers.each(&:join)` followed by `workers.each(&:value)` short-circuits identically.

Fix: capture exceptions inside each worker, park them in a shared `errors` list, set a shared abort flag (so peer workers stop pulling new records after their current item completes), and exit the worker loop normally so no worker terminates with an exception. Now `workers.each(&:join)` actually waits for every worker, including the peers that are still finishing their current record, and only then does `#tick` raise the first captured exception.

`# standard:disable Lint/RescueException` is required on the rescue: we deliberately catch every exception class (not just StandardError) so non-StandardError throws also propagate through the same path, preserving the V1.2 serial-map exception contract.

A new test in `spec/r2/effects_dispatcher_test.rb` pins the invariant: with `parallelism: 2` and 4 records where `k0` raises after a 20ms delay (giving the peer worker time to grab `k1` and enter its 100ms-sleep handler), the test asserts that the peer worker's completion timestamp is strictly less than the timestamp captured immediately after `#tick` raised. Without the fix the peer hadn't yet returned from its 100ms sleep when `#tick` re-raised, which the test would observe as `finished_at == {}`.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* fix: drain queue on worker failure instead of using a shared flag

The previous iteration used a shared `abort_flag` Array that the failing worker pushed to and that peer workers checked at the top of each loop iteration. This had two weaknesses Codex stop-time review flagged as "abort flag does not actually stop post-failure work":

1. The check sat *before* `queue.pop(true)`, so a peer that had already passed the check could still pop a new record after the failing worker raised; the flag arrived too late for that iteration.
2. Cross-thread visibility of mutations to a plain `Array` is not guaranteed independently of `Queue`'s built-in synchronization. On CRuby with the GVL it usually works, but the contract should not depend on that.

Fix: when a worker rescues an exception, it drains the work queue itself (popping non-blockingly until `ThreadError`). Peer workers that try to pop after the drain see `ThreadError` immediately and exit. The signal rides on `Queue`'s thread-safety guarantees, with no auxiliary state.

Records that were drained but never processed remain in `:dispatching` state in storage; their leases will expire and a future `tick` can re-claim them, with the same semantics as if a worker process had crashed mid-batch.

Peer workers still complete whatever record they were already processing when the failure happened (that's the join-before-raise invariant from the previous commit). The drain only stops them from *starting* new records.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
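The final shape described across these three commits (slot-indexed results, a bounded pool of `Thread.new` workers, capture-then-drain on failure, join before raise) can be sketched in stand-alone Ruby. This is an illustration under assumptions, not the gem's actual `parallel_map`: the free-function signature, the `drain` helper, and the use of a second `Queue` to collect errors are choices made for this sketch.

```ruby
# Hypothetical helper: empty the work queue so peers stop starting new items.
def drain(queue)
  loop { queue.pop(true) } # non-blocking pop raises ThreadError once empty
rescue ThreadError
  nil
end

# Hypothetical stand-alone version of a bounded, order-preserving parallel map.
def parallel_map(items, parallelism:, &block)
  # Serial fast path: behaves exactly like items.map.
  return items.map(&block) if parallelism <= 1 || items.length <= 1

  results = Array.new(items.length) # one pre-allocated slot per item
  queue = Queue.new
  items.each_with_index { |item, slot| queue << [item, slot] }
  errors = Queue.new # assumption: a Queue, so error capture is thread-safe

  workers = Array.new([parallelism, items.length].min) do
    Thread.new do
      loop do
        begin
          item, slot = queue.pop(true)
        rescue ThreadError
          break # queue is empty (or was drained by a failing peer)
        end

        begin
          results[slot] = block.call(item)
        rescue Exception => e # rubocop:disable Lint/RescueException
          errors << e   # park the exception instead of dying with it
          drain(queue)  # signal peers through the queue itself
          break
        end
      end
    end
  end

  workers.each(&:join) # every worker exits normally, so join never re-raises
  raise errors.pop unless errors.empty? # first captured exception

  results
end

squares = parallel_map((1..6).to_a, parallelism: 3) { |n| n * n }
p squares # => [1, 4, 9, 16, 25, 36]
```

Because no worker terminates with an exception, `join` waits for peers to finish the item they already hold, and only afterwards does the caller see the first captured error. Items that were drained are left unprocessed, which mirrors the commit's note that drained records wait for lease expiry and a later `tick`.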
Summary
Tightening follow-up to #180. The first PR carved out `Thread` for `lib/dag/effects/dispatcher.rb` by short-circuiting all of `FORBIDDEN_THREAD_SENDS = %i[new start fork]` whenever the file matched. That was broader than the documented exception, which says "Thread.new for the worker pool": `Thread.start` and `Thread.fork` were silently allowed.

This PR closes the gap between the documented carve-out and the cop allow-list.

What changes

- New `DISPATCHER_RELAXED_THREAD_SENDS = %i[new]` constant.
- `on_send` only skips the offense when the method matches that list.
- `Thread.start` and `Thread.fork` stay flagged everywhere, including the dispatcher.
- Tests in `spec/r0/rubocop_cops_test.rb` pin the surgical scope: `Thread.start` and `Thread.fork` flagged in `lib/dag/effects/dispatcher.rb`.

Why

Bounded `parallel_map` (the V1.3 feature pattern) needs `Thread.new` only. `Thread.start` (which behaves like `Thread.new` but does not call a subclass's `initialize`) and `Thread.fork` (alias for `start`) are not used by the pattern, and leaving them globally banned removes a vector for the carve-out to drift in subtle ways later.

Test plan

- `bundle exec rake`: 609 runs, 40633 assertions, 0 failures
- `bundle exec rubocop` clean
- `Thread.start` flagged in dispatcher.rb; `Thread.fork` flagged in dispatcher.rb; existing `Thread.new` allowed test still green

🤖 Generated with Claude Code