16 changes: 16 additions & 0 deletions cpp/src/arrow/compute/exec.cc
@@ -141,6 +141,22 @@ Result<ExecBatch> ExecBatch::Make(std::vector<Datum> values) {
return ExecBatch(std::move(values), length);
}

Result<std::shared_ptr<RecordBatch>> ExecBatch::ToRecordBatch(
std::shared_ptr<Schema> schema, MemoryPool* pool) const {
ArrayVector columns(schema->num_fields());

for (size_t i = 0; i < columns.size(); ++i) {
const Datum& value = values[i];
if (value.is_array()) {
columns[i] = value.make_array();
continue;
}
ARROW_ASSIGN_OR_RAISE(columns[i], MakeArrayFromScalar(*value.scalar(), length, pool));
}

return RecordBatch::Make(std::move(schema), length, std::move(columns));
}
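
For orientation, a minimal sketch (not part of this diff) of how the new conversion could be called. It assumes an ExecBatch holding one array Datum and one scalar Datum, inside a function returning arrow::Status; the scalar column is broadcast to the batch length via MakeArrayFromScalar, exactly as above:

// Sketch only: `array_datum` and `scalar_datum` are assumed inputs.
ExecBatch batch({array_datum, scalar_datum}, /*length=*/array_datum.length());
auto schema = arrow::schema({arrow::field("a", arrow::int32()),
                             arrow::field("b", arrow::utf8())});
// The scalar in column "b" becomes an Array of `length` repeated values.
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::RecordBatch> rb,
                      batch.ToRecordBatch(std::move(schema)));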

namespace {

Result<std::shared_ptr<Buffer>> AllocateDataBuffer(KernelContext* ctx, int64_t length,
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/exec.h
@@ -182,6 +182,9 @@ struct ARROW_EXPORT ExecBatch {

static Result<ExecBatch> Make(std::vector<Datum> values);

Result<std::shared_ptr<RecordBatch>> ToRecordBatch(
std::shared_ptr<Schema> schema, MemoryPool* pool = default_memory_pool()) const;

/// The values representing positional arguments to be passed to a kernel's
/// exec function for processing.
std::vector<Datum> values;
229 changes: 223 additions & 6 deletions cpp/src/arrow/compute/exec/exec_plan.cc
@@ -18,12 +18,17 @@
#include "arrow/compute/exec/exec_plan.h"

#include <mutex>
#include <thread>
#include <unordered_map>
#include <unordered_set>

#include "arrow/array/util.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/compute/registry.h"
#include "arrow/datum.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/checked_cast.h"
@@ -33,6 +38,7 @@
namespace arrow {

using internal::checked_cast;
using internal::checked_pointer_cast;

namespace compute {

@@ -489,15 +495,23 @@ struct ProjectNode : ExecNode {
};

Result<ExecNode*> MakeProjectNode(ExecNode* input, std::string label,
std::vector<Expression> exprs) {
std::vector<Expression> exprs,
std::vector<std::string> names) {
FieldVector fields(exprs.size());

if (names.size() == 0) {
names.resize(exprs.size());
for (size_t i = 0; i < exprs.size(); ++i) {
names[i] = exprs[i].ToString();
}
}

int i = 0;
for (auto& expr : exprs) {
if (!expr.IsBound()) {
ARROW_ASSIGN_OR_RAISE(expr, expr.Bind(*input->output_schema()));
}
fields[i] = field(expr.ToString(), expr.type());
fields[i] = field(std::move(names[i]), expr.type());
++i;
}

@@ -552,15 +566,16 @@ struct SinkNode : ExecNode {
++num_received_;
if (num_received_ == emit_stop_) {
lock.unlock();
producer_.Push(std::move(batch));
Finish();
lock.lock();
return;
}

if (emit_stop_ != -1) {
DCHECK_LE(seq_num, emit_stop_);
}
lock.unlock();

lock.unlock();
producer_.Push(std::move(batch));
}

@@ -574,8 +589,10 @@
void InputFinished(ExecNode* input, int seq_stop) override {
std::unique_lock<std::mutex> lock(mutex_);
emit_stop_ = seq_stop;
lock.unlock();
Finish();
if (num_received_ == emit_stop_) {
lock.unlock();
Finish();
}
}

private:
@@ -601,5 +618,205 @@ AsyncGenerator<util::optional<ExecBatch>> MakeSinkNode(ExecNode* input,
return out;
}

std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
std::shared_ptr<Schema> schema,
std::function<Future<util::optional<ExecBatch>>()> gen, MemoryPool* pool) {
struct Impl : RecordBatchReader {
std::shared_ptr<Schema> schema() const override { return schema_; }

Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
ARROW_ASSIGN_OR_RAISE(auto batch, iterator_.Next());
if (batch) {
ARROW_ASSIGN_OR_RAISE(*record_batch, batch->ToRecordBatch(schema_, pool_));
} else {
*record_batch = IterationEnd<std::shared_ptr<RecordBatch>>();
}
return Status::OK();
}

MemoryPool* pool_;
std::shared_ptr<Schema> schema_;
Iterator<util::optional<ExecBatch>> iterator_;
};

auto out = std::make_shared<Impl>();
out->pool_ = pool;
out->schema_ = std::move(schema);
out->iterator_ = MakeGeneratorIterator(std::move(gen));
return out;
}

struct ScalarAggregateNode : ExecNode {
ScalarAggregateNode(ExecNode* input, std::string label,
std::shared_ptr<Schema> output_schema,
std::vector<const ScalarAggregateKernel*> kernels,
std::vector<std::vector<std::unique_ptr<KernelState>>> states)
: ExecNode(input->plan(), std::move(label), {input}, {"target"},
/*output_schema=*/std::move(output_schema),
/*num_outputs=*/1),
kernels_(std::move(kernels)),
states_(std::move(states)) {}

const char* kind_name() override { return "ScalarAggregateNode"; }

Status DoConsume(const ExecBatch& batch, size_t thread_index) {
for (size_t i = 0; i < kernels_.size(); ++i) {
KernelContext batch_ctx{plan()->exec_context()};
batch_ctx.SetState(states_[i][thread_index].get());
ExecBatch single_column_batch{{batch.values[i]}, batch.length};
RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch));
}
return Status::OK();
}

void InputReceived(ExecNode* input, int seq, ExecBatch batch) override {
Member:
Question: to implement something like ARROW-12710 (string concat aggregate kernel) we'll need to know the order of inputs to the kernels (or will have to feed results into the kernels in order). How do we plan to handle that? By passing down seq and having each kernel reorder inputs itself, or perhaps with an upstream ExecNode that orders its inputs? This also applies to the group-by node.

Member (author):
seq is not an indication of order; it's only a tag in the range [0, seq_stop) (where seq_stop is set by InputFinished), so we could not use it to order results.

As specified in ARROW-12710, the KernelState of the string concat aggregate kernel will need to include ordering criteria so that merge(move(state1), &state0) can be guaranteed equivalent to merge(move(state0), &state1). Furthermore, merge cannot actually concatenate anything: if we happened to merge(move(state0), &state3) first, we'd have no way to insert state1 and state2 in the middle later. Actual concatenation would have to wait for finalize.

Those ordering criteria could be synthesized from (for example) fragment/batch index information, but the presence of O(N) state in a scalar aggregate kernel's State is suspect to me, and I'm not sure it's a great match for ScalarAggregateKernel.

Member:
Ah, thanks; sorry for the misunderstanding (I need to stop thinking only about datasets). I suppose it only makes sense to talk about 'order' when directly downstream from a scan or an explicit sort, then. And any aggregates that carry O(N) state might properly belong in their own ExecNode.
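
(An illustrative aside, not code from this PR: one hypothetical shape for such ordering criteria, where merge only splices keyed runs and the real concatenation waits for finalize, as described above. All names here are invented.)

#include <map>
#include <string>
#include <utility>

// Hypothetical state for an order-sensitive aggregate (ARROW-12710 style).
struct OrderedConcatState {
  // Ordering criterion synthesized upstream, e.g. (fragment index, batch index).
  using OrderKey = std::pair<int, int>;
  std::map<OrderKey, std::string> runs;  // the O(N) state questioned above

  // merge() splices runs; it must not concatenate, since runs whose keys fall
  // between the merged ones may still arrive later.
  void Merge(OrderedConcatState&& other) {
    runs.insert(std::make_move_iterator(other.runs.begin()),
                std::make_move_iterator(other.runs.end()));
  }

  // finalize(): only now is the key order total, so concatenation is safe.
  std::string Finalize() && {
    std::string out;
    for (auto& kv : runs) out += kv.second;
    return out;
  }
};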

DCHECK_EQ(input, inputs_[0]);

std::unique_lock<std::mutex> lock(mutex_);
auto it =
thread_indices_.emplace(std::this_thread::get_id(), thread_indices_.size()).first;
++num_received_;
auto thread_index = it->second;

lock.unlock();

Status st = DoConsume(std::move(batch), thread_index);
if (!st.ok()) {
outputs_[0]->ErrorReceived(this, std::move(st));
return;
}

lock.lock();
Member:
This lock could probably be removed. We might want to make a note to measure this with micro-benchmarks someday. Only one thread should be finishing anyway, and the "what state blocks have we used" map could probably be a lock-free structure.

Member (author):
InputReceived (for the last batch) might be called concurrently with InputFinished, so those two must synchronize to ensure that only one of them does the finishing. It would certainly be helpful to introduce less clumsy control flow in these classes.
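
(Also illustrative only: the synchronization described above could in principle be made lock-free. A sketch with invented names; whichever of the two calls observes the completed count wins an atomic flag and finishes exactly once.)

#include <atomic>

// Hypothetical lock-free finishing gate; not part of this PR.
struct FinishGate {
  std::atomic<int> received{0};
  std::atomic<int> total{-1};  // -1 until InputFinished supplies seq_stop

  template <typename FinishFn>
  void OnReceived(FinishFn&& finish) {
    int n = received.fetch_add(1) + 1;
    // Both this and OnFinished may observe the condition; the flag picks one.
    if (n == total.load() && !finished_.test_and_set()) finish();
  }

  template <typename FinishFn>
  void OnFinished(int seq_stop, FinishFn&& finish) {
    total.store(seq_stop);
    if (received.load() == seq_stop && !finished_.test_and_set()) finish();
  }

 private:
  std::atomic_flag finished_ = ATOMIC_FLAG_INIT;
};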

st = MaybeFinish(&lock);
if (!st.ok()) {
outputs_[0]->ErrorReceived(this, std::move(st));
}
}

void ErrorReceived(ExecNode* input, Status error) override {
DCHECK_EQ(input, inputs_[0]);
outputs_[0]->ErrorReceived(this, std::move(error));
}

void InputFinished(ExecNode* input, int seq) override {
DCHECK_EQ(input, inputs_[0]);
std::unique_lock<std::mutex> lock(mutex_);
num_total_ = seq;
Status st = MaybeFinish(&lock);

if (!st.ok()) {
outputs_[0]->ErrorReceived(this, std::move(st));
}
}

Status StartProducing() override {
finished_ = Future<>::Make();
// Scalar aggregates will only output a single batch
outputs_[0]->InputFinished(this, 1);
return Status::OK();
}

void PauseProducing(ExecNode* output) override {}

void ResumeProducing(ExecNode* output) override {}

void StopProducing(ExecNode* output) override {
DCHECK_EQ(output, outputs_[0]);
StopProducing();
}

void StopProducing() override {
inputs_[0]->StopProducing(this);
finished_.MarkFinished();
}

Future<> finished() override { return finished_; }

private:
Status MaybeFinish(std::unique_lock<std::mutex>* lock) {
if (num_received_ != num_total_) return Status::OK();

if (finished_.is_finished()) return Status::OK();

ExecBatch batch{{}, 1};
batch.values.resize(kernels_.size());

for (size_t i = 0; i < kernels_.size(); ++i) {
Member:
Maybe someday we could merge each kernel on its own thread, but that can wait for a future PR.

Member (author):
Merging scalar aggregates is pretty trivial, so I'd guess we don't gain much from parallelization. Worth investigating in a follow-up, though.

Member:
Ah, in my head "merge" meant something more like a merge sort. If it's just summing up a sum/mean/etc. counter across the various states, then I agree it's not necessary.
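
(A toy sketch of the point being made, not Arrow code: for sum-like kernels the merge is a constant-time fold over per-thread states, so there is little to parallelize.)

#include <cstdint>
#include <vector>

// Toy per-thread state for a sum/mean-style aggregate.
struct SumState {
  int64_t sum = 0;
  int64_t count = 0;
};

// Merging is a sequential fold: O(#threads) steps, each O(1).
SumState MergeStates(std::vector<SumState> states) {
  SumState merged;
  for (const auto& s : states) {
    merged.sum += s.sum;
    merged.count += s.count;
  }
  return merged;
}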

KernelContext ctx{plan()->exec_context()};
ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll(
kernels_[i], &ctx, std::move(states_[i])));
RETURN_NOT_OK(kernels_[i]->finalize(&ctx, &batch.values[i]));
}
lock->unlock();

outputs_[0]->InputReceived(this, 0, batch);

finished_.MarkFinished();
return Status::OK();
}

Future<> finished_ = Future<>::MakeFinished();
std::vector<const ScalarAggregateKernel*> kernels_;
std::vector<std::vector<std::unique_ptr<KernelState>>> states_;
std::unordered_map<std::thread::id, size_t> thread_indices_;
std::mutex mutex_;
int num_received_ = 0, num_total_ = -1;
};

Result<ExecNode*> MakeScalarAggregateNode(ExecNode* input, std::string label,
std::vector<internal::Aggregate> aggregates) {
if (input->output_schema()->num_fields() != static_cast<int>(aggregates.size())) {
return Status::Invalid("Provided ", aggregates.size(),
" aggregates, expected one for each field of ",
input->output_schema()->ToString());
}

auto exec_ctx = input->plan()->exec_context();

std::vector<const ScalarAggregateKernel*> kernels(aggregates.size());
std::vector<std::vector<std::unique_ptr<KernelState>>> states(kernels.size());
FieldVector fields(kernels.size());

for (size_t i = 0; i < kernels.size(); ++i) {
ARROW_ASSIGN_OR_RAISE(auto function,
exec_ctx->func_registry()->GetFunction(aggregates[i].function));

if (function->kind() != Function::SCALAR_AGGREGATE) {
return Status::Invalid("Provided non ScalarAggregateFunction ",
aggregates[i].function);
}

auto in_type = ValueDescr::Array(input->output_schema()->fields()[i]->type());

ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, function->DispatchExact({in_type}));
kernels[i] = static_cast<const ScalarAggregateKernel*>(kernel);

if (aggregates[i].options == nullptr) {
aggregates[i].options = function->default_options();
}

KernelContext kernel_ctx{exec_ctx};
states[i].resize(exec_ctx->executor() ? exec_ctx->executor()->GetCapacity() : 1);
RETURN_NOT_OK(Kernel::InitAll(&kernel_ctx,
KernelInitArgs{kernels[i],
{
in_type,
},
aggregates[i].options},
&states[i]));

// pick one to resolve the kernel signature
kernel_ctx.SetState(states[i][0].get());
ARROW_ASSIGN_OR_RAISE(
auto descr, kernels[i]->signature->out_type().Resolve(&kernel_ctx, {in_type}));

fields[i] = field(aggregates[i].function, std::move(descr.type));
}

return input->plan()->EmplaceNode<ScalarAggregateNode>(
input, std::move(label), schema(std::move(fields)), std::move(kernels),
std::move(states));
}

} // namespace compute
} // namespace arrow
20 changes: 17 additions & 3 deletions cpp/src/arrow/compute/exec/exec_plan.h
@@ -22,6 +22,7 @@
#include <string>
#include <vector>

#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/type_fwd.h"
@@ -243,12 +244,19 @@ ExecNode* MakeSourceNode(ExecPlan* plan, std::string label,

/// \brief Add a sink node which forwards to an AsyncGenerator<ExecBatch>
///
/// Emitted batches will not be ordered; instead they will be tagged with the `seq` at
/// which they were received.
/// Emitted batches will not be ordered.
ARROW_EXPORT
std::function<Future<util::optional<ExecBatch>>()> MakeSinkNode(ExecNode* input,
std::string label);

/// \brief Wrap an ExecBatch generator in a RecordBatchReader.
///
/// The RecordBatchReader does not impose any ordering on emitted batches.
ARROW_EXPORT
std::shared_ptr<RecordBatchReader> MakeGeneratorReader(
std::shared_ptr<Schema>, std::function<Future<util::optional<ExecBatch>>()>,
MemoryPool*);
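
For illustration, a sketch of how this composes with MakeSinkNode above (assumptions: a fully built plan whose terminal node is `node`, and a caller returning arrow::Status). Per the note above, the batches this reader yields are unordered:

// Sketch: drain an ExecPlan through the new RecordBatchReader adapter.
auto gen = MakeSinkNode(node, "sink");
std::shared_ptr<RecordBatchReader> reader = MakeGeneratorReader(
    node->output_schema(), std::move(gen), default_memory_pool());

std::shared_ptr<RecordBatch> batch;
while (true) {
  RETURN_NOT_OK(reader->ReadNext(&batch));
  if (batch == nullptr) break;  // generator exhausted
  // ... consume batch ...
}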

/// \brief Make a node which excludes some rows from batches passed through it
///
/// The filter Expression will be evaluated against each batch which is pushed to
@@ -265,9 +273,15 @@ Result<ExecNode*> MakeFilterNode(ExecNode* input, std::string label, Expression
/// this node to produce a corresponding output column.
///
/// If exprs are not already bound, they will be bound against the input's schema.
/// If names are not provided, the string representations of exprs will be used.
ARROW_EXPORT
Result<ExecNode*> MakeProjectNode(ExecNode* input, std::string label,
std::vector<Expression> exprs);
std::vector<Expression> exprs,
std::vector<std::string> names = {});

ARROW_EXPORT
Result<ExecNode*> MakeScalarAggregateNode(ExecNode* input, std::string label,
std::vector<internal::Aggregate> aggregates);
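
For completeness, a hypothetical end-to-end wiring of the two factories (the source node, field names, and aggregate spellings are assumptions for illustration, not taken from this diff). Passing nullptr options leans on the default_options() fallback added above:

// Sketch: rename projected columns, then aggregate one function per column.
ARROW_ASSIGN_OR_RAISE(
    ExecNode* project,
    MakeProjectNode(source, "project",
                    /*exprs=*/{field_ref("x"), field_ref("x")},
                    /*names=*/{"x_for_sum", "x_for_min_max"}));

// MakeScalarAggregateNode expects exactly one aggregate per input field.
ARROW_ASSIGN_OR_RAISE(
    ExecNode* agg,
    MakeScalarAggregateNode(project, "agg",
                            {{"sum", nullptr}, {"min_max", nullptr}}));

auto gen = MakeSinkNode(agg, "sink");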

} // namespace compute
} // namespace arrow