Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 29 additions & 25 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@
namespace arrow {
namespace ipc {

// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
constexpr int32_t kIpcContinuationToken = -1;

class Message::MessageImpl {
public:
explicit MessageImpl(const std::shared_ptr<Buffer>& metadata,
Expand Down Expand Up @@ -146,17 +143,25 @@ bool Message::Equals(const Message& other) const {
}
}

Status MaybeAlignMetadata(std::shared_ptr<Buffer>* metadata) {
if (reinterpret_cast<uintptr_t>((*metadata)->data()) % 8 != 0) {
// If the metadata memory is not aligned, we copy it here to avoid
// potential UBSAN issues from Flatbuffers
RETURN_NOT_OK((*metadata)->Copy(0, (*metadata)->size(), metadata));
}
return Status::OK();
}

Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_length) {
// Check metadata memory alignment in debug builds
DCHECK_EQ(0, reinterpret_cast<uintptr_t>(metadata.data()) % 8);
const flatbuf::Message* fb_message;
RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &fb_message));
*body_length = fb_message->bodyLength();
return Status::OK();
}

Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
Status Message::ReadFrom(std::shared_ptr<Buffer> metadata, io::InputStream* stream,
std::unique_ptr<Message>* out) {
RETURN_NOT_OK(MaybeAlignMetadata(&metadata));
int64_t body_length = -1;
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));

Expand All @@ -170,8 +175,9 @@ Status Message::ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStrea
return Message::Open(metadata, body, out);
}

Status Message::ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
Status Message::ReadFrom(const int64_t offset, std::shared_ptr<Buffer> metadata,
io::RandomAccessFile* file, std::unique_ptr<Message>* out) {
RETURN_NOT_OK(MaybeAlignMetadata(&metadata));
int64_t body_length = -1;
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length));

