diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index a281b0d4181..d08d57a8d35 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -37,9 +37,6 @@ namespace arrow { namespace ipc { -// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message -constexpr int32_t kIpcContinuationToken = -1; - class Message::MessageImpl { public: explicit MessageImpl(const std::shared_ptr& metadata, @@ -146,17 +143,25 @@ bool Message::Equals(const Message& other) const { } } +Status MaybeAlignMetadata(std::shared_ptr* metadata) { + if (reinterpret_cast((*metadata)->data()) % 8 != 0) { + // If the metadata memory is not aligned, we copy it here to avoid + // potential UBSAN issues from Flatbuffers + RETURN_NOT_OK((*metadata)->Copy(0, (*metadata)->size(), metadata)); + } + return Status::OK(); +} + Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_length) { - // Check metadata memory alignment in debug builds - DCHECK_EQ(0, reinterpret_cast(metadata.data()) % 8); const flatbuf::Message* fb_message; RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &fb_message)); *body_length = fb_message->bodyLength(); return Status::OK(); } -Status Message::ReadFrom(const std::shared_ptr& metadata, io::InputStream* stream, +Status Message::ReadFrom(std::shared_ptr metadata, io::InputStream* stream, std::unique_ptr* out) { + RETURN_NOT_OK(MaybeAlignMetadata(&metadata)); int64_t body_length = -1; RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length)); @@ -170,8 +175,9 @@ Status Message::ReadFrom(const std::shared_ptr& metadata, io::InputStrea return Message::Open(metadata, body, out); } -Status Message::ReadFrom(const int64_t offset, const std::shared_ptr& metadata, +Status Message::ReadFrom(const int64_t offset, std::shared_ptr metadata, io::RandomAccessFile* file, std::unique_ptr* out) { + RETURN_NOT_OK(MaybeAlignMetadata(&metadata)); int64_t body_length = -1; 
RETURN_NOT_OK(CheckMetadataAndGetBodyLength(*metadata, &body_length)); @@ -252,7 +258,7 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile // The size of the Flatbuffer including padding int32_t flatbuffer_length = -1; int32_t prefix_size = -1; - if (continuation == kIpcContinuationToken) { + if (continuation == internal::kIpcContinuationToken) { if (metadata_length < 8) { return Status::Invalid( "Corrupted IPC message, had continuation token " @@ -263,10 +269,6 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile // Valid IPC message, parse the message length now flatbuffer_length = util::SafeLoadAs(buffer->data() + 4); prefix_size = 8; - } else if (continuation == 0) { - // EOS - *message = nullptr; - return Status::OK(); } else { // ARROW-6314: Backwards compatibility for reading old IPC // messages produced prior to version 0.15.0 @@ -274,6 +276,12 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile prefix_size = 4; } + if (flatbuffer_length == 0) { + // EOS + *message = nullptr; + return Status::OK(); + } + if (flatbuffer_length + prefix_size != metadata_length) { return Status::Invalid("flatbuffer size ", flatbuffer_length, " invalid. File offset: ", offset, @@ -282,12 +290,6 @@ Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile std::shared_ptr metadata = SliceBuffer(buffer, prefix_size, buffer->size() - prefix_size); - if (prefix_size == 4) { - // ARROW-6314: For old messages we copy the metadata to fix UBSAN - // issues with Flatbuffers. 
For new messages, they are already - // aligned - RETURN_NOT_OK(metadata->Copy(0, metadata->size(), &metadata)); - } return Message::ReadFrom(offset + metadata_length, metadata, file, message); } @@ -328,21 +330,17 @@ Status ReadMessage(io::InputStream* file, MemoryPool* pool, bool copy_metadata, reinterpret_cast(&continuation))); if (bytes_read != sizeof(int32_t)) { - // EOS + // EOS without indication *message = nullptr; return Status::OK(); } int32_t flatbuffer_length = -1; bool legacy_format = false; - if (continuation == kIpcContinuationToken) { + if (continuation == internal::kIpcContinuationToken) { // Valid IPC message, read the message length now RETURN_NOT_OK(file->Read(sizeof(int32_t), &bytes_read, reinterpret_cast(&flatbuffer_length))); - } else if (continuation == 0) { - // EOS - *message = nullptr; - return Status::OK(); } else { // ARROW-6314: Backwards compatibility for reading old IPC // messages produced prior to version 0.15.0 @@ -350,6 +348,12 @@ Status ReadMessage(io::InputStream* file, MemoryPool* pool, bool copy_metadata, legacy_format = true; } + if (flatbuffer_length == 0) { + // EOS + *message = nullptr; + return Status::OK(); + } + std::shared_ptr metadata; if (legacy_format || copy_metadata) { DCHECK_NE(pool, nullptr); @@ -394,7 +398,7 @@ Status WriteMessage(const Buffer& message, const IpcOptions& options, // ARROW-6314: Write continuation / padding token if (!options.write_legacy_ipc_format) { - RETURN_NOT_OK(file->Write(&kIpcContinuationToken, sizeof(int32_t))); + RETURN_NOT_OK(file->Write(&internal::kIpcContinuationToken, sizeof(int32_t))); } // Write the flatbuffer size prefix including padding diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h index 89be45e83ab..ffa93ca9c8c 100644 --- a/cpp/src/arrow/ipc/message.h +++ b/cpp/src/arrow/ipc/message.h @@ -91,7 +91,7 @@ class ARROW_EXPORT Message { /// \return Status /// /// \note If stream supports zero-copy, this is zero-copy - static Status ReadFrom(const 
std::shared_ptr& metadata, io::InputStream* stream, + static Status ReadFrom(std::shared_ptr metadata, io::InputStream* stream, std::unique_ptr* out); /// \brief Read message body from position in file, and create Message given @@ -103,7 +103,7 @@ class ARROW_EXPORT Message { /// \return Status /// /// \note If file supports zero-copy, this is zero-copy - static Status ReadFrom(const int64_t offset, const std::shared_ptr& metadata, + static Status ReadFrom(const int64_t offset, std::shared_ptr metadata, io::RandomAccessFile* file, std::unique_ptr* out); /// \brief Return true if message type and contents are equal diff --git a/cpp/src/arrow/ipc/metadata_internal.h b/cpp/src/arrow/ipc/metadata_internal.h index 420cfb1f890..803d67df46c 100644 --- a/cpp/src/arrow/ipc/metadata_internal.h +++ b/cpp/src/arrow/ipc/metadata_internal.h @@ -58,6 +58,9 @@ class DictionaryMemo; namespace internal { +// This 0xFFFFFFFF value is the first 4 bytes of a valid IPC message +constexpr int32_t kIpcContinuationToken = -1; + static constexpr flatbuf::MetadataVersion kCurrentMetadataVersion = flatbuf::MetadataVersion_V4; diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 71273006b07..97d9c8318dd 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -984,7 +984,8 @@ class RecordBatchPayloadWriter : public RecordBatchWriter { class StreamBookKeeper { public: - explicit StreamBookKeeper(io::OutputStream* sink) : sink_(sink), position_(-1) {} + explicit StreamBookKeeper(const IpcOptions& options, io::OutputStream* sink) + : options_(options), sink_(sink), position_(-1) {} Status UpdatePosition() { return sink_->Tell(&position_); } @@ -1013,11 +1014,15 @@ class StreamBookKeeper { Status WriteEOS() { // End of stream marker - constexpr int64_t kEos = 0; - return Write(&kEos, sizeof(kEos)); + constexpr int32_t kZeroLength = 0; + if (!options_.write_legacy_ipc_format) { + RETURN_NOT_OK(Write(&internal::kIpcContinuationToken, sizeof(int32_t))); + 
} + return Write(&kZeroLength, sizeof(int32_t)); } protected: + IpcOptions options_; io::OutputStream* sink_; int64_t position_; }; @@ -1028,7 +1033,7 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter, protected StreamBookKeeper { public: PayloadStreamWriter(const IpcOptions& options, io::OutputStream* sink) - : StreamBookKeeper(sink), options_(options) {} + : StreamBookKeeper(options, sink) {} ~PayloadStreamWriter() override = default; @@ -1045,9 +1050,6 @@ class PayloadStreamWriter : public internal::IpcPayloadWriter, } Status Close() override { return WriteEOS(); } - - private: - IpcOptions options_; }; /// A IpcPayloadWriter implementation that writes to a IPC file @@ -1056,7 +1058,7 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo public: PayloadFileWriter(const IpcOptions& options, const std::shared_ptr& schema, io::OutputStream* sink) - : StreamBookKeeper(sink), options_(options), schema_(schema) {} + : StreamBookKeeper(options, sink), schema_(schema) {} ~PayloadFileWriter() override = default; @@ -1122,7 +1124,6 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo } protected: - IpcOptions options_; std::shared_ptr schema_; std::vector dictionaries_; std::vector record_batches_; diff --git a/docs/source/format/Columnar.rst b/docs/source/format/Columnar.rst index 47e10c68717..8411c2c54a2 100644 --- a/docs/source/format/Columnar.rst +++ b/docs/source/format/Columnar.rst @@ -979,15 +979,16 @@ a ``RecordBatch`` it should be defined in a ``DictionaryBatch``. :: ... - + When a stream reader implementation is reading a stream, after each message, it may read the next 8 bytes to determine both if the stream continues and the size of the message metadata that follows. Once the message flatbuffer is read, you can then read the message body. -The stream writer can signal end-of-stream (EOS) either by writing 8 -zero (`0x00`) bytes or closing the stream interface. 
+The stream writer can signal end-of-stream (EOS) either by writing 8 bytes, +the 4-byte continuation indicator (``0xFFFFFFFF``) followed by a 4-byte +metadata length of zero (``0x00000000``), or by closing the stream interface. IPC File Format ---------------