Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 9 additions & 12 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,14 +703,11 @@ class SerialStreamingReader : public BaseStreamingReader,
ARROW_ASSIGN_OR_RAISE(auto istream_it,
io::MakeInputStreamIterator(input_, read_options_.block_size));

// TODO Consider exposing readahead as a read option (ARROW-12090)
ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
io_context_.executor()));

// TODO Consider exposing readahead as a read option (ARROW-12090)
auto rh_it =
MakeSerialReadaheadGenerator(std::move(bg_it), cpu_executor_->GetCapacity());

auto transferred_it = MakeTransferredGenerator(rh_it, cpu_executor_);
auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);

buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());
Expand Down Expand Up @@ -909,15 +906,15 @@ class AsyncThreadedTableReader
ARROW_ASSIGN_OR_RAISE(auto istream_it,
io::MakeInputStreamIterator(input_, read_options_.block_size));

ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
io_context_.executor()));
int max_readahead = cpu_executor_->GetCapacity();
int readahead_restart = std::max(1, max_readahead / 2);

auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
ARROW_ASSIGN_OR_RAISE(
auto bg_it, MakeBackgroundGenerator(std::move(istream_it), io_context_.executor(),
max_readahead, readahead_restart));

int32_t block_queue_size = cpu_executor_->GetCapacity();
auto rh_it =
MakeSerialReadaheadGenerator(std::move(transferred_it), block_queue_size);
buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(rh_it));
auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_);
buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
return Status::OK();
}

