diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc
index f221ffcadd9..f644b86f89f 100644
--- a/cpp/src/arrow/csv/reader.cc
+++ b/cpp/src/arrow/csv/reader.cc
@@ -145,6 +145,7 @@ struct CSVBlock {
   std::shared_ptr<Buffer> buffer;
   int64_t block_index;
   bool is_final;
+  int64_t bytes_skipped;
   std::function<Status(int64_t)> consume_bytes;
 };
 
@@ -153,7 +154,7 @@ struct CSVBlock {
 
 template <>
 struct IterationTraits<csv::CSVBlock> {
-  static csv::CSVBlock End() { return csv::CSVBlock{{}, {}, {}, -1, true, {}}; }
+  static csv::CSVBlock End() { return csv::CSVBlock{{}, {}, {}, -1, true, 0, {}}; }
   static bool IsEnd(const csv::CSVBlock& val) { return val.block_index < 0; }
 };
 
@@ -222,16 +223,20 @@ class SerialBlockReader : public BlockReader {
     }
 
     bool is_final = (next_buffer == nullptr);
+    int64_t bytes_skipped = 0;
     if (skip_rows_) {
+      bytes_skipped += partial_->size();
+      auto orig_size = buffer_->size();
       RETURN_NOT_OK(
           chunker_->ProcessSkip(partial_, buffer_, is_final, &skip_rows_, &buffer_));
+      bytes_skipped += orig_size - buffer_->size();
       partial_ = SliceBuffer(buffer_, 0, 0);
       if (skip_rows_) {
         // Still have rows beyond this buffer to skip return empty block
         buffer_ = next_buffer;
         return TransformYield<CSVBlock>(CSVBlock{partial_, partial_, partial_,
-                                                 block_index_++, is_final,
+                                                 block_index_++, is_final, bytes_skipped,
                                                  [](int64_t) { return Status::OK(); }});
       }
     }
@@ -262,7 +267,7 @@ class SerialBlockReader : public BlockReader {
     };
 
     return TransformYield<CSVBlock>(CSVBlock{partial_, completion, buffer_,
-                                             block_index_++, is_final,
+                                             block_index_++, is_final, bytes_skipped,
                                              std::move(consume_bytes)});
   }
 };
@@ -294,10 +299,14 @@ class ThreadedBlockReader : public BlockReader {
 
     auto current_partial = std::move(partial_);
     auto current_buffer = std::move(buffer_);
+    int64_t bytes_skipped = 0;
 
     if (skip_rows_) {
+      auto orig_size = current_buffer->size();
+      bytes_skipped = current_partial->size();
       RETURN_NOT_OK(chunker_->ProcessSkip(current_partial, current_buffer, is_final,
                                           &skip_rows_, &current_buffer));
+      bytes_skipped += orig_size - current_buffer->size();
       current_partial = SliceBuffer(current_buffer, 0, 0);
       if (skip_rows_) {
         partial_ = std::move(current_buffer);
@@ -307,6 +316,7 @@ class ThreadedBlockReader : public BlockReader {
                                                  current_partial,
                                                  block_index_++,
                                                  is_final,
+                                                 bytes_skipped,
                                                  {}});
       }
     }
@@ -332,8 +342,8 @@ class ThreadedBlockReader : public BlockReader {
     partial_ = std::move(next_partial);
     buffer_ = std::move(next_buffer);
 
-    return TransformYield<CSVBlock>(
-        CSVBlock{current_partial, completion, whole, block_index_++, is_final, {}});
+    return TransformYield<CSVBlock>(CSVBlock{
+        current_partial, completion, whole, block_index_++, is_final, bytes_skipped, {}});
   }
 };
@@ -761,12 +771,13 @@ class SerialStreamingReader : public BaseStreamingReader,
     auto self = shared_from_this();
     // Read schema from first batch
