104 changes: 67 additions & 37 deletions cpp/src/arrow/csv/reader.cc
@@ -145,6 +145,7 @@ struct CSVBlock {
std::shared_ptr<Buffer> buffer;
int64_t block_index;
bool is_final;
int64_t bytes_skipped;
std::function<Status(int64_t)> consume_bytes;
};

@@ -153,7 +154,7 @@ struct CSVBlock {

template <>
struct IterationTraits<csv::CSVBlock> {
static csv::CSVBlock End() { return csv::CSVBlock{{}, {}, {}, -1, true, {}}; }
static csv::CSVBlock End() { return csv::CSVBlock{{}, {}, {}, -1, true, 0, {}}; }
static bool IsEnd(const csv::CSVBlock& val) { return val.block_index < 0; }
};

@@ -222,16 +223,20 @@ class SerialBlockReader : public BlockReader {
}

bool is_final = (next_buffer == nullptr);
int64_t bytes_skipped = 0;

if (skip_rows_) {
bytes_skipped += partial_->size();
auto orig_size = buffer_->size();
RETURN_NOT_OK(
chunker_->ProcessSkip(partial_, buffer_, is_final, &skip_rows_, &buffer_));
bytes_skipped += orig_size - buffer_->size();
partial_ = SliceBuffer(buffer_, 0, 0);
if (skip_rows_) {
// Still have rows beyond this buffer to skip; return an empty block
buffer_ = next_buffer;
return TransformYield<CSVBlock>(CSVBlock{partial_, partial_, partial_,
block_index_++, is_final,
block_index_++, is_final, bytes_skipped,
[](int64_t) { return Status::OK(); }});
}
}
@@ -262,7 +267,7 @@ };
};

return TransformYield<CSVBlock>(CSVBlock{partial_, completion, buffer_,
block_index_++, is_final,
block_index_++, is_final, bytes_skipped,
std::move(consume_bytes)});
}
};
@@ -294,10 +299,14 @@ class ThreadedBlockReader : public BlockReader {

auto current_partial = std::move(partial_);
auto current_buffer = std::move(buffer_);
int64_t bytes_skipped = 0;

if (skip_rows_) {
auto orig_size = current_buffer->size();
bytes_skipped = current_partial->size();
RETURN_NOT_OK(chunker_->ProcessSkip(current_partial, current_buffer, is_final,
&skip_rows_, &current_buffer));
bytes_skipped += orig_size - current_buffer->size();
current_partial = SliceBuffer(current_buffer, 0, 0);
if (skip_rows_) {
partial_ = std::move(current_buffer);
@@ -307,6 +316,7 @@ class ThreadedBlockReader : public BlockReader {
current_partial,
block_index_++,
is_final,
bytes_skipped,
{}});
}
}
@@ -332,8 +342,8 @@ class ThreadedBlockReader : public BlockReader {
partial_ = std::move(next_partial);
buffer_ = std::move(next_buffer);

return TransformYield<CSVBlock>(
CSVBlock{current_partial, completion, whole, block_index_++, is_final, {}});
return TransformYield<CSVBlock>(CSVBlock{
current_partial, completion, whole, block_index_++, is_final, bytes_skipped, {}});
}
};

@@ -761,12 +771,13 @@ class SerialStreamingReader : public BaseStreamingReader,

auto self = shared_from_this();
// Read schema from first batch
return ReadNextAsync().Then([self](const std::shared_ptr<RecordBatch>& first_batch)
-> Result<std::shared_ptr<csv::StreamingReader>> {
self->pending_batch_ = first_batch;
DCHECK_NE(self->schema_, nullptr);
return self;
});
return ReadNextAsync(true).Then(
[self](const std::shared_ptr<RecordBatch>& first_batch)
-> Result<std::shared_ptr<csv::StreamingReader>> {
self->pending_batch_ = first_batch;
DCHECK_NE(self->schema_, nullptr);
return self;
});
}

Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() {
@@ -788,6 +799,7 @@ class SerialStreamingReader : public BaseStreamingReader,
return block_generator_()
.Then([self](const CSVBlock& maybe_block) -> Status {
if (!IsIterationEnd(maybe_block)) {
self->bytes_parsed_ += maybe_block.bytes_skipped;
self->last_block_index_ = maybe_block.block_index;
auto maybe_parsed = self->ParseAndInsert(
maybe_block.partial, maybe_block.completion, maybe_block.buffer,
@@ -797,6 +809,7 @@ class SerialStreamingReader : public BaseStreamingReader,
self->eof_ = true;
return maybe_parsed.status();
}
self->bytes_parsed_ += *maybe_parsed;
RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
} else {
self->source_eof_ = true;
@@ -815,16 +828,46 @@ class SerialStreamingReader : public BaseStreamingReader,
}

Future<std::shared_ptr<RecordBatch>> ReadNextSkippingEmpty(
std::shared_ptr<SerialStreamingReader> self) {
return DoReadNext(self).Then([self](const std::shared_ptr<RecordBatch>& batch) {
if (batch != nullptr && batch->num_rows() == 0) {
return self->ReadNextSkippingEmpty(self);
std::shared_ptr<SerialStreamingReader> self, bool internal_read) {
return DoReadNext(self).Then(
[self, internal_read](const std::shared_ptr<RecordBatch>& batch) {
if (batch != nullptr && batch->num_rows() == 0) {
return self->ReadNextSkippingEmpty(self, internal_read);
}
if (!internal_read) {
self->bytes_decoded_ += self->bytes_parsed_;
self->bytes_parsed_ = 0;
}
return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
});
}

Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
return ReadNextAsync(false);
};

int64_t bytes_read() const override { return bytes_decoded_; }

protected:
Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) {
return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
if (first_buffer == nullptr) {
return Status::Invalid("Empty CSV file");
}
return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
auto own_first_buffer = first_buffer;
auto start = own_first_buffer->data();
RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer));
self->bytes_decoded_ = own_first_buffer->data() - start;
RETURN_NOT_OK(self->MakeColumnDecoders());

self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
std::move(self->buffer_generator_), MakeChunker(self->parse_options_),
std::move(own_first_buffer), self->read_options_.skip_rows_after_names);
return Status::OK();
});
}

Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
Future<std::shared_ptr<RecordBatch>> ReadNextAsync(bool internal_read) {
if (eof_) {
return Future<std::shared_ptr<RecordBatch>>::MakeFinished(nullptr);
}
@@ -835,38 +878,25 @@ class SerialStreamingReader : public BaseStreamingReader,
auto self = shared_from_this();
if (!block_generator_) {
return SetupReader(self).Then(
[self]() -> Future<std::shared_ptr<RecordBatch>> {
return self->ReadNextSkippingEmpty(self);
[self, internal_read]() -> Future<std::shared_ptr<RecordBatch>> {
return self->ReadNextSkippingEmpty(self, internal_read);
},
[self](const Status& err) -> Result<std::shared_ptr<RecordBatch>> {
self->eof_ = true;
return err;
});
} else {
return self->ReadNextSkippingEmpty(self);
return self->ReadNextSkippingEmpty(self, internal_read);
}
};

protected:
Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) {
return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
if (first_buffer == nullptr) {
return Status::Invalid("Empty CSV file");
}
auto own_first_buffer = first_buffer;
RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer));
RETURN_NOT_OK(self->MakeColumnDecoders());

self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
std::move(self->buffer_generator_), MakeChunker(self->parse_options_),
std::move(own_first_buffer), self->read_options_.skip_rows_after_names);
return Status::OK();
});
}

bool source_eof_ = false;
int64_t last_block_index_ = 0;
AsyncGenerator<CSVBlock> block_generator_;
// bytes of data parsed but not yet decoded
int64_t bytes_parsed_ = 0;
// bytes which have been decoded for caller
int64_t bytes_decoded_ = 0;
};

/////////////////////////////////////////////////////////////////////////
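The reader.cc changes above hinge on three counters: `bytes_skipped`, carried on each `CSVBlock`; `bytes_parsed_`, accumulated as blocks are parsed (including the internal read used to infer the schema for the first batch); and `bytes_decoded_`, committed only when a batch is actually returned to the caller. Below is a minimal, self-contained sketch of that accounting pattern. It is not Arrow code, and the `BlockResult`/`ProgressCounters` names are purely illustrative.

```cpp
#include <cstdint>

// Hypothetical stand-ins for the reader's per-block results; names are illustrative only.
struct BlockResult {
  int64_t bytes_skipped;  // bytes consumed by skip_rows_after_names for this block
  int64_t bytes_parsed;   // bytes turned into a parsed chunk
};

struct ProgressCounters {
  int64_t bytes_parsed = 0;   // parsed but not yet surfaced to the caller
  int64_t bytes_decoded = 0;  // committed: what bytes_read() would report

  // Called for every block the reader pulls, including internal reads that
  // only exist to produce the schema for the first batch.
  void OnBlock(const BlockResult& block) {
    bytes_parsed += block.bytes_skipped + block.bytes_parsed;
  }

  // Called only when a batch is handed to the caller (internal_read == false):
  // everything parsed so far becomes visible to bytes_read() at once.
  void OnBatchDelivered() {
    bytes_decoded += bytes_parsed;
    bytes_parsed = 0;
  }

  int64_t bytes_read() const { return bytes_decoded; }
};
```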
15 changes: 15 additions & 0 deletions cpp/src/arrow/csv/reader.h
@@ -73,6 +73,21 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {

virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() = 0;

/// \brief Return the number of bytes which have been read and processed
///
/// The returned number includes CSV bytes which the StreamingReader has
/// finished processing, but not bytes for which some processing (e.g.
/// CSV parsing or conversion to Arrow layout) is still ongoing.
///
/// Furthermore, the following rules apply:
/// - bytes skipped by `ReadOptions.skip_rows` are counted as being read before
/// any records are returned.
/// - bytes read while parsing the header are counted as being read before any
/// records are returned.
/// - bytes skipped by `ReadOptions.skip_rows_after_names` are counted after the
/// first batch is returned.
virtual int64_t bytes_read() const = 0;
Member:

The docstring may be a bit imprecise here. If there is some readahead going on, is it included in the result? Or is it the number of bytes corresponding to the batches already consumed by the caller?

Member:

I think bytes_read means "bytes the CSV reader is completely finished with". So serial readahead (e.g. the readahead happening on the I/O context, or "data read but not parsed or decoded") should not be included. Caller consumption should be irrelevant.

For parallel readahead (e.g. the CSV reader reading/parsing/decoding multiple batches of data at the same time), my opinion is that bytes_read should be incremented as soon as a batch is ready to be delivered (even if there are other batches in front of it that aren't ready).

Perhaps bytes_processed or bytes_finished would remove the ambiguity? Or maybe just a clearer docstring.

Member:

A clearer docstring would be fine with me.

Contributor Author:
I updated it to

  /// - bytes skipped by `ReadOptions.skip_rows` will be counted as being read before
  /// any records are returned.
  /// - bytes read while parsing the header will be counted as being read before any
  /// records are returned.
  /// - bytes skipped by `ReadOptions.skip_rows_after_names` will be counted after the
  /// first batch is returned.
  ///
  /// \return the number of bytes which have been read from the CSV stream and returned to
  /// caller


/// Create a StreamingReader instance
///
/// This involves some I/O as the first batch must be loaded during the creation process
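For callers, the practical effect is that `bytes_read()` only advances once a batch has actually been handed back. The following is a rough usage sketch modeled on the new test below; the include paths and the progress-printing loop are assumptions, while the `Make`/`ReadNext`/`bytes_read` calls mirror the test code.

```cpp
#include <iostream>
#include <memory>

#include "arrow/buffer.h"
#include "arrow/csv/api.h"
#include "arrow/io/api.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"

// Read a small in-memory CSV and report bytes_read() before each batch.
arrow::Status ReportCsvProgress() {
  auto csv_data = std::make_shared<arrow::Buffer>(
      "a,b,c\n123,456,789\n101,112,131\n415,161,718\n");
  auto input = std::make_shared<arrow::io::BufferReader>(csv_data);

  auto read_options = arrow::csv::ReadOptions::Defaults();
  read_options.block_size = 20;  // small blocks so progress advances batch by batch

  ARROW_ASSIGN_OR_RAISE(
      auto reader,
      arrow::csv::StreamingReader::Make(arrow::io::default_io_context(), input,
                                        read_options,
                                        arrow::csv::ParseOptions::Defaults(),
                                        arrow::csv::ConvertOptions::Defaults()));

  // Header bytes are already counted before the first batch is returned.
  std::shared_ptr<arrow::RecordBatch> batch;
  do {
    std::cout << reader->bytes_read() << " bytes processed so far" << std::endl;
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
  } while (batch != nullptr);
  return arrow::Status::OK();
}
```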
68 changes: 68 additions & 0 deletions cpp/src/arrow/csv/reader_test.cc
@@ -216,6 +216,74 @@ TEST(StreamingReaderTests, NestedParallelism) {
TestNestedParallelism(thread_pool, table_factory);
}

TEST(StreamingReaderTest, BytesRead) {
ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
auto table_buffer =
std::make_shared<Buffer>("a,b,c\n123,456,789\n101,112,131\n415,161,718\n");

// Basic read without any skips and small block size
{
auto input = std::make_shared<io::BufferReader>(table_buffer);

auto read_options = ReadOptions::Defaults();
read_options.block_size = 20;
ASSERT_OK_AND_ASSIGN(
auto streaming_reader,
StreamingReader::Make(io::default_io_context(), input, read_options,
ParseOptions::Defaults(), ConvertOptions::Defaults()));
std::shared_ptr<RecordBatch> batch;
int64_t bytes = 6; // Size of header
do {
ASSERT_EQ(bytes, streaming_reader->bytes_read());
ASSERT_OK(streaming_reader->ReadNext(&batch));
bytes += 12; // Add size of each row
} while (batch);
ASSERT_EQ(42, streaming_reader->bytes_read());
}

// Interaction of skip_rows and bytes_read()
{
auto input = std::make_shared<io::BufferReader>(table_buffer);

auto read_options = ReadOptions::Defaults();
read_options.skip_rows = 2;
ASSERT_OK_AND_ASSIGN(
auto streaming_reader,
StreamingReader::Make(io::default_io_context(), input, read_options,
ParseOptions::Defaults(), ConvertOptions::Defaults()));
std::shared_ptr<RecordBatch> batch;
// first two rows and third row as header
ASSERT_EQ(30, streaming_reader->bytes_read());
ASSERT_OK(streaming_reader->ReadNext(&batch));
ASSERT_NE(batch.get(), nullptr);
ASSERT_EQ(42, streaming_reader->bytes_read());
ASSERT_OK(streaming_reader->ReadNext(&batch));
ASSERT_EQ(batch.get(), nullptr);
}

// Interaction of skip_rows_after_names and bytes_read()
{
auto input = std::make_shared<io::BufferReader>(table_buffer);

auto read_options = ReadOptions::Defaults();
read_options.skip_rows_after_names = 2;

ASSERT_OK_AND_ASSIGN(
auto streaming_reader,
StreamingReader::Make(io::default_io_context(), input, read_options,
ParseOptions::Defaults(), ConvertOptions::Defaults()));
std::shared_ptr<RecordBatch> batch;

// Just header
ASSERT_EQ(6, streaming_reader->bytes_read());
ASSERT_OK(streaming_reader->ReadNext(&batch));
ASSERT_NE(batch.get(), nullptr);
ASSERT_EQ(42, streaming_reader->bytes_read());
ASSERT_OK(streaming_reader->ReadNext(&batch));
ASSERT_EQ(batch.get(), nullptr);
}
}
Member:

Can you also add a test where the skip_rows and/or skip_rows_after_names options are set? What should be the semantics there?

Contributor Author:

Done


TEST(CountRowsAsync, Basics) {
constexpr int NROWS = 4096;
ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS));