Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 1 addition & 25 deletions cpp/src/arrow/gpu/cuda_arrow_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,31 +63,7 @@ Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,

Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
std::unique_ptr<ipc::Message>* out) {
int32_t message_length = 0;
int64_t bytes_read = 0;

RETURN_NOT_OK(reader->Read(sizeof(int32_t), &bytes_read,
reinterpret_cast<uint8_t*>(&message_length)));
if (bytes_read != sizeof(int32_t)) {
*out = nullptr;
return Status::OK();
}

if (message_length == 0) {
// Optional 0 EOS control message
*out = nullptr;
return Status::OK();
}

std::shared_ptr<Buffer> metadata;
RETURN_NOT_OK(AllocateBuffer(pool, message_length, &metadata));
RETURN_NOT_OK(reader->Read(message_length, &bytes_read, metadata->mutable_data()));
if (bytes_read != message_length) {
return Status::IOError("Expected ", message_length, " metadata bytes, but only got ",
bytes_read);
}

return ipc::Message::ReadFrom(metadata, reader, out);
return ipc::ReadMessageCopy(reader, pool, out);
}

Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
Expand Down
154 changes: 131 additions & 23 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@
#include "arrow/ipc/util.h"
#include "arrow/status.h"
#include "arrow/util/logging.h"
#include "arrow/util/ubsan.h"

namespace arrow {
namespace ipc {

// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
constexpr int32_t kIpcContinuationToken = -1;

class Message::MessageImpl {
public:
explicit MessageImpl(const std::shared_ptr<Buffer>& metadata,
Expand Down Expand Up @@ -142,12 +146,19 @@ bool Message::Equals(const Message& other) const {
}
}

Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
std::unique_ptr<Message>* out) {
Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_length) {
// Check metadata memory alignment in debug builds
DCHECK_EQ(0, reinterpret_cast<uintptr_t>(metadata.data()) % 8);
const flatbuf::Message* fb_message;
RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), &fb_message));
RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &fb_message));
*body_length = fb_message->bodyLength();
return Status::OK();
}

int64_t body_length = fb_message->bodyLength();
Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
std::unique_ptr<Message>* out) {
int64_t body_length = -1;
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));

std::shared_ptr<Buffer> body;
RETURN_NOT_OK(stream->Read(body_length, &body));
Expand All @@ -161,9 +172,8 @@ Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStrea

Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
io::RandomAccessFile* file, std::unique_ptr<Message>* out) {
const flatbuf::Message* fb_message;
RETURN_NOT_OK(internal::VerifyMessage(metadata->data(), metadata->size(), &fb_message));
int64_t body_length = fb_message->bodyLength();
int64_t body_length = -1;
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));

std::shared_ptr<Buffer> body;
RETURN_NOT_OK(file->ReadAt(offset, body_length, &body));
Expand All @@ -184,10 +194,10 @@ Status WritePadding(io::OutputStream* stream, int64_t nbytes) {
return Status::OK();
}

Status Message::SerializeTo(io::OutputStream* stream, int32_t alignment,
Status Message::SerializeTo(io::OutputStream* stream, const IpcOptions& options,
int64_t* output_length) const {
int32_t metadata_length = 0;
RETURN_NOT_OK(internal::WriteMessage(*metadata(), alignment, stream, &metadata_length));
RETURN_NOT_OK(WriteMessage(*metadata(), options, stream, &metadata_length));

*output_length = metadata_length;

Expand Down Expand Up @@ -237,15 +247,47 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
" metadata bytes but got ", buffer->size());
}

int32_t flatbuffer_size = *reinterpret_cast<const int32_t*>(buffer->data());
const int32_t continuation = util::SafeLoadAs<int32_t>(buffer->data());

// The size of the Flatbuffer including padding
int32_t flatbuffer_length = -1;
int32_t prefix_size = -1;
if (continuation == kIpcContinuationToken) {
if (metadata_length < 8) {
return Status::Invalid(
"Corrupted IPC message, had continuation token "
" but length ",
metadata_length);
}

if (flatbuffer_size + static_cast<int>(sizeof(int32_t)) > metadata_length) {
return Status::Invalid("flatbuffer size ", metadata_length,
// Valid IPC message, parse the message length now
flatbuffer_length = util::SafeLoadAs<int32_t>(buffer->data() + 4);
prefix_size = 8;
} else if (continuation == 0) {
// EOS
*message = nullptr;
return Status::OK();
} else {
// ARROW-6314: Backwards compatibility for reading old IPC
// messages produced prior to version 0.15.0
flatbuffer_length = continuation;
prefix_size = 4;
}

if (flatbuffer_length + prefix_size != metadata_length) {
return Status::Invalid("flatbuffer size ", flatbuffer_length,
" invalid. File offset: ", offset,
", metadata length: ", metadata_length);
}

auto metadata = SliceBuffer(buffer, 4, buffer->size() - 4);
std::shared_ptr<Buffer> metadata =
SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size);
if (prefix_size == 4) {
// ARROW-6314: For old messages we copy the metadata to fix UBSAN
// issues with Flatbuffers. For new messages, they are already
// aligned
RETURN_NOT_OK(metadata->Copy(0, metadata->size(), &metadata));
}
return Message::ReadFrom(offset + metadata_length, metadata, file, message);
}

