From 77647a28f5167cf80714a1982265c9ae9a46d35b Mon Sep 17 00:00:00 2001 From: yanghaoran29 Date: Mon, 13 Apr 2026 15:39:06 +0800 Subject: [PATCH] Fix: spin-wait ack barrier replaces check-and-return in sync_start drain protocol MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the non-blocking ack check (load and return if not all acked) with a spin-wait loop that blocks until all scheduler threads have set their bit in drain_ack_mask. This eliminates the window where a non-elected thread returns to the scheduler loop and resumes tracker writes while the drain worker already has exclusive tracker access. Remove drain_barrier_mask (the second atomic introduced as an intermediate step) — the single spin-wait on drain_ack_mask is sufficient for the full-stop guarantee. Reset detection uses drain_ack_mask bit-clear (release store on insufficient resources), not drain_worker_elected, which remains zero until after the barrier completes. Also fix drain_ack_mask reset ordering: use memory_order_release instead of relaxed so the clearing store is visible to threads spinning on their own bit. 
--- .../aicpu/aicpu_executor.cpp | 39 +++++++++++-------- .../aicpu/aicpu_executor.cpp | 37 +++++++++++------- 2 files changed, 46 insertions(+), 30 deletions(-) diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index 97afd6a4e..e1b561cce 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -311,7 +311,7 @@ struct AicpuExecutor { struct alignas(64) SyncStartDrainState { std::atomic sync_start_pending{0}; // 0=normal; -1=initializing; >0=active (value=block_num) std::atomic drain_worker_elected{0}; // 0=none; >0: elected thread's (thread_idx+1) - std::atomic drain_ack_mask{0}; // bit per thread; all-set = all threads finished dispatch + std::atomic drain_ack_mask{0}; // bit per thread; all-set = all threads reached ack barrier PTO2TaskSlotState *pending_task{nullptr}; // held task (not re-queued) int32_t _pad[10]; }; @@ -811,12 +811,13 @@ struct AicpuExecutor { // Called by each scheduler thread when drain_state_.sync_start_pending != 0. // - // Three-phase protocol: - // 1. Ack barrier: all threads signal they've stopped Phase 2 dispatch. - // If not all acked yet, return to Phase 1 (completion polling). - // 2. Resource check: elected thread verifies global idle resources >= block_num. - // If insufficient, reset election state and return — all threads resume - // Phase 1 to free running cores, then retry next iteration. + // Protocol (single-stage ack barrier): + // 1. Ack barrier: all threads signal they've stopped dispatch, then spin + // until all ack bits are set. + // If this thread's bit gets cleared while waiting, a reset occurred — return. + // 2. Election: one thread wins the CAS and becomes the drain worker. 
+ // If resources are insufficient, reset ack/election fields and return — + // all threads resume completion polling to free running cores, then retry. // 3. Dispatch: elected thread dispatches all blocks (one pass, resources guaranteed). // Non-elected threads spin-wait until sync_start_pending == 0. // During dispatch the elected thread has exclusive tracker access. @@ -834,14 +835,21 @@ struct AicpuExecutor { } while (block_num < 0); if (block_num == 0) return; - // Phase 1: Ack barrier — signal this thread has stopped Phase 2 dispatch. uint32_t all_acked = (1u << active_sched_threads_) - 1; + + // Ack barrier — signal this thread has stopped dispatch. drain_state_.drain_ack_mask.fetch_or(1u << thread_idx, std::memory_order_release); - // If not all threads have acked, return to do Phase 1 (completion polling). - if ((drain_state_.drain_ack_mask.load(std::memory_order_acquire) & all_acked) != all_acked) return; + // Spin until all threads have acked. + // If our bit is cleared while waiting, elected reset due to insufficient resources. + while (true) { + uint32_t ack = drain_state_.drain_ack_mask.load(std::memory_order_acquire); + if ((ack & all_acked) == all_acked) break; + if ((ack & (1u << thread_idx)) == 0) return; + SPIN_WAIT_HINT(); + } - // Phase 2: Election — exactly one thread wins the CAS. + // Election — exactly one thread wins the CAS. int32_t expected = 0; drain_state_.drain_worker_elected.compare_exchange_strong( expected, thread_idx + 1, std::memory_order_acquire, std::memory_order_relaxed @@ -862,13 +870,14 @@ struct AicpuExecutor { int32_t available = count_global_available(shape); if (available < block_num) { - // Insufficient resources — reset election, let all threads do Phase 1. - drain_state_.drain_ack_mask.store(0, std::memory_order_relaxed); + // Insufficient resources — reset drain fields so threads can resume + // completion polling to free running cores, then retry. 
+ drain_state_.drain_ack_mask.store(0, std::memory_order_release); drain_state_.drain_worker_elected.store(0, std::memory_order_release); return; } - // Phase 3: Dispatch — all other threads are spinning, exclusive tracker access. + // Dispatch — all other threads are spinning, elected thread has exclusive tracker access. drain_worker_dispatch( runtime, block_num #if PTO2_PROFILING @@ -1460,8 +1469,6 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime *runtime, int32_t threa // Phase 2 drain check: if a sync_start task is waiting for resources, // pause normal dispatch and let the drain protocol run. - // relaxed load is enough — drain state only needs to be visible within - // a few iterations; exact ordering is enforced inside handle_drain_mode. if (drain_state_.sync_start_pending.load(std::memory_order_acquire) != 0) { handle_drain_mode( runtime, thread_idx diff --git a/src/a5/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a5/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index dcf3d5658..cf2e022f4 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a5/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -310,7 +310,7 @@ struct AicpuExecutor { struct alignas(64) SyncStartDrainState { std::atomic sync_start_pending{0}; // 0=normal; -1=initializing; >0=active (value=block_num) std::atomic drain_worker_elected{0}; // 0=none; >0: elected thread's (thread_idx+1) - std::atomic drain_ack_mask{0}; // bit per thread; all-set = all threads finished dispatch + std::atomic drain_ack_mask{0}; // bit per thread; all-set = all threads reached ack barrier PTO2TaskSlotState *pending_task{nullptr}; // held task (not re-queued) int32_t _pad[10]; }; @@ -799,12 +799,13 @@ struct AicpuExecutor { // Called by each scheduler thread when drain_state_.sync_start_pending != 0. // - // Three-phase protocol: - // 1. Ack barrier: all threads signal they've stopped Phase 2 dispatch. 
- // If not all acked yet, return to Phase 1 (completion polling). - // 2. Resource check: elected thread verifies global idle resources >= block_num. - // If insufficient, reset election state and return — all threads resume - // Phase 1 to free running cores, then retry next iteration. + // Protocol (single-stage ack barrier): + // 1. Ack barrier: all threads signal they've stopped dispatch, then spin + // until all ack bits are set. + // If this thread's bit gets cleared while waiting, a reset occurred — return. + // 2. Election: one thread wins the CAS and becomes the drain worker. + // If resources are insufficient, reset ack/election fields and return — + // all threads resume completion polling to free running cores, then retry. // 3. Dispatch: elected thread dispatches all blocks (one pass, resources guaranteed). // Non-elected threads spin-wait until sync_start_pending == 0. // During dispatch the elected thread has exclusive tracker access. @@ -822,14 +823,21 @@ struct AicpuExecutor { } while (block_num < 0); if (block_num == 0) return; - // Phase 1: Ack barrier — signal this thread has stopped Phase 2 dispatch. uint32_t all_acked = (1u << active_sched_threads_) - 1; + + // Ack barrier — signal this thread has stopped dispatch. drain_state_.drain_ack_mask.fetch_or(1u << thread_idx, std::memory_order_release); - // If not all threads have acked, return to do Phase 1 (completion polling). - if ((drain_state_.drain_ack_mask.load(std::memory_order_acquire) & all_acked) != all_acked) return; + // Spin until all threads have acked. + // If our bit is cleared while waiting, elected reset due to insufficient resources. + while (true) { + uint32_t ack = drain_state_.drain_ack_mask.load(std::memory_order_acquire); + if ((ack & all_acked) == all_acked) break; + if ((ack & (1u << thread_idx)) == 0) return; + SPIN_WAIT_HINT(); + } - // Phase 2: Election — exactly one thread wins the CAS. + // Election — exactly one thread wins the CAS. 
int32_t expected = 0; drain_state_.drain_worker_elected.compare_exchange_strong( expected, thread_idx + 1, std::memory_order_acquire, std::memory_order_relaxed @@ -850,13 +858,14 @@ struct AicpuExecutor { int32_t available = count_global_available(shape); if (available < block_num) { - // Insufficient resources — reset election, let all threads do Phase 1. - drain_state_.drain_ack_mask.store(0, std::memory_order_relaxed); + // Insufficient resources — reset drain fields so threads can resume + // completion polling to free running cores, then retry. + drain_state_.drain_ack_mask.store(0, std::memory_order_release); drain_state_.drain_worker_elected.store(0, std::memory_order_release); return; } - // Phase 3: Dispatch — all other threads are spinning, exclusive tracker access. + // Dispatch — all other threads are spinning, elected thread has exclusive tracker access. drain_worker_dispatch( block_num #if PTO2_PROFILING