1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -392,6 +392,7 @@ if(ARROW_COMPUTE)
compute/exec/bloom_filter.cc
compute/exec/exec_plan.cc
compute/exec/expression.cc
compute/exec/fetch_node.cc
compute/exec/filter_node.cc
compute/exec/hash_join.cc
compute/exec/hash_join_dict.cc
9 changes: 9 additions & 0 deletions cpp/src/arrow/compute/exec.h
@@ -151,6 +151,9 @@ class ARROW_EXPORT SelectionVector {
const int32_t* indices_;
};

/// An index to represent that a batch does not belong to an ordered stream
constexpr int64_t kUnsequencedIndex = -1;

/// \brief A unit of work for kernel execution. It contains a collection of
/// Array and Scalar values and an optional SelectionVector indicating that
/// there is an unmaterialized filter that either must be materialized, or (if
@@ -209,6 +212,12 @@ struct ARROW_EXPORT ExecBatch {
/// whether any values are Scalar.
int64_t length = 0;

/// \brief index of this batch in a sorted stream of batches
///
/// This index must increase monotonically starting at 0 without gaps, or
/// it can be set to kUnsequencedIndex if there is no meaningful order.
int64_t index = kUnsequencedIndex;

/// \brief The sum of bytes in each buffer referenced by the batch
///
/// Note: Scalars are not counted
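Since the new index is a plain field on ExecBatch, an upstream node just assigns it as batches are emitted. A minimal sketch of that convention (the TagBatches helper below is illustrative, not part of this change):

```cpp
#include <cstdint>
#include <vector>

#include "arrow/compute/exec.h"

// Illustrative only: tag a stream of batches with sequence indices, or mark
// them unsequenced when the stream has no meaningful order.
void TagBatches(std::vector<arrow::compute::ExecBatch>* batches, bool ordered) {
  int64_t next_index = 0;
  for (arrow::compute::ExecBatch& batch : *batches) {
    // Indices must start at 0 and increase without gaps.
    batch.index = ordered ? next_index++ : arrow::compute::kUnsequencedIndex;
  }
}
```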
7 changes: 7 additions & 0 deletions cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -29,6 +29,13 @@ add_arrow_compute_test(plan_test
"arrow-compute"
SOURCES
plan_test.cc
test_nodes_test.cc
test_nodes.cc)
add_arrow_compute_test(fetch_node_test
PREFIX
"arrow-compute"
SOURCES
fetch_node_test.cc
test_nodes.cc)
add_arrow_compute_test(hash_join_node_test
PREFIX
112 changes: 112 additions & 0 deletions cpp/src/arrow/compute/exec/accumulation_queue.cc
@@ -18,6 +18,11 @@
#include "arrow/compute/exec/accumulation_queue.h"

#include <iterator>
#include <mutex>
#include <queue>
#include <vector>

#include "arrow/util/logging.h"

namespace arrow {
namespace util {
@@ -54,5 +59,112 @@ void AccumulationQueue::Clear() {
}

ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }

namespace {

struct LowestBatchIndexAtTop {
bool operator()(const ExecBatch& left, const ExecBatch& right) const {
return left.index > right.index;
}
};

class SequencingQueueImpl : public SequencingQueue {
public:
explicit SequencingQueueImpl(Processor* processor) : processor_(processor) {}

Status InsertBatch(ExecBatch batch) override {
std::unique_lock lk(mutex_);
if (batch.index == next_index_) {
return DeliverNextUnlocked(std::move(batch), std::move(lk));
}
queue_.emplace(std::move(batch));
return Status::OK();
}

private:
Status DeliverNextUnlocked(ExecBatch batch, std::unique_lock<std::mutex>&& lk) {
// Should be able to detect and avoid this at plan construction
DCHECK_NE(batch.index, ::arrow::compute::kUnsequencedIndex)
<< "attempt to use a sequencing queue on an unsequenced stream of batches";
std::vector<Task> tasks;
next_index_++;
ARROW_ASSIGN_OR_RAISE(std::optional<Task> this_task,
processor_->Process(std::move(batch)));
while (!queue_.empty() && next_index_ == queue_.top().index) {
ARROW_ASSIGN_OR_RAISE(std::optional<Task> task, processor_->Process(queue_.top()));
if (task) {
tasks.push_back(std::move(*task));
}
queue_.pop();
next_index_++;
}
lk.unlock();
// Schedule tasks for stale items
for (auto& task : tasks) {
processor_->Schedule(std::move(task));
}
// Run the current item immediately
if (this_task) {
ARROW_RETURN_NOT_OK(std::move(*this_task)());
}
return Status::OK();
}

Processor* processor_;

std::priority_queue<ExecBatch, std::vector<ExecBatch>, LowestBatchIndexAtTop> queue_;
int next_index_ = 0;
std::mutex mutex_;
};

class SerialSequencingQueueImpl : public SerialSequencingQueue {
public:
explicit SerialSequencingQueueImpl(Processor* processor) : processor_(processor) {}

Status InsertBatch(ExecBatch batch) override {
std::unique_lock lk(mutex_);
queue_.push(std::move(batch));
if (queue_.top().index == next_index_ && !is_processing_) {
is_processing_ = true;
return DoProcess(std::move(lk));
}
return Status::OK();
}

private:
Status DoProcess(std::unique_lock<std::mutex>&& lk) {
while (!queue_.empty() && queue_.top().index == next_index_) {
ExecBatch next(queue_.top());
queue_.pop();
next_index_++;
lk.unlock();
// ARROW_RETURN_NOT_OK may return early here. In that case is_processing_ will
// never switch back to false, so no other thread can process, but that should
// be fine since we have already failed. It is important, however, that we do
// not hold the lock.
ARROW_RETURN_NOT_OK(processor_->Process(std::move(next)));
lk.lock();
}
is_processing_ = false;
return Status::OK();
}

Processor* processor_;

std::mutex mutex_;
std::priority_queue<ExecBatch, std::vector<ExecBatch>, LowestBatchIndexAtTop> queue_;
int next_index_ = 0;
bool is_processing_ = false;
};

} // namespace

std::unique_ptr<SequencingQueue> SequencingQueue::Make(Processor* processor) {
return std::make_unique<SequencingQueueImpl>(processor);
}

std::unique_ptr<SerialSequencingQueue> SerialSequencingQueue::Make(Processor* processor) {
return std::make_unique<SerialSequencingQueueImpl>(processor);
}

} // namespace util
} // namespace arrow
100 changes: 100 additions & 0 deletions cpp/src/arrow/compute/exec/accumulation_queue.h
@@ -18,9 +18,12 @@
#pragma once

#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

#include "arrow/compute/exec.h"
#include "arrow/result.h"

namespace arrow {
namespace util {
@@ -53,5 +56,102 @@ class AccumulationQueue {
std::vector<ExecBatch> batches_;
};

/// A queue that sequences incoming batches
///
/// This can be used when a node needs to do some kind of ordered processing on
/// the stream.
///
/// Batches can be inserted in any order. Processor::Process will be called on
/// the batches, in order, without reentrant calls. For this reason Process
/// should be quick.
///
/// For example, in a top-n node, Process should determine how many rows need
/// to be delivered for the given batch, and then return a task to actually
/// deliver those rows.
class SequencingQueue {
public:
using Task = std::function<Status()>;

/// Strategy that describes how to handle items
class Processor {
public:
/// Process the batch, potentially generating a task
///
/// This method will be called on each batch in order. Calls to this method
/// will be serialized and it will not be called reentrantly. This makes it
/// safe to do things that rely on order, but minimal time should be spent here
/// to avoid becoming a bottleneck.
///
/// \return a follow-up task that will be scheduled. The follow-up task(s)
/// are not guaranteed to run in any particular order. If nullopt is
/// returned then nothing will be scheduled.
virtual Result<std::optional<Task>> Process(ExecBatch batch) = 0;
/// Schedule a task
virtual void Schedule(Task task) = 0;
};

virtual ~SequencingQueue() = default;

/// Insert a batch into the queue
///
/// This will insert the batch into the queue. If this batch is the next batch
/// to deliver then this will trigger one or more calls to Processor::Process,
/// generating one or more tasks.
///
/// The task generated for this batch will be executed immediately. The remaining
/// tasks will be scheduled via Processor::Schedule.
///
/// From a data pipeline perspective the sequencing queue is a "sometimes" breaker.
/// If a batch arrives in order then this call will usually execute the downstream
/// pipeline. If the batch arrives early then this call will only queue the data.
virtual Status InsertBatch(ExecBatch batch) = 0;

/// Create a queue
/// \param processor describes how to process the batches, must outlive the queue
static std::unique_ptr<SequencingQueue> Make(Processor* processor);
};
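To make the contract above concrete, here is a hedged sketch of a Processor roughly along the lines of the top-n example in the comment. TopNProcessor and its private helpers are hypothetical; only SequencingQueue, Processor, Task, and Make come from this header:

```cpp
#include <cstdint>
#include <optional>
#include <utility>

#include "arrow/compute/exec/accumulation_queue.h"

namespace example {

using arrow::util::SequencingQueue;

// Illustrative Processor: decide (in order) how much of each batch is still
// needed, then hand the actual delivery off to a follow-up task.
class TopNProcessor : public SequencingQueue::Processor {
 public:
  arrow::Result<std::optional<SequencingQueue::Task>> Process(
      arrow::compute::ExecBatch batch) override {
    int64_t rows_needed = RowsStillNeeded(batch);
    if (rows_needed == 0) {
      // Nothing to deliver for this batch, so no follow-up task.
      return std::optional<SequencingQueue::Task>{};
    }
    // Keep Process itself quick; the real work happens in the returned task.
    return std::optional<SequencingQueue::Task>(
        [this, batch = std::move(batch), rows_needed]() -> arrow::Status {
          return DeliverRows(batch, rows_needed);
        });
  }

  void Schedule(SequencingQueue::Task task) override {
    // A real node would hand this to the plan's task scheduler; running it
    // inline keeps the sketch self-contained.
    arrow::Status st = std::move(task)();
    if (!st.ok()) {
      // A real node would propagate the error through the plan.
    }
  }

 private:
  // Hypothetical stand-ins for real top-n bookkeeping.
  int64_t RowsStillNeeded(const arrow::compute::ExecBatch& batch) {
    return batch.length;
  }
  arrow::Status DeliverRows(const arrow::compute::ExecBatch&, int64_t) {
    return arrow::Status::OK();
  }
};

}  // namespace example
```

Wiring it up follows the Make contract: the processor must outlive the queue, e.g. `TopNProcessor processor; auto queue = SequencingQueue::Make(&processor);`, after which `queue->InsertBatch(...)` may be called from any thread.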

/// A queue that sequences incoming batches
///
/// Unlike SequencingQueue, the Process method is not expected to schedule new tasks.
///
/// If a batch arrives and another thread is currently processing then the batch
/// will be queued and control will return. In other words, delivery of batches will
/// not block on the Process method.
///
/// It can be helpful to think of this as if a dedicated thread is running Process
/// as batches arrive.
class SerialSequencingQueue {
public:
/// Strategy that describes how to handle items
class Processor {
public:
/// Process the batch
///
/// This method will be called on each batch in order. Calls to this method
/// will be serialized and it will not be called reentrantly. This makes it
/// safe to do things that rely on order.
///
/// If processing falls behind then data may accumulate in the queue.
///
/// TODO: Could add backpressure if needed but right now all uses of this should
/// be pretty fast and so are unlikely to block.
virtual Status Process(ExecBatch batch) = 0;
};

virtual ~SerialSequencingQueue() = default;

/// Insert a batch into the queue
///
/// This will insert the batch into the queue. If this batch is the next batch
/// to deliver then this may trigger calls to Processor::Process, which will run
/// as part of this call.
virtual Status InsertBatch(ExecBatch batch) = 0;

/// Create a queue
/// \param processor describes how to process the batches, must outlive the queue
static std::unique_ptr<SerialSequencingQueue> Make(Processor* processor);
};
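And a corresponding sketch for the serial variant, where Process itself pushes data downstream. InOrderEmitter and EmitDownstream are hypothetical:

```cpp
#include <cstdint>
#include <utility>

#include "arrow/compute/exec/accumulation_queue.h"

namespace example {

using arrow::util::SerialSequencingQueue;

// Illustrative Processor: called strictly in batch-index order and never
// reentrantly, so per-stream bookkeeping needs no extra locking here.
class InOrderEmitter : public SerialSequencingQueue::Processor {
 public:
  arrow::Status Process(arrow::compute::ExecBatch batch) override {
    rows_seen_ += batch.length;
    return EmitDownstream(std::move(batch));
  }

 private:
  // Hypothetical stand-in for handing the batch to the downstream node.
  arrow::Status EmitDownstream(arrow::compute::ExecBatch) {
    return arrow::Status::OK();
  }
  int64_t rows_seen_ = 0;
};

}  // namespace example
```

As with SequencingQueue, the processor must outlive the queue created by `SerialSequencingQueue::Make(&emitter)`, and InsertBatch may be called from any thread; batches that arrive while another thread is processing are simply queued.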

} // namespace util
} // namespace arrow