Expand All @@ -269,39 +311,105 @@ Status CheckAligned(io::FileInterface* stream, int32_t alignment) {
int64_t current_position;
ARROW_RETURN_NOT_OK(stream->Tell(&current_position));
if (current_position % alignment != 0) {
return Status::Invalid("Stream is not aligned");
return Status::Invalid("Stream is not aligned pos: ", current_position,
" alignment: ", alignment);
} else {
return Status::OK();
}
}

Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* message) {
int32_t message_length = 0;
namespace {

Status ReadMessage(io::InputStream* file, MemoryPool* pool, bool copy_metadata,
std::unique_ptr<Message>* message) {
int32_t continuation = 0;
int64_t bytes_read = 0;
RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
reinterpret_cast<uint8_t*>(&message_length)));
reinterpret_cast<uint8_t*>(&continuation)));

if (bytes_read != sizeof(int32_t)) {
// EOS
*message = nullptr;
return Status::OK();
}

if (message_length == 0) {
// Optional 0 EOS control message
int32_t flatbuffer_length = -1;
bool legacy_format = false;
if (continuation == kIpcContinuationToken) {
// Valid IPC message, read the message length now
RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
reinterpret_cast<uint8_t*>(&flatbuffer_length)));
} else if (continuation == 0) {
// EOS
*message = nullptr;
return Status::OK();
} else {
// ARROW-6314: Backwards compatibility for reading old IPC
// messages produced prior to version 0.15.0
flatbuffer_length = continuation;
legacy_format = true;
}

std::shared_ptr<Buffer> metadata;
RETURN_NOT_OK(file->Read(message_length, &metadata));
if (metadata->size() != message_length) {
return Status::Invalid("Expected to read ", message_length, " metadata bytes, but ",
"only read ", metadata->size());
if (legacy_format || copy_metadata) {
DCHECK_NE(pool, nullptr);
RETURN_NOT_OK(AllocateBuffer(pool, flatbuffer_length, &metadata));
RETURN_NOT_OK(file->Read(flatbuffer_length, &bytes_read, metadata->mutable_data()));
} else {
RETURN_NOT_OK(file->Read(flatbuffer_length, &metadata));
bytes_read = metadata->size();
}
if (bytes_read != flatbuffer_length) {
return Status::Invalid("Expected to read ", flatbuffer_length,
" metadata bytes, but ", "only read ", bytes_read);
}

return Message::ReadFrom(metadata, file, message);
}

} // namespace

Status ReadMessage(io::InputStream* file, std::unique_ptr<Message>* out) {
return ReadMessage(file, default_memory_pool(), /*copy_metadata=*/false, out);
}

Status ReadMessageCopy(io::InputStream* file, MemoryPool* pool,
std::unique_ptr<Message>* out) {
return ReadMessage(file, pool, /*copy_metadata=*/true, out);
}

Status WriteMessage(const Buffer& message, const IpcOptions& options,
io::OutputStream* file, int32_t* message_length) {
const int32_t prefix_size = options.write_legacy_ipc_format ? 4 : 8;
const int32_t flatbuffer_size = static_cast<int32_t>(message.size());

int32_t padded_message_length = static_cast<int32_t>(
PaddedLength(flatbuffer_size + prefix_size, options.alignment));

int32_t padding = padded_message_length - flatbuffer_size - prefix_size;

// The returned message size includes the length prefix, the flatbuffer,
// plus padding
*message_length = padded_message_length;

// ARROW-6314: Write continuation / padding token
if (!options.write_legacy_ipc_format) {
RETURN_NOT_OK(file->Write(&kIpcContinuationToken, sizeof(int32_t)));
}

// Write the flatbuffer size prefix including padding
int32_t padded_flatbuffer_size = padded_message_length - prefix_size;
RETURN_NOT_OK(file->Write(&padded_flatbuffer_size, sizeof(int32_t)));

// Write the flatbuffer
RETURN_NOT_OK(file->Write(message.data(), flatbuffer_size));
if (padding > 0) {
RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
}

return Status::OK();
}

