Skip to content

fix: use non-blocking dispatch to prevent pipeline starvation#505

Merged
nabinchha merged 3 commits intomainfrom
nmulepati/fix/504-stale-dispatch-loop
Apr 8, 2026
Merged

fix: use non-blocking dispatch to prevent pipeline starvation#505
nabinchha merged 3 commits intomainfrom
nmulepati/fix/504-stale-dispatch-loop

Conversation

@nabinchha
Copy link
Copy Markdown
Contributor

@nabinchha nabinchha commented Apr 7, 2026

📋 Summary

The async scheduler's dispatch loop processes a stale snapshot of ready tasks, preventing pipeline parallelism in multi-model workloads. When the submission semaphore is full, the loop blocks on acquire() while newly-ready downstream tasks accumulate in the frontier unseen. This PR replaces the blocking acquire with a non-blocking try_acquire that breaks out of the loop when the semaphore is contended, causing the outer loop to re-query the frontier and pick up downstream tasks.

🔗 Related Issue

Fixes #504

🔄 Changes

  • Add TrackingSemaphore.try_acquire() — a non-blocking acquire that returns False when no permits are available instead of blocking (async_scheduler.py#L58-L63)
  • Replace await self._submission_semaphore.acquire() with try_acquire() + break in both _main_dispatch_loop and _drain_frontier, so the dispatch loop breaks out when the semaphore is full and re-queries the frontier on the next iteration (async_scheduler.py#L306-L315, async_scheduler.py#L412-L419)
  • Update wake-event condition in _main_dispatch_loop from if not ready to if not ready or semaphore_full so the loop waits for a worker completion before re-querying when it broke out early (async_scheduler.py#L334)

🧪 Testing

  • make test passes (317/317 dataset builder tests)
  • Unit test added for TrackingSemaphore.try_acquire
  • Integration test added (test_scheduler_downstream_interleaves_with_upstream) — builds the exact pipeline topology from the issue (topic → 3 gen cols → 3 judge cols), uses a small semaphore (4) with 10 records, and asserts that judge tasks begin dispatching before all gen tasks finish
  • Verify e2e running configs at scale

✅ Checklist

  • Follows commit message conventions
  • Commits are signed off (DCO)

Made with Cursor

Replace blocking semaphore acquire in the dispatch loop with a
non-blocking try_acquire that breaks out when the semaphore is full.
This causes the outer loop to re-query the frontier, picking up
newly-ready downstream tasks instead of draining a stale snapshot.

Fixes #504

Made-with: Cursor
@nabinchha nabinchha requested a review from a team as a code owner April 7, 2026 20:38
@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps bot commented Apr 7, 2026

Greptile Summary

Replaces blocking await self._submission_semaphore.acquire() with a new non-blocking TrackingSemaphore.try_acquire() in both _main_dispatch_loop and _drain_frontier, breaking out of the inner dispatch loop when the semaphore is contended so the outer loop re-queries the frontier and picks up newly-ready downstream tasks. A semaphore_full flag is added to _main_dispatch_loop so the outer loop still waits on the wake event when it broke out early, preventing a busy-spin.

Confidence Score: 5/5

Safe to merge — the core logic change is correct and well-tested; the single finding is a P2 type annotation issue in a test.

Both changed files are sound. The try_acquire() implementation is correct for asyncio's single-threaded model, the semaphore_full flag correctly gates re-waiting on worker completion, and _drain_frontier handles the break-out path safely. The only finding is a P2 type annotation violation in the new test (passing graph.columns instead of an ArtifactStorage mock), which does not affect runtime correctness.

No files require special attention.

Vulnerabilities

No security concerns identified.

Important Files Changed

Filename Overview
packages/data-designer-engine/src/data_designer/engine/dataset_builders/async_scheduler.py Adds TrackingSemaphore.try_acquire() and replaces blocking acquire() calls in _main_dispatch_loop and _drain_frontier with non-blocking try_acquire() + break; the semaphore_full flag in the main loop correctly gates re-waiting on worker completion.
packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py Adds unit test for TrackingSemaphore.try_acquire and integration test for downstream-interleaving; the new buffer manager is passed a list[str] instead of an ArtifactStorage instance, which is a type violation (though harmless here).

Flowchart

%%{init: {'theme': 'neutral'}}%%
flowchart TD
    A[_main_dispatch_loop: outer while True] --> B[clear wake_event\nget_ready_tasks]
    B --> C{for task in ready}
    C --> D{try_acquire?}
    D -- False --> E[semaphore_full = True\nbreak inner loop]
    D -- True --> F[dispatch task\n_spawn_worker]
    F --> C
    E --> G[checkpoint completed RGs]
    G --> H{all_done?}
    H -- Yes --> I[exit loop]
    H -- No --> J{not ready\nOR semaphore_full?}
    J -- Yes --> K[await wake_event.wait]
    K --> A
    J -- No --> A

    subgraph Worker task
        W1[_execute_task_inner_impl] --> W2{LLM task?}
        W2 -- Yes --> W3[acquire llm_wait_sem\nrelease submission_sem]
        W2 -- No --> W4[run generator]
        W3 --> W4
        W4 --> W5[finally: in_flight.discard\nrelease submission_sem\nwake_event.set]
    end
Loading
Prompt To Fix All With AI
This is a comment left during a code review.
Path: packages/data-designer-engine/tests/engine/dataset_builders/test_async_scheduler.py
Line: 1526

Comment:
**Wrong type passed to `RowGroupBufferManager`**

`graph.columns` is a `list[str]`, but `RowGroupBufferManager.__init__` expects an `ArtifactStorage`. This works at runtime here because `on_finalize_row_group` is `None` so `checkpoint_row_group` (the only code path that calls `self._artifact_storage`) is never reached — but it violates the type contract and would confuse a reader or a type-checker. A `MagicMock()` (consistent with the pattern used in other tests such as `test_scheduler_with_buffer_manager`) would be the correct substitute.

```suggestion
    buffer_manager = RowGroupBufferManager(MagicMock())
```

How can I resolve this? If you propose a fix, please make it concise.

Reviews (3): Last reviewed commit: "Merge branch 'main' into nmulepati/fix/5..." | Re-trigger Greptile

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Apr 8, 2026

Docs preview: https://96a60b73.dd-docs-preview.pages.dev

Notebook tutorials are placeholder-only in previews.

Copy link
Copy Markdown
Contributor

@andreatgretel andreatgretel left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! 🚢

@nabinchha nabinchha merged commit c27ad62 into main Apr 8, 2026
48 checks passed
@nabinchha nabinchha deleted the nmulepati/fix/504-stale-dispatch-loop branch April 8, 2026 19:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Async scheduler dispatch loop processes stale ready-task list, preventing pipeline parallelism in multi-model workloads

2 participants