diff --git a/ci.py b/ci.py index c8cb7f871..5e4035df0 100644 --- a/ci.py +++ b/ci.py @@ -665,6 +665,7 @@ def _run_device_worker_subprocess( pto_isa_commit: str | None = None, print_log_on_fail: bool = False, quiet: bool = True, + timeout: int | None = None, ) -> list[TaskResult]: """Run a task batch in one device-worker subprocess and return its reported results. @@ -703,9 +704,11 @@ def _run_device_worker_subprocess( logger.info(f"[{tag}:dev{device_id}] Launching: {' '.join(full_cmd)}") try: if quiet: - proc = subprocess.run(full_cmd, check=False, capture_output=True, text=True) + proc = subprocess.run(full_cmd, check=False, capture_output=True, text=True, timeout=timeout) else: - proc = subprocess.run(full_cmd, check=False, stdout=None, stderr=subprocess.PIPE, text=True) + proc = subprocess.run( + full_cmd, check=False, stdout=None, stderr=subprocess.PIPE, text=True, timeout=timeout + ) device_results = _read_results_json(result_path) if proc.returncode != 0: if print_log_on_fail and quiet: @@ -732,6 +735,24 @@ def _run_device_worker_subprocess( ) ) return device_results + except subprocess.TimeoutExpired: + logger.error(f"[{tag}:dev{device_id}] Subprocess timed out after {timeout}s") + device_results = _read_results_json(result_path) + reported_names = {r.name for r in device_results} + for t in tasks: + if t.name not in reported_names: + device_results.append( + TaskResult( + name=t.name, + platform=t.platform, + passed=False, + device=str(device_id), + attempt=0, + elapsed_s=0, + error=f"Timed out after {timeout}s", + ) + ) + return device_results finally: task_list_path.unlink(missing_ok=True) result_path.unlink(missing_ok=True) @@ -756,39 +777,6 @@ def _normalize_task_result( ) -def run_sim_tasks_subprocess( - tasks: list[TaskSpec], - args: argparse.Namespace, - pto_isa_commit: str | None = None, -) -> list[TaskResult]: - """Run simulation tasks: one subprocess per runtime group. - - Tasks sharing the same runtime reuse a single ChipWorker within their - subprocess. Different runtimes get separate subprocesses so the host SO - is never dlclose/dlopen'd within a single process. - """ - groups: dict[str, list[TaskSpec]] = {} - for t in tasks: - groups.setdefault(t.runtime_name, []).append(t) - - is_pin_retry = pto_isa_commit is not None - results: list[TaskResult] = [] - for rt_name, group_tasks in groups.items(): - logger.info(f"[sim] Launching subprocess for runtime {rt_name} ({len(group_tasks)} task(s))") - results.extend( - _run_device_worker_subprocess( - group_tasks, - 0, - args, - tag="sim", - pto_isa_commit=pto_isa_commit, - print_log_on_fail=is_pin_retry, - quiet=False, - ) - ) - return results - - def run_hw_tasks_subprocess( tasks: list[TaskSpec], devices: list[int], @@ -992,7 +980,13 @@ def _run_tasks_on_device( pto_isa_root: str, args: argparse.Namespace, ) -> list[TaskResult]: - """Compile and run all tasks on a single device. Returns all TaskResults.""" + """Compile and run all tasks on a single device. Returns all TaskResults. + + For simulation platforms with sufficient CPUs, tasks are distributed + across multiple virtual device IDs and executed in parallel threads. + ChipWorker.run() internally uses std::thread + join, so GIL is released + during execution, enabling true parallelism. + """ logger.info(f"Compiling {len(tasks)} tasks...") try: compiled = compile_all_tasks( @@ -1012,6 +1006,60 @@ def _run_tasks_on_device( for t in tasks ] + is_sim = platform.endswith("sim") + if is_sim: + cpu_count = os.cpu_count() or 1 + max_workers = min(max(cpu_count // 20, 1), len(compiled)) + else: + max_workers = 1 + + if max_workers <= 1: + return _run_compiled_tasks(compiled, device_id, platform) + + # Parallel: distribute tasks round-robin across virtual device IDs + buckets: list[list[CompiledTask]] = [[] for _ in range(max_workers)] + for i, ct in enumerate(compiled): + buckets[i % max_workers].append(ct) + + logger.info(f"[sim] Parallel execution: {max_workers} workers, {len(compiled)} tasks") + + results: list[TaskResult] = [] + results_lock = Lock() + completed_count = [0] + total = len(compiled) + + def _worker(worker_id: int, worker_tasks: list[CompiledTask]): + dev_id = worker_id + worker_results = _run_compiled_tasks(worker_tasks, dev_id, platform) + with results_lock: + for r in worker_results: + completed_count[0] += 1 + n = completed_count[0] + results.append(r) + status = "PASS" if r.passed else "FAIL" + logger.info(f"[dev{dev_id}] [{n}/{total}] {status}: {r.name} ({r.elapsed_s:.1f}s)") + + threads = [] + for i in range(max_workers): + if not buckets[i]: + continue + t = Thread(target=_worker, args=(i, buckets[i])) + t.start() + threads.append(t) + + for t in threads: + t.join() + + return results + + +def _run_compiled_tasks( + compiled: list[CompiledTask], + device_id: int, + platform: str, +) -> list[TaskResult]: + """Run compiled tasks serially on a single device.""" + groups = group_by_runtime(compiled) all_results: list[TaskResult] = [] @@ -1163,16 +1211,13 @@ def _run_single_platform(platform: str, args: argparse.Namespace) -> list[TaskRe return [] logger.info(f"[{platform}] Discovered {len(tasks)} tasks") - # Compile and run. - # Both sim and hw use subprocess isolation (different runtimes cannot share a process). - # Within each subprocess, tasks with the same runtime share a ChipWorker. - # Override platform in args for subprocess spawning. + # Compile and run via subprocess isolation. + # Sim: single subprocess with all tasks (ChipWorker reuse + parallel within). + # HW: one subprocess per task with device-level quarantine. sub_args = argparse.Namespace(**vars(args)) sub_args.platform = platform if is_sim: - all_results = _run_with_timeout( - f"{platform} initial pass", args.timeout, lambda: run_sim_tasks_subprocess(tasks, sub_args) - ) + all_results = _run_device_worker_subprocess(tasks, 0, sub_args, tag="sim", timeout=args.timeout, quiet=False) else: all_results = _run_with_timeout( f"{platform} initial pass", @@ -1191,10 +1236,15 @@ def _run_single_platform(platform: str, args: argparse.Namespace) -> list[TaskRe failed_tasks = [t for t in tasks if t.name in failed_names] logger.info(f"[{platform}] {len(failed_tasks)} failure(s), retrying with pinned PTO-ISA {args.pto_isa_commit}") if is_sim: - pin_results = _run_with_timeout( - f"{platform} pin retry", - args.timeout, - lambda: run_sim_tasks_subprocess(failed_tasks, sub_args, pto_isa_commit=args.pto_isa_commit), + pin_results = _run_device_worker_subprocess( + failed_tasks, + 0, + sub_args, + tag="sim", + pto_isa_commit=args.pto_isa_commit, + print_log_on_fail=True, + quiet=False, + timeout=args.timeout, ) else: pin_results = _run_with_timeout( diff --git a/src/a2a3/platform/onboard/host/device_runner.cpp b/src/a2a3/platform/onboard/host/device_runner.cpp index fa92ef0f4..41d2235c8 100644 --- a/src/a2a3/platform/onboard/host/device_runner.cpp +++ b/src/a2a3/platform/onboard/host/device_runner.cpp @@ -230,7 +230,6 @@ std::thread DeviceRunner::create_thread(std::function fn) { return std::thread([dev_id, fn = std::move(fn)]() { rtSetDevice(dev_id); fn(); - rtDeviceReset(dev_id); }); } @@ -248,20 +247,20 @@ int DeviceRunner::ensure_device_initialized( } int DeviceRunner::ensure_device_set(int device_id) { - // Check if already initialized - if (stream_aicpu_ != nullptr) { - return 0; - } - - device_id_ = device_id; - - // Set device + // Always set device for the calling thread (CANN device context is per-thread) int rc = rtSetDevice(device_id); if (rc != 0) { LOG_ERROR("rtSetDevice(%d) failed: %d", device_id, rc); return rc; } + // Create streams only on first call + if (stream_aicpu_ != nullptr) { + return 0; + } + + device_id_ = device_id; + // Create streams rc = rtStreamCreate(&stream_aicpu_, 0); if (rc != 0) { @@ -281,6 +280,19 @@ int DeviceRunner::ensure_device_set(int device_id) { return 0; } +void DeviceRunner::reset_device_context() { + // Destroy streams (they belong to the current thread's CANN context) + if (stream_aicpu_ != nullptr) { + rtStreamDestroy(stream_aicpu_); + stream_aicpu_ = nullptr; + } + if (stream_aicore_ != nullptr) { + rtStreamDestroy(stream_aicore_); + stream_aicore_ = nullptr; + } + rtDeviceReset(device_id_); +} + int DeviceRunner::ensure_binaries_loaded( const std::vector &aicpu_so_binary, const std::vector &aicore_kernel_binary ) { diff --git a/src/a2a3/platform/onboard/host/device_runner.h b/src/a2a3/platform/onboard/host/device_runner.h index b0bdbb41b..0c7598363 100644 --- a/src/a2a3/platform/onboard/host/device_runner.h +++ b/src/a2a3/platform/onboard/host/device_runner.h @@ -365,6 +365,13 @@ class DeviceRunner { */ int ensure_device_set(int device_id); + /** + * Reset per-thread CANN device context and clear cached streams. + * Called after each run_runtime() completes so the next run on a + * fresh thread can recreate streams in its own context. + */ + void reset_device_context(); + private: // Internal state int device_id_{-1}; diff --git a/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp b/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp index eff903896..ceab62b52 100644 --- a/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp +++ b/src/a2a3/platform/onboard/host/pto_runtime_c_api.cpp @@ -113,12 +113,9 @@ void destroy_device_context(DeviceContextHandle ctx) { delete static_cast(ctx)->ensure_device_set(device_id); - } catch (...) { - return -1; - } + (void)ctx; + (void)device_id; + return 0; } int run_runtime( @@ -129,58 +126,68 @@ int run_runtime( if (ctx == NULL || runtime == NULL) return -1; if (aicpu_binary == NULL || aicpu_size == 0 || aicore_binary == NULL || aicore_size == 0) return -1; - pthread_once(&g_runner_key_once, create_runner_key); - pthread_setspecific(g_runner_key, ctx); DeviceRunner *runner = static_cast(ctx); - try { - // Phase 1: placement new + build graph - Runtime *r = new (runtime) Runtime(); - r->host_api.device_malloc = device_malloc; - r->host_api.device_free = device_free; - r->host_api.copy_to_device = copy_to_device; - r->host_api.copy_from_device = copy_from_device; - r->host_api.upload_kernel_binary = upload_kernel_binary_wrapper; - r->host_api.remove_kernel_binary = remove_kernel_binary_wrapper; - - LOG_DEBUG("About to call init_runtime_impl, r=%p", (void *)r); - int rc = init_runtime_impl( - r, reinterpret_cast(callable), reinterpret_cast(args) - ); - LOG_DEBUG("init_runtime_impl returned: %d", rc); - if (rc != 0) { - r->set_pto2_gm_sm_ptr(nullptr); - validate_runtime_impl(r); - r->~Runtime(); - pthread_setspecific(g_runner_key, nullptr); - return rc; - } - - // Phase 2: profiling - if (enable_profiling) { - r->enable_profiling = true; - } - - // Phase 3: launch - std::vector aicpu_vec(aicpu_binary, aicpu_binary + aicpu_size); - std::vector aicore_vec(aicore_binary, aicore_binary + aicore_size); - rc = runner->run(*r, block_dim, device_id, aicpu_vec, aicore_vec, aicpu_thread_num); - if (rc != 0) { - validate_runtime_impl(r); - r->~Runtime(); - pthread_setspecific(g_runner_key, nullptr); - return rc; - } - - // Phase 4: finalize (copy results back) - rc = validate_runtime_impl(r); - r->~Runtime(); - pthread_setspecific(g_runner_key, nullptr); - return rc; - } catch (...) { - pthread_setspecific(g_runner_key, nullptr); - return -1; - } + int rc = -1; + runner + ->create_thread([&]() { + pthread_once(&g_runner_key_once, create_runner_key); + pthread_setspecific(g_runner_key, ctx); + + try { + // Ensure device + streams are initialized on this thread + rc = runner->ensure_device_set(device_id); + if (rc != 0) return; + + // Phase 1: placement new + build graph + Runtime *r = new (runtime) Runtime(); + r->host_api.device_malloc = device_malloc; + r->host_api.device_free = device_free; + r->host_api.copy_to_device = copy_to_device; + r->host_api.copy_from_device = copy_from_device; + r->host_api.upload_kernel_binary = upload_kernel_binary_wrapper; + r->host_api.remove_kernel_binary = remove_kernel_binary_wrapper; + + LOG_DEBUG("About to call init_runtime_impl, r=%p", (void *)r); + rc = init_runtime_impl( + r, reinterpret_cast(callable), + reinterpret_cast(args) + ); + LOG_DEBUG("init_runtime_impl returned: %d", rc); + if (rc != 0) { + r->set_pto2_gm_sm_ptr(nullptr); + validate_runtime_impl(r); + r->~Runtime(); + return; + } + + // Phase 2: profiling + if (enable_profiling) { + r->enable_profiling = true; + } + + // Phase 3: launch + std::vector aicpu_vec(aicpu_binary, aicpu_binary + aicpu_size); + std::vector aicore_vec(aicore_binary, aicore_binary + aicore_size); + rc = runner->run(*r, block_dim, device_id, aicpu_vec, aicore_vec, aicpu_thread_num); + if (rc != 0) { + validate_runtime_impl(r); + r->~Runtime(); + return; + } + + // Phase 4: finalize (copy results back) + rc = validate_runtime_impl(r); + r->~Runtime(); + runner->reset_device_context(); + } catch (...) { + runner->reset_device_context(); + rc = -1; + } + }) + .join(); + + return rc; } int finalize_device(DeviceContextHandle ctx) { diff --git a/src/a5/platform/onboard/host/device_runner.cpp b/src/a5/platform/onboard/host/device_runner.cpp index 064567a90..e451a5efa 100644 --- a/src/a5/platform/onboard/host/device_runner.cpp +++ b/src/a5/platform/onboard/host/device_runner.cpp @@ -166,20 +166,20 @@ int DeviceRunner::ensure_device_initialized( } int DeviceRunner::ensure_device_set(int device_id) { - // Check if already initialized - if (stream_aicpu_ != nullptr) { - return 0; - } - - device_id_ = device_id; - - // Set device + // Always set device for the calling thread (CANN device context is per-thread) int rc = rtSetDevice(device_id); if (rc != 0) { LOG_ERROR("rtSetDevice(%d) failed: %d", device_id, rc); return rc; } + // Check if streams already initialized + if (stream_aicpu_ != nullptr) { + return 0; + } + + device_id_ = device_id; + // Create streams rc = rtStreamCreate(&stream_aicpu_, 0); if (rc != 0) { @@ -199,6 +199,18 @@ int DeviceRunner::ensure_device_set(int device_id) { return 0; } +void DeviceRunner::reset_device_context() { + if (stream_aicpu_ != nullptr) { + rtStreamDestroy(stream_aicpu_); + stream_aicpu_ = nullptr; + } + if (stream_aicore_ != nullptr) { + rtStreamDestroy(stream_aicore_); + stream_aicore_ = nullptr; + } + rtDeviceReset(device_id_); +} + int DeviceRunner::ensure_binaries_loaded( const std::vector &aicpu_so_binary, const std::vector &aicore_kernel_binary ) { diff --git a/src/a5/platform/onboard/host/device_runner.h b/src/a5/platform/onboard/host/device_runner.h index 0a76800c4..6658f7221 100644 --- a/src/a5/platform/onboard/host/device_runner.h +++ b/src/a5/platform/onboard/host/device_runner.h @@ -329,6 +329,11 @@ class DeviceRunner { */ int ensure_device_set(int device_id); + /** + * Reset per-thread CANN device context and clear cached streams. + */ + void reset_device_context(); + private: // Internal state int device_id_{-1}; diff --git a/src/a5/platform/onboard/host/pto_runtime_c_api.cpp b/src/a5/platform/onboard/host/pto_runtime_c_api.cpp index eff903896..ceab62b52 100644 --- a/src/a5/platform/onboard/host/pto_runtime_c_api.cpp +++ b/src/a5/platform/onboard/host/pto_runtime_c_api.cpp @@ -113,12 +113,9 @@ void destroy_device_context(DeviceContextHandle ctx) { delete static_cast(ctx)->ensure_device_set(device_id); - } catch (...) { - return -1; - } + (void)ctx; + (void)device_id; + return 0; } int run_runtime( @@ -129,58 +126,68 @@ int run_runtime( if (ctx == NULL || runtime == NULL) return -1; if (aicpu_binary == NULL || aicpu_size == 0 || aicore_binary == NULL || aicore_size == 0) return -1; - pthread_once(&g_runner_key_once, create_runner_key); - pthread_setspecific(g_runner_key, ctx); DeviceRunner *runner = static_cast(ctx); - try { - // Phase 1: placement new + build graph - Runtime *r = new (runtime) Runtime(); - r->host_api.device_malloc = device_malloc; - r->host_api.device_free = device_free; - r->host_api.copy_to_device = copy_to_device; - r->host_api.copy_from_device = copy_from_device; - r->host_api.upload_kernel_binary = upload_kernel_binary_wrapper; - r->host_api.remove_kernel_binary = remove_kernel_binary_wrapper; - - LOG_DEBUG("About to call init_runtime_impl, r=%p", (void *)r); - int rc = init_runtime_impl( - r, reinterpret_cast(callable), reinterpret_cast(args) - ); - LOG_DEBUG("init_runtime_impl returned: %d", rc); - if (rc != 0) { - r->set_pto2_gm_sm_ptr(nullptr); - validate_runtime_impl(r); - r->~Runtime(); - pthread_setspecific(g_runner_key, nullptr); - return rc; - } - - // Phase 2: profiling - if (enable_profiling) { - r->enable_profiling = true; - } - - // Phase 3: launch - std::vector aicpu_vec(aicpu_binary, aicpu_binary + aicpu_size); - std::vector aicore_vec(aicore_binary, aicore_binary + aicore_size); - rc = runner->run(*r, block_dim, device_id, aicpu_vec, aicore_vec, aicpu_thread_num); - if (rc != 0) { - validate_runtime_impl(r); - r->~Runtime(); - pthread_setspecific(g_runner_key, nullptr); - return rc; - } - - // Phase 4: finalize (copy results back) - rc = validate_runtime_impl(r); - r->~Runtime(); - pthread_setspecific(g_runner_key, nullptr); - return rc; - } catch (...) { - pthread_setspecific(g_runner_key, nullptr); - return -1; - } + int rc = -1; + runner + ->create_thread([&]() { + pthread_once(&g_runner_key_once, create_runner_key); + pthread_setspecific(g_runner_key, ctx); + + try { + // Ensure device + streams are initialized on this thread + rc = runner->ensure_device_set(device_id); + if (rc != 0) return; + + // Phase 1: placement new + build graph + Runtime *r = new (runtime) Runtime(); + r->host_api.device_malloc = device_malloc; + r->host_api.device_free = device_free; + r->host_api.copy_to_device = copy_to_device; + r->host_api.copy_from_device = copy_from_device; + r->host_api.upload_kernel_binary = upload_kernel_binary_wrapper; + r->host_api.remove_kernel_binary = remove_kernel_binary_wrapper; + + LOG_DEBUG("About to call init_runtime_impl, r=%p", (void *)r); + rc = init_runtime_impl( + r, reinterpret_cast(callable), + reinterpret_cast(args) + ); + LOG_DEBUG("init_runtime_impl returned: %d", rc); + if (rc != 0) { + r->set_pto2_gm_sm_ptr(nullptr); + validate_runtime_impl(r); + r->~Runtime(); + return; + } + + // Phase 2: profiling + if (enable_profiling) { + r->enable_profiling = true; + } + + // Phase 3: launch + std::vector aicpu_vec(aicpu_binary, aicpu_binary + aicpu_size); + std::vector aicore_vec(aicore_binary, aicore_binary + aicore_size); + rc = runner->run(*r, block_dim, device_id, aicpu_vec, aicore_vec, aicpu_thread_num); + if (rc != 0) { + validate_runtime_impl(r); + r->~Runtime(); + return; + } + + // Phase 4: finalize (copy results back) + rc = validate_runtime_impl(r); + r->~Runtime(); + runner->reset_device_context(); + } catch (...) { + runner->reset_device_context(); + rc = -1; + } + }) + .join(); + + return rc; } int finalize_device(DeviceContextHandle ctx) { diff --git a/src/common/worker/chip_worker.cpp b/src/common/worker/chip_worker.cpp index 151045e7f..19b35603b 100644 --- a/src/common/worker/chip_worker.cpp +++ b/src/common/worker/chip_worker.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include namespace {