From b1c3e7b3e388386362aa106774cb63338ae0eb4b Mon Sep 17 00:00:00 2001 From: benibus Date: Wed, 22 Feb 2023 16:18:18 -0500 Subject: [PATCH 1/6] Split most non-cast kernels from default build --- cpp/src/arrow/CMakeLists.txt | 142 ++++++++++--------- cpp/src/arrow/compute/kernels/CMakeLists.txt | 3 +- cpp/src/arrow/compute/registry.cc | 23 +-- 3 files changed, 88 insertions(+), 80 deletions(-) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index e0931c19eff..0d7d4f7ef55 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -383,65 +383,87 @@ if(ARROW_CSV) list(APPEND ARROW_TESTING_SRCS csv/test_common.cc) endif() +# Baseline Compute functionality + scalar casts and a few select kernels +list(APPEND + ARROW_SRCS + compute/api_aggregate.cc + compute/api_scalar.cc + compute/api_vector.cc + compute/cast.cc + compute/exec.cc + compute/exec/groupby.cc + compute/exec/accumulation_queue.cc + compute/exec/aggregate_node.cc + compute/exec/asof_join_node.cc + compute/exec/bloom_filter.cc + compute/exec/exec_plan.cc + compute/exec/expression.cc + compute/exec/fetch_node.cc + compute/exec/filter_node.cc + compute/exec/hash_join.cc + compute/exec/hash_join_dict.cc + compute/exec/hash_join_node.cc + compute/exec/key_hash.cc + compute/exec/key_map.cc + compute/exec/map_node.cc + compute/exec/options.cc + compute/exec/order_by_impl.cc + compute/exec/partition_util.cc + compute/exec/project_node.cc + compute/exec/query_context.cc + compute/exec/sink_node.cc + compute/exec/source_node.cc + compute/exec/swiss_join.cc + compute/exec/task_util.cc + compute/exec/tpch_node.cc + compute/exec/union_node.cc + compute/exec/util.cc + compute/function.cc + compute/function_internal.cc + compute/kernel.cc + compute/light_array.cc + compute/ordering.cc + compute/registry.cc + compute/kernels/codegen_internal.cc + compute/kernels/row_encoder.cc + compute/kernels/scalar_cast_boolean.cc + compute/kernels/scalar_cast_dictionary.cc + compute/kernels/scalar_cast_extension.cc + compute/kernels/scalar_cast_internal.cc + compute/kernels/scalar_cast_nested.cc + compute/kernels/scalar_cast_numeric.cc + compute/kernels/scalar_cast_string.cc + compute/kernels/scalar_cast_temporal.cc + compute/kernels/util_internal.cc + compute/kernels/vector_hash.cc + compute/kernels/vector_selection.cc + compute/row/encode_internal.cc + compute/row/compare_internal.cc + compute/row/grouper.cc + compute/row/row_internal.cc) + +append_avx2_src(compute/exec/bloom_filter_avx2.cc) +append_avx2_src(compute/exec/key_hash_avx2.cc) +append_avx2_src(compute/exec/key_map_avx2.cc) +append_avx2_src(compute/exec/swiss_join_avx2.cc) +append_avx2_src(compute/exec/util_avx2.cc) +append_avx2_src(compute/row/compare_internal_avx2.cc) +append_avx2_src(compute/row/encode_internal_avx2.cc) + +list(APPEND ARROW_TESTING_SRCS compute/exec/test_util.cc) + if(ARROW_COMPUTE) + # Include the remaining kernels list(APPEND ARROW_SRCS - compute/api_aggregate.cc - compute/api_scalar.cc - compute/api_vector.cc - compute/cast.cc - compute/exec.cc - compute/exec/groupby.cc - compute/exec/accumulation_queue.cc - compute/exec/aggregate_node.cc - compute/exec/asof_join_node.cc - compute/exec/bloom_filter.cc - compute/exec/exec_plan.cc - compute/exec/expression.cc - compute/exec/fetch_node.cc - compute/exec/filter_node.cc - compute/exec/hash_join.cc - compute/exec/hash_join_dict.cc - compute/exec/hash_join_node.cc - compute/exec/key_hash.cc - compute/exec/key_map.cc - compute/exec/map_node.cc - compute/exec/options.cc - compute/exec/order_by_impl.cc - compute/exec/partition_util.cc - compute/exec/project_node.cc - compute/exec/query_context.cc - compute/exec/sink_node.cc - compute/exec/source_node.cc - compute/exec/swiss_join.cc - compute/exec/task_util.cc - compute/exec/tpch_node.cc - compute/exec/union_node.cc - compute/exec/util.cc - compute/function.cc - compute/function_internal.cc - compute/kernel.cc - compute/light_array.cc - compute/ordering.cc - compute/registry.cc compute/kernels/aggregate_basic.cc compute/kernels/aggregate_mode.cc compute/kernels/aggregate_quantile.cc compute/kernels/aggregate_tdigest.cc compute/kernels/aggregate_var_std.cc - compute/kernels/codegen_internal.cc compute/kernels/hash_aggregate.cc - compute/kernels/row_encoder.cc compute/kernels/scalar_arithmetic.cc compute/kernels/scalar_boolean.cc - compute/kernels/scalar_cast_boolean.cc - compute/kernels/scalar_cast_dictionary.cc - compute/kernels/scalar_cast_extension.cc - compute/kernels/scalar_cast_internal.cc - compute/kernels/scalar_cast_nested.cc - compute/kernels/scalar_cast_numeric.cc - compute/kernels/scalar_cast_string.cc - compute/kernels/scalar_cast_temporal.cc compute/kernels/scalar_compare.cc compute/kernels/scalar_if_else.cc compute/kernels/scalar_nested.cc @@ -453,33 +475,16 @@ if(ARROW_COMPUTE) compute/kernels/scalar_temporal_binary.cc compute/kernels/scalar_temporal_unary.cc compute/kernels/scalar_validity.cc - compute/kernels/util_internal.cc compute/kernels/vector_array_sort.cc compute/kernels/vector_cumulative_ops.cc - compute/kernels/vector_hash.cc compute/kernels/vector_nested.cc compute/kernels/vector_rank.cc compute/kernels/vector_replace.cc compute/kernels/vector_select_k.cc - compute/kernels/vector_selection.cc - compute/kernels/vector_sort.cc - compute/row/encode_internal.cc - compute/row/compare_internal.cc - compute/row/grouper.cc - compute/row/row_internal.cc) + compute/kernels/vector_sort.cc) append_avx2_src(compute/kernels/aggregate_basic_avx2.cc) append_avx512_src(compute/kernels/aggregate_basic_avx512.cc) - - append_avx2_src(compute/exec/bloom_filter_avx2.cc) - append_avx2_src(compute/exec/key_hash_avx2.cc) - append_avx2_src(compute/exec/key_map_avx2.cc) - append_avx2_src(compute/exec/swiss_join_avx2.cc) - append_avx2_src(compute/exec/util_avx2.cc) - append_avx2_src(compute/row/compare_internal_avx2.cc) - append_avx2_src(compute/row/encode_internal_avx2.cc) - - list(APPEND ARROW_TESTING_SRCS compute/exec/test_util.cc) endif() if(ARROW_FILESYSTEM) @@ -821,6 +826,7 @@ add_subdirectory(testing) add_subdirectory(array) add_subdirectory(c) +add_subdirectory(compute) add_subdirectory(io) add_subdirectory(tensor) add_subdirectory(util) @@ -830,10 +836,6 @@ if(ARROW_CSV) add_subdirectory(csv) endif() -if(ARROW_COMPUTE) - add_subdirectory(compute) -endif() - if(ARROW_SUBSTRAIT) add_subdirectory(engine) endif() diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index a4d0fc8582f..3aa0c725c13 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -18,10 +18,11 @@ # ---------------------------------------------------------------------- # Scalar kernels +add_arrow_compute_test(scalar_cast_test SOURCES scalar_cast_test.cc test_util.cc) + add_arrow_compute_test(scalar_type_test SOURCES scalar_boolean_test.cc - scalar_cast_test.cc scalar_nested_test.cc scalar_string_test.cc test_util.cc) diff --git a/cpp/src/arrow/compute/registry.cc b/cpp/src/arrow/compute/registry.cc index 9f95290c12a..f91ecfdc2f4 100644 --- a/cpp/src/arrow/compute/registry.cc +++ b/cpp/src/arrow/compute/registry.cc @@ -27,6 +27,7 @@ #include "arrow/compute/function_internal.h" #include "arrow/compute/registry_internal.h" #include "arrow/status.h" +#include "arrow/util/config.h" // For ARROW_COMPUTE #include "arrow/util/logging.h" namespace arrow { @@ -272,10 +273,21 @@ namespace internal { static std::unique_ptr CreateBuiltInRegistry() { auto registry = FunctionRegistry::Make(); + // Register core kernels + RegisterScalarCast(registry.get()); + RegisterVectorHash(registry.get()); + RegisterVectorSelection(registry.get()); + + RegisterScalarOptions(registry.get()); + RegisterVectorOptions(registry.get()); + RegisterAggregateOptions(registry.get()); + +#ifdef ARROW_COMPUTE + // Register additional kernels + // Scalar functions RegisterScalarArithmetic(registry.get()); RegisterScalarBoolean(registry.get()); - RegisterScalarCast(registry.get()); RegisterScalarComparison(registry.get()); RegisterScalarIfElse(registry.get()); RegisterScalarNested(registry.get()); @@ -288,21 +300,15 @@ static std::unique_ptr CreateBuiltInRegistry() { RegisterScalarTemporalUnary(registry.get()); RegisterScalarValidity(registry.get()); - RegisterScalarOptions(registry.get()); - // Vector functions RegisterVectorArraySort(registry.get()); RegisterVectorCumulativeSum(registry.get()); - RegisterVectorHash(registry.get()); RegisterVectorNested(registry.get()); RegisterVectorRank(registry.get()); RegisterVectorReplace(registry.get()); RegisterVectorSelectK(registry.get()); - RegisterVectorSelection(registry.get()); RegisterVectorSort(registry.get()); - RegisterVectorOptions(registry.get()); - // Aggregate functions RegisterHashAggregateBasic(registry.get()); RegisterScalarAggregateBasic(registry.get()); @@ -310,8 +316,7 @@ static std::unique_ptr CreateBuiltInRegistry() { RegisterScalarAggregateQuantile(registry.get()); RegisterScalarAggregateTDigest(registry.get()); RegisterScalarAggregateVariance(registry.get()); - - RegisterAggregateOptions(registry.get()); +#endif return registry; } From 01191ae9755171acd0b6294f8e0bb417126f0f6a Mon Sep 17 00:00:00 2001 From: benibus Date: Wed, 22 Feb 2023 16:25:13 -0500 Subject: [PATCH 2/6] Remove various conditional compilations --- cpp/src/arrow/CMakeLists.txt | 13 +++---------- cpp/src/arrow/array/CMakeLists.txt | 6 +----- cpp/src/arrow/csv/CMakeLists.txt | 8 ++------ cpp/src/arrow/csv/api.h | 5 ----- cpp/src/arrow/public_api_test.cc | 9 +++------ cpp/src/arrow/testing/generator.cc | 6 ++---- cpp/src/arrow/testing/generator.h | 8 ++++---- 7 files changed, 15 insertions(+), 40 deletions(-) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 0d7d4f7ef55..721812b4c09 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -375,10 +375,8 @@ if(ARROW_CSV) csv/column_decoder.cc csv/options.cc csv/parser.cc - csv/reader.cc) - if(ARROW_COMPUTE) - list(APPEND ARROW_SRCS csv/writer.cc) - endif() + csv/reader.cc + csv/writer.cc) list(APPEND ARROW_TESTING_SRCS csv/test_common.cc) endif() @@ -805,12 +803,7 @@ add_arrow_test(table_test add_arrow_test(tensor_test) add_arrow_test(sparse_tensor_test) -set(STL_TEST_SRCS stl_iterator_test.cc) -if(ARROW_COMPUTE) - # This unit test uses compute code - list(APPEND STL_TEST_SRCS stl_test.cc) -endif() -add_arrow_test(stl_test SOURCES ${STL_TEST_SRCS}) +add_arrow_test(stl_test SOURCES stl_iterator_test.cc stl_test.cc) add_arrow_benchmark(builder_benchmark) add_arrow_benchmark(compare_benchmark) diff --git a/cpp/src/arrow/array/CMakeLists.txt b/cpp/src/arrow/array/CMakeLists.txt index c0fc17687db..d8dc83bb71d 100644 --- a/cpp/src/arrow/array/CMakeLists.txt +++ b/cpp/src/arrow/array/CMakeLists.txt @@ -16,11 +16,7 @@ # under the License. add_arrow_test(concatenate_test) - -if(ARROW_COMPUTE) - # This unit test uses compute code - add_arrow_test(diff_test) -endif() +add_arrow_test(diff_test) # Headers: top level arrow_install_all_headers("arrow/array") diff --git a/cpp/src/arrow/csv/CMakeLists.txt b/cpp/src/arrow/csv/CMakeLists.txt index 00c00a87f54..a112ca423e9 100644 --- a/cpp/src/arrow/csv/CMakeLists.txt +++ b/cpp/src/arrow/csv/CMakeLists.txt @@ -21,12 +21,8 @@ set(CSV_TEST_SRCS column_decoder_test.cc converter_test.cc parser_test.cc - reader_test.cc) - -# Writer depends on compute's cast functionality -if(ARROW_COMPUTE) - list(APPEND CSV_TEST_SRCS writer_test.cc) -endif() + reader_test.cc + writer_test.cc) add_arrow_test(csv-test SOURCES ${CSV_TEST_SRCS}) diff --git a/cpp/src/arrow/csv/api.h b/cpp/src/arrow/csv/api.h index 9f83efab296..4af1835cd70 100644 --- a/cpp/src/arrow/csv/api.h +++ b/cpp/src/arrow/csv/api.h @@ -19,9 +19,4 @@ #include "arrow/csv/options.h" #include "arrow/csv/reader.h" - -// The writer depends on compute module for casting. -#include "arrow/util/config.h" // for ARROW_COMPUTE definition -#ifdef ARROW_COMPUTE #include "arrow/csv/writer.h" -#endif diff --git a/cpp/src/arrow/public_api_test.cc b/cpp/src/arrow/public_api_test.cc index 9abff229508..20de827ced1 100644 --- a/cpp/src/arrow/public_api_test.cc +++ b/cpp/src/arrow/public_api_test.cc @@ -22,13 +22,10 @@ // Include various "api.h" entrypoints and check they don't leak internal symbols -#include "arrow/api.h" // IWYU pragma: keep -#include "arrow/io/api.h" // IWYU pragma: keep -#include "arrow/ipc/api.h" // IWYU pragma: keep - -#ifdef ARROW_COMPUTE +#include "arrow/api.h" // IWYU pragma: keep #include "arrow/compute/api.h" // IWYU pragma: keep -#endif +#include "arrow/io/api.h" // IWYU pragma: keep +#include "arrow/ipc/api.h" // IWYU pragma: keep #ifdef ARROW_CSV #include "arrow/csv/api.h" // IWYU pragma: keep diff --git a/cpp/src/arrow/testing/generator.cc b/cpp/src/arrow/testing/generator.cc index ad0984e4083..fc90a5cd889 100644 --- a/cpp/src/arrow/testing/generator.cc +++ b/cpp/src/arrow/testing/generator.cc @@ -289,7 +289,6 @@ class DataGeneratorImpl : public DataGenerator, return batches; } -#ifdef ARROW_COMPUTE Result<::arrow::compute::ExecBatch> ExecBatch(int64_t num_rows) override { std::vector values; values.reserve(generators_.size()); @@ -318,7 +317,6 @@ class DataGeneratorImpl : public DataGenerator, "exec_batch_source", ::arrow::compute::ExecBatchSourceNodeOptions(schema_, std::move(batches))); } -#endif Result> Table(int64_t rows_per_chunk, int num_chunks = 1) override { @@ -365,7 +363,7 @@ class GTestDataGeneratorImpl : public GTestDataGenerator { target_->RecordBatches(rows_per_batch, num_batches)); return batches; } -#ifdef ARROW_COMPUTE + ::arrow::compute::ExecBatch ExecBatch(int64_t num_rows) override { EXPECT_OK_AND_ASSIGN(auto batch, target_->ExecBatch(num_rows)); return batch; @@ -381,7 +379,7 @@ class GTestDataGeneratorImpl : public GTestDataGenerator { target_->SourceNode(rows_per_batch, num_batches)); return source_node; } -#endif + std::shared_ptr<::arrow::Table> Table(int64_t rows_per_chunk, int num_chunks) override { EXPECT_OK_AND_ASSIGN(auto table, target_->Table(rows_per_chunk, num_chunks)); return table; diff --git a/cpp/src/arrow/testing/generator.h b/cpp/src/arrow/testing/generator.h index 9f02df4505f..ecfc4ee640f 100644 --- a/cpp/src/arrow/testing/generator.h +++ b/cpp/src/arrow/testing/generator.h @@ -252,13 +252,13 @@ class ARROW_TESTING_EXPORT GTestDataGenerator { virtual std::shared_ptr<::arrow::RecordBatch> RecordBatch(int64_t num_rows) = 0; virtual std::vector> RecordBatches( int64_t rows_per_batch, int num_batches) = 0; -#ifdef ARROW_COMPUTE + virtual ::arrow::compute::ExecBatch ExecBatch(int64_t num_rows) = 0; virtual std::vector<::arrow::compute::ExecBatch> ExecBatches(int64_t rows_per_batch, int num_batches) = 0; virtual ::arrow::compute::Declaration SourceNode(int64_t rows_per_batch, int num_batches) = 0; -#endif + virtual std::shared_ptr<::arrow::Table> Table(int64_t rows_per_chunk, int num_chunks = 1) = 0; virtual std::shared_ptr<::arrow::Schema> Schema() = 0; @@ -270,13 +270,13 @@ class ARROW_TESTING_EXPORT DataGenerator { virtual Result> RecordBatch(int64_t num_rows) = 0; virtual Result>> RecordBatches( int64_t rows_per_batch, int num_batches) = 0; -#ifdef ARROW_COMPUTE + virtual Result<::arrow::compute::ExecBatch> ExecBatch(int64_t num_rows) = 0; virtual Result> ExecBatches( int64_t rows_per_batch, int num_batches) = 0; virtual Result<::arrow::compute::Declaration> SourceNode(int64_t rows_per_batch, int num_batches) = 0; -#endif + virtual Result> Table(int64_t rows_per_chunk, int num_chunks = 1) = 0; virtual std::shared_ptr<::arrow::Schema> Schema() = 0; From f6c96ceb2f6da0fd4c393c20846fe99901ebe843 Mon Sep 17 00:00:00 2001 From: benibus Date: Wed, 22 Feb 2023 16:26:19 -0500 Subject: [PATCH 3/6] Update cmake option defs --- cpp/cmake_modules/DefineOptions.cmake | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index 6700a409e1b..1ec1245e7d3 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -296,7 +296,7 @@ takes precedence over ccache if a storage backend is configured" ON) define_option(ARROW_BUILD_UTILITIES "Build Arrow commandline utilities" OFF) - define_option(ARROW_COMPUTE "Build the Arrow Compute Modules" OFF) + define_option(ARROW_COMPUTE "Build all Arrow Compute kernels" OFF) define_option(ARROW_CSV "Build the Arrow CSV Parser Module" OFF) @@ -361,7 +361,6 @@ takes precedence over ccache if a storage backend is configured" ON) "Build the Parquet libraries" OFF DEPENDS - ARROW_COMPUTE ARROW_IPC) define_option(ARROW_ORC From 96fe9afa7042e81711c3b9b81030bc2d6ddfd96b Mon Sep 17 00:00:00 2001 From: benibus Date: Mon, 27 Feb 2023 18:01:32 -0500 Subject: [PATCH 4/6] Refactor scalar_round.cc and scalar_arithmetic.cc --- .../compute/kernels/scalar_arithmetic.cc | 10 - cpp/src/arrow/compute/kernels/scalar_round.cc | 225 +----------------- .../arrow/compute/kernels/util_internal.cc | 14 ++ cpp/src/arrow/compute/kernels/util_internal.h | 19 ++ 4 files changed, 36 insertions(+), 232 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc b/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc index 0021aa11089..249da4758ea 100644 --- a/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc +++ b/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc @@ -761,16 +761,6 @@ struct ArithmeticFloatingPointFunction : public ArithmeticFunction { } }; -// A scalar kernel that ignores (assumed all-null) inputs and returns null. -Status NullToNullExec(KernelContext* ctx, const ExecSpan& batch, ExecResult* out) { - return Status::OK(); -} - -void AddNullExec(ScalarFunction* func) { - std::vector input_types(func->arity().num_args, InputType(Type::NA)); - DCHECK_OK(func->AddKernel(std::move(input_types), OutputType(null()), NullToNullExec)); -} - template std::shared_ptr MakeArithmeticFunction(std::string name, FunctionDoc doc) { diff --git a/cpp/src/arrow/compute/kernels/scalar_round.cc b/cpp/src/arrow/compute/kernels/scalar_round.cc index 41961ad50e5..fc2cb5b8a6e 100644 --- a/cpp/src/arrow/compute/kernels/scalar_round.cc +++ b/cpp/src/arrow/compute/kernels/scalar_round.cc @@ -771,114 +771,6 @@ struct Trunc { } }; -// Generate a kernel given a bitwise arithmetic functor. Assumes the -// functor treats all integer types of equal width identically -template