From cf1cadc50b7e1a30a0d0339ac884b7e33217f0c4 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 17 Jun 2021 03:00:59 -1000 Subject: [PATCH 01/10] ARROW-11889: Converted streaming CSV reader to an async generator based implementation. The parser and decoder are now operator functions and sequencing logic has been removed from them. Parallel readahead has been added to the streaming reader to allow for parallel streaming CSV reads. --- cpp/src/arrow/csv/CMakeLists.txt | 1 + cpp/src/arrow/csv/column_decoder.cc | 227 +++-------- cpp/src/arrow/csv/column_decoder.h | 39 +- cpp/src/arrow/csv/column_decoder_test.cc | 228 ++++------- cpp/src/arrow/csv/reader.cc | 496 ++++++++++++----------- cpp/src/arrow/csv/reader_test.cc | 91 ++++- cpp/src/arrow/util/async_generator.h | 2 + python/pyarrow/tests/test_csv.py | 29 +- 8 files changed, 506 insertions(+), 607 deletions(-) diff --git a/cpp/src/arrow/csv/CMakeLists.txt b/cpp/src/arrow/csv/CMakeLists.txt index 561faf1b584..8b3d484e99e 100644 --- a/cpp/src/arrow/csv/CMakeLists.txt +++ b/cpp/src/arrow/csv/CMakeLists.txt @@ -28,6 +28,7 @@ if(ARROW_COMPUTE) list(APPEND CSV_TEST_SRCS writer_test.cc) endif() +add_arrow_test(csv-decoder-test SOURCES column_decoder_test.cc) add_arrow_test(csv-test SOURCES ${CSV_TEST_SRCS}) add_arrow_benchmark(converter_benchmark PREFIX "arrow-csv") diff --git a/cpp/src/arrow/csv/column_decoder.cc b/cpp/src/arrow/csv/column_decoder.cc index 1dd13bc9086..3fc7a896460 100644 --- a/cpp/src/arrow/csv/column_decoder.cc +++ b/cpp/src/arrow/csv/column_decoder.cc @@ -45,91 +45,13 @@ using internal::TaskGroup; class ConcreteColumnDecoder : public ColumnDecoder { public: - explicit ConcreteColumnDecoder(MemoryPool* pool, - std::shared_ptr task_group, - int32_t col_index = -1) - : ColumnDecoder(std::move(task_group)), - pool_(pool), - col_index_(col_index), - num_chunks_(-1), - next_chunk_(0) {} - - void Append(const std::shared_ptr& parser) override { - Insert(static_cast(chunks_.size()), parser); - } - - void SetEOF(int64_t num_blocks) override { - std::lock_guard lock(mutex_); - - DCHECK_EQ(num_chunks_, -1) << "Cannot change EOF"; - num_chunks_ = num_blocks; - - // If further chunks have been requested in NextChunk(), arrange to return nullptr - for (int64_t i = num_chunks_; i < static_cast(chunks_.size()); ++i) { - auto* chunk = &chunks_[i]; - if (chunk->is_valid()) { - DCHECK(!IsFutureFinished(chunk->state())); - chunk->MarkFinished(std::shared_ptr()); - } - } - } - - Result> NextChunk() override { - std::unique_lock lock(mutex_); - - if (num_chunks_ > 0 && next_chunk_ >= num_chunks_) { - return nullptr; // EOF - } - PrepareChunkUnlocked(next_chunk_); - auto chunk_index = next_chunk_++; - WaitForChunkUnlocked(chunk_index); - // Move Future to avoid keeping chunk alive - return chunks_[chunk_index].MoveResult(); - } + explicit ConcreteColumnDecoder(MemoryPool* pool, int32_t col_index = -1) + : ColumnDecoder(), pool_(pool), col_index_(col_index) {} protected: // XXX useful? 
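  // Decode() maps one parsed block to a future of the converted array.
  // Block sequencing and EOF bookkeeping now live with the caller
  // (StreamingReaderImpl), which keeps the returned futures in
  // submission order.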
virtual std::shared_ptr type() const = 0; - void WaitForChunkUnlocked(int64_t chunk_index) { - auto future = chunks_[chunk_index]; // Make copy because of resizes - mutex_.unlock(); - future.Wait(); - mutex_.lock(); - } - - void PrepareChunk(int64_t block_index) { - std::lock_guard lock(mutex_); - PrepareChunkUnlocked(block_index); - } - - void PrepareChunkUnlocked(int64_t block_index) { - size_t chunk_index = static_cast(block_index); - if (chunks_.size() <= chunk_index) { - chunks_.resize(chunk_index + 1); - } - if (!chunks_[block_index].is_valid()) { - chunks_[block_index] = Future>::Make(); - } - } - - void SetChunk(int64_t chunk_index, Result> maybe_array) { - std::lock_guard lock(mutex_); - SetChunkUnlocked(chunk_index, std::move(maybe_array)); - } - - void SetChunkUnlocked(int64_t chunk_index, Result> maybe_array) { - auto* chunk = &chunks_[chunk_index]; - DCHECK(chunk->is_valid()); - DCHECK(!IsFutureFinished(chunk->state())); - - if (maybe_array.ok()) { - chunk->MarkFinished(std::move(maybe_array)); - } else { - chunk->MarkFinished(WrapConversionError(maybe_array.status())); - } - } - Status WrapConversionError(const Status& st) { if (st.ok()) { return st; @@ -142,12 +64,7 @@ class ConcreteColumnDecoder : public ColumnDecoder { MemoryPool* pool_; int32_t col_index_; - - std::vector>> chunks_; - int64_t num_chunks_; - int64_t next_chunk_; - - std::mutex mutex_; + internal::Executor* executor_; }; ////////////////////////////////////////////////////////////////////////// @@ -155,11 +72,11 @@ class ConcreteColumnDecoder : public ColumnDecoder { class NullColumnDecoder : public ConcreteColumnDecoder { public: - explicit NullColumnDecoder(const std::shared_ptr& type, MemoryPool* pool, - const std::shared_ptr& task_group) - : ConcreteColumnDecoder(pool, task_group), type_(type) {} + explicit NullColumnDecoder(const std::shared_ptr& type, MemoryPool* pool) + : ConcreteColumnDecoder(pool), type_(type) {} - void Insert(int64_t block_index, const std::shared_ptr& parser) override; + Future> Decode( + const std::shared_ptr& parser) override; protected: std::shared_ptr type() const override { return type_; } @@ -167,24 +84,18 @@ class NullColumnDecoder : public ConcreteColumnDecoder { std::shared_ptr type_; }; -void NullColumnDecoder::Insert(int64_t block_index, - const std::shared_ptr& parser) { - PrepareChunk(block_index); - +Future> NullColumnDecoder::Decode( + const std::shared_ptr& parser) { // Spawn a task that will build an array of nulls with the right DataType const int32_t num_rows = parser->num_rows(); DCHECK_GE(num_rows, 0); - task_group_->Append([=]() -> Status { - std::unique_ptr builder; - RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder)); - std::shared_ptr array; - RETURN_NOT_OK(builder->AppendNulls(num_rows)); - RETURN_NOT_OK(builder->Finish(&array)); - - SetChunk(block_index, array); - return Status::OK(); - }); + std::unique_ptr builder; + RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder)); + std::shared_ptr array; + RETURN_NOT_OK(builder->AppendNulls(num_rows)); + RETURN_NOT_OK(builder->Finish(&array)); + return Future>::MakeFinished(std::move(array)); } ////////////////////////////////////////////////////////////////////////// @@ -193,15 +104,13 @@ void NullColumnDecoder::Insert(int64_t block_index, class TypedColumnDecoder : public ConcreteColumnDecoder { public: TypedColumnDecoder(const std::shared_ptr& type, int32_t col_index, - const ConvertOptions& options, MemoryPool* pool, - const std::shared_ptr& task_group) - : ConcreteColumnDecoder(pool, task_group, col_index), 
- type_(type), - options_(options) {} + const ConvertOptions& options, MemoryPool* pool) + : ConcreteColumnDecoder(pool, col_index), type_(type), options_(options) {} Status Init(); - void Insert(int64_t block_index, const std::shared_ptr& parser) override; + Future> Decode( + const std::shared_ptr& parser) override; protected: std::shared_ptr type() const override { return type_; } @@ -219,17 +128,11 @@ Status TypedColumnDecoder::Init() { return Status::OK(); } -void TypedColumnDecoder::Insert(int64_t block_index, - const std::shared_ptr& parser) { +Future> TypedColumnDecoder::Decode( + const std::shared_ptr& parser) { DCHECK_NE(converter_, nullptr); - - PrepareChunk(block_index); - - // We're careful that all references in the closure outlive the Append() call - task_group_->Append([=]() -> Status { - SetChunk(block_index, converter_->Convert(*parser, col_index_)); - return Status::OK(); - }); + return Future>::MakeFinished( + converter_->Convert(*parser, col_index_)); } ////////////////////////////////////////////////////////////////////////// @@ -238,16 +141,19 @@ void TypedColumnDecoder::Insert(int64_t block_index, class InferringColumnDecoder : public ConcreteColumnDecoder { public: InferringColumnDecoder(int32_t col_index, const ConvertOptions& options, - MemoryPool* pool, - const std::shared_ptr& task_group) - : ConcreteColumnDecoder(pool, task_group, col_index), + MemoryPool* pool) + : ConcreteColumnDecoder(pool, col_index), options_(options), infer_status_(options), - type_frozen_(false) {} + type_frozen_(false) { + first_inference_run_ = Future<>::Make(); + first_inferrer_ = 0; + } Status Init(); - void Insert(int64_t block_index, const std::shared_ptr& parser) override; + Future> Decode( + const std::shared_ptr& parser) override; protected: std::shared_ptr type() const override { @@ -265,10 +171,9 @@ class InferringColumnDecoder : public ConcreteColumnDecoder { // Current inference status InferStatus infer_status_; bool type_frozen_; + std::atomic first_inferrer_; + Future<> first_inference_run_; std::shared_ptr converter_; - - // The parsers corresponding to each chunk (for reconverting) - std::vector> parsers_; }; Status InferringColumnDecoder::Init() { return UpdateType(); } @@ -283,55 +188,37 @@ Result> InferringColumnDecoder::RunInference( // (no one else should be updating converter_ concurrently) auto maybe_array = converter_->Convert(*parser, col_index_); - std::unique_lock lock(mutex_); if (maybe_array.ok() || !infer_status_.can_loosen_type()) { // Conversion succeeded, or failed definitively + DCHECK(!type_frozen_); + type_frozen_ = true; return maybe_array; } // Conversion failed temporarily, try another type infer_status_.LoosenType(maybe_array.status()); - RETURN_NOT_OK(UpdateType()); + auto update_status = UpdateType(); + if (!update_status.ok()) { + return update_status; + } } } -void InferringColumnDecoder::Insert(int64_t block_index, - const std::shared_ptr& parser) { - PrepareChunk(block_index); - +Future> InferringColumnDecoder::Decode( + const std::shared_ptr& parser) { + bool already_taken = first_inferrer_.fetch_or(1); // First block: run inference - if (block_index == 0) { - task_group_->Append([=]() -> Status { - auto maybe_array = RunInference(parser); - - std::unique_lock lock(mutex_); - DCHECK(!type_frozen_); - type_frozen_ = true; - SetChunkUnlocked(block_index, std::move(maybe_array)); - return Status::OK(); - }); - return; + if (!already_taken) { + auto maybe_array = RunInference(parser); + first_inference_run_.MarkFinished(); + return 
Future>::MakeFinished(maybe_array); } // Non-first block: wait for inference to finish on first block now, // without blocking a TaskGroup thread. - { - std::unique_lock lock(mutex_); - PrepareChunkUnlocked(0); - WaitForChunkUnlocked(0); - if (!chunks_[0].status().ok()) { - // Failed converting first chunk: bail out by marking EOF, - // because we can't decide a type for the other chunks. - SetChunkUnlocked(block_index, std::shared_ptr()); - } + return first_inference_run_.Then([this, parser] { DCHECK(type_frozen_); - } - - // Then use the inferred type to convert this block. - task_group_->Append([=]() -> Status { auto maybe_array = converter_->Convert(*parser, col_index_); - - SetChunk(block_index, std::move(maybe_array)); - return Status::OK(); + return maybe_array; }); } @@ -339,28 +226,24 @@ void InferringColumnDecoder::Insert(int64_t block_index, // Factory functions Result> ColumnDecoder::Make( - MemoryPool* pool, int32_t col_index, const ConvertOptions& options, - std::shared_ptr task_group) { - auto ptr = std::make_shared(col_index, options, pool, - std::move(task_group)); + MemoryPool* pool, int32_t col_index, const ConvertOptions& options) { + auto ptr = std::make_shared(col_index, options, pool); RETURN_NOT_OK(ptr->Init()); return ptr; } Result> ColumnDecoder::Make( MemoryPool* pool, std::shared_ptr type, int32_t col_index, - const ConvertOptions& options, std::shared_ptr task_group) { - auto ptr = std::make_shared(std::move(type), col_index, options, - pool, std::move(task_group)); + const ConvertOptions& options) { + auto ptr = + std::make_shared(std::move(type), col_index, options, pool); RETURN_NOT_OK(ptr->Init()); return ptr; } Result> ColumnDecoder::MakeNull( - MemoryPool* pool, std::shared_ptr type, - std::shared_ptr task_group) { - return std::make_shared(std::move(type), pool, - std::move(task_group)); + MemoryPool* pool, std::shared_ptr type) { + return std::make_shared(std::move(type), pool); } } // namespace csv diff --git a/cpp/src/arrow/csv/column_decoder.h b/cpp/src/arrow/csv/column_decoder.h index 92644e3769f..ffaa1a1a827 100644 --- a/cpp/src/arrow/csv/column_decoder.h +++ b/cpp/src/arrow/csv/column_decoder.h @@ -36,45 +36,28 @@ class ARROW_EXPORT ColumnDecoder { public: virtual ~ColumnDecoder() = default; - /// Spawn a task that will try to convert and append the given CSV block. - /// All calls to Append() should happen on the same thread, otherwise - /// call Insert() instead. - virtual void Append(const std::shared_ptr& parser) = 0; - /// Spawn a task that will try to convert and insert the given CSV block - virtual void Insert(int64_t block_index, - const std::shared_ptr& parser) = 0; - - /// Set EOF at the given number of blocks. Must only be called once. - virtual void SetEOF(int64_t num_blocks) = 0; - - /// Fetch a chunk. - virtual Result> NextChunk() = 0; - - std::shared_ptr task_group() { return task_group_; } + virtual Future> Decode( + const std::shared_ptr& parser) = 0; /// Construct a strictly-typed ColumnDecoder. - static Result> Make( - MemoryPool* pool, std::shared_ptr type, int32_t col_index, - const ConvertOptions& options, std::shared_ptr task_group); + static Result> Make(MemoryPool* pool, + std::shared_ptr type, + int32_t col_index, + const ConvertOptions& options); /// Construct a type-inferring ColumnDecoder. /// Inference will run only on the first block, the type will be frozen afterwards. 
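  /// Blocks decoded concurrently with the first block wait on a shared
  /// future until inference completes and the type is frozen.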
- static Result> Make( - MemoryPool* pool, int32_t col_index, const ConvertOptions& options, - std::shared_ptr task_group); + static Result> Make(MemoryPool* pool, int32_t col_index, + const ConvertOptions& options); /// Construct a ColumnDecoder for a column of nulls /// (i.e. not present in the CSV file). - static Result> MakeNull( - MemoryPool* pool, std::shared_ptr type, - std::shared_ptr task_group); + static Result> MakeNull(MemoryPool* pool, + std::shared_ptr type); protected: - explicit ColumnDecoder(std::shared_ptr task_group) - : task_group_(std::move(task_group)) {} - - std::shared_ptr task_group_; + explicit ColumnDecoder() = default; }; } // namespace csv diff --git a/cpp/src/arrow/csv/column_decoder_test.cc b/cpp/src/arrow/csv/column_decoder_test.cc index 231ffb85e1b..48d9d97121d 100644 --- a/cpp/src/arrow/csv/column_decoder_test.cc +++ b/cpp/src/arrow/csv/column_decoder_test.cc @@ -27,11 +27,11 @@ #include "arrow/csv/test_common.h" #include "arrow/memory_pool.h" #include "arrow/table.h" +#include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" #include "arrow/type.h" #include "arrow/util/checked_cast.h" -#include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" namespace arrow { @@ -41,7 +41,6 @@ class BlockParser; using internal::checked_cast; using internal::GetCpuThreadPool; -using internal::TaskGroup; using ChunkData = std::vector>; @@ -65,58 +64,64 @@ ThreadJoiner RunThread(Func&& func) { return ThreadJoiner(std::make_shared(std::forward(func))); } -struct SerialExecutor { - static std::shared_ptr task_group() { return TaskGroup::MakeSerial(); } -}; - -struct ParallelExecutor { - static std::shared_ptr task_group() { - return TaskGroup::MakeThreaded(GetCpuThreadPool()); +template +void RunThreadsAndJoin(Func&& func, int iters) { + std::vector threads; + for (int i = 0; i < iters; i++) { + threads.emplace_back(std::make_shared([i, func] { func(i); })); } -}; - -using ExecutorTypes = ::testing::Types; +} class ColumnDecoderTest : public ::testing::Test { public: - ColumnDecoderTest() : tg_(TaskGroup::MakeSerial()), num_chunks_(0) {} + ColumnDecoderTest() : num_chunks_(0), read_ptr_(0) {} void SetDecoder(std::shared_ptr decoder) { decoder_ = std::move(decoder); + decoded_chunks_.clear(); num_chunks_ = 0; + read_ptr_ = 0; } - void InsertChunk(int64_t num_chunk, std::vector chunk) { + void InsertChunk(std::vector chunk) { std::shared_ptr parser; MakeColumnParser(chunk, &parser); - decoder_->Insert(num_chunk, parser); + auto decoded = decoder_->Decode(parser); + decoded_chunks_.push_back(decoded); + ++num_chunks_; } void AppendChunks(const ChunkData& chunks) { for (const auto& chunk : chunks) { - std::shared_ptr parser; - MakeColumnParser(chunk, &parser); - decoder_->Append(parser); - ++num_chunks_; + InsertChunk(chunk); } } - void SetEOF() { decoder_->SetEOF(num_chunks_); } + Result> NextChunk() { + EXPECT_LT(read_ptr_, decoded_chunks_.size()); + return decoded_chunks_[read_ptr_++].result(); + } + + void AssertChunk(std::vector chunk, std::shared_ptr expected) { + std::shared_ptr parser; + MakeColumnParser(chunk, &parser); + ASSERT_FINISHES_OK_AND_ASSIGN(auto decoded, decoder_->Decode(parser)); + AssertArraysEqual(*expected, *decoded); + } void AssertFetch(std::shared_ptr expected_chunk) { - ASSERT_OK_AND_ASSIGN(auto chunk, decoder_->NextChunk()); + ASSERT_OK_AND_ASSIGN(auto chunk, NextChunk()); ASSERT_NE(chunk, nullptr); AssertArraysEqual(*expected_chunk, *chunk); } - void AssertFetchInvalid() { 
ASSERT_RAISES(Invalid, decoder_->NextChunk()); } - - void AssertFetchEOF() { ASSERT_OK_AND_EQ(nullptr, decoder_->NextChunk()); } + void AssertFetchInvalid() { ASSERT_RAISES(Invalid, NextChunk()); } protected: - std::shared_ptr tg_; std::shared_ptr decoder_; - int64_t num_chunks_; + std::vector>> decoded_chunks_; + int64_t num_chunks_ = 0; + int64_t read_ptr_ = 0; ConvertOptions default_options = ConvertOptions::Defaults(); }; @@ -124,14 +129,13 @@ class ColumnDecoderTest : public ::testing::Test { ////////////////////////////////////////////////////////////////////////// // Tests for null column decoder -template class NullColumnDecoderTest : public ColumnDecoderTest { public: - NullColumnDecoderTest() { tg_ = ExecutorType::task_group(); } + NullColumnDecoderTest() {} void MakeDecoder(std::shared_ptr type) { ASSERT_OK_AND_ASSIGN(auto decoder, - ColumnDecoder::MakeNull(default_memory_pool(), type, tg_)); + ColumnDecoder::MakeNull(default_memory_pool(), type)); SetDecoder(decoder); } @@ -141,10 +145,8 @@ class NullColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(type); AppendChunks({{"1", "2", "3"}, {"4", "5"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[null, null, null]")); AssertFetch(ArrayFromJSON(type, "[null, null]")); - AssertFetchEOF(); MakeDecoder(type); @@ -153,8 +155,6 @@ class NullColumnDecoderTest : public ColumnDecoderTest { AppendChunks({{"7", "8"}}); AssertFetch(ArrayFromJSON(type, "[null]")); AssertFetch(ArrayFromJSON(type, "[null, null]")); - SetEOF(); - AssertFetchEOF(); } void TestOtherType() { @@ -163,57 +163,40 @@ class NullColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(type); AppendChunks({{"1", "2", "3"}, {"4", "5"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[null, null, null]")); AssertFetch(ArrayFromJSON(type, "[null, null]")); - AssertFetchEOF(); - AssertFetchEOF(); } void TestThreaded() { + constexpr int NITERS = 10; auto type = int32(); - MakeDecoder(type); - auto joiner = RunThread([&]() { - InsertChunk(1, {"4", "5"}); - InsertChunk(0, {"1", "2", "3"}); - InsertChunk(3, {"6"}); - InsertChunk(2, {}); - decoder_->SetEOF(4); - }); - - AssertFetch(ArrayFromJSON(type, "[null, null, null]")); - AssertFetch(ArrayFromJSON(type, "[null, null]")); - AssertFetch(ArrayFromJSON(type, "[]")); - AssertFetch(ArrayFromJSON(type, "[null]")); - AssertFetchEOF(); - AssertFetchEOF(); + RunThreadsAndJoin( + [&](int thread_id) { + AssertChunk({"4", "5", std::to_string(thread_id)}, + ArrayFromJSON(type, "[null, null, null]")); + }, + NITERS); } - - protected: - ExecutorType executor_; }; -TYPED_TEST_SUITE(NullColumnDecoderTest, ExecutorTypes); - -TYPED_TEST(NullColumnDecoderTest, NullType) { this->TestNullType(); } +TEST_F(NullColumnDecoderTest, NullType) { this->TestNullType(); } -TYPED_TEST(NullColumnDecoderTest, OtherType) { this->TestOtherType(); } +TEST_F(NullColumnDecoderTest, OtherType) { this->TestOtherType(); } -TYPED_TEST(NullColumnDecoderTest, Threaded) { this->TestThreaded(); } +TEST_F(NullColumnDecoderTest, Threaded) { this->TestThreaded(); } ////////////////////////////////////////////////////////////////////////// // Tests for fixed-type column decoder -template class TypedColumnDecoderTest : public ColumnDecoderTest { public: - TypedColumnDecoderTest() { tg_ = ExecutorType::task_group(); } + TypedColumnDecoderTest() {} void MakeDecoder(const std::shared_ptr& type, const ConvertOptions& options) { - ASSERT_OK_AND_ASSIGN( - auto decoder, ColumnDecoder::Make(default_memory_pool(), type, 0, options, tg_)); + ASSERT_OK_AND_ASSIGN(auto decoder, 
+ ColumnDecoder::Make(default_memory_pool(), type, 0, options)); SetDecoder(decoder); } @@ -223,11 +206,8 @@ class TypedColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(type, default_options); AppendChunks({{"123", "456", "-78"}, {"901", "N/A"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[123, 456, -78]")); AssertFetch(ArrayFromJSON(type, "[901, null]")); - AssertFetchEOF(); - AssertFetchEOF(); MakeDecoder(type, default_options); @@ -236,9 +216,6 @@ class TypedColumnDecoderTest : public ColumnDecoderTest { AppendChunks({{"N/A", "N/A"}}); AssertFetch(ArrayFromJSON(type, "[-987]")); AssertFetch(ArrayFromJSON(type, "[null, null]")); - SetEOF(); - AssertFetchEOF(); - AssertFetchEOF(); } void TestOptions() { @@ -247,10 +224,7 @@ class TypedColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(type, default_options); AppendChunks({{"true", "false", "N/A"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[true, false, null]")); - AssertFetchEOF(); - AssertFetchEOF(); // With non-default options auto options = default_options; @@ -260,10 +234,7 @@ class TypedColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(type, options); AppendChunks({{"true", "false", "N/A"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[null, true, false]")); - AssertFetchEOF(); - AssertFetchEOF(); } void TestErrors() { @@ -273,56 +244,48 @@ class TypedColumnDecoderTest : public ColumnDecoderTest { AppendChunks({{"123", "456", "N/A"}, {"-901"}}); AppendChunks({{"N/A", "1000"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[123, 456, null]")); AssertFetchInvalid(); AssertFetch(ArrayFromJSON(type, "[null, 1000]")); - AssertFetchEOF(); } - void TestThreaded() { - auto type = uint32(); + // void TestThreaded() { + // auto type = uint32(); - MakeDecoder(type, default_options); + // MakeDecoder(type, default_options); - auto joiner = RunThread([&]() { - InsertChunk(1, {"4", "-5"}); - InsertChunk(0, {"1", "2", "3"}); - InsertChunk(3, {"6"}); - InsertChunk(2, {}); - decoder_->SetEOF(4); - }); + // auto joiner = RunThread([&]() { + // InsertChunk(1, {"4", "-5"}); + // InsertChunk(0, {"1", "2", "3"}); + // InsertChunk(3, {"6"}); + // InsertChunk(2, {}); + // }); - AssertFetch(ArrayFromJSON(type, "[1, 2, 3]")); - AssertFetchInvalid(); - AssertFetch(ArrayFromJSON(type, "[]")); - AssertFetch(ArrayFromJSON(type, "[6]")); - AssertFetchEOF(); - AssertFetchEOF(); - } + // AssertFetch(ArrayFromJSON(type, "[1, 2, 3]")); + // AssertFetchInvalid(); + // AssertFetch(ArrayFromJSON(type, "[]")); + // AssertFetch(ArrayFromJSON(type, "[6]")); + // } }; -TYPED_TEST_SUITE(TypedColumnDecoderTest, ExecutorTypes); +TEST_F(TypedColumnDecoderTest, Integers) { this->TestIntegers(); } -TYPED_TEST(TypedColumnDecoderTest, Integers) { this->TestIntegers(); } +TEST_F(TypedColumnDecoderTest, Options) { this->TestOptions(); } -TYPED_TEST(TypedColumnDecoderTest, Options) { this->TestOptions(); } +TEST_F(TypedColumnDecoderTest, Errors) { this->TestErrors(); } -TYPED_TEST(TypedColumnDecoderTest, Errors) { this->TestErrors(); } - -TYPED_TEST(TypedColumnDecoderTest, Threaded) { this->TestThreaded(); } +// TEST_F(TypedColumnDecoderTest, Threaded) { this->TestThreaded(); } ////////////////////////////////////////////////////////////////////////// // Tests for type-inferring column decoder -template class InferringColumnDecoderTest : public ColumnDecoderTest { public: - InferringColumnDecoderTest() { tg_ = ExecutorType::task_group(); } + InferringColumnDecoderTest() {} void MakeDecoder(const ConvertOptions& options) { ASSERT_OK_AND_ASSIGN(auto 
decoder, - ColumnDecoder::Make(default_memory_pool(), 0, options, tg_)); + ColumnDecoder::Make(default_memory_pool(), 0, options)); SetDecoder(decoder); } @@ -332,36 +295,30 @@ class InferringColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(default_options); AppendChunks({{"123", "456", "-78"}, {"901", "N/A"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[123, 456, -78]")); AssertFetch(ArrayFromJSON(type, "[901, null]")); - AssertFetchEOF(); - AssertFetchEOF(); } - void TestThreaded() { - auto type = float64(); + // void TestThreaded() { + // auto type = float64(); - MakeDecoder(default_options); + // MakeDecoder(default_options); - auto joiner = RunThread([&]() { - SleepFor(1e-3); - InsertChunk(0, {"1.5", "2", "3"}); - InsertChunk(3, {"6"}); - decoder_->SetEOF(4); - }); + // auto joiner = RunThread([&]() { + // SleepFor(1e-3); + // InsertChunk(0, {"1.5", "2", "3"}); + // InsertChunk(3, {"6"}); + // }); - // These chunks will wait for inference to run on chunk 0 - InsertChunk(1, {"4", "-5", "N/A"}); - InsertChunk(2, {}); + // // These chunks will wait for inference to run on chunk 0 + // InsertChunk(1, {"4", "-5", "N/A"}); + // InsertChunk(2, {}); - AssertFetch(ArrayFromJSON(type, "[1.5, 2, 3]")); - AssertFetch(ArrayFromJSON(type, "[4, -5, null]")); - AssertFetch(ArrayFromJSON(type, "[]")); - AssertFetch(ArrayFromJSON(type, "[6]")); - AssertFetchEOF(); - AssertFetchEOF(); - } + // AssertFetch(ArrayFromJSON(type, "[1.5, 2, 3]")); + // AssertFetch(ArrayFromJSON(type, "[4, -5, null]")); + // AssertFetch(ArrayFromJSON(type, "[]")); + // AssertFetch(ArrayFromJSON(type, "[6]")); + // } void TestOptions() { auto type = boolean(); @@ -373,11 +330,8 @@ class InferringColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(options); AppendChunks({{"true", "false", "N/A"}, {"true"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[null, true, false]")); AssertFetch(ArrayFromJSON(type, "[null]")); - AssertFetchEOF(); - AssertFetchEOF(); } void TestErrors() { @@ -387,12 +341,9 @@ class InferringColumnDecoderTest : public ColumnDecoderTest { AppendChunks({{"123", "456", "-78"}, {"9.5", "N/A"}}); AppendChunks({{"1000", "N/A"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[123, 456, -78]")); AssertFetchInvalid(); AssertFetch(ArrayFromJSON(type, "[1000, null]")); - AssertFetchEOF(); - AssertFetchEOF(); } void TestEmpty() { @@ -401,25 +352,20 @@ class InferringColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(default_options); AppendChunks({{}, {}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[]")); AssertFetch(ArrayFromJSON(type, "[]")); - AssertFetchEOF(); - AssertFetchEOF(); } }; -TYPED_TEST_SUITE(InferringColumnDecoderTest, ExecutorTypes); - -TYPED_TEST(InferringColumnDecoderTest, Integers) { this->TestIntegers(); } +TEST_F(InferringColumnDecoderTest, Integers) { this->TestIntegers(); } -TYPED_TEST(InferringColumnDecoderTest, Threaded) { this->TestThreaded(); } +// TEST_F(InferringColumnDecoderTest, Threaded) { this->TestThreaded(); } -TYPED_TEST(InferringColumnDecoderTest, Options) { this->TestOptions(); } +TEST_F(InferringColumnDecoderTest, Options) { this->TestOptions(); } -TYPED_TEST(InferringColumnDecoderTest, Errors) { this->TestErrors(); } +TEST_F(InferringColumnDecoderTest, Errors) { this->TestErrors(); } -TYPED_TEST(InferringColumnDecoderTest, Empty) { this->TestEmpty(); } +TEST_F(InferringColumnDecoderTest, Empty) { this->TestEmpty(); } // More inference tests are in InferringColumnBuilderTest diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc 
index d57a2f15667..f530bffdd72 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -50,6 +50,7 @@ #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" #include "arrow/util/utf8.h" +#include "arrow/util/vector.h" namespace arrow { namespace csv { @@ -349,6 +350,175 @@ class ThreadedBlockReader : public BlockReader { } }; +struct ParsedBlock { + std::shared_ptr parser; + int64_t block_index; + int64_t bytes_parsed_or_skipped; +}; + +} // namespace + +} // namespace csv + +template <> +struct IterationTraits { + static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; } + static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; } +}; + +namespace csv { + +namespace { + +// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data. +// The parsed batch contains a list of offsets for each of the columns so that columns +// can be individually scanned +// +// This operator is not re-entrant +class BlockParsingOperator { + public: + BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options, + int num_csv_cols, bool count_rows) + : io_context_(io_context), + parse_options_(parse_options), + num_csv_cols_(num_csv_cols), + count_rows_(count_rows) {} + + Result operator()(const CSVBlock& block) { + static constexpr int32_t max_num_rows = std::numeric_limits::max(); + auto parser = std::make_shared( + io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); + + std::shared_ptr straddling; + std::vector views; + if (block.partial->size() != 0 || block.completion->size() != 0) { + if (block.partial->size() == 0) { + straddling = block.completion; + } else if (block.completion->size() == 0) { + straddling = block.partial; + } else { + ARROW_ASSIGN_OR_RAISE( + straddling, + ConcatenateBuffers({block.partial, block.completion}, io_context_.pool())); + } + views = {util::string_view(*straddling), util::string_view(*block.buffer)}; + } else { + views = {util::string_view(*block.buffer)}; + } + uint32_t parsed_size; + if (block.is_final) { + RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size)); + } else { + RETURN_NOT_OK(parser->Parse(views, &parsed_size)); + } + if (count_rows_) { + num_rows_seen_ += parser->num_rows(); + } + block.consume_bytes(parsed_size); + return ParsedBlock{std::move(parser), block.block_index, + static_cast(parsed_size) + block.bytes_skipped}; + } + + private: + io::IOContext io_context_; + ParseOptions parse_options_; + int num_csv_cols_; + bool count_rows_; + int num_rows_seen_ = 0; +}; + +class BlockDecodingOperator { + public: + Future> operator()(const ParsedBlock& block) { + DCHECK(!state_->column_decoders.empty()); + std::vector>> decoded_array_futs; + for (auto& decoder : state_->column_decoders) { + decoded_array_futs.push_back(decoder->Decode(block.parser)); + } + auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped; + auto decoded_arrays_fut = All(decoded_array_futs); + auto state = state_; + return decoded_arrays_fut.Then( + [state, bytes_parsed_or_skipped]( + const std::vector>>& maybe_decoded_arrays) + -> Result> { + state->bytes_decoded_->fetch_add(bytes_parsed_or_skipped); + ARROW_ASSIGN_OR_RAISE(auto decoded_arrays, + internal::UnwrapOrRaise(maybe_decoded_arrays)); + return state->DecodedArraysToBatch(decoded_arrays); + }); + } + + static Result Make(io::IOContext io_context, + ConvertOptions convert_options, + ConversionSchema conversion_schema, + std::atomic* bytes_decoded) { + BlockDecodingOperator 
op(std::move(io_context), std::move(convert_options), + std::move(conversion_schema), bytes_decoded); + RETURN_NOT_OK(op.state_->MakeColumnDecoders()); + return op; + } + + private: + BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options, + ConversionSchema conversion_schema, + std::atomic* bytes_decoded) + : state_(std::make_shared(std::move(io_context), std::move(convert_options), + std::move(conversion_schema), bytes_decoded)) {} + + struct State { + State(io::IOContext io_context, ConvertOptions convert_options, + ConversionSchema conversion_schema, std::atomic* bytes_decoded) + : io_context(std::move(io_context)), + convert_options(std::move(convert_options)), + conversion_schema(std::move(conversion_schema)), + bytes_decoded_(bytes_decoded) {} + + Result> DecodedArraysToBatch( + std::vector>& arrays) { + if (schema == nullptr) { + FieldVector fields(arrays.size()); + for (size_t i = 0; i < arrays.size(); ++i) { + fields[i] = field(conversion_schema.columns[i].name, arrays[i]->type()); + } + schema = arrow::schema(std::move(fields)); + } + const auto n_rows = arrays[0]->length(); + return RecordBatch::Make(schema, n_rows, std::move(arrays)); + } + + // Make column decoders from conversion schema + Status MakeColumnDecoders() { + for (const auto& column : conversion_schema.columns) { + std::shared_ptr decoder; + if (column.is_missing) { + ARROW_ASSIGN_OR_RAISE(decoder, + ColumnDecoder::MakeNull(io_context.pool(), column.type)); + } else if (column.type != nullptr) { + ARROW_ASSIGN_OR_RAISE( + decoder, ColumnDecoder::Make(io_context.pool(), column.type, column.index, + convert_options)); + } else { + ARROW_ASSIGN_OR_RAISE( + decoder, + ColumnDecoder::Make(io_context.pool(), column.index, convert_options)); + } + column_decoders.push_back(std::move(decoder)); + } + return Status::OK(); + } + + io::IOContext io_context; + ConvertOptions convert_options; + ConversionSchema conversion_schema; + std::vector> column_decoders; + std::shared_ptr schema; + std::atomic* bytes_decoded_; + }; + + std::shared_ptr state_; +}; + ///////////////////////////////////////////////////////////////////////// // Base class for common functionality @@ -367,8 +537,9 @@ class ReaderMixin { protected: // Read header and column names from buffer, create column builders - Status ProcessHeader(const std::shared_ptr& buf, - std::shared_ptr* rest) { + // Returns the # of bytes consumed + Result ProcessHeader(const std::shared_ptr& buf, + std::shared_ptr* rest) { const uint8_t* data = buf->data(); const auto data_end = data + buf->size(); DCHECK_GT(data_end - data, 0); @@ -430,12 +601,14 @@ class ReaderMixin { num_rows_seen_ += read_options_.skip_rows_after_names; } - *rest = SliceBuffer(buf, data - buf->data()); + auto bytes_consumed = data - buf->data(); + *rest = SliceBuffer(buf, bytes_consumed); num_csv_cols_ = static_cast(column_names_.size()); DCHECK_GT(num_csv_cols_, 0); - return MakeConversionSchema(); + RETURN_NOT_OK(MakeConversionSchema()); + return bytes_consumed; } std::vector GenerateColumnNames(int32_t num_cols) { @@ -548,6 +721,8 @@ class ReaderMixin { return ParseResult{std::move(parser), static_cast(parsed_size)}; } + friend class HeaderParsingOperator; + io::IOContext io_context_; ReadOptions read_options_; ParseOptions parse_options_; @@ -642,123 +817,18 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader { ///////////////////////////////////////////////////////////////////////// // Base class for streaming readers -class BaseStreamingReader : public 
ReaderMixin, public csv::StreamingReader { +class StreamingReaderImpl : public ReaderMixin, + public csv::StreamingReader, + public std::enable_shared_from_this { public: - BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor, - std::shared_ptr input, + StreamingReaderImpl(io::IOContext io_context, std::shared_ptr input, const ReadOptions& read_options, const ParseOptions& parse_options, const ConvertOptions& convert_options, bool count_rows) : ReaderMixin(io_context, std::move(input), read_options, parse_options, convert_options, count_rows), - cpu_executor_(cpu_executor) {} - - virtual Future> Init() = 0; - - std::shared_ptr schema() const override { return schema_; } - - Status ReadNext(std::shared_ptr* batch) override { - auto next_fut = ReadNextAsync(); - auto next_result = next_fut.result(); - return std::move(next_result).Value(batch); - } + bytes_decoded_(0) {} - protected: - // Make column decoders from conversion schema - Status MakeColumnDecoders() { - for (const auto& column : conversion_schema_.columns) { - std::shared_ptr decoder; - if (column.is_missing) { - ARROW_ASSIGN_OR_RAISE(decoder, ColumnDecoder::MakeNull(io_context_.pool(), - column.type, task_group_)); - } else if (column.type != nullptr) { - ARROW_ASSIGN_OR_RAISE( - decoder, ColumnDecoder::Make(io_context_.pool(), column.type, column.index, - convert_options_, task_group_)); - } else { - ARROW_ASSIGN_OR_RAISE(decoder, - ColumnDecoder::Make(io_context_.pool(), column.index, - convert_options_, task_group_)); - } - column_decoders_.push_back(std::move(decoder)); - } - return Status::OK(); - } - - Result ParseAndInsert(const std::shared_ptr& partial, - const std::shared_ptr& completion, - const std::shared_ptr& block, - int64_t block_index, bool is_final) { - ARROW_ASSIGN_OR_RAISE(auto result, - Parse(partial, completion, block, block_index, is_final)); - RETURN_NOT_OK(ProcessData(result.parser, block_index)); - return result.parsed_bytes; - } - - // Trigger conversion of parsed block data - Status ProcessData(const std::shared_ptr& parser, int64_t block_index) { - for (auto& decoder : column_decoders_) { - decoder->Insert(block_index, parser); - } - return Status::OK(); - } - - Result> DecodeNextBatch() { - DCHECK(!column_decoders_.empty()); - ArrayVector arrays; - arrays.reserve(column_decoders_.size()); - Status st; - for (auto& decoder : column_decoders_) { - auto maybe_array = decoder->NextChunk(); - if (!maybe_array.ok()) { - // If there's an error, still fetch results from other decoders to - // keep them in sync. 
- st &= maybe_array.status(); - } else { - arrays.push_back(*std::move(maybe_array)); - } - } - RETURN_NOT_OK(st); - DCHECK_EQ(arrays.size(), column_decoders_.size()); - const bool is_null = (arrays[0] == nullptr); -#ifndef NDEBUG - for (const auto& array : arrays) { - DCHECK_EQ(array == nullptr, is_null); - } -#endif - if (is_null) { - eof_ = true; - return nullptr; - } - - if (schema_ == nullptr) { - FieldVector fields(arrays.size()); - for (size_t i = 0; i < arrays.size(); ++i) { - fields[i] = field(conversion_schema_.columns[i].name, arrays[i]->type()); - } - schema_ = arrow::schema(std::move(fields)); - } - const auto n_rows = arrays[0]->length(); - return RecordBatch::Make(schema_, n_rows, std::move(arrays)); - } - - // Column decoders (in ConversionSchema order) - std::vector> column_decoders_; - std::shared_ptr schema_; - std::shared_ptr pending_batch_; - AsyncGenerator> buffer_generator_; - Executor* cpu_executor_; - bool eof_ = false; -}; - -///////////////////////////////////////////////////////////////////////// -// Serial StreamingReader implementation - -class SerialStreamingReader : public BaseStreamingReader, - public std::enable_shared_from_this { - public: - using BaseStreamingReader::BaseStreamingReader; - - Future> Init() override { + Future<> Init(Executor* cpu_executor) { ARROW_ASSIGN_OR_RAISE(auto istream_it, io::MakeInputStreamIterator(input_, read_options_.block_size)); @@ -766,140 +836,93 @@ class SerialStreamingReader : public BaseStreamingReader, ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor())); - auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_); + auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor); - buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it)); - task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token()); + auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it)); + int max_readahead = cpu_executor->GetCapacity(); auto self = shared_from_this(); - // Read schema from first batch - return ReadNextAsync(true).Then( - [self](const std::shared_ptr& first_batch) - -> Result> { - self->pending_batch_ = first_batch; - DCHECK_NE(self->schema_, nullptr); - return self; - }); - } - Result> DecodeBatchAndUpdateSchema() { - auto maybe_batch = DecodeNextBatch(); - if (schema_ == nullptr && maybe_batch.ok()) { - schema_ = (*maybe_batch)->schema(); - } - return maybe_batch; + return buffer_generator().Then([self, buffer_generator, max_readahead]( + const std::shared_ptr& first_buffer) { + return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead); + }); } - Future> DoReadNext( - std::shared_ptr self) { - auto batch = std::move(pending_batch_); - if (batch != nullptr) { - return Future>::MakeFinished(batch); - } + std::shared_ptr schema() const override { return schema_; } - if (!source_eof_) { - return block_generator_() - .Then([self](const CSVBlock& maybe_block) -> Status { - if (!IsIterationEnd(maybe_block)) { - self->bytes_parsed_ += maybe_block.bytes_skipped; - self->last_block_index_ = maybe_block.block_index; - auto maybe_parsed = self->ParseAndInsert( - maybe_block.partial, maybe_block.completion, maybe_block.buffer, - maybe_block.block_index, maybe_block.is_final); - if (!maybe_parsed.ok()) { - // Parse error => bail out - self->eof_ = true; - return maybe_parsed.status(); - } - self->bytes_parsed_ += *maybe_parsed; - RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed)); - } else { - 
self->source_eof_ = true; - for (auto& decoder : self->column_decoders_) { - decoder->SetEOF(self->last_block_index_ + 1); - } - } - return Status::OK(); - }) - .Then([self]() -> Result> { - return self->DecodeBatchAndUpdateSchema(); - }); - } - return Future>::MakeFinished( - DecodeBatchAndUpdateSchema()); - } + int64_t bytes_read() const override { return bytes_decoded_.load(); } - Future> ReadNextSkippingEmpty( - std::shared_ptr self, bool internal_read) { - return DoReadNext(self).Then( - [self, internal_read](const std::shared_ptr& batch) { - if (batch != nullptr && batch->num_rows() == 0) { - return self->ReadNextSkippingEmpty(self, internal_read); - } - if (!internal_read) { - self->bytes_decoded_ += self->bytes_parsed_; - self->bytes_parsed_ = 0; - } - return Future>::MakeFinished(batch); - }); + Status ReadNext(std::shared_ptr* batch) override { + auto next_fut = ReadNextAsync(); + auto next_result = next_fut.result(); + return std::move(next_result).Value(batch); } Future> ReadNextAsync() override { - return ReadNextAsync(false); - }; - - int64_t bytes_read() const override { return bytes_decoded_; } + return record_batch_gen_(); + } protected: - Future<> SetupReader(std::shared_ptr self) { - return buffer_generator_().Then([self](const std::shared_ptr& first_buffer) { - if (first_buffer == nullptr) { - return Status::Invalid("Empty CSV file"); - } - auto own_first_buffer = first_buffer; - auto start = own_first_buffer->data(); - RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer)); - self->bytes_decoded_ = own_first_buffer->data() - start; - RETURN_NOT_OK(self->MakeColumnDecoders()); - - self->block_generator_ = SerialBlockReader::MakeAsyncIterator( - std::move(self->buffer_generator_), MakeChunker(self->parse_options_), - std::move(own_first_buffer), self->read_options_.skip_rows_after_names); - return Status::OK(); - }); + Future<> InitAfterFirstBuffer(const std::shared_ptr& first_buffer, + AsyncGenerator> buffer_generator, + int max_readahead) { + if (first_buffer == nullptr) { + return Status::Invalid("Empty CSV file"); + } + + std::shared_ptr after_header; + ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed, + ProcessHeader(first_buffer, &after_header)); + bytes_decoded_.fetch_add(header_bytes_consumed); + auto parser_op = + BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, count_rows_); + ARROW_ASSIGN_OR_RAISE(auto decoder_op, BlockDecodingOperator::Make( + io_context_, convert_options_, + conversion_schema_, &bytes_decoded_)); + auto block_gen = SerialBlockReader::MakeAsyncIterator( + std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header), + read_options_.skip_rows_after_names); + auto parsed_block_gen = + MakeMappedGenerator(std::move(block_gen), std::move(parser_op)); + auto rb_gen = MakeMappedGenerator>( + std::move(parsed_block_gen), decoder_op); + auto self = shared_from_this(); + return rb_gen().Then( + [self, rb_gen, max_readahead](const std::shared_ptr& first_batch) { + return self->InitAfterFirstBatch(first_batch, std::move(rb_gen), max_readahead); + }); } - Future> ReadNextAsync(bool internal_read) { - if (eof_) { - return Future>::MakeFinished(nullptr); - } - if (io_context_.stop_token().IsStopRequested()) { - eof_ = true; - return io_context_.stop_token().Poll(); + Status InitAfterFirstBatch(const std::shared_ptr& first_batch, + AsyncGenerator> batch_gen, + int max_readahead) { + schema_ = first_batch->schema(); + + AsyncGenerator> readahead_gen; + if (read_options_.use_threads) { + readahead_gen = 
MakeReadaheadGenerator(std::move(batch_gen), max_readahead); + } else { + readahead_gen = std::move(batch_gen); } - auto self = shared_from_this(); - if (!block_generator_) { - return SetupReader(self).Then( - [self, internal_read]() -> Future> { - return self->ReadNextSkippingEmpty(self, internal_read); - }, - [self](const Status& err) -> Result> { - self->eof_ = true; - return err; - }); + + AsyncGenerator> restarted_gen; + // Streaming reader should not emit empty record batches + if (first_batch->num_rows() > 0) { + restarted_gen = MakeGeneratorStartsWith({first_batch}, std::move(readahead_gen)); } else { - return self->ReadNextSkippingEmpty(self, internal_read); + restarted_gen = std::move(readahead_gen); } + record_batch_gen_ = + MakeCancellable(std::move(restarted_gen), io_context_.stop_token()); + return Status::OK(); } - bool source_eof_ = false; - int64_t last_block_index_ = 0; - AsyncGenerator block_generator_; - // bytes of data parsed but not yet decoded - int64_t bytes_parsed_ = 0; + std::shared_ptr schema_; + AsyncGenerator> record_batch_gen_; // bytes which have been decoded for caller - int64_t bytes_decoded_ = 0; -}; + std::atomic bytes_decoded_; +}; // namespace ///////////////////////////////////////////////////////////////////////// // Serial TableReader implementation @@ -1089,11 +1112,13 @@ Future> MakeStreamingReader( RETURN_NOT_OK(parse_options.Validate()); RETURN_NOT_OK(read_options.Validate()); RETURN_NOT_OK(convert_options.Validate()); - std::shared_ptr reader; - reader = std::make_shared( - io_context, cpu_executor, input, read_options, parse_options, convert_options, - /*count_rows=*/true); - return reader->Init(); + std::shared_ptr reader; + reader = std::make_shared(io_context, input, read_options, + parse_options, convert_options, + /*count_rows=*/true); + return reader->Init(cpu_executor).Then([reader] { + return std::dynamic_pointer_cast(reader); + }); } ///////////////////////////////////////////////////////////////////////// @@ -1139,8 +1164,9 @@ class CSVRowCounter : public ReaderMixin, } Future DoCount(const std::shared_ptr& self) { - // We must return a value instead of Status/Future<> to work with MakeMappedGenerator, - // and we must use a type with a valid end value to work with IterationEnd. + // We must return a value instead of Status/Future<> to work with + // MakeMappedGenerator, and we must use a type with a valid end value to work with + // IterationEnd. 
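    // The streaming reader above is assembled the same way: the block,
    // parse and decode stages are chained with MakeMappedGenerator, then
    // readahead and cancellation are layered on the batch generator.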
std::function>(const CSVBlock&)> count_cb = [self](const CSVBlock& maybe_block) -> Result> { ARROW_ASSIGN_OR_RAISE( diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc index 1ab49fa8664..2b22fb6127d 100644 --- a/cpp/src/arrow/csv/reader_test.cc +++ b/cpp/src/arrow/csv/reader_test.cc @@ -67,6 +67,38 @@ class StreamingReaderAsTableReader : public TableReader { using TableReaderFactory = std::function>(std::shared_ptr)>; +using StreamingReaderFactory = std::function>( + std::shared_ptr)>; + +void TestEmptyTable(TableReaderFactory reader_factory) { + auto empty_buffer = std::make_shared(""); + auto empty_input = std::make_shared(empty_buffer); + auto maybe_reader = reader_factory(empty_input); + // Streaming reader fails on open, table readers fail on first read + if (maybe_reader.ok()) { + ASSERT_FINISHES_AND_RAISES(Invalid, (*maybe_reader)->ReadAsync()); + } else { + ASSERT_TRUE(maybe_reader.status().IsInvalid()); + } +} + +void TestHeaderOnly(TableReaderFactory reader_factory) { + auto header_only_buffer = std::make_shared("a,b,c\n"); + auto input = std::make_shared(header_only_buffer); + ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input)); + ASSERT_FINISHES_OK_AND_ASSIGN(auto table, reader->ReadAsync()); + ASSERT_EQ(table->schema()->num_fields(), 3); + ASSERT_EQ(table->num_rows(), 0); +} + +void TestHeaderOnlyStreaming(StreamingReaderFactory reader_factory) { + auto header_only_buffer = std::make_shared("a,b,c\n"); + auto input = std::make_shared(header_only_buffer); + ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input)); + std::shared_ptr next_batch; + ASSERT_OK(reader->ReadNext(&next_batch)); + ASSERT_EQ(next_batch, nullptr); +} void StressTableReader(TableReaderFactory reader_factory) { #ifdef ARROW_VALGRIND @@ -151,6 +183,8 @@ TableReaderFactory MakeSerialFactory() { }; } +TEST(SerialReaderTests, Empty) { TestEmptyTable(MakeSerialFactory()); } +TEST(SerialReaderTests, HeaderOnly) { TestHeaderOnly(MakeSerialFactory()); } TEST(SerialReaderTests, Stress) { StressTableReader(MakeSerialFactory()); } TEST(SerialReaderTests, StressInvalid) { StressInvalidTableReader(MakeSerialFactory()); } TEST(SerialReaderTests, NestedParallelism) { @@ -175,6 +209,14 @@ Result MakeAsyncFactory( }; } +TEST(AsyncReaderTests, Empty) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); + TestEmptyTable(table_factory); +} +TEST(AsyncReaderTests, HeaderOnly) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); + TestHeaderOnly(table_factory); +} TEST(AsyncReaderTests, Stress) { ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); StressTableReader(table_factory); @@ -194,6 +236,7 @@ Result MakeStreamingFactory() { -> Result> { auto read_options = ReadOptions::Defaults(); read_options.block_size = 1 << 10; + read_options.use_threads = true; ARROW_ASSIGN_OR_RAISE( auto streaming_reader, StreamingReader::Make(io::default_io_context(), input_stream, read_options, @@ -202,6 +245,25 @@ Result MakeStreamingFactory() { }; } +Result MakeStreamingReaderFactory() { + return [](std::shared_ptr input_stream) + -> Result> { + auto read_options = ReadOptions::Defaults(); + read_options.block_size = 1 << 10; + read_options.use_threads = true; + return StreamingReader::Make(io::default_io_context(), input_stream, read_options, + ParseOptions::Defaults(), ConvertOptions::Defaults()); + }; +} + +TEST(StreamingReaderTests, Empty) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeStreamingFactory()); + TestEmptyTable(table_factory); +} +TEST(StreamingReaderTests, 
HeaderOnly) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeStreamingReaderFactory()); + TestHeaderOnlyStreaming(table_factory); +} TEST(StreamingReaderTests, Stress) { ASSERT_OK_AND_ASSIGN(auto table_factory, MakeStreamingFactory()); StressTableReader(table_factory); @@ -227,18 +289,23 @@ TEST(StreamingReaderTest, BytesRead) { auto read_options = ReadOptions::Defaults(); read_options.block_size = 20; + read_options.use_threads = false; ASSERT_OK_AND_ASSIGN( auto streaming_reader, StreamingReader::Make(io::default_io_context(), input, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr batch; - int64_t bytes = 6; // Size of header + int64_t bytes = 18; // Size of header and first batch do { ASSERT_EQ(bytes, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); bytes += 12; // Add size of each row - } while (batch); + } while (bytes <= 42); + ASSERT_EQ(42, streaming_reader->bytes_read()); + // Should be able to read past the end without bumping bytes_read + ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_EQ(42, streaming_reader->bytes_read()); + ASSERT_EQ(batch.get(), nullptr); } // Interaction of skip_rows and bytes_read() @@ -246,18 +313,23 @@ TEST(StreamingReaderTest, BytesRead) { auto input = std::make_shared(table_buffer); auto read_options = ReadOptions::Defaults(); - read_options.skip_rows = 2; + read_options.skip_rows = 1; + read_options.block_size = 32; ASSERT_OK_AND_ASSIGN( auto streaming_reader, StreamingReader::Make(io::default_io_context(), input, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr batch; - // first two rows and third row as header + // Skip the actual header (6 bytes) and then treat first row as header (12 bytes) + // and then streaming reader reads in first batch (12 bytes) ASSERT_EQ(30, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_NE(batch.get(), nullptr); ASSERT_EQ(42, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); + ASSERT_NE(batch.get(), nullptr); + ASSERT_EQ(42, streaming_reader->bytes_read()); + ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_EQ(batch.get(), nullptr); } @@ -266,7 +338,8 @@ TEST(StreamingReaderTest, BytesRead) { auto input = std::make_shared(table_buffer); auto read_options = ReadOptions::Defaults(); - read_options.skip_rows_after_names = 2; + read_options.block_size = 32; + read_options.skip_rows_after_names = 1; ASSERT_OK_AND_ASSIGN( auto streaming_reader, @@ -274,8 +347,12 @@ TEST(StreamingReaderTest, BytesRead) { ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr batch; - // Just header - ASSERT_EQ(6, streaming_reader->bytes_read()); + // To open the header is read (6 bytes) and the first batch (12 bytes) but to get to + // it we have to skip 1 row (12 bytes) + ASSERT_EQ(30, streaming_reader->bytes_read()); + ASSERT_OK(streaming_reader->ReadNext(&batch)); + ASSERT_NE(batch.get(), nullptr); + ASSERT_EQ(42, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_NE(batch.get(), nullptr); ASSERT_EQ(42, streaming_reader->bytes_read()); diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index c2aad6cd680..8992e7bcac2 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -907,6 +907,8 @@ AsyncGenerator MakeVectorGenerator(std::vector vec) { return [state]() { auto idx = state->vec_idx.fetch_add(1); if (idx >= state->vec.size()) { + 
// Eagerly return memory + state->vec.clear(); return AsyncGeneratorEnd(); } return Future::MakeFinished(state->vec[idx]); diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 050342de747..f099cfcb05b 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -1322,25 +1322,6 @@ def test_inference_failure(self): with pytest.raises(StopIteration): reader.read_next_batch() - # Inference on first block, then conversion failure on second block, - # then success on third block - rows = b"a,b\n1,2\nabc,def\n45,67\n" - read_options.block_size = 8 - reader = self.open_bytes(rows, read_options=read_options) - expected_schema = pa.schema([('a', pa.int64()), - ('b', pa.int64())]) - assert reader.schema == expected_schema - assert reader.read_next_batch().to_pydict() == {'a': [1], 'b': [2]} - # Second block - with pytest.raises(ValueError, - match="CSV conversion error to int64"): - reader.read_next_batch() - # Third block - assert reader.read_next_batch().to_pydict() == {'a': [45], 'b': [67]} - # EOF - with pytest.raises(StopIteration): - reader.read_next_batch() - def test_invalid_csv(self): # CSV errors on first block rows = b"a,b\n1,2,3\n4,5\n6,7\n" @@ -1519,8 +1500,8 @@ def test_stress_block_sizes(self): class TestSerialStreamingCSVRead(BaseTestStreamingCSVRead, unittest.TestCase): def open_csv(self, *args, **kwargs): - read_options = kwargs.setdefault('read_options', ReadOptions()) - read_options.use_threads = False + # read_options = kwargs.setdefault('read_options', ReadOptions()) + # read_options.use_threads = False return open_csv(*args, **kwargs) def test_batch_lifetime(self): @@ -1540,11 +1521,11 @@ def check_one_batch(reader, expected): check_one_batch(reader, {'a': [10], 'b': [11]}) allocated_after_first_batch = pa.total_allocated_bytes() check_one_batch(reader, {'a': [12], 'b': [13]}) - assert pa.total_allocated_bytes() == allocated_after_first_batch + assert pa.total_allocated_bytes() <= allocated_after_first_batch check_one_batch(reader, {'a': [14], 'b': [15]}) - assert pa.total_allocated_bytes() == allocated_after_first_batch + assert pa.total_allocated_bytes() <= allocated_after_first_batch check_one_batch(reader, {'a': [16], 'b': [17]}) - assert pa.total_allocated_bytes() == allocated_after_first_batch + assert pa.total_allocated_bytes() <= allocated_after_first_batch with pytest.raises(StopIteration): reader.read_next_batch() assert pa.total_allocated_bytes() == old_allocated From efe3292b8301869369a0eecbee96a6224a944182 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 23 Jun 2021 23:33:13 -1000 Subject: [PATCH 02/10] ARROW-11889: Removed debugging entry in CMakeLists, added back in some commented out column decoder tests --- cpp/src/arrow/csv/CMakeLists.txt | 1 - cpp/src/arrow/csv/column_decoder_test.cc | 84 ++++++++++++++---------- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/cpp/src/arrow/csv/CMakeLists.txt b/cpp/src/arrow/csv/CMakeLists.txt index 8b3d484e99e..561faf1b584 100644 --- a/cpp/src/arrow/csv/CMakeLists.txt +++ b/cpp/src/arrow/csv/CMakeLists.txt @@ -28,7 +28,6 @@ if(ARROW_COMPUTE) list(APPEND CSV_TEST_SRCS writer_test.cc) endif() -add_arrow_test(csv-decoder-test SOURCES column_decoder_test.cc) add_arrow_test(csv-test SOURCES ${CSV_TEST_SRCS}) add_arrow_benchmark(converter_benchmark PREFIX "arrow-csv") diff --git a/cpp/src/arrow/csv/column_decoder_test.cc b/cpp/src/arrow/csv/column_decoder_test.cc index 48d9d97121d..66fcf327096 100644 --- a/cpp/src/arrow/csv/column_decoder_test.cc 
+++ b/cpp/src/arrow/csv/column_decoder_test.cc @@ -109,6 +109,12 @@ class ColumnDecoderTest : public ::testing::Test { AssertArraysEqual(*expected, *decoded); } + void AssertChunkInvalid(std::vector<std::string> chunk) { + std::shared_ptr<BlockParser> parser; + MakeColumnParser(chunk, &parser); + ASSERT_FINISHES_AND_RAISES(Invalid, decoder_->Decode(parser)); + } + void AssertFetch(std::shared_ptr<Array> expected_chunk) { ASSERT_OK_AND_ASSIGN(auto chunk, NextChunk()); ASSERT_NE(chunk, nullptr); @@ -249,23 +255,21 @@ class TypedColumnDecoderTest : public ColumnDecoderTest { AssertFetch(ArrayFromJSON(type, "[null, 1000]")); } - // void TestThreaded() { - // auto type = uint32(); - - // MakeDecoder(type, default_options); - - // auto joiner = RunThread([&]() { - // InsertChunk(1, {"4", "-5"}); - // InsertChunk(0, {"1", "2", "3"}); - // InsertChunk(3, {"6"}); - // InsertChunk(2, {}); - // }); + void TestThreaded() { + constexpr int NITERS = 10; + auto type = uint32(); + MakeDecoder(type, default_options); - // AssertFetch(ArrayFromJSON(type, "[1, 2, 3]")); - // AssertFetchInvalid(); - // AssertFetch(ArrayFromJSON(type, "[]")); - // AssertFetch(ArrayFromJSON(type, "[6]")); - // } + RunThreadsAndJoin( + [&](int thread_id) { + if (thread_id % 2 == 0) { + AssertChunkInvalid({"4", "-5"}); + } else { + AssertChunk({"1", "2", "3"}, ArrayFromJSON(type, "[1, 2, 3]")); + } + }, + NITERS); + } }; TEST_F(TypedColumnDecoderTest, Integers) { this->TestIntegers(); } @@ -274,7 +278,7 @@ TEST_F(TypedColumnDecoderTest, Options) { this->TestOptions(); } TEST_F(TypedColumnDecoderTest, Errors) { this->TestErrors(); } -// TEST_F(TypedColumnDecoderTest, Threaded) { this->TestThreaded(); } +TEST_F(TypedColumnDecoderTest, Threaded) { this->TestThreaded(); } ////////////////////////////////////////////////////////////////////////// // Tests for type-inferring column decoder @@ -299,26 +303,34 @@ class InferringColumnDecoderTest : public ColumnDecoderTest { AssertFetch(ArrayFromJSON(type, "[901, null]")); } - // void TestThreaded() { - // auto type = float64(); - - // MakeDecoder(default_options); - - // auto joiner = RunThread([&]() { - // SleepFor(1e-3); - // InsertChunk(0, {"1.5", "2", "3"}); - // InsertChunk(3, {"6"}); - // }); + void TestThreaded() { + constexpr int NITERS = 10; + auto type = float64(); + MakeDecoder(default_options); - // // These chunks will wait for inference to run on chunk 0 - // InsertChunk(1, {"4", "-5", "N/A"}); - // InsertChunk(2, {}); + // One of these threads will do the inference, so we need to make sure they all + // produce floating-point values + RunThreadsAndJoin( + [&](int thread_id) { + if (thread_id % 2 == 0) { + AssertChunk({"6.3", "7.2"}, ArrayFromJSON(type, "[6.3, 7.2]")); + } else { + AssertChunk({"1.1", "2", "3"}, ArrayFromJSON(type, "[1.1, 2, 3]")); + } + }, + NITERS); - // AssertFetch(ArrayFromJSON(type, "[1.5, 2, 3]")); - // AssertFetch(ArrayFromJSON(type, "[4, -5, null]")); - // AssertFetch(ArrayFromJSON(type, "[]")); - // AssertFetch(ArrayFromJSON(type, "[6]")); - // } + // These will run after the inference + RunThreadsAndJoin( + [&](int thread_id) { + if (thread_id % 2 == 0) { + AssertChunk({"1", "2"}, ArrayFromJSON(type, "[1, 2]")); + } else { + AssertChunkInvalid({"xyz"}); + } + }, + NITERS); + } void TestOptions() { auto type = boolean(); @@ -359,7 +371,7 @@ class InferringColumnDecoderTest : public ColumnDecoderTest { TEST_F(InferringColumnDecoderTest, Integers) { this->TestIntegers(); } -// TEST_F(InferringColumnDecoderTest, Threaded) { this->TestThreaded(); } +TEST_F(InferringColumnDecoderTest, Threaded) { 
this->TestThreaded(); } TEST_F(InferringColumnDecoderTest, Options) { this->TestOptions(); } From 27ac193de22a3d4343e49bd4dbbe14a35686d7ef Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 23 Jun 2021 23:50:17 -1000 Subject: [PATCH 03/10] ARROW-11889: Lint --- cpp/src/arrow/csv/column_decoder.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/csv/column_decoder.h b/cpp/src/arrow/csv/column_decoder.h index ffaa1a1a827..5fbbd5df58b 100644 --- a/cpp/src/arrow/csv/column_decoder.h +++ b/cpp/src/arrow/csv/column_decoder.h @@ -57,7 +57,7 @@ class ARROW_EXPORT ColumnDecoder { std::shared_ptr<DataType> type); protected: - explicit ColumnDecoder() = default; + ColumnDecoder() = default; }; } // namespace csv From 74cf9fe5c7de7283bd44ab405d980f7badadb22a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 23 Jun 2021 23:59:28 -1000 Subject: [PATCH 04/10] ARROW-11889: Compiler warnings --- cpp/src/arrow/csv/reader.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index f530bffdd72..e2ed8815ae1 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -414,7 +414,7 @@ class BlockParsingOperator { if (count_rows_) { num_rows_seen_ += parser->num_rows(); } - block.consume_bytes(parsed_size); + RETURN_NOT_OK(block.consume_bytes(parsed_size)); return ParsedBlock{std::move(parser), block.block_index, static_cast<int64_t>(parsed_size) + block.bytes_skipped}; } From 19ad0fe5ce4f70c78657cdcda90720f4c287faad Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 24 Jun 2021 07:58:49 -1000 Subject: [PATCH 05/10] ARROW-11889: More compiler warnings --- cpp/src/arrow/csv/column_decoder_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/csv/column_decoder_test.cc b/cpp/src/arrow/csv/column_decoder_test.cc index 66fcf327096..c8b96e04696 100644 --- a/cpp/src/arrow/csv/column_decoder_test.cc +++ b/cpp/src/arrow/csv/column_decoder_test.cc @@ -98,7 +98,7 @@ class ColumnDecoderTest : public ::testing::Test { } Result<std::shared_ptr<Array>> NextChunk() { - EXPECT_LT(read_ptr_, decoded_chunks_.size()); + EXPECT_LT(read_ptr_, static_cast<int64_t>(decoded_chunks_.size())); return decoded_chunks_[read_ptr_++].result(); } From c57c6d71d1382582ea41894c576487dd06580ac4 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Jul 2021 16:35:57 -1000 Subject: [PATCH 06/10] ARROW-11889: Addressing review feedback. 
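This also introduces a DecodedBlock struct so that byte accounting travels with each decoded batch instead of being updated from inside the decode operator, and end-of-stream is signalled by a sentinel value that the generator machinery recognizes through an IterationTraits specialization. A minimal sketch of that sentinel pattern, using a hypothetical Block type rather than the real Arrow types:

    // Illustrative stand-in for the primary template that Arrow declares in
    // arrow/util/iterator.h; the specialization is the interesting part here.
    template <typename T>
    struct IterationTraits;

    struct Block {
      int64_t bytes_processed;  // -1 doubles as the end-of-stream sentinel
    };

    template <>
    struct IterationTraits<Block> {
      static Block End() { return Block{-1}; }
      static bool IsEnd(const Block& b) { return b.bytes_processed < 0; }
    };

A generator of Block values terminates by handing back IterationTraits<Block>::End(); the DecodedBlock specialization in the diff below does exactly this with a negative bytes_processed.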
--- cpp/src/arrow/csv/reader.cc | 99 ++++++++++++++-------------- cpp/src/arrow/csv/reader_test.cc | 21 +++---- python/pyarrow/tests/test_csv.py | 95 +++++++++++++++--------------- 3 files changed, 113 insertions(+), 102 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index e2ed8815ae1..ea058f74ea4 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -356,6 +356,13 @@ struct ParsedBlock { int64_t bytes_parsed_or_skipped; }; +struct DecodedBlock { + std::shared_ptr<RecordBatch> record_batch; + // The number of input bytes represented by this batch + // This will include bytes skipped when skipping rows after the header + int64_t bytes_processed; +}; + } // namespace } // namespace csv @@ -366,8 +373,13 @@ struct IterationTraits<csv::ParsedBlock> { static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; } }; -namespace csv { +template <> +struct IterationTraits<csv::DecodedBlock> { + static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; } + static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; } +}; +namespace csv { namespace { // A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data. @@ -429,7 +441,7 @@ class BlockDecodingOperator { public: - Future<std::shared_ptr<RecordBatch>> operator()(const ParsedBlock& block) { + Future<DecodedBlock> operator()(const ParsedBlock& block) { DCHECK(!state_->column_decoders.empty()); std::vector<Future<std::shared_ptr<Array>>> decoded_array_futs; for (auto& decoder : state_->column_decoders) { @@ -441,38 +453,35 @@ class BlockDecodingOperator { return decoded_arrays_fut.Then( [state, bytes_parsed_or_skipped]( const std::vector<Result<std::shared_ptr<Array>>>& maybe_decoded_arrays) - -> Result<std::shared_ptr<RecordBatch>> { - state->bytes_decoded_->fetch_add(bytes_parsed_or_skipped); + -> Result<DecodedBlock> { ARROW_ASSIGN_OR_RAISE(auto decoded_arrays, internal::UnwrapOrRaise(maybe_decoded_arrays)); - return state->DecodedArraysToBatch(decoded_arrays); + + ARROW_ASSIGN_OR_RAISE(auto batch, state->DecodedArraysToBatch(decoded_arrays)); + return DecodedBlock{std::move(batch), bytes_parsed_or_skipped}; }); } static Result<BlockDecodingOperator> Make(io::IOContext io_context, ConvertOptions convert_options, - ConversionSchema conversion_schema, - std::atomic<int64_t>* bytes_decoded) { BlockDecodingOperator op(std::move(io_context), std::move(convert_options), - std::move(conversion_schema), bytes_decoded); + ConversionSchema conversion_schema) { BlockDecodingOperator op(std::move(io_context), std::move(convert_options), + std::move(conversion_schema)); RETURN_NOT_OK(op.state_->MakeColumnDecoders(io_context)); return op; } private: BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options, - ConversionSchema conversion_schema, - std::atomic<int64_t>* bytes_decoded) : state_(std::make_shared<State>(std::move(io_context), std::move(convert_options), - std::move(conversion_schema), bytes_decoded)) {} + ConversionSchema conversion_schema) : state_(std::make_shared<State>(std::move(io_context), std::move(convert_options), + std::move(conversion_schema))) {} struct State { State(io::IOContext io_context, ConvertOptions convert_options, - ConversionSchema conversion_schema, std::atomic<int64_t>* bytes_decoded) : io_context(std::move(io_context)), - convert_options(std::move(convert_options)), - conversion_schema(std::move(conversion_schema)), - bytes_decoded_(bytes_decoded) {} + ConversionSchema conversion_schema) : convert_options(std::move(convert_options)), + conversion_schema(std::move(conversion_schema)) {} Result<std::shared_ptr<RecordBatch>> DecodedArraysToBatch( std::vector<std::shared_ptr<Array>>& arrays) { @@ -488,7 +497,7 @@ class BlockDecodingOperator { } // Make column decoders from conversion schema - Status MakeColumnDecoders() { + 
Status MakeColumnDecoders(io::IOContext io_context) { for (const auto& column : conversion_schema.columns) { std::shared_ptr<ColumnDecoder> decoder; if (column.is_missing) { @@ -508,12 +517,10 @@ class BlockDecodingOperator { return Status::OK(); } - io::IOContext io_context; ConvertOptions convert_options; ConversionSchema conversion_schema; std::vector<std::shared_ptr<ColumnDecoder>> column_decoders; std::shared_ptr<Schema> schema; - std::atomic<int64_t>* bytes_decoded_; }; std::shared_ptr<State> state_; }; @@ -721,8 +728,6 @@ class ReaderMixin { return ParseResult{std::move(parser), static_cast<int64_t>(parsed_size)}; } - friend class HeaderParsingOperator; io::IOContext io_context_; ReadOptions read_options_; ParseOptions parse_options_; @@ -877,52 +882,60 @@ class StreamingReaderImpl : public ReaderMixin, bytes_decoded_.fetch_add(header_bytes_consumed); auto parser_op = BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, count_rows_); - ARROW_ASSIGN_OR_RAISE(auto decoder_op, BlockDecodingOperator::Make( - io_context_, convert_options_, - conversion_schema_, &bytes_decoded_)); + ARROW_ASSIGN_OR_RAISE( + auto decoder_op, + BlockDecodingOperator::Make(io_context_, convert_options_, conversion_schema_)); auto block_gen = SerialBlockReader::MakeAsyncIterator( std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header), read_options_.skip_rows_after_names); auto parsed_block_gen = MakeMappedGenerator(std::move(block_gen), std::move(parser_op)); - auto rb_gen = MakeMappedGenerator<std::shared_ptr<RecordBatch>>( - std::move(parsed_block_gen), decoder_op); + auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), + std::move(decoder_op)); auto self = shared_from_this(); - return rb_gen().Then( - [self, rb_gen, max_readahead](const std::shared_ptr<RecordBatch>& first_batch) { - return self->InitAfterFirstBatch(first_batch, std::move(rb_gen), max_readahead); - }); + return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) { + return self->InitAfterFirstBatch(first_block, std::move(rb_gen), max_readahead); + }); } - Status InitAfterFirstBatch(const std::shared_ptr<RecordBatch>& first_batch, - AsyncGenerator<std::shared_ptr<RecordBatch>> batch_gen, - int max_readahead) { - schema_ = first_batch->schema(); + Status InitAfterFirstBatch(const DecodedBlock& first_block, + AsyncGenerator<DecodedBlock> batch_gen, int max_readahead) { + schema_ = first_block.record_batch->schema(); - AsyncGenerator<std::shared_ptr<RecordBatch>> readahead_gen; + AsyncGenerator<DecodedBlock> readahead_gen; if (read_options_.use_threads) { readahead_gen = MakeReadaheadGenerator(std::move(batch_gen), max_readahead); } else { readahead_gen = std::move(batch_gen); } - AsyncGenerator<std::shared_ptr<RecordBatch>> restarted_gen; + AsyncGenerator<DecodedBlock> restarted_gen; // Streaming reader should not emit empty record batches - if (first_batch->num_rows() > 0) { - restarted_gen = MakeGeneratorStartsWith({first_batch}, std::move(readahead_gen)); + if (first_block.record_batch->num_rows() > 0) { + restarted_gen = MakeGeneratorStartsWith({first_block}, std::move(readahead_gen)); } else { restarted_gen = std::move(readahead_gen); } - record_batch_gen_ = - MakeCancellable(std::move(restarted_gen), io_context_.stop_token()); + + auto self = shared_from_this(); + auto unwrap_and_record_bytes = + [self](const DecodedBlock& block) -> Result<std::shared_ptr<RecordBatch>> { + self->bytes_decoded_.fetch_add(block.bytes_processed); + return block.record_batch; + }; + + auto unwrapped = MakeMappedGenerator<std::shared_ptr<RecordBatch>>( + std::move(restarted_gen), std::move(unwrap_and_record_bytes)); + + record_batch_gen_ = MakeCancellable(std::move(unwrapped), io_context_.stop_token()); return Status::OK(); } std::shared_ptr<Schema> schema_; AsyncGenerator<std::shared_ptr<RecordBatch>> record_batch_gen_; // bytes which 
have been decoded for caller + // bytes which have been decoded and asked for by the caller std::atomic<int64_t> bytes_decoded_; -}; // namespace +}; ///////////////////////////////////////////////////////////////////////// // Serial TableReader implementation diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc index 2b22fb6127d..88ead7677f3 100644 --- a/cpp/src/arrow/csv/reader_test.cc +++ b/cpp/src/arrow/csv/reader_test.cc @@ -295,16 +295,13 @@ TEST(StreamingReaderTest, BytesRead) { StreamingReader::Make(io::default_io_context(), input, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr<RecordBatch> batch; - int64_t bytes = 18; // Size of header and first batch + int64_t bytes = 6; // Size of header (counted during StreamingReader::Make) do { ASSERT_EQ(bytes, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); bytes += 12; // Add size of each row } while (bytes <= 42); ASSERT_EQ(42, streaming_reader->bytes_read()); - // Should be able to read past the end without bumping bytes_read - ASSERT_OK(streaming_reader->ReadNext(&batch)); - ASSERT_EQ(42, streaming_reader->bytes_read()); ASSERT_EQ(batch.get(), nullptr); } @@ -320,12 +317,12 @@ TEST(StreamingReaderTest, BytesRead) { StreamingReader::Make(io::default_io_context(), input, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr<RecordBatch> batch; - // Skip the actual header (6 bytes), then treat the first row as the header (12 bytes), - // and then the streaming reader reads in the first batch (12 bytes) - ASSERT_EQ(30, streaming_reader->bytes_read()); + // The header (6 bytes) and first skipped row (12 bytes) are counted during + // StreamingReader::Make + ASSERT_EQ(18, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_NE(batch.get(), nullptr); - ASSERT_EQ(42, streaming_reader->bytes_read()); + ASSERT_EQ(30, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_NE(batch.get(), nullptr); ASSERT_EQ(42, streaming_reader->bytes_read()); @@ -347,12 +344,12 @@ TEST(StreamingReaderTest, BytesRead) { ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr<RecordBatch> batch; - // To open the reader, the header is read (6 bytes) and the first batch (12 bytes), but - // to get to the first batch we have to skip 1 row (12 bytes) - ASSERT_EQ(30, streaming_reader->bytes_read()); + // The header is read as part of StreamingReader::Make + ASSERT_EQ(6, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_NE(batch.get(), nullptr); - ASSERT_EQ(42, streaming_reader->bytes_read()); + // Next the skipped row (12 bytes) and 1 row (12 bytes) + ASSERT_EQ(30, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_NE(batch.get(), nullptr); ASSERT_EQ(42, streaming_reader->bytes_read()); diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index f099cfcb05b..e4a4e3e5935 100644 --- a/python/pyarrow/tests/test_csv.py +++ b/python/pyarrow/tests/test_csv.py @@ -1250,10 +1250,16 @@ def read_csv(self, *args, validate_full=True, **kwargs): return table -class BaseTestStreamingCSVRead: +@pytest.mark.parametrize('use_threads', [False, True]) +class TestStreamingCSVRead: - def open_bytes(self, b, **kwargs): - return self.open_csv(pa.py_buffer(b), **kwargs) + def open_bytes(self, b, use_threads, **kwargs): + return self.open_csv(pa.py_buffer(b), use_threads, **kwargs) + + def open_csv(self, b, use_threads, *args, **kwargs): + read_options = 
kwargs.setdefault('read_options', ReadOptions()) + read_options.use_threads = use_threads + return open_csv(b, *args, **kwargs) def check_reader(self, reader, expected_schema, expected_data): assert reader.schema == expected_schema @@ -1264,24 +1270,24 @@ def check_reader(self, reader, expected_schema, expected_data): assert batch.schema == expected_schema assert batch.to_pydict() == expected_batch - def test_file_object(self): + def test_file_object(self, use_threads): data = b"a,b\n1,2\n3,4\n" expected_data = {'a': [1, 3], 'b': [2, 4]} bio = io.BytesIO(data) - reader = self.open_csv(bio) + reader = self.open_csv(bio, use_threads) expected_schema = pa.schema([('a', pa.int64()), ('b', pa.int64())]) self.check_reader(reader, expected_schema, [expected_data]) - def test_header(self): + def test_header(self, use_threads): rows = b"abc,def,gh\n" - reader = self.open_bytes(rows) + reader = self.open_bytes(rows, use_threads) expected_schema = pa.schema([('abc', pa.null()), ('def', pa.null()), ('gh', pa.null())]) self.check_reader(reader, expected_schema, []) - def test_inference(self): + def test_inference(self, use_threads): # Inference is done on first block rows = b"a,b\n123,456\nabc,de\xff\ngh,ij\n" expected_schema = pa.schema([('a', pa.string()), @@ -1289,25 +1295,25 @@ def test_inference(self): read_options = ReadOptions() read_options.block_size = len(rows) - reader = self.open_bytes(rows, read_options=read_options) + reader = self.open_bytes(rows, use_threads, read_options=read_options) self.check_reader(reader, expected_schema, [{'a': ['123', 'abc', 'gh'], 'b': [b'456', b'de\xff', b'ij']}]) read_options.block_size = len(rows) - 1 - reader = self.open_bytes(rows, read_options=read_options) + reader = self.open_bytes(rows, use_threads, read_options=read_options) self.check_reader(reader, expected_schema, [{'a': ['123', 'abc'], 'b': [b'456', b'de\xff']}, {'a': ['gh'], 'b': [b'ij']}]) - def test_inference_failure(self): + def test_inference_failure(self, use_threads): # Inference on first block, then conversion failure on second block rows = b"a,b\n123,456\nabc,de\xff\ngh,ij\n" read_options = ReadOptions() read_options.block_size = len(rows) - 7 - reader = self.open_bytes(rows, read_options=read_options) + reader = self.open_bytes(rows, use_threads, read_options=read_options) expected_schema = pa.schema([('a', pa.int64()), ('b', pa.int64())]) assert reader.schema == expected_schema @@ -1322,19 +1328,20 @@ def test_inference_failure(self): with pytest.raises(StopIteration): reader.read_next_batch() - def test_invalid_csv(self): + def test_invalid_csv(self, use_threads): # CSV errors on first block rows = b"a,b\n1,2,3\n4,5\n6,7\n" read_options = ReadOptions() read_options.block_size = 10 with pytest.raises(pa.ArrowInvalid, match="Expected 2 columns, got 3"): - reader = self.open_bytes(rows, read_options=read_options) + reader = self.open_bytes( + rows, use_threads, read_options=read_options) # CSV errors on second block rows = b"a,b\n1,2\n3,4,5\n6,7\n" read_options.block_size = 8 - reader = self.open_bytes(rows, read_options=read_options) + reader = self.open_bytes(rows, use_threads, read_options=read_options) assert reader.read_next_batch().to_pydict() == {'a': [1], 'b': [2]} with pytest.raises(pa.ArrowInvalid, match="Expected 2 columns, got 3"): @@ -1343,9 +1350,9 @@ def test_invalid_csv(self): with pytest.raises(StopIteration): reader.read_next_batch() - def test_options_delimiter(self): + def test_options_delimiter(self, use_threads): rows = b"a;b,c\nde,fg;eh\n" - reader = 
self.open_bytes(rows) + reader = self.open_bytes(rows, use_threads) expected_schema = pa.schema([('a;b', pa.string()), ('c', pa.string())]) self.check_reader(reader, expected_schema, @@ -1353,17 +1360,17 @@ def test_options_delimiter(self): 'c': ['fg;eh']}]) opts = ParseOptions(delimiter=';') - reader = self.open_bytes(rows, parse_options=opts) + reader = self.open_bytes(rows, use_threads, parse_options=opts) expected_schema = pa.schema([('a', pa.string()), ('b,c', pa.string())]) self.check_reader(reader, expected_schema, [{'a': ['de,fg'], 'b,c': ['eh']}]) - def test_no_ending_newline(self): + def test_no_ending_newline(self, use_threads): # No \n after last line rows = b"a,b,c\n1,2,3\n4,5,6" - reader = self.open_bytes(rows) + reader = self.open_bytes(rows, use_threads) expected_schema = pa.schema([('a', pa.int64()), ('b', pa.int64()), ('c', pa.int64())]) @@ -1372,16 +1379,16 @@ def test_no_ending_newline(self): 'b': [2, 5], 'c': [3, 6]}]) - def test_empty_file(self): + def test_empty_file(self, use_threads): with pytest.raises(ValueError, match="Empty CSV file"): - self.open_bytes(b"") + self.open_bytes(b"", use_threads) - def test_column_options(self): + def test_column_options(self, use_threads): # With column_names rows = b"1,2,3\n4,5,6" read_options = ReadOptions() read_options.column_names = ['d', 'e', 'f'] - reader = self.open_bytes(rows, read_options=read_options) + reader = self.open_bytes(rows, use_threads, read_options=read_options) expected_schema = pa.schema([('d', pa.int64()), ('e', pa.int64()), ('f', pa.int64())]) @@ -1393,7 +1400,7 @@ def test_column_options(self): # With include_columns convert_options = ConvertOptions() convert_options.include_columns = ['f', 'e'] - reader = self.open_bytes(rows, read_options=read_options, + reader = self.open_bytes(rows, use_threads, read_options=read_options, convert_options=convert_options) expected_schema = pa.schema([('f', pa.int64()), ('e', pa.int64())]) @@ -1403,7 +1410,7 @@ def test_column_options(self): # With column_types convert_options.column_types = {'e': pa.string()} - reader = self.open_bytes(rows, read_options=read_options, + reader = self.open_bytes(rows, use_threads, read_options=read_options, convert_options=convert_options) expected_schema = pa.schema([('f', pa.int64()), ('e', pa.string())]) @@ -1416,11 +1423,12 @@ def test_column_options(self): with pytest.raises( KeyError, match="Column 'g' in include_columns does not exist"): - reader = self.open_bytes(rows, read_options=read_options, + reader = self.open_bytes(rows, use_threads, + read_options=read_options, convert_options=convert_options) convert_options.include_missing_columns = True - reader = self.open_bytes(rows, read_options=read_options, + reader = self.open_bytes(rows, use_threads, read_options=read_options, convert_options=convert_options) expected_schema = pa.schema([('g', pa.null()), ('f', pa.int64()), @@ -1431,7 +1439,7 @@ def test_column_options(self): 'f': [3, 6]}]) convert_options.column_types = {'e': pa.string(), 'g': pa.float64()} - reader = self.open_bytes(rows, read_options=read_options, + reader = self.open_bytes(rows, use_threads, read_options=read_options, convert_options=convert_options) expected_schema = pa.schema([('g', pa.float64()), ('f', pa.int64()), @@ -1441,11 +1449,11 @@ def test_column_options(self): 'e': ["2", "5"], 'f': [3, 6]}]) - def test_encoding(self): + def test_encoding(self, use_threads): # latin-1 (invalid utf-8) rows = b"a,b\nun,\xe9l\xe9phant" read_options = ReadOptions() - reader = self.open_bytes(rows, 
read_options=read_options) + reader = self.open_bytes(rows, use_threads, read_options=read_options) expected_schema = pa.schema([('a', pa.string()), ('b', pa.binary())]) self.check_reader(reader, expected_schema, @@ -1453,7 +1461,7 @@ def test_encoding(self): 'b': [b"\xe9l\xe9phant"]}]) read_options.encoding = 'latin1' - reader = self.open_bytes(rows, read_options=read_options) + reader = self.open_bytes(rows, use_threads, read_options=read_options) expected_schema = pa.schema([('a', pa.string()), ('b', pa.string())]) self.check_reader(reader, expected_schema, @@ -1464,22 +1472,22 @@ def test_encoding(self): rows = (b'\xff\xfea\x00,\x00b\x00\n\x00u\x00n\x00,' b'\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00') read_options.encoding = 'utf16' - reader = self.open_bytes(rows, read_options=read_options) + reader = self.open_bytes(rows, use_threads, read_options=read_options) expected_schema = pa.schema([('a', pa.string()), ('b', pa.string())]) self.check_reader(reader, expected_schema, [{'a': ["un"], 'b': ["éléphant"]}]) - def test_small_random_csv(self): + def test_small_random_csv(self, use_threads): csv, expected = make_random_csv(num_cols=2, num_rows=10) - reader = self.open_bytes(csv) + reader = self.open_bytes(csv, use_threads) table = reader.read_all() assert table.schema == expected.schema assert table.equals(expected) assert table.to_pydict() == expected.to_pydict() - def test_stress_block_sizes(self): + def test_stress_block_sizes(self, use_threads): # Test a number of small block sizes to stress block stitching csv_base, expected = make_random_csv(num_cols=2, num_rows=500) block_sizes = [19, 21, 23, 26, 37, 111] @@ -1489,22 +1497,15 @@ def test_stress_block_sizes(self): # Need at least two lines for type inference assert csv[:block_size].count(b'\n') >= 2 read_options = ReadOptions(block_size=block_size) - reader = self.open_bytes(csv, read_options=read_options) + reader = self.open_bytes( + csv, use_threads, read_options=read_options) table = reader.read_all() assert table.schema == expected.schema if not table.equals(expected): # Better error output assert table.to_pydict() == expected.to_pydict() - -class TestSerialStreamingCSVRead(BaseTestStreamingCSVRead, unittest.TestCase): - - def open_csv(self, *args, **kwargs): - # read_options = kwargs.setdefault('read_options', ReadOptions()) - # read_options.use_threads = False - return open_csv(*args, **kwargs) - - def test_batch_lifetime(self): + def test_batch_lifetime(self, use_threads): gc.collect() old_allocated = pa.total_allocated_bytes() @@ -1517,7 +1518,7 @@ def check_one_batch(reader, expected): read_options = ReadOptions() read_options.column_names = ['a', 'b'] read_options.block_size = 6 - reader = self.open_bytes(rows, read_options=read_options) + reader = self.open_bytes(rows, use_threads, read_options=read_options) check_one_batch(reader, {'a': [10], 'b': [11]}) allocated_after_first_batch = pa.total_allocated_bytes() check_one_batch(reader, {'a': [12], 'b': [13]}) From 3808254b66a5d56e03041d4c86929f02599f59bb Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 7 Jul 2021 16:48:17 -1000 Subject: [PATCH 07/10] ARROW-11889: Rebase --- cpp/src/arrow/csv/reader.cc | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index ea058f74ea4..4a68e1adadb 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -889,9 +889,8 @@ class StreamingReaderImpl : public ReaderMixin, std::move(buffer_generator), 
MakeChunker(parse_options_), std::move(after_header), read_options_.skip_rows_after_names); auto parsed_block_gen = - MakeMappedGenerator(std::move(block_gen), std::move(parser_op)); - auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), - std::move(decoder_op)); + MakeMappedGenerator(std::move(block_gen), std::move(parser_op)); + auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op)); auto self = shared_from_this(); return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) { return self->InitAfterFirstBatch(first_block, std::move(rb_gen), max_readahead); @@ -924,8 +923,8 @@ class StreamingReaderImpl : public ReaderMixin, return block.record_batch; }; - auto unwrapped = MakeMappedGenerator<std::shared_ptr<RecordBatch>>( - std::move(restarted_gen), std::move(unwrap_and_record_bytes)); + auto unwrapped = + MakeMappedGenerator(std::move(restarted_gen), std::move(unwrap_and_record_bytes)); record_batch_gen_ = MakeCancellable(std::move(unwrapped), io_context_.stop_token()); return Status::OK(); From 61d4bce64cd6a41eafff59cc8279b70b722aa810 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 13 Jul 2021 11:10:18 -1000 Subject: [PATCH 08/10] ARROW-11889: The final operator that incremented bytes_decoded held a reference to self, which caused a circular reference. The operator now captures a reference to bytes_decoded itself. --- cpp/src/arrow/csv/reader.cc | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 4a68e1adadb..0937344133a 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -831,7 +831,7 @@ class StreamingReaderImpl : public ReaderMixin, const ConvertOptions& convert_options, bool count_rows) : ReaderMixin(io_context, std::move(input), read_options, parse_options, convert_options, count_rows), - bytes_decoded_(0) {} + bytes_decoded_(std::make_shared<std::atomic<int64_t>>(0)) {} Future<> Init(Executor* cpu_executor) { ARROW_ASSIGN_OR_RAISE(auto istream_it, @@ -856,7 +856,7 @@ class StreamingReaderImpl : public ReaderMixin, std::shared_ptr<Schema> schema() const override { return schema_; } - int64_t bytes_read() const override { return bytes_decoded_.load(); } + int64_t bytes_read() const override { return bytes_decoded_->load(); } Status ReadNext(std::shared_ptr<RecordBatch>* batch) override { auto next_fut = ReadNextAsync(); @@ -879,7 +879,7 @@ class StreamingReaderImpl : public ReaderMixin, std::shared_ptr<Buffer> after_header; ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed, ProcessHeader(first_buffer, &after_header)); - bytes_decoded_.fetch_add(header_bytes_consumed); + bytes_decoded_->fetch_add(header_bytes_consumed); auto parser_op = BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, count_rows_); @@ -916,10 +916,11 @@ class StreamingReaderImpl : public ReaderMixin, restarted_gen = std::move(readahead_gen); } - auto self = shared_from_this(); + auto bytes_decoded = bytes_decoded_; auto unwrap_and_record_bytes = - [self](const DecodedBlock& block) -> Result<std::shared_ptr<RecordBatch>> { - self->bytes_decoded_.fetch_add(block.bytes_processed); + [bytes_decoded]( + const DecodedBlock& block) -> Result<std::shared_ptr<RecordBatch>> { + bytes_decoded->fetch_add(block.bytes_processed); return block.record_batch; }; @@ -933,7 +934,7 @@ class StreamingReaderImpl : public ReaderMixin, std::shared_ptr<Schema> schema_; AsyncGenerator<std::shared_ptr<RecordBatch>> record_batch_gen_; // bytes which have been decoded and asked for by the caller - std::atomic<int64_t> bytes_decoded_; + std::shared_ptr<std::atomic<int64_t>> bytes_decoded_; }; 
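The cycle removed by PATCH 08 is easy to reproduce in isolation: a callback stored inside an object that captures a shared_ptr back to that same object keeps the object alive forever. A standalone sketch of the leak and the fix (illustrative only, not Arrow code):

    #include <atomic>
    #include <cstdint>
    #include <functional>
    #include <memory>

    struct Reader : std::enable_shared_from_this<Reader> {
      std::shared_ptr<std::atomic<int64_t>> bytes_decoded =
          std::make_shared<std::atomic<int64_t>>(0);
      std::function<void(int64_t)> on_block;

      void InitLeaky() {
        auto self = shared_from_this();
        // Reader owns on_block and on_block owns Reader: the use count never
        // reaches zero, so neither is ever destroyed.
        on_block = [self](int64_t n) { self->bytes_decoded->fetch_add(n); };
      }

      void InitFixed() {
        // Capture only the counter; the Reader itself remains collectable.
        auto counter = bytes_decoded;
        on_block = [counter](int64_t n) { counter->fetch_add(n); };
      }
    };

This mirrors unwrap_and_record_bytes above, which now captures the shared bytes_decoded_ counter instead of self.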
///////////////////////////////////////////////////////////////////////// From a1364e1bf3be156f12fd04f962cae8326c436497 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 15 Jul 2021 11:36:28 -1000 Subject: [PATCH 09/10] Apply suggestions from code review Co-authored-by: Benjamin Kietzman --- cpp/src/arrow/csv/column_decoder.cc | 5 ++--- cpp/src/arrow/csv/reader.cc | 18 +++++++++++------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/csv/column_decoder.cc b/cpp/src/arrow/csv/column_decoder.cc index 3fc7a896460..0e1b088ea4c 100644 --- a/cpp/src/arrow/csv/column_decoder.cc +++ b/cpp/src/arrow/csv/column_decoder.cc @@ -210,7 +210,7 @@ Future<std::shared_ptr<Array>> InferringColumnDecoder::Decode( if (!already_taken) { auto maybe_array = RunInference(parser); first_inference_run_.MarkFinished(); - return Future<std::shared_ptr<Array>>::MakeFinished(maybe_array); + return Future<std::shared_ptr<Array>>::MakeFinished(std::move(maybe_array)); } // Non-first block: wait for inference to finish on first block now, @@ -218,7 +218,6 @@ Future<std::shared_ptr<Array>> InferringColumnDecoder::Decode( return first_inference_run_.Then([this, parser] { DCHECK(type_frozen_); - auto maybe_array = converter_->Convert(*parser, col_index_); - return maybe_array; + return converter_->Convert(*parser, col_index_); }); } diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index 0937344133a..d2be7a9fafa 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -382,7 +382,8 @@ struct IterationTraits<csv::DecodedBlock> { namespace csv { namespace { -// A functor that takes in a buffer of CSV data and returns a parsed batch of CSV data. +// A function object that takes in a buffer of CSV data and returns a parsed batch of CSV +// data (CSVBlock -> ParsedBlock) for use with MakeMappedGenerator. // The parsed batch contains a list of offsets for each of the columns so that columns // can be individually scanned // @@ -397,7 +398,7 @@ class BlockParsingOperator { count_rows_(count_rows) {} Result<ParsedBlock> operator()(const CSVBlock& block) { - static constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max(); + constexpr int32_t max_num_rows = std::numeric_limits<int32_t>::max(); auto parser = std::make_shared<BlockParser>( io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); @@ -439,6 +440,8 @@ class BlockParsingOperator { int num_rows_seen_ = 0; }; +// A function object that takes in a parsed batch of CSV data and decodes it to an Arrow +// record batch (ParsedBlock -> DecodedBlock) for use with MakeMappedGenerator. 
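+// For orientation, a rough sketch of how the two operators are chained in
+// StreamingReaderImpl::Init (names as used in this file; generator plumbing
+// simplified):
+//
+//   AsyncGenerator<CSVBlock>     block_gen  = SerialBlockReader::MakeAsyncIterator(...);
+//   AsyncGenerator<ParsedBlock>  parsed_gen = MakeMappedGenerator(block_gen, parser_op);
+//   AsyncGenerator<DecodedBlock> rb_gen     = MakeMappedGenerator(parsed_gen, decoder_op);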
class BlockDecodingOperator { public: Future<DecodedBlock> operator()(const ParsedBlock& block) { @@ -448,7 +451,7 @@ class BlockDecodingOperator { decoded_array_futs.push_back(decoder->Decode(block.parser)); } auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped; - auto decoded_arrays_fut = All(decoded_array_futs); + auto decoded_arrays_fut = All(std::move(decoded_array_futs)); auto state = state_; return decoded_arrays_fut.Then( [state, bytes_parsed_or_skipped]( @@ -457,7 +460,8 @@ class BlockDecodingOperator { ARROW_ASSIGN_OR_RAISE(auto decoded_arrays, internal::UnwrapOrRaise(maybe_decoded_arrays)); - ARROW_ASSIGN_OR_RAISE(auto batch, state->DecodedArraysToBatch(decoded_arrays)); + ARROW_ASSIGN_OR_RAISE(auto batch, + state->DecodedArraysToBatch(std::move(decoded_arrays))); return DecodedBlock{std::move(batch), bytes_parsed_or_skipped}; }); } @@ -484,7 +488,7 @@ class BlockDecodingOperator { conversion_schema(std::move(conversion_schema)) {} Result<std::shared_ptr<RecordBatch>> DecodedArraysToBatch( - std::vector<std::shared_ptr<Array>>& arrays) { + std::vector<std::shared_ptr<Array>> arrays) { if (schema == nullptr) { FieldVector fields(arrays.size()); for (size_t i = 0; i < arrays.size(); ++i) { @@ -1177,8 +1181,8 @@ class CSVRowCounter : public ReaderMixin, } Future<int64_t> DoCount(const std::shared_ptr<CSVRowCounter>& self) { - // We must return a value instead of Status/Future<> to work with - // MakeMappedGenerator, and we must use a type with a valid end value to work with + // count_cb must return a value instead of Status/Future<> to work with + // MakeMappedGenerator, and it must use a type with a valid end value to work with // IterationEnd. std::function<Result<util::optional<int64_t>>(const CSVBlock&)> count_cb = [self](const CSVBlock& maybe_block) -> Result<util::optional<int64_t>> { From ab9b932ef578df93304e11215f83dfddb4fa50e8 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Thu, 15 Jul 2021 11:40:36 -1000 Subject: [PATCH 10/10] ARROW-11889: Applying additional suggestions from code review --- cpp/src/arrow/csv/column_decoder.cc | 12 ++---------- cpp/src/arrow/csv/reader.cc | 3 +++ 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/csv/column_decoder.cc b/cpp/src/arrow/csv/column_decoder.cc index 0e1b088ea4c..ff5d01d8c4d 100644 --- a/cpp/src/arrow/csv/column_decoder.cc +++ b/cpp/src/arrow/csv/column_decoder.cc @@ -86,16 +86,8 @@ class NullColumnDecoder : public ConcreteColumnDecoder { Future<std::shared_ptr<Array>> NullColumnDecoder::Decode( const std::shared_ptr<BlockParser>& parser) { - // Spawn a task that will build an array of nulls with the right DataType - const int32_t num_rows = parser->num_rows(); - DCHECK_GE(num_rows, 0); - - std::unique_ptr<ArrayBuilder> builder; - RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder)); - std::shared_ptr<Array> array; - RETURN_NOT_OK(builder->AppendNulls(num_rows)); - RETURN_NOT_OK(builder->Finish(&array)); - return Future<std::shared_ptr<Array>>::MakeFinished(std::move(array)); + DCHECK_GE(parser->num_rows(), 0); + return MakeArrayOfNull(type_, parser->num_rows(), pool_); } ////////////////////////////////////////////////////////////////////////// diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index d2be7a9fafa..11437297b80 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -884,17 +884,20 @@ class StreamingReaderImpl : public ReaderMixin, ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed, ProcessHeader(first_buffer, &after_header)); bytes_decoded_->fetch_add(header_bytes_consumed); + auto parser_op = BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, count_rows_); ARROW_ASSIGN_OR_RAISE( auto decoder_op, BlockDecodingOperator::Make(io_context_, convert_options_, 
conversion_schema_)); + auto block_gen = SerialBlockReader::MakeAsyncIterator( std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header), read_options_.skip_rows_after_names); auto parsed_block_gen = MakeMappedGenerator(std::move(block_gen), std::move(parser_op)); auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op)); + auto self = shared_from_this(); return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) { return self->InitAfterFirstBatch(first_block, std::move(rb_gen), max_readahead);
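MakeMappedGenerator is the workhorse of this whole series: it lifts a plain function or function object over an asynchronous stream, ending when the source ends. A synchronous analogue of the idea, with std::optional standing in for Arrow's Future-plus-sentinel machinery (an illustrative sketch only, not the real implementation):

    #include <functional>
    #include <iostream>
    #include <optional>
    #include <utility>
    #include <vector>

    // A "generator" here is just a callable that yields the next value, or
    // nullopt once the stream is exhausted.
    template <typename T>
    using Generator = std::function<std::optional<T>()>;

    // Map a function over a generator, propagating end-of-stream.
    template <typename T, typename Fn>
    auto MapGenerator(Generator<T> source, Fn map)
        -> Generator<decltype(map(std::declval<T>()))> {
      return [source = std::move(source), map = std::move(map)]() mutable
                 -> std::optional<decltype(map(std::declval<T>()))> {
        if (auto next = source()) return map(*next);
        return std::nullopt;
      };
    }

    int main() {
      std::vector<int> blocks{1, 2, 3};
      std::size_t i = 0;
      Generator<int> source = [&]() -> std::optional<int> {
        if (i < blocks.size()) return blocks[i++];
        return std::nullopt;
      };
      // Chain two stages, much like parser_op and decoder_op above.
      auto parsed = MapGenerator<int>(source, [](int b) { return b * 10; });
      auto decoded = MapGenerator<int>(parsed, [](int b) { return b + 1; });
      while (auto v = decoded()) std::cout << *v << "\n";  // prints 11, 21, 31
    }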