Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cpp/src/arrow/compute/kernels/aggregate_tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ struct TDigestImpl : public ScalarAggregator {

Status MergeFrom(KernelContext*, KernelState&& src) override {
auto& other = checked_cast<ThisType&>(src);
std::vector<TDigest> other_tdigest;
other_tdigest.push_back(std::move(other.tdigest));
this->tdigest.Merge(&other_tdigest);
this->tdigest.Merge(other.tdigest);
return Status::OK();
}

Expand Down
4 changes: 1 addition & 3 deletions cpp/src/arrow/compute/kernels/hash_aggregate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1489,10 +1489,8 @@ struct GroupedTDigestImpl : public GroupedAggregator {
auto other = checked_cast<GroupedTDigestImpl*>(&raw_other);

auto g = group_id_mapping.GetValues<uint32_t>(1);
std::vector<TDigest> other_tdigest(1);
for (int64_t other_g = 0; other_g < group_id_mapping.length; ++other_g, ++g) {
other_tdigest[0] = std::move(other->tdigests_[other_g]);
tdigests_[*g].Merge(&other_tdigest);
tdigests_[*g].Merge(other->tdigests_[other_g]);
}

return Status::OK();
Expand Down
30 changes: 18 additions & 12 deletions cpp/src/arrow/util/tdigest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,34 +371,40 @@ void TDigest::Reset() {
impl_->Reset();
}

Status TDigest::Validate() {
Status TDigest::Validate() const {
MergeInput();
return impl_->Validate();
}

void TDigest::Dump() {
void TDigest::Dump() const {
MergeInput();
impl_->Dump();
}

void TDigest::Merge(std::vector<TDigest>* tdigests) {
void TDigest::Merge(const std::vector<TDigest>& others) {
MergeInput();

std::vector<const TDigestImpl*> tdigest_impls;
tdigest_impls.reserve(tdigests->size());
for (auto& td : *tdigests) {
td.MergeInput();
tdigest_impls.push_back(td.impl_.get());
std::vector<const TDigestImpl*> other_impls;
other_impls.reserve(others.size());
for (auto& other : others) {
other.MergeInput();
other_impls.push_back(other.impl_.get());
}
impl_->Merge(tdigest_impls);
impl_->Merge(other_impls);
}

double TDigest::Quantile(double q) {
void TDigest::Merge(const TDigest& other) {
MergeInput();
other.MergeInput();
impl_->Merge({other.impl_.get()});
}

double TDigest::Quantile(double q) const {
MergeInput();
return impl_->Quantile(q);
}

double TDigest::Mean() {
double TDigest::Mean() const {
MergeInput();
return impl_->Mean();
}
Expand All @@ -407,7 +413,7 @@ bool TDigest::is_empty() const {
return input_.size() == 0 && impl_->total_weight() == 0;
}

void TDigest::MergeInput() {
void TDigest::MergeInput() const {
if (input_.size() > 0) {
impl_->MergeInput(input_); // will mutate input_
}
Expand Down
19 changes: 10 additions & 9 deletions cpp/src/arrow/util/tdigest.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ class ARROW_EXPORT TDigest {
void Reset();

// validate data integrity
Status Validate();
Status Validate() const;

// dump internal data, only for debug
void Dump();
void Dump() const;

// buffer a single data point, consume internal buffer if full
// this function is intensively called and performance critical
Expand All @@ -75,24 +75,25 @@ class ARROW_EXPORT TDigest {
}

// merge with other t-digests, called infrequently
void Merge(std::vector<TDigest>* tdigests);
void Merge(const std::vector<TDigest>& others);
void Merge(const TDigest& other);

// calculate quantile
double Quantile(double q);
double Quantile(double q) const;

double Min() { return Quantile(0); }
double Max() { return Quantile(1); }
double Mean();
double Min() const { return Quantile(0); }
double Max() const { return Quantile(1); }
double Mean() const;

// check if this tdigest contains no valid data points
bool is_empty() const;

private:
// merge input data with current tdigest
void MergeInput();
void MergeInput() const;

// input buffer, size = buffer_size * sizeof(double)
std::vector<double> input_;
mutable std::vector<double> input_;

// hide other members with pimpl
class TDigestImpl;
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/util/tdigest_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ void TestMerge(const std::vector<std::vector<double>>& values_vector, uint32_t d
// merge into an empty tdigest
{
TDigest td(delta);
td.Merge(&tds);
td.Merge(tds);
ASSERT_OK(td.Validate());
for (size_t i = 0; i < quantiles.size(); ++i) {
const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
Expand All @@ -182,7 +182,7 @@ void TestMerge(const std::vector<std::vector<double>>& values_vector, uint32_t d
{
TDigest td = std::move(tds[0]);
tds.erase(tds.begin(), tds.begin() + 1);
td.Merge(&tds);
td.Merge(tds);
ASSERT_OK(td.Validate());
for (size_t i = 0; i < quantiles.size(); ++i) {
const double tolerance = std::max(std::fabs(expected[i]) * error_ratio, 0.1);
Expand Down