diff --git a/cpp/src/arrow/compute/exec.cc b/cpp/src/arrow/compute/exec.cc
index 05c4936482b..a314fdcdd5e 100644
--- a/cpp/src/arrow/compute/exec.cc
+++ b/cpp/src/arrow/compute/exec.cc
@@ -297,11 +297,23 @@ void ComputeDataPreallocate(const DataType& type,
     case Type::MAP:
       widths->emplace_back(32, /*added_length=*/1);
       return;
+    case Type::LIST_VIEW: {
+      // add offsets and sizes
+      widths->emplace_back(32);
+      widths->emplace_back(32);
+      return;
+    }
     case Type::LARGE_BINARY:
     case Type::LARGE_STRING:
     case Type::LARGE_LIST:
       widths->emplace_back(64, /*added_length=*/1);
       return;
+    case Type::LARGE_LIST_VIEW: {
+      // add offsets and sizes
+      widths->emplace_back(64);
+      widths->emplace_back(64);
+      return;
+    }
     default:
       break;
   }
@@ -410,7 +422,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
     // The first time this is called, we populate the output span with any
     // Scalar or Array arguments in the ExecValue struct, and then just
    // increment array offsets below. If any arguments are ChunkedArray, then
-    // the internal ArraySpans will see their members updated during hte
+    // the internal ArraySpans will see their members updated during the
     // iteration
     span->values.resize(args_->size());
     for (size_t i = 0; i < args_->size(); ++i) {
@@ -473,7 +485,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
 namespace {
 
 struct NullGeneralization {
-  enum type { PERHAPS_NULL, ALL_VALID, ALL_NULL };
+  enum type { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2 };
 
   static type Get(const ExecValue& value) {
     const auto dtype_id = value.type()->id();
@@ -498,7 +510,27 @@ struct NullGeneralization {
     return PERHAPS_NULL;
   }
 
+  static type Get(const ChunkedArray& chunk_array) {
+    std::optional<type> current_gen;
+    for (const auto& chunk : chunk_array.chunks()) {
+      if (chunk->length() == 0) {
+        continue;
+      }
+
+      const auto& chunk_gen = Get(chunk);
+      if (current_gen.has_value() && chunk_gen != *current_gen) {
+        return PERHAPS_NULL;
+      }
+      current_gen = chunk_gen;
+    }
+    return current_gen.value_or(ALL_VALID);
+  }
+
   static type Get(const Datum& datum) {
+    if (datum.is_chunked_array()) {
+      return Get(*datum.chunked_array());
+    }
+
     // Temporary workaround to help with ARROW-16756
     ExecValue value;
     if (datum.is_array()) {
@@ -506,7 +538,6 @@ struct NullGeneralization {
     } else if (datum.is_scalar()) {
       value.SetScalar(datum.scalar().get());
     } else {
-      // TODO(wesm): ChunkedArray, I think
       return PERHAPS_NULL;
     }
     return Get(value);
@@ -738,12 +769,14 @@ class KernelExecutorImpl : public KernelExecutor {
     }
     for (size_t i = 0; i < data_preallocated_.size(); ++i) {
       const auto& prealloc = data_preallocated_[i];
-      if (prealloc.bit_width >= 0) {
-        ARROW_ASSIGN_OR_RAISE(
-            out->buffers[i + 1],
-            AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
-                               prealloc.bit_width));
-      }
+
+      // ComputeDataPreallocate ensures that every element in
+      // data_preallocated_ satisfies the DCHECK below
+      DCHECK_GE(prealloc.bit_width, 0);
+      ARROW_ASSIGN_OR_RAISE(
+          out->buffers[i + 1],
+          AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
+                             prealloc.bit_width));
     }
     return out;
   }
@@ -796,7 +829,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
     // kernels supporting preallocation, then we do so up front and then
     // iterate over slices of that large array. Otherwise, we preallocate prior
     // to processing each span emitted from the ExecSpanIterator
-    RETURN_NOT_OK(SetupPreallocation(span_iterator_.length(), batch.values));
+    RETURN_NOT_OK(SetupPreallocation(batch.values));
 
     // ARROW-16756: Here we have to accommodate the distinct cases
     //
@@ -928,7 +961,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
     return Status::OK();
   }
 
-  Status SetupPreallocation(int64_t total_length, const std::vector<Datum>& args) {
+  Status SetupPreallocation(const std::vector<Datum>& args) {
     output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
     auto out_type_id = output_type_.type->id();
     // Default to no validity pre-allocation for following cases:
@@ -966,12 +999,6 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
          data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
         !is_nested(out_type_id) && !is_dictionary(out_type_id));
 
-    // TODO(wesm): why was this check ever here? Fixed width binary
-    // can be 0-width but anything else?
-    DCHECK(std::all_of(
-        data_preallocated_.begin(), data_preallocated_.end(),
-        [](const BufferPreallocation& prealloc) { return prealloc.bit_width >= 0; }));
-
     // Contiguous preallocation only possible on non-nested types if all
     // buffers are preallocated. Otherwise, we must go chunk-by-chunk.
     //
@@ -1022,15 +1049,6 @@ Status CheckCanExecuteChunked(const VectorKernel* kernel) {
 class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
  public:
   Status Execute(const ExecBatch& batch, ExecListener* listener) override {
-    // Some vector kernels have a separate code path for handling
-    // chunked arrays (VectorKernel::exec_chunked) so we check if we
-    // have any chunked arrays. If we do and an exec_chunked function
-    // is defined then we call that.
-    bool have_chunked_arrays = false;
-    for (const Datum& arg : batch.values) {
-      if (arg.is_chunked_array()) have_chunked_arrays = true;
-    }
-
     output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
 
     // Decide if we need to preallocate memory for this kernel
@@ -1049,10 +1067,19 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
         RETURN_NOT_OK(Exec(span, listener));
       }
     } else {
-      // Kernel cannot execute chunkwise. If we have any chunked
-      // arrays, then VectorKernel::exec_chunked must be defined
-      // otherwise we raise an error
+      // Some vector kernels have a separate code path for handling
+      // chunked arrays (VectorKernel::exec_chunked) so we check if we
+      // have any chunked arrays. If we do and an exec_chunked function
+      // is defined then we call that.
+      bool have_chunked_arrays = false;
+      for (const Datum& arg : batch.values) {
+        if (arg.is_chunked_array()) have_chunked_arrays = true;
+      }
+
       if (have_chunked_arrays) {
+        // Kernel cannot execute chunkwise. If we have any chunked
+        // arrays, then VectorKernel::exec_chunked must be defined
+        // otherwise we raise an error
         RETURN_NOT_OK(ExecChunked(batch, listener));
       } else {
         // No chunked arrays. We pack the args into an ExecSpan and
diff --git a/cpp/src/arrow/compute/exec_internal.h b/cpp/src/arrow/compute/exec_internal.h
index 7e4f364a928..f8472231476 100644
--- a/cpp/src/arrow/compute/exec_internal.h
+++ b/cpp/src/arrow/compute/exec_internal.h
@@ -132,9 +132,6 @@ class ARROW_EXPORT KernelExecutor {
   /// for all scanned batches in a dataset filter.
   virtual Status Init(KernelContext*, KernelInitArgs) = 0;
 
-  // TODO(wesm): per ARROW-16819, adding ExecBatch variant so that a batch
-  // length can be passed in for scalar functions; will have to return and
-  // clean a bunch of things up
   virtual Status Execute(const ExecBatch& batch, ExecListener* listener) = 0;
 
   virtual Datum WrapResults(const std::vector<Datum>& args,
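
Note on the new NullGeneralization::Get(const ChunkedArray&) overload above: its chunk-combining rule can be read in isolation as the standalone sketch below. This is only an illustration, not Arrow API; the enum and the CombineChunkGeneralizations helper are hypothetical names invented for the example. Non-empty chunks that agree keep their shared generalization, any disagreement degrades to PERHAPS_NULL, empty chunks are skipped, and an input with no non-empty chunks defaults to ALL_VALID.

#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

// Mirrors NullGeneralization::type in exec.cc (illustrative only).
enum ChunkNullGeneralization { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2 };

// Hypothetical helper: each pair is (chunk length, per-chunk generalization).
ChunkNullGeneralization CombineChunkGeneralizations(
    const std::vector<std::pair<int64_t, ChunkNullGeneralization>>& chunks) {
  std::optional<ChunkNullGeneralization> current;
  for (const auto& [length, generalization] : chunks) {
    if (length == 0) continue;  // empty chunks carry no information
    if (current.has_value() && generalization != *current) {
      return PERHAPS_NULL;  // chunks disagree, so nothing stronger can be claimed
    }
    current = generalization;
  }
  return current.value_or(ALL_VALID);  // no non-empty chunks: treat as all-valid
}

For example, {(3, ALL_VALID), (0, ALL_NULL), (2, ALL_VALID)} yields ALL_VALID because the empty chunk is ignored, while {(3, ALL_VALID), (2, ALL_NULL)} yields PERHAPS_NULL.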