From 042f533f52a7bb305d3adab54fd9eda85e9ff5c0 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 3 Apr 2017 19:23:51 -0400 Subject: [PATCH 1/2] Add random access writer for a mutable buffer. Rename WriteableFileInterface to WriteableFile for better consistency Change-Id: I16b7722728cd272ee4446eb1da679937e8f3232a --- cpp/src/arrow/io/interfaces.h | 6 ++-- cpp/src/arrow/io/io-memory-test.cc | 27 +++++++++++++++++ cpp/src/arrow/io/memory.cc | 45 ++++++++++++++++++++++++++++ cpp/src/arrow/io/memory.h | 23 ++++++++++++++ python/pyarrow/includes/libarrow.pxd | 4 +-- 5 files changed, 100 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index 258a3155743..b5a0bd85bf2 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -121,16 +121,16 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { RandomAccessFile(); }; -class ARROW_EXPORT WriteableFileInterface : public OutputStream, public Seekable { +class ARROW_EXPORT WriteableFile : public OutputStream, public Seekable { public: virtual Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) = 0; protected: - WriteableFileInterface() { set_mode(FileMode::READ); } + WriteableFile() { set_mode(FileMode::READ); } }; class ARROW_EXPORT ReadWriteFileInterface : public RandomAccessFile, - public WriteableFileInterface { + public WriteableFile { protected: ReadWriteFileInterface() { RandomAccessFile::set_mode(FileMode::READWRITE); } }; diff --git a/cpp/src/arrow/io/io-memory-test.cc b/cpp/src/arrow/io/io-memory-test.cc index 442cd0c4bbc..4704fe8f4d3 100644 --- a/cpp/src/arrow/io/io-memory-test.cc +++ b/cpp/src/arrow/io/io-memory-test.cc @@ -66,6 +66,33 @@ TEST_F(TestBufferOutputStream, CloseResizes) { ASSERT_EQ(static_cast(K * data.size()), buffer_->size()); } +TEST(TestFixedSizeBufferWriter, Basics) { + std::shared_ptr buffer; + ASSERT_OK(AllocateBuffer(default_memory_pool(), 1024, &buffer)); + + FixedSizeBufferWriter writer(buffer); + + int64_t position; + ASSERT_OK(writer.Tell(&position)); + ASSERT_EQ(0, position); + + std::string data = "data123456"; + auto nbytes = static_cast(data.size()); + ASSERT_OK(writer.Write(reinterpret_cast(data.c_str()), nbytes)); + + ASSERT_OK(writer.Tell(&position)); + ASSERT_EQ(nbytes, position); + + ASSERT_OK(writer.Seek(4)); + ASSERT_OK(writer.Tell(&position)); + ASSERT_EQ(4, position); + + ASSERT_RAISES(IOError, writer.Seek(-1)); + ASSERT_RAISES(IOError, writer.Seek(1024)); + + ASSERT_OK(writer.Close()); +} + TEST(TestBufferReader, RetainParentReference) { // ARROW-387 std::string data = "data123456"; diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 5b5c8649dee..2e701e1104d 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -98,6 +98,51 @@ Status BufferOutputStream::Reserve(int64_t nbytes) { return Status::OK(); } +// ---------------------------------------------------------------------- +// In-memory buffer writer + +/// Input buffer must be mutable, will abort if not +FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr& buffer) { + buffer_ = buffer; + DCHECK(buffer->is_mutable()) << "Must pass mutable buffer"; + mutable_data_ = buffer->mutable_data(); + size_ = buffer->size(); + position_ = 0; +} + +FixedSizeBufferWriter::~FixedSizeBufferWriter() {} + +Status FixedSizeBufferWriter::Close() { + // No-op + return Status::OK(); +} + +Status FixedSizeBufferWriter::Seek(int64_t position) { + if (position < 0 || position >= size_) { + return Status::IOError("position out of bounds"); + } + position_ = position; + return Status::OK(); +} + +Status FixedSizeBufferWriter::Tell(int64_t* position) { + *position = position_; + return Status::OK(); +} + +Status FixedSizeBufferWriter::Write(const uint8_t* data, int64_t nbytes) { + std::memcpy(mutable_data_ + position_, data, nbytes); + position_ += nbytes; + return Status::OK(); +} + +Status FixedSizeBufferWriter::WriteAt( + int64_t position, const uint8_t* data, int64_t nbytes) { + std::lock_guard guard(lock_); + RETURN_NOT_OK(Seek(position)); + return Write(data, nbytes); +} + // ---------------------------------------------------------------------- // In-memory buffer reader diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index eb2a5091288..fbb186b7280 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -22,6 +22,7 @@ #include #include +#include #include #include "arrow/io/interfaces.h" @@ -66,6 +67,28 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream { uint8_t* mutable_data_; }; +/// \brief Enables random writes into a fixed-size mutable buffer +/// +class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile { + public: + /// Input buffer must be mutable, will abort if not + explicit FixedSizeBufferWriter(const std::shared_ptr& buffer); + ~FixedSizeBufferWriter(); + + Status Close() override; + Status Seek(int64_t position) override; + Status Tell(int64_t* position) override; + Status Write(const uint8_t* data, int64_t nbytes) override; + Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override; + + private: + std::mutex lock_; + std::shared_ptr buffer_; + uint8_t* mutable_data_; + int64_t size_; + int64_t position_; +}; + class ARROW_EXPORT BufferReader : public RandomAccessFile { public: explicit BufferReader(const std::shared_ptr& buffer); diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 67d6af910c2..2a0488f3a01 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -342,12 +342,12 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil: CStatus ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, shared_ptr[CBuffer]* out) - cdef cppclass WriteableFileInterface(OutputStream, Seekable): + cdef cppclass WriteableFile(OutputStream, Seekable): CStatus WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) cdef cppclass ReadWriteFileInterface(RandomAccessFile, - WriteableFileInterface): + WriteableFile): pass From be0d4bcabdb562d81e025098ed421df3ea7795ec Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 3 Apr 2017 21:37:34 -0400 Subject: [PATCH 2/2] Fix glib after renaming class Change-Id: Iadf907390fb8400c17c60b780e335a4af6fd2562 --- c_glib/arrow-glib/io-memory-mapped-file.cpp | 2 +- c_glib/arrow-glib/io-writeable-file.cpp | 2 +- c_glib/arrow-glib/io-writeable-file.h | 2 +- c_glib/arrow-glib/io-writeable-file.hpp | 8 ++++---- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/c_glib/arrow-glib/io-memory-mapped-file.cpp b/c_glib/arrow-glib/io-memory-mapped-file.cpp index 12c9a6c95ac..e2e255c0391 100644 --- a/c_glib/arrow-glib/io-memory-mapped-file.cpp +++ b/c_glib/arrow-glib/io-memory-mapped-file.cpp @@ -127,7 +127,7 @@ garrow_io_writeable_interface_init(GArrowIOWriteableInterface *iface) iface->get_raw = garrow_io_memory_mapped_file_get_raw_writeable_interface; } -static std::shared_ptr +static std::shared_ptr garrow_io_memory_mapped_file_get_raw_writeable_file_interface(GArrowIOWriteableFile *file) { auto memory_mapped_file = GARROW_IO_MEMORY_MAPPED_FILE(file); diff --git a/c_glib/arrow-glib/io-writeable-file.cpp b/c_glib/arrow-glib/io-writeable-file.cpp index 3de42dd60a9..41b682acd1e 100644 --- a/c_glib/arrow-glib/io-writeable-file.cpp +++ b/c_glib/arrow-glib/io-writeable-file.cpp @@ -76,7 +76,7 @@ garrow_io_writeable_file_write_at(GArrowIOWriteableFile *writeable_file, G_END_DECLS -std::shared_ptr +std::shared_ptr garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file) { auto *iface = GARROW_IO_WRITEABLE_FILE_GET_IFACE(writeable_file); diff --git a/c_glib/arrow-glib/io-writeable-file.h b/c_glib/arrow-glib/io-writeable-file.h index 4a4dee5111f..d1ebdbe630e 100644 --- a/c_glib/arrow-glib/io-writeable-file.h +++ b/c_glib/arrow-glib/io-writeable-file.h @@ -28,7 +28,7 @@ G_BEGIN_DECLS #define GARROW_IO_WRITEABLE_FILE(obj) \ (G_TYPE_CHECK_INSTANCE_CAST((obj), \ GARROW_IO_TYPE_WRITEABLE_FILE, \ - GArrowIOWriteableFileInterface)) + GArrowIOWriteableFile)) #define GARROW_IO_IS_WRITEABLE_FILE(obj) \ (G_TYPE_CHECK_INSTANCE_TYPE((obj), \ GARROW_IO_TYPE_WRITEABLE_FILE)) diff --git a/c_glib/arrow-glib/io-writeable-file.hpp b/c_glib/arrow-glib/io-writeable-file.hpp index 2043007ad58..aba95b209d8 100644 --- a/c_glib/arrow-glib/io-writeable-file.hpp +++ b/c_glib/arrow-glib/io-writeable-file.hpp @@ -24,15 +24,15 @@ #include /** - * GArrowIOWriteableFileInterface: + * GArrowIOWriteableFile: * - * It wraps `arrow::io::WriteableFileInterface`. + * It wraps `arrow::io::WriteableFile`. */ struct _GArrowIOWriteableFileInterface { GTypeInterface parent_iface; - std::shared_ptr (*get_raw)(GArrowIOWriteableFile *file); + std::shared_ptr (*get_raw)(GArrowIOWriteableFile *file); }; -std::shared_ptr garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file); +std::shared_ptr garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file);