diff --git a/cuda_core/tests/conftest.py b/cuda_core/tests/conftest.py
index 3804124438..c0ea03930e 100644
--- a/cuda_core/tests/conftest.py
+++ b/cuda_core/tests/conftest.py
@@ -12,6 +12,7 @@
 except ImportError:
     from cuda import cuda as driver
 
+import cuda.core.experimental
 from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions, _device
 from cuda.core.experimental._utils.cuda_utils import handle_return
 
@@ -28,7 +29,7 @@ def session_setup():
 @pytest.fixture(scope="function")
 def init_cuda():
     # TODO: rename this to e.g. init_context
-    device = Device()
+    device = Device(0)
     device.set_current()
 
     # Set option to avoid spin-waiting on synchronization.
@@ -83,7 +84,7 @@ def pop_all_contexts():
 def ipc_device():
     """Obtains a device suitable for IPC-enabled mempool tests, or skips."""
     # Check if IPC is supported on this platform/device
-    device = Device()
+    device = Device(0)
     device.set_current()
 
     if not device.properties.memory_pools_supported:
@@ -113,7 +114,7 @@ def ipc_memory_resource(ipc_device):
 @pytest.fixture
 def mempool_device():
     """Obtains a device suitable for mempool tests, or skips."""
-    device = Device()
+    device = Device(0)
     device.set_current()
 
     if not device.properties.memory_pools_supported:
@@ -122,4 +123,34 @@ def mempool_device():
     return device
 
 
+def _mempool_device_impl(num):
+    num_devices = len(cuda.core.experimental.system.devices)
+    if num_devices < num:
+        pytest.skip(f"Test requires at least {num} GPUs")
+
+    devs = [Device(i) for i in range(num)]
+    for i in reversed(range(num)):
+        devs[i].set_current()  # ends with device 0 current
+
+    if not all(devs[i].can_access_peer(j) for i in range(num) for j in range(num)):
+        pytest.skip("Test requires GPUs with peer access")
+
+    if not all(devs[i].properties.memory_pools_supported for i in range(num)):
+        pytest.skip("Device does not support mempool operations")
+
+    return devs
+
+
+@pytest.fixture
+def mempool_device_x2():
+    """Fixture that provides two devices if available, otherwise skips test."""
+    return _mempool_device_impl(2)
+
+
+@pytest.fixture
+def mempool_device_x3():
+    """Fixture that provides three devices if available, otherwise skips test."""
+    return _mempool_device_impl(3)
+
+
 skipif_need_cuda_headers = pytest.mark.skipif(helpers.CUDA_INCLUDE_PATH is None, reason="need CUDA header")
