Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/examples/arrow/join_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ arrow::Result<std::shared_ptr<arrow::dataset::Dataset>> CreateDataSetFromCSVData
std::string csv_data = is_left ? kLeftRelationCsvData : kRightRelationCsvData;
std::cout << csv_data << std::endl;
std::string_view sv = csv_data;
input = std::make_shared<arrow::io::BufferReader>(sv);
input = arrow::io::BufferReader::FromString(std::string(sv));
auto read_options = arrow::csv::ReadOptions::Defaults();
auto parse_options = arrow::csv::ParseOptions::Defaults();
auto convert_options = arrow::csv::ConvertOptions::Defaults();
Expand Down
12 changes: 6 additions & 6 deletions cpp/src/arrow/adapters/orc/adapter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -483,9 +483,9 @@ TEST(TestAdapterRead, ReadCharAndVarcharType) {
writer->add(*batch);
writer->close();

std::shared_ptr<io::RandomAccessFile> in_stream(std::make_shared<io::BufferReader>(
reinterpret_cast<const uint8_t*>(mem_stream.getData()),
static_cast<int64_t>(mem_stream.getLength())));
std::shared_ptr<io::RandomAccessFile> in_stream = std::make_shared<io::BufferReader>(
std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(mem_stream.getData()),
mem_stream.getLength()));
ASSERT_OK_AND_ASSIGN(
auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
ASSERT_EQ(row_count, reader->NumberOfRows());
Expand Down Expand Up @@ -557,9 +557,9 @@ TEST(TestAdapterRead, ReadFieldAttributes) {
auto writer = CreateWriter(/*stripe_size=*/1024, *orc_type, &mem_stream);
writer->close();

std::shared_ptr<io::RandomAccessFile> in_stream(std::make_shared<io::BufferReader>(
reinterpret_cast<const uint8_t*>(mem_stream.getData()),
static_cast<int64_t>(mem_stream.getLength())));
std::shared_ptr<io::RandomAccessFile> in_stream = std::make_shared<io::BufferReader>(
std::make_shared<Buffer>(reinterpret_cast<const uint8_t*>(mem_stream.getData()),
mem_stream.getLength()));
ASSERT_OK_AND_ASSIGN(
auto reader, adapters::orc::ORCFileReader::Open(in_stream, default_memory_pool()));
ASSERT_EQ(0, reader->NumberOfRows());
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,14 @@ void Buffer::CheckCPU() const {

Result<std::shared_ptr<io::RandomAccessFile>> Buffer::GetReader(
std::shared_ptr<Buffer> buf) {
return buf->memory_manager_->GetBufferReader(buf);
return buf->memory_manager_->GetBufferReader(std::move(buf));
}

Result<std::shared_ptr<io::OutputStream>> Buffer::GetWriter(std::shared_ptr<Buffer> buf) {
if (!buf->is_mutable()) {
return Status::Invalid("Expected mutable buffer");
}
return buf->memory_manager_->GetBufferWriter(buf);
return buf->memory_manager_->GetBufferWriter(std::move(buf));
}

Result<std::shared_ptr<Buffer>> Buffer::Copy(std::shared_ptr<Buffer> source,
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/compute/function_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ Result<std::unique_ptr<FunctionOptions>> GenericOptionsType::Deserialize(

Result<std::unique_ptr<FunctionOptions>> DeserializeFunctionOptions(
const Buffer& buffer) {
io::BufferReader stream(buffer);
ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(&stream));
// Copying the buffer here is not ideal, but we need to do it to avoid
// use-after-free issues with the zero-copy buffer read.
auto stream = io::BufferReader::FromString(buffer.ToString());
ARROW_ASSIGN_OR_RAISE(auto reader, ipc::RecordBatchFileReader::Open(stream.get()));
ARROW_ASSIGN_OR_RAISE(auto batch, reader->ReadRecordBatch(0));
if (batch->num_rows() != 1) {
return Status::Invalid(
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/arrow/flight/sql/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -519,13 +519,17 @@ arrow::Result<std::shared_ptr<PreparedStatement>> PreparedStatement::ParseRespon

std::shared_ptr<Schema> dataset_schema;
if (!serialized_dataset_schema.empty()) {
io::BufferReader dataset_schema_reader(serialized_dataset_schema);
// Create a non-owned Buffer to avoid copying
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lidavidm do you think this is OK? It seems the reader is only used to return the result of ReadSchema here, so I think keeping it zero-copy is fine?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be OK.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My lifetime comment above does not apply here, since there is no way the deserialized Schema could reference memory in the buffer we're reading; zero-copy LGTM.

io::BufferReader dataset_schema_reader(
std::make_shared<Buffer>(serialized_dataset_schema));
ipc::DictionaryMemo in_memo;
ARROW_ASSIGN_OR_RAISE(dataset_schema, ReadSchema(&dataset_schema_reader, &in_memo));
}
std::shared_ptr<Schema> parameter_schema;
if (!serialized_parameter_schema.empty()) {
io::BufferReader parameter_schema_reader(serialized_parameter_schema);
// Create a non-owned Buffer to avoid copying
io::BufferReader parameter_schema_reader(
std::make_shared<Buffer>(serialized_parameter_schema));
ipc::DictionaryMemo in_memo;
ARROW_ASSIGN_OR_RAISE(parameter_schema,
ReadSchema(&parameter_schema_reader, &in_memo));
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ Status FlightPayload::Validate() const {

arrow::Result<std::shared_ptr<Schema>> SchemaResult::GetSchema(
ipc::DictionaryMemo* dictionary_memo) const {
io::BufferReader schema_reader(raw_schema_);
// Create a non-owned Buffer to avoid copying
io::BufferReader schema_reader(std::make_shared<Buffer>(raw_schema_));
return ipc::ReadSchema(&schema_reader, dictionary_memo);
}

Expand Down Expand Up @@ -275,7 +276,8 @@ arrow::Result<std::shared_ptr<Schema>> FlightInfo::GetSchema(
if (reconstructed_schema_) {
return schema_;
}
io::BufferReader schema_reader(data_.schema);
// Create a non-owned Buffer to avoid copying
io::BufferReader schema_reader(std::make_shared<Buffer>(data_.schema));
RETURN_NOT_OK(ipc::ReadSchema(&schema_reader, dictionary_memo).Value(&schema_));
reconstructed_schema_ = true;
return schema_;
Expand Down
11 changes: 7 additions & 4 deletions cpp/src/arrow/io/memory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,14 +269,17 @@ BufferReader::BufferReader(std::shared_ptr<Buffer> buffer)
is_open_(true) {}

BufferReader::BufferReader(const uint8_t* data, int64_t size)
: buffer_(nullptr), data_(data), size_(size), position_(0), is_open_(true) {}
: BufferReader(std::make_shared<Buffer>(data, size)) {}

BufferReader::BufferReader(const Buffer& buffer)
: BufferReader(buffer.data(), buffer.size()) {}
: BufferReader(std::make_shared<Buffer>(buffer.data(), buffer.size())) {}

BufferReader::BufferReader(std::string_view data)
: BufferReader(reinterpret_cast<const uint8_t*>(data.data()),
static_cast<int64_t>(data.size())) {}
: BufferReader(std::make_shared<Buffer>(data)) {}

std::unique_ptr<BufferReader> BufferReader::FromString(std::string data) {
return std::make_unique<BufferReader>(Buffer::FromString(std::move(data)));
}

Status BufferReader::DoClose() {
is_open_ = false;
Expand Down
18 changes: 16 additions & 2 deletions cpp/src/arrow/io/memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,28 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WritableFile {
class ARROW_EXPORT BufferReader
: public internal::RandomAccessFileConcurrencyWrapper<BufferReader> {
public:
/// \brief Instantiate from std::shared_ptr<Buffer>.
///
/// This is a zero-copy constructor.
explicit BufferReader(std::shared_ptr<Buffer> buffer);
ARROW_DEPRECATED(
"Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr<Buffer> "
"buffer) instead.")
explicit BufferReader(const Buffer& buffer);
ARROW_DEPRECATED(
"Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr<Buffer> "
"buffer) instead.")
BufferReader(const uint8_t* data, int64_t size);

/// \brief Instantiate from std::string or std::string_view. Does not
/// own data
/// \brief Instantiate from std::string_view. Does not own data
ARROW_DEPRECATED(
"Deprecated in 14.0.0. Use FromString or BufferReader(std::shared_ptr<Buffer> "
"buffer) instead.")
explicit BufferReader(std::string_view data);

/// \brief Instantiate from std::string. Owns data.
static std::unique_ptr<BufferReader> FromString(std::string data);

bool closed() const override;

bool supports_zero_copy() const override;
Expand Down
61 changes: 43 additions & 18 deletions cpp/src/arrow/io/memory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,11 @@ TEST(TestBufferReader, FromStrings) {
std::string data = "data123456";
auto view = std::string_view(data);

BufferReader reader1(data);
BufferReader reader2(view);
std::unique_ptr<BufferReader> reader1 = BufferReader::FromString(data);
BufferReader reader2(std::make_shared<::arrow::Buffer>(view));

std::shared_ptr<Buffer> piece;
ASSERT_OK_AND_ASSIGN(piece, reader1.Read(4));
ASSERT_OK_AND_ASSIGN(piece, reader1->Read(4));
ASSERT_EQ(0, memcmp(piece->data(), data.data(), 4));

ASSERT_OK(reader2.Seek(2));
Expand All @@ -191,17 +191,17 @@ TEST(TestBufferReader, FromNullBuffer) {
TEST(TestBufferReader, Seeking) {
std::string data = "data123456";

BufferReader reader(data);
ASSERT_OK_AND_EQ(0, reader.Tell());
std::unique_ptr<BufferReader> reader = BufferReader::FromString(data);
ASSERT_OK_AND_EQ(0, reader->Tell());

ASSERT_OK(reader.Seek(9));
ASSERT_OK_AND_EQ(9, reader.Tell());
ASSERT_OK(reader->Seek(9));
ASSERT_OK_AND_EQ(9, reader->Tell());

ASSERT_OK(reader.Seek(10));
ASSERT_OK_AND_EQ(10, reader.Tell());
ASSERT_OK(reader->Seek(10));
ASSERT_OK_AND_EQ(10, reader->Tell());

ASSERT_RAISES(IOError, reader.Seek(11));
ASSERT_OK_AND_EQ(10, reader.Tell());
ASSERT_RAISES(IOError, reader->Seek(11));
ASSERT_OK_AND_EQ(10, reader->Tell());
}

TEST(TestBufferReader, Peek) {
Expand Down Expand Up @@ -283,12 +283,37 @@ TEST(TestBufferReader, WillNeed) {
}
{
std::string data = "data123456";
BufferReader reader(reinterpret_cast<const uint8_t*>(data.data()),
static_cast<int64_t>(data.size()));
auto reader = BufferReader::FromString(data);

ASSERT_OK(reader.WillNeed({{0, 4}, {4, 6}}));
ASSERT_RAISES(IOError, reader.WillNeed({{11, 1}})); // Out of bounds
ASSERT_OK(reader->WillNeed({{0, 4}, {4, 6}}));
ASSERT_RAISES(IOError, reader->WillNeed({{11, 1}})); // Out of bounds
}
}

void TestBufferReaderLifetime(
std::function<std::unique_ptr<BufferReader>(std::string&)> fn) {
std::shared_ptr<Buffer> result;
std::string data = "data12345678910111213";
{
std::string data_inner = data;
std::unique_ptr<BufferReader> reader = fn(data_inner);
EXPECT_EQ(true, reader->supports_zero_copy());
ASSERT_OK_AND_ASSIGN(result, reader->Read(data.length()));
}
EXPECT_EQ(std::string_view(data), std::string_view(*result));
}

TEST(TestBufferReader, Lifetime) {
// BufferReader(std::shared_ptr<Buffer>)
TestBufferReaderLifetime([](std::string& data) -> std::unique_ptr<BufferReader> {
auto buffer = Buffer::FromString(std::move(data));
return std::make_unique<BufferReader>(std::move(buffer));
});

// BufferReader(std::string)
TestBufferReaderLifetime([](std::string& data) -> std::unique_ptr<BufferReader> {
return BufferReader::FromString(std::move(data));
});
}

TEST(TestRandomAccessFile, GetStream) {
Expand Down Expand Up @@ -730,7 +755,7 @@ TEST(RangeReadCache, Basics) {
for (auto lazy : std::vector<bool>{false, true}) {
SCOPED_TRACE(lazy);
options.lazy = lazy;
auto file = std::make_shared<CountingBufferReader>(Buffer(data));
auto file = std::make_shared<CountingBufferReader>(std::make_shared<Buffer>(data));
internal::ReadRangeCache cache(file, {}, options);

ASSERT_OK(cache.Cache({{1, 2}, {3, 2}, {8, 2}, {20, 2}, {25, 0}}));
Expand Down Expand Up @@ -808,7 +833,7 @@ TEST(RangeReadCache, Concurrency) {
TEST(RangeReadCache, Lazy) {
std::string data = "abcdefghijklmnopqrstuvwxyz";

auto file = std::make_shared<CountingBufferReader>(Buffer(data));
auto file = std::make_shared<CountingBufferReader>(std::make_shared<Buffer>(data));
CacheOptions options = CacheOptions::LazyDefaults();
options.hole_size_limit = 2;
options.range_size_limit = 10;
Expand Down Expand Up @@ -849,7 +874,7 @@ TEST(RangeReadCache, Lazy) {
TEST(RangeReadCache, LazyWithPrefetching) {
std::string data = "abcdefghijklmnopqrstuvwxyz";

auto file = std::make_shared<CountingBufferReader>(Buffer(data));
auto file = std::make_shared<CountingBufferReader>(std::make_shared<Buffer>(data));
CacheOptions options = CacheOptions::LazyDefaults();
options.hole_size_limit = 1;
options.range_size_limit = 3;
Expand Down
8 changes: 4 additions & 4 deletions cpp/src/arrow/ipc/read_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -574,12 +574,12 @@ TEST_F(TestIpcRoundTrip, SpecificMetadataVersion) {

TEST(TestReadMessage, CorruptedSmallInput) {
std::string data = "abc";
io::BufferReader reader(data);
ASSERT_RAISES(Invalid, ReadMessage(&reader));
auto reader = io::BufferReader::FromString(data);
ASSERT_RAISES(Invalid, ReadMessage(reader.get()));

// But no error on unsignaled EOS
io::BufferReader reader2("");
ASSERT_OK_AND_ASSIGN(auto message, ReadMessage(&reader2));
auto reader2 = io::BufferReader::FromString("");
ASSERT_OK_AND_ASSIGN(auto message, ReadMessage(reader2.get()));
ASSERT_EQ(nullptr, message);
}

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/ipc/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class ARROW_EXPORT RecordBatchFileReader
/// \return the read batch
virtual Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(int i) = 0;

/// \brief Read a particular record batch along with its custom metadada from the file.
/// \brief Read a particular record batch along with its custom metadata from the file.
/// Does not copy memory if the input source supports zero-copy.
///
/// \param[in] i the index of the record batch to return
Expand Down