Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion c_glib/arrow-glib/io-memory-mapped-file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ garrow_io_writeable_interface_init(GArrowIOWriteableInterface *iface)
iface->get_raw = garrow_io_memory_mapped_file_get_raw_writeable_interface;
}

static std::shared_ptr<arrow::io::WriteableFileInterface>
static std::shared_ptr<arrow::io::WriteableFile>
garrow_io_memory_mapped_file_get_raw_writeable_file_interface(GArrowIOWriteableFile *file)
{
auto memory_mapped_file = GARROW_IO_MEMORY_MAPPED_FILE(file);
Expand Down
2 changes: 1 addition & 1 deletion c_glib/arrow-glib/io-writeable-file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ garrow_io_writeable_file_write_at(GArrowIOWriteableFile *writeable_file,

G_END_DECLS

std::shared_ptr<arrow::io::WriteableFileInterface>
std::shared_ptr<arrow::io::WriteableFile>
garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file)
{
auto *iface = GARROW_IO_WRITEABLE_FILE_GET_IFACE(writeable_file);
Expand Down
2 changes: 1 addition & 1 deletion c_glib/arrow-glib/io-writeable-file.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ G_BEGIN_DECLS
#define GARROW_IO_WRITEABLE_FILE(obj) \
(G_TYPE_CHECK_INSTANCE_CAST((obj), \
GARROW_IO_TYPE_WRITEABLE_FILE, \
GArrowIOWriteableFileInterface))
GArrowIOWriteableFile))
#define GARROW_IO_IS_WRITEABLE_FILE(obj) \
(G_TYPE_CHECK_INSTANCE_TYPE((obj), \
GARROW_IO_TYPE_WRITEABLE_FILE))
Expand Down
8 changes: 4 additions & 4 deletions c_glib/arrow-glib/io-writeable-file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@
#include <arrow-glib/io-writeable-file.h>

/**
* GArrowIOWriteableFileInterface:
* GArrowIOWriteableFile:
*
* It wraps `arrow::io::WriteableFileInterface`.
* It wraps `arrow::io::WriteableFile`.
*/
struct _GArrowIOWriteableFileInterface
{
GTypeInterface parent_iface;

std::shared_ptr<arrow::io::WriteableFileInterface> (*get_raw)(GArrowIOWriteableFile *file);
std::shared_ptr<arrow::io::WriteableFile> (*get_raw)(GArrowIOWriteableFile *file);
};

std::shared_ptr<arrow::io::WriteableFileInterface> garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file);
std::shared_ptr<arrow::io::WriteableFile> garrow_io_writeable_file_get_raw(GArrowIOWriteableFile *writeable_file);
6 changes: 3 additions & 3 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,16 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
RandomAccessFile();
};

class ARROW_EXPORT WriteableFileInterface : public OutputStream, public Seekable {
class ARROW_EXPORT WriteableFile : public OutputStream, public Seekable {
public:
virtual Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) = 0;

protected:
WriteableFileInterface() { set_mode(FileMode::READ); }
WriteableFile() { set_mode(FileMode::READ); }
};

class ARROW_EXPORT ReadWriteFileInterface : public RandomAccessFile,
public WriteableFileInterface {
public WriteableFile {
protected:
ReadWriteFileInterface() { RandomAccessFile::set_mode(FileMode::READWRITE); }
};
Expand Down
27 changes: 27 additions & 0 deletions cpp/src/arrow/io/io-memory-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,33 @@ TEST_F(TestBufferOutputStream, CloseResizes) {
ASSERT_EQ(static_cast<int64_t>(K * data.size()), buffer_->size());
}

TEST(TestFixedSizeBufferWriter, Basics) {
std::shared_ptr<MutableBuffer> buffer;
ASSERT_OK(AllocateBuffer(default_memory_pool(), 1024, &buffer));

FixedSizeBufferWriter writer(buffer);

int64_t position;
ASSERT_OK(writer.Tell(&position));
ASSERT_EQ(0, position);

std::string data = "data123456";
auto nbytes = static_cast<int64_t>(data.size());
ASSERT_OK(writer.Write(reinterpret_cast<const uint8_t*>(data.c_str()), nbytes));

ASSERT_OK(writer.Tell(&position));
ASSERT_EQ(nbytes, position);

ASSERT_OK(writer.Seek(4));
ASSERT_OK(writer.Tell(&position));
ASSERT_EQ(4, position);

ASSERT_RAISES(IOError, writer.Seek(-1));
ASSERT_RAISES(IOError, writer.Seek(1024));

ASSERT_OK(writer.Close());
}

TEST(TestBufferReader, RetainParentReference) {
// ARROW-387
std::string data = "data123456";
Expand Down
45 changes: 45 additions & 0 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,51 @@ Status BufferOutputStream::Reserve(int64_t nbytes) {
return Status::OK();
}

// ----------------------------------------------------------------------
// In-memory buffer writer

/// Input buffer must be mutable, will abort if not
FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer) {
buffer_ = buffer;
DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
mutable_data_ = buffer->mutable_data();
size_ = buffer->size();
position_ = 0;
}

FixedSizeBufferWriter::~FixedSizeBufferWriter() {}

Status FixedSizeBufferWriter::Close() {
// No-op
return Status::OK();
}

Status FixedSizeBufferWriter::Seek(int64_t position) {
if (position < 0 || position >= size_) {
return Status::IOError("position out of bounds");
}
position_ = position;
return Status::OK();
}

Status FixedSizeBufferWriter::Tell(int64_t* position) {
*position = position_;
return Status::OK();
}

Status FixedSizeBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
std::memcpy(mutable_data_ + position_, data, nbytes);
position_ += nbytes;
return Status::OK();
}

Status FixedSizeBufferWriter::WriteAt(
int64_t position, const uint8_t* data, int64_t nbytes) {
std::lock_guard<std::mutex> guard(lock_);
RETURN_NOT_OK(Seek(position));
return Write(data, nbytes);
}

// ----------------------------------------------------------------------
// In-memory buffer reader

Expand Down
23 changes: 23 additions & 0 deletions cpp/src/arrow/io/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <cstdint>
#include <memory>
#include <mutex>
#include <string>

#include "arrow/io/interfaces.h"
Expand Down Expand Up @@ -66,6 +67,28 @@ class ARROW_EXPORT BufferOutputStream : public OutputStream {
uint8_t* mutable_data_;
};

/// \brief Enables random writes into a fixed-size mutable buffer
///
class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {
public:
/// Input buffer must be mutable, will abort if not
explicit FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer);
~FixedSizeBufferWriter();

Status Close() override;
Status Seek(int64_t position) override;
Status Tell(int64_t* position) override;
Status Write(const uint8_t* data, int64_t nbytes) override;
Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;

private:
std::mutex lock_;
std::shared_ptr<Buffer> buffer_;
uint8_t* mutable_data_;
int64_t size_;
int64_t position_;
};

class ARROW_EXPORT BufferReader : public RandomAccessFile {
public:
explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
Expand Down
4 changes: 2 additions & 2 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -342,12 +342,12 @@ cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
CStatus ReadAt(int64_t position, int64_t nbytes,
int64_t* bytes_read, shared_ptr[CBuffer]* out)

cdef cppclass WriteableFileInterface(OutputStream, Seekable):
cdef cppclass WriteableFile(OutputStream, Seekable):
CStatus WriteAt(int64_t position, const uint8_t* data,
int64_t nbytes)

cdef cppclass ReadWriteFileInterface(RandomAccessFile,
WriteableFileInterface):
WriteableFile):
pass


Expand Down