Skip to content
Merged
22 changes: 20 additions & 2 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <cstring>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
Expand Down Expand Up @@ -57,18 +58,31 @@ class ARROW_EXPORT Buffer {
///
/// \note The passed memory must be kept alive through some other means
Buffer(const uint8_t* data, int64_t size)
: is_mutable_(false), is_cpu_(true), data_(data), size_(size), capacity_(size) {
: is_mutable_(false),
is_cpu_(true),
data_(data),
size_(size),
capacity_(size),
device_type_(DeviceAllocationType::kCPU) {
SetMemoryManager(default_cpu_memory_manager());
}

Buffer(const uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm,
std::shared_ptr<Buffer> parent = NULLPTR)
std::shared_ptr<Buffer> parent = NULLPTR,
std::optional<DeviceAllocationType> device_type = std::nullopt)
: is_mutable_(false),
data_(data),
size_(size),
capacity_(size),
parent_(std::move(parent)) {
// SetMemoryManager will also set device_type_
SetMemoryManager(std::move(mm));
// if a device type is specified, use that instead. for example:
// CUDA_HOST. The CudaMemoryManager will set device_type_ to CUDA,
// but you can specify CUDA_HOST as the device type to override it.
if (device_type != std::nullopt) {
device_type_ = device_type;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would a DCHECK be useful here to confirm that device_type_ ended-up being compatible with the memory manager?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly, we'd have to add a Validate or Valid method to the memory manager to do so, to allow the memory manager to specify if the device_type is valid for it. Not sure how worthwhile that is though.

}

Buffer(uintptr_t address, int64_t size, std::shared_ptr<MemoryManager> mm,
Expand Down Expand Up @@ -240,6 +254,8 @@ class ARROW_EXPORT Buffer {

const std::shared_ptr<MemoryManager>& memory_manager() const { return memory_manager_; }

std::optional<DeviceAllocationType> device_type() const { return device_type_; }

std::shared_ptr<Buffer> parent() const { return parent_; }

/// \brief Get a RandomAccessFile for reading a buffer
Expand Down Expand Up @@ -294,6 +310,7 @@ class ARROW_EXPORT Buffer {
const uint8_t* data_;
int64_t size_;
int64_t capacity_;
std::optional<DeviceAllocationType> device_type_;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this optional, are there any cases where the information is not available?


// null by default, but may be set
std::shared_ptr<Buffer> parent_;
Expand All @@ -309,6 +326,7 @@ class ARROW_EXPORT Buffer {
void SetMemoryManager(std::shared_ptr<MemoryManager> mm) {
memory_manager_ = std::move(mm);
is_cpu_ = memory_manager_->is_cpu();
device_type_ = memory_manager_->device()->device_type();
}

private:
Expand Down
14 changes: 14 additions & 0 deletions cpp/src/arrow/buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ using internal::checked_cast;
using internal::checked_pointer_cast;

static const char kMyDeviceTypeName[] = "arrowtest::MyDevice";
static const DeviceAllocationType kMyDeviceType = DeviceAllocationType::kEXT_DEV;

static const int kMyDeviceAllowCopy = 1;
static const int kMyDeviceAllowView = 2;
Expand Down Expand Up @@ -70,6 +71,8 @@ class MyDevice : public Device {
return checked_cast<const MyDevice&>(other).value_ == value_;
}

DeviceAllocationType device_type() const override { return kMyDeviceType; }

std::shared_ptr<MemoryManager> default_memory_manager() override;

int value() const { return value_; }
Expand Down Expand Up @@ -256,13 +259,15 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyNonOwned(*cpu_src_, cpu_mm_));
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

Expand All @@ -271,6 +276,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
Expand All @@ -280,6 +286,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
Expand All @@ -290,13 +297,15 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

ASSERT_OK_AND_ASSIGN(buffer, MemoryManager::CopyNonOwned(*my_copy_src_, cpu_mm_));
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

Expand All @@ -305,6 +314,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
Expand All @@ -315,6 +325,7 @@ TEST_F(TestDevice, Copy) {
ASSERT_EQ(buffer->device(), my_copy_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_NE(buffer->address(), my_copy_src_->address());
ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
Expand All @@ -330,6 +341,7 @@ TEST_F(TestDevice, View) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_EQ(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

Expand All @@ -338,6 +350,7 @@ TEST_F(TestDevice, View) {
ASSERT_EQ(buffer->device(), my_view_device_);
ASSERT_FALSE(buffer->is_cpu());
ASSERT_EQ(buffer->address(), cpu_src_->address());
ASSERT_EQ(buffer->device_type(), kMyDeviceType);
#ifdef NDEBUG
ASSERT_EQ(buffer->data(), nullptr);
#endif
Expand All @@ -348,6 +361,7 @@ TEST_F(TestDevice, View) {
ASSERT_EQ(buffer->device(), cpu_device_);
ASSERT_TRUE(buffer->is_cpu());
ASSERT_EQ(buffer->address(), my_copy_src_->address());
ASSERT_EQ(buffer->device_type(), DeviceAllocationType::kCPU);
ASSERT_NE(buffer->data(), nullptr);
AssertBufferEqual(*buffer, "some data");

Expand Down
Loading