diff --git a/cpp/examples/arrow/join_example.cc b/cpp/examples/arrow/join_example.cc index 17f709c720e..c1c6e5e82ff 100644 --- a/cpp/examples/arrow/join_example.cc +++ b/cpp/examples/arrow/join_example.cc @@ -64,7 +64,7 @@ arrow::Result> CreateDataSetFromCSVData std::string csv_data = is_left ? kLeftRelationCsvData : kRightRelationCsvData; std::cout << csv_data << std::endl; std::string_view sv = csv_data; - input = std::make_shared(sv); + input = arrow::io::BufferReader::FromString(std::string(sv)); auto read_options = arrow::csv::ReadOptions::Defaults(); auto parse_options = arrow::csv::ParseOptions::Defaults(); auto convert_options = arrow::csv::ConvertOptions::Defaults(); diff --git a/cpp/src/arrow/adapters/orc/adapter_test.cc b/cpp/src/arrow/adapters/orc/adapter_test.cc index 93cc4f4649d..73ecde6b9b5 100644 --- a/cpp/src/arrow/adapters/orc/adapter_test.cc +++ b/cpp/src/arrow/adapters/orc/adapter_test.cc @@ -483,9 +483,9 @@ TEST(TestAdapterRead, ReadCharAndVarcharType) { writer->add(*batch); writer->close(); - std::shared_ptr in_stream(std::make_shared( - reinterpret_cast(mem_stream.getData()), - static_cast(mem_stream.getLength()))); + std::shared_ptr in_stream = std::make_shared( + std::make_shared(reinterpret_cast(mem_stream.getData()), + mem_stream.getLength())); ASSERT_OK_AND_ASSIGN( auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_EQ(row_count, reader->NumberOfRows()); @@ -557,9 +557,9 @@ TEST(TestAdapterRead, ReadFieldAttributes) { auto writer = CreateWriter(/*stripe_size=*/1024, *orc_type, &mem_stream); writer->close(); - std::shared_ptr in_stream(std::make_shared( - reinterpret_cast(mem_stream.getData()), - static_cast(mem_stream.getLength()))); + std::shared_ptr in_stream = std::make_shared( + std::make_shared(reinterpret_cast(mem_stream.getData()), + mem_stream.getLength())); ASSERT_OK_AND_ASSIGN( auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool())); ASSERT_EQ(0, reader->NumberOfRows()); diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc index 99dc29cfe52..1bd789b7caf 100644 --- a/cpp/src/arrow/buffer.cc +++ b/cpp/src/arrow/buffer.cc @@ -114,14 +114,14 @@ void Buffer::CheckCPU() const { Result> Buffer::GetReader( std::shared_ptr buf) { - return buf->memory_manager_->GetBufferReader(buf); + return buf->memory_manager_->GetBufferReader(std::move(buf)); } Result> Buffer::GetWriter(std::shared_ptr buf) { if (!buf->is_mutable()) { return Status::Invalid("Expected mutable buffer"); } - return buf->memory_manager_->GetBufferWriter(buf); + return buf->memory_manager_->GetBufferWriter(std::move(buf)); } Result> Buffer::Copy(std::shared_ptr source, diff --git a/cpp/src/arrow/compute/function_internal.cc b/cpp/src/arrow/compute/function_internal.cc index cd73462e953..2ef1d265ea0 100644 --- a/cpp/src/arrow/compute/function_internal.cc +++ b/cpp/src/arrow/compute/function_internal.cc @@ -83,8 +83,10 @@ Result> GenericOptionsType::Deserialize( Result> DeserializeFunctionOptions( const Buffer& buffer) { - io::BufferReader stream(buffer); - ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(&stream)); + // Copying the buffer here is not ideal, but we need to do it to avoid + // use-after-free issues with the zero-copy buffer read. + auto stream = io::BufferReader::FromString(buffer.ToString()); + ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(stream.get())); ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadRecordBatch(0)); if (batch->num_rows() != 1) { return Status::Invalid( diff --git a/cpp/src/arrow/flight/sql/client.cc b/cpp/src/arrow/flight/sql/client.cc index db955574908..d0552e33df5 100644 --- a/cpp/src/arrow/flight/sql/client.cc +++ b/cpp/src/arrow/flight/sql/client.cc @@ -519,13 +519,17 @@ arrow::Result> PreparedStatement::ParseRespon std::shared_ptr dataset_schema; if (!serialized_dataset_schema.empty()) { - io::BufferReader dataset_schema_reader(serialized_dataset_schema); + // Create a non-owned Buffer to avoid copying + io::BufferReader dataset_schema_reader( + std::make_shared(serialized_dataset_schema)); ipc::DictionaryMemo in_memo; ARROW_ASSIGN_OR_RAISE(dataset_schema, ReadSchema(&dataset_schema_reader, &in_memo)); } std::shared_ptr parameter_schema; if (!serialized_parameter_schema.empty()) { - io::BufferReader parameter_schema_reader(serialized_parameter_schema); + // Create a non-owned Buffer to avoid copying + io::BufferReader parameter_schema_reader( + std::make_shared(serialized_parameter_schema)); ipc::DictionaryMemo in_memo; ARROW_ASSIGN_OR_RAISE(parameter_schema, ReadSchema(¶meter_schema_reader, &in_memo)); diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc index 8395919e4da..a24a9ef8758 100644 --- a/cpp/src/arrow/flight/types.cc +++ b/cpp/src/arrow/flight/types.cc @@ -152,7 +152,8 @@ Status FlightPayload::Validate() const { arrow::Result> SchemaResult::GetSchema( ipc::DictionaryMemo* dictionary_memo) const { - io::BufferReader schema_reader(raw_schema_); + // Create a non-owned Buffer to avoid copying + io::BufferReader schema_reader(std::make_shared(raw_schema_)); return ipc::ReadSchema(&schema_reader, dictionary_memo); } @@ -275,7 +276,8 @@ arrow::Result> FlightInfo::GetSchema( if (reconstructed_schema_) { return schema_; } - io::BufferReader schema_reader(data_.schema); + // Create a non-owned Buffer to avoid copying + io::BufferReader schema_reader(std::make_shared(data_.schema)); RETURN_NOT_OK(ipc::ReadSchema(&schema_reader, dictionary_memo).Value(&schema_)); reconstructed_schema_ = true; return schema_; diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index c3a9b06c05e..37c266d917f 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -269,14 +269,17 @@ BufferReader::BufferReader(std::shared_ptr buffer) is_open_(true) {} BufferReader::BufferReader(const uint8_t* data, int64_t size) - : buffer_(nullptr), data_(data), size_(size), position_(0), is_open_(true) {} + : BufferReader(std::make_shared(data, size)) {} BufferReader::BufferReader(const Buffer& buffer) - : BufferReader(buffer.data(), buffer.size()) {} + : BufferReader(std::make_shared(buffer.data(), buffer.size())) {} BufferReader::BufferReader(std::string_view data) - : BufferReader(reinterpret_cast(data.data()), - static_cast(data.size())) {} + : BufferReader(std::make_shared(data)) {} + +std::unique_ptr BufferReader::FromString(std::string data) { + return std::make_unique(Buffer::FromString(std::move(data))); +} Status BufferReader::DoClose() { is_open_ = false; diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index b8377be9a6f..d13e0714cbf 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -145,14 +145,28 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WritableFile { class ARROW_EXPORT BufferReader : public internal::RandomAccessFileConcurrencyWrapper { public: + /// \brief Instantiate from std::shared_ptr. + /// + /// This is a zero-copy constructor. explicit BufferReader(std::shared_ptr buffer); + ARROW_DEPRECATED( + "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " + "buffer) instead.") explicit BufferReader(const Buffer& buffer); + ARROW_DEPRECATED( + "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " + "buffer) instead.") BufferReader(const uint8_t* data, int64_t size); - /// \brief Instantiate from std::string or std::string_view. Does not - /// own data + /// \brief Instantiate from std::string_view. Does not own data + ARROW_DEPRECATED( + "Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr " + "buffer) instead.") explicit BufferReader(std::string_view data); + /// \brief Instantiate from std::string. Owns data. + static std::unique_ptr FromString(std::string data); + bool closed() const override; bool supports_zero_copy() const override; diff --git a/cpp/src/arrow/io/memory_test.cc b/cpp/src/arrow/io/memory_test.cc index 22f9a02fdbe..bd898f17181 100644 --- a/cpp/src/arrow/io/memory_test.cc +++ b/cpp/src/arrow/io/memory_test.cc @@ -168,11 +168,11 @@ TEST(TestBufferReader, FromStrings) { std::string data = "data123456"; auto view = std::string_view(data); - BufferReader reader1(data); - BufferReader reader2(view); + std::unique_ptr reader1 = BufferReader::FromString(data); + BufferReader reader2(std::make_shared<::arrow::Buffer>(view)); std::shared_ptr piece; - ASSERT_OK_AND_ASSIGN(piece, reader1.Read(4)); + ASSERT_OK_AND_ASSIGN(piece, reader1->Read(4)); ASSERT_EQ(0, memcmp(piece->data(), data.data(), 4)); ASSERT_OK(reader2.Seek(2)); @@ -191,17 +191,17 @@ TEST(TestBufferReader, FromNullBuffer) { TEST(TestBufferReader, Seeking) { std::string data = "data123456"; - BufferReader reader(data); - ASSERT_OK_AND_EQ(0, reader.Tell()); + std::unique_ptr reader = BufferReader::FromString(data); + ASSERT_OK_AND_EQ(0, reader->Tell()); - ASSERT_OK(reader.Seek(9)); - ASSERT_OK_AND_EQ(9, reader.Tell()); + ASSERT_OK(reader->Seek(9)); + ASSERT_OK_AND_EQ(9, reader->Tell()); - ASSERT_OK(reader.Seek(10)); - ASSERT_OK_AND_EQ(10, reader.Tell()); + ASSERT_OK(reader->Seek(10)); + ASSERT_OK_AND_EQ(10, reader->Tell()); - ASSERT_RAISES(IOError, reader.Seek(11)); - ASSERT_OK_AND_EQ(10, reader.Tell()); + ASSERT_RAISES(IOError, reader->Seek(11)); + ASSERT_OK_AND_EQ(10, reader->Tell()); } TEST(TestBufferReader, Peek) { @@ -283,12 +283,37 @@ TEST(TestBufferReader, WillNeed) { } { std::string data = "data123456"; - BufferReader reader(reinterpret_cast(data.data()), - static_cast(data.size())); + auto reader = BufferReader::FromString(data); - ASSERT_OK(reader.WillNeed({{0, 4}, {4, 6}})); - ASSERT_RAISES(IOError, reader.WillNeed({{11, 1}})); // Out of bounds + ASSERT_OK(reader->WillNeed({{0, 4}, {4, 6}})); + ASSERT_RAISES(IOError, reader->WillNeed({{11, 1}})); // Out of bounds + } +} + +void TestBufferReaderLifetime( + std::function(std::string&)> fn) { + std::shared_ptr result; + std::string data = "data12345678910111213"; + { + std::string data_inner = data; + std::unique_ptr reader = fn(data_inner); + EXPECT_EQ(true, reader->supports_zero_copy()); + ASSERT_OK_AND_ASSIGN(result, reader->Read(data.length())); } + EXPECT_EQ(std::string_view(data), std::string_view(*result)); +} + +TEST(TestBufferReader, Lifetime) { + // BufferReader(std::shared_ptr) + TestBufferReaderLifetime([](std::string& data) -> std::unique_ptr { + auto buffer = Buffer::FromString(std::move(data)); + return std::make_unique(std::move(buffer)); + }); + + // BufferReader(std::string) + TestBufferReaderLifetime([](std::string& data) -> std::unique_ptr { + return BufferReader::FromString(std::move(data)); + }); } TEST(TestRandomAccessFile, GetStream) { @@ -730,7 +755,7 @@ TEST(RangeReadCache, Basics) { for (auto lazy : std::vector{false, true}) { SCOPED_TRACE(lazy); options.lazy = lazy; - auto file = std::make_shared(Buffer(data)); + auto file = std::make_shared(std::make_shared(data)); internal::ReadRangeCache cache(file, {}, options); ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}})); @@ -808,7 +833,7 @@ TEST(RangeReadCache, Concurrency) { TEST(RangeReadCache, Lazy) { std::string data = "abcdefghijklmnopqrstuvwxyz"; - auto file = std::make_shared(Buffer(data)); + auto file = std::make_shared(std::make_shared(data)); CacheOptions options = CacheOptions::LazyDefaults(); options.hole_size_limit = 2; options.range_size_limit = 10; @@ -849,7 +874,7 @@ TEST(RangeReadCache, Lazy) { TEST(RangeReadCache, LazyWithPrefetching) { std::string data = "abcdefghijklmnopqrstuvwxyz"; - auto file = std::make_shared(Buffer(data)); + auto file = std::make_shared(std::make_shared(data)); CacheOptions options = CacheOptions::LazyDefaults(); options.hole_size_limit = 1; options.range_size_limit = 3; diff --git a/cpp/src/arrow/ipc/read_write_test.cc b/cpp/src/arrow/ipc/read_write_test.cc index 6b4f7076370..69b827b8fe7 100644 --- a/cpp/src/arrow/ipc/read_write_test.cc +++ b/cpp/src/arrow/ipc/read_write_test.cc @@ -574,12 +574,12 @@ TEST_F(TestIpcRoundTrip, SpecificMetadataVersion) { TEST(TestReadMessage, CorruptedSmallInput) { std::string data = "abc"; - io::BufferReader reader(data); - ASSERT_RAISES(Invalid, ReadMessage(&reader)); + auto reader = io::BufferReader::FromString(data); + ASSERT_RAISES(Invalid, ReadMessage(reader.get())); // But no error on unsignaled EOS - io::BufferReader reader2(""); - ASSERT_OK_AND_ASSIGN(auto message, ReadMessage(&reader2)); + auto reader2 = io::BufferReader::FromString(""); + ASSERT_OK_AND_ASSIGN(auto message, ReadMessage(reader2.get())); ASSERT_EQ(nullptr, message); } diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h index 9440fcaba92..0d7ae222640 100644 --- a/cpp/src/arrow/ipc/reader.h +++ b/cpp/src/arrow/ipc/reader.h @@ -190,7 +190,7 @@ class ARROW_EXPORT RecordBatchFileReader /// \return the read batch virtual Result> ReadRecordBatch(int i) = 0; - /// \brief Read a particular record batch along with its custom metadada from the file. + /// \brief Read a particular record batch along with its custom metadata from the file. /// Does not copy memory if the input source supports zero-copy. /// /// \param[in] i the index of the record batch to return