From ac80aea8c166cf6445f5eae5c4ff1ff01c0125a0 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 2 Jun 2020 10:07:38 -0500 Subject: [PATCH 1/3] Formalize "metafunction" concept. Add Take and Filter metafunctions. Port R, Python bindings modulo one failing R test Fix Table__to_dataframe for ChunkedArray with zero chunks Restore deprecated APIs so that GLib will compile and pass tests again CI fixes Try to fix R 4.0 toolchain issue again --- cpp/src/arrow/compute/api_vector.cc | 150 ++--------- cpp/src/arrow/compute/api_vector.h | 184 ++++---------- cpp/src/arrow/compute/cast.cc | 13 +- cpp/src/arrow/compute/function.cc | 6 + cpp/src/arrow/compute/function.h | 20 +- .../arrow/compute/kernels/vector_filter.cc | 68 ++++- cpp/src/arrow/compute/kernels/vector_take.cc | 169 ++++++++++++- .../arrow/compute/kernels/vector_take_test.cc | 33 ++- cpp/src/arrow/datum.cc | 9 + cpp/src/arrow/datum.h | 6 + python/pyarrow/_compute.pyx | 29 +++ python/pyarrow/array.pxi | 76 +----- python/pyarrow/compute.py | 62 +++++ python/pyarrow/includes/libarrow.pxd | 22 +- python/pyarrow/table.pxi | 234 ++---------------- r/R/array.R | 10 +- r/R/arrowExports.R | 44 ---- r/R/chunked-array.R | 16 +- r/R/record-batch.R | 5 +- r/R/table.R | 14 +- r/src/array_to_vector.cpp | 106 ++++---- r/src/arrowExports.cpp | 192 -------------- r/src/compute.cpp | 126 ---------- 23 files changed, 559 insertions(+), 1035 deletions(-) diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index fe7c7dbe813..d0d67e76fb2 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -21,7 +21,6 @@ #include #include -#include "arrow/array/concatenate.h" #include "arrow/compute/exec.h" #include "arrow/compute/kernels/vector_selection_internal.h" #include "arrow/datum.h" @@ -47,11 +46,6 @@ Result> SortToIndices(const Array& values, ExecContext* c return result.make_array(); } -Result Take(const Datum& values, const Datum& indices, const TakeOptions& options, - ExecContext* ctx) { - return CallFunction("take", {values, indices}, &options, ctx); -} - Result> Unique(const Datum& value, ExecContext* ctx) { ARROW_ASSIGN_OR_RAISE(Datum result, CallFunction("unique", {value}, ctx)); return result.make_array(); @@ -71,156 +65,66 @@ Result> ValueCounts(const Datum& value, ExecContext* ctx) return result.make_array(); } -// ---------------------------------------------------------------------- -// Filter with conveniences to filter RecordBatch, Table - -Result> FilterRecordBatch(const RecordBatch& batch, - const Datum& filter, - FilterOptions options, - ExecContext* ctx) { - if (!filter.is_array()) { - return Status::Invalid("Cannot filter a RecordBatch with a filter of kind ", - filter.kind()); - } - - // TODO: Rewrite this to convert to selection vector and use Take - std::vector> columns(batch.num_columns()); - for (int i = 0; i < batch.num_columns(); ++i) { - ARROW_ASSIGN_OR_RAISE(Datum out, - Filter(batch.column(i)->data(), filter, options, ctx)); - columns[i] = out.make_array(); - } - - int64_t out_length; - if (columns.size() == 0) { - out_length = - internal::FilterOutputSize(options.null_selection_behavior, *filter.make_array()); - } else { - out_length = columns[0]->length(); - } - return RecordBatch::Make(batch.schema(), out_length, columns); -} - -Result> FilterTable(const Table& table, const Datum& filter, - FilterOptions options, ExecContext* ctx) { - auto new_columns = table.columns(); - for (auto& column : new_columns) { - ARROW_ASSIGN_OR_RAISE(Datum out_column, Filter(column, filter, options, ctx)); - column = out_column.chunked_array(); - } - return Table::Make(table.schema(), std::move(new_columns)); +Result Filter(const Datum& values, const Datum& filter, + const FilterOptions& options, ExecContext* ctx) { + // Invoke metafunction which deals with Datum kinds other than just Array, + // ChunkedArray. + return CallFunction("filter", {values, filter}, &options, ctx); } -Result Filter(const Datum& values, const Datum& filter, FilterOptions options, - ExecContext* ctx) { - if (values.kind() == Datum::RECORD_BATCH) { - auto values_batch = values.record_batch(); - ARROW_ASSIGN_OR_RAISE(std::shared_ptr out_batch, - FilterRecordBatch(*values_batch, filter, options, ctx)); - return Datum(out_batch); - } else if (values.kind() == Datum::TABLE) { - ARROW_ASSIGN_OR_RAISE(std::shared_ptr out_table, - FilterTable(*values.table(), filter, options, ctx)); - return Datum(out_table); - } else { - return CallFunction("filter", {values, filter}, &options, ctx); - } +Result Take(const Datum& values, const Datum& filter, const TakeOptions& options, + ExecContext* ctx) { + // Invoke metafunction which deals with Datum kinds other than just Array, + // ChunkedArray. + return CallFunction("take", {values, filter}, &options, ctx); } -// ---------------------------------------------------------------------- -// Take invocation conveniences - Result> Take(const Array& values, const Array& indices, const TakeOptions& options, ExecContext* ctx) { - ARROW_ASSIGN_OR_RAISE(Datum out_datum, - Take(Datum(values.data()), Datum(indices.data()), options, ctx)); - return out_datum.make_array(); + ARROW_ASSIGN_OR_RAISE(Datum out, Take(Datum(values), Datum(indices), options, ctx)); + return out.make_array(); } +// ---------------------------------------------------------------------- +// Deprecated functions + Result> Take(const ChunkedArray& values, const Array& indices, const TakeOptions& options, ExecContext* ctx) { - auto num_chunks = values.num_chunks(); - std::vector> new_chunks(1); // Hard-coded 1 for now - std::shared_ptr current_chunk; - - // Case 1: `values` has a single chunk, so just use it - if (num_chunks == 1) { - current_chunk = values.chunk(0); - } else { - // TODO Case 2: See if all `indices` fall in the same chunk and call Array Take on it - // See - // https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 - // TODO Case 3: If indices are sorted, can slice them and call Array Take - - // Case 4: Else, concatenate chunks and call Array Take - RETURN_NOT_OK(Concatenate(values.chunks(), default_memory_pool(), ¤t_chunk)); - } - // Call Array Take on our single chunk - ARROW_ASSIGN_OR_RAISE(new_chunks[0], Take(*current_chunk, indices, options, ctx)); - return std::make_shared(std::move(new_chunks)); + ARROW_ASSIGN_OR_RAISE(Datum result, Take(Datum(values), Datum(indices), options, ctx)); + return result.chunked_array(); } Result> Take(const ChunkedArray& values, const ChunkedArray& indices, const TakeOptions& options, ExecContext* ctx) { - auto num_chunks = indices.num_chunks(); - std::vector> new_chunks(num_chunks); - for (int i = 0; i < num_chunks; i++) { - // Take with that indices chunk - // Note that as currently implemented, this is inefficient because `values` - // will get concatenated on every iteration of this loop - ARROW_ASSIGN_OR_RAISE(std::shared_ptr current_chunk, - Take(values, *indices.chunk(i), options, ctx)); - // Concatenate the result to make a single array for this chunk - RETURN_NOT_OK( - Concatenate(current_chunk->chunks(), default_memory_pool(), &new_chunks[i])); - } - return std::make_shared(std::move(new_chunks)); + ARROW_ASSIGN_OR_RAISE(Datum result, Take(Datum(values), Datum(indices), options, ctx)); + return result.chunked_array(); } Result> Take(const Array& values, const ChunkedArray& indices, const TakeOptions& options, ExecContext* ctx) { - auto num_chunks = indices.num_chunks(); - std::vector> new_chunks(num_chunks); - for (int i = 0; i < num_chunks; i++) { - // Take with that indices chunk - ARROW_ASSIGN_OR_RAISE(new_chunks[i], Take(values, *indices.chunk(i), options, ctx)); - } - return std::make_shared(std::move(new_chunks)); + ARROW_ASSIGN_OR_RAISE(Datum result, Take(Datum(values), Datum(indices), options, ctx)); + return result.chunked_array(); } Result> Take(const RecordBatch& batch, const Array& indices, const TakeOptions& options, ExecContext* ctx) { - auto ncols = batch.num_columns(); - auto nrows = indices.length(); - std::vector> columns(ncols); - for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], Take(*batch.column(j), indices, options, ctx)); - } - return RecordBatch::Make(batch.schema(), nrows, columns); + ARROW_ASSIGN_OR_RAISE(Datum result, Take(Datum(batch), Datum(indices), options, ctx)); + return result.record_batch(); } Result> Take(const Table& table, const Array& indices, const TakeOptions& options, ExecContext* ctx) { - auto ncols = table.num_columns(); - std::vector> columns(ncols); - - for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], Take(*table.column(j), indices, options, ctx)); - } - return Table::Make(table.schema(), columns); + ARROW_ASSIGN_OR_RAISE(Datum result, Take(Datum(table), Datum(indices), options, ctx)); + return result.table(); } Result> Take(const Table& table, const ChunkedArray& indices, const TakeOptions& options, ExecContext* ctx) { - auto ncols = table.num_columns(); - std::vector> columns(ncols); - for (int j = 0; j < ncols; j++) { - ARROW_ASSIGN_OR_RAISE(columns[j], Take(*table.column(j), indices, options, ctx)); - } - return Table::Make(table.schema(), columns); + ARROW_ASSIGN_OR_RAISE(Datum result, Take(Datum(table), Datum(indices), options, ctx)); + return result.table(); } } // namespace compute diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index eb2a15dec3a..e02aad2c56f 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -57,12 +57,12 @@ struct FilterOptions : public FunctionOptions { /// \param[in] values array to filter /// \param[in] filter indicates which values should be filtered out /// \param[in] options configures null_selection_behavior -/// \param[in] context the function execution context, optional +/// \param[in] ctx the function execution context, optional /// \return the resulting datum ARROW_EXPORT Result Filter(const Datum& values, const Datum& filter, - FilterOptions options = FilterOptions::Defaults(), - ExecContext* context = NULLPTR); + const FilterOptions& options = FilterOptions::Defaults(), + ExecContext* ctx = NULLPTR); struct ARROW_EXPORT TakeOptions : public FunctionOptions { static TakeOptions Defaults() { return TakeOptions{}; } @@ -73,152 +73,18 @@ struct ARROW_EXPORT TakeOptions : public FunctionOptions { /// \param[in] values datum from which to take /// \param[in] indices which values to take /// \param[in] options options -/// \param[in] context the function execution context, optional +/// \param[in] ctx the function execution context, optional /// \return the resulting datum ARROW_EXPORT Result Take(const Datum& values, const Datum& indices, const TakeOptions& options = TakeOptions::Defaults(), - ExecContext* context = NULLPTR); + ExecContext* ctx = NULLPTR); -/// \brief Take from an array of values at indices in another array -/// -/// The output array will be of the same type as the input values -/// array, with elements taken from the values array at the given -/// indices. If an index is null then the taken element will be null. -/// -/// For example given values = ["a", "b", "c", null, "e", "f"] and -/// indices = [2, 1, null, 3], the output will be -/// = [values[2], values[1], null, values[3]] -/// = ["c", "b", null, null] -/// -/// \param[in] values array from which to take -/// \param[in] indices which values to take -/// \param[in] options options -/// \param[in] context the function execution context, optional -/// \return the resulting array +/// \brief Take with Array inputs and output ARROW_EXPORT Result> Take(const Array& values, const Array& indices, const TakeOptions& options = TakeOptions::Defaults(), - ExecContext* context = NULLPTR); - -/// \brief Take from a chunked array of values at indices in another array -/// -/// The output chunked array will be of the same type as the input values -/// array, with elements taken from the values array at the given -/// indices. If an index is null then the taken element will be null. -/// -/// For example given values = ["a", "b", "c", null, "e", "f"] and -/// indices = [2, 1, null, 3], the output will be -/// = [values[2], values[1], null, values[3]] -/// = ["c", "b", null, null] -/// -/// \param[in] values chunked array from which to take -/// \param[in] indices which values to take -/// \param[in] options options -/// \param[in] context the function execution context, optional -/// \return the resulting chunked array -/// NOTE: Experimental API -ARROW_EXPORT -Result> Take( - const ChunkedArray& values, const Array& indices, - const TakeOptions& options = TakeOptions::Defaults(), ExecContext* context = NULLPTR); - -/// \brief Take from a chunked array of values at indices in a chunked array -/// -/// The output chunked array will be of the same type as the input values -/// array, with elements taken from the values array at the given -/// indices. If an index is null then the taken element will be null. -/// The chunks in the output array will align with the chunks in the indices. -/// -/// For example given values = ["a", "b", "c", null, "e", "f"] and -/// indices = [2, 1, null, 3], the output will be -/// = [values[2], values[1], null, values[3]] -/// = ["c", "b", null, null] -/// -/// \param[in] values chunked array from which to take -/// \param[in] indices which values to take -/// \param[in] options options -/// \param[in] context the function execution context, optional -/// \return the resulting chunked array -/// NOTE: Experimental API -ARROW_EXPORT -Result> Take( - const ChunkedArray& values, const ChunkedArray& indices, - const TakeOptions& options = TakeOptions::Defaults(), ExecContext* context = NULLPTR); - -/// \brief Take from an array of values at indices in a chunked array -/// -/// The output chunked array will be of the same type as the input values -/// array, with elements taken from the values array at the given -/// indices. If an index is null then the taken element will be null. -/// The chunks in the output array will align with the chunks in the indices. -/// -/// For example given values = ["a", "b", "c", null, "e", "f"] and -/// indices = [2, 1, null, 3], the output will be -/// = [values[2], values[1], null, values[3]] -/// = ["c", "b", null, null] -/// -/// \param[in] values array from which to take -/// \param[in] indices which values to take -/// \param[in] options options -/// \param[in] context the function execution context, optional -/// \return the resulting chunked array -/// NOTE: Experimental API -ARROW_EXPORT -Result> Take( - const Array& values, const ChunkedArray& indices, - const TakeOptions& options = TakeOptions::Defaults(), ExecContext* context = NULLPTR); - -/// \brief Take from a record batch at indices in another array -/// -/// The output batch will have the same schema as the input batch, -/// with rows taken from the columns in the batch at the given -/// indices. If an index is null then the taken element will be null. -/// -/// \param[in] batch record batch from which to take -/// \param[in] indices which values to take -/// \param[in] options options -/// \param[in] context the function execution context, optional -/// \return the resulting record batch -/// NOTE: Experimental API -ARROW_EXPORT -Result> Take( - const RecordBatch& batch, const Array& indices, - const TakeOptions& options = TakeOptions::Defaults(), ExecContext* context = NULLPTR); - -/// \brief Take from a table at indices in an array -/// -/// The output table will have the same schema as the input table, -/// with rows taken from the columns in the table at the given -/// indices. If an index is null then the taken element will be null. -/// -/// \param[in] table table from which to take -/// \param[in] indices which values to take -/// \param[in] options options -/// \param[in] context the function execution context, optional -/// \return the resulting table -/// NOTE: Experimental API -ARROW_EXPORT -Result> Take(const Table& table, const Array& indices, - const TakeOptions& options = TakeOptions::Defaults(), - ExecContext* context = NULLPTR); - -/// \brief Take from a table at indices in a chunked array -/// -/// The output table will have the same schema as the input table, -/// with rows taken from the values array at the given -/// indices. If an index is null then the taken element will be null. -/// -/// \param[in] table table from which to take -/// \param[in] indices which values to take -/// \param[in] options options -/// \param[in] context the function execution context, optional -/// \return the resulting table -/// NOTE: Experimental API -ARROW_EXPORT -Result> Take(const Table& table, const ChunkedArray& indices, - const TakeOptions& options = TakeOptions::Defaults(), - ExecContext* context = NULLPTR); + ExecContext* ctx = NULLPTR); struct PartitionOptions : public FunctionOptions { explicit PartitionOptions(int64_t pivot) : pivot(pivot) {} @@ -305,5 +171,41 @@ Result> ValueCounts(const Datum& value, ARROW_EXPORT Result DictionaryEncode(const Datum& data, ExecContext* ctx = NULLPTR); +// ---------------------------------------------------------------------- +// Deprecated functions + +// TODO: Add deprecation warnings to these functions +// ARROW_DEPRECATED("Deprecated in 1.0.0. Use Datum-based version") + +ARROW_EXPORT +Result> Take( + const ChunkedArray& values, const Array& indices, + const TakeOptions& options = TakeOptions::Defaults(), ExecContext* context = NULLPTR); + +ARROW_EXPORT +Result> Take( + const ChunkedArray& values, const ChunkedArray& indices, + const TakeOptions& options = TakeOptions::Defaults(), ExecContext* context = NULLPTR); + +ARROW_EXPORT +Result> Take( + const Array& values, const ChunkedArray& indices, + const TakeOptions& options = TakeOptions::Defaults(), ExecContext* context = NULLPTR); + +ARROW_EXPORT +Result> Take( + const RecordBatch& batch, const Array& indices, + const TakeOptions& options = TakeOptions::Defaults(), ExecContext* context = NULLPTR); + +ARROW_EXPORT +Result> Take(const Table& table, const Array& indices, + const TakeOptions& options = TakeOptions::Defaults(), + ExecContext* context = NULLPTR); + +ARROW_EXPORT +Result> Take(const Table& table, const ChunkedArray& indices, + const TakeOptions& options = TakeOptions::Defaults(), + ExecContext* context = NULLPTR); + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/cast.cc b/cpp/src/arrow/compute/cast.cc index ac605fe72ed..59e9dc2f2ea 100644 --- a/cpp/src/arrow/compute/cast.cc +++ b/cpp/src/arrow/compute/cast.cc @@ -54,16 +54,11 @@ void InitCastTable() { void EnsureInitCastTable() { std::call_once(cast_table_initialized, InitCastTable); } -// A function that overrides Function::Execute to dispatch to the appropriate -// target-type-specific CastFunction -// -// This corresponds to the standard SQL CAST(expr AS target_type) -// -// As a "metafunction" this function has no kernels and is intended to be used -// through its Execute function -class CastMetaFunction : public ScalarFunction { +// Metafunction for dispatching to appropraite CastFunction. This corresponds +// to the standard SQL CAST(expr AS target_type) +class CastMetaFunction : public MetaFunction { public: - CastMetaFunction() : ScalarFunction("cast", Arity::Unary()) {} + CastMetaFunction() : MetaFunction("cast", Arity::Unary()) {} Result Execute(const std::vector& args, const FunctionOptions* options, ExecContext* ctx) const override { diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 881cd229b3b..42a2afcbc0a 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -163,5 +163,11 @@ Result ScalarAggregateFunction::DispatchExact( return DispatchExactImpl(*this, kernels_, values); } +Result MetaFunction::Execute(const std::vector& args, + const FunctionOptions* options, + ExecContext* ctx) const { + return Status::NotImplemented("Metafunctions must provide their own Execute method"); +} + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index a2971f1ee11..f07cd137b64 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -93,7 +93,11 @@ class ARROW_EXPORT Function { VECTOR, /// A function that computes scalar summary statistics from array input. - SCALAR_AGGREGATE + SCALAR_AGGREGATE, + + /// A function that dispatches to other functions and does not contain its + /// own kernels. + META }; virtual ~Function() = default; @@ -229,5 +233,19 @@ class ARROW_EXPORT ScalarAggregateFunction const std::vector& values) const; }; +/// \brief A function that dispatches to other functions. Must override +/// Function::Execute. +class ARROW_EXPORT MetaFunction : public Function { + public: + int num_kernels() const override { return 0; } + + Result Execute(const std::vector& args, const FunctionOptions* options, + ExecContext* ctx) const override; + + protected: + MetaFunction(std::string name, const Arity& arity) + : Function(std::move(name), Function::META, arity) {} +}; + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_filter.cc b/cpp/src/arrow/compute/kernels/vector_filter.cc index 74d84795a5f..71d6bfc8c1b 100644 --- a/cpp/src/arrow/compute/kernels/vector_filter.cc +++ b/cpp/src/arrow/compute/kernels/vector_filter.cc @@ -150,11 +150,74 @@ Status GetFilterKernel(const DataType& type, ArrayKernelExec* exec) { return Status::OK(); } +Result> FilterRecordBatch(const RecordBatch& batch, + const Datum& filter, + const FunctionOptions* options, + ExecContext* ctx) { + if (!filter.is_array()) { + return Status::Invalid("Cannot filter a RecordBatch with a filter of kind ", + filter.kind()); + } + + const auto& filter_opts = *static_cast(options); + // TODO: Rewrite this to convert to selection vector and use Take + std::vector> columns(batch.num_columns()); + for (int i = 0; i < batch.num_columns(); ++i) { + ARROW_ASSIGN_OR_RAISE(Datum out, + Filter(batch.column(i)->data(), filter, filter_opts, ctx)); + columns[i] = out.make_array(); + } + + int64_t out_length; + if (columns.size() == 0) { + out_length = + FilterOutputSize(filter_opts.null_selection_behavior, *filter.make_array()); + } else { + out_length = columns[0]->length(); + } + return RecordBatch::Make(batch.schema(), out_length, columns); +} + +Result> FilterTable(const Table& table, const Datum& filter, + const FunctionOptions* options, + ExecContext* ctx) { + auto new_columns = table.columns(); + for (auto& column : new_columns) { + ARROW_ASSIGN_OR_RAISE( + Datum out_column, + Filter(column, filter, *static_cast(options), ctx)); + column = out_column.chunked_array(); + } + return Table::Make(table.schema(), std::move(new_columns)); +} + +class FilterMetaFunction : public MetaFunction { + public: + FilterMetaFunction() : MetaFunction("filter", Arity::Binary()) {} + + Result Execute(const std::vector& args, const FunctionOptions* options, + ExecContext* ctx) const override { + if (args[0].kind() == Datum::RECORD_BATCH) { + auto values_batch = args[0].record_batch(); + ARROW_ASSIGN_OR_RAISE( + std::shared_ptr out_batch, + FilterRecordBatch(*args[0].record_batch(), args[1], options, ctx)); + return Datum(out_batch); + } else if (args[0].kind() == Datum::TABLE) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr
out_table, + FilterTable(*args[0].table(), args[1], options, ctx)); + return Datum(out_table); + } else { + return CallFunction("array_filter", args, options, ctx); + } + } +}; + void RegisterVectorFilter(FunctionRegistry* registry) { VectorKernel base; base.init = InitFilter; - auto filter = std::make_shared("filter", Arity::Binary()); + auto filter = std::make_shared("array_filter", Arity::Binary()); InputType filter_ty = InputType::Array(boolean()); OutputType out_ty(FirstType); @@ -172,6 +235,9 @@ void RegisterVectorFilter(FunctionRegistry* registry) { AddKernel(InputType::Array(value_ty->id()), *value_ty); } DCHECK_OK(registry->AddFunction(std::move(filter))); + + // Add take metafunction + DCHECK_OK(registry->AddFunction(std::make_shared())); } } // namespace internal diff --git a/cpp/src/arrow/compute/kernels/vector_take.cc b/cpp/src/arrow/compute/kernels/vector_take.cc index 73ce7fd1a13..127d7e5f768 100644 --- a/cpp/src/arrow/compute/kernels/vector_take.cc +++ b/cpp/src/arrow/compute/kernels/vector_take.cc @@ -15,8 +15,13 @@ // specific language governing permissions and limitations // under the License. +#include +#include + #include "arrow/array/array_base.h" #include "arrow/compute/api_vector.h" +#include "arrow/array/concatenate.h" +#include "arrow/builder.h" #include "arrow/compute/kernels/common.h" #include "arrow/compute/kernels/vector_selection_internal.h" #include "arrow/result.h" @@ -76,12 +81,167 @@ Status GetTakeKernel(const DataType& value_type, const DataType& index_type, return Status::OK(); } +// Shorthand naming of these functions +// A -> Array +// C -> ChunkedArray +// R -> RecordBatch +// T -> Table + +Result> TakeAA(const Array& values, const Array& indices, + const TakeOptions& options, ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(Datum result, + CallFunction("array_take", {values, indices}, &options, ctx)); + return result.make_array(); +} + +Result> TakeCA(const ChunkedArray& values, + const Array& indices, + const TakeOptions& options, + ExecContext* ctx) { + auto num_chunks = values.num_chunks(); + std::vector> new_chunks(1); // Hard-coded 1 for now + std::shared_ptr current_chunk; + + // Case 1: `values` has a single chunk, so just use it + if (num_chunks == 1) { + current_chunk = values.chunk(0); + } else { + // TODO Case 2: See if all `indices` fall in the same chunk and call Array Take on it + // See + // https://github.com/apache/arrow/blob/6f2c9041137001f7a9212f244b51bc004efc29af/r/src/compute.cpp#L123-L151 + // TODO Case 3: If indices are sorted, can slice them and call Array Take + + // Case 4: Else, concatenate chunks and call Array Take + RETURN_NOT_OK(Concatenate(values.chunks(), default_memory_pool(), ¤t_chunk)); + } + // Call Array Take on our single chunk + ARROW_ASSIGN_OR_RAISE(new_chunks[0], TakeAA(*current_chunk, indices, options, ctx)); + return std::make_shared(std::move(new_chunks)); +} + +Result> TakeCC(const ChunkedArray& values, + const ChunkedArray& indices, + const TakeOptions& options, + ExecContext* ctx) { + auto num_chunks = indices.num_chunks(); + std::vector> new_chunks(num_chunks); + for (int i = 0; i < num_chunks; i++) { + // Take with that indices chunk + // Note that as currently implemented, this is inefficient because `values` + // will get concatenated on every iteration of this loop + ARROW_ASSIGN_OR_RAISE(std::shared_ptr current_chunk, + TakeCA(values, *indices.chunk(i), options, ctx)); + // Concatenate the result to make a single array for this chunk + RETURN_NOT_OK( + Concatenate(current_chunk->chunks(), default_memory_pool(), &new_chunks[i])); + } + return std::make_shared(std::move(new_chunks)); +} + +Result> TakeAC(const Array& values, + const ChunkedArray& indices, + const TakeOptions& options, + ExecContext* ctx) { + auto num_chunks = indices.num_chunks(); + std::vector> new_chunks(num_chunks); + for (int i = 0; i < num_chunks; i++) { + // Take with that indices chunk + ARROW_ASSIGN_OR_RAISE(new_chunks[i], TakeAA(values, *indices.chunk(i), options, ctx)); + } + return std::make_shared(std::move(new_chunks)); +} + +Result> TakeRA(const RecordBatch& batch, + const Array& indices, + const TakeOptions& options, + ExecContext* ctx) { + auto ncols = batch.num_columns(); + auto nrows = indices.length(); + std::vector> columns(ncols); + for (int j = 0; j < ncols; j++) { + ARROW_ASSIGN_OR_RAISE(columns[j], TakeAA(*batch.column(j), indices, options, ctx)); + } + return RecordBatch::Make(batch.schema(), nrows, columns); +} + +Result> TakeTA(const Table& table, const Array& indices, + const TakeOptions& options, ExecContext* ctx) { + auto ncols = table.num_columns(); + std::vector> columns(ncols); + + for (int j = 0; j < ncols; j++) { + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCA(*table.column(j), indices, options, ctx)); + } + return Table::Make(table.schema(), columns); +} + +Result> TakeTC(const Table& table, const ChunkedArray& indices, + const TakeOptions& options, ExecContext* ctx) { + auto ncols = table.num_columns(); + std::vector> columns(ncols); + for (int j = 0; j < ncols; j++) { + ARROW_ASSIGN_OR_RAISE(columns[j], TakeCC(*table.column(j), indices, options, ctx)); + } + return Table::Make(table.schema(), columns); +} + +// Metafunction for dispatching to different Take implementations other than +// Array-Array. +// +// TODO: Revamp approach to executing Take operations. In addition to being +// overly complex dispatching, there is no parallelization. +class TakeMetaFunction : public MetaFunction { + public: + TakeMetaFunction() : MetaFunction("take", Arity::Binary()) {} + + Result Execute(const std::vector& args, const FunctionOptions* options, + ExecContext* ctx) const override { + Datum::Kind index_kind = args[1].kind(); + const TakeOptions& take_opts = static_cast(*options); + switch (args[0].kind()) { + case Datum::ARRAY: + if (index_kind == Datum::ARRAY) { + return TakeAA(*args[0].make_array(), *args[1].make_array(), take_opts, ctx); + } else if (index_kind == Datum::CHUNKED_ARRAY) { + return TakeAC(*args[0].make_array(), *args[1].chunked_array(), take_opts, ctx); + } + break; + case Datum::CHUNKED_ARRAY: + if (index_kind == Datum::ARRAY) { + return TakeCA(*args[0].chunked_array(), *args[1].make_array(), take_opts, ctx); + } else if (index_kind == Datum::CHUNKED_ARRAY) { + return TakeCC(*args[0].chunked_array(), *args[1].chunked_array(), take_opts, + ctx); + } + break; + case Datum::RECORD_BATCH: + if (index_kind == Datum::ARRAY) { + return TakeRA(*args[0].record_batch(), *args[1].make_array(), take_opts, ctx); + } + break; + case Datum::TABLE: + if (index_kind == Datum::ARRAY) { + return TakeTA(*args[0].table(), *args[1].make_array(), take_opts, ctx); + } else if (index_kind == Datum::CHUNKED_ARRAY) { + return TakeTC(*args[0].table(), *args[1].chunked_array(), take_opts, ctx); + } + break; + default: + break; + } + return Status::NotImplemented( + "Unsupported types for take operation: " + "values=", + args[0].ToString(), "indices=", args[1].ToString()); + } +}; + void RegisterVectorTake(FunctionRegistry* registry) { VectorKernel base; base.init = InitTake; base.can_execute_chunkwise = false; - auto take = std::make_shared("take", Arity::Binary()); + auto array_take = std::make_shared("array_take", Arity::Binary()); OutputType out_ty(FirstType); auto AddKernel = [&](InputType value_ty, const DataType& example_value_ty, @@ -89,7 +249,7 @@ void RegisterVectorTake(FunctionRegistry* registry) { base.signature = KernelSignature::Make({value_ty, InputType::Array(index_ty)}, out_ty); DCHECK_OK(GetTakeKernel(example_value_ty, *index_ty, &base.exec)); - DCHECK_OK(take->AddKernel(base)); + DCHECK_OK(array_take->AddKernel(base)); }; for (const auto& value_ty : PrimitiveTypes()) { @@ -102,7 +262,10 @@ void RegisterVectorTake(FunctionRegistry* registry) { AddKernel(InputType::Array(value_ty->id()), *value_ty, index_ty); } } - DCHECK_OK(registry->AddFunction(std::move(take))); + DCHECK_OK(registry->AddFunction(std::move(array_take))); + + // Add take metafunction + DCHECK_OK(registry->AddFunction(std::make_shared())); } } // namespace internal diff --git a/cpp/src/arrow/compute/kernels/vector_take_test.cc b/cpp/src/arrow/compute/kernels/vector_take_test.cc index 1c3a19851c9..10c855697c1 100644 --- a/cpp/src/arrow/compute/kernels/vector_take_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_take_test.cc @@ -118,7 +118,8 @@ class TestTakeKernelWithNumeric : public TestTakeKernel { void ValidateTake(const std::shared_ptr& values, const std::shared_ptr& indices_boxed) { - ASSERT_OK_AND_ASSIGN(std::shared_ptr taken, Take(*values, *indices_boxed)); + ASSERT_OK_AND_ASSIGN(Datum out, Take(values, indices_boxed)); + auto taken = out.make_array(); ASSERT_OK(taken->ValidateFull()); ASSERT_EQ(indices_boxed->length(), taken->length()); @@ -523,7 +524,10 @@ class TestTakeKernelWithRecordBatch : public TestTakeKernel { const std::shared_ptr& index_type, const std::string& indices, std::shared_ptr* out) { auto batch = RecordBatchFromJSON(schm, batch_json); - return Take(*batch, *ArrayFromJSON(index_type, indices)).Value(out); + ARROW_ASSIGN_OR_RAISE(Datum result, + Take(Datum(batch), Datum(ArrayFromJSON(index_type, indices)))); + *out = result.record_batch(); + return Status::OK(); } }; @@ -586,17 +590,20 @@ class TestTakeKernelWithChunkedArray : public TestTakeKernel { Status TakeWithArray(const std::shared_ptr& type, const std::vector& values, const std::string& indices, std::shared_ptr* out) { - return Take(*ChunkedArrayFromJSON(type, values), *ArrayFromJSON(int8(), indices)) - .Value(out); + ARROW_ASSIGN_OR_RAISE(Datum result, Take(ChunkedArrayFromJSON(type, values), + ArrayFromJSON(int8(), indices))); + *out = result.chunked_array(); + return Status::OK(); } Status TakeWithChunkedArray(const std::shared_ptr& type, const std::vector& values, const std::vector& indices, std::shared_ptr* out) { - return Take(*ChunkedArrayFromJSON(type, values), - *ChunkedArrayFromJSON(int8(), indices)) - .Value(out); + ARROW_ASSIGN_OR_RAISE(Datum result, Take(ChunkedArrayFromJSON(type, values), + ChunkedArrayFromJSON(int8(), indices))); + *out = result.chunked_array(); + return Status::OK(); } }; @@ -642,15 +649,21 @@ class TestTakeKernelWithTable : public TestTakeKernel
{ Status TakeWithArray(const std::shared_ptr& schm, const std::vector& values, const std::string& indices, std::shared_ptr
* out) { - return Take(*TableFromJSON(schm, values), *ArrayFromJSON(int8(), indices)).Value(out); + ARROW_ASSIGN_OR_RAISE(Datum result, Take(Datum(TableFromJSON(schm, values)), + Datum(ArrayFromJSON(int8(), indices)))); + *out = result.table(); + return Status::OK(); } Status TakeWithChunkedArray(const std::shared_ptr& schm, const std::vector& values, const std::vector& indices, std::shared_ptr
* out) { - return Take(*TableFromJSON(schm, values), *ChunkedArrayFromJSON(int8(), indices)) - .Value(out); + ARROW_ASSIGN_OR_RAISE(Datum result, + Take(Datum(TableFromJSON(schm, values)), + Datum(ChunkedArrayFromJSON(int8(), indices)))); + *out = result.table(); + return Status::OK(); } }; diff --git a/cpp/src/arrow/datum.cc b/cpp/src/arrow/datum.cc index 3167d9d5dd7..42270fc7bad 100644 --- a/cpp/src/arrow/datum.cc +++ b/cpp/src/arrow/datum.cc @@ -68,6 +68,15 @@ Datum::Datum(uint64_t value) : value(std::make_shared(value)) {} Datum::Datum(float value) : value(std::make_shared(value)) {} Datum::Datum(double value) : value(std::make_shared(value)) {} +Datum::Datum(const ChunkedArray& value) + : value(std::make_shared(value.chunks(), value.type())) {} + +Datum::Datum(const Table& value) + : value(Table::Make(value.schema(), value.columns(), value.num_rows())) {} + +Datum::Datum(const RecordBatch& value) + : value(RecordBatch::Make(value.schema(), value.num_rows(), value.columns())) {} + std::shared_ptr Datum::make_array() const { DCHECK_EQ(Datum::ARRAY, this->kind()); return MakeArray(util::get>(this->value)); diff --git a/cpp/src/arrow/datum.h b/cpp/src/arrow/datum.h index 53b47db2040..a25ee5b024c 100644 --- a/cpp/src/arrow/datum.h +++ b/cpp/src/arrow/datum.h @@ -128,6 +128,12 @@ struct ARROW_EXPORT Datum { Datum(std::shared_ptr
value); // NOLINT implicit conversion Datum(std::vector value); // NOLINT implicit conversion + // Explicit constructors from const-refs. Can be expensive, prefer the + // shared_ptr constructors + explicit Datum(const ChunkedArray& value); + explicit Datum(const RecordBatch& value); + explicit Datum(const Table& value); + // Cast from subtypes of Array to Datum template ::value>> Datum(const std::shared_ptr& value) // NOLINT implicit conversion diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 481a1887a24..a2de8ec492d 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -20,6 +20,9 @@ from pyarrow.compat import frombytes, tobytes, ordered_dict from pyarrow.lib cimport * from pyarrow.includes.libarrow cimport * +import pyarrow.lib as lib + +import numpy as np cdef wrap_scalar_function(const shared_ptr[CFunction]& sp_func): cdef ScalarFunction func = ScalarFunction.__new__(ScalarFunction) @@ -41,6 +44,14 @@ cdef wrap_scalar_aggregate_function(const shared_ptr[CFunction]& sp_func): return func +cdef wrap_meta_function(const shared_ptr[CFunction]& sp_func): + cdef MetaFunction func = ( + MetaFunction.__new__(MetaFunction) + ) + func.init(sp_func) + return func + + cdef wrap_function(const shared_ptr[CFunction]& sp_func): if sp_func.get() == NULL: raise ValueError('Function was NULL') @@ -52,6 +63,8 @@ cdef wrap_function(const shared_ptr[CFunction]& sp_func): return wrap_vector_function(sp_func) elif c_kind == FunctionKind_SCALAR_AGGREGATE: return wrap_scalar_aggregate_function(sp_func) + elif c_kind == FunctionKind_META: + return wrap_meta_function(sp_func) else: raise NotImplementedError("Unknown Function::Kind") @@ -218,14 +231,30 @@ cdef class ScalarAggregateFunction(Function): return [wrap_scalar_aggregate_kernel(k) for k in kernels] +cdef class MetaFunction(Function): + cdef: + const CMetaFunction* func + + cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: + Function.init(self, sp_func) + self.func = sp_func.get() + + cdef _pack_compute_args(object values, vector[CDatum]* out): for val in values: + if isinstance(val, (list, np.ndarray)): + val = lib.asarray(val) + if isinstance(val, Array): out.push_back(CDatum(( val).sp_array)) elif isinstance(val, ChunkedArray): out.push_back(CDatum(( val).sp_chunked_array)) elif isinstance(val, ScalarValue): out.push_back(CDatum(( val).sp_scalar)) + elif isinstance(val, RecordBatch): + out.push_back(CDatum(( val).sp_batch)) + elif isinstance(val, Table): + out.push_back(CDatum((
val).sp_table)) else: raise TypeError(type(val)) diff --git a/python/pyarrow/array.pxi b/python/pyarrow/array.pxi index 0b6ad3df71e..e8888a8113a 100644 --- a/python/pyarrow/array.pxi +++ b/python/pyarrow/array.pxi @@ -921,81 +921,15 @@ cdef class Array(_PandasConvertible): def take(self, object indices): """ - Take elements from an array. - - The resulting array will be of the same type as the input array, with - elements taken from the input array at the given indices. If an index - is null then the taken element will be null. - - Parameters - ---------- - indices : Array - The indices of the values to extract. Array needs to be of - integer type. - - Returns - ------- - Array - - Examples - -------- - - >>> import pyarrow as pa - >>> arr = pa.array(["a", "b", "c", None, "e", "f"]) - >>> indices = pa.array([0, None, 4, 3]) - >>> arr.take(indices) - - [ - "a", - null, - "e", - null - ] + Select values from an array. See pyarrow.compute.take for full usage. """ - return _pc().call_function('take', [self, asarray(indices)]) + return _pc().take(self, indices) def filter(self, Array mask, null_selection_behavior='drop'): """ - Filter the array with a boolean mask. - - Parameters - ---------- - mask : Array - The boolean mask indicating which values to extract. - null_selection_behavior : str, default 'drop' - Configure the behavior on encountering a null slot in the mask. - Allowed values are 'drop' and 'emit_null'. - - - 'drop': nulls will be treated as equivalent to False. - - 'emit_null': nulls will result in a null in the output. - - Returns - ------- - Array - - Examples - -------- - - >>> import pyarrow as pa - >>> arr = pa.array(["a", "b", "c", None, "e"]) - >>> mask = pa.array([True, False, None, False, True]) - >>> arr.filter(mask) - - [ - "a", - "e" - ] - >>> arr.filter(mask, null_selection_behavior='emit_null') - - [ - "a", - null, - "e" - ] - """ - pc = _pc() - options = pc.FilterOptions(null_selection_behavior) - return pc.call_function('filter', [self, mask], options) + Select values from an array. See pyarrow.compute.filter for full usage. + """ + return _pc().filter(self, mask, null_selection_behavior) def _to_pandas(self, options, **kwargs): return _array_like_to_pandas(self, options) diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index d7c063ab0a5..a9c1d0d9a2a 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -107,3 +107,65 @@ def sum(array): sum : pyarrow.Scalar """ return call_function('sum', [array]) + + +def filter(data, mask, null_selection_behavior='drop'): + """ + Select values (or records) from array- or table-like data given boolean + filter, where true values are selected. + + Parameters + ---------- + data : Array, ChunkedArray, RecordBatch, or Table + mask : Array, ChunkedArray + Must be of boolean type + null_selection_behavior : str, default 'drop' + Configure the behavior on encountering a null slot in the mask. + Allowed values are 'drop' and 'emit_null'. + + - 'drop': nulls will be treated as equivalent to False. + - 'emit_null': nulls will result in a null in the output. + + Returns + ------- + result : depends on inputs + + Examples + -------- + >>> import pyarrow as pa + >>> arr = pa.array(["a", "b", "c", None, "e"]) + >>> mask = pa.array([True, False, None, False, True]) + >>> arr.filter(mask) + + [ + "a", + "e" + ] + >>> arr.filter(mask, null_selection_behavior='emit_null') + + [ + "a", + null, + "e" + ] + """ + options = FilterOptions(null_selection_behavior) + return call_function('filter', [data, mask], options) + + +def take(data, indices): + """ + Select values (or records) from array- or table-like data given integer + selection indices. + + Parameters + ---------- + data : Array, ChunkedArray, RecordBatch, or Table + indices : Array, ChunkedArray + Must be of integer type + + Returns + ------- + result : depends on inputs + """ + return call_function('take', [data, indices]) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 450cb4373f4..e0b981d3032 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1485,6 +1485,8 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: FunctionKind_VECTOR" arrow::compute::Function::VECTOR" FunctionKind_SCALAR_AGGREGATE \ " arrow::compute::Function::SCALAR_AGGREGATE" + FunctionKind_META \ + " arrow::compute::Function::META" cdef cppclass CFunctionOptions" arrow::compute::FunctionOptions": pass @@ -1508,6 +1510,9 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: (CFunction): vector[const CScalarAggregateKernel*] kernels() const + cdef cppclass CMetaFunction" arrow::compute::MetaFunction"(CFunction): + pass + cdef cppclass CFunctionRegistry" arrow::compute::FunctionRegistry": CResult[shared_ptr[CFunction]] GetFunction( const c_string& name) const @@ -1571,23 +1576,6 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: shared_ptr[CTable] table() shared_ptr[CScalar] scalar() - CResult[CDatum] Take(const CDatum& values, const CDatum& indices, - const CTakeOptions& options) - - CResult[shared_ptr[CChunkedArray]] Take(const CChunkedArray& values, - const CArray& indices, - const CTakeOptions& options) - CResult[shared_ptr[CRecordBatch]] Take(const CRecordBatch& batch, - const CArray& indices, - const CTakeOptions& options) - CResult[shared_ptr[CTable]] Take(const CTable& table, - const CArray& indices, - const CTakeOptions& options) - - # Filter clashes with gandiva.pyx::Filter - CResult[CDatum] FilterKernel" arrow::compute::Filter"( - const CDatum& values, const CDatum& filter, CFilterOptions options) - cdef extern from "arrow/python/api.h" namespace "arrow::py": # Requires GIL diff --git a/python/pyarrow/table.pxi b/python/pyarrow/table.pxi index afcece54920..35a4b6ef670 100644 --- a/python/pyarrow/table.pxi +++ b/python/pyarrow/table.pxi @@ -328,90 +328,17 @@ cdef class ChunkedArray(_PandasConvertible): def filter(self, mask, object null_selection_behavior="drop"): """ - Filter the chunked array with a boolean mask. - - Parameters - ---------- - mask : Array or ChunkedArray - The boolean mask indicating which values to extract. - null_selection_behavior : str, default 'drop' - Configure the behavior on encountering a null slot in the mask. - Allowed values are 'drop' and 'emit_null'. - - - 'drop': nulls will be treated as equivalent to False. - - 'emit_null': nulls will result in a null in the output. - - Returns - ------- - ChunkedArray - - Examples - -------- - >>> import pyarrow as pa - >>> arr = pa.chunked_array([["a", "b"], ["c", None, "e"]]) - >>> mask = pa.chunked_array([[True, False], [None, False, True]]) - - >>> arr.filter(mask) - - [ - [ - "a" - ], - [ - "e" - ] - ] + Select values from a chunked array. See pyarrow.compute.filter for full + usage. """ - cdef: - CDatum filter - CDatum out - CFilterOptions options - - options = _convert_filter_option(null_selection_behavior) - - mask = asarray(mask) - if isinstance(mask, Array): - filter = CDatum(( mask).sp_array) - else: - filter = CDatum(( mask).sp_chunked_array) - - with nogil: - out = GetResultValue( - FilterKernel(CDatum(self.sp_chunked_array), - filter, options)) - - return wrap_datum(out) + return _pc().filter(self, mask, null_selection_behavior) def take(self, object indices): """ - Take elements from a chunked array. - - The resulting array will be of the same type as the input array, with - elements taken from the input array at the given indices. If an index - is null then the taken element will be null. - - Parameters - ---------- - indices : Array - The indices of the values to extract. Array needs to be of - integer type. - - Returns - ------- - ChunkedArray + Select values from a chunked array. See pyarrow.compute.take for full + usage. """ - cdef: - cdef CTakeOptions options - cdef shared_ptr[CChunkedArray] out - cdef Array c_indices - - c_indices = asarray(indices) - - with nogil: - out = GetResultValue(Take(deref(self.sp_chunked_array), - deref(c_indices.sp_array), options)) - - return pyarrow_wrap_chunked_array(out) + return _pc().take(self, indices) @property def num_chunks(self): @@ -459,25 +386,6 @@ cdef class ChunkedArray(_PandasConvertible): return result -# TODO: ARROW-8916, delete this once there is a Function registered for -# filtering types other than Array, ChunkedArray -cdef CFilterOptions _convert_filter_option(object null_selection_behavior): - cdef CFilterOptions options - - if null_selection_behavior == 'drop': - options.null_selection_behavior = \ - CFilterNullSelectionBehavior_DROP - elif null_selection_behavior == 'emit_null': - options.null_selection_behavior = \ - CFilterNullSelectionBehavior_EMIT_NULL - else: - raise ValueError( - '"{}" is not a valid null_selection_behavior'.format( - null_selection_behavior) - ) - return options - - def chunked_array(arrays, type=None): """ Construct chunked array from list of array-like objects @@ -836,36 +744,10 @@ cdef class RecordBatch(_PandasConvertible): def filter(self, Array mask, object null_selection_behavior="drop"): """ - Filter the record batch with a boolean mask. - - Parameters - ---------- - mask : Array - The boolean mask indicating which rows to extract. - null_selection_behavior : str, default 'drop' - Configure the behavior on encountering a null slot in the mask. - Allowed values are 'drop' and 'emit_null'. - - - 'drop': nulls will be treated as equivalent to False. - - 'emit_null': nulls will result in a null in the output. - - Returns - ------- - RecordBatch + Select record from a record batch. See pyarrow.compute.filter for full + usage. """ - cdef: - CDatum out - CFilterOptions options - - options = _convert_filter_option(null_selection_behavior) - - with nogil: - out = GetResultValue( - FilterKernel(CDatum(self.sp_batch), - CDatum(mask.sp_array), options) - ) - - return wrap_datum(out) + return _pc().filter(self, mask, null_selection_behavior) def equals(self, object other, bint check_metadata=False): """ @@ -897,34 +779,10 @@ cdef class RecordBatch(_PandasConvertible): def take(self, object indices): """ - Take rows from a RecordBatch. - - The resulting batch contains rows taken from the input batch at the - given indices. If an index is null then all the cells in that row - will be null. - - Parameters - ---------- - indices : Array - The indices of the values to extract. Array needs to be of - integer type. - Returns - ------- - RecordBatch + Select records from an RecordBatch. See pyarrow.compute.take for full + usage. """ - cdef: - CTakeOptions options - shared_ptr[CRecordBatch] out - CRecordBatch* this_batch = self.batch - Array c_indices - - c_indices = asarray(indices) - - with nogil: - out = GetResultValue(Take(deref(this_batch), - deref(c_indices.sp_array), options)) - - return pyarrow_wrap_batch(out) + return _pc().take(self, indices) def to_pydict(self): """ @@ -1255,74 +1113,16 @@ cdef class Table(_PandasConvertible): def filter(self, mask, object null_selection_behavior="drop"): """ - Filter the rows of the table with a boolean mask. - - Parameters - ---------- - mask : Array or ChunkedArray - The boolean mask indicating which rows to extract. - null_selection_behavior : str, default 'drop' - Configure the behavior on encountering a null slot in the mask. - Allowed values are 'drop' and 'emit_null'. - - - 'drop': nulls will be treated as equivalent to False. - - 'emit_null': nulls will result in a null in the output. - - Returns - ------- - Table + Select records from a Table. See pyarrow.compute.filter for full usage. """ - cdef: - CDatum filter - CDatum out - CFilterOptions options - - options = _convert_filter_option(null_selection_behavior) - - mask = asarray(mask) - if isinstance(mask, Array): - filter = CDatum(( mask).sp_array) - else: - filter = CDatum(( mask).sp_chunked_array) - - with nogil: - out = GetResultValue( - FilterKernel(CDatum(self.sp_table), - filter, options) - ) - - return wrap_datum(out) + return _pc().filter(self, mask, null_selection_behavior) def take(self, object indices): """ - Take rows from a Table. - - The resulting table contains rows taken from the input table at the - given indices. If an index is null then all the cells in that row - will be null. - - Parameters - ---------- - indices : Array - The indices of the values to extract. Array needs to be of - integer type. - - Returns - ------- - Table + Select records from an Table. See pyarrow.compute.take for full + usage. """ - cdef: - CTakeOptions options - shared_ptr[CTable] out - Array c_indices - - c_indices = asarray(indices) - - with nogil: - out = GetResultValue(Take(deref(self.table), - deref(c_indices.sp_array), options)) - - return pyarrow_wrap_table(out) + return _pc().take(self, indices) def replace_schema_metadata(self, metadata=None): """ diff --git a/r/R/array.R b/r/R/array.R index 4cce6114646..f17ea56cb01 100644 --- a/r/R/array.R +++ b/r/R/array.R @@ -127,13 +127,13 @@ Array <- R6Class("Array", if (is.integer(i)) { i <- Array$create(i) } + # ARROW-9001: autoboxing in call_function + result <- call_function("take", self, i) if (inherits(i, "ChunkedArray")) { - # Invalid: Kernel does not support chunked array arguments - # so use the old method - return(shared_ptr(ChunkedArray, Array__TakeChunked(self, i))) + return(shared_ptr(ChunkedArray, result)) + } else { + Array$create(result) } - assert_is(i, "Array") - Array$create(call_function("take", self, i)) }, Filter = function(i, keep_na = TRUE) { if (is.logical(i)) { diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 9830673eb18..59b1fa21f47 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -272,50 +272,6 @@ Table__cast <- function(table, schema, options){ .Call(`_arrow_Table__cast` , table, schema, options) } -Array__TakeChunked <- function(values, indices){ - .Call(`_arrow_Array__TakeChunked` , values, indices) -} - -RecordBatch__Take <- function(batch, indices){ - .Call(`_arrow_RecordBatch__Take` , batch, indices) -} - -ChunkedArray__Take <- function(values, indices){ - .Call(`_arrow_ChunkedArray__Take` , values, indices) -} - -ChunkedArray__TakeChunked <- function(values, indices){ - .Call(`_arrow_ChunkedArray__TakeChunked` , values, indices) -} - -Table__Take <- function(table, indices){ - .Call(`_arrow_Table__Take` , table, indices) -} - -Table__TakeChunked <- function(table, indices){ - .Call(`_arrow_Table__TakeChunked` , table, indices) -} - -RecordBatch__Filter <- function(batch, filter, keep_na){ - .Call(`_arrow_RecordBatch__Filter` , batch, filter, keep_na) -} - -ChunkedArray__Filter <- function(values, filter, keep_na){ - .Call(`_arrow_ChunkedArray__Filter` , values, filter, keep_na) -} - -ChunkedArray__FilterChunked <- function(values, filter, keep_na){ - .Call(`_arrow_ChunkedArray__FilterChunked` , values, filter, keep_na) -} - -Table__Filter <- function(table, filter, keep_na){ - .Call(`_arrow_Table__Filter` , table, filter, keep_na) -} - -Table__FilterChunked <- function(table, filter, keep_na){ - .Call(`_arrow_Table__FilterChunked` , table, filter, keep_na) -} - compute__CallFunction <- function(func_name, args, options){ .Call(`_arrow_compute__CallFunction` , func_name, args, options) } diff --git a/r/R/chunked-array.R b/r/R/chunked-array.R index 34e383c0835..f3a3eaec9fb 100644 --- a/r/R/chunked-array.R +++ b/r/R/chunked-array.R @@ -75,23 +75,15 @@ ChunkedArray <- R6Class("ChunkedArray", inherit = ArrowObject, if (is.integer(i)) { i <- Array$create(i) } - # Invalid: Kernel does not support chunked array arguments - # so use the old method for both cases - if (inherits(i, "ChunkedArray")) { - return(shared_ptr(ChunkedArray, ChunkedArray__TakeChunked(self, i))) - } - assert_is(i, "Array") - return(shared_ptr(ChunkedArray, ChunkedArray__Take(self, i))) + # Invalid: Tried executing function with non-value type: ChunkedArray + # so use old methods + shared_ptr(ChunkedArray, call_function("take", self, i)) }, Filter = function(i, keep_na = TRUE) { if (is.logical(i)) { i <- Array$create(i) } - if (inherits(i, "ChunkedArray")) { - return(shared_ptr(ChunkedArray, ChunkedArray__FilterChunked(self, i, keep_na))) - } - assert_is(i, "Array") - shared_ptr(ChunkedArray, ChunkedArray__Filter(self, i, keep_na)) + shared_ptr(ChunkedArray, call_function("filter", self, i, options = list(keep_na = keep_na))) }, cast = function(target_type, safe = TRUE, options = cast_options(safe)) { assert_is(options, "CastOptions") diff --git a/r/R/record-batch.R b/r/R/record-batch.R index a75c5cbd78e..78d28cf3cb4 100644 --- a/r/R/record-batch.R +++ b/r/R/record-batch.R @@ -121,14 +121,13 @@ RecordBatch <- R6Class("RecordBatch", inherit = ArrowObject, assert_is(i, "Array") # Invalid: Tried executing function with non-value type: RecordBatch # so use old methods - shared_ptr(RecordBatch, RecordBatch__Take(self, i)) + shared_ptr(RecordBatch, call_function("take", self, i)) }, Filter = function(i, keep_na = TRUE) { if (is.logical(i)) { i <- Array$create(i) } - assert_is(i, "Array") - shared_ptr(RecordBatch, RecordBatch__Filter(self, i, keep_na)) + shared_ptr(RecordBatch, call_function("filter", self, i, options = list(keep_na = keep_na))) }, serialize = function() ipc___SerializeRecordBatch__Raw(self), ToString = function() ToString_tabular(self), diff --git a/r/R/table.R b/r/R/table.R index 331d6dd853b..785a10979c6 100644 --- a/r/R/table.R +++ b/r/R/table.R @@ -146,21 +146,15 @@ Table <- R6Class("Table", inherit = ArrowObject, if (is.integer(i)) { i <- Array$create(i) } - if (inherits(i, "ChunkedArray")) { - return(shared_ptr(Table, Table__TakeChunked(self, i))) - } - assert_is(i, "Array") - shared_ptr(Table, Table__Take(self, i)) + # Invalid: Tried executing function with non-value type: Table + # so use old methods + shared_ptr(Table, call_function("take", self, i)) }, Filter = function(i, keep_na = TRUE) { if (is.logical(i)) { i <- Array$create(i) } - if (inherits(i, "ChunkedArray")) { - return(shared_ptr(Table, Table__FilterChunked(self, i, keep_na))) - } - assert_is(i, "Array") - shared_ptr(Table, Table__Filter(self, i, keep_na)) + shared_ptr(Table, call_function("filter", self, i, options = list(keep_na = keep_na))) }, Equals = function(other, check_metadata = FALSE, ...) { diff --git a/r/src/array_to_vector.cpp b/r/src/array_to_vector.cpp index 9872ecde0a4..28c671906d3 100644 --- a/r/src/array_to_vector.cpp +++ b/r/src/array_to_vector.cpp @@ -24,6 +24,9 @@ #include namespace arrow { + +using internal::checked_cast; + namespace r { using Rcpp::default_value; @@ -35,7 +38,7 @@ using Rcpp::StringVector_; class Converter { public: - explicit Converter(const ArrayVector& arrays) : arrays_(arrays) {} + explicit Converter(ArrayVector arrays) : arrays_(std::move(arrays)) {} virtual ~Converter() {} @@ -88,10 +91,11 @@ class Converter { } // Converter factory - static std::shared_ptr Make(const ArrayVector& arrays); + static std::shared_ptr Make(const std::shared_ptr& type, + ArrayVector arrays); protected: - const ArrayVector& arrays_; + ArrayVector arrays_; }; // data[start:(start+n)] = NA @@ -128,8 +132,9 @@ Status SomeNull_Ingest(SEXP data, R_xlen_t start, R_xlen_t n, } // Allocate + Ingest -SEXP ArrayVector__as_vector(R_xlen_t n, const ArrayVector& arrays) { - auto converter = Converter::Make(arrays); +SEXP ArrayVector__as_vector(R_xlen_t n, const std::shared_ptr& type, + const ArrayVector& arrays) { + auto converter = Converter::Make(type, arrays); Shield data(converter->Allocate(n)); StopIfNotOk(converter->IngestSerial(data)); return data; @@ -274,7 +279,7 @@ class Converter_Dictionary : public Converter { SEXP Allocate(R_xlen_t n) const { IntegerVector data(no_init(n)); - auto dict_array = static_cast(Converter::arrays_[0].get()); + auto dict_array = static_cast(this->arrays_[0].get()); auto dict = dict_array->dictionary(); auto indices = dict_array->indices(); switch (indices->type_id()) { @@ -300,8 +305,8 @@ class Converter_Dictionary : public Converter { // TODO (npr): this coercion should be optional, "dictionariesAsFactors" ;) // Alternative: preserve the logical type of the dictionary values // (e.g. if dict is timestamp, return a POSIXt R vector, not factor) - data.attr("levels") = - Rf_coerceVector(ArrayVector__as_vector(dict->length(), {dict}), STRSXP); + data.attr("levels") = Rf_coerceVector( + ArrayVector__as_vector(dict->length(), dict->type(), {dict}), STRSXP); if (ordered) { data.attr("class") = Rcpp::CharacterVector::create("ordered", "factor"); } else { @@ -356,18 +361,17 @@ class Converter_Dictionary : public Converter { class Converter_Struct : public Converter { public: explicit Converter_Struct(const ArrayVector& arrays) : Converter(arrays), converters() { - auto first_array = - internal::checked_cast(Converter::arrays_[0].get()); + auto first_array = checked_cast(this->arrays_[0].get()); int nf = first_array->num_fields(); for (int i = 0; i < nf; i++) { - converters.push_back(Converter::Make({first_array->field(i)})); + converters.push_back( + Converter::Make(first_array->field(i)->type(), {first_array->field(i)})); } } SEXP Allocate(R_xlen_t n) const { // allocate a data frame column to host each array - auto first_array = - internal::checked_cast(Converter::arrays_[0].get()); + auto first_array = checked_cast(this->arrays_[0].get()); auto type = first_array->struct_type(); int nf = first_array->num_fields(); Rcpp::List out(nf); @@ -396,7 +400,7 @@ class Converter_Struct : public Converter { Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n) const { - auto struct_array = internal::checked_cast(array.get()); + auto struct_array = checked_cast(array.get()); int nf = converters.size(); // Flatten() deals with merging of nulls auto arrays = ValueOrStop(struct_array->Flatten(default_memory_pool())); @@ -517,8 +521,8 @@ class Converter_Timestamp : public Converter_Time { SEXP Allocate(R_xlen_t n) const { Rcpp::NumericVector data(no_init(n)); Rf_classgets(data, arrow::r::data::classes_POSIXct); - auto array = internal::checked_cast(Converter::arrays_[0].get()); - auto array_type = internal::checked_cast(array->type().get()); + auto array = checked_cast(this->arrays_[0].get()); + auto array_type = checked_cast(array->type().get()); data.attr("tzone") = array_type->timezone(); return data; } @@ -537,8 +541,7 @@ class Converter_Decimal : public Converter { Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n) const { auto p_data = Rcpp::internal::r_vector_start(data) + start; - const auto& decimals_arr = - internal::checked_cast(*array); + const auto& decimals_arr = checked_cast(*array); if (array->null_count()) { internal::BitmapReader bitmap_reader(array->null_bitmap()->data(), array->offset(), @@ -571,8 +574,7 @@ class Converter_List : public Converter { Status Ingest_some_nulls(SEXP data, const std::shared_ptr& array, R_xlen_t start, R_xlen_t n) const { - using internal::checked_cast; - auto list_array = checked_cast(array.get()); + auto list_array = checked_cast(array.get()); auto values_array = list_array->values(); auto ingest_one = [&](R_xlen_t i) { @@ -659,93 +661,96 @@ class Converter_Null : public Converter { } }; -std::shared_ptr Converter::Make(const ArrayVector& arrays) { +std::shared_ptr Converter::Make(const std::shared_ptr& type, + ArrayVector arrays) { if (arrays.empty()) { - Rcpp::stop(tfm::format("Must have at least one array to create a converter")); + // slight hack for the 0-row case since the converters expect at least one + // chunk to process. + arrays.push_back(ValueOrStop(arrow::MakeArrayOfNull(type, 0))); } - auto type = arrays[0]->type(); - switch (type->id()) { // direct support case Type::INT32: - return std::make_shared>(arrays); + return std::make_shared>(std::move(arrays)); case Type::DOUBLE: - return std::make_shared>(arrays); + return std::make_shared>( + std::move(arrays)); // need to handle 1-bit case case Type::BOOL: - return std::make_shared(arrays); + return std::make_shared(std::move(arrays)); // handle memory dense strings case Type::STRING: - return std::make_shared(arrays); + return std::make_shared(std::move(arrays)); case Type::DICTIONARY: - return std::make_shared(arrays); + return std::make_shared(std::move(arrays)); case Type::DATE32: - return std::make_shared(arrays); + return std::make_shared(std::move(arrays)); case Type::DATE64: - return std::make_shared(arrays); + return std::make_shared(std::move(arrays)); // promotions to integer vector case Type::INT8: return std::make_shared>( - arrays); + std::move(arrays)); case Type::UINT8: return std::make_shared>( - arrays); + std::move(arrays)); case Type::INT16: return std::make_shared>( - arrays); + std::move(arrays)); case Type::UINT16: return std::make_shared>( - arrays); + std::move(arrays)); // promotions to numeric vector case Type::UINT32: return std::make_shared>( - arrays); + std::move(arrays)); case Type::HALF_FLOAT: return std::make_shared< - arrow::r::Converter_Promotion>(arrays); + arrow::r::Converter_Promotion>( + std::move(arrays)); case Type::FLOAT: return std::make_shared>( - arrays); + std::move(arrays)); // time32 ane time64 case Type::TIME32: - return std::make_shared>(arrays); + return std::make_shared>(std::move(arrays)); case Type::TIME64: - return std::make_shared>(arrays); + return std::make_shared>(std::move(arrays)); case Type::TIMESTAMP: - return std::make_shared>(arrays); + return std::make_shared>(std::move(arrays)); case Type::INT64: - return std::make_shared(arrays); + return std::make_shared(std::move(arrays)); case Type::DECIMAL: - return std::make_shared(arrays); + return std::make_shared(std::move(arrays)); // nested case Type::STRUCT: - return std::make_shared(arrays); + return std::make_shared(std::move(arrays)); case Type::LIST: - return std::make_shared(arrays); + return std::make_shared(std::move(arrays)); case Type::NA: - return std::make_shared(arrays); + return std::make_shared(std::move(arrays)); default: break; @@ -817,12 +822,12 @@ Rcpp::List to_dataframe_parallel( // [[arrow::export]] SEXP Array__as_vector(const std::shared_ptr& array) { - return arrow::r::ArrayVector__as_vector(array->length(), {array}); + return arrow::r::ArrayVector__as_vector(array->length(), array->type(), {array}); } // [[arrow::export]] SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array) { - return arrow::r::ArrayVector__as_vector(chunked_array->length(), + return arrow::r::ArrayVector__as_vector(chunked_array->length(), chunked_array->type(), chunked_array->chunks()); } @@ -838,7 +843,7 @@ Rcpp::List RecordBatch__to_dataframe(const std::shared_ptr& for (int64_t i = 0; i < nc; i++) { names[i] = batch->column_name(i); arrays[i] = {batch->column(i)}; - converters[i] = arrow::r::Converter::Make(arrays[i]); + converters[i] = arrow::r::Converter::Make(batch->column(i)->type(), arrays[i]); } if (use_threads) { @@ -857,7 +862,8 @@ Rcpp::List Table__to_dataframe(const std::shared_ptr& table, std::vector> converters(nc); for (int64_t i = 0; i < nc; i++) { - converters[i] = arrow::r::Converter::Make(table->column(i)->chunks()); + converters[i] = + arrow::r::Converter::Make(table->column(i)->type(), table->column(i)->chunks()); names[i] = table->field(i)->name(); } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 42ca64910b2..14a805e2733 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -1069,187 +1069,6 @@ RcppExport SEXP _arrow_Table__cast(SEXP table_sexp, SEXP schema_sexp, SEXP optio } #endif -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr Array__TakeChunked(const std::shared_ptr& values, const std::shared_ptr& indices); -RcppExport SEXP _arrow_Array__TakeChunked(SEXP values_sexp, SEXP indices_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type values(values_sexp); - Rcpp::traits::input_parameter&>::type indices(indices_sexp); - return Rcpp::wrap(Array__TakeChunked(values, indices)); -END_RCPP -} -#else -RcppExport SEXP _arrow_Array__TakeChunked(SEXP values_sexp, SEXP indices_sexp){ - Rf_error("Cannot call Array__TakeChunked(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr RecordBatch__Take(const std::shared_ptr& batch, const std::shared_ptr& indices); -RcppExport SEXP _arrow_RecordBatch__Take(SEXP batch_sexp, SEXP indices_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type batch(batch_sexp); - Rcpp::traits::input_parameter&>::type indices(indices_sexp); - return Rcpp::wrap(RecordBatch__Take(batch, indices)); -END_RCPP -} -#else -RcppExport SEXP _arrow_RecordBatch__Take(SEXP batch_sexp, SEXP indices_sexp){ - Rf_error("Cannot call RecordBatch__Take(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ChunkedArray__Take(const std::shared_ptr& values, const std::shared_ptr& indices); -RcppExport SEXP _arrow_ChunkedArray__Take(SEXP values_sexp, SEXP indices_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type values(values_sexp); - Rcpp::traits::input_parameter&>::type indices(indices_sexp); - return Rcpp::wrap(ChunkedArray__Take(values, indices)); -END_RCPP -} -#else -RcppExport SEXP _arrow_ChunkedArray__Take(SEXP values_sexp, SEXP indices_sexp){ - Rf_error("Cannot call ChunkedArray__Take(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ChunkedArray__TakeChunked(const std::shared_ptr& values, const std::shared_ptr& indices); -RcppExport SEXP _arrow_ChunkedArray__TakeChunked(SEXP values_sexp, SEXP indices_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type values(values_sexp); - Rcpp::traits::input_parameter&>::type indices(indices_sexp); - return Rcpp::wrap(ChunkedArray__TakeChunked(values, indices)); -END_RCPP -} -#else -RcppExport SEXP _arrow_ChunkedArray__TakeChunked(SEXP values_sexp, SEXP indices_sexp){ - Rf_error("Cannot call ChunkedArray__TakeChunked(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr Table__Take(const std::shared_ptr& table, const std::shared_ptr& indices); -RcppExport SEXP _arrow_Table__Take(SEXP table_sexp, SEXP indices_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type table(table_sexp); - Rcpp::traits::input_parameter&>::type indices(indices_sexp); - return Rcpp::wrap(Table__Take(table, indices)); -END_RCPP -} -#else -RcppExport SEXP _arrow_Table__Take(SEXP table_sexp, SEXP indices_sexp){ - Rf_error("Cannot call Table__Take(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr Table__TakeChunked(const std::shared_ptr& table, const std::shared_ptr& indices); -RcppExport SEXP _arrow_Table__TakeChunked(SEXP table_sexp, SEXP indices_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type table(table_sexp); - Rcpp::traits::input_parameter&>::type indices(indices_sexp); - return Rcpp::wrap(Table__TakeChunked(table, indices)); -END_RCPP -} -#else -RcppExport SEXP _arrow_Table__TakeChunked(SEXP table_sexp, SEXP indices_sexp){ - Rf_error("Cannot call Table__TakeChunked(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr RecordBatch__Filter(const std::shared_ptr& batch, const std::shared_ptr& filter, bool keep_na); -RcppExport SEXP _arrow_RecordBatch__Filter(SEXP batch_sexp, SEXP filter_sexp, SEXP keep_na_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type batch(batch_sexp); - Rcpp::traits::input_parameter&>::type filter(filter_sexp); - Rcpp::traits::input_parameter::type keep_na(keep_na_sexp); - return Rcpp::wrap(RecordBatch__Filter(batch, filter, keep_na)); -END_RCPP -} -#else -RcppExport SEXP _arrow_RecordBatch__Filter(SEXP batch_sexp, SEXP filter_sexp, SEXP keep_na_sexp){ - Rf_error("Cannot call RecordBatch__Filter(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ChunkedArray__Filter(const std::shared_ptr& values, const std::shared_ptr& filter, bool keep_na); -RcppExport SEXP _arrow_ChunkedArray__Filter(SEXP values_sexp, SEXP filter_sexp, SEXP keep_na_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type values(values_sexp); - Rcpp::traits::input_parameter&>::type filter(filter_sexp); - Rcpp::traits::input_parameter::type keep_na(keep_na_sexp); - return Rcpp::wrap(ChunkedArray__Filter(values, filter, keep_na)); -END_RCPP -} -#else -RcppExport SEXP _arrow_ChunkedArray__Filter(SEXP values_sexp, SEXP filter_sexp, SEXP keep_na_sexp){ - Rf_error("Cannot call ChunkedArray__Filter(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ChunkedArray__FilterChunked(const std::shared_ptr& values, const std::shared_ptr& filter, bool keep_na); -RcppExport SEXP _arrow_ChunkedArray__FilterChunked(SEXP values_sexp, SEXP filter_sexp, SEXP keep_na_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type values(values_sexp); - Rcpp::traits::input_parameter&>::type filter(filter_sexp); - Rcpp::traits::input_parameter::type keep_na(keep_na_sexp); - return Rcpp::wrap(ChunkedArray__FilterChunked(values, filter, keep_na)); -END_RCPP -} -#else -RcppExport SEXP _arrow_ChunkedArray__FilterChunked(SEXP values_sexp, SEXP filter_sexp, SEXP keep_na_sexp){ - Rf_error("Cannot call ChunkedArray__FilterChunked(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr Table__Filter(const std::shared_ptr& table, const std::shared_ptr& filter, bool keep_na); -RcppExport SEXP _arrow_Table__Filter(SEXP table_sexp, SEXP filter_sexp, SEXP keep_na_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type table(table_sexp); - Rcpp::traits::input_parameter&>::type filter(filter_sexp); - Rcpp::traits::input_parameter::type keep_na(keep_na_sexp); - return Rcpp::wrap(Table__Filter(table, filter, keep_na)); -END_RCPP -} -#else -RcppExport SEXP _arrow_Table__Filter(SEXP table_sexp, SEXP filter_sexp, SEXP keep_na_sexp){ - Rf_error("Cannot call Table__Filter(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - -// compute.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr Table__FilterChunked(const std::shared_ptr& table, const std::shared_ptr& filter, bool keep_na); -RcppExport SEXP _arrow_Table__FilterChunked(SEXP table_sexp, SEXP filter_sexp, SEXP keep_na_sexp){ -BEGIN_RCPP - Rcpp::traits::input_parameter&>::type table(table_sexp); - Rcpp::traits::input_parameter&>::type filter(filter_sexp); - Rcpp::traits::input_parameter::type keep_na(keep_na_sexp); - return Rcpp::wrap(Table__FilterChunked(table, filter, keep_na)); -END_RCPP -} -#else -RcppExport SEXP _arrow_Table__FilterChunked(SEXP table_sexp, SEXP filter_sexp, SEXP keep_na_sexp){ - Rf_error("Cannot call Table__FilterChunked(). Please use arrow::install_arrow() to install required runtime libraries. "); -} -#endif - // compute.cpp #if defined(ARROW_R_WITH_ARROW) SEXP compute__CallFunction(std::string func_name, List_ args, List_ options); @@ -6073,17 +5892,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ChunkedArray__cast", (DL_FUNC) &_arrow_ChunkedArray__cast, 3}, { "_arrow_RecordBatch__cast", (DL_FUNC) &_arrow_RecordBatch__cast, 3}, { "_arrow_Table__cast", (DL_FUNC) &_arrow_Table__cast, 3}, - { "_arrow_Array__TakeChunked", (DL_FUNC) &_arrow_Array__TakeChunked, 2}, - { "_arrow_RecordBatch__Take", (DL_FUNC) &_arrow_RecordBatch__Take, 2}, - { "_arrow_ChunkedArray__Take", (DL_FUNC) &_arrow_ChunkedArray__Take, 2}, - { "_arrow_ChunkedArray__TakeChunked", (DL_FUNC) &_arrow_ChunkedArray__TakeChunked, 2}, - { "_arrow_Table__Take", (DL_FUNC) &_arrow_Table__Take, 2}, - { "_arrow_Table__TakeChunked", (DL_FUNC) &_arrow_Table__TakeChunked, 2}, - { "_arrow_RecordBatch__Filter", (DL_FUNC) &_arrow_RecordBatch__Filter, 3}, - { "_arrow_ChunkedArray__Filter", (DL_FUNC) &_arrow_ChunkedArray__Filter, 3}, - { "_arrow_ChunkedArray__FilterChunked", (DL_FUNC) &_arrow_ChunkedArray__FilterChunked, 3}, - { "_arrow_Table__Filter", (DL_FUNC) &_arrow_Table__Filter, 3}, - { "_arrow_Table__FilterChunked", (DL_FUNC) &_arrow_Table__FilterChunked, 3}, { "_arrow_compute__CallFunction", (DL_FUNC) &_arrow_compute__CallFunction, 3}, { "_arrow_csv___ReadOptions__initialize", (DL_FUNC) &_arrow_csv___ReadOptions__initialize, 1}, { "_arrow_csv___ParseOptions__initialize", (DL_FUNC) &_arrow_csv___ParseOptions__initialize, 1}, diff --git a/r/src/compute.cpp b/r/src/compute.cpp index ae2bbe16e36..b83576d3d26 100644 --- a/r/src/compute.cpp +++ b/r/src/compute.cpp @@ -83,132 +83,6 @@ std::shared_ptr Table__cast( return arrow::Table::Make(schema, std::move(columns), table->num_rows()); } -// [[arrow::export]] -std::shared_ptr Array__TakeChunked( - const std::shared_ptr& values, - const std::shared_ptr& indices) { - arrow::compute::TakeOptions options; - return ValueOrStop(arrow::compute::Take(*values, *indices, options)); -} - -// [[arrow::export]] -std::shared_ptr RecordBatch__Take( - const std::shared_ptr& batch, - const std::shared_ptr& indices) { - arrow::compute::TakeOptions options; - return ValueOrStop(arrow::compute::Take(*batch, *indices, options)); -} - -// [[arrow::export]] -std::shared_ptr ChunkedArray__Take( - const std::shared_ptr& values, - const std::shared_ptr& indices) { - arrow::compute::TakeOptions options; - return ValueOrStop(arrow::compute::Take(*values, *indices, options)); -} - -// [[arrow::export]] -std::shared_ptr ChunkedArray__TakeChunked( - const std::shared_ptr& values, - const std::shared_ptr& indices) { - arrow::compute::TakeOptions options; - return ValueOrStop(arrow::compute::Take(*values, *indices, options)); -} - -// [[arrow::export]] -std::shared_ptr Table__Take(const std::shared_ptr& table, - const std::shared_ptr& indices) { - arrow::compute::TakeOptions options; - return ValueOrStop(arrow::compute::Take(*table, *indices, options)); -} - -// [[arrow::export]] -std::shared_ptr Table__TakeChunked( - const std::shared_ptr& table, - const std::shared_ptr& indices) { - arrow::compute::TakeOptions options; - return ValueOrStop(arrow::compute::Take(*table, *indices, options)); -} - -// [[arrow::export]] -std::shared_ptr RecordBatch__Filter( - const std::shared_ptr& batch, - const std::shared_ptr& filter, bool keep_na) { - // Use the EMIT_NULL filter option to match R's behavior in [ - arrow::compute::FilterOptions options; - if (keep_na) { - options.null_selection_behavior = arrow::compute::FilterOptions::EMIT_NULL; - } - arrow::Datum out = ValueOrStop(arrow::compute::Filter(batch, filter, options)); - return out.record_batch(); -} - -// [[arrow::export]] -std::shared_ptr ChunkedArray__Filter( - const std::shared_ptr& values, - const std::shared_ptr& filter, bool keep_na) { - // Use the EMIT_NULL filter option to match R's behavior in [ - arrow::compute::FilterOptions options; - if (keep_na) { - options.null_selection_behavior = arrow::compute::FilterOptions::EMIT_NULL; - } - arrow::Datum out = ValueOrStop(arrow::compute::Filter(values, filter, options)); - return out.chunked_array(); -} - -// [[arrow::export]] -std::shared_ptr ChunkedArray__FilterChunked( - const std::shared_ptr& values, - const std::shared_ptr& filter, bool keep_na) { - // Use the EMIT_NULL filter option to match R's behavior in [ - arrow::compute::FilterOptions options; - if (keep_na) { - options.null_selection_behavior = arrow::compute::FilterOptions::EMIT_NULL; - } - arrow::Datum out = ValueOrStop(arrow::compute::Filter(values, filter, options)); - return out.chunked_array(); -} - -// [[arrow::export]] -std::shared_ptr Table__Filter(const std::shared_ptr& table, - const std::shared_ptr& filter, - bool keep_na) { - // Use the EMIT_NULL filter option to match R's behavior in [ - arrow::compute::FilterOptions options; - if (keep_na) { - options.null_selection_behavior = arrow::compute::FilterOptions::EMIT_NULL; - } - arrow::Datum out = ValueOrStop(arrow::compute::Filter(table, filter, options)); - std::shared_ptr tab = out.table(); - if (tab->num_rows() == 0) { - // Slight hack: if there are no rows in the result, instead do a 0-length - // slice so that we get chunked arrays with 1 chunk (itself length 0). - // We need that because the Arrow-to-R converter fails when there are 0 chunks. - return table->Slice(0, 0); - } - return tab; -} - -// [[arrow::export]] -std::shared_ptr Table__FilterChunked( - const std::shared_ptr& table, - const std::shared_ptr& filter, bool keep_na) { - // Use the EMIT_NULL filter option to match R's behavior in [ - arrow::compute::FilterOptions options; - if (keep_na) { - options.null_selection_behavior = arrow::compute::FilterOptions::EMIT_NULL; - } - arrow::Datum out = ValueOrStop(arrow::compute::Filter(table, filter, options)); - std::shared_ptr tab = out.table(); - if (tab->num_rows() == 0) { - // Slight hack: if there are no rows in the result, instead do a 0-length - // slice so that we get chunked arrays with 1 chunk (itself length 0). - // We need that because the Arrow-to-R converter fails when there are 0 chunks. - return table->Slice(0, 0); - } - return tab; -} - template std::shared_ptr MaybeUnbox(const char* class_name, SEXP x) { if (Rf_inherits(x, "ArrowObject") && Rf_inherits(x, class_name)) { From 5d84b2cccf901833b7d7f2270973f429ab97a3ba Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 2 Jun 2020 10:19:13 -0500 Subject: [PATCH 2/3] Address Joris review comments, rebase --- cpp/src/arrow/compute/api_vector.h | 9 +++++++++ cpp/src/arrow/compute/function.h | 3 +++ .../arrow/compute/kernels/vector_filter.cc | 3 ++- cpp/src/arrow/compute/kernels/vector_take.cc | 7 ++----- python/pyarrow/compute.py | 19 +++++++++++++++++++ 5 files changed, 35 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index e02aad2c56f..84086802f18 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -70,6 +70,15 @@ struct ARROW_EXPORT TakeOptions : public FunctionOptions { /// \brief Take from an array of values at indices in another array /// +/// The output array will be of the same type as the input values +/// array, with elements taken from the values array at the given +/// indices. If an index is null then the taken element will be null. +/// +/// For example given values = ["a", "b", "c", null, "e", "f"] and +/// indices = [2, 1, null, 3], the output will be +/// = [values[2], values[1], null, values[3]] +/// = ["c", "b", null, null] +/// /// \param[in] values datum from which to take /// \param[in] indices which values to take /// \param[in] options options diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index f07cd137b64..cfb52530fae 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -235,6 +235,9 @@ class ARROW_EXPORT ScalarAggregateFunction /// \brief A function that dispatches to other functions. Must override /// Function::Execute. +/// +/// For Array, ChunkedArray, and Scalar Datum kinds, may rely on the execution +/// of concrete Function types, but must handle other Datum kinds on its own. class ARROW_EXPORT MetaFunction : public Function { public: int num_kernels() const override { return 0; } diff --git a/cpp/src/arrow/compute/kernels/vector_filter.cc b/cpp/src/arrow/compute/kernels/vector_filter.cc index 71d6bfc8c1b..67622245745 100644 --- a/cpp/src/arrow/compute/kernels/vector_filter.cc +++ b/cpp/src/arrow/compute/kernels/vector_filter.cc @@ -20,6 +20,7 @@ #include "arrow/compute/api_vector.h" #include "arrow/compute/kernels/common.h" #include "arrow/compute/kernels/vector_selection_internal.h" +#include "arrow/record_batch.h" #include "arrow/result.h" namespace arrow { @@ -236,7 +237,7 @@ void RegisterVectorFilter(FunctionRegistry* registry) { } DCHECK_OK(registry->AddFunction(std::move(filter))); - // Add take metafunction + // Add filter metafunction DCHECK_OK(registry->AddFunction(std::make_shared())); } diff --git a/cpp/src/arrow/compute/kernels/vector_take.cc b/cpp/src/arrow/compute/kernels/vector_take.cc index 127d7e5f768..7531fbac079 100644 --- a/cpp/src/arrow/compute/kernels/vector_take.cc +++ b/cpp/src/arrow/compute/kernels/vector_take.cc @@ -15,15 +15,12 @@ // specific language governing permissions and limitations // under the License. -#include -#include - #include "arrow/array/array_base.h" -#include "arrow/compute/api_vector.h" #include "arrow/array/concatenate.h" -#include "arrow/builder.h" +#include "arrow/compute/api_vector.h" #include "arrow/compute/kernels/common.h" #include "arrow/compute/kernels/vector_selection_internal.h" +#include "arrow/record_batch.h" #include "arrow/result.h" namespace arrow { diff --git a/python/pyarrow/compute.py b/python/pyarrow/compute.py index a9c1d0d9a2a..3761c91cc2a 100644 --- a/python/pyarrow/compute.py +++ b/python/pyarrow/compute.py @@ -158,6 +158,11 @@ def take(data, indices): Select values (or records) from array- or table-like data given integer selection indices. + The result will be of the same type(s) as the input, with elements taken + from the input array (or record batch / table fields) at the given + indices. If an index is null then the corresponding value in the output + will be null. + Parameters ---------- data : Array, ChunkedArray, RecordBatch, or Table @@ -167,5 +172,19 @@ def take(data, indices): Returns ------- result : depends on inputs + + Examples + -------- + >>> import pyarrow as pa + >>> arr = pa.array(["a", "b", "c", None, "e", "f"]) + >>> indices = pa.array([0, None, 4, 3]) + >>> arr.take(indices) + + [ + "a", + null, + "e", + null + ] """ return call_function('take', [data, indices]) From 37753d11e52e1ad88670a4bc26b500554bf9b883 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 2 Jun 2020 17:52:18 -0500 Subject: [PATCH 3/3] Add pure virtual MetaFunction::ExecuteImpl as suggested by Ben --- cpp/src/arrow/compute/cast.cc | 5 +++-- cpp/src/arrow/compute/function.cc | 2 +- cpp/src/arrow/compute/function.h | 8 ++++++-- cpp/src/arrow/compute/kernels/vector_filter.cc | 5 +++-- cpp/src/arrow/compute/kernels/vector_take.cc | 5 +++-- 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/compute/cast.cc b/cpp/src/arrow/compute/cast.cc index 59e9dc2f2ea..f8dff9a528b 100644 --- a/cpp/src/arrow/compute/cast.cc +++ b/cpp/src/arrow/compute/cast.cc @@ -60,8 +60,9 @@ class CastMetaFunction : public MetaFunction { public: CastMetaFunction() : MetaFunction("cast", Arity::Unary()) {} - Result Execute(const std::vector& args, const FunctionOptions* options, - ExecContext* ctx) const override { + Result ExecuteImpl(const std::vector& args, + const FunctionOptions* options, + ExecContext* ctx) const override { auto cast_options = static_cast(options); if (cast_options == nullptr || cast_options->to_type == nullptr) { return Status::Invalid( diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 42a2afcbc0a..1d08062b8d1 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -166,7 +166,7 @@ Result ScalarAggregateFunction::DispatchExact( Result MetaFunction::Execute(const std::vector& args, const FunctionOptions* options, ExecContext* ctx) const { - return Status::NotImplemented("Metafunctions must provide their own Execute method"); + return ExecuteImpl(args, options, ctx); } } // namespace compute diff --git a/cpp/src/arrow/compute/function.h b/cpp/src/arrow/compute/function.h index cfb52530fae..d0d28c544b9 100644 --- a/cpp/src/arrow/compute/function.h +++ b/cpp/src/arrow/compute/function.h @@ -233,8 +233,8 @@ class ARROW_EXPORT ScalarAggregateFunction const std::vector& values) const; }; -/// \brief A function that dispatches to other functions. Must override -/// Function::Execute. +/// \brief A function that dispatches to other functions. Must implement +/// MetaFunction::ExecuteImpl. /// /// For Array, ChunkedArray, and Scalar Datum kinds, may rely on the execution /// of concrete Function types, but must handle other Datum kinds on its own. @@ -246,6 +246,10 @@ class ARROW_EXPORT MetaFunction : public Function { ExecContext* ctx) const override; protected: + virtual Result ExecuteImpl(const std::vector& args, + const FunctionOptions* options, + ExecContext* ctx) const = 0; + MetaFunction(std::string name, const Arity& arity) : Function(std::move(name), Function::META, arity) {} }; diff --git a/cpp/src/arrow/compute/kernels/vector_filter.cc b/cpp/src/arrow/compute/kernels/vector_filter.cc index 67622245745..03a725d2bbf 100644 --- a/cpp/src/arrow/compute/kernels/vector_filter.cc +++ b/cpp/src/arrow/compute/kernels/vector_filter.cc @@ -196,8 +196,9 @@ class FilterMetaFunction : public MetaFunction { public: FilterMetaFunction() : MetaFunction("filter", Arity::Binary()) {} - Result Execute(const std::vector& args, const FunctionOptions* options, - ExecContext* ctx) const override { + Result ExecuteImpl(const std::vector& args, + const FunctionOptions* options, + ExecContext* ctx) const override { if (args[0].kind() == Datum::RECORD_BATCH) { auto values_batch = args[0].record_batch(); ARROW_ASSIGN_OR_RAISE( diff --git a/cpp/src/arrow/compute/kernels/vector_take.cc b/cpp/src/arrow/compute/kernels/vector_take.cc index 7531fbac079..1658f965212 100644 --- a/cpp/src/arrow/compute/kernels/vector_take.cc +++ b/cpp/src/arrow/compute/kernels/vector_take.cc @@ -191,8 +191,9 @@ class TakeMetaFunction : public MetaFunction { public: TakeMetaFunction() : MetaFunction("take", Arity::Binary()) {} - Result Execute(const std::vector& args, const FunctionOptions* options, - ExecContext* ctx) const override { + Result ExecuteImpl(const std::vector& args, + const FunctionOptions* options, + ExecContext* ctx) const override { Datum::Kind index_kind = args[1].kind(); const TakeOptions& take_opts = static_cast(*options); switch (args[0].kind()) {