From cd6550b9c8e88b1570a6ad0e2142f38e9ef2f11e Mon Sep 17 00:00:00 2001 From: JabariBooker Date: Thu, 17 Feb 2022 11:12:13 -0500 Subject: [PATCH 01/67] Creating new kernel for cumulative sum --- .../compute/kernels/vector_cumulative_sum.cc | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc new file mode 100644 index 00000000000..9add6c7d2bd --- /dev/null +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/array/array_base.h" +#include "arrow/compute/kernels/common.h" +#include "arrow/result.h" +#include "arrow/visit_type_inline.h" + +namespace arrow { +namespace compute { +namespace internal { +namespace { + +Status CumulativeSum(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const ArrayData& array = *batch[0].array(); + ArrayData* output = out->array().get(); + output->length = array.length; + + return Status::OK(); +} + +void RegisterVectorCumulativeSum(FunctionRegistry* registry) { + +} + +} + +} // namespace internal +} // namespace compute +} // namespace arrow \ No newline at end of file From 067f846b1cc12b67e596a3823ff158e690fc7043 Mon Sep 17 00:00:00 2001 From: JabariBooker Date: Thu, 24 Feb 2022 21:28:55 -0500 Subject: [PATCH 02/67] Now made a scalar function; Created struct to hold exec and kernel signature generation --- .../compute/kernels/scalar_cumulative_sum.cc | 88 +++++++++++++++++++ .../compute/kernels/vector_cumulative_sum.cc | 44 ---------- 2 files changed, 88 insertions(+), 44 deletions(-) create mode 100644 cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc delete mode 100644 cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc diff --git a/cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc b/cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc new file mode 100644 index 00000000000..cbd8ed3c416 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc @@ -0,0 +1,88 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/array/array_base.h" +#include "arrow/compute/kernels/common.h" +#include "arrow/result.h" +#include "arrow/visit_type_inline.h" + +namespace arrow { +namespace compute { +namespace internal { +namespace { + +template +struct CumulativeSumFunctor { + Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + using CType = TypeTraits::CType; + + const ArrayData& array = *batch[0].array(); + auto data = array.GetValues(1, array.offset); + + const NumericScalar& start = checked_cast(*batch[1].scalar()); + CType sum = start.value; + + ArrayData* output = out->array().get(); + CType* out_values = checked_cast(output->buffers[1]->mutable_data()); + output->length = array.length; + + for(size_t i = array.offset; i < array.length; ++i) { + sum += data[i]; + out_values[i] = sum; + } + + return Status::OK(); + } + + static std::shared_ptr GetSignature(detail::GetTypeId get_id) { + return KernelSignature::Make( + {InputType::Array(get_id.id), InputType::Scalar(get_id.id), OutputType(FirstType)); + } +}; + +const FunctionDoc cumulative_sum_doc( + "Compute the cumulative sum of an array", + ("Return an array containing the result of the cumulative sum\n" + "computed over the input array"), + {"values", "starting sum"}); + +void RegisterVectorCumulativeSum(FunctionRegistry* registry) { + + auto add_kernel = [&](detail::GetTypeId get_id, ArrayKernelExec exec) { + ScalarKernel kernel; + kernel.mem_allocation = MemAllocation::type::PREALLOCATE; + kernel.signature = CumulativeSumFunctor::GetSignature(get_id.id); + kernel.exec = std::move(exec); + DCHECK_OK(func->AddKernel(std::move(kernel))); + }; + + auto cumulative_sum = + std::make_shared("cumulative_sum", Arity::Binary(), &cumulative_sum_doc); + + for(auto ty : NumericTypes()) { + add_kernel(ty, GenerateTypeAgnosticPrimitive(ty)); + } + + DCHECK_OK(registry->AddFunction(std::move(flatten))); + +} + +} + +} // namespace internal +} // namespace compute +} // namespace arrow \ No newline at end of file diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc deleted file mode 100644 index 9add6c7d2bd..00000000000 --- a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc +++ /dev/null @@ -1,44 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "arrow/array/array_base.h" -#include "arrow/compute/kernels/common.h" -#include "arrow/result.h" -#include "arrow/visit_type_inline.h" - -namespace arrow { -namespace compute { -namespace internal { -namespace { - -Status CumulativeSum(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const ArrayData& array = *batch[0].array(); - ArrayData* output = out->array().get(); - output->length = array.length; - - return Status::OK(); -} - -void RegisterVectorCumulativeSum(FunctionRegistry* registry) { - -} - -} - -} // namespace internal -} // namespace compute -} // namespace arrow \ No newline at end of file From a747cb45884a1524e41d86abaf18f5a65dcfebc7 Mon Sep 17 00:00:00 2001 From: JabariBooker Date: Tue, 1 Mar 2022 23:13:12 -0500 Subject: [PATCH 03/67] Changed handle chunked arrays and arrays with null indicies --- .../compute/kernels/scalar_cumulative_sum.cc | 119 +++++++++++++----- 1 file changed, 89 insertions(+), 30 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc b/cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc index cbd8ed3c416..d02194b5dc6 100644 --- a/cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc +++ b/cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc @@ -25,44 +25,103 @@ namespace compute { namespace internal { namespace { +template +CType CumulativeSum(std::shared_ptr& input, ArrayData* output, CType start) { + CType sum = start; + CType* data = checked_cast(input->data()->buffers[1]->data()); + CType* out_values = checked_cast(output->buffers[1]->mutable_data()); + for (size_t i = input->offset; i < input->length; ++i) { + if (input->IsValid(i)) { + sum += data[i]; + out_values[i] = sum; + } + } + + return sum; +} + template struct CumulativeSumFunctor { + using CType = TypeTraits::CType; + Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - using CType = TypeTraits::CType; - - const ArrayData& array = *batch[0].array(); - auto data = array.GetValues(1, array.offset); + const NumericScalar& start_scalar = + checked_cast(*batch[1].scalar()); + CType start = start_scalar.value; - const NumericScalar& start = checked_cast(*batch[1].scalar()); - CType sum = start.value; + switch (batch[0].kind()) { + case Datum::ARRAY: + std::shared_ptr input = batch[0].make_array(); + ArrayData* output = out->array().get(); - ArrayData* output = out->array().get(); - CType* out_values = checked_cast(output->buffers[1]->mutable_data()); - output->length = array.length; + output->length = input->data()->length; + *output->type = std::move(input->type()); + uint8_t* out_bitmap = output->buffers[0]->mutable_data(); + int64_t out_offset = input->offset(); - for(size_t i = array.offset; i < array.length; ++i) { - sum += data[i]; - out_values[i] = sum; + if (input->data()->MayHaveNulls()) { + arrow::internal::CopyBitmap(input->null_bitmap_data(), input->offset(), + input->length(), out_bitmap, out_offset); + output->null_count = input->null_count(); + } else { + bit_util::SetBitsTo(out_bitmap, out_offset, input->length(), true); + output->null_count = 0; + } + + CumulativeSum(input, output, start); + return Status::OK(); + case Datum::CHUNKED_ARRAY: + const auto& input = batch[0].chunked_array(); + + ArrayVector out_chunks; + for (const auto& chunk : input->chunks()) { + auto out_chunk = std::make_shared( + chunk->type(), chunk->length(), chunk->null_count(), chunk->offset()); + + uint8_t* out_chunk_bitmap = out_chunk->buffers[0]->mutable_data(); + if (chunk->data()->MayHaveNulls()) { + arrow::internal::CopyBitmap(chunk->null_bitmap_data(), chunk->offset(), + chunk->length(), out_chunk_bitmap, + out_chunk->offset()); + out_chunk->null_count = chunk->null_count(); + } else { + bit_util::SetBitsTo(out_chunk_bitmap, out_chunk->offset(), chunk->length(), + true); + out_chunk->null_count = 0; + } + + CType last_value = CumulativeSum(chunk, out_chunk, start); + start = last_value; + out_chunks.push_back(MakeArray(std::move(out_chunk))); + } + + *out->chunked_array() = ChunkedArray(out_chunks, input->type()); + return Status::OK(); + default: + return Status::NotImplemented( + "Unsupported input type for function 'cumulative_sum': ", + batch[0].ToString()); } - - return Status::OK(); } static std::shared_ptr GetSignature(detail::GetTypeId get_id) { return KernelSignature::Make( - {InputType::Array(get_id.id), InputType::Scalar(get_id.id), OutputType(FirstType)); + {InputType::Array(get_id.id), InputType::Scalar(get_id.id)}, + OutputType(FirstType)); } }; const FunctionDoc cumulative_sum_doc( - "Compute the cumulative sum of an array", - ("Return an array containing the result of the cumulative sum\n" - "computed over the input array"), - {"values", "starting sum"}); + "Compute the cumulative sum over an array of numbers", + ("`values` must be an array of numeric type values.\n" + "`start` is a single value of the same type.\n" + "Return an array which is the cumulative sum computed over `values`\n" + "`start` is an optional starting sum of computation."), + {"values", "start"}); void RegisterVectorCumulativeSum(FunctionRegistry* registry) { - - auto add_kernel = [&](detail::GetTypeId get_id, ArrayKernelExec exec) { + auto add_kernel = [&](detail::GetTypeId get_id, ArrayKernelExec exec, + std::shared_ptr func) { ScalarKernel kernel; kernel.mem_allocation = MemAllocation::type::PREALLOCATE; kernel.signature = CumulativeSumFunctor::GetSignature(get_id.id); @@ -70,19 +129,19 @@ void RegisterVectorCumulativeSum(FunctionRegistry* registry) { DCHECK_OK(func->AddKernel(std::move(kernel))); }; - auto cumulative_sum = - std::make_shared("cumulative_sum", Arity::Binary(), &cumulative_sum_doc); - - for(auto ty : NumericTypes()) { - add_kernel(ty, GenerateTypeAgnosticPrimitive(ty)); + auto cumulative_sum = std::make_shared( + "cumulative_sum", Arity::Binary(), &cumulative_sum_doc); + + for (auto ty : NumericTypes()) { + add_kernel(ty, GenerateTypeAgnosticPrimitive(ty), + cumulative_sum); } - DCHECK_OK(registry->AddFunction(std::move(flatten))); - + DCHECK_OK(registry->AddFunction(std::move(cumulative_sum))); } -} +} // namespace } // namespace internal } // namespace compute -} // namespace arrow \ No newline at end of file +} // namespace arrow From 30984e1a0f3e5271f1508ba16548f9b5a6d8f76f Mon Sep 17 00:00:00 2001 From: JabariBooker Date: Thu, 3 Mar 2022 23:56:42 -0500 Subject: [PATCH 04/67] Changed to vector function, starting creation corresponding test, included suggested changes --- ...lative_sum.cc => vector_cumulative_sum.cc} | 81 +++++++++++-------- .../kernels/vector_cumulative_sum_test.cc | 54 +++++++++++++ cpp/src/arrow/compute/registry.cc | 1 + cpp/src/arrow/compute/registry_internal.h | 1 + 4 files changed, 103 insertions(+), 34 deletions(-) rename cpp/src/arrow/compute/kernels/{scalar_cumulative_sum.cc => vector_cumulative_sum.cc} (70%) create mode 100644 cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc diff --git a/cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc similarity index 70% rename from cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc rename to cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc index d02194b5dc6..cff80f886c0 100644 --- a/cpp/src/arrow/compute/kernels/scalar_cumulative_sum.cc +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc @@ -16,6 +16,7 @@ // under the License. #include "arrow/array/array_base.h" +#include "arrow/compute/api_scalar.h" #include "arrow/compute/kernels/common.h" #include "arrow/result.h" #include "arrow/visit_type_inline.h" @@ -23,30 +24,34 @@ namespace arrow { namespace compute { namespace internal { -namespace { - -template -CType CumulativeSum(std::shared_ptr& input, ArrayData* output, CType start) { - CType sum = start; - CType* data = checked_cast(input->data()->buffers[1]->data()); - CType* out_values = checked_cast(output->buffers[1]->mutable_data()); - for (size_t i = input->offset; i < input->length; ++i) { - if (input->IsValid(i)) { - sum += data[i]; - out_values[i] = sum; - } - } - - return sum; -} template -struct CumulativeSumFunctor { +struct CumulativeSum { using CType = TypeTraits::CType; + using ScalarType = TypeTraits::ScalarType; + + CType Sum(ExecContext* ctx, std::shared_ptr& input, ArrayData* output, + CType start) { + CType sum = start; + CType* data = checked_cast(input->data()->buffers[1]->data()); + CType* out_values = checked_cast(output->buffers[1]->mutable_data()); + ArithmeticOptions options; + for (size_t i = input->offset; i < input->length; ++i) { + if (input->IsValid(i)) { + Datum value_datum(data[i]); + Datum sum_datum(sum); + auto result = Add(value_datum, sum_datum, options, ctx); + ScalarType result_scalar = result.ValueOrDie().scalar_as(); + sum = result_scalar.value; + out_values[i] = sum; + } + } + + return sum; + } Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const NumericScalar& start_scalar = - checked_cast(*batch[1].scalar()); + const ScalarType& start_scalar = checked_cast(*batch[1].scalar()); CType start = start_scalar.value; switch (batch[0].kind()) { @@ -68,7 +73,7 @@ struct CumulativeSumFunctor { output->null_count = 0; } - CumulativeSum(input, output, start); + Sum(ctx->exec_context(), input, output, start); return Status::OK(); case Datum::CHUNKED_ARRAY: const auto& input = batch[0].chunked_array(); @@ -90,7 +95,7 @@ struct CumulativeSumFunctor { out_chunk->null_count = 0; } - CType last_value = CumulativeSum(chunk, out_chunk, start); + CType last_value = Sum(ctx->exec_context(), chunk, out_chunk, start); start = last_value; out_chunks.push_back(MakeArray(std::move(out_chunk))); } @@ -115,33 +120,41 @@ const FunctionDoc cumulative_sum_doc( "Compute the cumulative sum over an array of numbers", ("`values` must be an array of numeric type values.\n" "`start` is a single value of the same type.\n" - "Return an array which is the cumulative sum computed over `values`\n" + "Return an array which is the cumulative sum computed over `values.`\n" + "Null entries remain in place but are not used in calucating sum.\n" "`start` is an optional starting sum of computation."), {"values", "start"}); void RegisterVectorCumulativeSum(FunctionRegistry* registry) { - auto add_kernel = [&](detail::GetTypeId get_id, ArrayKernelExec exec, - std::shared_ptr func) { - ScalarKernel kernel; + auto cumulative_sum = std::make_shared( + "cumulative_sum", Arity::Binary(), &cumulative_sum_doc); + + auto add_kernel = [&](detail::GetTypeId get_id, ArrayKernelExec exec) { + VectorKernel kernel; + kernel.can_execute_chunkwise = true; + kernel.null_handling = NullHandling::type::INTERSECTION; kernel.mem_allocation = MemAllocation::type::PREALLOCATE; - kernel.signature = CumulativeSumFunctor::GetSignature(get_id.id); + kernel.signature = CumulativeSum::GetSignature(get_id.id); kernel.exec = std::move(exec); - DCHECK_OK(func->AddKernel(std::move(kernel))); + DCHECK_OK(cumulative_sum->AddKernel(std::move(kernel))); }; - auto cumulative_sum = std::make_shared( - "cumulative_sum", Arity::Binary(), &cumulative_sum_doc); - for (auto ty : NumericTypes()) { - add_kernel(ty, GenerateTypeAgnosticPrimitive(ty), - cumulative_sum); + add_kernel(ty, GenerateTypeAgnosticPrimitive(ty)); } + for (auto ty : TemporalTypes()) { + add_kernel(ty, GenerateTypeAgnosticPrimitive(ty)); + } + + add_kernel(Type::DURATION, + GenerateTypeAgnosticPrimitive(Type::DURATION)); + add_kernel(Type::INTERVAL_MONTHS, + GenerateTypeAgnosticPrimitive(Type::INTERVAL_MONTHS)); + DCHECK_OK(registry->AddFunction(std::move(cumulative_sum))); } -} // namespace - } // namespace internal } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc new file mode 100644 index 00000000000..dfeb492cb70 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "arrow/array.h" +#include "arrow/array/builder_decimal.h" +#include "arrow/buffer.h" +#include "arrow/chunked_array.h" +#include "arrow/status.h" +#include "arrow/testing/util.h" +#include "arrow/type.h" +#include "arrow/type_fwd.h" +#include "arrow/type_traits.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/decimal.h" + +#include "arrow/compute/api.h" +#include "arrow/compute/kernels/test_util.h" + +#include "arrow/ipc/json_simple.h" + +namespace arrow { +namespace compute { + +class TestBaseCumulativeSum : public ::testing::Test {} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/registry.cc b/cpp/src/arrow/compute/registry.cc index 8ab83a72e5e..7e1975d3b68 100644 --- a/cpp/src/arrow/compute/registry.cc +++ b/cpp/src/arrow/compute/registry.cc @@ -174,6 +174,7 @@ static std::unique_ptr CreateBuiltInRegistry() { // Vector functions RegisterVectorArraySort(registry.get()); + RegisterVectorCumulativeSum(registry.get()); RegisterVectorHash(registry.get()); RegisterVectorNested(registry.get()); RegisterVectorReplace(registry.get()); diff --git a/cpp/src/arrow/compute/registry_internal.h b/cpp/src/arrow/compute/registry_internal.h index 35f7b079529..38f81e98889 100644 --- a/cpp/src/arrow/compute/registry_internal.h +++ b/cpp/src/arrow/compute/registry_internal.h @@ -43,6 +43,7 @@ void RegisterScalarOptions(FunctionRegistry* registry); // Vector functions void RegisterVectorArraySort(FunctionRegistry* registry); +void RegisterVectorCumulativeSum(FunctionRegistry* registry); void RegisterVectorHash(FunctionRegistry* registry); void RegisterVectorNested(FunctionRegistry* registry); void RegisterVectorReplace(FunctionRegistry* registry); From 59fad235b9fa4f9c8384f82cd9f75ca6858d3077 Mon Sep 17 00:00:00 2001 From: JabariBooker Date: Tue, 8 Mar 2022 23:57:56 -0500 Subject: [PATCH 05/67] Starting value is now part of options; working more adding new test --- cpp/src/arrow/compute/api_vector.cc | 23 +++++++++++++++++++ cpp/src/arrow/compute/api_vector.h | 21 +++++++++++++++++ cpp/src/arrow/compute/kernels/CMakeLists.txt | 1 + .../compute/kernels/vector_cumulative_sum.cc | 9 ++++---- docs/source/python/api/compute.rst | 2 ++ python/pyarrow/_compute.pyx | 20 ++++++++++++++++ python/pyarrow/includes/libarrow.pxd | 5 ++++ python/pyarrow/tests/test_compute.py | 1 + 8 files changed, 77 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index a5cb61d6b55..c624ec0d942 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -135,6 +135,8 @@ static auto kPartitionNthOptionsType = GetFunctionOptionsType( DataMember("k", &SelectKOptions::k), DataMember("sort_keys", &SelectKOptions::sort_keys)); +static auto kCumulativeSumOptionsType = GetFunctionOptionsType( + DataMember("start", &CumulativeSumOptions::start)); } // namespace } // namespace internal @@ -176,6 +178,10 @@ SelectKOptions::SelectKOptions(int64_t k, std::vector sort_keys) sort_keys(std::move(sort_keys)) {} constexpr char SelectKOptions::kTypeName[]; +CumulativeSumOptions::CumulativeSumOptions(std::shared_ptr start) + : FunctionOptions(internal::kCumulativeSumOptionsType), start(std::move(start)) {} +constexpr char CumulativeSumOptions::kTypeName[]; + namespace internal { void RegisterVectorOptions(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunctionOptionsType(kFilterOptionsType)); @@ -185,6 +191,7 @@ void RegisterVectorOptions(FunctionRegistry* registry) { DCHECK_OK(registry->AddFunctionOptionsType(kSortOptionsType)); DCHECK_OK(registry->AddFunctionOptionsType(kPartitionNthOptionsType)); DCHECK_OK(registry->AddFunctionOptionsType(kSelectKOptionsType)); + DCHECK_OK(registry->AddFunctionOptionsType(kCumulativeSumOptionsType)); } } // namespace internal @@ -325,6 +332,22 @@ Result> DropNull(const Array& values, ExecContext* ctx) { return out.make_array(); } +Result> CumulativeSum(const Array& values, + CumulativeSumOptions& options, + ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE(Datum out, + CallFunction("cumulative_sum", {Datum(values)}, &options, ctx)); + return out.make_array(); +} + +Result> CumulativeSum(const ChunkedArray& chunked_array, + CumulativeSumOptions& options, + ExecContext* ctx) { + ARROW_ASSIGN_OR_RAISE( + Datum out, CallFunction("cumulative_sum", {Datum(chunked_array)}, &options, ctx)); + return out.make_array(); +} + // ---------------------------------------------------------------------- // Deprecated functions diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 9e53cfcf640..02cd0764469 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -188,6 +188,17 @@ class ARROW_EXPORT PartitionNthOptions : public FunctionOptions { NullPlacement null_placement; }; +/// \brief Options for cumulative sum function +class ARROW_EXPORT CumulativeSumOptions : public FunctionOptions { + public: + explicit CumulativeSumOptions(std::shared_ptr start); + CumulativeSumOptions() : CumulativeSumOptions(std::make_shared()) {} + static constexpr char const kTypeName[] = "CumulativeSumOptions"; + + /// Optional starting value for sum computation + std::shared_ptr start; +}; + /// @} /// \brief Filter with a boolean selection filter @@ -522,6 +533,16 @@ Result DictionaryEncode( const DictionaryEncodeOptions& options = DictionaryEncodeOptions::Defaults(), ExecContext* ctx = NULLPTR); +ARROW_EXPORT +Result> CumulativeSum(const Array& values, + CumulativeSumOptions& options, + ExecContext* ctx = NULLPTR); + +ARROW_EXPORT +Result> CumulativeSum(const ChunkedArray& chunked_array, + CumulativeSumOptions& options, + ExecContext* ctx = NULLPTR); + // ---------------------------------------------------------------------- // Deprecated functions diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index 93a02cdb1f1..7659a5b5490 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -47,6 +47,7 @@ add_arrow_benchmark(scalar_string_benchmark PREFIX "arrow-compute") add_arrow_compute_test(vector_test SOURCES + vector_cumulative_sum_test.cc vector_hash_test.cc vector_nested_test.cc vector_replace_test.cc diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc index cff80f886c0..b3528c754d8 100644 --- a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc @@ -51,8 +51,8 @@ struct CumulativeSum { } Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const ScalarType& start_scalar = checked_cast(*batch[1].scalar()); - CType start = start_scalar.value; + const auto& options = OptionsWrapper>::Get(ctx); + CType start = checked_cast(options.start).value; switch (batch[0].kind()) { case Datum::ARRAY: @@ -110,9 +110,7 @@ struct CumulativeSum { } static std::shared_ptr GetSignature(detail::GetTypeId get_id) { - return KernelSignature::Make( - {InputType::Array(get_id.id), InputType::Scalar(get_id.id)}, - OutputType(FirstType)); + return KernelSignature::Make({InputType::Array(get_id.id)}, OutputType(FirstType)); } }; @@ -136,6 +134,7 @@ void RegisterVectorCumulativeSum(FunctionRegistry* registry) { kernel.mem_allocation = MemAllocation::type::PREALLOCATE; kernel.signature = CumulativeSum::GetSignature(get_id.id); kernel.exec = std::move(exec); + kernel.init = OptionsWrapper::Init; DCHECK_OK(cumulative_sum->AddKernel(std::move(kernel))); }; diff --git a/docs/source/python/api/compute.rst b/docs/source/python/api/compute.rst index 579c4dad808..df43bd5e8e0 100644 --- a/docs/source/python/api/compute.rst +++ b/docs/source/python/api/compute.rst @@ -479,6 +479,7 @@ Structural Transforms .. autosummary:: :toctree: ../generated/ + cumulative_sum fill_null_backward fill_null_forward list_element @@ -501,6 +502,7 @@ Compute Options CastOptions CountOptions CountOptions + CumulativeSumOptions DayOfWeekOptions DictionaryEncodeOptions ElementWiseAggregateOptions diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 2f18ab99866..75bd819d52f 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -1736,6 +1736,26 @@ class PartitionNthOptions(_PartitionNthOptions): self._set_options(pivot, null_placement) +cdef class _CumulativeSumOptions(FunctionOptions): + def _set_options(self, start): + self.wrapped.reset(new CCumulativeSumOptions( + pyarrow_unwrap_scalar(start))) + + +class CumulativeSumOptions(_CumulativeSumOptions): + """ + Options for `cumulative_sum` function. + + Parameters + ---------- + start : Scalar + Optional starting value for sum computation + """ + + def __init__(self, start): + self._set_options(start) + + cdef class _ArraySortOptions(FunctionOptions): def _set_options(self, order, null_placement): self.wrapped.reset(new CArraySortOptions( diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 6246a15fcff..6b370515399 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2243,6 +2243,11 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: int64_t pivot CNullPlacement null_placement + cdef cppclass CCumulativeSumOptions \ + "arrow::compute::CumulativeSumOptions"(CFunctionOptions): + CCumulativeSumOptions(shared_ptr[CScalar] start) + shared_ptr[CScalar] start + cdef cppclass CArraySortOptions \ "arrow::compute::ArraySortOptions"(CFunctionOptions): CArraySortOptions(CSortOrder, CNullPlacement) diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index 46d302c214c..12791a9caea 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -147,6 +147,7 @@ def test_option_class_equality(): pc.NullOptions(), pc.PadOptions(5), pc.PartitionNthOptions(1, null_placement="at_start"), + pc.CumulativeSumOptions(pa.scalar(0)), pc.QuantileOptions(), pc.RandomOptions(10), pc.ReplaceSliceOptions(0, 1, "a"), From 93644f874f395542d26f83922f30b7cd3ed4d026 Mon Sep 17 00:00:00 2001 From: JabariBooker Date: Wed, 9 Mar 2022 12:22:19 -0500 Subject: [PATCH 06/67] Minor changes to cumulative sum kernel --- cpp/src/arrow/compute/api_vector.h | 2 +- .../compute/kernels/vector_cumulative_sum.cc | 38 ++++++++++--------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 02cd0764469..d2d825d91fb 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -192,7 +192,7 @@ class ARROW_EXPORT PartitionNthOptions : public FunctionOptions { class ARROW_EXPORT CumulativeSumOptions : public FunctionOptions { public: explicit CumulativeSumOptions(std::shared_ptr start); - CumulativeSumOptions() : CumulativeSumOptions(std::make_shared()) {} + CumulativeSumOptions() : CumulativeSumOptions(nullptr) {} static constexpr char const kTypeName[] = "CumulativeSumOptions"; /// Optional starting value for sum computation diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc index b3528c754d8..03a987d9be7 100644 --- a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc @@ -16,7 +16,7 @@ // under the License. #include "arrow/array/array_base.h" -#include "arrow/compute/api_scalar.h" +#include "arrow/compute/api_vector.h" #include "arrow/compute/kernels/common.h" #include "arrow/result.h" #include "arrow/visit_type_inline.h" @@ -52,7 +52,16 @@ struct CumulativeSum { Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { const auto& options = OptionsWrapper>::Get(ctx); - CType start = checked_cast(options.start).value; + std::shared_ptr& start_scalar = options.start; + CType start = 0; + if (start_scalar) { + if (start_scalar->is_valid()) { + if (start_scalar->type()->id() != TypeTraits::type_singleton()->id()) { + return Status::Invalid("Types of array values and starting value do not match."); + } + start = UnboxScalar::Unbox(*start_scalar); + } + } switch (batch[0].kind()) { case Datum::ARRAY: @@ -127,30 +136,23 @@ void RegisterVectorCumulativeSum(FunctionRegistry* registry) { auto cumulative_sum = std::make_shared( "cumulative_sum", Arity::Binary(), &cumulative_sum_doc); - auto add_kernel = [&](detail::GetTypeId get_id, ArrayKernelExec exec) { + std::vector types; + types.insert(types.end(), NumericTypes().begin(), NumericTypes().end()); + types.insert(types.end(), TemporalTypes().begin(), TemporalTypes().end()); + types.push_back(Type::DURATION); + types.push_back(Type::INTERVAL_MONTHS); + + for (auto ty : types) { VectorKernel kernel; kernel.can_execute_chunkwise = true; kernel.null_handling = NullHandling::type::INTERSECTION; kernel.mem_allocation = MemAllocation::type::PREALLOCATE; - kernel.signature = CumulativeSum::GetSignature(get_id.id); - kernel.exec = std::move(exec); + kernel.signature = CumulativeSum::GetSignature(ty.id); + kernel.exec = std::move(GenerateTypeAgnosticPrimitive(ty)); kernel.init = OptionsWrapper::Init; DCHECK_OK(cumulative_sum->AddKernel(std::move(kernel))); - }; - - for (auto ty : NumericTypes()) { - add_kernel(ty, GenerateTypeAgnosticPrimitive(ty)); - } - - for (auto ty : TemporalTypes()) { - add_kernel(ty, GenerateTypeAgnosticPrimitive(ty)); } - add_kernel(Type::DURATION, - GenerateTypeAgnosticPrimitive(Type::DURATION)); - add_kernel(Type::INTERVAL_MONTHS, - GenerateTypeAgnosticPrimitive(Type::INTERVAL_MONTHS)); - DCHECK_OK(registry->AddFunction(std::move(cumulative_sum))); } From dd45c3daf02c55c0fe9d85b4af4674211cb2ac6d Mon Sep 17 00:00:00 2001 From: JabariBooker Date: Fri, 18 Mar 2022 14:48:07 -0400 Subject: [PATCH 07/67] Added option for skipping nulls --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/compute/api_vector.cc | 22 +++++-- cpp/src/arrow/compute/api_vector.h | 15 +++-- .../compute/kernels/vector_cumulative_sum.cc | 63 ++++++++++++------- .../kernels/vector_cumulative_sum_test.cc | 19 +++++- python/pyarrow/_compute.pyx | 10 +-- python/pyarrow/includes/libarrow.pxd | 3 +- python/pyarrow/tests/test_compute.py | 2 +- 8 files changed, 98 insertions(+), 37 deletions(-) diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index e9e826097b3..de67503ad0c 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -436,6 +436,7 @@ if(ARROW_COMPUTE) compute/kernels/scalar_validity.cc compute/kernels/util_internal.cc compute/kernels/vector_array_sort.cc + compute/kernels/vector_cumulative_sum.cc compute/kernels/vector_hash.cc compute/kernels/vector_nested.cc compute/kernels/vector_replace.cc diff --git a/cpp/src/arrow/compute/api_vector.cc b/cpp/src/arrow/compute/api_vector.cc index c624ec0d942..261bb3a2dd1 100644 --- a/cpp/src/arrow/compute/api_vector.cc +++ b/cpp/src/arrow/compute/api_vector.cc @@ -136,7 +136,8 @@ static auto kSelectKOptionsType = GetFunctionOptionsType( DataMember("k", &SelectKOptions::k), DataMember("sort_keys", &SelectKOptions::sort_keys)); static auto kCumulativeSumOptionsType = GetFunctionOptionsType( - DataMember("start", &CumulativeSumOptions::start)); + DataMember("start", &CumulativeSumOptions::start), + DataMember("skip_nulls", &CumulativeSumOptions::skip_nulls)); } // namespace } // namespace internal @@ -178,8 +179,19 @@ SelectKOptions::SelectKOptions(int64_t k, std::vector sort_keys) sort_keys(std::move(sort_keys)) {} constexpr char SelectKOptions::kTypeName[]; -CumulativeSumOptions::CumulativeSumOptions(std::shared_ptr start) - : FunctionOptions(internal::kCumulativeSumOptionsType), start(std::move(start)) {} +CumulativeSumOptions::CumulativeSumOptions(uint64_t start, bool skip_nulls) + : CumulativeSumOptions(std::make_shared(start), skip_nulls) {} + +CumulativeSumOptions::CumulativeSumOptions(int64_t start, bool skip_nulls) + : CumulativeSumOptions(std::make_shared(start), skip_nulls) {} + +CumulativeSumOptions::CumulativeSumOptions(double start, bool skip_nulls) + : CumulativeSumOptions(std::make_shared(start), skip_nulls) {} + +CumulativeSumOptions::CumulativeSumOptions(std::shared_ptr start, bool skip_nulls) + : FunctionOptions(internal::kCumulativeSumOptionsType), + start(std::move(start)), + skip_nulls(skip_nulls) {} constexpr char CumulativeSumOptions::kTypeName[]; namespace internal { @@ -333,7 +345,7 @@ Result> DropNull(const Array& values, ExecContext* ctx) { } Result> CumulativeSum(const Array& values, - CumulativeSumOptions& options, + const CumulativeSumOptions& options, ExecContext* ctx) { ARROW_ASSIGN_OR_RAISE(Datum out, CallFunction("cumulative_sum", {Datum(values)}, &options, ctx)); @@ -341,7 +353,7 @@ Result> CumulativeSum(const Array& values, } Result> CumulativeSum(const ChunkedArray& chunked_array, - CumulativeSumOptions& options, + const CumulativeSumOptions& options, ExecContext* ctx) { ARROW_ASSIGN_OR_RAISE( Datum out, CallFunction("cumulative_sum", {Datum(chunked_array)}, &options, ctx)); diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index d2d825d91fb..74bd9893b00 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -191,12 +191,19 @@ class ARROW_EXPORT PartitionNthOptions : public FunctionOptions { /// \brief Options for cumulative sum function class ARROW_EXPORT CumulativeSumOptions : public FunctionOptions { public: - explicit CumulativeSumOptions(std::shared_ptr start); - CumulativeSumOptions() : CumulativeSumOptions(nullptr) {} + explicit CumulativeSumOptions(uint64_t start = 0, bool skip_nulls = false); + explicit CumulativeSumOptions(int64_t start = 0, bool skip_nulls = false); + explicit CumulativeSumOptions(double start = 0, bool skip_nulls = false); + explicit CumulativeSumOptions(std::shared_ptr start = nullptr, + bool skip_nulls = false); static constexpr char const kTypeName[] = "CumulativeSumOptions"; + static CumulativeSumOptions Defaults() { return CumulativeSumOptions(nullptr); } /// Optional starting value for sum computation std::shared_ptr start; + + /// When false, propagates the first null/NaN encountered + bool skip_nulls = false; }; /// @} @@ -535,12 +542,12 @@ Result DictionaryEncode( ARROW_EXPORT Result> CumulativeSum(const Array& values, - CumulativeSumOptions& options, + const CumulativeSumOptions& options, ExecContext* ctx = NULLPTR); ARROW_EXPORT Result> CumulativeSum(const ChunkedArray& chunked_array, - CumulativeSumOptions& options, + const CumulativeSumOptions& options, ExecContext* ctx = NULLPTR); // ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc index 03a987d9be7..fc74a3b412c 100644 --- a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc @@ -16,6 +16,7 @@ // under the License. #include "arrow/array/array_base.h" +#include "arrow/compute/api_scalar.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/kernels/common.h" #include "arrow/result.h" @@ -25,23 +26,37 @@ namespace arrow { namespace compute { namespace internal { +namespace { + +std::shared_ptr GetSignature(detail::GetTypeId get_id) { + return KernelSignature::Make({InputType::Array(get_id.id)}, OutputType(FirstType)); +} + +} // namespace + template struct CumulativeSum { - using CType = TypeTraits::CType; - using ScalarType = TypeTraits::ScalarType; + using CType = typename TypeTraits::CType; + using ScalarType = typename TypeTraits::ScalarType; CType Sum(ExecContext* ctx, std::shared_ptr& input, ArrayData* output, - CType start) { + CType start, bool skip_nulls) { CType sum = start; CType* data = checked_cast(input->data()->buffers[1]->data()); CType* out_values = checked_cast(output->buffers[1]->mutable_data()); ArithmeticOptions options; - for (size_t i = input->offset; i < input->length; ++i) { - if (input->IsValid(i)) { + bool set_null = false; + for (size_t i = input->offset; i < input->length(); ++i) { + if (set_null) { + out_values[i] = NULL; + } else if (input->IsNull(i) && !skip_nulls) { + out_values[i] = NULL; + set_null = true; + } else { Datum value_datum(data[i]); Datum sum_datum(sum); auto result = Add(value_datum, sum_datum, options, ctx); - ScalarType result_scalar = result.ValueOrDie().scalar_as(); + ScalarType result_scalar = result.ValueOrDie().scalar_as(); sum = result_scalar.value; out_values[i] = sum; } @@ -50,14 +65,17 @@ struct CumulativeSum { return sum; } - Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { - const auto& options = OptionsWrapper>::Get(ctx); + static Status Exec(KernelContext* ctx, const ExecBatch& batch, Datum* out) { + const auto& options = OptionsWrapper::Get(ctx); std::shared_ptr& start_scalar = options.start; + bool skip_nulls = options.skip_nulls; + CType start = 0; if (start_scalar) { if (start_scalar->is_valid()) { if (start_scalar->type()->id() != TypeTraits::type_singleton()->id()) { - return Status::Invalid("Types of array values and starting value do not match."); + return Status::Invalid( + "Types of array values and starting value do not match."); } start = UnboxScalar::Unbox(*start_scalar); } @@ -82,8 +100,9 @@ struct CumulativeSum { output->null_count = 0; } - Sum(ctx->exec_context(), input, output, start); + Sum(ctx->exec_context(), input, output, start, skip_nulls); return Status::OK(); + break; case Datum::CHUNKED_ARRAY: const auto& input = batch[0].chunked_array(); @@ -104,23 +123,21 @@ struct CumulativeSum { out_chunk->null_count = 0; } - CType last_value = Sum(ctx->exec_context(), chunk, out_chunk, start); + CType last_value = + Sum(ctx->exec_context(), chunk, out_chunk, start, skip_nulls); start = last_value; out_chunks.push_back(MakeArray(std::move(out_chunk))); } *out->chunked_array() = ChunkedArray(out_chunks, input->type()); return Status::OK(); + break; default: return Status::NotImplemented( "Unsupported input type for function 'cumulative_sum': ", batch[0].ToString()); } } - - static std::shared_ptr GetSignature(detail::GetTypeId get_id) { - return KernelSignature::Make({InputType::Array(get_id.id)}, OutputType(FirstType)); - } }; const FunctionDoc cumulative_sum_doc( @@ -136,18 +153,22 @@ void RegisterVectorCumulativeSum(FunctionRegistry* registry) { auto cumulative_sum = std::make_shared( "cumulative_sum", Arity::Binary(), &cumulative_sum_doc); - std::vector types; - types.insert(types.end(), NumericTypes().begin(), NumericTypes().end()); + std::vector> types; + types.insert(types.end(), IntTypes().begin(), IntTypes().end()); + types.insert(types.end(), FloatingPointTypes().begin(), IntTypes().end()); types.insert(types.end(), TemporalTypes().begin(), TemporalTypes().end()); - types.push_back(Type::DURATION); - types.push_back(Type::INTERVAL_MONTHS); + types.push_back(duration(TimeUnit::SECOND)); + types.push_back(duration(TimeUnit::MILLI)); + types.push_back(duration(TimeUnit::MICRO)); + types.push_back(duration(TimeUnit::NANO)); + types.push_back(month_interval()); for (auto ty : types) { VectorKernel kernel; - kernel.can_execute_chunkwise = true; + kernel.can_execute_chunkwise = false; kernel.null_handling = NullHandling::type::INTERSECTION; kernel.mem_allocation = MemAllocation::type::PREALLOCATE; - kernel.signature = CumulativeSum::GetSignature(ty.id); + kernel.signature = GetSignature(ty->id()); kernel.exec = std::move(GenerateTypeAgnosticPrimitive(ty)); kernel.init = OptionsWrapper::Init; DCHECK_OK(cumulative_sum->AddKernel(std::move(kernel))); diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc index dfeb492cb70..825ff83174d 100644 --- a/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc @@ -32,6 +32,7 @@ #include "arrow/array/builder_decimal.h" #include "arrow/buffer.h" #include "arrow/chunked_array.h" +#include "arrow/compute/api_vector.h" #include "arrow/status.h" #include "arrow/testing/util.h" #include "arrow/type.h" @@ -48,7 +49,23 @@ namespace arrow { namespace compute { -class TestBaseCumulativeSum : public ::testing::Test {} +template +class TestBaseCumulativeSum : public ::testing::Test { + using CType = TypeTraits::CType; + + void AssertValidCumulativeSum(const Array& input, CumulativeSumOptions options) { + ASSERT_OK_AND_ASSIGN(auto result, CumulativeSum(input, options, nullptr)); + } +}; + +template +class TestIntegerCumulativeSum : public TestBaseCumulativeSum {}; + +template +class TestFloatingPointCumulativeSum : public TestBaseCumulativeSum {}; + +template +class TestTemporalCumulativeSum : public TestBaseCumulativeSum {}; } // namespace compute } // namespace arrow diff --git a/python/pyarrow/_compute.pyx b/python/pyarrow/_compute.pyx index 75bd819d52f..dc9cd9dd372 100644 --- a/python/pyarrow/_compute.pyx +++ b/python/pyarrow/_compute.pyx @@ -1737,9 +1737,9 @@ class PartitionNthOptions(_PartitionNthOptions): cdef class _CumulativeSumOptions(FunctionOptions): - def _set_options(self, start): + def _set_options(self, start, skip_nulls): self.wrapped.reset(new CCumulativeSumOptions( - pyarrow_unwrap_scalar(start))) + pyarrow_unwrap_scalar(start), skip_nulls)) class CumulativeSumOptions(_CumulativeSumOptions): @@ -1750,10 +1750,12 @@ class CumulativeSumOptions(_CumulativeSumOptions): ---------- start : Scalar Optional starting value for sum computation + skip_nulls : bool + When false, propagates the first null/NaN encountered """ - def __init__(self, start): - self._set_options(start) + def __init__(self, start=0, skip_nulls=false): + self._set_options(start, skip_nulls) cdef class _ArraySortOptions(FunctionOptions): diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 6b370515399..33c46e56b70 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2245,8 +2245,9 @@ cdef extern from "arrow/compute/api.h" namespace "arrow::compute" nogil: cdef cppclass CCumulativeSumOptions \ "arrow::compute::CumulativeSumOptions"(CFunctionOptions): - CCumulativeSumOptions(shared_ptr[CScalar] start) + CCumulativeSumOptions(shared_ptr[CScalar] start, c_bool skip_nulls) shared_ptr[CScalar] start + c_bool skip_nulls cdef cppclass CArraySortOptions \ "arrow::compute::ArraySortOptions"(CFunctionOptions): diff --git a/python/pyarrow/tests/test_compute.py b/python/pyarrow/tests/test_compute.py index 12791a9caea..14ca2355484 100644 --- a/python/pyarrow/tests/test_compute.py +++ b/python/pyarrow/tests/test_compute.py @@ -147,7 +147,7 @@ def test_option_class_equality(): pc.NullOptions(), pc.PadOptions(5), pc.PartitionNthOptions(1, null_placement="at_start"), - pc.CumulativeSumOptions(pa.scalar(0)), + pc.CumulativeSumOptions(start=0, skip_nulls=False), pc.QuantileOptions(), pc.RandomOptions(10), pc.ReplaceSliceOptions(0, 1, "a"), From 236d56bd7b5424f745f427de90d645dc5cf4af2a Mon Sep 17 00:00:00 2001 From: JabariBooker Date: Tue, 22 Mar 2022 17:09:37 -0400 Subject: [PATCH 08/67] Fixed some errors; reworking addition for cumulative sum --- cpp/src/arrow/compute/api_vector.h | 8 ++-- .../compute/kernels/vector_cumulative_sum.cc | 46 +++++++++++++++---- .../kernels/vector_cumulative_sum_test.cc | 35 ++++++++++---- 3 files changed, 67 insertions(+), 22 deletions(-) diff --git a/cpp/src/arrow/compute/api_vector.h b/cpp/src/arrow/compute/api_vector.h index 74bd9893b00..5742871fd64 100644 --- a/cpp/src/arrow/compute/api_vector.h +++ b/cpp/src/arrow/compute/api_vector.h @@ -191,13 +191,13 @@ class ARROW_EXPORT PartitionNthOptions : public FunctionOptions { /// \brief Options for cumulative sum function class ARROW_EXPORT CumulativeSumOptions : public FunctionOptions { public: - explicit CumulativeSumOptions(uint64_t start = 0, bool skip_nulls = false); - explicit CumulativeSumOptions(int64_t start = 0, bool skip_nulls = false); - explicit CumulativeSumOptions(double start = 0, bool skip_nulls = false); + explicit CumulativeSumOptions(uint64_t start, bool skip_nulls = false); + explicit CumulativeSumOptions(int64_t start, bool skip_nulls = false); + explicit CumulativeSumOptions(double start, bool skip_nulls = false); explicit CumulativeSumOptions(std::shared_ptr start = nullptr, bool skip_nulls = false); static constexpr char const kTypeName[] = "CumulativeSumOptions"; - static CumulativeSumOptions Defaults() { return CumulativeSumOptions(nullptr); } + static CumulativeSumOptions Defaults() { return CumulativeSumOptions(); } /// Optional starting value for sum computation std::shared_ptr start; diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc index fc74a3b412c..1df2bcf95e4 100644 --- a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc @@ -28,6 +28,31 @@ namespace internal { namespace { +struct Add { + template + static constexpr enable_if_floating_value Call(KernelContext*, Arg0 left, Arg1 right, + Status*) { + return left + right; + } + + template + static constexpr enable_if_unsigned_integer_value Call(KernelContext*, Arg0 left, + Arg1 right, Status*) { + return left + right; + } + + template + static constexpr enable_if_signed_integer_value Call(KernelContext*, Arg0 left, + Arg1 right, Status*) { + return arrow::internal::SafeSignedAdd(left, right); + } + + template + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { + return left + right; + } +}; + std::shared_ptr GetSignature(detail::GetTypeId get_id) { return KernelSignature::Make({InputType::Array(get_id.id)}, OutputType(FirstType)); } @@ -46,18 +71,19 @@ struct CumulativeSum { CType* out_values = checked_cast(output->buffers[1]->mutable_data()); ArithmeticOptions options; bool set_null = false; - for (size_t i = input->offset; i < input->length(); ++i) { + for (size_t i = input->offset(); i < input->length(); ++i) { if (set_null) { out_values[i] = NULL; } else if (input->IsNull(i) && !skip_nulls) { out_values[i] = NULL; set_null = true; } else { - Datum value_datum(data[i]); - Datum sum_datum(sum); - auto result = Add(value_datum, sum_datum, options, ctx); - ScalarType result_scalar = result.ValueOrDie().scalar_as(); - sum = result_scalar.value; + // Datum value_datum(data[i]); + // Datum sum_datum(sum); + // auto result = Add(value_datum, sum_datum, options, ctx); + // sum = result_scalar.value; + + sum = Add::Call(data[i], sum); out_values[i] = sum; } } @@ -82,7 +108,7 @@ struct CumulativeSum { } switch (batch[0].kind()) { - case Datum::ARRAY: + case Datum::ARRAY: { std::shared_ptr input = batch[0].make_array(); ArrayData* output = out->array().get(); @@ -102,8 +128,8 @@ struct CumulativeSum { Sum(ctx->exec_context(), input, output, start, skip_nulls); return Status::OK(); - break; - case Datum::CHUNKED_ARRAY: + } + case Datum::CHUNKED_ARRAY: { const auto& input = batch[0].chunked_array(); ArrayVector out_chunks; @@ -131,7 +157,7 @@ struct CumulativeSum { *out->chunked_array() = ChunkedArray(out_chunks, input->type()); return Status::OK(); - break; + } default: return Status::NotImplemented( "Unsupported input type for function 'cumulative_sum': ", diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc index 825ff83174d..8ad10be1da2 100644 --- a/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_sum_test.cc @@ -49,23 +49,42 @@ namespace arrow { namespace compute { +using IntegralTypes = testing::Types; + +using FloatingTypes = testing::Types; + +using TimeTypes = testing::Types; + template -class TestBaseCumulativeSum : public ::testing::Test { +class TestCumulativeSum : public ::testing::Test { using CType = TypeTraits::CType; - void AssertValidCumulativeSum(const Array& input, CumulativeSumOptions options) { + protected: + CumulativeSumOptions no_start_no_skip_nulls(); + CumulativeSumOptions has_start_no_skip_nulls(10); + CumulativeSumOptions no_start_skip_nulls(0, true); + CumulativeSumOptions has_start_skip_nulls(10, true); + + void SetUp() override {} + + void AssertValidCumulativeSum(const Array& expected, const Array& input, + const CumulativeSumOptions options) { ASSERT_OK_AND_ASSIGN(auto result, CumulativeSum(input, options, nullptr)); + AssertArraysEqual(expected, *result, false, EqualOptions::Defaults()); } }; -template -class TestIntegerCumulativeSum : public TestBaseCumulativeSum {}; +TYPED_TEST_SUITE(TestCumulativeSumIntegral, IntegralTypes); +TYPED_TEST_SUITE(TestCumulativeSumFloating, FloatingTypes); +TYPED_TEST_SUITE(TestCumulativeSumTemporal, TimeTypes); -template -class TestFloatingPointCumulativeSum : public TestBaseCumulativeSum {}; +TYPED_TEST(TestCumulativeSumIntegral, NoStartNoSkipNulls) {} +TYPED_TEST(TestCumulativeSumIntegral, HasStartNoSkipNulls) {} +TYPED_TEST(TestCumulativeSumIntegral, NoStartSkipNulls) {} +TYPED_TEST(TestCumulativeSumIntegral, HasStartSkipNulls) {} -template -class TestTemporalCumulativeSum : public TestBaseCumulativeSum {}; } // namespace compute } // namespace arrow From 19a9d05bad0d25f4951ea15c4e90a6ccf817393e Mon Sep 17 00:00:00 2001 From: JabariBooker Date: Wed, 23 Mar 2022 14:29:17 -0400 Subject: [PATCH 09/67] Creating separate header for addition kernels --- .../kernels/base_arithmetic_internal.h | 103 ++++++++++++++++++ .../compute/kernels/scalar_arithmetic.cc | 82 +------------- .../compute/kernels/vector_cumulative_sum.cc | 26 +++++ 3 files changed, 130 insertions(+), 81 deletions(-) create mode 100644 cpp/src/arrow/compute/kernels/base_arithmetic_internal.h diff --git a/cpp/src/arrow/compute/kernels/base_arithmetic_internal.h b/cpp/src/arrow/compute/kernels/base_arithmetic_internal.h new file mode 100644 index 00000000000..899f26cd126 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/base_arithmetic_internal.h @@ -0,0 +1,103 @@ +#pragma once + +#include "arrow/compute/api_scalar.h" +#include "arrow/compute/kernels/common.h" +#include "arrow/compute/kernels/util_internal.h" +#include "arrow/type.h" +#include "arrow/type_traits.h" +#include "arrow/util/decimal.h" +#include "arrow/util/int_util_internal.h" +#include "arrow/util/macros.h" + +namespace arrow { + +using internal::AddWithOverflow; + +namespace compute { +namespace internal { + +struct Add { + template + static constexpr enable_if_floating_value Call(KernelContext*, Arg0 left, Arg1 right, + Status*) { + return left + right; + } + + template + static constexpr enable_if_unsigned_integer_value Call(KernelContext*, Arg0 left, + Arg1 right, Status*) { + return left + right; + } + + template + static constexpr enable_if_signed_integer_value Call(KernelContext*, Arg0 left, + Arg1 right, Status*) { + return arrow::internal::SafeSignedAdd(left, right); + } + + template + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { + return left + right; + } +}; + +struct AddChecked { + template + static enable_if_integer_value Call(KernelContext*, Arg0 left, Arg1 right, + Status* st) { + static_assert(std::is_same::value && std::is_same::value, ""); + T result = 0; + if (ARROW_PREDICT_FALSE(AddWithOverflow(left, right, &result))) { + *st = Status::Invalid("overflow"); + } + return result; + } + + template + static enable_if_floating_value Call(KernelContext*, Arg0 left, Arg1 right, + Status*) { + static_assert(std::is_same::value && std::is_same::value, ""); + return left + right; + } + + template + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { + return left + right; + } +}; + +template +struct AddTimeDuration { + template + static T Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { + T result = + arrow::internal::SafeSignedAdd(static_cast(left), static_cast(right)); + if (result < 0 || multiple <= result) { + *st = Status::Invalid(result, " is not within the acceptable range of ", "[0, ", + multiple, ") s"); + } + return result; + } +}; + +template +struct AddTimeDurationChecked { + template + static T Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { + T result = 0; + if (ARROW_PREDICT_FALSE( + AddWithOverflow(static_cast(left), static_cast(right), &result))) { + *st = Status::Invalid("overflow"); + } + if (result < 0 || multiple <= result) { + *st = Status::Invalid(result, " is not within the acceptable range of ", "[0, ", + multiple, ") s"); + } + return result; + } +}; + +} // namespace internal +} // namespace compute +} // namespace arrow + diff --git a/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc b/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc index bfafb6fcc19..150cd3ead96 100644 --- a/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc +++ b/cpp/src/arrow/compute/kernels/scalar_arithmetic.cc @@ -25,6 +25,7 @@ #include "arrow/compute/api_scalar.h" #include "arrow/compute/kernels/common.h" #include "arrow/compute/kernels/util_internal.h" +#include "arrow/compute/kernels/base_arithmetic_internal.h" #include "arrow/type.h" #include "arrow/type_traits.h" #include "arrow/util/decimal.h" @@ -114,56 +115,6 @@ struct AbsoluteValueChecked { } }; -struct Add { - template - static constexpr enable_if_floating_value Call(KernelContext*, Arg0 left, Arg1 right, - Status*) { - return left + right; - } - - template - static constexpr enable_if_unsigned_integer_value Call(KernelContext*, Arg0 left, - Arg1 right, Status*) { - return left + right; - } - - template - static constexpr enable_if_signed_integer_value Call(KernelContext*, Arg0 left, - Arg1 right, Status*) { - return arrow::internal::SafeSignedAdd(left, right); - } - - template - static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { - return left + right; - } -}; - -struct AddChecked { - template - static enable_if_integer_value Call(KernelContext*, Arg0 left, Arg1 right, - Status* st) { - static_assert(std::is_same::value && std::is_same::value, ""); - T result = 0; - if (ARROW_PREDICT_FALSE(AddWithOverflow(left, right, &result))) { - *st = Status::Invalid("overflow"); - } - return result; - } - - template - static enable_if_floating_value Call(KernelContext*, Arg0 left, Arg1 right, - Status*) { - static_assert(std::is_same::value && std::is_same::value, ""); - return left + right; - } - - template - static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { - return left + right; - } -}; - struct Subtract { template static constexpr enable_if_floating_value Call(KernelContext*, Arg0 left, Arg1 right, @@ -240,37 +191,6 @@ struct SubtractCheckedDate32 { } }; -template -struct AddTimeDuration { - template - static T Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { - T result = - arrow::internal::SafeSignedAdd(static_cast(left), static_cast(right)); - if (result < 0 || multiple <= result) { - *st = Status::Invalid(result, " is not within the acceptable range of ", "[0, ", - multiple, ") s"); - } - return result; - } -}; - -template -struct AddTimeDurationChecked { - template - static T Call(KernelContext*, Arg0 left, Arg1 right, Status* st) { - T result = 0; - if (ARROW_PREDICT_FALSE( - AddWithOverflow(static_cast(left), static_cast(right), &result))) { - *st = Status::Invalid("overflow"); - } - if (result < 0 || multiple <= result) { - *st = Status::Invalid(result, " is not within the acceptable range of ", "[0, ", - multiple, ") s"); - } - return result; - } -}; - template struct SubtractTimeDuration { template diff --git a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc index 1df2bcf95e4..bed009fbf88 100644 --- a/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc +++ b/cpp/src/arrow/compute/kernels/vector_cumulative_sum.cc @@ -19,6 +19,7 @@ #include "arrow/compute/api_scalar.h" #include "arrow/compute/api_vector.h" #include "arrow/compute/kernels/common.h" +#include "arrow/compute/kernels/base_arithmetic_internal.h" #include "arrow/result.h" #include "arrow/visit_type_inline.h" @@ -28,6 +29,7 @@ namespace internal { namespace { + struct Add { template static constexpr enable_if_floating_value Call(KernelContext*, Arg0 left, Arg1 right, @@ -53,6 +55,30 @@ struct Add { } }; +struct AddChecked { + template + static enable_if_integer_value Call(KernelContext*, Arg0 left, Arg1 right, + Status* st) { + static_assert(std::is_same::value && std::is_same::value, ""); + T result = 0; + if (ARROW_PREDICT_FALSE(AddWithOverflow(left, right, &result))) { + *st = Status::Invalid("overflow"); + } + return result; + } + + template + static enable_if_floating_value Call(KernelContext*, Arg0 left, Arg1 right, + Status*) { + static_assert(std::is_same::value && std::is_same::value, ""); + return left + right; + } + + template + static enable_if_decimal_value Call(KernelContext*, Arg0 left, Arg1 right, Status*) { + return left + right; + } +}; std::shared_ptr GetSignature(detail::GetTypeId get_id) { return KernelSignature::Make({InputType::Array(get_id.id)}, OutputType(FirstType)); } From 2c3722f10f0362c75c783199af241b3c1b1f95ad Mon Sep 17 00:00:00 2001 From: JabariBooker Date: Thu, 24 Mar 2022 00:00:37 -0400 Subject: [PATCH 10/67] Creation of CumulativeMeta, simplification of CumulativeSum, and moving add kernel to new internal file --- .../arrow/compute/kernels/codegen_internal.h | 31 +++ .../compute/kernels/scalar_arithmetic.cc | 32 --- .../compute/kernels/vector_cumulative_sum.cc | 184 ++++-------------- 3 files changed, 65 insertions(+), 182 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/codegen_internal.h b/cpp/src/arrow/compute/kernels/codegen_internal.h index ff7b9161fe3..8eb5cba5d39 100644 --- a/cpp/src/arrow/compute/kernels/codegen_internal.h +++ b/cpp/src/arrow/compute/kernels/codegen_internal.h @@ -1134,6 +1134,37 @@ ArrayKernelExec GeneratePhysicalInteger(detail::GetTypeId get_id) { } } +template