Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions src/a2a3/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ struct AicpuExecutor {
// Shared per-executor drain-protocol state for sync_start tasks.
// alignas(64) keeps the struct on its own cache line so the scheduler
// threads' atomic traffic does not false-share with neighboring members.
struct alignas(64) SyncStartDrainState {
    std::atomic<int32_t> sync_start_pending{0};   // 0=normal; -1=initializing; >0=active (value=block_num)
    std::atomic<int32_t> drain_worker_elected{0}; // 0=none; >0: elected thread's (thread_idx+1)
    std::atomic<uint32_t> drain_ack_mask{0};      // bit per thread; all-set = all threads reached ack barrier
    PTO2TaskSlotState *pending_task{nullptr};     // held task (not re-queued)
    int32_t _pad[10];                             // pads the struct out to the 64-byte alignment boundary
};
Expand Down Expand Up @@ -811,12 +811,13 @@ struct AicpuExecutor {

// Called by each scheduler thread when drain_state_.sync_start_pending != 0.
//
// Three-phase protocol:
// 1. Ack barrier: all threads signal they've stopped Phase 2 dispatch.
// If not all acked yet, return to Phase 1 (completion polling).
// 2. Resource check: elected thread verifies global idle resources >= block_num.
// If insufficient, reset election state and return — all threads resume
// Phase 1 to free running cores, then retry next iteration.
// Protocol (single-stage ack barrier):
// 1. Ack barrier: all threads signal they've stopped dispatch, then spin
// until all ack bits are set.
// If this thread's bit gets cleared while waiting, a reset occurred — return.
// 2. Election: one thread wins the CAS and becomes the drain worker.
// If resources are insufficient, reset ack/election fields and return —
// all threads resume completion polling to free running cores, then retry.
// 3. Dispatch: elected thread dispatches all blocks (one pass, resources guaranteed).
// Non-elected threads spin-wait until sync_start_pending == 0.
// During dispatch the elected thread has exclusive tracker access.
Expand All @@ -834,14 +835,21 @@ struct AicpuExecutor {
} while (block_num < 0);
if (block_num == 0) return;

// Phase 1: Ack barrier — signal this thread has stopped Phase 2 dispatch.
uint32_t all_acked = (1u << active_sched_threads_) - 1;

// Ack barrier — signal this thread has stopped dispatch.
drain_state_.drain_ack_mask.fetch_or(1u << thread_idx, std::memory_order_release);

// If not all threads have acked, return to do Phase 1 (completion polling).
if ((drain_state_.drain_ack_mask.load(std::memory_order_acquire) & all_acked) != all_acked) return;
// Spin until all threads have acked.
// If our bit is cleared while waiting, elected reset due to insufficient resources.
while (true) {
uint32_t ack = drain_state_.drain_ack_mask.load(std::memory_order_acquire);
if ((ack & all_acked) == all_acked) break;
if ((ack & (1u << thread_idx)) == 0) return;
SPIN_WAIT_HINT();
}

// Phase 2: Election — exactly one thread wins the CAS.
// Election — exactly one thread wins the CAS.
int32_t expected = 0;
drain_state_.drain_worker_elected.compare_exchange_strong(
expected, thread_idx + 1, std::memory_order_acquire, std::memory_order_relaxed
Expand All @@ -862,13 +870,14 @@ struct AicpuExecutor {
int32_t available = count_global_available(shape);

if (available < block_num) {
// Insufficient resources — reset election, let all threads do Phase 1.
drain_state_.drain_ack_mask.store(0, std::memory_order_relaxed);
// Insufficient resources — reset drain fields so threads can resume
// completion polling to free running cores, then retry.
drain_state_.drain_ack_mask.store(0, std::memory_order_release);
drain_state_.drain_worker_elected.store(0, std::memory_order_release);
return;
}

// Phase 3: Dispatch — all other threads are spinning, exclusive tracker access.
// Dispatch — all other threads are spinning, elected thread has exclusive tracker access.
drain_worker_dispatch(
runtime, block_num
#if PTO2_PROFILING
Expand Down Expand Up @@ -1460,8 +1469,6 @@ int32_t AicpuExecutor::resolve_and_dispatch_pto2(Runtime *runtime, int32_t threa

// Phase 2 drain check: if a sync_start task is waiting for resources,
// pause normal dispatch and let the drain protocol run.
// relaxed load is enough — drain state only needs to be visible within
// a few iterations; exact ordering is enforced inside handle_drain_mode.
if (drain_state_.sync_start_pending.load(std::memory_order_acquire) != 0) {
handle_drain_mode(
runtime, thread_idx
Expand Down
37 changes: 23 additions & 14 deletions src/a5/runtime/tensormap_and_ringbuffer/aicpu/aicpu_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ struct AicpuExecutor {
// Shared per-executor drain-protocol state for sync_start tasks.
// alignas(64) keeps the struct on its own cache line so the scheduler
// threads' atomic traffic does not false-share with neighboring members.
struct alignas(64) SyncStartDrainState {
    std::atomic<int32_t> sync_start_pending{0};   // 0=normal; -1=initializing; >0=active (value=block_num)
    std::atomic<int32_t> drain_worker_elected{0}; // 0=none; >0: elected thread's (thread_idx+1)
    std::atomic<uint32_t> drain_ack_mask{0};      // bit per thread; all-set = all threads reached ack barrier
    PTO2TaskSlotState *pending_task{nullptr};     // held task (not re-queued)
    int32_t _pad[10];                             // pads the struct out to the 64-byte alignment boundary
};
Expand Down Expand Up @@ -799,12 +799,13 @@ struct AicpuExecutor {

// Called by each scheduler thread when drain_state_.sync_start_pending != 0.
//
// Three-phase protocol:
// 1. Ack barrier: all threads signal they've stopped Phase 2 dispatch.
// If not all acked yet, return to Phase 1 (completion polling).
// 2. Resource check: elected thread verifies global idle resources >= block_num.
// If insufficient, reset election state and return — all threads resume
// Phase 1 to free running cores, then retry next iteration.
// Protocol (single-stage ack barrier):
// 1. Ack barrier: all threads signal they've stopped dispatch, then spin
// until all ack bits are set.
// If this thread's bit gets cleared while waiting, a reset occurred — return.
// 2. Election: one thread wins the CAS and becomes the drain worker.
// If resources are insufficient, reset ack/election fields and return —
// all threads resume completion polling to free running cores, then retry.
// 3. Dispatch: elected thread dispatches all blocks (one pass, resources guaranteed).
// Non-elected threads spin-wait until sync_start_pending == 0.
// During dispatch the elected thread has exclusive tracker access.
Expand All @@ -822,14 +823,21 @@ struct AicpuExecutor {
} while (block_num < 0);
if (block_num == 0) return;

// Phase 1: Ack barrier — signal this thread has stopped Phase 2 dispatch.
uint32_t all_acked = (1u << active_sched_threads_) - 1;

// Ack barrier — signal this thread has stopped dispatch.
drain_state_.drain_ack_mask.fetch_or(1u << thread_idx, std::memory_order_release);

// If not all threads have acked, return to do Phase 1 (completion polling).
if ((drain_state_.drain_ack_mask.load(std::memory_order_acquire) & all_acked) != all_acked) return;
// Spin until all threads have acked.
// If our bit is cleared while waiting, elected reset due to insufficient resources.
while (true) {
uint32_t ack = drain_state_.drain_ack_mask.load(std::memory_order_acquire);
if ((ack & all_acked) == all_acked) break;
if ((ack & (1u << thread_idx)) == 0) return;
SPIN_WAIT_HINT();
}

// Phase 2: Election — exactly one thread wins the CAS.
// Election — exactly one thread wins the CAS.
int32_t expected = 0;
drain_state_.drain_worker_elected.compare_exchange_strong(
expected, thread_idx + 1, std::memory_order_acquire, std::memory_order_relaxed
Expand All @@ -850,13 +858,14 @@ struct AicpuExecutor {
int32_t available = count_global_available(shape);

if (available < block_num) {
// Insufficient resources — reset election, let all threads do Phase 1.
drain_state_.drain_ack_mask.store(0, std::memory_order_relaxed);
// Insufficient resources — reset drain fields so threads can resume
// completion polling to free running cores, then retry.
drain_state_.drain_ack_mask.store(0, std::memory_order_release);
drain_state_.drain_worker_elected.store(0, std::memory_order_release);
return;
}

// Phase 3: Dispatch — all other threads are spinning, exclusive tracker access.
// Dispatch — all other threads are spinning, elected thread has exclusive tracker access.
drain_worker_dispatch(
block_num
#if PTO2_PROFILING
Expand Down
Loading