diff --git a/docs/source/user_guide/streams.md b/docs/source/user_guide/streams.md
index b4b70b774b..fff357b759 100644
--- a/docs/source/user_guide/streams.md
+++ b/docs/source/user_guide/streams.md
@@ -7,6 +7,7 @@ Streams allow concurrent execution of GPU operations. By default, all Quadrants
 | Backend | Streams | Events | Notes |
 |---------|---------|--------|-------|
 | CUDA | Yes | Yes | Full concurrent execution |
+| AMDGPU | Yes | Yes | Full concurrent execution (requires ROCm >= 5.4) |
 | CPU | No-op | No-op | `qd_stream` is silently ignored, kernels run serially |
 | Metal | No-op | No-op | `qd_stream` is silently ignored, kernels run serially |
 | Vulkan | No-op | No-op | `qd_stream` is silently ignored, kernels run serially |
diff --git a/quadrants/program/program_stream.cpp b/quadrants/program/program_stream.cpp
index 8a7431532a..9686a86332 100644
--- a/quadrants/program/program_stream.cpp
+++ b/quadrants/program/program_stream.cpp
@@ -7,6 +7,11 @@
 #include "quadrants/rhi/cuda/cuda_context.h"
 #endif
 
+#ifdef QD_WITH_AMDGPU
+#include "quadrants/rhi/amdgpu/amdgpu_driver.h"
+#include "quadrants/rhi/amdgpu/amdgpu_context.h"
+#endif
+
 namespace quadrants::lang {
 
 // ---------------------------------------------------------------------------
@@ -21,6 +26,14 @@ uint64 StreamManager::create_stream() {
     CUDADriver::get_instance().stream_create(&stream, 0x1 /*CU_STREAM_NON_BLOCKING*/);
     return reinterpret_cast<uint64>(stream);
   }
+#endif
+#ifdef QD_WITH_AMDGPU
+  if (arch_ == Arch::amdgpu) {
+    AMDGPUContext::get_instance().make_current();
+    void *stream = nullptr;
+    AMDGPUDriver::get_instance().stream_create(&stream, 0x1 /*HIP_STREAM_NON_BLOCKING*/);
+    return reinterpret_cast<uint64>(stream);
+  }
 #endif
   return 0;
 }
@@ -32,6 +45,12 @@ void StreamManager::destroy_stream(uint64 stream_handle) {
     CUDADriver::get_instance().stream_destroy(reinterpret_cast<void *>(stream_handle));
   }
 #endif
+#ifdef QD_WITH_AMDGPU
+  if (arch_ == Arch::amdgpu && stream_handle != 0) {
+    AMDGPUContext::get_instance().make_current();
+    AMDGPUDriver::get_instance().stream_destroy(reinterpret_cast<void *>(stream_handle));
+  }
+#endif
 }
 
 void StreamManager::synchronize_stream(uint64 stream_handle) {
@@ -41,6 +60,12 @@ void StreamManager::synchronize_stream(uint64 stream_handle) {
     CUDADriver::get_instance().stream_synchronize(reinterpret_cast<void *>(stream_handle));
   }
 #endif
+#ifdef QD_WITH_AMDGPU
+  if (arch_ == Arch::amdgpu) {
+    AMDGPUContext::get_instance().make_current();
+    AMDGPUDriver::get_instance().stream_synchronize(reinterpret_cast<void *>(stream_handle));
+  }
+#endif
 }
 
 void StreamManager::set_current_stream(uint64 stream_handle) {
@@ -50,6 +75,12 @@ void StreamManager::set_current_stream(uint64 stream_handle) {
     CUDAContext::get_instance().set_stream(reinterpret_cast<void *>(stream_handle));
   }
 #endif
+#ifdef QD_WITH_AMDGPU
+  if (arch_ == Arch::amdgpu) {
+    AMDGPUContext::get_instance().make_current();
+    AMDGPUContext::get_instance().set_stream(reinterpret_cast<void *>(stream_handle));
+  }
+#endif
 }
 
 uint64 StreamManager::create_event() {
@@ -60,6 +91,14 @@ uint64 StreamManager::create_event() {
     CUDADriver::get_instance().event_create(&event, 0x02 /*CU_EVENT_DISABLE_TIMING*/);
     return reinterpret_cast<uint64>(event);
   }
+#endif
+#ifdef QD_WITH_AMDGPU
+  if (arch_ == Arch::amdgpu) {
+    AMDGPUContext::get_instance().make_current();
+    void *event = nullptr;
+    AMDGPUDriver::get_instance().event_create(&event, 0x02 /*hipEventDisableTiming*/);
+    return reinterpret_cast<uint64>(event);
+  }
 #endif
   return 0;
 }
@@ -71,6 +110,12 @@ void StreamManager::destroy_event(uint64 event_handle) {
     CUDADriver::get_instance().event_destroy(reinterpret_cast<void *>(event_handle));
   }
 #endif
+#ifdef QD_WITH_AMDGPU
+  if (arch_ == Arch::amdgpu && event_handle != 0) {
+    AMDGPUContext::get_instance().make_current();
+    AMDGPUDriver::get_instance().event_destroy(reinterpret_cast<void *>(event_handle));
+  }
+#endif
 }
 
 void StreamManager::record_event(uint64 event_handle, uint64 stream_handle) {
@@ -81,6 +126,13 @@ void StreamManager::record_event(uint64 event_handle, uint64 stream_handle) {
                                              reinterpret_cast<void *>(stream_handle));
   }
 #endif
+#ifdef QD_WITH_AMDGPU
+  if (arch_ == Arch::amdgpu && event_handle != 0) {
+    AMDGPUContext::get_instance().make_current();
+    AMDGPUDriver::get_instance().event_record(reinterpret_cast<void *>(event_handle),
+                                              reinterpret_cast<void *>(stream_handle));
+  }
+#endif
 }
 
 void StreamManager::synchronize_event(uint64 event_handle) {
@@ -90,6 +142,12 @@ void StreamManager::synchronize_event(uint64 event_handle) {
     CUDADriver::get_instance().event_synchronize(reinterpret_cast<void *>(event_handle));
   }
 #endif
+#ifdef QD_WITH_AMDGPU
+  if (arch_ == Arch::amdgpu && event_handle != 0) {
+    AMDGPUContext::get_instance().make_current();
+    AMDGPUDriver::get_instance().event_synchronize(reinterpret_cast<void *>(event_handle));
+  }
+#endif
 }
 
 void StreamManager::stream_wait_event(uint64 stream_handle, uint64 event_handle) {
@@ -100,6 +158,13 @@ void StreamManager::stream_wait_event(uint64 stream_handle, uint64 event_handle)
                                                  reinterpret_cast<void *>(event_handle), 0 /*flags*/);
   }
 #endif
+#ifdef QD_WITH_AMDGPU
+  if (arch_ == Arch::amdgpu && event_handle != 0) {
+    AMDGPUContext::get_instance().make_current();
+    AMDGPUDriver::get_instance().stream_wait_event(reinterpret_cast<void *>(stream_handle),
+                                                   reinterpret_cast<void *>(event_handle), 0 /*flags*/);
+  }
+#endif
 }
 
 } // namespace quadrants::lang
diff --git a/quadrants/rhi/amdgpu/amdgpu_context.cpp b/quadrants/rhi/amdgpu/amdgpu_context.cpp
index ae5f40d1f2..5f1a125c59 100644
--- a/quadrants/rhi/amdgpu/amdgpu_context.cpp
+++ b/quadrants/rhi/amdgpu/amdgpu_context.cpp
@@ -13,6 +13,8 @@ namespace quadrants {
 namespace lang {
 
+thread_local void *AMDGPUContext::stream_ = nullptr;
+
 AMDGPUContext::AMDGPUContext() : driver_(AMDGPUDriver::get_instance_without_context()) {
   dev_count_ = 0;
   driver_.init(0);
@@ -190,7 +192,7 @@ void AMDGPUContext::launch(void *func,
   if (grid_dim > 0) {
     std::lock_guard _(lock_);
     void *config[] = {(void *)0x01, (void *)packed_arg, (void *)0x02, (void *)&pack_size, (void *)0x03};
-    driver_.launch_kernel(func, grid_dim, 1, 1, block_dim, 1, 1, dynamic_shared_mem_bytes, nullptr, nullptr,
+    driver_.launch_kernel(func, grid_dim, 1, 1, block_dim, 1, 1, dynamic_shared_mem_bytes, stream_, nullptr,
                           reinterpret_cast<void **>(&config));
   }
   std::free(packed_arg);
@@ -199,7 +201,7 @@ void AMDGPUContext::launch(void *func,
     profiler_->stop(task_handle);
 
   if (debug_) {
-    driver_.stream_synchronize(nullptr);
+    driver_.stream_synchronize(stream_);
   }
 }
diff --git a/quadrants/rhi/amdgpu/amdgpu_context.h b/quadrants/rhi/amdgpu/amdgpu_context.h
index 269106b077..083406c3f9 100644
--- a/quadrants/rhi/amdgpu/amdgpu_context.h
+++ b/quadrants/rhi/amdgpu/amdgpu_context.h
@@ -24,6 +24,7 @@ class AMDGPUContext {
   AMDGPUDriver &driver_;
   bool debug_{false};
   bool supports_mem_pool_{false};
+  static thread_local void *stream_;
 
  public:
   AMDGPUContext();
@@ -113,6 +114,14 @@ class AMDGPUContext {
     return std::unique_lock(lock_);
   }
 
+  void set_stream(void *stream) {
+    stream_ = stream;
+  }
+
+  void *get_stream() const {
+    return stream_;
+  }
+
   static AMDGPUContext &get_instance();
 };
diff --git a/quadrants/rhi/amdgpu/amdgpu_device.cpp b/quadrants/rhi/amdgpu/amdgpu_device.cpp
index 68c377a73a..280cd9f7e1 100644
--- a/quadrants/rhi/amdgpu/amdgpu_device.cpp
+++ b/quadrants/rhi/amdgpu/amdgpu_device.cpp
@@ -1,4 +1,5 @@
 #include "quadrants/rhi/amdgpu/amdgpu_device.h"
+#include "quadrants/rhi/amdgpu/amdgpu_context.h"
 #include "quadrants/rhi/llvm/device_memory_pool.h"
 #include "quadrants/jit/jit_module.h"
 
@@ -93,11 +94,12 @@ uint64_t *AmdgpuDevice::allocate_llvm_runtime_memory_jit(const LlvmRuntimeAllocP
   // the kernel without writing to *result. To detect that here, zero the slot first so a null readback unambiguously
   // means "allocation failed" and we can surface a helpful host-side message instead of letting the downstream
   // hipMemset trip on the stale pointer with a cryptic hipErrorInvalidValue.
+  void *active_stream = AMDGPUContext::get_instance().get_stream();
   uint64 zero = 0;
-  AMDGPUDriver::get_instance().memcpy_host_to_device(params.result_buffer, &zero, sizeof(uint64));
+  AMDGPUDriver::get_instance().memcpy_host_to_device_async(params.result_buffer, &zero, sizeof(uint64), active_stream);
   params.runtime_jit->call("runtime_memory_allocate_aligned", params.runtime, params.size, quadrants_page_size,
                            params.result_buffer);
-  AMDGPUDriver::get_instance().stream_synchronize(nullptr);
+  AMDGPUDriver::get_instance().stream_synchronize(active_stream);
   uint64 *ret{nullptr};
   AMDGPUDriver::get_instance().memcpy_device_to_host(&ret, params.result_buffer, sizeof(uint64));
   QD_ERROR_IF(ret == nullptr,
@@ -123,7 +125,7 @@ void AmdgpuDevice::dealloc_memory(DeviceAllocation handle) {
   }
   QD_ASSERT(!info.is_imported);
   if (info.use_memory_pool) {
-    AMDGPUDriver::get_instance().mem_free_async(info.ptr, nullptr);
+    AMDGPUDriver::get_instance().mem_free(info.ptr);
   } else if (info.use_cached) {
     DeviceMemoryPool::get_instance(Arch::amdgpu, false /*merge_upon_release*/)
         .release(info.size, (uint64_t *)info.ptr, false);
diff --git a/quadrants/rhi/amdgpu/amdgpu_driver_functions.inc.h b/quadrants/rhi/amdgpu/amdgpu_driver_functions.inc.h
index 5665e4b588..c94a7f14db 100644
--- a/quadrants/rhi/amdgpu/amdgpu_driver_functions.inc.h
+++ b/quadrants/rhi/amdgpu/amdgpu_driver_functions.inc.h
@@ -15,8 +15,12 @@ PER_AMDGPU_FUNCTION(context_create, hipCtxCreate, void *, int, void *);
 PER_AMDGPU_FUNCTION(context_set_current, hipCtxSetCurrent, void *);
 PER_AMDGPU_FUNCTION(context_get_current, hipCtxGetCurrent, void **);
 
+// Device synchronization
+PER_AMDGPU_FUNCTION(device_synchronize, hipDeviceSynchronize);
+
 // Stream management
-PER_AMDGPU_FUNCTION(stream_create, hipStreamCreate, void **, uint32);
+PER_AMDGPU_FUNCTION(stream_create, hipStreamCreateWithFlags, void **, uint32);
+PER_AMDGPU_FUNCTION(stream_destroy, hipStreamDestroy, void *);
 
 // Memory management
 PER_AMDGPU_FUNCTION(memcpy_host_to_device, hipMemcpyHtoD, void *, void *, std::size_t);
@@ -27,6 +31,8 @@ PER_AMDGPU_FUNCTION(memcpy_async, hipMemcpyAsync, void *, void *, std::size_t, u
 PER_AMDGPU_FUNCTION(memcpy_host_to_device_async, hipMemcpyHtoDAsync, void *, void *, std::size_t, void *);
 PER_AMDGPU_FUNCTION(memcpy_device_to_host_async, hipMemcpyDtoHAsync, void *, void *, std::size_t, void *);
 PER_AMDGPU_FUNCTION(malloc, hipMalloc, void **, std::size_t);
+// hipMallocAsync/hipFreeAsync require ROCm >= 5.4; the AMDGPUDriver wrappers fall back to the synchronous variants
+// on devices without memory-pool support.
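+// A rough sketch of the assumed wrapper behaviour (illustrative only; the accessor and member names here are
+// hypothetical and may differ from the actual AMDGPUDriver/AMDGPUContext code):
+//   void AMDGPUDriver::malloc_async(void **ptr, std::size_t size, void *stream) {
+//     if (AMDGPUContext::get_instance().supports_mem_pool())
+//       malloc_async_impl(ptr, size, stream);  // hipMallocAsync, stream-ordered
+//     else
+//       malloc(ptr, size);                     // hipMalloc fallback for ROCm < 5.4
+//   }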
 PER_AMDGPU_FUNCTION(malloc_async_impl, hipMallocAsync, void **, std::size_t, void *);
 PER_AMDGPU_FUNCTION(malloc_managed, hipMallocManaged, void **, std::size_t, uint32);
 PER_AMDGPU_FUNCTION(memset, hipMemset, void *, uint8, std::size_t);
@@ -61,6 +67,7 @@ PER_AMDGPU_FUNCTION(kernel_get_occupancy, hipOccupancyMaxActiveBlocksPerMultipro
 
 // Stream management
 PER_AMDGPU_FUNCTION(stream_synchronize, hipStreamSynchronize, void *);
+PER_AMDGPU_FUNCTION(stream_wait_event, hipStreamWaitEvent, void *, void *, uint32);
 
 // Event management
 PER_AMDGPU_FUNCTION(event_create, hipEventCreateWithFlags, void **, uint32);
diff --git a/quadrants/rhi/amdgpu/amdgpu_profiler.cpp b/quadrants/rhi/amdgpu/amdgpu_profiler.cpp
index 731d536bca..e963f7df20 100644
--- a/quadrants/rhi/amdgpu/amdgpu_profiler.cpp
+++ b/quadrants/rhi/amdgpu/amdgpu_profiler.cpp
@@ -59,8 +59,9 @@ void KernelProfilerAMDGPU::trace(KernelProfilerBase::TaskHandle &task_handle,
 }
 
 void KernelProfilerAMDGPU::stop(KernelProfilerBase::TaskHandle handle) {
-  AMDGPUDriver::get_instance().event_record(handle, 0);
-  AMDGPUDriver::get_instance().stream_synchronize(nullptr);
+  void *active_stream = AMDGPUContext::get_instance().get_stream();
+  AMDGPUDriver::get_instance().event_record(handle, active_stream);
+  AMDGPUDriver::get_instance().stream_synchronize(active_stream);
 
   // get elapsed time and destroy events
   auto record = event_toolkit_->get_current_event_record();
@@ -154,7 +155,8 @@ KernelProfilerBase::TaskHandle EventToolkitAMDGPU::start_with_handle(const std::
   AMDGPUDriver::get_instance().event_create(&(record.start_event), HIP_EVENT_DEFAULT);
   AMDGPUDriver::get_instance().event_create(&(record.stop_event), HIP_EVENT_DEFAULT);
-  AMDGPUDriver::get_instance().event_record((record.start_event), 0);
+  void *active_stream = AMDGPUContext::get_instance().get_stream();
+  AMDGPUDriver::get_instance().event_record((record.start_event), active_stream);
   event_records_.push_back(record);
 
   if (!base_event_) {
@@ -163,7 +165,7 @@ KernelProfilerBase::TaskHandle EventToolkitAMDGPU::start_with_handle(const std::
     for (int i = 0; i < n_iters; i++) {
       void *e;
       AMDGPUDriver::get_instance().event_create(&e, HIP_EVENT_DEFAULT);
-      AMDGPUDriver::get_instance().event_record(e, 0);
+      AMDGPUDriver::get_instance().event_record(e, active_stream);
       AMDGPUDriver::get_instance().event_synchronize(e);
       auto final_t = Time::get_time();
       if (i == n_iters - 1) {
diff --git a/quadrants/rhi/cuda/cuda_driver_functions.inc.h b/quadrants/rhi/cuda/cuda_driver_functions.inc.h
index 55c5e3e0b8..b4164b7c33 100644
--- a/quadrants/rhi/cuda/cuda_driver_functions.inc.h
+++ b/quadrants/rhi/cuda/cuda_driver_functions.inc.h
@@ -53,6 +53,9 @@ PER_CUDA_FUNCTION(kernel_get_occupancy, cuOccupancyMaxActiveBlocksPerMultiproces
 PER_CUDA_FUNCTION(kernel_set_attribute, cuFuncSetAttribute, void *, CUfunction_attribute_enum, int);
 
+// Context management
+PER_CUDA_FUNCTION(context_synchronize, cuCtxSynchronize);
+
 // Stream management
 PER_CUDA_FUNCTION(stream_synchronize, cuStreamSynchronize, void *);
 PER_CUDA_FUNCTION(stream_wait_event, cuStreamWaitEvent, void *, void *, uint32);
diff --git a/quadrants/runtime/amdgpu/kernel_launcher.cpp b/quadrants/runtime/amdgpu/kernel_launcher.cpp
index 44fc2e65c9..30c75a49ee 100644
--- a/quadrants/runtime/amdgpu/kernel_launcher.cpp
+++ b/quadrants/runtime/amdgpu/kernel_launcher.cpp
@@ -134,7 +134,8 @@ void KernelLauncher::launch_offloaded_tasks_with_do_while(LaunchContextBuilder &
   do {
     launch_offloaded_tasks(ctx, amdgpu_module, offloaded_tasks, context_pointer, arg_size);
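+    // Drain the active stream before reading the do-while flag back to the host below.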
     counter_val = 0;
-    AMDGPUDriver::get_instance().stream_synchronize(nullptr);
+    auto *stream = AMDGPUContext::get_instance().get_stream();
+    AMDGPUDriver::get_instance().stream_synchronize(stream);
     AMDGPUDriver::get_instance().memcpy_device_to_host(&counter_val, ctx.graph_do_while_flag_dev_ptr, sizeof(int32_t));
   } while (counter_val != 0);
 }
@@ -162,6 +163,8 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
   std::unordered_map, ArgArrayPtrKeyHasher> transfers;
   std::unordered_map device_ptrs;
 
+  auto *active_stream = AMDGPUContext::get_instance().get_stream();
+
   char *device_result_buffer{nullptr};
   // Here we have to guarantee the result_result_buffer isn't nullptr
   // It is interesting - The code following
@@ -171,7 +174,8 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
   // Memory access fault by GPU node-1 (Agent handle: 0xeda5ca0) on address
   // (nil). Reason: Page not present or supervisor privilege.
   // if you don't allocate it.
-  AMDGPUDriver::get_instance().malloc((void **)&device_result_buffer, std::max(ctx.result_buffer_size, sizeof(uint64)));
+  AMDGPUDriver::get_instance().malloc_async((void **)&device_result_buffer,
+                                            std::max(ctx.result_buffer_size, sizeof(uint64)), active_stream);
 
   for (int i = 0; i < (int)parameters.size(); i++) {
     const auto &kv = parameters[i];
@@ -197,14 +201,16 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
       device_ptrs[data_ptr_idx] = executor->get_device_alloc_info_ptr(devalloc);
       transfers[data_ptr_idx] = {data_ptr, devalloc};
 
-      AMDGPUDriver::get_instance().memcpy_host_to_device((void *)device_ptrs[data_ptr_idx], data_ptr, arr_sz);
+      AMDGPUDriver::get_instance().memcpy_host_to_device_async((void *)device_ptrs[data_ptr_idx], data_ptr, arr_sz,
+                                                               active_stream);
 
       if (grad_ptr != nullptr) {
         DeviceAllocation grad_devalloc = executor->allocate_memory_on_device(arr_sz, (uint64 *)device_result_buffer);
         device_ptrs[grad_ptr_idx] = executor->get_device_alloc_info_ptr(grad_devalloc);
         transfers[grad_ptr_idx] = {grad_ptr, grad_devalloc};
 
-        AMDGPUDriver::get_instance().memcpy_host_to_device((void *)device_ptrs[grad_ptr_idx], grad_ptr, arr_sz);
+        AMDGPUDriver::get_instance().memcpy_host_to_device_async((void *)device_ptrs[grad_ptr_idx], grad_ptr,
+                                                                 arr_sz, active_stream);
       } else {
         device_ptrs[grad_ptr_idx] = nullptr;
       }
@@ -234,24 +240,24 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
     }
   }
   if (transfers.size() > 0) {
-    AMDGPUDriver::get_instance().stream_synchronize(nullptr);
+    AMDGPUDriver::get_instance().stream_synchronize(active_stream);
   }
   char *host_result_buffer = (char *)ctx.get_context().result_buffer;
   if (ctx.result_buffer_size > 0) {
-    // Malloc_Async and Free_Async are available after ROCm 5.4
     ctx.get_context().result_buffer = (uint64 *)device_result_buffer;
   }
   char *device_arg_buffer = nullptr;
   if (ctx.arg_buffer_size > 0) {
-    AMDGPUDriver::get_instance().malloc((void **)&device_arg_buffer, ctx.arg_buffer_size);
-    AMDGPUDriver::get_instance().memcpy_host_to_device(device_arg_buffer, ctx.get_context().arg_buffer,
-                                                       ctx.arg_buffer_size);
+    AMDGPUDriver::get_instance().malloc_async((void **)&device_arg_buffer, ctx.arg_buffer_size, active_stream);
+    AMDGPUDriver::get_instance().memcpy_host_to_device_async(device_arg_buffer, ctx.get_context().arg_buffer,
+                                                             ctx.arg_buffer_size, active_stream);
     ctx.get_context().arg_buffer = device_arg_buffer;
   }
   void *context_pointer;
   int arg_size = sizeof(RuntimeContext *);
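+  // Stage the per-launch RuntimeContext through the active stream as well, so the upload is ordered after the
+  // argument copies queued above.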
-  AMDGPUDriver::get_instance().malloc((void **)&context_pointer, sizeof(RuntimeContext));
-  AMDGPUDriver::get_instance().memcpy_host_to_device(context_pointer, &ctx.get_context(), sizeof(RuntimeContext));
+  AMDGPUDriver::get_instance().malloc_async((void **)&context_pointer, sizeof(RuntimeContext), active_stream);
+  AMDGPUDriver::get_instance().memcpy_host_to_device_async(context_pointer, &ctx.get_context(), sizeof(RuntimeContext),
+                                                           active_stream);
 
   if (ctx.graph_do_while_arg_id >= 0) {
     QD_ASSERT(ctx.graph_do_while_flag_dev_ptr);
@@ -261,31 +267,33 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx
   }
   QD_TRACE("Launching kernel");
   if (ctx.arg_buffer_size > 0) {
-    AMDGPUDriver::get_instance().mem_free(device_arg_buffer);
+    AMDGPUDriver::get_instance().mem_free_async(device_arg_buffer, active_stream);
   }
   if (ctx.result_buffer_size > 0) {
-    AMDGPUDriver::get_instance().memcpy_device_to_host(host_result_buffer, device_result_buffer,
-                                                       ctx.result_buffer_size);
+    AMDGPUDriver::get_instance().memcpy_device_to_host_async(host_result_buffer, device_result_buffer,
+                                                             ctx.result_buffer_size, active_stream);
   }
-  if (transfers.size()) {
+  AMDGPUDriver::get_instance().mem_free_async(device_result_buffer, active_stream);
+  if (transfers.size() > 0) {
+    AMDGPUDriver::get_instance().stream_synchronize(active_stream);
    for (auto itr = transfers.begin(); itr != transfers.end(); itr++) {
      auto &idx = itr->first;
-      auto arg_id = idx.arg_id;
-      AMDGPUDriver::get_instance().memcpy_device_to_host(itr->second.first, (void *)device_ptrs[idx],
-                                                         ctx.array_runtime_sizes[arg_id]);
+      AMDGPUDriver::get_instance().memcpy_device_to_host_async(itr->second.first, (void *)device_ptrs[idx],
+                                                               ctx.array_runtime_sizes[idx.arg_id], active_stream);
+    }
+    AMDGPUDriver::get_instance().stream_synchronize(active_stream);
+    for (auto itr = transfers.begin(); itr != transfers.end(); itr++) {
      executor->deallocate_memory_on_device(itr->second.second);
    }
+  } else if (ctx.result_buffer_size > 0) {
+    AMDGPUDriver::get_instance().stream_synchronize(active_stream);
  }
-  // Since we always allocating above then we should always free
-  AMDGPUDriver::get_instance().mem_free(device_result_buffer);
-  // Free the per-launch `RuntimeContext` device allocation. `hipFree` synchronizes implicitly with pending
-  // kernels on the device, so this is safe even when the launches above were asynchronous. Done here rather
-  // than through `AMDGPUContext`'s deferred free list because that list used to be drained by
-  // `LlvmRuntimeExecutor::synchronize`, which is also called from `fetch_result_uint64` during
-  // `ensure_adstack_heap`'s field-pointer query -- that path would hipFree `context_pointer` mid-launch, and
-  // HIP could recycle the freed address for the adstack heap allocated right after, clobbering the
-  // `RuntimeContext` the next task still reads from.
+  // Free the per-launch `RuntimeContext` on the active stream rather than through `AMDGPUContext`'s deferred free
+  // list. The deferred list is drained by `LlvmRuntimeExecutor::synchronize`, which is also called from
+  // `fetch_result_uint64` during `ensure_adstack_heap`'s field-pointer query -- that path would free
+  // `context_pointer` mid-launch, and HIP could recycle the address for the adstack heap allocated right after,
+  // clobbering the `RuntimeContext` the next task still reads from.
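+  // Freeing on `active_stream` is stream-ordered, so the allocation is only recycled after the kernels already
+  // queued on that stream are done with it; the ROCm < 5.4 fallback frees synchronously, which is also safe.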
+  AMDGPUDriver::get_instance().mem_free_async(context_pointer, active_stream);
 }
 
 KernelLauncher::Handle KernelLauncher::register_llvm_kernel(const LLVM::CompiledKernelData &compiled) {
diff --git a/quadrants/runtime/llvm/llvm_runtime_executor.cpp b/quadrants/runtime/llvm/llvm_runtime_executor.cpp
index 658c139c0f..5ed41012f6 100644
--- a/quadrants/runtime/llvm/llvm_runtime_executor.cpp
+++ b/quadrants/runtime/llvm/llvm_runtime_executor.cpp
@@ -196,13 +196,15 @@ void LlvmRuntimeExecutor::print_list_manager_info(void *list_manager, uint64 *re
 void LlvmRuntimeExecutor::synchronize() {
   if (config_.arch == Arch::cuda) {
 #if defined(QD_WITH_CUDA)
-    CUDADriver::get_instance().stream_synchronize(nullptr);
+    CUDAContext::get_instance().make_current();
+    CUDADriver::get_instance().context_synchronize();
 #else
     QD_ERROR("No CUDA support");
 #endif
   } else if (config_.arch == Arch::amdgpu) {
 #if defined(QD_WITH_AMDGPU)
-    AMDGPUDriver::get_instance().stream_synchronize(nullptr);
+    AMDGPUContext::get_instance().make_current();
+    AMDGPUDriver::get_instance().device_synchronize();
 #else
     QD_ERROR("No AMDGPU support");
 #endif
diff --git a/tests/python/test_streams.py b/tests/python/test_streams.py
index db7588aaf7..f5a4d8f6d0 100644
--- a/tests/python/test_streams.py
+++ b/tests/python/test_streams.py
@@ -9,7 +9,7 @@ from tests import test_utils
 
 
-@test_utils.test(arch=[qd.cuda])
+@test_utils.test(arch=[qd.cuda, qd.amdgpu])
 def test_create_and_destroy_stream():
     s = qd.create_stream()
     assert isinstance(s, Stream)
@@ -18,7 +18,7 @@ def test_create_and_destroy_stream():
     assert s.handle == 0
 
 
-@test_utils.test(arch=[qd.cuda])
+@test_utils.test(arch=[qd.cuda, qd.amdgpu])
 def test_create_and_destroy_event():
     e = qd.create_event()
     assert isinstance(e, Event)
@@ -198,6 +198,85 @@ def fill(arr: qd.types.ndarray(dtype=qd.f32, ndim=1)):
     s.destroy()
 
 
+@test_utils.test()
+def test_concurrent_streams_with_events():
+    """Two slow kernels on separate streams run concurrently (~1s on GPU), serial fallback on CPU/Metal."""
+    SPIN_ITERS = 5_000_000
+
+    @qd.kernel
+    def slow_fill(
+        a: qd.types.ndarray(dtype=qd.f32, ndim=1),
+        lcg_state: qd.types.ndarray(dtype=qd.i32, ndim=1),
+        index: qd.i32,
+        value: qd.f32,
+    ):
+        qd.loop_config(block_dim=1)
+        for _ in range(1):
+            x = lcg_state[index]
+            for _j in range(SPIN_ITERS):
+                x = (1664525 * x + 1013904223) % 2147483647
+            lcg_state[index] = x
+            a[index] = value
+
+    @qd.kernel
+    def add_first_two(a: qd.types.ndarray(dtype=qd.f32, ndim=1)):
+        qd.loop_config(block_dim=1)
+        for _ in range(1):
+            a[2] = a[0] + a[1]
+
+    import time
+
+    # Warm up JIT
+    a_warmup = qd.ndarray(qd.f32, shape=(3,))
+    lcg_warmup = qd.ndarray(qd.i32, shape=(3,))
+    slow_fill(a_warmup, lcg_warmup, 0, 0.0)
+    add_first_two(a_warmup)
+    qd.sync()
+
+    # Serial baseline
+    a = qd.ndarray(qd.f32, shape=(3,))
+    lcg = qd.ndarray(qd.i32, shape=(3,))
+    qd.sync()
+    t0 = time.perf_counter()
+    slow_fill(a, lcg, 0, 5.0)
+    slow_fill(a, lcg, 1, 7.0)
+    add_first_two(a)
+    qd.sync()
+    serial_time = time.perf_counter() - t0
+    assert np.isclose(a.to_numpy()[2], 12.0)
+
+    # Streams
+    a = qd.ndarray(qd.f32, shape=(3,))
+    lcg = qd.ndarray(qd.i32, shape=(3,))
+    s1 = qd.create_stream()
+    s2 = qd.create_stream()
+    e1 = qd.create_event()
+    e2 = qd.create_event()
+    qd.sync()
+    t0 = time.perf_counter()
+    slow_fill(a, lcg, 0, 5.0, qd_stream=s1)
+    slow_fill(a, lcg, 1, 7.0, qd_stream=s2)
+    e1.record(s1)
+    e2.record(s2)
+    e1.wait()
+    e2.wait()
+    add_first_two(a)
+    qd.sync()
+    stream_time = time.perf_counter() - t0
+    assert np.isclose(a.to_numpy()[2], 12.0)
+
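+    # On GPU backends the two slow_fill launches overlap, so the streamed run should be clearly faster;
+    # CPU/Metal silently ignore qd_stream and stay roughly serial, hence the looser bound below.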
+    speedup = serial_time / stream_time
+    if qd.lang.impl.current_cfg().arch in (qd.cuda, qd.amdgpu):
+        assert speedup > 1.5, f"Expected >1.5x speedup, got {speedup:.2f}x"
+    else:
+        assert speedup > 0.75, f"Expected >0.75x (serial fallback), got {speedup:.2f}x"
+
+    s1.destroy()
+    s2.destroy()
+    e1.destroy()
+    e2.destroy()
+
+
 @test_utils.test()
 def test_stream_context_manager():
     N = 64