6 changes: 5 additions & 1 deletion cpp/src/arrow/array/data.cc
@@ -138,7 +138,11 @@ int64_t ArrayData::GetNullCount() const {
void ArraySpan::SetMembers(const ArrayData& data) {
this->type = data.type.get();
this->length = data.length;
this->null_count = data.null_count.load();
if (this->type->id() == Type::NA) {
this->null_count = this->length;
} else {
this->null_count = data.null_count.load();
}
this->offset = data.offset;

for (int i = 0; i < static_cast<int>(data.buffers.size()); ++i) {
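The special case above encodes an invariant of Arrow's null type: a Type::NA array carries no validity bitmap and every slot is null, so its null count always equals its length and never needs the atomic load. A minimal self-contained sketch of the rule (hypothetical illustration, not part of the patch):

// Hypothetical sketch: resolving a span's null count from its type.
// For the null type the answer is derivable from the length alone.
#include <cstdint>

constexpr int64_t kUnknownNullCount = -1;

enum class TypeId { NA, INT32 };

int64_t ResolveNullCount(TypeId id, int64_t length, int64_t stored) {
  if (id == TypeId::NA) {
    return length;  // null arrays are all-null by definition
  }
  return stored;  // may be kUnknownNullCount until computed from the bitmap
}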
15 changes: 8 additions & 7 deletions cpp/src/arrow/array/data.h
@@ -25,6 +25,7 @@

#include "arrow/buffer.h"
#include "arrow/result.h"
#include "arrow/type.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/macros.h"
#include "arrow/util/visibility.h"
@@ -351,14 +352,14 @@ struct ARROW_EXPORT ArraySpan {
}
}

void AddOffset(int64_t offset) {
this->offset += offset;
this->null_count = kUnknownNullCount;
}

void SetOffset(int64_t offset) {
void SetSlice(int64_t offset, int64_t length) {
this->offset = offset;
this->null_count = kUnknownNullCount;
this->length = length;
if (this->type->id() != Type::NA) {
this->null_count = kUnknownNullCount;
} else {
this->null_count = this->length;
}
}

/// \brief Return null count, or compute and set it if it's not known
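SetSlice replaces the removed AddOffset/SetOffset pair so that offset and length are always updated together, with the null-count bookkeeping in one place: unknown for ordinary types, pinned to the new length for Type::NA. A hypothetical usage sketch (names follow the patch; the surrounding driver code is assumed):

// Assumed driver code, not from the patch: reposition a span in one call.
ArraySpan span;
span.SetMembers(*data);  // non-owning view over the whole ArrayData
span.SetSlice(/*offset=*/128, /*length=*/64);
// span.offset == 128 and span.length == 64; span.null_count is now
// kUnknownNullCount (or 64 for a null-type span), so GetNullCount()
// will recompute it for exactly this window.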
142 changes: 22 additions & 120 deletions cpp/src/arrow/compute/exec.cc
@@ -219,107 +219,8 @@ void ComputeDataPreallocate(const DataType& type,

namespace detail {

ExecBatchIterator::ExecBatchIterator(std::vector<Datum> args, int64_t length,
int64_t max_chunksize)
: args_(std::move(args)),
position_(0),
length_(length),
max_chunksize_(max_chunksize) {
chunk_indexes_.resize(args_.size(), 0);
chunk_positions_.resize(args_.size(), 0);
}

Result<std::unique_ptr<ExecBatchIterator>> ExecBatchIterator::Make(
std::vector<Datum> args, int64_t max_chunksize) {
for (const auto& arg : args) {
if (!(arg.is_arraylike() || arg.is_scalar())) {
return Status::Invalid(
"ExecBatchIterator only works with Scalar, Array, and "
"ChunkedArray arguments");
}
}

int64_t length = -1;
bool length_set = false;
for (auto& arg : args) {
if (arg.is_scalar()) {
continue;
}
if (!length_set) {
length = arg.length();
length_set = true;
} else {
if (arg.length() != length) {
return Status::Invalid("Array arguments must all be the same length");
}
}
}

if (!length_set) {
// All scalar case, to be removed soon
length = 1;
}

max_chunksize = std::min(length, max_chunksize);

return std::unique_ptr<ExecBatchIterator>(
new ExecBatchIterator(std::move(args), length, max_chunksize));
}

bool ExecBatchIterator::Next(ExecBatch* batch) {
if (position_ == length_) {
return false;
}

// Determine how large the common contiguous "slice" of all the arguments is
int64_t iteration_size = std::min(length_ - position_, max_chunksize_);

// If length_ is 0, then this loop will never execute
for (size_t i = 0; i < args_.size() && iteration_size > 0; ++i) {
// If the argument is not a chunked array, it's either a Scalar or Array,
// in which case it doesn't influence the size of this batch. Note that if
// the args are all scalars the batch length is 1
if (args_[i].kind() != Datum::CHUNKED_ARRAY) {
continue;
}
const ChunkedArray& arg = *args_[i].chunked_array();
std::shared_ptr<Array> current_chunk;
while (true) {
current_chunk = arg.chunk(chunk_indexes_[i]);
if (chunk_positions_[i] == current_chunk->length()) {
// Chunk is zero-length, or was exhausted in the previous iteration
chunk_positions_[i] = 0;
++chunk_indexes_[i];
continue;
}
break;
}
iteration_size =
std::min(current_chunk->length() - chunk_positions_[i], iteration_size);
}

// Now, fill the batch
batch->values.resize(args_.size());
batch->length = iteration_size;
for (size_t i = 0; i < args_.size(); ++i) {
if (args_[i].is_scalar()) {
batch->values[i] = args_[i].scalar();
} else if (args_[i].is_array()) {
batch->values[i] = args_[i].array()->Slice(position_, iteration_size);
} else {
const ChunkedArray& carr = *args_[i].chunked_array();
const auto& chunk = carr.chunk(chunk_indexes_[i]);
batch->values[i] = chunk->data()->Slice(chunk_positions_[i], iteration_size);
chunk_positions_[i] += iteration_size;
}
}
position_ += iteration_size;
DCHECK_LE(position_, length_);
return true;
}

// ----------------------------------------------------------------------
// ExecSpanIterator; to eventually replace ExecBatchIterator
// ExecSpanIterator

namespace {

@@ -348,7 +249,8 @@ bool CheckIfAllScalar(const ExecBatch& batch) {

} // namespace

Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize) {
Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize,
bool promote_if_all_scalars) {
if (batch.num_values() > 0) {
// Validate arguments
bool all_args_same_length = false;
@@ -363,6 +265,7 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize) {
args_ = &batch.values;
initialized_ = have_chunked_arrays_ = false;
have_all_scalars_ = CheckIfAllScalar(batch);
promote_if_all_scalars_ = promote_if_all_scalars;
position_ = 0;
length_ = batch.length;
chunk_indexes_.clear();
@@ -443,7 +346,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
}
}

if (have_all_scalars_) {
if (have_all_scalars_ && promote_if_all_scalars_) {
PromoteExecSpanScalars(span);
}

@@ -465,8 +368,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
const Datum& arg = args_->at(i);
if (!arg.is_scalar()) {
ArraySpan* arr = &span->values[i].array;
arr->length = iteration_size;
arr->SetOffset(value_positions_[i] + value_offsets_[i]);
arr->SetSlice(value_positions_[i] + value_offsets_[i], iteration_size);
value_positions_[i] += iteration_size;
}
}
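With SetSlice, each call to Next() re-slices the ArraySpans in place instead of allocating fresh ArrayData per chunk, which is the point of the span-based iterator. A sketch of the consumption pattern, where ctx (an ExecContext*) and ProcessSpan are assumed driver-side names:

// Hypothetical consumer loop; Init/Next signatures follow the patch.
ExecSpanIterator it;
RETURN_NOT_OK(it.Init(batch, ctx->exec_chunksize()));
ExecSpan span;
while (it.Next(&span)) {
  // Array values are re-sliced in place via SetSlice() each iteration,
  // so the loop performs no per-chunk heap allocation.
  RETURN_NOT_OK(ProcessSpan(span));  // assumed application callback
}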
@@ -858,11 +760,12 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
// Populate and then reuse the ArraySpan inside
output_span->SetMembers(*preallocation);
output_span->offset = 0;
int64_t result_offset = 0;
while (span_iterator_.Next(&input)) {
// Set absolute output span position and length
output_span->length = input.length;
output_span->SetSlice(result_offset, input.length);
RETURN_NOT_OK(ExecuteSingleSpan(input, &output));
output_span->SetOffset(span_iterator_.position());
result_offset = span_iterator_.position();
}

// Kernel execution is complete; emit result
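The output span is now positioned before each kernel call: the next slice begins at the iterator position recorded when the previous slice finished, so the output's offset and length can never disagree mid-iteration. A worked trace of the loop above, assuming a hypothetical 96-row input iterated in chunks of 64 and 32:

// iteration 1: output_span->SetSlice(0, 64)  -> kernel fills rows [0, 64)
//              result_offset = span_iterator_.position() == 64
// iteration 2: output_span->SetSlice(64, 32) -> kernel fills rows [64, 96)
//              result_offset = span_iterator_.position() == 96 == length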
@@ -1138,19 +1041,15 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
return KernelExecutorImpl<ScalarAggregateKernel>::Init(ctx, args);
}

Status Execute(const ExecBatch& args, ExecListener* listener) override {
return ExecuteImpl(args.values, listener);
}

Status ExecuteImpl(const std::vector<Datum>& args, ExecListener* listener) {
ARROW_ASSIGN_OR_RAISE(
batch_iterator_, ExecBatchIterator::Make(args, exec_context()->exec_chunksize()));
Status Execute(const ExecBatch& batch, ExecListener* listener) override {
RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize(),
/*promote_if_all_scalars=*/false));

ExecBatch batch;
while (batch_iterator_->Next(&batch)) {
ExecSpan span;
while (span_iterator_.Next(&span)) {
// TODO: implement parallelism
if (batch.length > 0) {
RETURN_NOT_OK(Consume(batch));
if (span.length > 0) {
RETURN_NOT_OK(Consume(span));
}
}

@@ -1167,7 +1066,10 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
}

private:
Status Consume(const ExecBatch& batch) {
Status Consume(const ExecSpan& span) {
// TODO(wesm): this is odd and should be examined soon -- only one state
// "should" be needed per thread of execution

// FIXME(ARROW-11840) don't merge *any* aggregates for every batch
ARROW_ASSIGN_OR_RAISE(auto batch_state,
kernel_->init(kernel_ctx_, {kernel_, *input_types_, options_}));
@@ -1179,12 +1081,12 @@ class ScalarAggExecutor : public KernelExecutorImpl<ScalarAggregateKernel> {
KernelContext batch_ctx(exec_context());
batch_ctx.SetState(batch_state.get());

RETURN_NOT_OK(kernel_->consume(&batch_ctx, batch));
RETURN_NOT_OK(kernel_->consume(&batch_ctx, span));
RETURN_NOT_OK(kernel_->merge(kernel_ctx_, std::move(*batch_state), state()));
return Status::OK();
}

std::unique_ptr<ExecBatchIterator> batch_iterator_;
ExecSpanIterator span_iterator_;
const std::vector<TypeHolder>* input_types_;
const FunctionOptions* options_;
};
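The new promote_if_all_scalars flag separates the two kernel families: scalar kernels expect an all-scalar batch to be promoted to length-1 ArraySpans, while ScalarAggregateKernel::consume takes the ExecSpan with its scalars intact. A sketch of the two Init calls, assuming the header declares the new parameter with a default of true (the two-argument call sites elsewhere in the patch suggest this):

ExecSpanIterator it;

// Scalar kernels: an all-scalar batch is promoted, so the kernel
// implementation only ever sees array inputs.
RETURN_NOT_OK(it.Init(batch, ctx->exec_chunksize()));

// Aggregate kernels: scalars pass through unchanged for consume().
RETURN_NOT_OK(it.Init(batch, ctx->exec_chunksize(),
                      /*promote_if_all_scalars=*/false));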
34 changes: 15 additions & 19 deletions cpp/src/arrow/compute/exec.h
@@ -209,8 +209,7 @@ struct ARROW_EXPORT ExecBatch {
/// case, it would have scalar rows with length greater than 1.
///
/// If the array values are of length 0 then the length is 0 regardless of
/// whether any values are Scalar. In general ExecBatch objects are produced
/// by ExecBatchIterator which by design does not yield length-0 batches.
/// whether any values are Scalar.
int64_t length = 0;

/// \brief The sum of bytes in each buffer referenced by the batch
@@ -253,7 +252,7 @@ inline bool operator==(const ExecBatch& l, const ExecBatch& r) { return l.Equals
inline bool operator!=(const ExecBatch& l, const ExecBatch& r) { return !l.Equals(r); }

struct ExecValue {
ArraySpan array;
ArraySpan array = {};
const Scalar* scalar = NULLPTR;

ExecValue(Scalar* scalar) // NOLINT implicit conversion
@@ -373,22 +372,6 @@ struct ARROW_EXPORT ExecSpan {
return values[i];
}

void AddOffset(int64_t offset) {
for (ExecValue& value : values) {
if (value.is_array()) {
value.array.AddOffset(offset);
}
}
}

void SetOffset(int64_t offset) {
for (ExecValue& value : values) {
if (value.is_array()) {
value.array.SetOffset(offset);
}
}
}

/// \brief A convenience for the number of values / arguments.
int num_values() const { return static_cast<int>(values.size()); }

@@ -400,6 +383,19 @@ struct ARROW_EXPORT ExecSpan {
return result;
}

ExecBatch ToExecBatch() const {
ExecBatch result;
result.length = this->length;
for (const ExecValue& value : this->values) {
if (value.is_array()) {
result.values.push_back(value.array.ToArrayData());
} else {
result.values.push_back(value.scalar->GetSharedPtr());
}
}
return result;
}

int64_t length = 0;
std::vector<ExecValue> values;
};
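ToExecBatch is the escape hatch back to owned data: array values are materialized through ArraySpan::ToArrayData() and scalars through GetSharedPtr(), so the result outlives the span. A hypothetical usage sketch for code that must retain batches beyond the iterator's next step (iterator and retained are assumed names):

// Assumed consumer code: ExecSpan is a non-owning view whose values are
// re-sliced on every Next(), so take an owning snapshot before moving on.
ExecSpan span;
std::vector<ExecBatch> retained;
while (iterator.Next(&span)) {
  retained.push_back(span.ToExecBatch());  // owning copy of the view
}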
31 changes: 15 additions & 16 deletions cpp/src/arrow/compute/exec/aggregate.cc
@@ -110,11 +110,12 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
std::vector<std::vector<std::unique_ptr<KernelState>>> states;
FieldVector out_fields;

using arrow::compute::detail::ExecBatchIterator;
std::unique_ptr<ExecBatchIterator> argument_batch_iterator;
using arrow::compute::detail::ExecSpanIterator;
ExecSpanIterator argument_iterator;

ExecBatch args_batch;
if (!arguments.empty()) {
ARROW_ASSIGN_OR_RAISE(ExecBatch args_batch, ExecBatch::Make(arguments));
ARROW_ASSIGN_OR_RAISE(args_batch, ExecBatch::Make(arguments));

// Construct and initialize HashAggregateKernels
auto argument_types = args_batch.GetTypes();
@@ -129,9 +130,7 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
ARROW_ASSIGN_OR_RAISE(
out_fields, ResolveKernels(aggregates, kernels, states[0], ctx, argument_types));

ARROW_ASSIGN_OR_RAISE(
argument_batch_iterator,
ExecBatchIterator::Make(args_batch.values, ctx->exec_chunksize()));
RETURN_NOT_OK(argument_iterator.Init(args_batch, ctx->exec_chunksize()));
}

// Construct Groupers
Expand All @@ -151,15 +150,13 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
out_fields.push_back(field("key_" + std::to_string(i++), key_type.GetSharedPtr()));
}

ARROW_ASSIGN_OR_RAISE(
auto key_batch_iterator,
ExecBatchIterator::Make(keys_batch.values, ctx->exec_chunksize()));
ExecSpanIterator key_iterator;
RETURN_NOT_OK(key_iterator.Init(keys_batch, ctx->exec_chunksize()));

// start "streaming" execution
ExecBatch key_batch, argument_batch;
while ((argument_batch_iterator == NULLPTR ||
argument_batch_iterator->Next(&argument_batch)) &&
key_batch_iterator->Next(&key_batch)) {
ExecSpan key_batch, argument_batch;
while ((arguments.empty() || argument_iterator.Next(&argument_batch)) &&
key_iterator.Next(&key_batch)) {
if (key_batch.length == 0) continue;

task_group->Append([&, key_batch, argument_batch] {
@@ -180,9 +177,10 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
for (size_t i = 0; i < kernels.size(); ++i) {
KernelContext batch_ctx{ctx};
batch_ctx.SetState(states[thread_index][i].get());
ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make({argument_batch[i], id_batch}));
ExecSpan kernel_batch({argument_batch[i], *id_batch.array()},
argument_batch.length);
RETURN_NOT_OK(kernels[i]->resize(&batch_ctx, grouper->num_groups()));
RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch));
RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, kernel_batch));
}

return Status::OK();
@@ -194,7 +192,8 @@ Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Dat
// Merge if necessary
for (size_t thread_index = 1; thread_index < thread_ids.size(); ++thread_index) {
ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, groupers[thread_index]->GetUniques());
ARROW_ASSIGN_OR_RAISE(Datum transposition, groupers[0]->Consume(other_keys));
ARROW_ASSIGN_OR_RAISE(Datum transposition,
groupers[0]->Consume(ExecSpan(other_keys)));
groupers[thread_index].reset();

for (size_t idx = 0; idx < kernels.size(); ++idx) {
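One subtlety in the streaming loop above: ExecSpan is a non-owning view, yet key_batch and argument_batch are captured by value into the task lambda. That is safe only because the owning ExecBatch objects (args_batch and keys_batch) live in the enclosing scope until the task group finishes. A schematic of the pattern (hypothetical simplification; ConsumeKeys is an assumed stand-in for the per-thread work):

// keys_batch owns the ArrayData; spans produced from it are views.
ExecSpanIterator key_iterator;
RETURN_NOT_OK(key_iterator.Init(keys_batch, ctx->exec_chunksize()));
ExecSpan key_span;
while (key_iterator.Next(&key_span)) {
  task_group->Append([&, key_span] {  // copies the view, not the data
    return ConsumeKeys(key_span);     // assumed per-thread consumer
  });
}
// keys_batch must outlive task_group->Finish() for the views to stay valid.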