Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 7 additions & 8 deletions cpp/src/arrow/allocator-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,16 @@ TEST(stl_allocator, FreeLargeMemory) {
}

TEST(stl_allocator, MaxMemory) {
DefaultMemoryPool pool;
auto pool = default_memory_pool();

ASSERT_EQ(0, pool.max_memory());
stl_allocator<uint8_t> alloc(&pool);
uint8_t* data = alloc.allocate(100);
uint8_t* data2 = alloc.allocate(100);
stl_allocator<uint8_t> alloc(pool);
uint8_t* data = alloc.allocate(1000);
uint8_t* data2 = alloc.allocate(1000);

alloc.deallocate(data, 100);
alloc.deallocate(data2, 100);
alloc.deallocate(data, 1000);
alloc.deallocate(data2, 1000);

ASSERT_EQ(200, pool.max_memory());
ASSERT_EQ(2000, pool->max_memory());
}

#endif // ARROW_VALGRIND
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class ARROW_EXPORT Array {
/// boundscheck
bool IsValid(int64_t i) const {
return null_bitmap_data_ != nullptr &&
BitUtil::GetBit(null_bitmap_data_, i + data_->offset);
BitUtil::GetBit(null_bitmap_data_, i + data_->offset);
}

/// Size in the number of elements this array contains.
Expand Down
161 changes: 115 additions & 46 deletions cpp/src/arrow/gpu/cuda_memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstdint>
#include <cstdlib>
#include <memory>
#include <mutex>

#include <cuda.h>

Expand Down Expand Up @@ -151,71 +152,139 @@ Status CudaBufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
// ----------------------------------------------------------------------
// CudaBufferWriter

CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer)
: io::FixedSizeBufferWriter(buffer),
context_(buffer->context()),
buffer_size_(0),
buffer_position_(0) {}

CudaBufferWriter::~CudaBufferWriter() {}

Status CudaBufferWriter::Close() { return Flush(); }
class CudaBufferWriter::CudaBufferWriterImpl {
public:
explicit CudaBufferWriterImpl(const std::shared_ptr<CudaBuffer>& buffer)
: context_(buffer->context()),
buffer_(buffer),
buffer_size_(0),
buffer_position_(0) {
buffer_ = buffer;
DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
mutable_data_ = buffer->mutable_data();
size_ = buffer->size();
position_ = 0;
}

Status CudaBufferWriter::Flush() {
if (buffer_size_ > 0 && buffer_position_ > 0) {
// Only need to flush when the write has been buffered
RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_ - buffer_position_,
host_buffer_data_, buffer_position_));
buffer_position_ = 0;
Status Seek(int64_t position) {
if (position < 0 || position >= size_) {
return Status::IOError("position out of bounds");
}
position_ = position;
return Status::OK();
}
return Status::OK();
}

