From 3e9451edaf921c3136a903c052b0d71dac0b0c0f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Tue, 15 Jan 2019 12:50:46 -0500 Subject: [PATCH 01/14] ARROW-4124: Implement Aggregate and Sum kernels --- cpp/src/arrow/CMakeLists.txt | 2 + cpp/src/arrow/compute/kernel.h | 53 +++++++- cpp/src/arrow/compute/kernels/CMakeLists.txt | 4 + cpp/src/arrow/compute/kernels/aggregation.cc | 46 +++++++ cpp/src/arrow/compute/kernels/aggregation.h | 84 +++++++++++++ cpp/src/arrow/compute/kernels/monoid-impl.h | 115 ++++++++++++++++++ cpp/src/arrow/compute/kernels/monoid.h | 85 +++++++++++++ .../arrow/compute/kernels/sum-benchmark.cc | 60 +++++++++ cpp/src/arrow/compute/kernels/sum-test.cc | 77 ++++++++++++ cpp/src/arrow/compute/kernels/sum.cc | 59 +++++++++ cpp/src/arrow/compute/kernels/sum.h | 57 +++++++++ 11 files changed, 637 insertions(+), 5 deletions(-) create mode 100644 cpp/src/arrow/compute/kernels/aggregation.cc create mode 100644 cpp/src/arrow/compute/kernels/aggregation.h create mode 100644 cpp/src/arrow/compute/kernels/monoid-impl.h create mode 100644 cpp/src/arrow/compute/kernels/monoid.h create mode 100644 cpp/src/arrow/compute/kernels/sum-benchmark.cc create mode 100644 cpp/src/arrow/compute/kernels/sum-test.cc create mode 100644 cpp/src/arrow/compute/kernels/sum.cc create mode 100644 cpp/src/arrow/compute/kernels/sum.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index c65824f5385..6b295555dfb 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -130,9 +130,11 @@ if (ARROW_COMPUTE) add_subdirectory(compute) set(ARROW_SRCS ${ARROW_SRCS} compute/context.cc + compute/kernels/aggregation.cc compute/kernels/boolean.cc compute/kernels/cast.cc compute/kernels/hash.cc + compute/kernels/sum.cc compute/kernels/util-internal.cc ) endif() diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index adfea897c88..0839a90eb7f 100644 --- a/cpp/src/arrow/compute/kernel.h +++ 
b/cpp/src/arrow/compute/kernel.h @@ -57,9 +57,48 @@ class ARROW_EXPORT OpKernel { /// \brief Placeholder for Scalar values until we implement these struct ARROW_EXPORT Scalar { - ~Scalar() {} - - ARROW_DISALLOW_COPY_AND_ASSIGN(Scalar); + util::variant value; + + Scalar(bool value) : value(value) {} + Scalar(uint8_t value) : value(value) {} + Scalar(int8_t value) : value(value) {} + Scalar(uint16_t value) : value(value) {} + Scalar(int16_t value) : value(value) {} + Scalar(uint32_t value) : value(value) {} + Scalar(int32_t value) : value(value) {} + Scalar(uint64_t value) : value(value) {} + Scalar(int64_t value) : value(value) {} + Scalar(float value) : value(value) {} + Scalar(double value) : value(value) {} + + Type::type kind() const { + switch (this->value.which()) { + case 0: + return Type::BOOL; + case 1: + return Type::UINT8; + case 2: + return Type::INT8; + case 3: + return Type::UINT16; + case 4: + return Type::INT16; + case 5: + return Type::UINT32; + case 6: + return Type::INT32; + case 7: + return Type::UINT64; + case 8: + return Type::INT64; + case 9: + return Type::FLOAT; + case 10: + return Type::DOUBLE; + default: + return Type::NA; + } + } }; /// \class Datum @@ -67,7 +106,7 @@ struct ARROW_EXPORT Scalar { struct ARROW_EXPORT Datum { enum type { NONE, SCALAR, ARRAY, CHUNKED_ARRAY, RECORD_BATCH, TABLE, COLLECTION }; - util::variant, std::shared_ptr, + util::variant, std::shared_ptr, std::shared_ptr, std::shared_ptr, std::vector> value; @@ -75,7 +114,7 @@ struct ARROW_EXPORT Datum { /// \brief Empty datum, to be populated elsewhere Datum() : value(NULLPTR) {} - Datum(const std::shared_ptr& value) // NOLINT implicit conversion + Datum(const Scalar& value) // NOLINT implicit conversion : value(value) {} Datum(const std::shared_ptr& value) // NOLINT implicit conversion : value(value) {} @@ -147,6 +186,10 @@ struct ARROW_EXPORT Datum { return util::get>(this->value); } + Scalar scalar() const { + return util::get(this->value); + } + bool is_arraylike() 
const { return this->kind() == Datum::ARRAY || this->kind() == Datum::CHUNKED_ARRAY; } diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index 4d508aacb99..cadd789a8e9 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -20,3 +20,7 @@ ARROW_INSTALL_ALL_HEADERS("arrow/compute/kernels") ADD_ARROW_TEST(boolean-test PREFIX "arrow-compute") ADD_ARROW_TEST(cast-test PREFIX "arrow-compute") ADD_ARROW_TEST(hash-test PREFIX "arrow-compute") + +# Aggregates +ADD_ARROW_TEST(sum-test PREFIX "arrow-compute") +ADD_ARROW_BENCHMARK(sum-benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/kernels/aggregation.cc b/cpp/src/arrow/compute/kernels/aggregation.cc new file mode 100644 index 00000000000..df81a4cd60e --- /dev/null +++ b/cpp/src/arrow/compute/kernels/aggregation.cc @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/compute/kernels/aggregation.h" + +#include "arrow/compute/kernels/monoid.h" +#include "arrow/status.h" + +namespace arrow { +namespace compute { + +Status AggregateUnaryKernel::Call(FunctionContext* ctx, const Datum& input, Datum* out) { + switch (input.kind()) { + case Datum::ARRAY: + RETURN_NOT_OK(state_->Consume(ctx, *input.make_array())); + break; + case Datum::CHUNKED_ARRAY: { + auto chunked = input.chunked_array(); + for (auto& array : chunked->chunks()) { + RETURN_NOT_OK(state_->Consume(ctx, *array)); + } + } break; + default: + return Status::Invalid( + "Aggregation Kernel expects an array-like (Array or ChunkedArray) datum"); + } + + return state_->Finalize(ctx, out); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregation.h b/cpp/src/arrow/compute/kernels/aggregation.h new file mode 100644 index 00000000000..dbf69f54e80 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/aggregation.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef ARROW_COMPUTE_KERNELS_AGGREGATION_H +#define ARROW_COMPUTE_KERNELS_AGGREGATION_H + +#include +#include + +#include "arrow/compute/kernel.h" + +namespace arrow { + +class Array; +class Status; + +namespace compute { + +struct Datum; +class FunctionContext; + +/// \class AggregateState +/// \brief Interface for aggregate kernels. +/// +/// An AggregateState separate the concerns of kernel computation and parallel +/// scheduling. +/// +/// \code{.cpp} +/// // Loop can run in parallel. +/// for (array: input.chunks()) { +/// RETURN_NOT_OK(state->Consume(ctx, array)); +/// } +/// +/// return state->Finalize(ctx, out); +/// \endcode +class AggregateState { + public: + /// \brief Consume an array. + /// + /// \param[in] ctx Function context provided by the user. + /// \param[in] input Array to consume. + virtual Status Consume(FunctionContext* ctx, const Array& input) = 0; + + /// \brief Finalize the computation into a Datum. + /// + /// + /// + /// \param[in] ctx Function context provided by the user. + /// \param[out] out The output of the function. + virtual Status Finalize(FunctionContext* ctx, Datum* out) = 0; + + virtual ~AggregateState() {} +}; + + +/// \brief UnaryKernel implemented by an AggregateState +class ARROW_EXPORT AggregateUnaryKernel : public UnaryKernel { + public: + explicit AggregateUnaryKernel(AggregateState* state) : state_(state) {} + + Status Call(FunctionContext* ctx, const Datum& input, Datum* out) override; + + private: + std::unique_ptr state_; +}; + +} // namespace compute +} // namespace arrow + +#endif // ARROW_COMPUTE_KERNELS_AGGREGATION_H diff --git a/cpp/src/arrow/compute/kernels/monoid-impl.h b/cpp/src/arrow/compute/kernels/monoid-impl.h new file mode 100644 index 00000000000..38378b7559f --- /dev/null +++ b/cpp/src/arrow/compute/kernels/monoid-impl.h @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_COMPUTE_KERNELS_MONOID_IMPL_H +#define ARROW_COMPUTE_KERNELS_MONOID_IMPL_H + +#include +#include +#include + +#include "arrow/array.h" +#include "arrow/compute/kernel.h" +#include "arrow/compute/kernels/aggregation.h" +#include "arrow/compute/kernels/monoid.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit-util.h" +#include "arrow/visitor_inline.h" + +namespace arrow { +namespace compute { + +template +class IdentityVisitor { + public: + using ValueType = typename MonoidType::ValueType; + + Status VisitValue(const ValueType& value) { + monoid_ += MonoidType(value); + return Status::OK(); + } + + Status VisitNull() { return Status::OK(); } + + MonoidType Value() const { return monoid_; } + + private: + MonoidType monoid_; +}; + +template > +class MonoidAggregateState : public AggregateState { + public: + Status Consume(FunctionContext* ctx, const Array& input) final { + auto data = input.data(); + MonoidVisitor visitor; + + RETURN_NOT_OK(ArrayDataVisitor::Visit(*data, &visitor)); + + { + // Merging the state must be protected by a mutex for concurrent access. + // The contention should be low assuming that the majority of time is + // spent in the preceding `Visit` call. 
+ std::lock_guard guard(monoid_mutex_); + monoid_ += visitor.Value(); + } + + return Status::OK(); + }; + + Status Finalize(FunctionContext* ctx, Datum* out) final { + *out = Datum(Scalar(monoid_.value())); + return Status::OK(); + } + + private: + std::mutex monoid_mutex_; + MonoidType monoid_; +}; + +#define GET_MONOID_KERNEL_CASE(T, M) \ + case T::type_id: \ + kernel = \ + std::unique_ptr(new AggregateUnaryKernel(new M())); \ + break + +#define DEFINE_GET_MONOID_KERNEL(MonoidStateType) \ + static Status GetMonoidAggregateKernel( \ + FunctionContext* ctx, const DataType& type, \ + std::unique_ptr& kernel) { \ + switch (type.id()) { \ + GET_MONOID_KERNEL_CASE(UInt8Type, MonoidStateType); \ + GET_MONOID_KERNEL_CASE(Int8Type, MonoidStateType); \ + GET_MONOID_KERNEL_CASE(UInt16Type, MonoidStateType); \ + GET_MONOID_KERNEL_CASE(Int16Type, MonoidStateType); \ + GET_MONOID_KERNEL_CASE(UInt32Type, MonoidStateType); \ + GET_MONOID_KERNEL_CASE(Int32Type, MonoidStateType); \ + GET_MONOID_KERNEL_CASE(UInt64Type, MonoidStateType); \ + GET_MONOID_KERNEL_CASE(Int64Type, MonoidStateType); \ + GET_MONOID_KERNEL_CASE(FloatType, MonoidStateType); \ + GET_MONOID_KERNEL_CASE(DoubleType, MonoidStateType); \ + default: \ + return Status::Invalid("Unsupported type ", type.ToString()); \ + } \ + \ + return Status::OK(); \ + } + +} // namespace compute +} // namespace arrow + +#endif // ARROW_COMPUTE_KERNELS_MONOID_IMPL_H diff --git a/cpp/src/arrow/compute/kernels/monoid.h b/cpp/src/arrow/compute/kernels/monoid.h new file mode 100644 index 00000000000..734dd436216 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/monoid.h @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_COMPUTE_KERNELS_MONOID_H +#define ARROW_COMPUTE_KERNELS_MONOID_H + +#include +#include + +namespace arrow { +namespace compute { + +template +class Monoid { + public: + using ValueType = T; + using ThisType = Monoid; + + static constexpr ValueType identity() { return Identity(); } + + Monoid() {} + Monoid(ValueType value) : value_(value) {} + + ThisType operator+(const ThisType& rhs) const { + return BinaryOp(this->value_, rhs.value_); + } + + ThisType& operator+=(const ThisType& rhs) { + this->value_ = BinaryOp(this->value_, rhs.value_); + return *this; + } + + ValueType value() const { return value_; } + + private: + ValueType value_ = identity(); +}; + +template +constexpr T zero() { + return static_cast(0); +}; + +template +constexpr T add(const T& lhs, const T& rhs) { + return lhs + rhs; +} + +template +using SumMonoid = Monoid, zero>; + +template +constexpr T min(const T& lhs, const T& rhs) { + return std::min(lhs, rhs); +} + +template +using MinMonoid = Monoid, std::numeric_limits::max>; + +template +constexpr T max(const T& lhs, const T& rhs) { + return std::max(lhs, rhs); +} + +template +using MaxMonoid = Monoid, std::numeric_limits::lowest>; + +} // namespace compute +} // namespace arrow + +#endif // ARROW_COMPUTE_KERNELS_MONOID_H diff --git a/cpp/src/arrow/compute/kernels/sum-benchmark.cc b/cpp/src/arrow/compute/kernels/sum-benchmark.cc new file mode 100644 
index 00000000000..079abd98368 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/sum-benchmark.cc @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" + +#include + +#include "arrow/builder.h" +#include "arrow/memory_pool.h" +#include "arrow/test-util.h" + +#include "arrow/compute/context.h" +#include "arrow/compute/kernel.h" +#include "arrow/compute/kernels/sum.h" + +namespace arrow { +namespace compute { + +static void BenchmarkSum(benchmark::State& state) { // NOLINT non-const reference + const int64_t iterations = 1UL << 12; + + std::vector values; + std::vector is_valid; + for (int64_t i = 0; i < iterations; i++) { + for (int64_t j = 0; j < i; j++) { + is_valid.push_back(true); + values.push_back(j); + } + } + + std::shared_ptr arr; + ArrayFromVector(is_valid, values, &arr); + + FunctionContext ctx; + + while (state.KeepRunning()) { + Datum out; + ABORT_NOT_OK(Sum(&ctx, Datum(arr), &out)); + } + state.SetBytesProcessed(state.iterations() * values.size() * sizeof(int64_t)); +} + +BENCHMARK(BenchmarkSum)->MinTime(1.0)->Unit(benchmark::kMicrosecond); + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/sum-test.cc 
b/cpp/src/arrow/compute/kernels/sum-test.cc new file mode 100644 index 00000000000..7ce71ab0745 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/sum-test.cc @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include "arrow/array.h" +#include "arrow/test-common.h" +#include "arrow/test-util.h" +#include "arrow/type.h" + +#include "arrow/compute/kernel.h" +#include "arrow/compute/kernels/sum.h" +#include "arrow/compute/test-util.h" + +using std::shared_ptr; +using std::vector; + +namespace arrow { +namespace compute { + +template +void CheckSum(FunctionContext* ctx, const Array& input, CType expected) { + Datum result; + ASSERT_OK(Sum(ctx, input, &result)); + + // Ensure Datum is Scalar of proper type. 
 + ASSERT_EQ(result.kind(), Datum::SCALAR); + auto type = TypeTraits::type_singleton(); + ASSERT_EQ(result.scalar().kind(), type->id()); + + ASSERT_EQ(util::get(result.scalar().value), expected); +} + +template +void CheckSum(FunctionContext* ctx, const std::string& json, CType expected) { + Datum result; + auto array = ArrayFromJSON(TypeTraits::type_singleton(), json); + + CheckSum(ctx, *array, expected); +} + +template +class TestSumKernelNumeric : public ComputeFixture, public TestBase {}; + +typedef ::testing::Types + NumericTypes; + +TYPED_TEST_CASE(TestSumKernelNumeric, NumericTypes); + +TYPED_TEST(TestSumKernelNumeric, SimpleSum) { + using CType = typename TypeParam::c_type; + + CheckSum(&this->ctx_, "[0, 1, 2, 3, 4, 5]", 5 * 6 / 2); + + // Avoid this tests for (U)Int8Type + if (sizeof(CType) > 1) + CheckSum(&this->ctx_, "[1000, null, 300, null, 30, null, 7]", 1337); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/sum.cc b/cpp/src/arrow/compute/kernels/sum.cc new file mode 100644 index 00000000000..0d037e062bf --- /dev/null +++ b/cpp/src/arrow/compute/kernels/sum.cc @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/compute/kernels/sum.h" + +#include "arrow/array.h" +#include "arrow/compute/kernel.h" +#include "arrow/compute/kernels/aggregation.h" +#include "arrow/compute/kernels/monoid.h" +#include "arrow/compute/kernels/monoid_impl.h" +#include "arrow/type_traits.h" +#include "arrow/util/bit-util.h" +#include "arrow/visitor_inline.h" + +namespace arrow { +namespace compute { + +template +using SumMonoidType = MaxMonoid; + +template +using SumAggregateState = MonoidAggregateState>; + +DEFINE_GET_MONOID_KERNEL(SumAggregateState) + +Status Sum(FunctionContext* ctx, const Datum& value, Datum* out) { + std::unique_ptr kernel; + + auto data_type = value.type(); + if (data_type == nullptr) + return Status::Invalid("Datum must be array-like"); + else if (!is_integer(data_type->id()) && !is_floating(data_type->id())) + return Status::Invalid("Datum must contain a NumericType"); + + RETURN_NOT_OK(GetMonoidAggregateKernel(ctx, *data_type, kernel)); + + return kernel->Call(ctx, value, out); +} + +Status Sum(FunctionContext* ctx, const Array& array, Datum* out) { + return Sum(ctx, Datum(array.data()), out); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/sum.h b/cpp/src/arrow/compute/kernels/sum.h new file mode 100644 index 00000000000..ee9e39016c6 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/sum.h @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef ARROW_COMPUTE_KERNELS_SUM_H +#define ARROW_COMPUTE_KERNELS_SUM_H + +#include + +#include "arrow/status.h" +#include "arrow/util/visibility.h" + +namespace arrow { + +class Array; + +namespace compute { + +struct Datum; +class FunctionContext; + +/// \brief Sum values of a numeric array. +/// \param[in] context the FunctionContext +/// \param[in] value datum to sum, expecting Array or ChunkedArray +/// \param[out] out resulting datum +/// +/// \since 0.13.0 +/// \note API not yet finalized +ARROW_EXPORT +Status Sum(FunctionContext* context, const Datum& value, Datum* out); + +/// \brief Sum values of a numeric array. 
 +/// \param[in] context the FunctionContext +/// \param[in] array to sum +/// \param[out] out resulting datum +/// +/// \since 0.13.0 +/// \note API not yet finalized +Status Sum(FunctionContext* context, const Array& value, Datum* out); + +} // namespace compute +} // namespace arrow + +#endif // ARROW_COMPUTE_KERNELS_SUM_H From 92a4e03884da4735b1b6f5b7b722300e2c9608bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Tue, 15 Jan 2019 14:03:06 -0500 Subject: [PATCH 02/14] Fix --- cpp/src/arrow/compute/kernels/sum.cc | 4 ++-- cpp/src/arrow/compute/kernels/sum.h | 2 ++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/sum.cc b/cpp/src/arrow/compute/kernels/sum.cc index 0d037e062bf..466c54f5114 100644 --- a/cpp/src/arrow/compute/kernels/sum.cc +++ b/cpp/src/arrow/compute/kernels/sum.cc @@ -21,7 +21,7 @@ #include "arrow/compute/kernel.h" #include "arrow/compute/kernels/aggregation.h" #include "arrow/compute/kernels/monoid.h" -#include "arrow/compute/kernels/monoid_impl.h" +#include "arrow/compute/kernels/monoid-impl.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" #include "arrow/visitor_inline.h" @@ -30,7 +30,7 @@ namespace arrow { namespace compute { template -using SumMonoidType = MaxMonoid; +using SumMonoidType = SumMonoid; template using SumAggregateState = MonoidAggregateState>; diff --git a/cpp/src/arrow/compute/kernels/sum.h b/cpp/src/arrow/compute/kernels/sum.h index ee9e39016c6..b316ccce287 100644 --- a/cpp/src/arrow/compute/kernels/sum.h +++ b/cpp/src/arrow/compute/kernels/sum.h @@ -33,6 +33,7 @@ struct Datum; class FunctionContext; /// \brief Sum values of a numeric array. +/// /// \param[in] context the FunctionContext /// \param[in] value datum to sum, expecting Array or ChunkedArray /// \param[out] out resulting datum @@ -43,6 +44,7 @@ ARROW_EXPORT Status Sum(FunctionContext* context, const Datum& value, Datum* out); /// \brief Sum values of a numeric array. 
+/// /// \param[in] context the FunctionContext /// \param[in] array to sum /// \param[out] out resulting datum From f1044bea8fcfabf9eeaba19c06fc943a16d2c1ac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Wed, 23 Jan 2019 11:42:15 -0500 Subject: [PATCH 03/14] Fix warnings --- cpp/src/arrow/compute/kernels/monoid-impl.h | 3 ++- cpp/src/arrow/compute/kernels/monoid.h | 2 +- cpp/src/arrow/compute/kernels/sum.cc | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/monoid-impl.h b/cpp/src/arrow/compute/kernels/monoid-impl.h index 38378b7559f..f150a1dac53 100644 --- a/cpp/src/arrow/compute/kernels/monoid-impl.h +++ b/cpp/src/arrow/compute/kernels/monoid-impl.h @@ -51,7 +51,8 @@ class IdentityVisitor { MonoidType monoid_; }; -template > +template > class MonoidAggregateState : public AggregateState { public: Status Consume(FunctionContext* ctx, const Array& input) final { diff --git a/cpp/src/arrow/compute/kernels/monoid.h b/cpp/src/arrow/compute/kernels/monoid.h index 734dd436216..390d695d74e 100644 --- a/cpp/src/arrow/compute/kernels/monoid.h +++ b/cpp/src/arrow/compute/kernels/monoid.h @@ -57,7 +57,7 @@ constexpr T zero() { template constexpr T add(const T& lhs, const T& rhs) { - return lhs + rhs; + return static_cast(lhs + rhs); } template diff --git a/cpp/src/arrow/compute/kernels/sum.cc b/cpp/src/arrow/compute/kernels/sum.cc index 466c54f5114..3c4941cf4db 100644 --- a/cpp/src/arrow/compute/kernels/sum.cc +++ b/cpp/src/arrow/compute/kernels/sum.cc @@ -20,8 +20,8 @@ #include "arrow/array.h" #include "arrow/compute/kernel.h" #include "arrow/compute/kernels/aggregation.h" -#include "arrow/compute/kernels/monoid.h" #include "arrow/compute/kernels/monoid-impl.h" +#include "arrow/compute/kernels/monoid.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" #include "arrow/visitor_inline.h" From c4b5b214c38d0e98faccaa139a5af961fed1a88a Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Wed, 23 Jan 2019 11:45:46 -0500 Subject: [PATCH 04/14] Fix lint issues. --- cpp/src/arrow/compute/kernel.h | 32 ++++++++++----------- cpp/src/arrow/compute/kernels/aggregation.h | 1 - cpp/src/arrow/compute/kernels/monoid-impl.h | 1 + cpp/src/arrow/compute/kernels/monoid.h | 5 ++-- 4 files changed, 20 insertions(+), 19 deletions(-) diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 0839a90eb7f..49ae31413ac 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -57,19 +57,21 @@ class ARROW_EXPORT OpKernel { /// \brief Placeholder for Scalar values until we implement these struct ARROW_EXPORT Scalar { - util::variant value; - - Scalar(bool value) : value(value) {} - Scalar(uint8_t value) : value(value) {} - Scalar(int8_t value) : value(value) {} - Scalar(uint16_t value) : value(value) {} - Scalar(int16_t value) : value(value) {} - Scalar(uint32_t value) : value(value) {} - Scalar(int32_t value) : value(value) {} - Scalar(uint64_t value) : value(value) {} - Scalar(int64_t value) : value(value) {} - Scalar(float value) : value(value) {} - Scalar(double value) : value(value) {} + util::variant + value; + + explicit Scalar(bool value) : value(value) {} + explicit Scalar(uint8_t value) : value(value) {} + explicit Scalar(int8_t value) : value(value) {} + explicit Scalar(uint16_t value) : value(value) {} + explicit Scalar(int16_t value) : value(value) {} + explicit Scalar(uint32_t value) : value(value) {} + explicit Scalar(int32_t value) : value(value) {} + explicit Scalar(uint64_t value) : value(value) {} + explicit Scalar(int64_t value) : value(value) {} + explicit Scalar(float value) : value(value) {} + explicit Scalar(double value) : value(value) {} Type::type kind() const { switch (this->value.which()) { @@ -186,9 +188,7 @@ struct ARROW_EXPORT Datum { return util::get>(this->value); } - Scalar scalar() const { - return util::get(this->value); - } + Scalar 
scalar() const { return util::get(this->value); } bool is_arraylike() const { return this->kind() == Datum::ARRAY || this->kind() == Datum::CHUNKED_ARRAY; diff --git a/cpp/src/arrow/compute/kernels/aggregation.h b/cpp/src/arrow/compute/kernels/aggregation.h index dbf69f54e80..fdf68bf28ed 100644 --- a/cpp/src/arrow/compute/kernels/aggregation.h +++ b/cpp/src/arrow/compute/kernels/aggregation.h @@ -66,7 +66,6 @@ class AggregateState { virtual ~AggregateState() {} }; - /// \brief UnaryKernel implemented by an AggregateState class ARROW_EXPORT AggregateUnaryKernel : public UnaryKernel { public: diff --git a/cpp/src/arrow/compute/kernels/monoid-impl.h b/cpp/src/arrow/compute/kernels/monoid-impl.h index f150a1dac53..e8b9aff1cd8 100644 --- a/cpp/src/arrow/compute/kernels/monoid-impl.h +++ b/cpp/src/arrow/compute/kernels/monoid-impl.h @@ -20,6 +20,7 @@ #include #include +#include #include #include "arrow/array.h" diff --git a/cpp/src/arrow/compute/kernels/monoid.h b/cpp/src/arrow/compute/kernels/monoid.h index 390d695d74e..07f02f8d1a3 100644 --- a/cpp/src/arrow/compute/kernels/monoid.h +++ b/cpp/src/arrow/compute/kernels/monoid.h @@ -20,6 +20,7 @@ #include #include +#include namespace arrow { namespace compute { @@ -33,7 +34,7 @@ class Monoid { static constexpr ValueType identity() { return Identity(); } Monoid() {} - Monoid(ValueType value) : value_(value) {} + explicit Monoid(ValueType value) : value_(value) {} ThisType operator+(const ThisType& rhs) const { return BinaryOp(this->value_, rhs.value_); @@ -53,7 +54,7 @@ class Monoid { template constexpr T zero() { return static_cast(0); -}; +} template constexpr T add(const T& lhs, const T& rhs) { From 440ed3d99676c5d3a7831dd680fce62d9c7269e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Thu, 24 Jan 2019 09:21:09 -0500 Subject: [PATCH 05/14] Doxygen error --- cpp/src/arrow/compute/kernels/sum.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/cpp/src/arrow/compute/kernels/sum.h b/cpp/src/arrow/compute/kernels/sum.h index b316ccce287..4b8ab0bafab 100644 --- a/cpp/src/arrow/compute/kernels/sum.h +++ b/cpp/src/arrow/compute/kernels/sum.h @@ -51,7 +51,7 @@ Status Sum(FunctionContext* context, const Datum& value, Datum* out); /// /// \since 0.13.0 /// \note API not yet finalized -Status Sum(FunctionContext* context, const Array& value, Datum* out); +Status Sum(FunctionContext* context, const Array& array, Datum* out); } // namespace compute } // namespace arrow From d5ea4c8261690ed69f1a33af40bade12252f514f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Thu, 7 Feb 2019 11:58:58 -0500 Subject: [PATCH 06/14] Refactor --- cpp/src/arrow/CMakeLists.txt | 2 +- cpp/src/arrow/compute/kernel.h | 2 + cpp/src/arrow/compute/kernels/aggregate.cc | 64 ++++ cpp/src/arrow/compute/kernels/aggregate.h | 110 ++++++ cpp/src/arrow/compute/kernels/aggregation.cc | 46 --- cpp/src/arrow/compute/kernels/aggregation.h | 83 ----- cpp/src/arrow/compute/kernels/hash-test.cc | 6 +- cpp/src/arrow/compute/kernels/monoid-impl.h | 117 ------ cpp/src/arrow/compute/kernels/monoid.h | 86 ----- .../arrow/compute/kernels/sum-benchmark.cc | 340 +++++++++++++++++- cpp/src/arrow/compute/kernels/sum-test.cc | 93 ++++- cpp/src/arrow/compute/kernels/sum.cc | 166 ++++++++- cpp/src/arrow/compute/kernels/sum.h | 5 + cpp/src/arrow/io/memory-benchmark.cc | 141 ++++++-- cpp/src/arrow/test-random.h | 30 ++ cpp/src/arrow/util/bit-util.h | 5 + 16 files changed, 897 insertions(+), 399 deletions(-) create mode 100644 cpp/src/arrow/compute/kernels/aggregate.cc create mode 100644 cpp/src/arrow/compute/kernels/aggregate.h delete mode 100644 cpp/src/arrow/compute/kernels/aggregation.cc delete mode 100644 cpp/src/arrow/compute/kernels/aggregation.h delete mode 100644 cpp/src/arrow/compute/kernels/monoid-impl.h delete mode 100644 cpp/src/arrow/compute/kernels/monoid.h diff --git a/cpp/src/arrow/CMakeLists.txt 
b/cpp/src/arrow/CMakeLists.txt index 6b295555dfb..e78e5d73164 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -130,7 +130,7 @@ if (ARROW_COMPUTE) add_subdirectory(compute) set(ARROW_SRCS ${ARROW_SRCS} compute/context.cc - compute/kernels/aggregation.cc + compute/kernels/aggregate.cc compute/kernels/boolean.cc compute/kernels/cast.cc compute/kernels/hash.cc diff --git a/cpp/src/arrow/compute/kernel.h b/cpp/src/arrow/compute/kernel.h index 49ae31413ac..2270a48c9a9 100644 --- a/cpp/src/arrow/compute/kernel.h +++ b/cpp/src/arrow/compute/kernel.h @@ -190,6 +190,8 @@ struct ARROW_EXPORT Datum { Scalar scalar() const { return util::get(this->value); } + bool is_array() const { return this->kind() == Datum::ARRAY; } + bool is_arraylike() const { return this->kind() == Datum::ARRAY || this->kind() == Datum::CHUNKED_ARRAY; } diff --git a/cpp/src/arrow/compute/kernels/aggregate.cc b/cpp/src/arrow/compute/kernels/aggregate.cc new file mode 100644 index 00000000000..655dfb59734 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/aggregate.cc @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow/compute/kernels/aggregate.h" +#include "arrow/compute/context.h" + +namespace arrow { +namespace compute { + +// Helper class that properly invokes destructor when state goes out of scope. +class ManagedAggregateState { + public: + ManagedAggregateState(std::shared_ptr& desc, + std::shared_ptr&& buffer) + : desc_(desc), state_(buffer) { + desc_->New(state_->mutable_data()); + } + + ~ManagedAggregateState() { desc_->Delete(state_->mutable_data()); } + + void* mutable_data() { return state_->mutable_data(); } + + static std::shared_ptr Make( + std::shared_ptr& desc, MemoryPool* pool) { + std::shared_ptr buf; + if (!AllocateBuffer(pool, desc->Size(), &buf).ok()) return nullptr; + + return std::make_shared(desc, std::move(buf)); + } + + private: + std::shared_ptr desc_; + std::shared_ptr state_; +}; + +Status AggregateUnaryKernel::Call(FunctionContext* ctx, const Datum& input, Datum* out) { + if (!input.is_array()) return Status::Invalid("AggregateKernel expects Array datum"); + + auto state = ManagedAggregateState::Make(aggregate_function_, ctx->memory_pool()); + if (!state) return Status::OutOfMemory("AggregateState allocation failed"); + + auto array = input.make_array(); + RETURN_NOT_OK(aggregate_function_->Consume(*array, state->mutable_data())); + RETURN_NOT_OK(aggregate_function_->Finalize(state->mutable_data(), out)); + + return Status::OK(); +} + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregate.h b/cpp/src/arrow/compute/kernels/aggregate.h new file mode 100644 index 00000000000..2998cbccb61 --- /dev/null +++ b/cpp/src/arrow/compute/kernels/aggregate.h @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "arrow/compute/kernel.h" + +namespace arrow { + +class Array; +class Status; + +namespace compute { + +class FunctionContext; +struct Datum; + +/// AggregateFunction is an interface for Aggregates +/// +/// An aggregates transforms an array into single result called a state via the +/// Consume method.. State supports the merge operation via the Merge method. +/// State can be sealed into a final result via the Finalize method. +// +/// State ownership is handled by callers, thus the interface exposes 3 methods +/// for the caller to manage memory: +/// - Size +/// - New (placement new constructor invocation) +/// - Delete (state desctructor) +/// +/// Design borrowed from ClickHouse aggregate functions. +class AggregateFunction { + public: + /// \brief Consume an array into a state. + virtual Status Consume(const Array& input, void* state) const = 0; + + /// \brief Merge states. + virtual Status Merge(const void* src, void* dst) const = 0; + + /// \brief Convert state into a final result. + virtual Status Finalize(const void* src, Datum* output) const = 0; + + /// State management methods. 
+ virtual int64_t Size() const = 0; + virtual void New(void* ptr) const = 0; + virtual void Delete(void* ptr) const = 0; +}; + +/// AggregateFunction partial implementation for static type state +template +class AggregateFunctionStaticState : public AggregateFunction { + virtual Status Consume(const Array& input, State* state) const = 0; + virtual Status Merge(const State& src, State* dst) const = 0; + virtual Status Finalize(const State& src, Datum* output) const = 0; + + Status Consume(const Array& input, void* state) const final { + return Consume(input, static_cast(state)); + } + + Status Merge(const void* src, void* dst) const final { + return Merge(*static_cast(src), static_cast(dst)); + } + + /// \brief Convert state into a final result. + Status Finalize(const void* src, Datum* output) const final { + return Finalize(*static_cast(src), output); + } + + int64_t Size() const final { return sizeof(State); } + + void New(void* ptr) const final { + // By using placement-new syntax, the constructor of the State is invoked + // in the memory location defined by the caller. This only supports State + // with a parameter-less constructor. 
+ new (ptr) State; + } + + void Delete(void* ptr) const final { static_cast(ptr)->~State(); } +}; + +/// \brief UnaryKernel implemented by an AggregateState +class ARROW_EXPORT AggregateUnaryKernel : public UnaryKernel { + public: + explicit AggregateUnaryKernel(std::shared_ptr& aggregate) + : aggregate_function_(aggregate) {} + + Status Call(FunctionContext* ctx, const Datum& input, Datum* out) override; + + private: + std::shared_ptr aggregate_function_; +}; + +} // namespace compute +} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregation.cc b/cpp/src/arrow/compute/kernels/aggregation.cc deleted file mode 100644 index df81a4cd60e..00000000000 --- a/cpp/src/arrow/compute/kernels/aggregation.cc +++ /dev/null @@ -1,46 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include "arrow/compute/kernels/aggregation.h" - -#include "arrow/compute/kernels/monoid.h" -#include "arrow/status.h" - -namespace arrow { -namespace compute { - -Status AggregateUnaryKernel::Call(FunctionContext* ctx, const Datum& input, Datum* out) { - switch (input.kind()) { - case Datum::ARRAY: - RETURN_NOT_OK(state_->Consume(ctx, *input.make_array())); - break; - case Datum::CHUNKED_ARRAY: { - auto chunked = input.chunked_array(); - for (auto& array : chunked->chunks()) { - RETURN_NOT_OK(state_->Consume(ctx, *array)); - } - } break; - default: - return Status::Invalid( - "Aggregation Kernel expects an array-like (Array or ChunkedArray) datum"); - } - - return state_->Finalize(ctx, out); -} - -} // namespace compute -} // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/aggregation.h b/cpp/src/arrow/compute/kernels/aggregation.h deleted file mode 100644 index fdf68bf28ed..00000000000 --- a/cpp/src/arrow/compute/kernels/aggregation.h +++ /dev/null @@ -1,83 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#ifndef ARROW_COMPUTE_KERNELS_AGGREGATION_H -#define ARROW_COMPUTE_KERNELS_AGGREGATION_H - -#include -#include - -#include "arrow/compute/kernel.h" - -namespace arrow { - -class Array; -class Status; - -namespace compute { - -struct Datum; -class FunctionContext; - -/// \class AggregateState -/// \brief Interface for aggregate kernels. -/// -/// An AggregateState separate the concerns of kernel computation and parallel -/// scheduling. -/// -/// \code{.cpp} -/// // Loop can run in parallel. -/// for (array: input.chunks()) { -/// RETURN_NOT_OK(state->Consume(ctx, array)); -/// } -/// -/// return state->Finalize(ctx, out); -/// \endcode -class AggregateState { - public: - /// \brief Consume an array. - /// - /// \param[in] ctx Function context provided by the user. - /// \param[in] input Array to consume. - virtual Status Consume(FunctionContext* ctx, const Array& input) = 0; - - /// \brief Finalize the computation into a Datum. - /// - /// - /// - /// \param[in] ctx Function context provided by the user. - /// \param[out] out The output of the function. 
- virtual Status Finalize(FunctionContext* ctx, Datum* out) = 0; - - virtual ~AggregateState() {} -}; - -/// \brief UnaryKernel implemented by an AggregateState -class ARROW_EXPORT AggregateUnaryKernel : public UnaryKernel { - public: - explicit AggregateUnaryKernel(AggregateState* state) : state_(state) {} - - Status Call(FunctionContext* ctx, const Datum& input, Datum* out) override; - - private: - std::unique_ptr state_; -}; - -} // namespace compute -} // namespace arrow - -#endif // ARROW_COMPUTE_KERNELS_AGGREGATION_H diff --git a/cpp/src/arrow/compute/kernels/hash-test.cc b/cpp/src/arrow/compute/kernels/hash-test.cc index f20575f621b..17c8c9b5583 100644 --- a/cpp/src/arrow/compute/kernels/hash-test.cc +++ b/cpp/src/arrow/compute/kernels/hash-test.cc @@ -114,12 +114,8 @@ TYPED_TEST(TestHashKernelPrimitive, DictEncode) { TYPED_TEST(TestHashKernelPrimitive, PrimitiveResizeTable) { using T = typename TypeParam::c_type; - // Skip this test for (u)int8 - if (sizeof(Scalar) == 1) { - return; - } - const int64_t kTotalValues = 1000000; + const int64_t kTotalValues = std::min(INT16_MAX, 1UL << sizeof(T) / 2); const int64_t kRepeats = 5; vector values; diff --git a/cpp/src/arrow/compute/kernels/monoid-impl.h b/cpp/src/arrow/compute/kernels/monoid-impl.h deleted file mode 100644 index e8b9aff1cd8..00000000000 --- a/cpp/src/arrow/compute/kernels/monoid-impl.h +++ /dev/null @@ -1,117 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef ARROW_COMPUTE_KERNELS_MONOID_IMPL_H -#define ARROW_COMPUTE_KERNELS_MONOID_IMPL_H - -#include -#include -#include -#include - -#include "arrow/array.h" -#include "arrow/compute/kernel.h" -#include "arrow/compute/kernels/aggregation.h" -#include "arrow/compute/kernels/monoid.h" -#include "arrow/type_traits.h" -#include "arrow/util/bit-util.h" -#include "arrow/visitor_inline.h" - -namespace arrow { -namespace compute { - -template -class IdentityVisitor { - public: - using ValueType = typename MonoidType::ValueType; - - Status VisitValue(const ValueType& value) { - monoid_ += MonoidType(value); - return Status::OK(); - } - - Status VisitNull() { return Status::OK(); } - - MonoidType Value() const { return monoid_; } - - private: - MonoidType monoid_; -}; - -template > -class MonoidAggregateState : public AggregateState { - public: - Status Consume(FunctionContext* ctx, const Array& input) final { - auto data = input.data(); - MonoidVisitor visitor; - - RETURN_NOT_OK(ArrayDataVisitor::Visit(*data, &visitor)); - - { - // Merging the state must be protected by a mutex for concurrent access. - // The contention should be low assuming that the majority of time is - // spent in the preceding `Visit` call. 
- std::lock_guard guard(monoid_mutex_); - monoid_ += visitor.Value(); - } - - return Status::OK(); - }; - - Status Finalize(FunctionContext* ctx, Datum* out) final { - *out = Datum(Scalar(monoid_.value())); - return Status::OK(); - } - - private: - std::mutex monoid_mutex_; - MonoidType monoid_; -}; - -#define GET_MONOID_KERNEL_CASE(T, M) \ - case T::type_id: \ - kernel = \ - std::unique_ptr(new AggregateUnaryKernel(new M())); \ - break - -#define DEFINE_GET_MONOID_KERNEL(MonoidStateType) \ - static Status GetMonoidAggregateKernel( \ - FunctionContext* ctx, const DataType& type, \ - std::unique_ptr& kernel) { \ - switch (type.id()) { \ - GET_MONOID_KERNEL_CASE(UInt8Type, MonoidStateType); \ - GET_MONOID_KERNEL_CASE(Int8Type, MonoidStateType); \ - GET_MONOID_KERNEL_CASE(UInt16Type, MonoidStateType); \ - GET_MONOID_KERNEL_CASE(Int16Type, MonoidStateType); \ - GET_MONOID_KERNEL_CASE(UInt32Type, MonoidStateType); \ - GET_MONOID_KERNEL_CASE(Int32Type, MonoidStateType); \ - GET_MONOID_KERNEL_CASE(UInt64Type, MonoidStateType); \ - GET_MONOID_KERNEL_CASE(Int64Type, MonoidStateType); \ - GET_MONOID_KERNEL_CASE(FloatType, MonoidStateType); \ - GET_MONOID_KERNEL_CASE(DoubleType, MonoidStateType); \ - default: \ - return Status::Invalid("Unsupported type ", type.ToString()); \ - } \ - \ - return Status::OK(); \ - } - -} // namespace compute -} // namespace arrow - -#endif // ARROW_COMPUTE_KERNELS_MONOID_IMPL_H diff --git a/cpp/src/arrow/compute/kernels/monoid.h b/cpp/src/arrow/compute/kernels/monoid.h deleted file mode 100644 index 07f02f8d1a3..00000000000 --- a/cpp/src/arrow/compute/kernels/monoid.h +++ /dev/null @@ -1,86 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef ARROW_COMPUTE_KERNELS_MONOID_H -#define ARROW_COMPUTE_KERNELS_MONOID_H - -#include -#include -#include - -namespace arrow { -namespace compute { - -template -class Monoid { - public: - using ValueType = T; - using ThisType = Monoid; - - static constexpr ValueType identity() { return Identity(); } - - Monoid() {} - explicit Monoid(ValueType value) : value_(value) {} - - ThisType operator+(const ThisType& rhs) const { - return BinaryOp(this->value_, rhs.value_); - } - - ThisType& operator+=(const ThisType& rhs) { - this->value_ = BinaryOp(this->value_, rhs.value_); - return *this; - } - - ValueType value() const { return value_; } - - private: - ValueType value_ = identity(); -}; - -template -constexpr T zero() { - return static_cast(0); -} - -template -constexpr T add(const T& lhs, const T& rhs) { - return static_cast(lhs + rhs); -} - -template -using SumMonoid = Monoid, zero>; - -template -constexpr T min(const T& lhs, const T& rhs) { - return std::min(lhs, rhs); -} - -template -using MinMonoid = Monoid, std::numeric_limits::max>; - -template -constexpr T max(const T& lhs, const T& rhs) { - return std::max(lhs, rhs); -} - -template -using MaxMonoid = Monoid, std::numeric_limits::lowest>; - -} // namespace compute -} // namespace arrow - -#endif // ARROW_COMPUTE_KERNELS_MONOID_H diff --git a/cpp/src/arrow/compute/kernels/sum-benchmark.cc 
b/cpp/src/arrow/compute/kernels/sum-benchmark.cc index 079abd98368..6cedd39e673 100644 --- a/cpp/src/arrow/compute/kernels/sum-benchmark.cc +++ b/cpp/src/arrow/compute/kernels/sum-benchmark.cc @@ -18,10 +18,18 @@ #include "benchmark/benchmark.h" #include +#ifdef _MSC_VER +#include +#else +#include +#endif #include "arrow/builder.h" #include "arrow/memory_pool.h" +#include "arrow/test-random.h" #include "arrow/test-util.h" +#include "arrow/util/bit-util.h" +#include "arrow/util/cpu-info.h" #include "arrow/compute/context.h" #include "arrow/compute/kernel.h" @@ -30,31 +38,335 @@ namespace arrow { namespace compute { -static void BenchmarkSum(benchmark::State& state) { // NOLINT non-const reference - const int64_t iterations = 1UL << 12; +#include +#include +#include +#include - std::vector values; - std::vector is_valid; - for (int64_t i = 0; i < iterations; i++) { - for (int64_t j = 0; j < i; j++) { - is_valid.push_back(true); - values.push_back(j); +using internal::CpuInfo; +static CpuInfo* cpu_info = CpuInfo::GetInstance(); + +static const int64_t kL1Size = cpu_info->CacheSize(CpuInfo::L1_CACHE); +static const int64_t kL2Size = cpu_info->CacheSize(CpuInfo::L2_CACHE); +static const int64_t kL3Size = cpu_info->CacheSize(CpuInfo::L3_CACHE); + +namespace BitUtil = arrow::BitUtil; +using arrow::internal::BitmapReader; + +template +struct SumState { + using ValueType = T; + + SumState() : total(0), valid_count(0) {} + + T total = 0; + int64_t valid_count = 0; +}; + +template +struct Traits {}; + +template <> +struct Traits { + using ArrayType = typename CTypeTraits::ArrayType; + static constexpr int64_t null_sentinel = std::numeric_limits::lowest(); + + static void FixSentinel(std::shared_ptr& array) { + auto data = array->data(); + for (int64_t i = 0; i < array->length(); i++) + if (array->IsNull(i)) { + int64_t* val_ptr = data->GetMutableValues(1, i); + *val_ptr = null_sentinel; + } + } + + static inline bool IsNull(int64_t val) { return val == null_sentinel; } + + 
static inline bool NotNull(int64_t val) { return val != null_sentinel; } +}; + +template +struct Summer { + public: + using ValueType = T; + using ArrowType = typename CTypeTraits::ArrowType; +}; + +template +struct SumNoNulls : public Summer { + using ArrayType = typename CTypeTraits::ArrayType; + + static void Sum(const ArrayType& array, SumState* state) { + SumState local; + + const auto values = array.raw_values(); + for (int64_t i = 0; i < array.length(); ++i) { + local.total += values[i]; } + + local.valid_count = array.length(); + *state = local; } +}; - std::shared_ptr arr; - ArrayFromVector(is_valid, values, &arr); +template +struct SumNoNullsUnrolled : public Summer { + using ArrayType = typename CTypeTraits::ArrayType; - FunctionContext ctx; + static void Sum(const ArrayType& array, SumState* state) { + SumState local; + + const auto values = array.raw_values(); + const auto length = array.length(); + const int64_t length_rounded = BitUtil::RoundDown(length, 8); + for (int64_t i = 0; i < length_rounded; i += 8) { + local.total += values[i + 0] + values[i + 1] + values[i + 2] + values[i + 3] + + values[i + 4] + values[i + 5] + values[i + 6] + values[i + 7]; + } + + for (int64_t i = length_rounded; i < length; ++i) { + local.total += values[i]; + } + + local.valid_count = length; + + *state = local; + } +}; + +template +struct SumSentinel : public Summer { + using ArrayType = typename CTypeTraits::ArrayType; + + static void Sum(const ArrayType& array, SumState* state) { + SumState local; + + const auto values = array.raw_values(); + const auto length = array.length(); + for (int64_t i = 0; i < length; i++) { + if (Traits::NotNull(values[i])) { + // NaN is not equal to itself + local.total += values[i]; + local.valid_count++; + } + } + + *state = local; + } +}; + +template +struct SumSentinelUnrolled : public Summer { + using ArrayType = typename CTypeTraits::ArrayType; + + static void Sum(const ArrayType& array, SumState* state) { + SumState local; + 
+#define SUM_NOT_NULL(ITEM) \ + do { \ + if (Traits::NotNull(values[i + ITEM])) { \ + local.total += values[i + ITEM]; \ + local.valid_count++; \ + } \ + } while (0) + + const auto values = array.raw_values(); + const auto length = array.length(); + const int64_t length_rounded = BitUtil::RoundDown(length, 8); + for (int64_t i = 0; i < length_rounded; i += 8) { + SUM_NOT_NULL(0); + SUM_NOT_NULL(1); + SUM_NOT_NULL(2); + SUM_NOT_NULL(3); + SUM_NOT_NULL(4); + SUM_NOT_NULL(5); + SUM_NOT_NULL(6); + SUM_NOT_NULL(7); + } + +#undef SUM_NOT_NULL + + for (int64_t i = length_rounded * 8; i < length; ++i) { + if (Traits::NotNull(values[i])) { + local.total += values[i]; + ++local.valid_count; + } + } + + *state = local; + } +}; + +template +struct SumBitmapNaive : public Summer { + using ArrayType = typename CTypeTraits::ArrayType; + + static void Sum(const ArrayType& array, SumState* state) { + SumState local; + const auto values = array.raw_values(); + const auto bitmap = array.null_bitmap_data(); + const auto length = array.length(); + + for (int64_t i = 0; i < length; ++i) { + if (BitUtil::GetBit(bitmap, i)) { + local.total += values[i]; + ++local.valid_count; + } + } + + *state = local; + } +}; + +template +struct SumBitmapReader : public Summer { + using ArrayType = typename CTypeTraits::ArrayType; + + static void Sum(const ArrayType& array, SumState* state) { + SumState local; + + const auto values = array.raw_values(); + const auto bitmap = array.null_bitmap_data(); + const auto length = array.length(); + BitmapReader bit_reader(bitmap, 0, length); + for (int64_t i = 0; i < length; ++i) { + if (bit_reader.IsSet()) { + local.total += values[i]; + ++local.valid_count; + } + + bit_reader.Next(); + } + + *state = local; + } +}; + +// Generated with the following Python code + +// output = 'static constexpr uint8_t kBytePopcount[] = {{{0}}};' +// popcounts = [str(bin(i).count('1')) for i in range(0, 256)] +// print(output.format(', '.join(popcounts))) + +static constexpr 
uint8_t kBytePopcount[] = { + 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, + 4, 4, 5, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, + 4, 5, 4, 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, + 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, + 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, + 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, + 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, + 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}; + +template +struct SumBitmapVectorizeUnroll : public Summer { + using ArrayType = typename CTypeTraits::ArrayType; + + static void Sum(const ArrayType& array, SumState* state) { + SumState local; + + const auto values = array.raw_values(); + const auto bitmap = array.null_bitmap_data(); + const auto length = array.length(); + const int64_t length_rounded = BitUtil::RoundDown(length, 8); + for (int64_t i = 0; i < length_rounded; i += 8) { + const uint8_t valid_byte = bitmap[i / 8]; + +#define SUM_SHIFT(ITEM) (values[i + ITEM] * ((valid_byte >> ITEM) & 1)) + + if (valid_byte < 0xFF) { + // Some nulls + local.total += SUM_SHIFT(0); + local.total += SUM_SHIFT(1); + local.total += SUM_SHIFT(2); + local.total += SUM_SHIFT(3); + local.total += SUM_SHIFT(4); + local.total += SUM_SHIFT(5); + local.total += SUM_SHIFT(6); + local.total += SUM_SHIFT(7); + local.valid_count += kBytePopcount[valid_byte]; + } else { + // No nulls + local.total += values[i + 0] + values[i + 1] + values[i + 2] + values[i + 3] + + values[i + 4] + values[i + 5] + values[i + 6] + values[i + 7]; + local.valid_count += 8; + } + } + +#undef SUM_SHIFT + + for (int64_t i = length_rounded; i < length; ++i) 
{ + if (BitUtil::GetBit(bitmap, i)) { + local.total = values[i]; + ++local.valid_count; + } + } + + *state = local; + } +}; + +template +void BenchSum(benchmark::State& state) { + using T = typename Functor::ValueType; + + const int64_t array_size = state.range(0) / sizeof(int64_t); + const double null_percent = state.range(1) / 100.0; + auto rand = random::RandomArrayGenerator(1923); + auto array = std::static_pointer_cast>( + rand.Int64(array_size, -100, 100, null_percent)); + + Traits::FixSentinel(array); + + while (state.KeepRunning()) { + SumState sum_state; + Functor::Sum(*array, &sum_state); + benchmark::DoNotOptimize(sum_state); + } + + state.counters["size"] = state.range(0); + state.counters["null_percent"] = state.range(1); + state.SetBytesProcessed(state.iterations() * array_size * sizeof(T)); +} + +static void SetArgs(benchmark::internal::Benchmark* bench) { + bench->Unit(benchmark::kMicrosecond); + + for (auto size : {kL1Size, kL2Size, kL3Size, kL3Size * 4}) + for (auto nulls : std::vector({0, 1, 10, 50})) + bench->Args({static_cast(size), nulls}); +} + +BENCHMARK_TEMPLATE(BenchSum, SumNoNulls)->Apply(SetArgs); +BENCHMARK_TEMPLATE(BenchSum, SumNoNullsUnrolled)->Apply(SetArgs); +BENCHMARK_TEMPLATE(BenchSum, SumSentinel)->Apply(SetArgs); +BENCHMARK_TEMPLATE(BenchSum, SumSentinelUnrolled)->Apply(SetArgs); +BENCHMARK_TEMPLATE(BenchSum, SumBitmapNaive)->Apply(SetArgs); +BENCHMARK_TEMPLATE(BenchSum, SumBitmapReader)->Apply(SetArgs); +BENCHMARK_TEMPLATE(BenchSum, SumBitmapVectorizeUnroll)->Apply(SetArgs); + +static void BenchSumKernel(benchmark::State& state) { + const int64_t array_size = state.range(0) / sizeof(int64_t); + const double null_percent = state.range(1) / 100.0; + auto rand = random::RandomArrayGenerator(1923); + auto array = std::static_pointer_cast>( + rand.Int64(array_size, -100, 100, null_percent)); + + FunctionContext ctx; while (state.KeepRunning()) { Datum out; - ABORT_NOT_OK(Sum(&ctx, Datum(arr), &out)); + ABORT_NOT_OK(Sum(&ctx, 
Datum(array), &out)); + benchmark::DoNotOptimize(out); } - state.SetBytesProcessed(state.iterations() * values.size() * sizeof(int64_t)); + + state.counters["size"] = state.range(0); + state.counters["null_percent"] = state.range(1); + state.SetBytesProcessed(state.iterations() * array_size * sizeof(int64_t)); } -BENCHMARK(BenchmarkSum)->MinTime(1.0)->Unit(benchmark::kMicrosecond); +BENCHMARK(BenchSumKernel)->Apply(SetArgs); } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/kernels/sum-test.cc b/cpp/src/arrow/compute/kernels/sum-test.cc index 7ce71ab0745..a354206ef7b 100644 --- a/cpp/src/arrow/compute/kernels/sum-test.cc +++ b/cpp/src/arrow/compute/kernels/sum-test.cc @@ -20,6 +20,7 @@ #include "arrow/array.h" #include "arrow/test-common.h" +#include "arrow/test-random.h" #include "arrow/test-util.h" #include "arrow/type.h" @@ -33,44 +34,102 @@ using std::vector; namespace arrow { namespace compute { -template -void CheckSum(FunctionContext* ctx, const Array& input, CType expected) { +constexpr double kArbitraryDoubleErrorBound = 1.0; + +template +void ValidateSum(FunctionContext* ctx, const Array& input, Datum expected) { + using CType = typename ArrowType::c_type; + Datum result; ASSERT_OK(Sum(ctx, input, &result)); // Ensure Datum is Scalar of proper type. 
- ASSERT_EQ(result.kind(), Datum::SCALAR); - auto type = TypeTraits::type_singleton(); - ASSERT_EQ(result.scalar().kind(), type->id()); + ASSERT_EQ(result.kind(), expected.kind()); + + if (result.kind() == Datum::SCALAR) { + ASSERT_EQ(result.scalar().kind(), expected.scalar().kind()); + switch (result.scalar().kind()) { + case Type::FLOAT: + case Type::DOUBLE: + ASSERT_NEAR(util::get(result.scalar().value), + util::get(expected.scalar().value), + kArbitraryDoubleErrorBound); + break; + default: + ASSERT_EQ(util::get(result.scalar().value), + util::get(expected.scalar().value)); + break; + } + } +} - ASSERT_EQ(util::get(result.scalar().value), expected); +template +void ValidateSum(FunctionContext* ctx, const char* json, Datum expected) { + auto array = ArrayFromJSON(TypeTraits::type_singleton(), json); + ValidateSum(ctx, *array, expected); } -template -void CheckSum(FunctionContext* ctx, const std::string& json, CType expected) { - Datum result; - auto array = ArrayFromJSON(TypeTraits::type_singleton(), json); +template +static Datum DummySum(const Array& array) { + using CType = typename ArrowType::c_type; + using ArrayType = typename TypeTraits::ArrayType; + + CType sum = 0; + int64_t count = 0; + + const auto& array_numeric = reinterpret_cast(array); + const auto values = array_numeric.raw_values(); + const auto bitmap = array.null_bitmap_data(); + for (int64_t i = 0; i < array.length(); i++) { + if (BitUtil::GetBit(bitmap, i)) { + sum += values[i]; + count++; + } + } + + return (count > 0) ? 
Datum(Scalar(static_cast(sum))) : Datum(); +} - CheckSum(ctx, *array, expected); +template +void ValidateSum(FunctionContext* ctx, const Array& array) { + ValidateSum(ctx, array, DummySum(array)); } -template +template class TestSumKernelNumeric : public ComputeFixture, public TestBase {}; typedef ::testing::Types - NumericTypes; - -TYPED_TEST_CASE(TestSumKernelNumeric, NumericTypes); + NumericArrowTypes; +TYPED_TEST_CASE(TestSumKernelNumeric, NumericArrowTypes); TYPED_TEST(TestSumKernelNumeric, SimpleSum) { using CType = typename TypeParam::c_type; - CheckSum(&this->ctx_, "[0, 1, 2, 3, 4, 5]", 5 * 6 / 2); + ValidateSum(&this->ctx_, "[0, 1, 2, 3, 4, 5]", + Datum(Scalar(static_cast(5 * 6 / 2)))); // Avoid this tests for (U)Int8Type if (sizeof(CType) > 1) - CheckSum(&this->ctx_, "[1000, null, 300, null, 30, null, 7]", 1337); + ValidateSum(&this->ctx_, "[1000, null, 300, null, 30, null, 7]", + Datum(Scalar(static_cast(1337)))); +} + +template +class TestRandomSumKernelNumeric : public ComputeFixture, public TestBase {}; + +TYPED_TEST_CASE(TestRandomSumKernelNumeric, NumericArrowTypes); +TYPED_TEST(TestRandomSumKernelNumeric, RandomArraySum) { + auto rand = random::RandomArrayGenerator(0x5487655); + for (size_t i = 5; i < 14; i++) { + for (auto null_probability : {0.0, 0.01, 0.1, 0.25, 0.5, 1.0}) { + for (auto length_offset : {-2, -1, 0, 1, 2}) { + int64_t length = (1UL << i) + length_offset; + auto array = rand.Numeric(length, 0, 100, null_probability); + ValidateSum(&this->ctx_, *array); + } + } + } } } // namespace compute diff --git a/cpp/src/arrow/compute/kernels/sum.cc b/cpp/src/arrow/compute/kernels/sum.cc index 3c4941cf4db..573f8b7e280 100644 --- a/cpp/src/arrow/compute/kernels/sum.cc +++ b/cpp/src/arrow/compute/kernels/sum.cc @@ -19,26 +19,172 @@ #include "arrow/array.h" #include "arrow/compute/kernel.h" -#include "arrow/compute/kernels/aggregation.h" -#include "arrow/compute/kernels/monoid-impl.h" -#include "arrow/compute/kernels/monoid.h" +#include 
"arrow/compute/kernels/aggregate.h" #include "arrow/type_traits.h" #include "arrow/util/bit-util.h" +#include "arrow/util/logging.h" #include "arrow/visitor_inline.h" namespace arrow { namespace compute { -template -using SumMonoidType = SumMonoid; +template +struct SumState { + SumState operator+(const SumState& rhs) const { + return SumState(this->count_ + rhs.count_, this->sum_ + rhs.sum_); + } -template -using SumAggregateState = MonoidAggregateState>; + SumState& operator+=(const SumState& rhs) { + this->count_ += rhs.count_; + this->sum_ += rhs.sum_; -DEFINE_GET_MONOID_KERNEL(SumAggregateState) + return *this; + } + + size_t count_ = 0; + CType sum_ = 0; +}; + +// Generated with the following Python code + +// output = 'static constexpr uint8_t kBytePopcount[] = {{{0}}};' +// popcounts = [str(bin(i).count('1')) for i in range(0, 256)] +// print(output.format(', '.join(popcounts))) + +static constexpr uint8_t kBytePopcount[] = { + 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, + 4, 4, 5, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, + 4, 5, 4, 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, + 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, + 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, + 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, + 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, + 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}; + +template +class SumAggregateFunction final : public AggregateFunctionStaticState> { + using State = SumState; + using ArrayType = typename TypeTraits::ArrayType; + + public: + Status Consume(const Array& input, State* state) const override { + const ArrayType& 
array = static_cast(input); + + if (input.null_count() > 0) + *state = ConsumeSparse(array); + else + *state = ConsumeDense(array); + + return Status::OK(); + } + + Status Merge(const State& src, State* dst) const override { + *dst += src; + return Status::OK(); + } + + Status Finalize(const State& src, Datum* output) const override { + *output = (src.count_ > 0) ? Datum(Scalar(src.sum_)) : Datum(); + return Status::OK(); + } + + private: + State ConsumeDense(const ArrayType& array) const { + State local; + + const auto values = array.raw_values(); + for (int64_t i = 0; i < array.length(); i++) { + local.sum_ += values[i]; + } + + local.count_ = array.length(); + + return local; + } + + State ConsumeSparse(const ArrayType& array) const { + State local; + + // TODO(fsaintjacques): This fails on slice not byte-aligned. + DCHECK_EQ(array.offset() % 8, 0); + + const auto values = array.raw_values(); + const auto bitmap = array.null_bitmap_data() + BitUtil::RoundDown(array.offset(), 8); + const auto length = array.length(); + const int64_t length_rounded = BitUtil::RoundDown(length, 8); + + for (int64_t i = 0; i < length_rounded; i += 8) { + const uint8_t valid_byte = bitmap[i / 8]; + if (valid_byte < 0xFF) { +#define SUM_SHIFT(ITEM) (values[i + ITEM] * ((valid_byte >> ITEM) & 1)) + // Some nulls + local.sum_ += SUM_SHIFT(0); + local.sum_ += SUM_SHIFT(1); + local.sum_ += SUM_SHIFT(2); + local.sum_ += SUM_SHIFT(3); + local.sum_ += SUM_SHIFT(4); + local.sum_ += SUM_SHIFT(5); + local.sum_ += SUM_SHIFT(6); + local.sum_ += SUM_SHIFT(7); + local.count_ += kBytePopcount[valid_byte]; +#undef SUM_SHIFT + } else { + // No nulls + local.sum_ += values[i + 0] + values[i + 1] + values[i + 2] + values[i + 3] + + values[i + 4] + values[i + 5] + values[i + 6] + values[i + 7]; + local.count_ += 8; + } + } + + for (int64_t i = length_rounded; i < length; ++i) { + if (BitUtil::GetBit(bitmap, i)) { + local.sum_ += values[i]; + local.count_++; + } + } + + return local; + } +}; + +#define 
SUM_AGG_FN_CASE(T) \ + case T::type_id: \ + return std::static_pointer_cast( \ + std::make_shared>()); + +std::shared_ptr MakeSumAggregateFunction(const DataType& type, + FunctionContext* ctx) { + switch (type.id()) { + SUM_AGG_FN_CASE(UInt8Type); + SUM_AGG_FN_CASE(Int8Type); + SUM_AGG_FN_CASE(UInt16Type); + SUM_AGG_FN_CASE(Int16Type); + SUM_AGG_FN_CASE(UInt32Type); + SUM_AGG_FN_CASE(Int32Type); + SUM_AGG_FN_CASE(UInt64Type); + SUM_AGG_FN_CASE(Int64Type); + SUM_AGG_FN_CASE(FloatType); + SUM_AGG_FN_CASE(DoubleType); + default: + return nullptr; + } + +#undef SUM_AGG_FN_CASE +} + +static Status GetSumKernel(FunctionContext* ctx, const DataType& type, + std::shared_ptr& kernel) { + std::shared_ptr aggregate = MakeSumAggregateFunction(type, ctx); + if (!aggregate) return Status::Invalid("No sum for type ", type); + + kernel = std::make_shared(aggregate); + + return Status::OK(); +} Status Sum(FunctionContext* ctx, const Datum& value, Datum* out) { - std::unique_ptr kernel; + std::shared_ptr kernel; auto data_type = value.type(); if (data_type == nullptr) @@ -46,7 +192,7 @@ Status Sum(FunctionContext* ctx, const Datum& value, Datum* out) { else if (!is_integer(data_type->id()) && !is_floating(data_type->id())) return Status::Invalid("Datum must contain a NumericType"); - RETURN_NOT_OK(GetMonoidAggregateKernel(ctx, *data_type, kernel)); + RETURN_NOT_OK(GetSumKernel(ctx, *data_type, kernel)); return kernel->Call(ctx, value, out); } diff --git a/cpp/src/arrow/compute/kernels/sum.h b/cpp/src/arrow/compute/kernels/sum.h index 4b8ab0bafab..c7be9f42147 100644 --- a/cpp/src/arrow/compute/kernels/sum.h +++ b/cpp/src/arrow/compute/kernels/sum.h @@ -26,11 +26,16 @@ namespace arrow { class Array; +class DataType; namespace compute { struct Datum; class FunctionContext; +class AggregateFunction; + +std::shared_ptr MakeSumAggregateFunction(const DataType& type, + FunctionContext* context); /// \brief Sum values of a numeric array. 
/// diff --git a/cpp/src/arrow/io/memory-benchmark.cc b/cpp/src/arrow/io/memory-benchmark.cc index b36be4de163..ed808ccdf60 100644 --- a/cpp/src/arrow/io/memory-benchmark.cc +++ b/cpp/src/arrow/io/memory-benchmark.cc @@ -32,49 +32,129 @@ namespace arrow { -static const int kNumCores = internal::CpuInfo::GetInstance()->num_cores(); +using internal::CpuInfo; +static CpuInfo* cpu_info = CpuInfo::GetInstance(); + +static const int kNumCores = cpu_info->num_cores(); +static const int64_t kL1Size = cpu_info->CacheSize(CpuInfo::L1_CACHE); +static const int64_t kL2Size = cpu_info->CacheSize(CpuInfo::L2_CACHE); +static const int64_t kL3Size = cpu_info->CacheSize(CpuInfo::L3_CACHE); + constexpr size_t kMemoryPerCore = 32 * 1024 * 1024; using BufferPtr = std::shared_ptr; +#ifdef ARROW_AVX512 + +using VectorType = __m512i; +#define VectorSet _mm512_set1_epi32 +#define VectorLoad _mm512_stream_load_si512 +#define VectorLoadAsm(SRC, DST) \ + asm volatile("vmovaps %[src], %[dst]" : [dst] "=v"(DST) : [src] "m"(SRC) :) +#define VectorStreamLoad _mm512_stream_load_si512 +#define VectorStreamLoadAsm(SRC, DST) \ + asm volatile("vmovntdqa %[src], %[dst]" : [dst] "=v"(DST) : [src] "m"(SRC) :) +#define VectorStreamWrite _mm512_stream_si512 + +#else + +#ifdef ARROW_AVX2 + +using VectorType = __m256i; +#define VectorSet _mm256_set1_epi32 +#define VectorLoad _mm256_stream_load_si256 +#define VectorLoadAsm(SRC, DST) \ + asm volatile("vmovaps %[src], %[dst]" : [dst] "=v"(DST) : [src] "m"(SRC) :) +#define VectorStreamLoad _mm256_stream_load_si256 +#define VectorStreamLoadAsm(SRC, DST) \ + asm volatile("vmovntdqa %[src], %[dst]" : [dst] "=v"(DST) : [src] "m"(SRC) :) +#define VectorStreamWrite _mm256_stream_si256 + +#else + using VectorType = __m128i; +#define VectorSet _mm_set1_epi32 +#define VectorLoad _mm_stream_load_si128 +#define VectorLoadAsm(SRC, DST) \ + asm volatile("movaps %[src], %[dst]" : [dst] "=x"(DST) : [src] "m"(SRC) :) +#define VectorStreamLoad _mm_stream_load_si128 +#define 
VectorStreamLoadAsm(SRC, DST) \ + asm volatile("movntdqa %[src], %[dst]" : [dst] "=x"(DST) : [src] "m"(SRC) :) +#define VectorStreamWrite _mm_stream_si128 + +#endif +#endif + +static void Read(void* src, void* dst, size_t size) { + const auto simd = static_cast(src); + VectorType a, b, c, d; + (void)dst; + + for (size_t i = 0; i < size / sizeof(VectorType); i += 4) { + VectorLoadAsm(simd[i], a); + VectorLoadAsm(simd[i + 1], b); + VectorLoadAsm(simd[i + 2], c); + VectorLoadAsm(simd[i + 3], d); + } + + benchmark::DoNotOptimize(a + b + c + d); +} // See http://codearcana.com/posts/2013/05/18/achieving-maximum-memory-bandwidth.html // for the usage of stream loads/writes. Or section 6.1, page 47 of // https://akkadia.org/drepper/cpumemory.pdf . - -static void Read(void* src, void* dst, size_t size) { +static void StreamRead(void* src, void* dst, size_t size) { auto simd = static_cast(src); + VectorType a, b, c, d; (void)dst; - for (size_t i = 0; i < size / sizeof(VectorType); i++) - benchmark::DoNotOptimize(_mm_stream_load_si128(&simd[i])); + for (size_t i = 0; i < size / sizeof(VectorType); i += 4) { + VectorStreamLoadAsm(simd[i], a); + VectorStreamLoadAsm(simd[i + 1], b); + VectorStreamLoadAsm(simd[i + 2], c); + VectorStreamLoadAsm(simd[i + 3], d); + } + + benchmark::DoNotOptimize(a + b + c + d); } -static void Write(void* src, void* dst, size_t size) { +static void StreamWrite(void* src, void* dst, size_t size) { auto simd = static_cast(dst); - const VectorType ones = _mm_set1_epi32(1); + const VectorType ones = VectorSet(1); (void)src; - for (size_t i = 0; i < size / sizeof(VectorType); i++) _mm_stream_si128(&simd[i], ones); + for (size_t i = 0; i < size / sizeof(VectorType); i += 4) { + VectorStreamWrite(&simd[i], ones); + VectorStreamWrite(&simd[i + 1], ones); + VectorStreamWrite(&simd[i + 2], ones); + VectorStreamWrite(&simd[i + 3], ones); + } } -static void ReadWrite(void* src, void* dst, size_t size) { +static void StreamReadWrite(void* src, void* dst, size_t 
size) { auto src_simd = static_cast(src); auto dst_simd = static_cast(dst); - for (size_t i = 0; i < size / sizeof(VectorType); i++) - _mm_stream_si128(&dst_simd[i], _mm_stream_load_si128(&src_simd[i])); + for (size_t i = 0; i < size / sizeof(VectorType); i += 4) { + VectorStreamWrite(&dst_simd[i], VectorStreamLoad(&src_simd[i])); + VectorStreamWrite(&dst_simd[i + 1], VectorStreamLoad(&src_simd[i + 1])); + VectorStreamWrite(&dst_simd[i + 2], VectorStreamLoad(&src_simd[i + 2])); + VectorStreamWrite(&dst_simd[i + 3], VectorStreamLoad(&src_simd[i + 3])); + } +} + +static void PlatformMemcpy(void* src, void* dst, size_t size) { + memcpy(src, dst, size); } using ApplyFn = decltype(Read); template static void MemoryBandwidth(benchmark::State& state) { // NOLINT non-const reference - const size_t buffer_size = kMemoryPerCore; + const size_t buffer_size = state.range(0); BufferPtr src, dst; - ABORT_NOT_OK(AllocateBuffer(buffer_size, &src)); ABORT_NOT_OK(AllocateBuffer(buffer_size, &dst)); + ABORT_NOT_OK(AllocateBuffer(buffer_size, &src)); random_bytes(buffer_size, 0, src->mutable_data()); while (state.KeepRunning()) { @@ -84,11 +164,29 @@ static void MemoryBandwidth(benchmark::State& state) { // NOLINT non-const refe state.SetBytesProcessed(state.iterations() * buffer_size); } -// `UseRealTime` is required due to threads, otherwise the cumulative CPU time -// is used which will skew the results by the number of threads. 
-BENCHMARK_TEMPLATE(MemoryBandwidth, Read)->ThreadRange(1, kNumCores)->UseRealTime(); -BENCHMARK_TEMPLATE(MemoryBandwidth, Write)->ThreadRange(1, kNumCores)->UseRealTime(); -BENCHMARK_TEMPLATE(MemoryBandwidth, ReadWrite)->ThreadRange(1, kNumCores)->UseRealTime(); +static void SetCacheBandwidthArgs(benchmark::internal::Benchmark* bench) { + auto cache_sizes = {kL1Size, kL2Size, kL3Size}; + for (auto size : cache_sizes) { + bench->Arg(size / 2); + bench->Arg(size); + bench->Arg(size * 2); + } + + bench->ArgName("size"); +} + +BENCHMARK_TEMPLATE(MemoryBandwidth, Read)->Apply(SetCacheBandwidthArgs); + +static void SetMemoryBandwidthArgs(benchmark::internal::Benchmark* bench) { + // `UseRealTime` is required due to threads, otherwise the cumulative CPU time + // is used which will skew the results by the number of threads. + bench->Arg(kMemoryPerCore)->ThreadRange(1, kNumCores)->UseRealTime(); +} + +BENCHMARK_TEMPLATE(MemoryBandwidth, StreamRead)->Apply(SetMemoryBandwidthArgs); +BENCHMARK_TEMPLATE(MemoryBandwidth, StreamWrite)->Apply(SetMemoryBandwidthArgs); +BENCHMARK_TEMPLATE(MemoryBandwidth, StreamReadWrite)->Apply(SetMemoryBandwidthArgs); +BENCHMARK_TEMPLATE(MemoryBandwidth, PlatformMemcpy)->Apply(SetMemoryBandwidthArgs); static void ParallelMemoryCopy(benchmark::State& state) { // NOLINT non-const reference const int64_t n_threads = state.range(0); @@ -107,9 +205,12 @@ static void ParallelMemoryCopy(benchmark::State& state) { // NOLINT non-const r } state.SetBytesProcessed(int64_t(state.iterations()) * buffer_size); - state.counters["threads"] = static_cast(n_threads); } -BENCHMARK(ParallelMemoryCopy)->RangeMultiplier(2)->Range(1, kNumCores)->UseRealTime(); +BENCHMARK(ParallelMemoryCopy) + ->RangeMultiplier(2) + ->Range(1, kNumCores) + ->ArgName("threads") + ->UseRealTime(); } // namespace arrow diff --git a/cpp/src/arrow/test-random.h b/cpp/src/arrow/test-random.h index dc57dcab025..2f820321ca6 100644 --- a/cpp/src/arrow/test-random.h +++ 
b/cpp/src/arrow/test-random.h @@ -22,6 +22,7 @@ #include #include +#include "arrow/type.h" #include "arrow/util/visibility.h" namespace arrow { @@ -158,6 +159,35 @@ class ARROW_EXPORT RandomArrayGenerator { std::shared_ptr Float64(int64_t size, double min, double max, double null_probability); + template + std::shared_ptr Numeric(int64_t size, CType min, CType max, + double null_probability) { + switch (ArrowType::type_id) { + case Type::UINT8: + return UInt8(size, min, max, null_probability); + case Type::INT8: + return Int8(size, min, max, null_probability); + case Type::UINT16: + return UInt16(size, min, max, null_probability); + case Type::INT16: + return Int16(size, min, max, null_probability); + case Type::UINT32: + return UInt32(size, min, max, null_probability); + case Type::INT32: + return Int32(size, min, max, null_probability); + case Type::UINT64: + return UInt64(size, min, max, null_probability); + case Type::INT64: + return Int64(size, min, max, null_probability); + case Type::FLOAT: + return Float32(size, min, max, null_probability); + case Type::DOUBLE: + return Float64(size, min, max, null_probability); + default: + return nullptr; + } + } + private: SeedType seed() { return seed_distribution_(seed_rng_); } diff --git a/cpp/src/arrow/util/bit-util.h b/cpp/src/arrow/util/bit-util.h index bfdb44f255c..0d671f01ee1 100644 --- a/cpp/src/arrow/util/bit-util.h +++ b/cpp/src/arrow/util/bit-util.h @@ -117,6 +117,11 @@ constexpr int64_t RoundUp(int64_t value, int64_t factor) { return (value + (factor - 1)) / factor * factor; } +// Returns 'value' rounded down to the nearest multiple of 'factor' +constexpr int64_t RoundDown(int64_t value, int64_t factor) { + return (value / factor) * factor; +} + // Returns 'value' rounded up to the nearest multiple of 'factor' when factor // is a power of two. // The result is undefined on overflow, i.e. 
if `value > 2**64 - factor`, From 0ff3bae9d3f837ffee72ab887bd2354796cd9e49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Thu, 7 Feb 2019 12:33:16 -0500 Subject: [PATCH 07/14] lint and format --- cpp/src/arrow/compute/kernels/aggregate.cc | 4 +++- cpp/src/arrow/io/memory-benchmark.cc | 4 +--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate.cc b/cpp/src/arrow/compute/kernels/aggregate.cc index 655dfb59734..e1e2dd942a1 100644 --- a/cpp/src/arrow/compute/kernels/aggregate.cc +++ b/cpp/src/arrow/compute/kernels/aggregate.cc @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include "arrow/compute/kernels/aggregate.h" +#include + #include "arrow/compute/context.h" +#include "arrow/compute/kernels/aggregate.h" namespace arrow { namespace compute { diff --git a/cpp/src/arrow/io/memory-benchmark.cc b/cpp/src/arrow/io/memory-benchmark.cc index ed808ccdf60..2d0c803952c 100644 --- a/cpp/src/arrow/io/memory-benchmark.cc +++ b/cpp/src/arrow/io/memory-benchmark.cc @@ -142,9 +142,7 @@ static void StreamReadWrite(void* src, void* dst, size_t size) { } } -static void PlatformMemcpy(void* src, void* dst, size_t size) { - memcpy(src, dst, size); -} +static void PlatformMemcpy(void* src, void* dst, size_t size) { memcpy(src, dst, size); } using ApplyFn = decltype(Read); From cadc7099aaca921f5e5538ab0ed672b16511c772 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Thu, 7 Feb 2019 12:43:18 -0500 Subject: [PATCH 08/14] Remove mutex header --- cpp/src/arrow/compute/kernels/aggregate.h | 1 - 1 file changed, 1 deletion(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate.h b/cpp/src/arrow/compute/kernels/aggregate.h index 2998cbccb61..6e520bdcd80 100644 --- a/cpp/src/arrow/compute/kernels/aggregate.h +++ b/cpp/src/arrow/compute/kernels/aggregate.h @@ -18,7 +18,6 @@ #pragma once #include -#include #include 
"arrow/compute/kernel.h" From f062737f69c2be0d683ad7b97a9741838d9e0155 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Fri, 8 Feb 2019 12:09:03 -0500 Subject: [PATCH 09/14] Use bigger width type for accumulator. --- cpp/src/arrow/compute/kernels/aggregate.h | 2 + .../arrow/compute/kernels/sum-benchmark.cc | 12 +-- cpp/src/arrow/compute/kernels/sum-test.cc | 70 ++++++++++------ cpp/src/arrow/compute/kernels/sum.cc | 79 ++++++++++--------- cpp/src/arrow/compute/kernels/sum.h | 25 ++++++ cpp/src/arrow/test-random.h | 30 ++++--- 6 files changed, 139 insertions(+), 79 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate.h b/cpp/src/arrow/compute/kernels/aggregate.h index 6e520bdcd80..eb3ed202ec5 100644 --- a/cpp/src/arrow/compute/kernels/aggregate.h +++ b/cpp/src/arrow/compute/kernels/aggregate.h @@ -55,6 +55,8 @@ class AggregateFunction { /// \brief Convert state into a final result. virtual Status Finalize(const void* src, Datum* output) const = 0; + virtual ~AggregateFunction() {} + /// State management methods. 
virtual int64_t Size() const = 0; virtual void New(void* ptr) const = 0; diff --git a/cpp/src/arrow/compute/kernels/sum-benchmark.cc b/cpp/src/arrow/compute/kernels/sum-benchmark.cc index 6cedd39e673..db5e34d72a1 100644 --- a/cpp/src/arrow/compute/kernels/sum-benchmark.cc +++ b/cpp/src/arrow/compute/kernels/sum-benchmark.cc @@ -313,7 +313,7 @@ void BenchSum(benchmark::State& state) { using T = typename Functor::ValueType; const int64_t array_size = state.range(0) / sizeof(int64_t); - const double null_percent = state.range(1) / 100.0; + const double null_percent = static_cast(state.range(1)) / 100.0; auto rand = random::RandomArrayGenerator(1923); auto array = std::static_pointer_cast>( rand.Int64(array_size, -100, 100, null_percent)); @@ -326,8 +326,8 @@ void BenchSum(benchmark::State& state) { benchmark::DoNotOptimize(sum_state); } - state.counters["size"] = state.range(0); - state.counters["null_percent"] = state.range(1); + state.counters["size"] = static_cast(state.range(0)); + state.counters["null_percent"] = static_cast(state.range(1)); state.SetBytesProcessed(state.iterations() * array_size * sizeof(T)); } @@ -349,7 +349,7 @@ BENCHMARK_TEMPLATE(BenchSum, SumBitmapVectorizeUnroll)->Apply(SetArgs); static void BenchSumKernel(benchmark::State& state) { const int64_t array_size = state.range(0) / sizeof(int64_t); - const double null_percent = state.range(1) / 100.0; + const double null_percent = static_cast(state.range(1)) / 100.0; auto rand = random::RandomArrayGenerator(1923); auto array = std::static_pointer_cast>( rand.Int64(array_size, -100, 100, null_percent)); @@ -361,8 +361,8 @@ static void BenchSumKernel(benchmark::State& state) { benchmark::DoNotOptimize(out); } - state.counters["size"] = state.range(0); - state.counters["null_percent"] = state.range(1); + state.counters["size"] = static_cast(state.range(0)); + state.counters["null_percent"] = static_cast(state.range(1)); state.SetBytesProcessed(state.iterations() * array_size * sizeof(int64_t)); } 
diff --git a/cpp/src/arrow/compute/kernels/sum-test.cc b/cpp/src/arrow/compute/kernels/sum-test.cc index a354206ef7b..a142a6d17b7 100644 --- a/cpp/src/arrow/compute/kernels/sum-test.cc +++ b/cpp/src/arrow/compute/kernels/sum-test.cc @@ -15,8 +15,10 @@ // specific language governing permissions and limitations // under the License. -#include #include +#include + +#include #include "arrow/array.h" #include "arrow/test-common.h" @@ -34,33 +36,47 @@ using std::vector; namespace arrow { namespace compute { -constexpr double kArbitraryDoubleErrorBound = 1.0; +template +struct DatumEqual { + static void EnsureEqual(const Datum& lhs, const Datum& rhs) {} +}; + +template +struct DatumEqual::value>::type> { + static constexpr double kArbitraryDoubleErrorBound = 1.0; + + static void EnsureEqual(const Datum& lhs, const Datum& rhs) { + ASSERT_EQ(lhs.kind(), rhs.kind()); + if (lhs.kind() == Datum::SCALAR) { + ASSERT_EQ(lhs.scalar().kind(), rhs.scalar().kind()); + ASSERT_NEAR(util::get(lhs.scalar().value), + util::get(rhs.scalar().value), kArbitraryDoubleErrorBound); + } + } +}; + +template +struct DatumEqual::value>::type> { + static void EnsureEqual(const Datum& lhs, const Datum& rhs) { + ASSERT_EQ(lhs.kind(), rhs.kind()); + if (lhs.kind() == Datum::SCALAR) { + ASSERT_EQ(lhs.scalar().kind(), rhs.scalar().kind()); + ASSERT_EQ(util::get(lhs.scalar().value), + util::get(rhs.scalar().value)); + } + } +}; template void ValidateSum(FunctionContext* ctx, const Array& input, Datum expected) { using CType = typename ArrowType::c_type; + using SumType = typename FindAccumulatorType::Type; Datum result; ASSERT_OK(Sum(ctx, input, &result)); - - // Ensure Datum is Scalar of proper type. 
- ASSERT_EQ(result.kind(), expected.kind()); - - if (result.kind() == Datum::SCALAR) { - ASSERT_EQ(result.scalar().kind(), expected.scalar().kind()); - switch (result.scalar().kind()) { - case Type::FLOAT: - case Type::DOUBLE: - ASSERT_NEAR(util::get(result.scalar().value), - util::get(expected.scalar().value), - kArbitraryDoubleErrorBound); - break; - default: - ASSERT_EQ(util::get(result.scalar().value), - util::get(expected.scalar().value)); - break; - } - } + DatumEqual::EnsureEqual(result, expected); } template @@ -73,8 +89,9 @@ template static Datum DummySum(const Array& array) { using CType = typename ArrowType::c_type; using ArrayType = typename TypeTraits::ArrayType; + using SumType = typename FindAccumulatorType::Type; - CType sum = 0; + SumType sum = 0; int64_t count = 0; const auto& array_numeric = reinterpret_cast(array); @@ -87,7 +104,7 @@ static Datum DummySum(const Array& array) { } } - return (count > 0) ? Datum(Scalar(static_cast(sum))) : Datum(); + return (count > 0) ? 
Datum(Scalar(sum)) : Datum(); } template @@ -105,14 +122,17 @@ typedef ::testing::Types::Type; + + ValidateSum(&this->ctx_, "[]", Datum()); ValidateSum(&this->ctx_, "[0, 1, 2, 3, 4, 5]", - Datum(Scalar(static_cast(5 * 6 / 2)))); + Datum(Scalar(static_cast(5 * 6 / 2)))); // Avoid this tests for (U)Int8Type if (sizeof(CType) > 1) ValidateSum(&this->ctx_, "[1000, null, 300, null, 30, null, 7]", - Datum(Scalar(static_cast(1337)))); + Datum(Scalar(static_cast(1337)))); } template diff --git a/cpp/src/arrow/compute/kernels/sum.cc b/cpp/src/arrow/compute/kernels/sum.cc index 573f8b7e280..e4f12274bef 100644 --- a/cpp/src/arrow/compute/kernels/sum.cc +++ b/cpp/src/arrow/compute/kernels/sum.cc @@ -28,21 +28,23 @@ namespace arrow { namespace compute { -template +template ::Type> struct SumState { - SumState operator+(const SumState& rhs) const { - return SumState(this->count_ + rhs.count_, this->sum_ + rhs.sum_); + using ThisType = SumState; + + ThisType operator+(const ThisType& rhs) const { + return ThisType(this->count + rhs.count, this->sum + rhs.sum); } - SumState& operator+=(const SumState& rhs) { - this->count_ += rhs.count_; - this->sum_ += rhs.sum_; + ThisType& operator+=(const ThisType& rhs) { + this->count += rhs.count; + this->sum += this->sum; return *this; } - size_t count_ = 0; - CType sum_ = 0; + size_t count = 0; + SumType sum = 0; }; // Generated with the following Python code @@ -62,13 +64,13 @@ static constexpr uint8_t kBytePopcount[] = { 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}; -template -class SumAggregateFunction final : public AggregateFunctionStaticState> { - using State = SumState; - using ArrayType = typename TypeTraits::ArrayType; +template > +class SumAggregateFunction final : public AggregateFunctionStaticState { + using CType = typename TypeTraits::CType; + using ArrayType = typename TypeTraits::ArrayType; public: - Status 
Consume(const Array& input, State* state) const override { + Status Consume(const Array& input, StateType* state) const override { const ArrayType& array = static_cast(input); if (input.null_count() > 0) @@ -79,32 +81,32 @@ class SumAggregateFunction final : public AggregateFunctionStaticState 0) ? Datum(Scalar(src.sum_)) : Datum(); + Status Finalize(const StateType& src, Datum* output) const override { + *output = (src.count > 0) ? Datum(Scalar(src.sum)) : Datum(); return Status::OK(); } private: - State ConsumeDense(const ArrayType& array) const { - State local; + StateType ConsumeDense(const ArrayType& array) const { + StateType local; const auto values = array.raw_values(); for (int64_t i = 0; i < array.length(); i++) { - local.sum_ += values[i]; + local.sum += values[i]; } - local.count_ = array.length(); + local.count = array.length(); return local; } - State ConsumeSparse(const ArrayType& array) const { - State local; + StateType ConsumeSparse(const ArrayType& array) const { + StateType local; // TODO(fsaintjacques): This fails on slice not byte-aligned. 
DCHECK_EQ(array.offset() % 8, 0); @@ -112,35 +114,36 @@ class SumAggregateFunction final : public AggregateFunctionStaticState> ITEM) & 1)) +#define SUM_SHIFT(ITEM) \ + static_cast(values[i + ITEM] * static_cast(((valid_byte >> ITEM) & 1U))) // Some nulls - local.sum_ += SUM_SHIFT(0); - local.sum_ += SUM_SHIFT(1); - local.sum_ += SUM_SHIFT(2); - local.sum_ += SUM_SHIFT(3); - local.sum_ += SUM_SHIFT(4); - local.sum_ += SUM_SHIFT(5); - local.sum_ += SUM_SHIFT(6); - local.sum_ += SUM_SHIFT(7); - local.count_ += kBytePopcount[valid_byte]; + local.sum += SUM_SHIFT(0); + local.sum += SUM_SHIFT(1); + local.sum += SUM_SHIFT(2); + local.sum += SUM_SHIFT(3); + local.sum += SUM_SHIFT(4); + local.sum += SUM_SHIFT(5); + local.sum += SUM_SHIFT(6); + local.sum += SUM_SHIFT(7); + local.count += kBytePopcount[valid_byte]; #undef SUM_SHIFT } else { // No nulls - local.sum_ += values[i + 0] + values[i + 1] + values[i + 2] + values[i + 3] + - values[i + 4] + values[i + 5] + values[i + 6] + values[i + 7]; - local.count_ += 8; + local.sum += values[i + 0] + values[i + 1] + values[i + 2] + values[i + 3] + + values[i + 4] + values[i + 5] + values[i + 6] + values[i + 7]; + local.count += 8; } } for (int64_t i = length_rounded; i < length; ++i) { if (BitUtil::GetBit(bitmap, i)) { - local.sum_ += values[i]; - local.count_++; + local.sum += values[i]; + local.count++; } } diff --git a/cpp/src/arrow/compute/kernels/sum.h b/cpp/src/arrow/compute/kernels/sum.h index c7be9f42147..af431a5971d 100644 --- a/cpp/src/arrow/compute/kernels/sum.h +++ b/cpp/src/arrow/compute/kernels/sum.h @@ -19,6 +19,7 @@ #define ARROW_COMPUTE_KERNELS_SUM_H #include +#include #include "arrow/status.h" #include "arrow/util/visibility.h" @@ -30,6 +31,30 @@ class DataType; namespace compute { +// Find the largest compatible primitive type for a primitive type. 
+template +struct FindAccumulatorType { + using Type = double; +}; + +template +struct FindAccumulatorType::value && + std::is_signed::value>::type> { + using Type = int64_t; +}; + +template +struct FindAccumulatorType::value && + std::is_unsigned::value>::type> { + using Type = uint64_t; +}; + +template +struct FindAccumulatorType< + I, typename std::enable_if::value>::type> { + using Type = double; +}; + struct Datum; class FunctionContext; class AggregateFunction; diff --git a/cpp/src/arrow/test-random.h b/cpp/src/arrow/test-random.h index 2f820321ca6..1ed8d03e8a1 100644 --- a/cpp/src/arrow/test-random.h +++ b/cpp/src/arrow/test-random.h @@ -164,25 +164,35 @@ class ARROW_EXPORT RandomArrayGenerator { double null_probability) { switch (ArrowType::type_id) { case Type::UINT8: - return UInt8(size, min, max, null_probability); + return UInt8(size, static_cast(min), static_cast(max), + null_probability); case Type::INT8: - return Int8(size, min, max, null_probability); + return Int8(size, static_cast(min), static_cast(max), + null_probability); case Type::UINT16: - return UInt16(size, min, max, null_probability); + return UInt16(size, static_cast(min), static_cast(max), + null_probability); case Type::INT16: - return Int16(size, min, max, null_probability); + return Int16(size, static_cast(min), static_cast(max), + null_probability); case Type::UINT32: - return UInt32(size, min, max, null_probability); + return UInt32(size, static_cast(min), static_cast(max), + null_probability); case Type::INT32: - return Int32(size, min, max, null_probability); + return Int32(size, static_cast(min), static_cast(max), + null_probability); case Type::UINT64: - return UInt64(size, min, max, null_probability); + return UInt64(size, static_cast(min), static_cast(max), + null_probability); case Type::INT64: - return Int64(size, min, max, null_probability); + return Int64(size, static_cast(min), static_cast(max), + null_probability); case Type::FLOAT: - return Float32(size, min, max, 
null_probability); + return Float32(size, static_cast(min), static_cast(max), + null_probability); case Type::DOUBLE: - return Float64(size, min, max, null_probability); + return Float64(size, static_cast(min), static_cast(max), + null_probability); default: return nullptr; } From 9bd1f2512cc07cf8d168a70f5ed08d10f4b1ec29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Saint-Jacques?= Date: Fri, 8 Feb 2019 14:02:45 -0500 Subject: [PATCH 10/14] Warnings --- cpp/src/arrow/compute/kernels/sum.h | 2 ++ cpp/src/arrow/io/memory-benchmark.cc | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/cpp/src/arrow/compute/kernels/sum.h b/cpp/src/arrow/compute/kernels/sum.h index af431a5971d..2e2ca3c3d4b 100644 --- a/cpp/src/arrow/compute/kernels/sum.h +++ b/cpp/src/arrow/compute/kernels/sum.h @@ -59,6 +59,7 @@ struct Datum; class FunctionContext; class AggregateFunction; +ARROW_EXPORT std::shared_ptr MakeSumAggregateFunction(const DataType& type, FunctionContext* context); @@ -81,6 +82,7 @@ Status Sum(FunctionContext* context, const Datum& value, Datum* out); /// /// \since 0.13.0 /// \note API not yet finalized +ARROW_EXPORT Status Sum(FunctionContext* context, const Array& array, Datum* out); } // namespace compute diff --git a/cpp/src/arrow/io/memory-benchmark.cc b/cpp/src/arrow/io/memory-benchmark.cc index 2d0c803952c..b37aff96649 100644 --- a/cpp/src/arrow/io/memory-benchmark.cc +++ b/cpp/src/arrow/io/memory-benchmark.cc @@ -96,6 +96,11 @@ static void Read(void* src, void* dst, size_t size) { VectorLoadAsm(simd[i + 3], d); } + memset(&a, 0, sizeof(a)); + memset(&b, 0, sizeof(b)); + memset(&c, 0, sizeof(c)); + memset(&d, 0, sizeof(d)); + benchmark::DoNotOptimize(a + b + c + d); } @@ -107,6 +112,11 @@ static void StreamRead(void* src, void* dst, size_t size) { VectorType a, b, c, d; (void)dst; + memset(&a, 0, sizeof(a)); + memset(&b, 0, sizeof(b)); + memset(&c, 0, sizeof(c)); + memset(&d, 0, sizeof(d)); + for (size_t i = 0; i < size / sizeof(VectorType); i 
+= 4) { VectorStreamLoadAsm(simd[i], a); VectorStreamLoadAsm(simd[i + 1], b); From 27d32b538841fe3e265fb4800881036c5e73ac18 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 9 Feb 2019 18:10:39 -0600 Subject: [PATCH 11/14] Use multiplication method with sentinel benchmarks Change-Id: Iacd7e6bdd6ce03cff3003ea1f8e703ba42c3ab7a --- .../arrow/compute/kernels/sum-benchmark.cc | 24 +++++++------------ 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/sum-benchmark.cc b/cpp/src/arrow/compute/kernels/sum-benchmark.cc index db5e34d72a1..a2d0288ab98 100644 --- a/cpp/src/arrow/compute/kernels/sum-benchmark.cc +++ b/cpp/src/arrow/compute/kernels/sum-benchmark.cc @@ -144,11 +144,9 @@ struct SumSentinel : public Summer { const auto values = array.raw_values(); const auto length = array.length(); for (int64_t i = 0; i < length; i++) { - if (Traits::NotNull(values[i])) { - // NaN is not equal to itself - local.total += values[i]; - local.valid_count++; - } + // NaN is not equal to itself + local.total += values[i] * Traits::NotNull(values[i]); + local.valid_count++; } *state = local; @@ -162,12 +160,10 @@ struct SumSentinelUnrolled : public Summer { static void Sum(const ArrayType& array, SumState* state) { SumState local; -#define SUM_NOT_NULL(ITEM) \ - do { \ - if (Traits::NotNull(values[i + ITEM])) { \ - local.total += values[i + ITEM]; \ - local.valid_count++; \ - } \ +#define SUM_NOT_NULL(ITEM) \ + do { \ + local.total += values[i + ITEM] * Traits::NotNull(values[i + ITEM]); \ + local.valid_count++; \ } while (0) const auto values = array.raw_values(); @@ -187,10 +183,8 @@ struct SumSentinelUnrolled : public Summer { #undef SUM_NOT_NULL for (int64_t i = length_rounded * 8; i < length; ++i) { - if (Traits::NotNull(values[i])) { - local.total += values[i]; - ++local.valid_count; - } + local.total += values[i] * Traits::NotNull(values[i]); + ++local.valid_count; } *state = local; From 985e9ce4c7f3a9867e1fbd9e1e8b944ff8555a9d Mon Sep 
17 00:00:00 2001 From: Wes McKinney Date: Sat, 9 Feb 2019 18:16:22 -0600 Subject: [PATCH 12/14] Rename arrow-compute-sum-benchmark/test Change-Id: I9a1f46f10c30d6f740c7896a2c72195013a714e7 --- cpp/src/arrow/compute/kernels/CMakeLists.txt | 4 ++-- .../kernels/{sum-benchmark.cc => aggregate-benchmark.cc} | 0 .../arrow/compute/kernels/{sum-test.cc => aggregate-test.cc} | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename cpp/src/arrow/compute/kernels/{sum-benchmark.cc => aggregate-benchmark.cc} (100%) rename cpp/src/arrow/compute/kernels/{sum-test.cc => aggregate-test.cc} (100%) diff --git a/cpp/src/arrow/compute/kernels/CMakeLists.txt b/cpp/src/arrow/compute/kernels/CMakeLists.txt index cadd789a8e9..df80fb2b84e 100644 --- a/cpp/src/arrow/compute/kernels/CMakeLists.txt +++ b/cpp/src/arrow/compute/kernels/CMakeLists.txt @@ -22,5 +22,5 @@ ADD_ARROW_TEST(cast-test PREFIX "arrow-compute") ADD_ARROW_TEST(hash-test PREFIX "arrow-compute") # Aggregates -ADD_ARROW_TEST(sum-test PREFIX "arrow-compute") -ADD_ARROW_BENCHMARK(sum-benchmark PREFIX "arrow-compute") +ADD_ARROW_TEST(aggregate-test PREFIX "arrow-compute") +ADD_ARROW_BENCHMARK(aggregate-benchmark PREFIX "arrow-compute") diff --git a/cpp/src/arrow/compute/kernels/sum-benchmark.cc b/cpp/src/arrow/compute/kernels/aggregate-benchmark.cc similarity index 100% rename from cpp/src/arrow/compute/kernels/sum-benchmark.cc rename to cpp/src/arrow/compute/kernels/aggregate-benchmark.cc diff --git a/cpp/src/arrow/compute/kernels/sum-test.cc b/cpp/src/arrow/compute/kernels/aggregate-test.cc similarity index 100% rename from cpp/src/arrow/compute/kernels/sum-test.cc rename to cpp/src/arrow/compute/kernels/aggregate-test.cc From c272059d8a68b0b941fc4fe7608329e8e654e0e8 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 9 Feb 2019 18:22:59 -0600 Subject: [PATCH 13/14] Some code review comments Change-Id: I7f5d047191d58369f6e73650f3d705e967a2ba7b --- .../compute/kernels/aggregate-benchmark.cc | 23 +++---------------- 
cpp/src/arrow/compute/kernels/aggregate.h | 2 +- cpp/src/arrow/compute/kernels/sum.cc | 19 +-------------- cpp/src/arrow/util/bit-util.h | 17 ++++++++++++++ 4 files changed, 22 insertions(+), 39 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/aggregate-benchmark.cc b/cpp/src/arrow/compute/kernels/aggregate-benchmark.cc index a2d0288ab98..0c31d2bede5 100644 --- a/cpp/src/arrow/compute/kernels/aggregate-benchmark.cc +++ b/cpp/src/arrow/compute/kernels/aggregate-benchmark.cc @@ -237,23 +237,6 @@ struct SumBitmapReader : public Summer { } }; -// Generated with the following Python code - -// output = 'static constexpr uint8_t kBytePopcount[] = {{{0}}};' -// popcounts = [str(bin(i).count('1')) for i in range(0, 256)] -// print(output.format(', '.join(popcounts))) - -static constexpr uint8_t kBytePopcount[] = { - 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, - 4, 4, 5, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, - 4, 5, 4, 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, - 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, - 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, - 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, - 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, - 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, - 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}; - template struct SumBitmapVectorizeUnroll : public Summer { using ArrayType = typename CTypeTraits::ArrayType; @@ -280,7 +263,7 @@ struct SumBitmapVectorizeUnroll : public Summer { local.total += SUM_SHIFT(5); local.total += SUM_SHIFT(6); local.total += SUM_SHIFT(7); - local.valid_count += kBytePopcount[valid_byte]; + local.valid_count += BitUtil::kBytePopcount[valid_byte]; } else { 
// No nulls local.total += values[i + 0] + values[i + 1] + values[i + 2] + values[i + 3] + @@ -314,7 +297,7 @@ void BenchSum(benchmark::State& state) { Traits::FixSentinel(array); - while (state.KeepRunning()) { + for (auto _ : state) { SumState sum_state; Functor::Sum(*array, &sum_state); benchmark::DoNotOptimize(sum_state); @@ -349,7 +332,7 @@ static void BenchSumKernel(benchmark::State& state) { rand.Int64(array_size, -100, 100, null_percent)); FunctionContext ctx; - while (state.KeepRunning()) { + for (auto _ : state) { Datum out; ABORT_NOT_OK(Sum(&ctx, Datum(array), &out)); benchmark::DoNotOptimize(out); diff --git a/cpp/src/arrow/compute/kernels/aggregate.h b/cpp/src/arrow/compute/kernels/aggregate.h index eb3ed202ec5..4bc869aec3f 100644 --- a/cpp/src/arrow/compute/kernels/aggregate.h +++ b/cpp/src/arrow/compute/kernels/aggregate.h @@ -43,7 +43,7 @@ struct Datum; /// - New (placement new constructor invocation) /// - Delete (state desctructor) /// -/// Design borrowed from ClickHouse aggregate functions. +/// Design inspired by ClickHouse aggregate functions. class AggregateFunction { public: /// \brief Consume an array into a state. 
diff --git a/cpp/src/arrow/compute/kernels/sum.cc b/cpp/src/arrow/compute/kernels/sum.cc index e4f12274bef..74853fe2400 100644 --- a/cpp/src/arrow/compute/kernels/sum.cc +++ b/cpp/src/arrow/compute/kernels/sum.cc @@ -47,23 +47,6 @@ struct SumState { SumType sum = 0; }; -// Generated with the following Python code - -// output = 'static constexpr uint8_t kBytePopcount[] = {{{0}}};' -// popcounts = [str(bin(i).count('1')) for i in range(0, 256)] -// print(output.format(', '.join(popcounts))) - -static constexpr uint8_t kBytePopcount[] = { - 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, - 4, 4, 5, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, - 4, 5, 4, 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, - 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, - 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, - 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, - 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, - 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, - 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}; - template > class SumAggregateFunction final : public AggregateFunctionStaticState { using CType = typename TypeTraits::CType; @@ -130,7 +113,7 @@ class SumAggregateFunction final : public AggregateFunctionStaticState::type as_unsigned(Integer x) { namespace BitUtil { +// The number of set bits in a given unsigned byte value, pre-computed +// +// Generated with the following Python code +// output = 'static constexpr uint8_t kBytePopcount[] = {{{0}}};' +// popcounts = [str(bin(i).count('1')) for i in range(0, 256)] +// print(output.format(', '.join(popcounts))) +static constexpr uint8_t kBytePopcount[] = { + 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 1, 
2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, + 4, 4, 5, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, + 4, 5, 4, 5, 5, 6, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, 3, 3, 4, 3, 4, 4, + 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 3, 4, 4, 5, + 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 1, 2, 2, 3, 2, 3, 3, 4, 2, 3, 3, 4, 3, 4, 4, 5, 2, + 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, 5, 6, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, 5, 4, 5, + 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 2, 3, 3, 4, 3, 4, 4, 5, 3, 4, 4, + 5, 4, 5, 5, 6, 3, 4, 4, 5, 4, 5, 5, 6, 4, 5, 5, 6, 5, 6, 6, 7, 3, 4, 4, 5, 4, 5, 5, 6, + 4, 5, 5, 6, 5, 6, 6, 7, 4, 5, 5, 6, 5, 6, 6, 7, 5, 6, 6, 7, 6, 7, 7, 8}; + // // Bit-related computations on integer values // From 568ba09e61e04325d0ca5625a9349a241800c3a7 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Sat, 9 Feb 2019 18:24:47 -0600 Subject: [PATCH 14/14] Add braces Change-Id: Ib638796e4bb7c456625b8c29ba803f071051fd66 --- cpp/src/arrow/compute/kernels/sum.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/kernels/sum.cc b/cpp/src/arrow/compute/kernels/sum.cc index 74853fe2400..cb37c4a0961 100644 --- a/cpp/src/arrow/compute/kernels/sum.cc +++ b/cpp/src/arrow/compute/kernels/sum.cc @@ -56,10 +56,11 @@ class SumAggregateFunction final : public AggregateFunctionStaticState(input); - if (input.null_count() > 0) + if (input.null_count() > 0) { *state = ConsumeSparse(array); - else + } else { *state = ConsumeDense(array); + } return Status::OK(); }