Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/examples/arrow/compute_register_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class ExampleNode : public cp::ExecNode {
/*output_schema=*/input->output_schema(), /*num_outputs=*/1) {}

const char* kind_name() const override { return "ExampleNode"; }
const std::vector<int>& ordering() override { return ExecNode::kNoOrdering; }

arrow::Status StartProducing() override {
outputs_[0]->InputFinished(this, 0);
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ if(ARROW_COMPUTE)
compute/exec/bloom_filter.cc
compute/exec/exec_plan.cc
compute/exec/expression.cc
compute/exec/fetch_node.cc
compute/exec/filter_node.cc
compute/exec/hash_join.cc
compute/exec/hash_join_dict.cc
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ ExecBatch::ExecBatch(const RecordBatch& batch)
}

bool ExecBatch::Equals(const ExecBatch& other) const {
return guarantee == other.guarantee && values == other.values;
return index == other.index && guarantee == other.guarantee && values == other.values;
}

void PrintTo(const ExecBatch& batch, std::ostream* os) {
Expand All @@ -83,6 +83,9 @@ void PrintTo(const ExecBatch& batch, std::ostream* os) {
if (batch.guarantee != literal(true)) {
*os << indent << "Guarantee: " << batch.guarantee.ToString() << "\n";
}
if (batch.index != ExecBatch::kNoOrdering) {
*os << indent << "Index: " << batch.index << "\n";
}

int i = 0;
for (const Datum& value : batch.values) {
Expand Down
20 changes: 20 additions & 0 deletions cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,26 @@ struct ARROW_EXPORT ExecBatch {
/// whether any values are Scalar.
int64_t length = 0;

/// Indicates a batch is not part of an ordered stream
static constexpr int32_t kNoOrdering = -1;
/// The index of the exec batch in an ordered stream of batches
///
/// Several operations can impose an ordering on their output. Because
/// batches travel through the execution graph at different speeds there
/// is no guarantee those batches will arrive in the same order they are
/// emitted.
///
/// If there is no ordering then the index should be kNoOrdering. If a node rearranges
/// rows within a batch it will destroy the ordering (e.g. a hash-join node) and should
/// set the index of output batches to kNoOrdering. Other nodes which leave
/// row-in-batch ordering alone should maintain the index on their output batches.
/// Nodes that impose an ordering (e.g. sort) should assign index appropriately.
///
/// An ordering must be monotonic and have no gaps. This can be somewhat tricky to
/// maintain. For example, when filtering, an implementation may need to emit empty
/// batches to maintain correct ordering.
int32_t index = kNoOrdering;

/// \brief The sum of bytes in each buffer referenced by the batch
///
/// Note: Scalars are not counted
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

arrow_install_all_headers("arrow/compute/exec")

add_arrow_compute_test(accumulation_queue PREFIX "arrow-compute" SOURCES accumulation_queue_test.cc)
add_arrow_compute_test(expression_test
PREFIX
"arrow-compute"
Expand Down
87 changes: 87 additions & 0 deletions cpp/src/arrow/compute/exec/accumulation_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,13 @@

#include "arrow/compute/exec/accumulation_queue.h"

#include "arrow/util/future.h"
#include "arrow/util/logging.h"

#include <iterator>
#include <mutex>
#include <optional>
#include <queue>

namespace arrow {
namespace util {
Expand Down Expand Up @@ -54,5 +60,86 @@ void AccumulationQueue::Clear() {
}

ExecBatch& AccumulationQueue::operator[](size_t i) { return batches_[i]; }

/// \brief Orders batches so that a std::priority_queue behaves as a min-heap on index
///
/// std::priority_queue keeps the "largest" element on top, so the comparison is
/// inverted: the batch with the smallest index ends up at top().
struct ExecBatchCmp {
  // Marked const so the comparator can also be invoked through a const instance.
  bool operator()(const ExecBatch& left, const ExecBatch& right) const {
    return right.index < left.index;
  }
};

class OrderedAccumulationQueueImpl : public OrderedAccumulationQueue {
public:
OrderedAccumulationQueueImpl(TaskFactoryCallback create_task, ScheduleCallback schedule)
: create_task_(std::move(create_task)), schedule_(std::move(schedule)) {}

~OrderedAccumulationQueueImpl() override = default;

Status InsertBatch(ExecBatch batch) override {
DCHECK_GE(batch.index, 0);
std::unique_lock<std::mutex> lk(mutex_);
if (!processing_ && batch.index == next_index_) {
std::vector<ExecBatch> next_batch = PopUnlocked(std::move(batch));
processing_ = true;
lk.unlock();
return Deliver(std::move(next_batch));
}
batches_.push(std::move(batch));
return Status::OK();
}

Status CheckDrained() const override {
if (!batches_.empty()) {
return Status::UnknownError(
"Ordered accumulation queue has data remaining after finish");
}
return Status::OK();
}

private:
std::vector<ExecBatch> PopUnlocked(std::optional<ExecBatch> batch) {
std::vector<ExecBatch> popped;
if (batch.has_value()) {
popped.push_back(std::move(*batch));
next_index_++;
}
while (!batches_.empty() && batches_.top().index == next_index_) {
popped.push_back(std::move(batches_.top()));
batches_.pop();
next_index_++;
}
return popped;
}

Status Deliver(std::vector<ExecBatch> batches) {
ARROW_ASSIGN_OR_RAISE(Task task, create_task_(std::move(batches)));
Task wrapped_task = [this, task = std::move(task)] {
ARROW_RETURN_NOT_OK(task());
std::unique_lock<std::mutex> lk(mutex_);
if (!batches_.empty() && batches_.top().index == next_index_) {
std::vector<ExecBatch> next_batches = PopUnlocked(std::nullopt);
lk.unlock();
ARROW_RETURN_NOT_OK(Deliver(std::move(next_batches)));
} else {
processing_ = false;
}
return Status::OK();
};
return schedule_(std::move(wrapped_task));
}

TaskFactoryCallback create_task_;
ScheduleCallback schedule_;
std::priority_queue<ExecBatch, std::vector<ExecBatch>, ExecBatchCmp> batches_;
int next_index_ = 0;
bool processing_ = false;
std::mutex mutex_;
};

// Factory for the queue; the concrete implementation class is defined in this source
// file, so callers only ever see the abstract interface.
std::unique_ptr<OrderedAccumulationQueue> OrderedAccumulationQueue::Make(
    TaskFactoryCallback create_task, ScheduleCallback schedule) {
  std::unique_ptr<OrderedAccumulationQueue> queue =
      std::make_unique<OrderedAccumulationQueueImpl>(std::move(create_task),
                                                     std::move(schedule));
  return queue;
}

} // namespace util
} // namespace arrow
58 changes: 58 additions & 0 deletions cpp/src/arrow/compute/exec/accumulation_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,63 @@ class AccumulationQueue {
std::vector<ExecBatch> batches_;
};

/// \brief Sequences data and allows for algorithms relying on ordered execution
///
/// The ordered execution queue will buffer data. Typically, it is used when
/// there is an ordering in place, and we can assume the stream is roughly in
/// order, even though it may be quite jittery. For example, if we are scanning
/// a dataset that is ordered by some column then the ordered accumulation queue
/// can be used even though a parallel dataset scan wouldn't necessarily produce
/// a perfectly ordered stream due to jittery I/O.
///
/// The downstream side of the queue is broken into two parts. The first part,
/// which should be relatively fast, runs serially, and creates tasks. The second
/// part, which can be slower, will run these tasks in parallel.
///
/// For example, if we are doing an ordered group by operation then the serial part
/// will scan the batches to find group boundaries (places where the key value changes)
/// and will slice the input into groups. The second part will actually run the
/// aggregations on the groups and then call the downstream nodes.
///
/// This queue is currently implemented as a pipeline breaker in the sense that it creates
/// new thread tasks. Each downstream task (the slower part) will be run as a new thread
/// task (submitted via the scheduling callback). A more sophisticated implementation
/// could probably be created that only breaks the pipeline when a batch arrives out of
/// order.
class OrderedAccumulationQueue {
public:
/// A unit of work created by the task factory and handed to the scheduler
using Task = std::function<Status()>;
/// Callback that turns a run of batches (in sequence order) into a task
using TaskFactoryCallback = std::function<Result<Task>(std::vector<ExecBatch>)>;
/// Callback that submits a task for execution
using ScheduleCallback = std::function<Status(Task)>;

virtual ~OrderedAccumulationQueue() = default;

/// \brief Insert a new batch into the queue
/// \param batch The batch to insert; its index must be non-negative
/// \return An error from the task factory or the scheduler if a task was created
///         by this call, otherwise OK
///
/// If the batch is the next batch in the sequence, and no previously created task
/// is still running, then a new task will be created from all available batches
/// and submitted to the scheduler. Otherwise the batch is buffered; batches that
/// become deliverable while a task is running are picked up when that task finishes.
virtual Status InsertBatch(ExecBatch batch) = 0;
/// \brief Ensure the queue has been fully drained
/// \return An error if any batch is still buffered in the queue, otherwise OK
///
/// If a caller expects to process all data (e.g. not something like a fetch node) then
/// the caller should call this to ensure that all batches have been processed. This is
/// a sanity check to help detect bugs which produce streams of batches with gaps in the
/// sequencing index and is not strictly needed.
virtual Status CheckDrained() const = 0;

/// \brief Create a new ordered accumulation queue
/// \param create_task The callback to use when a new task needs to be created
///
/// This callback will run serially and will never be called reentrantly. It will
/// be given a vector of batches and those batches will be in sequence-order.
///
/// Ideally this callback should be as quick as possible, doing only the work that
/// needs to be truly serialized. The returned task will then be scheduled.
/// \param schedule The callback to use to schedule a new task
/// \return The newly created queue
///
/// The tasks passed to `schedule` refer back to the queue, so the queue must be kept
/// alive until every scheduled task has finished running.
static std::unique_ptr<OrderedAccumulationQueue> Make(TaskFactoryCallback create_task,
ScheduleCallback schedule);
};

} // namespace util
} // namespace arrow
104 changes: 104 additions & 0 deletions cpp/src/arrow/compute/exec/accumulation_queue_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec/accumulation_queue.h"

#include <gtest/gtest.h>

#include <condition_variable>
#include <memory>
#include <mutex>
#include <random>
#include <thread>

#include "arrow/testing/gtest_util.h"

namespace arrow {
namespace util {

// Inserts a shuffled sequence of batches into an OrderedAccumulationQueue from the
// test thread, runs every scheduled task on its own thread, and verifies that the
// batches were handed to the task factory in index order.
TEST(AccumulationQueue, Basic) {
  constexpr int kNumBatches = 1000;
  constexpr int kNumIters = 100;
  constexpr int kRandomSeed = 42;

  // Seeded once, outside the loop: re-seeding per iteration would make every
  // iteration shuffle into the exact same order. The run is still reproducible.
  std::default_random_engine gen(kRandomSeed);

  for (int iter = 0; iter < kNumIters; iter++) {
    std::vector<ExecBatch> collected(kNumBatches);
    int num_seen = 0;
    int num_active_tasks = 0;
    // First failure reported by any task; guarded by task_counter_mutex
    Status first_task_error;
    std::mutex task_counter_mutex;
    std::condition_variable task_counter_cv;

    // Runs serially (guaranteed by the queue), so num_seen needs no extra locking.
    // Each task copies its run of batches into the slot range reserved for it.
    OrderedAccumulationQueue::TaskFactoryCallback create_collect_task =
        [&](std::vector<ExecBatch> batches) {
          int start = num_seen;
          num_seen += static_cast<int>(batches.size());
          return [&, start, batches = std::move(batches)]() {
            std::move(batches.begin(), batches.end(), collected.begin() + start);
            return Status::OK();
          };
        };

    std::vector<std::thread> threads;

    // May be called from the test thread and from task threads, hence the lock
    // around both the counter and the threads vector.
    OrderedAccumulationQueue::ScheduleCallback schedule =
        [&](OrderedAccumulationQueue::Task task) {
          std::lock_guard<std::mutex> schedule_lk(task_counter_mutex);
          num_active_tasks++;
          threads.emplace_back([&, task = std::move(task)] {
            // Do not use ASSERT_* here: it would return before the counter is
            // decremented and the test thread would wait forever. Record the
            // failure and report it from the test thread instead.
            Status st = task();
            std::lock_guard<std::mutex> done_lk(task_counter_mutex);
            if (!st.ok() && first_task_error.ok()) {
              first_task_error = std::move(st);
            }
            if (--num_active_tasks == 0) {
              task_counter_cv.notify_one();
            }
          });
          return Status::OK();
        };

    std::unique_ptr<OrderedAccumulationQueue> ordered_queue =
        OrderedAccumulationQueue::Make(std::move(create_collect_task),
                                       std::move(schedule));

    std::vector<ExecBatch> test_batches(kNumBatches);
    for (int batch_idx = 0; batch_idx < kNumBatches; batch_idx++) {
      test_batches[batch_idx].index = batch_idx;
    }

    std::shuffle(test_batches.begin(), test_batches.end(), gen);

    for (auto& batch : test_batches) {
      ASSERT_OK(ordered_queue->InsertBatch(std::move(batch)));
    }

    {
      // A running task schedules its successor before its own thread decrements the
      // counter, so the counter only reaches zero once all work is done.
      std::unique_lock<std::mutex> lk(task_counter_mutex);
      task_counter_cv.wait(lk, [&] { return num_active_tasks == 0; });
    }

    for (auto& thread : threads) {
      thread.join();
    }

    ASSERT_OK(first_task_error);
    ASSERT_OK(ordered_queue->CheckDrained());
    // collected is pre-sized, so also check that the factory really saw every batch
    ASSERT_EQ(kNumBatches, num_seen);
    ASSERT_EQ(kNumBatches, static_cast<int>(collected.size()));

    for (int batch_idx = 0; batch_idx < kNumBatches; batch_idx++) {
      ASSERT_EQ(batch_idx, collected[batch_idx].index);
    }
  }
}

} // namespace util
} // namespace arrow
11 changes: 10 additions & 1 deletion cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ class ScalarAggregateNode : public ExecNode {

const char* kind_name() const override { return "ScalarAggregateNode"; }

// There is currently no meaningful ordering to the output of the scalar aggregate
// although in the future we may want to allow sorting here since we will have already
// gathered all the data
const std::vector<int32_t>& ordering() override { return ExecNode::kNoOrdering; }

Status DoConsume(const ExecSpan& batch, size_t thread_index) {
util::tracing::Span span;
START_COMPUTE_SPAN(span, "Consume",
Expand Down Expand Up @@ -211,7 +216,7 @@ class ScalarAggregateNode : public ExecNode {

void StopProducing(ExecNode* output) override {
DCHECK_EQ(output, outputs_[0]);
StopProducing();
inputs_[0]->StopProducing(this);
}

void StopProducing() override {
Expand Down Expand Up @@ -360,6 +365,10 @@ class GroupByNode : public ExecNode {

const char* kind_name() const override { return "GroupByNode"; }

// There is currently no ordering assigned to the output although we may want
// to consider a future addition to allow ordering by grouping keys
const std::vector<int32_t>& ordering() override { return ExecNode::kNoOrdering; }

Status Consume(ExecSpan batch) {
util::tracing::Span span;
START_COMPUTE_SPAN(span, "Consume",
Expand Down
6 changes: 5 additions & 1 deletion cpp/src/arrow/compute/exec/asof_join_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,8 @@ class AsofJoinNode : public ExecNode {
return indices_of_by_key_;
}

const std::vector<int>& ordering() override { return indices_of_on_key_; }

static Status is_valid_on_field(const std::shared_ptr<Field>& field) {
switch (field->type()->id()) {
case Type::INT8:
Expand Down Expand Up @@ -1114,7 +1116,9 @@ class AsofJoinNode : public ExecNode {
void ResumeProducing(ExecNode* output, int32_t counter) override {}
void StopProducing(ExecNode* output) override {
DCHECK_EQ(output, outputs_[0]);
StopProducing();
for (auto input : inputs_) {
input->StopProducing(this);
}
}
void StopProducing() override {
process_.Clear();
Expand Down
Loading