From 7ea173cd54f67ded0bd14afca6eae926609a5698 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 6 Jun 2022 17:51:13 -0700 Subject: [PATCH 01/15] feat: implement dispatch for ChunkedArray take --- .../arrow/compute/kernels/vector_selection.cc | 89 ++++++++----------- 1 file changed, 35 insertions(+), 54 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc index 5060b06465b..9e2d42ddefb 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection.cc @@ -2005,70 +2005,51 @@ Result> TakeAA(const std::shared_ptr& valu return result.array(); } +Result> TakeCC(const ChunkedArray& values, + const ChunkedArray& indices, + const TakeOptions& options, + ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE( + Datum result, + CallFunction("array_take", {Datum(values), Datum(indices)}, &options, ctx)); + return result.chunked_array(); +} + Result> TakeCA(const ChunkedArray& values, - const Array& indices, + std::shared_ptr indices, const TakeOptions& options, ExecContext* ctx) { auto num_chunks = values.num_chunks(); - std::shared_ptr current_chunk; - - // Case 1: `values` has a single chunk, so just use it - if (num_chunks == 1) { - current_chunk = values.chunk(0); - } else { - // TODO Case 2: See if all `indices` fall in the same chunk and call Array Take on it - // See - // https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 - // TODO Case 3: If indices are sorted, can slice them and call Array Take - // Case 4: Else, concatenate chunks and call Array Take - if (values.chunks().empty()) { + // If `values` has zero or one chunks, just use the AA implementation + if (num_chunks <= 1) { + std::shared_ptr current_chunk; + // Case 1: `values` has a single chunk, so just use it + if (num_chunks == 1) { + current_chunk = values.chunk(0); + } else { + // Case 2: `values` has no chunks, so create an empty one 
ARROW_ASSIGN_OR_RAISE(current_chunk, MakeArrayOfNull(values.type(), /*length=*/0, ctx->memory_pool())); - } else { - ARROW_ASSIGN_OR_RAISE(current_chunk, - Concatenate(values.chunks(), ctx->memory_pool())); } + // Call Array Take on our single chunk + ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, + TakeAA(current_chunk->data(), indices->data(), options, ctx)); + std::vector> chunks = {MakeArray(new_chunk)}; + return std::make_shared(std::move(chunks)); + // Case 3: + } else { + ChunkedArray indices_chunked(indices); + return TakeCC(values, indices_chunked, options, ctx); } - // Call Array Take on our single chunk - ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, - TakeAA(current_chunk->data(), indices.data(), options, ctx)); - std::vector> chunks = {MakeArray(new_chunk)}; - return std::make_shared(std::move(chunks)); -} - -Result> TakeCC(const ChunkedArray& values, - const ChunkedArray& indices, - const TakeOptions& options, - ExecContext* ctx) { - auto num_chunks = indices.num_chunks(); - std::vector> new_chunks(num_chunks); - for (int i = 0; i < num_chunks; i++) { - // Take with that indices chunk - // Note that as currently implemented, this is inefficient because `values` - // will get concatenated on every iteration of this loop - ARROW_ASSIGN_OR_RAISE(std::shared_ptr current_chunk, - TakeCA(values, *indices.chunk(i), options, ctx)); - // Concatenate the result to make a single array for this chunk - ARROW_ASSIGN_OR_RAISE(new_chunks[i], - Concatenate(current_chunk->chunks(), ctx->memory_pool())); - } - return std::make_shared(std::move(new_chunks), values.type()); } -Result> TakeAC(const Array& values, +Result> TakeAC(std::shared_ptr values, const ChunkedArray& indices, const TakeOptions& options, ExecContext* ctx) { - auto num_chunks = indices.num_chunks(); - std::vector> new_chunks(num_chunks); - for (int i = 0; i < num_chunks; i++) { - // Take with that indices chunk - ARROW_ASSIGN_OR_RAISE(std::shared_ptr chunk, - TakeAA(values.data(), 
indices.chunk(i)->data(), options, ctx)); - new_chunks[i] = MakeArray(chunk); - } - return std::make_shared(std::move(new_chunks), values.type()); + ChunkedArray values_chunked(values); + return TakeCC(values_chunked, indices, options, ctx); } Result> TakeRA(const RecordBatch& batch, @@ -2086,7 +2067,7 @@ Result> TakeRA(const RecordBatch& batch, return RecordBatch::Make(batch.schema(), nrows, std::move(columns)); } -Result> TakeTA(const Table& table, const Array& indices, +Result> TakeTA(const Table& table, std::shared_ptr indices, const TakeOptions& options, ExecContext* ctx) { auto ncols = table.num_columns(); std::vector> columns(ncols); @@ -2138,12 +2119,12 @@ class TakeMetaFunction : public MetaFunction { if (index_kind == Datum::ARRAY) { return TakeAA(args[0].array(), args[1].array(), take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { - return TakeAC(*args[0].make_array(), *args[1].chunked_array(), take_opts, ctx); + return TakeAC(args[0].make_array(), *args[1].chunked_array(), take_opts, ctx); } break; case Datum::CHUNKED_ARRAY: if (index_kind == Datum::ARRAY) { - return TakeCA(*args[0].chunked_array(), *args[1].make_array(), take_opts, ctx); + return TakeCA(*args[0].chunked_array(), args[1].make_array(), take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { return TakeCC(*args[0].chunked_array(), *args[1].chunked_array(), take_opts, ctx); @@ -2156,7 +2137,7 @@ class TakeMetaFunction : public MetaFunction { break; case Datum::TABLE: if (index_kind == Datum::ARRAY) { - return TakeTA(*args[0].table(), *args[1].make_array(), take_opts, ctx); + return TakeTA(*args[0].table(), args[1].make_array(), take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { return TakeTC(*args[0].table(), *args[1].chunked_array(), take_opts, ctx); } From d7e7c10040ba5881b796bfce0c7c6e6b8eea1855 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 12 Aug 2022 16:07:28 -0700 Subject: [PATCH 02/15] Draft implementation for ChunkedPrimitiveTake --- 
cpp/src/arrow/compute/kernel.h | 2 +- .../arrow/compute/kernels/vector_selection.cc | 264 ++++++++++++++++-- cpp/src/arrow/util/int_util.cc | 8 + cpp/src/arrow/util/int_util.h | 3 + 4 files changed, 246 insertions(+), 31 deletions(-) diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index d8960308dff..077bdcb31fa 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -538,7 +538,7 @@ struct ScalarKernel : public Kernel { // ---------------------------------------------------------------------- // VectorKernel (for VectorFunction) -/// \brief Kernel data structure for implementations of VectorFunction. In +/// \brief Kernel data structure for implementations of VectorFunction. It /// contains an optional finalizer function, the null handling and memory /// pre-allocation preferences (which have different defaults from /// ScalarKernel), and some other execution-related options. diff --git a/cpp/src/arrow/compute/kernels/vector_selection.cc b/cpp/src/arrow/compute/kernels/vector_selection.cc index 9e2d42ddefb..3e72f854efa 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection.cc @@ -28,6 +28,7 @@ #include "arrow/array/builder_primitive.h" #include "arrow/array/concatenate.h" #include "arrow/buffer_builder.h" +#include "arrow/chunk_resolver.h" #include "arrow/chunked_array.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/kernels/common.h" @@ -43,6 +44,7 @@ #include "arrow/util/bitmap_ops.h" #include "arrow/util/bitmap_reader.h" #include "arrow/util/int_util.h" +#include "arrow/util/vector.h" namespace arrow { @@ -50,8 +52,11 @@ using internal::BinaryBitBlockCounter; using internal::BitBlockCount; using internal::BitBlockCounter; using internal::CheckIndexBounds; +using internal::ChunkLocation; +using internal::ChunkResolver; using internal::CopyBitmap; using internal::CountSetBits; +using internal::MapVector; using 
internal::OptionalBitBlockCounter; using internal::OptionalBitIndexer; @@ -365,6 +370,126 @@ struct PrimitiveTakeImpl { } out_arr->null_count = out_arr->length - valid_count; } + + static void Exec(const ChunkedArray& values, const ChunkedArray& indices_chunked, + ArrayData* out_arr) { + auto values_resolver = ChunkResolver(values.chunks()); + const std::vector values_data = MapVector( + [](const auto& x) { return x->data()->template GetValues(1); }, + values.chunks()); + const std::vector values_is_valid = + MapVector([](const auto& x) { return x->null_bitmap_data(); }, values.chunks()); + const std::vector values_offset = + MapVector([](const auto& x) { return x->offset(); }, values.chunks()); + + auto out = out_arr->GetMutableValues(1); + uint8_t* out_is_valid = out_arr->buffers[0]->mutable_data(); + int64_t out_offset = out_arr->offset; + + int64_t position = 0; // Position in output array + int64_t valid_count = 0; + int64_t internal_offset = 0; // Total length of indices chunks already processed + + for (const auto& indices_chunk : indices_chunked.chunks()) { + const ArraySpan indices = ArraySpan(*indices_chunk.get()->data()); + // TODO: How do we reduce duplication of code? NOTE(review): `indices_data` and `indices_is_valid` below are per-chunk pointers, yet the loop bodies index them with the global `position`; for every indices chunk after the first this reads past the chunk — the local index is `position - internal_offset`. Likewise `out_offset += indices.length` at the end of this loop double-advances the output, since `position` already accumulates across chunks while `out` / `out_is_valid` are addressed as `out_offset + position`. + const IndexCType* indices_data = indices.GetValues(1); + const uint8_t* indices_is_valid = indices.buffers[0].data; + int64_t indices_offset = indices.offset; + + // If either the values or indices have nulls, we preemptively zero out the + // out validity bitmap so that we don't have to use ClearBit in each + // iteration for nulls. 
+ if (values.null_count() != 0 || indices.null_count != 0) { + bit_util::SetBitsTo(out_is_valid, out_offset, indices.length, false); + } + + OptionalBitBlockCounter indices_bit_counter(indices_is_valid, indices_offset, + indices.length); + + while (position < internal_offset + indices.length) { + BitBlockCount block = indices_bit_counter.NextBlock(); + if (values.null_count() == 0) { + // Values are never null, so things are easier + valid_count += block.popcount; + if (block.popcount == block.length) { + // Fastest path: neither values nor index nulls + bit_util::SetBitsTo(out_is_valid, out_offset + position, block.length, true); + for (int64_t i = 0; i < block.length; ++i) { + int64_t idx = indices_data[position]; + ChunkLocation loc = values_resolver.Resolve(idx); + out[position] = values_data[loc.chunk_index][loc.index_in_chunk]; + ++position; + } + } else if (block.popcount > 0) { + // Slow path: some indices but not all are null + for (int64_t i = 0; i < block.length; ++i) { + if (bit_util::GetBit(indices_is_valid, indices_offset + position)) { + // index is not null + bit_util::SetBit(out_is_valid, out_offset + position); + int64_t idx = indices_data[position]; + ChunkLocation loc = values_resolver.Resolve(idx); + out[position] = values_data[loc.chunk_index][loc.index_in_chunk]; + } else { + out[position] = ValueCType{}; + } + ++position; + } + } else { + memset(out + position, 0, sizeof(ValueCType) * block.length); + position += block.length; + } + } else { + // Values have nulls, so we must do random access into the values bitmap + if (block.popcount == block.length) { + // Faster path: indices are not null but values may be + for (int64_t i = 0; i < block.length; ++i) { + int64_t idx = indices_data[position]; + ChunkLocation loc = values_resolver.Resolve(idx); + if (bit_util::GetBit(values_is_valid[loc.chunk_index], + values_offset[loc.chunk_index] + loc.index_in_chunk)) { + // value is not null + out[position] = 
values_data[loc.chunk_index][loc.index_in_chunk]; + bit_util::SetBit(out_is_valid, out_offset + position); + ++valid_count; + } else { + out[position] = ValueCType{}; + } + ++position; + } + } else if (block.popcount > 0) { + // Slow path: some but not all indices are null. Since we are doing + // random access in general we have to check the value nullness one by + // one. + for (int64_t i = 0; i < block.length; ++i) { + int64_t idx = indices_data[position]; + ChunkLocation loc = values_resolver.Resolve(idx); + if (bit_util::GetBit(indices_is_valid, indices_offset + position) && + bit_util::GetBit(values_is_valid[loc.chunk_index], + values_offset[loc.chunk_index] + loc.index_in_chunk)) { + // index is not null && value is not null + out[position] = values_data[loc.chunk_index][loc.index_in_chunk]; + bit_util::SetBit(out_is_valid, out_offset + position); + ++valid_count; + } else { + out[position] = ValueCType{}; + } + ++position; + } + } else { + memset(out + position, 0, sizeof(ValueCType) * block.length); + position += block.length; + } + } + } + + // Start next output at end of what we just wrote. + out_offset += indices.length; + internal_offset += indices.length; + } + + out_arr->null_count = out_arr->length - valid_count; + } }; template @@ -464,6 +589,11 @@ struct BooleanTakeImpl { } out_arr->null_count = out_arr->length - valid_count; } + + static void Exec(const ChunkedArray& values, const ChunkedArray& indices_chunked, + ArrayData* out_arr) { + // TODO + } }; template