Expand Down
182 changes: 138 additions & 44 deletions cpp/src/arrow/util/async_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -1096,65 +1096,158 @@ AsyncGenerator<T> MakeIteratorGenerator(Iterator<T> it) {
template <typename T>
class BackgroundGenerator {
public:
explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor)
: io_executor_(io_executor) {
task_ = Task{std::make_shared<Iterator<T>>(std::move(it)),
std::make_shared<std::atomic<bool>>(false)};
}

~BackgroundGenerator() {
// The thread pool will be disposed of automatically. By default it will not wait
// so the background thread may outlive this object. That should be ok. Any task
// objects in the thread pool are copies of task_ and have their own shared_ptr to
// the iterator.
}
explicit BackgroundGenerator(Iterator<T> it, internal::Executor* io_executor, int max_q,
int q_restart)
: state_(std::make_shared<State>(io_executor, std::move(it), max_q, q_restart)) {}

ARROW_DEFAULT_MOVE_AND_ASSIGN(BackgroundGenerator);
ARROW_DISALLOW_COPY_AND_ASSIGN(BackgroundGenerator);
~BackgroundGenerator() {}

Future<T> operator()() {
auto submitted_future = io_executor_->Submit(task_);
if (!submitted_future.ok()) {
return Future<T>::MakeFinished(submitted_future.status());
auto guard = state_->mutex.Lock();
Future<T> waiting_future;
if (state_->queue.empty()) {
if (state_->finished) {
return AsyncGeneratorEnd<T>();
} else {
waiting_future = Future<T>::Make();
state_->waiting_future = waiting_future;
}
} else {
auto next = Future<T>::MakeFinished(std::move(state_->queue.front()));
state_->queue.pop();
if (!state_->running &&
static_cast<int>(state_->queue.size()) <= state_->q_restart) {
state_->RestartTask(state_, std::move(guard));
}
return next;
}
if (!state_->running) {
// This branch should only be needed to start the background thread on the first
// call
state_->RestartTask(state_, std::move(guard));
}
return std::move(*submitted_future);
return waiting_future;
}

protected:
struct Task {
Result<T> operator()() {
if (*done_) {
return IterationTraits<T>::End();
struct State {
State(internal::Executor* io_executor, Iterator<T> it, int max_q, int q_restart)
: io_executor(io_executor),
it(std::move(it)),
running(false),
finished(false),
max_q(max_q),
q_restart(q_restart) {}

void ClearQueue() {
while (!queue.empty()) {
queue.pop();
}
auto next = it_->Next();
if (!next.ok() || IsIterationEnd(*next)) {
*done_ = true;
}

void RestartTask(std::shared_ptr<State> state, util::Mutex::Guard guard) {
if (!finished) {
running = true;
auto spawn_status = io_executor->Spawn([state]() { Task()(std::move(state)); });
if (!spawn_status.ok()) {
running = false;
finished = true;
if (waiting_future.has_value()) {
auto to_deliver = std::move(waiting_future.value());
waiting_future.reset();
guard.Unlock();
to_deliver.MarkFinished(spawn_status);
} else {
ClearQueue();
queue.push(spawn_status);
}
}
}
return next;
}
// This task is going to be copied so we need to convert the iterator ptr to
// a shared ptr. This should be safe however because the background executor only
// has a single thread so it can't access it_ across multiple threads.
std::shared_ptr<Iterator<T>> it_;
std::shared_ptr<std::atomic<bool>> done_;

internal::Executor* io_executor;
Iterator<T> it;
bool running;
bool finished;
int max_q;
int q_restart;
std::queue<Result<T>> queue;
util::optional<Future<T>> waiting_future;
util::Mutex mutex;
};

Task task_;
internal::Executor* io_executor_;
class Task {
public:
void operator()(std::shared_ptr<State> state) {
// while condition can't be based on state_ because it is run outside the mutex
bool running = true;
while (running) {
auto next = state->it.Next();
// Need to capture state->waiting_future inside the mutex to mark finished outside
Future<T> waiting_future;
{
auto guard = state->mutex.Lock();

if (!next.ok() || IsIterationEnd<T>(*next)) {
state->finished = true;
state->running = false;
if (!next.ok()) {
state->ClearQueue();
}
}
if (state->waiting_future.has_value()) {
waiting_future = std::move(state->waiting_future.value());
state->waiting_future.reset();
} else {
state->queue.push(std::move(next));
if (static_cast<int>(state->queue.size()) >= state->max_q) {
state->running = false;
}
}
running = state->running;
}
// This must happen outside the task. Although presumably there is a transferring
// generator on the other end that will quickly transfer any callbacks off of this
// thread so we can continue looping. Still, best not to rely on that
if (waiting_future.is_valid()) {
waiting_future.MarkFinished(next);
}
}
}
};

std::shared_ptr<State> state_;
};

constexpr int kDefaultBackgroundMaxQ = 32;
constexpr int kDefaultBackgroundQRestart = 16;

/// \brief Creates an AsyncGenerator<T> by iterating over an Iterator<T> on a background
/// thread
///
/// This generator is async-reentrant
/// The parameter max_q and q_restart control queue size and background thread task
/// management. If the background task is fast you typically don't want it creating a
/// thread task for every item. Instead the background thread will run until it fills
/// up a readahead queue.
///
/// This generator will not queue
/// Once the queue has filled up the background thread task will terminate (allowing other
/// I/O tasks to use the thread). Once the queue has been drained enough (specified by
/// q_restart) then the background thread task will be restarted. If q_restart is too low
/// then you may exhaust the queue waiting for the background thread task to start running
/// again. If it is too high then it will be constantly stopping and restarting the
/// background queue task
///
/// This generator is not async-reentrant
///
/// This generator will queue up to max_q blocks
template <typename T>
static Result<AsyncGenerator<T>> MakeBackgroundGenerator(
Iterator<T> iterator, internal::Executor* io_executor) {
auto background_iterator = std::make_shared<BackgroundGenerator<T>>(
std::move(iterator), std::move(io_executor));
return [background_iterator]() { return (*background_iterator)(); };
Iterator<T> iterator, internal::Executor* io_executor,
int max_q = kDefaultBackgroundMaxQ, int q_restart = kDefaultBackgroundQRestart) {
if (max_q < q_restart) {
return Status::Invalid("max_q must be >= q_restart");
}
return BackgroundGenerator<T>(std::move(iterator), io_executor, max_q, q_restart);
}

/// \see MakeGeneratorIterator
Expand Down Expand Up @@ -1185,16 +1278,17 @@ Result<Iterator<T>> MakeGeneratorIterator(AsyncGenerator<T> source) {
template <typename T>
Result<Iterator<T>> MakeReadaheadIterator(Iterator<T> it, int readahead_queue_size) {
ARROW_ASSIGN_OR_RAISE(auto io_executor, internal::ThreadPool::Make(1));
ARROW_ASSIGN_OR_RAISE(auto background_generator,
MakeBackgroundGenerator(std::move(it), io_executor.get()));
auto max_q = readahead_queue_size;
auto q_restart = std::max(1, max_q / 2);
ARROW_ASSIGN_OR_RAISE(
auto background_generator,
MakeBackgroundGenerator(std::move(it), io_executor.get(), max_q, q_restart));
// Capture io_executor to keep it alive as long as owned_bg_generator is still
// referenced
AsyncGenerator<T> owned_bg_generator = [io_executor, background_generator]() {
return background_generator();
};
auto readahead_generator =
MakeReadaheadGenerator(std::move(owned_bg_generator), readahead_queue_size);
return MakeGeneratorIterator(std::move(readahead_generator));
return MakeGeneratorIterator(std::move(owned_bg_generator));
}

} // namespace arrow
Loading