diff --git a/cuda_core/tests/memory_ipc/test_peer_access.py b/cuda_core/tests/memory_ipc/test_peer_access.py
new file mode 100644
index 0000000000..87dc459ffc
--- /dev/null
+++ b/cuda_core/tests/memory_ipc/test_peer_access.py
@@ -0,0 +1,121 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import multiprocessing as mp
+
+import pytest
+from cuda.core.experimental import Device, DeviceMemoryResource, DeviceMemoryResourceOptions
+from cuda.core.experimental._utils.cuda_utils import CUDAError
+from helpers.buffers import PatternGen
+
+CHILD_TIMEOUT_SEC = 20
+NBYTES = 64
+POOL_SIZE = 2097152
+
+
+class TestPeerAccessNotPreservedOnImport:
+    """
+    Verify that peer access settings are not preserved when a memory resource
+    is sent to another process via IPC, and that peer access can be set after import.
+    """
+
+    def test_main(self, mempool_device_x2):
+        dev0, dev1 = mempool_device_x2
+
+        # Parent Process - Create and Configure MR
+        dev1.set_current()
+        options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
+        mr = DeviceMemoryResource(dev1, options=options)
+        mr.peer_accessible_by = [dev0]
+        assert mr.peer_accessible_by == (0,)
+
+        # Spawn child process
+        process = mp.Process(target=self.child_main, args=(mr,))
+        process.start()
+        process.join(timeout=CHILD_TIMEOUT_SEC)
+        assert process.exitcode == 0
+
+        # Verify parent's MR still has peer access set (independent state)
+        assert mr.peer_accessible_by == (0,)
+        mr.close()
+
+    def child_main(self, mr):
+        Device(1).set_current()
+        assert mr.is_mapped is True
+        assert mr.device_id == 1
+        assert mr.peer_accessible_by == ()
+        mr.peer_accessible_by = [0]
+        assert mr.peer_accessible_by == (0,)
+        mr.peer_accessible_by = []
+        assert mr.peer_accessible_by == ()
+        mr.close()
+
+
+class TestBufferPeerAccessAfterImport:
+    """
+    Verify that buffers imported via IPC can be accessed from peer devices after
+    setting peer access on the imported memory resource, and that access can be revoked.
+    """
+
+    @pytest.mark.parametrize("grant_access_in_parent", [True, False])
+    def test_main(self, mempool_device_x2, grant_access_in_parent):
+        dev0, dev1 = mempool_device_x2
+
+        # Parent Process - Create MR and Buffer
+        dev1.set_current()
+        options = DeviceMemoryResourceOptions(max_size=POOL_SIZE, ipc_enabled=True)
+        mr = DeviceMemoryResource(dev1, options=options)
+        if grant_access_in_parent:
+            mr.peer_accessible_by = [dev0]
+            assert mr.peer_accessible_by == (0,)
+        else:
+            assert mr.peer_accessible_by == ()
+        buffer = mr.allocate(NBYTES)
+        pgen = PatternGen(dev1, NBYTES)
+        pgen.fill_buffer(buffer, seed=False)
+
+        # Spawn child process
+        process = mp.Process(target=self.child_main, args=(mr, buffer))
+        process.start()
+        process.join(timeout=CHILD_TIMEOUT_SEC)
+        assert process.exitcode == 0
+
+        buffer.close()
+        mr.close()
+
+    def child_main(self, mr, buffer):
+        # Verify MR and buffer are mapped
+        Device(1).set_current()
+        assert mr.is_mapped is True
+        assert buffer.is_mapped is True
+        assert mr.device_id == 1
+        assert buffer.device_id == 1
+
+        # Test 1: Buffer accessible from resident device (dev1) - should always work
+        dev1 = Device(1)
+        dev1.set_current()
+        PatternGen(dev1, NBYTES).verify_buffer(buffer, seed=False)
+
+        # Test 2: Buffer NOT accessible from dev0 initially (peer access not preserved)
+        dev0 = Device(0)
+        dev0.set_current()
+        with pytest.raises(CUDAError, match="CUDA_ERROR_INVALID_VALUE"):
+            PatternGen(dev0, NBYTES).verify_buffer(buffer, seed=False)
+
+        # Test 3: Set peer access and verify buffer becomes accessible
+        dev1.set_current()
+        mr.peer_accessible_by = [0]
+        assert mr.peer_accessible_by == (0,)
+        dev0.set_current()
+        PatternGen(dev0, NBYTES).verify_buffer(buffer, seed=False)
+
+        # Test 4: Revoke peer access and verify buffer becomes inaccessible
+        dev1.set_current()
+        mr.peer_accessible_by = []
+        assert mr.peer_accessible_by == ()
+        dev0.set_current()
+        with pytest.raises(CUDAError, match="CUDA_ERROR_INVALID_VALUE"):
+            PatternGen(dev0, NBYTES).verify_buffer(buffer, seed=False)
+
+        buffer.close()
+        mr.close()
diff --git a/cuda_core/tests/memory_ipc/test_serialize.py b/cuda_core/tests/memory_ipc/test_serialize.py
index ceac50e502..7fe65b2b4a 100644
--- a/cuda_core/tests/memory_ipc/test_serialize.py
+++ b/cuda_core/tests/memory_ipc/test_serialize.py
@@ -122,7 +122,7 @@ def child_main(self, pipe, _):
         buffer.close()
 
 
-def test_object_passing(ipc_device, ipc_memory_resource):
+class TestObjectPassing:
     """
     Test sending objects as arguments when starting a process.
 
@@ -131,61 +131,61 @@ def test_object_passing(ipc_device, ipc_memory_resource):
     in multiprocessing (e.g., Queue) work.
     """
 
-    # Define the objects.
-    device = ipc_device
-    mr = ipc_memory_resource
-    alloc_handle = mr.get_allocation_handle()
-    buffer = mr.allocate(NBYTES)
-    buffer_desc = buffer.get_ipc_descriptor()
-
-    pgen = PatternGen(device, NBYTES)
-    pgen.fill_buffer(buffer, seed=False)
+    def test_main(self, ipc_device, ipc_memory_resource):
+        # Define the objects.
+        device = ipc_device
+        mr = ipc_memory_resource
+        alloc_handle = mr.get_allocation_handle()
+        buffer = mr.allocate(NBYTES)
+        buffer_desc = buffer.get_ipc_descriptor()
 
-    # Start the child process.
-    process = mp.Process(target=child_main, args=(alloc_handle, mr, buffer_desc, buffer))
-    process.start()
-    process.join(timeout=CHILD_TIMEOUT_SEC)
-    assert process.exitcode == 0
+        pgen = PatternGen(device, NBYTES)
+        pgen.fill_buffer(buffer, seed=False)
 
-    pgen.verify_buffer(buffer, seed=True)
-    buffer.close()
+        # Start the child process.
+        process = mp.Process(target=self.child_main, args=(alloc_handle, mr, buffer_desc, buffer))
+        process.start()
+        process.join(timeout=CHILD_TIMEOUT_SEC)
+        assert process.exitcode == 0
+
+        pgen.verify_buffer(buffer, seed=True)
+        buffer.close()
 
