Changes from all commits
21 commits
7aa5170
Take: Add VectorKernel::ChunkedExec to SelectionKernelData
felipecrv May 3, 2024
0137911
Take: VectorKernel::output_chunked should be false for "array_take"
felipecrv May 4, 2024
91829d9
Take: Make "array_take" handle CA->C cases by populating VectorKernel…
felipecrv May 4, 2024
ab18d42
gather_internal.h: Introduce GatherFromChunks
felipecrv Apr 27, 2024
f2ab899
Take: Introduce ValueSpan to delay dispatching on chunked-ness
felipecrv May 5, 2024
35de083
Take: Implement the FixedWidthTakeChunkedExec() kernel using GatherFr…
felipecrv May 5, 2024
e39e8b4
Take: Adapt kernel to the ChunkResolver changes
felipecrv Sep 2, 2024
40ba633
TakeMetaFunction: Update comment about what the MetaFunction does
felipecrv Jun 12, 2024
7f3470e
Take: Support CA->C and CC->C cases directly in "array_take" with 2 s…
felipecrv Jun 14, 2024
a28615e
Take: Simplify TakeMetaFunction even further
felipecrv Jun 15, 2024
c4eb070
Remove all ARROW_NOINLINE from vector_selection_take_internal.cc
felipecrv Jun 26, 2024
3302a24
gather_intenal.h: Clarify the semantics of ValiditySpan/IsSrcValid
felipecrv Jun 26, 2024
991609f
Take: Fix silly mistake
felipecrv Aug 16, 2024
fe18180
Small fixes from PR feedback
felipecrv Aug 20, 2024
6a6941f
Take: Use fixed size blocks of locations when running TakeCA
felipecrv Sep 1, 2024
31532f5
Take: Lazily build a ChunkResolver from ValuesSpan
felipecrv Sep 1, 2024
7380247
Take: Move the ValuesSpan class to the header
felipecrv Sep 1, 2024
daf0eed
Selection: Fix UB -- nothing guarantees these references to spans are…
felipecrv Sep 1, 2024
8e3f750
Selection: Make sub-classes constructable with ValueSpan and ArraySpan's
felipecrv Sep 1, 2024
9388bac
Take: Create a signature for Take kernels support AAA and CAA calls
felipecrv Sep 1, 2024
1bcff91
Fixes
pitrou Jun 10, 2025
167 changes: 160 additions & 7 deletions cpp/src/arrow/compute/kernels/gather_internal.h
@@ -20,8 +20,14 @@
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

#include "arrow/array/array_base.h"
#include "arrow/array/data.h"
#include "arrow/chunk_resolver.h"
#include "arrow/chunked_array.h"
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/bit_block_counter.h"
#include "arrow/util/bit_run_reader.h"
#include "arrow/util/bit_util.h"
@@ -52,6 +58,15 @@ class GatherBaseCRTP {
ARROW_DEFAULT_MOVE_AND_ASSIGN(GatherBaseCRTP);

protected:
template <typename IndexCType>
bool IsSrcValid(const ArraySpan& src_validity, const IndexCType* idx,
int64_t position) const {
// Translate position into index on the source
const int64_t index = idx[position];
ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
return src_validity.IsValid(index);
}

ARROW_FORCE_INLINE int64_t ExecuteNoNulls(int64_t idx_length) {
auto* self = static_cast<GatherImpl*>(this);
for (int64_t position = 0; position < idx_length; position++) {
@@ -76,8 +91,12 @@ class GatherBaseCRTP {
// doesn't have to be called for resulting null positions. A position is
// considered null if either the index or the source value is null at that
// position.
template <bool kOutputIsZeroInitialized, typename IndexCType>
ARROW_FORCE_INLINE int64_t ExecuteWithNulls(const ArraySpan& src_validity,
//
// ValiditySpan is any class that `GatherImpl::IsSrcValid(src_validity, idx, position)`
// can be called with.
template <bool kOutputIsZeroInitialized, typename IndexCType,
class ValiditySpan = ArraySpan>
ARROW_FORCE_INLINE int64_t ExecuteWithNulls(const ValiditySpan& src_validity,
int64_t idx_length, const IndexCType* idx,
const ArraySpan& idx_validity,
uint8_t* out_is_valid) {
@@ -116,12 +135,11 @@ class GatherBaseCRTP {
position += block.length;
}
} else {
// Source values may be null, so we must do random access into src_validity
// Source values may be null, so we must do random access with IsSrcValid()
if (block.popcount == block.length) {
// Faster path: indices are not null but source values may be
for (int64_t i = 0; i < block.length; ++i) {
ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
if (src_validity.IsValid(idx[position])) {
if (self->IsSrcValid(src_validity, idx, position)) {
// value is not null
self->WriteValue(position);
bit_util::SetBit(out_is_valid, position);
@@ -136,9 +154,9 @@
// random access in general we have to check the value nullness one by
// one.
for (int64_t i = 0; i < block.length; ++i) {
ARROW_COMPILER_ASSUME(src_validity.buffers[0].data != nullptr);
ARROW_COMPILER_ASSUME(idx_validity.buffers[0].data != nullptr);
if (idx_validity.IsValid(position) && src_validity.IsValid(idx[position])) {
if (idx_validity.IsValid(position) &&
self->IsSrcValid(src_validity, idx, position)) {
// index is not null && value is not null
self->WriteValue(position);
bit_util::SetBit(out_is_valid, position);
@@ -303,4 +321,139 @@ class Gather</*kValueWidthInBits=*/1, IndexCType, /*kWithFactor=*/false>
}
};

template <typename IndexCType>
struct ChunkedValiditySpan {
const ChunkedArray& chunks_validity;
const TypedChunkLocation<IndexCType>* chunk_location_vec;
const bool may_have_nulls;

ChunkedValiditySpan(const ChunkedArray& chunks_validity,
const TypedChunkLocation<IndexCType>* chunk_location_vec)
: chunks_validity(chunks_validity),
chunk_location_vec(chunk_location_vec),
may_have_nulls(chunks_validity.null_count() > 0) {}

bool MayHaveNulls() const { return may_have_nulls; }

bool IsSrcValid(const IndexCType* idx, int64_t position) const {
// idx is unused because all the indices have been pre-resolved into
// `chunk_location_vec` by ChunkResolver::ResolveMany.
ARROW_UNUSED(idx);
auto loc = chunk_location_vec[position];
return chunks_validity.chunk(static_cast<int>(loc.chunk_index))
->IsValid(loc.index_in_chunk);
}
};

template <int kValueWidthInBits, typename IndexCType, bool kWithFactor>
class GatherFromChunks
: public GatherBaseCRTP<
GatherFromChunks<kValueWidthInBits, IndexCType, kWithFactor>> {
private:
static_assert(!kWithFactor || kValueWidthInBits == 8,
"kWithFactor is only supported for kValueWidthInBits == 8");
static_assert(kValueWidthInBits == 1 || kValueWidthInBits % 8 == 0);
// kValueWidth should not be used if kValueWidthInBits == 1.
static constexpr int kValueWidth = kValueWidthInBits / 8;

// src_residual_bit_offsets_[i] stores the bit offset (0-7) into the first byte of
// src_chunks_[i]; it is only used when kValueWidthInBits == 1.
const int* src_residual_bit_offsets_ = NULLPTR;
// Pre-computed pointers to the start of the values in each chunk.
const uint8_t* const* src_chunks_;
// Number of indices resolved in chunk_location_vec_.
const int64_t idx_length_;
const TypedChunkLocation<IndexCType>* chunk_location_vec_;

uint8_t* out_;
int64_t factor_;

public:
void WriteValue(int64_t position) {
auto loc = chunk_location_vec_[position];
auto* chunk = src_chunks_[loc.chunk_index];
if constexpr (kValueWidthInBits == 1) {
auto src_offset = src_residual_bit_offsets_[loc.chunk_index];
bit_util::SetBitTo(out_, position,
bit_util::GetBit(chunk, src_offset + loc.index_in_chunk));
} else if constexpr (kWithFactor) {
const int64_t scaled_factor = kValueWidth * factor_;
memcpy(out_ + position * scaled_factor, chunk + loc.index_in_chunk * scaled_factor,
scaled_factor);
} else {
memcpy(out_ + position * kValueWidth, chunk + loc.index_in_chunk * kValueWidth,
kValueWidth);
}
}

void WriteZero(int64_t position) {
if constexpr (kValueWidthInBits == 1) {
bit_util::ClearBit(out_, position);
} else if constexpr (kWithFactor) {
const int64_t scaled_factor = kValueWidth * factor_;
memset(out_ + position * scaled_factor, 0, scaled_factor);
} else {
memset(out_ + position * kValueWidth, 0, kValueWidth);
}
}

void WriteZeroSegment(int64_t position, int64_t block_length) {
if constexpr (kValueWidthInBits == 1) {
bit_util::SetBitsTo(out_, position, block_length, false);
} else if constexpr (kWithFactor) {
const int64_t scaled_factor = kValueWidth * factor_;
memset(out_ + position * scaled_factor, 0, block_length * scaled_factor);
} else {
memset(out_ + position * kValueWidth, 0, block_length * kValueWidth);
}
}

bool IsSrcValid(const ChunkedValiditySpan<IndexCType>& src_validity,
const IndexCType* idx, int64_t position) const {
return src_validity.IsSrcValid(idx, position);
}

public:
GatherFromChunks(const int* src_residual_bit_offsets, const uint8_t* const* src_chunks,
const int64_t idx_length,
const TypedChunkLocation<IndexCType>* chunk_location_vec, uint8_t* out,
int64_t factor = 1)
: src_residual_bit_offsets_(src_residual_bit_offsets),
src_chunks_(src_chunks),
idx_length_(idx_length),
chunk_location_vec_(chunk_location_vec),
out_(out),
factor_(factor) {
assert(src_chunks && chunk_location_vec_ && out);
if constexpr (kValueWidthInBits == 1) {
assert(src_residual_bit_offsets);
}
assert((kWithFactor || factor == 1) &&
"When kWithFactor is false, the factor is assumed to be 1 at compile time");
}

ARROW_FORCE_INLINE int64_t Execute() { return this->ExecuteNoNulls(idx_length_); }

/// \pre If kOutputIsZeroInitialized, then this->out_ has to be zero initialized.
/// \pre Bits in out_is_valid have to always be zero initialized.
/// \post The bits for the valid elements (and only those) are set in out_is_valid.
/// \post If !kOutputIsZeroInitialized, then positions in this->out_ containing null
/// elements have 0s written to them. This might be less efficient than
/// zero-initializing first and calling this->Execute() afterwards.
/// \return The number of valid elements in out.
template <bool kOutputIsZeroInitialized = false>
ARROW_FORCE_INLINE int64_t Execute(const ChunkedArray& src_validity,
const ArraySpan& idx_validity,
uint8_t* out_is_valid) {
assert(idx_length_ == idx_validity.length);
assert(out_is_valid);
assert(idx_validity.type->byte_width() == sizeof(IndexCType));
ChunkedValiditySpan src_validity_span{src_validity, chunk_location_vec_};
assert(src_validity_span.MayHaveNulls() || idx_validity.MayHaveNulls());
// idx=NULLPTR because the IsSrcValid() defined above does not use it.
return this->template ExecuteWithNulls<kOutputIsZeroInitialized, IndexCType>(
src_validity_span, idx_length_, /*idx=*/NULLPTR, idx_validity, out_is_valid);
}
};

} // namespace arrow::internal
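
As a side note for reviewers (not part of the diff), here is a minimal sketch of how the new `GatherFromChunks` is meant to be driven: logical indices are pre-resolved into `TypedChunkLocation`s via `ChunkResolver::ResolveMany`, raw value pointers are collected per chunk, and the gather runs over fixed-width values. The exact `ResolveMany` call shape and the zero-offset-chunk simplification are assumptions of the sketch, not guarantees of the PR.

```cpp
// Hypothetical helper, only to illustrate how GatherFromChunks is driven.
// Assumes int32 values, zero-offset chunks, and no nulls (the Execute()
// overload taking a validity span is skipped for brevity).
#include <cstdint>
#include <vector>

#include "arrow/chunk_resolver.h"
#include "arrow/chunked_array.h"
#include "arrow/compute/kernels/gather_internal.h"

namespace arrow::internal {

int64_t GatherInt32FromChunks(const ChunkedArray& values, const int32_t* indices,
                              int64_t n_indices, uint8_t* out) {
  // 1) Pre-resolve logical indices into (chunk_index, index_in_chunk) pairs.
  std::vector<TypedChunkLocation<int32_t>> locations(n_indices);
  ChunkResolver resolver(values.chunks());
  resolver.ResolveMany<int32_t>(n_indices, indices, locations.data());

  // 2) Collect the raw value pointer of every chunk (buffer 1 of an int32 array),
  //    assuming zero-offset chunks for simplicity.
  std::vector<const uint8_t*> chunk_values(values.num_chunks());
  for (int i = 0; i < values.num_chunks(); ++i) {
    chunk_values[i] = values.chunk(i)->data()->buffers[1]->data();
  }

  // 3) Gather 32-bit values; src_residual_bit_offsets is only needed for booleans.
  GatherFromChunks</*kValueWidthInBits=*/32, int32_t, /*kWithFactor=*/false> gather(
      /*src_residual_bit_offsets=*/nullptr, chunk_values.data(), n_indices,
      locations.data(), out);
  return gather.Execute();
}

}  // namespace arrow::internal
```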
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/kernels/vector_selection.cc
@@ -294,6 +294,8 @@ std::shared_ptr<VectorFunction> MakeIndicesNonZeroFunction(std::string name,
VectorKernel kernel;
kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;
// "array_take" ensures that the output will be be chunked when at least one
// input is chunked, so we need to set this to false.
kernel.output_chunked = false;
kernel.exec = IndicesNonZeroExec;
kernel.exec_chunked = IndicesNonZeroExecChunked;
@@ -339,6 +341,7 @@ void RegisterVectorSelection(FunctionRegistry* registry) {
VectorKernel take_base;
take_base.init = TakeState::Init;
take_base.can_execute_chunkwise = false;
take_base.output_chunked = false;
RegisterSelectionFunction("array_take", array_take_doc, take_base,
std::move(take_kernels), GetDefaultTakeOptions(), registry);
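
For context (not part of the diff): with `output_chunked` set to false, "array_take" now handles the CA->C and CC->C cases itself instead of TakeMetaFunction concatenating chunks first. A hedged sketch of what that looks like from the public compute API; the chunked output shape follows the commit descriptions above.

```cpp
// Sketch only: taking from a ChunkedArray with plain Array indices (the CA->C case).
#include "arrow/api.h"
#include "arrow/compute/api_vector.h"

arrow::Result<arrow::Datum> TakeChunkedValues(
    const std::shared_ptr<arrow::ChunkedArray>& values,
    const std::shared_ptr<arrow::Array>& indices) {
  // Per this PR, the result is chunked whenever at least one input is chunked,
  // so the returned Datum is expected to hold a ChunkedArray here.
  return arrow::compute::Take(arrow::Datum(values), arrow::Datum(indices));
}
```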

15 changes: 10 additions & 5 deletions cpp/src/arrow/compute/kernels/vector_selection_filter_internal.cc
@@ -895,18 +895,23 @@ Status ExtensionFilterExec(KernelContext* ctx, const ExecSpan& batch, ExecResult
}

// Transform filter to selection indices and then use Take.
Status FilterWithTakeExec(const ArrayKernelExec& take_exec, KernelContext* ctx,
Status FilterWithTakeExec(TakeKernelExec take_aaa_exec, KernelContext* ctx,
const ExecSpan& batch, ExecResult* out) {
std::shared_ptr<ArrayData> indices;
std::shared_ptr<ArrayData> indices_data;
RETURN_NOT_OK(GetTakeIndices(batch[1].array,
FilterState::Get(ctx).null_selection_behavior,
ctx->memory_pool())
.Value(&indices));
.Value(&indices_data));

KernelContext take_ctx(*ctx);
TakeState state{TakeOptions::NoBoundsCheck()};
take_ctx.SetState(&state);
ExecSpan take_batch({batch[0], ArraySpan(*indices)}, batch.length);
return take_exec(&take_ctx, take_batch, out);

ValuesSpan values(batch[0].array);
std::shared_ptr<ArrayData> out_data = out->array_data();
RETURN_NOT_OK(take_aaa_exec(&take_ctx, values, *indices_data, &out_data));
out->value = std::move(out_data);
return Status::OK();
}
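
The `TakeKernelExec` alias itself is not shown in this hunk; reconstructed from the call site above (an assumption about the PR's actual definition, which may well use a different form), it would look roughly like this:

```cpp
// Hypothetical reconstruction, inferred only from the call above: the AAA-style
// take exec receives the values wrapped in a ValuesSpan plus the raw indices
// ArrayData, and writes its result through a pre-allocated ArrayData pointer.
using TakeKernelExec = Status (*)(KernelContext* ctx, const ValuesSpan& values,
                                  const ArrayData& indices,
                                  std::shared_ptr<ArrayData>* out);
```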

// Due to the special treatment with their Take kernels, we filter Struct and SparseUnion