From 29e34f35285e0a873b3a8f081a3b91771ed8ebfc Mon Sep 17 00:00:00 2001
From: ZhangHuiGui <2689496754@qq.com>
Date: Thu, 13 Jun 2024 22:59:36 +0800
Subject: [PATCH 1/5] Refactor the setup preallocation logic in place

---
 cpp/src/arrow/compute/exec.cc | 20 +++++++++++++++++---
 1 file changed, 17 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index 05c4936482b..7bbc81ecbe3 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -1034,9 +1034,23 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
     output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
 
     // Decide if we need to preallocate memory for this kernel
-    validity_preallocated_ =
-        (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
-         kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
+    validity_preallocated_ = false;
+    if (output_type_.type->id() != Type::NA) {
+      if (kernel_->null_handling == NullHandling::COMPUTED_PREALLOCATE) {
+        // Override the flag if kernel asks for pre-allocation
+        validity_preallocated_ = true;
+      } else if (kernel_->null_handling == NullHandling::INTERSECTION) {
+        bool elide_validity_bitmap = true;
+        for (const auto& arg : batch.values) {
+          auto null_gen = NullGeneralization::Get(arg) == NullGeneralization::ALL_VALID;
+
+          // If not all valid, this becomes false
+          elide_validity_bitmap = elide_validity_bitmap && null_gen;
+        }
+        validity_preallocated_ = !elide_validity_bitmap;
+      }
+    }
+
     if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
       data_preallocated_.clear();
       ComputeDataPreallocate(*output_type_.type, &data_preallocated_);
     }
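
Note on the hunk above: the INTERSECTION branch replaces the old blanket rule with an input-sensitive one. As a minimal standalone sketch of that decision (NeedValidityBitmap and Arg are illustrative stand-ins, not Arrow APIs):

#include <iostream>
#include <vector>

// Stand-in for arrow::compute::NullHandling::type
enum class NullHandling {
  COMPUTED_PREALLOCATE,
  COMPUTED_NO_PREALLOCATE,
  INTERSECTION,
  OUTPUT_NOT_NULL
};

// Stand-in for an argument whose null generalization is already known
struct Arg {
  bool all_valid;
};

// Mirrors the patch: preallocate a validity bitmap only when the kernel
// asks for one (COMPUTED_PREALLOCATE) or when, under INTERSECTION
// semantics, at least one input may contain nulls.
bool NeedValidityBitmap(bool output_is_null_type, NullHandling null_handling,
                        const std::vector<Arg>& args) {
  if (output_is_null_type) return false;  // NA output has no buffers at all
  if (null_handling == NullHandling::COMPUTED_PREALLOCATE) return true;
  if (null_handling == NullHandling::INTERSECTION) {
    for (const Arg& arg : args) {
      if (!arg.all_valid) return true;  // cannot elide the bitmap
    }
  }
  return false;
}

int main() {
  std::cout << NeedValidityBitmap(false, NullHandling::INTERSECTION,
                                  {{true}, {true}})   // 0: bitmap elided
            << NeedValidityBitmap(false, NullHandling::INTERSECTION,
                                  {{true}, {false}})  // 1: preallocated
            << std::endl;
}
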
From d42eeb7052c4471089055e5629ec8663b608611 Mon Sep 17 00:00:00 2001
From: ZhangHuiGui <2689496754@qq.com>
Date: Thu, 13 Jun 2024 23:05:54 +0800
Subject: [PATCH 2/5] Extract the setup preallocation logic into a function
 for VectorExecutor

---
 cpp/src/arrow/compute/exec.cc | 55 ++++++++++++++++++++---------------
 1 file changed, 31 insertions(+), 24 deletions(-)

diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index 7bbc81ecbe3..dc8460299c0 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -1031,30 +1031,7 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
       if (arg.is_chunked_array()) have_chunked_arrays = true;
     }
 
-    output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
-
-    // Decide if we need to preallocate memory for this kernel
-    validity_preallocated_ = false;
-    if (output_type_.type->id() != Type::NA) {
-      if (kernel_->null_handling == NullHandling::COMPUTED_PREALLOCATE) {
-        // Override the flag if kernel asks for pre-allocation
-        validity_preallocated_ = true;
-      } else if (kernel_->null_handling == NullHandling::INTERSECTION) {
-        bool elide_validity_bitmap = true;
-        for (const auto& arg : batch.values) {
-          auto null_gen = NullGeneralization::Get(arg) == NullGeneralization::ALL_VALID;
-
-          // If not all valid, this becomes false
-          elide_validity_bitmap = elide_validity_bitmap && null_gen;
-        }
-        validity_preallocated_ = !elide_validity_bitmap;
-      }
-    }
-
-    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
-      data_preallocated_.clear();
-      ComputeDataPreallocate(*output_type_.type, &data_preallocated_);
-    }
+    RETURN_NOT_OK(SetupPreallocation(batch.values));
 
     if (kernel_->can_execute_chunkwise) {
       RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
@@ -1137,6 +1114,36 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
     }
   }
 
+  Status SetupPreallocation(const std::vector<Datum>& args) {
+    output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
+    const auto& out_type_id = output_type_.type->id();
+
+    // Decide if we need to preallocate memory for this kernel
+    validity_preallocated_ = false;
+    if (out_type_id != Type::NA) {
+      if (kernel_->null_handling == NullHandling::COMPUTED_PREALLOCATE) {
+        // Override the flag if kernel asks for pre-allocation
+        validity_preallocated_ = true;
+      } else if (kernel_->null_handling == NullHandling::INTERSECTION) {
+        bool elide_validity_bitmap = true;
+        for (const auto& arg : args) {
+          auto null_gen = NullGeneralization::Get(arg) == NullGeneralization::ALL_VALID;
+
+          // If not all valid, this becomes false
+          elide_validity_bitmap = elide_validity_bitmap && null_gen;
+        }
+        validity_preallocated_ = !elide_validity_bitmap;
+      }
+    }
+
+    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
+      data_preallocated_.clear();
+      ComputeDataPreallocate(*output_type_.type, &data_preallocated_);
+    }
+
+    return Status::OK();
+  }
+
   ExecSpanIterator span_iterator_;
   std::vector<Datum> results_;
 };
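
The extracted function pairs two independent decisions: the validity bitmap (patch 1) and the data buffers, whose widths ComputeDataPreallocate records as (bit_width, added_length) entries. A rough self-contained model of what one such entry implies for allocation sizes; BufferPreallocation here is a simplified stand-in and PreallocatedBytes a hypothetical helper, not Arrow code:

#include <cstdint>
#include <cstdio>

// Stand-in for the entries ComputeDataPreallocate fills into
// data_preallocated_: a bit width plus an optional extra element
// (offset buffers need length + 1 entries).
struct BufferPreallocation {
  int bit_width;
  int64_t added_length;
};

// Bytes needed for one preallocated buffer holding `length` values
int64_t PreallocatedBytes(const BufferPreallocation& p, int64_t length) {
  const int64_t bits = static_cast<int64_t>(p.bit_width) * (length + p.added_length);
  return (bits + 7) / 8;  // round up to whole bytes
}

int main() {
  const BufferPreallocation string_offsets{32, /*added_length=*/1};
  const BufferPreallocation int64_data{64, /*added_length=*/0};
  std::printf("%lld\n", static_cast<long long>(PreallocatedBytes(string_offsets, 100)));  // 404
  std::printf("%lld\n", static_cast<long long>(PreallocatedBytes(int64_data, 100)));      // 800
}
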
From 0a358e76b41121201156656b8dab0f25a81de2cb Mon Sep 17 00:00:00 2001
From: ZhangHuiGui <2689496754@qq.com>
Date: Thu, 13 Jun 2024 23:18:12 +0800
Subject: [PATCH 3/5] 1. Support preallocation for the [Large]ListView data
 types 2. Support NullGeneralization for chunked arrays 3. Simplify some
 execution logic and remove finished TODOs

---
 cpp/src/arrow/compute/exec.cc         | 88 ++++++++++++++++++---------
 cpp/src/arrow/compute/exec_internal.h |  3 -
 2 files changed, 59 insertions(+), 32 deletions(-)

diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index dc8460299c0..161af8e6ae1 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -297,11 +297,23 @@ void ComputeDataPreallocate(const DataType& type,
     case Type::MAP:
       widths->emplace_back(32, /*added_length=*/1);
       return;
+    case Type::LIST_VIEW: {
+      // add offsets and sizes
+      widths->emplace_back(32);
+      widths->emplace_back(32);
+      return;
+    }
     case Type::LARGE_BINARY:
     case Type::LARGE_STRING:
     case Type::LARGE_LIST:
       widths->emplace_back(64, /*added_length=*/1);
       return;
+    case Type::LARGE_LIST_VIEW: {
+      // add offsets and sizes
+      widths->emplace_back(64);
+      widths->emplace_back(64);
+      return;
+    }
     default:
       break;
   }
@@ -410,7 +422,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
     // The first time this is called, we populate the output span with any
     // Scalar or Array arguments in the ExecValue struct, and then just
     // increment array offsets below. If any arguments are ChunkedArray, then
-    // the internal ArraySpans will see their members updated during hte
+    // the internal ArraySpans will see their members updated during the
     // iteration
     span->values.resize(args_->size());
     for (size_t i = 0; i < args_->size(); ++i) {
@@ -473,7 +485,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
 namespace {
 
 struct NullGeneralization {
-  enum type { PERHAPS_NULL, ALL_VALID, ALL_NULL };
+  enum type { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2 };
 
   static type Get(const ExecValue& value) {
     const auto dtype_id = value.type()->id();
@@ -498,7 +510,30 @@
     return PERHAPS_NULL;
   }
 
+  static type Get(const ChunkedArray& chunk_array) {
+    if (chunk_array.num_chunks() == 0) {
+      return ALL_VALID;
+    }
+    if (chunk_array.null_count() == chunk_array.length()) {
+      return ALL_NULL;
+    }
+
+    for (const auto& chunk : chunk_array.chunks()) {
+      ExecValue value;
+      value.SetArray(*chunk->data());
+      auto null_gen = Get(value);
+      if (null_gen == ALL_NULL || null_gen == PERHAPS_NULL) {
+        return PERHAPS_NULL;
+      }
+    }
+    return ALL_VALID;
+  }
+
   static type Get(const Datum& datum) {
+    if (datum.is_chunked_array()) {
+      return Get(*datum.chunked_array());
+    }
+
     // Temporary workaround to help with ARROW-16756
     ExecValue value;
     if (datum.is_array()) {
@@ -506,7 +541,6 @@
     } else if (datum.is_scalar()) {
       value.SetScalar(datum.scalar().get());
     } else {
-      // TODO(wesm): ChunkedArray, I think
       return PERHAPS_NULL;
     }
     return Get(value);
@@ -738,12 +772,14 @@ class KernelExecutorImpl : public KernelExecutor {
     }
     for (size_t i = 0; i < data_preallocated_.size(); ++i) {
       const auto& prealloc = data_preallocated_[i];
-      if (prealloc.bit_width >= 0) {
-        ARROW_ASSIGN_OR_RAISE(
-            out->buffers[i + 1],
-            AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
-                               prealloc.bit_width));
-      }
+
+      // ComputeDataPreallocate ensures that every element in
+      // data_preallocated_ satisfies the DCHECK below
+      DCHECK_GE(prealloc.bit_width, 0);
+      ARROW_ASSIGN_OR_RAISE(
+          out->buffers[i + 1],
+          AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
+                             prealloc.bit_width));
     }
     return out;
   }
@@ -796,7 +832,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
     // kernels supporting preallocation, then we do so up front and then
     // iterate over slices of that large array. Otherwise, we preallocate prior
     // to processing each span emitted from the ExecSpanIterator
-    RETURN_NOT_OK(SetupPreallocation(span_iterator_.length(), batch.values));
+    RETURN_NOT_OK(SetupPreallocation(batch.values));
 
     // ARROW-16756: Here we have to accommodate the distinct cases
     //
@@ -928,7 +964,7 @@
     return Status::OK();
   }
 
-  Status SetupPreallocation(int64_t total_length, const std::vector<Datum>& args) {
+  Status SetupPreallocation(const std::vector<Datum>& args) {
     output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
     auto out_type_id = output_type_.type->id();
     // Default to no validity pre-allocation for following cases:
@@ -966,12 +1002,6 @@
                      data_preallocated_.size() ==
                          static_cast<size_t>(output_num_buffers_ - 1) &&
                      !is_nested(out_type_id) && !is_dictionary(out_type_id));

-    // TODO(wesm): why was this check ever here? Fixed width binary
-    // can be 0-width but anything else?
-    DCHECK(std::all_of(
-        data_preallocated_.begin(), data_preallocated_.end(),
-        [](const BufferPreallocation& prealloc) { return prealloc.bit_width >= 0; }));
-
     // Contiguous preallocation only possible on non-nested types if all
     // buffers are preallocated. Otherwise, we must go chunk-by-chunk.
     //
@@ -1022,15 +1052,6 @@ Status CheckCanExecuteChunked(const VectorKernel* kernel) {
 class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
  public:
   Status Execute(const ExecBatch& batch, ExecListener* listener) override {
-    // Some vector kernels have a separate code path for handling
-    // chunked arrays (VectorKernel::exec_chunked) so we check if we
-    // have any chunked arrays. If we do and an exec_chunked function
-    // is defined then we call that.
-    bool have_chunked_arrays = false;
-    for (const Datum& arg : batch.values) {
-      if (arg.is_chunked_array()) have_chunked_arrays = true;
-    }
-
     RETURN_NOT_OK(SetupPreallocation(batch.values));
 
     if (kernel_->can_execute_chunkwise) {
@@ -1040,10 +1061,19 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
       RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
       ExecSpan span;
       while (span_iterator_.Next(&span)) {
         RETURN_NOT_OK(Exec(span, listener));
       }
     } else {
-      // Kernel cannot execute chunkwise. If we have any chunked
-      // arrays, then VectorKernel::exec_chunked must be defined
-      // otherwise we raise an error
+      // Some vector kernels have a separate code path for handling
+      // chunked arrays (VectorKernel::exec_chunked) so we check if we
+      // have any chunked arrays. If we do and an exec_chunked function
+      // is defined then we call that.
+      bool have_chunked_arrays = false;
+      for (const Datum& arg : batch.values) {
+        if (arg.is_chunked_array()) have_chunked_arrays = true;
+      }
+
       if (have_chunked_arrays) {
+        // Kernel cannot execute chunkwise. If we have any chunked
+        // arrays, then VectorKernel::exec_chunked must be defined
+        // otherwise we raise an error
         RETURN_NOT_OK(ExecChunked(batch, listener));
       } else {
         // No chunked arrays. We pack the args into an ExecSpan and
diff --git a/cpp/src/arrow/compute/exec_internal.h b/cpp/src/arrow/compute/exec_internal.h
index 7e4f364a928..f8472231476 100644
--- a/cpp/src/arrow/compute/exec_internal.h
+++ b/cpp/src/arrow/compute/exec_internal.h
@@ -132,9 +132,6 @@ class ARROW_EXPORT KernelExecutor {
   /// for all scanned batches in a dataset filter.
   virtual Status Init(KernelContext*, KernelInitArgs) = 0;
 
-  // TODO(wesm): per ARROW-16819, adding ExecBatch variant so that a batch
-  // length can be passed in for scalar functions; will have to return and
-  // clean a bunch of things up
   virtual Status Execute(const ExecBatch& batch, ExecListener* listener) = 0;
 
   virtual Datum WrapResults(const std::vector<Datum>& args,
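
Worth spelling out for the list-view hunks above: a ListView array stores n offsets plus n independent sizes rather than the n + 1 monotonic offsets of a plain list, which is why the patch registers two width entries and no added_length. A compilable sketch of that mapping (Kind, Width and DataBuffers are illustrative stand-ins, not the Arrow API):

#include <cassert>
#include <cstdint>
#include <vector>

// Which fixed-width data buffers each nested kind needs to preallocate
struct Width {
  int bits;
  int64_t added_length;
};

enum class Kind { LIST, LARGE_LIST, LIST_VIEW, LARGE_LIST_VIEW };

std::vector<Width> DataBuffers(Kind kind) {
  switch (kind) {
    case Kind::LIST:
      return {{32, 1}};           // n + 1 monotonic offsets
    case Kind::LARGE_LIST:
      return {{64, 1}};
    case Kind::LIST_VIEW:
      return {{32, 0}, {32, 0}};  // n offsets plus n sizes, no +1
    case Kind::LARGE_LIST_VIEW:
      return {{64, 0}, {64, 0}};
  }
  return {};
}

int main() {
  assert(DataBuffers(Kind::LIST).size() == 1);
  assert(DataBuffers(Kind::LIST_VIEW).size() == 2);         // offsets and sizes
  assert(DataBuffers(Kind::LIST_VIEW)[0].added_length == 0);
}
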
From 4307d1e91948cae4430687a1a57c23c926da6998 Mon Sep 17 00:00:00 2001
From: ZhangHuiGui <2689496754@qq.com>
Date: Tue, 25 Jun 2024 09:52:27 +0800
Subject: [PATCH 4/5] Revert the more precise pre-allocation for
 VectorExecutor

---
 cpp/src/arrow/compute/exec.cc | 41 +++++++++--------------------
 1 file changed, 10 insertions(+), 31 deletions(-)

diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index 161af8e6ae1..3c9bb4c307a 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -1052,7 +1052,16 @@ Status CheckCanExecuteChunked(const VectorKernel* kernel) {
 class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
  public:
   Status Execute(const ExecBatch& batch, ExecListener* listener) override {
-    RETURN_NOT_OK(SetupPreallocation(batch.values));
+    output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
+
+    // Decide if we need to preallocate memory for this kernel
+    validity_preallocated_ =
+        (kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
+         kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
+    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
+      data_preallocated_.clear();
+      ComputeDataPreallocate(*output_type_.type, &data_preallocated_);
+    }
 
     if (kernel_->can_execute_chunkwise) {
       RETURN_NOT_OK(span_iterator_.Init(batch, exec_context()->exec_chunksize()));
@@ -1144,36 +1153,6 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
     }
   }
 
-  Status SetupPreallocation(const std::vector<Datum>& args) {
-    output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
-    const auto& out_type_id = output_type_.type->id();
-
-    // Decide if we need to preallocate memory for this kernel
-    validity_preallocated_ = false;
-    if (out_type_id != Type::NA) {
-      if (kernel_->null_handling == NullHandling::COMPUTED_PREALLOCATE) {
-        // Override the flag if kernel asks for pre-allocation
-        validity_preallocated_ = true;
-      } else if (kernel_->null_handling == NullHandling::INTERSECTION) {
-        bool elide_validity_bitmap = true;
-        for (const auto& arg : args) {
-          auto null_gen = NullGeneralization::Get(arg) == NullGeneralization::ALL_VALID;
-
-          // If not all valid, this becomes false
-          elide_validity_bitmap = elide_validity_bitmap && null_gen;
-        }
-        validity_preallocated_ = !elide_validity_bitmap;
-      }
-    }
-
-    if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
-      data_preallocated_.clear();
-      ComputeDataPreallocate(*output_type_.type, &data_preallocated_);
-    }
-
-    return Status::OK();
-  }
-
   ExecSpanIterator span_iterator_;
   std::vector<Datum> results_;
 };
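
After this revert, VectorExecutor is back to the coarse rule that never inspects its inputs; only ScalarExecutor keeps the input-sensitive elision introduced earlier in the series. The reverted rule in isolation (stand-in enum again, not the Arrow one):

#include <cassert>

enum class NullHandling {
  COMPUTED_PREALLOCATE,
  COMPUTED_NO_PREALLOCATE,
  INTERSECTION,
  OUTPUT_NOT_NULL
};

// The reverted rule: preallocate the validity bitmap unless the kernel
// manages its own null computation or guarantees a non-null output.
bool VectorValidityPreallocated(NullHandling nh) {
  return nh != NullHandling::COMPUTED_NO_PREALLOCATE &&
         nh != NullHandling::OUTPUT_NOT_NULL;
}

int main() {
  // Preallocates even when every input is all-valid
  assert(VectorValidityPreallocated(NullHandling::INTERSECTION));
  assert(!VectorValidityPreallocated(NullHandling::OUTPUT_NOT_NULL));
}
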
From 7bbe96b5dd17083d554c6d449ad844fb3d4d5eab Mon Sep 17 00:00:00 2001
From: ZhangHuiGui <2689496754@qq.com>
Date: Wed, 26 Jun 2024 17:41:13 +0800
Subject: [PATCH 5/5] Keep the same ChunkedArray constraints as Array in
 NullGeneralization

---
 cpp/src/arrow/compute/exec.cc | 21 +++++++++------------
 1 file changed, 9 insertions(+), 12 deletions(-)

diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index 3c9bb4c307a..a314fdcdd5e 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -511,22 +511,19 @@ struct NullGeneralization {
   }
 
   static type Get(const ChunkedArray& chunk_array) {
-    if (chunk_array.num_chunks() == 0) {
-      return ALL_VALID;
-    }
-    if (chunk_array.null_count() == chunk_array.length()) {
-      return ALL_NULL;
-    }
-
+    std::optional<type> current_gen;
+
     for (const auto& chunk : chunk_array.chunks()) {
-      ExecValue value;
-      value.SetArray(*chunk->data());
-      auto null_gen = Get(value);
-      if (null_gen == ALL_NULL || null_gen == PERHAPS_NULL) {
+      if (chunk->length() == 0) {
+        continue;
+      }
+
+      const auto& chunk_gen = Get(chunk);
+      if (current_gen.has_value() && chunk_gen != *current_gen) {
         return PERHAPS_NULL;
       }
+      current_gen = chunk_gen;
     }
-    return ALL_VALID;
+    return current_gen.value_or(ALL_VALID);
   }
 
   static type Get(const Datum& datum) {
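
The final fold over chunks is subtle enough to deserve a standalone model: empty chunks are skipped, agreeing non-empty chunks keep their common classification, any disagreement degrades to PERHAPS_NULL, and a chunkless (or all-empty) array defaults to ALL_VALID. A compilable sketch with a dummy Chunk type standing in for real arrays:

#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

enum type { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2 };

// Dummy chunk: its length and its precomputed per-chunk classification
struct Chunk {
  int64_t length;
  type gen;
};

type GetChunked(const std::vector<Chunk>& chunks) {
  std::optional<type> current_gen;
  for (const Chunk& chunk : chunks) {
    if (chunk.length == 0) continue;  // empty chunks carry no signal
    if (current_gen.has_value() && chunk.gen != *current_gen) {
      return PERHAPS_NULL;            // chunks disagree
    }
    current_gen = chunk.gen;
  }
  return current_gen.value_or(ALL_VALID);  // no non-empty chunk
}

int main() {
  assert(GetChunked({}) == ALL_VALID);
  assert(GetChunked({{3, ALL_NULL}, {0, ALL_VALID}}) == ALL_NULL);
  assert(GetChunked({{3, ALL_NULL}, {2, ALL_VALID}}) == PERHAPS_NULL);
}

Compared to the patch 3 version, this also classifies an all-empty chunked array as ALL_VALID instead of letting it fall into the null_count() == length() branch.
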