diff --git a/cpp/src/arrow/buffer_test.cc b/cpp/src/arrow/buffer_test.cc index 13f6ea63b5e..06ed0bfba04 100644 --- a/cpp/src/arrow/buffer_test.cc +++ b/cpp/src/arrow/buffer_test.cc @@ -1023,4 +1023,17 @@ TEST(TestBufferConcatenation, EmptyBuffer) { AssertMyBufferEqual(*result, contents); } +TEST(TestDeviceRegistry, Basics) { + // Test the error cases for the device registry + + // CPU is already registered + ASSERT_RAISES(KeyError, + RegisterDeviceMapper(DeviceAllocationType::kCPU, [](int64_t device_id) { + return default_cpu_memory_manager(); + })); + + // VPI is not registered + ASSERT_RAISES(KeyError, GetDeviceMapper(DeviceAllocationType::kVPI)); +} + } // namespace arrow diff --git a/cpp/src/arrow/c/bridge.cc b/cpp/src/arrow/c/bridge.cc index 4ec79a73029..d004de7a2ea 100644 --- a/cpp/src/arrow/c/bridge.cc +++ b/cpp/src/arrow/c/bridge.cc @@ -1967,12 +1967,11 @@ Result> ImportRecordBatch(struct ArrowArray* array, return ImportRecordBatch(array, *maybe_schema); } -Result> DefaultDeviceMapper(ArrowDeviceType device_type, - int64_t device_id) { - if (device_type != ARROW_DEVICE_CPU) { - return Status::NotImplemented("Only importing data on CPU is supported"); - } - return default_cpu_memory_manager(); +Result> DefaultDeviceMemoryMapper( + ArrowDeviceType device_type, int64_t device_id) { + ARROW_ASSIGN_OR_RAISE(auto mapper, + GetDeviceMapper(static_cast(device_type))); + return mapper(device_id); } Result> ImportDeviceArray(struct ArrowDeviceArray* array, diff --git a/cpp/src/arrow/c/bridge.h b/cpp/src/arrow/c/bridge.h index 0ced3d38cd1..74a302be4c2 100644 --- a/cpp/src/arrow/c/bridge.h +++ b/cpp/src/arrow/c/bridge.h @@ -219,8 +219,8 @@ using DeviceMemoryMapper = std::function>(ArrowDeviceType, int64_t)>; ARROW_EXPORT -Result> DefaultDeviceMapper(ArrowDeviceType device_type, - int64_t device_id); +Result> DefaultDeviceMemoryMapper( + ArrowDeviceType device_type, int64_t device_id); /// \brief EXPERIMENTAL: Import C++ device array from the C data interface. /// @@ -236,7 +236,7 @@ Result> DefaultDeviceMapper(ArrowDeviceType devic ARROW_EXPORT Result> ImportDeviceArray( struct ArrowDeviceArray* array, std::shared_ptr type, - const DeviceMemoryMapper& mapper = DefaultDeviceMapper); + const DeviceMemoryMapper& mapper = DefaultDeviceMemoryMapper); /// \brief EXPERIMENTAL: Import C++ device array and its type from the C data interface. /// @@ -253,7 +253,7 @@ Result> ImportDeviceArray( ARROW_EXPORT Result> ImportDeviceArray( struct ArrowDeviceArray* array, struct ArrowSchema* type, - const DeviceMemoryMapper& mapper = DefaultDeviceMapper); + const DeviceMemoryMapper& mapper = DefaultDeviceMemoryMapper); /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device from the C data /// interface. @@ -271,7 +271,7 @@ Result> ImportDeviceArray( ARROW_EXPORT Result> ImportDeviceRecordBatch( struct ArrowDeviceArray* array, std::shared_ptr schema, - const DeviceMemoryMapper& mapper = DefaultDeviceMapper); + const DeviceMemoryMapper& mapper = DefaultDeviceMemoryMapper); /// \brief EXPERIMENTAL: Import C++ record batch with buffers on a device and its schema /// from the C data interface. @@ -291,7 +291,7 @@ Result> ImportDeviceRecordBatch( ARROW_EXPORT Result> ImportDeviceRecordBatch( struct ArrowDeviceArray* array, struct ArrowSchema* schema, - const DeviceMemoryMapper& mapper = DefaultDeviceMapper); + const DeviceMemoryMapper& mapper = DefaultDeviceMemoryMapper); /// @} diff --git a/cpp/src/arrow/device.cc b/cpp/src/arrow/device.cc index 3736a4e018c..98b8f7b3039 100644 --- a/cpp/src/arrow/device.cc +++ b/cpp/src/arrow/device.cc @@ -18,6 +18,8 @@ #include "arrow/device.h" #include +#include +#include #include #include "arrow/array.h" @@ -268,4 +270,65 @@ std::shared_ptr CPUDevice::default_memory_manager() { return default_cpu_memory_manager(); } +namespace { + +class DeviceMapperRegistryImpl { + public: + DeviceMapperRegistryImpl() {} + + Status RegisterDevice(DeviceAllocationType device_type, DeviceMapper memory_mapper) { + std::lock_guard lock(lock_); + auto [_, inserted] = registry_.try_emplace(device_type, std::move(memory_mapper)); + if (!inserted) { + return Status::KeyError("Device type ", static_cast(device_type), + " is already registered"); + } + return Status::OK(); + } + + Result GetMapper(DeviceAllocationType device_type) { + std::lock_guard lock(lock_); + auto it = registry_.find(device_type); + if (it == registry_.end()) { + return Status::KeyError("Device type ", static_cast(device_type), + "is not registered"); + } + return it->second; + } + + private: + std::mutex lock_; + std::unordered_map registry_; +}; + +Result> DefaultCPUDeviceMapper(int64_t device_id) { + return default_cpu_memory_manager(); +} + +static std::unique_ptr CreateDeviceRegistry() { + auto registry = std::make_unique(); + + // Always register the CPU device + DCHECK_OK(registry->RegisterDevice(DeviceAllocationType::kCPU, DefaultCPUDeviceMapper)); + + return registry; +} + +DeviceMapperRegistryImpl* GetDeviceRegistry() { + static auto g_registry = CreateDeviceRegistry(); + return g_registry.get(); +} + +} // namespace + +Status RegisterDeviceMapper(DeviceAllocationType device_type, DeviceMapper mapper) { + auto registry = GetDeviceRegistry(); + return registry->RegisterDevice(device_type, std::move(mapper)); +} + +Result GetDeviceMapper(DeviceAllocationType device_type) { + auto registry = GetDeviceRegistry(); + return registry->GetMapper(device_type); +} + } // namespace arrow diff --git a/cpp/src/arrow/device.h b/cpp/src/arrow/device.h index efb0a5ab400..622551c6bd0 100644 --- a/cpp/src/arrow/device.h +++ b/cpp/src/arrow/device.h @@ -363,4 +363,32 @@ class ARROW_EXPORT CPUMemoryManager : public MemoryManager { ARROW_EXPORT std::shared_ptr default_cpu_memory_manager(); +using DeviceMapper = + std::function>(int64_t device_id)>; + +/// \brief Register a function to retrieve a MemoryManager for a Device type +/// +/// This registers the device type globally. A specific device type can only +/// be registered once. This method is thread-safe. +/// +/// Currently, this registry is only used for importing data through the C Device +/// Data Interface (for the default Device to MemoryManager mapper in +/// arrow::ImportDeviceArray/ImportDeviceRecordBatch). +/// +/// \param[in] device_type the device type for which to register a MemoryManager +/// \param[in] mapper function that takes a device id and returns the appropriate +/// MemoryManager for the registered device type and given device id +/// \return Status +ARROW_EXPORT +Status RegisterDeviceMapper(DeviceAllocationType device_type, DeviceMapper mapper); + +/// \brief Get the registered function to retrieve a MemoryManager for the +/// given Device type +/// +/// \param[in] device_type the device type +/// \return function that takes a device id and returns the appropriate +/// MemoryManager for the registered device type and given device id +ARROW_EXPORT +Result GetDeviceMapper(DeviceAllocationType device_type); + } // namespace arrow diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index 860c6311d7b..6972321006a 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -27,6 +27,7 @@ #include #include "arrow/buffer.h" +#include "arrow/device.h" #include "arrow/io/memory.h" #include "arrow/memory_pool.h" #include "arrow/status.h" @@ -501,5 +502,23 @@ Result> DefaultMemoryMapper(ArrowDeviceType devic } } +namespace { + +Result> DefaultCUDADeviceMapper(int64_t device_id) { + ARROW_ASSIGN_OR_RAISE(auto device, arrow::cuda::CudaDevice::Make(device_id)); + return device->default_memory_manager(); +} + +bool RegisterCUDADeviceInternal() { + DCHECK_OK(RegisterDeviceMapper(DeviceAllocationType::kCUDA, DefaultCUDADeviceMapper)); + // TODO add the CUDA_HOST and CUDA_MANAGED allocation types when they are supported in + // the CudaDevice + return true; +} + +static auto cuda_registered = RegisterCUDADeviceInternal(); + +} // namespace + } // namespace cuda } // namespace arrow diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h index d323bef0349..488f4183730 100644 --- a/cpp/src/arrow/gpu/cuda_memory.h +++ b/cpp/src/arrow/gpu/cuda_memory.h @@ -260,7 +260,9 @@ Result GetDeviceAddress(const uint8_t* cpu_data, ARROW_EXPORT Result GetHostAddress(uintptr_t device_ptr); -ARROW_EXPORT +ARROW_DEPRECATED( + "Deprecated in 16.0.0. The CUDA device is registered by default, and you can use " + "arrow::DefaultDeviceMapper instead.") Result> DefaultMemoryMapper(ArrowDeviceType device_type, int64_t device_id); diff --git a/cpp/src/arrow/gpu/cuda_test.cc b/cpp/src/arrow/gpu/cuda_test.cc index d2f01cb3bbc..4c450bf3899 100644 --- a/cpp/src/arrow/gpu/cuda_test.cc +++ b/cpp/src/arrow/gpu/cuda_test.cc @@ -716,17 +716,6 @@ class TestCudaDeviceArrayRoundtrip : public ::testing::Test { public: using ArrayFactory = std::function>()>; - static Result> DeviceMapper(ArrowDeviceType type, - int64_t id) { - if (type != ARROW_DEVICE_CUDA) { - return Status::NotImplemented("should only be CUDA device"); - } - - ARROW_ASSIGN_OR_RAISE(auto manager, cuda::CudaDeviceManager::Instance()); - ARROW_ASSIGN_OR_RAISE(auto device, manager->GetDevice(id)); - return device->default_memory_manager(); - } - static ArrayFactory JSONArrayFactory(std::shared_ptr type, const char* json) { return [=]() { return ArrayFromJSON(type, json); }; } @@ -759,7 +748,7 @@ class TestCudaDeviceArrayRoundtrip : public ::testing::Test { std::shared_ptr device_array_roundtripped; ASSERT_OK_AND_ASSIGN(device_array_roundtripped, - ImportDeviceArray(&c_array, &c_schema, DeviceMapper)); + ImportDeviceArray(&c_array, &c_schema)); ASSERT_TRUE(ArrowSchemaIsReleased(&c_schema)); ASSERT_TRUE(ArrowArrayIsReleased(&c_array.array)); @@ -779,7 +768,7 @@ class TestCudaDeviceArrayRoundtrip : public ::testing::Test { ASSERT_OK(ExportDeviceArray(*device_array, sync, &c_array, &c_schema)); device_array_roundtripped.reset(); ASSERT_OK_AND_ASSIGN(device_array_roundtripped, - ImportDeviceArray(&c_array, &c_schema, DeviceMapper)); + ImportDeviceArray(&c_array, &c_schema)); ASSERT_TRUE(ArrowSchemaIsReleased(&c_schema)); ASSERT_TRUE(ArrowArrayIsReleased(&c_array.array));