From c32e1ec0ed1b03f407d2ae9109a086376036c5de Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 24 Aug 2023 22:12:04 +0800 Subject: [PATCH 1/8] Add FromString to ::arrow::io::BufferReader --- cpp/src/arrow/buffer.cc | 4 ++-- cpp/src/arrow/io/memory.cc | 4 ++++ cpp/src/arrow/io/memory.h | 5 +++-- cpp/src/arrow/io/memory_test.cc | 31 +++++++++++++++++++++++++++++++ cpp/src/arrow/ipc/reader.h | 2 +- 5 files changed, 41 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc index 99dc29cfe52..1bd789b7caf 100644 --- a/cpp/src/arrow/buffer.cc +++ b/cpp/src/arrow/buffer.cc @@ -114,14 +114,14 @@ void Buffer::CheckCPU() const { Result> Buffer::GetReader( std::shared_ptr buf) { - return buf->memory_manager_->GetBufferReader(buf); + return buf->memory_manager_->GetBufferReader(std::move(buf)); } Result> Buffer::GetWriter(std::shared_ptr buf) { if (!buf->is_mutable()) { return Status::Invalid("Expected mutable buffer"); } - return buf->memory_manager_->GetBufferWriter(buf); + return buf->memory_manager_->GetBufferWriter(std::move(buf)); } Result> Buffer::Copy(std::shared_ptr source, diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index c3a9b06c05e..a0bd8332065 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -278,6 +278,10 @@ BufferReader::BufferReader(std::string_view data) : BufferReader(reinterpret_cast(data.data()), static_cast(data.size())) {} +std::unique_ptr BufferReader::FromString(std::string data) { + return std::make_unique(Buffer::FromString(std::move(data))); +} + Status BufferReader::DoClose() { is_open_ = false; return Status::OK(); diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index b8377be9a6f..abb2e1b50bf 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -149,10 +149,11 @@ class ARROW_EXPORT BufferReader explicit BufferReader(const Buffer& buffer); BufferReader(const uint8_t* data, int64_t size); - /// \brief Instantiate from std::string or std::string_view. Does not - /// own data + /// \brief Instantiate from std::string_view. Does not own data explicit BufferReader(std::string_view data); + static std::unique_ptr FromString(std::string data); + bool closed() const override; bool supports_zero_copy() const override; diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc index 22f9a02fdbe..0940d009a16 100644 --- a/cpp/src/arrow/io/memory_test.cc +++ b/cpp/src/arrow/io/memory_test.cc @@ -291,6 +291,37 @@ TEST(TestBufferReader, WillNeed) { } } +void TestBufferReaderLifetime( + std::function(std::string&)> fn, + bool supports_zero_copy) { + std::shared_ptr result; + std::string data = "data12345678910111213"; + { + std::string data_inner = data; + std::unique_ptr reader = fn(data_inner); + EXPECT_EQ(supports_zero_copy, reader->supports_zero_copy()); + ASSERT_OK_AND_ASSIGN(result, reader->Read(data.length())); + } + EXPECT_EQ(std::string_view(data), std::string_view(*result)); +} + +TEST(TestBufferReader, Lifetime) { + // BufferReader(std::shared_ptr) + TestBufferReaderLifetime( + [](std::string& data) -> std::unique_ptr { + auto buffer = Buffer::FromString(std::move(data)); + return std::make_unique(std::move(buffer)); + }, + /*supports_zero_copy=*/true); + + // BufferReader(std::string) + TestBufferReaderLifetime( + [](std::string& data) -> std::unique_ptr { + return BufferReader::FromString(std::move(data)); + }, + /*supports_zero_copy=*/true); +} + TEST(TestRandomAccessFile, GetStream) { std::string data = "data1data2data3data4data5"; diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 9440fcaba92..0d7ae222640 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -190,7 +190,7 @@ class ARROW_EXPORT RecordBatchFileReader /// \return the read batch virtual Result> ReadRecordBatch(int i) = 0; - /// \brief Read a particular record batch along with its custom metadada from the file. + /// \brief Read a particular record batch along with its custom metadata from the file. /// Does not copy memory if the input source supports zero-copy. /// /// \param[in] i the index of the record batch to return From f74b8d258a846ac8c0062235f547f6c412db02a2 Mon Sep 17 00:00:00 2001 From: mwish Date: Thu, 24 Aug 2023 22:29:11 +0800 Subject: [PATCH 2/8] deprecate previous ctors --- cpp/src/arrow/adapters/orc/adapter_test.cc | 10 +++----- cpp/src/arrow/compute/function_internal.cc | 3 ++- cpp/src/arrow/io/memory.h | 9 +++++++ cpp/src/arrow/io/memory_test.cc | 29 +++++++++++----------- 4 files changed, 29 insertions(+), 22 deletions(-) diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index 93cc4f4649d..ff70e771251 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -483,9 +483,8 @@ TEST(TestAdapterRead, ReadCharAndVarcharType) { writer->add(*batch); writer->close(); - std::shared_ptr in_stream(std::make_shared( - reinterpret_cast(mem_stream.getData()), - static_cast(mem_stream.getLength()))); + std::shared_ptr in_stream(io::BufferReader::FromString( + std::string(mem_stream.getData(), mem_stream.getLength()))); ASSERT_OK_AND_ASSIGN( auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_EQ(row_count, reader->NumberOfRows()); @@ -557,9 +556,8 @@ TEST(TestAdapterRead, ReadFieldAttributes) { auto writer = CreateWriter(/*stripe_size=*/1024, *orc_type, &mem_stream); writer->close(); - std::shared_ptr in_stream(std::make_shared( - reinterpret_cast(mem_stream.getData()), - static_cast(mem_stream.getLength()))); + std::shared_ptr in_stream(io::BufferReader::FromString( + std::string(mem_stream.getData(), mem_stream.getLength()))); ASSERT_OK_AND_ASSIGN( auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_EQ(0, reader->NumberOfRows()); diff --git a/cpp/src/arrow/compute/function_internal.cc b/cpp/src/arrow/compute/function_internal.cc index cd73462e953..72e13e9b72b 100644 --- a/cpp/src/arrow/compute/function_internal.cc +++ b/cpp/src/arrow/compute/function_internal.cc @@ -83,7 +83,8 @@ Result> GenericOptionsType::Deserialize( Result> DeserializeFunctionOptions( const Buffer& buffer) { - io::BufferReader stream(buffer); + // Create a non-owned Buffer to avoid copying + io::BufferReader stream(std::make_shared(std::string_view(buffer))); ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(&stream)); ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadRecordBatch(0)); if (batch->num_rows() != 1) { diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index abb2e1b50bf..385180054fe 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -146,10 +146,19 @@ class ARROW_EXPORT BufferReader : public internal::RandomAccessFileConcurrencyWrapper { public: explicit BufferReader(std::shared_ptr buffer); + ARROW_DEPRECATED( + "Deprecated in 12.0.0. Use FromString or BufferReader(std::shared_ptr " + "buffer) instead.") explicit BufferReader(const Buffer& buffer); + ARROW_DEPRECATED( + "Deprecated in 12.0.0. Use FromString or BufferReader(std::shared_ptr " + "buffer) instead.") BufferReader(const uint8_t* data, int64_t size); /// \brief Instantiate from std::string_view. Does not own data + ARROW_DEPRECATED( + "Deprecated in 12.0.0. Use FromString or BufferReader(std::shared_ptr " + "buffer) instead.") explicit BufferReader(std::string_view data); static std::unique_ptr FromString(std::string data); diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc index 0940d009a16..bc97e91a8cd 100644 --- a/cpp/src/arrow/io/memory_test.cc +++ b/cpp/src/arrow/io/memory_test.cc @@ -168,11 +168,11 @@ TEST(TestBufferReader, FromStrings) { std::string data = "data123456"; auto view = std::string_view(data); - BufferReader reader1(data); - BufferReader reader2(view); + std::unique_ptr reader1 = BufferReader::FromString(data); + BufferReader reader2(std::make_shared<::arrow::Buffer>(view)); std::shared_ptr piece; - ASSERT_OK_AND_ASSIGN(piece, reader1.Read(4)); + ASSERT_OK_AND_ASSIGN(piece, reader1->Read(4)); ASSERT_EQ(0, memcmp(piece->data(), data.data(), 4)); ASSERT_OK(reader2.Seek(2)); @@ -191,17 +191,17 @@ TEST(TestBufferReader, FromNullBuffer) { TEST(TestBufferReader, Seeking) { std::string data = "data123456"; - BufferReader reader(data); - ASSERT_OK_AND_EQ(0, reader.Tell()); + std::unique_ptr reader = BufferReader::FromString(data); + ASSERT_OK_AND_EQ(0, reader->Tell()); - ASSERT_OK(reader.Seek(9)); - ASSERT_OK_AND_EQ(9, reader.Tell()); + ASSERT_OK(reader->Seek(9)); + ASSERT_OK_AND_EQ(9, reader->Tell()); - ASSERT_OK(reader.Seek(10)); - ASSERT_OK_AND_EQ(10, reader.Tell()); + ASSERT_OK(reader->Seek(10)); + ASSERT_OK_AND_EQ(10, reader->Tell()); - ASSERT_RAISES(IOError, reader.Seek(11)); - ASSERT_OK_AND_EQ(10, reader.Tell()); + ASSERT_RAISES(IOError, reader->Seek(11)); + ASSERT_OK_AND_EQ(10, reader->Tell()); } TEST(TestBufferReader, Peek) { @@ -283,11 +283,10 @@ TEST(TestBufferReader, WillNeed) { } { std::string data = "data123456"; - BufferReader reader(reinterpret_cast(data.data()), - static_cast(data.size())); + auto reader = BufferReader::FromString(data); - ASSERT_OK(reader.WillNeed({{0, 4}, {4, 6}})); - ASSERT_RAISES(IOError, reader.WillNeed({{11, 1}})); // Out of bounds + ASSERT_OK(reader->WillNeed({{0, 4}, {4, 6}})); + ASSERT_RAISES(IOError, reader->WillNeed({{11, 1}})); // Out of bounds } } From cbe761eec4cb553e3c51e0fdedeec8370c35e8de Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 25 Aug 2023 00:35:58 +0800 Subject: [PATCH 3/8] try to fix CI --- cpp/src/arrow/io/memory.cc | 7 +++---- cpp/src/arrow/io/memory.h | 6 +++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index a0bd8332065..37c266d917f 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -269,14 +269,13 @@ BufferReader::BufferReader(std::shared_ptr buffer) is_open_(true) {} BufferReader::BufferReader(const uint8_t* data, int64_t size) - : buffer_(nullptr), data_(data), size_(size), position_(0), is_open_(true) {} + : BufferReader(std::make_shared(data, size)) {} BufferReader::BufferReader(const Buffer& buffer) - : BufferReader(buffer.data(), buffer.size()) {} + : BufferReader(std::make_shared(buffer.data(), buffer.size())) {} BufferReader::BufferReader(std::string_view data) - : BufferReader(reinterpret_cast(data.data()), - static_cast(data.size())) {} + : BufferReader(std::make_shared(data)) {} std::unique_ptr BufferReader::FromString(std::string data) { return std::make_unique(Buffer::FromString(std::move(data))); diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 385180054fe..84e5231ba5d 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -147,17 +147,17 @@ class ARROW_EXPORT BufferReader public: explicit BufferReader(std::shared_ptr buffer); ARROW_DEPRECATED( - "Deprecated in 12.0.0. Use FromString or BufferReader(std::shared_ptr " + "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " "buffer) instead.") explicit BufferReader(const Buffer& buffer); ARROW_DEPRECATED( - "Deprecated in 12.0.0. Use FromString or BufferReader(std::shared_ptr " + "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " "buffer) instead.") BufferReader(const uint8_t* data, int64_t size); /// \brief Instantiate from std::string_view. Does not own data ARROW_DEPRECATED( - "Deprecated in 12.0.0. Use FromString or BufferReader(std::shared_ptr " + "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " "buffer) instead.") explicit BufferReader(std::string_view data); From f79a72fc00e10cacdcfba52ac39b29c52e44f1b2 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 25 Aug 2023 01:33:51 +0800 Subject: [PATCH 4/8] tiny fix --- cpp/src/arrow/flight/sql/client.cc | 8 ++++++-- cpp/src/arrow/flight/types.cc | 6 ++++-- cpp/src/arrow/io/memory_test.cc | 6 +++--- cpp/src/arrow/ipc/read_write_test.cc | 8 ++++---- 4 files changed, 17 insertions(+), 11 deletions(-) diff --git a/cpp/src/arrow/flight/sql/client.cc b/cpp/src/arrow/flight/sql/client.cc index db955574908..d0552e33df5 100644 --- a/cpp/src/arrow/flight/sql/client.cc +++ b/cpp/src/arrow/flight/sql/client.cc @@ -519,13 +519,17 @@ arrow::Result> PreparedStatement::ParseRespon std::shared_ptr dataset_schema; if (!serialized_dataset_schema.empty()) { - io::BufferReader dataset_schema_reader(serialized_dataset_schema); + // Create a non-owned Buffer to avoid copying + io::BufferReader dataset_schema_reader( + std::make_shared(serialized_dataset_schema)); ipc::DictionaryMemo in_memo; ARROW_ASSIGN_OR_RAISE(dataset_schema, ReadSchema(&dataset_schema_reader, &in_memo)); } std::shared_ptr parameter_schema; if (!serialized_parameter_schema.empty()) { - io::BufferReader parameter_schema_reader(serialized_parameter_schema); + // Create a non-owned Buffer to avoid copying + io::BufferReader parameter_schema_reader( + std::make_shared(serialized_parameter_schema)); ipc::DictionaryMemo in_memo; ARROW_ASSIGN_OR_RAISE(parameter_schema, ReadSchema(¶meter_schema_reader, &in_memo)); diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc index 8395919e4da..a24a9ef8758 100644 --- a/cpp/src/arrow/flight/types.cc +++ b/cpp/src/arrow/flight/types.cc @@ -152,7 +152,8 @@ Status FlightPayload::Validate() const { arrow::Result> SchemaResult::GetSchema( ipc::DictionaryMemo* dictionary_memo) const { - io::BufferReader schema_reader(raw_schema_); + // Create a non-owned Buffer to avoid copying + io::BufferReader schema_reader(std::make_shared(raw_schema_)); return ipc::ReadSchema(&schema_reader, dictionary_memo); } @@ -275,7 +276,8 @@ arrow::Result> FlightInfo::GetSchema( if (reconstructed_schema_) { return schema_; } - io::BufferReader schema_reader(data_.schema); + // Create a non-owned Buffer to avoid copying + io::BufferReader schema_reader(std::make_shared(data_.schema)); RETURN_NOT_OK(ipc::ReadSchema(&schema_reader, dictionary_memo).Value(&schema_)); reconstructed_schema_ = true; return schema_; diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc index bc97e91a8cd..0ba4eadb5f5 100644 --- a/cpp/src/arrow/io/memory_test.cc +++ b/cpp/src/arrow/io/memory_test.cc @@ -760,7 +760,7 @@ TEST(RangeReadCache, Basics) { for (auto lazy : std::vector{false, true}) { SCOPED_TRACE(lazy); options.lazy = lazy; - auto file = std::make_shared(Buffer(data)); + auto file = std::make_shared(std::make_shared(data)); internal::ReadRangeCache cache(file, {}, options); ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}})); @@ -838,7 +838,7 @@ TEST(RangeReadCache, Concurrency) { TEST(RangeReadCache, Lazy) { std::string data = "abcdefghijklmnopqrstuvwxyz"; - auto file = std::make_shared(Buffer(data)); + auto file = std::make_shared(std::make_shared(data)); CacheOptions options = CacheOptions::LazyDefaults(); options.hole_size_limit = 2; options.range_size_limit = 10; @@ -879,7 +879,7 @@ TEST(RangeReadCache, Lazy) { TEST(RangeReadCache, LazyWithPrefetching) { std::string data = "abcdefghijklmnopqrstuvwxyz"; - auto file = std::make_shared(Buffer(data)); + auto file = std::make_shared(std::make_shared(data)); CacheOptions options = CacheOptions::LazyDefaults(); options.hole_size_limit = 1; options.range_size_limit = 3; diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 6b4f7076370..69b827b8fe7 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -574,12 +574,12 @@ TEST_F(TestIpcRoundTrip, SpecificMetadataVersion) { TEST(TestReadMessage, CorruptedSmallInput) { std::string data = "abc"; - io::BufferReader reader(data); - ASSERT_RAISES(Invalid, ReadMessage(&reader)); + auto reader = io::BufferReader::FromString(data); + ASSERT_RAISES(Invalid, ReadMessage(reader.get())); // But no error on unsignaled EOS - io::BufferReader reader2(""); - ASSERT_OK_AND_ASSIGN(auto message, ReadMessage(&reader2)); + auto reader2 = io::BufferReader::FromString(""); + ASSERT_OK_AND_ASSIGN(auto message, ReadMessage(reader2.get())); ASSERT_EQ(nullptr, message); } From af6e0fef787fed15c105bcdd1e8179610b41e1d8 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 25 Aug 2023 11:12:17 +0800 Subject: [PATCH 5/8] tiny update --- cpp/examples/arrow/join_example.cc | 2 +- cpp/src/arrow/adapters/orc/adapter_test.cc | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/cpp/examples/arrow/join_example.cc b/cpp/examples/arrow/join_example.cc index 17f709c720e..c1c6e5e82ff 100644 --- a/cpp/examples/arrow/join_example.cc +++ b/cpp/examples/arrow/join_example.cc @@ -64,7 +64,7 @@ arrow::Result> CreateDataSetFromCSVData std::string csv_data = is_left ? kLeftRelationCsvData : kRightRelationCsvData; std::cout << csv_data << std::endl; std::string_view sv = csv_data; - input = std::make_shared(sv); + input = arrow::io::BufferReader::FromString(std::string(sv)); auto read_options = arrow::csv::ReadOptions::Defaults(); auto parse_options = arrow::csv::ParseOptions::Defaults(); auto convert_options = arrow::csv::ConvertOptions::Defaults(); diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index ff70e771251..73ecde6b9b5 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -483,8 +483,9 @@ TEST(TestAdapterRead, ReadCharAndVarcharType) { writer->add(*batch); writer->close(); - std::shared_ptr in_stream(io::BufferReader::FromString( - std::string(mem_stream.getData(), mem_stream.getLength()))); + std::shared_ptr in_stream = std::make_shared( + std::make_shared(reinterpret_cast(mem_stream.getData()), + mem_stream.getLength())); ASSERT_OK_AND_ASSIGN( auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_EQ(row_count, reader->NumberOfRows()); @@ -556,8 +557,9 @@ TEST(TestAdapterRead, ReadFieldAttributes) { auto writer = CreateWriter(/*stripe_size=*/1024, *orc_type, &mem_stream); writer->close(); - std::shared_ptr in_stream(io::BufferReader::FromString( - std::string(mem_stream.getData(), mem_stream.getLength()))); + std::shared_ptr in_stream = std::make_shared( + std::make_shared(reinterpret_cast(mem_stream.getData()), + mem_stream.getLength())); ASSERT_OK_AND_ASSIGN( auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_EQ(0, reader->NumberOfRows()); From fa7b1f99f0f2fb92d743b35048339da567658d17 Mon Sep 17 00:00:00 2001 From: mwish Date: Fri, 25 Aug 2023 14:39:37 +0800 Subject: [PATCH 6/8] add docstring --- cpp/src/arrow/io/memory.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 84e5231ba5d..35282f79c17 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -145,6 +145,9 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WritableFile { class ARROW_EXPORT BufferReader : public internal::RandomAccessFileConcurrencyWrapper { public: + /// \brief Instantiate from std::shared_ptr. + /// User should guarantee that the buffer live longer than the BufferReader and + /// zero-copy Buffer result. explicit BufferReader(std::shared_ptr buffer); ARROW_DEPRECATED( "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " @@ -161,6 +164,7 @@ class ARROW_EXPORT BufferReader "buffer) instead.") explicit BufferReader(std::string_view data); + /// \brief Instantiate from std::string. Owns data. static std::unique_ptr FromString(std::string data); bool closed() const override; From 94d1bc4b19ead2db337939810c3e21e8d9d2f8cc Mon Sep 17 00:00:00 2001 From: mwish Date: Tue, 29 Aug 2023 00:24:50 +0800 Subject: [PATCH 7/8] Resolve comment --- cpp/src/arrow/compute/function_internal.cc | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/function_internal.cc b/cpp/src/arrow/compute/function_internal.cc index 72e13e9b72b..ac61821f728 100644 --- a/cpp/src/arrow/compute/function_internal.cc +++ b/cpp/src/arrow/compute/function_internal.cc @@ -83,9 +83,10 @@ Result> GenericOptionsType::Deserialize( Result> DeserializeFunctionOptions( const Buffer& buffer) { - // Create a non-owned Buffer to avoid copying - io::BufferReader stream(std::make_shared(std::string_view(buffer))); - ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(&stream)); + // Copying the buffer here is not ideal, but we need to do it to avoid + // lifetime issues with the zero-copy buffer read. + auto stream = io::BufferReader::FromString(buffer.ToString()); + ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(stream.get())); ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadRecordBatch(0)); if (batch->num_rows() != 1) { return Status::Invalid( From 30b4cca5769d7f67abc48730a0c3f1ec35069716 Mon Sep 17 00:00:00 2001 From: mwish Date: Wed, 30 Aug 2023 16:16:09 +0800 Subject: [PATCH 8/8] fix comment --- cpp/src/arrow/compute/function_internal.cc | 2 +- cpp/src/arrow/io/memory.h | 4 ++-- cpp/src/arrow/io/memory_test.cc | 23 +++++++++------------- 3 files changed, 12 insertions(+), 17 deletions(-) diff --git a/cpp/src/arrow/compute/function_internal.cc b/cpp/src/arrow/compute/function_internal.cc index ac61821f728..2ef1d265ea0 100644 --- a/cpp/src/arrow/compute/function_internal.cc +++ b/cpp/src/arrow/compute/function_internal.cc @@ -84,7 +84,7 @@ Result> GenericOptionsType::Deserialize( Result> DeserializeFunctionOptions( const Buffer& buffer) { // Copying the buffer here is not ideal, but we need to do it to avoid - // lifetime issues with the zero-copy buffer read. + // use-after-free issues with the zero-copy buffer read. auto stream = io::BufferReader::FromString(buffer.ToString()); ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(stream.get())); ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadRecordBatch(0)); diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 35282f79c17..d13e0714cbf 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -146,8 +146,8 @@ class ARROW_EXPORT BufferReader : public internal::RandomAccessFileConcurrencyWrapper { public: /// \brief Instantiate from std::shared_ptr. - /// User should guarantee that the buffer live longer than the BufferReader and - /// zero-copy Buffer result. + /// + /// This is a zero-copy constructor. explicit BufferReader(std::shared_ptr buffer); ARROW_DEPRECATED( "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc index 0ba4eadb5f5..bd898f17181 100644 --- a/cpp/src/arrow/io/memory_test.cc +++ b/cpp/src/arrow/io/memory_test.cc @@ -291,14 +291,13 @@ TEST(TestBufferReader, WillNeed) { } void TestBufferReaderLifetime( - std::function(std::string&)> fn, - bool supports_zero_copy) { + std::function(std::string&)> fn) { std::shared_ptr result; std::string data = "data12345678910111213"; { std::string data_inner = data; std::unique_ptr reader = fn(data_inner); - EXPECT_EQ(supports_zero_copy, reader->supports_zero_copy()); + EXPECT_EQ(true, reader->supports_zero_copy()); ASSERT_OK_AND_ASSIGN(result, reader->Read(data.length())); } EXPECT_EQ(std::string_view(data), std::string_view(*result)); @@ -306,19 +305,15 @@ void TestBufferReaderLifetime( TEST(TestBufferReader, Lifetime) { // BufferReader(std::shared_ptr) - TestBufferReaderLifetime( - [](std::string& data) -> std::unique_ptr { - auto buffer = Buffer::FromString(std::move(data)); - return std::make_unique(std::move(buffer)); - }, - /*supports_zero_copy=*/true); + TestBufferReaderLifetime([](std::string& data) -> std::unique_ptr { + auto buffer = Buffer::FromString(std::move(data)); + return std::make_unique(std::move(buffer)); + }); // BufferReader(std::string) - TestBufferReaderLifetime( - [](std::string& data) -> std::unique_ptr { - return BufferReader::FromString(std::move(data)); - }, - /*supports_zero_copy=*/true); + TestBufferReaderLifetime([](std::string& data) -> std::unique_ptr { + return BufferReader::FromString(std::move(data)); + }); } TEST(TestRandomAccessFile, GetStream) {