-    return ReadNextAsync().Then([self](const std::shared_ptr<RecordBatch>& first_batch)
-                                    -> Result<std::shared_ptr<SerialStreamingReader>> {
-      self->pending_batch_ = first_batch;
-      DCHECK_NE(self->schema_, nullptr);
-      return self;
-    });
+    return ReadNextAsync(true).Then(
+        [self](const std::shared_ptr<RecordBatch>& first_batch)
+            -> Result<std::shared_ptr<SerialStreamingReader>> {
+          self->pending_batch_ = first_batch;
+          DCHECK_NE(self->schema_, nullptr);
+          return self;
+        });
   }
 
   Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() {
@@ -788,6 +799,7 @@ class SerialStreamingReader : public BaseStreamingReader,
     return block_generator_()
         .Then([self](const CSVBlock& maybe_block) -> Status {
           if (!IsIterationEnd(maybe_block)) {
+            self->bytes_parsed_ += maybe_block.bytes_skipped;
             self->last_block_index_ = maybe_block.block_index;
             auto maybe_parsed = self->ParseAndInsert(
                 maybe_block.partial, maybe_block.completion, maybe_block.buffer,
@@ -797,6 +809,7 @@ class SerialStreamingReader : public BaseStreamingReader,
               self->eof_ = true;
               return maybe_parsed.status();
             }
+            self->bytes_parsed_ += *maybe_parsed;
             RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
           } else {
             self->source_eof_ = true;
@@ -815,16 +828,46 @@ class SerialStreamingReader : public BaseStreamingReader,
   }
 
   Future<std::shared_ptr<RecordBatch>> ReadNextSkippingEmpty(
-      std::shared_ptr<SerialStreamingReader> self) {
-    return DoReadNext(self).Then([self](const std::shared_ptr<RecordBatch>& batch) {
-      if (batch != nullptr && batch->num_rows() == 0) {
-        return self->ReadNextSkippingEmpty(self);
+      std::shared_ptr<SerialStreamingReader> self, bool internal_read) {
+    return DoReadNext(self).Then(
+        [self, internal_read](const std::shared_ptr<RecordBatch>& batch) {
+          if (batch != nullptr && batch->num_rows() == 0) {
+            return self->ReadNextSkippingEmpty(self, internal_read);
+          }
+          if (!internal_read) {
+            self->bytes_decoded_ += self->bytes_parsed_;
+            self->bytes_parsed_ = 0;
+          }
+          return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
+        });
+  }
+
+  Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
+    return ReadNextAsync(false);
+  }
+
+  int64_t bytes_read() const override { return bytes_decoded_; }
+
+ protected:
+  Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) {
+    return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
+      if (first_buffer == nullptr) {
+        return Status::Invalid("Empty CSV file");
       }
-      return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
+      auto own_first_buffer = first_buffer;
+      auto start = own_first_buffer->data();
+      RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer));
+      self->bytes_decoded_ = own_first_buffer->data() - start;
+      RETURN_NOT_OK(self->MakeColumnDecoders());
+
+      self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
+          std::move(self->buffer_generator_), MakeChunker(self->parse_options_),
+          std::move(own_first_buffer), self->read_options_.skip_rows_after_names);
+      return Status::OK();
     });
   }
 
-  Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
+  Future<std::shared_ptr<RecordBatch>> ReadNextAsync(bool internal_read) {
     if (eof_) {
       return Future<std::shared_ptr<RecordBatch>>::MakeFinished(nullptr);
     }
@@ -835,38 +878,25 @@ class SerialStreamingReader : public BaseStreamingReader,
     auto self = shared_from_this();
     if (!block_generator_) {
       return SetupReader(self).Then(
-          [self]() -> Future<std::shared_ptr<RecordBatch>> {
-            return self->ReadNextSkippingEmpty(self);
+          [self, internal_read]() -> Future<std::shared_ptr<RecordBatch>> {
+            return self->ReadNextSkippingEmpty(self, internal_read);
           },
           [self](const Status& err) -> Result<std::shared_ptr<RecordBatch>> {
            self->eof_ = true;
            return err;
          });
     } else {
-      return self->ReadNextSkippingEmpty(self);
+      return self->ReadNextSkippingEmpty(self, internal_read);
     }
-  };
-
- protected:
-  Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) {
-    return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
-      if (first_buffer == nullptr) {
-        return Status::Invalid("Empty CSV file");
-      }
-      auto own_first_buffer = first_buffer;
-      RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer));
-      RETURN_NOT_OK(self->MakeColumnDecoders());
-
-      self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
-          std::move(self->buffer_generator_), MakeChunker(self->parse_options_),
-          std::move(own_first_buffer), self->read_options_.skip_rows_after_names);
-      return Status::OK();
-    });
-  }
+  }
 
   bool source_eof_ = false;
   int64_t last_block_index_ = 0;
   AsyncGenerator<CSVBlock> block_generator_;
+  // bytes of data parsed but not yet decoded
+  int64_t bytes_parsed_ = 0;
+  // bytes which have been decoded for caller
+  int64_t bytes_decoded_ = 0;
 };
 
 /////////////////////////////////////////////////////////////////////////
