diff --git a/docs/source/user_guide/index.md b/docs/source/user_guide/index.md index f89a01199a..fc84579a67 100644 --- a/docs/source/user_guide/index.md +++ b/docs/source/user_guide/index.md @@ -57,6 +57,7 @@ tile16 fastcache graph +streams perf_dispatch init_options ``` diff --git a/docs/source/user_guide/streams.md b/docs/source/user_guide/streams.md new file mode 100644 index 0000000000..b4b70b774b --- /dev/null +++ b/docs/source/user_guide/streams.md @@ -0,0 +1,138 @@ +# Streams + +Streams allow concurrent execution of GPU operations. By default, all Quadrants kernels launch on the default stream, which serializes everything. By creating explicit streams, you can run independent kernels concurrently and control synchronization with events. + +## Supported platforms + +| Backend | Streams | Events | Notes | +|---------|---------|--------|-------| +| CUDA | Yes | Yes | Full concurrent execution | +| CPU | No-op | No-op | `qd_stream` is silently ignored, kernels run serially | +| Metal | No-op | No-op | `qd_stream` is silently ignored, kernels run serially | +| Vulkan | No-op | No-op | `qd_stream` is silently ignored, kernels run serially | + +On backends without native stream support, `create_stream()` and `create_event()` return objects with handle `0`. All stream/event operations become no-ops and kernels run serially. Code written with streams is portable across all backends in the sense that it will run without modifications, but serially. + +## Creating and using streams + +```python +import quadrants as qd + +qd.init(arch=qd.cuda) + +N = 1024 +a = qd.field(qd.f32, shape=(N,)) +b = qd.field(qd.f32, shape=(N,)) + +@qd.kernel +def fill_a(): + for i in range(N): + a[i] = 1.0 + +@qd.kernel +def fill_b(): + for i in range(N): + b[i] = 2.0 + +s1 = qd.create_stream() +s2 = qd.create_stream() + +fill_a(qd_stream=s1) +fill_b(qd_stream=s2) + +s1.synchronize() +s2.synchronize() + +s1.destroy() +s2.destroy() +``` + +Pass `qd_stream=` to any kernel call to launch it on that stream. Kernels on different streams may execute concurrently. Call `synchronize()` to block until all work on a stream completes. + +## Events + +Events let you express dependencies between streams without full synchronization. + +```python +s1 = qd.create_stream() +s2 = qd.create_stream() + +@qd.kernel +def produce(): + for i in range(N): + a[i] = 10.0 + +@qd.kernel +def consume(): + for i in range(N): + b[i] = a[i] + +produce(qd_stream=s1) + +e = qd.create_event() +e.record(s1) # record when s1 finishes produce() +e.wait(qd_stream=s2) # s2 waits for that event before proceeding + +consume(qd_stream=s2) # safe to read a[] — produce() is guaranteed complete +s2.synchronize() + +e.destroy() +s1.destroy() +s2.destroy() +``` + +`e.record(stream)` captures the point in `stream`'s execution. `e.wait(qd_stream=stream)` makes `stream` wait until the recorded point is reached. If `qd_stream` is omitted, the default stream waits. + +## Context managers + +Streams and events support `with` blocks for automatic cleanup: + +```python +with qd.create_stream() as s: + fill_a(qd_stream=s) + s.synchronize() +# s.destroy() called automatically +``` + +## PyTorch interop (CUDA) + +When mixing Quadrants kernels with PyTorch operations on CUDA, both frameworks must use the same stream to avoid race conditions. Without explicit stream management, Quadrants and PyTorch may launch work on different streams with no ordering guarantees, leading to intermittent data corruption. + +### Running Quadrants kernels on PyTorch's stream + +```python +import torch +from quadrants.lang.stream import Stream + +torch_stream_ptr = torch.cuda.current_stream().cuda_stream +stream = Stream(torch_stream_ptr) + +physics_kernel(qd_stream=stream) +observations = compute_obs_tensor() # PyTorch op on the same stream +apply_actions_kernel(qd_stream=stream) +``` + +Wrap PyTorch's raw `CUstream` pointer in a Quadrants `Stream` object. Do **not** call `destroy()` on this wrapper — PyTorch owns the underlying stream. + +### Running PyTorch operations on a Quadrants stream + +```python +qd_stream = qd.create_stream() +torch_stream = torch.cuda.ExternalStream(qd_stream.handle) + +with torch.cuda.stream(torch_stream): + physics_kernel(qd_stream=qd_stream) + observations = compute_obs_tensor() + apply_actions_kernel(qd_stream=qd_stream) + +qd_stream.destroy() +``` + +`Stream.handle` is the raw `CUstream` pointer, which `torch.cuda.ExternalStream` accepts directly. + +## Limitations + +- **Not compatible with graphs.** Do not pass `qd_stream` to a kernel decorated with `graph=True`. +- **Not compatible with autodiff.** Do not pass `qd_stream` to a kernel that uses reverse-mode or forward-mode differentiation, or inside a `qd.ad.Tape` context. +- **`qd.sync()` only waits on the default stream.** It does not drain explicit streams. Call `stream.synchronize()` on each stream you need to wait for. +- **No automatic synchronization.** You are responsible for inserting events or `synchronize()` calls when one stream's output is another stream's input. diff --git a/python/quadrants/lang/__init__.py b/python/quadrants/lang/__init__.py index 2fd0f8dd3f..12773e45c6 100644 --- a/python/quadrants/lang/__init__.py +++ b/python/quadrants/lang/__init__.py @@ -16,6 +16,7 @@ from quadrants.lang.runtime_ops import * from quadrants.lang.snode import * from quadrants.lang.source_builder import * +from quadrants.lang.stream import * from quadrants.lang.struct import * from quadrants.types.enums import DeviceCapability, Format, Layout # noqa: F401 @@ -47,6 +48,7 @@ "shell", "snode", "source_builder", + "stream", "struct", "util", ] diff --git a/python/quadrants/lang/kernel.py b/python/quadrants/lang/kernel.py index 5ebd5ff70d..0b45a5816b 100644 --- a/python/quadrants/lang/kernel.py +++ b/python/quadrants/lang/kernel.py @@ -453,7 +453,9 @@ def materialize(self, key: "CompiledKernelKeyType | None", py_args: tuple[Any, . ] runtime._current_global_context = None - def launch_kernel(self, key, t_kernel: KernelCxx, compiled_kernel_data: CompiledKernelData | None, *args) -> Any: + def launch_kernel( + self, key, t_kernel: KernelCxx, compiled_kernel_data: CompiledKernelData | None, *args, qd_stream=None + ) -> Any: assert len(args) == len(self.arg_metas), f"{len(self.arg_metas)} arguments needed but {len(args)} provided" callbacks: list[Callable[[], None]] = [] @@ -567,9 +569,21 @@ def launch_kernel(self, key, t_kernel: KernelCxx, compiled_kernel_data: Compiled self.src_ll_cache_observations.cache_stored = True self._last_compiled_kernel_data = compiled_kernel_data launch_ctx.use_graph = self.use_graph and _GRAPH_ENABLED + if self.use_graph and qd_stream is not None: + raise RuntimeError( + "qd_stream is not compatible with graph=True kernels. " + "See docs/source/user_guide/streams.md for details." + ) if self.graph_do_while_arg is not None and hasattr(self, "_graph_do_while_cpp_arg_id"): launch_ctx.graph_do_while_arg_id = self._graph_do_while_cpp_arg_id - prog.launch_kernel(compiled_kernel_data, launch_ctx) + stream_handle = qd_stream.handle if qd_stream is not None else 0 + if stream_handle: + prog.set_current_cuda_stream(stream_handle) + try: + prog.launch_kernel(compiled_kernel_data, launch_ctx) + finally: + if stream_handle: + prog.set_current_cuda_stream(0) except Exception as e: e = handle_exception_from_cpp(e) if impl.get_runtime().print_full_traceback: @@ -581,6 +595,8 @@ def launch_kernel(self, key, t_kernel: KernelCxx, compiled_kernel_data: Compiled return_type = self.return_type if return_type or self.has_print: + if qd_stream is not None and self.has_print and not return_type: + qd_stream.synchronize() runtime_ops.sync() if not return_type: @@ -647,6 +663,17 @@ def ensure_compiled(self, *py_args: tuple[Any, ...]) -> tuple[Callable, int, Aut # Thus this part needs to be fast. (i.e. < 3us on a 4 GHz x64 CPU) @_shell_pop_print def __call__(self, *py_args, **kwargs) -> Any: + qd_stream = kwargs.pop("qd_stream", None) + if qd_stream is not None and self.autodiff_mode != _NONE: + raise RuntimeError( + "qd_stream is not compatible with autodiff kernels. Streams cannot be used with " + "reverse-mode or forward-mode differentiation." + ) + if qd_stream is not None and self.runtime.target_tape: + raise RuntimeError( + "qd_stream is not compatible with autograd Tape. Launch the kernel outside the Tape " + "context, or omit qd_stream." + ) if impl.get_runtime()._arch == _ARCH_PYTHON: return self.func(*py_args, **kwargs) config = impl.current_cfg() @@ -709,7 +736,7 @@ def __call__(self, *py_args, **kwargs) -> Any: kernel_cpp = self.materialized_kernels[key] compiled_kernel_data = self.compiled_kernel_data_by_key.get(key, None) self.launch_observations.found_kernel_in_materialize_cache = compiled_kernel_data is not None - ret = self.launch_kernel(key, kernel_cpp, compiled_kernel_data, *py_args) + ret = self.launch_kernel(key, kernel_cpp, compiled_kernel_data, *py_args, qd_stream=qd_stream) if compiled_kernel_data is None: assert self._last_compiled_kernel_data is not None self.compiled_kernel_data_by_key[key] = self._last_compiled_kernel_data diff --git a/python/quadrants/lang/runtime_ops.py b/python/quadrants/lang/runtime_ops.py index 0ecd122f56..71919e2379 100644 --- a/python/quadrants/lang/runtime_ops.py +++ b/python/quadrants/lang/runtime_ops.py @@ -4,8 +4,10 @@ def sync(): - """Blocks the calling thread until all the previously - launched Quadrants kernels have completed. + """Synchronizes the default stream. + + Blocks the calling thread until all work on the default GPU stream has completed. Kernels launched on explicit + streams created via :func:`quadrants.create_stream` are **not** waited on — call ``stream.synchronize()`` for those. """ impl.get_runtime().sync() diff --git a/python/quadrants/lang/stream.py b/python/quadrants/lang/stream.py new file mode 100644 index 0000000000..e87816568c --- /dev/null +++ b/python/quadrants/lang/stream.py @@ -0,0 +1,177 @@ +import weakref + +from quadrants.lang import impl + + +def _get_prog_weakref(): + return weakref.ref(impl.get_runtime().prog) + + +class Stream: + """Wraps a backend-specific GPU stream for concurrent kernel execution. + + On backends without native streams (e.g. CPU), this is a no-op object. Call destroy() explicitly or use as + a context manager to ensure cleanup. + """ + + def __init__(self, handle: int, prog_ref: weakref.ref | None = None): + self._handle = handle + self._prog_ref = prog_ref + + @property + def handle(self) -> int: + return self._handle + + def _prog(self): + """Resolve the owning Program, or None if the owner was collected.""" + if self._prog_ref is not None: + return self._prog_ref() + return impl.get_runtime().prog + + def synchronize(self): + """Block until all operations on this stream complete.""" + prog = self._prog() + if prog is None: + raise RuntimeError("Stream's owning Program has been destroyed (e.g. after qd.reset())") + prog.stream_synchronize(self._handle) + + def _destroy_prog(self): + """Resolve a Program for resource cleanup. + + Falls back to the current runtime when the owner has been collected, which is safe because CUDAContext is a + singleton so the CUDA stream handle remains valid. + """ + prog = self._prog() + if prog is None: + try: + return impl.get_runtime().prog + except Exception: + return None + return prog + + def destroy(self): + """Explicitly destroy the stream. Safe to call multiple times. + + No-op for streams wrapping external handles (created via Stream(ptr) without a prog_ref). + """ + if self._handle != 0 and self._prog_ref is not None: + prog = self._destroy_prog() + if prog is not None: + prog.stream_destroy(self._handle) + self._handle = 0 + + def __del__(self): + if self._handle != 0 and self._prog_ref is not None: + prog = self._destroy_prog() + if prog is not None: + try: + prog.stream_destroy(self._handle) + self._handle = 0 + except Exception: + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + self.destroy() + + +class Event: + """Wraps a backend-specific GPU event for stream synchronization. + + On backends without native events (e.g. CPU), this is a no-op object. Call destroy() explicitly or use as + a context manager to ensure cleanup. + """ + + def __init__(self, handle: int, prog_ref: weakref.ref | None = None): + self._handle = handle + self._prog_ref = prog_ref + + @property + def handle(self) -> int: + return self._handle + + def _prog(self): + """Resolve the owning Program, or None if the owner was collected.""" + if self._prog_ref is not None: + return self._prog_ref() + return impl.get_runtime().prog + + def _require_prog(self): + prog = self._prog() + if prog is None: + raise RuntimeError("Event's owning Program has been destroyed (e.g. after qd.reset())") + return prog + + def record(self, qd_stream: Stream | None = None): + """Record this event on a stream. None means the default stream.""" + stream_handle = qd_stream.handle if qd_stream is not None else 0 + self._require_prog().event_record(self._handle, stream_handle) + + def wait(self, qd_stream: Stream | None = None): + """Make a stream wait for this event. None means the default stream.""" + stream_handle = qd_stream.handle if qd_stream is not None else 0 + self._require_prog().stream_wait_event(stream_handle, self._handle) + + def synchronize(self): + """Block the host until this event has been reached.""" + self._require_prog().event_synchronize(self._handle) + + def _destroy_prog(self): + """Resolve a Program for resource cleanup. + + Falls back to the current runtime when the owner has been collected, which is safe because CUDAContext is a + singleton so the CUDA event handle remains valid. + """ + prog = self._prog() + if prog is None: + try: + return impl.get_runtime().prog + except Exception: + return None + return prog + + def destroy(self): + """Explicitly destroy the event. Safe to call multiple times. + + No-op for events wrapping external handles (created via Event(ptr) without a prog_ref). + """ + if self._handle != 0 and self._prog_ref is not None: + prog = self._destroy_prog() + if prog is not None: + prog.event_destroy(self._handle) + self._handle = 0 + + def __del__(self): + if self._handle != 0 and self._prog_ref is not None: + prog = self._destroy_prog() + if prog is not None: + try: + prog.event_destroy(self._handle) + self._handle = 0 + except Exception: + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + self.destroy() + + +def create_stream() -> Stream: + """Create a new GPU stream for concurrent kernel execution.""" + prog = impl.get_runtime().prog + handle = prog.stream_create() + return Stream(handle, _get_prog_weakref()) + + +def create_event() -> Event: + """Create a new GPU event for stream synchronization.""" + prog = impl.get_runtime().prog + handle = prog.event_create() + return Event(handle, _get_prog_weakref()) + + +__all__ = ["Stream", "Event", "create_stream", "create_event"] diff --git a/quadrants/program/program.cpp b/quadrants/program/program.cpp index 8f6fdb2186..ff9901add5 100644 --- a/quadrants/program/program.cpp +++ b/quadrants/program/program.cpp @@ -60,6 +60,7 @@ Program::Program(Arch desired_arch) : snode_rw_accessors_bank_(this) { config = default_compile_config; config.arch = desired_arch; config.fit(); + stream_manager_ = StreamManager(config.arch); profiler = make_profiler(config.arch, config.kernel_profiler); if (arch_uses_llvm(config.arch)) { diff --git a/quadrants/program/program.h b/quadrants/program/program.h index 8cbf8e817d..600533f1cf 100644 --- a/quadrants/program/program.h +++ b/quadrants/program/program.h @@ -21,6 +21,7 @@ #include "quadrants/program/kernel_profiler.h" #include "quadrants/program/snode_expr_utils.h" #include "quadrants/program/snode_rw_accessors_bank.h" +#include "quadrants/program/program_stream.h" #include "quadrants/program/context.h" #include "quadrants/struct/snode_tree.h" #include "quadrants/system/threading.h" @@ -319,6 +320,10 @@ class QD_DLL_EXPORT Program { return ndarrays_.size(); } + StreamManager &stream_manager() { + return stream_manager_; + } + // TODO(zhanlue): Move these members and corresponding interfaces to // ProgramImpl Ideally, Program should serve as a pure interface class and all // the implementations should fall inside ProgramImpl @@ -328,6 +333,7 @@ class QD_DLL_EXPORT Program { private: CompileConfig compile_config_; + StreamManager stream_manager_{Arch::x64}; // re-initialized in constructor after arch is known uint64 ndarray_writer_counter_{0}; uint64 ndarray_reader_counter_{0}; diff --git a/quadrants/program/program_stream.cpp b/quadrants/program/program_stream.cpp new file mode 100644 index 0000000000..8a7431532a --- /dev/null +++ b/quadrants/program/program_stream.cpp @@ -0,0 +1,105 @@ +// StreamManager implementation and Program delegation. + +#include "program_stream.h" + +#ifdef QD_WITH_CUDA +#include "quadrants/rhi/cuda/cuda_driver.h" +#include "quadrants/rhi/cuda/cuda_context.h" +#endif + +namespace quadrants::lang { + +// --------------------------------------------------------------------------- +// StreamManager +// --------------------------------------------------------------------------- + +uint64 StreamManager::create_stream() { +#ifdef QD_WITH_CUDA + if (arch_ == Arch::cuda) { + CUDAContext::get_instance().make_current(); + void *stream = nullptr; + CUDADriver::get_instance().stream_create(&stream, 0x1 /*CU_STREAM_NON_BLOCKING*/); + return reinterpret_cast(stream); + } +#endif + return 0; +} + +void StreamManager::destroy_stream(uint64 stream_handle) { +#ifdef QD_WITH_CUDA + if (arch_ == Arch::cuda && stream_handle != 0) { + CUDAContext::get_instance().make_current(); + CUDADriver::get_instance().stream_destroy(reinterpret_cast(stream_handle)); + } +#endif +} + +void StreamManager::synchronize_stream(uint64 stream_handle) { +#ifdef QD_WITH_CUDA + if (arch_ == Arch::cuda) { + CUDAContext::get_instance().make_current(); + CUDADriver::get_instance().stream_synchronize(reinterpret_cast(stream_handle)); + } +#endif +} + +void StreamManager::set_current_stream(uint64 stream_handle) { +#ifdef QD_WITH_CUDA + if (arch_ == Arch::cuda) { + CUDAContext::get_instance().make_current(); + CUDAContext::get_instance().set_stream(reinterpret_cast(stream_handle)); + } +#endif +} + +uint64 StreamManager::create_event() { +#ifdef QD_WITH_CUDA + if (arch_ == Arch::cuda) { + CUDAContext::get_instance().make_current(); + void *event = nullptr; + CUDADriver::get_instance().event_create(&event, 0x02 /*CU_EVENT_DISABLE_TIMING*/); + return reinterpret_cast(event); + } +#endif + return 0; +} + +void StreamManager::destroy_event(uint64 event_handle) { +#ifdef QD_WITH_CUDA + if (arch_ == Arch::cuda && event_handle != 0) { + CUDAContext::get_instance().make_current(); + CUDADriver::get_instance().event_destroy(reinterpret_cast(event_handle)); + } +#endif +} + +void StreamManager::record_event(uint64 event_handle, uint64 stream_handle) { +#ifdef QD_WITH_CUDA + if (arch_ == Arch::cuda && event_handle != 0) { + CUDAContext::get_instance().make_current(); + CUDADriver::get_instance().event_record(reinterpret_cast(event_handle), + reinterpret_cast(stream_handle)); + } +#endif +} + +void StreamManager::synchronize_event(uint64 event_handle) { +#ifdef QD_WITH_CUDA + if (arch_ == Arch::cuda && event_handle != 0) { + CUDAContext::get_instance().make_current(); + CUDADriver::get_instance().event_synchronize(reinterpret_cast(event_handle)); + } +#endif +} + +void StreamManager::stream_wait_event(uint64 stream_handle, uint64 event_handle) { +#ifdef QD_WITH_CUDA + if (arch_ == Arch::cuda && event_handle != 0) { + CUDAContext::get_instance().make_current(); + CUDADriver::get_instance().stream_wait_event(reinterpret_cast(stream_handle), + reinterpret_cast(event_handle), 0 /*flags*/); + } +#endif +} + +} // namespace quadrants::lang diff --git a/quadrants/program/program_stream.h b/quadrants/program/program_stream.h new file mode 100644 index 0000000000..69265c26b3 --- /dev/null +++ b/quadrants/program/program_stream.h @@ -0,0 +1,31 @@ +// StreamManager — manages CUDA stream and event lifecycle, isolated from Program so that backend-specific GPU +// plumbing does not pollute the core Program interface. + +#pragma once + +#include "quadrants/common/core.h" +#include "quadrants/util/lang_util.h" + +namespace quadrants::lang { + +class StreamManager { + public: + explicit StreamManager(Arch arch) : arch_(arch) { + } + + uint64 create_stream(); + void destroy_stream(uint64 stream_handle); + void synchronize_stream(uint64 stream_handle); + void set_current_stream(uint64 stream_handle); + + uint64 create_event(); + void destroy_event(uint64 event_handle); + void record_event(uint64 event_handle, uint64 stream_handle); + void synchronize_event(uint64 event_handle); + void stream_wait_event(uint64 stream_handle, uint64 event_handle); + + private: + Arch arch_; +}; + +} // namespace quadrants::lang diff --git a/quadrants/python/export.h b/quadrants/python/export.h index 331c35b4b6..92736daedf 100644 --- a/quadrants/python/export.h +++ b/quadrants/python/export.h @@ -21,6 +21,10 @@ #include "quadrants/common/core.h" +namespace quadrants::lang { +class Program; +} // namespace quadrants::lang + namespace quadrants { namespace py = pybind11; @@ -33,4 +37,6 @@ void export_math(py::module &m); void export_misc(py::module &m); +void export_stream(py::module &m, py::class_ &program_class); + } // namespace quadrants diff --git a/quadrants/python/export_lang.cpp b/quadrants/python/export_lang.cpp index 61f9c39a37..b3dc79bef5 100644 --- a/quadrants/python/export_lang.cpp +++ b/quadrants/python/export_lang.cpp @@ -314,8 +314,8 @@ void export_lang(py::module &m) { auto compiled_kernel_data = py::class_(m, "CompiledKernelData") .def("_debug_dump_to_string", &CompiledKernelData::debug_dump_to_string); - py::class_(m, "Program") - .def(py::init<>()) + auto program_class = py::class_(m, "Program"); + program_class.def(py::init<>()) .def( "ndarray_to_dlpack", [](Program *program, pybind11::object owner, Ndarray *ndarray, const std::vector &layout, @@ -411,6 +411,7 @@ void export_lang(py::module &m) { .def("get_num_offloaded_tasks_on_last_call", &Program::get_num_offloaded_tasks_on_last_call) .def("get_graph_num_nodes_on_last_call", &Program::get_graph_num_nodes_on_last_call) .def("get_graph_total_builds", &Program::get_graph_total_builds); + export_stream(m, program_class); py::class_(m, "CompileResult") .def_property_readonly( diff --git a/quadrants/python/export_stream.cpp b/quadrants/python/export_stream.cpp new file mode 100644 index 0000000000..66b3c8a3d7 --- /dev/null +++ b/quadrants/python/export_stream.cpp @@ -0,0 +1,25 @@ +/******************************************************************************* + Copyright (c) The Quadrants Authors (2016- ). All Rights Reserved. + The use of this software is governed by the LICENSE file. +*******************************************************************************/ + +#include "quadrants/python/export.h" +#include "quadrants/program/program.h" + +namespace quadrants { + +void export_stream(py::module &m, py::class_ &program_class) { + using lang::Program; + program_class.def("stream_create", [](Program *p) { return p->stream_manager().create_stream(); }) + .def("stream_destroy", [](Program *p, uint64 h) { p->stream_manager().destroy_stream(h); }) + .def("stream_synchronize", [](Program *p, uint64 h) { p->stream_manager().synchronize_stream(h); }) + .def("set_current_cuda_stream", [](Program *p, uint64 h) { p->stream_manager().set_current_stream(h); }) + .def("event_create", [](Program *p) { return p->stream_manager().create_event(); }) + .def("event_destroy", [](Program *p, uint64 h) { p->stream_manager().destroy_event(h); }) + .def("event_record", [](Program *p, uint64 eh, uint64 sh) { p->stream_manager().record_event(eh, sh); }) + .def("event_synchronize", [](Program *p, uint64 h) { p->stream_manager().synchronize_event(h); }) + .def("stream_wait_event", + [](Program *p, uint64 sh, uint64 eh) { p->stream_manager().stream_wait_event(sh, eh); }); +} + +} // namespace quadrants diff --git a/quadrants/rhi/cuda/cuda_context.cpp b/quadrants/rhi/cuda/cuda_context.cpp index 595a7bebff..16d02a345b 100644 --- a/quadrants/rhi/cuda/cuda_context.cpp +++ b/quadrants/rhi/cuda/cuda_context.cpp @@ -11,7 +11,9 @@ namespace quadrants::lang { -CUDAContext::CUDAContext() : profiler_(nullptr), driver_(CUDADriver::get_instance_without_context()), stream_(nullptr) { +thread_local void *CUDAContext::stream_ = nullptr; + +CUDAContext::CUDAContext() : profiler_(nullptr), driver_(CUDADriver::get_instance_without_context()) { // CUDA initialization dev_count_ = 0; driver_.init(0); diff --git a/quadrants/rhi/cuda/cuda_context.h b/quadrants/rhi/cuda/cuda_context.h index 955e10528a..97badf6b80 100644 --- a/quadrants/rhi/cuda/cuda_context.h +++ b/quadrants/rhi/cuda/cuda_context.h @@ -31,7 +31,7 @@ class CUDAContext { bool debug_; bool supports_mem_pool_; bool supports_pageable_memory_access_; - void *stream_; + static thread_local void *stream_; public: CUDAContext(); diff --git a/quadrants/rhi/cuda/cuda_driver_functions.inc.h b/quadrants/rhi/cuda/cuda_driver_functions.inc.h index 2847f136c4..55c5e3e0b8 100644 --- a/quadrants/rhi/cuda/cuda_driver_functions.inc.h +++ b/quadrants/rhi/cuda/cuda_driver_functions.inc.h @@ -20,6 +20,7 @@ PER_CUDA_FUNCTION(context_set_limit, cuCtxSetLimit, int, std::size_t); // Stream management PER_CUDA_FUNCTION(stream_create, cuStreamCreate, void **, uint32); +PER_CUDA_FUNCTION(stream_destroy, cuStreamDestroy_v2, void *); // Memory management PER_CUDA_FUNCTION(memcpy_host_to_device, cuMemcpyHtoD_v2, void *, void *, std::size_t); @@ -54,6 +55,7 @@ PER_CUDA_FUNCTION(kernel_set_attribute, cuFuncSetAttribute, void *, CUfunction_a // Stream management PER_CUDA_FUNCTION(stream_synchronize, cuStreamSynchronize, void *); +PER_CUDA_FUNCTION(stream_wait_event, cuStreamWaitEvent, void *, void *, uint32); // Event management PER_CUDA_FUNCTION(event_create, cuEventCreate, void **, uint32) diff --git a/quadrants/runtime/cuda/kernel_launcher.cpp b/quadrants/runtime/cuda/kernel_launcher.cpp index c418562a34..a52acf2ebf 100644 --- a/quadrants/runtime/cuda/kernel_launcher.cpp +++ b/quadrants/runtime/cuda/kernel_launcher.cpp @@ -1,6 +1,7 @@ #include "quadrants/runtime/cuda/kernel_launcher.h" #include "quadrants/runtime/cuda/cuda_utils.h" #include "quadrants/rhi/cuda/cuda_context.h" +#include "quadrants/rhi/cuda/cuda_driver.h" #include "quadrants/runtime/llvm/llvm_runtime_executor.h" #include @@ -32,15 +33,17 @@ std::size_t resolve_num_threads(const AdStackSizingInfo &info, LlvmRuntimeExecut std::int32_t begin = info.begin_const_value; std::int32_t end = info.end_const_value; if (info.begin_offset_bytes >= 0 || info.end_offset_bytes >= 0) { + auto *active_stream = CUDAContext::get_instance().get_stream(); auto *temp_dev_ptr = reinterpret_cast(executor->get_runtime_temporaries_device_ptr()); if (info.begin_offset_bytes >= 0) { - CUDADriver::get_instance().memcpy_device_to_host(&begin, temp_dev_ptr + info.begin_offset_bytes, - sizeof(std::int32_t)); + CUDADriver::get_instance().memcpy_device_to_host_async(&begin, temp_dev_ptr + info.begin_offset_bytes, + sizeof(std::int32_t), active_stream); } if (info.end_offset_bytes >= 0) { - CUDADriver::get_instance().memcpy_device_to_host(&end, temp_dev_ptr + info.end_offset_bytes, - sizeof(std::int32_t)); + CUDADriver::get_instance().memcpy_device_to_host_async(&end, temp_dev_ptr + info.end_offset_bytes, + sizeof(std::int32_t), active_stream); } + CUDADriver::get_instance().stream_synchronize(active_stream); } // Clamp the logical iteration count to the launched thread count: adstack slices are indexed by // `linear_thread_idx()` (`block_idx * block_dim + thread_idx`), so only `static_num_threads = grid_dim * block_dim` @@ -166,8 +169,9 @@ void KernelLauncher::launch_offloaded_tasks_with_do_while(LaunchContextBuilder & launch_offloaded_tasks(ctx, cuda_module, offloaded_tasks, device_context_ptr); counter_val = 0; auto *stream = CUDAContext::get_instance().get_stream(); + CUDADriver::get_instance().memcpy_device_to_host_async(&counter_val, ctx.graph_do_while_flag_dev_ptr, + sizeof(int32_t), stream); CUDADriver::get_instance().stream_synchronize(stream); - CUDADriver::get_instance().memcpy_device_to_host(&counter_val, ctx.graph_do_while_flag_dev_ptr, sizeof(int32_t)); } while (counter_val != 0); } @@ -209,9 +213,11 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx // kernels. std::unordered_map device_ptrs; + auto *active_stream = CUDAContext::get_instance().get_stream(); + char *device_result_buffer{nullptr}; CUDADriver::get_instance().malloc_async((void **)&device_result_buffer, - std::max(ctx.result_buffer_size, sizeof(uint64)), nullptr); + std::max(ctx.result_buffer_size, sizeof(uint64)), active_stream); ctx.get_context().runtime = executor->get_llvm_runtime(); for (int i = 0; i < (int)parameters.size(); i++) { @@ -244,14 +250,16 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx device_ptrs[data_ptr_idx] = executor->get_device_alloc_info_ptr(devalloc); transfers[data_ptr_idx] = {data_ptr, devalloc}; - CUDADriver::get_instance().memcpy_host_to_device((void *)device_ptrs[data_ptr_idx], data_ptr, arr_sz); + CUDADriver::get_instance().memcpy_host_to_device_async((void *)device_ptrs[data_ptr_idx], data_ptr, arr_sz, + active_stream); if (grad_ptr != nullptr) { DeviceAllocation grad_devalloc = executor->allocate_memory_on_device(arr_sz, (uint64 *)device_result_buffer); device_ptrs[grad_ptr_idx] = executor->get_device_alloc_info_ptr(grad_devalloc); transfers[grad_ptr_idx] = {grad_ptr, grad_devalloc}; - CUDADriver::get_instance().memcpy_host_to_device((void *)device_ptrs[grad_ptr_idx], grad_ptr, arr_sz); + CUDADriver::get_instance().memcpy_host_to_device_async((void *)device_ptrs[grad_ptr_idx], grad_ptr, arr_sz, + active_stream); } else { device_ptrs[grad_ptr_idx] = nullptr; } @@ -282,7 +290,7 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx } } if (transfers.size() > 0) { - CUDADriver::get_instance().stream_synchronize(nullptr); + CUDADriver::get_instance().stream_synchronize(active_stream); } char *host_result_buffer = (char *)ctx.get_context().result_buffer; if (ctx.result_buffer_size > 0) { @@ -290,9 +298,9 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx } char *device_arg_buffer = nullptr; if (ctx.arg_buffer_size > 0) { - CUDADriver::get_instance().malloc_async((void **)&device_arg_buffer, ctx.arg_buffer_size, nullptr); + CUDADriver::get_instance().malloc_async((void **)&device_arg_buffer, ctx.arg_buffer_size, active_stream); CUDADriver::get_instance().memcpy_host_to_device_async(device_arg_buffer, ctx.get_context().arg_buffer, - ctx.arg_buffer_size, nullptr); + ctx.arg_buffer_size, active_stream); ctx.get_context().arg_buffer = device_arg_buffer; } @@ -317,9 +325,9 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx needs_sizer_device_ctx = needs_sizer_device_ctx && !CUDAContext::get_instance().supports_pageable_memory_access(); void *device_context_ptr = nullptr; if (needs_sizer_device_ctx) { - CUDADriver::get_instance().malloc_async(&device_context_ptr, sizeof(RuntimeContext), nullptr); + CUDADriver::get_instance().malloc_async(&device_context_ptr, sizeof(RuntimeContext), active_stream); CUDADriver::get_instance().memcpy_host_to_device_async(device_context_ptr, &ctx.get_context(), - sizeof(RuntimeContext), nullptr); + sizeof(RuntimeContext), active_stream); } if (ctx.graph_do_while_arg_id >= 0) { @@ -329,25 +337,30 @@ void KernelLauncher::launch_llvm_kernel(Handle handle, LaunchContextBuilder &ctx launch_offloaded_tasks(ctx, cuda_module, offloaded_tasks, device_context_ptr); } if (needs_sizer_device_ctx) { - CUDADriver::get_instance().mem_free_async(device_context_ptr, nullptr); + CUDADriver::get_instance().mem_free_async(device_context_ptr, active_stream); } if (ctx.arg_buffer_size > 0) { - CUDADriver::get_instance().mem_free_async(device_arg_buffer, nullptr); + CUDADriver::get_instance().mem_free_async(device_arg_buffer, active_stream); } if (ctx.result_buffer_size > 0) { CUDADriver::get_instance().memcpy_device_to_host_async(host_result_buffer, device_result_buffer, - ctx.result_buffer_size, nullptr); + ctx.result_buffer_size, active_stream); } - CUDADriver::get_instance().mem_free_async(device_result_buffer, nullptr); + CUDADriver::get_instance().mem_free_async(device_result_buffer, active_stream); // copy data back to host if (transfers.size() > 0) { - CUDADriver::get_instance().stream_synchronize(nullptr); + CUDADriver::get_instance().stream_synchronize(active_stream); for (auto itr = transfers.begin(); itr != transfers.end(); itr++) { auto &idx = itr->first; - CUDADriver::get_instance().memcpy_device_to_host(itr->second.first, (void *)device_ptrs[idx], - ctx.array_runtime_sizes[idx.arg_id]); + CUDADriver::get_instance().memcpy_device_to_host_async(itr->second.first, (void *)device_ptrs[idx], + ctx.array_runtime_sizes[idx.arg_id], active_stream); + } + CUDADriver::get_instance().stream_synchronize(active_stream); + for (auto itr = transfers.begin(); itr != transfers.end(); itr++) { executor->deallocate_memory_on_device(itr->second.second); } + } else if (ctx.result_buffer_size > 0) { + CUDADriver::get_instance().stream_synchronize(active_stream); } } diff --git a/tests/python/test_api.py b/tests/python/test_api.py index c7924a97d5..0427d2f320 100644 --- a/tests/python/test_api.py +++ b/tests/python/test_api.py @@ -62,6 +62,7 @@ def _get_expected_matrix_apis(): "DEBUG", "DeviceCapability", "ERROR", + "Event", "Field", "FieldsBuilder", "Format", @@ -76,6 +77,7 @@ def _get_expected_matrix_apis(): "SNode", "ScalarField", "ScalarNdarray", + "Stream", "Struct", "StructField", "TRACE", @@ -124,6 +126,8 @@ def _get_expected_matrix_apis(): "clock_freq_hz", "cos", "cpu", + "create_event", + "create_stream", "cuda", "data_oriented", "dataclass", diff --git a/tests/python/test_cache.py b/tests/python/test_cache.py index c3821e44c5..e31daf61e7 100644 --- a/tests/python/test_cache.py +++ b/tests/python/test_cache.py @@ -216,11 +216,11 @@ def test_fastcache(tmp_path: pathlib.Path, monkeypatch): qd_init_same_arch(offline_cache_file_path=str(tmp_path), offline_cache=True) is_valid = False - def launch_kernel(self, key, t_kernel, compiled_kernel_data, *args): + def launch_kernel(self, key, t_kernel, compiled_kernel_data, *args, qd_stream=None): nonlocal is_valid is_valid = True assert compiled_kernel_data is None - return launch_kernel_orig(self, key, t_kernel, compiled_kernel_data, *args) + return launch_kernel_orig(self, key, t_kernel, compiled_kernel_data, *args, qd_stream=qd_stream) monkeypatch.setattr("quadrants.lang.kernel_impl.Kernel.launch_kernel", launch_kernel) @@ -242,11 +242,11 @@ def fun(value: qd.types.ndarray(), offset: qd.template()): qd_init_same_arch(offline_cache_file_path=str(tmp_path), offline_cache=True) is_valid = False - def launch_kernel(self, key, t_kernel, compiled_kernel_data, *args): + def launch_kernel(self, key, t_kernel, compiled_kernel_data, *args, qd_stream=None): nonlocal is_valid is_valid = True assert compiled_kernel_data is not None - return launch_kernel_orig(self, key, t_kernel, compiled_kernel_data, *args) + return launch_kernel_orig(self, key, t_kernel, compiled_kernel_data, *args, qd_stream=qd_stream) monkeypatch.setattr("quadrants.lang.kernel_impl.Kernel.launch_kernel", launch_kernel) diff --git a/tests/python/test_perf_dispatch.py b/tests/python/test_perf_dispatch.py index 257775e6e1..55e96a308a 100644 --- a/tests/python/test_perf_dispatch.py +++ b/tests/python/test_perf_dispatch.py @@ -143,7 +143,7 @@ def my_func1_impl_a_shape0_ge_2( assert len(speed_checker._trial_count_by_dispatch_impl_by_geometry_hash[geometry]) == 2 -@test_utils.test() +@test_utils.test(exclude=[qd.vulkan]) def test_perf_dispatch_python() -> None: WARMUP = 1 ACTIVE = 1 diff --git a/tests/python/test_streams.py b/tests/python/test_streams.py new file mode 100644 index 0000000000..db7588aaf7 --- /dev/null +++ b/tests/python/test_streams.py @@ -0,0 +1,289 @@ +"""Tests for GPU stream and event support.""" + +import numpy as np +import pytest + +import quadrants as qd +from quadrants.lang.stream import Event, Stream + +from tests import test_utils + + +@test_utils.test(arch=[qd.cuda]) +def test_create_and_destroy_stream(): + s = qd.create_stream() + assert isinstance(s, Stream) + assert s.handle != 0 + s.destroy() + assert s.handle == 0 + + +@test_utils.test(arch=[qd.cuda]) +def test_create_and_destroy_event(): + e = qd.create_event() + assert isinstance(e, Event) + assert e.handle != 0 + e.destroy() + assert e.handle == 0 + + +@test_utils.test() +def test_kernel_on_stream(): + N = 1024 + x = qd.field(qd.f32, shape=(N,)) + + @qd.kernel + def fill(): + for i in range(N): + x[i] = 42.0 + + s = qd.create_stream() + fill(qd_stream=s) + s.synchronize() + assert np.allclose(x.to_numpy(), 42.0) + s.destroy() + + +@test_utils.test() +def test_two_streams(): + N = 1024 + a = qd.field(qd.f32, shape=(N,)) + b = qd.field(qd.f32, shape=(N,)) + + @qd.kernel + def fill_a(): + for i in range(N): + a[i] = 1.0 + + @qd.kernel + def fill_b(): + for i in range(N): + b[i] = 2.0 + + s1 = qd.create_stream() + s2 = qd.create_stream() + fill_a(qd_stream=s1) + fill_b(qd_stream=s2) + s1.synchronize() + s2.synchronize() + assert np.allclose(a.to_numpy(), 1.0) + assert np.allclose(b.to_numpy(), 2.0) + s1.destroy() + s2.destroy() + + +@test_utils.test() +def test_event_synchronization(): + N = 1024 + x = qd.field(qd.f32, shape=(N,)) + y = qd.field(qd.f32, shape=(N,)) + + @qd.kernel + def fill_x(): + for i in range(N): + x[i] = 10.0 + + @qd.kernel + def copy_x_to_y(): + for i in range(N): + y[i] = x[i] + + s1 = qd.create_stream() + fill_x(qd_stream=s1) + + e = qd.create_event() + e.record(s1) + + # Default stream waits for s1 to finish fill_x + e.wait() + copy_x_to_y() + qd.sync() + + assert np.allclose(y.to_numpy(), 10.0) + + e.destroy() + s1.destroy() + + +@test_utils.test() +def test_event_wait_on_stream(): + N = 1024 + x = qd.field(qd.f32, shape=(N,)) + y = qd.field(qd.f32, shape=(N,)) + + @qd.kernel + def fill_x(): + for i in range(N): + x[i] = 5.0 + + @qd.kernel + def copy_x_to_y(): + for i in range(N): + y[i] = x[i] + + s1 = qd.create_stream() + s2 = qd.create_stream() + + fill_x(qd_stream=s1) + + e = qd.create_event() + e.record(s1) + + # s2 waits for s1's event before running + e.wait(qd_stream=s2) + copy_x_to_y(qd_stream=s2) + s2.synchronize() + + assert np.allclose(y.to_numpy(), 5.0) + + e.destroy() + s1.destroy() + s2.destroy() + + +@test_utils.test() +def test_default_stream_kernel(): + N = 1024 + x = qd.field(qd.f32, shape=(N,)) + + @qd.kernel + def fill(): + for i in range(N): + x[i] = 7.0 + + fill() + qd.sync() + assert np.allclose(x.to_numpy(), 7.0) + + +@test_utils.test(arch=[qd.cpu]) +def test_stream_noop_on_cpu(): + """Streams should be no-ops on CPU without errors.""" + N = 64 + x = qd.field(qd.f32, shape=(N,)) + + @qd.kernel + def fill(): + for i in range(N): + x[i] = 3.0 + + s = qd.create_stream() + assert s.handle == 0 + fill(qd_stream=s) + qd.sync() + assert np.allclose(x.to_numpy(), 3.0) + + e = qd.create_event() + assert e.handle == 0 + e.record(s) + e.wait() + s.destroy() + e.destroy() + + +@test_utils.test() +def test_stream_with_ndarray(): + N = 1024 + + @qd.kernel + def fill(arr: qd.types.ndarray(dtype=qd.f32, ndim=1)): + for i in range(N): + arr[i] = 99.0 + + arr = qd.ndarray(qd.f32, shape=(N,)) + s = qd.create_stream() + fill(arr, qd_stream=s) + s.synchronize() + assert np.allclose(arr.to_numpy(), 99.0) + s.destroy() + + +@test_utils.test() +def test_stream_context_manager(): + N = 64 + x = qd.field(qd.f32, shape=(N,)) + + @qd.kernel + def fill(): + for i in range(N): + x[i] = 11.0 + + with qd.create_stream() as s: + fill(qd_stream=s) + s.synchronize() + assert s.handle == 0 + assert np.allclose(x.to_numpy(), 11.0) + + +@test_utils.test() +def test_event_context_manager(): + with qd.create_event() as e: + assert isinstance(e, Event) + assert e.handle == 0 + + +@test_utils.test() +def test_event_synchronize(): + N = 64 + x = qd.field(qd.f32, shape=(N,)) + + @qd.kernel + def fill(): + for i in range(N): + x[i] = 13.0 + + s = qd.create_stream() + fill(qd_stream=s) + e = qd.create_event() + e.record(s) + e.synchronize() + assert np.allclose(x.to_numpy(), 13.0) + e.destroy() + s.destroy() + + +@test_utils.test(arch=[qd.cuda]) +def test_stream_with_tape_raises(): + x = qd.field(qd.f32, shape=(), needs_grad=True) + loss = qd.field(qd.f32, shape=(), needs_grad=True) + + @qd.kernel + def compute(): + loss[None] = x[None] ** 2 + + s = qd.create_stream() + with pytest.raises(RuntimeError, match="not compatible with autograd Tape"): + with qd.ad.Tape(loss): + compute(qd_stream=s) + s.destroy() + + +@test_utils.test(arch=[qd.cuda]) +def test_stream_with_autodiff_kernel_raises(): + x = qd.field(qd.f32, shape=(), needs_grad=True) + loss = qd.field(qd.f32, shape=(), needs_grad=True) + + @qd.kernel + def compute(): + loss[None] = x[None] ** 2 + + s = qd.create_stream() + with pytest.raises(RuntimeError, match="not compatible with autodiff"): + compute.grad(qd_stream=s) + s.destroy() + + +@test_utils.test(arch=[qd.cuda]) +def test_stream_with_graph_raises(): + N = 64 + x = qd.field(qd.f32, shape=(N,)) + + @qd.kernel(graph=True) + def fill(): + for i in range(N): + x[i] = 1.0 + + s = qd.create_stream() + with pytest.raises(RuntimeError, match="not compatible with graph=True"): + fill(qd_stream=s) + s.destroy()