Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,9 @@ BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer)
BufferReader::BufferReader(const uint8_t* data, int64_t size)
: buffer_(nullptr), data_(data), size_(size), position_(0) {}

// Construct a reader over a non-owned Buffer; the caller must keep the
// buffer alive for the lifetime of this reader.
BufferReader::BufferReader(const Buffer& buffer)
    : buffer_(nullptr), data_(buffer.data()), size_(buffer.size()), position_(0) {}

Status BufferReader::Close() {
// no-op
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/io/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {
class ARROW_EXPORT BufferReader : public RandomAccessFile {
public:
explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
explicit BufferReader(const Buffer& buffer);
BufferReader(const uint8_t* data, int64_t size);

Status Close() override;
Expand Down
30 changes: 27 additions & 3 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,35 @@ Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
// ----------------------------------------------------------------------
// Implement InputStream message reader

Status InputStreamMessageReader::ReadNextMessage(std::unique_ptr<Message>* message) {
return ReadMessage(stream_, message);
/// \brief Implementation of MessageReader that reads from InputStream
class InputStreamMessageReader : public MessageReader {
public:
explicit InputStreamMessageReader(io::InputStream* stream) : stream_(stream) {}

explicit InputStreamMessageReader(const std::shared_ptr<io::InputStream>& owned_stream)
: InputStreamMessageReader(owned_stream.get()) {
owned_stream_ = owned_stream;
}

~InputStreamMessageReader() {}

Status ReadNextMessage(std::unique_ptr<Message>* message) {
return ReadMessage(stream_, message);
}

private:
io::InputStream* stream_;
std::shared_ptr<io::InputStream> owned_stream_;
};

// Create a MessageReader over a borrowed stream; the caller retains
// ownership of the stream.
std::unique_ptr<MessageReader> MessageReader::Open(io::InputStream* stream) {
  std::unique_ptr<MessageReader> result(new InputStreamMessageReader(stream));
  return result;
}

InputStreamMessageReader::~InputStreamMessageReader() {}
// Create a MessageReader that shares ownership of the stream, keeping it
// alive for the lifetime of the returned reader.
std::unique_ptr<MessageReader> MessageReader::Open(
    const std::shared_ptr<io::InputStream>& owned_stream) {
  std::unique_ptr<MessageReader> result(new InputStreamMessageReader(owned_stream));
  return result;
}

} // namespace ipc
} // namespace arrow
27 changes: 7 additions & 20 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,33 +144,20 @@ class ARROW_EXPORT MessageReader {
public:
virtual ~MessageReader() = default;

/// \brief Create MessageReader that reads from InputStream
static std::unique_ptr<MessageReader> Open(io::InputStream* stream);

/// \brief Create MessageReader that reads from owned InputStream
static std::unique_ptr<MessageReader> Open(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record, is there a rationale or convention for the use of unique_ptr vs shared_ptr here? :-)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I try to only use shared_ptr when there is some reasonable expectation that shared ownership may be frequently needed (of course one can always transfer the pointer to a shared_ptr if needed). There are some other places in the library where shared_ptr is returned (or an out-variable) that would be better as unique_ptr

const std::shared_ptr<io::InputStream>& owned_stream);

/// \brief Read next Message from the interface
///
/// \param[out] message an arrow::ipc::Message instance
/// \return Status
virtual Status ReadNextMessage(std::unique_ptr<Message>* message) = 0;
};

/// \brief Implementation of MessageReader that reads from InputStream
/// \since 0.5.0
class ARROW_EXPORT InputStreamMessageReader : public MessageReader {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was never necessary to export this class

public:
explicit InputStreamMessageReader(io::InputStream* stream) : stream_(stream) {}

explicit InputStreamMessageReader(const std::shared_ptr<io::InputStream>& owned_stream)
: InputStreamMessageReader(owned_stream.get()) {
owned_stream_ = owned_stream;
}

~InputStreamMessageReader();

Status ReadNextMessage(std::unique_ptr<Message>* message) override;

private:
io::InputStream* stream_;
std::shared_ptr<io::InputStream> owned_stream_;
};

/// \brief Read encapsulated RPC message from position in file
///
/// Read a length-prefixed message flatbuffer starting at the indicated file
Expand Down
13 changes: 7 additions & 6 deletions cpp/src/arrow/ipc/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -480,14 +480,12 @@ Status RecordBatchStreamReader::Open(std::unique_ptr<MessageReader> message_read

Status RecordBatchStreamReader::Open(io::InputStream* stream,
std::shared_ptr<RecordBatchReader>* out) {
std::unique_ptr<MessageReader> message_reader(new InputStreamMessageReader(stream));
return Open(std::move(message_reader), out);
return Open(MessageReader::Open(stream), out);
}

Status RecordBatchStreamReader::Open(const std::shared_ptr<io::InputStream>& stream,
std::shared_ptr<RecordBatchReader>* out) {
std::unique_ptr<MessageReader> message_reader(new InputStreamMessageReader(stream));
return Open(std::move(message_reader), out);
return Open(MessageReader::Open(stream), out);
}

std::shared_ptr<Schema> RecordBatchStreamReader::schema() const {
Expand Down Expand Up @@ -717,14 +715,17 @@ Status ReadTensor(int64_t offset, io::RandomAccessFile* file,

std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadContiguousPayload(file, &message));
return ReadTensor(*message, out);
}

Status ReadTensor(const Message& message, std::shared_ptr<Tensor>* out) {
std::shared_ptr<DataType> type;
std::vector<int64_t> shape;
std::vector<int64_t> strides;
std::vector<std::string> dim_names;
RETURN_NOT_OK(internal::GetTensorMetadata(*message->metadata(), &type, &shape, &strides,
RETURN_NOT_OK(internal::GetTensorMetadata(*message.metadata(), &type, &shape, &strides,
&dim_names));
*out = std::make_shared<Tensor>(type, message->body(), shape, strides, dim_names);
*out = std::make_shared<Tensor>(type, message.body(), shape, strides, dim_names);
return Status::OK();
}

Expand Down
10 changes: 9 additions & 1 deletion cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ Status ReadRecordBatch(const Buffer& metadata, const std::shared_ptr<Schema>& sc
int max_recursion_depth, io::RandomAccessFile* file,
std::shared_ptr<RecordBatch>* out);

/// EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file
/// \brief EXPERIMENTAL: Read arrow::Tensor as encapsulated IPC message in file
///
/// \param[in] offset the file location of the start of the message
/// \param[in] file the file where the batch is located
Expand All @@ -229,6 +229,14 @@ ARROW_EXPORT
Status ReadTensor(int64_t offset, io::RandomAccessFile* file,
std::shared_ptr<Tensor>* out);

/// \brief EXPERIMENTAL: Read arrow::Tensor from IPC message
///
/// \param[in] message a Message containing the tensor metadata and body
/// \param[out] out the read tensor
/// \return Status
ARROW_EXPORT
Status ReadTensor(const Message& message, std::shared_ptr<Tensor>* out);

} // namespace ipc
} // namespace arrow

Expand Down
64 changes: 55 additions & 9 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,18 @@ Status WriteLargeRecordBatch(const RecordBatch& batch, int64_t buffer_start_offs
pool, kMaxNestingDepth, true);
}

static Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size,
const Tensor& tensor, uint8_t* scratch_space,
io::OutputStream* dst) {
namespace {

// Serialize the tensor's metadata message and write it to dst, reporting the
// written length through metadata_length.
// NOTE(review): body_length is accepted but never written here — presumably
// the caller sets it after writing the tensor body; confirm at call sites.
Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
int32_t* metadata_length, int64_t* body_length) {
std::shared_ptr<Buffer> metadata;
RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata));
return internal::WriteMessage(*metadata, dst, metadata_length);
}

Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_size,
const Tensor& tensor, uint8_t* scratch_space,
io::OutputStream* dst) {
if (dim_index == tensor.ndim() - 1) {
const uint8_t* data_ptr = tensor.raw_data() + offset;
const int64_t stride = tensor.strides()[dim_index];
Expand All @@ -580,16 +589,37 @@ static Status WriteStridedTensorData(int dim_index, int64_t offset, int elem_siz
return Status::OK();
}

Status WriteTensorHeader(const Tensor& tensor, io::OutputStream* dst,
int32_t* metadata_length, int64_t* body_length) {
RETURN_NOT_OK(AlignStreamPosition(dst));
std::shared_ptr<Buffer> metadata;
RETURN_NOT_OK(internal::WriteTensorMessage(tensor, 0, &metadata));
return internal::WriteMessage(*metadata, dst, metadata_length);
// Produce a contiguous (row-major, densely packed) copy of a possibly
// strided tensor.
//
// \param[in] tensor the tensor to compact
// \param[in] pool MemoryPool used for all allocations made here
// \param[out] out the newly allocated contiguous tensor
// \return Status
Status GetContiguousTensor(const Tensor& tensor, MemoryPool* pool,
                           std::unique_ptr<Tensor>* out) {
  const auto& type = static_cast<const FixedWidthType&>(*tensor.type());
  const int elem_size = type.bit_width() / 8;

  // Scratch space holds one innermost row while the strided data is copied.
  // Allocate it from the caller-provided pool (resolving the previous TODO)
  // so all temporary memory is accounted to the same pool as the result.
  std::shared_ptr<Buffer> scratch_space;
  RETURN_NOT_OK(AllocateBuffer(pool, tensor.shape()[tensor.ndim() - 1] * elem_size,
                               &scratch_space));

  std::shared_ptr<ResizableBuffer> contiguous_data;
  RETURN_NOT_OK(
      AllocateResizableBuffer(pool, tensor.size() * elem_size, &contiguous_data));

  // Stream the strided source into the contiguous destination buffer.
  io::BufferOutputStream stream(contiguous_data);
  RETURN_NOT_OK(WriteStridedTensorData(0, 0, elem_size, tensor,
                                       scratch_space->mutable_data(), &stream));

  out->reset(new Tensor(tensor.type(), contiguous_data, tensor.shape()));

  return Status::OK();
}

} // namespace

Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadata_length,
int64_t* body_length) {
RETURN_NOT_OK(AlignStreamPosition(dst));

if (tensor.is_contiguous()) {
RETURN_NOT_OK(WriteTensorHeader(tensor, dst, metadata_length, body_length));
auto data = tensor.data();
Expand Down Expand Up @@ -619,6 +649,22 @@ Status WriteTensor(const Tensor& tensor, io::OutputStream* dst, int32_t* metadat
}
}

// Build an IPC Message (metadata + body) from a tensor. Strided tensors are
// first compacted into a contiguous copy allocated from pool; contiguous
// tensors are wrapped with no data copy.
Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
                        std::unique_ptr<Message>* out) {
  std::unique_ptr<Tensor> contiguous;
  const Tensor* source = &tensor;

  if (!tensor.is_contiguous()) {
    // A strided tensor cannot be written as a single body buffer, so make a
    // dense copy and point the write at that instead.
    RETURN_NOT_OK(GetContiguousTensor(tensor, pool, &contiguous));
    source = contiguous.get();
  }

  std::shared_ptr<Buffer> metadata;
  RETURN_NOT_OK(internal::WriteTensorMessage(*source, 0, &metadata));
  out->reset(new Message(metadata, source->data()));
  return Status::OK();
}

Status WriteDictionary(int64_t dictionary_id, const std::shared_ptr<Array>& dictionary,
int64_t buffer_start_offset, io::OutputStream* dst,
int32_t* metadata_length, int64_t* body_length, MemoryPool* pool) {
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/arrow/ipc/writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,17 @@ Status GetRecordBatchSize(const RecordBatch& batch, int64_t* size);
ARROW_EXPORT
Status GetTensorSize(const Tensor& tensor, int64_t* size);

/// \brief EXPERIMENTAL: Convert arrow::Tensor to a Message with minimal memory
/// allocation
///
/// \param[in] tensor the Tensor to write
/// \param[in] pool MemoryPool to allocate space for metadata
/// \param[out] out the resulting Message
/// \return Status
ARROW_EXPORT
Status GetTensorMessage(const Tensor& tensor, MemoryPool* pool,
std::unique_ptr<Message>* out);

/// \brief EXPERIMENTAL: Write arrow::Tensor as a contiguous message
///
/// \param[in] tensor the Tensor to write
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/python/arrow_to_pandas.cc
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ inline Status ConvertStruct(PandasOptions options, const ChunkedArray& data,
Py_INCREF(Py_None);
field_value.reset(Py_None);
}
// PyDict_SetItemString does not steal the value reference
// PyDict_SetItemString increments reference count
auto setitem_result =
PyDict_SetItemString(dict_item.obj(), name.c_str(), field_value.obj());
RETURN_IF_PYERROR();
Expand Down
60 changes: 58 additions & 2 deletions cpp/src/arrow/python/arrow_to_python.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@

#include "arrow/array.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/reader.h"
#include "arrow/table.h"
#include "arrow/util/logging.h"

#include "arrow/python/common.h"
#include "arrow/python/helpers.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/pyarrow.h"
#include "arrow/python/python_to_arrow.h"
#include "arrow/python/util/datetime.h"
#include "arrow/table.h"
#include "arrow/util/logging.h"

namespace arrow {
namespace py {
Expand Down Expand Up @@ -286,5 +288,59 @@ Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObj
obj, out);
}

// Reassemble a SerializedPyObject from the flat list of pyarrow Buffers
// produced by SerializedPyObject::GetComponents: one record-batch stream
// buffer, then a (metadata, body) buffer pair per tensor, then the plain
// data buffers.
Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data,
SerializedPyObject* out) {
PyAcquireGIL gil;
const Py_ssize_t data_length = PyList_Size(data);
RETURN_IF_PYERROR();

// Expected layout: 1 stream buffer + 2 per tensor + num_buffers raw buffers.
const Py_ssize_t expected_data_length = 1 + num_tensors * 2 + num_buffers;
if (data_length != expected_data_length) {
return Status::Invalid("Invalid number of buffers in data");
}

// Unwrap the pyarrow.Buffer at the given list index into an arrow::Buffer.
// Must be called while the GIL is held.
auto GetBuffer = [&data](Py_ssize_t index, std::shared_ptr<Buffer>* out) {
PyObject* py_buf = PyList_GET_ITEM(data, index);
return unwrap_buffer(py_buf, out);
};

Py_ssize_t buffer_index = 0;

// Read the union batch describing object structure
{
std::shared_ptr<Buffer> data_buffer;
RETURN_NOT_OK(GetBuffer(buffer_index++, &data_buffer));
// Release the GIL while reading the stream, which does no Python calls.
// NOTE(review): if Open or ReadNext fails below, we return with the GIL
// still released — presumably PyAcquireGIL's destructor restores it on
// scope exit; confirm against its implementation.
gil.release();
io::BufferReader buf_reader(data_buffer);
std::shared_ptr<RecordBatchReader> reader;
RETURN_NOT_OK(ipc::RecordBatchStreamReader::Open(&buf_reader, &reader));
RETURN_NOT_OK(reader->ReadNext(&out->batch));
gil.acquire();
}

// Zero-copy reconstruct tensors
for (int i = 0; i < num_tensors; ++i) {
std::shared_ptr<Buffer> metadata;
std::shared_ptr<Buffer> body;
std::shared_ptr<Tensor> tensor;
RETURN_NOT_OK(GetBuffer(buffer_index++, &metadata));
RETURN_NOT_OK(GetBuffer(buffer_index++, &body));

// Wrap the two buffers as an IPC message; no tensor data is copied.
ipc::Message message(metadata, body);

RETURN_NOT_OK(ReadTensor(message, &tensor));
out->tensors.emplace_back(std::move(tensor));
}

// Unwrap and append buffers
for (int i = 0; i < num_buffers; ++i) {
std::shared_ptr<Buffer> buffer;
RETURN_NOT_OK(GetBuffer(buffer_index++, &buffer));
out->buffers.emplace_back(std::move(buffer));
}

return Status::OK();
}

} // namespace py
} // namespace arrow
13 changes: 13 additions & 0 deletions cpp/src/arrow/python/arrow_to_python.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ namespace py {
ARROW_EXPORT
Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out);

/// \brief Reconstruct SerializedPyObject from representation produced by
/// SerializedPyObject::GetComponents.
///
/// \param[in] num_tensors
/// \param[in] num_buffers
/// \param[in] data a list containing pyarrow.Buffer instances. Must be 1 +
/// num_tensors * 2 + num_buffers in length
/// \param[out] out the reconstructed object
/// \return Status
ARROW_EXPORT
Status GetSerializedFromComponents(int num_tensors, int num_buffers, PyObject* data,
SerializedPyObject* out);

/// \brief Reconstruct Python object from Arrow-serialized representation
/// \param[in] context Serialization context which contains custom serialization
/// and deserialization callbacks. Can be any Python object with a
Expand Down
Loading