
Async scheduler dispatch loop processes stale ready-task list, preventing pipeline parallelism in multi-model workloads #504

@andreatgretel

Description


Priority Level

High (Major improvement)

Is your feature request related to a problem? Please describe.

In multi-model pipelines where generator columns feed judge columns (each on separate inference endpoints), the async scheduler's dispatch loop can prevent pipeline parallelism.

The scheduler calls get_ready_tasks() to get a snapshot of all currently dispatchable tasks, then iterates through the entire list, submitting each one. With 3 generator columns x 10k records, this snapshot contains 30k root-ready gen tasks. The loop submits them one at a time, blocking on the submission semaphore when it's full and resuming as slots free up.

Meanwhile, as gen tasks complete, _enqueue_downstream correctly adds judge tasks to the frontier. However, the dispatch loop is still iterating through its original 30k-task snapshot. It never re-calls get_ready_tasks() to see the newly-ready judge tasks. Judge tasks sit in the frontier until all 30k gen tasks have been submitted, at which point the loop finishes, control returns to the scheduler's main loop, and get_ready_tasks() is called again - finally seeing the accumulated judge tasks.

The result: gen runs to completion, then judge runs to completion. No overlap. Identical to sync mode.

The workaround is using a small buffer_size (e.g. 50-100) which creates many row groups. Each row group has a small number of tasks (e.g. 300 for 50 records x 6 columns), so the dispatch loop processes them quickly, returns to the main loop, and get_ready_tasks() picks up downstream tasks from earlier completed row groups. With the default buffer_size equal to num_records (one large row group), no interleaving occurs.
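A back-of-envelope comparison of the snapshot sizes involved (illustrative only; the 3 judge columns are inferred from the stated 6 total columns, and the variable names are made up here):

num_records = 10_000
gen_columns = 3
total_columns = 6          # 3 gen + 3 judge (assumed split)

# Default buffer_size == num_records: one row group, so the first
# get_ready_tasks() snapshot holds every root-ready gen task at once.
default_snapshot = gen_columns * num_records     # 30,000 tasks

# buffer_size = 50: each row group holds 50 records x 6 columns = 300
# tasks total, so each snapshot is small, the loop finishes quickly, and
# the next get_ready_tasks() call sees newly-ready judge tasks.
tasks_per_row_group = 50 * total_columns         # 300 tasks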

Evidence

Dual-model benchmark: gpt-oss-120b (generator, TP=4) + Ministral 8B (judge, TP=1) with 10k records per shard, max_parallel_requests=64, default buffer_size (one row group).

Request logs from the async run show:

  • First 22 minutes: only the generator model received requests
  • Last 2 minutes: the judge model received the bulk of its requests
  • Total time essentially identical to sync (~1465s async vs ~1503s sync average)

Same pipeline with buffer_size=50 (20 row groups) at max_parallel_requests=16 shows proper gen/judge overlap and meaningful speedup.

Describe the solution you'd like

The dispatch loop should periodically re-check the frontier for newly-ready downstream tasks rather than processing a stale snapshot. Options:

  1. Break and re-fetch when the semaphore blocks: If _submission_semaphore.acquire() would have to wait, break out of the current task list, return to the main loop, and call get_ready_tasks() again. The new snapshot then includes downstream tasks from completed upstream work (see the sketch after this list).

  2. Priority-based dispatch: When fetching ready tasks, prioritize downstream tasks over root tasks. Downstream tasks unblock row completion and checkpointing, while root tasks only add more work to the queue.

  3. Incremental frontier consumption: Instead of materializing the full ready-task list, consume from the frontier incrementally, checking for new downstream tasks between submissions.
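A minimal sketch of option 1, reusing the names from the simplified code path in the Agent Investigation below; the locked() check on the asyncio semaphore is illustrative and not a claim about the actual structure of async_scheduler.py:

# Sketch: stop draining a stale snapshot as soon as submission slots are
# exhausted, so the next get_ready_tasks() call can pick up downstream
# tasks that _enqueue_downstream added in the meantime.
while True:
    ready = self._tracker.get_ready_tasks(self._dispatched, admitted_ids)

    for task in ready:
        if self._submission_semaphore.locked():
            # All slots in use: abandon the rest of this snapshot and
            # re-fetch the frontier on the next while-loop iteration.
            break
        await self._submission_semaphore.acquire()
        self._dispatched.add(task)
        self._spawn_worker(self._execute_task(task))

This keeps the existing semaphore backpressure but bounds how far the dispatch loop can run ahead of the frontier.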

Describe alternatives you've considered

  • Using small buffer_size: works as a workaround by creating many small row groups, but users shouldn't need to tune this for pipeline parallelism.
  • Reducing max_parallel_requests: slows down each column, making the stale-list problem less severe, but artificially limits GPU utilization.

Agent Investigation

In async_scheduler.py, the relevant code path (simplified):

# Main scheduler loop
while True:
    # Snapshot of all currently ready tasks - called once per iteration
    ready = self._tracker.get_ready_tasks(self._dispatched, admitted_ids)

    # Iterates through the ENTIRE snapshot before returning to the while loop
    for task in ready:
        await self._submission_semaphore.acquire()   # blocks when full
        self._dispatched.add(task)
        self._spawn_worker(self._execute_task(task))

    # Only here does the while loop re-iterate and call get_ready_tasks() again
    # By this point, downstream tasks have been waiting in the frontier

_enqueue_downstream in completion_tracker.py correctly adds per-row downstream tasks to the frontier when upstream tasks complete. The frontier has the right tasks; the dispatch loop just doesn't see them until it finishes iterating through its current snapshot.
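For illustration, a toy model of that interaction; everything here except the _enqueue_downstream name and the per-row behavior is invented and not taken from completion_tracker.py:

from collections import defaultdict

class ToyCompletionTracker:
    def __init__(self, downstream_of):
        # downstream_of: column -> list of columns that depend on it
        self.downstream_of = downstream_of
        self.completed = defaultdict(set)   # row_id -> completed columns
        self.frontier = []                  # (row_id, column) pairs ready to dispatch

    def _enqueue_downstream(self, row_id, column):
        self.completed[row_id].add(column)
        for dep in self.downstream_of.get(column, []):
            # A judge task becomes ready as soon as the gen column it
            # depends on has completed for the same row.
            self.frontier.append((row_id, dep))

Even in this toy form, the frontier fills with judge tasks row by row; the fix is simply to make the dispatch loop look at it more often.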

Checklist

  • I've reviewed existing issues and the documentation
  • This is a design proposal, not a "please build this" request