-
-def child_main(alloc_handle, mr1, buffer_desc, buffer1):
-    device = Device()
-    device.set_current()
-    mr2 = DeviceMemoryResource.from_allocation_handle(device, alloc_handle)
-    pgen = PatternGen(device, NBYTES)
+    def child_main(self, alloc_handle, mr1, buffer_desc, buffer1):
+        device = Device()
+        device.set_current()
+        mr2 = DeviceMemoryResource.from_allocation_handle(device, alloc_handle)
+        pgen = PatternGen(device, NBYTES)
 
-    # OK to build the buffer from either mr and the descriptor.
-    # All buffer* objects point to the same memory.
-    buffer2 = Buffer.from_ipc_descriptor(mr1, buffer_desc)
-    buffer3 = Buffer.from_ipc_descriptor(mr2, buffer_desc)
+        # OK to build the buffer from either mr and the descriptor.
+        # All buffer* objects point to the same memory.
+        buffer2 = Buffer.from_ipc_descriptor(mr1, buffer_desc)
+        buffer3 = Buffer.from_ipc_descriptor(mr2, buffer_desc)
 
-    pgen.verify_buffer(buffer1, seed=False)
-    pgen.verify_buffer(buffer2, seed=False)
-    pgen.verify_buffer(buffer3, seed=False)
+        pgen.verify_buffer(buffer1, seed=False)
+        pgen.verify_buffer(buffer2, seed=False)
+        pgen.verify_buffer(buffer3, seed=False)
 
-    # Modify 1.
-    pgen.fill_buffer(buffer1, seed=True)
+        # Modify 1.
+        pgen.fill_buffer(buffer1, seed=True)
 
-    pgen.verify_buffer(buffer1, seed=True)
-    pgen.verify_buffer(buffer2, seed=True)
-    pgen.verify_buffer(buffer3, seed=True)
+        pgen.verify_buffer(buffer1, seed=True)
+        pgen.verify_buffer(buffer2, seed=True)
+        pgen.verify_buffer(buffer3, seed=True)
 
-    # Modify 2.
-    pgen.fill_buffer(buffer2, seed=False)
+        # Modify 2.
+        pgen.fill_buffer(buffer2, seed=False)
 
-    pgen.verify_buffer(buffer1, seed=False)
-    pgen.verify_buffer(buffer2, seed=False)
-    pgen.verify_buffer(buffer3, seed=False)
+        pgen.verify_buffer(buffer1, seed=False)
+        pgen.verify_buffer(buffer2, seed=False)
+        pgen.verify_buffer(buffer3, seed=False)
 
-    # Modify 3.
-    pgen.fill_buffer(buffer3, seed=True)
+        # Modify 3.
+        pgen.fill_buffer(buffer3, seed=True)
 
-    pgen.verify_buffer(buffer1, seed=True)
-    pgen.verify_buffer(buffer2, seed=True)
-    pgen.verify_buffer(buffer3, seed=True)
+        pgen.verify_buffer(buffer1, seed=True)
+        pgen.verify_buffer(buffer2, seed=True)
+        pgen.verify_buffer(buffer3, seed=True)
 
-    # Close any one buffer.
-    buffer1.close()
+        # Close any one buffer.
+        buffer1.close()
diff --git a/cuda_core/tests/test_memory_peer_access.py b/cuda_core/tests/test_memory_peer_access.py
index 4bb9fe561d..66c2af23f1 100644
--- a/cuda_core/tests/test_memory_peer_access.py
+++ b/cuda_core/tests/test_memory_peer_access.py
@@ -3,43 +3,13 @@
 
 import cuda.core.experimental
 import pytest
-from cuda.core.experimental import Device, DeviceMemoryResource
+from cuda.core.experimental import DeviceMemoryResource
 from cuda.core.experimental._utils.cuda_utils import CUDAError
 from helpers.buffers import PatternGen, compare_buffer_to_constant, make_scratch_buffer
 
 NBYTES = 1024
 
 
-def _mempool_device_impl(num):
-    num_devices = len(cuda.core.experimental.system.devices)
-    if num_devices < num:
-        pytest.skip("Test requires at least {num} GPUs")
-
-    devs = [Device(i) for i in range(num)]
-    for i in reversed(range(num)):
-        devs[i].set_current()
-
-    if not all(devs[i].can_access_peer(j) for i in range(num) for j in range(num)):
-        pytest.skip("Test requires GPUs with peer access")
-
-    if not all(devs[i].properties.memory_pools_supported for i in range(num)):
-        pytest.skip("Device does not support mempool operations")
-
-    return devs
-
-
-@pytest.fixture
-def mempool_device_x2():
-    """Fixture that provides two devices if available, otherwise skips test."""
-    return _mempool_device_impl(2)
-
-
-@pytest.fixture
-def mempool_device_x3():
-    """Fixture that provides three devices if available, otherwise skips test."""
-    return _mempool_device_impl(3)
-
-
 def test_peer_access_basic(mempool_device_x2):
     """Basic tests for dmr.peer_accessible_by."""
     dev0, dev1 = mempool_device_x2