diff --git a/cpp/src/arrow/csv/reader.h b/cpp/src/arrow/csv/reader.h
index 5314104f048..48f02882b10 100644
--- a/cpp/src/arrow/csv/reader.h
+++ b/cpp/src/arrow/csv/reader.h
@@ -73,6 +73,21 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {
   virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() = 0;
 
+  /// \brief Return the number of bytes which have been read and processed
+  ///
+  /// The returned number includes CSV bytes which the StreamingReader has
+  /// finished processing, but not bytes for which some processing (e.g.
+  /// CSV parsing or conversion to Arrow layout) is still ongoing.
+  ///
+  /// Furthermore, the following rules apply:
+  /// - bytes skipped by `ReadOptions.skip_rows` are counted as being read before
+  ///   any records are returned.
+  /// - bytes read while parsing the header are counted as being read before any
+  ///   records are returned.
+  /// - bytes skipped by `ReadOptions.skip_rows_after_names` are counted after the
+  ///   first batch is returned.
+  virtual int64_t bytes_read() const = 0;
+
   /// Create a StreamingReader instance
   ///
   /// This involves some I/O as the first batch must be loaded during the creation process
diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc
index 4d4f04964bd..1ab49fa8664 100644
--- a/cpp/src/arrow/csv/reader_test.cc
+++ b/cpp/src/arrow/csv/reader_test.cc
@@ -216,6 +216,74 @@ TEST(StreamingReaderTests, NestedParallelism) {
   TestNestedParallelism(thread_pool, table_factory);
 }
 
+TEST(StreamingReaderTests, BytesRead) {
+  ASSERT_OK_AND_ASSIGN(auto thread_pool, internal::ThreadPool::Make(1));
+  auto table_buffer =
+      std::make_shared<Buffer>("a,b,c\n123,456,789\n101,112,131\n415,161,718\n");
+
+  // Basic read without any skips and small block size
+  {
+    auto input = std::make_shared<io::BufferReader>(table_buffer);
+
+    auto read_options = ReadOptions::Defaults();
+    read_options.block_size = 20;
+    ASSERT_OK_AND_ASSIGN(
+        auto streaming_reader,
+        StreamingReader::Make(io::default_io_context(), input, read_options,
+                              ParseOptions::Defaults(), ConvertOptions::Defaults()));
+    std::shared_ptr<RecordBatch> batch;
+    int64_t bytes = 6;  // Size of header
+    do {
+      ASSERT_EQ(bytes, streaming_reader->bytes_read());
+      ASSERT_OK(streaming_reader->ReadNext(&batch));
+      bytes += 12;  // Add size of each row
+    } while (batch);
+    ASSERT_EQ(42, streaming_reader->bytes_read());
+  }
+
+  // Interaction of skip_rows and bytes_read()
+  {
+    auto input = std::make_shared<io::BufferReader>(table_buffer);
+
+    auto read_options = ReadOptions::Defaults();
+    read_options.skip_rows = 2;
+    ASSERT_OK_AND_ASSIGN(
+        auto streaming_reader,
+        StreamingReader::Make(io::default_io_context(), input, read_options,
+                              ParseOptions::Defaults(), ConvertOptions::Defaults()));
+    std::shared_ptr<RecordBatch> batch;
+
+    // first two rows and third row as header
+    ASSERT_EQ(30, streaming_reader->bytes_read());
+    ASSERT_OK(streaming_reader->ReadNext(&batch));
+    ASSERT_NE(batch.get(), nullptr);
+    ASSERT_EQ(42, streaming_reader->bytes_read());
+    ASSERT_OK(streaming_reader->ReadNext(&batch));
+    ASSERT_EQ(batch.get(), nullptr);
+  }
+
+  // Interaction of skip_rows_after_names and bytes_read()
+  {
+    auto input = std::make_shared<io::BufferReader>(table_buffer);
+
+    auto read_options = ReadOptions::Defaults();
+    read_options.skip_rows_after_names = 2;
+
+    ASSERT_OK_AND_ASSIGN(
+        auto streaming_reader,
+        StreamingReader::Make(io::default_io_context(), input, read_options,
+                              ParseOptions::Defaults(), ConvertOptions::Defaults()));
+    std::shared_ptr<RecordBatch> batch;
+
+    // Just header
+    ASSERT_EQ(6, streaming_reader->bytes_read());
+    ASSERT_OK(streaming_reader->ReadNext(&batch));
+    ASSERT_NE(batch.get(), nullptr);
+    ASSERT_EQ(42, streaming_reader->bytes_read());
+    ASSERT_OK(streaming_reader->ReadNext(&batch));
+    ASSERT_EQ(batch.get(), nullptr);
+  }
+}
+
 TEST(CountRowsAsync, Basics) {
   constexpr int NROWS = 4096;
   ASSERT_OK_AND_ASSIGN(auto table_buffer, MakeSampleCsvBuffer(NROWS));