Expand Down Expand Up @@ -252,7 +258,7 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
// The size of the Flatbuffer including padding
int32_t flatbuffer_length = -1;
int32_t prefix_size = -1;
if (continuation == kIpcContinuationToken) {
if (continuation == internal::kIpcContinuationToken) {
if (metadata_length < 8) {
return Status::Invalid(
"Corrupted IPC message, had continuation token "
Expand All @@ -263,17 +269,19 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile
// Valid IPC message, parse the message length now
flatbuffer_length = util::SafeLoadAs<int32_t>(buffer->data() + 4);
prefix_size = 8;
} else if (continuation == 0) {
// EOS
*message = nullptr;
return Status::OK();
} else {
// ARROW-6314: Backwards compatibility for reading old IPC
// messages produced prior to version 0.15.0
flatbuffer_length = continuation;
prefix_size = 4;
}

if (flatbuffer_length == 0) {
// EOS
*message = nullptr;
return Status::OK();
}

if (flatbuffer_length + prefix_size != metadata_length) {
return Status::Invalid("flatbuffer size ", flatbuffer_length,
" invalid. File offset: ", offset,
Expand All @@ -282,12 +290,6 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile

std::shared_ptr<Buffer> metadata =
SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size);
if (prefix_size == 4) {
// ARROW-6314: For old messages we copy the metadata to fix UBSAN
// issues with Flatbuffers. For new messages, they are already
// aligned
RETURN_NOT_OK(metadata->Copy(0, metadata->size(), &metadata));
}
return Message::ReadFrom(offset + metadata_length, metadata, file, message);
}

Expand Down Expand Up @@ -328,28 +330,30 @@ Status ReadMessage(io::InputStream* file, MemoryPool* pool, bool copy_metadata,
reinterpret_cast<uint8_t*>(&continuation)));

if (bytes_read != sizeof(int32_t)) {
// EOS
// EOS without indication
*message = nullptr;
return Status::OK();
}

int32_t flatbuffer_length = -1;
bool legacy_format = false;
if (continuation == kIpcContinuationToken) {
if (continuation == internal::kIpcContinuationToken) {
// Valid IPC message, read the message length now
RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read,
reinterpret_cast<uint8_t*>(&flatbuffer_length)));
} else if (continuation == 0) {
// EOS
*message = nullptr;
return Status::OK();
} else {
// ARROW-6314: Backwards compatibility for reading old IPC
// messages produced prior to version 0.15.0
flatbuffer_length = continuation;
legacy_format = true;
}

if (flatbuffer_length == 0) {
// EOS
*message = nullptr;
return Status::OK();
}

std::shared_ptr<Buffer> metadata;
if (legacy_format || copy_metadata) {
DCHECK_NE(pool, nullptr);
Expand Down Expand Up @@ -394,7 +398,7 @@ Status WriteMessage(const Buffer& message, const IpcOptions& options,

// ARROW-6314: Write continuation / padding token
if (!options.write_legacy_ipc_format) {
RETURN_NOT_OK(file->Write(&kIpcContinuationToken, sizeof(int32_t)));
RETURN_NOT_OK(file->Write(&internal::kIpcContinuationToken, sizeof(int32_t)));
}

// Write the flatbuffer size prefix including padding
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/ipc/message.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ARROW_EXPORT Message {
/// \return Status
///
/// \note If stream supports zero-copy, this is zero-copy
static Status ReadFrom(const std::shared_ptr<Buffer>& metadata, io::InputStream* stream,
static Status ReadFrom(std::shared_ptr<Buffer> metadata, io::InputStream* stream,
std::unique_ptr<Message>* out);

/// \brief Read message body from position in file, and create Message given
Expand All @@ -103,7 +103,7 @@ class ARROW_EXPORT Message {
/// \return Status
///
/// \note If file supports zero-copy, this is zero-copy
static Status ReadFrom(const int64_t offset, const std::shared_ptr<Buffer>& metadata,
static Status ReadFrom(const int64_t offset, std::shared_ptr<Buffer> metadata,
io::RandomAccessFile* file, std::unique_ptr<Message>* out);

/// \brief Return true if message type and contents are equal
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/ipc/metadata_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ class DictionaryMemo;

namespace internal {

// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message
constexpr int32_t kIpcContinuationToken = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for some reason i thought there were linkage issues in C++11 using constexpr in a header.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

constexpr implies inline/static linkage. It's not an error (though I sort of expected it to be) in clang to declare

extern constexpr int32_t kIpcContinuationToken = -1;

... if you want to declare something constexpr within this translation unit but also export it as a symbol for other translation units, but the expectation is that you'll just include the header into whatever TU needs that constant.


static constexpr flatbuf::MetadataVersion kCurrentMetadataVersion =
flatbuf::MetadataVersion_V4;

Expand Down
19 changes: 10 additions & 9 deletions cpp/src/arrow/ipc/writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,8 @@ class RecordBatchPayloadWriter : public RecordBatchWriter {

class StreamBookKeeper {
public:
explicit StreamBookKeeper(io::OutputStream* sink) : sink_(sink), position_(-1) {}
explicit StreamBookKeeper(const IpcOptions& options, io::OutputStream* sink)
: options_(options), sink_(sink), position_(-1) {}

Status UpdatePosition() { return sink_->Tell(&position_); }

Expand Down Expand Up @@ -1013,11 +1014,15 @@ class StreamBookKeeper {

Status WriteEOS() {
// End of stream marker
constexpr int64_t kEos = 0;
return Write(&kEos, sizeof(kEos));
constexpr int32_t kZeroLength = 0;
if (!options_.write_legacy_ipc_format) {
RETURN_NOT_OK(Write(&internal::kIpcContinuationToken, sizeof(int32_t)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:

Suggested change
RETURN_NOT_OK(Write(&internal::kIpcContinuationToken, sizeof(int32_t)));
RETURN_NOT_OK(Write(&internal::kIpcContinuationToken, sizeof(internal::kIpcContinuationToken)));

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to let this one slide -- I kind of prefer int32_t for the documentation aspect

}
return Write(&kZeroLength, sizeof(int32_t));
}

protected:
IpcOptions options_;
io::OutputStream* sink_;
int64_t position_;
};
Expand All @@ -1028,7 +1033,7 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter,
protected StreamBookKeeper {
public:
PayloadStreamWriter(const IpcOptions& options, io::OutputStream* sink)
: StreamBookKeeper(sink), options_(options) {}
: StreamBookKeeper(options, sink) {}

~PayloadStreamWriter() override = default;

Expand All @@ -1045,9 +1050,6 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter,
}

Status Close() override { return WriteEOS(); }

private:
IpcOptions options_;
};

/// A IpcPayloadWriter implementation that writes to a IPC file
Expand All @@ -1056,7 +1058,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
public:
PayloadFileWriter(const IpcOptions& options, const std::shared_ptr<Schema>& schema,
io::OutputStream* sink)
: StreamBookKeeper(sink), options_(options), schema_(schema) {}
: StreamBookKeeper(options, sink), schema_(schema) {}

~PayloadFileWriter() override = default;

Expand Down Expand Up @@ -1122,7 +1124,6 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo
}

protected:
IpcOptions options_;
std::shared_ptr<Schema> schema_;
std::vector<FileBlock> dictionaries_;
std::vector<FileBlock> record_batches_;
Expand Down
7 changes: 4 additions & 3 deletions docs/source/format/Columnar.rst
Original file line number Diff line number Diff line change
Expand Up @@ -979,15 +979,16 @@ a ``RecordBatch`` it should be defined in a ``DictionaryBatch``. ::
<DICTIONARY y DELTA>
...
<RECORD BATCH n - 1>
<EOS [optional]: 0x0000000000000000>
<EOS [optional]: 0xFFFFFFFF 0x00000000>

When a stream reader implementation is reading a stream, after each
message, it may read the next 8 bytes to determine both if the stream
continues and the size of the message metadata that follows. Once the
message flatbuffer is read, you can then read the message body.

The stream writer can signal end-of-stream (EOS) either by writing 8
zero (`0x00`) bytes or closing the stream interface.
The stream writer can signal end-of-stream (EOS) either by writing 8 bytes
containing the 4-byte continuation indicator (``0xFFFFFFFF``) followed by 0
metadata length (``0x00000000``) or closing the stream interface.

IPC File Format
---------------
Expand Down