Skip to content
Merged
9 changes: 9 additions & 0 deletions sdk/include/opentelemetry/sdk/metrics/aggregator/aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ class Aggregator
*/
virtual AggregatorKind get_aggregator_kind() final { return agg_kind_; }

/**
* Getter function for updated_ protected var
*
* @return A bool indicating wether or not this aggregator has been updated
* in the most recent collection interval.
*/
virtual bool is_updated() final { return updated_; }

// virtual function to be overriden for the Histogram Aggregator
virtual std::vector<double> get_boundaries() { return std::vector<double>(); }

Expand Down Expand Up @@ -134,6 +142,7 @@ class Aggregator
opentelemetry::metrics::InstrumentKind kind_;
std::mutex mu_;
AggregatorKind agg_kind_;
bool updated_;
};

} // namespace metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class CounterAggregator final : public Aggregator<T>
void update(T val) override
{
this->mu_.lock();
this->updated_ = true;
this->values_[0] += val; // atomic operation
this->mu_.unlock();
}
Expand All @@ -50,6 +51,7 @@ class CounterAggregator final : public Aggregator<T>
void checkpoint() override
{
this->mu_.lock();
this->updated_ = false;
this->checkpoint_ = this->values_;
this->values_[0] = 0;
this->mu_.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class ExactAggregator : public Aggregator<T>
void update(T val) override
{
this->mu_.lock();
this->updated_ = true;
this->values_.push_back(val);
this->mu_.unlock();
}
Expand All @@ -74,6 +75,7 @@ class ExactAggregator : public Aggregator<T>
void checkpoint() override
{
this->mu_.lock();
this->updated_ = false;
if (quant_estimation_)
{
std::sort(this->values_.begin(), this->values_.end());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class GaugeAggregator : public Aggregator<T>
void update(T val) override
{
this->mu_.lock();
this->updated_ = true;
this->values_[0] = val;
current_timestamp_ = core::SystemTimestamp(std::chrono::system_clock::now());
this->mu_.unlock();
Expand All @@ -75,6 +76,7 @@ class GaugeAggregator : public Aggregator<T>
{
this->mu_.lock();

this->updated_ = false;
this->checkpoint_ = this->values_;

// Reset the values to default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ class HistogramAggregator final : public Aggregator<T>
void update(T val) override
{
this->mu_.lock();
int bucketID = boundaries_.size();
this->updated_ = true;
int bucketID = boundaries_.size();
for (size_t i = 0; i < boundaries_.size(); i++)
{
if (val < boundaries_[i]) // concurrent read is thread-safe
Expand Down Expand Up @@ -93,6 +94,7 @@ class HistogramAggregator final : public Aggregator<T>
void checkpoint() override
{
this->mu_.lock();
this->updated_ = false;
this->checkpoint_ = this->values_;
this->values_[0] = 0;
this->values_[1] = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class MinMaxSumCountAggregator : public Aggregator<T>
void update(T val) override
{
this->mu_.lock();
this->updated_ = true;

if (this->values_[CountValueIndex] == 0 || val < this->values_[MinValueIndex]) // set min
this->values_[MinValueIndex] = val;
Expand All @@ -80,6 +81,7 @@ class MinMaxSumCountAggregator : public Aggregator<T>
void checkpoint() override
{
this->mu_.lock();
this->updated_ = false;
this->checkpoint_ = this->values_;
// Reset the values
this->values_[MinValueIndex] = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class SketchAggregator final : public Aggregator<T>
void update(T val) override
{
this->mu_.lock();
this->updated_ = true;
int idx;
if (val == 0)
{
Expand Down Expand Up @@ -130,6 +131,7 @@ class SketchAggregator final : public Aggregator<T>
void checkpoint() override
{
this->mu_.lock();
this->updated_ = false;
this->checkpoint_ = this->values_;
checkpoint_raw_ = raw_;
this->values_[0] = 0;
Expand Down
21 changes: 15 additions & 6 deletions sdk/include/opentelemetry/sdk/metrics/sync_instruments.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,11 @@ class Counter final : public SynchronousInstrument<T>, public metrics_api::Count
toDelete.push_back(x.first);
}
auto agg_ptr = dynamic_cast<BoundCounter<T> *>(x.second.get())->GetAggregator();
agg_ptr->checkpoint();
ret.push_back(Record(x.second->GetName(), x.second->GetDescription(), x.first, agg_ptr));
if (agg_ptr->is_updated())
{
agg_ptr->checkpoint();
ret.push_back(Record(x.second->GetName(), x.second->GetDescription(), x.first, agg_ptr));
}
}
for (const auto &x : toDelete)
{
Expand Down Expand Up @@ -273,8 +276,11 @@ class UpDownCounter final : public SynchronousInstrument<T>, public metrics_api:
toDelete.push_back(x.first);
}
auto agg_ptr = dynamic_cast<BoundUpDownCounter<T> *>(x.second.get())->GetAggregator();
agg_ptr->checkpoint();
ret.push_back(Record(x.second->GetName(), x.second->GetDescription(), x.first, agg_ptr));
if (agg_ptr->is_updated())
{
agg_ptr->checkpoint();
ret.push_back(Record(x.second->GetName(), x.second->GetDescription(), x.first, agg_ptr));
}
}
for (const auto &x : toDelete)
{
Expand Down Expand Up @@ -396,8 +402,11 @@ class ValueRecorder final : public SynchronousInstrument<T>, public metrics_api:
toDelete.push_back(x.first);
}
auto agg_ptr = dynamic_cast<BoundValueRecorder<T> *>(x.second.get())->GetAggregator();
agg_ptr->checkpoint();
ret.push_back(Record(x.second->GetName(), x.second->GetDescription(), x.first, agg_ptr));
if (agg_ptr->is_updated())
{
agg_ptr->checkpoint();
ret.push_back(Record(x.second->GetName(), x.second->GetDescription(), x.first, agg_ptr));
}
}
for (const auto &x : toDelete)
{
Expand Down
30 changes: 29 additions & 1 deletion sdk/test/metrics/metric_instrument_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ TEST(Counter, getAggsandnewupdate)

auto labelkv = trace::KeyValueIterableView<decltype(labels)>{labels};
auto beta = alpha.bindCounter(labelkv);
beta->add(1);
beta->unbind();

EXPECT_EQ(alpha.boundInstruments_[KvToString(labelkv)]->get_ref(), 0);
Expand Down Expand Up @@ -435,7 +436,34 @@ TEST(IntValueRecorder, StressRecord)
125); // count
}

TEST(Instruments, NoUpdateNoRecord)
{
// This test verifies that instruments that have received no updates
// in the last collection period are not made into records for export.

Counter<int> alpha("alpha", "no description", "unitless", true);

std::map<std::string, std::string> labels = {{"key", "value"}};

auto labelkv = trace::KeyValueIterableView<decltype(labels)>{labels};

EXPECT_EQ(alpha.GetRecords().size(), 0);
alpha.add(1, labelkv);
EXPECT_EQ(alpha.GetRecords().size(), 1);

UpDownCounter<int> beta("beta", "no description", "unitless", true);

EXPECT_EQ(beta.GetRecords().size(), 0);
beta.add(1, labelkv);
EXPECT_EQ(beta.GetRecords().size(), 1);

ValueRecorder<int> gamma("gamma", "no description", "unitless", true);

EXPECT_EQ(gamma.GetRecords().size(), 0);
gamma.record(1, labelkv);
EXPECT_EQ(gamma.GetRecords().size(), 1);
}

} // namespace metrics
} // namespace sdk

OPENTELEMETRY_END_NAMESPACE