diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index 97afd6a4e..e1b561cce 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -311,7 +311,7 @@ struct AicpuExecutor { struct alignas(64) SyncStartDrainState { std::atomic sync_start_pending{0}; // 0=normal; -1=initializing; >0=active (value=block_num) std::atomic drain_worker_elected{0}; // 0=none; >0: elected thread's (thread_idx+1) - std::atomic drain_ack_mask{0}; // bit per thread; all-set = all threads finished dispatch + std::atomic drain_ack_mask{0}; // bit per thread; all-set = all threads reached ack barrier PTO2TaskSlotState *pending_task{nullptr}; // held task (not re-queued) int32_t _pad[10]; }; @@ -811,12 +811,13 @@ struct AicpuExecutor { // Called by each scheduler thread when drain_state_.sync_start_pending != 0. // - // Three-phase protocol: - // 1. Ack barrier: all threads signal they've stopped Phase 2 dispatch. - // If not all acked yet, return to Phase 1 (completion polling). - // 2. Resource check: elected thread verifies global idle resources >= block_num. - // If insufficient, reset election state and return — all threads resume - // Phase 1 to free running cores, then retry next iteration. + // Protocol (single-stage ack barrier): + // 1. Ack barrier: all threads signal they've stopped dispatch, then spin + // until all ack bits are set. + // If this thread's bit gets cleared while waiting, a reset occurred — return. + // 2. Election: one thread wins the CAS and becomes the drain worker. + // If resources are insufficient, reset ack/election fields and return — + // all threads resume completion polling to free running cores, then retry. // 3. Dispatch: elected thread dispatches all blocks (one pass, resources guaranteed). // Non-elected threads spin-wait until sync_start_pending == 0. 
// During dispatch the elected thread has exclusive tracker access. @@ -834,14 +835,21 @@ struct AicpuExecutor { } while (block_num < 0); if (block_num == 0) return; - // Phase 1: Ack barrier — signal this thread has stopped Phase 2 dispatch. uint32_t all_acked = (1u << active_sched_threads_) - 1; + + // Ack barrier — signal this thread has stopped dispatch. drain_state_.drain_ack_mask.fetch_or(1u << thread_idx, std::memory_order_release); - // If not all threads have acked, return to do Phase 1 (completion polling). - if ((drain_state_.drain_ack_mask.load(std::memory_order_acquire) & all_acked) != all_acked) return; + // Spin until all threads have acked. + // If our bit is cleared while waiting, the elected thread reset the drain state due to insufficient resources. + while (true) { + uint32_t ack = drain_state_.drain_ack_mask.load(std::memory_order_acquire); + if ((ack & all_acked) == all_acked) break; + if ((ack & (1u << thread_idx)) == 0) return; + SPIN_WAIT_HINT(); + } - // Phase 2: Election — exactly one thread wins the CAS. + // Election — exactly one thread wins the CAS. int32_t expected = 0; drain_state_.drain_worker_elected.compare_exchange_strong( expected, thread_idx + 1, std::memory_order_acquire, std::memory_order_relaxed @@ -862,13 +870,14 @@ struct AicpuExecutor { int32_t available = count_global_available(shape); if (available < block_num) { - // Insufficient resources — reset election, let all threads do Phase 1. - drain_state_.drain_ack_mask.store(0, std::memory_order_relaxed); + // Insufficient resources — reset drain fields so threads can resume + // completion polling to free running cores, then retry. + drain_state_.drain_ack_mask.store(0, std::memory_order_release); drain_state_.drain_worker_elected.store(0, std::memory_order_release); return; } - // Phase 3: Dispatch — all other threads are spinning, exclusive tracker access. + // Dispatch — all other threads are spinning, elected thread has exclusive tracker access. 
drain_worker_dispatch( runtime, block_num #if PTO2_PROFILING @@ -1460,8 +1469,6 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime *runtime, int32_t threa // Phase 2 drain check: if a sync_start task is waiting for resources, // pause normal dispatch and let the drain protocol run. - // relaxed load is enough — drain state only needs to be visible within - // a few iterations; exact ordering is enforced inside handle_drain_mode. if (drain_state_.sync_start_pending.load(std::memory_order_acquire) != 0) { handle_drain_mode( runtime, thread_idx diff --git a/src/a5/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp b/src/a5/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp index dcf3d5658..cf2e022f4 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp +++ b/src/a5/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp @@ -310,7 +310,7 @@ struct AicpuExecutor { struct alignas(64) SyncStartDrainState { std::atomic sync_start_pending{0}; // 0=normal; -1=initializing; >0=active (value=block_num) std::atomic drain_worker_elected{0}; // 0=none; >0: elected thread's (thread_idx+1) - std::atomic drain_ack_mask{0}; // bit per thread; all-set = all threads finished dispatch + std::atomic drain_ack_mask{0}; // bit per thread; all-set = all threads reached ack barrier PTO2TaskSlotState *pending_task{nullptr}; // held task (not re-queued) int32_t _pad[10]; }; @@ -799,12 +799,13 @@ struct AicpuExecutor { // Called by each scheduler thread when drain_state_.sync_start_pending != 0. // - // Three-phase protocol: - // 1. Ack barrier: all threads signal they've stopped Phase 2 dispatch. - // If not all acked yet, return to Phase 1 (completion polling). - // 2. Resource check: elected thread verifies global idle resources >= block_num. - // If insufficient, reset election state and return — all threads resume - // Phase 1 to free running cores, then retry next iteration. + // Protocol (single-stage ack barrier): + // 1. 
Ack barrier: all threads signal they've stopped dispatch, then spin + // until all ack bits are set. + // If this thread's bit gets cleared while waiting, a reset occurred — return. + // 2. Election: one thread wins the CAS and becomes the drain worker. + // If resources are insufficient, reset ack/election fields and return — + // all threads resume completion polling to free running cores, then retry. // 3. Dispatch: elected thread dispatches all blocks (one pass, resources guaranteed). // Non-elected threads spin-wait until sync_start_pending == 0. // During dispatch the elected thread has exclusive tracker access. @@ -822,14 +823,21 @@ struct AicpuExecutor { } while (block_num < 0); if (block_num == 0) return; - // Phase 1: Ack barrier — signal this thread has stopped Phase 2 dispatch. uint32_t all_acked = (1u << active_sched_threads_) - 1; + + // Ack barrier — signal this thread has stopped dispatch. drain_state_.drain_ack_mask.fetch_or(1u << thread_idx, std::memory_order_release); - // If not all threads have acked, return to do Phase 1 (completion polling). - if ((drain_state_.drain_ack_mask.load(std::memory_order_acquire) & all_acked) != all_acked) return; + // Spin until all threads have acked. + // If our bit is cleared while waiting, the elected thread reset the drain state due to insufficient resources. + while (true) { + uint32_t ack = drain_state_.drain_ack_mask.load(std::memory_order_acquire); + if ((ack & all_acked) == all_acked) break; + if ((ack & (1u << thread_idx)) == 0) return; + SPIN_WAIT_HINT(); + } - // Phase 2: Election — exactly one thread wins the CAS. + // Election — exactly one thread wins the CAS. int32_t expected = 0; drain_state_.drain_worker_elected.compare_exchange_strong( expected, thread_idx + 1, std::memory_order_acquire, std::memory_order_relaxed @@ -850,13 +858,14 @@ struct AicpuExecutor { int32_t available = count_global_available(shape); if (available < block_num) { - // Insufficient resources — reset election, let all threads do Phase 1. 
- drain_state_.drain_ack_mask.store(0, std::memory_order_relaxed); + // Insufficient resources — reset drain fields so threads can resume + // completion polling to free running cores, then retry. + drain_state_.drain_ack_mask.store(0, std::memory_order_release); drain_state_.drain_worker_elected.store(0, std::memory_order_release); return; } - // Phase 3: Dispatch — all other threads are spinning, exclusive tracker access. + // Dispatch — all other threads are spinning, elected thread has exclusive tracker access. drain_worker_dispatch( block_num #if PTO2_PROFILING