
[Perf] Streams 3: Add qd.stream_parallel() context manager #409

Open
hughperkins wants to merge 40 commits into hp/streams-quadrantsic-2-amdgpu-cpu from
hp/streams-quadrantsic-3-stream-parallel

Conversation

@hughperkins
Collaborator

@hughperkins hughperkins commented Mar 11, 2026

Introduces stream_parallel() for running top-level for-loop blocks on separate GPU streams. The AST transformer maps 'with qd.stream_parallel()' blocks to stream-parallel group IDs, which propagate through IR lowering and offloading to the CUDA/AMDGPU kernel launchers. Each unique group ID gets its own stream at launch time. Includes validation that all top-level kernel statements must be stream_parallel blocks (no mixing), and offline cache key support.

lines added: +377 - 161 = +216

Issue: #


@hughperkins hughperkins marked this pull request as draft March 11, 2026 23:54
…adrantsic-3-stream-parallel

# Conflicts:
#	python/quadrants/lang/stream.py
Prevents stale group IDs from leaking if insert_for is called after a
path that set a non-zero stream_parallel_group_id, matching the reset
pattern of all other ForLoopConfig fields.
Add an error check in begin_stream_parallel() to prevent nesting, which
would produce undefined group ID semantics.
…context safety

Add comments explaining that streams are created/destroyed per launch
(stream pooling as future optimization), and that RuntimeContext sharing
across concurrent streams is safe because kernels only read from it.
@hughperkins
Collaborator Author

Review from Opus (predates last 5 commits above):

PR Review: qd.stream_parallel() context manager for implicit stream parallelism

Summary

This PR introduces a qd.stream_parallel() context manager that allows users to declare groups of for-loops within a kernel that should execute on separate GPU streams. The feature spans the full stack:

  1. Python API (stream.py): A @contextmanager no-op at runtime, intercepted at compile time by the AST transformer.
  2. AST transform (ast_transformer.py, function_def_transformer.py): Recognizes with qd.stream_parallel(): blocks, calls begin_stream_parallel()/end_stream_parallel() on the C++ ASTBuilder, and validates that all top-level kernel statements are stream_parallel blocks (or none are).
  3. IR propagation (frontend_ir.h/cpp, statements.h/cpp, lower_ast.cpp, offload.cpp): Threads a stream_parallel_group_id through ForLoopConfig -> FrontendForStmt -> RangeForStmt/StructForStmt -> OffloadedStmt -> OffloadedTask.
  4. Codegen (codegen_cuda.cpp, codegen_amdgpu.cpp): Copies stream_parallel_group_id onto the OffloadedTask.
  5. Runtime (kernel_launcher.cpp for CUDA and AMDGPU): Groups consecutive tasks with non-zero group IDs, creates one stream per unique group ID, launches them concurrently, then synchronizes and destroys the streams.

The design is clean: each with qd.stream_parallel(): block gets a monotonically increasing group ID, loops within the same block share a stream (serialized on that stream), while loops in different blocks get different streams (concurrent). On CPU/Metal, the group ID simply has no runtime effect, providing a natural serial fallback.
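The group-ID-to-stream mapping described above can be modeled in plain Python. This is an illustrative sketch, not the PR's actual code; `Task` and `assign_streams` are hypothetical names.

```python
# Hypothetical pure-Python model of the group-ID scheme: loops in the same
# stream_parallel block share a stream; different blocks get different streams.
from dataclasses import dataclass

@dataclass
class Task:
    name: str
    group_id: int  # 0 = serial (default stream), >0 = stream_parallel group

def assign_streams(tasks):
    """Map each non-zero group ID to its own stream; group 0 uses the default stream."""
    streams = {}
    placement = {}
    for t in tasks:
        if t.group_id == 0:
            placement[t.name] = "default"
        else:
            streams.setdefault(t.group_id, f"stream_{len(streams) + 1}")
            placement[t.name] = streams[t.group_id]
    return placement

# Two loops in block 1 share a stream (serialized); block 2's loop is concurrent.
tasks = [Task("loop_a", 1), Task("loop_b", 1), Task("loop_c", 2)]
print(assign_streams(tasks))
# {'loop_a': 'stream_1', 'loop_b': 'stream_1', 'loop_c': 'stream_2'}
```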

Architecture & Design Feedback

Strengths:

  • The compile-time interception of a Python context manager is elegant -- stream_parallel() is a no-op yield at Python runtime, but the AST transformer gives it compile-time semantics.
  • The exclusivity validation (all top-level statements must be stream_parallel blocks if any are) is a pragmatic simplification that avoids complex interleaving semantics.
  • The group ID approach naturally handles multiple loops within one stream_parallel block (they share a stream and execute serially on it).

Concerns:

  1. Stream creation/destruction per launch. Every kernel invocation creates and destroys GPU streams. Stream creation is a driver-level synchronization point and can be expensive (~5-50us on CUDA). For kernels invoked in a loop, this overhead could negate the parallelism benefit. Consider a stream pool that reuses streams across invocations, or at least document this as a known limitation.

  2. Near-identical CUDA/AMDGPU launcher code. The stream-parallel dispatch logic in quadrants/runtime/cuda/kernel_launcher.cpp and quadrants/runtime/amdgpu/kernel_launcher.cpp is copy-pasted (~40 lines each). If any bug is found or behavior changes, both must be updated in lockstep. Consider extracting a shared template or helper function.
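The stream-reuse idea mentioned above can be sketched as a small pool that hands back previously created streams instead of destroying them per launch. This is an illustrative Python model under assumed `create`/`destroy` callables, not the driver API.

```python
# Illustrative stream pool; "StreamPool" and its methods are hypothetical names,
# and streams here are opaque handles produced by the injected create() callable.
class StreamPool:
    def __init__(self, create, destroy):
        self._create, self._destroy = create, destroy
        self._free = []

    def acquire(self):
        # Reuse a parked stream if available, else create a new one.
        return self._free.pop() if self._free else self._create()

    def release(self, stream):
        self._free.append(stream)  # park for reuse instead of destroying per launch

    def drain(self):
        while self._free:
            self._destroy(self._free.pop())

created = []
pool = StreamPool(create=lambda: created.append(len(created)) or created[-1],
                  destroy=lambda s: None)
a = pool.acquire(); b = pool.acquire()   # two creations on first launch
pool.release(a); pool.release(b)
c = pool.acquire(); d = pool.acquire()   # second launch reuses both, no new creations
print(len(created))  # 2
```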

Potential Bugs / Issues

  3. ForLoopDecoratorRecorder::reset() does not clear stream_parallel_group_id.
// quadrants/ir/frontend_ir.h:950-957
void reset() {
    config.is_bit_vectorized = false;
    config.num_cpu_threads = 0;
    config.uniform = false;
    config.mem_access_opt.clear();
    config.block_dim = 0;
    config.strictly_serialized = false;
}

Every other config field is reset here except stream_parallel_group_id. This works today because all begin_frontend_*_for methods explicitly stamp current_stream_parallel_group_id_ onto the config before use. However, ASTBuilder::insert_for creates a FrontendForStmt using for_loop_dec_.config without first setting stream_parallel_group_id:

// quadrants/ir/frontend_ir.cpp:1347-1359
void ASTBuilder::insert_for(const Expr &s,
                            const Expr &e,
                            const std::function<void(Expr)> &func) {
    auto i = Expr(std::make_shared<IdExpression>(get_next_id()));
    auto stmt_unique = std::make_unique<FrontendForStmt>(i, s, e, this->arch_,
                                                         for_loop_dec_.config);
    for_loop_dec_.reset();
    // ...
}

If insert_for were ever called after a path that set a non-zero stream_parallel_group_id, the stale value would leak. Add config.stream_parallel_group_id = 0; to reset() for safety.
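The leak can be demonstrated with a minimal Python mirror of the recorder (the real code is C++; field names follow the snippet above, and the `Recorder` class here is illustrative).

```python
# Minimal repro of the reset() omission: every field is cleared except the
# stream-parallel group ID, so a stale value survives across resets.
from dataclasses import dataclass

@dataclass
class ForLoopConfig:
    block_dim: int = 0
    strictly_serialized: bool = False
    stream_parallel_group_id: int = 0

class Recorder:
    def __init__(self):
        self.config = ForLoopConfig()

    def reset(self):  # mirrors the C++ reset(): the group ID is skipped
        self.config.block_dim = 0
        self.config.strictly_serialized = False

rec = Recorder()
rec.config.stream_parallel_group_id = 2   # set by a stream_parallel code path
rec.reset()
print(rec.config.stream_parallel_group_id)  # 2 -- stale value leaks into the next loop
```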

  4. No exception safety on stream creation. In both CUDA and AMDGPU launchers, if a launch() call throws after some streams have been created, those streams leak. This is consistent with the existing error handling style in the codebase, but worth noting. A RAII wrapper or scope guard would make this robust.

  5. Shared RuntimeContext across concurrent streams. In the CUDA launcher, all parallel tasks share the same ctx.get_context() pointer:

// quadrants/runtime/cuda/kernel_launcher.cpp:170-177
for (size_t j = group_start; j < i; j++) {
    auto &t = offloaded_tasks[j];
    CUDAContext::get_instance().set_stream(
        stream_by_id[t.stream_parallel_group_id]);
    cuda_module->launch(t.name, t.grid_dim, t.block_dim,
                        t.dynamic_shared_array_bytes, {&ctx.get_context()},
                        {});
}

If any kernel writes to the RuntimeContext (e.g. result buffer), concurrent kernels sharing the same context could race. This is probably safe if the kernels only read from the context (args), but worth verifying that no kernel writes back into the context during execution. The same applies to AMDGPU where context_pointer is shared.
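The scope-guard suggestion above (for the stream-creation leak) can be sketched in Python with contextlib.ExitStack; `create_stream`, `destroy_stream`, and `launch` are stand-ins for the driver calls, not real API.

```python
# Sketch of a scope guard: every created stream registers a cleanup callback
# BEFORE any launch runs, so a throwing launch still destroys all streams.
from contextlib import ExitStack

destroyed = []

def create_stream(i): return f"s{i}"
def destroy_stream(s): destroyed.append(s)
def launch(task, stream):
    if task == "bad":
        raise RuntimeError("launch failed")

def launch_group(tasks):
    with ExitStack() as stack:
        streams = []
        for i, _ in enumerate(tasks):
            s = create_stream(i)
            stack.callback(destroy_stream, s)  # cleanup registered immediately
            streams.append(s)
        for t, s in zip(tasks, streams):
            launch(t, s)

try:
    launch_group(["ok", "bad", "ok"])
except RuntimeError:
    pass
print(destroyed)  # ['s2', 's1', 's0'] -- all streams cleaned up despite the throw
```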

Edge Cases

  6. Empty stream_parallel blocks. A with qd.stream_parallel(): block with no for-loops inside would set a group ID but generate no tasks. The begin_stream_parallel()/end_stream_parallel() pair still increments the counter. This is harmless but untested.

  7. Nested stream_parallel blocks. The build_With handler doesn't prevent nesting:

with qd.stream_parallel():
    with qd.stream_parallel():  # nested -- what happens?
        for i in range(...):
            ...

The inner begin_stream_parallel() would overwrite current_stream_parallel_group_id_, and the outer's end_stream_parallel() would reset it to 0. The inner end_stream_parallel() would have already set it to 0 before the outer's. This seems like it would work "accidentally" but the semantics are undefined. Consider explicitly rejecting nested stream_parallel blocks.
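The overwrite-then-reset sequence described above can be modeled in plain Python (a hypothetical mirror of the C++ ASTBuilder counter fields, not the real code):

```python
# Model of begin/end semantics: begin() allocates a fresh group ID, end()
# resets the current ID to 0. Nesting makes the outer block lose its ID.
class Builder:
    def __init__(self):
        self.next_group = 1
        self.current = 0
        self.trace = []

    def begin(self):
        self.current = self.next_group
        self.next_group += 1

    def end(self):
        self.current = 0

    def stamp(self, loop):  # what begin_frontend_*_for would record
        self.trace.append((loop, self.current))

b = Builder()
b.begin()             # outer with-block -> group 1
b.begin()             # nested with-block overwrites -> group 2
b.stamp("inner_for")
b.end()               # inner end resets current to 0 early
b.stamp("outer_for")  # loop after the nested block is stamped 0, not 1
b.end()
print(b.trace)  # [('inner_for', 2), ('outer_for', 0)]
```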

  8. with statement in non-kernel qd.func. The build_With handler is registered on the generic AST transformer but the exclusivity validation in _validate_stream_parallel_exclusivity only runs for ctx.is_kernel. What happens if stream_parallel is used inside a @qd.func? The begin_stream_parallel()/end_stream_parallel() calls would still execute and tag loops with group IDs. If those loops get inlined into a kernel, they'd carry the group IDs. Consider whether this should be rejected for @qd.func.

Code Style

  1. The formatting-only changes (multi-line function signatures in ast_transformer.py) are fine but arguably belong in a separate commit to keep the diff focused.

  2. The test_stream_with_ndarray test was moved (not deleted) to the end of the file. This is fine but could cause confusion in the diff for reviewers.

Tests

The test coverage is good:

  • test_stream_parallel_basic -- correctness of two independent parallel blocks
  • test_stream_parallel_multiple_loops_per_stream -- multiple loops sharing a stream
  • test_stream_parallel_timing -- actual speedup measurement on GPU with serial fallback tolerance
  • test_stream_parallel_rejects_mixed_top_level -- validation error testing

Missing tests:

  • Empty stream_parallel block (no loops inside)
  • stream_parallel with struct-for loops (only range-for is tested)
  • stream_parallel with ndarray arguments

Suggestions Summary

Priority  Item
High      Add config.stream_parallel_group_id = 0; to ForLoopDecoratorRecorder::reset() (#3)
High      Verify RuntimeContext is safe to share across concurrent streams (#5)
Medium    Reject nested stream_parallel blocks explicitly (#7)
Medium    Consider rejecting stream_parallel in @qd.func (#8)
Low       Extract shared stream-dispatch logic from CUDA/AMDGPU launchers (#2)
Low       Consider stream pooling for repeated kernel launches (#1)

@hughperkins
Collaborator Author

For the stream-pool concern, added a 4th PR to add a stream pool: #410

@hughperkins hughperkins marked this pull request as ready for review March 12, 2026 01:38
@hughperkins hughperkins changed the title [Perf] Streams part 3: Add qd.stream_parallel() context manager [Perf] Streams 3: Add qd.stream_parallel() context manager Mar 12, 2026
@hughperkins hughperkins marked this pull request as draft March 12, 2026 04:59
Comment on lines +312 to +314
if len(stmt.items) != 1:
return False
item = stmt.items[0]
Contributor
What is items? Could you document, here or somewhere else, why the length can be 1 or more, and what it means in this context?

Comment on lines +327 to +330
"When using qd.stream_parallel(), all top-level statements "
"in the kernel must be 'with qd.stream_parallel():' blocks. "
"Move non-parallel code to a separate kernel."
)
Contributor
I still don't understand why you are moving to the next line before you have to. This is weird to me. But I don't care much.

Comment on lines +321 to +322
has_sp = any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body)
if not has_sp:
Contributor
I would rather do

# <Insert fancy comment explaining what this check is doing>
if not any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body):
    return

Comment on lines +1396 to +1398
if len(node.items) != 1:
raise QuadrantsSyntaxError("'with' in Quadrants kernels only supports a single context manager")
item = node.items[0]
Contributor
Same. Not clear what items is.

Contributor

@duburcqa duburcqa Mar 15, 2026
All this "code duplication" (at least duplicate logics) is annoying but if there is no better choice and it is what we have been doing so far, then it is ok.

…cpu' into hp/streams-quadrantsic-3-stream-parallel

Made-with: Cursor

# Conflicts:
#	quadrants/codegen/llvm/llvm_compiled_data.h
#	quadrants/ir/frontend_ir.cpp
#	quadrants/ir/frontend_ir.h
#	quadrants/ir/statements.cpp
#	quadrants/ir/statements.h
#	quadrants/runtime/amdgpu/kernel_launcher.cpp
#	quadrants/runtime/cuda/kernel_launcher.cpp
#	quadrants/transforms/lower_ast.cpp
#	quadrants/transforms/offload.cpp
Made-with: Cursor
…cpu' into hp/streams-quadrantsic-3-stream-parallel

Made-with: Cursor

# Conflicts:
#	quadrants/codegen/llvm/llvm_compiled_data.h
#	quadrants/runtime/amdgpu/kernel_launcher.cpp
#	quadrants/runtime/cuda/kernel_launcher.cpp
@hughperkins
Collaborator Author

Migrated to a single PR on streams 4.

@hughperkins hughperkins reopened this Apr 28, 2026
@hughperkins hughperkins force-pushed the hp/streams-quadrantsic-3-stream-parallel branch 5 times, most recently from bee7e65 to b3fbc39 Compare April 28, 2026 16:03
Comment on lines 126 to 132
offloaded->body->insert(std::move(s->body->statements[j]));
}
offloaded->range_hint = s->range_hint;
offloaded->stream_parallel_group_id = s->stream_parallel_group_id;
offloaded->loop_name = s->loop_name;
root_block->insert(std::move(offloaded));
} else if (auto st = stmt->cast<StructForStmt>()) {

🔴 Non-static if/while wrapping a for-loop inside with qd.stream_parallel(): silently drops the for-loop's stream_parallel_group_id. The IfStmt/WhileStmt sits at root_block and falls into the else branch at offload.cpp:156, getting bundled into pending_serial_statements (a serial OffloadedStmt with default stream_parallel_group_id=0); meanwhile sibling top-level for-loops in the same with-block become per-group offloaded tasks on NON_BLOCKING streams with no dependency edge — silent data race when a sibling reads what the if-wrapped for writes. Fix: propagate the inner for-loop's group_id when bundling (splitting bundles when group_ids differ), or reject non-static if/while inside stream_parallel at compile time.

Extended reasoning...

What the bug is

The PR threads stream_parallel_group_id through the for-loop branches of Offloader::run (offload.cpp:129 for range_for, 241 for struct_for) but does not propagate it through the catch-all else branch at line 156 that bundles non-for-loop statements into pending_serial_statements. When a non-static if or while wraps a for-loop inside with qd.stream_parallel():, the wrapping IfStmt/WhileStmt is the top-level statement — it falls into that else branch and is moved into a serial OffloadedStmt with default-initialized stream_parallel_group_id=0. The inner RangeForStmt's stamped group_id is preserved on the inner statement but never reaches an OffloadedTask: codegen at codegen_cuda.cpp:641 / codegen_amdgpu.cpp:354 reads the OUTER OffloadedStmt's group_id (= 0).

Why nothing upstream prevents it

build_With (ast_transformer.py:1533-1547) only restricts the context-manager call site to qd.stream_parallel(); it does not introspect the with-body. _validate_stream_parallel_exclusivity (function_def_transformer.py:467-477) only walks top-level kernel-body With nodes — the if/while is INSIDE the with-body so validation passes. begin_stream_parallel only rejects nested-stream_parallel-within-stream_parallel. tests/python/test_streams.py only exercises bare for-loops inside stream_parallel, so this case is untested.

Distinct from related bugs already filed in this PR

  • Bug #6 (strictly_serialized): trigger is a top-level RangeForStmt with strictly_serialized=true that fails the !s->strictly_serialized predicate at offload.cpp:93. The cast at line 93 succeeds; the predicate fails.
  • Bug #10 (with nested in if/while/for): trigger is the with-block itself nested inside non-static control flow. Different upstream trigger.
  • Bug #11 (for-with-break): trigger is break inside the for-loop, which causes lower_ast to emit AllocaStmt+WhileStmt at root_block. No IfStmt involved.
  • This bug: trigger is a non-static if/while (without break) wrapping a for-loop INSIDE the with-body. The wrapping IfStmt/WhileStmt at root_block is the culprit. Same root-cause class (pending_serial_statements does not carry group_id) but a separate user-reachable path.

Step-by-step proof

@qd.kernel
def k(cond: qd.i32):
    with qd.stream_parallel():
        if cond:
            for i in range(N):
                a[i] = compute_a(i)        # for-A: stream_parallel_group_id=1, inside IfStmt
        for j in range(N):
            tmp[j] = a[j]                  # for-tmp: stream_parallel_group_id=1, at root_block
    with qd.stream_parallel():
        for k in range(N):
            b[k] = compute_b(k)            # for-B: stream_parallel_group_id=2, at root_block
  1. build_With calls begin_stream_parallel(), setting current_stream_parallel_group_id_ = 1.
  2. build_If for non-static cond emits a FrontendIfStmt at root_block (the with-body is not a separate IR scope). The if-body opens a new scope where for-A is built.
  3. begin_frontend_range_for (frontend_ir.cpp:1395) stamps for_loop_dec_.config.stream_parallel_group_id=1 onto the FrontendForStmt for for-A. Same for for-tmp at the with-body's top level. The second with block sets current_stream_parallel_group_id_=2 for for-B.
  4. After lowering: root_block contains an IfStmt with for-A nested inside its true_statements (group_id=1), then for-tmp at root (group_id=1), then for-B at root (group_id=2). lower_ast.cpp:294 propagates group_id correctly onto the inner RangeForStmt.
  5. Offloader::run walks root_block (offload.cpp:90-158):
    • The IfStmt fails the RangeForStmt / StructForStmt / MeshForStmt casts at lines 93/132/135 → falls into the else at line 156 → moved into pending_serial_statements (a serial OffloadedStmt with default-initialized stream_parallel_group_id=0).
    • for-tmp (RangeForStmt with group_id=1) hits line 93 → assemble_serial_statements flushes the pending serial OffloadedStmt(group=0) into root_block; then for-tmp becomes a fresh range_for OffloadedStmt with group_id=1 (line 129 propagates).
    • for-B becomes range_for OffloadedStmt with group_id=2.
  6. Final OffloadedTask list: [serial(group=0, contains IfStmt+inner-for-A), range_for(group=1, for-tmp), range_for(group=2, for-B)].
  7. Launcher walk (runtime/cuda/kernel_launcher.cpp:55-95):
    • i=0, group=0 → default-stream branch: serial task queued on active_stream (single-threaded execution of inner if+for-A). Returns immediately (async).
    • i=1, group=1 → enters batch (inner while advances i=2 because group_id=2 is also non-zero). Creates s1, s2 with CU_STREAM_NON_BLOCKING; launches for-tmp on s1 and for-B on s2; host-syncs, destroys.

The race

s1 is created CU_STREAM_NON_BLOCKING — it has no implicit dependency on active_stream. The CUDA scheduler is free to begin executing for-tmp on s1 BEFORE for-A on active_stream finishes. for-tmp reads a[] written by for-A, so it can read uninitialized / partially-written values. Without with qd.stream_parallel():, the user would have gotten sequential execution on a single stream (no race). The realistic user pattern if some_flag: for i in range(N): heavy_path(i) (opting into a heavy code path conditionally) silently loses the stream_parallel contract this PR is meant to provide.
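The launcher walk described in the steps above can be modeled in plain Python (illustrative names only; the real logic lives in the CUDA/AMDGPU kernel launchers). It shows why the serial group-0 task ends up in a different batch with no dependency edge to the stream-parallel batch.

```python
# Model of the launcher's grouping loop: group-0 tasks go to the default
# stream one at a time; consecutive non-zero-group tasks form one concurrent
# batch (per-group streams, then a host sync).
def walk(tasks):
    """tasks: list of (name, group_id). Returns launch batches in order."""
    batches, i = [], 0
    while i < len(tasks):
        name, gid = tasks[i]
        if gid == 0:
            batches.append([("default", name)])  # async on the active stream
            i += 1
            continue
        batch = []
        while i < len(tasks) and tasks[i][1] != 0:  # consecutive non-zero IDs
            batch.append((f"stream_{tasks[i][1]}", tasks[i][0]))
            i += 1
        batches.append(batch)  # launched concurrently, then host-synced
    return batches

tasks = [("serial_if_forA", 0), ("for_tmp", 1), ("for_B", 2)]
print(walk(tasks))
# [[('default', 'serial_if_forA')], [('stream_1', 'for_tmp'), ('stream_2', 'for_B')]]
```

Nothing orders batch 2 after the group-0 task's completion, which is the race described above.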

Fix

Two reasonable options, mirroring the choices proposed for the related strictly_serialized and for-with-break bugs:

(a) When bundling statements into pending_serial_statements at offload.cpp:156, recursively scan for nested RangeForStmt/StructForStmt with non-zero stream_parallel_group_id and propagate the group_id onto the bundle (splitting bundles when group_ids differ).

(b) Reject non-static if/while directly inside with qd.stream_parallel(): at compile time in build_With or in _validate_stream_parallel_exclusivity, with a clear error suggesting the user move the conditional outside the kernel or split into separate kernels.

Comment on lines +85 to +92
}

for (size_t j = group_start; j < i; j++) {
const auto &t = offloaded_tasks[j];
std::size_t n_t = resolve_num_threads(t.ad_stack, executor);
executor->publish_adstack_metadata(t.ad_stack, n_t, &ctx, device_context_ptr);
CUDAContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]);
cuda_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, {&ctx.get_context()}, {});

🔴 🔴 qd.random() inside with qd.stream_parallel(): blocks races on the shared runtime->rand_states[] array. rand_u32 (runtime.cpp:2229-2230) indexes rand_states[linear_thread_idx(context)] non-atomically, where linear_thread_idx is block_idx*block_dim + thread_idx — purely per-grid local with no per-task offset. num_rand_states is sized for one launch's threads (llvm_runtime_executor.cpp:1072), under the documented assumption ("one CUDA thread per random state so we do not need expensive per-state locks") that only one task runs at a time. The new launcher (cuda/kernel_launcher.cpp:75-92, AMDGPU twin) violates that assumption: block(0)thread(0) of two concurrent stream_parallel groups both clobber rand_states[0] with non-atomic read-modify-write — silent identical / lost-update sequences across the two supposedly-independent samplers. Fix: reject qd.random() inside stream_parallel at compile time, or thread a per-task base offset into rand_u32, or at minimum document the restriction.

Extended reasoning...

What the bug is

rand_u32 at quadrants/runtime/llvm/runtime_module/runtime.cpp:2229-2230 does a non-atomic read-modify-write of runtime->rand_states[linear_thread_idx(context)]:

u32 rand_u32(RuntimeContext *context) {
  auto state = &((LLVMRuntime *)context->runtime)->rand_states[linear_thread_idx(context)];
  // x/y/z/w read, shuffle, write back — no atomics, no locks
  ...
}

linear_thread_idx (runtime.cpp:2093-2098) is block_idx() * block_dim() + thread_idx() on CUDA / AMDGPU — purely per-grid local. Two concurrent grids with the same block/grid shape both have block_idx ranges starting at 0, so thread (0,0) on stream A and thread (0,0) on stream B compute the same linear_thread_idx==0 and target the same slot.

num_rand_states is sized once at runtime init to saturating_grid_dim * max_block_dim (llvm_runtime_executor.cpp:1066-1078). The comment immediately above that line spells out the design assumption verbatim:

It is important to make sure that every CUDA thread has its own random state so that we do not need expensive per-state locks.

That assumption holds only when one task runs at a time — which was true pre-PR because all offloaded tasks were dispatched serially on the active stream. The PR's new launch_offloaded_tasks (quadrants/runtime/cuda/kernel_launcher.cpp:75-92, byte-identical AMDGPU twin) creates per-group streams (CU_STREAM_NON_BLOCKING after the related fix) and launches different-group_id tasks concurrently on them. The slot-isolation invariant is now silently broken for any kernel that calls qd.random() from inside two concurrent groups.
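The slot collision is easy to see from the index formula alone (a Python mirror of the computation quoted above; the function name is illustrative):

```python
# linear_thread_idx is purely grid-local: two concurrent grids with the same
# shape map their (block 0, thread 0) to the SAME rand_states slot.
def linear_thread_idx(block_idx, block_dim, thread_idx):
    return block_idx * block_dim + thread_idx

# Thread (0, 0) of a grid on stream A and of a grid on stream B:
slot_a = linear_thread_idx(block_idx=0, block_dim=128, thread_idx=0)
slot_b = linear_thread_idx(block_idx=0, block_dim=128, thread_idx=0)
print(slot_a == slot_b)  # True -- both grids do non-atomic RMW on rand_states[0]
```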

Step-by-step proof

User code (Monte Carlo / sampling — exactly what stream_parallel is designed to accelerate):

@qd.kernel
def k():
    with qd.stream_parallel():
        for i in range(N):
            a[i] = qd.random()
    with qd.stream_parallel():
        for j in range(N):
            b[j] = qd.random()
  1. Two range_for OffloadedTasks are emitted with stream_parallel_group_id 1 and 2.
  2. The launcher creates per-group streams s1 and s2, launches task A on s1 and task B on s2 — no inter-stream dependency.
  3. CUDA scheduler runs both grids concurrently. Thread (block_idx=0, thread_idx=0) on s1 and thread (block_idx=0, thread_idx=0) on s2 both compute linear_thread_idx == 0.
  4. rand_u32 in both kernels does state = &runtime->rand_states[0]; both then read x/y/z/w and write back updated values. Two non-atomic read-modify-write sequences on the same memory location race.
  5. Result: stale reads, lost updates, and frequently identical random numbers across the two kernels — silent non-determinism.

Why nothing prevents it

  • The launcher (cuda/kernel_launcher.cpp:67-101) only sets the stream and dispatches; it does nothing about per-task RNG state isolation.
  • Codegen emits the same rand_u32 device function regardless of stream_parallel_group_id; there is no per-launch base-offset machinery to thread through RuntimeContext.
  • num_rand_states is fixed once at init and never re-grown for K concurrent grids — and even if it were, the indexing in rand_u32 has no per-stream / per-group offset to differentiate slots between concurrent grids.
  • The new tests/python/test_streams.py tests do not exercise qd.random() inside stream_parallel, so the regression is untested.

Why this is distinct from existing bugs

This is not the adstack-metadata sharing concern flagged in the original Opus review (#5) or the deeper adstack publish-on-correct-stream issue: those concern host-side LLVMRuntime adstack fields published before launch. This bug is about a different shared array (rand_states[]) accessed at runtime by every device thread that calls qd.random(). Fixing the adstack issues does not fix this race.

Impact

Any user calling qd.random() inside stream_parallel gets non-deterministic, frequently identical sequences across the supposedly-independent parallel groups. Monte Carlo / sampling — exactly the workload that benefits most from launching independent samplers concurrently, and the documented motivation for stream_parallel — is silently broken. No error, no warning, timing-dependent, hard to reproduce.

The same family of issue applies in principle to other shared LLVMRuntime arrays accessed via linear_thread_idx (e.g. node_allocators[snode_id] for sparse-SNode append/activate), but rand_states is the most user-visible and the documented stream_parallel use case (concurrent independent for-loops) makes the contract violation most acute here.

Fix

Three options, in increasing order of ambition:

(a) Reject qd.random() inside stream_parallel at compile time with a clear error pointing the user to remove either the call or the stream_parallel block. Symmetric with the rest of the validators introduced in this PR (the all-or-nothing top-level rule, etc.).

(b) Document that qd.random() is incompatible with stream_parallel in docs/source/user_guide/streams.md (Limitations section). Minimum acceptable mitigation; leaves the silent race in place for users who do not read the docs.

(c) Extend the launcher to communicate a per-task rand_states base offset to the runtime (e.g. via a new RuntimeContext field set per-launch, with codegen adjusting the rand_u32 index accordingly), and grow num_rand_states to cover the maximum concurrent-grid count. Symmetric correctness fix that preserves the documented use case.

The minimum correctness fix is (a); (b) alone is insufficient because the failure is silent.
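Option (c)'s offset scheme can be sketched as follows; `base_offset` here stands in for the hypothetical per-launch RuntimeContext field, and the sizes are made-up example numbers.

```python
# Fix (c) sketch: each concurrent task gets a disjoint base offset into
# rand_states, so identical grid-local indices no longer collide.
def indexed_slot(base_offset, block_idx, block_dim, thread_idx):
    return base_offset + block_idx * block_dim + thread_idx

states_per_grid = 128 * 32          # example: grid_dim * block_dim for one launch
task_a_base = 0 * states_per_grid   # group 1
task_b_base = 1 * states_per_grid   # group 2

slot_a = indexed_slot(task_a_base, 0, 128, 0)
slot_b = indexed_slot(task_b_base, 0, 128, 0)
print(slot_a, slot_b)  # 0 4096 -- disjoint slots, no race
```

This requires growing num_rand_states to cover the maximum concurrent-grid count, as noted above.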

Comment thread on quadrants/runtime/amdgpu/kernel_launcher.cpp
@hughperkins
Collaborator Author

@claude review

…cpu' into hp/streams-quadrantsic-3-stream-parallel
}
current_task->block_dim = stmt->block_dim;
current_task->dynamic_shared_array_bytes = dynamic_shared_array_bytes;
current_task->stream_parallel_group_id = stmt->stream_parallel_group_id;

🔴 qd.deactivate() inside with qd.stream_parallel(): silently runs serially. Two complementary sites strip stream_parallel_group_id from the gc tasks the offloader inserts: (1) offload.cpp::insert_gc constructs the gc OffloadedStmt without propagating the preceding for-loop's group id (defaults to 0), and (2) emit_cuda_gc / emit_amdgpu_gc emit 3 sub-tasks via init_offloaded_task_function and never copy stmt->stream_parallel_group_id onto current_task — bypassing the codegen_cuda.cpp:641 / codegen_amdgpu.cpp:354 propagation that exists in the else branch. Both fixes are required: the IR-level fix alone has no observable effect because codegen still strips the id; the codegen fix alone never sees a non-zero id to propagate.

Extended reasoning...

What the bug is

This PR threads stream_parallel_group_id through every for-loop branch in lowering, offloading and codegen — but two complementary sites along the gc-task path drop it, with the result that any kernel using qd.deactivate() (a public API) on a sparse SNode inside with qd.stream_parallel(): silently loses concurrency.

Site 1 — IR-level: offload.cpp::insert_gc (lines 630-651). insert_gc is a post-pass that runs after Offloader::run (called from offload() at line 734). It walks the root block, finds OffloadedStmts that deactivate sparse SNodes (via gather_deactivations), and inserts a gc OffloadedStmt right after each one. The construction at line 645:

auto gc_task = Stmt::make_typed<OffloadedStmt>(OffloadedStmt::TaskType::gc, config.arch, kernel);
gc_task->snode = snode;          // stream_parallel_group_id default-initialized to 0
b->insert(std::move(gc_task), i + 1);

OffloadedStmt::stream_parallel_group_id defaults to 0 (statements.h:1357). Nothing reads the preceding OffloadedStmt's group id; the inserted gc task always lands with group_id=0.

Site 2 — Codegen: emit_cuda_gc (codegen_cuda.cpp:522-551) and emit_amdgpu_gc (codegen_amdgpu.cpp:238-267). The propagation line at codegen_cuda.cpp:641 (current_task->stream_parallel_group_id = stmt->stream_parallel_group_id;) lives inside the else branch of visit(OffloadedStmt*). The if branch at line 602-604 short-circuits the gc case to emit_cuda_gc(stmt) and bypasses line 641 entirely. emit_cuda_gc emits 3 sub-tasks (gather_list, reinit_lists, zero_fill); for each it calls init_offloaded_task_function (codegen_llvm.cpp:1810: current_task = std::make_unique<OffloadedTask>(task_kernel_name)), which uses the OffloadedTask ctor that default-initializes stream_parallel_group_id to 0 (llvm_compiled_data.h:78-83). Neither emit_cuda_gc nor emit_amdgpu_gc reads stmt->stream_parallel_group_id or writes it onto current_task before offloaded_tasks.push_back(*current_task).

Why both fixes are required

A developer who applies only the site-1 fix sets a non-zero group_id on the gc OffloadedStmt — but codegen (site 2) immediately discards it when constructing the 3 OffloadedTasks. The launcher reads the OffloadedTask vector, so the user-observable batch-splitting symptom is unchanged. Conversely, the site-2 fix alone would faithfully propagate stmt->stream_parallel_group_id onto current_task — but that field is always 0 because site 1 never sets it. Both fixes are needed for the propagation chain to reach the runtime launcher.

Step-by-step proof

User code:

block = qd.root.pointer(qd.i, 64).dynamic(qd.i, 1024)
a = qd.field(qd.f32); block.place(a)

@qd.kernel
def k():
    with qd.stream_parallel():
        for i in range(64):
            qd.deactivate(block, [i])
    with qd.stream_parallel():
        for j in range(N):
            b[j] = compute_b(j)
  1. Frontend stamps stream_parallel_group_id 1 and 2 onto the two FrontendForStmts (frontend_ir.cpp:1395 + init_config).
  2. Lowering propagates the ids onto the two RangeForStmts (lower_ast.cpp:294).
  3. Offloader::run constructs two range_for OffloadedStmts, propagating group ids 1 and 2 (offload.cpp:129).
  4. insert_gc runs: gather_deactivations finds the deactivate inside the first range_for (since SNodeOpType::deactivate is what qd.deactivate() lowers to via FrontendSNodeOpStmt at frontend_ir.cpp:1480). is_gc_able(SNodeType::pointer) is true (snode_types.cpp:21-23), so a gc OffloadedStmt is inserted at index i+1. Site 1: the gc task gets default stream_parallel_group_id=0.
  5. Root block now contains: [range_for(g=1), gc(g=0), range_for(g=2)].
  6. Codegen visits each:
    • range_for(g=1) → else branch → line 641 sets group_id=1. ✓
    • gc(g=0) → if branch at line 602 → emit_cuda_gc. Site 2: 3 sub-tasks pushed, all with group_id=0 (init_offloaded_task_function default-constructs).
    • range_for(g=2) → else branch → line 641 sets group_id=2. ✓
  7. Final OffloadedTask vector: [range_for(1), gc_0(0), gc_1(0), gc_2(0), range_for(2)].
  8. Launcher walk (cuda/kernel_launcher.cpp:55-95, AMDGPU twin):
    • i=0, group=1: enters batch. Inner-while exits at i=1 (gc_0 is g=0). Batch=[range_for(1)]. Creates s_1, launches, host-syncs, destroys.
    • i=1,2,3, group=0: default-stream branch each, launches gc_0/1/2 on active_stream.
    • i=4, group=2: batch=[range_for(2)]. Creates s_2, launches, host-syncs, destroys.

The two stream_parallel blocks run strictly sequentially with host syncs between them. Zero concurrency, no error, no warning.
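The batch-splitting walk above can be modeled in a few lines. This is a hypothetical Python sketch of the launcher loop in cuda/kernel_launcher.cpp:55-95; stream handles are plain strings and "launch, sync, destroy" is reduced to building a plan:

```python
def walk(tasks):
    """tasks: list of (name, group_id). Returns the launch plan:
    group 0 goes to the default stream; a maximal run of non-zero-group
    tasks forms one batch with one stream per distinct group id."""
    plan, i = [], 0
    while i < len(tasks):
        name, g = tasks[i]
        if g == 0:
            plan.append(("default_stream", [name]))
            i += 1
        else:
            start = i
            while i < len(tasks) and tasks[i][1] != 0:   # the inner-while
                i += 1
            batch = tasks[start:i]
            streams = sorted({f"s_{gid}" for _, gid in batch})
            plan.append((streams, [n for n, _ in batch]))
    return plan

# Step 7's broken vector: the group-0 gc tasks split the two groups apart.
broken = [("range_for_a", 1), ("gc_0", 0), ("gc_1", 0),
          ("gc_2", 0), ("range_for_b", 2)]
print(walk(broken))   # 5 separate plan entries, host-synced between batches

# With both fixes, the gc tasks inherit group 1 and the walk sees one batch.
fixed = [("range_for_a", 1), ("gc_0", 1), ("gc_1", 1),
         ("gc_2", 1), ("range_for_b", 2)]
print(walk(fixed))    # one 5-task batch on streams s_1 and s_2
```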

Why nothing upstream catches this

_validate_stream_parallel_exclusivity (function_def_transformer.py:467-477) only walks top-level With nodes; qd.deactivate() is inside the with-body, not at top level. build_With only restricts the context manager call site to qd.stream_parallel() and does not introspect the for-loop body. insert_gc runs as a host-side analysis pass after offloading, with no user-facing knob. tests/python/test_streams.py does not exercise sparse SNodes + deactivate inside stream_parallel.

Distinct from existing bugs in this PR

  • Bug #7 (clear_list / listgen for struct_for): hits the auxiliary tasks emitted in emit_struct_for (offload.cpp:192-205) before the struct_for. This bug is in the post-offload insert_gc pass and triggers on plain range_for + qd.deactivate (no struct_for needed).
  • Bug #6 (strictly_serialized) and Bug #11 (for-with-break): caused by the for-loop falling into offload.cpp's else fallback. Here the for-loop reaches the range_for branch correctly with non-zero group_id; the gc task is appended after it as a separate post-pass.

Fix

Two minimal one-line changes:

(a) IR-level, in insert_gc:

auto gc_task = Stmt::make_typed<OffloadedStmt>(OffloadedStmt::TaskType::gc, config.arch, kernel);
gc_task->snode = snode;
auto *prev = b->statements[i]->cast<OffloadedStmt>();
if (prev) gc_task->stream_parallel_group_id = prev->stream_parallel_group_id;
b->insert(std::move(gc_task), i + 1);

(b) Codegen-level, in each of the 3 sub-blocks of emit_cuda_gc and emit_amdgpu_gc:

current_task->grid_dim = compile_config.saturating_grid_dim;
current_task->block_dim = 64;
current_task->stream_parallel_group_id = stmt->stream_parallel_group_id;  // add
offloaded_tasks.push_back(*current_task);

With both in place, [range_for(1), gc_0(1), gc_1(1), gc_2(1), range_for(2)] becomes a single 5-task batch with two per-group streams; the gc tasks naturally serialize after their triggering for-loop on stream s_1, and the second range_for runs concurrently on s_2.

Comment thread quadrants/ir/statements.h
…ectness

Without this, the offline cache considers two kernels that differ only in stream_parallel_group_id assignments
as identical, potentially serving a cached version with wrong group IDs.
Comment on lines +85 to +95
}

for (size_t j = group_start; j < i; j++) {
const auto &t = offloaded_tasks[j];
std::size_t n_t = resolve_num_threads(t.ad_stack, executor);
executor->publish_adstack_metadata(t.ad_stack, n_t, &ctx, device_context_ptr);
CUDAContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]);
cuda_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, {&ctx.get_context()}, {});
}

for (auto &[sid, s] : stream_by_id) {

🔴 Even with the publish-ordering and missing-publish fixes already filed, the underlying runtime->adstack_offsets / adstack_max_sizes / adstack_per_thread_stride are SINGLE shared device buffers allocated once on the executor (llvm_runtime_executor.cpp:650-657 / .h:231-233). When two stream_parallel groups have different adstack layouts, iter j=B's per-task publish queues async H2D writes that target the SAME device memory iter j=A's kernel reads from at every AdStack* site — silent autodiff corruption with no error path. Fix surface is distinct from the publish-ordering bugs: needs per-group metadata triplets (or compile-time rejection of autodiff inside stream_parallel, or at minimum a doc note alongside the existing graph=True / Tape restrictions). Same bug at quadrants/runtime/amdgpu/kernel_launcher.cpp:84-89.

Extended reasoning...

What the bug is

publish_adstack_metadata writes per-task adstack metadata into the shared LLVMRuntime fields adstack_offsets[], adstack_max_sizes[], and adstack_per_thread_stride. The pointers stored in those fields point at SINGLE per-executor device allocations (adstack_offsets_alloc_ / adstack_max_sizes_alloc_ at llvm_runtime_executor.h:231-233, allocated once at llvm_runtime_executor.cpp:650-657; adstack_per_thread_stride is a single scalar field on LLVMRuntime). The comment at llvm_runtime_executor.cpp:619-623 captures the pre-PR contract: "we must write them before every launch even for tasks where the compile-time and launch-time bounds agree" — i.e. codegen reads from these fields at every device-side AdStack* site, and host must refresh them before each task.

Pre-PR that contract held trivially because all tasks were stream-ordered on the active stream. With the publish-ordering and missing-publish issues already noted in the timeline addressed, the inner stream_parallel loop becomes:

for (j = group_start; j < group_end; j++) {
  set_stream(stream_by_id[t.group_id]);    // per-group stream
  publish_adstack_metadata(t.ad_stack);    // queues H2D on s_K to SHARED dev buffer
  launch(t.name, ...);                     // reads SHARED runtime fields on s_K
}

Per-group streams are created CU_STREAM_NON_BLOCKING (and HIP_STREAM_NON_BLOCKING for AMDGPU), so s_1 and s_2 carry no inter-stream dependency. Iter j=A queues its publish + launch on s_1; iter j=B queues its publish on s_2, writing the same offsets_dev_ptr / max_sizes_dev_ptr device memory that A is currently reading from on s_1. Silent autodiff state corruption with no error path.

Step-by-step proof

@qd.kernel
def k():
    with qd.stream_parallel():
        for i in range(N):
            # reverse-mode autodiff body, adstack max_size = 8
            ...
    with qd.stream_parallel():
        for j in range(M):
            # reverse-mode autodiff body, adstack max_size = 32
            ...
  1. Two range_for OffloadedTasks emerge with stream_parallel_group_id 1 and 2; both have non-empty ad_stack.allocas (autodiff body) but with different max_size per slot.
  2. Launcher (runtime/cuda/kernel_launcher.cpp:85-95, AMDGPU twin):
    • j=group_start (task A): set_stream(s_1). publish_adstack_metadata queues 3 async H2D copies on s_1: stride / offsets_dev_ptr / max_sizes_dev_ptr. Then launch(A) on s_1.
    • j=group_start+1 (task B): set_stream(s_2). publish_adstack_metadata queues 3 async H2D copies on s_2, writing the same offsets_dev_ptr / max_sizes_dev_ptr device memory addresses. Then launch(B) on s_2.
  3. CUDA scheduler is free to overlap: s_1 may still be executing A (whose threads are reading runtime->adstack_max_sizes[0] to compute their per-thread slice base) when the s_2 H2D for max_sizes_dev_ptr arrives and overwrites slot 0 from 8 to 32.
  4. A's threads now compute stack-top with max_size=32 instead of 8. Each thread's adstack slice is sized for the stride-with-max-size-8 layout, but stack pushes index up to 32 — writing past the per-thread slice into the next thread's state. Silent heap corruption.

The mirror failure mode is also real: B reading A's narrower max_size, leading to under-sized push/pop bounds and either an internal bounds trip or wrap-around indexing, depending on codegen.
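The slice arithmetic in step 4 can be checked directly. This sketch assumes a flat adstack layout in which each thread's slice is max_size elements wide; the runtime's real layout may differ:

```python
ELEM = 1                      # abstract element size
MAX_A, MAX_B = 8, 32          # group A's vs group B's adstack max_size

def slot_addr(tid, stride, push_index):
    """Address a push writes to under a given per-thread stride."""
    return tid * stride + push_index * ELEM

stride_a = MAX_A * ELEM                        # A's slices were allocated this wide
overflow = slot_addr(0, stride_a, MAX_B - 1)   # push index taken from B's max_size
neighbor_base = slot_addr(1, stride_a, 0)
print(overflow, neighbor_base)   # thread 0's push lands at or past thread 1's base
assert overflow >= neighbor_base
```

With the allocation stride fixed at 8 elements but push indices running to 31, thread 0's deepest push lands well inside later threads' slices, which is the silent-corruption mechanism described above.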

Why this is distinct from existing comments

  • Publish missing for non-first tasks: the publish was hoisted into the inner loop. That fix moves the failure but does not address the shared-buffer race — every iter still writes the same device memory.
  • Publish-before-set_stream: even with the swap (set_stream first, then publish, then launch), the H2D copies still target the same device buffer, just queued on the per-group stream.
  • rand_states race: a different shared array (runtime->rand_states[]) accessed from device code; same class of issue but different fix surface (rand_states needs a per-task base offset; adstack metadata needs per-launch metadata pointers or per-group metadata buffers).

The two earlier bugs' extended reasoning explicitly defers this — they call it "a separate design problem" / "deeper concern" — which is correct: it IS a distinct fix surface.

Why nothing else prevents it

  • The pinned-event guard at llvm_runtime_executor.cpp:854 (pinned_metadata_event_) only protects the pinned host scratch from being overwritten before its prior H2D copies retire. It does NOT protect the destination shared device buffer from being overwritten while a prior kernel still reads it.
  • publish_adstack_metadata early-returns when n_stacks == 0 (llvm_runtime_executor.cpp:621-623), so non-autodiff kernels — including every test in tests/python/test_streams.py — never exercise this path. The regression is latent in merged code.
  • auto_diff.cpp reverse-mode visitor calls for_stmt->clone(), which preserves stream_parallel_group_id (statements.cpp:225) — so a kernel mixing qd.ad.Tape with qd.stream_parallel() produces a backward kernel with non-zero group ids AND adstack pop ops. Reachable via the public API.
  • The docs note "Not compatible with autograd Tape" but scope it to the explicit qd_stream= parameter, not stream_parallel. There is no compile-time guard combining autodiff with stream_parallel.

Fix options

(a) Allocate one (offsets, max_sizes, stride) triplet per concurrent group and pass per-launch pointers via RuntimeContext (codegen reads from RuntimeContext per-launch metadata pointers instead of the shared runtime->* fields).
(b) Reject autodiff (kernel.autodiff_mode != kNone) inside stream_parallel at compile time, mirroring the existing exclusivity validators.
(c) Fall back to running adstack-using tasks serially even within a stream_parallel block, while non-adstack tasks remain concurrent.
(d) At minimum, document that stream_parallel is incompatible with autodiff, alongside the existing graph=True / Tape limitations in docs/source/user_guide/streams.md.

(b) is the conservative one-liner; (a) is the symmetric correctness fix that preserves the documented use case; (d) alone is insufficient because the failure is silent.
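A minimal sketch of option (b), using hypothetical `KernelInfo` / `AutodiffMode` stand-ins for whatever the real compile pipeline exposes:

```python
from dataclasses import dataclass
from enum import Enum, auto

class AutodiffMode(Enum):
    NONE = auto()
    REVERSE = auto()

@dataclass
class KernelInfo:
    name: str
    autodiff_mode: AutodiffMode
    uses_stream_parallel: bool

def validate(kernel: KernelInfo) -> None:
    """Reject the autodiff + stream_parallel combination at compile time."""
    if kernel.uses_stream_parallel and kernel.autodiff_mode is not AutodiffMode.NONE:
        raise RuntimeError(
            f"kernel '{kernel.name}': qd.stream_parallel() is not supported "
            "with autodiff (shared adstack metadata buffers would race)")

validate(KernelInfo("fwd", AutodiffMode.NONE, True))        # ok
try:
    validate(KernelInfo("bwd", AutodiffMode.REVERSE, True))
except RuntimeError as e:
    print(e)
```

This mirrors the exclusivity validators the PR already adds on the Python side; the real check would read kernel.autodiff_mode where the existing Tape/graph restrictions are enforced.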

Comment on lines +87 to +93
for (size_t j = group_start; j < i; j++) {
const auto &t = offloaded_tasks[j];
std::size_t n_t = resolve_num_threads(t.ad_stack, executor);
executor->publish_adstack_metadata(t.ad_stack, n_t, &ctx, device_context_ptr);
CUDAContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]);
cuda_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, {&ctx.get_context()}, {});
}

🔴 In the inner stream_parallel loop at quadrants/runtime/cuda/kernel_launcher.cpp:87-93 (and the byte-identical AMDGPU twin at quadrants/runtime/amdgpu/kernel_launcher.cpp), resolve_num_threads(t.ad_stack, executor) runs before set_stream(stream_by_id[t.stream_parallel_group_id]). For autodiff range_for tasks with non-const bounds, resolve_num_threads reads CUDAContext::get_instance().get_stream() and calls stream_synchronize on it (line 38). On iteration j>group_start, that returns the previous iteration's per-group stream — host-blocking until the previous task fully completes and silently serializing what stream_parallel is documented to run concurrently. Fix: capture active_stream once and pass it into resolve_num_threads, or call set_stream(active_stream) (or set_stream(stream_by_id[t.stream_parallel_group_id])) before each resolve_num_threads call. Same fix needed in both backends.

Extended reasoning...

What the bug is

resolve_num_threads at quadrants/runtime/cuda/kernel_launcher.cpp:21-47 (AMDGPU equivalent at amdgpu/kernel_launcher.cpp:23-47) takes the dynamic-bound branch when info.dynamic_gpu_range_for == true AND (begin_offset_bytes >= 0 || end_offset_bytes >= 0). In that branch it captures active_stream = CUDAContext::get_instance().get_stream() at line 28, queues a 4-byte memcpy_device_to_host_async on that stream, and finally calls CUDADriver::get_instance().stream_synchronize(active_stream) at line 38. Crucially, active_stream here is the stream active at call time, not the active_stream captured at line 56 of the enclosing launch_offloaded_tasks.

The new inner stream_parallel loop at lines 87-93 sequences each iteration as:

for (size_t j = group_start; j < i; j++) {
  const auto &t = offloaded_tasks[j];
  std::size_t n_t = resolve_num_threads(t.ad_stack, executor);    // <-- line 89: get_stream()+sync on whatever was set last
  executor->publish_adstack_metadata(t.ad_stack, n_t, &ctx, device_context_ptr);
  CUDAContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]);  // <-- line 91: NOW switch
  cuda_module->launch(t.name, ...);
}

So on iteration j=group_start+1, resolve_num_threads runs before the per-iteration set_stream, and get_stream() returns whatever the previous iteration's last set_stream left active — i.e. the previous iteration's per-group stream s_{K-1}. Queueing the 4-byte memcpy on s_{K-1} (after the previous task's kernel was queued there) and then host-syncing on s_{K-1} blocks the host until the previous task fully completes, defeating the cross-group concurrency stream_parallel is meant to provide.

Why the trigger conditions hold

dynamic_gpu_range_for is set to true at codegen_cuda.cpp:650-660 (and the AMDGPU twin) only when per_thread_stride > 0 AND task_type == range_for AND !(const_begin && const_end). per_thread_stride > 0 requires the task to have at least one AdStackAlloca (i.e. autodiff). So the bug fires for autodiff range_for with non-const range bounds — a public-API-reachable combination via qd.ad.Tape + dynamic-bound range_for inside qd.stream_parallel(). Latent because tests/python/test_streams.py does not exercise autodiff inside stream_parallel.

Step-by-step proof

Two stream_parallel groups, each containing one dynamic-bound range_for with adstack (autodiff over a non-const range). After offloading, tasks A (group=1) and B (group=2). group_start=0, the inner-while at lines 73-75 advances i to 2, so group_end=2. Per-group streams s_1, s_2 are created at lines 78-85.

Inner launch loop (lines 87-93):

  1. j=0 (task A): resolve_num_threads(A.ad_stack, executor) calls get_stream() which returns the original active_stream (caller's stream, captured at line 56 — but set_stream hasn't run yet this iteration). Memcpy queued on active_stream, sync on active_stream. Returns n_A. publish_adstack_metadata runs. set_stream(s_1). Launch A on s_1 (async — returns immediately to host).
  2. j=1 (task B): resolve_num_threads(B.ad_stack, executor) calls get_stream() — which returns s_1 (the stream set by the previous iteration). It enters the dynamic_gpu_range_for branch, queues a DtoH memcpy on s_1 (which already has A's kernel queued in front of it), then calls stream_synchronize(s_1). The host blocks until A on s_1 fully completes. Only after that does the launcher proceed to publish_adstack_metadata, set_stream(s_2), and launch B on s_2.

Net result: tasks A and B run strictly sequentially with a host-side sync between them. No concurrency between groups, no error, no warning.

Why existing comments don't cover this

  • Inline-comment 3173021786 ("Preserve caller stream ordering") is about the entry handoff from active_stream to the per-group streams via an event record/wait. The host-side sync inside resolve_num_threads is a separate surface that exists before publish_adstack_metadata.
  • Inline-comment 3173135565 ("publish_adstack_metadata before set_stream") is about publish_adstack_metadata's own H2D copies queueing on the wrong stream. resolve_num_threads is a different function with its own get_stream + stream_synchronize pair.
  • Inline-comment 3173522419 ("shared adstack metadata buffers") is about the device buffer race once metadata IS published, distinct from the host-side sync that runs before publish.

Fix

Capture active_stream once in launch_offloaded_tasks (already done at line 56) and pass it explicitly into resolve_num_threads so the function uses an explicit caller stream rather than calling CUDAContext::get_instance().get_stream(). Equivalent alternative: set_stream(stream_by_id[t.stream_parallel_group_id]) first, then resolve_num_threads (so the dynamic-bound DtoH is naturally stream-ordered with the upcoming launch on the per-group stream); but note this still serializes B behind A if the metadata happens to be on the previous stream — the cleanest fix is the explicit-stream parameter. Mirror in amdgpu/kernel_launcher.cpp. One-line change in each backend.

Severity: normal. Silent loss of documented stream_parallel concurrency for autodiff range_for kernels with non-const bounds. Narrow trigger but reachable through the public qd.ad.Tape + qd.stream_parallel() combination, and the failure mode is concrete (full host-side sync between sibling groups, no overlap).
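The ordering hazard and the explicit-stream fix can be reproduced with a toy stream context (hypothetical Python stand-ins; the real types are CUDAContext and the launcher's per-iteration loop):

```python
class StreamContext:
    """Stand-in for CUDAContext's process-global active stream."""
    def __init__(self, initial):
        self.active = initial
    def set_stream(self, s):
        self.active = s
    def get_stream(self):
        return self.active

def resolve_buggy(ctx):
    # Returns the stream it would host-sync on: whatever was set last.
    return ctx.get_stream()

def resolve_fixed(ctx, explicit_stream):
    # Fix: the caller passes the intended stream; the global is never read.
    return explicit_stream

ctx = StreamContext("active_stream")
synced_on = []
for task_stream in ["s_1", "s_2"]:
    synced_on.append(resolve_buggy(ctx))   # runs BEFORE this iter's set_stream
    ctx.set_stream(task_stream)
print(synced_on)   # iteration 2 host-syncs on group 1's stream s_1

ctx2 = StreamContext("active_stream")
synced_fixed = [resolve_fixed(ctx2, "active_stream") for _ in ["s_1", "s_2"]]
print(synced_fixed)   # both iterations sync only on the caller's stream
```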

Comment on lines +1540 to +1543
if not isinstance(item.context_expr, ast.Call):
raise QuadrantsSyntaxError("'with' in Quadrants kernels requires a call expression")
if not ASTResolver.resolve_to(item.context_expr.func, stream_parallel, ctx.global_vars):
raise QuadrantsSyntaxError("'with' in Quadrants kernels only supports qd.stream_parallel()")

🟡 qd.stream_parallel() context manager accepts but silently drops user-supplied args/kwargs. with qd.stream_parallel(123, num_streams=4): compiles and runs the same as the no-arg form because build_With (ast_transformer.py:1540-1543) checks the call resolves to stream_parallel but never inspects item.context_expr.args / .keywords, and the runtime contextmanager is never invoked (so the TypeError its zero-parameter signature would raise never fires). Fix: after the resolve_to check, raise QuadrantsSyntaxError if item.context_expr.args or item.context_expr.keywords is non-empty — a one-line addition mirroring the other validation checks already in build_With.

Extended reasoning...

What the bug is

build_With at python/quadrants/lang/ast/ast_transformer.py:1533-1547 performs five structural checks on a with statement: single context manager, no as binding, the context expression must be an ast.Call, the call must resolve to qd.stream_parallel, and the enclosing function must be a kernel. After those pass it dispatches begin_stream_parallel() / end_stream_parallel() on the C++ ASTBuilder. It never inspects item.context_expr.args or item.context_expr.keywords.

Meanwhile stream_parallel in python/quadrants/lang/stream.py:131-138 is declared as a zero-parameter @contextmanager:

@contextmanager
def stream_parallel():
    yield

So the only legal call shape is qd.stream_parallel(). Anywhere else in Python, qd.stream_parallel(123, num_streams=4) would raise TypeError: stream_parallel() takes 0 positional arguments but 1 was given at runtime — but inside a kernel the AST transformer intercepts the call at compile time, replaces it with begin_stream_parallel() / end_stream_parallel() IR calls, and the contextmanager body never executes. The arguments are simply never evaluated.

Step-by-step proof

@qd.kernel
def k():
    with qd.stream_parallel(123, num_streams=4):   # silently accepted
        for i in range(N):
            a[i] = 1.0
    with qd.stream_parallel():                     # also accepted
        for j in range(N):
            b[j] = 2.0
  1. build_FunctionDef runs _validate_stream_parallel_exclusivity. _is_stream_parallel_with returns True for both with statements (it only checks single context manager, that context_expr is an ast.Call, and that the func resolves to stream_parallel — args/keywords are not inspected). Validation passes.
  2. build_stmts walks each ast.With. build_With checks len(node.items) == 1 ✓, optional_vars is None ✓, isinstance(item.context_expr, ast.Call) ✓, ASTResolver.resolve_to(item.context_expr.func, stream_parallel, ...) ✓, ctx.is_kernel ✓. It then calls ctx.ast_builder.begin_stream_parallel(), recurses into the body, and calls ctx.ast_builder.end_stream_parallel().
  3. item.context_expr.args == [ast.Constant(123)] and item.context_expr.keywords == [ast.keyword(arg='num_streams', value=ast.Constant(4))] are present in the AST but never read.
  4. The kernel compiles cleanly and runs identically to the no-arg form. The user gets stream-parallel execution as if they had written qd.stream_parallel(); num_streams=4 did nothing.

Why nothing else catches it

The contextmanager body never executes at Python runtime (the AST transformer rewrites the call before Func.__call__ would otherwise dispatch into Python land), so the natural TypeError Python would raise for too-many-arguments never fires. _validate_stream_parallel_exclusivity shares the _is_stream_parallel_with helper with build_With and inherits the same blind spot. There is no test in tests/python/test_streams.py that exercises a misshapen stream_parallel(...) call.

Impact

User-facing UX papercut. A user who reasonably guesses at an API like num_streams=N, priority=..., or a positional stream-count argument gets no compile-time error and no runtime error — their argument is silently a no-op. They may well believe their argument is in effect and chase a phantom bug elsewhere. No silent miscompilation, no incorrect runtime behavior, no perf regression — the kernel still does what stream_parallel() with no args would do.

Fix

One-line addition in build_With after the resolve_to check, consistent with the surrounding validation style:

if item.context_expr.args or item.context_expr.keywords:
    raise QuadrantsSyntaxError("qd.stream_parallel() takes no arguments")

Three independent verifiers reviewed this and confirmed it is a real (if minor) UX gap; all three suggested nit severity, given there is no correctness or performance impact.
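The proposed check is easy to prototype with the standard-library ast module, since the With item's call node already exposes .args and .keywords (here SyntaxError stands in for QuadrantsSyntaxError):

```python
import ast

src = """
with qd.stream_parallel(123, num_streams=4):
    pass
"""
with_node = ast.parse(src).body[0]
call = with_node.items[0].context_expr
assert isinstance(call, ast.Call)
print(len(call.args), len(call.keywords))   # both non-empty for the bad call

def check_no_args(call_node):
    """The proposed one-line validation for build_With."""
    if call_node.args or call_node.keywords:
        raise SyntaxError("qd.stream_parallel() takes no arguments")

try:
    check_no_args(call)
except SyntaxError as e:
    print(e)
```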


Still applies — build_With (ast_transformer.py:1533-1549) still does not inspect item.context_expr.args or item.context_expr.keywords, so with qd.stream_parallel(123, num_streams=4): continues to compile and run identically to the no-arg form. One-line fix after the resolve_to check, mirroring the surrounding validation style: if item.context_expr.args or item.context_expr.keywords: raise QuadrantsSyntaxError("qd.stream_parallel() takes no arguments").


Still applies — build_With (ast_transformer.py:1531-1547) still does not inspect item.context_expr.args or item.context_expr.keywords. with qd.stream_parallel(123, num_streams=4): continues to compile and run identically to the no-arg form. One-line fix after the _is_stream_parallel_with check, mirroring the surrounding validation style: if item.context_expr.args or item.context_expr.keywords: raise QuadrantsSyntaxError("qd.stream_parallel() takes no arguments").

@github-actions

github-actions Bot commented May 1, 2026

Coverage Report (ca560b64d)

File Coverage Missing
🔴 python/quadrants/lang/ast/ast_transformer.py 0% 31,43,1533-1549
🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py 58% 29,35,456-457,461,464,467-468,471-472,477
🔴 python/quadrants/lang/stream.py 0% 2,131-132,138,141
🔴 tests/python/test_streams.py 24% 266-268,270-277,279-282,288-291,293-302,304-307,309-314,320,322-323,325-336,338-351,353,356-358,360-364,366-370,372-374,378,398-402

Diff coverage: 26% · Overall: 60% · 170 lines, 126 missing

Full annotated report

…-3-stream-parallel

Resolve conflict in test_streams.py: keep both base branch tests (context manager, event,
tape/graph rejection) and head branch tests (stream_parallel basic, timing, mixed rejection).

Co-authored-by: Cursor <cursoragent@cursor.com>
Comment on lines +1546 to +1548
ctx.ast_builder.begin_stream_parallel()
build_stmts(ctx, node.body)
ctx.ast_builder.end_stream_parallel()

🔴 Non-for statements (e.g. a[0] = 1.0, qd.deactivate(snode, [k]), counter[None] = 0) placed directly inside with qd.stream_parallel(): silently race with sibling for-loops. The non-for statement is bundled into a serial OffloadedStmt with default stream_parallel_group_id=0 and dispatched on active_stream, while the sibling for-loop runs on a fresh CU_STREAM_NON_BLOCKING per-group stream that has no event handoff back to the active stream — so the for-loop can begin reading what the assignment is supposed to write before the assignment retires. Fix: in build_With, walk node.body and reject anything that is not a for-loop (mirroring the existing kernel-body validator), or stamp the active group_id onto every emitted root_block statement while inside a stream_parallel block.

Extended reasoning...

Bug\n\nbuild_With (python/quadrants/lang/ast/ast_transformer.py:1546-1548) does not introspect node.body and does not open a new IR scope — begin_stream_parallel only flips a counter on the ASTBuilder. So statements inside the with body are inserted directly into the kernel root_block. The frontend group-id stamping the PR added lives only in the four begin_frontend_*_for methods (frontend_ir.cpp:1395/1409/1423/1439), which means non-for statements (FrontendAssignStmt, FrontendSNodeOpStmt, etc.) never carry a stream_parallel_group_id — that field does not exist on those statement types.\n\nIn Offloader::run (quadrants/transforms/offload.cpp:90-158), only RangeForStmt / StructForStmt / MeshForStmt become standalone OffloadedStmts; everything else falls into the else at line 155-157 and is moved into pending_serial_statements — a serial OffloadedStmt constructed with default-initialized stream_parallel_group_id=0 (statements.h:1370). Sibling RangeForStmt at line 129 propagates group_id correctly. Codegen at codegen_cuda.cpp:641 / codegen_amdgpu.cpp:354 copies that 0 onto the OffloadedTask, and the launcher at runtime/cuda/kernel_launcher.cpp:55-95 (AMDGPU twin) takes the default-stream branch for group_id=0 and the per-group stream branch for the for-loop.\n\nThe two streams are independent: the per-group stream is created with CU_STREAM_NON_BLOCKING (line 80), which does NOT implicit-sync with the legacy NULL stream nor with arbitrary user-created streams. The launcher records no event from active_stream and inserts no stream_wait_event on the new stream — so the for-loop on s_K can begin executing before the serial assignment on active_stream finishes.\n\n## Distinct from existing PR-timeline bugs\n\n- Bug #11 (for-with-break): trigger requires a break inside the for-loop, which causes lower_ast to emit AllocaStmt+WhileStmt at root_block. 
No assignment-style trigger.

- Bug #6 (strictly_serialized): trigger is a top-level RangeForStmt with strictly_serialized=true that fails the !s->strictly_serialized predicate at offload.cpp:93. The cast succeeds; the predicate fails. Different upstream path.
- Bug #13 (non-static if/while wrapping for-loop): trigger is an IfStmt/WhileStmt at root_block whose body contains a for-loop. The cast at line 93 fails because of TYPE (IfStmt/WhileStmt), and the BUNDLE drops the inner for's group_id. Bug 13's proposed fix is to recursively scan the bundle for an inner for-loop and propagate that for-loop's group_id onto the bundle. That fix does not help here: the offending statement is itself the bundle entry (Assignment, SNodeOp, etc.) — there IS NO inner for-loop to read group_id from.
- Bug #16 (qd.deactivate gc tasks): trigger is qd.deactivate INSIDE a for-loop, producing gc auxiliary tasks via insert_gc. This bug is qd.deactivate (or any non-for statement) at the with-body level, NOT inside a for-loop.

The shared root cause across these bugs is that pending_serial_statements always defaults to stream_parallel_group_id=0, but the user-reachable trigger here (plain non-for statement directly in the with-body) is not covered by any of those bugs' fix proposals.

## Step-by-step proof

@qd.kernel
def k():
    with qd.stream_parallel():
        a[0] = 1.0                # FrontendAssignmentStmt at root_block, NO group_id
        for i in range(N):
            b[i] = a[0] * 2       # range_for, group_id=1, reads a[0]

1. _validate_stream_parallel_exclusivity (function_def_transformer.py:472) walks node.body == [ast.With] — a single with qd.stream_parallel():, so all top-level entries match. Validation passes.
2. build_With (ast_transformer.py:1533-1548) calls begin_stream_parallel() (counter→1), then build_stmts(ctx, node.body), which walks [ast.Assign, ast.For] at the SAME scope as the kernel root. build_Assign emits a FrontendAssignmentStmt directly into root_block — no group_id stamping. build_For reaches begin_frontend_range_for, which DOES stamp stream_parallel_group_id=1 onto the FrontendForStmt.
3. After lowering: root_block = [FrontendAssignment, RangeForStmt(group_id=1)]. The FrontendAssignment has no stream_parallel_group_id field at all.
4. Offloader::run iterates root_block. The FrontendAssignment fails every for-loop cast → falls into the else at offload.cpp:155-157 → moved into pending_serial_statements. The RangeForStmt hits offload.cpp:93 → assemble_serial_statements flushes the serial OffloadedStmt (group_id=0) into root_block, then constructs a fresh range_for OffloadedStmt with group_id=1.
5. Final OffloadedTask list: [serial(group=0, [Assignment]), range_for(group=1, [for-body])].
6. Launcher walk:
   - i=0, group=0 → default-stream branch, launches serial on active_stream (async).
   - i=1, group=1 → enters else branch. Creates s_1 with CU_STREAM_NON_BLOCKING (line 80). Sets stream to s_1, launches range_for on s_1, syncs, destroys.
7. Race: s_1 has no implicit dependency on active_stream (NON_BLOCKING semantics), and the launcher inserts no event handoff. The range_for on s_1 can begin reading a[0] before the serial task on active_stream finishes writing 1.0.

## Reachable user patterns

- a[0] = some_value before a for-loop (initialize accumulator, then iterate)
- counter[None] = 0 (global atomic-store followed by parallel reduction)
- qd.deactivate(snode, [k]) (plain snode op without a wrapping if/while)
- qd.append(snode, [...], val)
- A function call like qd.sync() or any custom @qd.func that performs a global store

In each case the user is inside with qd.stream_parallel(): and reasonably expects safe sequencing. The validator's contract ('all top-level statements... must be stream_parallel blocks') is enforced at the kernel-body level but not at the with-body level. tests/python/test_streams.py only exercises with-bodies that contain only for-loops, so this regression is untested.

## Fix

**(a) Conservative, one-liner**: in build_With, after the stream_parallel call resolves, walk node.body and reject anything that is not an ast.For. Symmetric with the kernel-body validator already introduced in this PR; the docs already imply only for-loops are intended inside with qd.stream_parallel():.

**(b) Structural**: when emitting any root_block statement while current_stream_parallel_group_id_ != 0, propagate the group_id onto a new field on the statement (or split bundles in the offloader by surrounding group_id). More invasive, but preserves the legality of mixing assignments and for-loops in a single with block.
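Fix (a) can be sketched with the stdlib ast module. The names validate_stream_parallel_body and the plain SyntaxError are stand-ins: the real check would live in build_With and raise QuadrantsSyntaxError.

```python
import ast


def validate_stream_parallel_body(with_node: ast.With) -> None:
    """Reject non-for statements inside a stream_parallel with-body.

    Hypothetical standalone sketch of fix (a); names and the SyntaxError
    stand-in are illustrative, not the actual transformer code.
    """
    for stmt in with_node.body:
        if not isinstance(stmt, ast.For):
            raise SyntaxError(
                "Only for-loops are allowed inside 'with qd.stream_parallel():'; "
                f"found {type(stmt).__name__} at line {stmt.lineno}."
            )


src = """
with qd.stream_parallel():
    a[0] = 1.0          # plain store at the with-body level: rejected
    for i in range(8):
        b[i] = a[0] * 2
"""
try:
    validate_stream_parallel_body(ast.parse(src).body[0])
except SyntaxError as e:
    print("rejected:", e)
```

A with-body containing only for-loops passes the same walk unchanged, which matches the documented intent of the block.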

…cpu' into hp/streams-quadrantsic-3-stream-parallel
Comment on lines +86 to +92
for (size_t j = group_start; j < i; j++) {
  const auto &t = offloaded_tasks[j];
  executor->publish_adstack_metadata(t.ad_stack, resolve_num_threads(t, executor), &ctx, context_pointer);
  AMDGPUContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]);
  amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, t.dynamic_shared_array_bytes, {(void *)&context_pointer},
                        {arg_size});
}

🟡 On AMDGPU with kernel_profiler=True, every with qd.stream_parallel(): block runs strictly sequentially: AMDGPUContext::launch calls profiler_->stop after each kernel dispatch, and KernelProfilerAMDGPU::stop (amdgpu_profiler.cpp:64, added on this branch by 3499bbc) does stream_synchronize(active_stream). Inside the new inner stream_parallel loop at amdgpu/kernel_launcher.cpp:86-92, active_stream IS the per-group stream just installed by set_stream(s_K), so the host blocks on s_K before the next iteration's set_stream(s_{K+1}) and launch — silently defeating the documented concurrency. Latent (profiler is not on by default) and perf-only (results remain correct), so likely fixable by deferring profiler->stop until after the per-group sync, switching to event_synchronize on the stop event, or documenting the incompatibility.

Extended reasoning...

What the bug is

This PR introduces a with qd.stream_parallel(): context manager that produces OffloadedTasks with non-zero stream_parallel_group_id. The new inner dispatch loop at quadrants/runtime/amdgpu/kernel_launcher.cpp:86-92 sequences each iteration as:

for (size_t j = group_start; j < i; j++) {
  const auto &t = offloaded_tasks[j];
  executor->publish_adstack_metadata(t.ad_stack, resolve_num_threads(t, executor), &ctx, context_pointer);
  AMDGPUContext::get_instance().set_stream(stream_by_id[t.stream_parallel_group_id]);
  amdgpu_module->launch(t.name, t.grid_dim, t.block_dim, ...);
}

Where amdgpu_module->launch routes (via jit_amdgpu.h:81) into AMDGPUContext::launch (amdgpu_context.cpp:171-206), which unconditionally calls profiler_->stop(task_handle) immediately after driver_.launch_kernel whenever profiler_ is set.

KernelProfilerAMDGPU::stop at quadrants/rhi/amdgpu/amdgpu_profiler.cpp:61-64 — added on this PR's merge chain by commit 3499bbc — reads active_stream = AMDGPUContext::get_instance().get_stream() and then calls AMDGPUDriver::get_instance().stream_synchronize(active_stream) so the subsequent event_elapsed_time read does not fault on a non-completed event.

Inside the inner stream_parallel loop the active stream at the moment profiler_->stop runs IS the per-group stream s_K just installed by set_stream(stream_by_id[t.stream_parallel_group_id]). So the host blocks on s_K before the loop's next iteration runs set_stream(s_{K+1}) and launch(...). The two per-group launches end up strictly serialized despite each being on its own stream — the documented concurrency stream_parallel exists to provide is silently lost.

Step-by-step proof

User program (AMDGPU + profiler + two-block stream_parallel):

qd.init(arch=qd.amdgpu, kernel_profiler=True)
@qd.kernel
def k():
    with qd.stream_parallel():
        for i in range(N): a[i] = compute_a(i)
    with qd.stream_parallel():
        for j in range(N): b[j] = compute_b(j)
  1. Two OffloadedTasks emerge with stream_parallel_group_id 1 and 2.
  2. Launcher creates s_1 and s_2 (HIP_STREAM_NON_BLOCKING) at lines 78-85.
  3. Iter j=group_start (task A, group=1): set_stream(s_1); amdgpu_module->launch → AMDGPUContext::launch queues kernel A on s_1, then profiler_->stop(handle_A) → reads active_stream = s_1 and stream_synchronize(s_1). Host blocks until A on s_1 fully completes.
  4. Iter j=group_start+1 (task B, group=2): Only NOW does set_stream(s_2) run; launch queues B on s_2. By the time B is enqueued, A has already drained — there is no overlap.

The kernel results are correct; the concurrency contract is silently violated. test_stream_parallel_timing's >1.5x assertion would fail under qd.init(arch=qd.amdgpu, kernel_profiler=True), but tests/python/test_streams.py runs with kernel_profiler off by default, so this regression is latent.

Why nothing else catches this

  • tests/python/test_streams.py does not exercise kernel_profiler=True.
  • The CUDA twin at cuda_profiler.cpp:127-128 records its stop event on nullptr (legacy NULL stream) and stream_synchronize(nullptr), which on a CU_STREAM_NON_BLOCKING per-group stream produces ~0 ms timings rather than host-blocking — that is the same pre-3499bbc shape AMDGPU had before the fix landed; not the bug here, just an explanation of why CUDA does not exhibit the same regression.
  • This is distinct from the previously-flagged active_stream entry handoff, the blocking-flag bug, the rand_states race, and the adstack metadata buffer race — none of those involve the profiler.

Impact

Narrow trigger (opt-in profiler + stream_parallel) and silent perf-only failure (output is correct, just non-concurrent), but the failure mode directly contradicts the documented purpose of the feature this PR introduces. Two of the three verifiers rated this nit for those reasons; one rated it normal because the trigger is reachable through a public-API combination.

Fix options

(a) Defer profiler_->stop until after all per-group launches in a stream_parallel batch, so that the per-group stream_synchronize(s_K) already happening at lines 95-100 of the launcher provides the completion guarantee before event_elapsed_time is read.
(b) Change KernelProfilerAMDGPU::stop to use event_synchronize on the stop event itself (events synchronize independently of the stream), which preserves per-task timing without host-syncing the stream.
(c) Document that kernel_profiler defeats concurrency for qd.stream_parallel() on AMDGPU as a known limitation.

(a) and (b) are real fixes that preserve both timing and concurrency; (c) is the minimum acceptable mitigation. Note (b) also incidentally improves the CUDA side, which currently produces ~0 ms timings for kernels launched on non-default streams.
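To illustrate why a host sync after each launch defeats overlap, here is a stdlib-Python analogy with threads standing in for streams and join() standing in for the profiler's stream_synchronize. It is a toy model of the control flow only, not the launcher code.

```python
import threading


def run(sync_after_each_launch: bool) -> bool:
    """Return True if kernel A observed kernel B in flight concurrently."""
    b_started = threading.Event()
    saw_overlap = []

    def kernel_a():
        # A completes only once B starts, or after a timeout if B never does
        saw_overlap.append(b_started.wait(timeout=0.2))

    def kernel_b():
        b_started.set()

    ta = threading.Thread(target=kernel_a)
    ta.start()                      # launch A on "stream" s_1
    if sync_after_each_launch:
        ta.join()                   # profiler stop: host blocks on s_1
    tb = threading.Thread(target=kernel_b)
    tb.start()                      # launch B on "stream" s_2
    ta.join()
    tb.join()
    return saw_overlap[0]


print(run(sync_after_each_launch=True))   # False: launches serialized
print(run(sync_after_each_launch=False))  # True: kernels overlap
```

Deferring the sync (fix a) or synchronizing on the stop event alone (fix b) corresponds to dropping the mid-loop join in this model.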

Comment on lines +467 to +483
@staticmethod
def _is_docstring(stmt: ast.stmt, index: int) -> bool:
    return index == 0 and isinstance(stmt, ast.Expr) and isinstance(stmt.value, (ast.Constant, ast.Str))

@staticmethod
def _validate_stream_parallel_exclusivity(body: list[ast.stmt], global_vars: dict[str, Any]) -> None:
    if not any(FunctionDefTransformer._is_stream_parallel_with(s, global_vars) for s in body):
        return
    for i, stmt in enumerate(body):
        if FunctionDefTransformer._is_docstring(stmt, i):
            continue
        if not FunctionDefTransformer._is_stream_parallel_with(stmt, global_vars):
            raise QuadrantsSyntaxError(
                "When using qd.stream_parallel(), all top-level statements "
                "in the kernel must be 'with qd.stream_parallel():' blocks. "
                "Move non-parallel code to a separate kernel."
            )

🟡 The new _validate_stream_parallel_exclusivity check (function_def_transformer.py:467-477) only carves out docstrings (index==0 ast.Expr(Constant)), but ast.Pass and qd.static_assert(...) / qd.static_print(...) at the kernel top-level — both compile-time directives that emit no IR — also trip the validator with the misleading "Move non-parallel code to a separate kernel" error. A kernel that writes qd.static_assert(N > 0) (the idiomatic pattern shown in tests/python/test_assert.py:138/150) followed by with qd.stream_parallel(): blocks fails compilation; the workaround is to delete or relocate the directive. Fix is a one-helper extension that also skips ast.Pass and ast.Expr(Call) whose call resolves to known compile-time directives.

Extended reasoning...

What the bug is

_validate_stream_parallel_exclusivity (function_def_transformer.py:467-477) iterates node.body and raises QuadrantsSyntaxError("...all top-level statements must be with qd.stream_parallel(): blocks. Move non-parallel code to a separate kernel.") for any statement that is neither _is_stream_parallel_with nor _is_docstring. The new _is_docstring carve-out only matches index == 0 and isinstance(stmt, ast.Expr) and isinstance(stmt.value, (ast.Constant, ast.Str)) — i.e. PEP 257 docstrings at body[0]. Three other harmless top-level constructs are not handled:

  1. ast.Pass: the pass placeholder, lowered by build_Pass (ast_transformer.py) to a no-op (return None). Emits no IR.
  2. qd.static_assert(...) at top level — pure-Python compile-time check (impl.py:615-638), uses Python assert against a static value. Emits no IR. Idiomatic kernel-top directive, exercised at tests/python/test_assert.py:138, 150, 161, 171 and tests/python/test_lexical_scope.py:13, 17. Parses to ast.Expr(value=ast.Call(func=Attribute(...static_assert))), i.e. an ast.Call, not an ast.Constant, so _is_docstring returns False even at index 0.
  3. qd.static_print(...) at top level — same shape, same compile-time-only semantics, same incorrect rejection.

Note: the original synthesis also lists Python assert as a harmless construct, but build_Assert (ast_transformer.py:1475) does emit real runtime-checked IR that becomes part of the offloaded task graph. Rejecting it at top level is correct (it would race with sibling stream_parallel for-loops the same way any other store would). I am narrowing the bug to ast.Pass + qd.static_assert + qd.static_print and leaving assert out.

How the failure manifests

@qd.kernel
def k():
    qd.static_assert(N > 0)              # ast.Expr(Call(static_assert)) — no IR
    with qd.stream_parallel():
        for i in range(N): a[i] = 1.0
    with qd.stream_parallel():
        for j in range(N): b[j] = 2.0

Step-by-step trace:

  1. build_FunctionDef calls _validate_stream_parallel_exclusivity(node.body, ctx.global_vars).
  2. The body has [ast.Expr(Call(static_assert)), ast.With, ast.With]. _is_stream_parallel_with returns True for the two ast.With nodes → has_sp = True.
  3. The walk iterates body. At i=0, stmt=ast.Expr(Call): _is_docstring(stmt, 0) checks isinstance(stmt.value, (ast.Constant, ast.Str)); stmt.value is an ast.Call, so it returns False. _is_stream_parallel_with(stmt) returns False (not ast.With). The validator raises QuadrantsSyntaxError("...Move non-parallel code to a separate kernel.").

The user sees an error telling them to "move non-parallel code", but there is no non-parallel runtime code to move — qd.static_assert only emits a Python-level assertion at kernel compile time. The workaround is to delete the invariant check or to relocate it inside one of the with bodies (where it still runs at compile time and so still works), but neither preserves the kernel-wide intent. Same story for pass and static_print.
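The AST shapes in this trace can be confirmed directly with the stdlib ast module (qd is only a name in the parsed source; nothing is imported):

```python
import ast

# Parse a docstring followed by a qd.static_assert directive, as at kernel top.
body = ast.parse('"""doc"""\nqd.static_assert(N > 0)\n').body
doc, directive = body

# A docstring is ast.Expr wrapping ast.Constant, so the carve-out matches it...
assert isinstance(doc, ast.Expr) and isinstance(doc.value, ast.Constant)

# ...but the directive is ast.Expr wrapping ast.Call, so the index-0
# Constant check can never match it, even at body[0].
assert isinstance(directive, ast.Expr) and isinstance(directive.value, ast.Call)
assert isinstance(directive.value.func, ast.Attribute)
print(directive.value.func.attr)  # static_assert
```

This is the shape mismatch that sends the directive into the validator's error branch.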

Why no existing safeguard catches it

  • _is_docstring only matches index 0 + ast.Expr + ast.Constant/ast.Str; ast.Call is neither.
  • _is_stream_parallel_with only matches ast.With.
  • The frontend group-id stamping is irrelevant here; the rejection happens during the AST walk before any IR is emitted.

Impact and severity

User-facing impact is a confusing compile-time error. No silent miscompilation, no perf regression, no data race. Workaround is trivial (delete or relocate the directive). All four verifiers independently confirmed at nit severity:

  • Verifier 1: "narrow papercut, not a correctness issue, low frequency of occurrence."
  • Verifier 2: explicitly excluded assert from the carve-out (see narrowing above).
  • Verifier 3: "Worth folding into the existing carve-out as a one-helper change."
  • Verifier 4: "no correctness/perf impact and the workaround is trivial; the papercut is most acute for users following defensive-programming patterns or PEP 257-style annotations."

Fix

Extend the existing _is_docstring carve-out in function_def_transformer.py:

@staticmethod
def _is_skippable(stmt: ast.stmt, index: int, global_vars: dict[str, Any]) -> bool:
    if FunctionDefTransformer._is_docstring(stmt, index):
        return True
    if isinstance(stmt, ast.Pass):
        return True
    if isinstance(stmt, ast.Expr) and isinstance(stmt.value, ast.Call):
        for target in (static_assert, static_print):
            if ASTResolver.resolve_to(stmt.value.func, target, global_vars):
                return True
    return False

Then call _is_skippable in place of _is_docstring inside _validate_stream_parallel_exclusivity. The principled alternative noted in the original Bug #9 — only reject statements that actually emit offloaded tasks — would catch this class of constructs (and any future no-op directive) for free, but it is a larger refactor.

@github-actions

github-actions Bot commented May 1, 2026

Coverage Report (388a797e0)

| File | Coverage | Missing |
| --- | --- | --- |
| 🔴 python/quadrants/lang/ast/ast_transformer.py | 0% | 31,43,1533-1549 |
| 🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py | 58% | 29,35,456-457,461,464,467-468,471-472,477 |
| 🔴 python/quadrants/lang/stream.py | 0% | 2,131-132,138,141 |
| 🔴 tests/python/test_streams.py | 24% | 342-344,346-353,355-358,364-367,369-378,380-383,385-390,396,398-399,401-412,414-427,429,432-434,436-440,442-446,448-450,454,474-478 |

Diff coverage: 26% · Overall: 65% · 170 lines, 126 missing

Full annotated report

hughperkins and others added 3 commits May 1, 2026 11:12
The _is_stream_parallel_with validation uses ASTResolver.resolve_to which
compares objects with `is`. On Linux build runners where quadrants is
available from both the source tree and installed location, the
stream_parallel function object may differ between import paths. Add a
fallback that checks __name__ and __module__ when identity fails, and
add ASTResolver.resolve_value for general AST-to-object resolution.

Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
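The identity-then-name fallback described in the commit can be sketched as follows. same_callable and the twin construction are illustrative stand-ins for the dual-import-path scenario, not the actual ASTResolver code:

```python
import types


def same_callable(resolved, target) -> bool:
    """Identity first; fall back to (__name__, __module__) when dual import
    paths yield two distinct function objects for the same source function.

    Hypothetical sketch of the fallback in the commit message.
    """
    if resolved is target:
        return True
    rn = getattr(resolved, "__name__", None)
    rm = getattr(resolved, "__module__", None)
    return (
        rn is not None
        and rm is not None
        and rn == getattr(target, "__name__", None)
        and rm == getattr(target, "__module__", None)
    )


def stream_parallel():  # stand-in for the real qd.stream_parallel
    pass


# Simulate a second import path: a distinct function object that shares
# the same code, name, and module as the original.
twin = types.FunctionType(stream_parallel.__code__, {}, stream_parallel.__name__)
twin.__module__ = stream_parallel.__module__
assert twin is not stream_parallel
print(same_callable(twin, stream_parallel))  # True
```

The None guards keep the fallback from matching two anonymous objects whose names both failed to resolve.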

@claude claude Bot left a comment


Additional findings (outside current diff — PR may have been updated during review):

  • 🟡 python/quadrants/lang/ast/ast_transformer.py:31 — Unused import: from quadrants.lang.ast.symbol_resolver import ASTResolver was added at ast_transformer.py:31 but the symbol is never used in this file. The actual usage of ASTResolver lives in function_def_transformer.py via _is_stream_parallel_with, which build_With calls indirectly. Likely a leftover from an earlier draft where build_With called ASTResolver directly. Pure nit — remove the import line.

    Extended reasoning...

    What the bug is

    This PR adds from quadrants.lang.ast.symbol_resolver import ASTResolver at python/quadrants/lang/ast/ast_transformer.py:31, but the imported symbol is never referenced anywhere else in this file.

    Verification

    Grepping for ASTResolver in python/quadrants/lang/ast/ast_transformer.py returns exactly one match — the import line itself at line 31. There are no further usages in the module. The build_With handler (around line 1533-1547) that this import was presumably added to support delegates the stream_parallel detection to FunctionDefTransformer._is_stream_parallel_with (line 1541), which has its own ASTResolver import in function_def_transformer.py. So the actual user of ASTResolver is the other module, not this one.

    Why it slipped through

    This looks like a leftover from an earlier draft of build_With that called ASTResolver.resolve_to(...) inline before the resolution logic was refactored into FunctionDefTransformer._is_stream_parallel_with. The import was kept; the call site was removed.

    Step-by-step proof

    1. Open python/quadrants/lang/ast/ast_transformer.py and grep for ASTResolver — single match on line 31 (the import).
    2. Open python/quadrants/lang/ast/ast_transformers/function_def_transformer.py and grep for ASTResolver — multiple matches: import plus actual usage in _is_stream_parallel_with (and resolve_value callers).
    3. The build_With handler in ast_transformer.py (lines 1533-1547) only references FunctionDefTransformer._is_stream_parallel_with, never ASTResolver directly.

    Impact

    Zero behavioral impact — purely a dead import. Linters (ruff/flake8 with F401) will flag it, and a future grep for ASTResolver in this file would mislead a maintainer into thinking the symbol is used here.

    Fix

    Delete the from quadrants.lang.ast.symbol_resolver import ASTResolver line at ast_transformer.py:31. One-line removal.

Comment on lines +131 to +137
@contextmanager
def stream_parallel():
    """Run top-level for loops in this block on separate GPU streams.

    Used inside @qd.kernel. At Python runtime (outside kernels), this is a no-op. During kernel compilation, the AST
    transformer calls into the C++ ASTBuilder to tag loops with a stream-parallel group ID.
    """

🟡 The docstring on stream_parallel says 'Run top-level for loops in this block on separate GPU streams' (plural), but per the same PR's user_guide/streams.md ('Multiple for loops within a single block share a stream and run serially on it') and the actual implementation, all for-loops within ONE with qd.stream_parallel(): block share ONE stream — it is consecutive blocks that get separate streams. Suggest clarifying to e.g. 'Run this block on its own GPU stream (separate from sibling stream_parallel blocks). Multiple for-loops inside one block share that stream and execute serially on it.'

Extended reasoning...

What the docstring says vs. what the code does

python/quadrants/lang/stream.py:131-138:

@contextmanager
def stream_parallel():
    """Run top-level for loops in this block on separate GPU streams.

    Used inside @qd.kernel. ...
    """
    yield

The plural 'separate GPU streams' for the for-loops within a single block reads naturally as 'each for-loop here gets its own stream'. That contradicts the actual semantics established by this same PR.

Why it is wrong

ASTBuilder::begin_stream_parallel (quadrants/ir/frontend_ir.h:1027-1029) increments stream_parallel_group_counter_ once per with-block and assigns the new value to current_stream_parallel_group_id_:

void begin_stream_parallel() {
  QD_ERROR_IF(current_stream_parallel_group_id_ != 0, ...);
  current_stream_parallel_group_id_ = ++stream_parallel_group_counter_;
}

Every for-loop inside the with-body then reads that single value. begin_frontend_range_for at frontend_ir.cpp:1395 stamps it onto for_loop_dec_.config, and the same pattern fires in begin_frontend_struct_for_on_snode (1409), begin_frontend_struct_for_on_external_tensor (1423), and begin_frontend_mesh_for (1439). So all for-loops in one block carry the same group id.

The launcher at quadrants/runtime/cuda/kernel_launcher.cpp:75-83 (and the byte-identical AMDGPU twin) creates one stream per unique group id — stream_by_id is keyed by stream_parallel_group_id, so a single block always maps to a single stream.

Why I'm certain — corroborated by the PR's own docs

The user-guide rewrite in this same PR (docs/source/user_guide/streams.md) spells out the correct behavior verbatim:

Consecutive with qd.stream_parallel(): blocks run concurrently. Multiple for loops within a single block share a stream and run serially on it.

So the docstring directly contradicts the prose docs the same PR ships.

Step-by-step proof — two for-loops in one block

@qd.kernel
def k():
    with qd.stream_parallel():
        for i in range(N): a[i] = 1.0   # for-A
        for j in range(N): b[j] = 2.0   # for-B
  1. build_With calls begin_stream_parallel() → counter goes 0→1, current_stream_parallel_group_id_=1.
  2. begin_frontend_range_for for for-A stamps stream_parallel_group_id=1 onto its FrontendForStmt.
  3. begin_frontend_range_for for for-B reads the same current_stream_parallel_group_id_=1 and stamps 1 onto its FrontendForStmt — the counter is not incremented again.
  4. end_stream_parallel resets to 0 (counter retains the high-water mark).
  5. After lowering and offloading both tasks carry stream_parallel_group_id=1.
  6. Launcher walks offloaded_tasks: enters the stream-parallel branch, builds stream_by_id keyed by group id → exactly one entry for id=1, so exactly one stream is created. for-A and for-B are launched on that same stream sequentially.

So the user-visible behavior for the docstring's described scenario ('top-level for loops in this block') is one stream, not multiple. The plural is wrong.
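The counter semantics in the proof can be modeled in a few lines of plain Python (a toy stand-in for ASTBuilder's begin/end_stream_parallel and the per-loop stamping, not the real C++):

```python
from contextlib import contextmanager


class GroupIds:
    """Toy model: one group id per with-block, shared by every
    for-loop inside that block."""

    def __init__(self):
        self.counter = 0   # high-water mark, never reset
        self.current = 0   # 0 means "not inside a stream_parallel block"

    @contextmanager
    def stream_parallel(self):
        assert self.current == 0, "nesting rejected, as in begin_stream_parallel"
        self.counter += 1
        self.current = self.counter
        try:
            yield
        finally:
            self.current = 0

    def stamp_for_loop(self) -> int:
        # begin_frontend_range_for reads the current id; it does not bump it
        return self.current


b = GroupIds()
ids = []
with b.stream_parallel():            # block 1
    ids.append(b.stamp_for_loop())   # for-A
    ids.append(b.stamp_for_loop())   # for-B: same id as for-A
with b.stream_parallel():            # block 2
    ids.append(b.stamp_for_loop())   # for-C: new id
print(ids)  # [1, 1, 2]
```

One stream per unique id at launch time means for-A and for-B share a stream; only for-C gets a second one, which is exactly what the .md docs describe.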

Fix

Replace the docstring with text that matches the implementation and the .md docs, e.g.:

@contextmanager
def stream_parallel():
    """Run this block on its own GPU stream, separate from sibling stream_parallel blocks.

    Used inside @qd.kernel. Consecutive `with qd.stream_parallel():` blocks execute concurrently on
    different streams; multiple for-loops inside one block share that block's stream and execute
    serially on it. At Python runtime (outside kernels), this is a no-op — the AST transformer
    interprets the with-statement at compile time and tags loops with a stream-parallel group ID.
    """
    yield

Severity

All four verifiers rated this nit: pure documentation papercut, no behavioral or correctness impact, the help() output for stream_parallel is the only user-facing surface affected. Worth folding in because the contradiction is right next to the PR's own .md prose.


Comment is unchanged at python/quadrants/lang/stream.py:133-137 — still reads "Run top-level for loops in this block on separate GPU streams" (plural). The contradiction with user_guide/streams.md ("Multiple for loops within a single block share a stream and run serially on it") still stands.

@github-actions

github-actions Bot commented May 1, 2026

Coverage Report (70eb47152)

| File | Coverage | Missing |
| --- | --- | --- |
| 🔴 python/quadrants/lang/ast/ast_transformer.py | 0% | 1531-1547 |
| 🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py | 57% | 29,35,456-457,461,464,468-469,475-476,479-480,485 |
| 🔴 python/quadrants/lang/ast/symbol_resolver.py | 0% | 59-60,66-67,69-70,72-79,81-84,86-89 |
| 🔴 python/quadrants/lang/stream.py | 0% | 2,131-132,138,141 |
| 🔴 tests/python/test_streams.py | 24% | 342-344,346-353,355-358,364-367,369-378,380-383,385-390,396,398-399,401-412,414-427,429,432-434,436-440,442-446,448-450,454,474-478 |

Diff coverage: 24% · Overall: 60% · 194 lines, 148 missing

Full annotated report

hughperkins and others added 2 commits May 1, 2026 12:36
When object resolution fails (dual import paths), fall back to checking
the AST node name directly. Inside @qd.kernel the only valid with-context
is qd.stream_parallel(), so checking the attribute name is sufficient.

Co-authored-by: Cursor <cursoragent@cursor.com>
Include the failing statement type, index, and body length to help
debug the persistent Linux build x64 test failures.

Co-authored-by: Cursor <cursoragent@cursor.com>
Comment thread tests/python/test_streams.py
Keep both new limitation bullets from head and base branches.

Co-authored-by: Cursor <cursoragent@cursor.com>
Comment on lines +497 to +501
raise QuadrantsSyntaxError(
    "When using qd.stream_parallel(), all top-level statements "
    "in the kernel must be 'with qd.stream_parallel():' blocks. "
    f"Move non-parallel code to a separate kernel. "
    f"[stmt {i}: {stmt_desc}, body_len={len(body)}]"

🟡 The error message at function_def_transformer.py:497-501 ends with a debug-instrumentation suffix [stmt {i}: {stmt_desc}, body_len={len(body)}] (added by commit a6c3852 to chase the persistent Linux build x64 test failures) that leaks raw ast.dump() output to end users. The user-facing portion ("all top-level statements... must be with qd.stream_parallel(): blocks. Move non-parallel code to a separate kernel.") is sufficient on its own; please drop the bracketed suffix once the test failure being debugged is resolved.

Extended reasoning...

What the bug is

The QuadrantsSyntaxError raised by _validate_stream_parallel_exclusivity at function_def_transformer.py:497-501 currently formats as:

When using qd.stream_parallel(), all top-level statements in the kernel must be 'with qd.stream_parallel():' blocks. Move non-parallel code to a separate kernel. [stmt {i}: {stmt_desc}, body_len={len(body)}]

The trailing bracket is debug instrumentation. The commit that added it (a6c3852, "Add diagnostic info to stream_parallel exclusivity error message") explicitly states the intent in its commit body: "Include the failing statement type, index, and body length to help debug the persistent Linux build x64 test failures." That is a temporary diagnostic for an in-flight investigation, not durable user-facing text.

Why the suffix is unsuitable for users

stmt_desc is built at function_def_transformer.py:493-496 by appending ast.dump(ctx_expr.func) whenever the offending statement is an ast.With whose context is a Call with an Attribute func. ast.dump produces raw Python AST repr strings like Attribute(value=Name(id='qd', ctx=Load()), attr='static_assert', ctx=Load()) — implementation-detail strings that would land verbatim in a user-visible SyntaxError. body_len (the count of top-level kernel-body statements) has no actionable value to a kernel author, and a bare integer index without source location adds little.

Why this is safe to drop

The user-facing first sentence is already actionable on its own: it names the rule ("all top-level statements... must be with qd.stream_parallel(): blocks") and the remediation ("Move non-parallel code to a separate kernel"). Removing the diagnostic bracket has zero correctness or performance impact — it only improves the polish of the displayed message. The diagnostic served its purpose during the Linux x64 investigation, but durable error text should not carry chase-the-bug instrumentation into a release.

Step-by-step proof

  1. A user kernel mixes a with qd.stream_parallel(): block with a non-stream_parallel statement (e.g. with some_other_ctx(): followed by a with qd.stream_parallel(): block).
  2. _validate_stream_parallel_exclusivity (function_def_transformer.py:483-501) walks node.body, finds has_sp = True, then encounters the offending ast.With whose call is some_other_ctx().
  3. _is_stream_parallel_with returns False, so the validator constructs stmt_desc = "With(with Attribute(value=Name(id='some_other_ctx', ctx=Load()), attr='__call__', ctx=Load()))" (or similar AST-dump string for the resolved attribute chain).
  4. The raised message reads: ...Move non-parallel code to a separate kernel. [stmt 0: With(with Attribute(...)), body_len=2].
  5. The trailing bracket is implementation noise unrelated to the user's fix.

Fix

Drop the stmt_desc and body_len formatting and the f-string at the tail of the error, leaving:

raise QuadrantsSyntaxError(
    "When using qd.stream_parallel(), all top-level statements "
    "in the kernel must be 'with qd.stream_parallel():' blocks. "
    "Move non-parallel code to a separate kernel."
)

Once the underlying Linux build x64 test failure that motivated commit a6c3852 is resolved, the diagnostic bracket can be removed in the same change. If the diagnostic is still occasionally useful, consider gating it behind QD_LOG_LEVEL / logger.debug rather than embedding it in the user-visible exception text.
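Gating the diagnostics behind the debug log while keeping the exception text clean could look like the sketch below. The logger name, helper name, and the plain SyntaxError standing in for QuadrantsSyntaxError are all assumptions:

```python
import logging

logger = logging.getLogger("quadrants.lang")  # hypothetical logger name


def reject_mixed_body(stmt_index: int, stmt_desc: str, body_len: int) -> None:
    """Raise the user-facing error; route the AST-dump diagnostics through
    the debug log instead of the exception text (sketch, not the real code)."""
    logger.debug(
        "stream_parallel exclusivity failed: stmt %d: %s, body_len=%d",
        stmt_index, stmt_desc, body_len,
    )
    raise SyntaxError(
        "When using qd.stream_parallel(), all top-level statements "
        "in the kernel must be 'with qd.stream_parallel():' blocks. "
        "Move non-parallel code to a separate kernel."
    )
```

Developers chasing a CI failure can still recover the statement details by raising the log level; end users only see the actionable sentence.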

Co-authored-by: Cursor <cursoragent@cursor.com>
@github-actions

github-actions Bot commented May 1, 2026

Coverage Report (3af5bc860)

| File | Coverage | Missing |
| --- | --- | --- |
| 🔴 python/quadrants/lang/ast/ast_transformer.py | 0% | 1531-1547 |
| 🔴 python/quadrants/lang/ast/ast_transformers/function_def_transformer.py | 57% | 29,35,191,382,394,446,507-508,512,515,519-521,524-528,530-531,538-539,549,553-554,557 |
| 🔴 python/quadrants/lang/ast/symbol_resolver.py | 0% | 59-60,66-67,69-70,72-79,81-84,86-89 |
| 🔴 python/quadrants/lang/stream.py | 0% | 2,131-132,138,141 |
| 🔴 tests/python/test_streams.py | 24% | 342-344,346-353,355-358,364-367,369-378,380-383,385-390,396,398-399,401-412,414-427,429,432-434,436-440,442-446,448-450,454,474-478 |

Diff coverage: 28% · Overall: 65% · 224 lines, 161 missing

Full annotated report

