From deb63099cbe05ec11420cf14c1ba07788576bf6c Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 26 Apr 2024 23:31:22 -0300 Subject: [PATCH 1/7] TakeCAC: Extract TakeAAC for simpler delegation --- .../kernels/vector_selection_take_internal.cc | 43 +++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index dee80e9d258..77e30369449 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -544,35 +544,34 @@ Result> TakeAAA(const std::shared_ptr& val return result.array(); } +Result> TakeAAC(const Array& values, const Array& indices, + const TakeOptions& options, + ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, + TakeAAA(values.data(), indices.data(), options, ctx)); + std::vector> chunks = {MakeArray(new_chunk)}; + return std::make_shared(std::move(chunks)); +} + Result> TakeCAC(const ChunkedArray& values, const Array& indices, const TakeOptions& options, ExecContext* ctx) { - std::shared_ptr values_array; if (values.num_chunks() == 1) { - // Case 1: `values` has a single chunk, so just use it - values_array = values.chunk(0); - } else { - // TODO Case 2: See if all `indices` fall in the same chunk and call Array Take on it - // See - // https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 - // TODO Case 3: If indices are sorted, can slice them and call Array Take - // (these are relevant to TakeCCC as well) + // `values` has a single chunk, so just delegate to TakeAAC + return TakeAAC(*values.chunk(0), indices, options, ctx); + } - // Case 4: Else, concatenate chunks and call Array Take - if (values.chunks().empty()) { - ARROW_ASSIGN_OR_RAISE( - values_array, MakeArrayOfNull(values.type(), /*length=*/0, ctx->memory_pool())); - 
} else { - ARROW_ASSIGN_OR_RAISE(values_array, - Concatenate(values.chunks(), ctx->memory_pool())); - } + // Slow path: Concatenate() chunks and delegate to TakeAAC which is more + // likely to handle the input types when they are in a single chunk. + std::shared_ptr values_array; + if (values.chunks().empty()) { + ARROW_ASSIGN_OR_RAISE( + values_array, MakeArrayOfNull(values.type(), /*length=*/0, ctx->memory_pool())); + } else { + ARROW_ASSIGN_OR_RAISE(values_array, Concatenate(values.chunks(), ctx->memory_pool())); } - // Call Array Take on our single chunk - ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, - TakeAAA(values_array->data(), indices.data(), options, ctx)); - std::vector> chunks = {MakeArray(new_chunk)}; - return std::make_shared(std::move(chunks)); + return TakeAAC(*values_array, indices, options, ctx); } Result> TakeCCC(const ChunkedArray& values, From 4aa89923b9f4e571975a34b20636f5039dd7f617 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Wed, 1 May 2024 16:35:27 -0300 Subject: [PATCH 2/7] Take: Pass ChunkedArray as shared_ptr& in TakeXXX functions --- .../kernels/vector_selection_take_internal.cc | 69 ++++++++++--------- 1 file changed, 36 insertions(+), 33 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index 77e30369449..9cd9e2ed3b3 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -553,55 +553,56 @@ Result> TakeAAC(const Array& values, const Array& return std::make_shared(std::move(chunks)); } -Result> TakeCAC(const ChunkedArray& values, +Result> TakeCAC(const std::shared_ptr& values, const Array& indices, const TakeOptions& options, ExecContext* ctx) { - if (values.num_chunks() == 1) { + if (values->num_chunks() == 1) { // `values` has a single chunk, so just delegate to TakeAAC - return TakeAAC(*values.chunk(0), indices, 
options, ctx); + return TakeAAC(*values->chunk(0), indices, options, ctx); } // Slow path: Concatenate() chunks and delegate to TakeAAC which is more // likely to handle the input types when they are in a single chunk. std::shared_ptr values_array; - if (values.chunks().empty()) { + if (values->chunks().empty()) { ARROW_ASSIGN_OR_RAISE( - values_array, MakeArrayOfNull(values.type(), /*length=*/0, ctx->memory_pool())); + values_array, MakeArrayOfNull(values->type(), /*length=*/0, ctx->memory_pool())); } else { - ARROW_ASSIGN_OR_RAISE(values_array, Concatenate(values.chunks(), ctx->memory_pool())); + ARROW_ASSIGN_OR_RAISE(values_array, + Concatenate(values->chunks(), ctx->memory_pool())); } return TakeAAC(*values_array, indices, options, ctx); } -Result> TakeCCC(const ChunkedArray& values, - const ChunkedArray& indices, - const TakeOptions& options, - ExecContext* ctx) { +Result> TakeCCC( + const std::shared_ptr& values, + const std::shared_ptr& indices, const TakeOptions& options, + ExecContext* ctx) { // XXX: for every chunk in indices, values are gathered from all chunks in values to // form a new chunk in the result. Performing this concatenation is not ideal, but // greatly simplifies the implementation before something more efficient is // implemented. 
std::shared_ptr values_array; - if (values.num_chunks() == 1) { - values_array = values.chunk(0); + if (values->num_chunks() == 1) { + values_array = values->chunk(0); } else { - if (values.chunks().empty()) { - ARROW_ASSIGN_OR_RAISE( - values_array, MakeArrayOfNull(values.type(), /*length=*/0, ctx->memory_pool())); + if (values->chunks().empty()) { + ARROW_ASSIGN_OR_RAISE(values_array, MakeArrayOfNull(values->type(), /*length=*/0, + ctx->memory_pool())); } else { ARROW_ASSIGN_OR_RAISE(values_array, - Concatenate(values.chunks(), ctx->memory_pool())); + Concatenate(values->chunks(), ctx->memory_pool())); } } std::vector> new_chunks; - new_chunks.resize(indices.num_chunks()); - for (int i = 0; i < indices.num_chunks(); i++) { + new_chunks.resize(indices->num_chunks()); + for (int i = 0; i < indices->num_chunks(); i++) { ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(values_array->data(), - indices.chunk(i)->data(), options, ctx)); + indices->chunk(i)->data(), options, ctx)); new_chunks[i] = MakeArray(chunk); } - return std::make_shared(std::move(new_chunks), values.type()); + return std::make_shared(std::move(new_chunks), values->type()); } Result> TakeACC(const Array& values, @@ -634,25 +635,27 @@ Result> TakeRAR(const RecordBatch& batch, return RecordBatch::Make(batch.schema(), nrows, std::move(columns)); } -Result> TakeTAT(const Table& table, const Array& indices, - const TakeOptions& options, ExecContext* ctx) { - auto ncols = table.num_columns(); +Result> TakeTAT(const std::shared_ptr& table, + const Array& indices, const TakeOptions& options, + ExecContext* ctx) { + auto ncols = table->num_columns(); std::vector> columns(ncols); for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(*table.column(j), indices, options, ctx)); + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(table->column(j), indices, options, ctx)); } - return Table::Make(table.schema(), std::move(columns)); + return Table::Make(table->schema(), std::move(columns)); } -Result> 
TakeTCT(const Table& table, const ChunkedArray& indices, +Result<std::shared_ptr<Table>> TakeTCT(const std::shared_ptr<Table>
& table, + const std::shared_ptr& indices, const TakeOptions& options, ExecContext* ctx) { - auto ncols = table.num_columns(); + auto ncols = table->num_columns(); std::vector> columns(ncols); for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(*table.column(j), indices, options, ctx)); + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(table->column(j), indices, options, ctx)); } - return Table::Make(table.schema(), std::move(columns)); + return Table::Make(table->schema(), std::move(columns)); } const FunctionDoc take_doc( @@ -686,9 +689,9 @@ class TakeMetaFunction : public MetaFunction { break; case Datum::CHUNKED_ARRAY: if (index_kind == Datum::ARRAY) { - return TakeCAC(*args[0].chunked_array(), *args[1].make_array(), take_opts, ctx); + return TakeCAC(args[0].chunked_array(), *args[1].make_array(), take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { - return TakeCCC(*args[0].chunked_array(), *args[1].chunked_array(), take_opts, + return TakeCCC(args[0].chunked_array(), args[1].chunked_array(), take_opts, ctx); } break; @@ -699,9 +702,9 @@ class TakeMetaFunction : public MetaFunction { break; case Datum::TABLE: if (index_kind == Datum::ARRAY) { - return TakeTAT(*args[0].table(), *args[1].make_array(), take_opts, ctx); + return TakeTAT(args[0].table(), *args[1].make_array(), take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { - return TakeTCT(*args[0].table(), *args[1].chunked_array(), take_opts, ctx); + return TakeTCT(args[0].table(), args[1].chunked_array(), take_opts, ctx); } break; default: From 1a1d6ffcb09d254730b2e0bf15d9d4730786065e Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 3 May 2024 15:36:48 -0300 Subject: [PATCH 3/7] Take: Move all TakeXXX() functions to the TakeMetaFunction class --- .../kernels/vector_selection_take_internal.cc | 229 +++++++++--------- 1 file changed, 116 insertions(+), 113 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc 
b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index 9cd9e2ed3b3..1e72b8f8091 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -536,57 +536,51 @@ Status ExtensionTake(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) // R -> RecordBatch // T -> Table -Result> TakeAAA(const std::shared_ptr& values, - const std::shared_ptr& indices, - const TakeOptions& options, ExecContext* ctx) { - ARROW_ASSIGN_OR_RAISE(Datum result, - CallFunction("array_take", {values, indices}, &options, ctx)); - return result.array(); -} +const FunctionDoc take_doc( + "Select values from an input based on indices from another array", + ("The output is populated with values from the input at positions\n" + "given by `indices`. Nulls in `indices` emit null in the output."), + {"input", "indices"}, "TakeOptions"); -Result> TakeAAC(const Array& values, const Array& indices, - const TakeOptions& options, - ExecContext* ctx) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, - TakeAAA(values.data(), indices.data(), options, ctx)); - std::vector> chunks = {MakeArray(new_chunk)}; - return std::make_shared(std::move(chunks)); -} +// Metafunction for dispatching to different Take implementations other than +// Array-Array. +// +// TODO: Revamp approach to executing Take operations. In addition to being +// overly complex dispatching, there is no parallelization. 
+class TakeMetaFunction : public MetaFunction { + public: + TakeMetaFunction() + : MetaFunction("take", Arity::Binary(), take_doc, GetDefaultTakeOptions()) {} -Result> TakeCAC(const std::shared_ptr& values, - const Array& indices, - const TakeOptions& options, - ExecContext* ctx) { - if (values->num_chunks() == 1) { - // `values` has a single chunk, so just delegate to TakeAAC - return TakeAAC(*values->chunk(0), indices, options, ctx); + static Result> TakeAAA( + const std::shared_ptr& values, const std::shared_ptr& indices, + const TakeOptions& options, ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(Datum result, + CallFunction("array_take", {values, indices}, &options, ctx)); + return result.array(); } - // Slow path: Concatenate() chunks and delegate to TakeAAC which is more - // likely to handle the input types when they are in a single chunk. - std::shared_ptr values_array; - if (values->chunks().empty()) { - ARROW_ASSIGN_OR_RAISE( - values_array, MakeArrayOfNull(values->type(), /*length=*/0, ctx->memory_pool())); - } else { - ARROW_ASSIGN_OR_RAISE(values_array, - Concatenate(values->chunks(), ctx->memory_pool())); + static Result> TakeAAC(const Array& values, + const Array& indices, + const TakeOptions& options, + ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, + TakeAAA(values.data(), indices.data(), options, ctx)); + std::vector> chunks = {MakeArray(new_chunk)}; + return std::make_shared(std::move(chunks)); } - return TakeAAC(*values_array, indices, options, ctx); -} -Result> TakeCCC( - const std::shared_ptr& values, - const std::shared_ptr& indices, const TakeOptions& options, - ExecContext* ctx) { - // XXX: for every chunk in indices, values are gathered from all chunks in values to - // form a new chunk in the result. Performing this concatenation is not ideal, but - // greatly simplifies the implementation before something more efficient is - // implemented. 
- std::shared_ptr values_array; - if (values->num_chunks() == 1) { - values_array = values->chunk(0); - } else { + static Result> TakeCAC( + const std::shared_ptr& values, const Array& indices, + const TakeOptions& options, ExecContext* ctx) { + if (values->num_chunks() == 1) { + // `values` has a single chunk, so just delegate to TakeAAC + return TakeAAC(*values->chunk(0), indices, options, ctx); + } + + // Slow path: Concatenate() chunks and delegate to TakeAAC which is more + // likely to handle the input types when they are in a single chunk. + std::shared_ptr values_array; if (values->chunks().empty()) { ARROW_ASSIGN_OR_RAISE(values_array, MakeArrayOfNull(values->type(), /*length=*/0, ctx->memory_pool())); @@ -594,85 +588,94 @@ Result> TakeCCC( ARROW_ASSIGN_OR_RAISE(values_array, Concatenate(values->chunks(), ctx->memory_pool())); } + return TakeAAC(*values_array, indices, options, ctx); } - std::vector> new_chunks; - new_chunks.resize(indices->num_chunks()); - for (int i = 0; i < indices->num_chunks(); i++) { - ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(values_array->data(), - indices->chunk(i)->data(), options, ctx)); - new_chunks[i] = MakeArray(chunk); + + static Result> TakeCCC( + const std::shared_ptr& values, + const std::shared_ptr& indices, const TakeOptions& options, + ExecContext* ctx) { + // XXX: for every chunk in indices, values are gathered from all chunks in values to + // form a new chunk in the result. Performing this concatenation is not ideal, but + // greatly simplifies the implementation before something more efficient is + // implemented. 
+ std::shared_ptr values_array; + if (values->num_chunks() == 1) { + values_array = values->chunk(0); + } else { + if (values->chunks().empty()) { + ARROW_ASSIGN_OR_RAISE(values_array, MakeArrayOfNull(values->type(), /*length=*/0, + ctx->memory_pool())); + } else { + ARROW_ASSIGN_OR_RAISE(values_array, + Concatenate(values->chunks(), ctx->memory_pool())); + } + } + std::vector> new_chunks; + new_chunks.resize(indices->num_chunks()); + for (int i = 0; i < indices->num_chunks(); i++) { + ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(values_array->data(), + indices->chunk(i)->data(), options, ctx)); + new_chunks[i] = MakeArray(chunk); + } + return std::make_shared(std::move(new_chunks), values->type()); } - return std::make_shared(std::move(new_chunks), values->type()); -} -Result> TakeACC(const Array& values, - const ChunkedArray& indices, - const TakeOptions& options, - ExecContext* ctx) { - auto num_chunks = indices.num_chunks(); - std::vector> new_chunks(num_chunks); - for (int i = 0; i < num_chunks; i++) { - // Take with that indices chunk - ARROW_ASSIGN_OR_RAISE(std::shared_ptr chunk, - TakeAAA(values.data(), indices.chunk(i)->data(), options, ctx)); - new_chunks[i] = MakeArray(chunk); + static Result> TakeACC(const Array& values, + const ChunkedArray& indices, + const TakeOptions& options, + ExecContext* ctx) { + auto num_chunks = indices.num_chunks(); + std::vector> new_chunks(num_chunks); + for (int i = 0; i < num_chunks; i++) { + // Take with that indices chunk + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr chunk, + TakeAAA(values.data(), indices.chunk(i)->data(), options, ctx)); + new_chunks[i] = MakeArray(chunk); + } + return std::make_shared(std::move(new_chunks), values.type()); } - return std::make_shared(std::move(new_chunks), values.type()); -} -Result> TakeRAR(const RecordBatch& batch, - const Array& indices, - const TakeOptions& options, - ExecContext* ctx) { - auto ncols = batch.num_columns(); - auto nrows = indices.length(); - std::vector> columns(ncols); 
- for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> col_data, - TakeAAA(batch.column(j)->data(), indices.data(), options, ctx)); - columns[j] = MakeArray(col_data); + static Result<std::shared_ptr<RecordBatch>> TakeRAR(const RecordBatch& batch, + const Array& indices, + const TakeOptions& options, + ExecContext* ctx) { + auto ncols = batch.num_columns(); + auto nrows = indices.length(); + std::vector<std::shared_ptr<Array>> columns(ncols); + for (int j = 0; j < ncols; j++) { + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr<ArrayData> col_data, + TakeAAA(batch.column(j)->data(), indices.data(), options, ctx)); + columns[j] = MakeArray(col_data); + } + return RecordBatch::Make(batch.schema(), nrows, std::move(columns)); } - return RecordBatch::Make(batch.schema(), nrows, std::move(columns)); -} -Result<std::shared_ptr<Table>> TakeTAT(const std::shared_ptr<Table>
& table, - const Array& indices, const TakeOptions& options, - ExecContext* ctx) { - auto ncols = table->num_columns(); - std::vector<std::shared_ptr<ChunkedArray>> columns(ncols); + static Result<std::shared_ptr<Table>> TakeTAT(const std::shared_ptr<Table>
& table, + const Array& indices, + const TakeOptions& options, + ExecContext* ctx) { + auto ncols = table->num_columns(); + std::vector<std::shared_ptr<ChunkedArray>> columns(ncols); - for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(table->column(j), indices, options, ctx)); + for (int j = 0; j < ncols; j++) { + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(table->column(j), indices, options, ctx)); + } + return Table::Make(table->schema(), std::move(columns)); } - return Table::Make(table->schema(), std::move(columns)); -} -Result<std::shared_ptr<Table>> TakeTCT(const std::shared_ptr<Table>
& table, - const std::shared_ptr<ChunkedArray>& indices, - const TakeOptions& options, ExecContext* ctx) { - auto ncols = table->num_columns(); - std::vector<std::shared_ptr<ChunkedArray>> columns(ncols); - for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(table->column(j), indices, options, ctx)); + static Result<std::shared_ptr<Table>> TakeTCT( + const std::shared_ptr<Table>
& table, const std::shared_ptr& indices, + const TakeOptions& options, ExecContext* ctx) { + auto ncols = table->num_columns(); + std::vector> columns(ncols); + for (int j = 0; j < ncols; j++) { + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(table->column(j), indices, options, ctx)); + } + return Table::Make(table->schema(), std::move(columns)); } - return Table::Make(table->schema(), std::move(columns)); -} - -const FunctionDoc take_doc( - "Select values from an input based on indices from another array", - ("The output is populated with values from the input at positions\n" - "given by `indices`. Nulls in `indices` emit null in the output."), - {"input", "indices"}, "TakeOptions"); - -// Metafunction for dispatching to different Take implementations other than -// Array-Array. -// -// TODO: Revamp approach to executing Take operations. In addition to being -// overly complex dispatching, there is no parallelization. -class TakeMetaFunction : public MetaFunction { - public: - TakeMetaFunction() - : MetaFunction("take", Arity::Binary(), take_doc, GetDefaultTakeOptions()) {} Result ExecuteImpl(const std::vector& args, const FunctionOptions* options, From 82e758553daecb048ea3b34bb4a4ee0d20ee732e Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 3 May 2024 16:15:52 -0300 Subject: [PATCH 4/7] Take: Review the TakeXXX delegation logic --- .../kernels/vector_selection_take_internal.cc | 92 +++++++++---------- 1 file changed, 45 insertions(+), 47 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index 1e72b8f8091..60768fe5e73 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -31,6 +31,7 @@ #include "arrow/compute/kernels/gather_internal.h" #include "arrow/compute/kernels/vector_selection_internal.h" #include 
"arrow/compute/kernels/vector_selection_take_internal.h" +#include "arrow/compute/registry.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" #include "arrow/table.h" @@ -552,43 +553,49 @@ class TakeMetaFunction : public MetaFunction { TakeMetaFunction() : MetaFunction("take", Arity::Binary(), take_doc, GetDefaultTakeOptions()) {} - static Result> TakeAAA( - const std::shared_ptr& values, const std::shared_ptr& indices, - const TakeOptions& options, ExecContext* ctx) { - ARROW_ASSIGN_OR_RAISE(Datum result, - CallFunction("array_take", {values, indices}, &options, ctx)); + static Result CallArrayTake(const std::vector& args, + const TakeOptions& options, ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(auto array_take_func, + ctx->func_registry()->GetFunction("array_take")); + return array_take_func->Execute(args, &options, ctx); + } + + static Result> TakeAAA(const std::vector& args, + const TakeOptions& options, + ExecContext* ctx) { + DCHECK_EQ(args[0].kind(), Datum::ARRAY); + DCHECK_EQ(args[1].kind(), Datum::ARRAY); + ARROW_ASSIGN_OR_RAISE(Datum result, CallArrayTake(args, options, ctx)); return result.array(); } - static Result> TakeAAC(const Array& values, - const Array& indices, + static Result> TakeAAC(const std::vector& args, const TakeOptions& options, ExecContext* ctx) { ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, - TakeAAA(values.data(), indices.data(), options, ctx)); - std::vector> chunks = {MakeArray(new_chunk)}; - return std::make_shared(std::move(chunks)); + TakeAAA(args, options, ctx)); + return std::make_shared(MakeArray(new_chunk)); + } + + static Result> ChunkedArrayAsArray( + const std::shared_ptr& values, MemoryPool* pool) { + switch (values->num_chunks()) { + case 0: + return MakeArrayOfNull(values->type(), /*length=*/0, pool); + case 1: + return values->chunk(0); + default: + return Concatenate(values->chunks(), pool); + } } static Result> TakeCAC( const std::shared_ptr& values, const Array& indices, const TakeOptions& 
options, ExecContext* ctx) { - if (values->num_chunks() == 1) { - // `values` has a single chunk, so just delegate to TakeAAC - return TakeAAC(*values->chunk(0), indices, options, ctx); - } - - // Slow path: Concatenate() chunks and delegate to TakeAAC which is more - // likely to handle the input types when they are in a single chunk. - std::shared_ptr values_array; - if (values->chunks().empty()) { - ARROW_ASSIGN_OR_RAISE(values_array, MakeArrayOfNull(values->type(), /*length=*/0, - ctx->memory_pool())); - } else { - ARROW_ASSIGN_OR_RAISE(values_array, - Concatenate(values->chunks(), ctx->memory_pool())); - } - return TakeAAC(*values_array, indices, options, ctx); + ARROW_ASSIGN_OR_RAISE(auto values_array, + ChunkedArrayAsArray(values, ctx->memory_pool())); + std::vector args = {std::move(values_array), indices}; + return TakeAAC(args, options, ctx); } static Result> TakeCCC( @@ -599,23 +606,14 @@ class TakeMetaFunction : public MetaFunction { // form a new chunk in the result. Performing this concatenation is not ideal, but // greatly simplifies the implementation before something more efficient is // implemented. 
- std::shared_ptr values_array; - if (values->num_chunks() == 1) { - values_array = values->chunk(0); - } else { - if (values->chunks().empty()) { - ARROW_ASSIGN_OR_RAISE(values_array, MakeArrayOfNull(values->type(), /*length=*/0, - ctx->memory_pool())); - } else { - ARROW_ASSIGN_OR_RAISE(values_array, - Concatenate(values->chunks(), ctx->memory_pool())); - } - } + ARROW_ASSIGN_OR_RAISE(auto values_array, + ChunkedArrayAsArray(values, ctx->memory_pool())); + std::vector args = {std::move(values_array), {}}; std::vector> new_chunks; new_chunks.resize(indices->num_chunks()); for (int i = 0; i < indices->num_chunks(); i++) { - ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(values_array->data(), - indices->chunk(i)->data(), options, ctx)); + args[1] = indices->chunk(i); + ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(args, options, ctx)); new_chunks[i] = MakeArray(chunk); } return std::make_shared(std::move(new_chunks), values->type()); @@ -627,11 +625,11 @@ class TakeMetaFunction : public MetaFunction { ExecContext* ctx) { auto num_chunks = indices.num_chunks(); std::vector> new_chunks(num_chunks); + std::vector args = {values, {}}; for (int i = 0; i < num_chunks; i++) { // Take with that indices chunk - ARROW_ASSIGN_OR_RAISE( - std::shared_ptr chunk, - TakeAAA(values.data(), indices.chunk(i)->data(), options, ctx)); + args[1] = indices.chunk(i); + ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(args, options, ctx)); new_chunks[i] = MakeArray(chunk); } return std::make_shared(std::move(new_chunks), values.type()); @@ -644,10 +642,10 @@ class TakeMetaFunction : public MetaFunction { auto ncols = batch.num_columns(); auto nrows = indices.length(); std::vector> columns(ncols); + std::vector args = {{}, indices}; for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE( - std::shared_ptr col_data, - TakeAAA(batch.column(j)->data(), indices.data(), options, ctx)); + args[0] = batch.column(j); + ARROW_ASSIGN_OR_RAISE(auto col_data, TakeAAA(args, options, ctx)); columns[j] = 
MakeArray(col_data); } return RecordBatch::Make(batch.schema(), nrows, std::move(columns)); @@ -685,7 +683,7 @@ class TakeMetaFunction : public MetaFunction { switch (args[0].kind()) { case Datum::ARRAY: if (index_kind == Datum::ARRAY) { - return TakeAAA(args[0].array(), args[1].array(), take_opts, ctx); + return TakeAAA(args, take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { return TakeACC(*args[0].make_array(), *args[1].chunked_array(), take_opts, ctx); } From c27f54ed4f12e4465f98cef2e4e9023327ad7324 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Fri, 3 May 2024 16:21:49 -0300 Subject: [PATCH 5/7] Take: Add TakeCAA, and remove TakeAAC TakeCAA will be used from TakeCCC soon. --- .../kernels/vector_selection_take_internal.cc | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index 60768fe5e73..09268a5494c 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -569,14 +569,6 @@ class TakeMetaFunction : public MetaFunction { return result.array(); } - static Result> TakeAAC(const std::vector& args, - const TakeOptions& options, - ExecContext* ctx) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, - TakeAAA(args, options, ctx)); - return std::make_shared(MakeArray(new_chunk)); - } - static Result> ChunkedArrayAsArray( const std::shared_ptr& values, MemoryPool* pool) { switch (values->num_chunks()) { @@ -589,13 +581,20 @@ class TakeMetaFunction : public MetaFunction { } } - static Result> TakeCAC( + static Result> TakeCAA( const std::shared_ptr& values, const Array& indices, const TakeOptions& options, ExecContext* ctx) { ARROW_ASSIGN_OR_RAISE(auto values_array, ChunkedArrayAsArray(values, ctx->memory_pool())); std::vector args = {std::move(values_array), indices}; - return 
TakeAAC(args, options, ctx); + return TakeAAA(args, options, ctx); + } + + static Result> TakeCAC( + const std::shared_ptr& values, const Array& indices, + const TakeOptions& options, ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(auto new_chunk, TakeCAA(values, indices, options, ctx)); + return std::make_shared(MakeArray(std::move(new_chunk))); } static Result> TakeCCC( @@ -613,6 +612,8 @@ class TakeMetaFunction : public MetaFunction { new_chunks.resize(indices->num_chunks()); for (int i = 0; i < indices->num_chunks(); i++) { args[1] = indices->chunk(i); + // XXX: this loop can use TakeCAA once it can handle ChunkedArray + // without concatenating first ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(args, options, ctx)); new_chunks[i] = MakeArray(chunk); } From 2a682cd83e02a2af03e5cd69febaeff4a64ceb52 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Wed, 12 Jun 2024 14:01:17 -0300 Subject: [PATCH 6/7] TakeMetaFunction: Make most static methods private --- .../kernels/vector_selection_take_internal.cc | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index 09268a5494c..f6de3c91662 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -560,15 +560,6 @@ class TakeMetaFunction : public MetaFunction { return array_take_func->Execute(args, &options, ctx); } - static Result> TakeAAA(const std::vector& args, - const TakeOptions& options, - ExecContext* ctx) { - DCHECK_EQ(args[0].kind(), Datum::ARRAY); - DCHECK_EQ(args[1].kind(), Datum::ARRAY); - ARROW_ASSIGN_OR_RAISE(Datum result, CallArrayTake(args, options, ctx)); - return result.array(); - } - static Result> ChunkedArrayAsArray( const std::shared_ptr& values, MemoryPool* pool) { switch (values->num_chunks()) { @@ -581,6 +572,16 @@ class TakeMetaFunction : 
public MetaFunction { } } + private: + static Result> TakeAAA(const std::vector& args, + const TakeOptions& options, + ExecContext* ctx) { + DCHECK_EQ(args[0].kind(), Datum::ARRAY); + DCHECK_EQ(args[1].kind(), Datum::ARRAY); + ARROW_ASSIGN_OR_RAISE(Datum result, CallArrayTake(args, options, ctx)); + return result.array(); + } + static Result> TakeCAA( const std::shared_ptr& values, const Array& indices, const TakeOptions& options, ExecContext* ctx) { @@ -676,6 +677,7 @@ class TakeMetaFunction : public MetaFunction { return Table::Make(table->schema(), std::move(columns)); } + public: Result ExecuteImpl(const std::vector& args, const FunctionOptions* options, ExecContext* ctx) const override { @@ -709,7 +711,8 @@ class TakeMetaFunction : public MetaFunction { return TakeTCT(args[0].table(), args[1].chunked_array(), take_opts, ctx); } break; - default: + case Datum::NONE: + case Datum::SCALAR: break; } return Status::NotImplemented( From fc997830761937724471721d7f84827c9a175313 Mon Sep 17 00:00:00 2001 From: Felipe Oliveira Carvalho Date: Thu, 13 Jun 2024 13:26:29 -0300 Subject: [PATCH 7/7] Take: Remove unwanted TODO comment --- .../arrow/compute/kernels/vector_selection_take_internal.cc | 3 --- 1 file changed, 3 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index f6de3c91662..8b3f0431e65 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -545,9 +545,6 @@ const FunctionDoc take_doc( // Metafunction for dispatching to different Take implementations other than // Array-Array. -// -// TODO: Revamp approach to executing Take operations. In addition to being -// overly complex dispatching, there is no parallelization. class TakeMetaFunction : public MetaFunction { public: TakeMetaFunction()