Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
223 changes: 77 additions & 146 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,6 @@ class SerialBlockReader : public BlockReader {
return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn);
}

// Builds an asynchronous generator of CSVBlock values from a generator of
// buffers.
//
// \param buffer_generator source of buffers; moved into the returned generator
// \param chunker moved into the newly created SerialBlockReader
// \param first_buffer passed (copied, not moved) to the SerialBlockReader
//        constructor alongside the chunker
// \return the result of MakeTransformedGenerator applied to `buffer_generator`
//         and a transformer that forwards each buffer to the block reader
static AsyncGenerator<CSVBlock> MakeAsyncIterator(
AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer) {
// The reader is held by shared_ptr so the lambda below can capture it by
// value; every copy of the callable then refers to the same reader object.
auto block_reader =
std::make_shared<SerialBlockReader>(std::move(chunker), first_buffer);
// Wrap shared pointer in callable
Transformer<std::shared_ptr<Buffer>, CSVBlock> block_reader_fn =
[block_reader](std::shared_ptr<Buffer> next) {
// Delegate to SerialBlockReader::operator(), handing over the buffer.
return (*block_reader)(std::move(next));
};
return MakeTransformedGenerator(std::move(buffer_generator), block_reader_fn);
}

Result<TransformFlow<CSVBlock>> operator()(std::shared_ptr<Buffer> next_buffer) {
if (buffer_ == nullptr) {
return TransformFinish();
Expand Down Expand Up @@ -585,25 +572,22 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {

class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
public:
BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor,
std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options)
: ReaderMixin(io_context, std::move(input), read_options, parse_options,
convert_options),
cpu_executor_(cpu_executor) {}
using ReaderMixin::ReaderMixin;

virtual Future<std::shared_ptr<csv::StreamingReader>> Init() = 0;
virtual Status Init() = 0;

std::shared_ptr<Schema> schema() const override { return schema_; }

Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
auto next_fut = ReadNextAsync();
auto next_result = next_fut.result();
return std::move(next_result).Value(batch);
do {
RETURN_NOT_OK(ReadNext().Value(batch));
} while (*batch != nullptr && (*batch)->num_rows() == 0);
return Status::OK();
}

protected:
virtual Result<std::shared_ptr<RecordBatch>> ReadNext() = 0;

// Make column decoders from conversion schema
Status MakeColumnDecoders() {
for (const auto& column : conversion_schema_.columns) {
Expand Down Expand Up @@ -686,141 +670,101 @@ class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
std::shared_ptr<Schema> schema_;
std::shared_ptr<RecordBatch> pending_batch_;
AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
Executor* cpu_executor_;
Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
bool eof_ = false;
};

/////////////////////////////////////////////////////////////////////////
// Serial StreamingReader implementation

class SerialStreamingReader : public BaseStreamingReader,
public std::enable_shared_from_this<SerialStreamingReader> {
class SerialStreamingReader : public BaseStreamingReader {
public:
using BaseStreamingReader::BaseStreamingReader;

Future<std::shared_ptr<csv::StreamingReader>> Init() override {
Status Init() override {
ARROW_ASSIGN_OR_RAISE(auto istream_it,
io::MakeInputStreamIterator(input_, read_options_.block_size));

// TODO Consider exposing readahead as a read option (ARROW-12090)
ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
io_context_.executor()));

auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);

buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
// Since we're converting serially, no need to readahead more than one block
int32_t block_queue_size = 1;
ARROW_ASSIGN_OR_RAISE(auto rh_it,
MakeReadaheadIterator(std::move(istream_it), block_queue_size));
buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());

auto self = shared_from_this();
// Read schema from first batch
return ReadNextAsync().Then([self](const std::shared_ptr<RecordBatch>& first_batch)
-> Result<std::shared_ptr<csv::StreamingReader>> {
self->pending_batch_ = first_batch;
DCHECK_NE(self->schema_, nullptr);
return self;
});
ARROW_ASSIGN_OR_RAISE(pending_batch_, ReadNext());
DCHECK_NE(schema_, nullptr);
return Status::OK();
}

Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() {
auto maybe_batch = DecodeNextBatch();
if (schema_ == nullptr && maybe_batch.ok()) {
schema_ = (*maybe_batch)->schema();
protected:
Result<std::shared_ptr<RecordBatch>> ReadNext() override {
if (eof_) {
return nullptr;
}
if (io_context_.stop_token().IsStopRequested()) {
eof_ = true;
return io_context_.stop_token().Poll();
}
if (!block_iterator_) {
Status st = SetupReader();
if (!st.ok()) {
// Can't setup reader => bail out
eof_ = true;
return st;
}
}
return maybe_batch;
}

Future<std::shared_ptr<RecordBatch>> DoReadNext(
std::shared_ptr<SerialStreamingReader> self) {
auto batch = std::move(pending_batch_);
if (batch != nullptr) {
return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
return batch;
}

if (!source_eof_) {
return block_generator_()
.Then([self](const CSVBlock& maybe_block) -> Status {
if (!IsIterationEnd(maybe_block)) {
self->last_block_index_ = maybe_block.block_index;
auto maybe_parsed = self->ParseAndInsert(
maybe_block.partial, maybe_block.completion, maybe_block.buffer,
maybe_block.block_index, maybe_block.is_final);
if (!maybe_parsed.ok()) {
// Parse error => bail out
self->eof_ = true;
return maybe_parsed.status();
}
RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
} else {
self->source_eof_ = true;
for (auto& decoder : self->column_decoders_) {
decoder->SetEOF(self->last_block_index_ + 1);
}
}
return Status::OK();
})
.Then([self](const ::arrow::detail::Empty& st)
-> Result<std::shared_ptr<RecordBatch>> {
return self->DecodeBatchAndUpdateSchema();
});
}
return Future<std::shared_ptr<RecordBatch>>::MakeFinished(
DecodeBatchAndUpdateSchema());
}

Future<std::shared_ptr<RecordBatch>> ReadNextSkippingEmpty(
std::shared_ptr<SerialStreamingReader> self) {
return DoReadNext(self).Then([self](const std::shared_ptr<RecordBatch>& batch) {
if (batch != nullptr && batch->num_rows() == 0) {
return self->ReadNextSkippingEmpty(self);
ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator_.Next());
if (!IsIterationEnd(maybe_block)) {
last_block_index_ = maybe_block.block_index;
auto maybe_parsed = ParseAndInsert(maybe_block.partial, maybe_block.completion,
maybe_block.buffer, maybe_block.block_index,
maybe_block.is_final);
if (!maybe_parsed.ok()) {
// Parse error => bail out
eof_ = true;
return maybe_parsed.status();
}
RETURN_NOT_OK(maybe_block.consume_bytes(*maybe_parsed));
} else {
source_eof_ = true;
for (auto& decoder : column_decoders_) {
decoder->SetEOF(last_block_index_ + 1);
}
}
return Future<std::shared_ptr<RecordBatch>>::MakeFinished(batch);
});
}

Future<std::shared_ptr<RecordBatch>> ReadNextAsync() override {
if (eof_) {
return Future<std::shared_ptr<RecordBatch>>::MakeFinished(nullptr);
}
if (io_context_.stop_token().IsStopRequested()) {
eof_ = true;
return io_context_.stop_token().Poll();
}
auto self = shared_from_this();
if (!block_generator_) {
return SetupReader(self).Then([self](const Result<::arrow::detail::Empty>& res)
-> Future<std::shared_ptr<RecordBatch>> {
if (!res.ok()) {
self->eof_ = true;
return res.status();
}
return self->ReadNextSkippingEmpty(self);
});
} else {
return self->ReadNextSkippingEmpty(self);

auto maybe_batch = DecodeNextBatch();
if (schema_ == nullptr && maybe_batch.ok()) {
schema_ = (*maybe_batch)->schema();
}
return maybe_batch;
};

protected:
Future<> SetupReader(std::shared_ptr<SerialStreamingReader> self) {
return buffer_generator_().Then([self](const std::shared_ptr<Buffer>& first_buffer) {
if (first_buffer == nullptr) {
return Status::Invalid("Empty CSV file");
}
auto own_first_buffer = first_buffer;
RETURN_NOT_OK(self->ProcessHeader(own_first_buffer, &own_first_buffer));
RETURN_NOT_OK(self->MakeColumnDecoders());
Status SetupReader() {
ARROW_ASSIGN_OR_RAISE(auto first_buffer, buffer_iterator_.Next());
if (first_buffer == nullptr) {
return Status::Invalid("Empty CSV file");
}
RETURN_NOT_OK(ProcessHeader(first_buffer, &first_buffer));
RETURN_NOT_OK(MakeColumnDecoders());

self->block_generator_ = SerialBlockReader::MakeAsyncIterator(
std::move(self->buffer_generator_), MakeChunker(self->parse_options_),
std::move(own_first_buffer));
return Status::OK();
});
block_iterator_ = SerialBlockReader::MakeIterator(std::move(buffer_iterator_),
MakeChunker(parse_options_),
std::move(first_buffer));
return Status::OK();
}

bool source_eof_ = false;
int64_t last_block_index_ = 0;
AsyncGenerator<CSVBlock> block_generator_;
Iterator<CSVBlock> block_iterator_;
};

/////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -999,14 +943,15 @@ Result<std::shared_ptr<TableReader>> MakeTableReader(
return reader;
}

Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
Result<std::shared_ptr<StreamingReader>> MakeStreamingReader(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
internal::Executor* cpu_executor, const ReadOptions& read_options,
const ParseOptions& parse_options, const ConvertOptions& convert_options) {
std::shared_ptr<BaseStreamingReader> reader;
reader = std::make_shared<SerialStreamingReader>(
io_context, cpu_executor, input, read_options, parse_options, convert_options);
return reader->Init();
reader = std::make_shared<SerialStreamingReader>(io_context, input, read_options,
parse_options, convert_options);
RETURN_NOT_OK(reader->Init());
return reader;
}

} // namespace
Expand Down Expand Up @@ -1036,29 +981,15 @@ Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
const ConvertOptions& convert_options) {
auto io_context = io::IOContext(pool);
auto cpu_executor = internal::GetCpuThreadPool();
auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
read_options, parse_options, convert_options);
auto reader_result = reader_fut.result();
ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
return reader;
return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
parse_options, convert_options);
}

Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
auto cpu_executor = internal::GetCpuThreadPool();
auto reader_fut = MakeStreamingReader(io_context, std::move(input), cpu_executor,
read_options, parse_options, convert_options);
auto reader_result = reader_fut.result();
ARROW_ASSIGN_OR_RAISE(auto reader, reader_result);
return reader;
}

Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
internal::Executor* cpu_executor, const ReadOptions& read_options,
const ParseOptions& parse_options, const ConvertOptions& convert_options) {
return MakeStreamingReader(io_context, std::move(input), cpu_executor, read_options,
parse_options, convert_options);
}
Expand Down
11 changes: 0 additions & 11 deletions cpp/src/arrow/csv/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,6 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {
virtual ~StreamingReader() = default;

/// Create a StreamingReader instance
///
/// This involves some I/O as the first batch must be loaded during the creation process
/// so it is returned as a future
///
/// Currently, the StreamingReader is not async-reentrant and does not do any fan-out
/// parsing (see ARROW-11889)
static Future<std::shared_ptr<StreamingReader>> MakeAsync(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
internal::Executor* cpu_executor, const ReadOptions&, const ParseOptions&,
const ConvertOptions&);

static Result<std::shared_ptr<StreamingReader>> Make(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions&, const ParseOptions&, const ConvertOptions&);
Expand Down
15 changes: 3 additions & 12 deletions cpp/src/arrow/csv/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,10 @@
#include "arrow/table.h"
#include "arrow/testing/future_util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/future.h"
#include "arrow/util/thread_pool.h"

namespace arrow {

using RecordBatchGenerator = AsyncGenerator<std::shared_ptr<RecordBatch>>;

namespace csv {

// Allows the streaming reader to be used in tests that expect a table reader
Expand All @@ -49,17 +45,12 @@ class StreamingReaderAsTableReader : public TableReader {
: reader_(std::move(reader)) {}
virtual ~StreamingReaderAsTableReader() = default;
virtual Result<std::shared_ptr<Table>> Read() {
auto table_fut = ReadAsync();
auto table_res = table_fut.result();
ARROW_ASSIGN_OR_RAISE(auto table, table_res);
std::shared_ptr<Table> table;
RETURN_NOT_OK(reader_->ReadAll(&table));
return table;
}
virtual Future<std::shared_ptr<Table>> ReadAsync() {
auto reader = reader_;
RecordBatchGenerator rb_generator = [reader]() { return reader->ReadNextAsync(); };
return CollectAsyncGenerator(rb_generator).Then([](const RecordBatchVector& rbs) {
return Table::FromRecordBatches(rbs);
});
return Future<std::shared_ptr<Table>>::MakeFinished(Read());
}

private:
Expand Down
25 changes: 8 additions & 17 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -428,25 +428,16 @@ Future<> WriteInternal(const ScanOptions& scan_options, WriteState& state,
auto task_group = scan_options.TaskGroup();

for (const auto& scan_task : scan_tasks) {
if (scan_task->supports_async()) {
ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync(cpu_executor));
std::function<Status(std::shared_ptr<RecordBatch> batch)> batch_visitor =
[&, scan_task](std::shared_ptr<RecordBatch> batch) {
return WriteNextBatch(state, scan_task->fragment(), std::move(batch));
};
scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor));
} else {
task_group->Append([&, scan_task] {
ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());
task_group->Append([&, scan_task] {
ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());

for (auto maybe_batch : batches) {
ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
RETURN_NOT_OK(WriteNextBatch(state, scan_task->fragment(), std::move(batch)));
}
for (auto maybe_batch : batches) {
ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
RETURN_NOT_OK(WriteNextBatch(state, scan_task->fragment(), std::move(batch)));
}

return Status::OK();
});
}
return Status::OK();
});
}
scan_futs.push_back(task_group->FinishAsync());
return AllComplete(scan_futs);
Expand Down
Loading