diff --git a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc index dee80e9d258..8b3f0431e65 100644 --- a/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc +++ b/cpp/src/arrow/compute/kernels/vector_selection_take_internal.cc @@ -31,6 +31,7 @@ #include "arrow/compute/kernels/gather_internal.h" #include "arrow/compute/kernels/vector_selection_internal.h" #include "arrow/compute/kernels/vector_selection_take_internal.h" +#include "arrow/compute/registry.h" #include "arrow/memory_pool.h" #include "arrow/record_batch.h" #include "arrow/table.h" @@ -536,142 +537,144 @@ Status ExtensionTake(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) // R -> RecordBatch // T -> Table -Result> TakeAAA(const std::shared_ptr& values, - const std::shared_ptr& indices, - const TakeOptions& options, ExecContext* ctx) { - ARROW_ASSIGN_OR_RAISE(Datum result, - CallFunction("array_take", {values, indices}, &options, ctx)); - return result.array(); -} +const FunctionDoc take_doc( + "Select values from an input based on indices from another array", + ("The output is populated with values from the input at positions\n" + "given by `indices`. Nulls in `indices` emit null in the output."), + {"input", "indices"}, "TakeOptions"); -Result> TakeCAC(const ChunkedArray& values, - const Array& indices, - const TakeOptions& options, - ExecContext* ctx) { - std::shared_ptr values_array; - if (values.num_chunks() == 1) { - // Case 1: `values` has a single chunk, so just use it - values_array = values.chunk(0); - } else { - // TODO Case 2: See if all `indices` fall in the same chunk and call Array Take on it - // See - // https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 - // TODO Case 3: If indices are sorted, can slice them and call Array Take - // (these are relevant to TakeCCC as well) - - // Case 4: Else, concatenate chunks and call Array Take - if (values.chunks().empty()) { - ARROW_ASSIGN_OR_RAISE( - values_array, MakeArrayOfNull(values.type(), /*length=*/0, ctx->memory_pool())); - } else { - ARROW_ASSIGN_OR_RAISE(values_array, - Concatenate(values.chunks(), ctx->memory_pool())); - } +// Metafunction for dispatching to different Take implementations other than +// Array-Array. +class TakeMetaFunction : public MetaFunction { + public: + TakeMetaFunction() + : MetaFunction("take", Arity::Binary(), take_doc, GetDefaultTakeOptions()) {} + + static Result CallArrayTake(const std::vector& args, + const TakeOptions& options, ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(auto array_take_func, + ctx->func_registry()->GetFunction("array_take")); + return array_take_func->Execute(args, &options, ctx); } - // Call Array Take on our single chunk - ARROW_ASSIGN_OR_RAISE(std::shared_ptr new_chunk, - TakeAAA(values_array->data(), indices.data(), options, ctx)); - std::vector> chunks = {MakeArray(new_chunk)}; - return std::make_shared(std::move(chunks)); -} -Result> TakeCCC(const ChunkedArray& values, - const ChunkedArray& indices, - const TakeOptions& options, - ExecContext* ctx) { - // XXX: for every chunk in indices, values are gathered from all chunks in values to - // form a new chunk in the result. Performing this concatenation is not ideal, but - // greatly simplifies the implementation before something more efficient is - // implemented. - std::shared_ptr values_array; - if (values.num_chunks() == 1) { - values_array = values.chunk(0); - } else { - if (values.chunks().empty()) { - ARROW_ASSIGN_OR_RAISE( - values_array, MakeArrayOfNull(values.type(), /*length=*/0, ctx->memory_pool())); - } else { - ARROW_ASSIGN_OR_RAISE(values_array, - Concatenate(values.chunks(), ctx->memory_pool())); + static Result> ChunkedArrayAsArray( + const std::shared_ptr& values, MemoryPool* pool) { + switch (values->num_chunks()) { + case 0: + return MakeArrayOfNull(values->type(), /*length=*/0, pool); + case 1: + return values->chunk(0); + default: + return Concatenate(values->chunks(), pool); } } - std::vector> new_chunks; - new_chunks.resize(indices.num_chunks()); - for (int i = 0; i < indices.num_chunks(); i++) { - ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(values_array->data(), - indices.chunk(i)->data(), options, ctx)); - new_chunks[i] = MakeArray(chunk); + + private: + static Result> TakeAAA(const std::vector& args, + const TakeOptions& options, + ExecContext* ctx) { + DCHECK_EQ(args[0].kind(), Datum::ARRAY); + DCHECK_EQ(args[1].kind(), Datum::ARRAY); + ARROW_ASSIGN_OR_RAISE(Datum result, CallArrayTake(args, options, ctx)); + return result.array(); } - return std::make_shared(std::move(new_chunks), values.type()); -} -Result> TakeACC(const Array& values, - const ChunkedArray& indices, - const TakeOptions& options, - ExecContext* ctx) { - auto num_chunks = indices.num_chunks(); - std::vector> new_chunks(num_chunks); - for (int i = 0; i < num_chunks; i++) { - // Take with that indices chunk - ARROW_ASSIGN_OR_RAISE(std::shared_ptr chunk, - TakeAAA(values.data(), indices.chunk(i)->data(), options, ctx)); - new_chunks[i] = MakeArray(chunk); + static Result> TakeCAA( + const std::shared_ptr& values, const Array& indices, + const TakeOptions& options, ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(auto values_array, + ChunkedArrayAsArray(values, ctx->memory_pool())); + std::vector args = {std::move(values_array), indices}; + return TakeAAA(args, options, ctx); } - return std::make_shared(std::move(new_chunks), values.type()); -} -Result> TakeRAR(const RecordBatch& batch, - const Array& indices, - const TakeOptions& options, - ExecContext* ctx) { - auto ncols = batch.num_columns(); - auto nrows = indices.length(); - std::vector> columns(ncols); - for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr col_data, - TakeAAA(batch.column(j)->data(), indices.data(), options, ctx)); - columns[j] = MakeArray(col_data); + static Result> TakeCAC( + const std::shared_ptr& values, const Array& indices, + const TakeOptions& options, ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(auto new_chunk, TakeCAA(values, indices, options, ctx)); + return std::make_shared(MakeArray(std::move(new_chunk))); } - return RecordBatch::Make(batch.schema(), nrows, std::move(columns)); -} -Result> TakeTAT(const Table& table, const Array& indices, - const TakeOptions& options, ExecContext* ctx) { - auto ncols = table.num_columns(); - std::vector> columns(ncols); + static Result> TakeCCC( + const std::shared_ptr& values, + const std::shared_ptr& indices, const TakeOptions& options, + ExecContext* ctx) { + // XXX: for every chunk in indices, values are gathered from all chunks in values to + // form a new chunk in the result. Performing this concatenation is not ideal, but + // greatly simplifies the implementation before something more efficient is + // implemented. + ARROW_ASSIGN_OR_RAISE(auto values_array, + ChunkedArrayAsArray(values, ctx->memory_pool())); + std::vector args = {std::move(values_array), {}}; + std::vector> new_chunks; + new_chunks.resize(indices->num_chunks()); + for (int i = 0; i < indices->num_chunks(); i++) { + args[1] = indices->chunk(i); + // XXX: this loop can use TakeCAA once it can handle ChunkedArray + // without concatenating first + ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(args, options, ctx)); + new_chunks[i] = MakeArray(chunk); + } + return std::make_shared(std::move(new_chunks), values->type()); + } - for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(*table.column(j), indices, options, ctx)); + static Result> TakeACC(const Array& values, + const ChunkedArray& indices, + const TakeOptions& options, + ExecContext* ctx) { + auto num_chunks = indices.num_chunks(); + std::vector> new_chunks(num_chunks); + std::vector args = {values, {}}; + for (int i = 0; i < num_chunks; i++) { + // Take with that indices chunk + args[1] = indices.chunk(i); + ARROW_ASSIGN_OR_RAISE(auto chunk, TakeAAA(args, options, ctx)); + new_chunks[i] = MakeArray(chunk); + } + return std::make_shared(std::move(new_chunks), values.type()); } - return Table::Make(table.schema(), std::move(columns)); -} -Result> TakeTCT(const Table& table, const ChunkedArray& indices, - const TakeOptions& options, ExecContext* ctx) { - auto ncols = table.num_columns(); - std::vector> columns(ncols); - for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(*table.column(j), indices, options, ctx)); + static Result> TakeRAR(const RecordBatch& batch, + const Array& indices, + const TakeOptions& options, + ExecContext* ctx) { + auto ncols = batch.num_columns(); + auto nrows = indices.length(); + std::vector> columns(ncols); + std::vector args = {{}, indices}; + for (int j = 0; j < ncols; j++) { + args[0] = batch.column(j); + ARROW_ASSIGN_OR_RAISE(auto col_data, TakeAAA(args, options, ctx)); + columns[j] = MakeArray(col_data); + } + return RecordBatch::Make(batch.schema(), nrows, std::move(columns)); } - return Table::Make(table.schema(), std::move(columns)); -} -const FunctionDoc take_doc( - "Select values from an input based on indices from another array", - ("The output is populated with values from the input at positions\n" - "given by `indices`. Nulls in `indices` emit null in the output."), - {"input", "indices"}, "TakeOptions"); + static Result> TakeTAT(const std::shared_ptr& table, + const Array& indices, + const TakeOptions& options, + ExecContext* ctx) { + auto ncols = table->num_columns(); + std::vector> columns(ncols); -// Metafunction for dispatching to different Take implementations other than -// Array-Array. -// -// TODO: Revamp approach to executing Take operations. In addition to being -// overly complex dispatching, there is no parallelization. -class TakeMetaFunction : public MetaFunction { - public: - TakeMetaFunction() - : MetaFunction("take", Arity::Binary(), take_doc, GetDefaultTakeOptions()) {} + for (int j = 0; j < ncols; j++) { + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCAC(table->column(j), indices, options, ctx)); + } + return Table::Make(table->schema(), std::move(columns)); + } + + static Result> TakeTCT( + const std::shared_ptr
& table, const std::shared_ptr& indices, + const TakeOptions& options, ExecContext* ctx) { + auto ncols = table->num_columns(); + std::vector> columns(ncols); + for (int j = 0; j < ncols; j++) { + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCCC(table->column(j), indices, options, ctx)); + } + return Table::Make(table->schema(), std::move(columns)); + } + public: Result ExecuteImpl(const std::vector& args, const FunctionOptions* options, ExecContext* ctx) const override { @@ -680,16 +683,16 @@ class TakeMetaFunction : public MetaFunction { switch (args[0].kind()) { case Datum::ARRAY: if (index_kind == Datum::ARRAY) { - return TakeAAA(args[0].array(), args[1].array(), take_opts, ctx); + return TakeAAA(args, take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { return TakeACC(*args[0].make_array(), *args[1].chunked_array(), take_opts, ctx); } break; case Datum::CHUNKED_ARRAY: if (index_kind == Datum::ARRAY) { - return TakeCAC(*args[0].chunked_array(), *args[1].make_array(), take_opts, ctx); + return TakeCAC(args[0].chunked_array(), *args[1].make_array(), take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { - return TakeCCC(*args[0].chunked_array(), *args[1].chunked_array(), take_opts, + return TakeCCC(args[0].chunked_array(), args[1].chunked_array(), take_opts, ctx); } break; @@ -700,12 +703,13 @@ class TakeMetaFunction : public MetaFunction { break; case Datum::TABLE: if (index_kind == Datum::ARRAY) { - return TakeTAT(*args[0].table(), *args[1].make_array(), take_opts, ctx); + return TakeTAT(args[0].table(), *args[1].make_array(), take_opts, ctx); } else if (index_kind == Datum::CHUNKED_ARRAY) { - return TakeTCT(*args[0].table(), *args[1].chunked_array(), take_opts, ctx); + return TakeTCT(args[0].table(), args[1].chunked_array(), take_opts, ctx); } break; - default: + case Datum::NONE: + case Datum::SCALAR: break; } return Status::NotImplemented(