diff --git a/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc b/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc index e65d5dbcab1..c2a27dfe434 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_benchmark.cc @@ -115,6 +115,24 @@ struct TakeBenchmark { indices_have_nulls(indices_have_nulls), monotonic_indices(monotonic_indices) {} + static constexpr int kStringMinLength = 0; + static constexpr int kStringMaxLength = 32; + static constexpr int kByteWidthRange = 2; + + template + std::shared_ptr GenChunkedArray(int64_t num_chunks, + GenChunk&& gen_chunk) { + const int64_t chunk_length = + std::llround(args.size / static_cast(num_chunks)); + ArrayVector chunks; + for (int64_t i = 0; i < num_chunks; ++i) { + const int64_t fitting_chunk_length = + std::min(chunk_length, args.size - i * chunk_length); + chunks.push_back(gen_chunk(fitting_chunk_length)); + } + return std::make_shared(std::move(chunks)); + } + void Int64() { auto values = rand.Int64(args.size, -100, 100, args.null_proportion); Bench(values); @@ -129,19 +147,43 @@ struct TakeBenchmark { } void FixedSizeBinary() { - const int32_t byte_width = static_cast(state.range(2)); + const auto byte_width = static_cast(state.range(kByteWidthRange)); auto values = rand.FixedSizeBinary(args.size, byte_width, args.null_proportion); Bench(values); state.counters["byte_width"] = byte_width; } void String() { - int32_t string_min_length = 0, string_max_length = 32; - auto values = std::static_pointer_cast(rand.String( - args.size, string_min_length, string_max_length, args.null_proportion)); + auto values = std::static_pointer_cast( + rand.String(args.size, kStringMinLength, kStringMaxLength, args.null_proportion)); Bench(values); } + void ChunkedInt64(int64_t num_chunks, bool chunk_indices_too) { + auto chunked_array = GenChunkedArray(num_chunks, [this](int64_t chunk_length) { + return rand.Int64(chunk_length, -100, 100, args.null_proportion); + }); + BenchChunked(chunked_array, chunk_indices_too); + } + + void ChunkedFSB(int64_t num_chunks, bool chunk_indices_too) { + const auto byte_width = static_cast(state.range(kByteWidthRange)); + auto chunked_array = + GenChunkedArray(num_chunks, [this, byte_width](int64_t chunk_length) { + return rand.FixedSizeBinary(chunk_length, byte_width, args.null_proportion); + }); + BenchChunked(chunked_array, chunk_indices_too); + state.counters["byte_width"] = byte_width; + } + + void ChunkedString(int64_t num_chunks, bool chunk_indices_too) { + auto chunked_array = GenChunkedArray(num_chunks, [this](int64_t chunk_length) { + return std::static_pointer_cast(rand.String( + chunk_length, kStringMinLength, kStringMaxLength, args.null_proportion)); + }); + BenchChunked(chunked_array, chunk_indices_too); + } + void Bench(const std::shared_ptr& values) { double indices_null_proportion = indices_have_nulls ? args.null_proportion : 0; auto indices = @@ -158,6 +200,40 @@ struct TakeBenchmark { } state.SetItemsProcessed(state.iterations() * values->length()); } + + void BenchChunked(const std::shared_ptr& values, bool chunk_indices_too) { + double indices_null_proportion = indices_have_nulls ? args.null_proportion : 0; + auto indices = + rand.Int32(values->length(), 0, static_cast(values->length() - 1), + indices_null_proportion); + + if (monotonic_indices) { + auto arg_sorter = *SortIndices(*indices); + indices = *Take(*indices, *arg_sorter); + } + std::shared_ptr chunked_indices; + if (chunk_indices_too) { + std::vector> indices_chunks; + int64_t offset = 0; + for (int i = 0; i < values->num_chunks(); ++i) { + auto chunk = indices->Slice(offset, values->chunk(i)->length()); + indices_chunks.push_back(std::move(chunk)); + offset += values->chunk(i)->length(); + } + chunked_indices = std::make_shared(std::move(indices_chunks)); + } + + if (chunk_indices_too) { + for (auto _ : state) { + ABORT_NOT_OK(Take(values, chunked_indices).status()); + } + } else { + for (auto _ : state) { + ABORT_NOT_OK(Take(values, indices).status()); + } + } + state.SetItemsProcessed(state.iterations() * values->length()); + } }; struct FilterBenchmark { @@ -298,11 +374,11 @@ static void FilterRecordBatchWithNulls(benchmark::State& state) { } static void TakeInt64RandomIndicesNoNulls(benchmark::State& state) { - TakeBenchmark(state, false).Int64(); + TakeBenchmark(state, /*indices_with_nulls=*/false).Int64(); } static void TakeInt64RandomIndicesWithNulls(benchmark::State& state) { - TakeBenchmark(state, true).Int64(); + TakeBenchmark(state, /*indices_with_nulls=*/true).Int64(); } static void TakeInt64MonotonicIndices(benchmark::State& state) { @@ -310,11 +386,11 @@ static void TakeInt64MonotonicIndices(benchmark::State& state) { } static void TakeFixedSizeBinaryRandomIndicesNoNulls(benchmark::State& state) { - TakeBenchmark(state, false).FixedSizeBinary(); + TakeBenchmark(state, /*indices_with_nulls=*/false).FixedSizeBinary(); } static void TakeFixedSizeBinaryRandomIndicesWithNulls(benchmark::State& state) { - TakeBenchmark(state, true).FixedSizeBinary(); + TakeBenchmark(state, /*indices_with_nulls=*/true).FixedSizeBinary(); } static void TakeFixedSizeBinaryMonotonicIndices(benchmark::State& state) { @@ -323,11 +399,11 @@ static void TakeFixedSizeBinaryMonotonicIndices(benchmark::State& state) { } static void TakeFSLInt64RandomIndicesNoNulls(benchmark::State& state) { - TakeBenchmark(state, false).FSLInt64(); + TakeBenchmark(state, /*indices_with_nulls=*/false).FSLInt64(); } static void TakeFSLInt64RandomIndicesWithNulls(benchmark::State& state) { - TakeBenchmark(state, true).FSLInt64(); + TakeBenchmark(state, /*indices_with_nulls=*/true).FSLInt64(); } static void TakeFSLInt64MonotonicIndices(benchmark::State& state) { @@ -335,17 +411,79 @@ static void TakeFSLInt64MonotonicIndices(benchmark::State& state) { } static void TakeStringRandomIndicesNoNulls(benchmark::State& state) { - TakeBenchmark(state, false).String(); + TakeBenchmark(state, /*indices_with_nulls=*/false).String(); } static void TakeStringRandomIndicesWithNulls(benchmark::State& state) { - TakeBenchmark(state, true).String(); + TakeBenchmark(state, /*indices_with_nulls=*/true).String(); } static void TakeStringMonotonicIndices(benchmark::State& state) { TakeBenchmark(state, /*indices_with_nulls=*/false, /*monotonic=*/true).FSLInt64(); } +static void TakeChunkedChunkedInt64RandomIndicesNoNulls(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/false) + .ChunkedInt64(/*num_chunks=*/100, /*chunk_indices_too=*/true); +} + +static void TakeChunkedChunkedInt64RandomIndicesWithNulls(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/true) + .ChunkedInt64(/*num_chunks=*/100, /*chunk_indices_too=*/true); +} + +static void TakeChunkedChunkedInt64MonotonicIndices(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/false, /*monotonic=*/true) + .ChunkedInt64( + /*num_chunks=*/100, /*chunk_indices_too=*/true); +} + +static void TakeChunkedChunkedFSBRandomIndicesNoNulls(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/false) + .ChunkedFSB(/*num_chunks=*/100, /*chunk_indices_too=*/true); +} + +static void TakeChunkedChunkedFSBRandomIndicesWithNulls(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/true) + .ChunkedFSB(/*num_chunks=*/100, /*chunk_indices_too=*/true); +} + +static void TakeChunkedChunkedFSBMonotonicIndices(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/false, /*monotonic=*/true) + .ChunkedFSB(/*num_chunks=*/100, /*chunk_indices_too=*/true); +} + +static void TakeChunkedChunkedStringRandomIndicesNoNulls(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/false) + .ChunkedString(/*num_chunks=*/100, /*chunk_indices_too=*/true); +} + +static void TakeChunkedChunkedStringRandomIndicesWithNulls(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/true) + .ChunkedString(/*num_chunks=*/100, /*chunk_indices_too=*/true); +} + +static void TakeChunkedChunkedStringMonotonicIndices(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/false, /*monotonic=*/true) + .ChunkedString(/*num_chunks=*/100, /*chunk_indices_too=*/true); +} + +static void TakeChunkedFlatInt64RandomIndicesNoNulls(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/false) + .ChunkedInt64(/*num_chunks=*/100, /*chunk_indices_too=*/false); +} + +static void TakeChunkedFlatInt64RandomIndicesWithNulls(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/true) + .ChunkedInt64(/*num_chunks=*/100, /*chunk_indices_too=*/false); +} + +static void TakeChunkedFlatInt64MonotonicIndices(benchmark::State& state) { + TakeBenchmark(state, /*indices_with_nulls=*/false, /*monotonic=*/true) + .ChunkedInt64( + /*num_chunks=*/100, /*chunk_indices_too=*/false); +} + void FilterSetArgs(benchmark::internal::Benchmark* bench) { for (int64_t size : g_data_sizes) { for (int i = 0; i < static_cast(g_filter_params.size()); ++i) { @@ -405,6 +543,7 @@ void TakeFSBSetArgs(benchmark::internal::Benchmark* bench) { } } +// Flat values x Flat indices BENCHMARK(TakeInt64RandomIndicesNoNulls)->Apply(TakeSetArgs); BENCHMARK(TakeInt64RandomIndicesWithNulls)->Apply(TakeSetArgs); BENCHMARK(TakeInt64MonotonicIndices)->Apply(TakeSetArgs); @@ -418,5 +557,21 @@ BENCHMARK(TakeStringRandomIndicesNoNulls)->Apply(TakeSetArgs); BENCHMARK(TakeStringRandomIndicesWithNulls)->Apply(TakeSetArgs); BENCHMARK(TakeStringMonotonicIndices)->Apply(TakeSetArgs); +// Chunked values x Chunked indices +BENCHMARK(TakeChunkedChunkedInt64RandomIndicesNoNulls)->Apply(TakeSetArgs); +BENCHMARK(TakeChunkedChunkedInt64RandomIndicesWithNulls)->Apply(TakeSetArgs); +BENCHMARK(TakeChunkedChunkedInt64MonotonicIndices)->Apply(TakeSetArgs); +BENCHMARK(TakeChunkedChunkedFSBRandomIndicesNoNulls)->Apply(TakeFSBSetArgs); +BENCHMARK(TakeChunkedChunkedFSBRandomIndicesWithNulls)->Apply(TakeFSBSetArgs); +BENCHMARK(TakeChunkedChunkedFSBMonotonicIndices)->Apply(TakeFSBSetArgs); +BENCHMARK(TakeChunkedChunkedStringRandomIndicesNoNulls)->Apply(TakeSetArgs); +BENCHMARK(TakeChunkedChunkedStringRandomIndicesWithNulls)->Apply(TakeSetArgs); +BENCHMARK(TakeChunkedChunkedStringMonotonicIndices)->Apply(TakeSetArgs); + +// Chunked values x Flat indices +BENCHMARK(TakeChunkedFlatInt64RandomIndicesNoNulls)->Apply(TakeSetArgs); +BENCHMARK(TakeChunkedFlatInt64RandomIndicesWithNulls)->Apply(TakeSetArgs); +BENCHMARK(TakeChunkedFlatInt64MonotonicIndices)->Apply(TakeSetArgs); + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index 89b3f7d0d3c..5cd37108284 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -681,112 +681,122 @@ Status ExtensionTake(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) // R -> RecordBatch // T -> Table -Result> TakeAA(const std::shared_ptr& values, - const std::shared_ptr& indices, - const TakeOptions& options, ExecContext* ctx) { +Result> TakeAAA(const std::shared_ptr& values, + const std::shared_ptr& indices, + const TakeOptions& options, ExecContext* ctx) { ARROW_ASSIGN_OR_RAISE(Datum result, CallFunction("array_take", {values, indices}, &options, ctx)); return result.array(); } -Result> TakeCA(const ChunkedArray& values, - const Array& indices, - const TakeOptions& options, - ExecContext* ctx) { - auto num_chunks = values.num_chunks(); - std::shared_ptr current_chunk; - - // Case 1: `values` has a single chunk, so just use it - if (num_chunks == 1) { - current_chunk = values.chunk(0); +Result> TakeCAC(const ChunkedArray& values, + const Array& indices, + const TakeOptions& options, + ExecContext* ctx) { + std::shared_ptr values_array; + if (values.num_chunks() == 1) { + // Case 1: `values` has a single chunk, so just use it + values_array = values.chunk(0); } else { // TODO Case 2: See if all `indices` fall in the same chunk and call Array Take on it // See // https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 // TODO Case 3: If indices are sorted, can slice them and call Array Take + // (these are relevant to TakeCCC as well) // Case 4: Else, concatenate chunks and call Array Take if (values.chunks().empty()) { - ARROW_ASSIGN_OR_RAISE(current_chunk, MakeArrayOfNull(values.type(), /*length=*/0, - ctx->memory_pool())); + ARROW_ASSIGN_OR_RAISE( + values_array, MakeArrayOfNull(values.type(), /*length=*/0, ctx->memory_pool())); } else { - ARROW_ASSIGN_OR_RAISE(current_chunk, + ARROW_ASSIGN_OR_RAISE(values_array, Concatenate(values.chunks(), ctx->memory_pool())); } } // Call Array Take on our single chunk ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, - TakeAA(current_chunk->data(), indices.data(), options, ctx)); + TakeAAA(values_array->data(), indices.data(), options, ctx)); std::vector> chunks = {MakeArray(new_chunk)}; return std::make_shared(std::move(chunks)); } -Result> TakeCC(const ChunkedArray& values, - const ChunkedArray& indices, - const TakeOptions& options, - ExecContext* ctx) { - auto num_chunks = indices.num_chunks(); - std::vector> new_chunks(num_chunks); - for (int i = 0; i < num_chunks; i++) { - // Take with that indices chunk - // Note that as currently implemented, this is inefficient because `values` - // will get concatenated on every iteration of this loop - ARROW_ASSIGN_OR_RAISE(std::shared_ptr current_chunk, - TakeCA(values, *indices.chunk(i), options, ctx)); - // Concatenate the result to make a single array for this chunk - ARROW_ASSIGN_OR_RAISE(new_chunks[i], - Concatenate(current_chunk->chunks(), ctx->memory_pool())); +Result> TakeCCC(const ChunkedArray& values, + const ChunkedArray& indices, + const TakeOptions& options, + ExecContext* ctx) { + // XXX: for every chunk in indices, values are gathered from all chunks in values to + // form a new chunk in the result. Performing this concatenation is not ideal, but + // greatly simplifies the implementation before something more efficient is + // implemented. + std::shared_ptr values_array; + if (values.num_chunks() == 1) { + values_array = values.chunk(0); + } else { + if (values.chunks().empty()) { + ARROW_ASSIGN_OR_RAISE( + values_array, MakeArrayOfNull(values.type(), /*length=*/0, ctx->memory_pool())); + } else { + ARROW_ASSIGN_OR_RAISE(values_array, + Concatenate(values.chunks(), ctx->memory_pool())); + } + } + std::vector> new_chunks; + new_chunks.resize(indices.num_chunks()); + for (int i = 0; i < indices.num_chunks(); i++) { + ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(values_array->data(), + indices.chunk(i)->data(), options, ctx)); + new_chunks[i] = MakeArray(chunk); } return std::make_shared(std::move(new_chunks), values.type()); } -Result> TakeAC(const Array& values, - const ChunkedArray& indices, - const TakeOptions& options, - ExecContext* ctx) { +Result> TakeACC(const Array& values, + const ChunkedArray& indices, + const TakeOptions& options, + ExecContext* ctx) { auto num_chunks = indices.num_chunks(); std::vector> new_chunks(num_chunks); for (int i = 0; i < num_chunks; i++) { // Take with that indices chunk ARROW_ASSIGN_OR_RAISE(std::shared_ptr chunk, - TakeAA(values.data(), indices.chunk(i)->data(), options, ctx)); + TakeAAA(values.data(), indices.chunk(i)->data(), options, ctx)); new_chunks[i] = MakeArray(chunk); } return std::make_shared(std::move(new_chunks), values.type()); } -Result> TakeRA(const RecordBatch& batch, - const Array& indices, - const TakeOptions& options, - ExecContext* ctx) { +Result> TakeRAR(const RecordBatch& batch, + const Array& indices, + const TakeOptions& options, + ExecContext* ctx) { auto ncols = batch.num_columns(); auto nrows = indices.length(); std::vector> columns(ncols); for (int j = 0; j < ncols; j++) { ARROW_ASSIGN_OR_RAISE(std::shared_ptr col_data, - TakeAA(batch.column(j)->data(), indices.data(), options, ctx)); + TakeAAA(batch.column(j)->data(), indices.data(), options, ctx)); columns[j] = MakeArray(col_data); } return RecordBatch::Make(batch.schema(), nrows, std::move(columns)); } -Result> TakeTA(const Table& table, const Array& indices, - const TakeOptions& options, ExecContext* ctx) { +Result> TakeTAT(const Table& table, const Array& indices, + const TakeOptions& options, ExecContext* ctx) { auto ncols = table.num_columns(); std::vector> columns(ncols); for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], TakeCA(*table.column(j), indices, options, ctx)); + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(*table.column(j), indices, options, ctx)); } return Table::Make(table.schema(), std::move(columns)); } -Result> TakeTC(const Table& table, const ChunkedArray& indices, - const TakeOptions& options, ExecContext* ctx) { +Result> TakeTCT(const Table& table, const ChunkedArray& indices, + const TakeOptions& options, ExecContext* ctx) { auto ncols = table.num_columns(); std::vector> columns(ncols); for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], TakeCC(*table.column(j), indices, options, ctx)); + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(*table.column(j), indices, options, ctx)); } return Table::Make(table.schema(), std::move(columns)); } @@ -815,29 +825,29 @@ class TakeMetaFunction : public MetaFunction { switch (args[0].kind()) { case Datum::ARRAY: if (index_kind == Datum::ARRAY) { - return TakeAA(args[0].array(), args[1].array(), take_opts, ctx); + return TakeAAA(args[0].array(), args[1].array(), take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { - return TakeAC(*args[0].make_array(), *args[1].chunked_array(), take_opts, ctx); + return TakeACC(*args[0].make_array(), *args[1].chunked_array(), take_opts, ctx); } break; case Datum::CHUNKED_ARRAY: if (index_kind == Datum::ARRAY) { - return TakeCA(*args[0].chunked_array(), *args[1].make_array(), take_opts, ctx); + return TakeCAC(*args[0].chunked_array(), *args[1].make_array(), take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { - return TakeCC(*args[0].chunked_array(), *args[1].chunked_array(), take_opts, - ctx); + return TakeCCC(*args[0].chunked_array(), *args[1].chunked_array(), take_opts, + ctx); } break; case Datum::RECORD_BATCH: if (index_kind == Datum::ARRAY) { - return TakeRA(*args[0].record_batch(), *args[1].make_array(), take_opts, ctx); + return TakeRAR(*args[0].record_batch(), *args[1].make_array(), take_opts, ctx); } break; case Datum::TABLE: if (index_kind == Datum::ARRAY) { - return TakeTA(*args[0].table(), *args[1].make_array(), take_opts, ctx); + return TakeTAT(*args[0].table(), *args[1].make_array(), take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { - return TakeTC(*args[0].table(), *args[1].chunked_array(), take_opts, ctx); + return TakeTCT(*args[0].table(), *args[1].chunked_array(), take_opts, ctx); } break; default: