diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc index d3f28758d9a..0cfa9fcd2e1 100644 --- a/cpp/src/arrow/array/data.cc +++ b/cpp/src/arrow/array/data.cc @@ -138,7 +138,11 @@ int64_t ArrayData::GetNullCount() const { void ArraySpan::SetMembers(const ArrayData& data) { this->type = data.type.get(); this->length = data.length; - this->null_count = data.null_count.load(); + if (this->type->id() == Type::NA) { + this->null_count = this->length; + } else { + this->null_count = data.null_count.load(); + } this->offset = data.offset; for (int i = 0; i < static_cast(data.buffers.size()); ++i) { diff --git a/cpp/src/arrow/array/data.h b/cpp/src/arrow/array/data.h index 78643ae14a8..dde66ac79c4 100644 --- a/cpp/src/arrow/array/data.h +++ b/cpp/src/arrow/array/data.h @@ -25,6 +25,7 @@ #include "arrow/buffer.h" #include "arrow/result.h" +#include "arrow/type.h" #include "arrow/util/bit_util.h" #include "arrow/util/macros.h" #include "arrow/util/visibility.h" @@ -351,14 +352,14 @@ struct ARROW_EXPORT ArraySpan { } } - void AddOffset(int64_t offset) { - this->offset += offset; - this->null_count = kUnknownNullCount; - } - - void SetOffset(int64_t offset) { + void SetSlice(int64_t offset, int64_t length) { this->offset = offset; - this->null_count = kUnknownNullCount; + this->length = length; + if (this->type->id() != Type::NA) { + this->null_count = kUnknownNullCount; + } else { + this->null_count = this->length; + } } /// \brief Return null count, or compute and set it if it's not known diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index 4dc5cdc5429..cf91bada6c6 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -219,107 +219,8 @@ void ComputeDataPreallocate(const DataType& type, namespace detail { -ExecBatchIterator::ExecBatchIterator(std::vector args, int64_t length, - int64_t max_chunksize) - : args_(std::move(args)), - position_(0), - length_(length), - max_chunksize_(max_chunksize) { - chunk_indexes_.resize(args_.size(), 0); - chunk_positions_.resize(args_.size(), 0); -} - -Result> ExecBatchIterator::Make( - std::vector args, int64_t max_chunksize) { - for (const auto& arg : args) { - if (!(arg.is_arraylike() || arg.is_scalar())) { - return Status::Invalid( - "ExecBatchIterator only works with Scalar, Array, and " - "ChunkedArray arguments"); - } - } - - int64_t length = -1; - bool length_set = false; - for (auto& arg : args) { - if (arg.is_scalar()) { - continue; - } - if (!length_set) { - length = arg.length(); - length_set = true; - } else { - if (arg.length() != length) { - return Status::Invalid("Array arguments must all be the same length"); - } - } - } - - if (!length_set) { - // All scalar case, to be removed soon - length = 1; - } - - max_chunksize = std::min(length, max_chunksize); - - return std::unique_ptr( - new ExecBatchIterator(std::move(args), length, max_chunksize)); -} - -bool ExecBatchIterator::Next(ExecBatch* batch) { - if (position_ == length_) { - return false; - } - - // Determine how large the common contiguous "slice" of all the arguments is - int64_t iteration_size = std::min(length_ - position_, max_chunksize_); - - // If length_ is 0, then this loop will never execute - for (size_t i = 0; i < args_.size() && iteration_size > 0; ++i) { - // If the argument is not a chunked array, it's either a Scalar or Array, - // in which case it doesn't influence the size of this batch. 
Note that if - // the args are all scalars the batch length is 1 - if (args_[i].kind() != Datum::CHUNKED_ARRAY) { - continue; - } - const ChunkedArray& arg = *args_[i].chunked_array(); - std::shared_ptr current_chunk; - while (true) { - current_chunk = arg.chunk(chunk_indexes_[i]); - if (chunk_positions_[i] == current_chunk->length()) { - // Chunk is zero-length, or was exhausted in the previous iteration - chunk_positions_[i] = 0; - ++chunk_indexes_[i]; - continue; - } - break; - } - iteration_size = - std::min(current_chunk->length() - chunk_positions_[i], iteration_size); - } - - // Now, fill the batch - batch->values.resize(args_.size()); - batch->length = iteration_size; - for (size_t i = 0; i < args_.size(); ++i) { - if (args_[i].is_scalar()) { - batch->values[i] = args_[i].scalar(); - } else if (args_[i].is_array()) { - batch->values[i] = args_[i].array()->Slice(position_, iteration_size); - } else { - const ChunkedArray& carr = *args_[i].chunked_array(); - const auto& chunk = carr.chunk(chunk_indexes_[i]); - batch->values[i] = chunk->data()->Slice(chunk_positions_[i], iteration_size); - chunk_positions_[i] += iteration_size; - } - } - position_ += iteration_size; - DCHECK_LE(position_, length_); - return true; -} - // ---------------------------------------------------------------------- -// ExecSpanIterator; to eventually replace ExecBatchIterator +// ExecSpanIterator namespace { @@ -348,7 +249,8 @@ bool CheckIfAllScalar(const ExecBatch& batch) { } // namespace -Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize) { +Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize, + bool promote_if_all_scalars) { if (batch.num_values() > 0) { // Validate arguments bool all_args_same_length = false; @@ -363,6 +265,7 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, int64_t max_chunksize) { args_ = &batch.values; initialized_ = have_chunked_arrays_ = false; have_all_scalars_ = CheckIfAllScalar(batch); + promote_if_all_scalars_ = promote_if_all_scalars; position_ = 0; length_ = batch.length; chunk_indexes_.clear(); @@ -443,7 +346,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) { } } - if (have_all_scalars_) { + if (have_all_scalars_ && promote_if_all_scalars_) { PromoteExecSpanScalars(span); } @@ -465,8 +368,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) { const Datum& arg = args_->at(i); if (!arg.is_scalar()) { ArraySpan* arr = &span->values[i].array; - arr->length = iteration_size; - arr->SetOffset(value_positions_[i] + value_offsets_[i]); + arr->SetSlice(value_positions_[i] + value_offsets_[i], iteration_size); value_positions_[i] += iteration_size; } } @@ -858,11 +760,12 @@ class ScalarExecutor : public KernelExecutorImpl { // Populate and then reuse the ArraySpan inside output_span->SetMembers(*preallocation); output_span->offset = 0; + int64_t result_offset = 0; while (span_iterator_.Next(&input)) { // Set absolute output span position and length - output_span->length = input.length; + output_span->SetSlice(result_offset, input.length); RETURN_NOT_OK(ExecuteSingleSpan(input, &output)); - output_span->SetOffset(span_iterator_.position()); + result_offset = span_iterator_.position(); } // Kernel execution is complete; emit result @@ -1138,19 +1041,15 @@ class ScalarAggExecutor : public KernelExecutorImpl { return KernelExecutorImpl::Init(ctx, args); } - Status Execute(const ExecBatch& args, ExecListener* listener) override { - return ExecuteImpl(args.values, listener); - } - - Status ExecuteImpl(const std::vector& args, ExecListener* 
listener) { - ARROW_ASSIGN_OR_RAISE( - batch_iterator_, ExecBatchIterator::Make(args, exec_context()->exec_chunksize())); + Status Execute(const ExecBatch& batch, ExecListener* listener) override { + RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize(), + /*promote_if_all_scalars=*/false)); - ExecBatch batch; - while (batch_iterator_->Next(&batch)) { + ExecSpan span; + while (span_iterator_.Next(&span)) { // TODO: implement parallelism - if (batch.length > 0) { - RETURN_NOT_OK(Consume(batch)); + if (span.length > 0) { + RETURN_NOT_OK(Consume(span)); } } @@ -1167,7 +1066,10 @@ class ScalarAggExecutor : public KernelExecutorImpl { } private: - Status Consume(const ExecBatch& batch) { + Status Consume(const ExecSpan& span) { + // TODO(wesm): this is odd and should be examined soon -- only one state + // "should" be needed per thread of execution + // FIXME(ARROW-11840) don't merge *any* aggegates for every batch ARROW_ASSIGN_OR_RAISE(auto batch_state, kernel_->init(kernel_ctx_, {kernel_, *input_types_, options_})); @@ -1179,12 +1081,12 @@ class ScalarAggExecutor : public KernelExecutorImpl { KernelContext batch_ctx(exec_context()); batch_ctx.SetState(batch_state.get()); - RETURN_NOT_OK(kernel_->consume(&batch_ctx, batch)); + RETURN_NOT_OK(kernel_->consume(&batch_ctx, span)); RETURN_NOT_OK(kernel_->merge(kernel_ctx_, std::move(*batch_state), state())); return Status::OK(); } - std::unique_ptr batch_iterator_; + ExecSpanIterator span_iterator_; const std::vector* input_types_; const FunctionOptions* options_; }; diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index f0b951dccb8..cdd3daf7f74 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -209,8 +209,7 @@ struct ARROW_EXPORT ExecBatch { /// case, it would have scalar rows with length greater than 1. /// /// If the array values are of length 0 then the length is 0 regardless of - /// whether any values are Scalar. In general ExecBatch objects are produced - /// by ExecBatchIterator which by design does not yield length-0 batches. + /// whether any values are Scalar. int64_t length = 0; /// \brief The sum of bytes in each buffer referenced by the batch @@ -253,7 +252,7 @@ inline bool operator==(const ExecBatch& l, const ExecBatch& r) { return l.Equals inline bool operator!=(const ExecBatch& l, const ExecBatch& r) { return !l.Equals(r); } struct ExecValue { - ArraySpan array; + ArraySpan array = {}; const Scalar* scalar = NULLPTR; ExecValue(Scalar* scalar) // NOLINT implicit conversion @@ -373,22 +372,6 @@ struct ARROW_EXPORT ExecSpan { return values[i]; } - void AddOffset(int64_t offset) { - for (ExecValue& value : values) { - if (value.is_array()) { - value.array.AddOffset(offset); - } - } - } - - void SetOffset(int64_t offset) { - for (ExecValue& value : values) { - if (value.is_array()) { - value.array.SetOffset(offset); - } - } - } - /// \brief A convenience for the number of values / arguments. 
int num_values() const { return static_cast(values.size()); } @@ -400,6 +383,19 @@ struct ARROW_EXPORT ExecSpan { return result; } + ExecBatch ToExecBatch() const { + ExecBatch result; + result.length = this->length; + for (const ExecValue& value : this->values) { + if (value.is_array()) { + result.values.push_back(value.array.ToArrayData()); + } else { + result.values.push_back(value.scalar->GetSharedPtr()); + } + } + return result; + } + int64_t length = 0; std::vector values; }; diff --git a/cpp/src/arrow/compute/exec/aggregate.cc b/cpp/src/arrow/compute/exec/aggregate.cc index 5cb9a9c5633..cc2c464d42b 100644 --- a/cpp/src/arrow/compute/exec/aggregate.cc +++ b/cpp/src/arrow/compute/exec/aggregate.cc @@ -110,11 +110,12 @@ Result GroupBy(const std::vector& arguments, const std::vector>> states; FieldVector out_fields; - using arrow::compute::detail::ExecBatchIterator; - std::unique_ptr argument_batch_iterator; + using arrow::compute::detail::ExecSpanIterator; + ExecSpanIterator argument_iterator; + ExecBatch args_batch; if (!arguments.empty()) { - ARROW_ASSIGN_OR_RAISE(ExecBatch args_batch, ExecBatch::Make(arguments)); + ARROW_ASSIGN_OR_RAISE(args_batch, ExecBatch::Make(arguments)); // Construct and initialize HashAggregateKernels auto argument_types = args_batch.GetTypes(); @@ -129,9 +130,7 @@ Result GroupBy(const std::vector& arguments, const std::vectorexec_chunksize())); + RETURN_NOT_OK(argument_iterator.Init(args_batch, ctx->exec_chunksize())); } // Construct Groupers @@ -151,15 +150,13 @@ Result GroupBy(const std::vector& arguments, const std::vectorexec_chunksize())); + ExecSpanIterator key_iterator; + RETURN_NOT_OK(key_iterator.Init(keys_batch, ctx->exec_chunksize())); // start "streaming" execution - ExecBatch key_batch, argument_batch; - while ((argument_batch_iterator == NULLPTR || - argument_batch_iterator->Next(&argument_batch)) && - key_batch_iterator->Next(&key_batch)) { + ExecSpan key_batch, argument_batch; + while ((arguments.empty() || argument_iterator.Next(&argument_batch)) && + key_iterator.Next(&key_batch)) { if (key_batch.length == 0) continue; task_group->Append([&, key_batch, argument_batch] { @@ -180,9 +177,10 @@ Result GroupBy(const std::vector& arguments, const std::vectorresize(&batch_ctx, grouper->num_groups())); - RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, batch)); + RETURN_NOT_OK(kernels[i]->consume(&batch_ctx, kernel_batch)); } return Status::OK(); @@ -194,7 +192,8 @@ Result GroupBy(const std::vector& arguments, const std::vectorGetUniques()); - ARROW_ASSIGN_OR_RAISE(Datum transposition, groupers[0]->Consume(other_keys)); + ARROW_ASSIGN_OR_RAISE(Datum transposition, + groupers[0]->Consume(ExecSpan(other_keys))); groupers[thread_index].reset(); for (size_t idx = 0; idx < kernels.size(); ++idx) { diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index 96aa56b80cb..cca266ad691 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -137,7 +137,7 @@ class ScalarAggregateNode : public ExecNode { const char* kind_name() const override { return "ScalarAggregateNode"; } - Status DoConsume(const ExecBatch& batch, size_t thread_index) { + Status DoConsume(const ExecSpan& batch, size_t thread_index) { util::tracing::Span span; START_COMPUTE_SPAN(span, "Consume", {{"aggregate", ToStringExtra()}, @@ -153,7 +153,7 @@ class ScalarAggregateNode : public ExecNode { KernelContext batch_ctx{plan()->exec_context()}; 
batch_ctx.SetState(states_[i][thread_index].get()); - ExecBatch single_column_batch{{batch.values[target_field_ids_[i]]}, batch.length}; + ExecSpan single_column_batch{{batch.values[target_field_ids_[i]]}, batch.length}; RETURN_NOT_OK(kernels_[i]->consume(&batch_ctx, single_column_batch)); } return Status::OK(); @@ -170,7 +170,7 @@ class ScalarAggregateNode : public ExecNode { auto thread_index = plan_->GetThreadIndex(); - if (ErrorIfNotOk(DoConsume(std::move(batch), thread_index))) return; + if (ErrorIfNotOk(DoConsume(ExecSpan(batch), thread_index))) return; if (input_counter_.Increment()) { ErrorIfNotOk(Finish()); @@ -360,7 +360,7 @@ class GroupByNode : public ExecNode { const char* kind_name() const override { return "GroupByNode"; } - Status Consume(ExecBatch batch) { + Status Consume(ExecSpan batch) { util::tracing::Span span; START_COMPUTE_SPAN(span, "Consume", {{"group_by", ToStringExtra()}, @@ -376,11 +376,11 @@ class GroupByNode : public ExecNode { RETURN_NOT_OK(InitLocalStateIfNeeded(state)); // Create a batch with key columns - std::vector keys(key_field_ids_.size()); + std::vector keys(key_field_ids_.size()); for (size_t i = 0; i < key_field_ids_.size(); ++i) { - keys[i] = batch.values[key_field_ids_[i]]; + keys[i] = batch[key_field_ids_[i]]; } - ExecBatch key_batch(std::move(keys), batch.length); + ExecSpan key_batch(std::move(keys), batch.length); // Create a batch with group ids ARROW_ASSIGN_OR_RAISE(Datum id_batch, state->grouper->Consume(key_batch)); @@ -396,10 +396,8 @@ class GroupByNode : public ExecNode { KernelContext kernel_ctx{ctx_}; kernel_ctx.SetState(state->agg_states[i].get()); - ARROW_ASSIGN_OR_RAISE( - auto agg_batch, - ExecBatch::Make({batch.values[agg_src_field_ids_[i]], id_batch})); - + ExecSpan agg_batch({batch[agg_src_field_ids_[i]], ExecValue(*id_batch.array())}, + batch.length); RETURN_NOT_OK(agg_kernels_[i]->resize(&kernel_ctx, state->grouper->num_groups())); RETURN_NOT_OK(agg_kernels_[i]->consume(&kernel_ctx, agg_batch)); } @@ -419,7 +417,8 @@ class GroupByNode : public ExecNode { } ARROW_ASSIGN_OR_RAISE(ExecBatch other_keys, state->grouper->GetUniques()); - ARROW_ASSIGN_OR_RAISE(Datum transposition, state0->grouper->Consume(other_keys)); + ARROW_ASSIGN_OR_RAISE(Datum transposition, + state0->grouper->Consume(ExecSpan(other_keys))); state->grouper.reset(); for (size_t i = 0; i < agg_kernels_.size(); ++i) { @@ -515,7 +514,7 @@ class GroupByNode : public ExecNode { DCHECK_EQ(input, inputs_[0]); - if (ErrorIfNotOk(Consume(std::move(batch)))) return; + if (ErrorIfNotOk(Consume(ExecSpan(batch)))) return; if (input_counter_.Increment()) { ErrorIfNotOk(OutputResult()); diff --git a/cpp/src/arrow/compute/exec_internal.h b/cpp/src/arrow/compute/exec_internal.h index afca289c20e..8beff2a6c63 100644 --- a/cpp/src/arrow/compute/exec_internal.h +++ b/cpp/src/arrow/compute/exec_internal.h @@ -39,39 +39,6 @@ static constexpr int64_t kDefaultMaxChunksize = std::numeric_limits::ma namespace detail { -/// \brief Break std::vector into a sequence of ExecBatch for kernel -/// execution -class ARROW_EXPORT ExecBatchIterator { - public: - /// \brief Construct iterator and do basic argument validation - /// - /// \param[in] args the Datum argument, must be all array-like or scalar - /// \param[in] max_chunksize the maximum length of each ExecBatch. Depending - /// on the chunk layout of ChunkedArray. - static Result> Make( - std::vector args, int64_t max_chunksize = kDefaultMaxChunksize); - - /// \brief Compute the next batch. Always returns at least one batch. 
Return - /// false if the iterator is exhausted - bool Next(ExecBatch* batch); - - int64_t length() const { return length_; } - - int64_t position() const { return position_; } - - int64_t max_chunksize() const { return max_chunksize_; } - - private: - ExecBatchIterator(std::vector args, int64_t length, int64_t max_chunksize); - - std::vector args_; - std::vector chunk_indexes_; - std::vector chunk_positions_; - int64_t position_; - int64_t length_; - int64_t max_chunksize_; -}; - /// \brief Break std::vector into a sequence of non-owning /// ExecSpan for kernel execution. The lifetime of the Datum vector /// must be longer than the lifetime of this object @@ -84,7 +51,11 @@ class ARROW_EXPORT ExecSpanIterator { /// \param[in] batch the input ExecBatch /// \param[in] max_chunksize the maximum length of each ExecSpan. Depending /// on the chunk layout of ChunkedArray. - Status Init(const ExecBatch& batch, int64_t max_chunksize = kDefaultMaxChunksize); + /// \param[in] promote_if_all_scalars if all of the values are scalars, + /// return them in each ExecSpan as ArraySpan of length 1. This must be set + /// to true for Scalar and Vector executors but false for Aggregators + Status Init(const ExecBatch& batch, int64_t max_chunksize = kDefaultMaxChunksize, + bool promote_if_all_scalars = true); /// \brief Compute the next span by updating the state of the /// previous span object. You must keep passing in the previous @@ -110,6 +81,7 @@ class ARROW_EXPORT ExecSpanIterator { bool initialized_ = false; bool have_chunked_arrays_ = false; bool have_all_scalars_ = false; + bool promote_if_all_scalars_ = true; const std::vector* args_; std::vector chunk_indexes_; std::vector value_positions_; diff --git a/cpp/src/arrow/compute/exec_test.cc b/cpp/src/arrow/compute/exec_test.cc index 573f4aee4a0..c31309da931 100644 --- a/cpp/src/arrow/compute/exec_test.cc +++ b/cpp/src/arrow/compute/exec_test.cc @@ -658,137 +658,6 @@ TEST_F(TestPropagateNullsSpans, NullOutputTypeNoop) { ASSERT_EQ(nullptr, result.buffers[0].data); } -// ---------------------------------------------------------------------- -// ExecBatchIterator - -class TestExecBatchIterator : public TestComputeInternals { - public: - void SetupIterator(std::vector args, - int64_t max_chunksize = kDefaultMaxChunksize) { - ASSERT_OK_AND_ASSIGN(iterator_, - ExecBatchIterator::Make(std::move(args), max_chunksize)); - } - void CheckIteration(const std::vector& args, int chunksize, - const std::vector& ex_batch_sizes) { - SetupIterator(args, chunksize); - ExecBatch batch; - int64_t position = 0; - for (size_t i = 0; i < ex_batch_sizes.size(); ++i) { - ASSERT_EQ(position, iterator_->position()); - ASSERT_TRUE(iterator_->Next(&batch)); - ASSERT_EQ(ex_batch_sizes[i], batch.length); - - for (size_t j = 0; j < args.size(); ++j) { - switch (args[j].kind()) { - case Datum::SCALAR: - ASSERT_TRUE(args[j].scalar()->Equals(batch[j].scalar())); - break; - case Datum::ARRAY: - AssertArraysEqual(*args[j].make_array()->Slice(position, batch.length), - *batch[j].make_array()); - break; - case Datum::CHUNKED_ARRAY: { - const ChunkedArray& carr = *args[j].chunked_array(); - if (batch.length == 0) { - ASSERT_EQ(0, carr.length()); - } else { - auto arg_slice = carr.Slice(position, batch.length); - // The sliced ChunkedArrays should only ever be 1 chunk - ASSERT_EQ(1, arg_slice->num_chunks()); - AssertArraysEqual(*arg_slice->chunk(0), *batch[j].make_array()); - } - } break; - default: - break; - } - } - position += ex_batch_sizes[i]; - } - // Ensure that the iterator is 
exhausted - ASSERT_FALSE(iterator_->Next(&batch)); - - ASSERT_EQ(iterator_->length(), iterator_->position()); - } - - protected: - std::unique_ptr iterator_; -}; - -TEST_F(TestExecBatchIterator, Basics) { - const int64_t length = 100; - - // Simple case with a single chunk - std::vector args = {Datum(GetInt32Array(length)), Datum(GetFloat64Array(length)), - Datum(std::make_shared(3))}; - SetupIterator(args); - - ExecBatch batch; - ASSERT_TRUE(iterator_->Next(&batch)); - ASSERT_EQ(3, batch.values.size()); - ASSERT_EQ(3, batch.num_values()); - ASSERT_EQ(length, batch.length); - - std::vector types = batch.GetTypes(); - ASSERT_EQ(types[0], int32()); - ASSERT_EQ(types[1], float64()); - ASSERT_EQ(types[2], int32()); - - AssertArraysEqual(*args[0].make_array(), *batch[0].make_array()); - AssertArraysEqual(*args[1].make_array(), *batch[1].make_array()); - ASSERT_TRUE(args[2].scalar()->Equals(batch[2].scalar())); - - ASSERT_EQ(length, iterator_->position()); - ASSERT_FALSE(iterator_->Next(&batch)); - - // Split into chunks of size 16 - CheckIteration(args, /*chunksize=*/16, {16, 16, 16, 16, 16, 16, 4}); -} - -TEST_F(TestExecBatchIterator, InputValidation) { - std::vector args = {Datum(GetInt32Array(10)), Datum(GetInt32Array(9))}; - ASSERT_RAISES(Invalid, ExecBatchIterator::Make(args)); - - args = {Datum(GetInt32Array(9)), Datum(GetInt32Array(10))}; - ASSERT_RAISES(Invalid, ExecBatchIterator::Make(args)); - - args = {Datum(GetInt32Array(10))}; - ASSERT_OK_AND_ASSIGN(auto iterator, ExecBatchIterator::Make(args)); - ASSERT_EQ(10, iterator->max_chunksize()); -} - -TEST_F(TestExecBatchIterator, ChunkedArrays) { - std::vector args = {Datum(GetInt32Chunked({0, 20, 10})), - Datum(GetInt32Chunked({15, 15})), Datum(GetInt32Array(30)), - Datum(std::make_shared(5)), - Datum(MakeNullScalar(boolean()))}; - - CheckIteration(args, /*chunksize=*/10, {10, 5, 5, 10}); - CheckIteration(args, /*chunksize=*/20, {15, 5, 10}); - CheckIteration(args, /*chunksize=*/30, {15, 5, 10}); -} - -TEST_F(TestExecBatchIterator, ZeroLengthInputs) { - auto carr = std::shared_ptr(new ChunkedArray({}, int32())); - - auto CheckArgs = [&](const std::vector& args) { - auto iterator = ExecBatchIterator::Make(args).ValueOrDie(); - ExecBatch batch; - ASSERT_FALSE(iterator->Next(&batch)); - }; - - // Zero-length ChunkedArray with zero chunks - std::vector args = {Datum(carr)}; - CheckArgs(args); - - // Zero-length array - args = {Datum(GetInt32Array(0))}; - CheckArgs(args); - - // ChunkedArray with single empty chunk - args = {Datum(GetInt32Chunked({0}))}; - CheckArgs(args); -} - // ---------------------------------------------------------------------- // ExecSpanIterator tests diff --git a/cpp/src/arrow/compute/function_benchmark.cc b/cpp/src/arrow/compute/function_benchmark.cc index 791052358e7..c7850b841c1 100644 --- a/cpp/src/arrow/compute/function_benchmark.cc +++ b/cpp/src/arrow/compute/function_benchmark.cc @@ -174,11 +174,13 @@ void BM_ExecuteScalarKernelOnScalar(benchmark::State& state) { state.SetItemsProcessed(state.iterations() * N); } -void BM_ExecBatchIterator(benchmark::State& state) { - // Measure overhead related to splitting ExecBatch into smaller ExecBatches - // for parallelism or more optimal CPU cache affinity +void BM_ExecSpanIterator(benchmark::State& state) { + // Measure overhead related to splitting ExecBatch into smaller non-owning + // ExecSpans for parallelism or more optimal CPU cache affinity random::RandomArrayGenerator rag(kSeed); + using ::arrow::compute::detail::ExecSpanIterator; + const int64_t length = 1 
<< 20; const int num_fields = 32; @@ -187,22 +189,24 @@ void BM_ExecBatchIterator(benchmark::State& state) { args[i] = rag.Int64(length, 0, 100)->data(); } + ExecBatch batch(args, length); + const int64_t blocksize = state.range(0); + ExecSpanIterator it; for (auto _ : state) { - std::unique_ptr it = - *detail::ExecBatchIterator::Make(args, blocksize); - ExecBatch batch; - while (it->Next(&batch)) { + ABORT_NOT_OK(it.Init(batch, blocksize)); + ExecSpan span; + while (it.Next(&span)) { for (int i = 0; i < num_fields; ++i) { - auto data = batch.values[i].array()->buffers[1]->data(); + auto data = span[i].array.buffers[1].data; benchmark::DoNotOptimize(data); } } - benchmark::DoNotOptimize(batch); + benchmark::DoNotOptimize(span); } // Provides comparability across blocksizes by looking at the iterations per // second. So 1000 iterations/second means that input splitting associated - // with ExecBatchIterator takes up 1ms every time. + // with ExecSpanIterator takes up 1ms every time. state.SetItemsProcessed(state.iterations()); } @@ -211,7 +215,7 @@ BENCHMARK(BM_CastDispatchBaseline); BENCHMARK(BM_AddDispatch); BENCHMARK(BM_ExecuteScalarFunctionOnScalar); BENCHMARK(BM_ExecuteScalarKernelOnScalar); -BENCHMARK(BM_ExecBatchIterator)->RangeMultiplier(4)->Range(1024, 64 * 1024); +BENCHMARK(BM_ExecSpanIterator)->RangeMultiplier(4)->Range(1024, 64 * 1024); } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index 94daa6baa96..ea151e81f0b 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -153,7 +153,7 @@ TEST(FunctionOptions, Equality) { } } -struct ExecBatch; +struct ExecSpan; TEST(Arity, Basics) { auto nullary = Arity::Nullary(); @@ -310,10 +310,8 @@ Result> NoopInit(KernelContext*, const KernelInitAr return nullptr; } -Status NoopConsume(KernelContext*, const ExecBatch&) { return Status::OK(); } -Status NoopMerge(KernelContext*, const KernelState&, KernelState*) { - return Status::OK(); -} +Status NoopConsume(KernelContext*, const ExecSpan&) { return Status::OK(); } +Status NoopMerge(KernelContext*, KernelState&&, KernelState*) { return Status::OK(); } Status NoopFinalize(KernelContext*, Datum*) { return Status::OK(); } TEST(ScalarAggregateFunction, DispatchExact) { diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 5463a2de579..d8960308dff 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -258,7 +258,7 @@ class ARROW_EXPORT OutputType { /// /// This function SHOULD _not_ be used to check for arity, that is to be /// performed one or more layers above. - typedef Result (*Resolver)(KernelContext*, const std::vector&); + using Resolver = Result (*)(KernelContext*, const std::vector&); /// \brief Output an exact type OutputType(std::shared_ptr type) // NOLINT implicit construction @@ -500,7 +500,7 @@ struct Kernel { /// endeavor to write into pre-allocated memory if they are able, though for /// some kernels (e.g. in cases when a builder like StringBuilder) must be /// employed this may not be possible. -typedef Status (*ArrayKernelExec)(KernelContext*, const ExecSpan&, ExecResult*); +using ArrayKernelExec = Status (*)(KernelContext*, const ExecSpan&, ExecResult*); /// \brief Kernel data structure for implementations of ScalarFunction. 
In /// addition to the members found in Kernel, contains the null handling @@ -548,7 +548,7 @@ struct VectorKernel : public Kernel { /// \brief Function for executing a stateful VectorKernel against a /// ChunkedArray input. Does not need to be defined for all VectorKernels - typedef Status (*ChunkedExec)(KernelContext*, const ExecBatch&, Datum* out); + using ChunkedExec = Status (*)(KernelContext*, const ExecBatch&, Datum* out); VectorKernel() = default; @@ -609,20 +609,17 @@ struct VectorKernel : public Kernel { // ---------------------------------------------------------------------- // ScalarAggregateKernel (for ScalarAggregateFunction) -using ScalarAggregateConsume = std::function; - -using ScalarAggregateMerge = - std::function; - +using ScalarAggregateConsume = Status (*)(KernelContext*, const ExecSpan&); +using ScalarAggregateMerge = Status (*)(KernelContext*, KernelState&&, KernelState*); // Finalize returns Datum to permit multiple return values -using ScalarAggregateFinalize = std::function; +using ScalarAggregateFinalize = Status (*)(KernelContext*, Datum*); /// \brief Kernel data structure for implementations of /// ScalarAggregateFunction. The four necessary components of an aggregation /// kernel are the init, consume, merge, and finalize functions. /// /// * init: creates a new KernelState for a kernel. -/// * consume: processes an ExecBatch and updates the KernelState found in the +/// * consume: processes an ExecSpan and updates the KernelState found in the /// KernelContext. /// * merge: combines one KernelState with another. /// * finalize: produces the end result of the aggregation using the @@ -634,16 +631,16 @@ struct ScalarAggregateKernel : public Kernel { ScalarAggregateConsume consume, ScalarAggregateMerge merge, ScalarAggregateFinalize finalize) : Kernel(std::move(sig), std::move(init)), - consume(std::move(consume)), - merge(std::move(merge)), - finalize(std::move(finalize)) {} + consume(consume), + merge(merge), + finalize(finalize) {} ScalarAggregateKernel(std::vector in_types, OutputType out_type, KernelInit init, ScalarAggregateConsume consume, ScalarAggregateMerge merge, ScalarAggregateFinalize finalize) : ScalarAggregateKernel( KernelSignature::Make(std::move(in_types), std::move(out_type)), - std::move(init), std::move(consume), std::move(merge), std::move(finalize)) {} + std::move(init), consume, merge, finalize) {} /// \brief Merge a vector of KernelStates into a single KernelState. /// The merged state will be returned and will be set on the KernelContext. @@ -659,15 +656,12 @@ struct ScalarAggregateKernel : public Kernel { // ---------------------------------------------------------------------- // HashAggregateKernel (for HashAggregateFunction) -using HashAggregateResize = std::function; - -using HashAggregateConsume = std::function; - -using HashAggregateMerge = - std::function; +using HashAggregateResize = Status (*)(KernelContext*, int64_t); +using HashAggregateConsume = Status (*)(KernelContext*, const ExecSpan&); +using HashAggregateMerge = Status (*)(KernelContext*, KernelState&&, const ArrayData&); // Finalize returns Datum to permit multiple return values -using HashAggregateFinalize = std::function; +using HashAggregateFinalize = Status (*)(KernelContext*, Datum*); /// \brief Kernel data structure for implementations of /// HashAggregateFunction. The four necessary components of an aggregation @@ -675,7 +669,7 @@ using HashAggregateFinalize = std::function; /// /// * init: creates a new KernelState for a kernel. 
/// * resize: ensure that the KernelState can accommodate the specified number of groups. -/// * consume: processes an ExecBatch (which includes the argument as well +/// * consume: processes an ExecSpan (which includes the argument as well /// as an array of group identifiers) and updates the KernelState found in the /// KernelContext. /// * merge: combines one KernelState with another. @@ -688,10 +682,10 @@ struct HashAggregateKernel : public Kernel { HashAggregateResize resize, HashAggregateConsume consume, HashAggregateMerge merge, HashAggregateFinalize finalize) : Kernel(std::move(sig), std::move(init)), - resize(std::move(resize)), - consume(std::move(consume)), - merge(std::move(merge)), - finalize(std::move(finalize)) {} + resize(resize), + consume(consume), + merge(merge), + finalize(finalize) {} HashAggregateKernel(std::vector in_types, OutputType out_type, KernelInit init, HashAggregateConsume consume, @@ -699,8 +693,7 @@ struct HashAggregateKernel : public Kernel { HashAggregateFinalize finalize) : HashAggregateKernel( KernelSignature::Make(std::move(in_types), std::move(out_type)), - std::move(init), std::move(resize), std::move(consume), std::move(merge), - std::move(finalize)) {} + std::move(init), resize, consume, merge, finalize) {} HashAggregateResize resize; HashAggregateConsume consume; diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic.cc b/cpp/src/arrow/compute/kernels/aggregate_basic.cc index fec483318ef..400ccbdf9f6 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_basic.cc @@ -30,7 +30,7 @@ namespace internal { namespace { -Status AggregateConsume(KernelContext* ctx, const ExecBatch& batch) { +Status AggregateConsume(KernelContext* ctx, const ExecSpan& batch) { return checked_cast(ctx->state())->Consume(ctx, batch); } @@ -71,16 +71,16 @@ namespace { struct CountImpl : public ScalarAggregator { explicit CountImpl(CountOptions options) : options(std::move(options)) {} - Status Consume(KernelContext*, const ExecBatch& batch) override { + Status Consume(KernelContext*, const ExecSpan& batch) override { if (options.mode == CountOptions::ALL) { this->non_nulls += batch.length; } else if (batch[0].is_array()) { - const ArrayData& input = *batch[0].array(); + const ArraySpan& input = batch[0].array; const int64_t nulls = input.GetNullCount(); this->nulls += nulls; this->non_nulls += input.length - nulls; } else { - const Scalar& input = *batch[0].scalar(); + const Scalar& input = *batch[0].scalar; this->nulls += !input.is_valid * batch.length; this->non_nulls += input.is_valid * batch.length; } @@ -133,9 +133,9 @@ struct CountDistinctImpl : public ScalarAggregator { explicit CountDistinctImpl(MemoryPool* memory_pool, CountOptions options) : options(std::move(options)), memo_table_(new MemoTable(memory_pool, 0)) {} - Status Consume(KernelContext*, const ExecBatch& batch) override { + Status Consume(KernelContext*, const ExecSpan& batch) override { if (batch[0].is_array()) { - const ArrayData& arr = *batch[0].array(); + const ArraySpan& arr = batch[0].array; this->has_nulls = arr.GetNullCount() > 0; auto visit_null = []() { return Status::OK(); }; @@ -144,9 +144,8 @@ struct CountDistinctImpl : public ScalarAggregator { return memo_table_->GetOrInsert(arg, &y); }; RETURN_NOT_OK(VisitArraySpanInline(arr, visit_value, visit_null)); - } else { - const Scalar& input = *batch[0].scalar(); + const Scalar& input = *batch[0].scalar; this->has_nulls = !input.is_valid; if (input.is_valid) { @@ -156,7 +155,6 @@ struct 
CountDistinctImpl : public ScalarAggregator { } this->non_nulls = memo_table_->size(); - return Status::OK(); } @@ -292,11 +290,11 @@ struct ProductImpl : public ScalarAggregator { product(MultiplyTraits::one(*out_type)), nulls_observed(false) {} - Status Consume(KernelContext*, const ExecBatch& batch) override { + Status Consume(KernelContext*, const ExecSpan& batch) override { if (batch[0].is_array()) { - const auto& data = batch[0].array(); - this->count += data->length - data->GetNullCount(); - this->nulls_observed = this->nulls_observed || data->GetNullCount(); + const ArraySpan& data = batch[0].array; + this->count += data.length - data.GetNullCount(); + this->nulls_observed = this->nulls_observed || data.GetNullCount(); if (!options.skip_nulls && this->nulls_observed) { // Short-circuit @@ -304,14 +302,14 @@ struct ProductImpl : public ScalarAggregator { } internal::VisitArrayValuesInline( - *data, + data, [&](typename TypeTraits::CType value) { this->product = MultiplyTraits::Multiply(*out_type, this->product, value); }, [] {}); } else { - const auto& data = *batch[0].scalar(); + const Scalar& data = *batch[0].scalar; this->count += data.is_valid * batch.length; this->nulls_observed = this->nulls_observed || !data.is_valid; if (data.is_valid) { @@ -461,23 +459,24 @@ void AddMinOrMaxAggKernel(ScalarAggregateFunction* func, struct BooleanAnyImpl : public ScalarAggregator { explicit BooleanAnyImpl(ScalarAggregateOptions options) : options(std::move(options)) {} - Status Consume(KernelContext*, const ExecBatch& batch) override { + Status Consume(KernelContext*, const ExecSpan& batch) override { // short-circuit if seen a True already if (this->any == true && this->count >= options.min_count) { return Status::OK(); } if (batch[0].is_scalar()) { - const auto& scalar = *batch[0].scalar(); + const Scalar& scalar = *batch[0].scalar; this->has_nulls = !scalar.is_valid; this->any = scalar.is_valid && checked_cast(scalar).value; this->count += scalar.is_valid; return Status::OK(); } - const auto& data = *batch[0].array(); + const ArraySpan& data = batch[0].array; this->has_nulls = data.GetNullCount() > 0; this->count += data.length - data.GetNullCount(); arrow::internal::OptionalBinaryBitBlockCounter counter( - data.buffers[0], data.offset, data.buffers[1], data.offset, data.length); + data.buffers[0].data, data.offset, data.buffers[1].data, data.offset, + data.length); int64_t position = 0; while (position < data.length) { const auto block = counter.NextAndBlock(); @@ -527,7 +526,7 @@ Result> AnyInit(KernelContext*, const KernelInitArg struct BooleanAllImpl : public ScalarAggregator { explicit BooleanAllImpl(ScalarAggregateOptions options) : options(std::move(options)) {} - Status Consume(KernelContext*, const ExecBatch& batch) override { + Status Consume(KernelContext*, const ExecSpan& batch) override { // short-circuit if seen a false already if (this->all == false && this->count >= options.min_count) { return Status::OK(); @@ -537,17 +536,18 @@ struct BooleanAllImpl : public ScalarAggregator { return Status::OK(); } if (batch[0].is_scalar()) { - const auto& scalar = *batch[0].scalar(); + const Scalar& scalar = *batch[0].scalar; this->has_nulls = !scalar.is_valid; this->count += scalar.is_valid; this->all = !scalar.is_valid || checked_cast(scalar).value; return Status::OK(); } - const auto& data = *batch[0].array(); + const ArraySpan& data = batch[0].array; this->has_nulls = data.GetNullCount() > 0; this->count += data.length - data.GetNullCount(); 
arrow::internal::OptionalBinaryBitBlockCounter counter( - data.buffers[1], data.offset, data.buffers[0], data.offset, data.length); + data.buffers[1].data, data.offset, data.buffers[0].data, data.offset, + data.length); int64_t position = 0; while (position < data.length) { const auto block = counter.NextOrNotBlock(); @@ -605,7 +605,7 @@ struct IndexImpl : public ScalarAggregator { } } - Status Consume(KernelContext* ctx, const ExecBatch& batch) override { + Status Consume(KernelContext* ctx, const ExecSpan& batch) override { // short-circuit if (index >= 0 || !options.value->is_valid) { return Status::OK(); @@ -615,8 +615,8 @@ struct IndexImpl : public ScalarAggregator { if (batch[0].is_scalar()) { seen = batch.length; - if (batch[0].scalar()->is_valid) { - const ArgValue v = internal::UnboxScalar::Unbox(*batch[0].scalar()); + if (batch[0].scalar->is_valid) { + const ArgValue v = internal::UnboxScalar::Unbox(*batch[0].scalar); if (v == desired) { index = 0; return Status::Cancelled("Found"); @@ -625,12 +625,12 @@ struct IndexImpl : public ScalarAggregator { return Status::OK(); } - auto input = batch[0].array(); - seen = input->length; + const ArraySpan& input = batch[0].array; + seen = input.length; int64_t i = 0; ARROW_UNUSED(internal::VisitArrayValuesInline( - *input, + input, [&](ArgValue v) -> Status { if (v == desired) { index = i; @@ -671,7 +671,7 @@ template <> struct IndexImpl : public ScalarAggregator { explicit IndexImpl(IndexOptions, KernelState*) {} - Status Consume(KernelContext*, const ExecBatch&) override { return Status::OK(); } + Status Consume(KernelContext*, const ExecSpan&) override { return Status::OK(); } Status MergeFrom(KernelContext*, KernelState&&) override { return Status::OK(); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h index c945e7f27f3..bd2fe534608 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_basic_internal.h @@ -24,6 +24,7 @@ #include "arrow/compute/kernels/aggregate_internal.h" #include "arrow/compute/kernels/codegen_internal.h" #include "arrow/compute/kernels/common.h" +#include "arrow/compute/kernels/util_internal.h" #include "arrow/util/align_util.h" #include "arrow/util/bit_block_counter.h" #include "arrow/util/decimal.h" @@ -68,11 +69,11 @@ struct SumImpl : public ScalarAggregator { SumImpl(std::shared_ptr out_type, const ScalarAggregateOptions& options_) : out_type(out_type), options(options_) {} - Status Consume(KernelContext*, const ExecBatch& batch) override { + Status Consume(KernelContext*, const ExecSpan& batch) override { if (batch[0].is_array()) { - const auto& data = batch[0].array(); - this->count += data->length - data->GetNullCount(); - this->nulls_observed = this->nulls_observed || data->GetNullCount(); + const ArraySpan& data = batch[0].array; + this->count += data.length - data.GetNullCount(); + this->nulls_observed = this->nulls_observed || data.GetNullCount(); if (!options.skip_nulls && this->nulls_observed) { // Short-circuit @@ -80,12 +81,12 @@ struct SumImpl : public ScalarAggregator { } if (is_boolean_type::value) { - this->sum += static_cast(BooleanArray(data).true_count()); + this->sum += GetTrueCount(data); } else { - this->sum += SumArray(*data); + this->sum += SumArray(data); } } else { - const auto& data = *batch[0].scalar(); + const Scalar& data = *batch[0].scalar; this->count += data.is_valid * batch.length; this->nulls_observed = this->nulls_observed || 
!data.is_valid; if (data.is_valid) { @@ -126,8 +127,8 @@ struct NullImpl : public ScalarAggregator { explicit NullImpl(const ScalarAggregateOptions& options_) : options(options_) {} - Status Consume(KernelContext*, const ExecBatch& batch) override { - if (batch[0].is_scalar() || batch[0].array()->GetNullCount() > 0) { + Status Consume(KernelContext*, const ExecSpan& batch) override { + if (batch[0].is_scalar() || batch[0].array.GetNullCount() > 0) { // If the batch is a scalar or an array with elements, set is_empty to false is_empty = false; } @@ -428,11 +429,11 @@ struct MinMaxImpl : public ScalarAggregator { this->options.min_count = std::max(1, this->options.min_count); } - Status Consume(KernelContext*, const ExecBatch& batch) override { + Status Consume(KernelContext*, const ExecSpan& batch) override { if (batch[0].is_array()) { - return ConsumeArray(ArrayType(batch[0].array())); + return ConsumeArray(batch[0].array); } - return ConsumeScalar(*batch[0].scalar()); + return ConsumeScalar(*batch[0].scalar); } Status ConsumeScalar(const Scalar& scalar) { @@ -448,9 +449,11 @@ struct MinMaxImpl : public ScalarAggregator { return Status::OK(); } - Status ConsumeArray(const ArrayType& arr) { + Status ConsumeArray(const ArraySpan& arr_span) { StateType local; + ArrayType arr(arr_span.ToArrayData()); + const auto null_count = arr.null_count(); local.has_nulls = null_count > 0; this->count += arr.length() - null_count; @@ -566,12 +569,12 @@ struct BooleanMinMaxImpl : public MinMaxImpl { using MinMaxImpl::MinMaxImpl; using MinMaxImpl::options; - Status Consume(KernelContext*, const ExecBatch& batch) override { + Status Consume(KernelContext*, const ExecSpan& batch) override { if (ARROW_PREDICT_FALSE(batch[0].is_scalar())) { - return ConsumeScalar(checked_cast(*batch[0].scalar())); + return ConsumeScalar(checked_cast(*batch[0].scalar)); } StateType local; - ArrayType arr(batch[0].array()); + ArrayType arr(batch[0].array.ToArrayData()); const auto arr_length = arr.length(); const auto null_count = arr.null_count(); @@ -608,7 +611,7 @@ struct BooleanMinMaxImpl : public MinMaxImpl { }; struct NullMinMaxImpl : public ScalarAggregator { - Status Consume(KernelContext*, const ExecBatch& batch) override { return Status::OK(); } + Status Consume(KernelContext*, const ExecSpan& batch) override { return Status::OK(); } Status MergeFrom(KernelContext*, KernelState&& src) override { return Status::OK(); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_internal.h b/cpp/src/arrow/compute/kernels/aggregate_internal.h index 07378e3ce82..8db74bfe0cd 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_internal.h +++ b/cpp/src/arrow/compute/kernels/aggregate_internal.h @@ -91,7 +91,7 @@ struct MultiplyTraits> { }; struct ScalarAggregator : public KernelState { - virtual Status Consume(KernelContext* ctx, const ExecBatch& batch) = 0; + virtual Status Consume(KernelContext* ctx, const ExecSpan& batch) = 0; virtual Status MergeFrom(KernelContext* ctx, KernelState&& src) = 0; virtual Status Finalize(KernelContext* ctx, Datum* out) = 0; }; @@ -142,7 +142,7 @@ struct GetSumType> { template enable_if_t::value, SumType> SumArray( - const ArrayData& data, ValueFunc&& func) { + const ArraySpan& data, ValueFunc&& func) { using arrow::internal::VisitSetBitRunsVoid; const int64_t data_size = data.length - data.GetNullCount(); @@ -182,7 +182,7 @@ enable_if_t::value, SumType> SumArray( }; const ValueType* values = data.GetValues(1); - VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, + 
VisitSetBitRunsVoid(data.buffers[0].data, data.offset, data.length, [&](int64_t pos, int64_t len) { const ValueType* v = &values[pos]; // unsigned division by constant is cheaper than signed one @@ -219,12 +219,12 @@ enable_if_t::value, SumType> SumArray( template enable_if_t::value, SumType> SumArray( - const ArrayData& data, ValueFunc&& func) { + const ArraySpan& data, ValueFunc&& func) { using arrow::internal::VisitSetBitRunsVoid; SumType sum = 0; const ValueType* values = data.GetValues(1); - VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, + VisitSetBitRunsVoid(data.buffers[0].data, data.offset, data.length, [&](int64_t pos, int64_t len) { for (int64_t i = 0; i < len; ++i) { sum += func(values[pos + i]); @@ -234,7 +234,7 @@ enable_if_t::value, SumType> SumArray( } template -SumType SumArray(const ArrayData& data) { +SumType SumArray(const ArraySpan& data) { return SumArray( data, [](ValueType v) { return static_cast(v); }); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc index 263eced9e30..7f379a07197 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc @@ -416,31 +416,6 @@ struct Moder> { SortModer impl; }; -template -Status ScalarMode(KernelContext* ctx, const Scalar& scalar, ExecResult* out) { - using CType = typename TypeTraits::CType; - - const ModeOptions& options = ModeState::Get(ctx); - if ((!options.skip_nulls && !scalar.is_valid) || - (static_cast(scalar.is_valid) < options.min_count)) { - return PrepareOutput(/*n=*/0, ctx, *out->type(), out).status(); - } - - if (scalar.is_valid) { - bool called = false; - return Finalize(ctx, *out->type(), out, [&]() { - if (!called) { - called = true; - return std::pair(UnboxScalar::Unbox(scalar), 1); - } - return std::pair(static_cast(0), kCountEOF); - }); - } - return Finalize(ctx, *out->type(), out, []() { - return std::pair(static_cast(0), kCountEOF); - }); -} - Status CheckOptions(KernelContext* ctx) { if (ctx->state() == nullptr) { return Status::Invalid("Mode requires ModeOptions"); @@ -456,9 +431,6 @@ template struct ModeExecutor { static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { RETURN_NOT_OK(CheckOptions(ctx)); - if (batch[0].is_scalar()) { - return ScalarMode(ctx, *batch[0].scalar, out); - } return Moder().impl.Exec(ctx, batch, out); } }; diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc index 921de15c316..32a5d127dcf 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc @@ -471,45 +471,6 @@ struct ExactQuantiler::value>> { SortQuantiler impl; }; -template -Status ScalarQuantile(KernelContext* ctx, const Scalar& scalar, ExecResult* out) { - const QuantileOptions& options = QuantileState::Get(ctx); - using CType = typename TypeTraits::CType; - ArrayData* output = out->array_data().get(); - output->length = options.q.size(); - auto out_type = IsDataPoint(options) ? 
scalar.type : float64(); - ARROW_ASSIGN_OR_RAISE(output->buffers[1], - ctx->Allocate(output->length * out_type->byte_width())); - - if (!scalar.is_valid || options.min_count > 1) { - output->null_count = output->length; - ARROW_ASSIGN_OR_RAISE(output->buffers[0], ctx->AllocateBitmap(output->length)); - bit_util::SetBitsTo(output->buffers[0]->mutable_data(), /*offset=*/0, output->length, - false); - if (IsDataPoint(options)) { - CType* out_buffer = output->template GetMutableValues(1); - std::fill(out_buffer, out_buffer + output->length, CType(0)); - } else { - double* out_buffer = output->template GetMutableValues(1); - std::fill(out_buffer, out_buffer + output->length, 0.0); - } - return Status::OK(); - } - output->null_count = 0; - if (IsDataPoint(options)) { - CType* out_buffer = output->template GetMutableValues(1); - for (int64_t i = 0; i < output->length; i++) { - out_buffer[i] = UnboxScalar::Unbox(scalar); - } - } else { - double* out_buffer = output->template GetMutableValues(1); - for (int64_t i = 0; i < output->length; i++) { - out_buffer[i] = DataPointToDouble(UnboxScalar::Unbox(scalar), *scalar.type); - } - } - return Status::OK(); -} - Status CheckQuantileOptions(KernelContext* ctx) { if (ctx->state() == nullptr) { return Status::Invalid("Quantile requires QuantileOptions"); @@ -531,9 +492,6 @@ template struct QuantileExecutor { static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { RETURN_NOT_OK(CheckQuantileOptions(ctx)); - if (batch[0].is_scalar()) { - return ScalarQuantile(ctx, *batch[0].scalar, out); - } return ExactQuantiler().impl.Exec(ctx, batch[0].array, out); } }; diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc index cfb7d3c3b35..0e00537e3cc 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc @@ -54,19 +54,19 @@ struct TDigestImpl : public ScalarAggregator { double ToDouble(const Decimal128& value) const { return value.ToDouble(decimal_scale); } double ToDouble(const Decimal256& value) const { return value.ToDouble(decimal_scale); } - Status Consume(KernelContext*, const ExecBatch& batch) override { + Status Consume(KernelContext*, const ExecSpan& batch) override { if (!this->all_valid) return Status::OK(); if (!options.skip_nulls && batch[0].null_count() > 0) { this->all_valid = false; return Status::OK(); } if (batch[0].is_array()) { - const ArrayData& data = *batch[0].array(); + const ArraySpan& data = batch[0].array; const CType* values = data.GetValues(1); if (data.length > data.GetNullCount()) { this->count += data.length - data.GetNullCount(); - VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, + VisitSetBitRunsVoid(data.buffers[0].data, data.offset, data.length, [&](int64_t pos, int64_t len) { for (int64_t i = 0; i < len; ++i) { this->tdigest.NanAdd(ToDouble(values[pos + i])); @@ -74,8 +74,8 @@ struct TDigestImpl : public ScalarAggregator { }); } } else { - const CType value = UnboxScalar::Unbox(*batch[0].scalar()); - if (batch[0].scalar()->is_valid) { + const CType value = UnboxScalar::Unbox(*batch[0].scalar); + if (batch[0].scalar->is_valid) { this->count += 1; for (int64_t i = 0; i < batch.length; i++) { this->tdigest.NanAdd(ToDouble(value)); diff --git a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc index 1f9a26960b8..1693e952787 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_var_std.cc +++ 
b/cpp/src/arrow/compute/kernels/aggregate_var_std.cc @@ -54,19 +54,19 @@ struct VarStdState { // https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Two-pass_algorithm template enable_if_t::value || (sizeof(CType) > 4)> Consume( - const ArrayType& array) { - this->all_valid = array.null_count() == 0; - int64_t count = array.length() - array.null_count(); + const ArraySpan& array) { + this->all_valid = array.GetNullCount() == 0; + int64_t count = array.length - array.GetNullCount(); if (count == 0 || (!this->all_valid && !options.skip_nulls)) { return; } using SumType = typename internal::GetSumType::SumType; - SumType sum = internal::SumArray(*array.data()); + SumType sum = internal::SumArray(array); const double mean = ToDouble(sum) / count; const double m2 = internal::SumArray( - *array.data(), [this, mean](CType value) { + array, [this, mean](CType value) { const double v = ToDouble(value); return (v - mean) * (v - mean); }); @@ -79,29 +79,30 @@ struct VarStdState { // int32/16/8: textbook one pass algorithm with integer arithmetic template enable_if_t::value && (sizeof(CType) <= 4)> Consume( - const ArrayType& array) { + const ArraySpan& array) { // max number of elements that sum will not overflow int64 (2Gi int32 elements) // for uint32: 0 <= sum < 2^63 (int64 >= 0) // for int32: -2^62 <= sum < 2^62 constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8); - this->all_valid = array.null_count() == 0; + this->all_valid = array.GetNullCount() == 0; if (!this->all_valid && !options.skip_nulls) return; int64_t start_index = 0; - int64_t valid_count = array.length() - array.null_count(); + int64_t valid_count = array.length - array.GetNullCount(); + ArraySpan slice = array; while (valid_count > 0) { // process in chunks that overflow will never happen - const auto slice = array.Slice(start_index, max_length); - const int64_t count = slice->length() - slice->null_count(); - start_index += max_length; + slice.SetSlice(start_index + array.offset, + std::min(max_length, array.length - start_index)); + const int64_t count = slice.length - slice.GetNullCount(); + start_index += slice.length; valid_count -= count; if (count > 0) { IntegerVarStd var_std; - const ArrayData& data = *slice->data(); - const CType* values = data.GetValues(1); - VisitSetBitRunsVoid(data.buffers[0], data.offset, data.length, + const CType* values = slice.GetValues(1); + VisitSetBitRunsVoid(slice.buffers[0].data, slice.offset, slice.length, [&](int64_t pos, int64_t len) { for (int64_t i = 0; i < len; ++i) { const auto value = values[pos + i]; @@ -166,12 +167,11 @@ struct VarStdImpl : public ScalarAggregator { const VarianceOptions& options, VarOrStd return_type) : out_type(out_type), state(decimal_scale, options), return_type(return_type) {} - Status Consume(KernelContext*, const ExecBatch& batch) override { + Status Consume(KernelContext*, const ExecSpan& batch) override { if (batch[0].is_array()) { - ArrayType array(batch[0].array()); - this->state.Consume(array); + this->state.Consume(batch[0].array); } else { - this->state.Consume(*batch[0].scalar(), batch.length); + this->state.Consume(*batch[0].scalar, batch.length); } return Status::OK(); } diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 49c88324a91..4537c32eb38 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -72,7 +72,7 @@ struct GroupedAggregator : KernelState { virtual Status Resize(int64_t 
new_num_groups) = 0; - virtual Status Consume(const ExecBatch& batch) = 0; + virtual Status Consume(const ExecSpan& batch) = 0; virtual Status Merge(GroupedAggregator&& other, const ArrayData& group_id_mapping) = 0; @@ -92,7 +92,7 @@ Result> HashAggregateInit(KernelContext* ctx, Status HashAggregateResize(KernelContext* ctx, int64_t num_groups) { return checked_cast(ctx->state())->Resize(num_groups); } -Status HashAggregateConsume(KernelContext* ctx, const ExecBatch& batch) { +Status HashAggregateConsume(KernelContext* ctx, const ExecSpan& batch) { return checked_cast(ctx->state())->Consume(batch); } Status HashAggregateMerge(KernelContext* ctx, KernelState&& other, @@ -167,17 +167,17 @@ struct GroupedValueTraits { template typename arrow::internal::call_traits::enable_if_return::type -VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func, +VisitGroupedValues(const ExecSpan& batch, ConsumeValue&& valid_func, ConsumeNull&& null_func) { - auto g = batch[1].array()->GetValues(1); + auto g = batch[1].array.GetValues(1); if (batch[0].is_array()) { VisitArrayValuesInline( - *batch[0].array(), + batch[0].array, [&](typename TypeTraits::CType val) { valid_func(*g++, val); }, [&]() { null_func(*g++); }); return; } - const auto& input = *batch[0].scalar(); + const Scalar& input = *batch[0].scalar; if (input.is_valid) { const auto val = UnboxScalar::Unbox(input); for (int64_t i = 0; i < batch.length; i++) { @@ -192,16 +192,16 @@ VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func, template typename arrow::internal::call_traits::enable_if_return::type -VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func, +VisitGroupedValues(const ExecSpan& batch, ConsumeValue&& valid_func, ConsumeNull&& null_func) { - auto g = batch[1].array()->GetValues(1); + auto g = batch[1].array.GetValues(1); if (batch[0].is_array()) { return VisitArrayValuesInline( - *batch[0].array(), + batch[0].array, [&](typename GetViewType::T val) { return valid_func(*g++, val); }, [&]() { return null_func(*g++); }); } - const auto& input = *batch[0].scalar(); + const Scalar& input = *batch[0].scalar; if (input.is_valid) { const auto val = UnboxScalar::Unbox(input); for (int64_t i = 0; i < batch.length; i++) { @@ -216,7 +216,7 @@ VisitGroupedValues(const ExecBatch& batch, ConsumeValue&& valid_func, } template -void VisitGroupedValuesNonNull(const ExecBatch& batch, ConsumeValue&& valid_func) { +void VisitGroupedValuesNonNull(const ExecSpan& batch, ConsumeValue&& valid_func) { VisitGroupedValues(batch, std::forward(valid_func), [](uint32_t) {}); } @@ -251,20 +251,20 @@ struct GroupedCountImpl : public GroupedAggregator { return Status::OK(); } - Status Consume(const ExecBatch& batch) override { + Status Consume(const ExecSpan& batch) override { auto counts = reinterpret_cast(counts_.mutable_data()); - auto g_begin = batch[1].array()->GetValues(1); + auto g_begin = batch[1].array.GetValues(1); if (options_.mode == CountOptions::ALL) { for (int64_t i = 0; i < batch.length; ++i, ++g_begin) { counts[*g_begin] += 1; } } else if (batch[0].is_array()) { - const auto& input = batch[0].array(); + const ArraySpan& input = batch[0].array; if (options_.mode == CountOptions::ONLY_VALID) { - if (input->type->id() != arrow::Type::NA) { + if (input.type->id() != arrow::Type::NA) { arrow::internal::VisitSetBitRunsVoid( - input->buffers[0], input->offset, input->length, + input.buffers[0].data, input.offset, input.length, [&](int64_t offset, int64_t length) { auto g = g_begin + offset; for (int64_t i = 0; i < 
length; ++i, ++g) { @@ -273,19 +273,19 @@ struct GroupedCountImpl : public GroupedAggregator { }); } } else { // ONLY_NULL - if (input->type->id() == arrow::Type::NA) { + if (input.type->id() == arrow::Type::NA) { for (int64_t i = 0; i < batch.length; ++i, ++g_begin) { counts[*g_begin] += 1; } - } else if (input->MayHaveNulls()) { - auto end = input->offset + input->length; - for (int64_t i = input->offset; i < end; ++i, ++g_begin) { - counts[*g_begin] += !bit_util::GetBit(input->buffers[0]->data(), i); + } else if (input.MayHaveNulls()) { + auto end = input.offset + input.length; + for (int64_t i = input.offset; i < end; ++i, ++g_begin) { + counts[*g_begin] += !bit_util::GetBit(input.buffers[0].data, i); } } } } else { - const auto& input = *batch[0].scalar(); + const Scalar& input = *batch[0].scalar; if (options_.mode == CountOptions::ONLY_VALID) { for (int64_t i = 0; i < batch.length; ++i, ++g_begin) { counts[*g_begin] += input.is_valid; @@ -339,7 +339,7 @@ struct GroupedReducingAggregator : public GroupedAggregator { return Status::OK(); } - Status Consume(const ExecBatch& batch) override { + Status Consume(const ExecSpan& batch) override { CType* reduced = reduced_.mutable_data(); int64_t* counts = counts_.mutable_data(); uint8_t* no_nulls = no_nulls_.mutable_data(); @@ -457,7 +457,7 @@ struct GroupedNullImpl : public GroupedAggregator { return Status::OK(); } - Status Consume(const ExecBatch& batch) override { return Status::OK(); } + Status Consume(const ExecSpan& batch) override { return Status::OK(); } Status Merge(GroupedAggregator&& raw_other, const ArrayData& group_id_mapping) override { @@ -747,13 +747,13 @@ struct GroupedVarStdImpl : public GroupedAggregator { return value.ToDouble(decimal_scale_); } - Status Consume(const ExecBatch& batch) override { return ConsumeImpl(batch); } + Status Consume(const ExecSpan& batch) override { return ConsumeImpl(batch); } // float/double/int64/decimal: calculate `m2` (sum((X-mean)^2)) with // `two pass algorithm` (see aggregate_var_std.cc) template enable_if_t::value || (sizeof(CType) > 4), Status> ConsumeImpl( - const ExecBatch& batch) { + const ExecSpan& batch) { using SumType = typename internal::GetSumType::SumType; GroupedVarStdImpl state; @@ -799,14 +799,14 @@ struct GroupedVarStdImpl : public GroupedAggregator { // aggregate_var_std.cc) template enable_if_t::value && (sizeof(CType) <= 4), Status> ConsumeImpl( - const ExecBatch& batch) { + const ExecSpan& batch) { // max number of elements that sum will not overflow int64 (2Gi int32 elements) // for uint32: 0 <= sum < 2^63 (int64 >= 0) // for int32: -2^62 <= sum < 2^62 constexpr int64_t max_length = 1ULL << (63 - sizeof(CType) * 8); - const auto g = batch[1].array()->GetValues(1); - if (batch[0].is_scalar() && !batch[0].scalar()->is_valid) { + const auto g = batch[1].array.GetValues(1); + if (batch[0].is_scalar() && !batch[0].scalar->is_valid) { uint8_t* no_nulls = no_nulls_.mutable_data(); for (int64_t i = 0; i < batch.length; i++) { bit_util::ClearBit(no_nulls, g[i]); @@ -839,7 +839,7 @@ struct GroupedVarStdImpl : public GroupedAggregator { uint8_t* other_no_nulls = state.no_nulls_.mutable_data(); if (batch[0].is_array()) { - const auto& array = *batch[0].array(); + const ArraySpan& array = batch[0].array; const CType* values = array.GetValues(1); auto visit_values = [&](int64_t pos, int64_t len) { for (int64_t i = 0; i < len; ++i) { @@ -851,7 +851,7 @@ struct GroupedVarStdImpl : public GroupedAggregator { if (array.MayHaveNulls()) { arrow::internal::BitRunReader reader( - 
array.buffers[0]->data(), array.offset + start_index, + array.buffers[0].data, array.offset + start_index, std::min(max_length, batch.length - start_index)); int64_t position = 0; while (true) { @@ -870,7 +870,7 @@ struct GroupedVarStdImpl : public GroupedAggregator { visit_values(0, array.length); } } else { - const auto value = UnboxScalar::Unbox(*batch[0].scalar()); + const auto value = UnboxScalar::Unbox(*batch[0].scalar); for (int64_t i = 0; i < std::min(max_length, batch.length - start_index); ++i) { const int64_t index = start_index + i; var_std[g[index]].ConsumeOne(value); @@ -1052,7 +1052,7 @@ struct GroupedTDigestImpl : public GroupedAggregator { return value.ToDouble(decimal_scale_); } - Status Consume(const ExecBatch& batch) override { + Status Consume(const ExecSpan& batch) override { int64_t* counts = counts_.mutable_data(); uint8_t* no_nulls = no_nulls_.mutable_data(); VisitGroupedValues( @@ -1263,7 +1263,7 @@ struct GroupedMinMaxImpl final : public GroupedAggregator { return Status::OK(); } - Status Consume(const ExecBatch& batch) override { + Status Consume(const ExecSpan& batch) override { auto raw_mins = mins_.mutable_data(); auto raw_maxes = maxes_.mutable_data(); @@ -1370,7 +1370,7 @@ struct GroupedMinMaxImpl( batch, [&](uint32_t g, util::string_view val) { @@ -1518,7 +1518,7 @@ struct GroupedNullMinMaxImpl final : public GroupedAggregator { return Status::OK(); } - Status Consume(const ExecBatch& batch) override { return Status::OK(); } + Status Consume(const ExecSpan& batch) override { return Status::OK(); } Status Merge(GroupedAggregator&& raw_other, const ArrayData& group_id_mapping) override { @@ -1663,18 +1663,18 @@ struct GroupedBooleanAggregator : public GroupedAggregator { return counts_.Append(added_groups, 0); } - Status Consume(const ExecBatch& batch) override { + Status Consume(const ExecSpan& batch) override { uint8_t* reduced = reduced_.mutable_data(); uint8_t* no_nulls = no_nulls_.mutable_data(); int64_t* counts = counts_.mutable_data(); - auto g = batch[1].array()->GetValues(1); + auto g = batch[1].array.GetValues(1); if (batch[0].is_array()) { - const auto& input = *batch[0].array(); - const uint8_t* bitmap = input.buffers[1]->data(); + const ArraySpan& input = batch[0].array; + const uint8_t* bitmap = input.buffers[1].data; if (input.MayHaveNulls()) { arrow::internal::VisitBitBlocksVoid( - input.buffers[0]->data(), input.offset, input.length, + input.buffers[0].data, input.offset, input.length, [&](int64_t position) { counts[*g]++; Impl::UpdateGroupWith(reduced, *g, bit_util::GetBit(bitmap, position)); @@ -1694,7 +1694,7 @@ struct GroupedBooleanAggregator : public GroupedAggregator { }); } } else { - const auto& input = *batch[0].scalar(); + const Scalar& input = *batch[0].scalar; if (input.is_valid) { const bool value = UnboxScalar::Unbox(input); for (int64_t i = 0; i < batch.length; i++) { @@ -1828,7 +1828,7 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { return Status::OK(); } - Status Consume(const ExecBatch& batch) override { + Status Consume(const ExecSpan& batch) override { ARROW_ASSIGN_OR_RAISE(std::ignore, grouper_->Consume(batch)); return Status::OK(); } @@ -1839,8 +1839,8 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { // Get (value, group_id) pairs, then translate the group IDs and consume them // ourselves - ARROW_ASSIGN_OR_RAISE(auto uniques, other->grouper_->GetUniques()); - ARROW_ASSIGN_OR_RAISE(auto remapped_g, + ARROW_ASSIGN_OR_RAISE(ExecBatch uniques, other->grouper_->GetUniques()); + 
ARROW_ASSIGN_OR_RAISE(std::shared_ptr remapped_g, AllocateBuffer(uniques.length * sizeof(uint32_t), pool_)); const auto* g_mapping = group_id_mapping.GetValues(1); @@ -1850,10 +1850,10 @@ struct GroupedCountDistinctImpl : public GroupedAggregator { for (int64_t i = 0; i < uniques.length; i++) { g[i] = g_mapping[other_g[i]]; } - uniques.values[1] = - ArrayData::Make(uint32(), uniques.length, {nullptr, std::move(remapped_g)}); - return Consume(std::move(uniques)); + ExecSpan uniques_span(uniques); + uniques_span.values[1].array.SetBuffer(1, remapped_g); + return Consume(uniques_span); } Result Finalize() override { @@ -1990,7 +1990,7 @@ struct GroupedOneImpl final : public GroupedAggregator { return Status::OK(); } - Status Consume(const ExecBatch& batch) override { + Status Consume(const ExecSpan& batch) override { auto raw_ones_ = ones_.mutable_data(); return VisitGroupedValues( @@ -2049,7 +2049,7 @@ struct GroupedNullOneImpl : public GroupedAggregator { return Status::OK(); } - Status Consume(const ExecBatch& batch) override { return Status::OK(); } + Status Consume(const ExecSpan& batch) override { return Status::OK(); } Status Merge(GroupedAggregator&& raw_other, const ArrayData& group_id_mapping) override { @@ -2089,7 +2089,7 @@ struct GroupedOneImpl::value || return Status::OK(); } - Status Consume(const ExecBatch& batch) override { + Status Consume(const ExecSpan& batch) override { return VisitGroupedValues( batch, [&](uint32_t g, util::string_view val) -> Status { @@ -2292,17 +2292,17 @@ struct GroupedListImpl final : public GroupedAggregator { return Status::OK(); } - Status Consume(const ExecBatch& batch) override { - const auto& values_array_data = batch[0].array(); - int64_t num_values = values_array_data->length; + Status Consume(const ExecSpan& batch) override { + const ArraySpan& values_array_data = batch[0].array; + const ArraySpan& groups_array_data = batch[1].array; - const auto& groups_array_data = batch[1].array(); - const auto* groups = groups_array_data->GetValues(1, 0); - DCHECK_EQ(groups_array_data->offset, 0); + int64_t num_values = values_array_data.length; + const auto* groups = groups_array_data.GetValues(1, 0); + DCHECK_EQ(groups_array_data.offset, 0); RETURN_NOT_OK(groups_.Append(groups, num_values)); - int64_t offset = values_array_data->offset; - const uint8_t* values = values_array_data->buffers[1]->data(); + int64_t offset = values_array_data.offset; + const uint8_t* values = values_array_data.buffers[1].data; RETURN_NOT_OK(GetSet::AppendBuffers(&values_, values, offset, num_values)); if (batch[0].null_count() > 0) { @@ -2310,7 +2310,7 @@ struct GroupedListImpl final : public GroupedAggregator { has_nulls_ = true; RETURN_NOT_OK(values_bitmap_.Append(num_args_, true)); } - const uint8_t* values_bitmap = values_array_data->buffers[0]->data(); + const uint8_t* values_bitmap = values_array_data.buffers[0].data; RETURN_NOT_OK(GroupedValueTraits::AppendBuffers( &values_bitmap_, values_bitmap, offset, num_values)); } else if (has_nulls_) { @@ -2399,20 +2399,20 @@ struct GroupedListImpl::value || return Status::OK(); } - Status Consume(const ExecBatch& batch) override { - const auto& values_array_data = batch[0].array(); - int64_t num_values = values_array_data->length; - int64_t offset = values_array_data->offset; + Status Consume(const ExecSpan& batch) override { + const ArraySpan& values_array_data = batch[0].array; + int64_t num_values = values_array_data.length; + int64_t offset = values_array_data.offset; - const auto& groups_array_data = batch[1].array(); - 
const auto* groups = groups_array_data->GetValues(1, 0); - DCHECK_EQ(groups_array_data->offset, 0); + const ArraySpan& groups_array_data = batch[1].array; + const uint32_t* groups = groups_array_data.GetValues(1, 0); + DCHECK_EQ(groups_array_data.offset, 0); RETURN_NOT_OK(groups_.Append(groups, num_values)); if (batch[0].null_count() == 0) { RETURN_NOT_OK(values_bitmap_.Append(num_values, true)); } else { - const uint8_t* values_bitmap = values_array_data->buffers[0]->data(); + const uint8_t* values_bitmap = values_array_data.buffers[0].data; RETURN_NOT_OK(GroupedValueTraits::AppendBuffers( &values_bitmap_, values_bitmap, offset, num_values)); } @@ -2553,9 +2553,9 @@ struct GroupedNullListImpl : public GroupedAggregator { return counts_.Append(added_groups, 0); } - Status Consume(const ExecBatch& batch) override { + Status Consume(const ExecSpan& batch) override { int64_t* counts = counts_.mutable_data(); - const auto* g_begin = batch[1].array()->GetValues(1); + const auto* g_begin = batch[1].array.GetValues(1); for (int64_t i = 0; i < batch.length; ++i, ++g_begin) { counts[*g_begin] += 1; } diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc index 156e5896124..f599f9abb60 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate_test.cc @@ -77,7 +77,7 @@ Result NaiveGroupBy(std::vector arguments, std::vector keys ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_batch.GetTypes())); - ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(key_batch)); + ARROW_ASSIGN_OR_RAISE(Datum id_batch, grouper->Consume(ExecSpan(key_batch))); ARROW_ASSIGN_OR_RAISE( auto groupings, @@ -184,7 +184,7 @@ Result GroupByUsingExecPlan(const std::vector& arguments, const std::vector& keys, const std::vector& aggregates, bool use_threads, ExecContext* ctx) { - using arrow::compute::detail::ExecBatchIterator; + using arrow::compute::detail::ExecSpanIterator; FieldVector scan_fields(arguments.size() + keys.size()); std::vector key_names(keys.size()); @@ -202,14 +202,15 @@ Result GroupByUsingExecPlan(const std::vector& arguments, inputs.reserve(inputs.size() + keys.size()); inputs.insert(inputs.end(), keys.begin(), keys.end()); - ARROW_ASSIGN_OR_RAISE(auto batch_iterator, - ExecBatchIterator::Make(inputs, ctx->exec_chunksize())); + ExecSpanIterator span_iterator; + ARROW_ASSIGN_OR_RAISE(auto batch, ExecBatch::Make(inputs)); + RETURN_NOT_OK(span_iterator.Init(batch, ctx->exec_chunksize())); BatchesWithSchema input; input.schema = schema(std::move(scan_fields)); - ExecBatch batch; - while (batch_iterator->Next(&batch)) { - if (batch.length == 0) continue; - input.batches.push_back(std::move(batch)); + ExecSpan span; + while (span_iterator.Next(&span)) { + if (span.length == 0) continue; + input.batches.push_back(span.ToExecBatch()); } return GroupByUsingExecPlan(input, key_names, aggregates, use_threads, ctx); @@ -388,7 +389,7 @@ struct TestGrouper { } void ConsumeAndValidate(const ExecBatch& key_batch, Datum* ids = nullptr) { - ASSERT_OK_AND_ASSIGN(Datum id_batch, grouper_->Consume(key_batch)); + ASSERT_OK_AND_ASSIGN(Datum id_batch, grouper_->Consume(ExecSpan(key_batch))); ValidateConsume(key_batch, id_batch); @@ -536,11 +537,13 @@ TEST(Grouper, DictKey) { g.ExpectConsume({WithIndices(" [3, 1, null, 0, 2]")}, ArrayFromJSON(uint32(), "[3, 1, 4, 0, 2]")); - EXPECT_RAISES_WITH_MESSAGE_THAT( - NotImplemented, HasSubstr("Unifying differing dictionaries"), - 
g.grouper_->Consume(*ExecBatch::Make({*DictionaryArray::FromArrays( - ArrayFromJSON(int32(), "[0, 1]"), - ArrayFromJSON(utf8(), R"(["different", "dictionary"])"))}))); + auto dict_arr = *DictionaryArray::FromArrays( + ArrayFromJSON(int32(), "[0, 1]"), + ArrayFromJSON(utf8(), R"(["different", "dictionary"])")); + ExecSpan dict_span({*dict_arr->data()}, 2); + EXPECT_RAISES_WITH_MESSAGE_THAT(NotImplemented, + HasSubstr("Unifying differing dictionaries"), + g.grouper_->Consume(dict_span)); } TEST(Grouper, StringInt64Key) { diff --git a/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc b/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc index 61e8e90dddc..8d36cff6ae9 100644 --- a/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc +++ b/cpp/src/arrow/compute/kernels/scalar_cast_numeric.cc @@ -194,13 +194,7 @@ Status CheckIntegerFloatTruncateImpl(const ExecValue& input) { const int64_t limit = FloatingIntegerBound::value; InScalarType bound_lower(IsSigned ? -limit : 0); InScalarType bound_upper(limit); - - if (input.is_scalar()) { - ArraySpan span(*input.scalar); - return CheckIntegersInRange(span, bound_lower, bound_upper); - } else { - return CheckIntegersInRange(input.array, bound_lower, bound_upper); - } + return CheckIntegersInRange(input.array, bound_lower, bound_upper); } Status CheckForIntegerToFloatingTruncation(const ExecValue& input, Type::type out_type) { diff --git a/cpp/src/arrow/compute/kernels/scalar_nested.cc b/cpp/src/arrow/compute/kernels/scalar_nested.cc index d3e72bea34b..0b6118812a4 100644 --- a/cpp/src/arrow/compute/kernels/scalar_nested.cc +++ b/cpp/src/arrow/compute/kernels/scalar_nested.cc @@ -99,7 +99,7 @@ struct ListElement { const ArraySpan& list_values = list.child_data[0]; const offset_type* offsets = list.GetValues(1); - IndexValueType index; + IndexValueType index = 0; RETURN_NOT_OK(GetListElementIndex(batch[1], &index)); std::unique_ptr builder; @@ -138,7 +138,7 @@ struct FixedSizeListElement { const ArraySpan& list = batch[0].array; const ArraySpan& list_values = list.child_data[0]; - IndexValueType index; + IndexValueType index = 0; RETURN_NOT_OK(GetListElementIndex(batch[1], &index)); std::unique_ptr builder; @@ -446,8 +446,7 @@ struct MapLookupFunctor { const int32_t item_size = offsets[map_index + 1] - offsets[map_index]; // Adjust the keys view to just the map slot that we are about to search - map_keys.SetOffset(item_offset); - map_keys.length = item_size; + map_keys.SetSlice(item_offset, item_size); bool found_at_least_one_key = false; RETURN_NOT_OK(FindMatchingIndices(map_keys, query_key, [&](int64_t key_index) { @@ -477,8 +476,7 @@ struct MapLookupFunctor { const int32_t item_size = offsets[map_index + 1] - offsets[map_index]; // Adjust the keys view to just the map slot that we are about to search - map_keys.SetOffset(item_offset); - map_keys.length = item_size; + map_keys.SetSlice(item_offset, item_size); ARROW_ASSIGN_OR_RAISE( int64_t item_index, diff --git a/cpp/src/arrow/compute/row/grouper.cc b/cpp/src/arrow/compute/row/grouper.cc index 28ebc9f1967..d6d00c2cce6 100644 --- a/cpp/src/arrow/compute/row/grouper.cc +++ b/cpp/src/arrow/compute/row/grouper.cc @@ -99,16 +99,10 @@ struct GrouperImpl : Grouper { return std::move(impl); } - Result Consume(const ExecBatch& batch) override { + Result Consume(const ExecSpan& batch) override { std::vector offsets_batch(batch.length + 1); for (int i = 0; i < batch.num_values(); ++i) { - ExecValue value; - if (batch[i].is_array()) { - value.SetArray(*batch[i].array()); - } else { - 
value.SetScalar(batch[i].scalar().get()); - } - encoders_[i]->AddLength(value, batch.length, offsets_batch.data()); + encoders_[i]->AddLength(batch[i], batch.length, offsets_batch.data()); } int32_t total_length = 0; @@ -126,13 +120,7 @@ struct GrouperImpl : Grouper { } for (int i = 0; i < batch.num_values(); ++i) { - ExecValue value; - if (batch[i].is_array()) { - value.SetArray(*batch[i].array()); - } else { - value.SetScalar(batch[i].scalar().get()); - } - RETURN_NOT_OK(encoders_[i]->Encode(value, batch.length, key_buf_ptrs.data())); + RETURN_NOT_OK(encoders_[i]->Encode(batch[i], batch.length, key_buf_ptrs.data())); } TypedBufferBuilder group_ids_batch(ctx_->memory_pool()); @@ -281,11 +269,11 @@ struct GrouperFastImpl : Grouper { ~GrouperFastImpl() { map_.cleanup(); } - Result Consume(const ExecBatch& batch) override { + Result Consume(const ExecSpan& batch) override { // ARROW-14027: broadcast scalar arguments for now for (int i = 0; i < batch.num_values(); i++) { - if (batch.values[i].is_scalar()) { - ExecBatch expanded = batch; + if (batch[i].is_scalar()) { + ExecBatch expanded = batch.ToExecBatch(); for (int j = i; j < expanded.num_values(); j++) { if (expanded.values[j].is_scalar()) { ARROW_ASSIGN_OR_RAISE( @@ -294,20 +282,20 @@ struct GrouperFastImpl : Grouper { ctx_->memory_pool())); } } - return ConsumeImpl(expanded); + return ConsumeImpl(ExecSpan(expanded)); } } return ConsumeImpl(batch); } - Result ConsumeImpl(const ExecBatch& batch) { + Result ConsumeImpl(const ExecSpan& batch) { int64_t num_rows = batch.length; int num_columns = batch.num_values(); // Process dictionaries for (int icol = 0; icol < num_columns; ++icol) { if (key_types_[icol].id() == Type::DICTIONARY) { - auto data = batch[icol].array(); - auto dict = MakeArray(data->dictionary); + const ArraySpan& data = batch[icol].array; + auto dict = MakeArray(data.dictionary().ToArrayData()); if (dictionaries_[icol]) { if (!dictionaries_[icol]->Equals(dict)) { // TODO(bkietz) unify if necessary. For now, just error if any batch's @@ -331,16 +319,16 @@ struct GrouperFastImpl : Grouper { // Skip if the key's type is NULL if (key_types_[icol].id() != Type::NA) { - if (batch[icol].array()->buffers[0] != NULLPTR) { - non_nulls = batch[icol].array()->buffers[0]->data(); + if (batch[icol].array.buffers[0].data != NULLPTR) { + non_nulls = batch[icol].array.buffers[0].data; } - fixedlen = batch[icol].array()->buffers[1]->data(); + fixedlen = batch[icol].array.buffers[1].data; if (!col_metadata_[icol].is_fixed_length) { - varlen = batch[icol].array()->buffers[2]->data(); + varlen = batch[icol].array.buffers[2].data; } } - int64_t offset = batch[icol].array()->offset; + int64_t offset = batch[icol].array.offset; auto col_base = KeyColumnArray(col_metadata_[icol], offset + num_rows, non_nulls, fixedlen, varlen); diff --git a/cpp/src/arrow/compute/row/grouper.h b/cpp/src/arrow/compute/row/grouper.h index 4c106794573..ce09adf09b3 100644 --- a/cpp/src/arrow/compute/row/grouper.h +++ b/cpp/src/arrow/compute/row/grouper.h @@ -42,7 +42,7 @@ class ARROW_EXPORT Grouper { /// Consume a batch of keys, producing the corresponding group ids as an integer array. /// Currently only uint32 indices will be produced, eventually the bit width will only /// be as wide as necessary. - virtual Result Consume(const ExecBatch& batch) = 0; + virtual Result Consume(const ExecSpan& batch) = 0; /// Get current unique keys. May be called multiple times. 
   virtual Result GetUniques() = 0;
diff --git a/cpp/src/arrow/dataset/partition.cc b/cpp/src/arrow/dataset/partition.cc
index a210c947a3c..26abc10e6b8 100644
--- a/cpp/src/arrow/dataset/partition.cc
+++ b/cpp/src/arrow/dataset/partition.cc
@@ -161,10 +161,10 @@ Result KeyValuePartitioning::Partition(
     return PartitionedBatches{{batch}, {compute::literal(true)}};
   }
 
-  // assemble an ExecBatch of the key columns
-  compute::ExecBatch key_batch({}, batch->num_rows());
+  // assemble an ExecSpan of the key columns
+  compute::ExecSpan key_batch({}, batch->num_rows());
   for (int i : key_indices) {
-    key_batch.values.emplace_back(batch->column_data(i));
+    key_batch.values.emplace_back(ArraySpan(*batch->column_data(i)));
   }
 
   ARROW_ASSIGN_OR_RAISE(auto grouper, compute::Grouper::Make(key_batch.GetTypes()));
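
[Editor's note] For readers following the Grouper signature change above: after this patch the key columns are handed to the grouper as a non-owning ExecSpan, exactly as the updated hash_aggregate_test.cc and partition.cc do. A minimal sketch under that assumption; GroupIdsFor is an illustrative helper, not part of the patch, and the include paths are a best guess:

#include "arrow/compute/exec.h"         // ExecBatch, ExecSpan
#include "arrow/compute/row/grouper.h"  // arrow::compute::Grouper
#include "arrow/datum.h"
#include "arrow/result.h"

// Map each input row to a group id via the new Grouper::Consume(const ExecSpan&).
arrow::Result<arrow::Datum> GroupIdsFor(const arrow::compute::ExecBatch& key_batch) {
  using arrow::compute::ExecSpan;
  using arrow::compute::Grouper;
  ARROW_ASSIGN_OR_RAISE(auto grouper, Grouper::Make(key_batch.GetTypes()));
  // ExecSpan(key_batch) is only a view; key_batch must outlive the call.
  return grouper->Consume(ExecSpan(key_batch));
}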
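
[Editor's note] The chunked integer variance loop in aggregate_var_std.cc and the map-lookup kernel in scalar_nested.cc both replace SetOffset plus a manual length assignment with a single ArraySpan::SetSlice call, then re-read the null count per slice. A minimal sketch of that pattern, assuming only the public ArraySpan API; VisitInChunks, max_chunk_length and the visitor are illustrative names, not part of the patch:

#include <algorithm>
#include <cstdint>

#include "arrow/array/data.h"  // arrow::ArraySpan

// Walk an ArraySpan in bounded chunks, the way the integer var/std kernel does.
template <typename Visit>
void VisitInChunks(const arrow::ArraySpan& array, int64_t max_chunk_length,
                   Visit&& visit) {
  arrow::ArraySpan slice = array;  // shallow, non-owning copy; re-slicing is cheap
  int64_t start = 0;
  while (start < array.length) {
    // SetSlice adjusts offset and length together, so GetNullCount() inside the
    // visitor is computed for the current chunk only.
    slice.SetSlice(array.offset + start,
                   std::min(max_chunk_length, array.length - start));
    visit(slice);  // e.g. read slice.GetValues<CType>(1) and slice.GetNullCount()
    start += slice.length;
  }
}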
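
[Editor's note] The test helper GroupByUsingExecPlan above shows the replacement for the deleted ExecBatchIterator: wrap the inputs in an ExecBatch, drive detail::ExecSpanIterator over it, and call ToExecBatch() when an owning batch is needed. A sketch of that loop; CollectChunks is an illustrative name and the internal header path is an assumption:

#include <cstdint>
#include <vector>

#include "arrow/compute/exec.h"           // ExecBatch, ExecSpan
#include "arrow/compute/exec_internal.h"  // detail::ExecSpanIterator (assumed path)
#include "arrow/datum.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status CollectChunks(const std::vector<arrow::Datum>& inputs,
                            int64_t max_chunksize,
                            std::vector<arrow::compute::ExecBatch>* out) {
  using arrow::compute::ExecBatch;
  using arrow::compute::ExecSpan;
  ARROW_ASSIGN_OR_RAISE(ExecBatch batch, ExecBatch::Make(inputs));
  arrow::compute::detail::ExecSpanIterator it;
  ARROW_RETURN_NOT_OK(it.Init(batch, max_chunksize));
  ExecSpan span;
  while (it.Next(&span)) {
    if (span.length == 0) continue;       // skip empty slices, as the test helper does
    out->push_back(span.ToExecBatch());   // copy out of the non-owning view
  }
  return arrow::Status::OK();
}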