Merged
15 changes: 3 additions & 12 deletions cpp/src/arrow/array/array_primitive.cc
@@ -58,18 +58,9 @@ int64_t BooleanArray::false_count() const {
int64_t BooleanArray::true_count() const {
if (data_->null_count.load() != 0) {
DCHECK(data_->buffers[0]);
internal::BinaryBitBlockCounter bit_counter(data_->buffers[0]->data(), data_->offset,
data_->buffers[1]->data(), data_->offset,
data_->length);
int64_t count = 0;
while (true) {
internal::BitBlockCount block = bit_counter.NextAndWord();
if (block.length == 0) {
break;
}
count += block.popcount;
}
return count;
return internal::CountAndSetBits(data_->buffers[0]->data(), data_->offset,
data_->buffers[1]->data(), data_->offset,
data_->length);
} else {
return internal::CountSetBits(data_->buffers[1]->data(), data_->offset,
data_->length);
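For readers unfamiliar with the bit utilities, here is a minimal sketch of the semantics the new helper provides. The name and the offset-free signature are simplifications for illustration, not the internal API's actual declaration (which also takes bit offsets and works a machine word at a time):

#include <cstdint>

// Count the positions that are set in BOTH the validity bitmap and the
// value bitmap, i.e. the number of non-null true values. Naive bit-by-bit
// version of what the word-at-a-time helper computes.
int64_t CountAndSetBitsNaive(const uint8_t* validity, const uint8_t* values,
                             int64_t length) {
  int64_t count = 0;
  for (int64_t i = 0; i < length; ++i) {
    const int valid = (validity[i / 8] >> (i % 8)) & 1;
    const int set = (values[i / 8] >> (i % 8)) & 1;
    count += valid & set;
  }
  return count;
}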
10 changes: 8 additions & 2 deletions cpp/src/arrow/array/data.cc
@@ -151,6 +151,13 @@ void ArraySpan::SetMembers(const ArrayData& data) {
}
}

Type::type type_id = this->type->id();
if (data.buffers[0] == nullptr && type_id != Type::NA &&
type_id != Type::SPARSE_UNION && type_id != Type::DENSE_UNION) {
// This should already be zero but we make sure
this->null_count = 0;
}

// Makes sure any other buffers are seen as null / non-existent
for (int i = static_cast<int>(data.buffers.size()); i < 3; ++i) {
ClearBuffer(i);
@@ -208,7 +215,6 @@ int64_t ArraySpan::GetNullCount() const {
int GetNumBuffers(const DataType& type) {
switch (type.id()) {
case Type::NA:
return 0;
Member:
Is this correct?

Member Author:
Yes — if you look at NullType::layout() there's a single buffer that's always null, but this seems to conflict with the columnar format documentation. @pitrou do you know more about this?

Member:
Hmm, I suppose it's like how here we have unions listed with three buffers even though the spec lists only one/two.

Member:
I think that was to avoid breaking compatibility. We noticed too late that this was inconsistent with the format spec.
We bridge for the inconsistency in the IPC and C data layers.
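To make the inconsistency concrete, a small sketch against the public Arrow C++ API (assuming a recent release): a NullArray reports one buffer slot, matching NullType::layout(), but that slot is always nullptr, while the columnar format spec says a null array has no buffers at all.

#include <arrow/api.h>
#include <iostream>

int main() {
  auto arr = std::make_shared<arrow::NullArray>(3);
  // One buffer slot, matching NullType::layout() ...
  std::cout << arr->data()->buffers.size() << std::endl;  // 1
  // ... but the buffer itself is always absent.
  std::cout << std::boolalpha << (arr->data()->buffers[0] == nullptr)
            << std::endl;  // true
  return 0;
}

That single always-null slot is what the bridging in the IPC and C data layers papers over.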

case Type::STRUCT:
case Type::FIXED_SIZE_LIST:
return 1;
@@ -232,7 +238,7 @@ int ArraySpan::num_buffers() const { return GetNumBuffers(*this->type); }

std::shared_ptr<ArrayData> ArraySpan::ToArrayData() const {
auto result = std::make_shared<ArrayData>(this->type->Copy(), this->length,
kUnknownNullCount, this->offset);
this->null_count, this->offset);

for (int i = 0; i < this->num_buffers(); ++i) {
if (this->buffers[i].owner) {
2 changes: 1 addition & 1 deletion cpp/src/arrow/builder.cc
@@ -170,7 +170,7 @@ struct DictionaryBuilderCase {
out->reset(new internal::DictionaryBuilderBase<TypeErasedIntBuilder, ValueType>(
index_type, value_type, pool));
} else {
auto start_int_size = internal::GetByteWidth(*index_type);
auto start_int_size = index_type->byte_width();
out->reset(new AdaptiveBuilderType(start_int_size, value_type, pool));
}
return Status::OK();
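The GetByteWidth → byte_width() migration recurs in compare.cc below; a quick sketch of the public accessor it standardizes on (assuming a recent Arrow C++ release, where byte_width() reports -1 for types without a fixed width):

#include <arrow/api.h>
#include <iostream>

int main() {
  std::cout << arrow::int32()->byte_width() << std::endl;    // 4
  std::cout << arrow::float64()->byte_width() << std::endl;  // 8
  // Variable-width types have no fixed byte width (assumed to report -1).
  std::cout << arrow::utf8()->byte_width() << std::endl;
  return 0;
}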
8 changes: 4 additions & 4 deletions cpp/src/arrow/compare.cc
@@ -1022,10 +1022,10 @@ bool IntegerTensorEquals(const Tensor& left, const Tensor& right) {
if (!(left_row_major_p && right_row_major_p) &&
!(left_column_major_p && right_column_major_p)) {
const auto& type = checked_cast<const FixedWidthType&>(*left.type());
are_equal = StridedIntegerTensorContentEquals(0, 0, 0, internal::GetByteWidth(type),
left, right);
are_equal =
StridedIntegerTensorContentEquals(0, 0, 0, type.byte_width(), left, right);
} else {
const int byte_width = internal::GetByteWidth(*left.type());
const int byte_width = left.type()->byte_width();
DCHECK_GT(byte_width, 0);

const uint8_t* left_data = left.data()->data();
@@ -1195,7 +1195,7 @@ struct SparseTensorEqualsImpl<SparseIndexType, SparseIndexType> {
return false;
}

const int byte_width = internal::GetByteWidth(*left.type());
const int byte_width = left.type()->byte_width();
DCHECK_GT(byte_width, 0);

const uint8_t* left_data = left.data()->data();
7 changes: 3 additions & 4 deletions cpp/src/arrow/compute/api_vector.cc
@@ -17,6 +17,7 @@

#include "arrow/compute/api_vector.h"

#include <algorithm>
#include <memory>
#include <sstream>
#include <utility>
@@ -26,6 +27,7 @@
#include "arrow/array/builder_primitive.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/function_internal.h"
#include "arrow/compute/kernels/vector_sort_internal.h"
#include "arrow/compute/registry.h"
#include "arrow/datum.h"
#include "arrow/record_batch.h"
@@ -305,10 +307,7 @@ Result<std::shared_ptr<Array>> SortIndices(const ChunkedArray& chunked_array,

Result<std::shared_ptr<Array>> SortIndices(const ChunkedArray& chunked_array,
SortOrder order, ExecContext* ctx) {
SortOptions options({SortKey("not-used", order)});
ARROW_ASSIGN_OR_RAISE(
Datum result, CallFunction("sort_indices", {Datum(chunked_array)}, &options, ctx));
return result.make_array();
return SortIndices(chunked_array, ArraySortOptions(order), ctx);
}

Result<std::shared_ptr<Array>> SortIndices(const Datum& datum, const SortOptions& options,
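Usage is unchanged by this simplification; a minimal sketch of calling the ChunkedArray overload through the public arrow::compute API:

#include <arrow/api.h>
#include <arrow/compute/api_vector.h>

// Sort indices for a chunked array in descending order; the overload now
// simply delegates to the ArraySortOptions-based implementation.
arrow::Result<std::shared_ptr<arrow::Array>> DescendingIndices(
    const arrow::ChunkedArray& chunked) {
  return arrow::compute::SortIndices(chunked,
                                     arrow::compute::SortOrder::Descending);
}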
4 changes: 2 additions & 2 deletions cpp/src/arrow/compute/api_vector.h
@@ -275,14 +275,14 @@ namespace internal {

/// \brief Return the number of selected indices in the boolean filter
ARROW_EXPORT
int64_t GetFilterOutputSize(const ArrayData& filter,
int64_t GetFilterOutputSize(const ArraySpan& filter,
FilterOptions::NullSelectionBehavior null_selection);

/// \brief Compute uint64 selection indices for use with Take given a boolean
/// filter
ARROW_EXPORT
Result<std::shared_ptr<ArrayData>> GetTakeIndices(
const ArrayData& filter, FilterOptions::NullSelectionBehavior null_selection,
const ArraySpan& filter, FilterOptions::NullSelectionBehavior null_selection,
MemoryPool* memory_pool = default_memory_pool());

} // namespace internal
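Only the argument type changes here (ArrayData → ArraySpan); the computed result is the same. As a conceptual sketch of what GetTakeIndices produces, here is the plain-C++ equivalent over a bool vector, ignoring nulls and the null_selection behavior:

#include <cstdint>
#include <vector>

// Positions at which the filter is true: the indices Take would select.
std::vector<uint64_t> TakeIndicesFromFilter(const std::vector<bool>& filter) {
  std::vector<uint64_t> indices;
  for (uint64_t i = 0; i < filter.size(); ++i) {
    if (filter[i]) indices.push_back(i);
  }
  return indices;
}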
159 changes: 90 additions & 69 deletions cpp/src/arrow/compute/exec.cc
@@ -332,10 +332,14 @@ Status ExecSpanIterator::Init(const ExecBatch& batch, ValueDescr::Shape output_s
int64_t max_chunksize) {
if (batch.num_values() > 0) {
// Validate arguments
ARROW_ASSIGN_OR_RAISE(int64_t inferred_length, InferBatchLength(batch.values));
bool all_args_same_length = false;
int64_t inferred_length = InferBatchLength(batch.values, &all_args_same_length);
if (inferred_length != batch.length) {
return Status::Invalid("Value lengths differed from ExecBatch length");
}
if (!all_args_same_length) {
return Status::Invalid("Array arguments must all be the same length");
}
}
args_ = &batch.values;
initialized_ = have_chunked_arrays_ = false;
@@ -991,43 +995,62 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
ExecSpanIterator span_iterator_;
};

Status PackBatchNoChunks(const std::vector<Datum>& args, ExecBatch* out) {
int64_t length = 0;
for (const auto& arg : args) {
switch (arg.kind()) {
case Datum::SCALAR:
case Datum::ARRAY:
case Datum::CHUNKED_ARRAY:
length = std::max(arg.length(), length);
break;
default:
DCHECK(false);
break;
}
}
out->length = length;
out->values = args;
return Status::OK();
}

class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
public:
Status ExecuteImpl(const std::vector<Datum>& args, ExecListener* listener) {
RETURN_NOT_OK(PrepareExecute(args));
ExecBatch batch;
Status Execute(const ExecBatch& batch, ExecListener* listener) override {
// TODO(wesm): remove in ARROW-16577
if (output_descr_.shape == ValueDescr::SCALAR) {
return Status::Invalid("VectorExecutor only supports array output types");
}

// Some vector kernels have a separate code path for handling
// chunked arrays (VectorKernel::exec_chunked) so we check if we
// have any chunked arrays. If we do and an exec_chunked function
// is defined then we call that.
bool have_chunked_arrays = false;
for (const Datum& arg : batch.values) {
if (arg.is_chunked_array()) have_chunked_arrays = true;
}

output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());

// Decide if we need to preallocate memory for this kernel
validity_preallocated_ =
(kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
}

if (kernel_->can_execute_chunkwise) {
while (batch_iterator_->Next(&batch)) {
RETURN_NOT_OK(ExecuteBatch(batch, listener));
RETURN_NOT_OK(span_iterator_.Init(batch, output_descr_.shape,
exec_context()->exec_chunksize()));
ExecSpan span;
while (span_iterator_.Next(&span)) {
RETURN_NOT_OK(Exec(span, listener));
}
} else {
RETURN_NOT_OK(PackBatchNoChunks(args, &batch));
RETURN_NOT_OK(ExecuteBatch(batch, listener));
// Kernel cannot execute chunkwise. If we have any chunked
// arrays, then VectorKernel::exec_chunked must be defined;
// otherwise we raise an error
if (have_chunked_arrays) {
RETURN_NOT_OK(ExecChunked(batch, listener));
} else {
// No chunked arrays. We pack the args into an ExecSpan and
// call the regular exec code path
RETURN_NOT_OK(Exec(ExecSpan(batch), listener));
}
}
return Finalize(listener);
}

Status Execute(const ExecBatch& batch, ExecListener* listener) override {
return ExecuteImpl(batch.values, listener);
if (kernel_->finalize) {
// Intermediate results require post-processing after the execution is
// completed (possibly involving some accumulated state)
RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &results_));
for (const auto& result : results_) {
RETURN_NOT_OK(listener->OnResult(result));
}
}
return Status::OK();
}

Datum WrapResults(const std::vector<Datum>& inputs,
@@ -1047,59 +1070,54 @@ class VectorExecutor : public KernelExecutorImpl<VectorKernel> {
}

protected:
Status ExecuteBatch(const ExecBatch& batch, ExecListener* listener) {
Datum out;
if (output_descr_.shape == ValueDescr::ARRAY) {
// We preallocate (maybe) only for the output of processing the current
// batch
ARROW_ASSIGN_OR_RAISE(out.value, PrepareOutput(batch.length));
}
Status Exec(const ExecSpan& span, ExecListener* listener) {
ExecResult out;

if (kernel_->null_handling == NullHandling::INTERSECTION &&
output_descr_.shape == ValueDescr::ARRAY) {
RETURN_NOT_OK(PropagateNulls(kernel_ctx_, ExecSpan(batch), out.mutable_array()));
// We preallocate (maybe) only for the output of processing the current
// batch, but create an output ArrayData instance regardless
ARROW_ASSIGN_OR_RAISE(out.value, PrepareOutput(span.length));

if (kernel_->null_handling == NullHandling::INTERSECTION) {
RETURN_NOT_OK(PropagateNulls(kernel_ctx_, span, out.array_data().get()));
}
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, batch, &out));
RETURN_NOT_OK(kernel_->exec(kernel_ctx_, span, &out));
if (!kernel_->finalize) {
// If there is no result finalizer (e.g. for hash-based functions), we can
// emit the processed batch right away rather than waiting
RETURN_NOT_OK(listener->OnResult(std::move(out)));
RETURN_NOT_OK(listener->OnResult(out.array_data()));
} else {
results_.emplace_back(std::move(out));
results_.emplace_back(out.array_data());
}
return Status::OK();
}

Status Finalize(ExecListener* listener) {
if (kernel_->finalize) {
// Intermediate results require post-processing after the execution is
// completed (possibly involving some accumulated state)
RETURN_NOT_OK(kernel_->finalize(kernel_ctx_, &results_));
for (const auto& result : results_) {
RETURN_NOT_OK(listener->OnResult(result));
}
Status ExecChunked(const ExecBatch& batch, ExecListener* listener) {
if (kernel_->exec_chunked == nullptr) {
return Status::Invalid(
"Vector kernel cannot execute chunkwise and no "
"chunked exec function was defined");
}
return Status::OK();
}

Status PrepareExecute(const std::vector<Datum>& args) {
if (kernel_->can_execute_chunkwise) {
ARROW_ASSIGN_OR_RAISE(batch_iterator_, ExecBatchIterator::Make(
args, exec_context()->exec_chunksize()));
if (kernel_->null_handling == NullHandling::INTERSECTION) {
return Status::Invalid(
"Null pre-propagation is unsupported for ChunkedArray "
"execution in vector kernels");
}
output_num_buffers_ = static_cast<int>(output_descr_.type->layout().buffers.size());

// Decide if we need to preallocate memory for this kernel
validity_preallocated_ =
(kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
Datum out;
ARROW_ASSIGN_OR_RAISE(out.value, PrepareOutput(batch.length));
RETURN_NOT_OK(kernel_->exec_chunked(kernel_ctx_, batch, &out));
if (!kernel_->finalize) {
// If there is no result finalizer (e.g. for hash-based functions), we can
// emit the processed batch right away rather than waiting
RETURN_NOT_OK(listener->OnResult(std::move(out)));
} else {
results_.emplace_back(std::move(out));
}
return Status::OK();
}

std::unique_ptr<ExecBatchIterator> batch_iterator_;
ExecSpanIterator span_iterator_;
std::vector<Datum> results_;
};

@@ -1270,7 +1288,7 @@ std::unique_ptr<KernelExecutor> KernelExecutor::MakeScalarAggregate() {
return ::arrow::internal::make_unique<detail::ScalarAggExecutor>();
}

Result<int64_t> InferBatchLength(const std::vector<Datum>& values) {
int64_t InferBatchLength(const std::vector<Datum>& values, bool* all_same) {
int64_t length = -1;
bool are_all_scalar = true;
for (const Datum& arg : values) {
Expand All @@ -1280,7 +1298,8 @@ Result<int64_t> InferBatchLength(const std::vector<Datum>& values) {
length = arg_length;
} else {
if (length != arg_length) {
return Status::Invalid("Array arguments must all be the same length");
*all_same = false;
return length;
}
}
are_all_scalar = false;
@@ -1290,7 +1309,8 @@ Result<int64_t> InferBatchLength(const std::vector<Datum>& values) {
length = arg_length;
} else {
if (length != arg_length) {
return Status::Invalid("Array arguments must all be the same length");
*all_same = false;
return length;
}
}
are_all_scalar = false;
Expand All @@ -1302,6 +1322,7 @@ Result<int64_t> InferBatchLength(const std::vector<Datum>& values) {
} else if (length < 0) {
length = 0;
}
*all_same = true;
return length;
}

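To summarize the control flow introduced in VectorExecutor::Execute above, a simplified stand-in (hypothetical names, not the actual Arrow classes): span-wise iteration when the kernel supports chunkwise execution, the chunked path when any input is a ChunkedArray, and a single whole-batch ExecSpan otherwise.

enum class ExecPath { SpanWise, Chunked, WholeBatch };

// Which execution path a vector kernel takes, given its capabilities
// and its inputs. The Chunked path additionally requires that the
// kernel define exec_chunked, or Execute raises Status::Invalid.
ExecPath ChooseExecPath(bool can_execute_chunkwise, bool have_chunked_arrays) {
  if (can_execute_chunkwise) return ExecPath::SpanWise;
  return have_chunked_arrays ? ExecPath::Chunked : ExecPath::WholeBatch;
}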
9 changes: 6 additions & 3 deletions cpp/src/arrow/compute/exec.h
@@ -20,7 +20,6 @@

#pragma once

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <limits>
@@ -397,8 +396,12 @@ struct ARROW_EXPORT ExecSpan {
}

bool is_all_scalar() const {
return std::all_of(this->values.begin(), this->values.end(),
[](const ExecValue& v) { return v.is_scalar(); });
for (const ExecValue& value : this->values) {
if (value.is_array()) {
return false;
}
}
return true;
}

/// \brief Return the value at the i-th index
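The two formulations of is_all_scalar are equivalent; the explicit loop avoids pulling <algorithm> into this widely included public header. A side-by-side sketch with a simplified stand-in for ExecValue:

#include <algorithm>
#include <vector>

struct Value {  // simplified stand-in for ExecValue
  bool scalar;
  bool is_scalar() const { return scalar; }
  bool is_array() const { return !scalar; }
};

// Before: requires <algorithm>.
bool AllScalarAlgorithm(const std::vector<Value>& values) {
  return std::all_of(values.begin(), values.end(),
                     [](const Value& v) { return v.is_scalar(); });
}

// After: plain loop, no extra include in the header.
bool AllScalarLoop(const std::vector<Value>& values) {
  for (const Value& v : values) {
    if (v.is_array()) return false;
  }
  return true;
}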