// ----------------------------------------------------------------------
// Implement InputStream message reader

Expand Down
47 changes: 34 additions & 13 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

// C++ object model and user API for interprocess schema messaging

#ifndef ARROW_IPC_MESSAGE_H
#define ARROW_IPC_MESSAGE_H
#pragma once

#include <cstdint>
#include <memory>
Expand All @@ -32,6 +31,7 @@
namespace arrow {

class Buffer;
class MemoryPool;

namespace io {

Expand Down Expand Up @@ -137,13 +137,10 @@ class ARROW_EXPORT Message {
/// \brief Write length-prefixed metadata and body to output stream
///
/// \param[in] file output stream to write to
/// \param[in] alignment byte alignment for metadata, usually 8 or
/// 64. Whether the body is padded depends on the metadata; if the body
/// buffer is smaller than the size indicated in the metadata, then extra
/// padding bytes will be written
/// \param[in] options IPC writing options including alignment
/// \param[out] output_length the number of bytes written
/// \return Status
Status SerializeTo(io::OutputStream* file, int32_t alignment,
Status SerializeTo(io::OutputStream* file, const IpcOptions& options,
int64_t* output_length) const;

/// \brief Return true if the Message metadata passes Flatbuffer validation
Expand Down Expand Up @@ -223,15 +220,39 @@ Status AlignStream(io::OutputStream* stream, int32_t alignment = 8);
ARROW_EXPORT
Status CheckAligned(io::FileInterface* stream, int32_t alignment = 8);

/// \brief Read encapsulated RPC message (metadata and body) from InputStream
/// \brief Read encapsulated IPC message (metadata and body) from InputStream
///
/// Read length-prefixed message with as-yet unknown length. Returns null if
/// there are not enough bytes available or the message length is 0 (e.g. EOS
/// in a stream)
/// Returns null if there are not enough bytes available or the
/// message length is 0 (e.g. EOS in a stream)
ARROW_EXPORT
Status ReadMessage(io::InputStream* stream, std::unique_ptr<Message>* message);

/// \brief Read encapsulated IPC message (metadata and body) from InputStream
///
/// Like ReadMessage, except that the metadata is copied in a new buffer.
/// This is necessary if the stream returns non-CPU buffers.
ARROW_EXPORT
Status ReadMessageCopy(io::InputStream* stream, MemoryPool* pool,
std::unique_ptr<Message>* message);

/// Write encapsulated IPC message Does not make assumptions about
/// whether the stream is aligned already. Can write legacy (pre
/// version 0.15.0) IPC message if option set
///
/// continuation: 0xFFFFFFFF
/// message_size: int32
/// message: const void*
/// padding
///
/// \param[in] message a buffer containing the metadata to write
/// \param[in] options IPC writing options, including alignment and
/// legacy message support
/// \param[in,out] file the OutputStream to write to
/// \param[out] message_length the total size of the payload written including
/// padding
/// \return Status
Status WriteMessage(const Buffer& message, const IpcOptions& options,
io::OutputStream* file, int32_t* message_length);

} // namespace ipc
} // namespace arrow

#endif // ARROW_IPC_MESSAGE_H
32 changes: 0 additions & 32 deletions cpp/src/arrow/ipc/metadata_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1239,38 +1239,6 @@ Status GetSparseTensorMetadata(const Buffer& metadata, std::shared_ptr<DataType>
return ConcreteTypeFromFlatbuffer(sparse_tensor->type_type(), type_data, {}, type);
}

// ----------------------------------------------------------------------
// Implement message writing

Status WriteMessage(const Buffer& message, int32_t alignment, io::OutputStream* file,
int32_t* message_length) {
// ARROW-3212: We do not make assumptions that the output stream is aligned
int32_t padded_message_length = static_cast<int32_t>(message.size()) + 4;
const int32_t remainder = padded_message_length % alignment;
if (remainder != 0) {
padded_message_length += alignment - remainder;
}

// The returned message size includes the length prefix, the flatbuffer,
// plus padding
*message_length = padded_message_length;

// Write the flatbuffer size prefix including padding
int32_t flatbuffer_size = padded_message_length - 4;
RETURN_NOT_OK(file->Write(&flatbuffer_size, sizeof(int32_t)));

// Write the flatbuffer
RETURN_NOT_OK(file->Write(message.data(), message.size()));

// Write any padding
int32_t padding = padded_message_length - static_cast<int32_t>(message.size()) - 4;
if (padding > 0) {
RETURN_NOT_OK(file->Write(kPaddingBytes, padding));
}

return Status::OK();
}

} // namespace internal
} // namespace ipc
} // namespace arrow
Loading