Status CudaBufferWriter::Seek(int64_t position) {
if (buffer_position_ > 0) {
RETURN_NOT_OK(Flush());
Status Flush() {
if (buffer_size_ > 0 && buffer_position_ > 0) {
// Only need to flush when the write has been buffered
RETURN_NOT_OK(
context_->CopyHostToDevice(mutable_data_ + position_ - buffer_position_,
host_buffer_data_, buffer_position_));
buffer_position_ = 0;
}
return Status::OK();
}
return io::FixedSizeBufferWriter::Seek(position);
}

Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
if (memcopy_num_threads_ > 1) {
return Status::Invalid("parallel CUDA memcpy not supported");
Status Tell(int64_t* position) const {
*position = position_;
return Status::OK();
}

if (nbytes == 0) {
Status Write(const uint8_t* data, int64_t nbytes) {
if (nbytes == 0) {
return Status::OK();
}

if (buffer_size_ > 0) {
if (nbytes + buffer_position_ >= buffer_size_) {
// Reach end of buffer, write everything
RETURN_NOT_OK(Flush());
RETURN_NOT_OK(
context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
} else {
// Write bytes to buffer
std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
buffer_position_ += nbytes;
}
} else {
// Unbuffered write
RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
}
position_ += nbytes;
return Status::OK();
}

if (buffer_size_ > 0) {
if (nbytes + buffer_position_ >= buffer_size_) {
// Reach end of buffer, write everything
Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
std::lock_guard<std::mutex> guard(lock_);
RETURN_NOT_OK(Seek(position));
return Write(data, nbytes);
}

Status SetBufferSize(const int64_t buffer_size) {
if (buffer_position_ > 0) {
// Flush any buffered data
RETURN_NOT_OK(Flush());
RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
} else {
// Write bytes to buffer
std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
buffer_position_ += nbytes;
}
} else {
// Unbuffered write
RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
RETURN_NOT_OK(AllocateCudaHostBuffer(buffer_size, &host_buffer_));
host_buffer_data_ = host_buffer_->mutable_data();
buffer_size_ = buffer_size;
return Status::OK();
}
position_ += nbytes;
return Status::OK();

int64_t buffer_size() const { return buffer_size_; }

int64_t buffer_position() const { return buffer_position_; }

private:
std::shared_ptr<CudaContext> context_;
std::shared_ptr<CudaBuffer> buffer_;
std::mutex lock_;
uint8_t* mutable_data_;
int64_t size_;
int64_t position_;

// Pinned host buffer for buffering writes on CPU before calling cudaMalloc
int64_t buffer_size_;
int64_t buffer_position_;
std::shared_ptr<CudaHostBuffer> host_buffer_;
uint8_t* host_buffer_data_;
};

CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer) {
impl_.reset(new CudaBufferWriterImpl(buffer));
}

Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
if (buffer_position_ > 0) {
// Flush any buffered data
CudaBufferWriter::~CudaBufferWriter() {}

Status CudaBufferWriter::Close() { return Flush(); }

Status CudaBufferWriter::Flush() { return impl_->Flush(); }

Status CudaBufferWriter::Seek(int64_t position) {
if (impl_->buffer_position() > 0) {
RETURN_NOT_OK(Flush());
}
RETURN_NOT_OK(AllocateCudaHostBuffer(buffer_size, &host_buffer_));
host_buffer_data_ = host_buffer_->mutable_data();
buffer_size_ = buffer_size;
return Status::OK();
return impl_->Seek(position);
}

Status CudaBufferWriter::Tell(int64_t* position) const { return impl_->Tell(position); }

Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
return impl_->Write(data, nbytes);
}

Status CudaBufferWriter::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
return impl_->WriteAt(position, data, nbytes);
}

Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
return impl_->SetBufferSize(buffer_size);
}

int64_t CudaBufferWriter::buffer_size() const { return impl_->buffer_size(); }

int64_t CudaBufferWriter::num_bytes_buffered() const { return impl_->buffer_position(); }

// ----------------------------------------------------------------------

Status AllocateCudaHostBuffer(const int64_t size, std::shared_ptr<CudaHostBuffer>* out) {
Expand Down
21 changes: 10 additions & 11 deletions cpp/src/arrow/gpu/cuda_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class ARROW_EXPORT CudaBufferReader : public io::BufferReader {

/// \class CudaBufferWriter
/// \brief File interface for writing to CUDA buffers, with optional buffering
class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
class ARROW_EXPORT CudaBufferWriter : public io::WriteableFile {
public:
explicit CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer);
~CudaBufferWriter();
Expand All @@ -156,10 +156,14 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
/// \brief Flush buffered bytes to GPU
Status Flush() override;

// Seek requires flushing if any bytes are buffered
Status Seek(int64_t position) override;

Status Write(const uint8_t* data, int64_t nbytes) override;

Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;

Status Tell(int64_t* position) const override;

/// \brief Set CPU buffer size to limit calls to cudaMemcpy
/// \param[in] buffer_size the size of CPU buffer to allocate
/// \return Status
Expand All @@ -168,19 +172,14 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
Status SetBufferSize(const int64_t buffer_size);

/// \brief Returns size of host (CPU) buffer, 0 for unbuffered
int64_t buffer_size() const { return buffer_size_; }
int64_t buffer_size() const;

/// \brief Returns number of bytes buffered on host
int64_t num_bytes_buffered() const { return buffer_position_; }
int64_t num_bytes_buffered() const;

private:
std::shared_ptr<CudaContext> context_;

// Pinned host buffer for buffering writes on CPU before calling cudaMalloc
int64_t buffer_size_;
int64_t buffer_position_;
std::shared_ptr<CudaHostBuffer> host_buffer_;
uint8_t* host_buffer_data_;
class CudaBufferWriterImpl;
std::unique_ptr<CudaBufferWriterImpl> impl_;
};

/// \brief Allocate CUDA-accessible memory on CPU host
Expand Down
43 changes: 40 additions & 3 deletions cpp/src/arrow/io/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,15 @@ class OSFile {
}

Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
std::lock_guard<std::mutex> guard(lock_);
return FileRead(fd_, out, nbytes, bytes_read);
}

Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
std::lock_guard<std::mutex> guard(lock_);
RETURN_NOT_OK(Seek(position));
return Read(nbytes, bytes_read, out);
}

