From 6a3fed886626ef54949df6d411969407fbcbc332 Mon Sep 17 00:00:00 2001
From: David Li
Date: Tue, 13 Apr 2021 15:43:03 -0400
Subject: [PATCH] ARROW-12161: [C++][Dataset] Revert async CSV reader in
datasets
---
cpp/src/arrow/dataset/file_csv.cc | 72 +++++++++----------------------
1 file changed, 21 insertions(+), 51 deletions(-)
diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index 9a7a9d2de4c..f870293190b 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -34,7 +34,6 @@
#include "arrow/io/compressed.h"
#include "arrow/result.h"
#include "arrow/type.h"
-#include "arrow/util/async_generator.h"
#include "arrow/util/iterator.h"
#include "arrow/util/logging.h"
@@ -114,53 +113,35 @@ static inline Result GetReadOptions(
return read_options;
}
-static inline Future> OpenReaderAsync(
+static inline Result> OpenReader(
const FileSource& source, const CsvFileFormat& format,
- internal::Executor* cpu_executor,
const std::shared_ptr& scan_options = nullptr,
MemoryPool* pool = default_memory_pool()) {
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
+ util::string_view first_block;
ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
ARROW_ASSIGN_OR_RAISE(
input, io::BufferedInputStream::Create(reader_options.block_size,
default_memory_pool(), std::move(input)));
+ ARROW_ASSIGN_OR_RAISE(first_block, input->Peek(reader_options.block_size));
- auto peek_fut = DeferNotOk(input->io_context().executor()->Submit(
- [input, reader_options] { return input->Peek(reader_options.block_size); }));
-
- return peek_fut.Then([=](const util::string_view& first_block)
- -> Future> {
- const auto& parse_options = format.parse_options;
- auto convert_options = csv::ConvertOptions::Defaults();
- if (scan_options != nullptr) {
- ARROW_ASSIGN_OR_RAISE(convert_options,
- GetConvertOptions(format, scan_options, first_block, pool));
- }
-
- return csv::StreamingReader::MakeAsync(io::default_io_context(), std::move(input),
- cpu_executor, reader_options, parse_options,
- convert_options)
- .Then(
- [](const std::shared_ptr& maybe_reader)
- -> Result> { return maybe_reader; },
- [source](const Status& err) -> Result> {
- return err.WithMessage("Could not open CSV input source '", source.path(),
- "': ", err);
- });
- });
-}
+ const auto& parse_options = format.parse_options;
+ auto convert_options = csv::ConvertOptions::Defaults();
+ if (scan_options != nullptr) {
+ ARROW_ASSIGN_OR_RAISE(convert_options,
+ GetConvertOptions(format, scan_options, first_block, pool));
+ }
-static inline Result> OpenReader(
- const FileSource& source, const CsvFileFormat& format,
- const std::shared_ptr& scan_options = nullptr,
- MemoryPool* pool = default_memory_pool()) {
- bool use_threads = (scan_options != nullptr && scan_options->use_threads);
- return internal::RunSynchronously>(
- [&](Executor* executor) {
- return OpenReaderAsync(source, format, executor, scan_options, pool);
- },
- use_threads);
+ auto maybe_reader =
+ csv::StreamingReader::Make(io::IOContext(pool), std::move(input), reader_options,
+ parse_options, convert_options);
+ if (!maybe_reader.ok()) {
+ return maybe_reader.status().WithMessage("Could not open CSV input source '",
+ source.path(), "': ", maybe_reader.status());
+ }
+
+ return std::move(maybe_reader).ValueOrDie();
}
/// \brief A ScanTask backed by an Csv file.
@@ -174,20 +155,9 @@ class CsvScanTask : public ScanTask {
source_(fragment->source()) {}
Result Execute() override {
- ARROW_ASSIGN_OR_RAISE(auto gen, ExecuteAsync(internal::GetCpuThreadPool()));
- return MakeGeneratorIterator(std::move(gen));
- }
-
- bool supports_async() const override { return true; }
-
- Result ExecuteAsync(internal::Executor* cpu_executor) override {
- auto reader_fut =
- OpenReaderAsync(source_, *format_, cpu_executor, options(), options()->pool);
- auto generator_fut = reader_fut.Then(
- [](const std::shared_ptr& reader) -> RecordBatchGenerator {
- return [reader]() { return reader->ReadNextAsync(); };
- });
- return MakeFromFuture(generator_fut);
+ ARROW_ASSIGN_OR_RAISE(auto reader,
+ OpenReader(source_, *format_, options(), options()->pool));
+ return IteratorFromReader(std::move(reader));
}
private: