diff --git a/cuda_core/build_hooks.py b/cuda_core/build_hooks.py index e38f5676df..6191dcb706 100644 --- a/cuda_core/build_hooks.py +++ b/cuda_core/build_hooks.py @@ -95,7 +95,7 @@ def get_cuda_paths(): ) nthreads = int(os.environ.get("CUDA_PYTHON_PARALLEL_LEVEL", os.cpu_count() // 2)) - compile_time_env = {"CUDA_CORE_BUILD_MAJOR": _get_proper_cuda_bindings_major_version()} + compile_time_env = {"CUDA_CORE_BUILD_MAJOR": int(_get_proper_cuda_bindings_major_version())} _extensions = cythonize( ext_modules, verbose=True, diff --git a/cuda_core/cuda/core/experimental/__init__.py b/cuda_core/cuda/core/experimental/__init__.py index 826ea70b97..92174468d1 100644 --- a/cuda_core/cuda/core/experimental/__init__.py +++ b/cuda_core/cuda/core/experimental/__init__.py @@ -44,7 +44,11 @@ DeviceMemoryResourceOptions, GraphMemoryResource, LegacyPinnedMemoryResource, + ManagedMemoryResource, + ManagedMemoryResourceOptions, MemoryResource, + PinnedMemoryResource, + PinnedMemoryResourceOptions, VirtualMemoryResource, VirtualMemoryResourceOptions, ) diff --git a/cuda_core/cuda/core/experimental/_device.pyx b/cuda_core/cuda/core/experimental/_device.pyx index cd802943a5..8ebbb7b8d7 100644 --- a/cuda_core/cuda/core/experimental/_device.pyx +++ b/cuda_core/cuda/core/experimental/_device.pyx @@ -1080,7 +1080,7 @@ class Device: if self._uuid is None: dev = self._id with nogil: - IF CUDA_CORE_BUILD_MAJOR == "12": + IF CUDA_CORE_BUILD_MAJOR == 12: HANDLE_RETURN(cydriver.cuDeviceGetUuid_v2(&uuid, dev)) ELSE: # 13.0+ HANDLE_RETURN(cydriver.cuDeviceGetUuid(&uuid, dev)) diff --git a/cuda_core/cuda/core/experimental/_memory/__init__.pxd b/cuda_core/cuda/core/experimental/_memory/__init__.pxd new file mode 100644 index 0000000000..e69de29bb2 diff --git a/cuda_core/cuda/core/experimental/_memory/__init__.py b/cuda_core/cuda/core/experimental/_memory/__init__.py index 20b90d7fdd..9d141ebca2 100644 --- a/cuda_core/cuda/core/experimental/_memory/__init__.py +++ b/cuda_core/cuda/core/experimental/_memory/__init__.py @@ -7,4 +7,6 @@ from ._graph_memory_resource import * # noqa: F403 from ._ipc import * # noqa: F403 from ._legacy import * # noqa: F403 +from ._managed_memory_resource import * # noqa: F403 +from ._pinned_memory_resource import * # noqa: F403 from ._virtual_memory_resource import * # noqa: F403 diff --git a/cuda_core/cuda/core/experimental/_memory/_buffer.pyx b/cuda_core/cuda/core/experimental/_memory/_buffer.pyx index b26471ed0e..8f4ac46051 100644 --- a/cuda_core/cuda/core/experimental/_memory/_buffer.pyx +++ b/cuda_core/cuda/core/experimental/_memory/_buffer.pyx @@ -8,7 +8,8 @@ cimport cython from libc.stdint cimport uintptr_t, int64_t, uint64_t from cuda.bindings cimport cydriver -from cuda.core.experimental._memory._device_memory_resource cimport DeviceMemoryResource +from cuda.core.experimental._memory._device_memory_resource import DeviceMemoryResource +from cuda.core.experimental._memory._pinned_memory_resource import PinnedMemoryResource from cuda.core.experimental._memory._ipc cimport IPCBufferDescriptor, IPCDataForBuffer from cuda.core.experimental._memory cimport _ipc from cuda.core.experimental._stream cimport Stream_accept, Stream @@ -106,7 +107,7 @@ cdef class Buffer: @classmethod def from_ipc_descriptor( - cls, mr: DeviceMemoryResource, ipc_descriptor: IPCBufferDescriptor, + cls, mr: DeviceMemoryResource | PinnedMemoryResource, ipc_descriptor: IPCBufferDescriptor, stream: Stream = None ) -> Buffer: """Import a buffer that was exported from another process.""" diff --git 
a/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pxd b/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pxd index 823a270b27..17ee12e54f 100644 --- a/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pxd +++ b/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pxd @@ -2,20 +2,12 @@ # # SPDX-License-Identifier: Apache-2.0 -from cuda.bindings cimport cydriver -from cuda.core.experimental._memory._buffer cimport MemoryResource +from cuda.core.experimental._memory._memory_pool cimport _MemPool from cuda.core.experimental._memory._ipc cimport IPCDataForMR -cdef class DeviceMemoryResource(MemoryResource): - cdef: - int _dev_id - cydriver.CUmemoryPool _handle - bint _mempool_owned - IPCDataForMR _ipc_data - object _attributes - object _peer_accessible_by - object __weakref__ +cdef class DeviceMemoryResource(_MemPool): + pass cpdef DMR_mempool_get_access(DeviceMemoryResource, int) diff --git a/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pyx b/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pyx index ac18079a62..49c590374e 100644 --- a/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pyx +++ b/cuda_core/cuda/core/experimental/_memory/_device_memory_resource.pyx @@ -4,31 +4,24 @@ from __future__ import annotations -from libc.limits cimport ULLONG_MAX -from libc.stdint cimport uintptr_t -from libc.stdlib cimport malloc, free -from libc.string cimport memset - from cuda.bindings cimport cydriver -from cuda.core.experimental._memory._buffer cimport Buffer, MemoryResource +from cuda.core.experimental._memory._memory_pool cimport _MemPool, _MemPoolOptions from cuda.core.experimental._memory cimport _ipc -from cuda.core.experimental._memory._ipc cimport IPCAllocationHandle, IPCDataForMR -from cuda.core.experimental._stream cimport default_stream, Stream_accept, Stream +from cuda.core.experimental._memory._ipc cimport IPCAllocationHandle from cuda.core.experimental._utils.cuda_utils cimport ( check_or_create_options, HANDLE_RETURN, ) from dataclasses import dataclass +import multiprocessing from typing import Optional, TYPE_CHECKING import platform # no-cython-lint import uuid -import weakref -from cuda.core.experimental._utils.cuda_utils import driver +from cuda.core.experimental._utils.cuda_utils import check_multiprocessing_start_method if TYPE_CHECKING: - from cuda.core.experimental._memory.buffer import DevicePointerT from .._device import Device __all__ = ['DeviceMemoryResource', 'DeviceMemoryResourceOptions'] @@ -53,92 +46,7 @@ cdef class DeviceMemoryResourceOptions: max_size : int = 0 -cdef class DeviceMemoryResourceAttributes: - cdef: - object _mr_weakref - - def __init__(self, *args, **kwargs): - raise RuntimeError("DeviceMemoryResourceAttributes cannot be instantiated directly. 
Please use MemoryResource APIs.") - - @classmethod - def _init(cls, mr): - cdef DeviceMemoryResourceAttributes self = DeviceMemoryResourceAttributes.__new__(cls) - self._mr_weakref = mr - return self - - def __repr__(self): - return f"{self.__class__.__name__}(%s)" % ", ".join( - f"{attr}={getattr(self, attr)}" for attr in dir(self) - if not attr.startswith("_") - ) - - cdef int _getattribute(self, cydriver.CUmemPool_attribute attr_enum, void* value) except?-1: - cdef DeviceMemoryResource mr = (self._mr_weakref()) - if mr is None: - raise RuntimeError("DeviceMemoryResource is expired") - cdef cydriver.CUmemoryPool pool_handle = mr._handle - with nogil: - HANDLE_RETURN(cydriver.cuMemPoolGetAttribute(pool_handle, attr_enum, value)) - return 0 - - @property - def reuse_follow_event_dependencies(self): - """Allow memory to be reused when there are event dependencies between streams.""" - cdef int value - self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_REUSE_FOLLOW_EVENT_DEPENDENCIES, &value) - return bool(value) - - @property - def reuse_allow_opportunistic(self): - """Allow reuse of completed frees without dependencies.""" - cdef int value - self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_REUSE_ALLOW_OPPORTUNISTIC, &value) - return bool(value) - - @property - def reuse_allow_internal_dependencies(self): - """Allow insertion of new stream dependencies for memory reuse.""" - cdef int value - self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_REUSE_ALLOW_INTERNAL_DEPENDENCIES, &value) - return bool(value) - - @property - def release_threshold(self): - """Amount of reserved memory to hold before OS release.""" - cdef cydriver.cuuint64_t value - self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD, &value) - return int(value) - - @property - def reserved_mem_current(self): - """Current amount of backing memory allocated.""" - cdef cydriver.cuuint64_t value - self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RESERVED_MEM_CURRENT, &value) - return int(value) - - @property - def reserved_mem_high(self): - """High watermark of backing memory allocated.""" - cdef cydriver.cuuint64_t value - self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RESERVED_MEM_HIGH, &value) - return int(value) - - @property - def used_mem_current(self): - """Current amount of memory in use.""" - cdef cydriver.cuuint64_t value - self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_USED_MEM_CURRENT, &value) - return int(value) - - @property - def used_mem_high(self): - """High watermark of memory in use.""" - cdef cydriver.cuuint64_t value - self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_USED_MEM_HIGH, &value) - return int(value) - - -cdef class DeviceMemoryResource(MemoryResource): +cdef class DeviceMemoryResource(_MemPool): """ A device memory resource managing a stream-ordered memory pool. @@ -217,36 +125,27 @@ cdef class DeviceMemoryResource(MemoryResource): associated MMR. 
""" - def __cinit__(self): - self._dev_id = cydriver.CU_DEVICE_INVALID - self._handle = NULL - self._mempool_owned = False - self._ipc_data = None - self._attributes = None - self._peer_accessible_by = () - def __init__(self, device_id: Device | int, options=None): from .._device import Device cdef int dev_id = Device(device_id).device_id - opts = check_or_create_options( + cdef DeviceMemoryResourceOptions opts = check_or_create_options( DeviceMemoryResourceOptions, options, "DeviceMemoryResource options", keep_none=True ) + cdef _MemPoolOptions opts_base = _MemPoolOptions() - if opts is None: - DMR_init_current(self, dev_id) - else: - DMR_init_create(self, dev_id, opts) + cdef bint ipc_enabled = False + if opts: + ipc_enabled = opts.ipc_enabled + if ipc_enabled and not _ipc.is_supported(): + raise RuntimeError("IPC is not available on {platform.system()}") + opts_base._max_size = opts.max_size + opts_base._use_current = False + opts_base._ipc_enabled = ipc_enabled + opts_base._location = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + opts_base._type = cydriver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED - def __dealloc__(self): - DMR_close(self) - - def close(self): - """ - Close the device memory resource and destroy the associated memory pool - if owned. - """ - DMR_close(self) + super().__init__(dev_id, opts_base) def __reduce__(self): return DeviceMemoryResource.from_registry, (self.uuid,) @@ -261,7 +160,7 @@ cdef class DeviceMemoryResource(MemoryResource): RuntimeError If no mapped memory resource is found in the registry. """ - return _ipc.DMR_from_registry(uuid) + return (_ipc.MP_from_registry(uuid)) def register(self, uuid: uuid.UUID) -> DeviceMemoryResource: # no-cython-lint """ @@ -272,7 +171,7 @@ cdef class DeviceMemoryResource(MemoryResource): The registered mapped memory resource. If one was previously registered with the given key, it is returned. """ - return _ipc.DMR_register(self, uuid) + return (_ipc.MP_register(self, uuid)) @classmethod def from_allocation_handle( @@ -299,7 +198,11 @@ cdef class DeviceMemoryResource(MemoryResource): ------- A new device memory resource instance with the imported handle. """ - return _ipc.DMR_from_allocation_handle(cls, device_id, alloc_handle) + cdef DeviceMemoryResource mr = ( + _ipc.MP_from_allocation_handle(cls, alloc_handle)) + from .._device import Device + mr._dev_id = Device(device_id).device_id + return mr def get_allocation_handle(self) -> IPCAllocationHandle: """Export the memory pool handle to be shared (requires IPC). @@ -315,73 +218,11 @@ cdef class DeviceMemoryResource(MemoryResource): raise RuntimeError("Memory resource is not IPC-enabled") return self._ipc_data._alloc_handle - def allocate(self, size_t size, stream: Stream | GraphBuilder | None = None) -> Buffer: - """Allocate a buffer of the requested size. - - Parameters - ---------- - size : int - The size of the buffer to allocate, in bytes. - stream : :obj:`~_stream.Stream` | :obj:`~_graph.GraphBuilder`, optional - The stream on which to perform the allocation asynchronously. - If None, an internal stream is used. - - Returns - ------- - Buffer - The allocated buffer object, which is accessible on the device that this memory - resource was created for. 
- """ - if self.is_mapped: - raise TypeError("Cannot allocate from a mapped IPC-enabled memory resource") - stream = Stream_accept(stream) if stream is not None else default_stream() - return DMR_allocate(self, size, stream) - - def deallocate(self, ptr: DevicePointerT, size_t size, stream: Stream | GraphBuilder | None = None): - """Deallocate a buffer previously allocated by this resource. - - Parameters - ---------- - ptr : :obj:`~_memory.DevicePointerT` - The pointer or handle to the buffer to deallocate. - size : int - The size of the buffer to deallocate, in bytes. - stream : :obj:`~_stream.Stream` | :obj:`~_graph.GraphBuilder`, optional - The stream on which to perform the deallocation asynchronously. - If the buffer is deallocated without an explicit stream, the allocation stream - is used. - """ - stream = Stream_accept(stream) if stream is not None else default_stream() - DMR_deallocate(self, ptr, size, stream) - - @property - def attributes(self) -> DeviceMemoryResourceAttributes: - """Memory pool attributes.""" - if self._attributes is None: - ref = weakref.ref(self) - self._attributes = DeviceMemoryResourceAttributes._init(ref) - return self._attributes - - @property - def device_id(self) -> int: - """The associated device ordinal.""" - return self._dev_id - - @property - def handle(self) -> driver.CUmemoryPool: - """Handle to the underlying memory pool.""" - return driver.CUmemoryPool((self._handle)) - @property def is_device_accessible(self) -> bool: """Return True. This memory resource provides device-accessible buffers.""" return True - @property - def is_handle_owned(self) -> bool: - """Whether the memory resource handle is owned. If False, ``close`` has no effect.""" - return self._mempool_owned - @property def is_host_accessible(self) -> bool: """Return False. This memory resource does not provide host-accessible buffers.""" @@ -408,197 +249,6 @@ cdef class DeviceMemoryResource(MemoryResource): """ return getattr(self._ipc_data, 'uuid', None) - @property - def peer_accessible_by(self): - """ - Get or set the devices that can access allocations from this memory - pool. Access can be modified at any time and affects all allocations - from this memory pool. - - Returns a tuple of sorted device IDs that currently have peer access to - allocations from this memory pool. - - When setting, accepts a sequence of Device objects or device IDs. - Setting to an empty sequence revokes all peer access. 
- - Examples - -------- - >>> dmr = DeviceMemoryResource(0) - >>> dmr.peer_accessible_by = [1] # Grant access to device 1 - >>> assert dmr.peer_accessible_by == (1,) - >>> dmr.peer_accessible_by = [] # Revoke access - """ - return self._peer_accessible_by - - @peer_accessible_by.setter - def peer_accessible_by(self, devices): - """Set which devices can access this memory pool.""" - from .._device import Device - - # Convert all devices to device IDs - cdef set[int] target_ids = {Device(dev).device_id for dev in devices} - target_ids.discard(self._dev_id) # exclude this device from peer access list - this_dev = Device(self._dev_id) - cdef list bad = [dev for dev in target_ids if not this_dev.can_access_peer(dev)] - if bad: - raise ValueError(f"Device {self._dev_id} cannot access peer(s): {', '.join(map(str, bad))}") - cdef set[int] cur_ids = set(self._peer_accessible_by) - cdef set[int] to_add = target_ids - cur_ids - cdef set[int] to_rm = cur_ids - target_ids - cdef size_t count = len(to_add) + len(to_rm) # transaction size - cdef cydriver.CUmemAccessDesc* access_desc = NULL - cdef size_t i = 0 - - if count > 0: - access_desc = malloc(count * sizeof(cydriver.CUmemAccessDesc)) - if access_desc == NULL: - raise MemoryError("Failed to allocate memory for access descriptors") - - try: - for dev_id in to_add: - access_desc[i].flags = cydriver.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_READWRITE - access_desc[i].location.type = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE - access_desc[i].location.id = dev_id - i += 1 - - for dev_id in to_rm: - access_desc[i].flags = cydriver.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_NONE - access_desc[i].location.type = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE - access_desc[i].location.id = dev_id - i += 1 - - with nogil: - HANDLE_RETURN(cydriver.cuMemPoolSetAccess(self._handle, access_desc, count)) - finally: - if access_desc != NULL: - free(access_desc) - - self._peer_accessible_by = tuple(target_ids) - - -# DeviceMemoryResource Implementation -# ----------------------------------- - -cdef void DMR_init_current(DeviceMemoryResource self, int dev_id): - # Get the current memory pool. - cdef cydriver.cuuint64_t current_threshold - cdef cydriver.cuuint64_t max_threshold = ULLONG_MAX - - self._dev_id = dev_id - self._mempool_owned = False - - with nogil: - HANDLE_RETURN(cydriver.cuDeviceGetMemPool(&(self._handle), dev_id)) - - # Set a higher release threshold to improve performance when there are - # no active allocations. By default, the release threshold is 0, which - # means memory is immediately released back to the OS when there are no - # active suballocations, causing performance issues. - HANDLE_RETURN( - cydriver.cuMemPoolGetAttribute( - self._handle, - cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD, - ¤t_threshold - ) - ) - - # If threshold is 0 (default), set it to maximum to retain memory in the pool. - if current_threshold == 0: - HANDLE_RETURN(cydriver.cuMemPoolSetAttribute( - self._handle, - cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD, - &max_threshold - )) - - -cdef void DMR_init_create( - DeviceMemoryResource self, int dev_id, DeviceMemoryResourceOptions opts -): - # Create a new memory pool. 
- cdef cydriver.CUmemPoolProps properties - - if opts.ipc_enabled and not _ipc.is_supported(): - raise RuntimeError("IPC is not available on {platform.system()}") - - memset(&properties, 0, sizeof(cydriver.CUmemPoolProps)) - properties.allocType = cydriver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED - properties.handleTypes = _ipc.IPC_HANDLE_TYPE if opts.ipc_enabled else cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE - properties.location.id = dev_id - properties.location.type = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE - properties.maxSize = opts.max_size - properties.win32SecurityAttributes = NULL - properties.usage = 0 - - self._dev_id = dev_id - self._mempool_owned = True - - with nogil: - HANDLE_RETURN(cydriver.cuMemPoolCreate(&(self._handle), &properties)) - # TODO: should we also set the threshold here? - - if opts.ipc_enabled: - alloc_handle = _ipc.DMR_export_mempool(self) - self._ipc_data = IPCDataForMR(alloc_handle, False) - - -# Raise an exception if the given stream is capturing. -# A result of CU_STREAM_CAPTURE_STATUS_INVALIDATED is considered an error. -cdef inline int check_not_capturing(cydriver.CUstream s) except?-1 nogil: - cdef cydriver.CUstreamCaptureStatus capturing - HANDLE_RETURN(cydriver.cuStreamIsCapturing(s, &capturing)) - if capturing != cydriver.CUstreamCaptureStatus.CU_STREAM_CAPTURE_STATUS_NONE: - raise RuntimeError("DeviceMemoryResource cannot perform memory operations on " - "a capturing stream (consider using GraphMemoryResource).") - - -cdef inline Buffer DMR_allocate(DeviceMemoryResource self, size_t size, Stream stream): - cdef cydriver.CUstream s = stream._handle - cdef cydriver.CUdeviceptr devptr - with nogil: - check_not_capturing(s) - HANDLE_RETURN(cydriver.cuMemAllocFromPoolAsync(&devptr, size, self._handle, s)) - cdef Buffer buf = Buffer.__new__(Buffer) - buf._ptr = (devptr) - buf._ptr_obj = None - buf._size = size - buf._memory_resource = self - buf._alloc_stream = stream - return buf - - -cdef inline void DMR_deallocate( - DeviceMemoryResource self, uintptr_t ptr, size_t size, Stream stream -) noexcept: - cdef cydriver.CUstream s = stream._handle - cdef cydriver.CUdeviceptr devptr = ptr - cdef cydriver.CUresult r - with nogil: - r = cydriver.cuMemFreeAsync(devptr, s) - if r != cydriver.CUDA_ERROR_INVALID_CONTEXT: - HANDLE_RETURN(r) - - -cdef inline DMR_close(DeviceMemoryResource self): - if self._handle == NULL: - return - - # This works around nvbug 5698116. When a memory pool handle is recycled - # the new handle inherits the peer access state of the previous handle. - if self._peer_accessible_by: - self.peer_accessible_by = [] - - try: - if self._mempool_owned: - with nogil: - HANDLE_RETURN(cydriver.cuMemPoolDestroy(self._handle)) - finally: - self._dev_id = cydriver.CU_DEVICE_INVALID - self._handle = NULL - self._attributes = None - self._mempool_owned = False - self._ipc_data = None - self._peer_accessible_by = () - # Note: this is referenced in instructions to debug nvbug 5698116. 
cpdef DMR_mempool_get_access(DeviceMemoryResource dmr, int device_id): @@ -633,3 +283,14 @@ cpdef DMR_mempool_get_access(DeviceMemoryResource dmr, int device_id): return "r" else: return "" + + +def _deep_reduce_device_memory_resource(mr): + check_multiprocessing_start_method() + from .._device import Device + device = Device(mr.device_id) + alloc_handle = mr.get_allocation_handle() + return mr.from_allocation_handle, (device, alloc_handle) + + +multiprocessing.reduction.register(DeviceMemoryResource, _deep_reduce_device_memory_resource) diff --git a/cuda_core/cuda/core/experimental/_memory/_ipc.pxd b/cuda_core/cuda/core/experimental/_memory/_ipc.pxd index 60d96a3b33..3fed2b7188 100644 --- a/cuda_core/cuda/core/experimental/_memory/_ipc.pxd +++ b/cuda_core/cuda/core/experimental/_memory/_ipc.pxd @@ -4,10 +4,10 @@ from cuda.bindings cimport cydriver from cuda.core.experimental._memory._buffer cimport Buffer -from cuda.core.experimental._memory._device_memory_resource cimport DeviceMemoryResource +from cuda.core.experimental._memory._memory_pool cimport _MemPool -# Holds DeviceMemoryResource objects imported by this process. This enables +# Holds _MemPool objects imported by this process. This enables # buffer serialization, as buffers can reduce to a pair comprising the memory # resource UUID (the key into this registry) and the serialized buffer # descriptor. @@ -53,12 +53,12 @@ cdef class IPCAllocationHandle: # Buffer IPC Implementation # ------------------------- cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer) -cdef Buffer Buffer_from_ipc_descriptor(cls, DeviceMemoryResource, IPCBufferDescriptor, stream) +cdef Buffer Buffer_from_ipc_descriptor(cls, _MemPool, IPCBufferDescriptor, stream) -# DeviceMemoryResource IPC Implementation -# --------------------------------------- -cdef DeviceMemoryResource DMR_from_allocation_handle(cls, device_id, alloc_handle) -cdef DeviceMemoryResource DMR_from_registry(uuid) -cdef DeviceMemoryResource DMR_register(DeviceMemoryResource, uuid) -cdef IPCAllocationHandle DMR_export_mempool(DeviceMemoryResource) +# _MemPool IPC Implementation +# --------------------------- +cdef _MemPool MP_from_allocation_handle(cls, alloc_handle) +cdef _MemPool MP_from_registry(uuid) +cdef _MemPool MP_register(_MemPool, uuid) +cdef IPCAllocationHandle MP_export_mempool(_MemPool) diff --git a/cuda_core/cuda/core/experimental/_memory/_ipc.pyx b/cuda_core/cuda/core/experimental/_memory/_ipc.pyx index c9931855cf..980e814e11 100644 --- a/cuda_core/cuda/core/experimental/_memory/_ipc.pyx +++ b/cuda_core/cuda/core/experimental/_memory/_ipc.pyx @@ -142,17 +142,6 @@ def _reconstruct_allocation_handle(cls, df, uuid): # no-cython-lint multiprocessing.reduction.register(IPCAllocationHandle, _reduce_allocation_handle) -def _deep_reduce_device_memory_resource(mr): - check_multiprocessing_start_method() - from .._device import Device - device = Device(mr.device_id) - alloc_handle = mr.get_allocation_handle() - return mr.from_allocation_handle, (device, alloc_handle) - - -multiprocessing.reduction.register(DeviceMemoryResource, _deep_reduce_device_memory_resource) - - # Buffer IPC Implementation # ------------------------- cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self): @@ -169,13 +158,13 @@ cdef IPCBufferDescriptor Buffer_get_ipc_descriptor(Buffer self): return IPCBufferDescriptor._init(data_b, self.size) cdef Buffer Buffer_from_ipc_descriptor( - cls, DeviceMemoryResource mr, IPCBufferDescriptor ipc_descriptor, stream + cls, _MemPool mr, IPCBufferDescriptor 
ipc_descriptor, stream ): """Import a buffer that was exported from another process.""" if not mr.is_ipc_enabled: raise RuntimeError("Memory resource is not IPC-enabled") if stream is None: - # Note: match this behavior to DeviceMemoryResource.allocate() + # Note: match this behavior to _MemPool.allocate() stream = default_stream() cdef cydriver.CUmemPoolPtrExportData data memcpy( @@ -189,10 +178,10 @@ cdef Buffer Buffer_from_ipc_descriptor( return Buffer._init(ptr, ipc_descriptor.size, mr, stream, ipc_descriptor) -# DeviceMemoryResource IPC Implementation -# --------------------------------------- +# _MemPool IPC Implementation +# --------------------------- -cdef DeviceMemoryResource DMR_from_allocation_handle(cls, device_id, alloc_handle): +cdef _MemPool MP_from_allocation_handle(cls, alloc_handle): # Quick exit for registry hits. uuid = getattr(alloc_handle, 'uuid', None) # no-cython-lint mr = registry.get(uuid) @@ -209,10 +198,8 @@ cdef DeviceMemoryResource DMR_from_allocation_handle(cls, device_id, alloc_handl os.close(fd) raise - # Construct a new DMR. - cdef DeviceMemoryResource self = DeviceMemoryResource.__new__(cls) - from .._device import Device - self._dev_id = Device(device_id).device_id + # Construct a new mempool + cdef _MemPool self = <_MemPool>(cls.__new__(cls)) self._mempool_owned = True self._ipc_data = IPCDataForMR(alloc_handle, True) @@ -231,14 +218,14 @@ cdef DeviceMemoryResource DMR_from_allocation_handle(cls, device_id, alloc_handl return self -cdef DeviceMemoryResource DMR_from_registry(uuid): +cdef _MemPool MP_from_registry(uuid): try: return registry[uuid] except KeyError: raise RuntimeError(f"Memory resource {uuid} was not found") from None -cdef DeviceMemoryResource DMR_register(DeviceMemoryResource self, uuid): +cdef _MemPool MP_register(_MemPool self, uuid): existing = registry.get(uuid) if existing is not None: return existing @@ -248,7 +235,7 @@ cdef DeviceMemoryResource DMR_register(DeviceMemoryResource self, uuid): return self -cdef IPCAllocationHandle DMR_export_mempool(DeviceMemoryResource self): +cdef IPCAllocationHandle MP_export_mempool(_MemPool self): # Note: This is Linux only (int for file descriptor) cdef int fd with nogil: diff --git a/cuda_core/cuda/core/experimental/_memory/_managed_memory_resource.pxd b/cuda_core/cuda/core/experimental/_memory/_managed_memory_resource.pxd new file mode 100644 index 0000000000..3e9aed7bee --- /dev/null +++ b/cuda_core/cuda/core/experimental/_memory/_managed_memory_resource.pxd @@ -0,0 +1,9 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from cuda.core.experimental._memory._memory_pool cimport _MemPool + + +cdef class ManagedMemoryResource(_MemPool): + pass diff --git a/cuda_core/cuda/core/experimental/_memory/_managed_memory_resource.pyx b/cuda_core/cuda/core/experimental/_memory/_managed_memory_resource.pyx new file mode 100644 index 0000000000..7636213a63 --- /dev/null +++ b/cuda_core/cuda/core/experimental/_memory/_managed_memory_resource.pyx @@ -0,0 +1,119 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from cuda.bindings cimport cydriver +from cuda.core.experimental._memory._memory_pool cimport _MemPool, _MemPoolOptions +from cuda.core.experimental._utils.cuda_utils cimport ( + check_or_create_options, +) + +from dataclasses import dataclass +from typing import Optional + +__all__ = ['ManagedMemoryResource', 'ManagedMemoryResourceOptions'] + + +@dataclass +cdef class ManagedMemoryResourceOptions: + """Customizable :obj:`~_memory.ManagedMemoryResource` options. + + Attributes + ---------- + preferred_location : int | None, optional + The preferred device location for the managed memory. + Use a device ID (0, 1, 2, ...) for device preference, -1 for CPU/host, + or None to let the driver decide. + (Default to None) + """ + preferred_location : Optional[int] = None + + +cdef class ManagedMemoryResource(_MemPool): + """ + A managed memory resource managing a stream-ordered memory pool. + + Managed memory is accessible from both the host and device, with automatic + migration between them as needed. + + Parameters + ---------- + options : ManagedMemoryResourceOptions + Memory resource creation options. + + If set to `None`, the memory resource uses the driver's current + stream-ordered memory pool. If no memory pool is set as current, + the driver's default memory pool is used. + + If not set to `None`, a new memory pool is created, which is owned by + the memory resource. + + When using an existing (current or default) memory pool, the returned + managed memory resource does not own the pool (`is_handle_owned` is + `False`), and closing the resource has no effect. + + Notes + ----- + IPC (Inter-Process Communication) is not currently supported for managed + memory pools. + """ + + def __init__(self, options=None): + cdef ManagedMemoryResourceOptions opts = check_or_create_options( + ManagedMemoryResourceOptions, options, "ManagedMemoryResource options", + keep_none=True + ) + cdef _MemPoolOptions opts_base = _MemPoolOptions() + + cdef int device_id = -1 + cdef object preferred_location = None + if opts: + preferred_location = opts.preferred_location + if preferred_location is not None: + device_id = preferred_location + opts_base._use_current = False + + opts_base._ipc_enabled = False # IPC not supported for managed memory pools + + IF CUDA_CORE_BUILD_MAJOR >= 13: + # Set location based on preferred_location + if preferred_location is None: + # Let the driver decide + opts_base._location = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_NONE + elif device_id == -1: + # CPU/host preference + opts_base._location = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_HOST + else: + # Device preference + opts_base._location = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + + opts_base._type = cydriver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_MANAGED + ELSE: + raise RuntimeError("ManagedMemoryResource requires CUDA 13.0 or later") + + super().__init__(device_id, opts_base) + + @property + def is_device_accessible(self) -> bool: + """Return True. This memory resource provides device-accessible buffers.""" + return True + + @property + def is_host_accessible(self) -> bool: + """Return True. 
This memory resource provides host-accessible buffers.""" + return True + + @property + def is_ipc_enabled(self) -> bool: + """Whether this memory resource has IPC enabled.""" + return self._ipc_data is not None + + @property + def is_mapped(self) -> bool: + """ + Whether this is a mapping of an IPC-enabled memory resource from + another process. If True, allocation is not permitted. + """ + return self._ipc_data is not None and self._ipc_data._is_mapped diff --git a/cuda_core/cuda/core/experimental/_memory/_memory_pool.pxd b/cuda_core/cuda/core/experimental/_memory/_memory_pool.pxd new file mode 100644 index 0000000000..68b2e6438f --- /dev/null +++ b/cuda_core/cuda/core/experimental/_memory/_memory_pool.pxd @@ -0,0 +1,28 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from cuda.bindings cimport cydriver +from cuda.core.experimental._memory._buffer cimport MemoryResource +from cuda.core.experimental._memory._ipc cimport IPCDataForMR + + +cdef class _MemPool(MemoryResource): + cdef: + int _dev_id + cydriver.CUmemoryPool _handle + bint _mempool_owned + IPCDataForMR _ipc_data + object _attributes + object _peer_accessible_by + object __weakref__ + + +cdef class _MemPoolOptions: + + cdef: + bint _ipc_enabled + size_t _max_size + cydriver.CUmemLocationType _location + cydriver.CUmemAllocationType _type + bint _use_current diff --git a/cuda_core/cuda/core/experimental/_memory/_memory_pool.pyx b/cuda_core/cuda/core/experimental/_memory/_memory_pool.pyx new file mode 100644 index 0000000000..dbbcc75715 --- /dev/null +++ b/cuda_core/cuda/core/experimental/_memory/_memory_pool.pyx @@ -0,0 +1,438 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from libc.limits cimport ULLONG_MAX +from libc.stdint cimport uintptr_t +from libc.string cimport memset +from cpython.mem cimport PyMem_Malloc, PyMem_Free + +from cuda.bindings cimport cydriver +from cuda.core.experimental._memory._buffer cimport Buffer, MemoryResource +from cuda.core.experimental._memory cimport _ipc +from cuda.core.experimental._stream cimport default_stream, Stream_accept, Stream +from cuda.core.experimental._utils.cuda_utils cimport ( + HANDLE_RETURN, +) + +from typing import TYPE_CHECKING +import platform # no-cython-lint +import weakref + +from cuda.core.experimental._utils.cuda_utils import driver + +if TYPE_CHECKING: + from cuda.core.experimental._memory.buffer import DevicePointerT + from .._device import Device + + +cdef class _MemPoolOptions: + + def __cinit__(self): + self._ipc_enabled = False + self._max_size = 0 + self._location = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_INVALID + self._type = cydriver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_INVALID + self._use_current = True + + +cdef class _MemPoolAttributes: + cdef: + object _mr_weakref + + def __init__(self, *args, **kwargs): + raise RuntimeError("_MemPoolAttributes cannot be instantiated directly. 
Please use MemoryResource APIs.") + + @classmethod + def _init(cls, mr): + cdef _MemPoolAttributes self = _MemPoolAttributes.__new__(cls) + self._mr_weakref = mr + return self + + def __repr__(self): + return f"{self.__class__.__name__}(%s)" % ", ".join( + f"{attr}={getattr(self, attr)}" for attr in dir(self) + if not attr.startswith("_") + ) + + cdef int _getattribute(self, cydriver.CUmemPool_attribute attr_enum, void* value) except?-1: + cdef _MemPool mr = <_MemPool>(self._mr_weakref()) + if mr is None: + raise RuntimeError("_MemPool is expired") + cdef cydriver.CUmemoryPool pool_handle = mr._handle + with nogil: + HANDLE_RETURN(cydriver.cuMemPoolGetAttribute(pool_handle, attr_enum, value)) + return 0 + + @property + def reuse_follow_event_dependencies(self): + """Allow memory to be reused when there are event dependencies between streams.""" + cdef int value + self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_REUSE_FOLLOW_EVENT_DEPENDENCIES, &value) + return bool(value) + + @property + def reuse_allow_opportunistic(self): + """Allow reuse of completed frees without dependencies.""" + cdef int value + self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_REUSE_ALLOW_OPPORTUNISTIC, &value) + return bool(value) + + @property + def reuse_allow_internal_dependencies(self): + """Allow insertion of new stream dependencies for memory reuse.""" + cdef int value + self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_REUSE_ALLOW_INTERNAL_DEPENDENCIES, &value) + return bool(value) + + @property + def release_threshold(self): + """Amount of reserved memory to hold before OS release.""" + cdef cydriver.cuuint64_t value + self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD, &value) + return int(value) + + @property + def reserved_mem_current(self): + """Current amount of backing memory allocated.""" + cdef cydriver.cuuint64_t value + self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RESERVED_MEM_CURRENT, &value) + return int(value) + + @property + def reserved_mem_high(self): + """High watermark of backing memory allocated.""" + cdef cydriver.cuuint64_t value + self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RESERVED_MEM_HIGH, &value) + return int(value) + + @property + def used_mem_current(self): + """Current amount of memory in use.""" + cdef cydriver.cuuint64_t value + self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_USED_MEM_CURRENT, &value) + return int(value) + + @property + def used_mem_high(self): + """High watermark of memory in use.""" + cdef cydriver.cuuint64_t value + self._getattribute(cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_USED_MEM_HIGH, &value) + return int(value) + + +cdef class _MemPool(MemoryResource): + + def __cinit__(self): + self._dev_id = cydriver.CU_DEVICE_INVALID + self._handle = NULL + self._mempool_owned = False + self._ipc_data = None + self._attributes = None + self._peer_accessible_by = () + + def __init__(self, int device_id, _MemPoolOptions opts): + if opts._use_current: + _MP_init_current(self, device_id, opts) + else: + _MP_init_create(self, device_id, opts) + + def __dealloc__(self): + _MP_close(self) + + def close(self): + """ + Close the device memory resource and destroy the associated memory pool + if owned. + """ + _MP_close(self) + + def allocate(self, size_t size, stream: Stream | GraphBuilder | None = None) -> Buffer: + """Allocate a buffer of the requested size. 
+ + Parameters + ---------- + size : int + The size of the buffer to allocate, in bytes. + stream : :obj:`~_stream.Stream` | :obj:`~_graph.GraphBuilder`, optional + The stream on which to perform the allocation asynchronously. + If None, an internal stream is used. + + Returns + ------- + Buffer + The allocated buffer object, which is accessible on the device that this memory + resource was created for. + """ + if self.is_mapped: + raise TypeError("Cannot allocate from a mapped IPC-enabled memory resource") + stream = Stream_accept(stream) if stream is not None else default_stream() + return _MP_allocate(self, size, stream) + + def deallocate(self, ptr: DevicePointerT, size_t size, stream: Stream | GraphBuilder | None = None): + """Deallocate a buffer previously allocated by this resource. + + Parameters + ---------- + ptr : :obj:`~_memory.DevicePointerT` + The pointer or handle to the buffer to deallocate. + size : int + The size of the buffer to deallocate, in bytes. + stream : :obj:`~_stream.Stream` | :obj:`~_graph.GraphBuilder`, optional + The stream on which to perform the deallocation asynchronously. + If the buffer is deallocated without an explicit stream, the allocation stream + is used. + """ + stream = Stream_accept(stream) if stream is not None else default_stream() + _MP_deallocate(self, ptr, size, stream) + + @property + def attributes(self) -> _MemPoolAttributes: + """Memory pool attributes.""" + if self._attributes is None: + ref = weakref.ref(self) + self._attributes = _MemPoolAttributes._init(ref) + return self._attributes + + @property + def device_id(self) -> int: + """The associated device ordinal.""" + return self._dev_id + + @property + def handle(self) -> driver.CUmemoryPool: + """Handle to the underlying memory pool.""" + return driver.CUmemoryPool((self._handle)) + + @property + def is_handle_owned(self) -> bool: + """Whether the memory resource handle is owned. If False, ``close`` has no effect.""" + return self._mempool_owned + + @property + def peer_accessible_by(self): + """ + Get or set the devices that can access allocations from this memory + pool. Access can be modified at any time and affects all allocations + from this memory pool. + + Returns a tuple of sorted device IDs that currently have peer access to + allocations from this memory pool. + + When setting, accepts a sequence of Device objects or device IDs. + Setting to an empty sequence revokes all peer access. 
+ + Examples + -------- + >>> dmr = DeviceMemoryResource(0) + >>> dmr.peer_accessible_by = [1] # Grant access to device 1 + >>> assert dmr.peer_accessible_by == (1,) + >>> dmr.peer_accessible_by = [] # Revoke access + """ + return self._peer_accessible_by + + @peer_accessible_by.setter + def peer_accessible_by(self, devices): + """Set which devices can access this memory pool.""" + from .._device import Device + + # Convert all devices to device IDs + cdef set[int] target_ids = {Device(dev).device_id for dev in devices} + target_ids.discard(self._dev_id) # exclude this device from peer access list + this_dev = Device(self._dev_id) + cdef list bad = [dev for dev in target_ids if not this_dev.can_access_peer(dev)] + if bad: + raise ValueError(f"Device {self._dev_id} cannot access peer(s): {', '.join(map(str, bad))}") + cdef set[int] cur_ids = set(self._peer_accessible_by) + cdef set[int] to_add = target_ids - cur_ids + cdef set[int] to_rm = cur_ids - target_ids + cdef size_t count = len(to_add) + len(to_rm) # transaction size + cdef cydriver.CUmemAccessDesc* access_desc = NULL + cdef size_t i = 0 + + if count > 0: + access_desc = PyMem_Malloc(count * sizeof(cydriver.CUmemAccessDesc)) + if access_desc == NULL: + raise MemoryError("Failed to allocate memory for access descriptors") + + try: + for dev_id in to_add: + access_desc[i].flags = cydriver.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_READWRITE + access_desc[i].location.type = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + access_desc[i].location.id = dev_id + i += 1 + + for dev_id in to_rm: + access_desc[i].flags = cydriver.CUmemAccess_flags.CU_MEM_ACCESS_FLAGS_PROT_NONE + access_desc[i].location.type = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + access_desc[i].location.id = dev_id + i += 1 + + with nogil: + HANDLE_RETURN(cydriver.cuMemPoolSetAccess(self._handle, access_desc, count)) + finally: + if access_desc != NULL: + PyMem_Free(access_desc) + + self._peer_accessible_by = tuple(target_ids) + + +# _MemPool Implementation +# ----------------------- + +cdef int _MP_init_current(_MemPool self, int dev_id, _MemPoolOptions opts) except?-1: + # Get the current memory pool. + cdef cydriver.cuuint64_t current_threshold + cdef cydriver.cuuint64_t max_threshold = ULLONG_MAX + cdef cydriver.CUmemLocation loc + + self._dev_id = dev_id + self._mempool_owned = False + + with nogil: + if opts._type == cydriver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED \ + and opts._location == cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE: + assert dev_id >= 0 + HANDLE_RETURN(cydriver.cuDeviceGetMemPool(&(self._handle), dev_id)) + + # Set a higher release threshold to improve performance when there are + # no active allocations. By default, the release threshold is 0, which + # means memory is immediately released back to the OS when there are no + # active suballocations, causing performance issues. + HANDLE_RETURN( + cydriver.cuMemPoolGetAttribute( + self._handle, + cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD, + ¤t_threshold + ) + ) + + # If threshold is 0 (default), set it to maximum to retain memory in the pool. 
+ if current_threshold == 0: + HANDLE_RETURN(cydriver.cuMemPoolSetAttribute( + self._handle, + cydriver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD, + &max_threshold + )) + elif opts._type == cydriver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED \ + and opts._location == cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_HOST: + IF CUDA_CORE_BUILD_MAJOR >= 13: + assert dev_id == -1 + loc.id = dev_id + loc.type = opts._location + HANDLE_RETURN(cydriver.cuMemGetMemPool(&(self._handle), &loc, opts._type)) + ELSE: + raise RuntimeError("not supported") + elif opts._type == cydriver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED \ + and opts._location == cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_HOST_NUMA: + IF CUDA_CORE_BUILD_MAJOR >= 13: + assert dev_id == 0 + loc.id = 0 + loc.type = opts._location + HANDLE_RETURN(cydriver.cuMemGetMemPool(&(self._handle), &loc, opts._type)) + ELSE: + raise RuntimeError("not supported") + else: + IF CUDA_CORE_BUILD_MAJOR >= 13: + if opts._type == cydriver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_MANAGED: + # Managed memory pools + loc.id = dev_id + loc.type = opts._location + HANDLE_RETURN(cydriver.cuMemGetMemPool(&(self._handle), &loc, opts._type)) + else: + assert False + ELSE: + assert False + + return 0 + + +cdef int _MP_init_create(_MemPool self, int dev_id, _MemPoolOptions opts) except?-1: + cdef cydriver.CUmemPoolProps properties + memset(&properties, 0, sizeof(cydriver.CUmemPoolProps)) + + cdef bint ipc_enabled = opts._ipc_enabled + properties.allocType = opts._type + properties.handleTypes = _ipc.IPC_HANDLE_TYPE if ipc_enabled else cydriver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE + properties.location.id = dev_id + properties.location.type = opts._location + # managed memory does not support maxSize as of CUDA 13.0 + IF CUDA_CORE_BUILD_MAJOR >= 13: + if properties.allocType != cydriver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_MANAGED: + properties.maxSize = opts._max_size + ELSE: + properties.maxSize = opts._max_size + + self._dev_id = dev_id + self._mempool_owned = True + + with nogil: + HANDLE_RETURN(cydriver.cuMemPoolCreate(&(self._handle), &properties)) + # TODO: should we also set the threshold here? + + if ipc_enabled: + alloc_handle = _ipc.MP_export_mempool(self) + self._ipc_data = _ipc.IPCDataForMR(alloc_handle, False) + + return 0 + + +# Raise an exception if the given stream is capturing. +# A result of CU_STREAM_CAPTURE_STATUS_INVALIDATED is considered an error. 
+cdef inline int check_not_capturing(cydriver.CUstream s) except?-1 nogil: + cdef cydriver.CUstreamCaptureStatus capturing + HANDLE_RETURN(cydriver.cuStreamIsCapturing(s, &capturing)) + if capturing != cydriver.CUstreamCaptureStatus.CU_STREAM_CAPTURE_STATUS_NONE: + raise RuntimeError("_MemPool cannot perform memory operations on " + "a capturing stream (consider using GraphMemoryResource).") + + +cdef inline Buffer _MP_allocate(_MemPool self, size_t size, Stream stream): + cdef cydriver.CUstream s = stream._handle + cdef cydriver.CUdeviceptr devptr + with nogil: + check_not_capturing(s) + HANDLE_RETURN(cydriver.cuMemAllocFromPoolAsync(&devptr, size, self._handle, s)) + cdef Buffer buf = Buffer.__new__(Buffer) + buf._ptr = (devptr) + buf._ptr_obj = None + buf._size = size + buf._memory_resource = self + buf._alloc_stream = stream + return buf + + +cdef inline void _MP_deallocate( + _MemPool self, uintptr_t ptr, size_t size, Stream stream +) noexcept: + cdef cydriver.CUstream s = stream._handle + cdef cydriver.CUdeviceptr devptr = ptr + cdef cydriver.CUresult r + with nogil: + r = cydriver.cuMemFreeAsync(devptr, s) + if r != cydriver.CUDA_ERROR_INVALID_CONTEXT: + HANDLE_RETURN(r) + + +cdef inline _MP_close(_MemPool self): + if self._handle == NULL: + return + + # This works around nvbug 5698116. When a memory pool handle is recycled + # the new handle inherits the peer access state of the previous handle. + if self._peer_accessible_by: + self.peer_accessible_by = [] + + try: + if self._mempool_owned: + with nogil: + HANDLE_RETURN(cydriver.cuMemPoolDestroy(self._handle)) + finally: + self._dev_id = cydriver.CU_DEVICE_INVALID + self._handle = NULL + self._attributes = None + self._mempool_owned = False + self._ipc_data = None + self._peer_accessible_by = () diff --git a/cuda_core/cuda/core/experimental/_memory/_pinned_memory_resource.pxd b/cuda_core/cuda/core/experimental/_memory/_pinned_memory_resource.pxd new file mode 100644 index 0000000000..df225c1860 --- /dev/null +++ b/cuda_core/cuda/core/experimental/_memory/_pinned_memory_resource.pxd @@ -0,0 +1,10 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# SPDX-License-Identifier: Apache-2.0 + +from cuda.core.experimental._memory._memory_pool cimport _MemPool +from cuda.core.experimental._memory._ipc cimport IPCDataForMR + + +cdef class PinnedMemoryResource(_MemPool): + pass diff --git a/cuda_core/cuda/core/experimental/_memory/_pinned_memory_resource.pyx b/cuda_core/cuda/core/experimental/_memory/_pinned_memory_resource.pyx new file mode 100644 index 0000000000..f5395308e5 --- /dev/null +++ b/cuda_core/cuda/core/experimental/_memory/_pinned_memory_resource.pyx @@ -0,0 +1,274 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +from cuda.bindings cimport cydriver +from cuda.core.experimental._memory._memory_pool cimport _MemPool, _MemPoolOptions +from cuda.core.experimental._memory cimport _ipc +from cuda.core.experimental._memory._ipc cimport IPCAllocationHandle +from cuda.core.experimental._utils.cuda_utils cimport ( + check_or_create_options, + HANDLE_RETURN, +) + +from dataclasses import dataclass +from typing import Optional +import multiprocessing +import platform # no-cython-lint +import subprocess +import uuid +import warnings + +from cuda.core.experimental._utils.cuda_utils import check_multiprocessing_start_method + + +# Cache to ensure NUMA warning is only raised once per process +cdef bint _numa_warning_shown = False + + +def _check_numa_nodes(): + """Check if system has multiple NUMA nodes and warn if so.""" + global _numa_warning_shown + if _numa_warning_shown: + return + + if platform.system() != "Linux": + return + + numa_count = None + + # Try /sys filesystem first (most reliable and doesn't require external tools) + try: + import os + node_path = "/sys/devices/system/node" + if os.path.exists(node_path): + # Count directories named "node[0-9]+" + nodes = [d for d in os.listdir(node_path) if d.startswith("node") and d[4:].isdigit()] + numa_count = len(nodes) + except (OSError, PermissionError): + pass + + # Fallback to lscpu if /sys check didn't work + if numa_count is None: + try: + result = subprocess.run( + ["lscpu"], + capture_output=True, + text=True, + timeout=1 + ) + for line in result.stdout.splitlines(): + if line.startswith("NUMA node(s):"): + numa_count = int(line.split(":")[1].strip()) + break + except (subprocess.SubprocessError, ValueError, FileNotFoundError): + pass + + # Warn if multiple NUMA nodes detected + if numa_count is not None and numa_count > 1: + warnings.warn( + f"System has {numa_count} NUMA nodes. IPC-enabled pinned memory " + f"uses location ID 0, which may not work correctly with multiple " + f"NUMA nodes.", + UserWarning, + stacklevel=3 + ) + + _numa_warning_shown = True + + +__all__ = ['PinnedMemoryResource', 'PinnedMemoryResourceOptions'] + + +@dataclass +cdef class PinnedMemoryResourceOptions: + """Customizable :obj:`~_memory.PinnedMemoryResource` options. + + Attributes + ---------- + ipc_enabled : bool, optional + Specifies whether to create an IPC-enabled memory pool. When set to + True, the memory pool and its allocations can be shared with other + processes. (Default to False) + + max_size : int, optional + Maximum pool size. When set to 0, defaults to a system-dependent value. + (Default to 0) + """ + ipc_enabled : bool = False + max_size : int = 0 + + +cdef class PinnedMemoryResource(_MemPool): + """ + A host-pinned memory resource managing a stream-ordered memory pool. + + Parameters + ---------- + options : PinnedMemoryResourceOptions + Memory resource creation options. + + If set to `None`, the memory resource uses the driver's current + stream-ordered memory pool. If no memory + pool is set as current, the driver's default memory pool + is used. + + If not set to `None`, a new memory pool is created, which is owned by + the memory resource. + + When using an existing (current or default) memory pool, the returned + host-pinned memory resource does not own the pool (`is_handle_owned` is + `False`), and closing the resource has no effect. 
+ + Notes + ----- + To create an IPC-enabled memory resource (MR) that is capable of sharing + allocations between processes, specify ``ipc_enabled=True`` in the initializer + option. When IPC is enabled, the location type is automatically set to + CU_MEM_LOCATION_TYPE_HOST_NUMA instead of CU_MEM_LOCATION_TYPE_HOST, + with location ID 0. + + Note: IPC support for pinned memory requires a single NUMA node. A warning + is issued if multiple NUMA nodes are detected. + + See :class:`DeviceMemoryResource` for more details on IPC usage patterns. + """ + + def __init__(self, options=None): + cdef PinnedMemoryResourceOptions opts = check_or_create_options( + PinnedMemoryResourceOptions, options, "PinnedMemoryResource options", + keep_none=True + ) + cdef _MemPoolOptions opts_base = _MemPoolOptions() + + cdef bint ipc_enabled = False + if opts: + ipc_enabled = opts.ipc_enabled + if ipc_enabled and not _ipc.is_supported(): + raise RuntimeError(f"IPC is not available on {platform.system()}") + if ipc_enabled: + # Check for multiple NUMA nodes on Linux + _check_numa_nodes() + opts_base._max_size = opts.max_size + opts_base._use_current = False + opts_base._ipc_enabled = ipc_enabled + if ipc_enabled: + opts_base._location = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_HOST_NUMA + else: + opts_base._location = cydriver.CUmemLocationType.CU_MEM_LOCATION_TYPE_HOST + opts_base._type = cydriver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED + + super().__init__(0 if ipc_enabled else -1, opts_base) + + def __reduce__(self): + return PinnedMemoryResource.from_registry, (self.uuid,) + + @staticmethod + def from_registry(uuid: uuid.UUID) -> PinnedMemoryResource: # no-cython-lint + """ + Obtain a registered mapped memory resource. + + Raises + ------ + RuntimeError + If no mapped memory resource is found in the registry. + """ + return (_ipc.MP_from_registry(uuid)) + + def register(self, uuid: uuid.UUID) -> PinnedMemoryResource: # no-cython-lint + """ + Register a mapped memory resource. + + Returns + ------- + The registered mapped memory resource. If one was previously registered + with the given key, it is returned. + """ + return (_ipc.MP_register(self, uuid)) + + @classmethod + def from_allocation_handle( + cls, alloc_handle: int | IPCAllocationHandle + ) -> PinnedMemoryResource: + """Create a host-pinned memory resource from an allocation handle. + + Construct a new `PinnedMemoryResource` instance that imports a memory + pool from a shareable handle. The memory pool is marked as owned. + + Parameters + ---------- + alloc_handle : int | IPCAllocationHandle + The shareable handle of the host-pinned memory resource to import. If an + integer is supplied, it must represent a valid platform-specific + handle. It is the caller's responsibility to close that handle. + + Returns + ------- + A new host-pinned memory resource instance with the imported handle. + """ + # cuMemPoolImportFromShareableHandle requires CUDA to be initialized, but in + # a child process CUDA may not be initialized yet. For DeviceMemoryResource, + # this is not a concern because most likely when retrieving the device_id the + # user would have already initialized CUDA. But since PinnedMemoryResource is + # not device-specific, it is unlikely to be the case. + HANDLE_RETURN(cydriver.cuInit(0)) + + cdef PinnedMemoryResource mr = ( + _ipc.MP_from_allocation_handle(cls, alloc_handle)) + return mr + + def get_allocation_handle(self) -> IPCAllocationHandle: + """Export the memory pool handle to be shared (requires IPC).
+ + The handle can be used to share the memory pool with other processes. + The handle is cached in this `MemoryResource` and owned by it. + + Returns + ------- + The shareable handle for the memory pool. + """ + if not self.is_ipc_enabled: + raise RuntimeError("Memory resource is not IPC-enabled") + return self._ipc_data._alloc_handle + + @property + def is_device_accessible(self) -> bool: + """Return True. This memory resource provides device-accessible buffers.""" + return True + + @property + def is_host_accessible(self) -> bool: + """Return True. This memory resource provides host-accessible buffers.""" + return True + + @property + def is_ipc_enabled(self) -> bool: + """Whether this memory resource has IPC enabled.""" + return self._ipc_data is not None + + @property + def is_mapped(self) -> bool: + """ + Whether this is a mapping of an IPC-enabled memory resource from + another process. If True, allocation is not permitted. + """ + return self._ipc_data is not None and self._ipc_data._is_mapped + + @property + def uuid(self) -> Optional[uuid.UUID]: + """ + A universally unique identifier for this memory resource. Meaningful + only for IPC-enabled memory resources. + """ + return getattr(self._ipc_data, 'uuid', None) + + +def _deep_reduce_pinned_memory_resource(mr): + check_multiprocessing_start_method() + alloc_handle = mr.get_allocation_handle() + return mr.from_allocation_handle, (alloc_handle,) + + +multiprocessing.reduction.register(PinnedMemoryResource, _deep_reduce_pinned_memory_resource) diff --git a/cuda_core/docs/source/api.rst b/cuda_core/docs/source/api.rst index 45be638eb6..1feeba5b12 100644 --- a/cuda_core/docs/source/api.rst +++ b/cuda_core/docs/source/api.rst @@ -26,12 +26,16 @@ CUDA runtime Event MemoryResource DeviceMemoryResource + PinnedMemoryResource + ManagedMemoryResource LegacyPinnedMemoryResource VirtualMemoryResource :template: dataclass.rst DeviceMemoryResourceOptions + PinnedMemoryResourceOptions + ManagedMemoryResourceOptions EventOptions GraphCompleteOptions GraphDebugPrintOptions diff --git a/cuda_core/docs/source/release/0.5.x-notes.rst b/cuda_core/docs/source/release/0.5.x-notes.rst index 4626a770c1..5b1378963a 100644 --- a/cuda_core/docs/source/release/0.5.x-notes.rst +++ b/cuda_core/docs/source/release/0.5.x-notes.rst @@ -21,7 +21,10 @@ None. New features ------------ -None. +- Added :class:`PinnedMemoryResource` and :class:`PinnedMemoryResourceOptions` for managing + host-pinned memory pools with optional IPC support. +- Added :class:`ManagedMemoryResource` and :class:`ManagedMemoryResourceOptions` for managing + unified memory pools accessible from both host and device. 
New examples diff --git a/cuda_core/tests/conftest.py b/cuda_core/tests/conftest.py index c0ea03930e..95539df16a 100644 --- a/cuda_core/tests/conftest.py +++ b/cuda_core/tests/conftest.py @@ -13,10 +13,44 @@ from cuda import cuda as driver import cuda.core.experimental -from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions, _device +from cuda.core.experimental import ( + Device, + DeviceMemoryResource, + DeviceMemoryResourceOptions, + ManagedMemoryResource, + ManagedMemoryResourceOptions, + PinnedMemoryResource, + PinnedMemoryResourceOptions, + _device, +) from cuda.core.experimental._utils.cuda_utils import handle_return +def skip_if_pinned_memory_unsupported(device): + try: + if not device.properties.host_memory_pools_supported: + pytest.skip("Device does not support host mempool operations") + except AttributeError: + pytest.skip("PinnedMemoryResource requires CUDA 13.0 or later") + + +def skip_if_managed_memory_unsupported(device): + try: + if not device.properties.memory_pools_supported or not device.properties.concurrent_managed_access: + pytest.skip("Device does not support managed memory pool operations") + except AttributeError: + pytest.skip("ManagedMemoryResource requires CUDA 13.0 or later") + + +def create_managed_memory_resource_or_skip(*args, **kwargs): + try: + return ManagedMemoryResource(*args, **kwargs) + except RuntimeError as e: + if "requires CUDA 13.0" in str(e): + pytest.skip("ManagedMemoryResource requires CUDA 13.0 or later") + raise + + @pytest.fixture(scope="session", autouse=True) def session_setup(): # Always init CUDA. @@ -102,11 +136,25 @@ def ipc_device(): return device -@pytest.fixture -def ipc_memory_resource(ipc_device): +@pytest.fixture( + params=[ + pytest.param("device", id="DeviceMR"), + pytest.param("pinned", id="PinnedMR"), + ] +) +def ipc_memory_resource(request, ipc_device): + """Provides IPC-enabled memory resource (either Device or Pinned).""" POOL_SIZE = 2097152 - options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) - mr = DeviceMemoryResource(ipc_device, options=options) + mr_type = request.param + + if mr_type == "device": + options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) + mr = DeviceMemoryResource(ipc_device, options=options) + else: # pinned + skip_if_pinned_memory_unsupported(ipc_device) + options = PinnedMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) + mr = PinnedMemoryResource(options=options) + assert mr.is_ipc_enabled return mr @@ -153,4 +201,30 @@ def mempool_device_x3(): return _mempool_device_impl(3) +@pytest.fixture( + params=[ + pytest.param((DeviceMemoryResource, DeviceMemoryResourceOptions), id="DeviceMR"), + pytest.param((PinnedMemoryResource, PinnedMemoryResourceOptions), id="PinnedMR"), + pytest.param((ManagedMemoryResource, ManagedMemoryResourceOptions), id="ManagedMR"), + ] +) +def memory_resource_factory(request, init_cuda): + """Parametrized fixture providing memory resource types. + + Returns a 2-tuple of (MRClass, MROptionClass). 
+ + Usage: + def test_something(memory_resource_factory): + MRClass, MROptions = memory_resource_factory + device = Device() + if MRClass is DeviceMemoryResource: + mr = MRClass(device) + elif MRClass is PinnedMemoryResource: + mr = MRClass() + elif MRClass is ManagedMemoryResource: + mr = MRClass() + """ + return request.param + + skipif_need_cuda_headers = pytest.mark.skipif(helpers.CUDA_INCLUDE_PATH is None, reason="need CUDA header") diff --git a/cuda_core/tests/memory_ipc/test_event_ipc.py b/cuda_core/tests/memory_ipc/test_event_ipc.py index ce756cba21..5edf97f2ae 100644 --- a/cuda_core/tests/memory_ipc/test_event_ipc.py +++ b/cuda_core/tests/memory_ipc/test_event_ipc.py @@ -22,7 +22,12 @@ def test_main(self, ipc_device, ipc_memory_resource): device = ipc_device mr = ipc_memory_resource stream1 = device.create_stream() - latch = LatchKernel(device) + # TODO: We pick a timeout here to ensure forward progress (it needs to be + # less than CHILD_TIMEOUT_SEC) when a pinned memory resource is in use, + # in which case the call to buffer.copy_from(...) below is a synchronous + # operation that blocks the host. But calling the latch kernel here does not + # make any sense. We should refactor this test. + latch = LatchKernel(device, timeout_sec=5) # Start the child process. q_out, q_in = [mp.Queue() for _ in range(2)] diff --git a/cuda_core/tests/memory_ipc/test_serialize.py b/cuda_core/tests/memory_ipc/test_serialize.py index 7fe65b2b4a..f5686db28c 100644 --- a/cuda_core/tests/memory_ipc/test_serialize.py +++ b/cuda_core/tests/memory_ipc/test_serialize.py @@ -151,41 +151,20 @@ def test_main(self, ipc_device, ipc_memory_resource): pgen.verify_buffer(buffer, seed=True) buffer.close() - def child_main(self, alloc_handle, mr1, buffer_desc, buffer1): + def child_main(self, alloc_handle, mr1, buffer_desc, buffer): device = Device() device.set_current() - mr2 = DeviceMemoryResource.from_allocation_handle(device, alloc_handle) + mr2 = DeviceMemoryResource.from_allocation_handle(device, alloc_handle) # noqa: F841 pgen = PatternGen(device, NBYTES) - # OK to build the buffer from either mr and the descriptor. - # All buffer* objects point to the same memory. - buffer2 = Buffer.from_ipc_descriptor(mr1, buffer_desc) - buffer3 = Buffer.from_ipc_descriptor(mr2, buffer_desc) + # Verify initial content + pgen.verify_buffer(buffer, seed=False) - pgen.verify_buffer(buffer1, seed=False) - pgen.verify_buffer(buffer2, seed=False) - pgen.verify_buffer(buffer3, seed=False) - - # Modify 1. - pgen.fill_buffer(buffer1, seed=True) - - pgen.verify_buffer(buffer1, seed=True) - pgen.verify_buffer(buffer2, seed=True) - pgen.verify_buffer(buffer3, seed=True) - - # Modify 2. - pgen.fill_buffer(buffer2, seed=False) - - pgen.verify_buffer(buffer1, seed=False) - pgen.verify_buffer(buffer2, seed=False) - pgen.verify_buffer(buffer3, seed=False) - - # Modify 3. - pgen.fill_buffer(buffer3, seed=True) + # Modify the buffer + pgen.fill_buffer(buffer, seed=True) - pgen.verify_buffer(buffer1, seed=True) - pgen.verify_buffer(buffer2, seed=True) - pgen.verify_buffer(buffer3, seed=True) + # Verify modified content + pgen.verify_buffer(buffer, seed=True) - # Close any one buffer. 
- buffer1.close() + # Clean up - only ONE free + buffer.close() diff --git a/cuda_core/tests/test_graph_mem.py b/cuda_core/tests/test_graph_mem.py index 964ce03b93..15b34dc359 100644 --- a/cuda_core/tests/test_graph_mem.py +++ b/cuda_core/tests/test_graph_mem.py @@ -275,7 +275,7 @@ def test_dmr_check_capture_state(mempool_device, mode): gb = device.create_graph_builder().begin_building(mode=mode) with pytest.raises( RuntimeError, - match=r"DeviceMemoryResource cannot perform memory operations on a capturing " + match=r"cannot perform memory operations on a capturing " r"stream \(consider using GraphMemoryResource\)\.", ): dmr.allocate(1, stream=gb) diff --git a/cuda_core/tests/test_memory.py b/cuda_core/tests/test_memory.py index be46802493..fd2d1f1b08 100644 --- a/cuda_core/tests/test_memory.py +++ b/cuda_core/tests/test_memory.py @@ -23,7 +23,11 @@ DeviceMemoryResource, DeviceMemoryResourceOptions, GraphMemoryResource, + ManagedMemoryResource, + ManagedMemoryResourceOptions, MemoryResource, + PinnedMemoryResource, + PinnedMemoryResourceOptions, VirtualMemoryResource, VirtualMemoryResourceOptions, ) @@ -37,6 +41,11 @@ from helpers import IS_WINDOWS from helpers.buffers import DummyUnifiedMemoryResource +from conftest import ( + create_managed_memory_resource_or_skip, + skip_if_managed_memory_unsupported, + skip_if_pinned_memory_unsupported, +) from cuda_python_test_helpers import supports_ipc_mempool POOL_SIZE = 2097152 # 2MB size @@ -132,6 +141,10 @@ def test_package_contents(): "IPCBufferDescriptor", "IPCAllocationHandle", "LegacyPinnedMemoryResource", + "ManagedMemoryResource", + "ManagedMemoryResourceOptions", + "PinnedMemoryResourceOptions", + "PinnedMemoryResource", "VirtualMemoryResourceOptions", "VirtualMemoryResource", ] @@ -544,6 +557,45 @@ def test_device_memory_resource_initialization(use_device_object): buffer.close() +def test_pinned_memory_resource_initialization(init_cuda): + device = Device() + skip_if_pinned_memory_unsupported(device) + + device.set_current() + + mr = PinnedMemoryResource() + assert mr.is_device_accessible + assert mr.is_host_accessible + + # Test allocation/deallocation works + buffer = mr.allocate(1024) + assert buffer.size == 1024 + assert buffer.device_id == -1 # Not bound to any GPU + assert buffer.is_host_accessible + assert buffer.memory_resource == mr + assert buffer.is_device_accessible + buffer.close() + + +def test_managed_memory_resource_initialization(init_cuda): + device = Device() + skip_if_managed_memory_unsupported(device) + + device.set_current() + + mr = create_managed_memory_resource_or_skip() + assert mr.is_device_accessible + assert mr.is_host_accessible + + # Test allocation/deallocation works + buffer = mr.allocate(1024) + assert buffer.size == 1024 + assert buffer.is_host_accessible # But accessible from host + assert buffer.memory_resource == mr + assert buffer.is_device_accessible + buffer.close() + + def get_handle_type(): if IS_WINDOWS: return (("win32", None), ("win32_kmt", None)) @@ -745,9 +797,8 @@ def test_vmm_allocator_rdma_unsupported_exception(): VirtualMemoryResource(device, config=options) -def test_device_memory_resource(): +def test_device_memory_resource_with_options(init_cuda): device = Device() - if not device.properties.memory_pools_supported: pytest.skip("Device does not support mempool operations") @@ -781,7 +832,98 @@ def test_device_memory_resource(): stream = device.create_stream() buffer = mr.allocate(1024, stream=stream) assert buffer.handle != 0 - buffer.close() + buffer.close(stream) + + # Test memory 
copying between buffers from same pool + src_buffer = mr.allocate(64) + dst_buffer = mr.allocate(64) + stream = device.create_stream() + src_buffer.copy_to(dst_buffer, stream=stream) + device.sync() + dst_buffer.close() + src_buffer.close() + + +def test_pinned_memory_resource_with_options(init_cuda): + device = Device() + skip_if_pinned_memory_unsupported(device) + + device.set_current() + + # Test basic pool creation + options = PinnedMemoryResourceOptions(max_size=POOL_SIZE) + mr = PinnedMemoryResource(options) + assert mr.device_id == -1 # Not bound to any GPU + assert mr.is_device_accessible + assert mr.is_host_accessible + assert not mr.is_ipc_enabled + + # Test allocation and deallocation + buffer1 = mr.allocate(1024) + assert buffer1.handle != 0 + assert buffer1.size == 1024 + assert buffer1.memory_resource == mr + buffer1.close() + + # Test multiple allocations + buffer1 = mr.allocate(1024) + buffer2 = mr.allocate(2048) + assert buffer1.handle != buffer2.handle + assert buffer1.size == 1024 + assert buffer2.size == 2048 + buffer1.close() + buffer2.close() + + # Test stream-based allocation + stream = device.create_stream() + buffer = mr.allocate(1024, stream=stream) + assert buffer.handle != 0 + buffer.close(stream) + + # Test memory copying between buffers from same pool + src_buffer = mr.allocate(64) + dst_buffer = mr.allocate(64) + stream = device.create_stream() + src_buffer.copy_to(dst_buffer, stream=stream) + device.sync() + dst_buffer.close() + src_buffer.close() + + +def test_managed_memory_resource_with_options(init_cuda): + device = Device() + skip_if_managed_memory_unsupported(device) + + device.set_current() + + # Test basic pool creation + options = ManagedMemoryResourceOptions() + mr = create_managed_memory_resource_or_skip(options) + assert mr.is_device_accessible + assert mr.is_host_accessible + assert not mr.is_ipc_enabled + + # Test allocation and deallocation + buffer1 = mr.allocate(1024) + assert buffer1.handle != 0 + assert buffer1.size == 1024 + assert buffer1.memory_resource == mr + buffer1.close() + + # Test multiple allocations + buffer1 = mr.allocate(1024) + buffer2 = mr.allocate(2048) + assert buffer1.handle != buffer2.handle + assert buffer1.size == 1024 + assert buffer2.size == 2048 + buffer1.close() + buffer2.close() + + # Test stream-based allocation + stream = device.create_stream() + buffer = mr.allocate(1024, stream=stream) + assert buffer.handle != 0 + buffer.close(stream) # Test memory copying between buffers from same pool src_buffer = mr.allocate(64) @@ -814,6 +956,76 @@ def test_mempool_ipc_errors(mempool_device): buffer.close() +def test_pinned_mempool_ipc_basic(): + """Test basic IPC functionality for PinnedMemoryResource.""" + device = Device() + device.set_current() + + skip_if_pinned_memory_unsupported(device) + + if platform.system() == "Windows": + pytest.skip("IPC not implemented for Windows") + + if not supports_ipc_mempool(device): + pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") + + # Test IPC-enabled PinnedMemoryResource creation + options = PinnedMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True) + mr = PinnedMemoryResource(options) + assert mr.is_ipc_enabled + assert mr.is_device_accessible + assert mr.is_host_accessible + assert mr.device_id == 0 # IPC-enabled uses location id 0 + + # Test allocation handle export + alloc_handle = mr.get_allocation_handle() + assert alloc_handle is not None + + # Test buffer allocation + buffer = mr.allocate(1024) + assert buffer.size == 1024 + assert 
buffer.is_device_accessible + assert buffer.is_host_accessible + + # Test IPC descriptor + ipc_desc = buffer.get_ipc_descriptor() + assert ipc_desc is not None + assert ipc_desc.size == 1024 + + buffer.close() + mr.close() + + +def test_pinned_mempool_ipc_errors(): + """Test error cases when IPC operations are disabled for PinnedMemoryResource.""" + device = Device() + device.set_current() + + skip_if_pinned_memory_unsupported(device) + + # Test with IPC disabled (default) + options = PinnedMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=False) + mr = PinnedMemoryResource(options) + assert not mr.is_ipc_enabled + assert mr.device_id == -1 # Non-IPC uses location id -1 + + buffer = mr.allocate(64) + ipc_error_msg = "Memory resource is not IPC-enabled" + + with pytest.raises(RuntimeError, match=ipc_error_msg): + mr.get_allocation_handle() + + with pytest.raises(RuntimeError, match=ipc_error_msg): + buffer.get_ipc_descriptor() + + with pytest.raises(RuntimeError, match=ipc_error_msg): + handle = IPCBufferDescriptor._init(b"", 0) + Buffer.from_ipc_descriptor(mr, handle) + + buffer.close() + mr.close() + + @pytest.mark.parametrize("ipc_enabled", [True, False]) @pytest.mark.parametrize( "property_name,expected_type", @@ -828,18 +1040,42 @@ def test_mempool_ipc_errors(mempool_device): ("used_mem_high", int), ], ) -def test_mempool_attributes(ipc_enabled, mempool_device, property_name, expected_type): - """Test all properties of the DeviceMemoryResource class.""" - device = mempool_device +def test_mempool_attributes(ipc_enabled, memory_resource_factory, property_name, expected_type): + """Test all properties of memory pool attributes for all memory resource types.""" + MR, MRops = memory_resource_factory + device = Device() + + if MR is DeviceMemoryResource and not device.properties.memory_pools_supported: + pytest.skip("Device does not support mempool operations") + elif MR is PinnedMemoryResource: + skip_if_pinned_memory_unsupported(device) + elif MR is ManagedMemoryResource: + skip_if_managed_memory_unsupported(device) + + # ManagedMemoryResource does not support IPC + if MR is ManagedMemoryResource and ipc_enabled: + pytest.skip(f"{MR.__name__} does not support IPC") + + device.set_current() + if platform.system() == "Windows": return # IPC not implemented for Windows if ipc_enabled and not supports_ipc_mempool(device): pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") - options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=ipc_enabled) - mr = DeviceMemoryResource(device, options=options) - assert mr.is_ipc_enabled == ipc_enabled + if MR is DeviceMemoryResource: + options = MRops(max_size=POOL_SIZE, ipc_enabled=ipc_enabled) + mr = MR(device, options=options) + assert mr.is_ipc_enabled == ipc_enabled + elif MR is PinnedMemoryResource: + options = MRops(max_size=POOL_SIZE, ipc_enabled=ipc_enabled) + mr = MR(options) + assert mr.is_ipc_enabled == ipc_enabled + elif MR is ManagedMemoryResource: + options = MRops() + mr = create_managed_memory_resource_or_skip(options) + assert not mr.is_ipc_enabled # Get the property value value = getattr(mr.attributes, property_name) @@ -872,15 +1108,32 @@ def test_mempool_attributes(ipc_enabled, mempool_device, property_name, expected assert value >= current_value, f"{property_name} should be >= {current_prop}" -def test_mempool_attributes_repr(mempool_device): +def test_mempool_attributes_repr(memory_resource_factory): + """Test the repr of memory pool attributes for all memory resource types.""" + MR, MRops = 
memory_resource_factory device = Device() + + if MR is DeviceMemoryResource and not device.properties.memory_pools_supported: + pytest.skip("Device does not support mempool operations") + elif MR is PinnedMemoryResource: + skip_if_pinned_memory_unsupported(device) + elif MR is ManagedMemoryResource: + skip_if_managed_memory_unsupported(device) + device.set_current() - mr = DeviceMemoryResource(device, options={"max_size": 2048}) + + if MR is DeviceMemoryResource: + mr = MR(device, options={"max_size": 2048}) + elif MR is PinnedMemoryResource: + mr = MR(options={"max_size": 2048}) + elif MR is ManagedMemoryResource: + mr = create_managed_memory_resource_or_skip(options={}) + buffer1 = mr.allocate(64) buffer2 = mr.allocate(64) buffer1.close() assert re.match( - r"DeviceMemoryResourceAttributes\(release_threshold=\d+, reserved_mem_current=\d+, reserved_mem_high=\d+, " + r".*Attributes\(release_threshold=\d+, reserved_mem_current=\d+, reserved_mem_high=\d+, " r"reuse_allow_internal_dependencies=(True|False), reuse_allow_opportunistic=(True|False), " r"reuse_follow_event_dependencies=(True|False), used_mem_current=\d+, used_mem_high=\d+\)", str(mr.attributes), @@ -888,26 +1141,49 @@ def test_mempool_attributes_repr(mempool_device): buffer2.close() -def test_mempool_attributes_ownership(mempool_device): - """Ensure the attributes bundle handles references correctly.""" - device = mempool_device - # Skip if IPC mempool is not supported on this platform/device - if not supports_ipc_mempool(device): +def test_mempool_attributes_ownership(memory_resource_factory): + """Ensure the attributes bundle handles references correctly for all memory resource types.""" + MR, MRops = memory_resource_factory + device = Device() + + if MR is DeviceMemoryResource and not device.properties.memory_pools_supported: + pytest.skip("Device does not support mempool operations") + elif MR is PinnedMemoryResource: + skip_if_pinned_memory_unsupported(device) + elif MR is ManagedMemoryResource: + skip_if_managed_memory_unsupported(device) + + # Skip if IPC mempool is not supported on this platform/device (only relevant for DeviceMemoryResource) + if MR is DeviceMemoryResource and not supports_ipc_mempool(device): pytest.skip("Driver rejects IPC-enabled mempool creation on this platform") - mr = DeviceMemoryResource(device, dict(max_size=POOL_SIZE)) + device.set_current() + + if MR is DeviceMemoryResource: + mr = MR(device, dict(max_size=POOL_SIZE)) + elif MR is PinnedMemoryResource: + mr = MR(dict(max_size=POOL_SIZE)) + elif MR is ManagedMemoryResource: + mr = create_managed_memory_resource_or_skip(dict()) + attributes = mr.attributes mr.close() del mr # After deleting the memory resource, the attributes suite is disconnected. - with pytest.raises(RuntimeError, match="DeviceMemoryResource is expired"): + with pytest.raises(RuntimeError, match="is expired"): _ = attributes.used_mem_high # Even when a new object is created (we found a case where the same # mempool handle was really reused). 
- mr = DeviceMemoryResource(device, dict(max_size=POOL_SIZE)) # noqa: F841 - with pytest.raises(RuntimeError, match="DeviceMemoryResource is expired"): + if MR is DeviceMemoryResource: + mr = MR(device, dict(max_size=POOL_SIZE)) # noqa: F841 + elif MR is PinnedMemoryResource: + mr = MR(dict(max_size=POOL_SIZE)) # noqa: F841 + elif MR is ManagedMemoryResource: + mr = create_managed_memory_resource_or_skip(dict()) # noqa: F841 + + with pytest.raises(RuntimeError, match="is expired"): _ = attributes.used_mem_high diff --git a/cuda_core/tests/test_multiprocessing_warning.py b/cuda_core/tests/test_multiprocessing_warning.py index 945ea83964..8b490af233 100644 --- a/cuda_core/tests/test_multiprocessing_warning.py +++ b/cuda_core/tests/test_multiprocessing_warning.py @@ -14,10 +14,8 @@ from cuda.core.experimental import DeviceMemoryResource, DeviceMemoryResourceOptions, EventOptions from cuda.core.experimental._event import _reduce_event -from cuda.core.experimental._memory._ipc import ( - _deep_reduce_device_memory_resource, - _reduce_allocation_handle, -) +from cuda.core.experimental._memory._device_memory_resource import _deep_reduce_device_memory_resource +from cuda.core.experimental._memory._ipc import _reduce_allocation_handle from cuda.core.experimental._utils.cuda_utils import reset_fork_warning
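For reviewers, the practical upshot of registering ``_deep_reduce_pinned_memory_resource`` with ``multiprocessing.reduction`` (see the ``_pinned_memory_resource.pyx`` hunk above) is that an IPC-enabled ``PinnedMemoryResource`` can be handed directly to a spawned child process, where it is rebuilt via ``from_allocation_handle``. A rough sketch of that flow, inferred from the memory-IPC tests in this diff (assumes Linux, the ``spawn`` start method, and that ``Buffer`` is importable from ``cuda.core.experimental``; an imported pool cannot allocate, so the child maps a buffer exported by the parent instead):

    import multiprocessing as mp

    from cuda.core.experimental import (
        Buffer,
        Device,
        PinnedMemoryResource,
        PinnedMemoryResourceOptions,
    )


    def child(mr, ipc_desc):
        # `mr` arrives re-imported through from_allocation_handle(); it cannot
        # allocate, but it can map buffers exported by the parent process.
        Device().set_current()
        buf = Buffer.from_ipc_descriptor(mr, ipc_desc)
        # ... read/write through the host-accessible pinned buffer here ...
        buf.close()


    if __name__ == "__main__":
        mp.set_start_method("spawn")  # the registered reducer checks the start method
        Device().set_current()
        mr = PinnedMemoryResource(PinnedMemoryResourceOptions(ipc_enabled=True))
        buf = mr.allocate(1024)
        p = mp.Process(target=child, args=(mr, buf.get_ipc_descriptor()))
        p.start()
        p.join()
        buf.close()
        mr.close()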