From 50ed2214e17337666f4ee573b8a851493be82f25 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 12 Jul 2020 03:17:16 +0000 Subject: [PATCH 1/3] initial commit --- cpp/src/arrow/ipc/message.cc | 18 ++++++++++-------- cpp/src/arrow/ipc/read_write_test.cc | 6 ++++++ cpp/src/arrow/ipc/reader.cc | 3 ++- cpp/src/arrow/ipc/writer.cc | 2 ++ 4 files changed, 20 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc index aeb106e1c70..dcf61ef0616 100644 --- a/cpp/src/arrow/ipc/message.cc +++ b/cpp/src/arrow/ipc/message.cc @@ -424,8 +424,9 @@ Status WriteMessage(const Buffer& message, const IpcWriteOptions& options, RETURN_NOT_OK(file->Write(&internal::kIpcContinuationToken, sizeof(int32_t))); } - // Write the flatbuffer size prefix including padding - int32_t padded_flatbuffer_size = padded_message_length - prefix_size; + // Write the flatbuffer size prefix including padding in little endian + int32_t padded_flatbuffer_size = + BitUtil::ToLittleEndian(padded_message_length - prefix_size); RETURN_NOT_OK(file->Write(&padded_flatbuffer_size, sizeof(int32_t))); // Write the flatbuffer @@ -577,18 +578,18 @@ class MessageDecoder::MessageDecoderImpl { } Status ConsumeInitialData(const uint8_t* data, int64_t size) { - return ConsumeInitial(util::SafeLoadAs(data)); + return ConsumeInitial(BitUtil::FromLittleEndian(util::SafeLoadAs(data))); } Status ConsumeInitialBuffer(const std::shared_ptr& buffer) { ARROW_ASSIGN_OR_RAISE(auto continuation, ConsumeDataBufferInt32(buffer)); - return ConsumeInitial(continuation); + return ConsumeInitial(BitUtil::FromLittleEndian(continuation)); } Status ConsumeInitialChunks() { int32_t continuation = 0; RETURN_NOT_OK(ConsumeDataChunks(sizeof(int32_t), &continuation)); - return ConsumeInitial(continuation); + return ConsumeInitial(BitUtil::FromLittleEndian(continuation)); } Status ConsumeInitial(int32_t continuation) { @@ -616,18 +617,19 @@ class MessageDecoder::MessageDecoderImpl { } Status ConsumeMetadataLengthData(const uint8_t* data, int64_t size) { - return ConsumeMetadataLength(util::SafeLoadAs(data)); + return ConsumeMetadataLength( + BitUtil::FromLittleEndian(util::SafeLoadAs(data))); } Status ConsumeMetadataLengthBuffer(const std::shared_ptr& buffer) { ARROW_ASSIGN_OR_RAISE(auto metadata_length, ConsumeDataBufferInt32(buffer)); - return ConsumeMetadataLength(metadata_length); + return ConsumeMetadataLength(BitUtil::FromLittleEndian(metadata_length)); } Status ConsumeMetadataLengthChunks() { int32_t metadata_length = 0; RETURN_NOT_OK(ConsumeDataChunks(sizeof(int32_t), &metadata_length)); - return ConsumeMetadataLength(metadata_length); + return ConsumeMetadataLength(BitUtil::FromLittleEndian(metadata_length)); } Status ConsumeMetadataLength(int32_t metadata_length) { diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 9e4f4c9e570..b68c49a1807 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -131,6 +131,11 @@ TEST_P(TestMessage, SerializeTo) { ASSERT_EQ(BitUtil::RoundUp(metadata->size() + prefix_size, alignment) + body_length, output_length); ASSERT_OK_AND_EQ(output_length, stream->Tell()); + ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish()); + // chech whether length is written in little endian + auto buffer_ptr = buffer.get()->data(); + ASSERT_EQ(output_length - body_length - prefix_size, + BitUtil::FromLittleEndian(*(uint32_t *)(buffer_ptr + 4))); }; CheckWithAlignment(8); @@ -1504,6 +1509,7 @@ TEST(TestIpcFileFormat, FooterMetaData) { ASSERT_OK(helper.Finish()); ASSERT_OK_AND_ASSIGN(auto out_metadata, helper.ReadFooterMetadata()); + ASSERT_TRUE(out_metadata->Equals(*metadata)); } diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc index 3c51fef033c..75f22139607 100644 --- a/cpp/src/arrow/ipc/reader.cc +++ b/cpp/src/arrow/ipc/reader.cc @@ -979,7 +979,8 @@ class RecordBatchFileReaderImpl : public RecordBatchFileReader { return Status::Invalid("Not an Arrow file"); } - int32_t footer_length = *reinterpret_cast(buffer->data()); + int32_t footer_length = + BitUtil::FromLittleEndian(*reinterpret_cast(buffer->data())); if (footer_length <= 0 || footer_length > footer_offset_ - magic_size * 2 - 4) { return Status::Invalid("File is smaller than indicated metadata size"); diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc index 292fe9cb2fc..d3af24d7f9a 100644 --- a/cpp/src/arrow/ipc/writer.cc +++ b/cpp/src/arrow/ipc/writer.cc @@ -1184,6 +1184,8 @@ class PayloadFileWriter : public internal::IpcPayloadWriter, protected StreamBoo return Status::Invalid("Invalid file footer"); } + // write footer length in little endian + footer_length = BitUtil::ToLittleEndian(footer_length); RETURN_NOT_OK(Write(&footer_length, sizeof(int32_t))); // Write magic bytes to end file From d4eaa57c3a418d3f38cd953e2f2d508c858993d9 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 12 Jul 2020 03:24:28 +0000 Subject: [PATCH 2/3] remove unnecessary change --- cpp/src/arrow/ipc/read_write_test.cc | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index b68c49a1807..d32332cf378 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -1509,7 +1509,6 @@ TEST(TestIpcFileFormat, FooterMetaData) { ASSERT_OK(helper.Finish()); ASSERT_OK_AND_ASSIGN(auto out_metadata, helper.ReadFooterMetadata()); - ASSERT_TRUE(out_metadata->Equals(*metadata)); } From 061b735309f29590a81ef6ca192c0f9c8f457f5d Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 12 Jul 2020 04:01:23 +0000 Subject: [PATCH 3/3] fix lint error --- cpp/src/arrow/ipc/read_write_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index d32332cf378..6ae76114b01 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -135,7 +135,7 @@ TEST_P(TestMessage, SerializeTo) { // chech whether length is written in little endian auto buffer_ptr = buffer.get()->data(); ASSERT_EQ(output_length - body_length - prefix_size, - BitUtil::FromLittleEndian(*(uint32_t *)(buffer_ptr + 4))); + BitUtil::FromLittleEndian(*(uint32_t*)(buffer_ptr + 4))); }; CheckWithAlignment(8);