cpp/src/arrow/compute/exec.cc (85 changes: 56 additions & 29 deletions)
@@ -297,11 +297,23 @@ void ComputeDataPreallocate(const DataType& type,
     case Type::MAP:
       widths->emplace_back(32, /*added_length=*/1);
       return;
+    case Type::LIST_VIEW: {
+      // add offsets and size
+      widths->emplace_back(32);
+      widths->emplace_back(32);
+      return;
+    }
     case Type::LARGE_BINARY:
     case Type::LARGE_STRING:
     case Type::LARGE_LIST:
       widths->emplace_back(64, /*added_length=*/1);
       return;
+    case Type::LARGE_LIST_VIEW: {
+      // add offsets and size
+      widths->emplace_back(64);
+      widths->emplace_back(64);
+      return;
+    }
     default:
       break;
   }
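
For context on the new cases: a classic variable-size list stores length + 1 offsets (hence the /*added_length=*/1 above), while a list-view stores two equal-length buffers, offsets and sizes, so no extra slot is needed. A sketch of the layout difference, using only public Arrow C++ type factories (illustrative only, not part of the patch):

// Sketch only: the buffer layouts behind the preallocation rules above.
#include <arrow/api.h>
#include <iostream>

int main() {
  // list<int32>: validity bitmap + one offsets buffer holding
  // length + 1 entries (the trailing offset closes the last slot).
  auto list_type = arrow::list(arrow::int32());
  std::cout << "list buffer specs: " << list_type->layout().buffers.size() << "\n";

  // list_view<int32>: validity bitmap + offsets + sizes, where each data
  // buffer holds exactly `length` entries, so no added_length is needed.
  auto list_view_type = arrow::list_view(arrow::int32());
  std::cout << "list_view buffer specs: " << list_view_type->layout().buffers.size() << "\n";
  return 0;
}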
@@ -410,7 +422,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
     // The first time this is called, we populate the output span with any
     // Scalar or Array arguments in the ExecValue struct, and then just
     // increment array offsets below. If any arguments are ChunkedArray, then
-    // the internal ArraySpans will see their members updated during hte
+    // the internal ArraySpans will see their members updated during the
     // iteration
     span->values.resize(args_->size());
     for (size_t i = 0; i < args_->size(); ++i) {
@@ -473,7 +485,7 @@ bool ExecSpanIterator::Next(ExecSpan* span) {
 namespace {

 struct NullGeneralization {
-  enum type { PERHAPS_NULL, ALL_VALID, ALL_NULL };
+  enum type { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2 };

   static type Get(const ExecValue& value) {
     const auto dtype_id = value.type()->id();
@@ -498,15 +510,34 @@ struct NullGeneralization {
     return PERHAPS_NULL;
   }

+  static type Get(const ChunkedArray& chunk_array) {
+    std::optional<type> current_gen;
+    for (const auto& chunk : chunk_array.chunks()) {
+      if (chunk->length() == 0) {
+        continue;
+      }
+
+      const auto& chunk_gen = Get(chunk);
+      if (current_gen.has_value() && chunk_gen != *current_gen) {
+        return PERHAPS_NULL;
+      }
+      current_gen = chunk_gen;
+    }
+    return current_gen.value_or(ALL_VALID);
+  }
+
   static type Get(const Datum& datum) {
+    if (datum.is_chunked_array()) {
+      return Get(*datum.chunked_array());
+    }
+
     // Temporary workaround to help with ARROW-16756
     ExecValue value;
     if (datum.is_array()) {
       value.SetArray(*datum.array());
     } else if (datum.is_scalar()) {
       value.SetScalar(datum.scalar().get());
     } else {
-      // TODO(wesm): ChunkedArray, I think
       return PERHAPS_NULL;
     }
     return Get(value);
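
The new Get(const ChunkedArray&) overload folds per-chunk results: empty chunks are skipped, the first non-empty chunk seeds the state, any disagreement between chunks collapses to PERHAPS_NULL, and a chunked array with no non-empty chunks reports ALL_VALID (zero rows contain zero nulls). A standalone restatement of that fold (illustrative only; the real struct lives in an anonymous namespace and is not callable from outside exec.cc):

// Sketch only: the same fold as Get(const ChunkedArray&) above, with
// the per-chunk generalizations precomputed into a vector.
#include <optional>
#include <vector>

enum NullGen { PERHAPS_NULL = 0, ALL_VALID = 1, ALL_NULL = 2 };

NullGen Combine(const std::vector<NullGen>& per_chunk) {
  std::optional<NullGen> current;
  for (NullGen g : per_chunk) {
    if (current.has_value() && g != *current) {
      return PERHAPS_NULL;  // chunks disagree: no array-wide guarantee
    }
    current = g;
  }
  // No non-empty chunks at all: a zero-length array has no nulls.
  return current.value_or(ALL_VALID);
}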
@@ -738,12 +769,14 @@ class KernelExecutorImpl : public KernelExecutor {
     }
     for (size_t i = 0; i < data_preallocated_.size(); ++i) {
       const auto& prealloc = data_preallocated_[i];
-      if (prealloc.bit_width >= 0) {
-        ARROW_ASSIGN_OR_RAISE(
-            out->buffers[i + 1],
-            AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
-                               prealloc.bit_width));
-      }
+
+      // ComputeDataPreallocate can make sure all of the element in
+      // data_preallocated_ could satisfy below DCHECK
+      DCHECK_GE(prealloc.bit_width, 0);
+      ARROW_ASSIGN_OR_RAISE(
+          out->buffers[i + 1],
+          AllocateDataBuffer(kernel_ctx_, length + prealloc.added_length,
+                             prealloc.bit_width));
     }
     return out;
   }

Review thread on the removed if (prealloc.bit_width >= 0) guard:

Member: What would NA type do here?

Contributor Author: data_preallocated_ will be filled in ComputeDataPreallocate, and the NA type will not be added to data_preallocated_.

Member: Yes, seems this could be a DCHECK?

Contributor: fixed_size_binary<0> can have bit_width == 0.

Contributor Author: Yes, fixed_size_binary<0> can be added to data_preallocated_ normally in ComputeDataPreallocate, and that function makes sure every element of data_preallocated_ satisfies bit_width >= 0. So we just add a DCHECK here.
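
The thread above rests on two facts: NullType never contributes an entry to data_preallocated_ (its layout has no data buffers), and fixed_size_binary<0> legitimately yields bit_width == 0, which is why the invariant is DCHECK_GE rather than a strict inequality. A small sketch of the corner case, using only public Arrow C++ APIs (illustrative, not part of the patch):

// Sketch only: the fixed_size_binary<0> corner case from the review thread.
#include <arrow/api.h>
#include <iostream>

int main() {
  auto type = arrow::fixed_size_binary(0);
  const auto& fsb = static_cast<const arrow::FixedSizeBinaryType&>(*type);
  // A zero-width data buffer is valid, so preallocation must accept
  // bit_width == 0 and only rule out negative widths.
  std::cout << "fixed_size_binary<0> bit width: " << fsb.bit_width() << "\n";
  return 0;
}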
@@ -796,7 +829,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
     // kernels supporting preallocation, then we do so up front and then
     // iterate over slices of that large array. Otherwise, we preallocate prior
     // to processing each span emitted from the ExecSpanIterator
-    RETURN_NOT_OK(SetupPreallocation(span_iterator_.length(), batch.values));
+    RETURN_NOT_OK(SetupPreallocation(batch.values));

     // ARROW-16756: Here we have to accommodate the distinct cases
     //
@@ -928,7 +961,7 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
     return Status::OK();
   }

-  Status SetupPreallocation(int64_t total_length, const std::vector<Datum>& args) {
+  Status SetupPreallocation(const std::vector<Datum>& args) {
     output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());
     auto out_type_id = output_type_.type->id();
     // Default to no validity pre-allocation for following cases:
@@ -966,12 +999,6 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
         data_preallocated_.size() == static_cast<size_t>(output_num_buffers_ - 1) &&
         !is_nested(out_type_id) && !is_dictionary(out_type_id));

-    // TODO(wesm): why was this check ever here? Fixed width binary
-    // can be 0-width but anything else?
-    DCHECK(std::all_of(
-        data_preallocated_.begin(), data_preallocated_.end(),
-        [](const BufferPreallocation& prealloc) { return prealloc.bit_width >= 0; }));
-
     // Contiguous preallocation only possible on non-nested types if all
     // buffers are preallocated. Otherwise, we must go chunk-by-chunk.
     //

Review thread on the removed DCHECK:

Contributor Author: ComputeDataPreallocate can make sure all of the elements in data_preallocated_ satisfy the DCHECK. It's unnecessary to keep this here.

Member: Maybe keep this because it's only in a debug build?

Contributor Author (ZhangHuiGui, Jun 13, 2024): We can keep the DCHECK you suggested before (https://github.com/apache/arrow/pull/41975/files#r1636669846). In essence, they detect the same content; otherwise there would be two duplicate DCHECKs.
@@ -1022,15 +1049,6 @@ Status CheckCanExecuteChunked(const VectorKernel* kernel) {
 class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
  public:
   Status Execute(const ExecBatch& batch, ExecListener* listener) override {
-    // Some vector kernels have a separate code path for handling
-    // chunked arrays (VectorKernel::exec_chunked) so we check if we
-    // have any chunked arrays. If we do and an exec_chunked function
-    // is defined then we call that.
-    bool have_chunked_arrays = false;
-    for (const Datum& arg : batch.values) {
-      if (arg.is_chunked_array()) have_chunked_arrays = true;
-    }
-
     output_num_buffers_ = static_cast<int>(output_type_.type->layout().buffers.size());

     // Decide if we need to preallocate memory for this kernel
@@ -1049,10 +1067,19 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
         RETURN_NOT_OK(Exec(span, listener));
       }
     } else {
-      // Kernel cannot execute chunkwise. If we have any chunked
-      // arrays, then VectorKernel::exec_chunked must be defined
-      // otherwise we raise an error
+      // Some vector kernels have a separate code path for handling
+      // chunked arrays (VectorKernel::exec_chunked) so we check if we
+      // have any chunked arrays. If we do and an exec_chunked function
+      // is defined then we call that.
+      bool have_chunked_arrays = false;
+      for (const Datum& arg : batch.values) {
+        if (arg.is_chunked_array()) have_chunked_arrays = true;
+      }
+
       if (have_chunked_arrays) {
+        // Kernel cannot execute chunkwise. If we have any chunked
+        // arrays, then VectorKernel::exec_chunked must be defined
+        // otherwise we raise an error
         RETURN_NOT_OK(ExecChunked(batch, listener));
       } else {
         // No chunked arrays. We pack the args into an ExecSpan and

Review thread on the new have_chunked_arrays scan:

Contributor Author: have_chunked_arrays is only used in the kernel_->can_execute_chunkwise == false branch.

Contributor: I like this code move. I could only understand it when looking commit by commit.
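
After the move, Execute dispatches three ways: span by span when the kernel can execute chunkwise, whole-batch via exec_chunked when chunked inputs are present, and a single packed ExecSpan otherwise. A self-contained analogue of that control flow (FakeKernel and the boolean flags are stand-ins, not the real arrow::compute types):

// Sketch only: mirrors the dispatch order of VectorExecutor::Execute
// after this change, with stand-in types.
#include <functional>
#include <iostream>
#include <stdexcept>
#include <vector>

struct FakeKernel {
  bool can_execute_chunkwise = false;
  std::function<void()> exec_chunked;  // may be unset
};

void Execute(const FakeKernel& kernel, const std::vector<bool>& arg_is_chunked) {
  if (kernel.can_execute_chunkwise) {
    std::cout << "iterate ExecSpans (chunked args sliced transparently)\n";
    return;
  }
  // The have_chunked_arrays scan now lives only on this branch, which is
  // the code move discussed in the thread above.
  bool have_chunked_arrays = false;
  for (bool chunked : arg_is_chunked) {
    if (chunked) have_chunked_arrays = true;
  }
  if (have_chunked_arrays) {
    if (!kernel.exec_chunked) {
      throw std::runtime_error("chunked input but no exec_chunked defined");
    }
    kernel.exec_chunked();  // whole-argument execution
  } else {
    std::cout << "pack contiguous args into a single ExecSpan\n";
  }
}

int main() {
  Execute({/*can_execute_chunkwise=*/true, {}}, {true});
  Execute({false, [] { std::cout << "exec_chunked called\n"; }}, {true, false});
  Execute({false, {}}, {false});
  return 0;
}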
cpp/src/arrow/compute/exec_internal.h (3 changes: 0 additions & 3 deletions)
@@ -132,9 +132,6 @@ class ARROW_EXPORT KernelExecutor {
   /// for all scanned batches in a dataset filter.
   virtual Status Init(KernelContext*, KernelInitArgs) = 0;

-  // TODO(wesm): per ARROW-16819, adding ExecBatch variant so that a batch
-  // length can be passed in for scalar functions; will have to return and
-  // clean a bunch of things up
   virtual Status Execute(const ExecBatch& batch, ExecListener* listener) = 0;

   virtual Datum WrapResults(const std::vector<Datum>& args,