From e13182bd4d24a2c5f4182a8fdc6be200c3aa113b Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Thu, 16 Oct 2025 12:52:13 -0700 Subject: [PATCH 1/9] Initial implementation of IPC-enabled events. Changes the type of `max_size` memory resource attribute to size_t from int32. Various updates and additions to test helpers. --- cuda_core/cuda/core/experimental/_device.pyx | 3 +- cuda_core/cuda/core/experimental/_event.pxd | 2 + cuda_core/cuda/core/experimental/_event.pyx | 105 ++++++- cuda_core/cuda/core/experimental/_memory.pyx | 8 +- cuda_core/cuda/core/experimental/_stream.pyx | 8 +- .../tests/{helpers.py => helpers/__init__.py} | 2 +- cuda_core/tests/helpers/latch.py | 69 +++++ cuda_core/tests/memory_ipc/test_ipc_event.py | 270 ++++++++++++++++++ cuda_core/tests/memory_ipc/utility.py | 171 +++++++++-- cuda_core/tests/test_event.py | 51 +--- cuda_core/tests/test_helpers.py | 33 +++ 11 files changed, 629 insertions(+), 93 deletions(-) rename cuda_core/tests/{helpers.py => helpers/__init__.py} (96%) create mode 100644 cuda_core/tests/helpers/latch.py create mode 100644 cuda_core/tests/memory_ipc/test_ipc_event.py create mode 100644 cuda_core/tests/test_helpers.py diff --git a/cuda_core/cuda/core/experimental/_device.pyx b/cuda_core/cuda/core/experimental/_device.pyx index bcba09f985..1db2adbf8d 100644 --- a/cuda_core/cuda/core/experimental/_device.pyx +++ b/cuda_core/cuda/core/experimental/_device.pyx @@ -28,6 +28,7 @@ from cuda.core.experimental._utils.cuda_utils import ( from cuda.core.experimental._stream cimport default_stream + # TODO: I prefer to type these as "cdef object" and avoid accessing them from within Python, # but it seems it is very convenient to expose them for testing purposes... _tls = threading.local() @@ -1273,7 +1274,7 @@ class Device: """ self._check_context_initialized() ctx = self._get_current_context() - return Event._init(self._id, ctx, options) + return Event._init(self._id, ctx, options, True) def allocate(self, size, stream: Optional[Stream] = None) -> Buffer: """Allocate device memory from a specified stream. diff --git a/cuda_core/cuda/core/experimental/_event.pxd b/cuda_core/cuda/core/experimental/_event.pxd index 0972063af3..1f586f18df 100644 --- a/cuda_core/cuda/core/experimental/_event.pxd +++ b/cuda_core/cuda/core/experimental/_event.pxd @@ -11,6 +11,8 @@ cdef class Event: cydriver.CUevent _handle bint _timing_disabled bint _busy_waited + bint _ipc_enabled + object _ipc_descriptor int _device_id object _ctx_handle diff --git a/cuda_core/cuda/core/experimental/_event.pyx b/cuda_core/cuda/core/experimental/_event.pyx index 962556597a..327aef3f6f 100644 --- a/cuda_core/cuda/core/experimental/_event.pyx +++ b/cuda_core/cuda/core/experimental/_event.pyx @@ -4,7 +4,9 @@ from __future__ import annotations +cimport cpython from libc.stdint cimport uintptr_t +from libc.string cimport memcpy from cuda.bindings cimport cydriver @@ -14,12 +16,14 @@ from cuda.core.experimental._utils.cuda_utils cimport ( ) from dataclasses import dataclass +import multiprocessing from typing import TYPE_CHECKING, Optional from cuda.core.experimental._context import Context from cuda.core.experimental._utils.cuda_utils import ( CUDAError, driver, + handle_return, ) if TYPE_CHECKING: import cuda.bindings @@ -40,7 +44,7 @@ cdef class EventOptions: has actually been completed. Otherwise, the CPU thread will busy-wait until the event has been completed. (Default to False) - support_ipc : bool, optional + ipc_enabled : bool, optional Event will be suitable for interprocess use. 
Note that enable_timing must be False. (Default to False) @@ -48,7 +52,7 @@ cdef class EventOptions: enable_timing: Optional[bool] = False busy_waited_sync: Optional[bool] = False - support_ipc: Optional[bool] = False + ipc_enabled: Optional[bool] = False cdef class Event: @@ -86,24 +90,35 @@ cdef class Event: raise RuntimeError("Event objects cannot be instantiated directly. Please use Stream APIs (record).") @classmethod - def _init(cls, device_id: int, ctx_handle: Context, options=None): + def _init(cls, device_id: int, ctx_handle: Context, options=None, is_free=False): cdef Event self = Event.__new__(cls) cdef EventOptions opts = check_or_create_options(EventOptions, options, "Event options") cdef unsigned int flags = 0x0 self._timing_disabled = False self._busy_waited = False + self._ipc_enabled = False + self._ipc_descriptor = None if not opts.enable_timing: flags |= cydriver.CUevent_flags.CU_EVENT_DISABLE_TIMING self._timing_disabled = True if opts.busy_waited_sync: flags |= cydriver.CUevent_flags.CU_EVENT_BLOCKING_SYNC self._busy_waited = True - if opts.support_ipc: - raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103") + if opts.ipc_enabled: + if is_free: + raise TypeError( + "IPC-enabled events must be bound; use Stream.record for creation." + ) + flags |= cydriver.CUevent_flags.CU_EVENT_INTERPROCESS + self._ipc_enabled = True + if not self._timing_disabled: + raise TypeError("IPC-enabled events cannot use timing.") with nogil: HANDLE_RETURN(cydriver.cuEventCreate(&self._handle, flags)) self._device_id = device_id self._ctx_handle = ctx_handle + if opts.ipc_enabled: + self.get_ipc_descriptor() return self cpdef close(self): @@ -151,6 +166,40 @@ cdef class Event: raise CUDAError(err) raise RuntimeError(explanation) + def get_ipc_descriptor(self) -> IPCEventDescriptor: + """Export an event allocated for sharing between processes.""" + if self._ipc_descriptor is not None: + return self._ipc_descriptor + if not self.is_ipc_enabled: + raise RuntimeError("Event is not IPC-enabled") + cdef cydriver.CUipcEventHandle data + with nogil: + HANDLE_RETURN(cydriver.cuIpcGetEventHandle(&data, (self._handle))) + cdef bytes data_b = cpython.PyBytes_FromStringAndSize((data.reserved), sizeof(data.reserved)) + self._ipc_descriptor = IPCEventDescriptor._init(data_b, self._busy_waited) + return self._ipc_descriptor + + @classmethod + def from_ipc_descriptor(cls, ipc_descriptor: IPCEventDescriptor) -> Event: + """Import an event that was exported from another process.""" + cdef cydriver.CUipcEventHandle data + memcpy(data.reserved, (ipc_descriptor._reserved), sizeof(data.reserved)) + cdef Event self = Event.__new__(cls) + with nogil: + HANDLE_RETURN(cydriver.cuIpcOpenEventHandle(&self._handle, data)) + self._timing_disabled = True + self._busy_waited = ipc_descriptor._busy_waited + self._ipc_enabled = True + self._ipc_descriptor = ipc_descriptor + self._device_id = -1 # ?? + self._ctx_handle = None # ?? 
+ return self + + @property + def is_ipc_enabled(self) -> bool: + """Return True if the event can be shared across process boundaries, otherwise False.""" + return self._ipc_enabled + @property def is_timing_disabled(self) -> bool: """Return True if the event does not record timing data, otherwise False.""" @@ -161,11 +210,6 @@ cdef class Event: """Return True if the event synchronization would keep the CPU busy-waiting, otherwise False.""" return self._busy_waited - @property - def is_ipc_supported(self) -> bool: - """Return True if this event can be used as an interprocess event, otherwise False.""" - raise NotImplementedError("WIP: https://github.com/NVIDIA/cuda-python/issues/103") - def sync(self): """Synchronize until the event completes. @@ -212,12 +256,43 @@ cdef class Event: context is set current after a event is created. """ - - from cuda.core.experimental._device import Device # avoid circular import - - return Device(self._device_id) + if self._device_id >= 0: + from ._device import Device # avoid circular import + return Device(self._device_id) @property def context(self) -> Context: """Return the :obj:`~_context.Context` associated with this event.""" - return Context._from_ctx(self._ctx_handle, self._device_id) + if self._ctx_handle is not None and self._device_id >= 0: + return Context._from_ctx(self._ctx_handle, self._device_id) + + +cdef class IPCEventDescriptor: + """Serializable object describing an event that can be shared between processes.""" + + cdef: + bytes _reserved + bint _busy_waited + + def __init__(self, *arg, **kwargs): + raise RuntimeError("IPCEventDescriptor objects cannot be instantiated directly. Please use Event APIs.") + + @classmethod + def _init(cls, reserved: bytes, busy_waited: bint): + cdef IPCEventDescriptor self = IPCEventDescriptor.__new__(cls) + self._reserved = reserved + self._busy_waited = busy_waited + return self + + def __eq__(self, IPCEventDescriptor rhs): + # No need to check self._busy_waited. + return self._reserved == rhs._reserved + + def __reduce__(self): + return self._init, (self._reserved, self._busy_waited) + + +def _reduce_event(event): + return event.from_ipc_descriptor, (event.get_ipc_descriptor(),) + +multiprocessing.reduction.register(Event, _reduce_event) diff --git a/cuda_core/cuda/core/experimental/_memory.pyx b/cuda_core/cuda/core/experimental/_memory.pyx index 34e9d18e4b..d64e7cea9b 100644 --- a/cuda_core/cuda/core/experimental/_memory.pyx +++ b/cuda_core/cuda/core/experimental/_memory.pyx @@ -231,11 +231,11 @@ cdef class Buffer(_cyBuffer, MemoryResourceAttributes): if stream is None: # Note: match this behavior to DeviceMemoryResource.allocate() stream = default_stream() - cdef cydriver.CUmemPoolPtrExportData share_data - memcpy(share_data.reserved, (ipc_buffer._reserved), sizeof(share_data.reserved)) + cdef cydriver.CUmemPoolPtrExportData data + memcpy(data.reserved, (ipc_buffer._reserved), sizeof(data.reserved)) cdef cydriver.CUdeviceptr ptr with nogil: - HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &share_data)) + HANDLE_RETURN(cydriver.cuMemPoolImportPointer(&ptr, mr._mempool_handle, &data)) return Buffer._init(ptr, ipc_buffer.size, mr, stream) def copy_to(self, dst: Buffer = None, *, stream: Stream) -> Buffer: @@ -516,7 +516,7 @@ cdef class DeviceMemoryResourceOptions: (Default to 0) """ ipc_enabled : cython.bint = False - max_size : cython.int = 0 + max_size : cython.size_t = 0 # TODO: cythonize this? 
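For reference, a minimal sketch of the intended IPC-enabled event flow (adapted from the tests added in this patch; the queue plumbing and the work enqueued on the parent stream are illustrative only, not part of the change):

    import multiprocessing as mp
    from cuda.core.experimental import Device

    def child_main(queue):
        device = Device()
        device.set_current()
        stream = device.create_stream()
        event = queue.get()      # unpickled via Event.from_ipc_descriptor
        stream.wait(event)       # order child work after the parent's recorded work
        # ... enqueue dependent work on `stream` ...
        stream.sync()

    if __name__ == "__main__":
        mp.set_start_method("spawn")
        device = Device()
        device.set_current()
        stream = device.create_stream()
        queue = mp.Queue()
        process = mp.Process(target=child_main, args=(queue,))
        process.start()
        # ... enqueue work on `stream` ...
        event = stream.record(options={"ipc_enabled": True})  # bound at creation; cannot be re-recorded
        queue.put(event)         # pickled via Event.get_ipc_descriptor()
        process.join()
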
diff --git a/cuda_core/cuda/core/experimental/_stream.pyx b/cuda_core/cuda/core/experimental/_stream.pyx index 4afde6baff..87a948e8a8 100644 --- a/cuda_core/cuda/core/experimental/_stream.pyx +++ b/cuda_core/cuda/core/experimental/_stream.pyx @@ -262,7 +262,13 @@ cdef class Stream: # and CU_EVENT_RECORD_EXTERNAL, can be set in EventOptions. if event is None: self._get_device_and_context() - event = Event._init((self._device_id), (self._ctx_handle), options) + event = Event._init((self._device_id), (self._ctx_handle), options, False) + elif event.is_ipc_enabled: + raise TypeError( + "IPC-enabled events should not be re-recorded, instead create a " + "new event by supplying options." + ) + cdef cydriver.CUevent e = ((event))._handle with nogil: HANDLE_RETURN(cydriver.cuEventRecord(e, self._handle)) diff --git a/cuda_core/tests/helpers.py b/cuda_core/tests/helpers/__init__.py similarity index 96% rename from cuda_core/tests/helpers.py rename to cuda_core/tests/helpers/__init__.py index 10af3dcc22..e87122e649 100644 --- a/cuda_core/tests/helpers.py +++ b/cuda_core/tests/helpers/__init__.py @@ -22,7 +22,7 @@ import cuda_python_test_helpers except ImportError: # Import shared platform helpers for tests across repos - sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[2] / "cuda_python_test_helpers")) + sys.path.insert(0, str(pathlib.Path(__file__).resolve().parents[3] / "cuda_python_test_helpers")) import cuda_python_test_helpers diff --git a/cuda_core/tests/helpers/latch.py b/cuda_core/tests/helpers/latch.py new file mode 100644 index 0000000000..bb352c3200 --- /dev/null +++ b/cuda_core/tests/helpers/latch.py @@ -0,0 +1,69 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from cuda.core.experimental import ( + LaunchConfig, + LegacyPinnedMemoryResource, + Program, + ProgramOptions, + launch, +) +import helpers +import ctypes + +class LatchKernel: + """ + Manages a kernel that blocks progress until released. + """ + + def __init__(self, device): + code = """ + #include + + extern "C" + __global__ void latch(int* val) { + cuda::atomic_ref signal{*val}; + while (true) { + if (signal.load(cuda::memory_order_relaxed)) { + break; + } + } + } + """ + program_options = ProgramOptions( + std="c++17", + arch=f"sm_{''.join(f'{i}' for i in device.compute_capability)}", + include_path=helpers.CCCL_INCLUDE_PATHS, + ) + prog = Program(code, code_type="c++", options=program_options) + mod = prog.compile(target_type="cubin") + self.kernel = mod.get_kernel("latch") + + mr = LegacyPinnedMemoryResource() + self.buffer = mr.allocate(4) + self.busy_wait_flag[0] = 0 + + def launch(self, stream): + config = LaunchConfig(grid=1, block=1) + launch(stream, config, self.kernel, self.busy_wait_flag_address) + + def release(self): + self.busy_wait_flag[0] = 1 + + @property + def busy_wait_flag_address(self): + return int(self.buffer.handle) + + @property + def busy_wait_flag(self): + return ctypes.cast(self.busy_wait_flag_address, ctypes.POINTER(ctypes.c_int32)) + + def close(self): + buffer = getattr(self, 'buffer', None) + if buffer is not None: + buffer.close() + + def __del__(self): + self.close() + + diff --git a/cuda_core/tests/memory_ipc/test_ipc_event.py b/cuda_core/tests/memory_ipc/test_ipc_event.py new file mode 100644 index 0000000000..a589df6e05 --- /dev/null +++ b/cuda_core/tests/memory_ipc/test_ipc_event.py @@ -0,0 +1,270 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. 
All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +from conftest import skipif_need_cuda_headers +from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions, EventOptions +from utility import TimestampedLogger, make_scratch_buffer, compare_buffers +from helpers.latch import LatchKernel +import ctypes +import pytest +import multiprocessing as mp +import time + +ENABLE_LOGGING = False # Set True for test debugging and development +CHILD_TIMEOUT_SEC = 20 +NBYTES = 1024*1024*1024 +POOL_SIZE = NBYTES +NCOPIES = 500 + +class TestIpcEvent: + """Check the basic usage of IPC-enabled events.""" + + def test_main(self, ipc_device): + log = TimestampedLogger(prefix="parent: ", enabled=ENABLE_LOGGING) + device = ipc_device + stream1 = device.create_stream() + mr_options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) + mr = DeviceMemoryResource(device, options=mr_options) + + # Start the child process. + q_out, q_in = [mp.Queue() for _ in range(2)] + process = mp.Process(target=self.child_main, args=(log, q_out, q_in)) + process.start() + + # Prepare scratch buffers. + target = make_scratch_buffer(device, 0, NBYTES) + ones = make_scratch_buffer(device, 1, NBYTES) + twos = make_scratch_buffer(device, 2, NBYTES) + + # Allocate the buffer and send it to the child. + buffer = mr.allocate(NBYTES, stream=stream1) + log("sending buffer") + q_out.put(buffer) + + e_copy_start, e_copy_end = [device.create_event({"enable_timing": True}) for _ in range(2)] + + # Stream 1: + stream1.record(e_copy_start) + log("begin enqueuing copies on stream1") + for i in range(NCOPIES): + buffer.copy_from(ones, stream=stream1) + if i % 100 == 0: + log(f"{i:>3}/{NCOPIES}") + stream1.record(e_copy_end) + log("done enqueuing copies") + + ipc_event_options = EventOptions(ipc_enabled=True) + e = stream1.record(options=ipc_event_options) + log(f"recorded event ({hex(e.handle)})") + q_out.put(e) + log("sent event") + + # Wait on the child. + log("waiting for copies") + e_copy_end.sync() + parent_done_copying_timestamp = time.time_ns() + log("done copying") + process.join() + assert process.exitcode == 0 + log("done") + + # Finish up. + target.copy_from(buffer, stream=stream1) + stream1.sync() + assert compare_buffers(target, twos) == 0 + elapsed_ms = e_copy_end - e_copy_start + log(f"Elapsed time for {NCOPIES} copies: {int(elapsed_ms)} ms") + + # Make sure the child finished enqueuing its work before the copies finished; + # otherwise the test has no meaning. If this trips, adjust NCOPIES and/or + # NBYTES. 
+ child_done_enqueuing_timestamp = q_in.get(timeout=CHILD_TIMEOUT_SEC) + assert child_done_enqueuing_timestamp < parent_done_copying_timestamp + + + def child_main(self, log, q_in, q_out): + log.prefix = " child: " + log("ready") + device = Device() + device.set_current() + stream2 = device.create_stream() + twos = make_scratch_buffer(device, 2, NBYTES) + buffer = q_in.get(timeout=CHILD_TIMEOUT_SEC) + log("got buffer") + e = q_in.get(timeout=CHILD_TIMEOUT_SEC) + log(f"got event ({hex(e.handle)})") + stream2.wait(e) + log("enqueuing copy on stream2") + buffer.copy_from(twos, stream=stream2) + q_out.put(time.time_ns()) # Time when enqueuing finished + log("waiting") + stream2.sync() + log("done") + +class TestIpcEventWithLatch: + """Check the basic usage of IPC-enabled events with a latch kernel.""" + + @skipif_need_cuda_headers # libcu++ + def test_main(self, ipc_device): + log = TimestampedLogger(prefix="parent: ", enabled=ENABLE_LOGGING) + device = ipc_device + stream1 = device.create_stream() + mr_options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) + mr = DeviceMemoryResource(device, options=mr_options) + + # Start the child process. + q_out, q_in = [mp.Queue() for _ in range(2)] + process = mp.Process(target=self.child_main, args=(log, q_out, q_in)) + process.start() + + # Prepare scratch buffers. + target = make_scratch_buffer(device, 0, NBYTES) + ones = make_scratch_buffer(device, 1, NBYTES) + twos = make_scratch_buffer(device, 2, NBYTES) + + # Allocate the buffer and send it to the child. + buffer = mr.allocate(NBYTES, stream=stream1) + log("sending buffer") + q_out.put(buffer) + + # Stream 1: + latch = LatchKernel(device) + log("enqueuing latch kernel on stream1") + latch.launch(stream1) + log("enqueuing copy on stream1") + buffer.copy_from(ones, stream=stream1) + + ipc_event_options = EventOptions(ipc_enabled=True) + e = stream1.record(options=ipc_event_options) + log(f"recorded event ({hex(e.handle)})") + q_out.put(e) + log("sent event") + + # Wait on the child. + log("waiting for child") + none = q_in.get(timeout=CHILD_TIMEOUT_SEC) + assert none is None + + log("releasing stream1") + latch.release() + process.join() + assert process.exitcode == 0 + log("done") + + # Finish up. 
+ target.copy_from(buffer, stream=stream1) + stream1.sync() + assert compare_buffers(target, twos) == 0 + + + def child_main(self, log, q_in, q_out): + log.prefix = " child: " + log("ready") + device = Device() + device.set_current() + stream2 = device.create_stream() + twos = make_scratch_buffer(device, 2, NBYTES) + buffer = q_in.get(timeout=CHILD_TIMEOUT_SEC) + log("got buffer") + e = q_in.get(timeout=CHILD_TIMEOUT_SEC) + log(f"got event ({hex(e.handle)})") + stream2.wait(e) + log("enqueuing copy on stream2") + buffer.copy_from(twos, stream=stream2) + log("signaling parent") + q_out.put(None) + log("waiting") + stream2.sync() + log("done") + + +def test_event_is_monadic(ipc_device): + """Check that IPC-enabled events are always bound and cannot be reset.""" + device = ipc_device + with pytest.raises(TypeError, match=r"^IPC-enabled events must be bound; use Stream.record for creation\.$"): + device.create_event({"ipc_enabled": True}) + + stream = device.create_stream() + e = stream.record(options={"ipc_enabled": True}) + with pytest.raises(TypeError, match=r"^IPC-enabled events should not be re-recorded, instead create a new event by supplying options\.$"): + stream.record(e) + + +@pytest.mark.parametrize( + "options", [ {"ipc_enabled": True, "enable_timing": True}, + EventOptions(ipc_enabled=True, enable_timing=True)] +) +def test_event_timing_disabled(ipc_device, options): + """Check that IPC-enabled events cannot be created with timing enabled.""" + device = ipc_device + stream = device.create_stream() + with pytest.raises(TypeError, match=r"^IPC-enabled events cannot use timing\.$"): + stream.record(options=options) + +class TestIpcEventProperties: + """ + Check that event properties are properly set after transfer to a child + process. + """ + @pytest.mark.parametrize("busy_waited_sync", [True, False]) + @pytest.mark.parametrize("use_options_cls", [True, False]) + @pytest.mark.parametrize("use_option_kw", [True, False]) + def test_main(self, ipc_device, busy_waited_sync, use_options_cls, use_option_kw): + device = ipc_device + stream = device.create_stream() + + # Start the child process. + q_out, q_in = [mp.Queue() for _ in range(2)] + process = mp.Process(target=self.child_main, args=(q_out, q_in)) + process.start() + + # Create an event and send it. + options = \ + EventOptions(ipc_enabled=True, busy_waited_sync=busy_waited_sync) \ + if use_options_cls else \ + {"ipc_enabled": True, "busy_waited_sync": busy_waited_sync} + e = stream.record(options=options) \ + if use_option_kw else \ + stream.record(None, options) + q_out.put(e) + + # Check its properties. + props = q_in.get(timeout=CHILD_TIMEOUT_SEC) + assert props[0] == e.get_ipc_descriptor() + assert props[1] == e.is_ipc_enabled + assert props[2] == e.is_timing_disabled + assert props[3] == e.is_sync_busy_waited + assert props[4] is None + assert props[5] is None + + process.join() + assert process.exitcode == 0 + + def child_main(self, q_in, q_out): + device = Device() + device.set_current() + stream = device.create_stream() + + # Get the event. + e = q_in.get(timeout=CHILD_TIMEOUT_SEC) + + # Send its properties. 
+ props = (e.get_ipc_descriptor(), + e.is_ipc_enabled, + e.is_timing_disabled, + e.is_sync_busy_waited, + e.device, + e.context,) + q_out.put(props) + + + +# TODO: daisy chain processes + +if __name__ == "__main__": + mp.set_start_method("spawn") + device = Device() + device.set_current() + TestIpcEventWithLatch().test_main(device) + + diff --git a/cuda_core/tests/memory_ipc/utility.py b/cuda_core/tests/memory_ipc/utility.py index 7ce7752b6d..d364f694cb 100644 --- a/cuda_core/tests/memory_ipc/utility.py +++ b/cuda_core/tests/memory_ipc/utility.py @@ -2,10 +2,17 @@ # SPDX-License-Identifier: Apache-2.0 import ctypes +import sys +import time from cuda.core.experimental import Buffer, MemoryResource from cuda.core.experimental._utils.cuda_utils import driver, handle_return +if sys.platform.startswith("win"): + libc = ctypes.CDLL("msvcrt.dll") +else: + libc = ctypes.CDLL("libc.so.6") + class DummyUnifiedMemoryResource(MemoryResource): def __init__(self, device): @@ -32,34 +39,148 @@ def device_id(self) -> int: class IPCBufferTestHelper: - """A helper for manipulating memory buffers in IPC tests. + """ + A helper for manipulating memory buffers in IPC tests. + + Provides methods to fill a target buffer with a known test pattern and + verify the expected values. + + If a stream is provided, operations are synchronized with respect to that + stream. Otherwise, they are synchronized over the device. - Provides methods to fill a buffer with one of two test patterns and verify - the expected values. + The test pattern is either a fixed value or a cyclic function of the byte + offset. The cyclic pattern is based on two arguments: + + `flipped`: whether to flip (invert) each byte. + `starting_from`: an offset to start counting from. + + For a fixed pattern, specify `value`. This supercedes the above arguments. + + Distinct test patterns are stored in separate buffers called pattern + buffers. Calls to `fill_buffer` copy from a pattern buffer to the target + buffer. Calls to `verify_buffer` copy from the target buffer to a scratch + buffer and then perform a comparison. 
""" - def __init__(self, device, buffer): + def __init__(self, device, buffer, stream=None): self.device = device self.buffer = buffer + self.stream = stream if stream is not None else device.create_stream() + self.sync_target = stream if stream is not None else device self.scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.buffer.size) - self.stream = device.create_stream() - - def fill_buffer(self, flipped=False, starting_from=0): - """Fill a device buffer with test pattern using unified memory.""" - ptr = ctypes.cast(int(self.scratch_buffer.handle), ctypes.POINTER(ctypes.c_byte)) - op = (lambda i: 255 - i) if flipped else (lambda i: i) - for i in range(self.buffer.size): - ptr[i] = ctypes.c_byte(op(starting_from + i)) - self.buffer.copy_from(self.scratch_buffer, stream=self.stream) - self.device.sync() - - def verify_buffer(self, flipped=False, starting_from=0): - """Verify the buffer contents.""" - self.scratch_buffer.copy_from(self.buffer, stream=self.stream) - self.device.sync() - ptr = ctypes.cast(int(self.scratch_buffer.handle), ctypes.POINTER(ctypes.c_byte)) - op = (lambda i: 255 - i) if flipped else (lambda i: i) - for i in range(self.buffer.size): - assert ctypes.c_byte(ptr[i]).value == ctypes.c_byte(op(starting_from + i)).value, ( - f"Buffer contains incorrect data at index {i}" - ) + self.pattern_buffers = {} + + @property + def size(self): + """The buffer size in bytes.""" + return self.buffer.size + + def fill_buffer(self, flipped=False, starting_from=0, value=None, repeat=1, sync=True): + """Fill a device buffer with a sequential test pattern using unified memory.""" + pattern_buffer = self._get_pattern_buffer(flipped, starting_from, value) + for _ in range(repeat): + self.buffer.copy_from(pattern_buffer, stream=self.stream) + if sync: + self.sync() + + def verify_buffer(self, flipped=False, starting_from=0, value=None, repeat=1): + """Verify the buffer contents against a sequential pattern.""" + ptr_test = self._ptr(self.scratch_buffer) + pattern_buffer = self._get_pattern_buffer(flipped, starting_from, value) + ptr_expected = self._ptr(pattern_buffer) + for _ in range(repeat): + self.scratch_buffer.copy_from(self.buffer, stream=self.stream) + self.sync() + assert libc.memcmp(ptr_test, ptr_expected, self.size) == 0 + + def sync(self): + """Synchronize against the sync target (a stream or device).""" + self.sync_target.sync() + + @staticmethod + def _ptr(buffer): + """Get a pointer to the specified buffer.""" + return ctypes.cast(int(buffer.handle), ctypes.POINTER(ctypes.c_ubyte)) + + def _get_pattern_buffer(self, flipped, starting_from, value): + """Get a buffer holding the specified test pattern.""" + assert value is None or (not flipped and starting_from == 0) + key = (value & 0xFF,) if value is not None else (flipped, starting_from) + pattern_buffer = self.pattern_buffers.get(key, None) + if pattern_buffer is None: + if value is not None: + pattern_buffer = make_scratch_buffer(self.device, value, self.size) + else: + pattern_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.size) + ptr = self._ptr(pattern_buffer) + pattern = lambda i: (starting_from + i) & 0xFF # noqa: E731 + if flipped: + for i in range(self.size): + ptr[i] = ~pattern(i) + else: + for i in range(self.size): + ptr[i] = pattern(i) + self.pattern_buffers[key] = pattern_buffer + return pattern_buffer + + +def make_scratch_buffer(device, value, nbytes): + """Create a unified memory buffer with the specified value.""" + buffer = 
DummyUnifiedMemoryResource(device).allocate(nbytes) + ptr = ctypes.cast(int(buffer.handle), ctypes.POINTER(ctypes.c_byte)) + ctypes.memset(ptr, value & 0xFF, nbytes) + return buffer + + +def compare_buffers(buffer1, buffer2): + """Compare the contents of two host-accessible buffers a la memcmp.""" + assert buffer1.size == buffer2.size + ptr1 = ctypes.cast(int(buffer1.handle), ctypes.POINTER(ctypes.c_byte)) + ptr2 = ctypes.cast(int(buffer2.handle), ctypes.POINTER(ctypes.c_byte)) + return libc.memcmp(ptr1, ptr2, buffer1.size) + + +class TimestampedLogger: + """ + A logger that prefixes each output with a timestamp, containing the elapsed + time since the logger was created. + + Example: + + import multiprocess as mp + import time + + def main(): + log = TimestampedLogger(prefix="parent: ") + log("begin") + process = mp.Process(target=child_main, args=(log,)) + process.start() + process.join() + log("done") + + def child_main(log): + log.prefix = " child: " + log("begin") + time.sleep(1) + log("done") + + if __name__ == "__main__": + main() + + Possible output: + + [ 0.003 ms] parent: begin + [ 819.464 ms] child: begin + [ 1819.666 ms] child: done + [ 1882.954 ms] parent: done + """ + + def __init__(self, prefix=None, start_time=None, enabled=True): + self.prefix = "" if prefix is None else prefix + self.start_time = start_time if start_time is not None else time.time_ns() + self.enabled = enabled + + def __call__(self, msg): + if self.enabled: + now = (time.time_ns() - self.start_time) * 1e-6 + print(f"[{now:>10.3f} ms] {self.prefix}{msg}") diff --git a/cuda_core/tests/test_event.py b/cuda_core/tests/test_event.py index 746121946d..9db073a388 100644 --- a/cuda_core/tests/test_event.py +++ b/cuda_core/tests/test_event.py @@ -5,19 +5,13 @@ import time import cuda.core.experimental -import helpers -import numpy as np import pytest from cuda.core.experimental import ( Device, Event, EventOptions, - LaunchConfig, - LegacyPinnedMemoryResource, - Program, - ProgramOptions, - launch, ) +from helpers.latch import LatchKernel from conftest import skipif_need_cuda_headers from cuda_python_test_helpers import IS_WSL @@ -122,59 +116,24 @@ def test_error_timing_recorded(): @skipif_need_cuda_headers # libcu++ -@pytest.mark.skipif(tuple(int(i) for i in np.__version__.split(".")[:2]) < (2, 1), reason="need numpy 2.1.0+") def test_error_timing_incomplete(): device = Device() device.set_current() - - # This kernel is designed to busy loop until a signal is received - code = """ -#include - -extern "C" -__global__ void wait(int* val) { - cuda::atomic_ref signal{*val}; - while (true) { - if (signal.load(cuda::memory_order_relaxed)) { - break; - } - } -} -""" - - arch = "".join(f"{i}" for i in device.compute_capability) - program_options = ProgramOptions( - std="c++17", - arch=f"sm_{arch}", - include_path=helpers.CCCL_INCLUDE_PATHS, - ) - prog = Program(code, code_type="c++", options=program_options) - mod = prog.compile(target_type="cubin") - ker = mod.get_kernel("wait") - - mr = LegacyPinnedMemoryResource() - b = mr.allocate(4) - arr = np.from_dlpack(b).view(np.int32) - arr[0] = 0 - - config = LaunchConfig(grid=1, block=1) - ker_args = (arr.ctypes.data,) - + latch = LatchKernel(device) enabled = EventOptions(enable_timing=True) stream = device.create_stream() event1 = stream.record(options=enabled) - launch(stream, config, ker, *ker_args) + latch.launch(stream) event3 = stream.record(options=enabled) - # event3 will never complete because the stream is waiting on wait() to complete + # event3 will never 
complete because the latch has not been released with pytest.raises(RuntimeError, match="^One or both events have not completed."): event3 - event1 - arr[0] = 1 + latch.release() event3.sync() event3 - event1 # this should work - b.close() def test_event_device(init_cuda): diff --git a/cuda_core/tests/test_helpers.py b/cuda_core/tests/test_helpers.py new file mode 100644 index 0000000000..3bc781f5f3 --- /dev/null +++ b/cuda_core/tests/test_helpers.py @@ -0,0 +1,33 @@ +from cuda.core.experimental import Device +from helpers.latch import LatchKernel +from memory_ipc.utility import TimestampedLogger, make_scratch_buffer, compare_buffers +import time + +ENABLE_LOGGING = False # Set True for test debugging and development +NBYTES = 64 + +def test_latchkernel(): + log = TimestampedLogger() + log("begin") + device = Device() + device.set_current() + stream = device.create_stream() + target = make_scratch_buffer(device, 0, NBYTES) + zeros = make_scratch_buffer(device, 0, NBYTES) + ones = make_scratch_buffer(device, 1, NBYTES) + latch = LatchKernel(device) + log("launching latch kernel") + latch.launch(stream) + log("launching copy (0->1) kernel") + target.copy_from(ones, stream=stream) + log("going to sleep") + time.sleep(1) + log("checking target == 0") + assert compare_buffers(target, zeros) == 0 + log("releasing latch and syncing") + latch.release() + stream.sync() + log("checking target == 1") + assert compare_buffers(target, ones) == 0 + log("done") + From 26678bf886e551afac0da7a5facfe0f11ff27f35 Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Thu, 16 Oct 2025 16:00:55 -0700 Subject: [PATCH 2/9] Move contents of `memory_ipc/utility` module to `helpers`. Rename `IPCBufferTestHelper` to `PatternGen` and combine `flipped` and `starting_from` arguments to just `seed`. Rename `compare_buffers` to `compare_equal_buffers` and have it return a Boolean. --- .../utility.py => helpers/buffers.py} | 114 +++++------------- cuda_core/tests/helpers/logging.py | 49 ++++++++ cuda_core/tests/memory_ipc/test_ipc_event.py | 9 +- cuda_core/tests/memory_ipc/test_memory_ipc.py | 62 +++++----- .../tests/memory_ipc/test_send_buffers.py | 24 ++-- cuda_core/tests/memory_ipc/test_serialize.py | 59 ++++----- cuda_core/tests/memory_ipc/test_workerpool.py | 20 +-- cuda_core/tests/test_helpers.py | 40 +++++- cuda_core/tests/test_memory.py | 25 +--- 9 files changed, 207 insertions(+), 195 deletions(-) rename cuda_core/tests/{memory_ipc/utility.py => helpers/buffers.py} (52%) create mode 100644 cuda_core/tests/helpers/logging.py diff --git a/cuda_core/tests/memory_ipc/utility.py b/cuda_core/tests/helpers/buffers.py similarity index 52% rename from cuda_core/tests/memory_ipc/utility.py rename to cuda_core/tests/helpers/buffers.py index d364f694cb..cc15d607a1 100644 --- a/cuda_core/tests/memory_ipc/utility.py +++ b/cuda_core/tests/helpers/buffers.py @@ -3,7 +3,6 @@ import ctypes import sys -import time from cuda.core.experimental import Buffer, MemoryResource from cuda.core.experimental._utils.cuda_utils import driver, handle_return @@ -14,6 +13,9 @@ libc = ctypes.CDLL("libc.so.6") +__all__ = ["DummyUnifiedMemoryResource", "PatternGen", "make_scratch_buffer", "compare_equal_buffers"] + + class DummyUnifiedMemoryResource(MemoryResource): def __init__(self, device): self.device = device @@ -38,23 +40,16 @@ def device_id(self) -> int: return self.device -class IPCBufferTestHelper: +class PatternGen: """ - A helper for manipulating memory buffers in IPC tests. 
- - Provides methods to fill a target buffer with a known test pattern and + Provides methods to fill a target buffer with known test patterns and verify the expected values. If a stream is provided, operations are synchronized with respect to that stream. Otherwise, they are synchronized over the device. - The test pattern is either a fixed value or a cyclic function of the byte - offset. The cyclic pattern is based on two arguments: - - `flipped`: whether to flip (invert) each byte. - `starting_from`: an offset to start counting from. - - For a fixed pattern, specify `value`. This supercedes the above arguments. + The test pattern is either a fixed value or is generated from a 8-bit seed. + Only one of `value` or `seed` should be supplied. Distinct test patterns are stored in separate buffers called pattern buffers. Calls to `fill_buffer` copy from a pattern buffer to the target @@ -62,34 +57,31 @@ class IPCBufferTestHelper: buffer and then perform a comparison. """ - def __init__(self, device, buffer, stream=None): + def __init__(self, device, size, stream=None): self.device = device - self.buffer = buffer + self.size = size self.stream = stream if stream is not None else device.create_stream() self.sync_target = stream if stream is not None else device - self.scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.buffer.size) + self.scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(size) self.pattern_buffers = {} - @property - def size(self): - """The buffer size in bytes.""" - return self.buffer.size - - def fill_buffer(self, flipped=False, starting_from=0, value=None, repeat=1, sync=True): + def fill_buffer(self, buffer, seed=None, value=None, repeat=1, sync=True): """Fill a device buffer with a sequential test pattern using unified memory.""" - pattern_buffer = self._get_pattern_buffer(flipped, starting_from, value) + assert buffer.size == self.size + pattern_buffer = self._get_pattern_buffer(seed, value) for _ in range(repeat): - self.buffer.copy_from(pattern_buffer, stream=self.stream) + buffer.copy_from(pattern_buffer, stream=self.stream) if sync: self.sync() - def verify_buffer(self, flipped=False, starting_from=0, value=None, repeat=1): + def verify_buffer(self, buffer, seed=None, value=None, repeat=1): """Verify the buffer contents against a sequential pattern.""" + assert buffer.size == self.size ptr_test = self._ptr(self.scratch_buffer) - pattern_buffer = self._get_pattern_buffer(flipped, starting_from, value) + pattern_buffer = self._get_pattern_buffer(seed, value) ptr_expected = self._ptr(pattern_buffer) for _ in range(repeat): - self.scratch_buffer.copy_from(self.buffer, stream=self.stream) + self.scratch_buffer.copy_from(buffer, stream=self.stream) self.sync() assert libc.memcmp(ptr_test, ptr_expected, self.size) == 0 @@ -102,10 +94,12 @@ def _ptr(buffer): """Get a pointer to the specified buffer.""" return ctypes.cast(int(buffer.handle), ctypes.POINTER(ctypes.c_ubyte)) - def _get_pattern_buffer(self, flipped, starting_from, value): + def _get_pattern_buffer(self, seed, value): """Get a buffer holding the specified test pattern.""" - assert value is None or (not flipped and starting_from == 0) - key = (value & 0xFF,) if value is not None else (flipped, starting_from) + assert seed is None or value is None + if value is None: + seed = (0 if seed is None else seed) & 0xFF + key = seed, value pattern_buffer = self.pattern_buffers.get(key, None) if pattern_buffer is None: if value is not None: @@ -113,13 +107,8 @@ def _get_pattern_buffer(self, 
flipped, starting_from, value): else: pattern_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.size) ptr = self._ptr(pattern_buffer) - pattern = lambda i: (starting_from + i) & 0xFF # noqa: E731 - if flipped: - for i in range(self.size): - ptr[i] = ~pattern(i) - else: - for i in range(self.size): - ptr[i] = pattern(i) + for i in range(self.size): + ptr[i] = (seed + i) & 0xFF self.pattern_buffers[key] = pattern_buffer return pattern_buffer @@ -132,55 +121,12 @@ def make_scratch_buffer(device, value, nbytes): return buffer -def compare_buffers(buffer1, buffer2): - """Compare the contents of two host-accessible buffers a la memcmp.""" - assert buffer1.size == buffer2.size +def compare_equal_buffers(buffer1, buffer2): + """Compare the contents of two host-accessible buffers for bitwise equality.""" + if buffer1.size != buffer2.size: + return False ptr1 = ctypes.cast(int(buffer1.handle), ctypes.POINTER(ctypes.c_byte)) ptr2 = ctypes.cast(int(buffer2.handle), ctypes.POINTER(ctypes.c_byte)) - return libc.memcmp(ptr1, ptr2, buffer1.size) - - -class TimestampedLogger: - """ - A logger that prefixes each output with a timestamp, containing the elapsed - time since the logger was created. - - Example: - - import multiprocess as mp - import time - - def main(): - log = TimestampedLogger(prefix="parent: ") - log("begin") - process = mp.Process(target=child_main, args=(log,)) - process.start() - process.join() - log("done") - - def child_main(log): - log.prefix = " child: " - log("begin") - time.sleep(1) - log("done") - - if __name__ == "__main__": - main() - - Possible output: - - [ 0.003 ms] parent: begin - [ 819.464 ms] child: begin - [ 1819.666 ms] child: done - [ 1882.954 ms] parent: done - """ + return libc.memcmp(ptr1, ptr2, buffer1.size) == 0 - def __init__(self, prefix=None, start_time=None, enabled=True): - self.prefix = "" if prefix is None else prefix - self.start_time = start_time if start_time is not None else time.time_ns() - self.enabled = enabled - def __call__(self, msg): - if self.enabled: - now = (time.time_ns() - self.start_time) * 1e-6 - print(f"[{now:>10.3f} ms] {self.prefix}{msg}") diff --git a/cuda_core/tests/helpers/logging.py b/cuda_core/tests/helpers/logging.py new file mode 100644 index 0000000000..6c376770c7 --- /dev/null +++ b/cuda_core/tests/helpers/logging.py @@ -0,0 +1,49 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 + +import time + +class TimestampedLogger: + """ + A logger that prefixes each output with a timestamp, containing the elapsed + time since the logger was created. 
+ + Example: + + import multiprocess as mp + import time + + def main(): + log = TimestampedLogger(prefix="parent: ") + log("begin") + process = mp.Process(target=child_main, args=(log,)) + process.start() + process.join() + log("done") + + def child_main(log): + log.prefix = " child: " + log("begin") + time.sleep(1) + log("done") + + if __name__ == "__main__": + main() + + Possible output: + + [ 0.003 ms] parent: begin + [ 819.464 ms] child: begin + [ 1819.666 ms] child: done + [ 1882.954 ms] parent: done + """ + + def __init__(self, prefix=None, start_time=None, enabled=True): + self.prefix = "" if prefix is None else prefix + self.start_time = start_time if start_time is not None else time.time_ns() + self.enabled = enabled + + def __call__(self, msg): + if self.enabled: + now = (time.time_ns() - self.start_time) * 1e-6 + print(f"[{now:>10.3f} ms] {self.prefix}{msg}") diff --git a/cuda_core/tests/memory_ipc/test_ipc_event.py b/cuda_core/tests/memory_ipc/test_ipc_event.py index a589df6e05..387be5b563 100644 --- a/cuda_core/tests/memory_ipc/test_ipc_event.py +++ b/cuda_core/tests/memory_ipc/test_ipc_event.py @@ -3,11 +3,12 @@ from conftest import skipif_need_cuda_headers from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions, EventOptions -from utility import TimestampedLogger, make_scratch_buffer, compare_buffers +from helpers.buffers import make_scratch_buffer, compare_equal_buffers from helpers.latch import LatchKernel +from helpers.logging import TimestampedLogger import ctypes -import pytest import multiprocessing as mp +import pytest import time ENABLE_LOGGING = False # Set True for test debugging and development @@ -71,7 +72,7 @@ def test_main(self, ipc_device): # Finish up. target.copy_from(buffer, stream=stream1) stream1.sync() - assert compare_buffers(target, twos) == 0 + assert compare_equal_buffers(target, twos) elapsed_ms = e_copy_end - e_copy_start log(f"Elapsed time for {NCOPIES} copies: {int(elapsed_ms)} ms") @@ -154,7 +155,7 @@ def test_main(self, ipc_device): # Finish up. target.copy_from(buffer, stream=stream1) stream1.sync() - assert compare_buffers(target, twos) == 0 + assert compare_equal_buffers(target, twos) def child_main(self, log, q_in, q_out): diff --git a/cuda_core/tests/memory_ipc/test_memory_ipc.py b/cuda_core/tests/memory_ipc/test_memory_ipc.py index c980a7ad84..f45aec1111 100644 --- a/cuda_core/tests/memory_ipc/test_memory_ipc.py +++ b/cuda_core/tests/memory_ipc/test_memory_ipc.py @@ -1,12 +1,10 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -import multiprocessing as mp - from cuda.core.experimental import Buffer, DeviceMemoryResource -from utility import IPCBufferTestHelper - from cuda_python_test_helpers import supports_ipc_mempool +from helpers.buffers import PatternGen +import multiprocessing as mp CHILD_TIMEOUT_SEC = 20 NBYTES = 64 @@ -24,6 +22,7 @@ def test_main(self, ipc_device, ipc_memory_resource): # Set up the IPC-enabled memory pool and share it. device = ipc_device mr = ipc_memory_resource + pgen = PatternGen(device, NBYTES) # Start the child process. queue = mp.Queue() @@ -32,8 +31,7 @@ def test_main(self, ipc_device, ipc_memory_resource): # Allocate and fill memory. buffer = mr.allocate(NBYTES) - helper = IPCBufferTestHelper(device, buffer) - helper.fill_buffer(flipped=False) + pgen.fill_buffer(buffer, seed=False) # Export the buffer via IPC. 
queue.put(buffer) @@ -43,15 +41,15 @@ def test_main(self, ipc_device, ipc_memory_resource): assert process.exitcode == 0 # Verify that the buffer was modified. - helper.verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() def child_main(self, device, mr, queue): device.set_current() buffer = queue.get(timeout=CHILD_TIMEOUT_SEC) - helper = IPCBufferTestHelper(device, buffer) - helper.verify_buffer(flipped=False) - helper.fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer, seed=False) + pgen.fill_buffer(buffer, seed=True) buffer.close() @@ -88,21 +86,23 @@ def test_main(self, ipc_device, ipc_memory_resource): assert p2.exitcode == 0 # Verify that the buffers were modified. - IPCBufferTestHelper(device, buffer1).verify_buffer(flipped=False) - IPCBufferTestHelper(device, buffer2).verify_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer1, seed=1) + pgen.verify_buffer(buffer2, seed=2) buffer1.close() buffer2.close() - def child_main(self, device, mr, idx, queue): + def child_main(self, device, mr, seed, queue): # Note: passing the mr registers it so that buffers can be passed # directly. device.set_current() buffer1 = queue.get(timeout=CHILD_TIMEOUT_SEC) buffer2 = queue.get(timeout=CHILD_TIMEOUT_SEC) - if idx == 1: - IPCBufferTestHelper(device, buffer1).fill_buffer(flipped=False) - elif idx == 2: - IPCBufferTestHelper(device, buffer2).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + if seed == 1: + pgen.fill_buffer(buffer1, seed=1) + elif seed == 2: + pgen.fill_buffer(buffer2, seed=2) buffer1.close() buffer2.close() @@ -124,8 +124,8 @@ def test_main(self, ipc_device, ipc_memory_resource): # Start children. q1, q2 = (mp.Queue() for _ in range(2)) - p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, 1, q1)) - p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, 2, q2)) + p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, False, q1)) + p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, True, q2)) p1.start() p2.start() @@ -142,12 +142,13 @@ def test_main(self, ipc_device, ipc_memory_resource): assert p2.exitcode == 0 # Verify results. - IPCBufferTestHelper(device, buffer1).verify_buffer(starting_from=1) - IPCBufferTestHelper(device, buffer2).verify_buffer(starting_from=2) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer1, seed=False) + pgen.verify_buffer(buffer2, seed=True) buffer1.close() buffer2.close() - def child_main(self, device, alloc_handle, idx, queue): + def child_main(self, device, alloc_handle, seed, queue): """Fills a shared memory buffer.""" # In this case, the device needs to be set up (passing the mr does it # implicitly in other tests). @@ -155,7 +156,8 @@ def child_main(self, device, alloc_handle, idx, queue): mr = DeviceMemoryResource.from_allocation_handle(device, alloc_handle) buffer_descriptor = queue.get(timeout=CHILD_TIMEOUT_SEC) buffer = Buffer.from_ipc_descriptor(mr, buffer_descriptor) - IPCBufferTestHelper(device, buffer).fill_buffer(starting_from=idx) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=seed) buffer.close() @@ -171,8 +173,8 @@ def test_main(self, ipc_device, ipc_memory_resource): # Start children. 
q1, q2 = (mp.Queue() for _ in range(2)) - p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, 1, q1)) - p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, 2, q2)) + p1 = mp.Process(target=self.child_main, args=(device, alloc_handle, False, q1)) + p2 = mp.Process(target=self.child_main, args=(device, alloc_handle, True, q2)) p1.start() p2.start() @@ -189,12 +191,13 @@ def test_main(self, ipc_device, ipc_memory_resource): assert p2.exitcode == 0 # Verify results. - IPCBufferTestHelper(device, buffer1).verify_buffer(starting_from=1) - IPCBufferTestHelper(device, buffer2).verify_buffer(starting_from=2) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer1, seed=False) + pgen.verify_buffer(buffer2, seed=True) buffer1.close() buffer2.close() - def child_main(self, device, alloc_handle, idx, queue): + def child_main(self, device, alloc_handle, seed, queue): """Fills a shared memory buffer.""" device.set_current() @@ -203,5 +206,6 @@ def child_main(self, device, alloc_handle, idx, queue): # Now get buffers. buffer = queue.get(timeout=CHILD_TIMEOUT_SEC) - IPCBufferTestHelper(device, buffer).fill_buffer(starting_from=idx) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=seed) buffer.close() diff --git a/cuda_core/tests/memory_ipc/test_send_buffers.py b/cuda_core/tests/memory_ipc/test_send_buffers.py index 19685a94f7..7f46d1f084 100644 --- a/cuda_core/tests/memory_ipc/test_send_buffers.py +++ b/cuda_core/tests/memory_ipc/test_send_buffers.py @@ -6,7 +6,7 @@ import pytest from cuda.core.experimental import DeviceMemoryResource, DeviceMemoryResourceOptions -from utility import IPCBufferTestHelper +from helpers.buffers import PatternGen from cuda_python_test_helpers import supports_ipc_mempool @@ -29,18 +29,12 @@ def test_ipc_send_buffers(ipc_device, nmrs): # Allocate and fill memory. buffers = [mr.allocate(NBYTES) for mr, _ in zip(cycle(mrs), range(NTASKS))] + pgen = PatternGen(device, NBYTES) for buffer in buffers: - helper = IPCBufferTestHelper(device, buffer) - helper.fill_buffer(flipped=False) + pgen.fill_buffer(buffer, seed=False) # Start the child process. - process = mp.Process( - target=child_main, - args=( - device, - buffers, - ), - ) + process = mp.Process(target=child_main, args=(device, buffers)) process.start() # Wait for the child process. @@ -48,16 +42,16 @@ def test_ipc_send_buffers(ipc_device, nmrs): assert process.exitcode == 0 # Verify that the buffers were modified. 
+ pgen = PatternGen(device, NBYTES) for buffer in buffers: - helper = IPCBufferTestHelper(device, buffer) - helper.verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() def child_main(device, buffers): device.set_current() + pgen = PatternGen(device, NBYTES) for buffer in buffers: - helper = IPCBufferTestHelper(device, buffer) - helper.verify_buffer(flipped=False) - helper.fill_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=False) + pgen.fill_buffer(buffer, seed=True) buffer.close() diff --git a/cuda_core/tests/memory_ipc/test_serialize.py b/cuda_core/tests/memory_ipc/test_serialize.py index 215538ae68..ceac50e502 100644 --- a/cuda_core/tests/memory_ipc/test_serialize.py +++ b/cuda_core/tests/memory_ipc/test_serialize.py @@ -6,7 +6,7 @@ import os from cuda.core.experimental import Buffer, Device, DeviceMemoryResource -from utility import IPCBufferTestHelper +from helpers.buffers import PatternGen CHILD_TIMEOUT_SEC = 20 NBYTES = 64 @@ -46,8 +46,9 @@ def test_main(self, ipc_device, ipc_memory_resource): assert process.exitcode == 0 # Confirm buffers were modified. - IPCBufferTestHelper(device, buffer1).verify_buffer(flipped=True) - IPCBufferTestHelper(device, buffer2).verify_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer1, seed=True) + pgen.verify_buffer(buffer2, seed=True) buffer1.close() buffer2.close() @@ -67,8 +68,9 @@ def child_main(self, conn): buffer2 = Buffer.from_ipc_descriptor(mr, buffer_desc) # by descriptor # Modify the buffers. - IPCBufferTestHelper(device, buffer1).fill_buffer(flipped=True) - IPCBufferTestHelper(device, buffer2).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer1, seed=True) + pgen.fill_buffer(buffer2, seed=True) buffer1.close() buffer2.close() @@ -100,7 +102,8 @@ def test_main(self, ipc_device, ipc_memory_resource): assert process.exitcode == 0 # Confirm buffer was modified. - IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(buffer, seed=True) buffer.close() def child_main(self, pipe, _): @@ -114,7 +117,8 @@ def child_main(self, pipe, _): # Buffer. buffer = pipe[0].get(timeout=CHILD_TIMEOUT_SEC) assert buffer.memory_resource.handle == mr.handle - IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=True) buffer.close() @@ -134,8 +138,8 @@ def test_object_passing(ipc_device, ipc_memory_resource): buffer = mr.allocate(NBYTES) buffer_desc = buffer.get_ipc_descriptor() - helper = IPCBufferTestHelper(device, buffer) - helper.fill_buffer(flipped=False) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=False) # Start the child process. process = mp.Process(target=child_main, args=(alloc_handle, mr, buffer_desc, buffer)) @@ -143,7 +147,7 @@ def test_object_passing(ipc_device, ipc_memory_resource): process.join(timeout=CHILD_TIMEOUT_SEC) assert process.exitcode == 0 - helper.verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() @@ -151,40 +155,37 @@ def child_main(alloc_handle, mr1, buffer_desc, buffer1): device = Device() device.set_current() mr2 = DeviceMemoryResource.from_allocation_handle(device, alloc_handle) + pgen = PatternGen(device, NBYTES) # OK to build the buffer from either mr and the descriptor. # All buffer* objects point to the same memory. 
buffer2 = Buffer.from_ipc_descriptor(mr1, buffer_desc) buffer3 = Buffer.from_ipc_descriptor(mr2, buffer_desc) - helper1 = IPCBufferTestHelper(device, buffer1) - helper2 = IPCBufferTestHelper(device, buffer2) - helper3 = IPCBufferTestHelper(device, buffer3) - - helper1.verify_buffer(flipped=False) - helper2.verify_buffer(flipped=False) - helper3.verify_buffer(flipped=False) + pgen.verify_buffer(buffer1, seed=False) + pgen.verify_buffer(buffer2, seed=False) + pgen.verify_buffer(buffer3, seed=False) # Modify 1. - helper1.fill_buffer(flipped=True) + pgen.fill_buffer(buffer1, seed=True) - helper1.verify_buffer(flipped=True) - helper2.verify_buffer(flipped=True) - helper3.verify_buffer(flipped=True) + pgen.verify_buffer(buffer1, seed=True) + pgen.verify_buffer(buffer2, seed=True) + pgen.verify_buffer(buffer3, seed=True) # Modify 2. - helper2.fill_buffer(flipped=False) + pgen.fill_buffer(buffer2, seed=False) - helper1.verify_buffer(flipped=False) - helper2.verify_buffer(flipped=False) - helper3.verify_buffer(flipped=False) + pgen.verify_buffer(buffer1, seed=False) + pgen.verify_buffer(buffer2, seed=False) + pgen.verify_buffer(buffer3, seed=False) # Modify 3. - helper3.fill_buffer(flipped=True) + pgen.fill_buffer(buffer3, seed=True) - helper1.verify_buffer(flipped=True) - helper2.verify_buffer(flipped=True) - helper3.verify_buffer(flipped=True) + pgen.verify_buffer(buffer1, seed=True) + pgen.verify_buffer(buffer2, seed=True) + pgen.verify_buffer(buffer3, seed=True) # Close any one buffer. buffer1.close() diff --git a/cuda_core/tests/memory_ipc/test_workerpool.py b/cuda_core/tests/memory_ipc/test_workerpool.py index 6372b5668d..190974a69a 100644 --- a/cuda_core/tests/memory_ipc/test_workerpool.py +++ b/cuda_core/tests/memory_ipc/test_workerpool.py @@ -7,7 +7,7 @@ import pytest from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, DeviceMemoryResourceOptions -from utility import IPCBufferTestHelper +from helpers.buffers import PatternGen from cuda_python_test_helpers import supports_ipc_mempool @@ -40,14 +40,16 @@ def test_main(self, ipc_device, nmrs): with mp.Pool(NWORKERS) as pool: pool.map(self.process_buffer, buffers) + pgen = PatternGen(device, NBYTES) for buffer in buffers: - IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() def process_buffer(self, buffer): device = Device(buffer.memory_resource.device_id) device.set_current() - IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=True) buffer.close() @@ -79,8 +81,9 @@ def test_main(self, ipc_device, nmrs): [(mrs.index(buffer.memory_resource), buffer.get_ipc_descriptor()) for buffer in buffers], ) + pgen = PatternGen(device, NBYTES) for buffer in buffers: - IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() def process_buffer(self, mr_idx, buffer_desc): @@ -88,7 +91,8 @@ def process_buffer(self, mr_idx, buffer_desc): device = Device(mr.device_id) device.set_current() buffer = Buffer.from_ipc_descriptor(mr, buffer_desc) - IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=True) buffer.close() @@ -120,12 +124,14 @@ def test_main(self, ipc_device, nmrs): with mp.Pool(NWORKERS, initializer=self.init_worker, initargs=(mrs,)) as pool: pool.starmap(self.process_buffer, [(device, pickle.dumps(buffer)) for buffer in buffers]) + pgen = 
PatternGen(device, NBYTES) for buffer in buffers: - IPCBufferTestHelper(device, buffer).verify_buffer(flipped=True) + pgen.verify_buffer(buffer, seed=True) buffer.close() def process_buffer(self, device, buffer_s): device.set_current() buffer = pickle.loads(buffer_s) # noqa: S301 - IPCBufferTestHelper(device, buffer).fill_buffer(flipped=True) + pgen = PatternGen(device, NBYTES) + pgen.fill_buffer(buffer, seed=True) buffer.close() diff --git a/cuda_core/tests/test_helpers.py b/cuda_core/tests/test_helpers.py index 3bc781f5f3..cf709ea9dd 100644 --- a/cuda_core/tests/test_helpers.py +++ b/cuda_core/tests/test_helpers.py @@ -1,12 +1,19 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE + from cuda.core.experimental import Device from helpers.latch import LatchKernel -from memory_ipc.utility import TimestampedLogger, make_scratch_buffer, compare_buffers +from helpers.logging import TimestampedLogger +from helpers.buffers import make_scratch_buffer, compare_equal_buffers, PatternGen import time +import pytest ENABLE_LOGGING = False # Set True for test debugging and development NBYTES = 64 def test_latchkernel(): + """Test LatchKernel.""" log = TimestampedLogger() log("begin") device = Device() @@ -23,11 +30,38 @@ def test_latchkernel(): log("going to sleep") time.sleep(1) log("checking target == 0") - assert compare_buffers(target, zeros) == 0 + assert compare_equal_buffers(target, zeros) log("releasing latch and syncing") latch.release() stream.sync() log("checking target == 1") - assert compare_buffers(target, ones) == 0 + assert compare_equal_buffers(target, ones) log("done") +def test_patterngen_seeds(): + """Test PatternGen with seed argument.""" + device = Device() + device.set_current() + buffer = make_scratch_buffer(device, 0, NBYTES) + + # All seeds are pairwise different. 
+ pgen = PatternGen(device, NBYTES) + for i in range(256): + pgen.fill_buffer(buffer, seed=i) + pgen.verify_buffer(buffer, seed=i) + for j in range(i+1, 256): + with pytest.raises(AssertionError): + pgen.verify_buffer(buffer, seed=j) + +def test_patterngen_values(): + """Test PatternGen with value argument, also compare_equal_buffers.""" + device = Device() + device.set_current() + ones = make_scratch_buffer(device, 1, NBYTES) + twos = make_scratch_buffer(device, 2, NBYTES) + assert compare_equal_buffers(ones, ones) + assert not compare_equal_buffers(ones, twos) + pgen = PatternGen(device, NBYTES) + pgen.verify_buffer(ones, value=1) + pgen.verify_buffer(twos, value=2) + diff --git a/cuda_core/tests/test_memory.py b/cuda_core/tests/test_memory.py index 904997f116..b2a15181c1 100644 --- a/cuda_core/tests/test_memory.py +++ b/cuda_core/tests/test_memory.py @@ -13,6 +13,7 @@ np = None import ctypes import platform +from helpers.buffers import DummyUnifiedMemoryResource import pytest from cuda.core.experimental import ( @@ -95,30 +96,6 @@ def device_id(self) -> int: raise RuntimeError("the pinned memory resource is not bound to any GPU") -class DummyUnifiedMemoryResource(MemoryResource): - def __init__(self, device): - self.device = device - - def allocate(self, size, stream=None) -> Buffer: - ptr = handle_return(driver.cuMemAllocManaged(size, driver.CUmemAttach_flags.CU_MEM_ATTACH_GLOBAL.value)) - return Buffer.from_handle(ptr=ptr, size=size, mr=self) - - def deallocate(self, ptr, size, stream=None): - handle_return(driver.cuMemFree(ptr)) - - @property - def is_device_accessible(self) -> bool: - return True - - @property - def is_host_accessible(self) -> bool: - return True - - @property - def device_id(self) -> int: - return 0 - - class DummyPinnedMemoryResource(MemoryResource): def __init__(self, device): self.device = device From 556a9adcee91e42546393562a97d796e78a2b1d8 Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Thu, 16 Oct 2025 16:14:43 -0700 Subject: [PATCH 3/9] Remove redundant `supports_ipc_mempool` checks. --- cuda_core/tests/memory_ipc/test_errors.py | 6 ------ cuda_core/tests/memory_ipc/test_memory_ipc.py | 13 ------------- cuda_core/tests/memory_ipc/test_send_buffers.py | 4 ---- cuda_core/tests/memory_ipc/test_workerpool.py | 8 -------- 4 files changed, 31 deletions(-) diff --git a/cuda_core/tests/memory_ipc/test_errors.py b/cuda_core/tests/memory_ipc/test_errors.py index d1a235603d..3e8265b39c 100644 --- a/cuda_core/tests/memory_ipc/test_errors.py +++ b/cuda_core/tests/memory_ipc/test_errors.py @@ -8,8 +8,6 @@ from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, DeviceMemoryResourceOptions from cuda.core.experimental._utils.cuda_utils import CUDAError -from cuda_python_test_helpers import supports_ipc_mempool - CHILD_TIMEOUT_SEC = 20 NBYTES = 64 POOL_SIZE = 2097152 @@ -20,10 +18,6 @@ class ChildErrorHarness: PARENT_ACTION, CHILD_ACTION, and ASSERT (see below for examples).""" def test_main(self, ipc_device, ipc_memory_resource): - if not supports_ipc_mempool(ipc_device): - import pytest - - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") """Parent process that checks child errors.""" # Attach fixtures to this object for convenience. These can be accessed # from PARENT_ACTION. 
diff --git a/cuda_core/tests/memory_ipc/test_memory_ipc.py b/cuda_core/tests/memory_ipc/test_memory_ipc.py index f45aec1111..dd1f8967e1 100644 --- a/cuda_core/tests/memory_ipc/test_memory_ipc.py +++ b/cuda_core/tests/memory_ipc/test_memory_ipc.py @@ -2,7 +2,6 @@ # SPDX-License-Identifier: Apache-2.0 from cuda.core.experimental import Buffer, DeviceMemoryResource -from cuda_python_test_helpers import supports_ipc_mempool from helpers.buffers import PatternGen import multiprocessing as mp @@ -14,10 +13,6 @@ class TestIpcMempool: def test_main(self, ipc_device, ipc_memory_resource): - if not supports_ipc_mempool(ipc_device): - import pytest - - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") """Test IPC with memory pools.""" # Set up the IPC-enabled memory pool and share it. device = ipc_device @@ -55,10 +50,6 @@ def child_main(self, device, mr, queue): class TestIPCMempoolMultiple: def test_main(self, ipc_device, ipc_memory_resource): - if not supports_ipc_mempool(ipc_device): - import pytest - - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") """Test IPC with memory pools using multiple processes.""" # Construct an IPC-enabled memory resource and share it with two children. device = ipc_device @@ -109,10 +100,6 @@ def child_main(self, device, mr, seed, queue): class TestIPCSharedAllocationHandleAndBufferDescriptors: def test_main(self, ipc_device, ipc_memory_resource): - if not supports_ipc_mempool(ipc_device): - import pytest - - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") """ Demonstrate that a memory pool allocation handle can be reused for IPC with multiple processes. Uses buffer descriptors. diff --git a/cuda_core/tests/memory_ipc/test_send_buffers.py b/cuda_core/tests/memory_ipc/test_send_buffers.py index 7f46d1f084..75a4651925 100644 --- a/cuda_core/tests/memory_ipc/test_send_buffers.py +++ b/cuda_core/tests/memory_ipc/test_send_buffers.py @@ -8,8 +8,6 @@ from cuda.core.experimental import DeviceMemoryResource, DeviceMemoryResourceOptions from helpers.buffers import PatternGen -from cuda_python_test_helpers import supports_ipc_mempool - CHILD_TIMEOUT_SEC = 20 NBYTES = 64 NMRS = 3 @@ -20,8 +18,6 @@ @pytest.mark.parametrize("nmrs", (1, NMRS)) def test_ipc_send_buffers(ipc_device, nmrs): """Test passing buffers sourced from multiple memory resources.""" - if not supports_ipc_mempool(ipc_device): - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") # Set up several IPC-enabled memory pools. 
device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) diff --git a/cuda_core/tests/memory_ipc/test_workerpool.py b/cuda_core/tests/memory_ipc/test_workerpool.py index 190974a69a..3f3f46cd27 100644 --- a/cuda_core/tests/memory_ipc/test_workerpool.py +++ b/cuda_core/tests/memory_ipc/test_workerpool.py @@ -9,8 +9,6 @@ from cuda.core.experimental import Buffer, Device, DeviceMemoryResource, DeviceMemoryResourceOptions from helpers.buffers import PatternGen -from cuda_python_test_helpers import supports_ipc_mempool - CHILD_TIMEOUT_SEC = 20 NBYTES = 64 NWORKERS = 2 @@ -30,8 +28,6 @@ class TestIpcWorkerPool: @pytest.mark.parametrize("nmrs", (1, NMRS)) def test_main(self, ipc_device, nmrs): - if not supports_ipc_mempool(ipc_device): - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] @@ -68,8 +64,6 @@ def init_worker(mrs): @pytest.mark.parametrize("nmrs", (1, NMRS)) def test_main(self, ipc_device, nmrs): - if not supports_ipc_mempool(ipc_device): - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] @@ -114,8 +108,6 @@ def init_worker(mrs): @pytest.mark.parametrize("nmrs", (1, NMRS)) def test_main(self, ipc_device, nmrs): - if not supports_ipc_mempool(ipc_device): - pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") device = ipc_device options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) mrs = [DeviceMemoryResource(device, options=options) for _ in range(nmrs)] From 88c25c46c066e9d5bbde2a4c1187d99cff9af3e7 Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Thu, 16 Oct 2025 16:16:58 -0700 Subject: [PATCH 4/9] Rename `test_ipc_event.py` to `test_event_ipc.py` to align with other tests. --- .../tests/memory_ipc/{test_ipc_event.py => test_event_ipc.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename cuda_core/tests/memory_ipc/{test_ipc_event.py => test_event_ipc.py} (100%) diff --git a/cuda_core/tests/memory_ipc/test_ipc_event.py b/cuda_core/tests/memory_ipc/test_event_ipc.py similarity index 100% rename from cuda_core/tests/memory_ipc/test_ipc_event.py rename to cuda_core/tests/memory_ipc/test_event_ipc.py From 438ebd40947843c612f3b50a067bc920a98f456f Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Thu, 16 Oct 2025 16:20:09 -0700 Subject: [PATCH 5/9] Remove the copy-based IPC event test, which is less efficient than the latch-based test. 
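
For reference, the retained latch-based test proves the same cross-process ordering with a single blocking kernel instead of a long train of large copies. A minimal sketch of the parent-side pattern, under the same helpers used in test_event_ipc.py (`queue`, `mr`, and `NBYTES` stand in for the test's queue, IPC-enabled memory resource, and buffer size; this is an illustration, not the test itself):

    latch = LatchKernel(device)                 # kernel spins until released
    latch.launch(stream1)                       # stream1 is now blocked
    buffer = mr.allocate(NBYTES, stream=stream1)
    event = stream1.record(options=EventOptions(ipc_enabled=True))
    queue.put(event)                            # child does stream2.wait(event)
    latch.release()                             # child's work can only start now
    stream1.sync()
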
--- cuda_core/tests/memory_ipc/test_event_ipc.py | 96 +------------------- 1 file changed, 4 insertions(+), 92 deletions(-) diff --git a/cuda_core/tests/memory_ipc/test_event_ipc.py b/cuda_core/tests/memory_ipc/test_event_ipc.py index 387be5b563..f8208baa61 100644 --- a/cuda_core/tests/memory_ipc/test_event_ipc.py +++ b/cuda_core/tests/memory_ipc/test_event_ipc.py @@ -13,105 +13,17 @@ ENABLE_LOGGING = False # Set True for test debugging and development CHILD_TIMEOUT_SEC = 20 -NBYTES = 1024*1024*1024 -POOL_SIZE = NBYTES -NCOPIES = 500 +NBYTES = 64 -class TestIpcEvent: - """Check the basic usage of IPC-enabled events.""" - - def test_main(self, ipc_device): - log = TimestampedLogger(prefix="parent: ", enabled=ENABLE_LOGGING) - device = ipc_device - stream1 = device.create_stream() - mr_options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) - mr = DeviceMemoryResource(device, options=mr_options) - - # Start the child process. - q_out, q_in = [mp.Queue() for _ in range(2)] - process = mp.Process(target=self.child_main, args=(log, q_out, q_in)) - process.start() - - # Prepare scratch buffers. - target = make_scratch_buffer(device, 0, NBYTES) - ones = make_scratch_buffer(device, 1, NBYTES) - twos = make_scratch_buffer(device, 2, NBYTES) - - # Allocate the buffer and send it to the child. - buffer = mr.allocate(NBYTES, stream=stream1) - log("sending buffer") - q_out.put(buffer) - - e_copy_start, e_copy_end = [device.create_event({"enable_timing": True}) for _ in range(2)] - - # Stream 1: - stream1.record(e_copy_start) - log("begin enqueuing copies on stream1") - for i in range(NCOPIES): - buffer.copy_from(ones, stream=stream1) - if i % 100 == 0: - log(f"{i:>3}/{NCOPIES}") - stream1.record(e_copy_end) - log("done enqueuing copies") - - ipc_event_options = EventOptions(ipc_enabled=True) - e = stream1.record(options=ipc_event_options) - log(f"recorded event ({hex(e.handle)})") - q_out.put(e) - log("sent event") - - # Wait on the child. - log("waiting for copies") - e_copy_end.sync() - parent_done_copying_timestamp = time.time_ns() - log("done copying") - process.join() - assert process.exitcode == 0 - log("done") - - # Finish up. - target.copy_from(buffer, stream=stream1) - stream1.sync() - assert compare_equal_buffers(target, twos) - elapsed_ms = e_copy_end - e_copy_start - log(f"Elapsed time for {NCOPIES} copies: {int(elapsed_ms)} ms") - - # Make sure the child finished enqueuing its work before the copies finished; - # otherwise the test has no meaning. If this trips, adjust NCOPIES and/or - # NBYTES. 
- child_done_enqueuing_timestamp = q_in.get(timeout=CHILD_TIMEOUT_SEC) - assert child_done_enqueuing_timestamp < parent_done_copying_timestamp - - - def child_main(self, log, q_in, q_out): - log.prefix = " child: " - log("ready") - device = Device() - device.set_current() - stream2 = device.create_stream() - twos = make_scratch_buffer(device, 2, NBYTES) - buffer = q_in.get(timeout=CHILD_TIMEOUT_SEC) - log("got buffer") - e = q_in.get(timeout=CHILD_TIMEOUT_SEC) - log(f"got event ({hex(e.handle)})") - stream2.wait(e) - log("enqueuing copy on stream2") - buffer.copy_from(twos, stream=stream2) - q_out.put(time.time_ns()) # Time when enqueuing finished - log("waiting") - stream2.sync() - log("done") - -class TestIpcEventWithLatch: +class TestEventIpc: """Check the basic usage of IPC-enabled events with a latch kernel.""" @skipif_need_cuda_headers # libcu++ - def test_main(self, ipc_device): + def test_main(self, ipc_device, ipc_memory_resource): log = TimestampedLogger(prefix="parent: ", enabled=ENABLE_LOGGING) device = ipc_device + mr = ipc_memory_resource stream1 = device.create_stream() - mr_options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) - mr = DeviceMemoryResource(device, options=mr_options) # Start the child process. q_out, q_in = [mp.Queue() for _ in range(2)] From 02975e9e2428bd03d616660e6cd35f409c67e14b Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Fri, 17 Oct 2025 09:27:16 -0700 Subject: [PATCH 6/9] Simplify PatternGen. --- cuda_core/tests/helpers/buffers.py | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/cuda_core/tests/helpers/buffers.py b/cuda_core/tests/helpers/buffers.py index cc15d607a1..c5c569aebe 100644 --- a/cuda_core/tests/helpers/buffers.py +++ b/cuda_core/tests/helpers/buffers.py @@ -48,10 +48,10 @@ class PatternGen: If a stream is provided, operations are synchronized with respect to that stream. Otherwise, they are synchronized over the device. - The test pattern is either a fixed value or is generated from a 8-bit seed. - Only one of `value` or `seed` should be supplied. + The test pattern is either a fixed value or a cyclic pattern generated from + an 8-bit seed. Only one of `value` or `seed` should be supplied. - Distinct test patterns are stored in separate buffers called pattern + Distinct test patterns are stored in private buffers called pattern buffers. Calls to `fill_buffer` copy from a pattern buffer to the target buffer. Calls to `verify_buffer` copy from the target buffer to a scratch buffer and then perform a comparison. 
@@ -62,32 +62,25 @@ def __init__(self, device, size, stream=None): self.size = size self.stream = stream if stream is not None else device.create_stream() self.sync_target = stream if stream is not None else device - self.scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(size) self.pattern_buffers = {} - def fill_buffer(self, buffer, seed=None, value=None, repeat=1, sync=True): + def fill_buffer(self, buffer, seed=None, value=None): """Fill a device buffer with a sequential test pattern using unified memory.""" assert buffer.size == self.size pattern_buffer = self._get_pattern_buffer(seed, value) - for _ in range(repeat): - buffer.copy_from(pattern_buffer, stream=self.stream) - if sync: - self.sync() + buffer.copy_from(pattern_buffer, stream=self.stream) - def verify_buffer(self, buffer, seed=None, value=None, repeat=1): + def verify_buffer(self, buffer, seed=None, value=None): """Verify the buffer contents against a sequential pattern.""" assert buffer.size == self.size - ptr_test = self._ptr(self.scratch_buffer) + scratch_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.size) + ptr_test = self._ptr(scratch_buffer) pattern_buffer = self._get_pattern_buffer(seed, value) ptr_expected = self._ptr(pattern_buffer) - for _ in range(repeat): - self.scratch_buffer.copy_from(buffer, stream=self.stream) - self.sync() - assert libc.memcmp(ptr_test, ptr_expected, self.size) == 0 - - def sync(self): - """Synchronize against the sync target (a stream or device).""" + scratch_buffer.copy_from(buffer, stream=self.stream) self.sync_target.sync() + assert libc.memcmp(ptr_test, ptr_expected, self.size) == 0 + @staticmethod def _ptr(buffer): From f98a286bac6057bc12125a6fdc6bb5e8f1af1d68 Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Fri, 17 Oct 2025 10:21:44 -0700 Subject: [PATCH 7/9] Simplify LatchKernel and fix formatting. 
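
The simplified helper now exposes only launch/release. A short single-process usage sketch, mirroring test_latchkernel in test_helpers.py (`target` and `ones` are scratch buffers from make_scratch_buffer; the constructor skips the test when CUDA headers are unavailable):

    latch = LatchKernel(device)
    stream = device.create_stream()
    latch.launch(stream)                      # stream blocks on the busy-wait flag
    target.copy_from(ones, stream=stream)     # queued behind the latch
    latch.release()                           # flag flips; the kernel exits
    stream.sync()                             # the copy has now run
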
--- cuda_core/tests/helpers/buffers.py | 5 +- cuda_core/tests/helpers/latch.py | 30 ++++----- cuda_core/tests/helpers/logging.py | 1 + cuda_core/tests/memory_ipc/test_event_ipc.py | 67 ++++++++----------- cuda_core/tests/memory_ipc/test_memory_ipc.py | 3 +- cuda_core/tests/test_event.py | 2 - cuda_core/tests/test_helpers.py | 13 ++-- cuda_core/tests/test_memory.py | 2 +- 8 files changed, 53 insertions(+), 70 deletions(-) diff --git a/cuda_core/tests/helpers/buffers.py b/cuda_core/tests/helpers/buffers.py index c5c569aebe..52220956d2 100644 --- a/cuda_core/tests/helpers/buffers.py +++ b/cuda_core/tests/helpers/buffers.py @@ -81,7 +81,6 @@ def verify_buffer(self, buffer, seed=None, value=None): self.sync_target.sync() assert libc.memcmp(ptr_test, ptr_expected, self.size) == 0 - @staticmethod def _ptr(buffer): """Get a pointer to the specified buffer.""" @@ -101,7 +100,7 @@ def _get_pattern_buffer(self, seed, value): pattern_buffer = DummyUnifiedMemoryResource(self.device).allocate(self.size) ptr = self._ptr(pattern_buffer) for i in range(self.size): - ptr[i] = (seed + i) & 0xFF + ptr[i] = (seed + i) & 0xFF self.pattern_buffers[key] = pattern_buffer return pattern_buffer @@ -121,5 +120,3 @@ def compare_equal_buffers(buffer1, buffer2): ptr1 = ctypes.cast(int(buffer1.handle), ctypes.POINTER(ctypes.c_byte)) ptr2 = ctypes.cast(int(buffer2.handle), ctypes.POINTER(ctypes.c_byte)) return libc.memcmp(ptr1, ptr2, buffer1.size) == 0 - - diff --git a/cuda_core/tests/helpers/latch.py b/cuda_core/tests/helpers/latch.py index bb352c3200..e1166973d8 100644 --- a/cuda_core/tests/helpers/latch.py +++ b/cuda_core/tests/helpers/latch.py @@ -1,6 +1,9 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +import ctypes + +import pytest from cuda.core.experimental import ( LaunchConfig, LegacyPinnedMemoryResource, @@ -8,15 +11,18 @@ ProgramOptions, launch, ) + import helpers -import ctypes + class LatchKernel: """ - Manages a kernel that blocks progress until released. + Manages a kernel that blocks stream progress until released. 
""" def __init__(self, device): + if helpers.CUDA_INCLUDE_PATH is None: + pytest.skip("need CUDA header") code = """ #include @@ -44,26 +50,14 @@ def __init__(self, device): self.busy_wait_flag[0] = 0 def launch(self, stream): + """Launch the latch kernel, blocking stream progress via busy waiting.""" config = LaunchConfig(grid=1, block=1) - launch(stream, config, self.kernel, self.busy_wait_flag_address) + launch(stream, config, self.kernel, int(self.buffer.handle)) def release(self): + """Release the latch, allowing stream progress.""" self.busy_wait_flag[0] = 1 - @property - def busy_wait_flag_address(self): - return int(self.buffer.handle) - @property def busy_wait_flag(self): - return ctypes.cast(self.busy_wait_flag_address, ctypes.POINTER(ctypes.c_int32)) - - def close(self): - buffer = getattr(self, 'buffer', None) - if buffer is not None: - buffer.close() - - def __del__(self): - self.close() - - + return ctypes.cast(int(self.buffer.handle), ctypes.POINTER(ctypes.c_int32)) diff --git a/cuda_core/tests/helpers/logging.py b/cuda_core/tests/helpers/logging.py index 6c376770c7..d5be94a280 100644 --- a/cuda_core/tests/helpers/logging.py +++ b/cuda_core/tests/helpers/logging.py @@ -3,6 +3,7 @@ import time + class TimestampedLogger: """ A logger that prefixes each output with a timestamp, containing the elapsed diff --git a/cuda_core/tests/memory_ipc/test_event_ipc.py b/cuda_core/tests/memory_ipc/test_event_ipc.py index f8208baa61..fc1acb329d 100644 --- a/cuda_core/tests/memory_ipc/test_event_ipc.py +++ b/cuda_core/tests/memory_ipc/test_event_ipc.py @@ -1,29 +1,28 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 -from conftest import skipif_need_cuda_headers -from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions, EventOptions -from helpers.buffers import make_scratch_buffer, compare_equal_buffers -from helpers.latch import LatchKernel -from helpers.logging import TimestampedLogger -import ctypes import multiprocessing as mp + import pytest -import time +from cuda.core.experimental import Device, EventOptions +from helpers.buffers import compare_equal_buffers, make_scratch_buffer +from helpers.latch import LatchKernel +from helpers.logging import TimestampedLogger ENABLE_LOGGING = False # Set True for test debugging and development CHILD_TIMEOUT_SEC = 20 NBYTES = 64 + class TestEventIpc: """Check the basic usage of IPC-enabled events with a latch kernel.""" - @skipif_need_cuda_headers # libcu++ def test_main(self, ipc_device, ipc_memory_resource): log = TimestampedLogger(prefix="parent: ", enabled=ENABLE_LOGGING) device = ipc_device mr = ipc_memory_resource stream1 = device.create_stream() + latch = LatchKernel(device) # Start the child process. 
q_out, q_in = [mp.Queue() for _ in range(2)] @@ -41,7 +40,6 @@ def test_main(self, ipc_device, ipc_memory_resource): q_out.put(buffer) # Stream 1: - latch = LatchKernel(device) log("enqueuing latch kernel on stream1") latch.launch(stream1) log("enqueuing copy on stream1") @@ -69,7 +67,6 @@ def test_main(self, ipc_device, ipc_memory_resource): stream1.sync() assert compare_equal_buffers(target, twos) - def child_main(self, log, q_in, q_out): log.prefix = " child: " log("ready") @@ -99,13 +96,15 @@ def test_event_is_monadic(ipc_device): stream = device.create_stream() e = stream.record(options={"ipc_enabled": True}) - with pytest.raises(TypeError, match=r"^IPC-enabled events should not be re-recorded, instead create a new event by supplying options\.$"): + with pytest.raises( + TypeError, + match=r"^IPC-enabled events should not be re-recorded, instead create a new event by supplying options\.$", + ): stream.record(e) @pytest.mark.parametrize( - "options", [ {"ipc_enabled": True, "enable_timing": True}, - EventOptions(ipc_enabled=True, enable_timing=True)] + "options", [{"ipc_enabled": True, "enable_timing": True}, EventOptions(ipc_enabled=True, enable_timing=True)] ) def test_event_timing_disabled(ipc_device, options): """Check that IPC-enabled events cannot be created with timing enabled.""" @@ -114,11 +113,13 @@ def test_event_timing_disabled(ipc_device, options): with pytest.raises(TypeError, match=r"^IPC-enabled events cannot use timing\.$"): stream.record(options=options) + class TestIpcEventProperties: """ Check that event properties are properly set after transfer to a child process. """ + @pytest.mark.parametrize("busy_waited_sync", [True, False]) @pytest.mark.parametrize("use_options_cls", [True, False]) @pytest.mark.parametrize("use_option_kw", [True, False]) @@ -132,13 +133,12 @@ def test_main(self, ipc_device, busy_waited_sync, use_options_cls, use_option_kw process.start() # Create an event and send it. - options = \ - EventOptions(ipc_enabled=True, busy_waited_sync=busy_waited_sync) \ - if use_options_cls else \ - {"ipc_enabled": True, "busy_waited_sync": busy_waited_sync} - e = stream.record(options=options) \ - if use_option_kw else \ - stream.record(None, options) + options = ( + EventOptions(ipc_enabled=True, busy_waited_sync=busy_waited_sync) + if use_options_cls + else {"ipc_enabled": True, "busy_waited_sync": busy_waited_sync} + ) + e = stream.record(options=options) if use_option_kw else stream.record(None, options) q_out.put(e) # Check its properties. @@ -156,28 +156,17 @@ def test_main(self, ipc_device, busy_waited_sync, use_options_cls, use_option_kw def child_main(self, q_in, q_out): device = Device() device.set_current() - stream = device.create_stream() # Get the event. e = q_in.get(timeout=CHILD_TIMEOUT_SEC) # Send its properties. 
- props = (e.get_ipc_descriptor(), - e.is_ipc_enabled, - e.is_timing_disabled, - e.is_sync_busy_waited, - e.device, - e.context,) + props = ( + e.get_ipc_descriptor(), + e.is_ipc_enabled, + e.is_timing_disabled, + e.is_sync_busy_waited, + e.device, + e.context, + ) q_out.put(props) - - - -# TODO: daisy chain processes - -if __name__ == "__main__": - mp.set_start_method("spawn") - device = Device() - device.set_current() - TestIpcEventWithLatch().test_main(device) - - diff --git a/cuda_core/tests/memory_ipc/test_memory_ipc.py b/cuda_core/tests/memory_ipc/test_memory_ipc.py index dd1f8967e1..23a3e91b7f 100644 --- a/cuda_core/tests/memory_ipc/test_memory_ipc.py +++ b/cuda_core/tests/memory_ipc/test_memory_ipc.py @@ -1,9 +1,10 @@ # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 +import multiprocessing as mp + from cuda.core.experimental import Buffer, DeviceMemoryResource from helpers.buffers import PatternGen -import multiprocessing as mp CHILD_TIMEOUT_SEC = 20 NBYTES = 64 diff --git a/cuda_core/tests/test_event.py b/cuda_core/tests/test_event.py index 9db073a388..36b696aa75 100644 --- a/cuda_core/tests/test_event.py +++ b/cuda_core/tests/test_event.py @@ -13,7 +13,6 @@ ) from helpers.latch import LatchKernel -from conftest import skipif_need_cuda_headers from cuda_python_test_helpers import IS_WSL @@ -115,7 +114,6 @@ def test_error_timing_recorded(): event3 - event2 -@skipif_need_cuda_headers # libcu++ def test_error_timing_incomplete(): device = Device() device.set_current() diff --git a/cuda_core/tests/test_helpers.py b/cuda_core/tests/test_helpers.py index cf709ea9dd..25d0c26a54 100644 --- a/cuda_core/tests/test_helpers.py +++ b/cuda_core/tests/test_helpers.py @@ -2,16 +2,18 @@ # # SPDX-License-Identifier: LicenseRef-NVIDIA-SOFTWARE-LICENSE +import time + +import pytest from cuda.core.experimental import Device +from helpers.buffers import PatternGen, compare_equal_buffers, make_scratch_buffer from helpers.latch import LatchKernel from helpers.logging import TimestampedLogger -from helpers.buffers import make_scratch_buffer, compare_equal_buffers, PatternGen -import time -import pytest ENABLE_LOGGING = False # Set True for test debugging and development NBYTES = 64 + def test_latchkernel(): """Test LatchKernel.""" log = TimestampedLogger() @@ -38,6 +40,7 @@ def test_latchkernel(): assert compare_equal_buffers(target, ones) log("done") + def test_patterngen_seeds(): """Test PatternGen with seed argument.""" device = Device() @@ -49,10 +52,11 @@ def test_patterngen_seeds(): for i in range(256): pgen.fill_buffer(buffer, seed=i) pgen.verify_buffer(buffer, seed=i) - for j in range(i+1, 256): + for j in range(i + 1, 256): with pytest.raises(AssertionError): pgen.verify_buffer(buffer, seed=j) + def test_patterngen_values(): """Test PatternGen with value argument, also compare_equal_buffers.""" device = Device() @@ -64,4 +68,3 @@ def test_patterngen_values(): pgen = PatternGen(device, NBYTES) pgen.verify_buffer(ones, value=1) pgen.verify_buffer(twos, value=2) - diff --git a/cuda_core/tests/test_memory.py b/cuda_core/tests/test_memory.py index b2a15181c1..0052136c4c 100644 --- a/cuda_core/tests/test_memory.py +++ b/cuda_core/tests/test_memory.py @@ -13,7 +13,6 @@ np = None import ctypes import platform -from helpers.buffers import DummyUnifiedMemoryResource import pytest from cuda.core.experimental import ( @@ -28,6 +27,7 @@ from cuda.core.experimental._memory import DLDeviceType, IPCBufferDescriptor from 
cuda.core.experimental._utils.cuda_utils import handle_return from cuda.core.experimental.utils import StridedMemoryView +from helpers.buffers import DummyUnifiedMemoryResource from cuda_python_test_helpers import IS_WSL, supports_ipc_mempool From 5ad48ca9bfbe656e74edbebeafeb1723aeffb489 Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Fri, 17 Oct 2025 10:40:42 -0700 Subject: [PATCH 8/9] Fix cython-lint error. --- cuda_core/cuda/core/experimental/_event.pyx | 1 - 1 file changed, 1 deletion(-) diff --git a/cuda_core/cuda/core/experimental/_event.pyx b/cuda_core/cuda/core/experimental/_event.pyx index 327aef3f6f..dd6ef0b06e 100644 --- a/cuda_core/cuda/core/experimental/_event.pyx +++ b/cuda_core/cuda/core/experimental/_event.pyx @@ -23,7 +23,6 @@ from cuda.core.experimental._context import Context from cuda.core.experimental._utils.cuda_utils import ( CUDAError, driver, - handle_return, ) if TYPE_CHECKING: import cuda.bindings From 820bf1bd3782ce71d84817ce869d00e2867db8a1 Mon Sep 17 00:00:00 2001 From: Andy Jost Date: Fri, 17 Oct 2025 14:36:34 -0700 Subject: [PATCH 9/9] Test fix. --- cuda_core/tests/test_stream.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cuda_core/tests/test_stream.py b/cuda_core/tests/test_stream.py index 505cbbf22a..da4c66d548 100644 --- a/cuda_core/tests/test_stream.py +++ b/cuda_core/tests/test_stream.py @@ -49,12 +49,6 @@ def test_stream_record(init_cuda): assert isinstance(event, Event) -def test_stream_record_invalid_event(init_cuda): - stream = Device().create_stream(options=StreamOptions()) - with pytest.raises(TypeError): - stream.record(event="invalid_event") - - def test_stream_wait_event(init_cuda): s1 = Device().create_stream() s2 = Device().create_stream()