Status Seek(int64_t pos) {
if (pos < 0) {
return Status::Invalid("Invalid position");
Expand All @@ -384,6 +389,8 @@ class OSFile {

FileMode::type mode() const { return mode_; }

std::mutex& lock() { return lock_; }

protected:
Status SetFileName(const std::string& file_name) {
#if defined(_MSC_VER)
Expand Down Expand Up @@ -458,10 +465,24 @@ Status ReadableFile::Close() { return impl_->Close(); }
Status ReadableFile::Tell(int64_t* pos) const { return impl_->Tell(pos); }

Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
std::lock_guard<std::mutex> guard(impl_->lock());
return impl_->Read(nbytes, bytes_read, out);
}

Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out) {
return impl_->ReadAt(position, nbytes, bytes_read, out);
}

Status ReadableFile::ReadAt(int64_t position, int64_t nbytes,
std::shared_ptr<Buffer>* out) {
std::lock_guard<std::mutex> guard(impl_->lock());
RETURN_NOT_OK(Seek(position));
return impl_->ReadBuffer(nbytes, out);
}

Status ReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
std::lock_guard<std::mutex> guard(impl_->lock());
return impl_->ReadBuffer(nbytes, out);
}

Expand Down Expand Up @@ -590,6 +611,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {

int fd() const { return file_->fd(); }

std::mutex& lock() { return file_->lock(); }

private:
std::unique_ptr<OSFile> file_;
int64_t position_;
Expand Down Expand Up @@ -671,10 +694,24 @@ Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
return Status::OK();
}

Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out) {
std::lock_guard<std::mutex> guard(memory_map_->lock());
RETURN_NOT_OK(Seek(position));
return Read(nbytes, bytes_read, out);
}

Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
std::shared_ptr<Buffer>* out) {
std::lock_guard<std::mutex> guard(memory_map_->lock());
RETURN_NOT_OK(Seek(position));
return Read(nbytes, out);
}

bool MemoryMappedFile::supports_zero_copy() const { return true; }

Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
std::lock_guard<std::mutex> guard(lock_);
std::lock_guard<std::mutex> guard(memory_map_->lock());

if (!memory_map_->opened() || !memory_map_->writable()) {
return Status::IOError("Unable to write");
Expand All @@ -685,7 +722,7 @@ Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t
}

Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
std::lock_guard<std::mutex> guard(lock_);
std::lock_guard<std::mutex> guard(memory_map_->lock());

if (!memory_map_->opened() || !memory_map_->writable()) {
return Status::IOError("Unable to write");
Expand Down
13 changes: 13 additions & 0 deletions cpp/src/arrow/io/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ class ARROW_EXPORT ReadableFile : public RandomAccessFile {
Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;

/// \brief Thread-safe implementation of ReadAt
Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out) override;

/// \brief Thread-safe implementation of ReadAt
Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;

Status GetSize(int64_t* size) override;
Status Seek(int64_t position) override;

Expand Down Expand Up @@ -139,6 +146,12 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
// Zero copy read. Not thread-safe
Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;

Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
uint8_t* out) override;

/// Default implementation is thread-safe
Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;

bool supports_zero_copy() const override;

/// Write data at the current position in the file. Thread-safe
Expand Down
Loading