diff --git a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc index 8d9d198a353..be8d66c4c24 100644 --- a/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc +++ b/cpp/src/arrow/compute/kernels/aggregate_tdigest.cc @@ -65,9 +65,7 @@ struct TDigestImpl : public ScalarAggregator { Status MergeFrom(KernelContext*, KernelState&& src) override { auto& other = checked_cast(src); - std::vector other_tdigest; - other_tdigest.push_back(std::move(other.tdigest)); - this->tdigest.Merge(&other_tdigest); + this->tdigest.Merge(other.tdigest); return Status::OK(); } diff --git a/cpp/src/arrow/compute/kernels/hash_aggregate.cc b/cpp/src/arrow/compute/kernels/hash_aggregate.cc index 4fd6af9b190..e5d16a6a67d 100644 --- a/cpp/src/arrow/compute/kernels/hash_aggregate.cc +++ b/cpp/src/arrow/compute/kernels/hash_aggregate.cc @@ -1489,10 +1489,8 @@ struct GroupedTDigestImpl : public GroupedAggregator { auto other = checked_cast(&raw_other); auto g = group_id_mapping.GetValues(1); - std::vector other_tdigest(1); for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) { - other_tdigest[0] = std::move(other->tdigests_[other_g]); - tdigests_[*g].Merge(&other_tdigest); + tdigests_[*g].Merge(other->tdigests_[other_g]); } return Status::OK(); diff --git a/cpp/src/arrow/util/tdigest.cc b/cpp/src/arrow/util/tdigest.cc index 99b771ca0f2..541fc362529 100644 --- a/cpp/src/arrow/util/tdigest.cc +++ b/cpp/src/arrow/util/tdigest.cc @@ -371,34 +371,40 @@ void TDigest::Reset() { impl_->Reset(); } -Status TDigest::Validate() { +Status TDigest::Validate() const { MergeInput(); return impl_->Validate(); } -void TDigest::Dump() { +void TDigest::Dump() const { MergeInput(); impl_->Dump(); } -void TDigest::Merge(std::vector* tdigests) { +void TDigest::Merge(const std::vector& others) { MergeInput(); - std::vector tdigest_impls; - tdigest_impls.reserve(tdigests->size()); - for (auto& td : *tdigests) { - td.MergeInput(); - tdigest_impls.push_back(td.impl_.get()); + std::vector other_impls; + other_impls.reserve(others.size()); + for (auto& other : others) { + other.MergeInput(); + other_impls.push_back(other.impl_.get()); } - impl_->Merge(tdigest_impls); + impl_->Merge(other_impls); } -double TDigest::Quantile(double q) { +void TDigest::Merge(const TDigest& other) { + MergeInput(); + other.MergeInput(); + impl_->Merge({other.impl_.get()}); +} + +double TDigest::Quantile(double q) const { MergeInput(); return impl_->Quantile(q); } -double TDigest::Mean() { +double TDigest::Mean() const { MergeInput(); return impl_->Mean(); } @@ -407,7 +413,7 @@ bool TDigest::is_empty() const { return input_.size() == 0 && impl_->total_weight() == 0; } -void TDigest::MergeInput() { +void TDigest::MergeInput() const { if (input_.size() > 0) { impl_->MergeInput(input_); // will mutate input_ } diff --git a/cpp/src/arrow/util/tdigest.h b/cpp/src/arrow/util/tdigest.h index ae42ce48e7d..308df468840 100644 --- a/cpp/src/arrow/util/tdigest.h +++ b/cpp/src/arrow/util/tdigest.h @@ -47,10 +47,10 @@ class ARROW_EXPORT TDigest { void Reset(); // validate data integrity - Status Validate(); + Status Validate() const; // dump internal data, only for debug - void Dump(); + void Dump() const; // buffer a single data point, consume internal buffer if full // this function is intensively called and performance critical @@ -75,24 +75,25 @@ class ARROW_EXPORT TDigest { } // merge with other t-digests, called infrequently - void Merge(std::vector* tdigests); + void Merge(const std::vector& others); + void Merge(const TDigest& other); // calculate quantile - double Quantile(double q); + double Quantile(double q) const; - double Min() { return Quantile(0); } - double Max() { return Quantile(1); } - double Mean(); + double Min() const { return Quantile(0); } + double Max() const { return Quantile(1); } + double Mean() const; // check if this tdigest contains no valid data points bool is_empty() const; private: // merge input data with current tdigest - void MergeInput(); + void MergeInput() const; // input buffer, size = buffer_size * sizeof(double) - std::vector input_; + mutable std::vector input_; // hide other members with pimpl class TDigestImpl; diff --git a/cpp/src/arrow/util/tdigest_test.cc b/cpp/src/arrow/util/tdigest_test.cc index e9a3924f812..532046b20cb 100644 --- a/cpp/src/arrow/util/tdigest_test.cc +++ b/cpp/src/arrow/util/tdigest_test.cc @@ -170,7 +170,7 @@ void TestMerge(const std::vector>& values_vector, uint32_t d // merge into an empty tdigest { TDigest td(delta); - td.Merge(&tds); + td.Merge(tds); ASSERT_OK(td.Validate()); for (size_t i = 0; i < quantiles.size(); ++i) { const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1); @@ -182,7 +182,7 @@ void TestMerge(const std::vector>& values_vector, uint32_t d { TDigest td = std::move(tds[0]); tds.erase(tds.begin(), tds.begin() + 1); - td.Merge(&tds); + td.Merge(tds); ASSERT_OK(td.Validate()); for (size_t i = 0; i < quantiles.size(); ++i) { const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);