From 03d0baf7f2a5c69fcbf72ca8adafa0a4422664b0 Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Fri, 25 Aug 2017 11:38:16 -0400
Subject: [PATCH 01/11] Start cuda_ipc file

Change-Id: Ife0b48c87b27983352a498f1d3adbcc5d952265b
---
 cpp/src/arrow/gpu/cuda_ipc.h | 57 +++++++++++++++++++++++++++++++++++
 1 file changed, 57 insertions(+)
 create mode 100644 cpp/src/arrow/gpu/cuda_ipc.h

diff --git a/cpp/src/arrow/gpu/cuda_ipc.h b/cpp/src/arrow/gpu/cuda_ipc.h
new file mode 100644
index 00000000000..ccdc13eb379
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_ipc.h
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_GPU_CUDA_IPC_H
+#define ARROW_GPU_CUDA_IPC_H
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/buffer.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+namespace gpu {
+
+/// \brief Write record batch message to GPU device memory
+/// \param[in] batch the record batch to write
+/// \param[in] ctx the CudaContext to allocate device memory from
+/// \param[out] out the serialized record batch message in device memory
+/// \return Status
+ARROW_EXPORT
+Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
+                            std::shared_ptr<CudaBuffer>* out);
+
+/// \brief Write record batch to pre-allocated GPU device memory
+///
+/// \param[in] batch the record batch to write
+/// \param[in] out the CudaBufferWriter to write the output to
+/// \return Status
+///
+/// The CudaBufferWriter must have enough pre-allocated space to accommodate
+/// the record batch. You can use arrow::ipc::GetRecordBatchSize to compute
+/// this
+ARROW_EXPORT
+Status SerializeRecordBatch(const RecordBatch& batch, CudaBufferWriter* out);
+
+}  // namespace gpu
+}  // namespace arrow
+
+#endif  // ARROW_GPU_CUDA_IPC_H
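A quick sketch of how these two entry points are meant to compose. This is a
hedged usage example, not part of the patch: CudaContext::Allocate does not
exist until later in this series, WriteBatchToGpu is a hypothetical caller,
and errors propagate through the RETURN_NOT_OK macro as elsewhere in Arrow.

Status WriteBatchToGpu(const RecordBatch& batch, CudaContext* ctx) {
  // First overload: the library allocates device memory and serializes into it
  std::shared_ptr<CudaBuffer> device_buffer;
  RETURN_NOT_OK(SerializeRecordBatch(batch, ctx, &device_buffer));

  // Second overload: the caller pre-allocates. Size the allocation with
  // arrow::ipc::GetRecordBatchSize, as the doc comment above suggests.
  int64_t size = 0;
  RETURN_NOT_OK(ipc::GetRecordBatchSize(batch, &size));
  std::shared_ptr<CudaBuffer> preallocated;
  RETURN_NOT_OK(ctx->Allocate(size, &preallocated));
  CudaBufferWriter writer(preallocated);
  return SerializeRecordBatch(batch, &writer);
}
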
From 3a37fdfec538fc1b779579db382cc5c550121d75 Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Wed, 23 Aug 2017 17:45:59 -0400
Subject: [PATCH 02/11] Start cuda context class

Change-Id: I97ba1480dfc13b1a95b2bb05e68a1de2afe9c7cc
---
 cpp/src/arrow/gpu/cuda_context.cc | 48 ++++++++++++++++++++++++++++
 cpp/src/arrow/gpu/cuda_context.h  | 52 +++++++++++++++++++++++++++++++
 2 files changed, 100 insertions(+)
 create mode 100644 cpp/src/arrow/gpu/cuda_context.cc
 create mode 100644 cpp/src/arrow/gpu/cuda_context.h

diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
new file mode 100644
index 00000000000..f8249df6a8d
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/gpu/cuda_context.h"
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include <cuda.h>
+
+namespace arrow {
+namespace gpu {
+
+class CudaContext::CudaContextImpl {
+ public:
+  Status Init() {
+    CUDADRV_RETURN_NOT_OK(cuInit(0));
+    CUDADRV_RETURN_NOT_OK(cuDeviceGetCount(&num_devices_));
+
+    // Create contexts
+    device_contexts_.resize(num_devices_);
+    for (int i = 0; i < num_devices_; ++i) {
+      CUresult ret = cuCtxCreate(&device_contexts_[i], 0,
+    }
+  }
+
+ private:
+  int num_devices_;
+  std::vector<CUcontext> device_contexts_;
+};
+
+}  // namespace gpu
+}  // namespace arrow

diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
new file mode 100644
index 00000000000..5b24ffa87ff
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +#ifndef ARROW_GPU_CUDA_CONTEXT_H +#define ARROW_GPU_CUDA_CONTEXT_H + +#include +#include + +#include "arrow/status.h" + +namespace arrow { +namespace gpu { + +class ARROW_EXPORT CudaDevice {}; + +/// \class CudaContext +/// \brief Friendlier interface to the CUDA driver API +class ARROW_EXPORT CudaContext { + public: + static Status Create(std::unique_ptr* ctx); + + Status Close(); + + Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes); + Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes); + + ~CudaContext(); + + private: + class CudaContextImpl; + std::unique_ptr impl_; +}; + +} // namespace gpu +} // namespace arrow + +#endif // ARROW_GPU_CUDA_CONTEXT_H From 2840c60bfee05f8b8ef44b8d6ae5eef3e0ee90c6 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Fri, 25 Aug 2017 15:43:39 -0400 Subject: [PATCH 03/11] Progress Change-Id: I487458dd66e7f544402a3b1fc91793d492ff114e --- cpp/src/arrow/gpu/cuda-test.cc | 44 ++++++-- cpp/src/arrow/gpu/cuda_common.h | 15 +-- cpp/src/arrow/gpu/cuda_context.cc | 168 ++++++++++++++++++++++++++++-- cpp/src/arrow/gpu/cuda_context.h | 43 +++++++- cpp/src/arrow/gpu/cuda_memory.cc | 65 ++++++------ cpp/src/arrow/gpu/cuda_memory.h | 22 ++-- 6 files changed, 284 insertions(+), 73 deletions(-) diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc index f479701eaeb..76dc103fef1 100644 --- a/cpp/src/arrow/gpu/cuda-test.cc +++ b/cpp/src/arrow/gpu/cuda-test.cc @@ -31,13 +31,29 @@ namespace gpu { constexpr int kGpuNumber = 0; -class TestCudaBuffer : public ::testing::Test {}; +class TestCudaBufferBase : public ::testing::Test { + public: + void SetUp() { + ASSERT_OK(CudaDeviceManager::GetInstance(&manager_)); + ASSERT_OK(manager_->CreateContext(kGpuNumber, &context)); + } + + protected: + CudaDeviceManager* manager_; + std::shared_ptr context_; +}; + +class TestCudaBuffer : public TestCudaBufferBase { + public: + void SetUp() { + TestCudaBufferBase::SetUp(); + } +}; TEST_F(TestCudaBuffer, Allocate) { const int64_t kSize = 100; std::shared_ptr buffer; - - ASSERT_OK(AllocateCudaBuffer(kGpuNumber, kSize, &buffer)); + ASSERT_OK(context_->Allocate(kSize, &buffer)); ASSERT_EQ(kSize, buffer->size()); } @@ -52,7 +68,7 @@ void AssertCudaBufferEquals(const CudaBuffer& buffer, const uint8_t* host_data, TEST_F(TestCudaBuffer, CopyFromHost) { const int64_t kSize = 1000; std::shared_ptr device_buffer; - ASSERT_OK(AllocateCudaBuffer(kGpuNumber, kSize, &device_buffer)); + ASSERT_OK(context_->Allocate(kSize, &device_buffer)); std::shared_ptr host_buffer; ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer)); @@ -63,10 +79,14 @@ TEST_F(TestCudaBuffer, CopyFromHost) { AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize); } -class TestCudaBufferWriter : public ::testing::Test { +class TestCudaBufferWriter : public TestCudaBufferBase { public: + void SetUp() { + TestCudaBufferBase::SetUp(); + } + void Allocate(const int64_t size) { - ASSERT_OK(AllocateCudaBuffer(kGpuNumber, size, &device_buffer_)); + ASSERT_OK(context_->Allocate(size, &device_buffer_)); writer_.reset(new CudaBufferWriter(device_buffer_)); } @@ -105,6 +125,9 @@ class TestCudaBufferWriter : public ::testing::Test { } protected: + CudaDeviceManager* manager_; + std::shared_ptr context_; + std::shared_ptr device_buffer_; std::unique_ptr writer_; }; @@ -164,7 +187,14 @@ TEST_F(TestCudaBufferWriter, EdgeCases) { AssertCudaBufferEquals(*device_buffer_, host_data, 1000); } -TEST(TestCudaBufferReader, Basics) { +class 
TestCudaBufferReader : public TestCudaBufferBase { + public: + void SetUp() { + TestCudaBufferBase::SetUp(); + } +}; + +TEST_F(TestCudaBufferReader, Basics) { std::shared_ptr device_buffer; const int64_t size = 1000; diff --git a/cpp/src/arrow/gpu/cuda_common.h b/cpp/src/arrow/gpu/cuda_common.h index 1d65f96adbc..06579e54717 100644 --- a/cpp/src/arrow/gpu/cuda_common.h +++ b/cpp/src/arrow/gpu/cuda_common.h @@ -22,7 +22,7 @@ #include -#include +#include namespace arrow { namespace gpu { @@ -34,18 +34,7 @@ namespace gpu { (void)ret; \ } while (0) -#define CUDA_RETURN_NOT_OK(STMT) \ - do { \ - cudaError_t ret = (STMT); \ - if (ret != cudaSuccess) { \ - std::stringstream ss; \ - ss << "Cuda API call in " << __FILE__ << " at line " << __LINE__ \ - << " failed: " << #STMT; \ - return Status::IOError(ss.str()); \ - } \ - } while (0) - -#define CUDADRV_RETURN_NOT_OK(STMT) \ +#define CU_RETURN_NOT_OK(STMT) \ do { \ CUresult ret = (STMT); \ if (ret != CUDA_SUCCESS) { \ diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index f8249df6a8d..d5b9489a583 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -26,23 +26,179 @@ namespace arrow { namespace gpu { +struct CudaDevice { + int device_num; + CUdevice handle; + int64_t total_memory; +}; + class CudaContext::CudaContextImpl { public: + CudaContextImpl() {} + + Status Init(const CudaDevice& device) { + device_ = device; + CU_RETURN_NOT_OK(cuCtxCreate(context_, 0, device_.handle)); + is_open_ = true; + return Status::OK(); + } + + Status Close() { + if (is_open_ && own_context_) { + CU_RETURN_NOT_OK(cuCtxDestroy(context_)); + } + is_open_ = false; + return Status::OK(); + } + + int64_t bytes_allocated() const { return bytes_allocated_.load(); } + + Status Allocate(int64_t nbytes, uint8_t** out) { + CU_RETURN_NOT_OK(cuCtxSetCurrent(context_)); + + CUdeviceptr data; + CU_RETURN_NOT_OK(cuMemAlloc(&data, nbytes)); + *out = reinterpret_cast(data); + return Status::OK(); + } + + Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) { + CU_RETURN_NOT_OK(cuCtxSetCurrent(context_)); + CU_RETURN_NOT_OK(cuMemcpyDtoH(reinterpret_cast(dst), + src, nbytes)); + return Statsu::OK(); + } + + Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes) { + CU_RETURN_NOT_OK(cuCtxSetCurrent(context_)); + CU_RETURN_NOT_OK(cuMemcpyHtoD(src, reinterpret_cast(src), + nbytes)); + return Status::OK(); + } + + Status Free(uint8_t* device_ptr, int64_t nbytes) { + CU_RETURN_NOT_OK(cuMemFree(reinterpret_cast(device_ptr))); + return Status::OK(); + } + + const CudaDevice device() const { return device_; } + + private: + CudaDevice device_; + CUcontext context_; + bool is_open_; + + // So that we can utilize a CUcontext that was created outside this library + bool own_context_; + + std::atomic bytes_allocated_; +}; + +class CudaDeviceManager::CudaDeviceManagerImpl { + public: + CudaDeviceManagerImpl() + : host_bytes_allocated_(0) {} + Status Init() { - CUDADRV_RETURN_NOT_OK(cuInit(0)); - CUDADRV_RETURN_NOT_OK(cuDeviceGetCount(&num_devices_)); + CU_RETURN_NOT_OK(cuInit(0)); + CU_RETURN_NOT_OK(cuDeviceGetCount(&num_devices_)); - // Create contexts - device_contexts_.resize(num_devices_); + devices_.resize(num_devices_); for (int i = 0; i < num_devices_; ++i) { - CUresult ret = cuCtxCreate(&device_contexts_[i], 0, + RETURN_NOT_OK(GetDeviceProperties(i, &devices_[i])); } + return Status::OK(); + } + + Status AllocateHost(int64_t nbytes, uint8_t** out) { + 
CU_RETURN_NOT_OK(cuMemHostAlloc(reinterpret_cast(out), + nbytes, CU_MEMHOSTALLOC_PORTABLE)); + host_bytes_allocated_ += nbytes; + return Status::OK(); + } + + Status FreeHost(uint8_t* data, int64_t nbytes) { + CU_RETURN_NOT_OK(cuMemFreeHost(data)); + host_bytes_allocated_ -= nbytes; + return Status::OK(); + } + + Status GetDeviceProperties(int device_number, CudaDevice* device) { + device->device_num = device_number; + CU_RETURN_NOT_OK(cuDeviceGet(&device->handle, device_number)); + + size_t total_memory = 0; + CU_RETURN_NOT_OK(cuDeviceTotalMem(&total_memory, device->handle)); + device->total_memory = total_memory; + return Status::OK(); + } + + Status CreateContext(int device_number, std::shared_ptr* out) { + *out = std::shared_ptr(new CudaContext()); + return (*out)->impl_->Init(devices_[i]); } + int num_devices() const { return num_devices_; } + private: int num_devices_; - std::vector device_contexts_; + std::vector devices_; + + int host_bytes_allocated_; }; +CudaDeviceManager::CudaDeviceManager() { + impl_.reset(new CudaDeviceManagerImpl()); +} + +Status CudaDeviceManager::GetInstance(CudaDeviceManager** manager) { + if (!instance_) { + instance_.reset(new CudaDeviceManager()); + RETURN_NOT_OK(instance_->impl_->Init()); + } + *manager = instance_.get(); + return Status::OK(); +} + +Status CudaDeviceManager::Create(int device_number, + std::shared_ptr* out) { + return impl_->Create(device_number, out); +} + +Status CudaDeviceManager::AllocateHost(int64_t nbytes, + std::shared_ptr* out) { + uint8_t* data; + RETURN_NOT_OK(impl_->AllocateHost(nbytes, &data)); + *out = std::shared_ptr(data, nbytes); + return Status::OK(); +} + +Status CudaDeviceManager::FreeHost(uint8_t* data, int64_t nbytes) { + return impl_->FreeHost(data, nbytes)); +} + +int CudaDeviceManager::num_devices() const { + return impl_->num_devices(); +} + +// ---------------------------------------------------------------------- +// CudaContext public API + +Status CudaContext::Allocate(int64_t nbytes, std::shared_ptr* out) { + return impl_->AllocateHost(nbytes, out); +} + +Status CudaContext::CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) { + return impl_->CopyHostToDevice(dst, src, nbytes); +} + +Status CudaContext::CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes) { + return impl_->CopyDeviceToHost(dst, src, nbytes); +} + +Status CudaContext::Free(uint8_t* device_ptr, int64_t nbytes) { + return impl_->Free(device_ptr, nbytes); +} + } // namespace gpu } // namespace arrow diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h index 5b24ffa87ff..052d1e374ff 100644 --- a/cpp/src/arrow/gpu/cuda_context.h +++ b/cpp/src/arrow/gpu/cuda_context.h @@ -22,28 +22,63 @@ #include #include "arrow/status.h" +#include "arrow/util/visibility.h" namespace arrow { namespace gpu { -class ARROW_EXPORT CudaDevice {}; +class CudaBuffer; +class CudaHostBuffer; + +// Forward declaration +class CudaContext; + +class ARROW_EXPORT CudaDeviceManager { + public: + static Status GetInstance(CudaDeviceManager** manager); + + /// \brief Create a CUDA driver context for a particular device + Status CreateContext(int gpu_number, std::shared_ptr* ctx); + + Status AllocateHost(int64_t nbytes, std::shared_ptr* buffer); + Status FreeHost(uint8_t* data, int64_t nbytes); + + int num_devices() const; + + private: + std::unique_ptr instance_; + + class CudaDeviceManagerImpl; + std::unique_ptr impl_; + + friend CudaContext; +}; + +struct ARROW_EXPORT CudaDeviceInfo {}; /// \class CudaContext /// \brief 
Friendlier interface to the CUDA driver API class ARROW_EXPORT CudaContext { public: - static Status Create(std::unique_ptr* ctx); + ~CudaContext(); - Status Close(); + Status Destroy(); Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes); Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes); - ~CudaContext(); + Status Allocate(int64_t nbytes, std::shared_ptr* buffer); + Status Free(uint8_t* device_ptr, int64_t nbytes); + + int64_t bytes_allocated() const; private: + CudaContext(); + class CudaContextImpl; std::unique_ptr impl_; + + friend CudaDeviceManager::CudaDeviceManagerImpl; }; } // namespace gpu diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index 4870813b661..bf77ec1abfe 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -27,60 +27,59 @@ #include "arrow/util/logging.h" #include "arrow/gpu/cuda_common.h" +#include "arrow/gpu/cuda_context.h" namespace arrow { namespace gpu { CudaBuffer::~CudaBuffer() { if (own_data_) { - CUDA_DCHECK(cudaFree(mutable_data_)); + DCHECK(context_->Free(mutable_data_, size_).ok()); } } CudaBuffer::CudaBuffer(const std::shared_ptr& parent, const int64_t offset, const int64_t size) - : Buffer(parent, offset, size), gpu_number_(parent->gpu_number()) {} + : Buffer(parent, offset, size), + context_(parent->context()) {} Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes, uint8_t* out) const { - CUDA_RETURN_NOT_OK(cudaMemcpy(out, data_ + position, nbytes, cudaMemcpyDeviceToHost)); - return Status::OK(); + return context_->CopyDeviceToHost(out, data_ + position, nbytes); } Status CudaBuffer::CopyFromHost(const int64_t position, const uint8_t* data, int64_t nbytes) { DCHECK_LE(nbytes, size_ - position) << "Copy would overflow buffer"; - CUDA_RETURN_NOT_OK( - cudaMemcpy(mutable_data_ + position, data, nbytes, cudaMemcpyHostToDevice)); - return Status::OK(); + return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes); } -Status AllocateCudaBuffer(int gpu_number, const int64_t size, +Status AllocateCudaBuffer(const int64_t size, + std::shared_ptr& context, std::shared_ptr* out) { - CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number)); - uint8_t* data = nullptr; - CUDA_RETURN_NOT_OK( - cudaMalloc(reinterpret_cast(&data), static_cast(size))); - *out = std::make_shared(data, size, gpu_number, true); - return Status::OK(); + return context->Allocate(size, out); } -CudaHostBuffer::~CudaHostBuffer() { CUDA_DCHECK(cudaFreeHost(mutable_data_)); } +CudaHostBuffer::~CudaHostBuffer() { + CudaDeviceManager* manager = nullptr; + DCHECK(CudaDeviceManager::GetInstance(&manager).ok()); + DCHECK(manager->FreeHost(mutable_data_, size_).ok()); +} // ---------------------------------------------------------------------- // CudaBufferReader CudaBufferReader::CudaBufferReader(const std::shared_ptr& buffer) - : io::BufferReader(buffer), cuda_buffer_(buffer), gpu_number_(buffer->gpu_number()) {} + : io::BufferReader(buffer), + cuda_buffer_(buffer), + context_(buffer->context()) {} CudaBufferReader::~CudaBufferReader() {} Status CudaBufferReader::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) { nbytes = std::min(nbytes, size_ - position_); - CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_)); - CUDA_RETURN_NOT_OK( - cudaMemcpy(buffer, data_ + position_, nbytes, cudaMemcpyDeviceToHost)); *bytes_read = nbytes; + RETURN_NOT_OK(context_->CopyDeviceToHost(buffer, data_ + position_, nbytes)); position_ += nbytes; return Status::OK(); } @@ -97,7 
+96,7 @@ Status CudaBufferReader::Read(int64_t nbytes, std::shared_ptr* out) { CudaBufferWriter::CudaBufferWriter(const std::shared_ptr& buffer) : io::FixedSizeBufferWriter(buffer), - gpu_number_(buffer->gpu_number()), + context_(buffer->context()), buffer_size_(0), buffer_position_(0) {} @@ -108,10 +107,10 @@ Status CudaBufferWriter::Close() { return Flush(); } Status CudaBufferWriter::Flush() { if (buffer_size_ > 0 && buffer_position_ > 0) { // Only need to flush when the write has been buffered - CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_)); - CUDA_RETURN_NOT_OK(cudaMemcpy(mutable_data_ + position_ - buffer_position_, - host_buffer_data_, buffer_position_, - cudaMemcpyHostToDevice)); + RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_ - + buffer_position_, + host_buffer_data_, + buffer_position_)); buffer_position_ = 0; } return Status::OK(); @@ -137,9 +136,8 @@ Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) { if (nbytes + buffer_position_ >= buffer_size_) { // Reach end of buffer, write everything RETURN_NOT_OK(Flush()); - CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_)); - CUDA_RETURN_NOT_OK( - cudaMemcpy(mutable_data_ + position_, data, nbytes, cudaMemcpyHostToDevice)); + RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, + data, nbytes)); } else { // Write bytes to buffer std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes); @@ -147,9 +145,8 @@ Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) { } } else { // Unbuffered write - CUDA_RETURN_NOT_OK(cudaSetDevice(gpu_number_)); - CUDA_RETURN_NOT_OK( - cudaMemcpy(mutable_data_ + position_, data, nbytes, cudaMemcpyHostToDevice)); + RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, + data, nbytes)); } position_ += nbytes; return Status::OK(); @@ -169,11 +166,9 @@ Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) { // ---------------------------------------------------------------------- Status AllocateCudaHostBuffer(const int64_t size, std::shared_ptr* out) { - uint8_t* data = nullptr; - CUDA_RETURN_NOT_OK( - cudaMallocHost(reinterpret_cast(&data), static_cast(size))); - *out = std::make_shared(data, size); - return Status::OK(); + CudaDeviceManager* manager = nullptr; + RETURN_NOT_OK(CudaDeviceManager::GetInstance(&manager)); + return manager->AllocateHost(size, out); } } // namespace gpu diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h index bd8b89a75ed..da3124d08be 100644 --- a/cpp/src/arrow/gpu/cuda_memory.h +++ b/cpp/src/arrow/gpu/cuda_memory.h @@ -26,6 +26,8 @@ #include "arrow/memory_pool.h" #include "arrow/status.h" +#include "arrow/gpu/cuda_context.h" + namespace arrow { namespace gpu { @@ -35,8 +37,10 @@ namespace gpu { /// Be careful using this in any Arrow code which may not be GPU-aware class ARROW_EXPORT CudaBuffer : public Buffer { public: - CudaBuffer(uint8_t* data, int64_t size, const int gpu_number, bool own_data = false) - : Buffer(data, size), gpu_number_(gpu_number), own_data_(own_data) { + CudaBuffer(uint8_t* data, int64_t size, + const std::shared_ptr& context, + bool own_data = false) + : Buffer(data, size), context_(context), own_data_(own_data) { is_mutable_ = true; mutable_data_ = data; } @@ -58,10 +62,10 @@ class ARROW_EXPORT CudaBuffer : public Buffer { /// \return Status Status CopyFromHost(const int64_t position, const uint8_t* data, int64_t nbytes); - int gpu_number() const { return gpu_number_; } + std::shared_ptr context() const { return context_; } private: - 
const int gpu_number_; + std::shared_ptr context_; bool own_data_; }; @@ -98,7 +102,7 @@ class ARROW_EXPORT CudaBufferReader : public io::BufferReader { private: std::shared_ptr cuda_buffer_; - int gpu_number_; + std::shared_ptr context_; }; /// \class CudaBufferWriter @@ -132,7 +136,7 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter { int64_t num_bytes_buffered() const { return buffer_position_; } private: - int gpu_number_; + std::shared_ptr context_; // Pinned host buffer for buffering writes on CPU before calling cudaMalloc int64_t buffer_size_; @@ -147,7 +151,8 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter { /// \param[out] out the allocated buffer /// \return Status ARROW_EXPORT -Status AllocateCudaBuffer(const int gpu_number, const int64_t size, +Status AllocateCudaBuffer(const int64_t size, + const std::shared_ptr& context, std::shared_ptr* out); /// \brief Allocate CUDA-accessible memory on CPU host @@ -155,7 +160,8 @@ Status AllocateCudaBuffer(const int gpu_number, const int64_t size, /// \param[out] out the allocated buffer /// \return Status ARROW_EXPORT -Status AllocateCudaHostBuffer(const int64_t size, std::shared_ptr* out); +Status AllocateCudaHostBuffer(const int64_t size, + std::shared_ptr* out); } // namespace gpu } // namespace arrow From 5d686fe963f13cddc59e22641b63643f6d21a9ab Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 26 Aug 2017 14:10:25 -0400 Subject: [PATCH 04/11] More progress Change-Id: I3ad52acd665869f0755566f3d7331dcf6567d654 --- cpp/src/arrow/gpu/CMakeLists.txt | 1 + cpp/src/arrow/gpu/cuda-benchmark.cc | 7 ++++++- cpp/src/arrow/gpu/cuda-test.cc | 4 ++-- cpp/src/arrow/gpu/cuda_context.cc | 1 + 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/gpu/CMakeLists.txt b/cpp/src/arrow/gpu/CMakeLists.txt index cab085302c6..96569fe2a94 100644 --- a/cpp/src/arrow/gpu/CMakeLists.txt +++ b/cpp/src/arrow/gpu/CMakeLists.txt @@ -73,6 +73,7 @@ find_package(CUDA REQUIRED) include_directories(SYSTEM ${CUDA_INCLUDE_DIRS}) set(ARROW_GPU_SRCS + cuda_context.cc cuda_memory.cc ) diff --git a/cpp/src/arrow/gpu/cuda-benchmark.cc b/cpp/src/arrow/gpu/cuda-benchmark.cc index 82caacc05e4..9936ac674ec 100644 --- a/cpp/src/arrow/gpu/cuda-benchmark.cc +++ b/cpp/src/arrow/gpu/cuda-benchmark.cc @@ -35,8 +35,13 @@ constexpr int64_t kGpuNumber = 0; static void CudaBufferWriterBenchmark(benchmark::State& state, const int64_t total_bytes, const int64_t chunksize, const int64_t buffer_size) { + CudaDeviceManager* manager; + ABORT_NOT_OK(CudaDeviceManager::GetInstance(&manager)); + std::shared_ptr context; + ABORT_NOT_OK(manager->CreateContext(kGpuNumber, &context)); + std::shared_ptr device_buffer; - ABORT_NOT_OK(AllocateCudaBuffer(kGpuNumber, total_bytes, &device_buffer)); + ABORT_NOT_OK(context->Allocate(total_bytes, &device_buffer)); CudaBufferWriter writer(device_buffer); if (buffer_size > 0) { diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc index 76dc103fef1..aec25522e92 100644 --- a/cpp/src/arrow/gpu/cuda-test.cc +++ b/cpp/src/arrow/gpu/cuda-test.cc @@ -35,7 +35,7 @@ class TestCudaBufferBase : public ::testing::Test { public: void SetUp() { ASSERT_OK(CudaDeviceManager::GetInstance(&manager_)); - ASSERT_OK(manager_->CreateContext(kGpuNumber, &context)); + ASSERT_OK(manager_->CreateContext(kGpuNumber, &context_)); } protected: @@ -198,7 +198,7 @@ TEST_F(TestCudaBufferReader, Basics) { std::shared_ptr device_buffer; const int64_t size = 1000; - 
ASSERT_OK(AllocateCudaBuffer(kGpuNumber, size, &device_buffer)); + ASSERT_OK(context_->Allocate(size, &device_buffer)); std::shared_ptr buffer; ASSERT_OK(test::MakeRandomBytePoolBuffer(1000, default_memory_pool(), &buffer)); diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index d5b9489a583..d323eb7c022 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -17,6 +17,7 @@ #include "arrow/gpu/cuda_context.h" +#include #include #include #include From f3c724e022cb84fed6e69928ec29da0b30695908 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 27 Aug 2017 11:41:33 -0400 Subject: [PATCH 05/11] Get things compiling / linking using driver API Change-Id: I24d9d9510c8164dea36d83028b3c4bdbddbe2d85 --- cpp/src/arrow/gpu/CMakeLists.txt | 26 ++++-------- cpp/src/arrow/gpu/cuda-benchmark.cc | 4 +- cpp/src/arrow/gpu/cuda-test.cc | 10 ++--- cpp/src/arrow/gpu/cuda_context.cc | 63 ++++++++++++++++++++--------- cpp/src/arrow/gpu/cuda_context.h | 9 +++-- cpp/src/arrow/gpu/cuda_memory.cc | 7 +++- 6 files changed, 68 insertions(+), 51 deletions(-) diff --git a/cpp/src/arrow/gpu/CMakeLists.txt b/cpp/src/arrow/gpu/CMakeLists.txt index 96569fe2a94..4646ccae20c 100644 --- a/cpp/src/arrow/gpu/CMakeLists.txt +++ b/cpp/src/arrow/gpu/CMakeLists.txt @@ -79,28 +79,16 @@ set(ARROW_GPU_SRCS set(ARROW_GPU_SHARED_LINK_LIBS arrow_shared + ${CUDA_LIBRARIES} + ${CUDA_CUDA_LIBRARY} ) -add_library(arrow_gpu_objlib OBJECT - ${ARROW_GPU_SRCS} +ADD_ARROW_LIB(arrow_gpu + SOURCES ${ARROW_GPU_SRCS} + SHARED_LINK_FLAGS "" + SHARED_LINK_LIBS ${ARROW_GPU_SHARED_LINK_LIBS} + STATIC_LINK_LIBS "" ) -set_property(TARGET arrow_gpu_objlib PROPERTY POSITION_INDEPENDENT_CODE 1) - -if (ARROW_BUILD_SHARED) - cuda_add_library(arrow_gpu_shared SHARED $) - install(TARGETS arrow_gpu_shared - RUNTIME DESTINATION bin - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) -endif() - -if (ARROW_BUILD_STATIC) - add_library(arrow_gpu_static STATIC $) - install(TARGETS arrow_gpu_static - RUNTIME DESTINATION bin - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}) -endif() install(FILES cuda_common.h diff --git a/cpp/src/arrow/gpu/cuda-benchmark.cc b/cpp/src/arrow/gpu/cuda-benchmark.cc index 9936ac674ec..26395ea9bf1 100644 --- a/cpp/src/arrow/gpu/cuda-benchmark.cc +++ b/cpp/src/arrow/gpu/cuda-benchmark.cc @@ -38,10 +38,10 @@ static void CudaBufferWriterBenchmark(benchmark::State& state, const int64_t tot CudaDeviceManager* manager; ABORT_NOT_OK(CudaDeviceManager::GetInstance(&manager)); std::shared_ptr context; - ABORT_NOT_OK(manager->CreateContext(kGpuNumber, &context)); + ABORT_NOT_OK(manager->GetContext(kGpuNumber, &context)); std::shared_ptr device_buffer; - ABORT_NOT_OK(context->Allocate(total_bytes, &device_buffer)); + ABORT_NOT_OK(AllocateCudaBuffer(total_bytes, context, &device_buffer)); CudaBufferWriter writer(device_buffer); if (buffer_size > 0) { diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc index aec25522e92..6bf61a37a5e 100644 --- a/cpp/src/arrow/gpu/cuda-test.cc +++ b/cpp/src/arrow/gpu/cuda-test.cc @@ -35,7 +35,7 @@ class TestCudaBufferBase : public ::testing::Test { public: void SetUp() { ASSERT_OK(CudaDeviceManager::GetInstance(&manager_)); - ASSERT_OK(manager_->CreateContext(kGpuNumber, &context_)); + ASSERT_OK(manager_->GetContext(kGpuNumber, &context_)); } protected: @@ -53,7 +53,7 @@ class TestCudaBuffer : public TestCudaBufferBase { TEST_F(TestCudaBuffer, 
Allocate) { const int64_t kSize = 100; std::shared_ptr buffer; - ASSERT_OK(context_->Allocate(kSize, &buffer)); + ASSERT_OK(AllocateCudaBuffer(kSize, context_, &buffer)); ASSERT_EQ(kSize, buffer->size()); } @@ -68,7 +68,7 @@ void AssertCudaBufferEquals(const CudaBuffer& buffer, const uint8_t* host_data, TEST_F(TestCudaBuffer, CopyFromHost) { const int64_t kSize = 1000; std::shared_ptr device_buffer; - ASSERT_OK(context_->Allocate(kSize, &device_buffer)); + ASSERT_OK(AllocateCudaBuffer(kSize, context_, &device_buffer)); std::shared_ptr host_buffer; ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer)); @@ -86,7 +86,7 @@ class TestCudaBufferWriter : public TestCudaBufferBase { } void Allocate(const int64_t size) { - ASSERT_OK(context_->Allocate(size, &device_buffer_)); + ASSERT_OK(AllocateCudaBuffer(size, context_, &device_buffer_)); writer_.reset(new CudaBufferWriter(device_buffer_)); } @@ -198,7 +198,7 @@ TEST_F(TestCudaBufferReader, Basics) { std::shared_ptr device_buffer; const int64_t size = 1000; - ASSERT_OK(context_->Allocate(size, &device_buffer)); + ASSERT_OK(AllocateCudaBuffer(size, context_, &device_buffer)); std::shared_ptr buffer; ASSERT_OK(test::MakeRandomBytePoolBuffer(1000, default_memory_pool(), &buffer)); diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index d323eb7c022..dd0b29b6d4b 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -21,9 +21,14 @@ #include #include #include +#include +#include #include +#include "arrow/gpu/cuda_common.h" +#include "arrow/gpu/cuda_memory.h" + namespace arrow { namespace gpu { @@ -39,7 +44,7 @@ class CudaContext::CudaContextImpl { Status Init(const CudaDevice& device) { device_ = device; - CU_RETURN_NOT_OK(cuCtxCreate(context_, 0, device_.handle)); + CU_RETURN_NOT_OK(cuCtxCreate(&context_, 0, device_.handle)); is_open_ = true; return Status::OK(); } @@ -58,22 +63,23 @@ class CudaContext::CudaContextImpl { CU_RETURN_NOT_OK(cuCtxSetCurrent(context_)); CUdeviceptr data; - CU_RETURN_NOT_OK(cuMemAlloc(&data, nbytes)); + CU_RETURN_NOT_OK(cuMemAlloc(&data, static_cast(nbytes))); *out = reinterpret_cast(data); return Status::OK(); } Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) { CU_RETURN_NOT_OK(cuCtxSetCurrent(context_)); - CU_RETURN_NOT_OK(cuMemcpyDtoH(reinterpret_cast(dst), - src, nbytes)); - return Statsu::OK(); + CU_RETURN_NOT_OK(cuMemcpyHtoD(reinterpret_cast(dst), + reinterpret_cast(src), + static_cast(nbytes))); + return Status::OK(); } Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes) { CU_RETURN_NOT_OK(cuCtxSetCurrent(context_)); - CU_RETURN_NOT_OK(cuMemcpyHtoD(src, reinterpret_cast(src), - nbytes)); + CU_RETURN_NOT_OK(cuMemcpyDtoH(dst, reinterpret_cast(src), + static_cast(nbytes))); return Status::OK(); } @@ -113,7 +119,8 @@ class CudaDeviceManager::CudaDeviceManagerImpl { Status AllocateHost(int64_t nbytes, uint8_t** out) { CU_RETURN_NOT_OK(cuMemHostAlloc(reinterpret_cast(out), - nbytes, CU_MEMHOSTALLOC_PORTABLE)); + static_cast(nbytes), + CU_MEMHOSTALLOC_PORTABLE)); host_bytes_allocated_ += nbytes; return Status::OK(); } @@ -134,9 +141,16 @@ class CudaDeviceManager::CudaDeviceManagerImpl { return Status::OK(); } - Status CreateContext(int device_number, std::shared_ptr* out) { - *out = std::shared_ptr(new CudaContext()); - return (*out)->impl_->Init(devices_[i]); + Status GetContext(int device_number, std::shared_ptr* out) { + auto it = contexts_.find(device_number); + if (it == 
contexts_.end()) { + auto ctx = std::shared_ptr(new CudaContext()); + RETURN_NOT_OK(ctx->impl_->Init(devices_[device_number])); + contexts_[device_number] = *out = ctx; + } else { + *out = it->second; + } + return Status::OK(); } int num_devices() const { return num_devices_; } @@ -145,6 +159,9 @@ class CudaDeviceManager::CudaDeviceManagerImpl { int num_devices_; std::vector devices_; + // device_number -> CudaContext + std::unordered_map> contexts_; + int host_bytes_allocated_; }; @@ -152,6 +169,8 @@ CudaDeviceManager::CudaDeviceManager() { impl_.reset(new CudaDeviceManagerImpl()); } +std::unique_ptr CudaDeviceManager::instance_ = nullptr; + Status CudaDeviceManager::GetInstance(CudaDeviceManager** manager) { if (!instance_) { instance_.reset(new CudaDeviceManager()); @@ -161,21 +180,21 @@ Status CudaDeviceManager::GetInstance(CudaDeviceManager** manager) { return Status::OK(); } -Status CudaDeviceManager::Create(int device_number, - std::shared_ptr* out) { - return impl_->Create(device_number, out); +Status CudaDeviceManager::GetContext(int device_number, + std::shared_ptr* out) { + return impl_->GetContext(device_number, out); } Status CudaDeviceManager::AllocateHost(int64_t nbytes, std::shared_ptr* out) { - uint8_t* data; + uint8_t* data = nullptr; RETURN_NOT_OK(impl_->AllocateHost(nbytes, &data)); - *out = std::shared_ptr(data, nbytes); + *out = std::make_shared(data, nbytes); return Status::OK(); } Status CudaDeviceManager::FreeHost(uint8_t* data, int64_t nbytes) { - return impl_->FreeHost(data, nbytes)); + return impl_->FreeHost(data, nbytes); } int CudaDeviceManager::num_devices() const { @@ -185,8 +204,14 @@ int CudaDeviceManager::num_devices() const { // ---------------------------------------------------------------------- // CudaContext public API -Status CudaContext::Allocate(int64_t nbytes, std::shared_ptr* out) { - return impl_->AllocateHost(nbytes, out); +CudaContext::CudaContext() { + impl_.reset(new CudaContextImpl()); +} + +CudaContext::~CudaContext() {} + +Status CudaContext::Allocate(int64_t nbytes, uint8_t** out) { + return impl_->Allocate(nbytes, out); } Status CudaContext::CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) { diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h index 052d1e374ff..c0626734674 100644 --- a/cpp/src/arrow/gpu/cuda_context.h +++ b/cpp/src/arrow/gpu/cuda_context.h @@ -37,8 +37,8 @@ class ARROW_EXPORT CudaDeviceManager { public: static Status GetInstance(CudaDeviceManager** manager); - /// \brief Create a CUDA driver context for a particular device - Status CreateContext(int gpu_number, std::shared_ptr* ctx); + /// \brief Get the CUDA driver context for a particular device + Status GetContext(int gpu_number, std::shared_ptr* ctx); Status AllocateHost(int64_t nbytes, std::shared_ptr* buffer); Status FreeHost(uint8_t* data, int64_t nbytes); @@ -46,7 +46,8 @@ class ARROW_EXPORT CudaDeviceManager { int num_devices() const; private: - std::unique_ptr instance_; + CudaDeviceManager(); + static std::unique_ptr instance_; class CudaDeviceManagerImpl; std::unique_ptr impl_; @@ -67,7 +68,7 @@ class ARROW_EXPORT CudaContext { Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes); Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes); - Status Allocate(int64_t nbytes, std::shared_ptr* buffer); + Status Allocate(int64_t nbytes, uint8_t** out); Status Free(uint8_t* device_ptr, int64_t nbytes); int64_t bytes_allocated() const; diff --git a/cpp/src/arrow/gpu/cuda_memory.cc 
b/cpp/src/arrow/gpu/cuda_memory.cc index bf77ec1abfe..fe89aa5cb63 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -55,9 +55,12 @@ Status CudaBuffer::CopyFromHost(const int64_t position, const uint8_t* data, } Status AllocateCudaBuffer(const int64_t size, - std::shared_ptr& context, + const std::shared_ptr& context, std::shared_ptr* out) { - return context->Allocate(size, out); + uint8_t* data = nullptr; + RETURN_NOT_OK(context->Allocate(size, &data)); + *out = std::make_shared(data, size, context); + return Status::OK(); } CudaHostBuffer::~CudaHostBuffer() { From 508febb5c59255802f34c8779420c505e55019e5 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sun, 27 Aug 2017 11:51:31 -0400 Subject: [PATCH 06/11] Test suite passing again Change-Id: I844b83e2e88c10d60f7c3d6bdd818c6642905a81 --- cpp/src/arrow/gpu/CMakeLists.txt | 48 +------------------------------ cpp/src/arrow/gpu/cuda-test.cc | 15 ++-------- cpp/src/arrow/gpu/cuda_common.h | 2 +- cpp/src/arrow/gpu/cuda_context.cc | 15 +++------- cpp/src/arrow/gpu/cuda_memory.cc | 23 ++++++--------- cpp/src/arrow/gpu/cuda_memory.h | 9 ++---- 6 files changed, 20 insertions(+), 92 deletions(-) diff --git a/cpp/src/arrow/gpu/CMakeLists.txt b/cpp/src/arrow/gpu/CMakeLists.txt index 4646ccae20c..d213ff20234 100644 --- a/cpp/src/arrow/gpu/CMakeLists.txt +++ b/cpp/src/arrow/gpu/CMakeLists.txt @@ -15,52 +15,6 @@ # specific language governing permissions and limitations # under the License. -function(ADD_ARROW_CUDA_TEST REL_TEST_NAME) - set(options) - set(single_value_args) - set(multi_value_args STATIC_LINK_LIBS) - cmake_parse_arguments(ARG "${options}" "${one_value_args}" "${multi_value_args}" ${ARGN}) - if(ARG_UNPARSED_ARGUMENTS) - message(SEND_ERROR "Error: unrecognized arguments: ${ARG_UNPARSED_ARGUMENTS}") - endif() - - if(NO_TESTS OR NOT ARROW_BUILD_STATIC) - return() - endif() - get_filename_component(TEST_NAME ${REL_TEST_NAME} NAME_WE) - - if(EXISTS ${CMAKE_CURRENT_SOURCE_DIR}/${REL_TEST_NAME}.cc) - # This test has a corresponding .cc file, set it up as an executable. - set(TEST_PATH "${EXECUTABLE_OUTPUT_PATH}/${TEST_NAME}") - cuda_add_executable(${TEST_NAME} "${REL_TEST_NAME}.cc") - - if (ARG_STATIC_LINK_LIBS) - # Customize link libraries - target_link_libraries(${TEST_NAME} ${ARG_STATIC_LINK_LIBS}) - else() - target_link_libraries(${TEST_NAME} ${ARROW_TEST_LINK_LIBS}) - endif() - add_dependencies(unittest ${TEST_NAME}) - else() - # No executable, just invoke the test (probably a script) directly. 
- set(TEST_PATH ${CMAKE_CURRENT_SOURCE_DIR}/${REL_TEST_NAME}) - endif() - - if (ARROW_TEST_MEMCHECK) - SET_PROPERTY(TARGET ${TEST_NAME} - APPEND_STRING PROPERTY - COMPILE_FLAGS " -DARROW_VALGRIND") - add_test(${TEST_NAME} - bash -c "cd ${EXECUTABLE_OUTPUT_PATH}; valgrind --tool=memcheck --leak-check=full --leak-check-heuristics=stdstring --error-exitcode=1 ${TEST_PATH}") - elseif(MSVC) - add_test(${TEST_NAME} ${TEST_PATH}) - else() - add_test(${TEST_NAME} - ${BUILD_SUPPORT_DIR}/run-test.sh ${CMAKE_BINARY_DIR} test ${TEST_PATH}) - endif() - set_tests_properties(${TEST_NAME} PROPERTIES LABELS "unittest") -endfunction() - ####################################### # arrow_gpu ####################################### @@ -117,7 +71,7 @@ set(ARROW_GPU_TEST_LINK_LIBS ${ARROW_TEST_LINK_LIBS}) if (ARROW_BUILD_TESTS) - ADD_ARROW_CUDA_TEST(cuda-test + ADD_ARROW_TEST(cuda-test STATIC_LINK_LIBS ${ARROW_GPU_TEST_LINK_LIBS}) endif() diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc index 6bf61a37a5e..6fed8d3a7e4 100644 --- a/cpp/src/arrow/gpu/cuda-test.cc +++ b/cpp/src/arrow/gpu/cuda-test.cc @@ -45,9 +45,7 @@ class TestCudaBufferBase : public ::testing::Test { class TestCudaBuffer : public TestCudaBufferBase { public: - void SetUp() { - TestCudaBufferBase::SetUp(); - } + void SetUp() { TestCudaBufferBase::SetUp(); } }; TEST_F(TestCudaBuffer, Allocate) { @@ -81,9 +79,7 @@ TEST_F(TestCudaBuffer, CopyFromHost) { class TestCudaBufferWriter : public TestCudaBufferBase { public: - void SetUp() { - TestCudaBufferBase::SetUp(); - } + void SetUp() { TestCudaBufferBase::SetUp(); } void Allocate(const int64_t size) { ASSERT_OK(AllocateCudaBuffer(size, context_, &device_buffer_)); @@ -125,9 +121,6 @@ class TestCudaBufferWriter : public TestCudaBufferBase { } protected: - CudaDeviceManager* manager_; - std::shared_ptr context_; - std::shared_ptr device_buffer_; std::unique_ptr writer_; }; @@ -189,9 +182,7 @@ TEST_F(TestCudaBufferWriter, EdgeCases) { class TestCudaBufferReader : public TestCudaBufferBase { public: - void SetUp() { - TestCudaBufferBase::SetUp(); - } + void SetUp() { TestCudaBufferBase::SetUp(); } }; TEST_F(TestCudaBufferReader, Basics) { diff --git a/cpp/src/arrow/gpu/cuda_common.h b/cpp/src/arrow/gpu/cuda_common.h index 06579e54717..5a949a58806 100644 --- a/cpp/src/arrow/gpu/cuda_common.h +++ b/cpp/src/arrow/gpu/cuda_common.h @@ -34,7 +34,7 @@ namespace gpu { (void)ret; \ } while (0) -#define CU_RETURN_NOT_OK(STMT) \ +#define CU_RETURN_NOT_OK(STMT) \ do { \ CUresult ret = (STMT); \ if (ret != CUDA_SUCCESS) { \ diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc index dd0b29b6d4b..68a45ec62f8 100644 --- a/cpp/src/arrow/gpu/cuda_context.cc +++ b/cpp/src/arrow/gpu/cuda_context.cc @@ -103,8 +103,7 @@ class CudaContext::CudaContextImpl { class CudaDeviceManager::CudaDeviceManagerImpl { public: - CudaDeviceManagerImpl() - : host_bytes_allocated_(0) {} + CudaDeviceManagerImpl() : host_bytes_allocated_(0) {} Status Init() { CU_RETURN_NOT_OK(cuInit(0)); @@ -165,9 +164,7 @@ class CudaDeviceManager::CudaDeviceManagerImpl { int host_bytes_allocated_; }; -CudaDeviceManager::CudaDeviceManager() { - impl_.reset(new CudaDeviceManagerImpl()); -} +CudaDeviceManager::CudaDeviceManager() { impl_.reset(new CudaDeviceManagerImpl()); } std::unique_ptr CudaDeviceManager::instance_ = nullptr; @@ -197,16 +194,12 @@ Status CudaDeviceManager::FreeHost(uint8_t* data, int64_t nbytes) { return impl_->FreeHost(data, nbytes); } -int CudaDeviceManager::num_devices() const { - 
return impl_->num_devices(); -} +int CudaDeviceManager::num_devices() const { return impl_->num_devices(); } // ---------------------------------------------------------------------- // CudaContext public API -CudaContext::CudaContext() { - impl_.reset(new CudaContextImpl()); -} +CudaContext::CudaContext() { impl_.reset(new CudaContextImpl()); } CudaContext::~CudaContext() {} diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index fe89aa5cb63..8db6aef19eb 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -40,8 +40,7 @@ CudaBuffer::~CudaBuffer() { CudaBuffer::CudaBuffer(const std::shared_ptr& parent, const int64_t offset, const int64_t size) - : Buffer(parent, offset, size), - context_(parent->context()) {} + : Buffer(parent, offset, size), context_(parent->context()) {} Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes, uint8_t* out) const { @@ -54,9 +53,9 @@ Status CudaBuffer::CopyFromHost(const int64_t position, const uint8_t* data, return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes); } -Status AllocateCudaBuffer(const int64_t size, - const std::shared_ptr& context, +Status AllocateCudaBuffer(const int64_t size, const std::shared_ptr& context, std::shared_ptr* out) { + DCHECK(context); uint8_t* data = nullptr; RETURN_NOT_OK(context->Allocate(size, &data)); *out = std::make_shared(data, size, context); @@ -73,9 +72,7 @@ CudaHostBuffer::~CudaHostBuffer() { // CudaBufferReader CudaBufferReader::CudaBufferReader(const std::shared_ptr& buffer) - : io::BufferReader(buffer), - cuda_buffer_(buffer), - context_(buffer->context()) {} + : io::BufferReader(buffer), cuda_buffer_(buffer), context_(buffer->context()) {} CudaBufferReader::~CudaBufferReader() {} @@ -110,10 +107,8 @@ Status CudaBufferWriter::Close() { return Flush(); } Status CudaBufferWriter::Flush() { if (buffer_size_ > 0 && buffer_position_ > 0) { // Only need to flush when the write has been buffered - RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_ - - buffer_position_, - host_buffer_data_, - buffer_position_)); + RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_ - buffer_position_, + host_buffer_data_, buffer_position_)); buffer_position_ = 0; } return Status::OK(); @@ -139,8 +134,7 @@ Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) { if (nbytes + buffer_position_ >= buffer_size_) { // Reach end of buffer, write everything RETURN_NOT_OK(Flush()); - RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, - data, nbytes)); + RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes)); } else { // Write bytes to buffer std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes); @@ -148,8 +142,7 @@ Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) { } } else { // Unbuffered write - RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, - data, nbytes)); + RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes)); } position_ += nbytes; return Status::OK(); diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h index da3124d08be..9c91ad67987 100644 --- a/cpp/src/arrow/gpu/cuda_memory.h +++ b/cpp/src/arrow/gpu/cuda_memory.h @@ -37,8 +37,7 @@ namespace gpu { /// Be careful using this in any Arrow code which may not be GPU-aware class ARROW_EXPORT CudaBuffer : public Buffer { public: - CudaBuffer(uint8_t* data, int64_t size, - const std::shared_ptr& 
context,
+  CudaBuffer(uint8_t* data, int64_t size, const std::shared_ptr<CudaContext>& context,
              bool own_data = false)
       : Buffer(data, size), context_(context), own_data_(own_data) {
     is_mutable_ = true;
@@ -151,8 +150,7 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
 /// \param[out] out the allocated buffer
 /// \return Status
 ARROW_EXPORT
-Status AllocateCudaBuffer(const int64_t size,
-                          const std::shared_ptr<CudaContext>& context,
+Status AllocateCudaBuffer(const int64_t size, const std::shared_ptr<CudaContext>& context,
                           std::shared_ptr<CudaBuffer>* out);
 
 /// \brief Allocate CUDA-accessible memory on CPU host
@@ -160,8 +158,7 @@ Status AllocateCudaBuffer(const int64_t size,
 /// \param[out] out the allocated buffer
 /// \return Status
 ARROW_EXPORT
-Status AllocateCudaHostBuffer(const int64_t size,
-                              std::shared_ptr<CudaHostBuffer>* out);
+Status AllocateCudaHostBuffer(const int64_t size, std::shared_ptr<CudaHostBuffer>* out);
 
 }  // namespace gpu
 }  // namespace arrow
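Before the next patch lands, it helps to see the IPC round trip its new
classes are meant to support. A hedged sketch, assuming the API introduced
below: SendHandle and ReceiveHandle are hypothetical stand-ins for whatever
transport (e.g. a Unix socket) the application provides, and error handling
is abbreviated with RETURN_NOT_OK.

// Process A: allocate device memory, export it, and ship the handle
std::shared_ptr<CudaBuffer> buffer;
RETURN_NOT_OK(context->Allocate(1024, &buffer));
std::unique_ptr<CudaIpcMemHandle> handle;
RETURN_NOT_OK(buffer->ExportForIpc(&handle));
std::shared_ptr<Buffer> serialized;
RETURN_NOT_OK(handle->Serialize(default_memory_pool(), &serialized));
SendHandle(serialized);

// Process B: rebuild the handle and map the same device memory
std::shared_ptr<Buffer> payload = ReceiveHandle();
std::unique_ptr<CudaIpcMemHandle> received;
RETURN_NOT_OK(CudaIpcMemHandle::FromBuffer(payload->data(), &received));
std::shared_ptr<CudaBuffer> shared_buffer;
RETURN_NOT_OK(context->OpenIpcBuffer(*received, &shared_buffer));
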
From 84e452527cd735b84f475bf71a08ce51df23e7fb Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Sun, 27 Aug 2017 13:33:49 -0400
Subject: [PATCH 07/11] Add classes and methods for simplifying use of CUDA
 IPC machinery. No tests yet

Change-Id: Ib46b646219e83c35828cf19a5e8d3bc8cc096f25
---
 cpp/src/arrow/gpu/CMakeLists.txt    |  3 +-
 cpp/src/arrow/gpu/cuda-benchmark.cc |  4 +-
 cpp/src/arrow/gpu/cuda-test.cc      | 10 ++--
 cpp/src/arrow/gpu/cuda_api.h        |  1 +
 cpp/src/arrow/gpu/cuda_context.cc   | 33 +++++++++++-
 cpp/src/arrow/gpu/cuda_context.h    | 30 ++++++++---
 cpp/src/arrow/gpu/cuda_memory.cc    | 78 +++++++++++++++++++++++++----
 cpp/src/arrow/gpu/cuda_memory.h     | 63 ++++++++++++++++-------
 8 files changed, 178 insertions(+), 44 deletions(-)

diff --git a/cpp/src/arrow/gpu/CMakeLists.txt b/cpp/src/arrow/gpu/CMakeLists.txt
index d213ff20234..7cbaa76c415 100644
--- a/cpp/src/arrow/gpu/CMakeLists.txt
+++ b/cpp/src/arrow/gpu/CMakeLists.txt
@@ -45,7 +45,8 @@ ADD_ARROW_LIB(arrow_gpu
 )
 
 install(FILES
-  cuda_common.h
+  cuda_api.h
+  cuda_context.h
   cuda_memory.h
   DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
 
diff --git a/cpp/src/arrow/gpu/cuda-benchmark.cc b/cpp/src/arrow/gpu/cuda-benchmark.cc
index 26395ea9bf1..805a044403c 100644
--- a/cpp/src/arrow/gpu/cuda-benchmark.cc
+++ b/cpp/src/arrow/gpu/cuda-benchmark.cc
@@ -25,7 +25,7 @@
 #include "arrow/memory_pool.h"
 #include "arrow/test-util.h"
 
-#include "arrow/gpu/cuda_memory.h"
+#include "arrow/gpu/cuda_api.h"
 
 namespace arrow {
 namespace gpu {
@@ -41,7 +41,7 @@ static void CudaBufferWriterBenchmark(benchmark::State& state, const int64_t tot
   CudaDeviceManager* manager;
   ABORT_NOT_OK(CudaDeviceManager::GetInstance(&manager));
   std::shared_ptr<CudaContext> context;
   ABORT_NOT_OK(manager->GetContext(kGpuNumber, &context));
 
   std::shared_ptr<CudaBuffer> device_buffer;
-  ABORT_NOT_OK(AllocateCudaBuffer(total_bytes, context, &device_buffer));
+  ABORT_NOT_OK(context->Allocate(total_bytes, &device_buffer));
 
   CudaBufferWriter writer(device_buffer);
   if (buffer_size > 0) {
diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 6fed8d3a7e4..1aa4199d1b2 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -24,7 +24,7 @@
 #include "arrow/status.h"
 #include "arrow/test-util.h"
 
-#include "arrow/gpu/cuda_memory.h"
+#include "arrow/gpu/cuda_api.h"
 
 namespace arrow {
 namespace gpu {
@@ -51,7 +51,7 @@ class TestCudaBuffer : public TestCudaBufferBase {
 TEST_F(TestCudaBuffer, Allocate) {
   const int64_t kSize = 100;
   std::shared_ptr<CudaBuffer> buffer;
-  ASSERT_OK(AllocateCudaBuffer(kSize, context_, &buffer));
+  ASSERT_OK(context_->Allocate(kSize, &buffer));
   ASSERT_EQ(kSize, buffer->size());
 }
 
@@ -66,7 +66,7 @@ void AssertCudaBufferEquals(const CudaBuffer& buffer, const uint8_t* host_data,
 TEST_F(TestCudaBuffer, CopyFromHost) {
   const int64_t kSize = 1000;
   std::shared_ptr<CudaBuffer> device_buffer;
-  ASSERT_OK(AllocateCudaBuffer(kSize, context_, &device_buffer));
+  ASSERT_OK(context_->Allocate(kSize, &device_buffer));
 
   std::shared_ptr<PoolBuffer> host_buffer;
   ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
@@ -82,7 +82,7 @@ class TestCudaBufferWriter : public TestCudaBufferBase {
   void SetUp() { TestCudaBufferBase::SetUp(); }
 
   void Allocate(const int64_t size) {
-    ASSERT_OK(AllocateCudaBuffer(size, context_, &device_buffer_));
+    ASSERT_OK(context_->Allocate(size, &device_buffer_));
     writer_.reset(new CudaBufferWriter(device_buffer_));
   }
 
@@ -189,7 +189,7 @@ TEST_F(TestCudaBufferReader, Basics) {
   std::shared_ptr<CudaBuffer> device_buffer;
   const int64_t size = 1000;
-  ASSERT_OK(AllocateCudaBuffer(size, context_, &device_buffer));
+  ASSERT_OK(context_->Allocate(size, &device_buffer));
 
   std::shared_ptr<PoolBuffer> buffer;
   ASSERT_OK(test::MakeRandomBytePoolBuffer(1000, default_memory_pool(), &buffer));
diff --git a/cpp/src/arrow/gpu/cuda_api.h b/cpp/src/arrow/gpu/cuda_api.h
index a70e0af92af..b9f2ba3b752 100644
--- a/cpp/src/arrow/gpu/cuda_api.h
+++ b/cpp/src/arrow/gpu/cuda_api.h
@@ -18,6 +18,7 @@
 #ifndef ARROW_GPU_CUDA_API_H
 #define ARROW_GPU_CUDA_API_H
 
+#include "arrow/gpu/cuda_context.h"
 #include "arrow/gpu/cuda_memory.h"
 #include "arrow/gpu/cuda_version.h"
 
diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
index 68a45ec62f8..feabfc5252a 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -88,6 +88,17 @@ class CudaContext::CudaContextImpl {
     return Status::OK();
   }
 
+  Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle, uint8_t** out) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    auto handle = reinterpret_cast<const CUipcMemHandle*>(ipc_handle.handle());
+
+    CUdeviceptr data;
+    CU_RETURN_NOT_OK(
+        cuIpcOpenMemHandle(&data, *handle, CU_IPC_MEM_LAZY_ENABLE_PEER_ACCESS));
+    *out = reinterpret_cast<uint8_t*>(data);
+    return Status::OK();
+  }
+
   const CudaDevice device() const { return device_; }
 
  private:
@@ -203,8 +214,11 @@ CudaContext::CudaContext() { impl_.reset(new CudaContextImpl()); }
 
 CudaContext::~CudaContext() {}
 
-Status CudaContext::Allocate(int64_t nbytes, uint8_t** out) {
-  return impl_->Allocate(nbytes, out);
+Status CudaContext::Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out) {
+  uint8_t* data = nullptr;
+  RETURN_NOT_OK(impl_->Allocate(nbytes, &data));
+  *out = std::make_shared<CudaBuffer>(data, nbytes, this->shared_from_this(), true);
+  return Status::OK();
 }
 
 Status CudaContext::CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
@@ -219,5 +233,20 @@ Status CudaContext::Free(uint8_t* device_ptr, int64_t nbytes) {
   return impl_->Free(device_ptr, nbytes);
 }
 
+Status CudaContext::OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle,
+                                  std::shared_ptr<CudaBuffer>* out) {
+  uint8_t* data = nullptr;
+  RETURN_NOT_OK(impl_->OpenIpcBuffer(ipc_handle, &data));
+
+  // Need to ask the device how big the buffer is
+  size_t allocation_size = 0;
+  CU_RETURN_NOT_OK(cuMemGetAddressRange(nullptr, &allocation_size,
+                                        reinterpret_cast<CUdeviceptr>(data)));
+
+  *out = std::make_shared<CudaBuffer>(data, allocation_size, this->shared_from_this(),
+                                      true, true);
+  return Status::OK();
+}
+
 }  // namespace gpu
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_context.h 
b/cpp/src/arrow/gpu/cuda_context.h index c0626734674..c433976608c 100644 --- a/cpp/src/arrow/gpu/cuda_context.h +++ b/cpp/src/arrow/gpu/cuda_context.h @@ -24,12 +24,11 @@ #include "arrow/status.h" #include "arrow/util/visibility.h" +#include "arrow/gpu/cuda_memory.h" + namespace arrow { namespace gpu { -class CudaBuffer; -class CudaHostBuffer; - // Forward declaration class CudaContext; @@ -41,6 +40,7 @@ class ARROW_EXPORT CudaDeviceManager { Status GetContext(int gpu_number, std::shared_ptr* ctx); Status AllocateHost(int64_t nbytes, std::shared_ptr* buffer); + Status FreeHost(uint8_t* data, int64_t nbytes); int num_devices() const; @@ -59,26 +59,40 @@ struct ARROW_EXPORT CudaDeviceInfo {}; /// \class CudaContext /// \brief Friendlier interface to the CUDA driver API -class ARROW_EXPORT CudaContext { +class ARROW_EXPORT CudaContext : public std::enable_shared_from_this { public: ~CudaContext(); Status Destroy(); - Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes); - Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes); + /// \brief Allocate CUDA memory on GPU device for this context + /// \param[in] nbytes number of bytes + /// \param[out] out the allocated buffer + /// \return Status + Status Allocate(int64_t nbytes, std::shared_ptr* out); - Status Allocate(int64_t nbytes, uint8_t** out); - Status Free(uint8_t* device_ptr, int64_t nbytes); + /// \brief Open existing CUDA IPC memory handle + /// \param[in] ipc_handle opaque pointer to CUipcMemHandle (driver API) + /// \param[out] buffer a CudaBuffer referencing + /// \return Status + Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle, + std::shared_ptr* buffer); int64_t bytes_allocated() const; private: CudaContext(); + Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes); + Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes); + Status Free(uint8_t* device_ptr, int64_t nbytes); + class CudaContextImpl; std::unique_ptr impl_; + friend CudaBuffer; + friend CudaBufferReader; + friend CudaBufferWriter; friend CudaDeviceManager::CudaDeviceManagerImpl; }; diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index 8db6aef19eb..7c6464a7bf4 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -19,8 +19,11 @@ #include #include +#include #include +#include + #include "arrow/buffer.h" #include "arrow/io/memory.h" #include "arrow/status.h" @@ -32,15 +35,69 @@ namespace arrow { namespace gpu { -CudaBuffer::~CudaBuffer() { +// ---------------------------------------------------------------------- +// CUDA IPC memory handle + +struct CudaIpcMemHandle::CudaIpcMemHandleImpl { + explicit CudaIpcMemHandleImpl(const void* handle) { + memcpy(&ipc_handle, handle, sizeof(CUipcMemHandle)); + } + + CUipcMemHandle ipc_handle; +}; + +CudaIpcMemHandle::CudaIpcMemHandle(const void* handle) { + impl_.reset(new CudaIpcMemHandleImpl(handle)); +} + +CudaIpcMemHandle::~CudaIpcMemHandle() {} + +Status CudaIpcMemHandle::FromBuffer(const void* opaque_handle, + std::unique_ptr* handle) { + *handle = std::unique_ptr(new CudaIpcMemHandle(opaque_handle)); + return Status::OK(); +} + +Status CudaIpcMemHandle::Serialize(MemoryPool* pool, std::shared_ptr* out) const { + std::shared_ptr buffer; + constexpr size_t kHandleSize = sizeof(CUipcMemHandle); + RETURN_NOT_OK(AllocateBuffer(pool, static_cast(kHandleSize), &buffer)); + memcpy(buffer->mutable_data(), &impl_->ipc_handle, kHandleSize); + *out = buffer; + return Status::OK(); +} 
+
+const void* CudaIpcMemHandle::handle() const { return &impl_->ipc_handle; }
+
+// ----------------------------------------------------------------------
+
+CudaBuffer::CudaBuffer(uint8_t* data, int64_t size,
+                       const std::shared_ptr<CudaContext>& context, bool own_data,
+                       bool is_ipc)
+    : Buffer(data, size), context_(context), own_data_(own_data), is_ipc_(is_ipc) {
+  is_mutable_ = true;
+  mutable_data_ = data;
+}
+
+CudaBuffer::~CudaBuffer() { DCHECK(Close().ok()); }
+
+Status CudaBuffer::Close() {
   if (own_data_) {
-    DCHECK(context_->Free(mutable_data_, size_).ok());
+    if (is_ipc_) {
+      CU_RETURN_NOT_OK(cuIpcCloseMemHandle(reinterpret_cast<CUdeviceptr>(mutable_data_)));
+    } else {
+      return context_->Free(mutable_data_, size_);
+    }
   }
+  return Status::OK();
 }
 
 CudaBuffer::CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
                        const int64_t size)
-    : Buffer(parent, offset, size), context_(parent->context()) {}
+    : Buffer(parent, offset, size),
+      context_(parent->context()),
+      own_data_(false),
+      is_ipc_(false) {}
 
 Status CudaBuffer::CopyToHost(const int64_t position, const int64_t nbytes,
                               uint8_t* out) const {
@@ -53,12 +110,15 @@ Status CudaBuffer::CopyFromHost(const int64_t position, const uint8_t* data,
   return context_->CopyHostToDevice(mutable_data_ + position, data, nbytes);
 }
 
-Status AllocateCudaBuffer(const int64_t size, const std::shared_ptr<CudaContext>& context,
-                          std::shared_ptr<CudaBuffer>* out) {
-  DCHECK(context);
-  uint8_t* data = nullptr;
-  RETURN_NOT_OK(context->Allocate(size, &data));
-  *out = std::make_shared<CudaBuffer>(data, size, context);
+Status CudaBuffer::ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle) {
+  if (is_ipc_) {
+    return Status::Invalid("Buffer has already been exported for IPC");
+  }
+  CUipcMemHandle cu_handle;
+  CU_RETURN_NOT_OK(
+      cuIpcGetMemHandle(&cu_handle, reinterpret_cast<CUdeviceptr>(mutable_data_)));
+  is_ipc_ = true;
+  *handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(&cu_handle));
   return Status::OK();
 }

diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index 9c91ad67987..b9003540a61 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -26,11 +26,12 @@
 #include "arrow/memory_pool.h"
 #include "arrow/status.h"
 
-#include "arrow/gpu/cuda_context.h"
-
 namespace arrow {
 namespace gpu {
 
+class CudaContext;
+class CudaIpcMemHandle;
+
 /// \class CudaBuffer
 /// \brief An Arrow buffer located on a GPU device
 ///
@@ -38,11 +39,7 @@
 class ARROW_EXPORT CudaBuffer : public Buffer {
  public:
   CudaBuffer(uint8_t* data, int64_t size, const std::shared_ptr<CudaContext>& context,
-             bool own_data = false)
-      : Buffer(data, size), context_(context), own_data_(own_data) {
-    is_mutable_ = true;
-    mutable_data_ = data;
-  }
+             bool own_data = false, bool is_ipc = false);
 
   CudaBuffer(const std::shared_ptr<CudaBuffer>& parent, const int64_t offset,
              const int64_t size);
@@ -61,11 +58,22 @@
   /// \return Status
   Status CopyFromHost(const int64_t position, const uint8_t* data, int64_t nbytes);
 
+  /// \brief Expose this device buffer as IPC memory which can be used in other processes
+  /// \param[out] handle the exported IPC handle
+  /// \return Status
+  ///
+  /// \note After calling this function, this device memory will not be freed
+  /// when the CudaBuffer is destructed
+  virtual Status ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle);
+
   std::shared_ptr<CudaContext> context() const { return context_; }
 
- private:
+ protected:
   std::shared_ptr<CudaContext> context_;
   bool own_data_;
+  bool is_ipc_;
+
+  virtual Status Close();
 };
 
 /// \class CudaHostBuffer
 ///
@@ -76,6 +84,36 @@
 class ARROW_EXPORT CudaHostBuffer : public MutableBuffer {
  ~CudaHostBuffer();
 };
 
+/// \class CudaIpcMemHandle
+/// \brief A container for a CUDA IPC handle
+class ARROW_EXPORT CudaIpcMemHandle {
+ public:
+  ~CudaIpcMemHandle();
+
+  /// \brief Create CudaIpcMemHandle from opaque buffer (e.g. from another process)
+  /// \param[in] opaque_handle a CUipcMemHandle as a const void*
+  /// \param[out] handle the CudaIpcMemHandle instance
+  /// \return Status
+  static Status FromBuffer(const void* opaque_handle,
+                           std::unique_ptr<CudaIpcMemHandle>* handle);
+
+  /// \brief Write CudaIpcMemHandle to a Buffer
+  /// \param[in] pool a MemoryPool to allocate memory from
+  /// \param[out] out the serialized buffer
+  /// \return Status
+  Status Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const;
+
+  const void* handle() const;
+
+ private:
+  explicit CudaIpcMemHandle(const void* handle);
+
+  struct CudaIpcMemHandleImpl;
+  std::unique_ptr<CudaIpcMemHandleImpl> impl_;
+
+  friend CudaBuffer;
+};
+
 /// \class CudaBufferReader
 /// \brief File interface for zero-copy read from CUDA buffers
 ///
@@ -144,15 +182,6 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
   uint8_t* host_buffer_data_;
 };
 
-/// \brief Allocate CUDA memory on a GPU device
-/// \param[in] gpu_number Device number to allocate
-/// \param[in] size number of bytes
-/// \param[out] out the allocated buffer
-/// \return Status
-ARROW_EXPORT
-Status AllocateCudaBuffer(const int64_t size, const std::shared_ptr<CudaContext>& context,
-                          std::shared_ptr<CudaBuffer>* out);
-
 /// \brief Allocate CUDA-accessible memory on CPU host
 /// \param[in] size number of bytes
 /// \param[out] out the allocated buffer

From 591aceb0110dbf31efc519696b45825f8232eda0 Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Sun, 27 Aug 2017 19:20:03 -0400
Subject: [PATCH 08/11] Draft SerializeRecordBatch for CUDA

Change-Id: I8dd313ac4e1cc0c01fdbe760bcae325a55ec8818
---
 cpp/src/arrow/gpu/CMakeLists.txt          |  2 +
 cpp/src/arrow/gpu/cuda-test.cc            | 37 +++++++++++++++++++
 cpp/src/arrow/gpu/cuda_api.h              |  1 +
 .../gpu/{cuda_ipc.h => cuda_arrow_ipc.h}  | 34 +++++++----------
 cpp/src/arrow/gpu/cuda_common.h           |  2 +-
 cpp/src/arrow/gpu/cuda_context.cc         | 31 ++++++++++++++--
 cpp/src/arrow/gpu/cuda_context.h          | 10 ++++-
 cpp/src/arrow/gpu/cuda_memory.cc          |  7 +---
 cpp/src/arrow/gpu/cuda_memory.h           |  5 ++-
 cpp/src/arrow/ipc/writer.cc               | 13 +++++--
 cpp/src/arrow/ipc/writer.h                | 12 ++++++
 11 files changed, 117 insertions(+), 37 deletions(-)
 rename cpp/src/arrow/gpu/{cuda_ipc.h => cuda_arrow_ipc.h} (61%)

diff --git a/cpp/src/arrow/gpu/CMakeLists.txt b/cpp/src/arrow/gpu/CMakeLists.txt
index 7cbaa76c415..2db038f5814 100644
--- a/cpp/src/arrow/gpu/CMakeLists.txt
+++ b/cpp/src/arrow/gpu/CMakeLists.txt
@@ -27,6 +27,7 @@ find_package(CUDA REQUIRED)
 include_directories(SYSTEM ${CUDA_INCLUDE_DIRS})
 
 set(ARROW_GPU_SRCS
+  cuda_arrow_ipc.cc
   cuda_context.cc
   cuda_memory.cc
 )
@@ -46,6 +47,7 @@ ADD_ARROW_LIB(arrow_gpu
 
 install(FILES
   cuda_api.h
+  cuda_arrow_ipc.h
   cuda_context.h
   cuda_memory.h
   DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")

diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 1aa4199d1b2..3fe5fba7f47 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -77,6 +77,43 @@ TEST_F(TestCudaBuffer, CopyFromHost) {
   AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
 }
 
+// IPC only supported on Linux
+#if defined(__linux)
+
+TEST_F(TestCudaBuffer, DISABLED_ExportForIpc) {
+  // For this test to work, a second process needs to be spawned
+  const int64_t kSize = 1000;
+  std::shared_ptr<CudaBuffer> device_buffer;
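+  // Stage random host data into freshly allocated device memory so the
+  // importing side has recognizable bytes to verify against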
+  ASSERT_OK(context_->Allocate(kSize, &device_buffer));
+
+  std::shared_ptr<PoolBuffer> host_buffer;
+  ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
+  ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), kSize));
+
+  // Export for IPC and serialize
+  std::unique_ptr<CudaIpcMemHandle> ipc_handle;
+  ASSERT_OK(device_buffer->ExportForIpc(&ipc_handle));
+
+  std::shared_ptr<Buffer> serialized_handle;
+  ASSERT_OK(ipc_handle->Serialize(default_memory_pool(), &serialized_handle));
+
+  // Deserialize IPC handle and open
+  std::unique_ptr<CudaIpcMemHandle> ipc_handle2;
+  ASSERT_OK(CudaIpcMemHandle::FromBuffer(serialized_handle->data(), &ipc_handle2));
+
+  std::shared_ptr<CudaBuffer> ipc_buffer;
+  ASSERT_OK(context_->OpenIpcBuffer(*ipc_handle2, &ipc_buffer));
+
+  ASSERT_EQ(kSize, ipc_buffer->size());
+
+  std::shared_ptr<MutableBuffer> ipc_data;
+  ASSERT_OK(AllocateBuffer(default_memory_pool(), kSize, &ipc_data));
+  ASSERT_OK(ipc_buffer->CopyToHost(0, kSize, ipc_data->mutable_data()));
+  ASSERT_EQ(0, std::memcmp(ipc_data->data(), host_buffer->data(), kSize));
+}
+
+#endif
+
 class TestCudaBufferWriter : public TestCudaBufferBase {
  public:
  void SetUp() { TestCudaBufferBase::SetUp(); }

diff --git a/cpp/src/arrow/gpu/cuda_api.h b/cpp/src/arrow/gpu/cuda_api.h
index b9f2ba3b752..b05f00f5835 100644
--- a/cpp/src/arrow/gpu/cuda_api.h
+++ b/cpp/src/arrow/gpu/cuda_api.h
@@ -19,6 +19,7 @@
 #ifndef ARROW_GPU_CUDA_API_H
 #define ARROW_GPU_CUDA_API_H
 
+#include "arrow/gpu/cuda_arrow_ipc.h"
 #include "arrow/gpu/cuda_context.h"
 #include "arrow/gpu/cuda_memory.h"
 #include "arrow/gpu/cuda_version.h"

diff --git a/cpp/src/arrow/gpu/cuda_ipc.h b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
similarity index 61%
rename from cpp/src/arrow/gpu/cuda_ipc.h
rename to cpp/src/arrow/gpu/cuda_arrow_ipc.h
index ccdc13eb379..37d3c948b28 100644
--- a/cpp/src/arrow/gpu/cuda_ipc.h
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_GPU_CUDA_MEMORY_H
-#define ARROW_GPU_CUDA_MEMORY_H
+#ifndef ARROW_GPU_CUDA_ARROW_IPC_H
+#define ARROW_GPU_CUDA_ARROW_IPC_H
 
 #include <cstdint>
 #include <memory>
 
@@ -25,31 +25,25 @@
 #include "arrow/status.h"
 #include "arrow/util/visibility.h"
 
-#include "arrow/cuda_memory.h"
-
 namespace arrow {
+
+class RecordBatch;
+
 namespace gpu {
 
-/// \brief Write record batch message to GPU device memory
-///
-///
-ARROW_EXPORT
-SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
-                     std::shared_ptr<CudaBuffer>* out);
+class CudaBuffer;
+class CudaContext;
 
-/// \brief Write record batch to pre-allocated GPU device memory
-///
-/// \param[in] batch the record batch to write
-/// \param[in] out the CudaBufferWriter to write the output to
+/// \brief Write record batch message to GPU device memory
+/// \param[in] batch record batch to write
+/// \param[in] ctx CudaContext to allocate device memory from
+/// \param[out] out the returned device buffer which contains the record batch message
 /// \return Status
-///
-/// The CudaBufferWriter must have enough pre-allocated space to accommodate
-/// the record batch. You can use arrow::ipc::GetRecordBatchSize to compute
-/// this
 ARROW_EXPORT
-SerializeRecordBatch(const RecordBatch& batch, CudaBufferWriter* out);
+Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
+                            std::shared_ptr<CudaBuffer>* out);
 
 }  // namespace gpu
 }  // namespace arrow
 
-#endif  // ARROW_GPU_CUDA_MEMORY_H
+#endif  // ARROW_GPU_CUDA_ARROW_IPC_H

diff --git a/cpp/src/arrow/gpu/cuda_common.h b/cpp/src/arrow/gpu/cuda_common.h
index 5a949a58806..c06c1a21ff4 100644
--- a/cpp/src/arrow/gpu/cuda_common.h
+++ b/cpp/src/arrow/gpu/cuda_common.h
@@ -40,7 +40,7 @@ namespace gpu {
     if (ret != CUDA_SUCCESS) {                                                \
       std::stringstream ss;                                                   \
       ss << "Cuda Driver API call in " << __FILE__ << " at line " << __LINE__ \
-         << " failed: " << #STMT;                                             \
+         << " failed with code " << ret << ": " << #STMT;                     \
       return Status::IOError(ss.str());                                       \
     }                                                                         \
   } while (0)

diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
index feabfc5252a..430ecab6fbe 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -88,6 +88,14 @@ class CudaContext::CudaContextImpl {
     return Status::OK();
   }
 
+  Status ExportIpcBuffer(uint8_t* data, std::unique_ptr<CudaIpcMemHandle>* handle) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    CUipcMemHandle cu_handle;
+    CU_RETURN_NOT_OK(cuIpcGetMemHandle(&cu_handle, reinterpret_cast<CUdeviceptr>(data)));
+    *handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(&cu_handle));
+    return Status::OK();
+  }
+
   Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle, uint8_t** out) {
     CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
     auto handle = reinterpret_cast<const CUipcMemHandle*>(ipc_handle.handle());
@@ -151,12 +159,17 @@ class CudaDeviceManager::CudaDeviceManagerImpl {
     return Status::OK();
   }
 
+  Status CreateNewContext(int device_number, std::shared_ptr<CudaContext>* out) {
+    *out = std::shared_ptr<CudaContext>(new CudaContext());
+    return (*out)->impl_->Init(devices_[device_number]);
+  }
+
   Status GetContext(int device_number, std::shared_ptr<CudaContext>* out) {
     auto it = contexts_.find(device_number);
     if (it == contexts_.end()) {
-      auto ctx = std::shared_ptr<CudaContext>(new CudaContext());
-      RETURN_NOT_OK(ctx->impl_->Init(devices_[device_number]));
-      contexts_[device_number] = *out = ctx;
+      std::shared_ptr<CudaContext> new_context;
+      RETURN_NOT_OK(CreateNewContext(device_number, &new_context));
+      contexts_[device_number] = *out = new_context;
     } else {
       *out = it->second;
     }
@@ -193,6 +206,11 @@ Status CudaDeviceManager::GetContext(int device_number,
   return impl_->GetContext(device_number, out);
 }
 
+Status CudaDeviceManager::CreateNewContext(int device_number,
+                                           std::shared_ptr<CudaContext>* out) {
+  return impl_->CreateNewContext(device_number, out);
+}
+
 Status CudaDeviceManager::AllocateHost(int64_t nbytes,
                                        std::shared_ptr<CudaHostBuffer>* out) {
   uint8_t* data = nullptr;
@@ -221,6 +239,11 @@ Status CudaContext::Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out) {
   return Status::OK();
 }
 
+Status CudaContext::ExportIpcBuffer(uint8_t* data,
+                                    std::unique_ptr<CudaIpcMemHandle>* handle) {
+  return impl_->ExportIpcBuffer(data, handle);
+}
+
Status CudaContext::CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
   return impl_->CopyHostToDevice(dst, src, nbytes);
 }
@@ -229,6 +252,8 @@ Status CudaContext::CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
   return impl_->CopyDeviceToHost(dst, src, nbytes);
 }
 
+Status CudaContext::Close() { return impl_->Close(); }
+
 Status CudaContext::Free(uint8_t* device_ptr, int64_t nbytes) {
   return impl_->Free(device_ptr, nbytes);
 }
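The device manager caches one shared context per device, so GetContext is the
normal entry point; CreateNewContext always initializes a fresh driver
context. A rough sketch of the intended call pattern against the API above
(single-GPU machine assumed; error propagation abbreviated with the library's
RETURN_NOT_OK):

    CudaDeviceManager* manager = nullptr;
    RETURN_NOT_OK(CudaDeviceManager::GetInstance(&manager));

    std::shared_ptr<CudaContext> ctx;
    RETURN_NOT_OK(manager->GetContext(0, &ctx));  // cached, shared per device

    std::shared_ptr<CudaBuffer> scratch;
    RETURN_NOT_OK(ctx->Allocate(1024, &scratch));  // freed when scratch goes away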
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index c433976608c..64710596123 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -36,9 +36,14 @@ class ARROW_EXPORT CudaDeviceManager {
  public:
   static Status GetInstance(CudaDeviceManager** manager);
 
-  /// \brief Get the CUDA driver context for a particular device
+  /// \brief Get the shared CUDA driver context for a particular device
   Status GetContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
 
+  /// \brief Create a new context for a given device number
+  ///
+  /// In general, code should use GetContext instead
+  Status CreateNewContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
+
   Status AllocateHost(int64_t nbytes, std::shared_ptr<CudaHostBuffer>* buffer);
 
   Status FreeHost(uint8_t* data, int64_t nbytes);
@@ -63,7 +68,7 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext>
   CudaContext();
 
+  Status ExportIpcBuffer(uint8_t* data, std::unique_ptr<CudaIpcMemHandle>* handle);
   Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes);
   Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes);
   Status Free(uint8_t* device_ptr, int64_t nbytes);

diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 7c6464a7bf4..3c88fe2d59f 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -114,11 +114,8 @@ Status CudaBuffer::ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle) {
   if (is_ipc_) {
     return Status::Invalid("Buffer has already been exported for IPC");
   }
-  CUipcMemHandle cu_handle;
-  CU_RETURN_NOT_OK(
-      cuIpcGetMemHandle(&cu_handle, reinterpret_cast<CUdeviceptr>(mutable_data_)));
-  is_ipc_ = true;
-  *handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(&cu_handle));
+  RETURN_NOT_OK(context_->ExportIpcBuffer(mutable_data_, handle));
+  own_data_ = false;
   return Status::OK();
 }

diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index b9003540a61..d5407371f35 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -103,15 +103,16 @@ class ARROW_EXPORT CudaIpcMemHandle {
   /// \return Status
   Status Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const;
 
-  const void* handle() const;
-
  private:
   explicit CudaIpcMemHandle(const void* handle);
 
   struct CudaIpcMemHandleImpl;
   std::unique_ptr<CudaIpcMemHandleImpl> impl_;
 
+  const void* handle() const;
+
   friend CudaBuffer;
+  friend CudaContext;
 };
 
 /// \class CudaBufferReader

diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 9c05cba918d..e17b974adfc 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -901,14 +901,19 @@ Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
   RETURN_NOT_OK(AllocateBuffer(pool, size, &buffer));
   io::FixedSizeBufferWriter stream(buffer);
 
-  int32_t metadata_length = 0;
-  int64_t body_length = 0;
-  RETURN_NOT_OK(WriteRecordBatch(batch, 0, &stream, &metadata_length, &body_length, pool,
-                                 kMaxNestingDepth, true));
+  RETURN_NOT_OK(SerializeRecordBatch(batch, pool, &stream));
   *out = buffer;
   return Status::OK();
 }
 
+Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
+                            io::OutputStream* out) {
+  int32_t metadata_length = 0;
+  int64_t body_length = 0;
+  return WriteRecordBatch(batch, 0, out, &metadata_length, &body_length, pool,
+                          kMaxNestingDepth, true);
+}
+
 Status SerializeSchema(const Schema& schema, MemoryPool* pool,
                        std::shared_ptr<Buffer>* out) {
   std::shared_ptr<io::BufferOutputStream> stream;

diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index d867982d2be..3f110fe26fb 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -177,6 +177,18 @@ ARROW_EXPORT
 Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
                             std::shared_ptr<Buffer>* out);
 
+/// \brief Write record batch to OutputStream
+///
+/// \param[in] batch the record batch to write
+/// \param[in] pool a MemoryPool to use for any temporary allocations
+/// \param[in] out the OutputStream to write the output to
+/// \return Status
+///
+/// If writing to pre-allocated memory, you can use
+/// arrow::ipc::GetRecordBatchSize to compute how much space is required
+ARROW_EXPORT
+Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
+                            io::OutputStream* out);
+
 /// \brief Serialize schema using stream writer as a sequence of one or more
 /// IPC messages
 ///

From 16d628f736044c22c3d29024050addc0036091aa Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Mon, 28 Aug 2017 09:32:08 -0400
Subject: [PATCH 09/11] More Arrow IPC scaffolding

Change-Id: I9eaf54a1a058a18f17251816ec22e5e4e3a260da
---
 cpp/src/arrow/gpu/cuda-test.cc      | 38 +++++++++++
 cpp/src/arrow/gpu/cuda_arrow_ipc.cc | 98 +++++++++++++++++++++++++++++
 cpp/src/arrow/gpu/cuda_arrow_ipc.h  | 20 ++++++
 3 files changed, 156 insertions(+)
 create mode 100644 cpp/src/arrow/gpu/cuda_arrow_ipc.cc

diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 3fe5fba7f47..9045e2fa427 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -22,6 +22,8 @@
 #include "gtest/gtest.h"
 
 #include "arrow/status.h"
+#include "arrow/ipc/test-common.h"
+#include "arrow/ipc/api.h"
 #include "arrow/test-util.h"
 
 #include "arrow/gpu/cuda_api.h"
@@ -262,5 +264,41 @@ TEST_F(TestCudaBufferReader, Basics) {
   ASSERT_EQ(0, std::memcmp(stack_buffer, host_data + 925, 75));
 }
 
+class TestCudaArrowIpc : public TestCudaBufferBase {
+ public:
+  void SetUp() {
+    TestCudaBufferBase::SetUp();
+    pool_ = default_memory_pool();
+  }
+
+ protected:
+  MemoryPool* pool_;
+};
+
+TEST_F(TestCudaArrowIpc, BasicWriteRead) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(ipc::MakeIntRecordBatch(&batch));
+
+  std::shared_ptr<CudaBuffer> device_serialized;
+  ASSERT_OK(arrow::gpu::SerializeRecordBatch(*batch, context_.get(),
+                                             &device_serialized));
+
+  // Test that ReadRecordBatch works properly
+  std::shared_ptr<RecordBatch> device_batch;
+  ASSERT_OK(ReadRecordBatch(batch->schema(), device_serialized, &device_batch));
+
+  // Copy data from device, read batch, and compare
+  std::shared_ptr<MutableBuffer> host_buffer;
+  int64_t size = device_serialized->size();
+  ASSERT_OK(AllocateBuffer(pool_, size, &host_buffer));
+  ASSERT_OK(device_serialized->CopyToHost(0, size, host_buffer->mutable_data()));
+
+  std::shared_ptr<RecordBatch> cpu_batch;
+  io::BufferReader cpu_reader(host_buffer);
+  ASSERT_OK(ipc::ReadRecordBatch(batch->schema(), &cpu_reader, &cpu_batch));
+
+  ipc::CompareBatch(*batch, *cpu_batch);
+}
+
 }  // namespace gpu
 }  // namespace arrow

diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
new file mode 100644
index 00000000000..42f6d16d19e
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+
+#include "arrow/buffer.h"
+#include "arrow/ipc/message.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/gpu/cuda_context.h"
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+namespace gpu {
+
+Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
+                            std::shared_ptr<CudaBuffer>* out) {
+  int64_t size = 0;
+  RETURN_NOT_OK(ipc::GetRecordBatchSize(batch, &size));
+
+  std::shared_ptr<CudaBuffer> buffer;
+  RETURN_NOT_OK(ctx->Allocate(size, &buffer));
+
+  CudaBufferWriter stream(buffer);
+
+  // Use 8MB buffering, which yields generally good performance
+  RETURN_NOT_OK(stream.SetBufferSize(1 << 23));
+
+  // We use the default memory pool here since any allocations are ephemeral
+  RETURN_NOT_OK(ipc::SerializeRecordBatch(batch, default_memory_pool(),
+                                          &stream));
+  *out = buffer;
+  return Status::OK();
+}
+
+Status ReadMessage(CudaBufferReader* stream, MemoryPool* pool,
+                   std::unique_ptr<ipc::Message>* message) {
+  uint8_t length_buf[4] = {0};
+
+  int64_t bytes_read = 0;
+  RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read, length_buf));
+  if (bytes_read != sizeof(int32_t)) {
+    *message = nullptr;
+    return Status::OK();
+  }
+
+  const int32_t metadata_length = *reinterpret_cast<const int32_t*>(length_buf);
+
+  if (metadata_length == 0) {
+    // Optional 0 EOS control message
+    *message = nullptr;
+    return Status::OK();
+  }
+
+  std::shared_ptr<MutableBuffer> metadata;
+  RETURN_NOT_OK(AllocateBuffer(pool, metadata_length, &metadata));
+  RETURN_NOT_OK(file->Read(message_length, &bytes_read, metadata->mutable_data()));
+  if (bytes_read != metadata_length) {
+    return Status::IOError("Unexpected end of stream trying to read message");
+  }
+
+  auto fb_message = flatbuf::GetMessage(metadata->data());
+
+  int64_t body_length = fb_message->bodyLength();
+
+  // Zero copy
+  std::shared_ptr<Buffer> body;
+  RETURN_NOT_OK(stream->Read(body_length, &body));
+  if (body->size() < body_length) {
+    std::stringstream ss;
+    ss << "Expected to be able to read " << body_length << " bytes for message body, got "
+       << body->size();
+    return Status::IOError(ss.str());
+  }
+
+  return Message::Open(metadata, body, message);
+}
+
+}  // namespace gpu
+}  // namespace arrow

diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.h b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
index 37d3c948b28..f86cff51889 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.h
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
@@ -43,6 +43,26 @@ ARROW_EXPORT
 Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
                             std::shared_ptr<CudaBuffer>* out);
 
+/// \brief Read Arrow IPC message located on GPU device
+/// \param[in] stream a CudaBufferReader
+/// \param[in] pool a MemoryPool to allocate CPU memory for the metadata
+/// \param[out] message the deserialized message, body still on device
+///
+/// This function reads the message metadata into host memory, but leaves the
+/// message body on the device
+ARROW_EXPORT
+Status ReadMessage(io::CudaBufferReader* stream, MemoryPool* pool,
+                   std::unique_ptr<ipc::Message>* message);
+
+/// \brief ReadRecordBatch specialized to handle metadata on CUDA device
+/// \param[in] schema
+/// \param[in] buffer
+/// \param[out] out the reconstructed RecordBatch, with device pointers
+ARROW_EXPORT
+Status ReadRecordBatch(const std::shared_ptr<Schema>& schema
+                       const std::shared_ptr<CudaBuffer>& buffer,
+                       std::shared_ptr<RecordBatch>* out);
+
 }  // namespace gpu
 }  // namespace arrow
From a8812afc28961bf1377a8645eb2b74f08019af7b Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Mon, 28 Aug 2017 11:06:51 -0400
Subject: [PATCH 10/11] Complete basic IPC message and record batch reads on
 GPU device memory

Change-Id: I438d03b64f713e24299cc107b90f36a69da59f25
---
 cpp/CMakeLists.txt                  | 10 +++--
 cpp/src/arrow/gpu/.gitignore        | 18 ++++++++
 cpp/src/arrow/gpu/CMakeLists.txt    | 19 +++++----
 cpp/src/arrow/gpu/cuda-test.cc      | 10 ++---
 cpp/src/arrow/gpu/cuda_api.h        |  3 +-
 cpp/src/arrow/gpu/cuda_arrow_ipc.cc | 66 +++++++++++++++++------------
 cpp/src/arrow/gpu/cuda_arrow_ipc.h  | 28 +++++++-----
 cpp/src/arrow/ipc/message.cc        | 60 ++++++++++++--------------
 cpp/src/arrow/ipc/message.h         |  9 ++++
 python/pyarrow/plasma.pyx           | 14 +++---
 python/pyarrow/tests/test_plasma.py | 25 +++++------
 11 files changed, 155 insertions(+), 107 deletions(-)
 create mode 100644 cpp/src/arrow/gpu/.gitignore

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 5d643336c05..cb7aa3a57b9 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -712,15 +712,17 @@ endif()
 add_subdirectory(src/arrow)
 add_subdirectory(src/arrow/io)
 
+if (ARROW_GPU)
+  # IPC extensions required to build the GPU library
+  set(ARROW_IPC ON)
+  add_subdirectory(src/arrow/gpu)
+endif()
+
 if (ARROW_IPC)
   add_subdirectory(src/arrow/ipc)
   add_dependencies(arrow_dependencies metadata_fbs)
 endif()
 
-if (ARROW_GPU)
-  add_subdirectory(src/arrow/gpu)
-endif()
-
 set(ARROW_SRCS
   src/arrow/array.cc
   src/arrow/buffer.cc

diff --git a/cpp/src/arrow/gpu/.gitignore b/cpp/src/arrow/gpu/.gitignore
new file mode 100644
index 00000000000..095e8bc16ff
--- /dev/null
+++ b/cpp/src/arrow/gpu/.gitignore
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cuda_version.h
\ No newline at end of file

diff --git a/cpp/src/arrow/gpu/CMakeLists.txt b/cpp/src/arrow/gpu/CMakeLists.txt
index 2db038f5814..176916e2822 100644
--- a/cpp/src/arrow/gpu/CMakeLists.txt
+++ b/cpp/src/arrow/gpu/CMakeLists.txt
@@ -45,6 +45,15 @@ ADD_ARROW_LIB(arrow_gpu
   STATIC_LINK_LIBS ""
 )
 
+# CUDA build version
+configure_file(cuda_version.h.in
+               "${CMAKE_CURRENT_SOURCE_DIR}/cuda_version.h"
+               @ONLY)
+
+install(FILES
+  "${CMAKE_CURRENT_SOURCE_DIR}/cuda_version.h"
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
+
 install(FILES
   cuda_api.h
   cuda_arrow_ipc.h
@@ -56,19 +65,11 @@ install(FILES
 
 configure_file(arrow-gpu.pc.in
                "${CMAKE_CURRENT_BINARY_DIR}/arrow-gpu.pc"
                @ONLY)
+
 install(
   FILES "${CMAKE_CURRENT_BINARY_DIR}/arrow-gpu.pc"
   DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
 
-# CUDA build version
-configure_file(cuda_version.h.in
-               "${CMAKE_CURRENT_BINARY_DIR}/cuda_version.h"
-               @ONLY)
-
-install(FILES
-  "${CMAKE_CURRENT_BINARY_DIR}/cuda_version.h"
-  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
-
 set(ARROW_GPU_TEST_LINK_LIBS
   arrow_gpu_shared
   ${ARROW_TEST_LINK_LIBS})

diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 9045e2fa427..aa9d3efd2ac 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -21,9 +21,9 @@
 
 #include "gtest/gtest.h"
 
-#include "arrow/status.h"
-#include "arrow/ipc/test-common.h"
 #include "arrow/ipc/api.h"
+#include "arrow/ipc/test-common.h"
+#include "arrow/status.h"
 #include "arrow/test-util.h"
 
 #include "arrow/gpu/cuda_api.h"
@@ -280,12 +280,12 @@ TEST_F(TestCudaArrowIpc, BasicWriteRead) {
   ASSERT_OK(ipc::MakeIntRecordBatch(&batch));
 
   std::shared_ptr<CudaBuffer> device_serialized;
-  ASSERT_OK(arrow::gpu::SerializeRecordBatch(*batch, context_.get(),
-                                             &device_serialized));
+  ASSERT_OK(arrow::gpu::SerializeRecordBatch(*batch, context_.get(), &device_serialized));
 
   // Test that ReadRecordBatch works properly
   std::shared_ptr<RecordBatch> device_batch;
-  ASSERT_OK(ReadRecordBatch(batch->schema(), device_serialized, &device_batch));
+  ASSERT_OK(ReadRecordBatch(batch->schema(), device_serialized, default_memory_pool(),
+                            &device_batch));
 
   // Copy data from device, read batch, and compare
   std::shared_ptr<MutableBuffer> host_buffer;

diff --git a/cpp/src/arrow/gpu/cuda_api.h b/cpp/src/arrow/gpu/cuda_api.h
index b05f00f5835..c63b77e8721 100644
--- a/cpp/src/arrow/gpu/cuda_api.h
+++ b/cpp/src/arrow/gpu/cuda_api.h
@@ -1,4 +1,3 @@
-<<<<<<< HEAD
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -24,4 +23,4 @@
 #include "arrow/gpu/cuda_memory.h"
 #include "arrow/gpu/cuda_version.h"
 
-#endif // ARROW_GPU_CUDA_API_H
+#endif  // ARROW_GPU_CUDA_API_H

diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
index 42f6d16d19e..669857d9203 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -15,11 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "arrow/gpu/cuda_arrow_ipc.h"
+
 #include <cstdint>
 #include <memory>
+#include <sstream>
+#include <vector>
 
 #include "arrow/buffer.h"
+#include "arrow/ipc/Message_generated.h"
 #include "arrow/ipc/message.h"
+#include "arrow/ipc/reader.h"
 #include "arrow/ipc/writer.h"
 #include "arrow/status.h"
 #include "arrow/table.h"
@@ -29,6 +35,9 @@
 #include "arrow/gpu/cuda_memory.h"
 
 namespace arrow {
+
+namespace flatbuf = org::apache::arrow::flatbuf;
+
 namespace gpu {
 
 Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
@@ -45,53 +54,56 @@ Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
   RETURN_NOT_OK(stream.SetBufferSize(1 << 23));
 
   // We use the default memory pool here since any allocations are ephemeral
-  RETURN_NOT_OK(ipc::SerializeRecordBatch(batch, default_memory_pool(),
-                                          &stream));
+  RETURN_NOT_OK(ipc::SerializeRecordBatch(batch, default_memory_pool(), &stream));
+  RETURN_NOT_OK(stream.Close());
   *out = buffer;
   return Status::OK();
 }
 
-Status ReadMessage(CudaBufferReader* stream, MemoryPool* pool,
-                   std::unique_ptr<ipc::Message>* message) {
-  uint8_t length_buf[4] = {0};
-
+Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
+                   std::unique_ptr<ipc::Message>* out) {
+  int32_t message_length = 0;
   int64_t bytes_read = 0;
-  RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read, length_buf));
+
+  RETURN_NOT_OK(reader->Read(sizeof(int32_t), &bytes_read,
+                             reinterpret_cast<uint8_t*>(&message_length)));
   if (bytes_read != sizeof(int32_t)) {
-    *message = nullptr;
+    *out = nullptr;
     return Status::OK();
   }
 
-  const int32_t metadata_length = *reinterpret_cast<const int32_t*>(length_buf);
-
-  if (metadata_length == 0) {
+  if (message_length == 0) {
     // Optional 0 EOS control message
-    *message = nullptr;
+    *out = nullptr;
     return Status::OK();
   }
 
   std::shared_ptr<MutableBuffer> metadata;
-  RETURN_NOT_OK(AllocateBuffer(pool, metadata_length, &metadata));
-  RETURN_NOT_OK(file->Read(message_length, &bytes_read, metadata->mutable_data()));
-  if (bytes_read != metadata_length) {
-    return Status::IOError("Unexpected end of stream trying to read message");
+  RETURN_NOT_OK(AllocateBuffer(pool, message_length, &metadata));
+  RETURN_NOT_OK(reader->Read(message_length, &bytes_read, metadata->mutable_data()));
+  if (bytes_read != message_length) {
+    std::stringstream ss;
+    ss << "Expected " << message_length << " metadata bytes, but only got " << bytes_read;
+    return Status::IOError(ss.str());
   }
 
-  auto fb_message = flatbuf::GetMessage(metadata->data());
+  return ipc::Message::ReadFrom(metadata, reader, out);
+}
 
-  int64_t body_length = fb_message->bodyLength();
+Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
+                       const std::shared_ptr<CudaBuffer>& buffer, MemoryPool* pool,
+                       std::shared_ptr<RecordBatch>* out) {
+  CudaBufferReader cuda_reader(buffer);
 
-  // Zero copy
-  std::shared_ptr<Buffer> body;
-  RETURN_NOT_OK(stream->Read(body_length, &body));
-  if (body->size() < body_length) {
-    std::stringstream ss;
-    ss << "Expected to be able to read " << body_length << " bytes for message body, got "
-       << body->size();
-    return Status::IOError(ss.str());
+  std::unique_ptr<ipc::Message> message;
+  RETURN_NOT_OK(ReadMessage(&cuda_reader, pool, &message));
+
+  if (!message) {
+    return Status::Invalid("Message is length 0");
   }
 
-  return Message::Open(metadata, body, message);
+  // Zero-copy read on device memory
+  return ipc::ReadRecordBatch(*message, schema, out);
 }
 
 }  // namespace gpu

diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.h b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
index f86cff51889..52dd92473ea 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.h
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
@@ -25,14 +25,21 @@
"arrow/status.h" #include "arrow/util/visibility.h" +#include "arrow/gpu/cuda_memory.h" + namespace arrow { +class MemoryPool; class RecordBatch; +class Schema; -namespace gpu { +namespace ipc { + +class Message; -class CudaBuffer; -class CudaContext; +} // namespace ipc + +namespace gpu { /// \brief Write record batch message to GPU device memory /// \param[in] batch record batch to write @@ -44,23 +51,24 @@ Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx, std::shared_ptr* out); /// \brief Read Arrow IPC message located on GPU device -/// \param[in] stream a CudaBufferReader +/// \param[in] reader a CudaBufferReader /// \param[in] pool a MemoryPool to allocate CPU memory for the metadata /// \param[out] message the deserialized message, body still on device /// /// This function reads the message metadata into host memory, but leaves the /// message body on the device ARROW_EXPORT -Status ReadMessage(io::CudaBufferReader* stream, MemoryPool* pool, - std::unique_ptr* message); +Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool, + std::unique_ptr* message); /// \brief ReadRecordBatch specialized to handle metadata on CUDA device -/// \param[in] schema -/// \param[in] buffer +/// \param[in] schema the Schema for the record batch +/// \param[in] buffer a CudaBuffer containing the complete IPC message +/// \param[in] pool a MemoryPool to use for allocating space for the metadata /// \param[out] out the reconstructed RecordBatch, with device pointers ARROW_EXPORT -Status ReadRecordBatch(const std::shared_ptr& schema - const std::shared_ptr& buffer, +Status ReadRecordBatch(const std::shared_ptr& schema, + const std::shared_ptr& buffer, MemoryPool* pool, std::shared_ptr* out); } // namespace gpu diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index eb06aaf2fc6..53f0203f080 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -149,6 +149,24 @@ bool Message::Equals(const Message& other) const { } } +Status Message::ReadFrom(const std::shared_ptr& metadata, io::InputStream* stream, + std::unique_ptr* out) { + auto fb_message = flatbuf::GetMessage(metadata->data()); + + int64_t body_length = fb_message->bodyLength(); + + std::shared_ptr body; + RETURN_NOT_OK(stream->Read(body_length, &body)); + if (body->size() < body_length) { + std::stringstream ss; + ss << "Expected to be able to read " << body_length << " bytes for message body, got " + << body->size(); + return Status::IOError(ss.str()); + } + + return Message::Open(metadata, body, out); +} + Status Message::SerializeTo(io::OutputStream* file, int64_t* output_length) const { int32_t metadata_length = 0; RETURN_NOT_OK(WriteMessage(*metadata(), file, &metadata_length)); @@ -178,29 +196,6 @@ std::string FormatMessageType(Message::Type type) { return "unknown"; } -// ---------------------------------------------------------------------- -// Read and write messages - -static Status ReadFullMessage(const std::shared_ptr& metadata, - io::InputStream* stream, - std::unique_ptr* message) { - auto fb_message = flatbuf::GetMessage(metadata->data()); - - int64_t body_length = fb_message->bodyLength(); - - std::shared_ptr body; - RETURN_NOT_OK(stream->Read(body_length, &body)); - - if (body->size() < body_length) { - std::stringstream ss; - ss << "Expected to be able to read " << body_length << " bytes for message body, got " - << body->size(); - return Status::IOError(ss.str()); - } - - return Message::Open(metadata, body, message); -} - Status ReadMessage(int64_t offset, 
 Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
                    std::unique_ptr<Message>* message) {
   std::shared_ptr<Buffer> buffer;
@@ -216,32 +211,33 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
   }
 
   auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
-  return ReadFullMessage(metadata, file, message);
+  return Message::ReadFrom(metadata, file, message);
 }
 
 Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
-  std::shared_ptr<Buffer> buffer;
+  int32_t message_length = 0;
+  int64_t bytes_read = 0;
+  RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
+                           reinterpret_cast<uint8_t*>(&message_length)));
 
-  RETURN_NOT_OK(file->Read(sizeof(int32_t), &buffer));
-  if (buffer->size() != sizeof(int32_t)) {
+  if (bytes_read != sizeof(int32_t)) {
     *message = nullptr;
     return Status::OK();
   }
 
-  int32_t message_length = *reinterpret_cast<const int32_t*>(buffer->data());
-
   if (message_length == 0) {
     // Optional 0 EOS control message
     *message = nullptr;
     return Status::OK();
   }
 
-  RETURN_NOT_OK(file->Read(message_length, &buffer));
-  if (buffer->size() != message_length) {
+  std::shared_ptr<Buffer> metadata;
+  RETURN_NOT_OK(file->Read(message_length, &metadata));
+  if (metadata->size() != message_length) {
     return Status::IOError("Unexpected end of stream trying to read message");
   }
 
-  return ReadFullMessage(buffer, file, message);
+  return Message::ReadFrom(metadata, file, message);
 }
 
 // ----------------------------------------------------------------------

diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index dce4e27fb2d..dbc50d84490 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -79,6 +79,15 @@ class ARROW_EXPORT Message {
   static Status Open(const std::shared_ptr<Buffer>& metadata,
                      const std::shared_ptr<Buffer>& body, std::unique_ptr<Message>* out);
 
+  /// \brief Read message body and create Message given Flatbuffer metadata
+  /// \param[in] metadata containing a serialized Message flatbuffer
+  /// \param[in] stream an InputStream
+  /// \param[out] out the created Message
+  ///
+  /// \note If stream supports zero-copy, this is zero-copy
+  static Status ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
+                         std::unique_ptr<Message>* out);
+
   /// \brief Write length-prefixed metadata and body to output stream
   ///
   /// \param[in] file output stream to write to

diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/plasma.pyx
index aebef1b8812..515b600feec 100644
--- a/python/pyarrow/plasma.pyx
+++ b/python/pyarrow/plasma.pyx
@@ -386,7 +386,8 @@ cdef class PlasmaClient:
         -------
         The object ID associated to the Python object.
         """
-        cdef ObjectID target_id = object_id if object_id else ObjectID.from_random()
+        cdef ObjectID target_id = (object_id if object_id
+                                   else ObjectID.from_random())
         # TODO(pcm): Make serialization code support non-sequences and
         # get rid of packing the value into a list here (and unpacking in get)
         serialized = pyarrow.serialize([value])
@@ -404,8 +405,8 @@ cdef class PlasmaClient:
         Parameters
         ----------
         object_ids : list or ObjectID
-            Object ID or list of object IDs associated to the values we get from
-            the store.
+            Object ID or list of object IDs associated to the values we get
+            from the store.
         timeout_ms : int, default -1
             The number of milliseconds that the get call should block before
             timing out and returning. Pass -1 if the call should block and 0
@@ -415,14 +416,15 @@ cdef class PlasmaClient:
         -------
         list or object
             Python value or list of Python values for the data associated with
-            the object_ids and ObjectNotAvailable if the object was not available.
+            the object_ids and ObjectNotAvailable if the object was not
+            available.
         """
         if isinstance(object_ids, collections.Sequence):
             results = []
             buffers = self.get_buffers(object_ids, timeout_ms)
             for i in range(len(object_ids)):
-                # buffers[i] is None if this object was not available within the
-                # timeout
+                # buffers[i] is None if this object was not available within
+                # the timeout
                 if buffers[i]:
                     value, = pyarrow.deserialize(buffers[i])
                     results.append(value)

diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index d729c1ef2d2..a831ef29a5e 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -187,8 +187,8 @@ def test_create(self):
         # Seal the object.
         self.plasma_client.seal(object_id)
         # Get the object.
-        memory_buffer = np.frombuffer(self.plasma_client.get_buffers([object_id])[0],
-                                      dtype="uint8")
+        memory_buffer = np.frombuffer(
+            self.plasma_client.get_buffers([object_id])[0], dtype="uint8")
         for i in range(length):
             assert memory_buffer[i] == i % 256
@@ -241,7 +241,8 @@ def test_get(self):
         # Test timing out of get with various timeouts.
         for timeout in [0, 10, 100, 1000]:
             object_ids = [random_object_id() for _ in range(num_object_ids)]
-            results = self.plasma_client.get_buffers(object_ids, timeout_ms=timeout)
+            results = self.plasma_client.get_buffers(object_ids,
+                                                     timeout_ms=timeout)
             assert results == num_object_ids * [None]
 
         data_buffers = []
@@ -257,7 +258,7 @@ def test_get(self):
         # timeouts.
         for timeout in [0, 10, 100, 1000]:
             data_results = self.plasma_client.get_buffers(object_ids,
-                                                           timeout_ms=timeout)
+                                                          timeout_ms=timeout)
             # metadata_results = self.plasma_client.get_metadata(
             #     object_ids, timeout_ms=timeout)
             for i in range(num_object_ids):
@@ -275,16 +276,16 @@ def test_get(self):
 
     def test_put_and_get(self):
         for value in [["hello", "world", 3, 1.0], None, "hello"]:
-          object_id = self.plasma_client.put(value)
-          [result] = self.plasma_client.get([object_id])
-          assert result == value
+            object_id = self.plasma_client.put(value)
+            [result] = self.plasma_client.get([object_id])
+            assert result == value
 
-          result = self.plasma_client.get(object_id)
-          assert result == value
+            result = self.plasma_client.get(object_id)
+            assert result == value
 
-          object_id = pa.plasma.ObjectID.from_random()
-          [result] = self.plasma_client.get([object_id], timeout_ms=0)
-          assert result == pa.plasma.ObjectNotAvailable
+            object_id = pa.plasma.ObjectID.from_random()
+            [result] = self.plasma_client.get([object_id], timeout_ms=0)
+            assert result == pa.plasma.ObjectNotAvailable
 
     def test_store_arrow_objects(self):
         data = np.random.randn(10, 4)

From e436755b775b29816b6947d33f17cd272b125967 Mon Sep 17 00:00:00 2001
From: Wes McKinney
Date: Mon, 28 Aug 2017 14:45:43 -0400
Subject: [PATCH 11/11] Add newline at end of file

Change-Id: I4b47942c3621d67cfeacb0b14502b89d8ba318cc
---
 cpp/src/arrow/gpu/.gitignore | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/cpp/src/arrow/gpu/.gitignore b/cpp/src/arrow/gpu/.gitignore
index 095e8bc16ff..0ef3f98c58c 100644
--- a/cpp/src/arrow/gpu/.gitignore
+++ b/cpp/src/arrow/gpu/.gitignore
@@ -15,4 +15,4 @@
 # specific language governing permissions and limitations
 # under the License.
 
-cuda_version.h
\ No newline at end of file
+cuda_version.h
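With the series applied, arrow_gpu can both produce and consume Arrow IPC
messages in device memory. A condensed sketch of the end-to-end round trip
exercised by TestCudaArrowIpc; error handling is elided, and `batch` (an
existing arrow::RecordBatch) plus `ctx` (a CudaContext obtained from
CudaDeviceManager) are assumptions of this sketch, not part of the patches:

    #include "arrow/gpu/cuda_api.h"

    // Serialize the batch directly into GPU memory as an IPC message
    std::shared_ptr<arrow::gpu::CudaBuffer> device_msg;
    RETURN_NOT_OK(arrow::gpu::SerializeRecordBatch(*batch, ctx.get(), &device_msg));

    // Rebuild a RecordBatch whose buffers still live on the device; only the
    // flatbuffer metadata is copied into host memory
    std::shared_ptr<arrow::RecordBatch> device_batch;
    RETURN_NOT_OK(arrow::gpu::ReadRecordBatch(batch->schema(), device_msg,
                                              arrow::default_memory_pool(),
                                              &device_batch));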