diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 5d643336c05..cb7aa3a57b9 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -712,15 +712,17 @@ endif()
 add_subdirectory(src/arrow)
 add_subdirectory(src/arrow/io)
 
+if (ARROW_GPU)
+  # IPC extensions required to build the GPU library
+  set(ARROW_IPC ON)
+  add_subdirectory(src/arrow/gpu)
+endif()
+
 if (ARROW_IPC)
   add_subdirectory(src/arrow/ipc)
   add_dependencies(arrow_dependencies metadata_fbs)
 endif()
 
-if (ARROW_GPU)
-  add_subdirectory(src/arrow/gpu)
-endif()
-
 set(ARROW_SRCS
   src/arrow/array.cc
   src/arrow/buffer.cc
diff --git a/cpp/src/arrow/gpu/.gitignore b/cpp/src/arrow/gpu/.gitignore
new file mode 100644
index 00000000000..0ef3f98c58c
--- /dev/null
+++ b/cpp/src/arrow/gpu/.gitignore
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cuda_version.h
diff --git a/cpp/src/arrow/gpu/CMakeLists.txt b/cpp/src/arrow/gpu/CMakeLists.txt
index cab085302c6..176916e2822 100644
--- a/cpp/src/arrow/gpu/CMakeLists.txt
+++ b/cpp/src/arrow/gpu/CMakeLists.txt
@@ -15,52 +15,6 @@
 # specific language governing permissions and limitations
 # under the License.
 
-function(ADD_ARROW_CUDA_TEST REL_TEST_NAME)
-  set(options)
-  set(single_value_args)
-  set(multi_value_args STATIC_LINK_LIBS)
-  cmake_parse_arguments(ARG "${options}" "${one_value_args}" "${multi_value_args}" ${ARGN})
-  if(ARG_UNPARSED_ARGUMENTS)
-    message(SEND_ERROR "Error: unrecognized arguments: ${ARG_UNPARSED_ARGUMENTS}")
-  endif()
-
-  if(NO_TESTS OR NOT ARROW_BUILD_STATIC)
-    return()
-  endif()
-  get_filename_component(TEST_NAME ${REL_TEST_NAME} NAME_WE)
-
-  if(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/${REL_TEST_NAME}.cc)
-    # This test has a corresponding .cc file, set it up as an executable.
-    set(TEST_PATH "${EXECUTABLE_OUTPUT_PATH}/${TEST_NAME}")
-    cuda_add_executable(${TEST_NAME} "${REL_TEST_NAME}.cc")
-
-    if (ARG_STATIC_LINK_LIBS)
-      # Customize link libraries
-      target_link_libraries(${TEST_NAME} ${ARG_STATIC_LINK_LIBS})
-    else()
-      target_link_libraries(${TEST_NAME} ${ARROW_TEST_LINK_LIBS})
-    endif()
-    add_dependencies(unittest ${TEST_NAME})
-  else()
-    # No executable, just invoke the test (probably a script) directly.
-    set(TEST_PATH ${CMAKE_CURRENT_SOURCE_DIR}/${REL_TEST_NAME})
-  endif()
-
-  if (ARROW_TEST_MEMCHECK)
-    SET_PROPERTY(TARGET ${TEST_NAME}
-      APPEND_STRING PROPERTY
-      COMPILE_FLAGS " -DARROW_VALGRIND")
-    add_test(${TEST_NAME}
-      bash -c "cd ${EXECUTABLE_OUTPUT_PATH}; valgrind --tool=memcheck --leak-check=full --leak-check-heuristics=stdstring --error-exitcode=1 ${TEST_PATH}")
-  elseif(MSVC)
-    add_test(${TEST_NAME} ${TEST_PATH})
-  else()
-    add_test(${TEST_NAME}
-      ${BUILD_SUPPORT_DIR}/run-test.sh ${CMAKE_BINARY_DIR} test ${TEST_PATH})
-  endif()
-  set_tests_properties(${TEST_NAME} PROPERTIES LABELS "unittest")
-endfunction()
-
 #######################################
 # arrow_gpu
 #######################################
@@ -73,36 +27,37 @@ find_package(CUDA REQUIRED)
 include_directories(SYSTEM ${CUDA_INCLUDE_DIRS})
 
 set(ARROW_GPU_SRCS
+  cuda_arrow_ipc.cc
+  cuda_context.cc
   cuda_memory.cc
 )
 
 set(ARROW_GPU_SHARED_LINK_LIBS
   arrow_shared
+  ${CUDA_LIBRARIES}
+  ${CUDA_CUDA_LIBRARY}
 )
 
-add_library(arrow_gpu_objlib OBJECT
-  ${ARROW_GPU_SRCS}
+ADD_ARROW_LIB(arrow_gpu
+  SOURCES ${ARROW_GPU_SRCS}
+  SHARED_LINK_FLAGS ""
+  SHARED_LINK_LIBS ${ARROW_GPU_SHARED_LINK_LIBS}
+  STATIC_LINK_LIBS ""
 )
-set_property(TARGET arrow_gpu_objlib PROPERTY POSITION_INDEPENDENT_CODE 1)
 
-if (ARROW_BUILD_SHARED)
-  cuda_add_library(arrow_gpu_shared SHARED $<TARGET_OBJECTS:arrow_gpu_objlib>)
-  install(TARGETS arrow_gpu_shared
-    RUNTIME DESTINATION bin
-    LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
-    ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})
-endif()
+# CUDA build version
+configure_file(cuda_version.h.in
+  "${CMAKE_CURRENT_SOURCE_DIR}/cuda_version.h"
+  @ONLY)
 
-if (ARROW_BUILD_STATIC)
-  add_library(arrow_gpu_static STATIC $<TARGET_OBJECTS:arrow_gpu_objlib>)
-  install(TARGETS arrow_gpu_static
-    RUNTIME DESTINATION bin
-    LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
-    ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR})
-endif()
+install(FILES
+  "${CMAKE_CURRENT_SOURCE_DIR}/cuda_version.h"
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
 
 install(FILES
-  cuda_common.h
+  cuda_api.h
+  cuda_arrow_ipc.h
+  cuda_context.h
   cuda_memory.h
   DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
 
@@ -110,25 +65,17 @@ install(FILES
 configure_file(arrow-gpu.pc.in
   "${CMAKE_CURRENT_BINARY_DIR}/arrow-gpu.pc"
   @ONLY)
+
 install(
   FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-gpu.pc"
   DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
 
-# CUDA build version
-configure_file(cuda_version.h.in
-  "${CMAKE_CURRENT_BINARY_DIR}/cuda_version.h"
-  @ONLY)
-
-install(FILES
-  "${CMAKE_CURRENT_BINARY_DIR}/cuda_version.h"
-  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
-
 set(ARROW_GPU_TEST_LINK_LIBS
   arrow_gpu_shared
   ${ARROW_TEST_LINK_LIBS})
 
 if (ARROW_BUILD_TESTS)
-  ADD_ARROW_CUDA_TEST(cuda-test
+  ADD_ARROW_TEST(cuda-test
     STATIC_LINK_LIBS ${ARROW_GPU_TEST_LINK_LIBS})
 endif()
diff --git a/cpp/src/arrow/gpu/cuda-benchmark.cc b/cpp/src/arrow/gpu/cuda-benchmark.cc
index 82caacc05e4..805a044403c 100644
--- a/cpp/src/arrow/gpu/cuda-benchmark.cc
+++ b/cpp/src/arrow/gpu/cuda-benchmark.cc
@@ -25,7 +25,7 @@
 #include "arrow/memory_pool.h"
 #include "arrow/test-util.h"
 
-#include "arrow/gpu/cuda_memory.h"
+#include "arrow/gpu/cuda_api.h"
 
 namespace arrow {
 namespace gpu {
@@ -35,8 +35,13 @@ constexpr int64_t kGpuNumber = 0;
 static void CudaBufferWriterBenchmark(benchmark::State& state, const int64_t total_bytes,
                                       const int64_t chunksize,
                                       const int64_t buffer_size) {
+  CudaDeviceManager* manager;
+  ABORT_NOT_OK(CudaDeviceManager::GetInstance(&manager));
+  std::shared_ptr<CudaContext> context;
+  ABORT_NOT_OK(manager->GetContext(kGpuNumber, &context));
+
   std::shared_ptr<CudaBuffer> device_buffer;
-  ABORT_NOT_OK(AllocateCudaBuffer(kGpuNumber, total_bytes, &device_buffer));
+  ABORT_NOT_OK(context->Allocate(total_bytes, &device_buffer));
   CudaBufferWriter writer(device_buffer);
 
   if (buffer_size > 0) {
diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index f479701eaeb..aa9d3efd2ac 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -21,23 +21,39 @@
 
 #include "gtest/gtest.h"
 
+#include "arrow/ipc/api.h"
+#include "arrow/ipc/test-common.h"
 #include "arrow/status.h"
 #include "arrow/test-util.h"
 
-#include "arrow/gpu/cuda_memory.h"
+#include "arrow/gpu/cuda_api.h"
 
 namespace arrow {
 namespace gpu {
 
 constexpr int kGpuNumber = 0;
 
-class TestCudaBuffer : public ::testing::Test {};
+class TestCudaBufferBase : public ::testing::Test {
+ public:
+  void SetUp() {
+    ASSERT_OK(CudaDeviceManager::GetInstance(&manager_));
+    ASSERT_OK(manager_->GetContext(kGpuNumber, &context_));
+  }
+
+ protected:
+  CudaDeviceManager* manager_;
+  std::shared_ptr<CudaContext> context_;
+};
+
+class TestCudaBuffer : public TestCudaBufferBase {
+ public:
+  void SetUp() { TestCudaBufferBase::SetUp(); }
+};
 
 TEST_F(TestCudaBuffer, Allocate) {
   const int64_t kSize = 100;
   std::shared_ptr<CudaBuffer> buffer;
-
-  ASSERT_OK(AllocateCudaBuffer(kGpuNumber, kSize, &buffer));
+  ASSERT_OK(context_->Allocate(kSize, &buffer));
   ASSERT_EQ(kSize, buffer->size());
 }
 
@@ -52,7 +68,7 @@ void AssertCudaBufferEquals(const CudaBuffer& buffer, const uint8_t* host_data,
 TEST_F(TestCudaBuffer, CopyFromHost) {
   const int64_t kSize = 1000;
   std::shared_ptr<CudaBuffer> device_buffer;
-  ASSERT_OK(AllocateCudaBuffer(kGpuNumber, kSize, &device_buffer));
+  ASSERT_OK(context_->Allocate(kSize, &device_buffer));
 
   std::shared_ptr<PoolBuffer> host_buffer;
   ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
@@ -63,10 +79,49 @@ TEST_F(TestCudaBuffer, CopyFromHost) {
   AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
 }
 
-class TestCudaBufferWriter : public ::testing::Test {
+// IPC only supported on Linux
+#if defined(__linux)
+
+TEST_F(TestCudaBuffer, DISABLED_ExportForIpc) {
+  // For this test to work, a second process needs to be spawned
+  const int64_t kSize = 1000;
+  std::shared_ptr<CudaBuffer> device_buffer;
+  ASSERT_OK(context_->Allocate(kSize, &device_buffer));
+
+  std::shared_ptr<PoolBuffer> host_buffer;
+  ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
+  ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), kSize));
+
+  // Export for IPC and serialize
+  std::unique_ptr<CudaIpcMemHandle> ipc_handle;
+  ASSERT_OK(device_buffer->ExportForIpc(&ipc_handle));
+
+  std::shared_ptr<Buffer> serialized_handle;
+  ASSERT_OK(ipc_handle->Serialize(default_memory_pool(), &serialized_handle));
+
+  // Deserialize IPC handle and open
+  std::unique_ptr<CudaIpcMemHandle> ipc_handle2;
+  ASSERT_OK(CudaIpcMemHandle::FromBuffer(serialized_handle->data(), &ipc_handle2));
+
+  std::shared_ptr<CudaBuffer> ipc_buffer;
+  ASSERT_OK(context_->OpenIpcBuffer(*ipc_handle2, &ipc_buffer));
+
+  ASSERT_EQ(kSize, ipc_buffer->size());
+
+  std::shared_ptr<MutableBuffer> ipc_data;
+  ASSERT_OK(AllocateBuffer(default_memory_pool(), kSize, &ipc_data));
+  ASSERT_OK(ipc_buffer->CopyToHost(0, kSize, ipc_data->mutable_data()));
+  ASSERT_EQ(0, std::memcmp(ipc_data->data(), host_buffer->data(), kSize));
+}
+
+#endif
+
+class TestCudaBufferWriter : public TestCudaBufferBase {
  public:
+  void SetUp() { TestCudaBufferBase::SetUp(); }
+
   void Allocate(const int64_t size) {
-    ASSERT_OK(AllocateCudaBuffer(kGpuNumber, size, &device_buffer_));
+    ASSERT_OK(context_->Allocate(size, &device_buffer_));
     writer_.reset(new CudaBufferWriter(device_buffer_));
   }
 
@@ -164,11 +219,16 @@ TEST_F(TestCudaBufferWriter, EdgeCases) {
   AssertCudaBufferEquals(*device_buffer_, host_data, 1000);
 }
 
-TEST(TestCudaBufferReader, Basics) {
+class TestCudaBufferReader : public TestCudaBufferBase {
+ public:
+  void SetUp() { TestCudaBufferBase::SetUp(); }
+};
+
+TEST_F(TestCudaBufferReader, Basics) {
   std::shared_ptr<CudaBuffer> device_buffer;
 
   const int64_t size = 1000;
-  ASSERT_OK(AllocateCudaBuffer(kGpuNumber, size, &device_buffer));
+  ASSERT_OK(context_->Allocate(size, &device_buffer));
 
   std::shared_ptr<PoolBuffer> buffer;
   ASSERT_OK(test::MakeRandomBytePoolBuffer(1000, default_memory_pool(), &buffer));
@@ -204,5 +264,41 @@ TEST_F(TestCudaBufferReader, Basics) {
   ASSERT_EQ(0, std::memcmp(stack_buffer, host_data + 925, 75));
 }
 
+class TestCudaArrowIpc : public TestCudaBufferBase {
+ public:
+  void SetUp() {
+    TestCudaBufferBase::SetUp();
+    pool_ = default_memory_pool();
+  }
+
+ protected:
+  MemoryPool* pool_;
+};
+
+TEST_F(TestCudaArrowIpc, BasicWriteRead) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(ipc::MakeIntRecordBatch(&batch));
+
+  std::shared_ptr<CudaBuffer> device_serialized;
+  ASSERT_OK(arrow::gpu::SerializeRecordBatch(*batch, context_.get(), &device_serialized));
+
+  // Test that ReadRecordBatch works properly
+  std::shared_ptr<RecordBatch> device_batch;
+  ASSERT_OK(ReadRecordBatch(batch->schema(), device_serialized, default_memory_pool(),
+                            &device_batch));
+
+  // Copy data from device, read batch, and compare
+  std::shared_ptr<MutableBuffer> host_buffer;
+  int64_t size = device_serialized->size();
+  ASSERT_OK(AllocateBuffer(pool_, size, &host_buffer));
+  ASSERT_OK(device_serialized->CopyToHost(0, size, host_buffer->mutable_data()));
+
+  std::shared_ptr<RecordBatch> cpu_batch;
+  io::BufferReader cpu_reader(host_buffer);
+  ASSERT_OK(ipc::ReadRecordBatch(batch->schema(), &cpu_reader, &cpu_batch));
+
+  ipc::CompareBatch(*batch, *cpu_batch);
+}
+
 }  // namespace gpu
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_api.h b/cpp/src/arrow/gpu/cuda_api.h
index a70e0af92af..c63b77e8721 100644
--- a/cpp/src/arrow/gpu/cuda_api.h
+++ b/cpp/src/arrow/gpu/cuda_api.h
@@ -18,7 +18,9 @@
 #ifndef ARROW_GPU_CUDA_API_H
 #define ARROW_GPU_CUDA_API_H
 
+#include "arrow/gpu/cuda_arrow_ipc.h"
+#include "arrow/gpu/cuda_context.h"
 #include "arrow/gpu/cuda_memory.h"
 #include "arrow/gpu/cuda_version.h"
 
-#endif // ARROW_GPU_CUDA_API_H
+#endif  // ARROW_GPU_CUDA_API_H
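The `DISABLED_ExportForIpc` test above can only exercise the exporting half in one process. For orientation, here is a hedged sketch of what the receiving process would do with the serialized handle bytes; the helper name `ImportDeviceBuffer` is ours, and how the bytes travel between processes (socket, pipe, shared memory) is outside this patch. Only APIs added by this diff are used.

```cpp
// Hypothetical receiving process. "serialized" holds the bytes produced by
// CudaIpcMemHandle::Serialize() in the exporting process.
#include <memory>

#include "arrow/buffer.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"

#include "arrow/gpu/cuda_api.h"

arrow::Status ImportDeviceBuffer(const uint8_t* serialized,
                                 std::shared_ptr<arrow::Buffer>* host_copy) {
  arrow::gpu::CudaDeviceManager* manager;
  RETURN_NOT_OK(arrow::gpu::CudaDeviceManager::GetInstance(&manager));
  std::shared_ptr<arrow::gpu::CudaContext> context;
  RETURN_NOT_OK(manager->GetContext(0, &context));

  // Rehydrate the handle and map the exporter's device allocation
  std::unique_ptr<arrow::gpu::CudaIpcMemHandle> handle;
  RETURN_NOT_OK(arrow::gpu::CudaIpcMemHandle::FromBuffer(serialized, &handle));
  std::shared_ptr<arrow::gpu::CudaBuffer> device_buffer;
  RETURN_NOT_OK(context->OpenIpcBuffer(*handle, &device_buffer));

  // Copy the shared device memory back to the host for inspection
  std::shared_ptr<arrow::MutableBuffer> out;
  RETURN_NOT_OK(arrow::AllocateBuffer(arrow::default_memory_pool(),
                                      device_buffer->size(), &out));
  RETURN_NOT_OK(
      device_buffer->CopyToHost(0, device_buffer->size(), out->mutable_data()));
  *host_copy = out;
  return arrow::Status::OK();
}
```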
+ +#include "arrow/gpu/cuda_arrow_ipc.h" + +#include +#include +#include +#include + +#include "arrow/buffer.h" +#include "arrow/ipc/Message_generated.h" +#include "arrow/ipc/message.h" +#include "arrow/ipc/reader.h" +#include "arrow/ipc/writer.h" +#include "arrow/status.h" +#include "arrow/table.h" +#include "arrow/util/visibility.h" + +#include "arrow/gpu/cuda_context.h" +#include "arrow/gpu/cuda_memory.h" + +namespace arrow { + +namespace flatbuf = org::apache::arrow::flatbuf; + +namespace gpu { + +Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx, + std::shared_ptr* out) { + int64_t size = 0; + RETURN_NOT_OK(ipc::GetRecordBatchSize(batch, &size)); + + std::shared_ptr buffer; + RETURN_NOT_OK(ctx->Allocate(size, &buffer)); + + CudaBufferWriter stream(buffer); + + // Use 8MB buffering, which yields generally good performance + RETURN_NOT_OK(stream.SetBufferSize(1 << 23)); + + // We use the default memory pool here since any allocations are ephemeral + RETURN_NOT_OK(ipc::SerializeRecordBatch(batch, default_memory_pool(), &stream)); + RETURN_NOT_OK(stream.Close()); + *out = buffer; + return Status::OK(); +} + +Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool, + std::unique_ptr* out) { + int32_t message_length = 0; + int64_t bytes_read = 0; + + RETURN_NOT_OK(reader->Read(sizeof(int32_t), &bytes_read, + reinterpret_cast(&message_length))); + if (bytes_read != sizeof(int32_t)) { + *out = nullptr; + return Status::OK(); + } + + if (message_length == 0) { + // Optional 0 EOS control message + *out = nullptr; + return Status::OK(); + } + + std::shared_ptr metadata; + RETURN_NOT_OK(AllocateBuffer(pool, message_length, &metadata)); + RETURN_NOT_OK(reader->Read(message_length, &bytes_read, metadata->mutable_data())); + if (bytes_read != message_length) { + std::stringstream ss; + ss << "Expected " << message_length << " metadata bytes, but only got " << bytes_read; + return Status::IOError(ss.str()); + } + + return ipc::Message::ReadFrom(metadata, reader, out); +} + +Status ReadRecordBatch(const std::shared_ptr& schema, + const std::shared_ptr& buffer, MemoryPool* pool, + std::shared_ptr* out) { + CudaBufferReader cuda_reader(buffer); + + std::unique_ptr message; + RETURN_NOT_OK(ReadMessage(&cuda_reader, pool, &message)); + + if (!message) { + return Status::Invalid("Message is length 0"); + } + + // Zero-copy read on device memory + return ipc::ReadRecordBatch(*message, schema, out); +} + +} // namespace gpu +} // namespace arrow diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.h b/cpp/src/arrow/gpu/cuda_arrow_ipc.h new file mode 100644 index 00000000000..52dd92473ea --- /dev/null +++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.h @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.h b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
new file mode 100644
index 00000000000..52dd92473ea
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_GPU_CUDA_ARROW_IPC_H
+#define ARROW_GPU_CUDA_ARROW_IPC_H
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+
+class MemoryPool;
+class RecordBatch;
+class Schema;
+
+namespace ipc {
+
+class Message;
+
+}  // namespace ipc
+
+namespace gpu {
+
+/// \brief Write record batch message to GPU device memory
+/// \param[in] batch record batch to write
+/// \param[in] ctx CudaContext to allocate device memory from
+/// \param[out] out the returned device buffer which contains the record batch message
+/// \return Status
+ARROW_EXPORT
+Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
+                            std::shared_ptr<CudaBuffer>* out);
+
+/// \brief Read Arrow IPC message located on GPU device
+/// \param[in] reader a CudaBufferReader
+/// \param[in] pool a MemoryPool to allocate CPU memory for the metadata
+/// \param[out] message the deserialized message, body still on device
+///
+/// This function reads the message metadata into host memory, but leaves the
+/// message body on the device
+ARROW_EXPORT
+Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
+                   std::unique_ptr<ipc::Message>* message);
+
+/// \brief ReadRecordBatch specialized to handle metadata on CUDA device
+/// \param[in] schema the Schema for the record batch
+/// \param[in] buffer a CudaBuffer containing the complete IPC message
+/// \param[in] pool a MemoryPool to use for allocating space for the metadata
+/// \param[out] out the reconstructed RecordBatch, with device pointers
+ARROW_EXPORT
+Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
+                       const std::shared_ptr<CudaBuffer>& buffer, MemoryPool* pool,
+                       std::shared_ptr<RecordBatch>* out);
+
+}  // namespace gpu
+}  // namespace arrow
+
+#endif  // ARROW_GPU_CUDA_ARROW_IPC_H
diff --git a/cpp/src/arrow/gpu/cuda_common.h b/cpp/src/arrow/gpu/cuda_common.h
index 1d65f96adbc..c06c1a21ff4 100644
--- a/cpp/src/arrow/gpu/cuda_common.h
+++ b/cpp/src/arrow/gpu/cuda_common.h
@@ -22,7 +22,7 @@
 
 #include <sstream>
 
-#include <cuda_runtime_api.h>
+#include <cuda.h>
 
 namespace arrow {
 namespace gpu {
@@ -34,24 +34,13 @@ namespace gpu {
     (void)ret;            \
   } while (0)
 
-#define CUDA_RETURN_NOT_OK(STMT)                                         \
-  do {                                                                   \
-    cudaError_t ret = (STMT);                                            \
-    if (ret != cudaSuccess) {                                            \
-      std::stringstream ss;                                              \
-      ss << "Cuda API call in " << __FILE__ << " at line " << __LINE__   \
-         << " failed: " << #STMT;                                        \
-      return Status::IOError(ss.str());                                  \
-    }                                                                    \
-  } while (0)
-
-#define CUDADRV_RETURN_NOT_OK(STMT)                                           \
+#define CU_RETURN_NOT_OK(STMT)                                                \
   do {                                                                        \
     CUresult ret = (STMT);                                                    \
     if (ret != CUDA_SUCCESS) {                                                \
       std::stringstream ss;                                                   \
       ss << "Cuda Driver API call in " << __FILE__ << " at line " << __LINE__ \
-         << " failed: " << #STMT;                                             \
+         << " failed with code " << ret << ": " << #STMT;                     \
       return Status::IOError(ss.str());                                       \
     }                                                                         \
   } while (0)
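With the runtime-API macro gone, all error handling goes through the renamed `CU_RETURN_NOT_OK`. A small sketch of how it reads in new driver-API code follows; the helper function is hypothetical (`cuDeviceGet`/`cuDeviceTotalMem` are real driver calls), `cuda_common.h` is internal to the arrow_gpu sources and no longer installed, and `cuInit(0)` must already have run, which `CudaDeviceManager::GetInstance` takes care of in this patch.

```cpp
#include <cuda.h>

#include "arrow/status.h"

#include "arrow/gpu/cuda_common.h"  // internal header

using arrow::Status;  // the macro body uses unqualified Status

// Hypothetical helper: any CUresult other than CUDA_SUCCESS becomes a
// Status::IOError naming the failing statement, file, line, and error code.
Status TotalDeviceMemory(int device_number, size_t* out) {
  CUdevice device;
  CU_RETURN_NOT_OK(cuDeviceGet(&device, device_number));
  CU_RETURN_NOT_OK(cuDeviceTotalMem(out, device));
  return Status::OK();
}
```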
diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
new file mode 100644
index 00000000000..430ecab6fbe
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -0,0 +1,277 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/gpu/cuda_context.h"
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <vector>
+
+#include <cuda.h>
+
+#include "arrow/gpu/cuda_common.h"
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+namespace gpu {
+
+struct CudaDevice {
+  int device_num;
+  CUdevice handle;
+  int64_t total_memory;
+};
+
+class CudaContext::CudaContextImpl {
+ public:
+  CudaContextImpl() {}
+
+  Status Init(const CudaDevice& device) {
+    device_ = device;
+    CU_RETURN_NOT_OK(cuCtxCreate(&context_, 0, device_.handle));
+    is_open_ = true;
+    return Status::OK();
+  }
+
+  Status Close() {
+    if (is_open_ && own_context_) {
+      CU_RETURN_NOT_OK(cuCtxDestroy(context_));
+    }
+    is_open_ = false;
+    return Status::OK();
+  }
+
+  int64_t bytes_allocated() const { return bytes_allocated_.load(); }
+
+  Status Allocate(int64_t nbytes, uint8_t** out) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+
+    CUdeviceptr data;
+    CU_RETURN_NOT_OK(cuMemAlloc(&data, static_cast<size_t>(nbytes)));
+    *out = reinterpret_cast<uint8_t*>(data);
+    return Status::OK();
+  }
+
+  Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    CU_RETURN_NOT_OK(cuMemcpyHtoD(reinterpret_cast<CUdeviceptr>(dst),
+                                  reinterpret_cast<const void*>(src),
+                                  static_cast<size_t>(nbytes)));
+    return Status::OK();
+  }
+
+  Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    CU_RETURN_NOT_OK(cuMemcpyDtoH(dst, reinterpret_cast<CUdeviceptr>(src),
+                                  static_cast<size_t>(nbytes)));
+    return Status::OK();
+  }
+
+  Status Free(uint8_t* device_ptr, int64_t nbytes) {
+    CU_RETURN_NOT_OK(cuMemFree(reinterpret_cast<CUdeviceptr>(device_ptr)));
+    return Status::OK();
+  }
+
+  Status ExportIpcBuffer(uint8_t* data, std::unique_ptr<CudaIpcMemHandle>* handle) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    CUipcMemHandle cu_handle;
+    CU_RETURN_NOT_OK(cuIpcGetMemHandle(&cu_handle, reinterpret_cast<CUdeviceptr>(data)));
+    *handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(&cu_handle));
+    return Status::OK();
+  }
+
+  Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle, uint8_t** out) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    auto handle = reinterpret_cast<const CUipcMemHandle*>(ipc_handle.handle());
+
+    CUdeviceptr data;
+    CU_RETURN_NOT_OK(
+        cuIpcOpenMemHandle(&data, *handle, CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS));
+    *out = reinterpret_cast<uint8_t*>(data);
+    return Status::OK();
+  }
+
+  const CudaDevice device() const { return device_; }
+
+ private:
+  CudaDevice device_;
+  CUcontext context_;
+  bool is_open_;
+
+  // So that we can utilize a CUcontext that was created outside this library
+  bool own_context_;
+
+  std::atomic<int64_t> bytes_allocated_;
+};
+
+class CudaDeviceManager::CudaDeviceManagerImpl {
+ public:
+  CudaDeviceManagerImpl() : host_bytes_allocated_(0) {}
+
+  Status Init() {
+    CU_RETURN_NOT_OK(cuInit(0));
+    CU_RETURN_NOT_OK(cuDeviceGetCount(&num_devices_));
+
+    devices_.resize(num_devices_);
+    for (int i = 0; i < num_devices_; ++i) {
+      RETURN_NOT_OK(GetDeviceProperties(i, &devices_[i]));
+    }
+    return Status::OK();
+  }
+
+  Status AllocateHost(int64_t nbytes, uint8_t** out) {
+    CU_RETURN_NOT_OK(cuMemHostAlloc(reinterpret_cast<void**>(out),
+                                    static_cast<size_t>(nbytes),
+                                    CU_MEMHOSTALLOC_PORTABLE));
+    host_bytes_allocated_ += nbytes;
+    return Status::OK();
+  }
+
+  Status FreeHost(uint8_t* data, int64_t nbytes) {
+    CU_RETURN_NOT_OK(cuMemFreeHost(data));
+    host_bytes_allocated_ -= nbytes;
+    return Status::OK();
+  }
+
+  Status GetDeviceProperties(int device_number, CudaDevice* device) {
+    device->device_num = device_number;
+    CU_RETURN_NOT_OK(cuDeviceGet(&device->handle, device_number));
+
+    size_t total_memory = 0;
+    CU_RETURN_NOT_OK(cuDeviceTotalMem(&total_memory, device->handle));
+    device->total_memory = total_memory;
+    return Status::OK();
+  }
+
+  Status CreateNewContext(int device_number, std::shared_ptr<CudaContext>* out) {
+    *out = std::shared_ptr<CudaContext>(new CudaContext());
+    return (*out)->impl_->Init(devices_[device_number]);
+  }
+
+  Status GetContext(int device_number, std::shared_ptr<CudaContext>* out) {
+    auto it = contexts_.find(device_number);
+    if (it == contexts_.end()) {
+      std::shared_ptr<CudaContext> new_context;
+      RETURN_NOT_OK(CreateNewContext(device_number, &new_context));
+      contexts_[device_number] = *out = new_context;
+    } else {
+      *out = it->second;
+    }
+    return Status::OK();
+  }
+
+  int num_devices() const { return num_devices_; }
+
+ private:
+  int num_devices_;
+  std::vector<CudaDevice> devices_;
+
+  // device_number -> CudaContext
+  std::unordered_map<int, std::shared_ptr<CudaContext>> contexts_;
+
+  int host_bytes_allocated_;
+};
+
+CudaDeviceManager::CudaDeviceManager() { impl_.reset(new CudaDeviceManagerImpl()); }
+
+std::unique_ptr<CudaDeviceManager> CudaDeviceManager::instance_ = nullptr;
+
+Status CudaDeviceManager::GetInstance(CudaDeviceManager** manager) {
+  if (!instance_) {
+    instance_.reset(new CudaDeviceManager());
+    RETURN_NOT_OK(instance_->impl_->Init());
+  }
+  *manager = instance_.get();
+  return Status::OK();
+}
+
+Status CudaDeviceManager::GetContext(int device_number,
+                                     std::shared_ptr<CudaContext>* out) {
+  return impl_->GetContext(device_number, out);
+}
+
+Status CudaDeviceManager::CreateNewContext(int device_number,
+                                           std::shared_ptr<CudaContext>* out) {
+  return impl_->CreateNewContext(device_number, out);
+}
+
+Status CudaDeviceManager::AllocateHost(int64_t nbytes,
+                                       std::shared_ptr<CudaHostBuffer>* out) {
+  uint8_t* data = nullptr;
+  RETURN_NOT_OK(impl_->AllocateHost(nbytes, &data));
+  *out = std::make_shared<CudaHostBuffer>(data, nbytes);
+  return Status::OK();
+}
+
+Status CudaDeviceManager::FreeHost(uint8_t* data, int64_t nbytes) {
+  return impl_->FreeHost(data, nbytes);
+}
+
+int CudaDeviceManager::num_devices() const { return impl_->num_devices(); }
+
+// ----------------------------------------------------------------------
+// CudaContext public API
+
+CudaContext::CudaContext() { impl_.reset(new CudaContextImpl()); }
+
+CudaContext::~CudaContext() {}
+
+Status CudaContext::Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out) {
+  uint8_t* data = nullptr;
+  RETURN_NOT_OK(impl_->Allocate(nbytes, &data));
+  *out = std::make_shared<CudaBuffer>(data, nbytes, this->shared_from_this(), true);
+  return Status::OK();
+}
+
+Status CudaContext::ExportIpcBuffer(uint8_t* data,
+                                    std::unique_ptr<CudaIpcMemHandle>* handle) {
+  return impl_->ExportIpcBuffer(data, handle);
+}
+
+Status CudaContext::CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
+  return impl_->CopyHostToDevice(dst, src, nbytes);
+}
+
+Status CudaContext::CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
+  return impl_->CopyDeviceToHost(dst, src, nbytes);
+}
+
+Status CudaContext::Close() { return impl_->Close(); }
+
+Status CudaContext::Free(uint8_t* device_ptr, int64_t nbytes) {
+  return impl_->Free(device_ptr, nbytes);
+}
+
+Status CudaContext::OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle,
+                                  std::shared_ptr<CudaBuffer>* out) {
+  uint8_t* data = nullptr;
+  RETURN_NOT_OK(impl_->OpenIpcBuffer(ipc_handle, &data));
+
+  // Need to ask the device how big the buffer is
+  size_t allocation_size = 0;
+  CU_RETURN_NOT_OK(cuMemGetAddressRange(nullptr, &allocation_size,
+                                        reinterpret_cast<CUdeviceptr>(data)));
+
+  *out = std::make_shared<CudaBuffer>(data, allocation_size, this->shared_from_this(),
+                                      true, true);
+  return Status::OK();
+}
+
+}  // namespace gpu
+}  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
new file mode 100644
index 00000000000..64710596123
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_GPU_CUDA_CONTEXT_H
+#define ARROW_GPU_CUDA_CONTEXT_H
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+namespace gpu {
+
+// Forward declaration
+class CudaContext;
+
+class ARROW_EXPORT CudaDeviceManager {
+ public:
+  static Status GetInstance(CudaDeviceManager** manager);
+
+  /// \brief Get the shared CUDA driver context for a particular device
+  Status GetContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
+
+  /// \brief Create a new context for a given device number
+  ///
+  /// In general code will use GetContext
+  Status CreateNewContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
+
+  Status AllocateHost(int64_t nbytes, std::shared_ptr<CudaHostBuffer>* buffer);
+
+  Status FreeHost(uint8_t* data, int64_t nbytes);
+
+  int num_devices() const;
+
+ private:
+  CudaDeviceManager();
+  static std::unique_ptr<CudaDeviceManager> instance_;
+
+  class CudaDeviceManagerImpl;
+  std::unique_ptr<CudaDeviceManagerImpl> impl_;
+
+  friend CudaContext;
+};
+
+struct ARROW_EXPORT CudaDeviceInfo {};
+
+/// \class CudaContext
+/// \brief Friendlier interface to the CUDA driver API
+class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext> {
+ public:
+  ~CudaContext();
+
+  Status Close();
+
+  /// \brief Allocate CUDA memory on GPU device for this context
+  /// \param[in] nbytes number of bytes
+  /// \param[out] out the allocated buffer
+  /// \return Status
+  Status Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out);
+
+  /// \brief Open existing CUDA IPC memory handle
+  /// \param[in] ipc_handle opaque pointer to CUipcMemHandle (driver API)
+  /// \param[out] buffer a CudaBuffer referencing the shared device memory
+  /// \return Status
+  Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle,
+                       std::shared_ptr<CudaBuffer>* buffer);
+
+  int64_t bytes_allocated() const;
+
+ private:
+  CudaContext();
+
+  Status ExportIpcBuffer(uint8_t* data, std::unique_ptr<CudaIpcMemHandle>* handle);
+  Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes);
+  Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes);
+  Status Free(uint8_t* device_ptr, int64_t nbytes);
+
+  class CudaContextImpl;
+  std::unique_ptr<CudaContextImpl> impl_;
+
+  friend CudaBuffer;
+  friend CudaBufferReader;
+  friend CudaBufferWriter;
+  friend CudaDeviceManager::CudaDeviceManagerImpl;
+};
+
+}  // namespace gpu
+}  // namespace arrow
+
+#endif  // ARROW_GPU_CUDA_CONTEXT_H
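A minimal sketch of the flow that replaces the removed free function `AllocateCudaBuffer()`: look up the per-device context once, then allocate from it. The helper name is ours; the calls are the new public API above.

```cpp
#include <memory>

#include "arrow/status.h"

#include "arrow/gpu/cuda_api.h"

// Contexts are cached by CudaDeviceManager, so repeated calls for the same
// device number share one CUcontext.
arrow::Status AllocateOnDevice(int device_number, int64_t nbytes,
                               std::shared_ptr<arrow::gpu::CudaBuffer>* out) {
  arrow::gpu::CudaDeviceManager* manager;
  RETURN_NOT_OK(arrow::gpu::CudaDeviceManager::GetInstance(&manager));
  std::shared_ptr<arrow::gpu::CudaContext> context;
  RETURN_NOT_OK(manager->GetContext(device_number, &context));
  // The returned buffer keeps the context alive and frees the device
  // memory when it is destructed
  return context->Allocate(nbytes, out);
}
```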
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 4870813b661..3c88fe2d59f 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -19,68 +19,124 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <cstring>
 #include <memory>
+
+#include <cuda.h>
+
 #include "arrow/buffer.h"
 #include "arrow/io/memory.h"
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
 
 #include "arrow/gpu/cuda_common.h"
+#include "arrow/gpu/cuda_context.h"
 
 namespace arrow {
 namespace gpu {
 
-CudaBuffer::~CudaBuffer() {
+// ----------------------------------------------------------------------
+// CUDA IPC memory handle
+
+struct CudaIpcMemHandle::CudaIpcMemHandleImpl {
+  explicit CudaIpcMemHandleImpl(const void* handle) {
+    memcpy(&ipc_handle, handle, sizeof(CUipcMemHandle));
+  }
+
+  CUipcMemHandle ipc_handle;
+};
+
+CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) {
+  impl_.reset(new CudaIpcMemHandleImpl(handle));
+}
+
+CudaIpcMemHandle::~CudaIpcMemHandle() {}
+
+Status CudaIpcMemHandle::FromBuffer(const void* opaque_handle,
+                                    std::unique_ptr<CudaIpcMemHandle>* handle) {
+  *handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(opaque_handle));
+  return Status::OK();
+}
+
+Status CudaIpcMemHandle::Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const {
+  std::shared_ptr<MutableBuffer> buffer;
+  constexpr size_t kHandleSize = sizeof(CUipcMemHandle);
+  RETURN_NOT_OK(AllocateBuffer(pool, static_cast<int64_t>(kHandleSize), &buffer));
+  memcpy(buffer->mutable_data(), &impl_->ipc_handle, kHandleSize);
+  *out = buffer;
+  return Status::OK();
+}
+
+const void* CudaIpcMemHandle::handle() const { return &impl_->ipc_handle; }
+
+// ----------------------------------------------------------------------
+
+CudaBuffer::CudaBuffer(uint8_t* data, int64_t size,
+                       const std::shared_ptr<CudaContext>& context, bool own_data,
+                       bool is_ipc)
+    : Buffer(data, size), context_(context), own_data_(own_data), is_ipc_(is_ipc) {
+  is_mutable_ = true;
+  mutable_data_ = data;
+}
+
+CudaBuffer::~CudaBuffer() { DCHECK(Close().ok()); }
+
+Status CudaBuffer::Close() {
   if (own_data_) {
-    CUDA_DCHECK(cudaFree(mutable_data_));
+    if (is_ipc_) {
+      CU_RETURN_NOT_OK(cuIpcCloseMemHandle(reinterpret_cast<CUdeviceptr>(mutable_data_)));
+    } else {
+      return context_->Free(mutable_data_, size_);
+    }
   }
+  return Status::OK();
 }
 
 CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
                        const int64_t size)
-    : Buffer(parent, offset, size), gpu_number_(parent->gpu_number()) {}
+    : Buffer(parent, offset, size),
+      context_(parent->context()),
+      own_data_(false),
+      is_ipc_(false) {}
 
 Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
                               uint8_t* out) const {
-  CUDA_RETURN_NOT_OK(cudaMemcpy(out, data_ + position, nbytes, cudaMemcpyDeviceToHost));
-  return Status::OK();
+  return context_->CopyDeviceToHost(out, data_ + position, nbytes);
 }
 
 Status CudaBuffer::CopyFromHost(const int64_t position, const uint8_t* data,
                                 int64_t nbytes) {
   DCHECK_LE(nbytes, size_ - position) << "Copy would overflow buffer";
-  CUDA_RETURN_NOT_OK(
-      cudaMemcpy(mutable_data_ + position, data, nbytes, cudaMemcpyHostToDevice));
-  return Status::OK();
+  return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes);
 }
 
-Status AllocateCudaBuffer(int gpu_number, const int64_t size,
-                          std::shared_ptr<CudaBuffer>* out) {
-  CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number));
-  uint8_t* data = nullptr;
-  CUDA_RETURN_NOT_OK(
-      cudaMalloc(reinterpret_cast<void**>(&data), static_cast<size_t>(size)));
-  *out = std::make_shared<CudaBuffer>(data, size, gpu_number, true);
+Status CudaBuffer::ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle) {
+  if (is_ipc_) {
+    return Status::Invalid("Buffer has already been exported for IPC");
+  }
+  RETURN_NOT_OK(context_->ExportIpcBuffer(mutable_data_, handle));
+  own_data_ = false;
   return Status::OK();
 }
 
-CudaHostBuffer::~CudaHostBuffer() { CUDA_DCHECK(cudaFreeHost(mutable_data_)); }
+CudaHostBuffer::~CudaHostBuffer() {
+  CudaDeviceManager* manager = nullptr;
+  DCHECK(CudaDeviceManager::GetInstance(&manager).ok());
+  DCHECK(manager->FreeHost(mutable_data_, size_).ok());
+}
 
 // ----------------------------------------------------------------------
 // CudaBufferReader
 
 CudaBufferReader::CudaBufferReader(const std::shared_ptr<CudaBuffer>& buffer)
-    : io::BufferReader(buffer), cuda_buffer_(buffer), gpu_number_(buffer->gpu_number()) {}
+    : io::BufferReader(buffer), cuda_buffer_(buffer), context_(buffer->context()) {}
 
 CudaBufferReader::~CudaBufferReader() {}
 
 Status CudaBufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) {
   nbytes = std::min(nbytes, size_ - position_);
-  CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
-  CUDA_RETURN_NOT_OK(
-      cudaMemcpy(buffer, data_ + position_, nbytes, cudaMemcpyDeviceToHost));
   *bytes_read = nbytes;
+  RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, data_ + position_, nbytes));
   position_ += nbytes;
   return Status::OK();
 }
@@ -97,7 +153,7 @@ Status CudaBufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
 
 CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer)
     : io::FixedSizeBufferWriter(buffer),
-      gpu_number_(buffer->gpu_number()),
+      context_(buffer->context()),
       buffer_size_(0),
       buffer_position_(0) {}
 
@@ -108,10 +164,8 @@ Status CudaBufferWriter::Close() { return Flush(); }
 Status CudaBufferWriter::Flush() {
   if (buffer_size_ > 0 && buffer_position_ > 0) {
     // Only need to flush when the write has been buffered
-    CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
-    CUDA_RETURN_NOT_OK(cudaMemcpy(mutable_data_ + position_ - buffer_position_,
-                                  host_buffer_data_, buffer_position_,
-                                  cudaMemcpyHostToDevice));
+    RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_ - buffer_position_,
+                                             host_buffer_data_, buffer_position_));
     buffer_position_ = 0;
   }
   return Status::OK();
@@ -137,9 +191,7 @@ Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
     if (nbytes + buffer_position_ >= buffer_size_) {
       // Reach end of buffer, write everything
       RETURN_NOT_OK(Flush());
-      CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
-      CUDA_RETURN_NOT_OK(
-          cudaMemcpy(mutable_data_ + position_, data, nbytes, cudaMemcpyHostToDevice));
+      RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
     } else {
       // Write bytes to buffer
       std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
@@ -147,9 +199,7 @@ Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
     }
   } else {
     // Unbuffered write
-    CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_));
-    CUDA_RETURN_NOT_OK(
-        cudaMemcpy(mutable_data_ + position_, data, nbytes, cudaMemcpyHostToDevice));
+    RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
   }
   position_ += nbytes;
   return Status::OK();
@@ -169,11 +219,9 @@ Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
 
 // ----------------------------------------------------------------------
 
 Status AllocateCudaHostBuffer(const int64_t size, std::shared_ptr<CudaHostBuffer>* out) {
-  uint8_t* data = nullptr;
-  CUDA_RETURN_NOT_OK(
-      cudaMallocHost(reinterpret_cast<void**>(&data), static_cast<size_t>(size)));
-  *out = std::make_shared<CudaHostBuffer>(data, size);
-  return Status::OK();
+  CudaDeviceManager* manager = nullptr;
+  RETURN_NOT_OK(CudaDeviceManager::GetInstance(&manager));
+  return manager->AllocateHost(size, out);
 }
 
 }  // namespace gpu
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index bd8b89a75ed..d5407371f35 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -29,17 +29,17 @@ namespace arrow {
 namespace gpu {
 
+class CudaContext;
+class CudaIpcMemHandle;
+
 /// \class CudaBuffer
 /// \brief An Arrow buffer located on a GPU device
 ///
 /// Be careful using this in any Arrow code which may not be GPU-aware
 class ARROW_EXPORT CudaBuffer : public Buffer {
  public:
-  CudaBuffer(uint8_t* data, int64_t size, const int gpu_number, bool own_data = false)
-      : Buffer(data, size), gpu_number_(gpu_number), own_data_(own_data) {
-    is_mutable_ = true;
-    mutable_data_ = data;
-  }
+  CudaBuffer(uint8_t* data, int64_t size, const std::shared_ptr<CudaContext>& context,
+             bool own_data = false, bool is_ipc = false);
 
   CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
              const int64_t size);
@@ -58,11 +58,22 @@ class ARROW_EXPORT CudaBuffer : public Buffer {
   /// \return Status
   Status CopyFromHost(const int64_t position, const uint8_t* data, int64_t nbytes);
 
-  int gpu_number() const { return gpu_number_; }
+  /// \brief Expose this device buffer as IPC memory which can be used in other processes
+  /// \param[out] handle the exported IPC handle
+  /// \return Status
+  ///
+  /// \note After calling this function, this device memory will not be freed
+  /// when the CudaBuffer is destructed
+  virtual Status ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle);
+
+  std::shared_ptr<CudaContext> context() const { return context_; }
 
- private:
-  const int gpu_number_;
+ protected:
+  std::shared_ptr<CudaContext> context_;
   bool own_data_;
+  bool is_ipc_;
+
+  virtual Status Close();
 };
 
 /// \class CudaHostBuffer
@@ -73,6 +84,37 @@ class ARROW_EXPORT CudaHostBuffer : public MutableBuffer {
   ~CudaHostBuffer();
 };
 
+/// \class CudaIpcMemHandle
+/// \brief A container for a CUDA IPC handle
+class ARROW_EXPORT CudaIpcMemHandle {
+ public:
+  ~CudaIpcMemHandle();
+
+  /// \brief Create CudaIpcMemHandle from opaque buffer (e.g. from another process)
+  /// \param[in] opaque_handle a CUipcMemHandle as a const void*
+  /// \param[out] handle the CudaIpcMemHandle instance
+  /// \return Status
+  static Status FromBuffer(const void* opaque_handle,
+                           std::unique_ptr<CudaIpcMemHandle>* handle);
+
+  /// \brief Write CudaIpcMemHandle to a Buffer
+  /// \param[in] pool a MemoryPool to allocate memory from
+  /// \param[out] out the serialized buffer
+  /// \return Status
+  Status Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const;
+
+ private:
+  explicit CudaIpcMemHandle(const void* handle);
+
+  struct CudaIpcMemHandleImpl;
+  std::unique_ptr<CudaIpcMemHandleImpl> impl_;
+
+  const void* handle() const;
+
+  friend CudaBuffer;
+  friend CudaContext;
+};
+
 /// \class CudaBufferReader
 /// \brief File interface for zero-copy read from CUDA buffers
 ///
@@ -98,7 +140,7 @@ class ARROW_EXPORT CudaBufferReader : public io::BufferReader {
 
  private:
   std::shared_ptr<CudaBuffer> cuda_buffer_;
-  int gpu_number_;
+  std::shared_ptr<CudaContext> context_;
 };
 
 /// \class CudaBufferWriter
@@ -132,7 +174,7 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
   int64_t num_bytes_buffered() const { return buffer_position_; }
 
  private:
-  int gpu_number_;
+  std::shared_ptr<CudaContext> context_;
 
   // Pinned host buffer for buffering writes on CPU before calling cudaMalloc
   int64_t buffer_size_;
@@ -141,15 +183,6 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
   uint8_t* host_buffer_data_;
 };
 
-/// \brief Allocate CUDA memory on a GPU device
-/// \param[in] gpu_number Device number to allocate
-/// \param[in] size number of bytes
-/// \param[out] out the allocated buffer
-/// \return Status
-ARROW_EXPORT
-Status AllocateCudaBuffer(const int gpu_number, const int64_t size,
-                          std::shared_ptr<CudaBuffer>* out);
-
 /// \brief Allocate CUDA-accessible memory on CPU host
 /// \param[in] size number of bytes
 /// \param[out] out the allocated buffer
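A sketch of `CudaBufferWriter`'s buffered mode, which the GPU IPC path above relies on: with `SetBufferSize`, small writes accumulate in pinned host memory and are copied to the device in larger chunks, and `Flush()`/`Close()` push out whatever is still staged. The helper and sizes are illustrative.

```cpp
#include <algorithm>
#include <memory>

#include "arrow/status.h"

#include "arrow/gpu/cuda_api.h"

arrow::Status WriteChunks(const std::shared_ptr<arrow::gpu::CudaContext>& ctx,
                          const uint8_t* data, int64_t nbytes) {
  std::shared_ptr<arrow::gpu::CudaBuffer> device_buffer;
  RETURN_NOT_OK(ctx->Allocate(nbytes, &device_buffer));

  arrow::gpu::CudaBufferWriter writer(device_buffer);
  RETURN_NOT_OK(writer.SetBufferSize(1 << 16));  // 64KB pinned staging buffer

  // Feed the writer in small pieces; most Write() calls only memcpy into
  // the staging buffer instead of issuing a device copy each time
  const int64_t chunk = 1024;
  for (int64_t offset = 0; offset < nbytes; offset += chunk) {
    RETURN_NOT_OK(writer.Write(data + offset, std::min(chunk, nbytes - offset)));
  }
  return writer.Close();  // flushes any remaining buffered bytes
}
```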
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index eb06aaf2fc6..53f0203f080 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -149,6 +149,24 @@ bool Message::Equals(const Message& other) const {
   }
 }
 
+Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
+                         std::unique_ptr<Message>* out) {
+  auto fb_message = flatbuf::GetMessage(metadata->data());
+
+  int64_t body_length = fb_message->bodyLength();
+
+  std::shared_ptr<Buffer> body;
+  RETURN_NOT_OK(stream->Read(body_length, &body));
+  if (body->size() < body_length) {
+    std::stringstream ss;
+    ss << "Expected to be able to read " << body_length << " bytes for message body, got "
+       << body->size();
+    return Status::IOError(ss.str());
+  }
+
+  return Message::Open(metadata, body, out);
+}
+
 Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const {
   int32_t metadata_length = 0;
   RETURN_NOT_OK(WriteMessage(*metadata(), file, &metadata_length));
@@ -178,29 +196,6 @@ std::string FormatMessageType(Message::Type type) {
   return "unknown";
 }
 
-// ----------------------------------------------------------------------
-// Read and write messages
-
-static Status ReadFullMessage(const std::shared_ptr<Buffer>& metadata,
-                              io::InputStream* stream,
-                              std::unique_ptr<Message>* message) {
-  auto fb_message = flatbuf::GetMessage(metadata->data());
-
-  int64_t body_length = fb_message->bodyLength();
-
-  std::shared_ptr<Buffer> body;
-  RETURN_NOT_OK(stream->Read(body_length, &body));
-
-  if (body->size() < body_length) {
-    std::stringstream ss;
-    ss << "Expected to be able to read " << body_length << " bytes for message body, got "
-       << body->size();
-    return Status::IOError(ss.str());
-  }
-
-  return Message::Open(metadata, body, message);
-}
-
 Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
                    std::unique_ptr<Message>* message) {
   std::shared_ptr<Buffer> buffer;
@@ -216,32 +211,33 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
   }
 
   auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
-  return ReadFullMessage(metadata, file, message);
+  return Message::ReadFrom(metadata, file, message);
 }
 
 Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
-  std::shared_ptr<Buffer> buffer;
+  int32_t message_length = 0;
+  int64_t bytes_read = 0;
+  RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
+                           reinterpret_cast<uint8_t*>(&message_length)));
 
-  RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer));
-  if (buffer->size() != sizeof(int32_t)) {
+  if (bytes_read != sizeof(int32_t)) {
     *message = nullptr;
     return Status::OK();
   }
 
-  int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());
-
   if (message_length == 0) {
     // Optional 0 EOS control message
     *message = nullptr;
     return Status::OK();
   }
 
-  RETURN_NOT_OK(file->Read(message_length, &buffer));
-  if (buffer->size() != message_length) {
+  std::shared_ptr<Buffer> metadata;
+  RETURN_NOT_OK(file->Read(message_length, &metadata));
+  if (metadata->size() != message_length) {
     return Status::IOError("Unexpected end of stream trying to read message");
   }
 
-  return ReadFullMessage(buffer, file, message);
+  return Message::ReadFrom(metadata, file, message);
 }
 
 // ----------------------------------------------------------------------
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index dce4e27fb2d..dbc50d84490 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -79,6 +79,15 @@ class ARROW_EXPORT Message {
   static Status Open(const std::shared_ptr<Buffer>& metadata,
                      const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out);
 
+  /// \brief Read message body and create Message given Flatbuffer metadata
+  /// \param[in] metadata containing a serialized Message flatbuffer
+  /// \param[in] stream an InputStream
+  /// \param[out] out the created Message
+  ///
+  /// \note If stream supports zero-copy, this is zero-copy
+  static Status ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
+                         std::unique_ptr<Message>* out);
+
   /// \brief Write length-prefixed metadata and body to output stream
   ///
   /// \param[in] file output stream to write to
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 9c05cba918d..e17b974adfc 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -901,14 +901,19 @@ Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
   RETURN_NOT_OK(AllocateBuffer(pool, size, &buffer));
 
   io::FixedSizeBufferWriter stream(buffer);
-  int32_t metadata_length = 0;
-  int64_t body_length = 0;
-  RETURN_NOT_OK(WriteRecordBatch(batch, 0, &stream, &metadata_length, &body_length, pool,
-                                 kMaxNestingDepth, true));
+  RETURN_NOT_OK(SerializeRecordBatch(batch, pool, &stream));
   *out = buffer;
   return Status::OK();
 }
 
+Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
+                            io::OutputStream* out) {
+  int32_t metadata_length = 0;
+  int64_t body_length = 0;
+  return WriteRecordBatch(batch, 0, out, &metadata_length, &body_length, pool,
+                          kMaxNestingDepth, true);
+}
+
 Status SerializeSchema(const Schema& schema, MemoryPool* pool,
                        std::shared_ptr<Buffer>* out) {
   std::shared_ptr<io::BufferOutputStream> stream;
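The two `SerializeRecordBatch` overloads now compose: the buffer variant is just "allocate exactly `GetRecordBatchSize` bytes, then stream into it", and the stream variant is what the GPU path feeds a `CudaBufferWriter`. The standalone sketch below replays the same pattern against the public API; the function name is ours.

```cpp
#include <memory>

#include "arrow/buffer.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/writer.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/table.h"

arrow::Status SerializeToPreallocated(const arrow::RecordBatch& batch,
                                      arrow::MemoryPool* pool,
                                      std::shared_ptr<arrow::Buffer>* out) {
  // Compute the exact serialized size up front, as suggested in writer.h
  int64_t size = 0;
  RETURN_NOT_OK(arrow::ipc::GetRecordBatchSize(batch, &size));

  std::shared_ptr<arrow::MutableBuffer> buffer;
  RETURN_NOT_OK(arrow::AllocateBuffer(pool, size, &buffer));

  // Any OutputStream works here; CudaBufferWriter is what the GPU path uses
  arrow::io::FixedSizeBufferWriter stream(buffer);
  RETURN_NOT_OK(arrow::ipc::SerializeRecordBatch(batch, pool, &stream));
  *out = buffer;
  return arrow::Status::OK();
}
```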
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index d867982d2be..3f110fe26fb 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -177,6 +177,18 @@ ARROW_EXPORT
 Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
                             std::shared_ptr<Buffer>* out);
 
+/// \brief Write record batch to OutputStream
+///
+/// \param[in] batch the record batch to write
+/// \param[in] out the OutputStream to write the output to
+/// \return Status
+///
+/// If writing to pre-allocated memory, you can use
+/// arrow::ipc::GetRecordBatchSize to compute how much space is required
+ARROW_EXPORT
+Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
+                            io::OutputStream* out);
+
 /// \brief Serialize schema using stream writer as a sequence of one or more
 /// IPC messages
 ///
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index aebef1b8812..515b600feec 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -386,7 +386,8 @@ cdef class PlasmaClient:
         -------
         The object ID associated to the Python object.
         """
-        cdef ObjectID target_id = object_id if object_id else ObjectID.from_random()
+        cdef ObjectID target_id = (object_id if object_id
+                                   else ObjectID.from_random())
         # TODO(pcm): Make serialization code support non-sequences and
         # get rid of packing the value into a list here (and unpacking in get)
         serialized = pyarrow.serialize([value])
@@ -404,8 +405,8 @@ cdef class PlasmaClient:
         Parameters
         ----------
         object_ids : list or ObjectID
-            Object ID or list of object IDs associated to the values we get from
-            the store.
+            Object ID or list of object IDs associated to the values we get
+            from the store.
         timeout_ms : int, default -1
             The number of milliseconds that the get call should block before
             timing out and returning. Pass -1 if the call should block and 0
@@ -415,14 +416,15 @@ cdef class PlasmaClient:
         -------
         list or object
             Python value or list of Python values for the data associated with
-            the object_ids and ObjectNotAvailable if the object was not available.
+            the object_ids and ObjectNotAvailable if the object was not
+            available.
         """
         if isinstance(object_ids, collections.Sequence):
             results = []
             buffers = self.get_buffers(object_ids, timeout_ms)
             for i in range(len(object_ids)):
-                # buffers[i] is None if this object was not available within the
-                # timeout
+                # buffers[i] is None if this object was not available within
+                # the timeout
                 if buffers[i]:
                     value, = pyarrow.deserialize(buffers[i])
                     results.append(value)
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index d729c1ef2d2..a831ef29a5e 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -187,8 +187,8 @@ def test_create(self):
         # Seal the object.
         self.plasma_client.seal(object_id)
         # Get the object.
-        memory_buffer = np.frombuffer(self.plasma_client.get_buffers([object_id])[0],
-                                      dtype="uint8")
+        memory_buffer = np.frombuffer(
+            self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
         for i in range(length):
             assert memory_buffer[i] == i % 256
 
@@ -241,7 +241,8 @@ def test_get(self):
         # Test timing out of get with various timeouts.
         for timeout in [0, 10, 100, 1000]:
             object_ids = [random_object_id() for _ in range(num_object_ids)]
-            results = self.plasma_client.get_buffers(object_ids, timeout_ms=timeout)
+            results = self.plasma_client.get_buffers(object_ids,
+                                                     timeout_ms=timeout)
             assert results == num_object_ids * [None]
 
         data_buffers = []
@@ -257,7 +258,7 @@ def test_get(self):
         # timeouts.
         for timeout in [0, 10, 100, 1000]:
             data_results = self.plasma_client.get_buffers(object_ids,
-                                                          timeout_ms=timeout)
+                                                          timeout_ms=timeout)
             # metadata_results = self.plasma_client.get_metadata(
             #     object_ids, timeout_ms=timeout)
             for i in range(num_object_ids):
@@ -275,16 +276,16 @@ def test_get(self):
 
     def test_put_and_get(self):
         for value in [["hello", "world", 3, 1.0], None, "hello"]:
-          object_id = self.plasma_client.put(value)
-          [result] = self.plasma_client.get([object_id])
-          assert result == value
+            object_id = self.plasma_client.put(value)
+            [result] = self.plasma_client.get([object_id])
+            assert result == value
 
-          result = self.plasma_client.get(object_id)
-          assert result == value
+            result = self.plasma_client.get(object_id)
+            assert result == value
 
-          object_id = pa.plasma.ObjectID.from_random()
-          [result] = self.plasma_client.get([object_id], timeout_ms=0)
-          assert result == pa.plasma.ObjectNotAvailable
+            object_id = pa.plasma.ObjectID.from_random()
+            [result] = self.plasma_client.get([object_id], timeout_ms=0)
+            assert result == pa.plasma.ObjectNotAvailable
 
     def test_store_arrow_objects(self):
         data = np.random.randn(10, 4)