diff --git a/cpp/src/arrow/array/array_primitive.cc b/cpp/src/arrow/array/array_primitive.cc index 5312c3ece14..7c4a14d9340 100644 --- a/cpp/src/arrow/array/array_primitive.cc +++ b/cpp/src/arrow/array/array_primitive.cc @@ -58,18 +58,9 @@ int64_t BooleanArray::false_count() const { int64_t BooleanArray::true_count() const { if (data_->null_count.load() != 0) { DCHECK(data_->buffers[0]); - internal::BinaryBitBlockCounter bit_counter(data_->buffers[0]->data(), data_->offset, - data_->buffers[1]->data(), data_->offset, - data_->length); - int64_t count = 0; - while (true) { - internal::BitBlockCount block = bit_counter.NextAndWord(); - if (block.length == 0) { - break; - } - count += block.popcount; - } - return count; + return internal::CountAndSetBits(data_->buffers[0]->data(), data_->offset, + data_->buffers[1]->data(), data_->offset, + data_->length); } else { return internal::CountSetBits(data_->buffers[1]->data(), data_->offset, data_->length); diff --git a/cpp/src/arrow/array/data.cc b/cpp/src/arrow/array/data.cc index 653d206f015..37db8ccb775 100644 --- a/cpp/src/arrow/array/data.cc +++ b/cpp/src/arrow/array/data.cc @@ -151,6 +151,13 @@ void ArraySpan::SetMembers(const ArrayData& data) { } } + Type::type type_id = this->type->id(); + if (data.buffers[0] == nullptr && type_id != Type::NA && + type_id != Type::SPARSE_UNION && type_id != Type::DENSE_UNION) { + // This should already be zero but we make for sure + this->null_count = 0; + } + // Makes sure any other buffers are seen as null / non-existent for (int i = static_cast(data.buffers.size()); i < 3; ++i) { ClearBuffer(i); @@ -208,7 +215,6 @@ int64_t ArraySpan::GetNullCount() const { int GetNumBuffers(const DataType& type) { switch (type.id()) { case Type::NA: - return 0; case Type::STRUCT: case Type::FIXED_SIZE_LIST: return 1; @@ -232,7 +238,7 @@ int ArraySpan::num_buffers() const { return GetNumBuffers(*this->type); } std::shared_ptr ArraySpan::ToArrayData() const { auto result = std::make_shared(this->type->Copy(), this->length, - kUnknownNullCount, this->offset); + this->null_count, this->offset); for (int i = 0; i < this->num_buffers(); ++i) { if (this->buffers[i].owner) { diff --git a/cpp/src/arrow/builder.cc b/cpp/src/arrow/builder.cc index 7c027c9b1e8..779722e0d1c 100644 --- a/cpp/src/arrow/builder.cc +++ b/cpp/src/arrow/builder.cc @@ -170,7 +170,7 @@ struct DictionaryBuilderCase { out->reset(new internal::DictionaryBuilderBase( index_type, value_type, pool)); } else { - auto start_int_size = internal::GetByteWidth(*index_type); + auto start_int_size = index_type->byte_width(); out->reset(new AdaptiveBuilderType(start_int_size, value_type, pool)); } return Status::OK(); diff --git a/cpp/src/arrow/compare.cc b/cpp/src/arrow/compare.cc index ae1783a515e..8af319ed9ea 100644 --- a/cpp/src/arrow/compare.cc +++ b/cpp/src/arrow/compare.cc @@ -1022,10 +1022,10 @@ bool IntegerTensorEquals(const Tensor& left, const Tensor& right) { if (!(left_row_major_p && right_row_major_p) && !(left_column_major_p && right_column_major_p)) { const auto& type = checked_cast(*left.type()); - are_equal = StridedIntegerTensorContentEquals(0, 0, 0, internal::GetByteWidth(type), - left, right); + are_equal = + StridedIntegerTensorContentEquals(0, 0, 0, type.byte_width(), left, right); } else { - const int byte_width = internal::GetByteWidth(*left.type()); + const int byte_width = left.type()->byte_width(); DCHECK_GT(byte_width, 0); const uint8_t* left_data = left.data()->data(); @@ -1195,7 +1195,7 @@ struct SparseTensorEqualsImpl { return false; } - 
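A recurring substitution in this patch, visible in the compare.cc hunks here and again in tpch_node.cc below, is replacing the internal helper arrow::internal::GetByteWidth(*type) with the public DataType::byte_width() accessor. A minimal sketch of the two spellings, assuming a fixed-width type such as int32 (the function below is purely illustrative and not part of the patch):

#include <memory>
#include "arrow/api.h"

// Old helper vs. the accessor this patch switches to. Callers that require a
// positive width keep their DCHECK_GT(byte_width, 0), as in compare.cc.
void ByteWidthSketch() {
  std::shared_ptr<arrow::DataType> type = arrow::int32();
  // Before: int32_t byte_width = arrow::internal::GetByteWidth(*type);
  int32_t byte_width = type->byte_width();  // 4 for int32
  (void)byte_width;
}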
const int byte_width = internal::GetByteWidth(*left.type()); + const int byte_width = left.type()->byte_width(); DCHECK_GT(byte_width, 0); const uint8_t* left_data = left.data()->data(); diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index ad4248fc6c1..4ebdecf5e78 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -17,6 +17,7 @@ #include "arrow/compute/api_vector.h" +#include #include #include #include @@ -26,6 +27,7 @@ #include "arrow/array/builder_primitive.h" #include "arrow/compute/exec.h" #include "arrow/compute/function_internal.h" +#include "arrow/compute/kernels/vector_sort_internal.h" #include "arrow/compute/registry.h" #include "arrow/datum.h" #include "arrow/record_batch.h" @@ -305,10 +307,7 @@ Result> SortIndices(const ChunkedArray& chunked_array, Result> SortIndices(const ChunkedArray& chunked_array, SortOrder order, ExecContext* ctx) { - SortOptions options({SortKey("not-used", order)}); - ARROW_ASSIGN_OR_RAISE( - Datum result, CallFunction("sort_indices", {Datum(chunked_array)}, &options, ctx)); - return result.make_array(); + return SortIndices(chunked_array, ArraySortOptions(order), ctx); } Result> SortIndices(const Datum& datum, const SortOptions& options, diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 1f4581566fb..88331b6e592 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -275,14 +275,14 @@ namespace internal { /// \brief Return the number of selected indices in the boolean filter ARROW_EXPORT -int64_t GetFilterOutputSize(const ArrayData& filter, +int64_t GetFilterOutputSize(const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection); /// \brief Compute uint64 selection indices for use with Take given a boolean /// filter ARROW_EXPORT Result> GetTakeIndices( - const ArrayData& filter, FilterOptions::NullSelectionBehavior null_selection, + const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection, MemoryPool* memory_pool = default_memory_pool()); } // namespace internal diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc index da226a062d0..a612a83e7a8 100644 --- a/cpp/src/arrow/compute/exec.cc +++ b/cpp/src/arrow/compute/exec.cc @@ -332,10 +332,14 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, ValueDescr::Shape output_s int64_t max_chunksize) { if (batch.num_values() > 0) { // Validate arguments - ARROW_ASSIGN_OR_RAISE(int64_t inferred_length, InferBatchLength(batch.values)); + bool all_args_same_length = false; + int64_t inferred_length = InferBatchLength(batch.values, &all_args_same_length); if (inferred_length != batch.length) { return Status::Invalid("Value lengths differed from ExecBatch length"); } + if (!all_args_same_length) { + return Status::Invalid("Array arguments must all be the same length"); + } } args_ = &batch.values; initialized_ = have_chunked_arrays_ = false; @@ -991,43 +995,62 @@ class ScalarExecutor : public KernelExecutorImpl { ExecSpanIterator span_iterator_; }; -Status PackBatchNoChunks(const std::vector& args, ExecBatch* out) { - int64_t length = 0; - for (const auto& arg : args) { - switch (arg.kind()) { - case Datum::SCALAR: - case Datum::ARRAY: - case Datum::CHUNKED_ARRAY: - length = std::max(arg.length(), length); - break; - default: - DCHECK(false); - break; - } - } - out->length = length; - out->values = args; - return Status::OK(); -} - class VectorExecutor : public KernelExecutorImpl { public: - 
Status ExecuteImpl(const std::vector& args, ExecListener* listener) { - RETURN_NOT_OK(PrepareExecute(args)); - ExecBatch batch; + Status Execute(const ExecBatch& batch, ExecListener* listener) override { + // TODO(wesm): remove in ARROW-16577 + if (output_descr_.shape == ValueDescr::SCALAR) { + return Status::Invalid("VectorExecutor only supports array output types"); + } + + // Some vector kernels have a separate code path for handling + // chunked arrays (VectorKernel::exec_chunked) so we check if we + // have any chunked arrays. If we do and an exec_chunked function + // is defined then we call that. + bool have_chunked_arrays = false; + for (const Datum& arg : batch.values) { + if (arg.is_chunked_array()) have_chunked_arrays = true; + } + + output_num_buffers_ = static_cast(output_descr_.type->layout().buffers.size()); + + // Decide if we need to preallocate memory for this kernel + validity_preallocated_ = + (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE && + kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL); + if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { + ComputeDataPreallocate(*output_descr_.type, &data_preallocated_); + } + if (kernel_->can_execute_chunkwise) { - while (batch_iterator_->Next(&batch)) { - RETURN_NOT_OK(ExecuteBatch(batch, listener)); + RETURN_NOT_OK(span_iterator_.Init(batch, output_descr_.shape, + exec_context()->exec_chunksize())); + ExecSpan span; + while (span_iterator_.Next(&span)) { + RETURN_NOT_OK(Exec(span, listener)); } } else { - RETURN_NOT_OK(PackBatchNoChunks(args, &batch)); - RETURN_NOT_OK(ExecuteBatch(batch, listener)); + // Kernel cannot execute chunkwise. If we have any chunked + // arrays, then VectorKernel::exec_chunked must be defined + // otherwise we raise an error + if (have_chunked_arrays) { + RETURN_NOT_OK(ExecChunked(batch, listener)); + } else { + // No chunked arrays. 
We pack the args into an ExecSpan and + // call the regular exec code path + RETURN_NOT_OK(Exec(ExecSpan(batch), listener)); + } } - return Finalize(listener); - } - Status Execute(const ExecBatch& batch, ExecListener* listener) override { - return ExecuteImpl(batch.values, listener); + if (kernel_->finalize) { + // Intermediate results require post-processing after the execution is + // completed (possibly involving some accumulated state) + RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &results_)); + for (const auto& result : results_) { + RETURN_NOT_OK(listener->OnResult(result)); + } + } + return Status::OK(); } Datum WrapResults(const std::vector& inputs, @@ -1047,59 +1070,54 @@ class VectorExecutor : public KernelExecutorImpl { } protected: - Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) { - Datum out; - if (output_descr_.shape == ValueDescr::ARRAY) { - // We preallocate (maybe) only for the output of processing the current - // batch - ARROW_ASSIGN_OR_RAISE(out.value, PrepareOutput(batch.length)); - } + Status Exec(const ExecSpan& span, ExecListener* listener) { + ExecResult out; - if (kernel_->null_handling == NullHandling::INTERSECTION && - output_descr_.shape == ValueDescr::ARRAY) { - RETURN_NOT_OK(PropagateNulls(kernel_ctx_, ExecSpan(batch), out.mutable_array())); + // We preallocate (maybe) only for the output of processing the current + // batch, but create an output ArrayData instance regardless + ARROW_ASSIGN_OR_RAISE(out.value, PrepareOutput(span.length)); + + if (kernel_->null_handling == NullHandling::INTERSECTION) { + RETURN_NOT_OK(PropagateNulls(kernel_ctx_, span, out.array_data().get())); } - RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out)); + RETURN_NOT_OK(kernel_->exec(kernel_ctx_, span, &out)); if (!kernel_->finalize) { // If there is no result finalizer (e.g. 
for hash-based functions, we can // emit the processed batch right away rather than waiting - RETURN_NOT_OK(listener->OnResult(std::move(out))); + RETURN_NOT_OK(listener->OnResult(out.array_data())); } else { - results_.emplace_back(std::move(out)); + results_.emplace_back(out.array_data()); } return Status::OK(); } - Status Finalize(ExecListener* listener) { - if (kernel_->finalize) { - // Intermediate results require post-processing after the execution is - // completed (possibly involving some accumulated state) - RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &results_)); - for (const auto& result : results_) { - RETURN_NOT_OK(listener->OnResult(result)); - } + Status ExecChunked(const ExecBatch& batch, ExecListener* listener) { + if (kernel_->exec_chunked == nullptr) { + return Status::Invalid( + "Vector kernel cannot execute chunkwise and no " + "chunked exec function was defined"); } - return Status::OK(); - } - Status PrepareExecute(const std::vector& args) { - if (kernel_->can_execute_chunkwise) { - ARROW_ASSIGN_OR_RAISE(batch_iterator_, ExecBatchIterator::Make( - args, exec_context()->exec_chunksize())); + if (kernel_->null_handling == NullHandling::INTERSECTION) { + return Status::Invalid( + "Null pre-propagation is unsupported for ChunkedArray " + "execution in vector kernels"); } - output_num_buffers_ = static_cast(output_descr_.type->layout().buffers.size()); - // Decide if we need to preallocate memory for this kernel - validity_preallocated_ = - (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE && - kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL); - if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) { - ComputeDataPreallocate(*output_descr_.type, &data_preallocated_); + Datum out; + ARROW_ASSIGN_OR_RAISE(out.value, PrepareOutput(batch.length)); + RETURN_NOT_OK(kernel_->exec_chunked(kernel_ctx_, batch, &out)); + if (!kernel_->finalize) { + // If there is no result finalizer (e.g. 
for hash-based functions, we can + // emit the processed batch right away rather than waiting + RETURN_NOT_OK(listener->OnResult(std::move(out))); + } else { + results_.emplace_back(std::move(out)); } return Status::OK(); } - std::unique_ptr batch_iterator_; + ExecSpanIterator span_iterator_; std::vector results_; }; @@ -1270,7 +1288,7 @@ std::unique_ptr KernelExecutor::MakeScalarAggregate() { return ::arrow::internal::make_unique(); } -Result InferBatchLength(const std::vector& values) { +int64_t InferBatchLength(const std::vector& values, bool* all_same) { int64_t length = -1; bool are_all_scalar = true; for (const Datum& arg : values) { @@ -1280,7 +1298,8 @@ Result InferBatchLength(const std::vector& values) { length = arg_length; } else { if (length != arg_length) { - return Status::Invalid("Array arguments must all be the same length"); + *all_same = false; + return length; } } are_all_scalar = false; @@ -1290,7 +1309,8 @@ Result InferBatchLength(const std::vector& values) { length = arg_length; } else { if (length != arg_length) { - return Status::Invalid("Array arguments must all be the same length"); + *all_same = false; + return length; } } are_all_scalar = false; @@ -1302,6 +1322,7 @@ Result InferBatchLength(const std::vector& values) { } else if (length < 0) { length = 0; } + *all_same = true; return length; } diff --git a/cpp/src/arrow/compute/exec.h b/cpp/src/arrow/compute/exec.h index ba41bfb5b6e..8fd938ce299 100644 --- a/cpp/src/arrow/compute/exec.h +++ b/cpp/src/arrow/compute/exec.h @@ -20,7 +20,6 @@ #pragma once -#include #include #include #include @@ -397,8 +396,12 @@ struct ARROW_EXPORT ExecSpan { } bool is_all_scalar() const { - return std::all_of(this->values.begin(), this->values.end(), - [](const ExecValue& v) { return v.is_scalar(); }); + for (const ExecValue& value : this->values) { + if (value.is_array()) { + return false; + } + } + return true; } /// \brief Return the value at the i-th index diff --git a/cpp/src/arrow/compute/exec/expression.cc b/cpp/src/arrow/compute/exec/expression.cc index 2e0fe6ff34b..b796f5cda3b 100644 --- a/cpp/src/arrow/compute/exec/expression.cc +++ b/cpp/src/arrow/compute/exec/expression.cc @@ -419,7 +419,7 @@ Result BindNonRecursive(Expression::Call call, bool insert_implicit_ } } - compute::KernelContext kernel_context(exec_context); + compute::KernelContext kernel_context(exec_context, call.kernel); if (call.kernel->init) { const FunctionOptions* options = call.options ? 
call.options.get() : call.function->default_options(); @@ -593,7 +593,7 @@ Result ExecuteScalarExpression(const Expression& expr, const ExecBatch& i auto executor = compute::detail::KernelExecutor::MakeScalar(); - compute::KernelContext kernel_context(exec_context); + compute::KernelContext kernel_context(exec_context, call->kernel); kernel_context.SetState(call->kernel_state.get()); auto kernel = call->kernel; diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 1ebe11e7046..a145863e597 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -127,7 +127,7 @@ class HashJoinBasicImpl : public HashJoinImpl { *opt_projected_batch = projected; } - return encoder->EncodeAndAppend(projected); + return encoder->EncodeAndAppend(ExecSpan(projected)); } void ProbeBatch_Lookup(ThreadLocalState* local_state, const RowEncoder& exec_batch_keys, diff --git a/cpp/src/arrow/compute/exec/hash_join_dict.cc b/cpp/src/arrow/compute/exec/hash_join_dict.cc index 63d7d1143c9..731a5662d7d 100644 --- a/cpp/src/arrow/compute/exec/hash_join_dict.cc +++ b/cpp/src/arrow/compute/exec/hash_join_dict.cc @@ -234,8 +234,7 @@ Status HashJoinDictBuild::Init(ExecContext* ctx, std::shared_ptr dictiona return Status::Invalid( "Dictionary length in hash join must fit into signed 32-bit integer."); } - ExecBatch batch({dictionary->data()}, length); - RETURN_NOT_OK(encoder.EncodeAndAppend(batch)); + RETURN_NOT_OK(encoder.EncodeAndAppend(ExecSpan({*dictionary->data()}, length))); std::vector entries_to_take; @@ -296,7 +295,7 @@ Result> HashJoinDictBuild::RemapInputValues( bool is_scalar = values.is_scalar(); int64_t encoded_length = is_scalar ? 1 : batch_length; ExecBatch batch({values}, encoded_length); - RETURN_NOT_OK(encoder.EncodeAndAppend(batch)); + RETURN_NOT_OK(encoder.EncodeAndAppend(ExecSpan(batch))); // Allocate output buffers // @@ -426,8 +425,8 @@ Result> HashJoinDictProbe::RemapInput( std::vector encoder_types; encoder_types.emplace_back(dict_type.value_type(), ValueDescr::ARRAY); encoder_.Init(encoder_types, ctx); - ExecBatch batch({dict->data()}, dict->length()); - RETURN_NOT_OK(encoder_.EncodeAndAppend(batch)); + RETURN_NOT_OK( + encoder_.EncodeAndAppend(ExecSpan({*dict->data()}, dict->length()))); } } @@ -547,7 +546,7 @@ Status HashJoinDictBuildMulti::EncodeBatch( proj_map.data_type(HashJoinProjection::KEY, icol))); } } - return encoder->EncodeAndAppend(projected); + return encoder->EncodeAndAppend(ExecSpan(projected)); } Status HashJoinDictBuildMulti::PostDecode( @@ -656,7 +655,7 @@ Status HashJoinDictProbeMulti::EncodeBatch( } local_state.post_remap_encoder.Clear(); - RETURN_NOT_OK(local_state.post_remap_encoder.EncodeAndAppend(projected)); + RETURN_NOT_OK(local_state.post_remap_encoder.EncodeAndAppend(ExecSpan(projected))); *out_encoder = &local_state.post_remap_encoder; return Status::OK(); diff --git a/cpp/src/arrow/compute/exec/hash_join_node_test.cc b/cpp/src/arrow/compute/exec/hash_join_node_test.cc index e752870486a..46600a96da3 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node_test.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node_test.cc @@ -496,7 +496,7 @@ std::vector> GenRandomUniqueRecords( for (size_t i = 0; i < result.size(); ++i) { batch.values[i] = result[i]; } - Status status = encoder.EncodeAndAppend(batch); + Status status = encoder.EncodeAndAppend(ExecSpan(batch)); ARROW_DCHECK(status.ok()); std::unordered_map uniques; diff --git a/cpp/src/arrow/compute/exec/tpch_node.cc 
b/cpp/src/arrow/compute/exec/tpch_node.cc index c447de6cff7..d8f2c60312d 100644 --- a/cpp/src/arrow/compute/exec/tpch_node.cc +++ b/cpp/src/arrow/compute/exec/tpch_node.cc @@ -844,7 +844,7 @@ class PartAndPartSupplierGenerator { Status AllocatePartBatch(size_t thread_index, int column) { ThreadLocalData& tld = thread_local_data_[thread_index]; ARROW_DCHECK(tld.part[column].kind() == Datum::NONE); - int32_t byte_width = arrow::internal::GetByteWidth(*kPartTypes[column]); + int32_t byte_width = kPartTypes[column]->byte_width(); ARROW_ASSIGN_OR_RAISE(std::unique_ptr buff, AllocateBuffer(tld.part_to_generate * byte_width)); ArrayData ad(kPartTypes[column], tld.part_to_generate, {nullptr, std::move(buff)}); @@ -917,7 +917,7 @@ class PartAndPartSupplierGenerator { RETURN_NOT_OK(AllocatePartBatch(thread_index, PART::P_MFGR)); char* p_mfgr = reinterpret_cast( tld.part[PART::P_MFGR].array()->buffers[1]->mutable_data()); - int32_t byte_width = arrow::internal::GetByteWidth(*kPartTypes[PART::P_MFGR]); + int32_t byte_width = kPartTypes[PART::P_MFGR]->byte_width(); for (int64_t irow = 0; irow < tld.part_to_generate; irow++) { std::strncpy(p_mfgr + byte_width * irow, manufacturer, byte_width); char mfgr_id = '0' + dist(tld.rng); @@ -939,8 +939,8 @@ class PartAndPartSupplierGenerator { tld.part[PART::P_MFGR].array()->buffers[1]->data()); char* p_brand = reinterpret_cast( tld.part[PART::P_BRAND].array()->buffers[1]->mutable_data()); - int32_t byte_width = arrow::internal::GetByteWidth(*kPartTypes[PART::P_BRAND]); - int32_t mfgr_byte_width = arrow::internal::GetByteWidth(*kPartTypes[PART::P_MFGR]); + int32_t byte_width = kPartTypes[PART::P_BRAND]->byte_width(); + int32_t mfgr_byte_width = kPartTypes[PART::P_MFGR]->byte_width(); const size_t mfgr_id_offset = std::strlen("Manufacturer#"); for (int64_t irow = 0; irow < tld.part_to_generate; irow++) { char* row = p_brand + byte_width * irow; @@ -1023,7 +1023,7 @@ class PartAndPartSupplierGenerator { RETURN_NOT_OK(AllocatePartBatch(thread_index, PART::P_CONTAINER)); char* p_container = reinterpret_cast( tld.part[PART::P_CONTAINER].array()->buffers[1]->mutable_data()); - int32_t byte_width = arrow::internal::GetByteWidth(*kPartTypes[PART::P_CONTAINER]); + int32_t byte_width = kPartTypes[PART::P_CONTAINER]->byte_width(); for (int64_t irow = 0; irow < tld.part_to_generate; irow++) { int container1_idx = dist1(tld.rng); int container2_idx = dist2(tld.rng); @@ -1090,7 +1090,7 @@ class PartAndPartSupplierGenerator { Status AllocatePartSuppBatch(size_t thread_index, size_t ibatch, int column) { ThreadLocalData& tld = thread_local_data_[thread_index]; - int32_t byte_width = arrow::internal::GetByteWidth(*kPartsuppTypes[column]); + int32_t byte_width = kPartsuppTypes[column]->byte_width(); ARROW_ASSIGN_OR_RAISE(std::unique_ptr buff, AllocateResizableBuffer(batch_size_ * byte_width)); ArrayData ad(kPartsuppTypes[column], batch_size_, {nullptr, std::move(buff)}); @@ -1101,7 +1101,7 @@ class PartAndPartSupplierGenerator { Status SetPartSuppColumnSize(size_t thread_index, size_t ibatch, int column, size_t new_size) { ThreadLocalData& tld = thread_local_data_[thread_index]; - int32_t byte_width = arrow::internal::GetByteWidth(*kPartsuppTypes[column]); + int32_t byte_width = kPartsuppTypes[column]->byte_width(); tld.partsupp[ibatch][column].array()->length = static_cast(new_size); ResizableBuffer* buff = checked_cast( tld.partsupp[ibatch][column].array()->buffers[1].get()); @@ -1554,7 +1554,7 @@ class OrdersAndLineItemGenerator { Status AllocateOrdersBatch(size_t 
thread_index, int column) { ThreadLocalData& tld = thread_local_data_[thread_index]; ARROW_DCHECK(tld.orders[column].kind() == Datum::NONE); - int32_t byte_width = arrow::internal::GetByteWidth(*kOrdersTypes[column]); + int32_t byte_width = kOrdersTypes[column]->byte_width(); ARROW_ASSIGN_OR_RAISE(std::unique_ptr buff, AllocateBuffer(tld.orders_to_generate * byte_width)); ArrayData ad(kOrdersTypes[column], tld.orders_to_generate, @@ -1711,8 +1711,7 @@ class OrdersAndLineItemGenerator { ThreadLocalData& tld = thread_local_data_[thread_index]; if (tld.orders[ORDERS::O_ORDERPRIORITY].kind() == Datum::NONE) { RETURN_NOT_OK(AllocateOrdersBatch(thread_index, ORDERS::O_ORDERPRIORITY)); - int32_t byte_width = - arrow::internal::GetByteWidth(*kOrdersTypes[ORDERS::O_ORDERPRIORITY]); + int32_t byte_width = kOrdersTypes[ORDERS::O_ORDERPRIORITY]->byte_width(); std::uniform_int_distribution dist(0, kNumPriorities - 1); char* o_orderpriority = reinterpret_cast( tld.orders[ORDERS::O_ORDERPRIORITY].array()->buffers[1]->mutable_data()); @@ -1728,7 +1727,7 @@ class OrdersAndLineItemGenerator { ThreadLocalData& tld = thread_local_data_[thread_index]; if (tld.orders[ORDERS::O_CLERK].kind() == Datum::NONE) { RETURN_NOT_OK(AllocateOrdersBatch(thread_index, ORDERS::O_CLERK)); - int32_t byte_width = arrow::internal::GetByteWidth(*kOrdersTypes[ORDERS::O_CLERK]); + int32_t byte_width = kOrdersTypes[ORDERS::O_CLERK]->byte_width(); int64_t max_clerk_id = static_cast(scale_factor_ * 1000); std::uniform_int_distribution dist(1, max_clerk_id); char* o_clerk = reinterpret_cast( @@ -1792,7 +1791,7 @@ class OrdersAndLineItemGenerator { ThreadLocalData& tld = thread_local_data_[thread_index]; if (tld.lineitem[ibatch][column].kind() == Datum::NONE) { ARROW_DCHECK(ibatch != 0 || tld.first_batch_offset == 0); - int32_t byte_width = arrow::internal::GetByteWidth(*kLineitemTypes[column]); + int32_t byte_width = kLineitemTypes[column]->byte_width(); ARROW_ASSIGN_OR_RAISE(std::unique_ptr buff, AllocateResizableBuffer(batch_size_ * byte_width)); ArrayData ad(kLineitemTypes[column], batch_size_, {nullptr, std::move(buff)}); @@ -1807,7 +1806,7 @@ class OrdersAndLineItemGenerator { Status SetLineItemColumnSize(size_t thread_index, size_t ibatch, int column, size_t new_size) { ThreadLocalData& tld = thread_local_data_[thread_index]; - int32_t byte_width = arrow::internal::GetByteWidth(*kLineitemTypes[column]); + int32_t byte_width = kLineitemTypes[column]->byte_width(); tld.lineitem[ibatch][column].array()->length = static_cast(new_size); ResizableBuffer* buff = checked_cast( tld.lineitem[ibatch][column].array()->buffers[1].get()); @@ -2283,8 +2282,7 @@ class OrdersAndLineItemGenerator { ThreadLocalData& tld = thread_local_data_[thread_index]; if (!tld.generated_lineitem[LINEITEM::L_SHIPINSTRUCT]) { tld.generated_lineitem[LINEITEM::L_SHIPINSTRUCT] = true; - int32_t byte_width = - arrow::internal::GetByteWidth(*kLineitemTypes[LINEITEM::L_SHIPINSTRUCT]); + int32_t byte_width = kLineitemTypes[LINEITEM::L_SHIPINSTRUCT]->byte_width(); size_t ibatch = 0; std::uniform_int_distribution dist(0, kNumInstructions - 1); for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) { @@ -2318,8 +2316,7 @@ class OrdersAndLineItemGenerator { ThreadLocalData& tld = thread_local_data_[thread_index]; if (!tld.generated_lineitem[LINEITEM::L_SHIPMODE]) { tld.generated_lineitem[LINEITEM::L_SHIPMODE] = true; - int32_t byte_width = - arrow::internal::GetByteWidth(*kLineitemTypes[LINEITEM::L_SHIPMODE]); + int32_t byte_width = 
kLineitemTypes[LINEITEM::L_SHIPMODE]->byte_width(); size_t ibatch = 0; std::uniform_int_distribution dist(0, kNumModes - 1); for (int64_t irow = 0; irow < tld.lineitem_to_generate; ibatch++) { @@ -2530,7 +2527,7 @@ class SupplierGenerator : public TpchTableGenerator { Status AllocateColumn(size_t thread_index, int column) { ThreadLocalData& tld = thread_local_data_[thread_index]; ARROW_DCHECK(tld.batch[column].kind() == Datum::NONE); - int32_t byte_width = arrow::internal::GetByteWidth(*kTypes[column]); + int32_t byte_width = kTypes[column]->byte_width(); ARROW_ASSIGN_OR_RAISE(std::unique_ptr buff, AllocateBuffer(tld.to_generate * byte_width)); ArrayData ad(kTypes[column], tld.to_generate, {nullptr, std::move(buff)}); @@ -2558,7 +2555,7 @@ class SupplierGenerator : public TpchTableGenerator { const int32_t* s_suppkey = reinterpret_cast( tld.batch[SUPPLIER::S_SUPPKEY].array()->buffers[1]->data()); RETURN_NOT_OK(AllocateColumn(thread_index, SUPPLIER::S_NAME)); - int32_t byte_width = arrow::internal::GetByteWidth(*kTypes[SUPPLIER::S_NAME]); + int32_t byte_width = kTypes[SUPPLIER::S_NAME]->byte_width(); char* s_name = reinterpret_cast( tld.batch[SUPPLIER::S_NAME].array()->buffers[1]->mutable_data()); // Look man, I'm just following the spec ok? Section 4.2.3 as of March 1 2022 @@ -2600,7 +2597,7 @@ class SupplierGenerator : public TpchTableGenerator { if (tld.batch[SUPPLIER::S_PHONE].kind() == Datum::NONE) { RETURN_NOT_OK(S_NATIONKEY(thread_index)); RETURN_NOT_OK(AllocateColumn(thread_index, SUPPLIER::S_PHONE)); - int32_t byte_width = arrow::internal::GetByteWidth(*kTypes[SUPPLIER::S_PHONE]); + int32_t byte_width = kTypes[SUPPLIER::S_PHONE]->byte_width(); const int32_t* s_nationkey = reinterpret_cast( tld.batch[SUPPLIER::S_NATIONKEY].array()->buffers[1]->data()); char* s_phone = reinterpret_cast( @@ -2913,7 +2910,7 @@ class CustomerGenerator : public TpchTableGenerator { Status AllocateColumn(size_t thread_index, int column) { ThreadLocalData& tld = thread_local_data_[thread_index]; ARROW_DCHECK(tld.batch[column].kind() == Datum::NONE); - int32_t byte_width = arrow::internal::GetByteWidth(*kTypes[column]); + int32_t byte_width = kTypes[column]->byte_width(); ARROW_ASSIGN_OR_RAISE(std::unique_ptr buff, AllocateBuffer(tld.to_generate * byte_width)); ArrayData ad(kTypes[column], tld.to_generate, {nullptr, std::move(buff)}); @@ -2994,7 +2991,7 @@ class CustomerGenerator : public TpchTableGenerator { if (tld.batch[CUSTOMER::C_PHONE].kind() == Datum::NONE) { RETURN_NOT_OK(C_NATIONKEY(thread_index)); RETURN_NOT_OK(AllocateColumn(thread_index, CUSTOMER::C_PHONE)); - int32_t byte_width = arrow::internal::GetByteWidth(*kTypes[CUSTOMER::C_PHONE]); + int32_t byte_width = kTypes[CUSTOMER::C_PHONE]->byte_width(); const int32_t* c_nationkey = reinterpret_cast( tld.batch[CUSTOMER::C_NATIONKEY].array()->buffers[1]->data()); char* c_phone = reinterpret_cast( @@ -3023,7 +3020,7 @@ class CustomerGenerator : public TpchTableGenerator { ThreadLocalData& tld = thread_local_data_[thread_index]; if (tld.batch[CUSTOMER::C_MKTSEGMENT].kind() == Datum::NONE) { RETURN_NOT_OK(AllocateColumn(thread_index, CUSTOMER::C_MKTSEGMENT)); - int32_t byte_width = arrow::internal::GetByteWidth(*kTypes[CUSTOMER::C_MKTSEGMENT]); + int32_t byte_width = kTypes[CUSTOMER::C_MKTSEGMENT]->byte_width(); char* c_mktsegment = reinterpret_cast( tld.batch[CUSTOMER::C_MKTSEGMENT].array()->buffers[1]->mutable_data()); std::uniform_int_distribution dist(0, kNumSegments - 1); diff --git a/cpp/src/arrow/compute/exec_internal.h 
b/cpp/src/arrow/compute/exec_internal.h index fac78da6db1..c475a61c1ba 100644 --- a/cpp/src/arrow/compute/exec_internal.h +++ b/cpp/src/arrow/compute/exec_internal.h @@ -179,7 +179,7 @@ class ARROW_EXPORT KernelExecutor { static std::unique_ptr MakeScalarAggregate(); }; -Result InferBatchLength(const std::vector& values); +int64_t InferBatchLength(const std::vector& values, bool* all_same); /// \brief Populate validity bitmap with the intersection of the nullity of the /// arguments. If a preallocated bitmap is not provided, then one will be diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index c6b0992a458..b5ebc67d180 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -252,11 +252,11 @@ Result Function::ExecuteInternal(const std::vector& args, return Status::NotImplemented("Direct execution of HASH_AGGREGATE functions"); } - ARROW_ASSIGN_OR_RAISE(auto kernel, DispatchBest(&inputs)); + ARROW_ASSIGN_OR_RAISE(const Kernel* kernel, DispatchBest(&inputs)); ARROW_ASSIGN_OR_RAISE(std::vector args_with_casts, Cast(args, inputs, ctx)); std::unique_ptr state; - KernelContext kernel_ctx{ctx}; + KernelContext kernel_ctx{ctx, kernel}; if (kernel->init) { ARROW_ASSIGN_OR_RAISE(state, kernel->init(&kernel_ctx, {kernel, inputs, options})); kernel_ctx.SetState(state.get()); @@ -266,21 +266,23 @@ Result Function::ExecuteInternal(const std::vector& args, detail::DatumAccumulator listener; - // Set length to 0 unless it's a scalar function (vector functions don't use - // it). - ExecBatch input(std::move(args_with_casts), 0); - if (kind() == Function::SCALAR) { - ARROW_ASSIGN_OR_RAISE(int64_t inferred_length, - detail::InferBatchLength(input.values)); - if (passed_length == -1) { - input.length = inferred_length; - } else { - // ARROW-16819: will clean up more later - if (input.num_values() > 0 && passed_length != inferred_length) { - return Status::Invalid("Passed batch length did not equal actual array lengths"); - } + ExecBatch input(std::move(args_with_casts), /*length=*/0); + if (input.num_values() == 0) { + if (passed_length != -1) { input.length = passed_length; } + } else { + bool all_same_length = false; + int64_t inferred_length = detail::InferBatchLength(input.values, &all_same_length); + input.length = inferred_length; + if (kind() == Function::SCALAR) { + DCHECK(passed_length == -1 || passed_length == inferred_length); + } else if (kind() == Function::VECTOR) { + auto vkernel = static_cast(kernel); + if (!(all_same_length || !vkernel->can_execute_chunkwise)) { + return Status::Invalid("Vector kernel arguments must all be the same length"); + } + } } RETURN_NOT_OK(executor->Execute(input, &listener)); const auto out = executor->WrapResults(input.values, listener.values()); @@ -366,7 +368,7 @@ Status ScalarFunction::AddKernel(ScalarKernel kernel) { } Status VectorFunction::AddKernel(std::vector in_types, OutputType out_type, - ArrayKernelExecOld exec, KernelInit init) { + ArrayKernelExec exec, KernelInit init) { RETURN_NOT_OK(CheckArity(in_types)); if (arity_.is_varargs && in_types.size() != 1) { diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index 6e3c8374335..c32c8766a91 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -344,7 +344,7 @@ class ARROW_EXPORT VectorFunction : public detail::FunctionImpl { /// state initialization, no data preallocation, and no preallocation of the /// validity bitmap. 
Status AddKernel(std::vector in_types, OutputType out_type, - ArrayKernelExecOld exec, KernelInit init = NULLPTR); + ArrayKernelExec exec, KernelInit init = NULLPTR); /// \brief Add a kernel (function implementation). Returns error if the /// kernel's signature does not match the function's arity. diff --git a/cpp/src/arrow/compute/function_test.cc b/cpp/src/arrow/compute/function_test.cc index ec5f3bc170c..f06f225f5b9 100644 --- a/cpp/src/arrow/compute/function_test.cc +++ b/cpp/src/arrow/compute/function_test.cc @@ -214,10 +214,6 @@ auto ExecNYI = [](KernelContext* ctx, const ExecSpan& args, ExecResult* out) { return Status::NotImplemented("NYI"); }; -auto ExecNYIOld = [](KernelContext* ctx, const ExecBatch& args, Datum* out) { - return Status::NotImplemented("NYI"); -}; - template void CheckAddDispatch(FunctionType* func, ExecType exec) { using KernelType = typename FunctionType::KernelType; @@ -272,7 +268,7 @@ TEST(ScalarVectorFunction, DispatchExact) { CheckAddDispatch(&func1, ExecNYI); // ARROW-16576: will migrate later to new span-based kernel exec API - CheckAddDispatch(&func2, ExecNYIOld); + CheckAddDispatch(&func2, ExecNYI); } TEST(ArrayFunction, VarArgs) { diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index e115c5194bc..93a1c605a99 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -52,7 +52,10 @@ struct ARROW_EXPORT KernelState { /// \brief Context/state for the execution of a particular kernel. class ARROW_EXPORT KernelContext { public: - explicit KernelContext(ExecContext* exec_ctx) : exec_ctx_(exec_ctx) {} + // Can pass optional backreference; not used consistently for the + // moment but will be made so in the future + explicit KernelContext(ExecContext* exec_ctx, const Kernel* kernel = NULLPTR) + : exec_ctx_(exec_ctx), kernel_(kernel) {} /// \brief Allocate buffer from the context's memory pool. The contents are /// not initialized. @@ -68,6 +71,10 @@ class ARROW_EXPORT KernelContext { /// be minded separately. void SetState(KernelState* state) { state_ = state; } + // Set kernel that is being invoked since some kernel + // implementations will examine the kernel state. + void SetKernel(const Kernel* kernel) { kernel_ = kernel; } + KernelState* state() { return state_; } /// \brief Configuration related to function execution that is to be shared @@ -78,9 +85,12 @@ class ARROW_EXPORT KernelContext { /// MemoryPool contained in the ExecContext used to create the KernelContext. MemoryPool* memory_pool() { return exec_ctx_->memory_pool(); } + const Kernel* kernel() const { return kernel_; } + private: ExecContext* exec_ctx_; KernelState* state_ = NULLPTR; + const Kernel* kernel_ = NULLPTR; }; /// \brief An type-checking interface to permit customizable validation rules @@ -548,10 +558,6 @@ struct Kernel { using ArrayKernelExec = std::function; -/// \brief Kernel execution API being phased out per ARROW-16756 -using ArrayKernelExecOld = - std::function; - /// \brief Kernel data structure for implementations of ScalarFunction. In /// addition to the members found in Kernel, contains the null handling /// and memory pre-allocation preferences. 
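The kernel.h changes above give KernelContext an optional backreference to the Kernel being executed, and expression.cc/function.cc now pass it at construction time. A hedged sketch of how a kernel implementation might consult that backreference (MyKernelExec is an illustrative name, not code from this patch):

#include "arrow/compute/exec.h"
#include "arrow/compute/kernel.h"

namespace cp = arrow::compute;

// Sketch: a span-based exec function that inspects the kernel it was
// dispatched from. The backreference is optional, so guard against NULLPTR.
arrow::Status MyKernelExec(cp::KernelContext* ctx, const cp::ExecSpan& batch,
                           cp::ExecResult* out) {
  if (const cp::Kernel* kernel = ctx->kernel()) {
    // Kernel-specific state (e.g. the ScalarKernel::data member added below)
    // could be consulted here via a checked cast; omitted in this sketch.
    (void)kernel;
  }
  return arrow::Status::OK();
}

// Construction side, mirroring function.cc and expression.cc:
//   cp::KernelContext kernel_ctx{exec_ctx, kernel};
//   kernel_ctx.SetState(state.get());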
@@ -584,6 +590,9 @@ struct ScalarKernel : public Kernel { // bitmaps is a reasonable default NullHandling::type null_handling = NullHandling::INTERSECTION; MemAllocation::type mem_allocation = MemAllocation::PREALLOCATE; + + // Additional kernel-specific data + std::shared_ptr data; }; // ---------------------------------------------------------------------- @@ -597,16 +606,19 @@ struct VectorKernel : public Kernel { /// \brief See VectorKernel::finalize member for usage using FinalizeFunc = std::function*)>; + /// \brief Function for executing a stateful VectorKernel against a + /// ChunkedArray input. Does not need to be defined for all VectorKernels + typedef Status (*ChunkedExec)(KernelContext*, const ExecBatch&, Datum* out); + VectorKernel() = default; - VectorKernel(std::vector in_types, OutputType out_type, - ArrayKernelExecOld exec, KernelInit init = NULLPTR, - FinalizeFunc finalize = NULLPTR) + VectorKernel(std::vector in_types, OutputType out_type, ArrayKernelExec exec, + KernelInit init = NULLPTR, FinalizeFunc finalize = NULLPTR) : Kernel(std::move(in_types), std::move(out_type), std::move(init)), exec(std::move(exec)), finalize(std::move(finalize)) {} - VectorKernel(std::shared_ptr sig, ArrayKernelExecOld exec, + VectorKernel(std::shared_ptr sig, ArrayKernelExec exec, KernelInit init = NULLPTR, FinalizeFunc finalize = NULLPTR) : Kernel(std::move(sig), std::move(init)), exec(std::move(exec)), @@ -614,7 +626,10 @@ struct VectorKernel : public Kernel { /// \brief Perform a single invocation of this kernel. Any required state is /// managed through the KernelContext. - ArrayKernelExecOld exec; + ArrayKernelExec exec; + + /// \brief Execute the kernel on a ChunkedArray. Does not need to be defined + ChunkedExec exec_chunked = NULLPTR; /// \brief For VectorKernel, convert intermediate results into finalized /// results. Mutates input argument. Some kernels may accumulate state @@ -637,7 +652,7 @@ struct VectorKernel : public Kernel { /// functionality. bool can_write_into_slices = true; - /// Some vector kernels can do chunkwise execution using ExecBatchIterator, + /// Some vector kernels can do chunkwise execution using ExecSpanIterator, /// in some cases accumulating some state. 
Other kernels (like Take) need to /// be passed whole arrays and don't work on ChunkedArray inputs bool can_execute_chunkwise = true; diff --git a/cpp/src/arrow/compute/kernels/aggregate_mode.cc b/cpp/src/arrow/compute/kernels/aggregate_mode.cc index f8c56b2a220..6676b86436a 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_mode.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_mode.cc @@ -42,9 +42,9 @@ constexpr uint64_t kCountEOF = ~0ULL; template ::CType> Result> PrepareOutput(int64_t n, KernelContext* ctx, - Datum* out) { - DCHECK_EQ(Type::STRUCT, out->type()->id()); - const auto& out_type = checked_cast(*out->type()); + const DataType& type, ExecResult* out) { + DCHECK_EQ(Type::STRUCT, type.id()); + const auto& out_type = checked_cast(type); DCHECK_EQ(2, out_type.num_fields()); const auto& mode_type = out_type.field(0)->type(); const auto& count_type = int64(); @@ -64,14 +64,15 @@ Result> PrepareOutput(int64_t n, KernelContext* ctx, count_buffer = count_data->template GetMutableValues(1); } - *out = Datum(ArrayData::Make(out->type(), n, {nullptr}, {mode_data, count_data}, 0)); + out->value = ArrayData::Make(type.Copy(), n, {nullptr}, {mode_data, count_data}, 0); return std::make_pair(mode_buffer, count_buffer); } // find top-n value:count pairs with minimal heap // suboptimal for tiny or large n, possibly okay as we're not in hot path template -Status Finalize(KernelContext* ctx, Datum* out, Generator&& gen) { +Status Finalize(KernelContext* ctx, const DataType& type, ExecResult* out, + Generator&& gen) { using CType = typename TypeTraits::CType; using ValueCountPair = std::pair; @@ -101,7 +102,7 @@ Status Finalize(KernelContext* ctx, Datum* out, Generator&& gen) { CType* mode_buffer; int64_t* count_buffer; ARROW_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), - PrepareOutput(n, ctx, out)); + PrepareOutput(n, ctx, type, out)); for (int64_t i = n - 1; i >= 0; --i) { std::tie(mode_buffer[i], count_buffer[i]) = min_heap.top(); @@ -127,18 +128,7 @@ struct CountModer { this->counts.resize(value_range, 0); } - Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // count values in all chunks, ignore nulls - const Datum& datum = batch[0]; - - const ModeOptions& options = ModeState::Get(ctx); - if ((!options.skip_nulls && datum.null_count() > 0) || - (datum.length() - datum.null_count() < options.min_count)) { - return PrepareOutput(/*n=*/0, ctx, out).status(); - } - - CountValues(this->counts.data(), datum, this->min); - + Status GetResult(KernelContext* ctx, const DataType& type, ExecResult* out) { // generator to emit next value:count pair int index = 0; auto gen = [&]() { @@ -153,41 +143,67 @@ struct CountModer { return std::pair(0, kCountEOF); }; - return Finalize(ctx, out, std::move(gen)); + return Finalize(ctx, type, out, std::move(gen)); + } + + Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { + // count values in all chunks, ignore nulls + const ArraySpan& values = batch[0].array; + const ModeOptions& options = ModeState::Get(ctx); + if ((!options.skip_nulls && values.GetNullCount() > 0) || + (values.length - values.GetNullCount() < options.min_count)) { + return PrepareOutput(/*n=*/0, ctx, *out->type(), out).status(); + } + + CountValues(values, this->min, this->counts.data()); + return GetResult(ctx, *out->type(), out); + } + + Status ExecChunked(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + // count values in all chunks, ignore nulls + const ChunkedArray& values = *batch[0].chunked_array(); + const ModeOptions& 
options = ModeState::Get(ctx); + ExecResult result; + if ((!options.skip_nulls && values.null_count() > 0) || + (values.length() - values.null_count() < options.min_count)) { + RETURN_NOT_OK(PrepareOutput(/*n=*/0, ctx, *out->type(), &result)); + } else { + CountValues(values, this->min, this->counts.data()); + RETURN_NOT_OK(GetResult(ctx, *out->type(), &result)); + } + *out = result.array_data(); + return Status::OK(); } }; // booleans can be handled more straightforward template <> struct CountModer { - Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const Datum& datum = batch[0]; + int64_t counts[2] = {0, 0}; - const ModeOptions& options = ModeState::Get(ctx); - if ((!options.skip_nulls && datum.null_count() > 0) || - (datum.length() - datum.null_count() < options.min_count)) { - return PrepareOutput(/*n=*/0, ctx, out).status(); + void UpdateCounts(const ArraySpan& values) { + if (values.length > values.GetNullCount()) { + const int64_t true_count = GetTrueCount(values); + counts[true] += true_count; + counts[false] += values.length - values.null_count - true_count; } + } - int64_t counts[2]{}; - - for (const auto& array : datum.chunks()) { - if (array->length() > array->null_count()) { - const int64_t true_count = - arrow::internal::checked_pointer_cast(array)->true_count(); - const int64_t false_count = array->length() - array->null_count() - true_count; - counts[true] += true_count; - counts[false] += false_count; - } + void UpdateCounts(const ChunkedArray& values) { + for (const auto& chunk : values.chunks()) { + UpdateCounts(*chunk->data()); } + } - const int64_t distinct_values = (counts[0] != 0) + (counts[1] != 0); + Status WrapResult(KernelContext* ctx, const ModeOptions& options, const DataType& type, + ExecResult* out) { + const int64_t distinct_values = (this->counts[0] != 0) + (this->counts[1] != 0); const int64_t n = std::min(options.n, distinct_values); bool* mode_buffer; int64_t* count_buffer; ARROW_ASSIGN_OR_RAISE(std::tie(mode_buffer, count_buffer), - PrepareOutput(n, ctx, out)); + PrepareOutput(n, ctx, type, out)); if (n >= 1) { const bool index = counts[1] > counts[0]; @@ -201,6 +217,32 @@ struct CountModer { return Status::OK(); } + + Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { + const ArraySpan& values = batch[0].array; + const ModeOptions& options = ModeState::Get(ctx); + if ((!options.skip_nulls && values.GetNullCount() > 0) || + (values.length - values.null_count < options.min_count)) { + return PrepareOutput(/*n=*/0, ctx, *out->type(), out).status(); + } + UpdateCounts(values); + return WrapResult(ctx, options, *out->type(), out); + } + + Status ExecChunked(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const ChunkedArray& values = *batch[0].chunked_array(); + const ModeOptions& options = ModeState::Get(ctx); + ExecResult result; + if ((!options.skip_nulls && values.null_count() > 0) || + (values.length() - values.null_count() < options.min_count)) { + RETURN_NOT_OK(PrepareOutput(/*n=*/0, ctx, *out->type(), &result)); + } else { + UpdateCounts(values); + RETURN_NOT_OK(WrapResult(ctx, options, *out->type(), &result)); + } + *out = result.array_data(); + return Status::OK(); + } }; // copy and sort approach for floating points, decimals, or integers with wide @@ -222,40 +264,38 @@ struct SortModer { return static_cast(0); } - Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const Datum& datum = batch[0]; - const int64_t in_length = datum.length() - datum.null_count(); - + 
template + Status ComputeMode(KernelContext* ctx, const Container& arr, int64_t length, + int64_t null_count, const DataType& type, ExecResult* out) { const ModeOptions& options = ModeState::Get(ctx); - if ((!options.skip_nulls && datum.null_count() > 0) || - (in_length < options.min_count)) { - return PrepareOutput(/*n=*/0, ctx, out).status(); + const int64_t in_length = length - null_count; + if ((!options.skip_nulls && null_count > 0) || (in_length < options.min_count)) { + return PrepareOutput(/*n=*/0, ctx, type, out).status(); } // copy all chunks to a buffer, ignore nulls and nans - std::vector in_buffer(Allocator(ctx->memory_pool())); + std::vector values(Allocator(ctx->memory_pool())); uint64_t nan_count = 0; - if (in_length > 0) { - in_buffer.resize(in_length); - CopyNonNullValues(datum, in_buffer.data()); + if (length > 0) { + values.resize(length - null_count); + CopyNonNullValues(arr, values.data()); // drop nan if (is_floating_type::value) { - const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(), - [](CType v) { return v != v; }); - nan_count = in_buffer.end() - it; - in_buffer.resize(it - in_buffer.begin()); + const auto& it = + std::remove_if(values.begin(), values.end(), [](CType v) { return v != v; }); + nan_count = values.end() - it; + values.resize(it - values.begin()); } } - // sort the input data to count same values - std::sort(in_buffer.begin(), in_buffer.end()); + std::sort(values.begin(), values.end()); // generator to emit next value:count pair - auto it = in_buffer.cbegin(); + auto it = values.cbegin(); auto gen = [&]() { - if (ARROW_PREDICT_FALSE(it == in_buffer.cend())) { + if (ARROW_PREDICT_FALSE(it == values.cend())) { // handle NAN at last if (nan_count > 0) { auto value_count = std::make_pair(GetNan(), nan_count); @@ -270,37 +310,68 @@ struct SortModer { do { ++it; ++count; - } while (it != in_buffer.cend() && *it == value); + } while (it != values.cend() && *it == value); return std::make_pair(value, count); }; - return Finalize(ctx, out, std::move(gen)); + return Finalize(ctx, type, out, std::move(gen)); + } + + Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { + const ArraySpan& values = batch[0].array; + return ComputeMode(ctx, values, values.length, values.GetNullCount(), *out->type(), + out); + } + + Status ExecChunked(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const ChunkedArray& values = *batch[0].chunked_array(); + ExecResult result; + RETURN_NOT_OK(ComputeMode(ctx, values, values.length(), values.null_count(), + *out->type(), &result)); + *out = result.array_data(); + return Status::OK(); } }; +template +bool ShouldUseCountMode(const Container& values, int64_t num_valid, CType* min, + CType* max) { + // cross point to benefit from counting approach + // about 2x improvement for int32/64 from micro-benchmarking + static constexpr int kMinArraySize = 8192; + static constexpr int kMaxValueRange = 32768; + + if (num_valid >= kMinArraySize) { + std::tie(*min, *max) = GetMinMax(values); + return static_cast(*max) - static_cast(*min) <= kMaxValueRange; + } + return false; +} + // pick counting or sorting approach per integers value range template struct CountOrSortModer { using CType = typename T::c_type; - Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - // cross point to benefit from counting approach - // about 2x improvement for int32/64 from micro-benchmarking - static constexpr int kMinArraySize = 8192; - static constexpr int kMaxValueRange = 32768; - - const Datum& datum 
= batch[0]; - if (datum.length() - datum.null_count() >= kMinArraySize) { - CType min, max; - std::tie(min, max) = GetMinMax(datum); - - if (static_cast(max) - static_cast(min) <= kMaxValueRange) { - return CountModer(min, max).Exec(ctx, batch, out); - } + Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { + const ArraySpan& values = batch[0].array; + CType min, max; + if (ShouldUseCountMode(values, values.length - values.GetNullCount(), &min, + &max)) { + return CountModer(min, max).Exec(ctx, batch, out); } - return SortModer().Exec(ctx, batch, out); } + + Status ExecChunked(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const ChunkedArray& values = *batch[0].chunked_array(); + CType min, max; + if (ShouldUseCountMode(values, values.length() - values.null_count(), &min, + &max)) { + return CountModer(min, max).ExecChunked(ctx, batch, out); + } + return SortModer().ExecChunked(ctx, batch, out); + } }; template @@ -340,18 +411,18 @@ struct Moder> { }; template -Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) { +Status ScalarMode(KernelContext* ctx, const Scalar& scalar, ExecResult* out) { using CType = typename TypeTraits::CType; const ModeOptions& options = ModeState::Get(ctx); if ((!options.skip_nulls && !scalar.is_valid) || (static_cast(scalar.is_valid) < options.min_count)) { - return PrepareOutput(/*n=*/0, ctx, out).status(); + return PrepareOutput(/*n=*/0, ctx, *out->type(), out).status(); } if (scalar.is_valid) { bool called = false; - return Finalize(ctx, out, [&]() { + return Finalize(ctx, *out->type(), out, [&]() { if (!called) { called = true; return std::pair(UnboxScalar::Unbox(scalar), 1); @@ -359,37 +430,48 @@ Status ScalarMode(KernelContext* ctx, const Scalar& scalar, Datum* out) { return std::pair(static_cast(0), kCountEOF); }); } - return Finalize(ctx, out, []() { + return Finalize(ctx, *out->type(), out, []() { return std::pair(static_cast(0), kCountEOF); }); } -template -struct ModeExecutor { - static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - if (ctx->state() == nullptr) { - return Status::Invalid("Mode requires ModeOptions"); - } - const ModeOptions& options = ModeState::Get(ctx); - if (options.n <= 0) { - return Status::Invalid("ModeOptions::n must be strictly positive"); - } +Status CheckOptions(KernelContext* ctx) { + if (ctx->state() == nullptr) { + return Status::Invalid("Mode requires ModeOptions"); + } + const ModeOptions& options = ModeState::Get(ctx); + if (options.n <= 0) { + return Status::Invalid("ModeOptions::n must be strictly positive"); + } + return Status::OK(); +} +template +struct ModeExecutor { + static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { + RETURN_NOT_OK(CheckOptions(ctx)); if (batch[0].is_scalar()) { - return ScalarMode(ctx, *batch[0].scalar(), out); + return ScalarMode(ctx, *batch[0].scalar, out); } - return Moder().impl.Exec(ctx, batch, out); } }; +template +struct ModeExecutorChunked { + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + RETURN_NOT_OK(CheckOptions(ctx)); + return Moder().impl.ExecChunked(ctx, batch, out); + } +}; + Result ModeType(KernelContext*, const std::vector& descrs) { return ValueDescr::Array( struct_({field(kModeFieldName, descrs[0].type), field(kCountFieldName, int64())})); } -VectorKernel NewModeKernel(const std::shared_ptr& in_type, - ArrayKernelExecOld exec) { +VectorKernel NewModeKernel(const std::shared_ptr& in_type, ArrayKernelExec exec, + 
VectorKernel::ChunkedExec exec_chunked) { VectorKernel kernel; kernel.init = ModeState::Init; kernel.can_execute_chunkwise = false; @@ -409,6 +491,7 @@ VectorKernel NewModeKernel(const std::shared_ptr& in_type, } } kernel.exec = std::move(exec); + kernel.exec_chunked = exec_chunked; return kernel; } @@ -431,17 +514,22 @@ void RegisterScalarAggregateMode(FunctionRegistry* registry) { auto func = std::make_shared("mode", Arity::Unary(), mode_doc, &default_options); DCHECK_OK(func->AddKernel( - NewModeKernel(boolean(), ModeExecutor::Exec))); + NewModeKernel(boolean(), ModeExecutor::Exec, + ModeExecutorChunked::Exec))); for (const auto& type : NumericTypes()) { // TODO(wesm): - DCHECK_OK(func->AddKernel( - NewModeKernel(type, GenerateNumericOld(*type)))); + DCHECK_OK(func->AddKernel(NewModeKernel( + type, GenerateNumeric(*type), + GenerateNumeric( + *type)))); } // Type parameters are ignored DCHECK_OK(func->AddKernel( - NewModeKernel(decimal128(1, 0), ModeExecutor::Exec))); + NewModeKernel(decimal128(1, 0), ModeExecutor::Exec, + ModeExecutorChunked::Exec))); DCHECK_OK(func->AddKernel( - NewModeKernel(decimal256(1, 0), ModeExecutor::Exec))); + NewModeKernel(decimal256(1, 0), ModeExecutor::Exec, + ModeExecutorChunked::Exec))); DCHECK_OK(registry->AddFunction(std::move(func))); } diff --git a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc index d18d8425946..7b989bfe5f5 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_quantile.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_quantile.cc @@ -89,39 +89,19 @@ struct SortQuantiler { using CType = typename TypeTraits::CType; using Allocator = arrow::stl::allocator; - Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const QuantileOptions& options = QuantileState::Get(ctx); - const Datum& datum = batch[0]; - - // copy all chunks to a buffer, ignore nulls and nans - std::vector in_buffer(Allocator(ctx->memory_pool())); - int64_t in_length = 0; - if ((!options.skip_nulls && datum.null_count() > 0) || - (datum.length() - datum.null_count() < options.min_count)) { - in_length = 0; - } else { - in_length = datum.length() - datum.null_count(); - } - - if (in_length > 0) { - in_buffer.resize(in_length); - CopyNonNullValues(datum, in_buffer.data()); - - // drop nan - if (is_floating_type::value) { - const auto& it = std::remove_if(in_buffer.begin(), in_buffer.end(), - [](CType v) { return v != v; }); - in_buffer.resize(it - in_buffer.begin()); - } - } - + Status ComputeQuantile(KernelContext* ctx, const QuantileOptions& options, + const std::shared_ptr& type, + std::vector& in_buffer, ExecResult* out) { // prepare out array // out type depends on options const bool is_datapoint = IsDataPoint(options); - const std::shared_ptr out_type = is_datapoint ? datum.type() : float64(); + const std::shared_ptr out_type = is_datapoint ? 
type : float64(); int64_t out_length = options.q.size(); if (in_buffer.empty()) { - return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out); + ARROW_ASSIGN_OR_RAISE(std::shared_ptr result, + MakeArrayOfNull(out_type, out_length, ctx->memory_pool())); + out->value = result->data(); + return Status::OK(); } auto out_data = ArrayData::Make(out_type, out_length, 0); out_data->buffers.resize(2, nullptr); @@ -129,7 +109,7 @@ struct SortQuantiler { // calculate quantiles if (out_length > 0) { ARROW_ASSIGN_OR_RAISE(out_data->buffers[1], - ctx->Allocate(out_length * GetBitWidth(*out_type) / 8)); + ctx->Allocate(out_length * out_type->byte_width())); // find quantiles in descending order std::vector q_indices(out_length); @@ -153,14 +133,59 @@ struct SortQuantiler { double* out_buffer = out_data->template GetMutableValues(1); for (int64_t i = 0; i < out_length; ++i) { const int64_t q_index = q_indices[i]; - out_buffer[q_index] = - GetQuantileByInterp(in_buffer, &last_index, options.q[q_index], - options.interpolation, *datum.type()); + out_buffer[q_index] = GetQuantileByInterp( + in_buffer, &last_index, options.q[q_index], options.interpolation, *type); } } } - *out = Datum(std::move(out_data)); + out->value = std::move(out_data); + return Status::OK(); + } + + template + void FillBuffer(const QuantileOptions& options, const Container& container, + int64_t length, int64_t null_count, + std::vector* in_buffer) { + int64_t in_length = 0; + if ((!options.skip_nulls && null_count > 0) || + (length - null_count < options.min_count)) { + in_length = 0; + } else { + in_length = length - null_count; + } + + if (in_length > 0) { + in_buffer->resize(in_length); + CopyNonNullValues(container, in_buffer->data()); + + // drop nan + if (is_floating_type::value) { + const auto& it = std::remove_if(in_buffer->begin(), in_buffer->end(), + [](CType v) { return v != v; }); + in_buffer->resize(it - in_buffer->begin()); + } + } + } + + Status Exec(KernelContext* ctx, const ArraySpan& values, ExecResult* out) { + const QuantileOptions& options = QuantileState::Get(ctx); + + // copy all chunks to a buffer, ignore nulls and nans + std::vector in_buffer(Allocator(ctx->memory_pool())); + FillBuffer(options, values, values.length, values.GetNullCount(), &in_buffer); + return ComputeQuantile(ctx, options, values.type->Copy(), in_buffer, out); + } + + Status Exec(KernelContext* ctx, const ChunkedArray& values, Datum* out) { + const QuantileOptions& options = QuantileState::Get(ctx); + + // copy all chunks to a buffer, ignore nulls and nans + std::vector in_buffer(Allocator(ctx->memory_pool())); + FillBuffer(options, values, values.length(), values.null_count(), &in_buffer); + ExecResult result; + RETURN_NOT_OK(ComputeQuantile(ctx, options, values.type(), in_buffer, &result)); + *out = result.array_data(); return Status::OK(); } @@ -245,17 +270,8 @@ struct CountQuantiler { this->counts.resize(value_range, 0); } - Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const QuantileOptions& options = QuantileState::Get(ctx); - - // count values in all chunks, ignore nulls - const Datum& datum = batch[0]; - int64_t in_length = 0; - if ((options.skip_nulls || (!options.skip_nulls && datum.null_count() == 0)) && - (datum.length() - datum.null_count() >= options.min_count)) { - in_length = CountValues(this->counts.data(), datum, this->min); - } - + Status ComputeQuantile(KernelContext* ctx, const QuantileOptions& options, + int64_t in_length, ExecResult* out) { // prepare out array // out type 
     // out type depends on options
     const bool is_datapoint = IsDataPoint(options);
@@ -263,7 +279,10 @@ struct CountQuantiler {
         is_datapoint ? TypeTraits::type_singleton() : float64();
     int64_t out_length = options.q.size();
     if (in_length == 0) {
-      return MakeArrayOfNull(out_type, out_length, ctx->memory_pool()).Value(out);
+      ARROW_ASSIGN_OR_RAISE(std::shared_ptr result,
+                            MakeArrayOfNull(out_type, out_length, ctx->memory_pool()));
+      out->value = std::move(result->data());
+      return Status::OK();
     }
     auto out_data = ArrayData::Make(out_type, out_length, 0);
     out_data->buffers.resize(2, nullptr);
@@ -271,7 +290,7 @@ struct CountQuantiler {
     // calculate quantiles
     if (out_length > 0) {
       ARROW_ASSIGN_OR_RAISE(out_data->buffers[1],
-                            ctx->Allocate(out_length * GetBitWidth(*out_type) / 8));
+                            ctx->Allocate(out_length * out_type->byte_width()));
 
       // find quantiles in ascending order
       std::vector q_indices(out_length);
@@ -298,8 +317,36 @@ struct CountQuantiler {
         }
       }
     }
+    out->value = std::move(out_data);
+    return Status::OK();
+  }
+
+  Status Exec(KernelContext* ctx, const ArraySpan& values, ExecResult* out) {
+    const QuantileOptions& options = QuantileState::Get(ctx);
+
+    // count values in all chunks, ignore nulls
+    int64_t in_length = 0;
+    if ((options.skip_nulls || (!options.skip_nulls && values.GetNullCount() == 0)) &&
+        (values.length - values.GetNullCount() >= options.min_count)) {
+      in_length = CountValues(values, this->min, this->counts.data());
+    }
+
+    return ComputeQuantile(ctx, options, in_length, out);
+  }
+
+  Status Exec(KernelContext* ctx, const ChunkedArray& values, Datum* out) {
+    const QuantileOptions& options = QuantileState::Get(ctx);
 
-    *out = Datum(std::move(out_data));
+    // count values in all chunks, ignore nulls
+    int64_t in_length = 0;
+    if ((options.skip_nulls || (!options.skip_nulls && values.null_count() == 0)) &&
+        (values.length() - values.null_count() >= options.min_count)) {
+      in_length = CountValues(values, this->min, this->counts.data());
+    }
+
+    ExecResult result;
+    RETURN_NOT_OK(ComputeQuantile(ctx, options, in_length, &result));
+    *out = result.array_data();
     return Status::OK();
   }
 
@@ -365,23 +412,31 @@ template
 struct CountOrSortQuantiler {
   using CType = typename InType::c_type;
 
-  Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
-    // cross point to benefit from histogram approach
-    // parameters estimated from ad-hoc benchmarks manually
-    static constexpr int kMinArraySize = 65536;
-    static constexpr int kMaxValueRange = 65536;
+  // cross point to benefit from histogram approach
+  // parameters estimated from ad-hoc benchmarks manually
+  static constexpr int kMinArraySize = 65536;
+  static constexpr int kMaxValueRange = 65536;
 
-    const Datum& datum = batch[0];
-    if (datum.length() - datum.null_count() >= kMinArraySize) {
+  Status Exec(KernelContext* ctx, const ArraySpan& values, ExecResult* out) {
+    if (values.length - values.GetNullCount() >= kMinArraySize) {
       CType min, max;
-      std::tie(min, max) = GetMinMax(datum);
-
+      std::tie(min, max) = GetMinMax(values);
       if (static_cast(max) - static_cast(min) <= kMaxValueRange) {
-        return CountQuantiler(min, max).Exec(ctx, batch, out);
+        return CountQuantiler(min, max).Exec(ctx, values, out);
       }
     }
+    return SortQuantiler().Exec(ctx, values, out);
+  }
 
-    return SortQuantiler().Exec(ctx, batch, out);
+  Status Exec(KernelContext* ctx, const ChunkedArray& values, Datum* out) {
+    if (values.length() - values.null_count() >= kMinArraySize) {
+      CType min, max;
+      std::tie(min, max) = GetMinMax(values);
+      if (static_cast(max) - static_cast(min) <= kMaxValueRange) {
+        return CountQuantiler(min, max).Exec(ctx, values, out);
+      }
+    }
+    return SortQuantiler().Exec(ctx, values, out);
   }
 };
 
@@ -417,15 +472,14 @@ struct ExactQuantiler::value>> {
 };
 
 template
-Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
-                      const Scalar& scalar, Datum* out) {
+Status ScalarQuantile(KernelContext* ctx, const Scalar& scalar, ExecResult* out) {
+  const QuantileOptions& options = QuantileState::Get(ctx);
   using CType = typename TypeTraits::CType;
-  ArrayData* output = out->mutable_array();
+  ArrayData* output = out->array_data().get();
   output->length = options.q.size();
   auto out_type = IsDataPoint(options) ? scalar.type : float64();
-  ARROW_ASSIGN_OR_RAISE(
-      output->buffers[1],
-      ctx->Allocate(output->length * bit_util::BytesForBits(GetBitWidth(*out_type))));
+  ARROW_ASSIGN_OR_RAISE(output->buffers[1],
+                        ctx->Allocate(output->length * out_type->byte_width()));
 
   if (!scalar.is_valid || options.min_count > 1) {
     output->null_count = output->length;
@@ -456,28 +510,39 @@ Status ScalarQuantile(KernelContext* ctx, const QuantileOptions& options,
   return Status::OK();
 }
 
-template
-struct QuantileExecutor {
-  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
-    if (ctx->state() == nullptr) {
-      return Status::Invalid("Quantile requires QuantileOptions");
-    }
+Status CheckQuantileOptions(KernelContext* ctx) {
+  if (ctx->state() == nullptr) {
+    return Status::Invalid("Quantile requires QuantileOptions");
+  }
 
-    const QuantileOptions& options = QuantileState::Get(ctx);
-    if (options.q.empty()) {
-      return Status::Invalid("Requires quantile argument");
-    }
-    for (double q : options.q) {
-      if (q < 0 || q > 1) {
-        return Status::Invalid("Quantile must be between 0 and 1");
-      }
+  const QuantileOptions& options = QuantileState::Get(ctx);
+  if (options.q.empty()) {
+    return Status::Invalid("Requires quantile argument");
+  }
+  for (double q : options.q) {
+    if (q < 0 || q > 1) {
+      return Status::Invalid("Quantile must be between 0 and 1");
    }
+  }
+  return Status::OK();
+}
 
+template
+struct QuantileExecutor {
+  static Status Exec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
+    RETURN_NOT_OK(CheckQuantileOptions(ctx));
     if (batch[0].is_scalar()) {
-      return ScalarQuantile(ctx, options, *batch[0].scalar(), out);
+      return ScalarQuantile(ctx, *batch[0].scalar, out);
     }
+    return ExactQuantiler().impl.Exec(ctx, batch[0].array, out);
+  }
+};
 
-    return ExactQuantiler().impl.Exec(ctx, batch, out);
+template
+struct QuantileExecutorChunked {
+  static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
+    RETURN_NOT_OK(CheckQuantileOptions(ctx));
+    return ExactQuantiler().impl.Exec(ctx, *batch[0].chunked_array(), out);
   }
 };
 
@@ -500,19 +565,24 @@ void AddQuantileKernels(VectorFunction* func) {
   for (const auto& ty : NumericTypes()) {
     base.signature = KernelSignature::Make({InputType(ty)}, OutputType(ResolveOutput));
     // output type is determined at runtime, set template argument to nulltype
-    base.exec = GenerateNumericOld(*ty);
+    base.exec = GenerateNumeric(*ty);
+    base.exec_chunked =
+        GenerateNumeric(
+            *ty);
     DCHECK_OK(func->AddKernel(base));
   }
   {
     base.signature = KernelSignature::Make({InputType(Type::DECIMAL128)}, OutputType(ResolveOutput));
     base.exec = QuantileExecutor::Exec;
+    base.exec_chunked = QuantileExecutorChunked::Exec;
     DCHECK_OK(func->AddKernel(base));
   }
   {
     base.signature = KernelSignature::Make({InputType(Type::DECIMAL256)}, OutputType(ResolveOutput));
     base.exec = QuantileExecutor::Exec;
+    base.exec_chunked = QuantileExecutorChunked::Exec;
     DCHECK_OK(func->AddKernel(base));
   }
 }
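For context: the quantile changes above keep a single ExactQuantiler implementation but reach it through two entry points, QuantileExecutor for spans and QuantileExecutorChunked for whole chunked arrays. A small sketch of calling it over chunked input follows; RunQuantileExample and the sample data are invented for illustration, and it assumes Arrow's C++ compute library is linked in.

#include <arrow/api.h>
#include <arrow/compute/api.h>

#include <iostream>
#include <memory>
#include <vector>

arrow::Status RunQuantileExample() {
  arrow::DoubleBuilder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues({1.0, 2.0, 3.0, 4.0}));
  ARROW_ASSIGN_OR_RAISE(auto chunk, builder.Finish());
  auto chunked =
      std::make_shared<arrow::ChunkedArray>(arrow::ArrayVector{chunk, chunk});

  // Request the median and the 90th percentile with linear interpolation.
  arrow::compute::QuantileOptions options(std::vector<double>{0.5, 0.9},
                                          arrow::compute::QuantileOptions::LINEAR);
  ARROW_ASSIGN_OR_RAISE(arrow::Datum result,
                        arrow::compute::Quantile(arrow::Datum(chunked), options));
  std::cout << result.make_array()->ToString() << std::endl;  // float64 array, length 2
  return arrow::Status::OK();
}

With a data-point interpolation mode (lower, higher, nearest) the result would instead keep the input type, matching the out_type selection in the hunks above.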
diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.cc b/cpp/src/arrow/compute/kernels/codegen_internal.cc
index c696e6376f7..9e32f9e7f6d 100644
--- a/cpp/src/arrow/compute/kernels/codegen_internal.cc
+++ b/cpp/src/arrow/compute/kernels/codegen_internal.cc
@@ -33,18 +33,6 @@ Status ExecFail(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) {
   return Status::NotImplemented("This kernel is malformed");
 }
 
-Status ExecFailOld(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
-  return Status::NotImplemented("This kernel is malformed");
-}
-
-ArrayKernelExec MakeFlippedBinaryExec(ArrayKernelExec exec) {
-  return [exec](KernelContext* ctx, const ExecSpan& span, ExecResult* out) {
-    ExecSpan flipped_span = span;
-    std::swap(flipped_span.values[0], flipped_span.values[1]);
-    return exec(ctx, flipped_span, out);
-  };
-}
-
 const std::vector>& ExampleParametricTypes() {
   static DataTypeVector example_parametric_types = {
       decimal128(12, 2),
diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h
index 8c3c7e3d423..bc21c4efb6a 100644
--- a/cpp/src/arrow/compute/kernels/codegen_internal.h
+++ b/cpp/src/arrow/compute/kernels/codegen_internal.h
@@ -456,14 +456,6 @@ Result FirstType(KernelContext*, const std::vector& desc
 Result LastType(KernelContext*, const std::vector& descrs);
 Result ListValuesType(KernelContext*, const std::vector& args);
 
-// ----------------------------------------------------------------------
-// Generate an array kernel given template classes
-
-Status ExecFail(KernelContext* ctx, const ExecSpan& batch, ExecResult* out);
-Status ExecFailOld(KernelContext* ctx, const ExecBatch& batch, Datum* out);
-
-ArrayKernelExec MakeFlippedBinaryExec(ArrayKernelExec exec);
-
 // ----------------------------------------------------------------------
 // Helpers for iterating over common DataType instances for adding kernels to
 // functions
@@ -1032,41 +1024,29 @@ struct GetTypeId {
 
 }  // namespace detail
 
-// GD for numeric types (integer and floating point)
-template