diff --git a/cpp/src/arrow/csv/column_decoder.cc b/cpp/src/arrow/csv/column_decoder.cc
index 1dd13bc9086..ff5d01d8c4d 100644
--- a/cpp/src/arrow/csv/column_decoder.cc
+++ b/cpp/src/arrow/csv/column_decoder.cc
@@ -45,91 +45,13 @@ using internal::TaskGroup;
 
 class ConcreteColumnDecoder : public ColumnDecoder {
  public:
-  explicit ConcreteColumnDecoder(MemoryPool* pool,
-                                 std::shared_ptr<TaskGroup> task_group,
-                                 int32_t col_index = -1)
-      : ColumnDecoder(std::move(task_group)),
-        pool_(pool),
-        col_index_(col_index),
-        num_chunks_(-1),
-        next_chunk_(0) {}
-
-  void Append(const std::shared_ptr<BlockParser>& parser) override {
-    Insert(static_cast<int64_t>(chunks_.size()), parser);
-  }
-
-  void SetEOF(int64_t num_blocks) override {
-    std::lock_guard<std::mutex> lock(mutex_);
-
-    DCHECK_EQ(num_chunks_, -1) << "Cannot change EOF";
-    num_chunks_ = num_blocks;
-
-    // If further chunks have been requested in NextChunk(), arrange to return nullptr
-    for (int64_t i = num_chunks_; i < static_cast<int64_t>(chunks_.size()); ++i) {
-      auto* chunk = &chunks_[i];
-      if (chunk->is_valid()) {
-        DCHECK(!IsFutureFinished(chunk->state()));
-        chunk->MarkFinished(std::shared_ptr<Array>());
-      }
-    }
-  }
-
-  Result<std::shared_ptr<Array>> NextChunk() override {
-    std::unique_lock<std::mutex> lock(mutex_);
-
-    if (num_chunks_ > 0 && next_chunk_ >= num_chunks_) {
-      return nullptr;  // EOF
-    }
-    PrepareChunkUnlocked(next_chunk_);
-    auto chunk_index = next_chunk_++;
-    WaitForChunkUnlocked(chunk_index);
-    // Move Future to avoid keeping chunk alive
-    return chunks_[chunk_index].MoveResult();
-  }
+  explicit ConcreteColumnDecoder(MemoryPool* pool, int32_t col_index = -1)
+      : ColumnDecoder(), pool_(pool), col_index_(col_index) {}
 
  protected:
   // XXX useful?
   virtual std::shared_ptr<DataType> type() const = 0;
 
-  void WaitForChunkUnlocked(int64_t chunk_index) {
-    auto future = chunks_[chunk_index];  // Make copy because of resizes
-    mutex_.unlock();
-    future.Wait();
-    mutex_.lock();
-  }
-
-  void PrepareChunk(int64_t block_index) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    PrepareChunkUnlocked(block_index);
-  }
-
-  void PrepareChunkUnlocked(int64_t block_index) {
-    size_t chunk_index = static_cast<size_t>(block_index);
-    if (chunks_.size() <= chunk_index) {
-      chunks_.resize(chunk_index + 1);
-    }
-    if (!chunks_[block_index].is_valid()) {
-      chunks_[block_index] = Future<std::shared_ptr<Array>>::Make();
-    }
-  }
-
-  void SetChunk(int64_t chunk_index, Result<std::shared_ptr<Array>> maybe_array) {
-    std::lock_guard<std::mutex> lock(mutex_);
-    SetChunkUnlocked(chunk_index, std::move(maybe_array));
-  }
-
-  void SetChunkUnlocked(int64_t chunk_index,
-                        Result<std::shared_ptr<Array>> maybe_array) {
-    auto* chunk = &chunks_[chunk_index];
-    DCHECK(chunk->is_valid());
-    DCHECK(!IsFutureFinished(chunk->state()));
-
-    if (maybe_array.ok()) {
-      chunk->MarkFinished(std::move(maybe_array));
-    } else {
-      chunk->MarkFinished(WrapConversionError(maybe_array.status()));
-    }
-  }
-
   Status WrapConversionError(const Status& st) {
     if (st.ok()) {
       return st;
@@ -142,12 +64,7 @@ class ConcreteColumnDecoder : public ColumnDecoder {
 
   MemoryPool* pool_;
   int32_t col_index_;
-
-  std::vector<Future<std::shared_ptr<Array>>> chunks_;
-  int64_t num_chunks_;
-  int64_t next_chunk_;
-
-  std::mutex mutex_;
+  internal::Executor* executor_;
 };
 
 //////////////////////////////////////////////////////////////////////////
 
 class NullColumnDecoder : public ConcreteColumnDecoder {
  public:
-  explicit NullColumnDecoder(const std::shared_ptr<DataType>& type, MemoryPool* pool,
-                             const std::shared_ptr<TaskGroup>& task_group)
-      : ConcreteColumnDecoder(pool, task_group), type_(type) {}
+  explicit NullColumnDecoder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
+      : ConcreteColumnDecoder(pool), type_(type) {}
 
-  void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
+  Future<std::shared_ptr<Array>> Decode(
+      const std::shared_ptr<BlockParser>& parser) override;
 
  protected:
   std::shared_ptr<DataType> type() const override { return type_; }
@@ -167,24 +84,10 @@ class NullColumnDecoder : public ConcreteColumnDecoder {
   std::shared_ptr<DataType> type_;
 };
 
-void NullColumnDecoder::Insert(int64_t block_index,
-                               const std::shared_ptr<BlockParser>& parser) {
-  PrepareChunk(block_index);
-
-  // Spawn a task that will build an array of nulls with the right DataType
-  const int32_t num_rows = parser->num_rows();
-  DCHECK_GE(num_rows, 0);
-
-  task_group_->Append([=]() -> Status {
-    std::unique_ptr<ArrayBuilder> builder;
-    RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder));
-    std::shared_ptr<Array> array;
-    RETURN_NOT_OK(builder->AppendNulls(num_rows));
-    RETURN_NOT_OK(builder->Finish(&array));
-
-    SetChunk(block_index, array);
-    return Status::OK();
-  });
+Future<std::shared_ptr<Array>> NullColumnDecoder::Decode(
+    const std::shared_ptr<BlockParser>& parser) {
+  DCHECK_GE(parser->num_rows(), 0);
+  return MakeArrayOfNull(type_, parser->num_rows(), pool_);
 }
 
 //////////////////////////////////////////////////////////////////////////
 
@@ -193,15 +96,13 @@ void NullColumnDecoder::Insert(int64_t block_index,
 class TypedColumnDecoder : public ConcreteColumnDecoder {
  public:
   TypedColumnDecoder(const std::shared_ptr<DataType>& type, int32_t col_index,
-                     const ConvertOptions& options, MemoryPool* pool,
-                     const std::shared_ptr<TaskGroup>& task_group)
-      : ConcreteColumnDecoder(pool, task_group, col_index),
-        type_(type),
-        options_(options) {}
+                     const ConvertOptions& options, MemoryPool* pool)
+      : ConcreteColumnDecoder(pool, col_index), type_(type), options_(options) {}
 
   Status Init();
 
-  void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
+  Future<std::shared_ptr<Array>> Decode(
+      const std::shared_ptr<BlockParser>& parser) override;
 
  protected:
   std::shared_ptr<DataType> type() const override { return type_; }
@@ -219,17 +120,11 @@ Status TypedColumnDecoder::Init() {
   return Status::OK();
 }
 
-void TypedColumnDecoder::Insert(int64_t block_index,
-                                const std::shared_ptr<BlockParser>& parser) {
+Future<std::shared_ptr<Array>> TypedColumnDecoder::Decode(
+    const std::shared_ptr<BlockParser>& parser) {
   DCHECK_NE(converter_, nullptr);
-
-  PrepareChunk(block_index);
-
-  // We're careful that all references in the closure outlive the Append() call
-  task_group_->Append([=]() -> Status {
-    SetChunk(block_index, converter_->Convert(*parser, col_index_));
-    return Status::OK();
-  });
+  return Future<std::shared_ptr<Array>>::MakeFinished(
+      converter_->Convert(*parser, col_index_));
 }
 
 //////////////////////////////////////////////////////////////////////////
 
@@ -238,16 +133,19 @@ void TypedColumnDecoder::Insert(int64_t block_index,
 class InferringColumnDecoder : public ConcreteColumnDecoder {
  public:
   InferringColumnDecoder(int32_t col_index, const ConvertOptions& options,
-                         MemoryPool* pool,
-                         const std::shared_ptr<TaskGroup>& task_group)
-      : ConcreteColumnDecoder(pool, task_group, col_index),
+                         MemoryPool* pool)
+      : ConcreteColumnDecoder(pool, col_index),
         options_(options),
         infer_status_(options),
-        type_frozen_(false) {}
+        type_frozen_(false) {
+    first_inference_run_ = Future<>::Make();
+    first_inferrer_ = 0;
+  }
 
   Status Init();
 
-  void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
+  Future<std::shared_ptr<Array>> Decode(
+      const std::shared_ptr<BlockParser>& parser) override;
 
  protected:
   std::shared_ptr<DataType> type() const override {
@@ -265,10 +163,9 @@ class InferringColumnDecoder : public ConcreteColumnDecoder {
   // Current inference status
   InferStatus infer_status_;
   bool type_frozen_;
+  std::atomic<int> first_inferrer_;
+  Future<> first_inference_run_;
   std::shared_ptr<Converter> converter_;
-
-  // The parsers corresponding to each chunk (for reconverting)
-  std::vector<std::shared_ptr<BlockParser>> parsers_;
 };
 
 Status InferringColumnDecoder::Init() { return UpdateType(); }
 
@@ -283,55 +180,37 @@ Result<std::shared_ptr<Array>> InferringColumnDecoder::RunInference(
     // (no one else should be updating converter_ concurrently)
     auto maybe_array = converter_->Convert(*parser, col_index_);
 
-    std::unique_lock<std::mutex> lock(mutex_);
     if (maybe_array.ok() || !infer_status_.can_loosen_type()) {
       // Conversion succeeded, or failed definitively
+      DCHECK(!type_frozen_);
+      type_frozen_ = true;
       return maybe_array;
     }
     // Conversion failed temporarily, try another type
     infer_status_.LoosenType(maybe_array.status());
-    RETURN_NOT_OK(UpdateType());
+    auto update_status = UpdateType();
+    if (!update_status.ok()) {
+      return update_status;
+    }
   }
 }
 
-void InferringColumnDecoder::Insert(int64_t block_index,
-                                    const std::shared_ptr<BlockParser>& parser) {
-  PrepareChunk(block_index);
-
+Future<std::shared_ptr<Array>> InferringColumnDecoder::Decode(
+    const std::shared_ptr<BlockParser>& parser) {
+  bool already_taken = first_inferrer_.fetch_or(1);
   // First block: run inference
-  if (block_index == 0) {
-    task_group_->Append([=]() -> Status {
-      auto maybe_array = RunInference(parser);
-
-      std::unique_lock<std::mutex> lock(mutex_);
-      DCHECK(!type_frozen_);
-      type_frozen_ = true;
-      SetChunkUnlocked(block_index, std::move(maybe_array));
-      return Status::OK();
-    });
-    return;
+  if (!already_taken) {
+    auto maybe_array = RunInference(parser);
+    first_inference_run_.MarkFinished();
+    return Future<std::shared_ptr<Array>>::MakeFinished(std::move(maybe_array));
   }
 
   // Non-first block: wait for inference to finish on first block now,
   // without blocking a TaskGroup thread.
-  {
-    std::unique_lock<std::mutex> lock(mutex_);
-    PrepareChunkUnlocked(0);
-    WaitForChunkUnlocked(0);
-    if (!chunks_[0].status().ok()) {
-      // Failed converting first chunk: bail out by marking EOF,
-      // because we can't decide a type for the other chunks.
-      SetChunkUnlocked(block_index, std::shared_ptr<Array>());
-    }
+  return first_inference_run_.Then([this, parser] {
     DCHECK(type_frozen_);
-  }
-
-  // Then use the inferred type to convert this block.
-  task_group_->Append([=]() -> Status {
     auto maybe_array = converter_->Convert(*parser, col_index_);
-
-    SetChunk(block_index, std::move(maybe_array));
-    return Status::OK();
+    return maybe_array;
   });
 }
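The non-blocking handoff above rests on two pieces: `first_inferrer_.fetch_or(1)` elects exactly one `Decode()` call to run inference, and `first_inference_run_` is the latch every later call chains onto. Below is a minimal standalone sketch of that election pattern; all names here (`Block`, `FirstCallerWins`) are hypothetical, and it uses a blocking `std::shared_future` where Arrow chains `first_inference_run_.Then(...)` instead:

```cpp
#include <atomic>
#include <future>
#include <string>

struct Block { std::string cell; };  // hypothetical stand-in for a parsed block

class FirstCallerWins {
 public:
  FirstCallerWins() : gate_(latch_.get_future().share()) {}

  std::string Decode(const Block& block) {
    // fetch_or(1) returns the previous value, so exactly one caller sees 0
    // and becomes the inferrer; everyone else waits on the latch.
    if (!first_taken_.fetch_or(1)) {
      inferred_type_ = block.cell.find('.') != std::string::npos ? "double" : "int64";
      latch_.set_value();  // open the gate for later blocks
      return inferred_type_ + "(" + block.cell + ")";
    }
    gate_.wait();  // blocking analogue of chaining onto the first-inference future
    return inferred_type_ + "(" + block.cell + ")";
  }

 private:
  std::atomic<int> first_taken_{0};
  std::promise<void> latch_;
  std::shared_future<void> gate_;
  std::string inferred_type_;
};
```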
 
@@ -339,28 +218,24 @@ void InferringColumnDecoder::Insert(int64_t block_index,
 
 // Factory functions
 
 Result<std::shared_ptr<ColumnDecoder>> ColumnDecoder::Make(
-    MemoryPool* pool, int32_t col_index, const ConvertOptions& options,
-    std::shared_ptr<TaskGroup> task_group) {
-  auto ptr = std::make_shared<InferringColumnDecoder>(col_index, options, pool,
-                                                      std::move(task_group));
+    MemoryPool* pool, int32_t col_index, const ConvertOptions& options) {
+  auto ptr = std::make_shared<InferringColumnDecoder>(col_index, options, pool);
   RETURN_NOT_OK(ptr->Init());
   return ptr;
 }
 
 Result<std::shared_ptr<ColumnDecoder>> ColumnDecoder::Make(
     MemoryPool* pool, std::shared_ptr<DataType> type, int32_t col_index,
-    const ConvertOptions& options, std::shared_ptr<TaskGroup> task_group) {
-  auto ptr = std::make_shared<TypedColumnDecoder>(std::move(type), col_index, options,
-                                                  pool, std::move(task_group));
+    const ConvertOptions& options) {
+  auto ptr =
+      std::make_shared<TypedColumnDecoder>(std::move(type), col_index, options, pool);
   RETURN_NOT_OK(ptr->Init());
   return ptr;
 }
 
 Result<std::shared_ptr<ColumnDecoder>> ColumnDecoder::MakeNull(
-    MemoryPool* pool, std::shared_ptr<DataType> type,
-    std::shared_ptr<TaskGroup> task_group) {
-  return std::make_shared<NullColumnDecoder>(std::move(type), pool,
-                                             std::move(task_group));
+    MemoryPool* pool, std::shared_ptr<DataType> type) {
+  return std::make_shared<NullColumnDecoder>(std::move(type), pool);
 }
 
 }  // namespace csv
diff --git a/cpp/src/arrow/csv/column_decoder.h b/cpp/src/arrow/csv/column_decoder.h
index 92644e3769f..5fbbd5df58b 100644
--- a/cpp/src/arrow/csv/column_decoder.h
+++ b/cpp/src/arrow/csv/column_decoder.h
@@ -36,45 +36,28 @@ class ARROW_EXPORT ColumnDecoder {
  public:
   virtual ~ColumnDecoder() = default;
 
-  /// Spawn a task that will try to convert and append the given CSV block.
-  /// All calls to Append() should happen on the same thread, otherwise
-  /// call Insert() instead.
-  virtual void Append(const std::shared_ptr<BlockParser>& parser) = 0;
-
   /// Spawn a task that will try to convert and insert the given CSV block
-  virtual void Insert(int64_t block_index,
-                      const std::shared_ptr<BlockParser>& parser) = 0;
-
-  /// Set EOF at the given number of blocks. Must only be called once.
-  virtual void SetEOF(int64_t num_blocks) = 0;
-
-  /// Fetch a chunk.
-  virtual Result<std::shared_ptr<Array>> NextChunk() = 0;
-
-  std::shared_ptr<internal::TaskGroup> task_group() { return task_group_; }
+  virtual Future<std::shared_ptr<Array>> Decode(
+      const std::shared_ptr<BlockParser>& parser) = 0;
 
   /// Construct a strictly-typed ColumnDecoder.
-  static Result<std::shared_ptr<ColumnDecoder>> Make(
-      MemoryPool* pool, std::shared_ptr<DataType> type, int32_t col_index,
-      const ConvertOptions& options, std::shared_ptr<internal::TaskGroup> task_group);
+  static Result<std::shared_ptr<ColumnDecoder>> Make(MemoryPool* pool,
+                                                     std::shared_ptr<DataType> type,
+                                                     int32_t col_index,
+                                                     const ConvertOptions& options);
 
   /// Construct a type-inferring ColumnDecoder.
   /// Inference will run only on the first block, the type will be frozen afterwards.
-  static Result<std::shared_ptr<ColumnDecoder>> Make(
-      MemoryPool* pool, int32_t col_index, const ConvertOptions& options,
-      std::shared_ptr<internal::TaskGroup> task_group);
+  static Result<std::shared_ptr<ColumnDecoder>> Make(MemoryPool* pool, int32_t col_index,
+                                                     const ConvertOptions& options);
 
   /// Construct a ColumnDecoder for a column of nulls
   /// (i.e. not present in the CSV file).
- static Result> MakeNull( - MemoryPool* pool, std::shared_ptr type, - std::shared_ptr task_group); + static Result> MakeNull(MemoryPool* pool, + std::shared_ptr type); protected: - explicit ColumnDecoder(std::shared_ptr task_group) - : task_group_(std::move(task_group)) {} - - std::shared_ptr task_group_; + ColumnDecoder() = default; }; } // namespace csv diff --git a/cpp/src/arrow/csv/column_decoder_test.cc b/cpp/src/arrow/csv/column_decoder_test.cc index 231ffb85e1b..c8b96e04696 100644 --- a/cpp/src/arrow/csv/column_decoder_test.cc +++ b/cpp/src/arrow/csv/column_decoder_test.cc @@ -27,11 +27,11 @@ #include "arrow/csv/test_common.h" #include "arrow/memory_pool.h" #include "arrow/table.h" +#include "arrow/testing/future_util.h" #include "arrow/testing/gtest_util.h" #include "arrow/testing/util.h" #include "arrow/type.h" #include "arrow/util/checked_cast.h" -#include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" namespace arrow { @@ -41,7 +41,6 @@ class BlockParser; using internal::checked_cast; using internal::GetCpuThreadPool; -using internal::TaskGroup; using ChunkData = std::vector>; @@ -65,58 +64,70 @@ ThreadJoiner RunThread(Func&& func) { return ThreadJoiner(std::make_shared(std::forward(func))); } -struct SerialExecutor { - static std::shared_ptr task_group() { return TaskGroup::MakeSerial(); } -}; - -struct ParallelExecutor { - static std::shared_ptr task_group() { - return TaskGroup::MakeThreaded(GetCpuThreadPool()); +template +void RunThreadsAndJoin(Func&& func, int iters) { + std::vector threads; + for (int i = 0; i < iters; i++) { + threads.emplace_back(std::make_shared([i, func] { func(i); })); } -}; - -using ExecutorTypes = ::testing::Types; +} class ColumnDecoderTest : public ::testing::Test { public: - ColumnDecoderTest() : tg_(TaskGroup::MakeSerial()), num_chunks_(0) {} + ColumnDecoderTest() : num_chunks_(0), read_ptr_(0) {} void SetDecoder(std::shared_ptr decoder) { decoder_ = std::move(decoder); + decoded_chunks_.clear(); num_chunks_ = 0; + read_ptr_ = 0; } - void InsertChunk(int64_t num_chunk, std::vector chunk) { + void InsertChunk(std::vector chunk) { std::shared_ptr parser; MakeColumnParser(chunk, &parser); - decoder_->Insert(num_chunk, parser); + auto decoded = decoder_->Decode(parser); + decoded_chunks_.push_back(decoded); + ++num_chunks_; } void AppendChunks(const ChunkData& chunks) { for (const auto& chunk : chunks) { - std::shared_ptr parser; - MakeColumnParser(chunk, &parser); - decoder_->Append(parser); - ++num_chunks_; + InsertChunk(chunk); } } - void SetEOF() { decoder_->SetEOF(num_chunks_); } + Result> NextChunk() { + EXPECT_LT(read_ptr_, static_cast(decoded_chunks_.size())); + return decoded_chunks_[read_ptr_++].result(); + } + + void AssertChunk(std::vector chunk, std::shared_ptr expected) { + std::shared_ptr parser; + MakeColumnParser(chunk, &parser); + ASSERT_FINISHES_OK_AND_ASSIGN(auto decoded, decoder_->Decode(parser)); + AssertArraysEqual(*expected, *decoded); + } + + void AssertChunkInvalid(std::vector chunk) { + std::shared_ptr parser; + MakeColumnParser(chunk, &parser); + ASSERT_FINISHES_AND_RAISES(Invalid, decoder_->Decode(parser)); + } void AssertFetch(std::shared_ptr expected_chunk) { - ASSERT_OK_AND_ASSIGN(auto chunk, decoder_->NextChunk()); + ASSERT_OK_AND_ASSIGN(auto chunk, NextChunk()); ASSERT_NE(chunk, nullptr); AssertArraysEqual(*expected_chunk, *chunk); } - void AssertFetchInvalid() { ASSERT_RAISES(Invalid, decoder_->NextChunk()); } - - void AssertFetchEOF() { ASSERT_OK_AND_EQ(nullptr, decoder_->NextChunk()); } + void 
AssertFetchInvalid() { ASSERT_RAISES(Invalid, NextChunk()); } protected: - std::shared_ptr tg_; std::shared_ptr decoder_; - int64_t num_chunks_; + std::vector>> decoded_chunks_; + int64_t num_chunks_ = 0; + int64_t read_ptr_ = 0; ConvertOptions default_options = ConvertOptions::Defaults(); }; @@ -124,14 +135,13 @@ class ColumnDecoderTest : public ::testing::Test { ////////////////////////////////////////////////////////////////////////// // Tests for null column decoder -template class NullColumnDecoderTest : public ColumnDecoderTest { public: - NullColumnDecoderTest() { tg_ = ExecutorType::task_group(); } + NullColumnDecoderTest() {} void MakeDecoder(std::shared_ptr type) { ASSERT_OK_AND_ASSIGN(auto decoder, - ColumnDecoder::MakeNull(default_memory_pool(), type, tg_)); + ColumnDecoder::MakeNull(default_memory_pool(), type)); SetDecoder(decoder); } @@ -141,10 +151,8 @@ class NullColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(type); AppendChunks({{"1", "2", "3"}, {"4", "5"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[null, null, null]")); AssertFetch(ArrayFromJSON(type, "[null, null]")); - AssertFetchEOF(); MakeDecoder(type); @@ -153,8 +161,6 @@ class NullColumnDecoderTest : public ColumnDecoderTest { AppendChunks({{"7", "8"}}); AssertFetch(ArrayFromJSON(type, "[null]")); AssertFetch(ArrayFromJSON(type, "[null, null]")); - SetEOF(); - AssertFetchEOF(); } void TestOtherType() { @@ -163,57 +169,40 @@ class NullColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(type); AppendChunks({{"1", "2", "3"}, {"4", "5"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[null, null, null]")); AssertFetch(ArrayFromJSON(type, "[null, null]")); - AssertFetchEOF(); - AssertFetchEOF(); } void TestThreaded() { + constexpr int NITERS = 10; auto type = int32(); - MakeDecoder(type); - auto joiner = RunThread([&]() { - InsertChunk(1, {"4", "5"}); - InsertChunk(0, {"1", "2", "3"}); - InsertChunk(3, {"6"}); - InsertChunk(2, {}); - decoder_->SetEOF(4); - }); - - AssertFetch(ArrayFromJSON(type, "[null, null, null]")); - AssertFetch(ArrayFromJSON(type, "[null, null]")); - AssertFetch(ArrayFromJSON(type, "[]")); - AssertFetch(ArrayFromJSON(type, "[null]")); - AssertFetchEOF(); - AssertFetchEOF(); + RunThreadsAndJoin( + [&](int thread_id) { + AssertChunk({"4", "5", std::to_string(thread_id)}, + ArrayFromJSON(type, "[null, null, null]")); + }, + NITERS); } - - protected: - ExecutorType executor_; }; -TYPED_TEST_SUITE(NullColumnDecoderTest, ExecutorTypes); - -TYPED_TEST(NullColumnDecoderTest, NullType) { this->TestNullType(); } +TEST_F(NullColumnDecoderTest, NullType) { this->TestNullType(); } -TYPED_TEST(NullColumnDecoderTest, OtherType) { this->TestOtherType(); } +TEST_F(NullColumnDecoderTest, OtherType) { this->TestOtherType(); } -TYPED_TEST(NullColumnDecoderTest, Threaded) { this->TestThreaded(); } +TEST_F(NullColumnDecoderTest, Threaded) { this->TestThreaded(); } ////////////////////////////////////////////////////////////////////////// // Tests for fixed-type column decoder -template class TypedColumnDecoderTest : public ColumnDecoderTest { public: - TypedColumnDecoderTest() { tg_ = ExecutorType::task_group(); } + TypedColumnDecoderTest() {} void MakeDecoder(const std::shared_ptr& type, const ConvertOptions& options) { - ASSERT_OK_AND_ASSIGN( - auto decoder, ColumnDecoder::Make(default_memory_pool(), type, 0, options, tg_)); + ASSERT_OK_AND_ASSIGN(auto decoder, + ColumnDecoder::Make(default_memory_pool(), type, 0, options)); SetDecoder(decoder); } @@ -223,11 +212,8 @@ class 
TypedColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(type, default_options); AppendChunks({{"123", "456", "-78"}, {"901", "N/A"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[123, 456, -78]")); AssertFetch(ArrayFromJSON(type, "[901, null]")); - AssertFetchEOF(); - AssertFetchEOF(); MakeDecoder(type, default_options); @@ -236,9 +222,6 @@ class TypedColumnDecoderTest : public ColumnDecoderTest { AppendChunks({{"N/A", "N/A"}}); AssertFetch(ArrayFromJSON(type, "[-987]")); AssertFetch(ArrayFromJSON(type, "[null, null]")); - SetEOF(); - AssertFetchEOF(); - AssertFetchEOF(); } void TestOptions() { @@ -247,10 +230,7 @@ class TypedColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(type, default_options); AppendChunks({{"true", "false", "N/A"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[true, false, null]")); - AssertFetchEOF(); - AssertFetchEOF(); // With non-default options auto options = default_options; @@ -260,10 +240,7 @@ class TypedColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(type, options); AppendChunks({{"true", "false", "N/A"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[null, true, false]")); - AssertFetchEOF(); - AssertFetchEOF(); } void TestErrors() { @@ -273,56 +250,46 @@ class TypedColumnDecoderTest : public ColumnDecoderTest { AppendChunks({{"123", "456", "N/A"}, {"-901"}}); AppendChunks({{"N/A", "1000"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[123, 456, null]")); AssertFetchInvalid(); AssertFetch(ArrayFromJSON(type, "[null, 1000]")); - AssertFetchEOF(); } void TestThreaded() { + constexpr int NITERS = 10; auto type = uint32(); - MakeDecoder(type, default_options); - auto joiner = RunThread([&]() { - InsertChunk(1, {"4", "-5"}); - InsertChunk(0, {"1", "2", "3"}); - InsertChunk(3, {"6"}); - InsertChunk(2, {}); - decoder_->SetEOF(4); - }); - - AssertFetch(ArrayFromJSON(type, "[1, 2, 3]")); - AssertFetchInvalid(); - AssertFetch(ArrayFromJSON(type, "[]")); - AssertFetch(ArrayFromJSON(type, "[6]")); - AssertFetchEOF(); - AssertFetchEOF(); + RunThreadsAndJoin( + [&](int thread_id) { + if (thread_id % 2 == 0) { + AssertChunkInvalid({"4", "-5"}); + } else { + AssertChunk({"1", "2", "3"}, ArrayFromJSON(type, "[1, 2, 3]")); + } + }, + NITERS); } }; -TYPED_TEST_SUITE(TypedColumnDecoderTest, ExecutorTypes); +TEST_F(TypedColumnDecoderTest, Integers) { this->TestIntegers(); } -TYPED_TEST(TypedColumnDecoderTest, Integers) { this->TestIntegers(); } +TEST_F(TypedColumnDecoderTest, Options) { this->TestOptions(); } -TYPED_TEST(TypedColumnDecoderTest, Options) { this->TestOptions(); } +TEST_F(TypedColumnDecoderTest, Errors) { this->TestErrors(); } -TYPED_TEST(TypedColumnDecoderTest, Errors) { this->TestErrors(); } - -TYPED_TEST(TypedColumnDecoderTest, Threaded) { this->TestThreaded(); } +TEST_F(TypedColumnDecoderTest, Threaded) { this->TestThreaded(); } ////////////////////////////////////////////////////////////////////////// // Tests for type-inferring column decoder -template class InferringColumnDecoderTest : public ColumnDecoderTest { public: - InferringColumnDecoderTest() { tg_ = ExecutorType::task_group(); } + InferringColumnDecoderTest() {} void MakeDecoder(const ConvertOptions& options) { ASSERT_OK_AND_ASSIGN(auto decoder, - ColumnDecoder::Make(default_memory_pool(), 0, options, tg_)); + ColumnDecoder::Make(default_memory_pool(), 0, options)); SetDecoder(decoder); } @@ -332,35 +299,37 @@ class InferringColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(default_options); AppendChunks({{"123", "456", "-78"}, {"901", "N/A"}}); - 
SetEOF(); AssertFetch(ArrayFromJSON(type, "[123, 456, -78]")); AssertFetch(ArrayFromJSON(type, "[901, null]")); - AssertFetchEOF(); - AssertFetchEOF(); } void TestThreaded() { + constexpr int NITERS = 10; auto type = float64(); - MakeDecoder(default_options); - auto joiner = RunThread([&]() { - SleepFor(1e-3); - InsertChunk(0, {"1.5", "2", "3"}); - InsertChunk(3, {"6"}); - decoder_->SetEOF(4); - }); - - // These chunks will wait for inference to run on chunk 0 - InsertChunk(1, {"4", "-5", "N/A"}); - InsertChunk(2, {}); - - AssertFetch(ArrayFromJSON(type, "[1.5, 2, 3]")); - AssertFetch(ArrayFromJSON(type, "[4, -5, null]")); - AssertFetch(ArrayFromJSON(type, "[]")); - AssertFetch(ArrayFromJSON(type, "[6]")); - AssertFetchEOF(); - AssertFetchEOF(); + // One of these will do the inference so we need to make sure they all have floating + // point + RunThreadsAndJoin( + [&](int thread_id) { + if (thread_id % 2 == 0) { + AssertChunk({"6.3", "7.2"}, ArrayFromJSON(type, "[6.3, 7.2]")); + } else { + AssertChunk({"1.1", "2", "3"}, ArrayFromJSON(type, "[1.1, 2, 3]")); + } + }, + NITERS); + + // These will run after the inference + RunThreadsAndJoin( + [&](int thread_id) { + if (thread_id % 2 == 0) { + AssertChunk({"1", "2"}, ArrayFromJSON(type, "[1, 2]")); + } else { + AssertChunkInvalid({"xyz"}); + } + }, + NITERS); } void TestOptions() { @@ -373,11 +342,8 @@ class InferringColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(options); AppendChunks({{"true", "false", "N/A"}, {"true"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[null, true, false]")); AssertFetch(ArrayFromJSON(type, "[null]")); - AssertFetchEOF(); - AssertFetchEOF(); } void TestErrors() { @@ -387,12 +353,9 @@ class InferringColumnDecoderTest : public ColumnDecoderTest { AppendChunks({{"123", "456", "-78"}, {"9.5", "N/A"}}); AppendChunks({{"1000", "N/A"}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[123, 456, -78]")); AssertFetchInvalid(); AssertFetch(ArrayFromJSON(type, "[1000, null]")); - AssertFetchEOF(); - AssertFetchEOF(); } void TestEmpty() { @@ -401,25 +364,20 @@ class InferringColumnDecoderTest : public ColumnDecoderTest { MakeDecoder(default_options); AppendChunks({{}, {}}); - SetEOF(); AssertFetch(ArrayFromJSON(type, "[]")); AssertFetch(ArrayFromJSON(type, "[]")); - AssertFetchEOF(); - AssertFetchEOF(); } }; -TYPED_TEST_SUITE(InferringColumnDecoderTest, ExecutorTypes); - -TYPED_TEST(InferringColumnDecoderTest, Integers) { this->TestIntegers(); } +TEST_F(InferringColumnDecoderTest, Integers) { this->TestIntegers(); } -TYPED_TEST(InferringColumnDecoderTest, Threaded) { this->TestThreaded(); } +TEST_F(InferringColumnDecoderTest, Threaded) { this->TestThreaded(); } -TYPED_TEST(InferringColumnDecoderTest, Options) { this->TestOptions(); } +TEST_F(InferringColumnDecoderTest, Options) { this->TestOptions(); } -TYPED_TEST(InferringColumnDecoderTest, Errors) { this->TestErrors(); } +TEST_F(InferringColumnDecoderTest, Errors) { this->TestErrors(); } -TYPED_TEST(InferringColumnDecoderTest, Empty) { this->TestEmpty(); } +TEST_F(InferringColumnDecoderTest, Empty) { this->TestEmpty(); } // More inference tests are in InferringColumnBuilderTest diff --git a/cpp/src/arrow/csv/reader.cc b/cpp/src/arrow/csv/reader.cc index d57a2f15667..11437297b80 100644 --- a/cpp/src/arrow/csv/reader.cc +++ b/cpp/src/arrow/csv/reader.cc @@ -50,6 +50,7 @@ #include "arrow/util/task_group.h" #include "arrow/util/thread_pool.h" #include "arrow/util/utf8.h" +#include "arrow/util/vector.h" namespace arrow { namespace csv { @@ -349,6 +350,186 @@ class 
ThreadedBlockReader : public BlockReader { } }; +struct ParsedBlock { + std::shared_ptr parser; + int64_t block_index; + int64_t bytes_parsed_or_skipped; +}; + +struct DecodedBlock { + std::shared_ptr record_batch; + // Represents the number of input bytes represented by this batch + // This will include bytes skipped when skipping rows after the header + int64_t bytes_processed; +}; + +} // namespace + +} // namespace csv + +template <> +struct IterationTraits { + static csv::ParsedBlock End() { return csv::ParsedBlock{nullptr, -1, -1}; } + static bool IsEnd(const csv::ParsedBlock& val) { return val.block_index < 0; } +}; + +template <> +struct IterationTraits { + static csv::DecodedBlock End() { return csv::DecodedBlock{nullptr, -1}; } + static bool IsEnd(const csv::DecodedBlock& val) { return val.bytes_processed < 0; } +}; + +namespace csv { +namespace { + +// A function object that takes in a buffer of CSV data and returns a parsed batch of CSV +// data (CSVBlock -> ParsedBlock) for use with MakeMappedGenerator. +// The parsed batch contains a list of offsets for each of the columns so that columns +// can be individually scanned +// +// This operator is not re-entrant +class BlockParsingOperator { + public: + BlockParsingOperator(io::IOContext io_context, ParseOptions parse_options, + int num_csv_cols, bool count_rows) + : io_context_(io_context), + parse_options_(parse_options), + num_csv_cols_(num_csv_cols), + count_rows_(count_rows) {} + + Result operator()(const CSVBlock& block) { + constexpr int32_t max_num_rows = std::numeric_limits::max(); + auto parser = std::make_shared( + io_context_.pool(), parse_options_, num_csv_cols_, num_rows_seen_, max_num_rows); + + std::shared_ptr straddling; + std::vector views; + if (block.partial->size() != 0 || block.completion->size() != 0) { + if (block.partial->size() == 0) { + straddling = block.completion; + } else if (block.completion->size() == 0) { + straddling = block.partial; + } else { + ARROW_ASSIGN_OR_RAISE( + straddling, + ConcatenateBuffers({block.partial, block.completion}, io_context_.pool())); + } + views = {util::string_view(*straddling), util::string_view(*block.buffer)}; + } else { + views = {util::string_view(*block.buffer)}; + } + uint32_t parsed_size; + if (block.is_final) { + RETURN_NOT_OK(parser->ParseFinal(views, &parsed_size)); + } else { + RETURN_NOT_OK(parser->Parse(views, &parsed_size)); + } + if (count_rows_) { + num_rows_seen_ += parser->num_rows(); + } + RETURN_NOT_OK(block.consume_bytes(parsed_size)); + return ParsedBlock{std::move(parser), block.block_index, + static_cast(parsed_size) + block.bytes_skipped}; + } + + private: + io::IOContext io_context_; + ParseOptions parse_options_; + int num_csv_cols_; + bool count_rows_; + int num_rows_seen_ = 0; +}; + +// A function object that takes in parsed batch of CSV data and decodes it to an arrow +// record batch (ParsedBlock -> DecodedBlock) for use with MakeMappedGenerator. 
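A note before the operator itself: both operators plug into the same generator-mapping shape, where each stage pulls from the one before it and a sentinel value (compare the `IterationTraits` specializations above) marks end-of-stream. A synchronous sketch of that shape with hypothetical `Parsed`/`Decoded` stand-ins; the real pipeline maps `Future`-returning stages via `MakeMappedGenerator`:

```cpp
#include <functional>
#include <iostream>

// Sentinels mirror IterationTraits: a negative index / byte count means "end".
struct Parsed { int block_index = -1; };
struct Decoded { long bytes_processed = -1; };

using ParsedGen = std::function<Parsed()>;
using DecodedGen = std::function<Decoded()>;

DecodedGen MapDecode(ParsedGen source) {
  return [source]() -> Decoded {
    Parsed next = source();
    if (next.block_index < 0) return Decoded{};   // propagate the end sentinel
    return Decoded{12 * (next.block_index + 1)};  // stand-in for real decoding
  };
}

int main() {
  int i = 0;
  ParsedGen parsed = [&i]() { return i < 3 ? Parsed{i++} : Parsed{}; };
  DecodedGen decoded = MapDecode(parsed);
  for (Decoded d = decoded(); d.bytes_processed >= 0; d = decoded()) {
    std::cout << d.bytes_processed << "\n";  // prints 12, 24, 36
  }
}
```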
+class BlockDecodingOperator { + public: + Future operator()(const ParsedBlock& block) { + DCHECK(!state_->column_decoders.empty()); + std::vector>> decoded_array_futs; + for (auto& decoder : state_->column_decoders) { + decoded_array_futs.push_back(decoder->Decode(block.parser)); + } + auto bytes_parsed_or_skipped = block.bytes_parsed_or_skipped; + auto decoded_arrays_fut = All(std::move(decoded_array_futs)); + auto state = state_; + return decoded_arrays_fut.Then( + [state, bytes_parsed_or_skipped]( + const std::vector>>& maybe_decoded_arrays) + -> Result { + ARROW_ASSIGN_OR_RAISE(auto decoded_arrays, + internal::UnwrapOrRaise(maybe_decoded_arrays)); + + ARROW_ASSIGN_OR_RAISE(auto batch, + state->DecodedArraysToBatch(std::move(decoded_arrays))); + return DecodedBlock{std::move(batch), bytes_parsed_or_skipped}; + }); + } + + static Result Make(io::IOContext io_context, + ConvertOptions convert_options, + ConversionSchema conversion_schema) { + BlockDecodingOperator op(std::move(io_context), std::move(convert_options), + std::move(conversion_schema)); + RETURN_NOT_OK(op.state_->MakeColumnDecoders(io_context)); + return op; + } + + private: + BlockDecodingOperator(io::IOContext io_context, ConvertOptions convert_options, + ConversionSchema conversion_schema) + : state_(std::make_shared(std::move(io_context), std::move(convert_options), + std::move(conversion_schema))) {} + + struct State { + State(io::IOContext io_context, ConvertOptions convert_options, + ConversionSchema conversion_schema) + : convert_options(std::move(convert_options)), + conversion_schema(std::move(conversion_schema)) {} + + Result> DecodedArraysToBatch( + std::vector> arrays) { + if (schema == nullptr) { + FieldVector fields(arrays.size()); + for (size_t i = 0; i < arrays.size(); ++i) { + fields[i] = field(conversion_schema.columns[i].name, arrays[i]->type()); + } + schema = arrow::schema(std::move(fields)); + } + const auto n_rows = arrays[0]->length(); + return RecordBatch::Make(schema, n_rows, std::move(arrays)); + } + + // Make column decoders from conversion schema + Status MakeColumnDecoders(io::IOContext io_context) { + for (const auto& column : conversion_schema.columns) { + std::shared_ptr decoder; + if (column.is_missing) { + ARROW_ASSIGN_OR_RAISE(decoder, + ColumnDecoder::MakeNull(io_context.pool(), column.type)); + } else if (column.type != nullptr) { + ARROW_ASSIGN_OR_RAISE( + decoder, ColumnDecoder::Make(io_context.pool(), column.type, column.index, + convert_options)); + } else { + ARROW_ASSIGN_OR_RAISE( + decoder, + ColumnDecoder::Make(io_context.pool(), column.index, convert_options)); + } + column_decoders.push_back(std::move(decoder)); + } + return Status::OK(); + } + + ConvertOptions convert_options; + ConversionSchema conversion_schema; + std::vector> column_decoders; + std::shared_ptr schema; + }; + + std::shared_ptr state_; +}; + ///////////////////////////////////////////////////////////////////////// // Base class for common functionality @@ -367,8 +548,9 @@ class ReaderMixin { protected: // Read header and column names from buffer, create column builders - Status ProcessHeader(const std::shared_ptr& buf, - std::shared_ptr* rest) { + // Returns the # of bytes consumed + Result ProcessHeader(const std::shared_ptr& buf, + std::shared_ptr* rest) { const uint8_t* data = buf->data(); const auto data_end = data + buf->size(); DCHECK_GT(data_end - data, 0); @@ -430,12 +612,14 @@ class ReaderMixin { num_rows_seen_ += read_options_.skip_rows_after_names; } - *rest = SliceBuffer(buf, data - buf->data()); 
+ auto bytes_consumed = data - buf->data(); + *rest = SliceBuffer(buf, bytes_consumed); num_csv_cols_ = static_cast(column_names_.size()); DCHECK_GT(num_csv_cols_, 0); - return MakeConversionSchema(); + RETURN_NOT_OK(MakeConversionSchema()); + return bytes_consumed; } std::vector GenerateColumnNames(int32_t num_cols) { @@ -642,123 +826,18 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader { ///////////////////////////////////////////////////////////////////////// // Base class for streaming readers -class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader { +class StreamingReaderImpl : public ReaderMixin, + public csv::StreamingReader, + public std::enable_shared_from_this { public: - BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor, - std::shared_ptr input, + StreamingReaderImpl(io::IOContext io_context, std::shared_ptr input, const ReadOptions& read_options, const ParseOptions& parse_options, const ConvertOptions& convert_options, bool count_rows) : ReaderMixin(io_context, std::move(input), read_options, parse_options, convert_options, count_rows), - cpu_executor_(cpu_executor) {} - - virtual Future> Init() = 0; - - std::shared_ptr schema() const override { return schema_; } - - Status ReadNext(std::shared_ptr* batch) override { - auto next_fut = ReadNextAsync(); - auto next_result = next_fut.result(); - return std::move(next_result).Value(batch); - } - - protected: - // Make column decoders from conversion schema - Status MakeColumnDecoders() { - for (const auto& column : conversion_schema_.columns) { - std::shared_ptr decoder; - if (column.is_missing) { - ARROW_ASSIGN_OR_RAISE(decoder, ColumnDecoder::MakeNull(io_context_.pool(), - column.type, task_group_)); - } else if (column.type != nullptr) { - ARROW_ASSIGN_OR_RAISE( - decoder, ColumnDecoder::Make(io_context_.pool(), column.type, column.index, - convert_options_, task_group_)); - } else { - ARROW_ASSIGN_OR_RAISE(decoder, - ColumnDecoder::Make(io_context_.pool(), column.index, - convert_options_, task_group_)); - } - column_decoders_.push_back(std::move(decoder)); - } - return Status::OK(); - } - - Result ParseAndInsert(const std::shared_ptr& partial, - const std::shared_ptr& completion, - const std::shared_ptr& block, - int64_t block_index, bool is_final) { - ARROW_ASSIGN_OR_RAISE(auto result, - Parse(partial, completion, block, block_index, is_final)); - RETURN_NOT_OK(ProcessData(result.parser, block_index)); - return result.parsed_bytes; - } - - // Trigger conversion of parsed block data - Status ProcessData(const std::shared_ptr& parser, int64_t block_index) { - for (auto& decoder : column_decoders_) { - decoder->Insert(block_index, parser); - } - return Status::OK(); - } - - Result> DecodeNextBatch() { - DCHECK(!column_decoders_.empty()); - ArrayVector arrays; - arrays.reserve(column_decoders_.size()); - Status st; - for (auto& decoder : column_decoders_) { - auto maybe_array = decoder->NextChunk(); - if (!maybe_array.ok()) { - // If there's an error, still fetch results from other decoders to - // keep them in sync. 
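For contrast, the synchronous fan-in being deleted here pulled one chunk per column and merged statuses by hand; its replacement in `BlockDecodingOperator` composes the same fan-in as `All(futures).Then(unwrap)`. A blocking sketch of that fan-in, with hypothetical `Column`/`Batch` types (the real code never blocks, it chains continuations):

```cpp
#include <future>
#include <string>
#include <vector>

struct Column { std::string name; };
struct Batch { std::vector<Column> columns; };

// One future per column decoder; assemble the batch once every column is done.
Batch GatherColumns(std::vector<std::future<Column>> pending) {
  Batch out;
  out.columns.reserve(pending.size());
  for (auto& fut : pending) {
    out.columns.push_back(fut.get());  // wait here; real code uses Then() instead
  }
  return out;
}

// Usage: decode each column concurrently, then gather.
Batch DecodeAll(const std::vector<std::string>& names) {
  std::vector<std::future<Column>> futs;
  for (const auto& name : names) {
    futs.push_back(std::async(std::launch::async, [name] { return Column{name}; }));
  }
  return GatherColumns(std::move(futs));
}
```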
- st &= maybe_array.status(); - } else { - arrays.push_back(*std::move(maybe_array)); - } - } - RETURN_NOT_OK(st); - DCHECK_EQ(arrays.size(), column_decoders_.size()); - const bool is_null = (arrays[0] == nullptr); -#ifndef NDEBUG - for (const auto& array : arrays) { - DCHECK_EQ(array == nullptr, is_null); - } -#endif - if (is_null) { - eof_ = true; - return nullptr; - } - - if (schema_ == nullptr) { - FieldVector fields(arrays.size()); - for (size_t i = 0; i < arrays.size(); ++i) { - fields[i] = field(conversion_schema_.columns[i].name, arrays[i]->type()); - } - schema_ = arrow::schema(std::move(fields)); - } - const auto n_rows = arrays[0]->length(); - return RecordBatch::Make(schema_, n_rows, std::move(arrays)); - } + bytes_decoded_(std::make_shared>(0)) {} - // Column decoders (in ConversionSchema order) - std::vector> column_decoders_; - std::shared_ptr schema_; - std::shared_ptr pending_batch_; - AsyncGenerator> buffer_generator_; - Executor* cpu_executor_; - bool eof_ = false; -}; - -///////////////////////////////////////////////////////////////////////// -// Serial StreamingReader implementation - -class SerialStreamingReader : public BaseStreamingReader, - public std::enable_shared_from_this { - public: - using BaseStreamingReader::BaseStreamingReader; - - Future> Init() override { + Future<> Init(Executor* cpu_executor) { ARROW_ASSIGN_OR_RAISE(auto istream_it, io::MakeInputStreamIterator(input_, read_options_.block_size)); @@ -766,139 +845,103 @@ class SerialStreamingReader : public BaseStreamingReader, ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor())); - auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_); + auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor); - buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it)); - task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token()); + auto buffer_generator = CSVBufferIterator::MakeAsync(std::move(transferred_it)); + int max_readahead = cpu_executor->GetCapacity(); auto self = shared_from_this(); - // Read schema from first batch - return ReadNextAsync(true).Then( - [self](const std::shared_ptr& first_batch) - -> Result> { - self->pending_batch_ = first_batch; - DCHECK_NE(self->schema_, nullptr); - return self; - }); - } - Result> DecodeBatchAndUpdateSchema() { - auto maybe_batch = DecodeNextBatch(); - if (schema_ == nullptr && maybe_batch.ok()) { - schema_ = (*maybe_batch)->schema(); - } - return maybe_batch; + return buffer_generator().Then([self, buffer_generator, max_readahead]( + const std::shared_ptr& first_buffer) { + return self->InitAfterFirstBuffer(first_buffer, buffer_generator, max_readahead); + }); } - Future> DoReadNext( - std::shared_ptr self) { - auto batch = std::move(pending_batch_); - if (batch != nullptr) { - return Future>::MakeFinished(batch); - } + std::shared_ptr schema() const override { return schema_; } - if (!source_eof_) { - return block_generator_() - .Then([self](const CSVBlock& maybe_block) -> Status { - if (!IsIterationEnd(maybe_block)) { - self->bytes_parsed_ += maybe_block.bytes_skipped; - self->last_block_index_ = maybe_block.block_index; - auto maybe_parsed = self->ParseAndInsert( - maybe_block.partial, maybe_block.completion, maybe_block.buffer, - maybe_block.block_index, maybe_block.is_final); - if (!maybe_parsed.ok()) { - // Parse error => bail out - self->eof_ = true; - return maybe_parsed.status(); - } - self->bytes_parsed_ += *maybe_parsed; - 
RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed)); - } else { - self->source_eof_ = true; - for (auto& decoder : self->column_decoders_) { - decoder->SetEOF(self->last_block_index_ + 1); - } - } - return Status::OK(); - }) - .Then([self]() -> Result> { - return self->DecodeBatchAndUpdateSchema(); - }); - } - return Future>::MakeFinished( - DecodeBatchAndUpdateSchema()); - } + int64_t bytes_read() const override { return bytes_decoded_->load(); } - Future> ReadNextSkippingEmpty( - std::shared_ptr self, bool internal_read) { - return DoReadNext(self).Then( - [self, internal_read](const std::shared_ptr& batch) { - if (batch != nullptr && batch->num_rows() == 0) { - return self->ReadNextSkippingEmpty(self, internal_read); - } - if (!internal_read) { - self->bytes_decoded_ += self->bytes_parsed_; - self->bytes_parsed_ = 0; - } - return Future>::MakeFinished(batch); - }); + Status ReadNext(std::shared_ptr* batch) override { + auto next_fut = ReadNextAsync(); + auto next_result = next_fut.result(); + return std::move(next_result).Value(batch); } Future> ReadNextAsync() override { - return ReadNextAsync(false); - }; - - int64_t bytes_read() const override { return bytes_decoded_; } + return record_batch_gen_(); + } protected: - Future<> SetupReader(std::shared_ptr self) { - return buffer_generator_().Then([self](const std::shared_ptr& first_buffer) { - if (first_buffer == nullptr) { - return Status::Invalid("Empty CSV file"); - } - auto own_first_buffer = first_buffer; - auto start = own_first_buffer->data(); - RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer)); - self->bytes_decoded_ = own_first_buffer->data() - start; - RETURN_NOT_OK(self->MakeColumnDecoders()); - - self->block_generator_ = SerialBlockReader::MakeAsyncIterator( - std::move(self->buffer_generator_), MakeChunker(self->parse_options_), - std::move(own_first_buffer), self->read_options_.skip_rows_after_names); - return Status::OK(); + Future<> InitAfterFirstBuffer(const std::shared_ptr& first_buffer, + AsyncGenerator> buffer_generator, + int max_readahead) { + if (first_buffer == nullptr) { + return Status::Invalid("Empty CSV file"); + } + + std::shared_ptr after_header; + ARROW_ASSIGN_OR_RAISE(auto header_bytes_consumed, + ProcessHeader(first_buffer, &after_header)); + bytes_decoded_->fetch_add(header_bytes_consumed); + + auto parser_op = + BlockParsingOperator(io_context_, parse_options_, num_csv_cols_, count_rows_); + ARROW_ASSIGN_OR_RAISE( + auto decoder_op, + BlockDecodingOperator::Make(io_context_, convert_options_, conversion_schema_)); + + auto block_gen = SerialBlockReader::MakeAsyncIterator( + std::move(buffer_generator), MakeChunker(parse_options_), std::move(after_header), + read_options_.skip_rows_after_names); + auto parsed_block_gen = + MakeMappedGenerator(std::move(block_gen), std::move(parser_op)); + auto rb_gen = MakeMappedGenerator(std::move(parsed_block_gen), std::move(decoder_op)); + + auto self = shared_from_this(); + return rb_gen().Then([self, rb_gen, max_readahead](const DecodedBlock& first_block) { + return self->InitAfterFirstBatch(first_block, std::move(rb_gen), max_readahead); }); } - Future> ReadNextAsync(bool internal_read) { - if (eof_) { - return Future>::MakeFinished(nullptr); - } - if (io_context_.stop_token().IsStopRequested()) { - eof_ = true; - return io_context_.stop_token().Poll(); + Status InitAfterFirstBatch(const DecodedBlock& first_block, + AsyncGenerator batch_gen, int max_readahead) { + schema_ = first_block.record_batch->schema(); + + AsyncGenerator 
readahead_gen; + if (read_options_.use_threads) { + readahead_gen = MakeReadaheadGenerator(std::move(batch_gen), max_readahead); + } else { + readahead_gen = std::move(batch_gen); } - auto self = shared_from_this(); - if (!block_generator_) { - return SetupReader(self).Then( - [self, internal_read]() -> Future> { - return self->ReadNextSkippingEmpty(self, internal_read); - }, - [self](const Status& err) -> Result> { - self->eof_ = true; - return err; - }); + + AsyncGenerator restarted_gen; + // Streaming reader should not emit empty record batches + if (first_block.record_batch->num_rows() > 0) { + restarted_gen = MakeGeneratorStartsWith({first_block}, std::move(readahead_gen)); } else { - return self->ReadNextSkippingEmpty(self, internal_read); + restarted_gen = std::move(readahead_gen); } + + auto bytes_decoded = bytes_decoded_; + auto unwrap_and_record_bytes = + [bytes_decoded]( + const DecodedBlock& block) -> Result> { + bytes_decoded->fetch_add(block.bytes_processed); + return block.record_batch; + }; + + auto unwrapped = + MakeMappedGenerator(std::move(restarted_gen), std::move(unwrap_and_record_bytes)); + + record_batch_gen_ = MakeCancellable(std::move(unwrapped), io_context_.stop_token()); + return Status::OK(); } - bool source_eof_ = false; - int64_t last_block_index_ = 0; - AsyncGenerator block_generator_; - // bytes of data parsed but not yet decoded - int64_t bytes_parsed_ = 0; - // bytes which have been decoded for caller - int64_t bytes_decoded_ = 0; + std::shared_ptr schema_; + AsyncGenerator> record_batch_gen_; + // bytes which have been decoded and asked for by the caller + std::shared_ptr> bytes_decoded_; }; ///////////////////////////////////////////////////////////////////////// @@ -1089,11 +1132,13 @@ Future> MakeStreamingReader( RETURN_NOT_OK(parse_options.Validate()); RETURN_NOT_OK(read_options.Validate()); RETURN_NOT_OK(convert_options.Validate()); - std::shared_ptr reader; - reader = std::make_shared( - io_context, cpu_executor, input, read_options, parse_options, convert_options, - /*count_rows=*/true); - return reader->Init(); + std::shared_ptr reader; + reader = std::make_shared(io_context, input, read_options, + parse_options, convert_options, + /*count_rows=*/true); + return reader->Init(cpu_executor).Then([reader] { + return std::dynamic_pointer_cast(reader); + }); } ///////////////////////////////////////////////////////////////////////// @@ -1139,8 +1184,9 @@ class CSVRowCounter : public ReaderMixin, } Future DoCount(const std::shared_ptr& self) { - // We must return a value instead of Status/Future<> to work with MakeMappedGenerator, - // and we must use a type with a valid end value to work with IterationEnd. + // count_cb must return a value instead of Status/Future<> to work with + // MakeMappedGenerator, and it must use a type with a valid end value to work with + // IterationEnd. 
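Both the `bytes_decoded_` accounting above and `count_cb` here hang shared mutable state off the callback itself, since the callback can outlive the reader method that created it. A minimal sketch of that pattern with hypothetical names; the counter travels as a `shared_ptr` capture rather than a raw `this` reference:

```cpp
#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>

// A pass-through mapping stage that records how many bytes flowed past it.
std::function<int64_t(int64_t)> MakeRecordingStage(
    std::shared_ptr<std::atomic<int64_t>> bytes_seen) {
  return [bytes_seen](int64_t block_bytes) {
    bytes_seen->fetch_add(block_bytes);
    return block_bytes;  // hand the item through unchanged
  };
}

// Usage: the caller polls the counter while the pipeline runs.
//   auto counter = std::make_shared<std::atomic<int64_t>>(0);
//   auto stage = MakeRecordingStage(counter);
//   stage(18); stage(12);  // counter->load() == 30
```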
std::function>(const CSVBlock&)> count_cb = [self](const CSVBlock& maybe_block) -> Result> { ARROW_ASSIGN_OR_RAISE( diff --git a/cpp/src/arrow/csv/reader_test.cc b/cpp/src/arrow/csv/reader_test.cc index 1ab49fa8664..88ead7677f3 100644 --- a/cpp/src/arrow/csv/reader_test.cc +++ b/cpp/src/arrow/csv/reader_test.cc @@ -67,6 +67,38 @@ class StreamingReaderAsTableReader : public TableReader { using TableReaderFactory = std::function>(std::shared_ptr)>; +using StreamingReaderFactory = std::function>( + std::shared_ptr)>; + +void TestEmptyTable(TableReaderFactory reader_factory) { + auto empty_buffer = std::make_shared(""); + auto empty_input = std::make_shared(empty_buffer); + auto maybe_reader = reader_factory(empty_input); + // Streaming reader fails on open, table readers fail on first read + if (maybe_reader.ok()) { + ASSERT_FINISHES_AND_RAISES(Invalid, (*maybe_reader)->ReadAsync()); + } else { + ASSERT_TRUE(maybe_reader.status().IsInvalid()); + } +} + +void TestHeaderOnly(TableReaderFactory reader_factory) { + auto header_only_buffer = std::make_shared("a,b,c\n"); + auto input = std::make_shared(header_only_buffer); + ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input)); + ASSERT_FINISHES_OK_AND_ASSIGN(auto table, reader->ReadAsync()); + ASSERT_EQ(table->schema()->num_fields(), 3); + ASSERT_EQ(table->num_rows(), 0); +} + +void TestHeaderOnlyStreaming(StreamingReaderFactory reader_factory) { + auto header_only_buffer = std::make_shared("a,b,c\n"); + auto input = std::make_shared(header_only_buffer); + ASSERT_OK_AND_ASSIGN(auto reader, reader_factory(input)); + std::shared_ptr next_batch; + ASSERT_OK(reader->ReadNext(&next_batch)); + ASSERT_EQ(next_batch, nullptr); +} void StressTableReader(TableReaderFactory reader_factory) { #ifdef ARROW_VALGRIND @@ -151,6 +183,8 @@ TableReaderFactory MakeSerialFactory() { }; } +TEST(SerialReaderTests, Empty) { TestEmptyTable(MakeSerialFactory()); } +TEST(SerialReaderTests, HeaderOnly) { TestHeaderOnly(MakeSerialFactory()); } TEST(SerialReaderTests, Stress) { StressTableReader(MakeSerialFactory()); } TEST(SerialReaderTests, StressInvalid) { StressInvalidTableReader(MakeSerialFactory()); } TEST(SerialReaderTests, NestedParallelism) { @@ -175,6 +209,14 @@ Result MakeAsyncFactory( }; } +TEST(AsyncReaderTests, Empty) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); + TestEmptyTable(table_factory); +} +TEST(AsyncReaderTests, HeaderOnly) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); + TestHeaderOnly(table_factory); +} TEST(AsyncReaderTests, Stress) { ASSERT_OK_AND_ASSIGN(auto table_factory, MakeAsyncFactory()); StressTableReader(table_factory); @@ -194,6 +236,7 @@ Result MakeStreamingFactory() { -> Result> { auto read_options = ReadOptions::Defaults(); read_options.block_size = 1 << 10; + read_options.use_threads = true; ARROW_ASSIGN_OR_RAISE( auto streaming_reader, StreamingReader::Make(io::default_io_context(), input_stream, read_options, @@ -202,6 +245,25 @@ Result MakeStreamingFactory() { }; } +Result MakeStreamingReaderFactory() { + return [](std::shared_ptr input_stream) + -> Result> { + auto read_options = ReadOptions::Defaults(); + read_options.block_size = 1 << 10; + read_options.use_threads = true; + return StreamingReader::Make(io::default_io_context(), input_stream, read_options, + ParseOptions::Defaults(), ConvertOptions::Defaults()); + }; +} + +TEST(StreamingReaderTests, Empty) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeStreamingFactory()); + TestEmptyTable(table_factory); +} +TEST(StreamingReaderTests, 
HeaderOnly) { + ASSERT_OK_AND_ASSIGN(auto table_factory, MakeStreamingReaderFactory()); + TestHeaderOnlyStreaming(table_factory); +} TEST(StreamingReaderTests, Stress) { ASSERT_OK_AND_ASSIGN(auto table_factory, MakeStreamingFactory()); StressTableReader(table_factory); @@ -227,18 +289,20 @@ TEST(StreamingReaderTest, BytesRead) { auto read_options = ReadOptions::Defaults(); read_options.block_size = 20; + read_options.use_threads = false; ASSERT_OK_AND_ASSIGN( auto streaming_reader, StreamingReader::Make(io::default_io_context(), input, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr batch; - int64_t bytes = 6; // Size of header + int64_t bytes = 6; // Size of header (counted during StreamingReader::Make) do { ASSERT_EQ(bytes, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); bytes += 12; // Add size of each row - } while (batch); + } while (bytes <= 42); ASSERT_EQ(42, streaming_reader->bytes_read()); + ASSERT_EQ(batch.get(), nullptr); } // Interaction of skip_rows and bytes_read() @@ -246,13 +310,18 @@ TEST(StreamingReaderTest, BytesRead) { auto input = std::make_shared(table_buffer); auto read_options = ReadOptions::Defaults(); - read_options.skip_rows = 2; + read_options.skip_rows = 1; + read_options.block_size = 32; ASSERT_OK_AND_ASSIGN( auto streaming_reader, StreamingReader::Make(io::default_io_context(), input, read_options, ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr batch; - // first two rows and third row as header + // The header (6 bytes) and first skipped row (12 bytes) are counted during + // StreamingReader::Make + ASSERT_EQ(18, streaming_reader->bytes_read()); + ASSERT_OK(streaming_reader->ReadNext(&batch)); + ASSERT_NE(batch.get(), nullptr); ASSERT_EQ(30, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_NE(batch.get(), nullptr); @@ -266,7 +335,8 @@ TEST(StreamingReaderTest, BytesRead) { auto input = std::make_shared(table_buffer); auto read_options = ReadOptions::Defaults(); - read_options.skip_rows_after_names = 2; + read_options.block_size = 32; + read_options.skip_rows_after_names = 1; ASSERT_OK_AND_ASSIGN( auto streaming_reader, @@ -274,10 +344,14 @@ TEST(StreamingReaderTest, BytesRead) { ParseOptions::Defaults(), ConvertOptions::Defaults())); std::shared_ptr batch; - // Just header + // The header is read as part of StreamingReader::Make ASSERT_EQ(6, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_NE(batch.get(), nullptr); + // Next the skipped batch (12 bytes) and 1 row (12 bytes) + ASSERT_EQ(30, streaming_reader->bytes_read()); + ASSERT_OK(streaming_reader->ReadNext(&batch)); + ASSERT_NE(batch.get(), nullptr); ASSERT_EQ(42, streaming_reader->bytes_read()); ASSERT_OK(streaming_reader->ReadNext(&batch)); ASSERT_EQ(batch.get(), nullptr); diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index c2aad6cd680..8992e7bcac2 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -907,6 +907,8 @@ AsyncGenerator MakeVectorGenerator(std::vector vec) { return [state]() { auto idx = state->vec_idx.fetch_add(1); if (idx >= state->vec.size()) { + // Eagerly return memory + state->vec.clear(); return AsyncGeneratorEnd(); } return Future::MakeFinished(state->vec[idx]); diff --git a/python/pyarrow/tests/test_csv.py b/python/pyarrow/tests/test_csv.py index 050342de747..e4a4e3e5935 100644 --- a/python/pyarrow/tests/test_csv.py +++ 
b/python/pyarrow/tests/test_csv.py
--- a/python/pyarrow/tests/test_csv.py
+++ b/python/pyarrow/tests/test_csv.py
@@ -1250,10 +1250,16 @@ def read_csv(self, *args, validate_full=True, **kwargs):
         return table
 
 
-class BaseTestStreamingCSVRead:
+@pytest.mark.parametrize('use_threads', [False, True])
+class TestStreamingCSVRead:
 
-    def open_bytes(self, b, **kwargs):
-        return self.open_csv(pa.py_buffer(b), **kwargs)
+    def open_bytes(self, b, use_threads, **kwargs):
+        return self.open_csv(pa.py_buffer(b), use_threads, **kwargs)
+
+    def open_csv(self, b, use_threads, *args, **kwargs):
+        read_options = kwargs.setdefault('read_options', ReadOptions())
+        read_options.use_threads = use_threads
+        return open_csv(b, *args, **kwargs)
 
     def check_reader(self, reader, expected_schema, expected_data):
         assert reader.schema == expected_schema
@@ -1264,24 +1270,24 @@ def check_reader(self, reader, expected_schema, expected_data):
             assert batch.schema == expected_schema
             assert batch.to_pydict() == expected_batch
 
-    def test_file_object(self):
+    def test_file_object(self, use_threads):
         data = b"a,b\n1,2\n3,4\n"
         expected_data = {'a': [1, 3], 'b': [2, 4]}
         bio = io.BytesIO(data)
-        reader = self.open_csv(bio)
+        reader = self.open_csv(bio, use_threads)
         expected_schema = pa.schema([('a', pa.int64()),
                                      ('b', pa.int64())])
         self.check_reader(reader, expected_schema, [expected_data])
 
-    def test_header(self):
+    def test_header(self, use_threads):
         rows = b"abc,def,gh\n"
-        reader = self.open_bytes(rows)
+        reader = self.open_bytes(rows, use_threads)
         expected_schema = pa.schema([('abc', pa.null()),
                                      ('def', pa.null()),
                                      ('gh', pa.null())])
         self.check_reader(reader, expected_schema, [])
 
-    def test_inference(self):
+    def test_inference(self, use_threads):
         # Inference is done on first block
         rows = b"a,b\n123,456\nabc,de\xff\ngh,ij\n"
         expected_schema = pa.schema([('a', pa.string()),
@@ -1289,25 +1295,25 @@ def test_inference(self):
 
         read_options = ReadOptions()
         read_options.block_size = len(rows)
-        reader = self.open_bytes(rows, read_options=read_options)
+        reader = self.open_bytes(rows, use_threads, read_options=read_options)
         self.check_reader(reader, expected_schema,
                           [{'a': ['123', 'abc', 'gh'],
                             'b': [b'456', b'de\xff', b'ij']}])
 
         read_options.block_size = len(rows) - 1
-        reader = self.open_bytes(rows, read_options=read_options)
+        reader = self.open_bytes(rows, use_threads, read_options=read_options)
         self.check_reader(reader, expected_schema,
                           [{'a': ['123', 'abc'],
                             'b': [b'456', b'de\xff']},
                            {'a': ['gh'],
                             'b': [b'ij']}])
 
-    def test_inference_failure(self):
+    def test_inference_failure(self, use_threads):
         # Inference on first block, then conversion failure on second block
         rows = b"a,b\n123,456\nabc,de\xff\ngh,ij\n"
         read_options = ReadOptions()
         read_options.block_size = len(rows) - 7
-        reader = self.open_bytes(rows, read_options=read_options)
+        reader = self.open_bytes(rows, use_threads, read_options=read_options)
         expected_schema = pa.schema([('a', pa.int64()),
                                      ('b', pa.int64())])
         assert reader.schema == expected_schema
@@ -1322,38 +1328,20 @@ def test_inference_failure(self):
         with pytest.raises(StopIteration):
             reader.read_next_batch()
 
-        # Inference on first block, then conversion failure on second block,
-        # then success on third block
-        rows = b"a,b\n1,2\nabc,def\n45,67\n"
-        read_options.block_size = 8
-        reader = self.open_bytes(rows, read_options=read_options)
-        expected_schema = pa.schema([('a', pa.int64()),
-                                     ('b', pa.int64())])
-        assert reader.schema == expected_schema
-        assert reader.read_next_batch().to_pydict() == {'a': [1], 'b': [2]}
-        # Second block
-        with pytest.raises(ValueError,
-                           match="CSV conversion error to int64"):
-            reader.read_next_batch()
-        # Third block
-        assert reader.read_next_batch().to_pydict() == {'a': [45], 'b': [67]}
-        # EOF
-        with pytest.raises(StopIteration):
-            reader.read_next_batch()
-
-    def test_invalid_csv(self):
+    def test_invalid_csv(self, use_threads):
         # CSV errors on first block
         rows = b"a,b\n1,2,3\n4,5\n6,7\n"
         read_options = ReadOptions()
         read_options.block_size = 10
         with pytest.raises(pa.ArrowInvalid,
                            match="Expected 2 columns, got 3"):
-            reader = self.open_bytes(rows, read_options=read_options)
+            reader = self.open_bytes(
+                rows, use_threads, read_options=read_options)
 
         # CSV errors on second block
         rows = b"a,b\n1,2\n3,4,5\n6,7\n"
         read_options.block_size = 8
-        reader = self.open_bytes(rows, read_options=read_options)
+        reader = self.open_bytes(rows, use_threads, read_options=read_options)
         assert reader.read_next_batch().to_pydict() == {'a': [1], 'b': [2]}
         with pytest.raises(pa.ArrowInvalid,
                            match="Expected 2 columns, got 3"):
@@ -1362,9 +1350,9 @@ def test_invalid_csv(self):
         with pytest.raises(StopIteration):
             reader.read_next_batch()
 
-    def test_options_delimiter(self):
+    def test_options_delimiter(self, use_threads):
         rows = b"a;b,c\nde,fg;eh\n"
-        reader = self.open_bytes(rows)
+        reader = self.open_bytes(rows, use_threads)
         expected_schema = pa.schema([('a;b', pa.string()),
                                      ('c', pa.string())])
         self.check_reader(reader, expected_schema,
@@ -1372,17 +1360,17 @@ def test_options_delimiter(self):
                            'c': ['fg;eh']}])
 
         opts = ParseOptions(delimiter=';')
-        reader = self.open_bytes(rows, parse_options=opts)
+        reader = self.open_bytes(rows, use_threads, parse_options=opts)
         expected_schema = pa.schema([('a', pa.string()),
                                      ('b,c', pa.string())])
         self.check_reader(reader, expected_schema,
                           [{'a': ['de,fg'],
                             'b,c': ['eh']}])
 
-    def test_no_ending_newline(self):
+    def test_no_ending_newline(self, use_threads):
         # No \n after last line
         rows = b"a,b,c\n1,2,3\n4,5,6"
-        reader = self.open_bytes(rows)
+        reader = self.open_bytes(rows, use_threads)
         expected_schema = pa.schema([('a', pa.int64()),
                                      ('b', pa.int64()),
                                      ('c', pa.int64())])
@@ -1391,16 +1379,16 @@ def test_no_ending_newline(self):
                             'b': [2, 5],
                             'c': [3, 6]}])
 
-    def test_empty_file(self):
+    def test_empty_file(self, use_threads):
         with pytest.raises(ValueError, match="Empty CSV file"):
-            self.open_bytes(b"")
+            self.open_bytes(b"", use_threads)
 
-    def test_column_options(self):
+    def test_column_options(self, use_threads):
        # With column_names
         rows = b"1,2,3\n4,5,6"
         read_options = ReadOptions()
         read_options.column_names = ['d', 'e', 'f']
-        reader = self.open_bytes(rows, read_options=read_options)
+        reader = self.open_bytes(rows, use_threads, read_options=read_options)
         expected_schema = pa.schema([('d', pa.int64()),
                                      ('e', pa.int64()),
                                      ('f', pa.int64())])
@@ -1412,7 +1400,7 @@ def test_column_options(self):
         # With include_columns
         convert_options = ConvertOptions()
         convert_options.include_columns = ['f', 'e']
-        reader = self.open_bytes(rows, read_options=read_options,
+        reader = self.open_bytes(rows, use_threads, read_options=read_options,
                                  convert_options=convert_options)
         expected_schema = pa.schema([('f', pa.int64()),
                                      ('e', pa.int64())])
@@ -1422,7 +1410,7 @@ def test_column_options(self):
 
         # With column_types
         convert_options.column_types = {'e': pa.string()}
-        reader = self.open_bytes(rows, read_options=read_options,
+        reader = self.open_bytes(rows, use_threads, read_options=read_options,
                                  convert_options=convert_options)
         expected_schema = pa.schema([('f', pa.int64()),
                                      ('e', pa.string())])
@@ -1435,11 +1423,12 @@ def test_column_options(self):
         with pytest.raises(
                 KeyError,
                 match="Column 'g' in include_columns does not exist"):
-            reader = self.open_bytes(rows, read_options=read_options,
+            reader = self.open_bytes(rows, use_threads,
+                                     read_options=read_options,
                                      convert_options=convert_options)
 
         convert_options.include_missing_columns = True
-        reader = self.open_bytes(rows, read_options=read_options,
+        reader = self.open_bytes(rows, use_threads, read_options=read_options,
                                  convert_options=convert_options)
         expected_schema = pa.schema([('g', pa.null()),
                                      ('f', pa.int64()),
@@ -1450,7 +1439,7 @@ def test_column_options(self):
                             'f': [3, 6]}])
 
         convert_options.column_types = {'e': pa.string(), 'g': pa.float64()}
-        reader = self.open_bytes(rows, read_options=read_options,
+        reader = self.open_bytes(rows, use_threads, read_options=read_options,
                                  convert_options=convert_options)
         expected_schema = pa.schema([('g', pa.float64()),
                                      ('f', pa.int64()),
@@ -1460,11 +1449,11 @@ def test_column_options(self):
                             'e': ["2", "5"],
                             'f': [3, 6]}])
 
-    def test_encoding(self):
+    def test_encoding(self, use_threads):
         # latin-1 (invalid utf-8)
         rows = b"a,b\nun,\xe9l\xe9phant"
         read_options = ReadOptions()
-        reader = self.open_bytes(rows, read_options=read_options)
+        reader = self.open_bytes(rows, use_threads, read_options=read_options)
         expected_schema = pa.schema([('a', pa.string()),
                                      ('b', pa.binary())])
         self.check_reader(reader, expected_schema,
@@ -1472,7 +1461,7 @@ def test_encoding(self):
                             'b': [b"\xe9l\xe9phant"]}])
 
         read_options.encoding = 'latin1'
-        reader = self.open_bytes(rows, read_options=read_options)
+        reader = self.open_bytes(rows, use_threads, read_options=read_options)
         expected_schema = pa.schema([('a', pa.string()),
                                      ('b', pa.string())])
         self.check_reader(reader, expected_schema,
@@ -1483,22 +1472,22 @@ def test_encoding(self):
         rows = (b'\xff\xfea\x00,\x00b\x00\n\x00u\x00n\x00,'
                 b'\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00')
         read_options.encoding = 'utf16'
-        reader = self.open_bytes(rows, read_options=read_options)
+        reader = self.open_bytes(rows, use_threads, read_options=read_options)
         expected_schema = pa.schema([('a', pa.string()),
                                      ('b', pa.string())])
         self.check_reader(reader, expected_schema,
                           [{'a': ["un"],
                             'b': ["éléphant"]}])
 
-    def test_small_random_csv(self):
+    def test_small_random_csv(self, use_threads):
         csv, expected = make_random_csv(num_cols=2, num_rows=10)
-        reader = self.open_bytes(csv)
+        reader = self.open_bytes(csv, use_threads)
         table = reader.read_all()
         assert table.schema == expected.schema
         assert table.equals(expected)
         assert table.to_pydict() == expected.to_pydict()
 
-    def test_stress_block_sizes(self):
+    def test_stress_block_sizes(self, use_threads):
         # Test a number of small block sizes to stress block stitching
         csv_base, expected = make_random_csv(num_cols=2, num_rows=500)
         block_sizes = [19, 21, 23, 26, 37, 111]
@@ -1508,22 +1497,15 @@ def test_stress_block_sizes(self):
                 # Need at least two lines for type inference
                 assert csv[:block_size].count(b'\n') >= 2
                 read_options = ReadOptions(block_size=block_size)
-                reader = self.open_bytes(csv, read_options=read_options)
+                reader = self.open_bytes(
+                    csv, use_threads, read_options=read_options)
                 table = reader.read_all()
                 assert table.schema == expected.schema
                 if not table.equals(expected):
                     # Better error output
                     assert table.to_pydict() == expected.to_pydict()
 
-
-class TestSerialStreamingCSVRead(BaseTestStreamingCSVRead, unittest.TestCase):
-
-    def open_csv(self, *args, **kwargs):
-        read_options = kwargs.setdefault('read_options', ReadOptions())
-        read_options.use_threads = False
-        return open_csv(*args, **kwargs)
-
-    def test_batch_lifetime(self):
+    def test_batch_lifetime(self, use_threads):
         gc.collect()
         old_allocated = pa.total_allocated_bytes()
 
@@ -1536,15 +1518,15 @@ def check_one_batch(reader, expected):
         read_options = ReadOptions()
         read_options.column_names = ['a', 'b']
         read_options.block_size = 6
-        reader = self.open_bytes(rows, read_options=read_options)
+        reader = self.open_bytes(rows, use_threads, read_options=read_options)
         check_one_batch(reader, {'a': [10], 'b': [11]})
         allocated_after_first_batch = pa.total_allocated_bytes()
         check_one_batch(reader, {'a': [12], 'b': [13]})
-        assert pa.total_allocated_bytes() == allocated_after_first_batch
+        assert pa.total_allocated_bytes() <= allocated_after_first_batch
         check_one_batch(reader, {'a': [14], 'b': [15]})
-        assert pa.total_allocated_bytes() == allocated_after_first_batch
+        assert pa.total_allocated_bytes() <= allocated_after_first_batch
         check_one_batch(reader, {'a': [16], 'b': [17]})
-        assert pa.total_allocated_bytes() == allocated_after_first_batch
+        assert pa.total_allocated_bytes() <= allocated_after_first_batch
         with pytest.raises(StopIteration):
             reader.read_next_batch()
         assert pa.total_allocated_bytes() == old_allocated