From f886b09a50b4b7f61801e8bc27c4c54c9683415c Mon Sep 17 00:00:00 2001 From: Ankit Bhargava Date: Mon, 20 Jul 2020 12:04:17 -0400 Subject: [PATCH 1/8] tests --- sdk/test/metrics/metric_instrument_test.cc | 148 ++++++++++++++++++++- 1 file changed, 146 insertions(+), 2 deletions(-) diff --git a/sdk/test/metrics/metric_instrument_test.cc b/sdk/test/metrics/metric_instrument_test.cc index f1bb23b73f..6a51250602 100644 --- a/sdk/test/metrics/metric_instrument_test.cc +++ b/sdk/test/metrics/metric_instrument_test.cc @@ -1,10 +1,11 @@ -#include -#include "opentelemetry/sdk/metrics/sync_instruments.h" #include +#include #include #include #include +#include "opentelemetry/sdk/metrics/async_instruments.h" +#include "opentelemetry/sdk/metrics/sync_instruments.h" #include #include @@ -260,6 +261,149 @@ TEST(IntValueRecorder, StressRecord) 125); // count } +void ObserverConstructorCallback(metrics_api::ObserverResult result){ + std::map labels = {{"key", "value"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + result.observe(1,labelkv); +} + +TEST(ApiSdkConversion, async){ + nostd::shared_ptr> alpha = nostd::shared_ptr>(new ValueObserver("ankit","none","unitles",true, &ObserverConstructorCallback)); + + std::map labels = {{"key587", "value264"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + + alpha->observe(123456,labelkv); + EXPECT_EQ(dynamic_cast*>(alpha.get())->GetRecords()[0].GetLabels(),"{\"key587\":\"value264\"}"); + + alpha->observe(123456,labelkv); + AggregatorVariant canCollect = dynamic_cast*>(alpha.get())->GetRecords()[0].GetAggregator(); + EXPECT_EQ(nostd::holds_alternative>>(canCollect), false); + EXPECT_EQ(nostd::holds_alternative>>(canCollect), true); + EXPECT_EQ(nostd::get>>(canCollect)->get_checkpoint()[0], 123456); +} + +TEST(IntValueObserver, InstrumentFunctions) +{ + ValueObserver alpha("enabled", "no description", "unitless", true, &ObserverConstructorCallback); + std::map labels = {{"key", "value"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + + EXPECT_EQ(alpha.GetName(), "enabled"); + EXPECT_EQ(alpha.GetDescription(), "no description"); + EXPECT_EQ(alpha.GetUnits(), "unitless"); + EXPECT_EQ(alpha.IsEnabled(), true); + EXPECT_EQ(alpha.GetKind(), metrics_api::InstrumentKind::ValueObserver); + + alpha.run(); + EXPECT_EQ(alpha.boundAggregators_[KvToString(labelkv)]->get_values()[0], 1); // min +} + +void ObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ + for (int i=0; iobserve(i, labels); + } +} + +void NegObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ + for (int i=0; iobserve(-i, labels); + } +} + +TEST(IntValueObserver, StressObserve) +{ + std::shared_ptr> alpha(new ValueObserver("enabled", "no description", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first (ObserverCallback, alpha, 25, labelkv); // spawn new threads that call the callback + std::thread second (ObserverCallback, alpha, 50, labelkv); + std::thread third (ObserverCallback, alpha, 25, labelkv1); + std::thread fourth (NegObserverCallback, alpha, 100, labelkv1); // negative values + + first.join(); + second.join(); + third.join(); + fourth.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 0); // min + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[1], 49); // max + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[2], 1525); // sum + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[3], 75); // count + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], -99); // min + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[1], 24); // max + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[2], -4650); // sum + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[3], 125); // count +} + +void SumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ + for (int i=0; iobserve(1, labels); + } +} + +TEST(IntSumObserver, StressObserve) +{ + std::shared_ptr> alpha(new SumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first (SumObserverCallback, alpha, 100000, labelkv); + std::thread second (SumObserverCallback, alpha, 100000, labelkv); + std::thread third (SumObserverCallback, alpha, 300000, labelkv1); + + first.join(); + second.join(); + third.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 200000); + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 300000); +} + + +void UpDownSumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ + for (int i=0; iobserve(1, labels); + } +} + +void NegUpDownSumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ + for (int i=0; iobserve(-1, labels); + } +} + +TEST(IntUpDownObserver, StressAdd){ + std::shared_ptr> alpha(new UpDownSumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first (UpDownSumObserverCallback, alpha, 123400, labelkv); // spawn new threads that call the callback + std::thread second (UpDownSumObserverCallback, alpha, 123400, labelkv); + std::thread third (UpDownSumObserverCallback, alpha, 567800, labelkv1); + std::thread fourth (NegUpDownSumObserverCallback, alpha, 123400, labelkv1); // negative values + + first.join(); + second.join(); + third.join(); + fourth.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 123400*2); + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 567800-123400); +} + } // namespace metrics } // namespace sdk + OPENTELEMETRY_END_NAMESPACE From d7763fc1098d04ff941fde8068a058f0162ff6fa Mon Sep 17 00:00:00 2001 From: Ankit Bhargava Date: Mon, 20 Jul 2020 12:45:47 -0400 Subject: [PATCH 2/8] instruments --- .../sdk/metrics/async_instruments.h | 190 ++++++++++++++++++ .../sdk/metrics/observer_result.h | 42 ++++ 2 files changed, 232 insertions(+) create mode 100644 sdk/include/opentelemetry/sdk/metrics/async_instruments.h create mode 100644 sdk/include/opentelemetry/sdk/metrics/observer_result.h diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h new file mode 100644 index 0000000000..10e15a3bfb --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -0,0 +1,190 @@ +#pragma once + +#include +#include "opentelemetry/sdk/metrics/instrument.h" +#include "opentelemetry/sdk/metrics/observer_result.h" + + + namespace metrics_api = opentelemetry::metrics; + + OPENTELEMETRY_BEGIN_NAMESPACE + namespace sdk + { + namespace metrics + { + + template + class ValueObserver : public AsynchronousInstrument + { + + public: + ValueObserver() = default; + + ValueObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(ObserverResult)): + AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::ValueObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const std::map &labels) override { + this->mu_.lock(); + std::string labelset = mapToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new MinMaxSumCountAggregator(this->kind_)); + boundAggregators_[labelset]=sp1; + sp1->update(value); + } + else + { + boundAggregators_[labelset]->update(value); + } + this->mu_.unlock(); + } + + /* + * Activate the intsrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run(){ + ObserverResult res(*this); + this->callback_(res); + } + + // Public mapping from labels (stored as strings) to their respective aggregators + std::unordered_map>> boundAggregators_; + }; + + template + class SumObserver : public AsynchronousInstrument + { + + public: + SumObserver() = default; + + SumObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(ObserverResult)): + AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::SumObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const std::map &labels) override { + this->mu_.lock(); + std::string labelset = mapToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); + boundAggregators_[labelset]=sp1; + if (value < 0){ + throw std::invalid_argument("Counter instrument updates must be non-negative."); + } else { + sp1->update(value); + } + } + else + { + if (value < 0){ + throw std::invalid_argument("Counter instrument updates must be non-negative."); + } else { + boundAggregators_[labelset]->update(value); + } + } + this->mu_.unlock(); + } + + /* + * Activate the intsrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run(){ + ObserverResult res(*this); + this->callback_(res); + } + + // Public mapping from labels (stored as strings) to their respective aggregators + std::unordered_map>> boundAggregators_; + }; + + template + class UpDownSumObserver : public AsynchronousInstrument + { + + public: + UpDownSumObserver() = default; + + UpDownSumObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(ObserverResult)): + AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::UpDownSumObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const std::map &labels) override { + this->mu_.lock(); + std::string labelset = mapToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); + boundAggregators_[labelset]=sp1; + sp1->update(value); + + } + else + { + boundAggregators_[labelset]->update(value); + } + this->mu_.unlock(); + } + + /* + * Activate the intsrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run(){ + ObserverResult res(*this); + this->callback_(res); + } + + // Public mapping from labels (stored as strings) to their respective aggregators + std::unordered_map>> boundAggregators_; + }; + + } + } + OPENTELEMETRY_END_NAMESPACE + diff --git a/sdk/include/opentelemetry/sdk/metrics/observer_result.h b/sdk/include/opentelemetry/sdk/metrics/observer_result.h new file mode 100644 index 0000000000..939b971515 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/metrics/observer_result.h @@ -0,0 +1,42 @@ +#pragma once + +#include "opentelemetry/sdk/metrics/instrument.h" +#include "opentelemetry/metrics/observer_result.h" + + + namespace metrics_api = opentelemetry::metrics; + + OPENTELEMETRY_BEGIN_NAMESPACE + namespace sdk + { + namespace metrics + { + + /** + * ObserverResult class is used in the callback recording of asynchronous + * instruments. Callback functions for asynchronous instruments are designed to + * accept a single ObserverResult object and update using its pointer to the + * instrument itself. + */ + template + class ObserverResult + { + + public: + ObserverResult() = default; + + ObserverResult(AsynchronousInstrument & instrument): instrument_(instrument) {} + + void observe(T value, const std::map &labels) + { + instrument_.observe(value, labels); + } + + private: + AsynchronousInstrument & instrument_; + + }; + + } // namespace sdk + } // namespace metrics + OPENTELEMETRY_END_NAMESPACE From 9e126e98ff02159ea6cf3ae7a9b35115855d21cf Mon Sep 17 00:00:00 2001 From: Ankit Bhargava Date: Sun, 26 Jul 2020 22:02:54 -0400 Subject: [PATCH 3/8] revised asynchronous instrument --- .../sdk/metrics/async_instruments.h | 106 ++++--- .../sdk/metrics/observer_result.h | 42 --- sdk/test/metrics/metric_instrument_test.cc | 285 +++++++++--------- 3 files changed, 211 insertions(+), 222 deletions(-) delete mode 100644 sdk/include/opentelemetry/sdk/metrics/observer_result.h diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h index 10e15a3bfb..3b9ca55a5b 100644 --- a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -1,9 +1,13 @@ #pragma once -#include #include "opentelemetry/sdk/metrics/instrument.h" -#include "opentelemetry/sdk/metrics/observer_result.h" - +#include "opentelemetry/metrics/async_instruments.h" +#include "opentelemetry/version.h" +#include +#include +#include +#include +#include namespace metrics_api = opentelemetry::metrics; @@ -14,7 +18,7 @@ { template - class ValueObserver : public AsynchronousInstrument + class ValueObserver : public AsynchronousInstrument, virtual public metrics_api::ValueObserver { public: @@ -24,7 +28,7 @@ nostd::string_view description, nostd::string_view unit, bool enabled, - void (*callback)(ObserverResult)): + void (*callback)(metrics_api::ObserverResult)): AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::ValueObserver) {} @@ -35,13 +39,13 @@ * @param value is the numerical representation of the metric being captured * @param labels the set of labels, as key-value pairs */ - virtual void observe(T value, const std::map &labels) override { + virtual void observe(T value, const trace::KeyValueIterable &labels) override { this->mu_.lock(); - std::string labelset = mapToString(labels); + std::string labelset = KvToString(labels); if (boundAggregators_.find(labelset) == boundAggregators_.end()) { - auto sp1 = std::shared_ptr>(new MinMaxSumCountAggregator(this->kind_)); - boundAggregators_[labelset]=sp1; + auto sp1 = nostd::shared_ptr>(new MinMaxSumCountAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); sp1->update(value); } else @@ -50,7 +54,7 @@ } this->mu_.unlock(); } - + /* * Activate the intsrument's callback function to record a measurement. This * function will be called by the specified controller at a regular interval. @@ -58,17 +62,27 @@ * @param none * @return none */ - virtual void run(){ - ObserverResult res(*this); + virtual void run() override { + metrics_api::ObserverResult res(this); this->callback_(res); } + + virtual std::vector GetRecords() override { + std::vector ret; + for (auto x : boundAggregators_){ + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); + } + return ret; + } // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; + std::unordered_map>> boundAggregators_; }; - + + template - class SumObserver : public AsynchronousInstrument + class SumObserver : public AsynchronousInstrument, virtual public metrics_api::SumObserver { public: @@ -78,7 +92,7 @@ nostd::string_view description, nostd::string_view unit, bool enabled, - void (*callback)(ObserverResult)): + void (*callback)(metrics_api::ObserverResult)): AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::SumObserver) {} @@ -89,13 +103,13 @@ * @param value is the numerical representation of the metric being captured * @param labels the set of labels, as key-value pairs */ - virtual void observe(T value, const std::map &labels) override { + virtual void observe(T value, const trace::KeyValueIterable &labels) override { this->mu_.lock(); - std::string labelset = mapToString(labels); + std::string labelset = KvToString(labels); if (boundAggregators_.find(labelset) == boundAggregators_.end()) { - auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); - boundAggregators_[labelset]=sp1; + auto sp1 = nostd::shared_ptr>(new CounterAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); if (value < 0){ throw std::invalid_argument("Counter instrument updates must be non-negative."); } else { @@ -120,17 +134,26 @@ * @param none * @return none */ - virtual void run(){ - ObserverResult res(*this); + virtual void run() override { + metrics_api::ObserverResult res(this); this->callback_(res); } + + virtual std::vector GetRecords() override { + std::vector ret; + for (auto x : boundAggregators_){ + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); + } + return ret; + } // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; + std::unordered_map>> boundAggregators_; }; - + template - class UpDownSumObserver : public AsynchronousInstrument + class UpDownSumObserver : public AsynchronousInstrument, virtual public metrics_api::UpDownSumObserver { public: @@ -140,7 +163,7 @@ nostd::string_view description, nostd::string_view unit, bool enabled, - void (*callback)(ObserverResult)): + void (*callback)(metrics_api::ObserverResult)): AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::UpDownSumObserver) {} @@ -151,15 +174,14 @@ * @param value is the numerical representation of the metric being captured * @param labels the set of labels, as key-value pairs */ - virtual void observe(T value, const std::map &labels) override { + virtual void observe(T value, const trace::KeyValueIterable &labels) override { this->mu_.lock(); - std::string labelset = mapToString(labels); + std::string labelset = KvToString(labels); if (boundAggregators_.find(labelset) == boundAggregators_.end()) { - auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); - boundAggregators_[labelset]=sp1; + auto sp1 = nostd::shared_ptr>(new CounterAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); sp1->update(value); - } else { @@ -175,16 +197,24 @@ * @param none * @return none */ - virtual void run(){ - ObserverResult res(*this); + virtual void run() override { + metrics_api::ObserverResult res(this); this->callback_(res); } + + virtual std::vector GetRecords() override { + std::vector ret; + for (auto x : boundAggregators_){ + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); + } + return ret; + } // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; + std::unordered_map>> boundAggregators_; }; - } - } - OPENTELEMETRY_END_NAMESPACE - +} +} +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/metrics/observer_result.h b/sdk/include/opentelemetry/sdk/metrics/observer_result.h deleted file mode 100644 index 939b971515..0000000000 --- a/sdk/include/opentelemetry/sdk/metrics/observer_result.h +++ /dev/null @@ -1,42 +0,0 @@ -#pragma once - -#include "opentelemetry/sdk/metrics/instrument.h" -#include "opentelemetry/metrics/observer_result.h" - - - namespace metrics_api = opentelemetry::metrics; - - OPENTELEMETRY_BEGIN_NAMESPACE - namespace sdk - { - namespace metrics - { - - /** - * ObserverResult class is used in the callback recording of asynchronous - * instruments. Callback functions for asynchronous instruments are designed to - * accept a single ObserverResult object and update using its pointer to the - * instrument itself. - */ - template - class ObserverResult - { - - public: - ObserverResult() = default; - - ObserverResult(AsynchronousInstrument & instrument): instrument_(instrument) {} - - void observe(T value, const std::map &labels) - { - instrument_.observe(value, labels); - } - - private: - AsynchronousInstrument & instrument_; - - }; - - } // namespace sdk - } // namespace metrics - OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/metrics/metric_instrument_test.cc b/sdk/test/metrics/metric_instrument_test.cc index 6a51250602..d8d409b0fc 100644 --- a/sdk/test/metrics/metric_instrument_test.cc +++ b/sdk/test/metrics/metric_instrument_test.cc @@ -11,12 +11,155 @@ namespace metrics_api = opentelemetry::metrics; + OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { namespace metrics { +void ObserverConstructorCallback(metrics_api::ObserverResult result){ + std::map labels = {{"key", "value"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + result.observe(1,labelkv); +} + +TEST(ApiSdkConversion, async){ + nostd::shared_ptr> alpha = nostd::shared_ptr>(new ValueObserver("ankit","none","unitles",true, &ObserverConstructorCallback)); + + std::map labels = {{"key587", "value264"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + + alpha->observe(123456,labelkv); + EXPECT_EQ(dynamic_cast*>(alpha.get())->GetRecords()[0].GetLabels(),"{\"key587\":\"value264\"}"); + + alpha->observe(123456,labelkv); + AggregatorVariant canCollect = dynamic_cast*>(alpha.get())->GetRecords()[0].GetAggregator(); + EXPECT_EQ(nostd::holds_alternative>>(canCollect), false); + EXPECT_EQ(nostd::holds_alternative>>(canCollect), true); + EXPECT_EQ(nostd::get>>(canCollect)->get_checkpoint()[0], 123456); +} + +TEST(IntValueObserver, InstrumentFunctions) +{ + ValueObserver alpha("enabled", "no description", "unitless", true, &ObserverConstructorCallback); + std::map labels = {{"key", "value"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + + EXPECT_EQ(alpha.GetName(), "enabled"); + EXPECT_EQ(alpha.GetDescription(), "no description"); + EXPECT_EQ(alpha.GetUnits(), "unitless"); + EXPECT_EQ(alpha.IsEnabled(), true); + EXPECT_EQ(alpha.GetKind(), metrics_api::InstrumentKind::ValueObserver); + + alpha.run(); + EXPECT_EQ(alpha.boundAggregators_[KvToString(labelkv)]->get_values()[0], 1); // min +} + +void ObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ + for (int i=0; iobserve(i, labels); + } +} + +void NegObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ + for (int i=0; iobserve(-i, labels); + } +} + +TEST(IntValueObserver, StressObserve) +{ + std::shared_ptr> alpha(new ValueObserver("enabled", "no description", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first (ObserverCallback, alpha, 25, labelkv); // spawn new threads that call the callback + std::thread second (ObserverCallback, alpha, 50, labelkv); + std::thread third (ObserverCallback, alpha, 25, labelkv1); + std::thread fourth (NegObserverCallback, alpha, 100, labelkv1); // negative values + + first.join(); + second.join(); + third.join(); + fourth.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 0); // min + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[1], 49); // max + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[2], 1525); // sum + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[3], 75); // count + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], -99); // min + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[1], 24); // max + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[2], -4650); // sum + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[3], 125); // count +} + +void SumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ + for (int i=0; iobserve(1, labels); + } +} + +TEST(IntSumObserver, StressObserve) +{ + std::shared_ptr> alpha(new SumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first (SumObserverCallback, alpha, 100000, labelkv); + std::thread second (SumObserverCallback, alpha, 100000, labelkv); + std::thread third (SumObserverCallback, alpha, 300000, labelkv1); + + first.join(); + second.join(); + third.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 200000); + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 300000); +} + + +void UpDownSumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ + for (int i=0; iobserve(1, labels); + } +} + +void NegUpDownSumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ + for (int i=0; iobserve(-1, labels); + } +} + +TEST(IntUpDownObserver, StressAdd){ + std::shared_ptr> alpha(new UpDownSumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first (UpDownSumObserverCallback, alpha, 123400, labelkv); // spawn new threads that call the callback + std::thread second (UpDownSumObserverCallback, alpha, 123400, labelkv); + std::thread third (UpDownSumObserverCallback, alpha, 567800, labelkv1); + std::thread fourth (NegUpDownSumObserverCallback, alpha, 123400, labelkv1); // negative values + + first.join(); + second.join(); + third.join(); + fourth.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 123400*2); + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 567800-123400); +} + TEST(Counter, InstrumentFunctions) { Counter alpha("enabled", "no description", "unitless", true); @@ -261,148 +404,6 @@ TEST(IntValueRecorder, StressRecord) 125); // count } -void ObserverConstructorCallback(metrics_api::ObserverResult result){ - std::map labels = {{"key", "value"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - result.observe(1,labelkv); -} - -TEST(ApiSdkConversion, async){ - nostd::shared_ptr> alpha = nostd::shared_ptr>(new ValueObserver("ankit","none","unitles",true, &ObserverConstructorCallback)); - - std::map labels = {{"key587", "value264"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - - alpha->observe(123456,labelkv); - EXPECT_EQ(dynamic_cast*>(alpha.get())->GetRecords()[0].GetLabels(),"{\"key587\":\"value264\"}"); - - alpha->observe(123456,labelkv); - AggregatorVariant canCollect = dynamic_cast*>(alpha.get())->GetRecords()[0].GetAggregator(); - EXPECT_EQ(nostd::holds_alternative>>(canCollect), false); - EXPECT_EQ(nostd::holds_alternative>>(canCollect), true); - EXPECT_EQ(nostd::get>>(canCollect)->get_checkpoint()[0], 123456); -} - -TEST(IntValueObserver, InstrumentFunctions) -{ - ValueObserver alpha("enabled", "no description", "unitless", true, &ObserverConstructorCallback); - std::map labels = {{"key", "value"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - - EXPECT_EQ(alpha.GetName(), "enabled"); - EXPECT_EQ(alpha.GetDescription(), "no description"); - EXPECT_EQ(alpha.GetUnits(), "unitless"); - EXPECT_EQ(alpha.IsEnabled(), true); - EXPECT_EQ(alpha.GetKind(), metrics_api::InstrumentKind::ValueObserver); - - alpha.run(); - EXPECT_EQ(alpha.boundAggregators_[KvToString(labelkv)]->get_values()[0], 1); // min -} - -void ObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ - for (int i=0; iobserve(i, labels); - } -} - -void NegObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ - for (int i=0; iobserve(-i, labels); - } -} - -TEST(IntValueObserver, StressObserve) -{ - std::shared_ptr> alpha(new ValueObserver("enabled", "no description", "unitless", true, &ObserverConstructorCallback)); - - std::map labels = {{"key", "value"}}; - std::map labels1 = {{"key1", "value1"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - auto labelkv1 = trace::KeyValueIterableView{labels1}; - - std::thread first (ObserverCallback, alpha, 25, labelkv); // spawn new threads that call the callback - std::thread second (ObserverCallback, alpha, 50, labelkv); - std::thread third (ObserverCallback, alpha, 25, labelkv1); - std::thread fourth (NegObserverCallback, alpha, 100, labelkv1); // negative values - - first.join(); - second.join(); - third.join(); - fourth.join(); - - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 0); // min - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[1], 49); // max - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[2], 1525); // sum - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[3], 75); // count - - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], -99); // min - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[1], 24); // max - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[2], -4650); // sum - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[3], 125); // count -} - -void SumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ - for (int i=0; iobserve(1, labels); - } -} - -TEST(IntSumObserver, StressObserve) -{ - std::shared_ptr> alpha(new SumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); - - std::map labels = {{"key", "value"}}; - std::map labels1 = {{"key1", "value1"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - auto labelkv1 = trace::KeyValueIterableView{labels1}; - - std::thread first (SumObserverCallback, alpha, 100000, labelkv); - std::thread second (SumObserverCallback, alpha, 100000, labelkv); - std::thread third (SumObserverCallback, alpha, 300000, labelkv1); - - first.join(); - second.join(); - third.join(); - - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 200000); - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 300000); -} - - -void UpDownSumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ - for (int i=0; iobserve(1, labels); - } -} - -void NegUpDownSumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ - for (int i=0; iobserve(-1, labels); - } -} - -TEST(IntUpDownObserver, StressAdd){ - std::shared_ptr> alpha(new UpDownSumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); - - std::map labels = {{"key", "value"}}; - std::map labels1 = {{"key1", "value1"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - auto labelkv1 = trace::KeyValueIterableView{labels1}; - - std::thread first (UpDownSumObserverCallback, alpha, 123400, labelkv); // spawn new threads that call the callback - std::thread second (UpDownSumObserverCallback, alpha, 123400, labelkv); - std::thread third (UpDownSumObserverCallback, alpha, 567800, labelkv1); - std::thread fourth (NegUpDownSumObserverCallback, alpha, 123400, labelkv1); // negative values - - first.join(); - second.join(); - third.join(); - fourth.join(); - - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 123400*2); - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 567800-123400); -} - } // namespace metrics } // namespace sdk From 52d799310e45f3daec6c45f9a9b4fc191e7712f7 Mon Sep 17 00:00:00 2001 From: Ankit Bhargava Date: Sun, 26 Jul 2020 22:58:17 -0400 Subject: [PATCH 4/8] base class --- .../opentelemetry/sdk/metrics/instrument.h | 55 ++++++++++++++++--- 1 file changed, 48 insertions(+), 7 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/instrument.h b/sdk/include/opentelemetry/sdk/metrics/instrument.h index 3a3e525714..64c46820b2 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instrument.h +++ b/sdk/include/opentelemetry/sdk/metrics/instrument.h @@ -1,20 +1,20 @@ #pragma once +#include +#include +#include #include "opentelemetry/metrics/instrument.h" #include "opentelemetry/sdk/metrics/aggregator/aggregator.h" #include "opentelemetry/sdk/metrics/record.h" #include "opentelemetry/version.h" - -#include -#include -#include #include #include #include #include + namespace metrics_api = opentelemetry::metrics; -namespace trace_api = opentelemetry::trace; +namespace trace_api = opentelemetry::trace; OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk @@ -237,6 +237,47 @@ inline std::string KvToString(const trace::KeyValueIterable &kv) noexcept return ss.str(); } -} // namespace metrics -} // namespace sdk +template +class AsynchronousInstrument : public Instrument, virtual public metrics_api::AsynchronousInstrument { + +public: + AsynchronousInstrument() = default; + + AsynchronousInstrument(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult), + metrics_api::InstrumentKind kind): + Instrument(name, description, unit, enabled, kind) + { + this->callback_ = callback; + } + + /** + * Captures data through a manual call rather than the automatic collection process instituted + * in the run function. Asynchronous instruments are generally expected to obtain data from + * their callbacks rather than direct calls. This function is used by the callback to store data. + * + * @param value is the numerical representation of the metric being captured + * @param labels is the numerical representation of the metric being captured + * @return none + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override = 0; + + virtual std::vector GetRecords() = 0; + + /** + * Captures data by activating the callback function associated with the + * instrument and storing its return value. Callbacks for asynchronous + * instruments are defined during construction. + * + * @param none + * @return none + */ + virtual void run() override = 0; +}; + +} // namespace metrics +} // namespace sdk OPENTELEMETRY_END_NAMESPACE From 865568e93dddff117ba26d5e91cc35e8bbe34c03 Mon Sep 17 00:00:00 2001 From: Ankit Bhargava Date: Sun, 2 Aug 2020 23:34:55 -0400 Subject: [PATCH 5/8] use std shared_ptr internally --- .../sdk/metrics/async_instruments.h | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h index 3b9ca55a5b..d8ed9ebb5f 100644 --- a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -3,6 +3,8 @@ #include "opentelemetry/sdk/metrics/instrument.h" #include "opentelemetry/metrics/async_instruments.h" #include "opentelemetry/version.h" +#include "opentelemetry/sdk/metrics/aggregator/counter_aggregator.h" +#include "opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h" #include #include #include @@ -44,7 +46,7 @@ std::string labelset = KvToString(labels); if (boundAggregators_.find(labelset) == boundAggregators_.end()) { - auto sp1 = nostd::shared_ptr>(new MinMaxSumCountAggregator(this->kind_)); + auto sp1 = std::shared_ptr>(new MinMaxSumCountAggregator(this->kind_)); boundAggregators_.insert(std::make_pair(labelset, sp1)); sp1->update(value); } @@ -73,11 +75,12 @@ x.second->checkpoint(); ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); } + boundAggregators_.clear(); return ret; } // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; + std::unordered_map>> boundAggregators_; }; @@ -108,7 +111,7 @@ std::string labelset = KvToString(labels); if (boundAggregators_.find(labelset) == boundAggregators_.end()) { - auto sp1 = nostd::shared_ptr>(new CounterAggregator(this->kind_)); + auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); boundAggregators_.insert(std::make_pair(labelset, sp1)); if (value < 0){ throw std::invalid_argument("Counter instrument updates must be non-negative."); @@ -145,11 +148,12 @@ x.second->checkpoint(); ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); } + boundAggregators_.clear(); return ret; } // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; + std::unordered_map>> boundAggregators_; }; template @@ -179,7 +183,7 @@ std::string labelset = KvToString(labels); if (boundAggregators_.find(labelset) == boundAggregators_.end()) { - auto sp1 = nostd::shared_ptr>(new CounterAggregator(this->kind_)); + auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); boundAggregators_.insert(std::make_pair(labelset, sp1)); sp1->update(value); } @@ -208,11 +212,12 @@ x.second->checkpoint(); ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); } + boundAggregators_.clear() return ret; } // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; + std::unordered_map>> boundAggregators_; }; } From 3ead98c8a1c727b4cd40493ac3788d70a3d73315 Mon Sep 17 00:00:00 2001 From: Ankit Bhargava Date: Tue, 4 Aug 2020 10:04:14 -0400 Subject: [PATCH 6/8] rebasing --- .../sdk/metrics/async_instruments.h | 418 +++++++++--------- .../opentelemetry/sdk/metrics/instrument.h | 392 ++++++++-------- sdk/test/metrics/metric_instrument_test.cc | 6 +- 3 files changed, 412 insertions(+), 404 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h index d8ed9ebb5f..4e0207a44b 100644 --- a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -11,214 +11,222 @@ #include #include - namespace metrics_api = opentelemetry::metrics; - - OPENTELEMETRY_BEGIN_NAMESPACE - namespace sdk - { - namespace metrics - { - - template - class ValueObserver : public AsynchronousInstrument, virtual public metrics_api::ValueObserver - { - - public: - ValueObserver() = default; - - ValueObserver(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - void (*callback)(metrics_api::ObserverResult)): - AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::ValueObserver) - {} - - /* - * Updates the instruments aggregator with the new value. The labels should - * contain the keys and values to be associated with this value. - * - * @param value is the numerical representation of the metric being captured - * @param labels the set of labels, as key-value pairs - */ - virtual void observe(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); - std::string labelset = KvToString(labels); - if (boundAggregators_.find(labelset) == boundAggregators_.end()) - { - auto sp1 = std::shared_ptr>(new MinMaxSumCountAggregator(this->kind_)); - boundAggregators_.insert(std::make_pair(labelset, sp1)); - sp1->update(value); - } - else - { - boundAggregators_[labelset]->update(value); - } - this->mu_.unlock(); - } - - /* - * Activate the intsrument's callback function to record a measurement. This - * function will be called by the specified controller at a regular interval. - * - * @param none - * @return none - */ - virtual void run() override { - metrics_api::ObserverResult res(this); - this->callback_(res); - } - - virtual std::vector GetRecords() override { - std::vector ret; - for (auto x : boundAggregators_){ - x.second->checkpoint(); - ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); - } - boundAggregators_.clear(); - return ret; - } - +namespace metrics_api = opentelemetry::metrics; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace metrics +{ + +template +class ValueObserver : public AsynchronousInstrument, virtual public metrics_api::ValueObserver +{ + +public: + ValueObserver() = default; + + ValueObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult)): + AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::ValueObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override { + this->mu_.lock(); + std::string labelset = KvToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new MinMaxSumCountAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); + sp1->update(value); + } + else + { + boundAggregators_[labelset]->update(value); + } + this->mu_.unlock(); + } + + /* + * Activate the instrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run() override { + metrics_api::ObserverResult res(this); + this->callback_(res); + } + + virtual std::vector GetRecords() override { + std::vector ret; + for (auto x : boundAggregators_){ + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); + } + boundAggregators_.clear(); + return ret; + } + // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; - }; - - - template - class SumObserver : public AsynchronousInstrument, virtual public metrics_api::SumObserver - { - - public: - SumObserver() = default; - - SumObserver(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - void (*callback)(metrics_api::ObserverResult)): - AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::SumObserver) - {} - - /* - * Updates the instruments aggregator with the new value. The labels should - * contain the keys and values to be associated with this value. - * - * @param value is the numerical representation of the metric being captured - * @param labels the set of labels, as key-value pairs - */ - virtual void observe(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); - std::string labelset = KvToString(labels); - if (boundAggregators_.find(labelset) == boundAggregators_.end()) - { - auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); - boundAggregators_.insert(std::make_pair(labelset, sp1)); - if (value < 0){ - throw std::invalid_argument("Counter instrument updates must be non-negative."); - } else { - sp1->update(value); - } - } - else - { - if (value < 0){ - throw std::invalid_argument("Counter instrument updates must be non-negative."); - } else { - boundAggregators_[labelset]->update(value); - } - } - this->mu_.unlock(); - } - - /* - * Activate the intsrument's callback function to record a measurement. This - * function will be called by the specified controller at a regular interval. - * - * @param none - * @return none - */ - virtual void run() override { - metrics_api::ObserverResult res(this); - this->callback_(res); - } - - virtual std::vector GetRecords() override { - std::vector ret; - for (auto x : boundAggregators_){ - x.second->checkpoint(); - ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); - } - boundAggregators_.clear(); - return ret; - } - + std::unordered_map>> boundAggregators_; +}; + + +template +class SumObserver : public AsynchronousInstrument, virtual public metrics_api::SumObserver +{ + +public: + SumObserver() = default; + + SumObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult)): + AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::SumObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override { + this->mu_.lock(); + std::string labelset = KvToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); + if (value < 0){ + #if __EXCEPTIONS + throw std::invalid_argument("Counter instrument updates must be non-negative."); + #else + std::terminate(); + #endif + } else { + sp1->update(value); + } + } + else + { + if (value < 0){ + #if __EXCEPTIONS + throw std::invalid_argument("Counter instrument updates must be non-negative."); + #else + std::terminate(); + #endif + } else { + boundAggregators_[labelset]->update(value); + } + } + this->mu_.unlock(); + } + + /* + * Activate the intsrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run() override { + metrics_api::ObserverResult res(this); + this->callback_(res); + } + + virtual std::vector GetRecords() override { + std::vector ret; + for (auto x : boundAggregators_){ + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); + } + boundAggregators_.clear(); + return ret; + } + // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; - }; - - template - class UpDownSumObserver : public AsynchronousInstrument, virtual public metrics_api::UpDownSumObserver - { - - public: - UpDownSumObserver() = default; - - UpDownSumObserver(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - void (*callback)(metrics_api::ObserverResult)): - AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::UpDownSumObserver) - {} - - /* - * Updates the instruments aggregator with the new value. The labels should - * contain the keys and values to be associated with this value. - * - * @param value is the numerical representation of the metric being captured - * @param labels the set of labels, as key-value pairs - */ - virtual void observe(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); - std::string labelset = KvToString(labels); - if (boundAggregators_.find(labelset) == boundAggregators_.end()) - { - auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); - boundAggregators_.insert(std::make_pair(labelset, sp1)); - sp1->update(value); - } - else - { - boundAggregators_[labelset]->update(value); - } - this->mu_.unlock(); - } - - /* - * Activate the intsrument's callback function to record a measurement. This - * function will be called by the specified controller at a regular interval. - * - * @param none - * @return none - */ - virtual void run() override { - metrics_api::ObserverResult res(this); - this->callback_(res); - } - - virtual std::vector GetRecords() override { - std::vector ret; - for (auto x : boundAggregators_){ - x.second->checkpoint(); - ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); - } - boundAggregators_.clear() - return ret; - } - + std::unordered_map>> boundAggregators_; +}; + +template +class UpDownSumObserver : public AsynchronousInstrument, virtual public metrics_api::UpDownSumObserver +{ + +public: + UpDownSumObserver() = default; + + UpDownSumObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult)): + AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::UpDownSumObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override { + this->mu_.lock(); + std::string labelset = KvToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); + sp1->update(value); + } + else + { + boundAggregators_[labelset]->update(value); + } + this->mu_.unlock(); + } + + /* + * Activate the intsrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run() override { + metrics_api::ObserverResult res(this); + this->callback_(res); + } + + virtual std::vector GetRecords() override { + std::vector ret; + for (auto x : boundAggregators_){ + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); + } + boundAggregators_.clear(); + return ret; + } + // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; - }; + std::unordered_map>> boundAggregators_; +}; } } diff --git a/sdk/include/opentelemetry/sdk/metrics/instrument.h b/sdk/include/opentelemetry/sdk/metrics/instrument.h index 64c46820b2..daf32351c9 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instrument.h +++ b/sdk/include/opentelemetry/sdk/metrics/instrument.h @@ -24,219 +24,159 @@ namespace metrics class Instrument : virtual public metrics_api::Instrument { - + public: - Instrument() = default; - - Instrument(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - metrics_api::InstrumentKind kind) - : name_(name), description_(description), unit_(unit), enabled_(enabled), kind_(kind) - {} - - // Returns true if the instrument is enabled and collecting data - virtual bool IsEnabled() override { return enabled_; } - - // Return the instrument name - virtual nostd::string_view GetName() override { return name_; } - - // Return the instrument description - virtual nostd::string_view GetDescription() override { return description_; } - - // Return the insrument's units of measurement - virtual nostd::string_view GetUnits() override { return unit_; } - - virtual metrics_api::InstrumentKind GetKind() override { return this->kind_; } - + Instrument() = default; + + Instrument(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + metrics_api::InstrumentKind kind) + : name_(name), description_(description), unit_(unit), enabled_(enabled), kind_(kind) + {} + + // Returns true if the instrument is enabled and collecting data + virtual bool IsEnabled() override { return enabled_; } + + // Return the instrument name + virtual nostd::string_view GetName() override { return name_; } + + // Return the instrument description + virtual nostd::string_view GetDescription() override { return description_; } + + // Return the insrument's units of measurement + virtual nostd::string_view GetUnits() override { return unit_; } + + virtual metrics_api::InstrumentKind GetKind() override { return this->kind_; } + protected: - std::string name_; - std::string description_; - std::string unit_; - bool enabled_; - std::mutex mu_; - metrics_api::InstrumentKind kind_; + std::string name_; + std::string description_; + std::string unit_; + bool enabled_; + std::mutex mu_; + metrics_api::InstrumentKind kind_; }; template class BoundSynchronousInstrument : public Instrument, - virtual public metrics_api::BoundSynchronousInstrument +virtual public metrics_api::BoundSynchronousInstrument { - + public: - BoundSynchronousInstrument() = default; - - BoundSynchronousInstrument(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - metrics_api::InstrumentKind kind, - std::shared_ptr> agg) - : Instrument(name, description, unit, enabled, kind), agg_(agg) - { - this->inc_ref(); // increase reference count when instantiated - } - - /** - * Frees the resources associated with this Bound Instrument. - * The Metric from which this instrument was created is not impacted. - * - * @param none - * @return void - */ - virtual void unbind() override { ref_ -= 1; } - - /** - * Increments the reference count. This function is used when binding or instantiating. - * - * @param none - * @return void - */ - virtual void inc_ref() override { ref_ += 1; } - - /** - * Returns the current reference count of the instrument. This value is used to - * later in the pipeline remove stale instruments. - * - * @param none - * @return current ref count of the instrument - */ - virtual int get_ref() override { return ref_; } - - /** - * Records a single synchronous metric event via a call to the aggregator. - * Since this is a bound synchronous instrument, labels are not required in - * metric capture calls. - * - * @param value is the numerical representation of the metric being captured - * @return void - */ - virtual void update(T value) override - { - this->mu_.lock(); - agg_->update(value); - this->mu_.unlock(); - } - - /** - * Returns the aggregator responsible for meaningfully combining update values. - * - * @param none - * @return the aggregator assigned to this instrument - */ - virtual std::shared_ptr> GetAggregator() final { return agg_; } - + BoundSynchronousInstrument() = default; + + BoundSynchronousInstrument(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + metrics_api::InstrumentKind kind, + std::shared_ptr> agg) + : Instrument(name, description, unit, enabled, kind), agg_(agg) + { + this->inc_ref(); // increase reference count when instantiated + } + + /** + * Frees the resources associated with this Bound Instrument. + * The Metric from which this instrument was created is not impacted. + * + * @param none + * @return void + */ + virtual void unbind() override { ref_ -= 1; } + + /** + * Increments the reference count. This function is used when binding or instantiating. + * + * @param none + * @return void + */ + virtual void inc_ref() override { ref_ += 1; } + + /** + * Returns the current reference count of the instrument. This value is used to + * later in the pipeline remove stale instruments. + * + * @param none + * @return current ref count of the instrument + */ + virtual int get_ref() override { return ref_; } + + /** + * Records a single synchronous metric event via a call to the aggregator. + * Since this is a bound synchronous instrument, labels are not required in + * metric capture calls. + * + * @param value is the numerical representation of the metric being captured + * @return void + */ + virtual void update(T value) override + { + this->mu_.lock(); + agg_->update(value); + this->mu_.unlock(); + } + + /** + * Returns the aggregator responsible for meaningfully combining update values. + * + * @param none + * @return the aggregator assigned to this instrument + */ + virtual std::shared_ptr> GetAggregator() final { return agg_; } + private: - std::shared_ptr> agg_; - int ref_ = 0; + std::shared_ptr> agg_; + int ref_ = 0; }; template class SynchronousInstrument : public Instrument, - virtual public metrics_api::SynchronousInstrument +virtual public metrics_api::SynchronousInstrument { - + public: - SynchronousInstrument() = default; - - SynchronousInstrument(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - metrics_api::InstrumentKind kind) - : Instrument(name, description, unit, enabled, kind) - {} - - /** - * Returns a Bound Instrument associated with the specified labels. Multiples requests - * with the same set of labels may return the same Bound Instrument instance. - * - * It is recommended that callers keep a reference to the Bound Instrument - * instead of repeatedly calling this operation. - * - * @param labels the set of labels, as key-value pairs - * @return a Bound Instrument - */ - virtual nostd::shared_ptr> bind( - const trace::KeyValueIterable &labels) override - { - return nostd::shared_ptr>(); - } - - virtual void update(T value, const trace::KeyValueIterable &labels) override = 0; - - /** - * Checkpoints instruments and returns a set of records which are ready for processing. - * This method should ONLY be called by the Meter Class as part of the export pipeline - * as it also prunes bound instruments with no active references. - * - * @param none - * @return vector of Records which hold the data attached to this synchronous instrument - */ - virtual std::vector GetRecords() = 0; -}; - -// Helper functions for turning a trace::KeyValueIterable into a string -inline void print_value(std::stringstream &ss, - common::AttributeValue &value, - bool jsonTypes = false) -{ - switch (value.index()) - { - case common::AttributeType::TYPE_STRING: - if (jsonTypes) - ss << '"'; - ss << nostd::get(value); - if (jsonTypes) - ss << '"'; - break; - default: -#if __EXCEPTIONS - throw std::invalid_argument("Labels must be strings"); -#else - std::terminate(); -#endif - break; - } + SynchronousInstrument() = default; + + SynchronousInstrument(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + metrics_api::InstrumentKind kind) + : Instrument(name, description, unit, enabled, kind) + {} + + /** + * Returns a Bound Instrument associated with the specified labels. Multiples requests + * with the same set of labels may return the same Bound Instrument instance. + * + * It is recommended that callers keep a reference to the Bound Instrument + * instead of repeatedly calling this operation. + * + * @param labels the set of labels, as key-value pairs + * @return a Bound Instrument + */ + virtual nostd::shared_ptr> bind( + const trace::KeyValueIterable &labels) override + { + return nostd::shared_ptr>(); + } + + virtual void update(T value, const trace::KeyValueIterable &labels) override = 0; + + /** + * Checkpoints instruments and returns a set of records which are ready for processing. + * This method should ONLY be called by the Meter Class as part of the export pipeline + * as it also prunes bound instruments with no active references. + * + * @param none + * @return vector of Records which hold the data attached to this synchronous instrument + */ + virtual std::vector GetRecords() = 0; }; -// Utility function which converts maps to strings for better performance -inline std::string mapToString(const std::map &conv) -{ - std::stringstream ss; - ss << "{"; - for (auto i : conv) - { - ss << i.first << ':' << i.second << ','; - } - ss << "}"; - return ss.str(); -} - -inline std::string KvToString(const trace::KeyValueIterable &kv) noexcept -{ - std::stringstream ss; - ss << "{"; - size_t size = kv.size(); - if (size) - { - size_t i = 1; - kv.ForEachKeyValue([&](nostd::string_view key, common::AttributeValue value) noexcept { - ss << "\"" << key << "\":"; - print_value(ss, value, true); - if (size != i) - { - ss << ","; - } - i++; - return true; - }); - }; - ss << "}"; - return ss.str(); -} - template class AsynchronousInstrument : public Instrument, virtual public metrics_api::AsynchronousInstrument { @@ -278,6 +218,66 @@ class AsynchronousInstrument : public Instrument, virtual public metrics_api::As virtual void run() override = 0; }; +// Helper functions for turning a trace::KeyValueIterable into a string +inline void print_value(std::stringstream &ss, + common::AttributeValue &value, + bool jsonTypes = false) +{ + switch (value.index()) + { + case common::AttributeType::TYPE_STRING: + if (jsonTypes) + ss << '"'; + ss << nostd::get(value); + if (jsonTypes) + ss << '"'; + break; + default: +#if __EXCEPTIONS + throw std::invalid_argument("Labels must be strings"); +#else + std::terminate(); +#endif + break; + } +}; + +// Utility function which converts maps to strings for better performance +inline std::string mapToString(const std::map &conv) +{ + std::stringstream ss; + ss << "{"; + for (auto i : conv) + { + ss << i.first << ':' << i.second << ','; + } + ss << "}"; + return ss.str(); +} + +inline std::string KvToString(const trace::KeyValueIterable &kv) noexcept +{ + std::stringstream ss; + ss << "{"; + size_t size = kv.size(); + if (size) + { + size_t i = 1; + kv.ForEachKeyValue([&](nostd::string_view key, common::AttributeValue value) noexcept { + ss << "\"" << key << "\":"; + print_value(ss, value, true); + if (size != i) + { + ss << ","; + } + i++; + return true; + }); + }; + ss << "}"; + return ss.str(); +} + } // namespace metrics } // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/metrics/metric_instrument_test.cc b/sdk/test/metrics/metric_instrument_test.cc index d8d409b0fc..ca16a0179d 100644 --- a/sdk/test/metrics/metric_instrument_test.cc +++ b/sdk/test/metrics/metric_instrument_test.cc @@ -35,9 +35,9 @@ TEST(ApiSdkConversion, async){ alpha->observe(123456,labelkv); AggregatorVariant canCollect = dynamic_cast*>(alpha.get())->GetRecords()[0].GetAggregator(); - EXPECT_EQ(nostd::holds_alternative>>(canCollect), false); - EXPECT_EQ(nostd::holds_alternative>>(canCollect), true); - EXPECT_EQ(nostd::get>>(canCollect)->get_checkpoint()[0], 123456); + EXPECT_EQ(nostd::holds_alternative>>(canCollect), false); + EXPECT_EQ(nostd::holds_alternative>>(canCollect), true); + EXPECT_EQ(nostd::get>>(canCollect)->get_checkpoint()[0], 123456); } TEST(IntValueObserver, InstrumentFunctions) From 69bd15a80c04ce214379d6e148bb143edf656602 Mon Sep 17 00:00:00 2001 From: Ankit Bhargava Date: Tue, 4 Aug 2020 10:07:04 -0400 Subject: [PATCH 7/8] reformat --- .../sdk/metrics/async_instruments.h | 417 ++++++++-------- .../opentelemetry/sdk/metrics/instrument.h | 445 +++++++++--------- sdk/test/metrics/metric_instrument_test.cc | 267 ++++++----- 3 files changed, 597 insertions(+), 532 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h index 4e0207a44b..64541df9e7 100644 --- a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -1,15 +1,15 @@ #pragma once -#include "opentelemetry/sdk/metrics/instrument.h" -#include "opentelemetry/metrics/async_instruments.h" -#include "opentelemetry/version.h" -#include "opentelemetry/sdk/metrics/aggregator/counter_aggregator.h" -#include "opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h" -#include #include +#include #include +#include #include -#include +#include "opentelemetry/metrics/async_instruments.h" +#include "opentelemetry/sdk/metrics/aggregator/counter_aggregator.h" +#include "opentelemetry/sdk/metrics/aggregator/min_max_sum_count_aggregator.h" +#include "opentelemetry/sdk/metrics/instrument.h" +#include "opentelemetry/version.h" namespace metrics_api = opentelemetry::metrics; @@ -22,212 +22,245 @@ namespace metrics template class ValueObserver : public AsynchronousInstrument, virtual public metrics_api::ValueObserver { - + public: - ValueObserver() = default; - - ValueObserver(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - void (*callback)(metrics_api::ObserverResult)): - AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::ValueObserver) - {} - - /* - * Updates the instruments aggregator with the new value. The labels should - * contain the keys and values to be associated with this value. - * - * @param value is the numerical representation of the metric being captured - * @param labels the set of labels, as key-value pairs - */ - virtual void observe(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); - std::string labelset = KvToString(labels); - if (boundAggregators_.find(labelset) == boundAggregators_.end()) - { - auto sp1 = std::shared_ptr>(new MinMaxSumCountAggregator(this->kind_)); - boundAggregators_.insert(std::make_pair(labelset, sp1)); - sp1->update(value); - } - else - { - boundAggregators_[labelset]->update(value); - } - this->mu_.unlock(); + ValueObserver() = default; + + ValueObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult)) + : AsynchronousInstrument(name, + description, + unit, + enabled, + callback, + metrics_api::InstrumentKind::ValueObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override + { + this->mu_.lock(); + std::string labelset = KvToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new MinMaxSumCountAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); + sp1->update(value); } - - /* - * Activate the instrument's callback function to record a measurement. This - * function will be called by the specified controller at a regular interval. - * - * @param none - * @return none - */ - virtual void run() override { - metrics_api::ObserverResult res(this); - this->callback_(res); + else + { + boundAggregators_[labelset]->update(value); } - - virtual std::vector GetRecords() override { - std::vector ret; - for (auto x : boundAggregators_){ - x.second->checkpoint(); - ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); - } - boundAggregators_.clear(); - return ret; + this->mu_.unlock(); + } + + /* + * Activate the instrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run() override + { + metrics_api::ObserverResult res(this); + this->callback_(res); + } + + virtual std::vector GetRecords() override + { + std::vector ret; + for (auto x : boundAggregators_) + { + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); } - - // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; -}; + boundAggregators_.clear(); + return ret; + } + // Public mapping from labels (stored as strings) to their respective aggregators + std::unordered_map>> boundAggregators_; +}; template class SumObserver : public AsynchronousInstrument, virtual public metrics_api::SumObserver { - + public: - SumObserver() = default; - - SumObserver(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - void (*callback)(metrics_api::ObserverResult)): - AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::SumObserver) - {} - - /* - * Updates the instruments aggregator with the new value. The labels should - * contain the keys and values to be associated with this value. - * - * @param value is the numerical representation of the metric being captured - * @param labels the set of labels, as key-value pairs - */ - virtual void observe(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); - std::string labelset = KvToString(labels); - if (boundAggregators_.find(labelset) == boundAggregators_.end()) - { - auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); - boundAggregators_.insert(std::make_pair(labelset, sp1)); - if (value < 0){ - #if __EXCEPTIONS - throw std::invalid_argument("Counter instrument updates must be non-negative."); - #else - std::terminate(); - #endif - } else { - sp1->update(value); - } - } - else - { - if (value < 0){ - #if __EXCEPTIONS - throw std::invalid_argument("Counter instrument updates must be non-negative."); - #else - std::terminate(); - #endif - } else { - boundAggregators_[labelset]->update(value); - } - } - this->mu_.unlock(); + SumObserver() = default; + + SumObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult)) + : AsynchronousInstrument(name, + description, + unit, + enabled, + callback, + metrics_api::InstrumentKind::SumObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override + { + this->mu_.lock(); + std::string labelset = KvToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); + if (value < 0) + { +#if __EXCEPTIONS + throw std::invalid_argument("Counter instrument updates must be non-negative."); +#else + std::terminate(); +#endif + } + else + { + sp1->update(value); + } } - - /* - * Activate the intsrument's callback function to record a measurement. This - * function will be called by the specified controller at a regular interval. - * - * @param none - * @return none - */ - virtual void run() override { - metrics_api::ObserverResult res(this); - this->callback_(res); + else + { + if (value < 0) + { +#if __EXCEPTIONS + throw std::invalid_argument("Counter instrument updates must be non-negative."); +#else + std::terminate(); +#endif + } + else + { + boundAggregators_[labelset]->update(value); + } } - - virtual std::vector GetRecords() override { - std::vector ret; - for (auto x : boundAggregators_){ - x.second->checkpoint(); - ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); - } - boundAggregators_.clear(); - return ret; + this->mu_.unlock(); + } + + /* + * Activate the intsrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run() override + { + metrics_api::ObserverResult res(this); + this->callback_(res); + } + + virtual std::vector GetRecords() override + { + std::vector ret; + for (auto x : boundAggregators_) + { + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); } - - // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; + boundAggregators_.clear(); + return ret; + } + + // Public mapping from labels (stored as strings) to their respective aggregators + std::unordered_map>> boundAggregators_; }; template -class UpDownSumObserver : public AsynchronousInstrument, virtual public metrics_api::UpDownSumObserver +class UpDownSumObserver : public AsynchronousInstrument, + virtual public metrics_api::UpDownSumObserver { - + public: - UpDownSumObserver() = default; - - UpDownSumObserver(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - void (*callback)(metrics_api::ObserverResult)): - AsynchronousInstrument(name, description, unit, enabled, callback, metrics_api::InstrumentKind::UpDownSumObserver) - {} - - /* - * Updates the instruments aggregator with the new value. The labels should - * contain the keys and values to be associated with this value. - * - * @param value is the numerical representation of the metric being captured - * @param labels the set of labels, as key-value pairs - */ - virtual void observe(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); - std::string labelset = KvToString(labels); - if (boundAggregators_.find(labelset) == boundAggregators_.end()) - { - auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); - boundAggregators_.insert(std::make_pair(labelset, sp1)); - sp1->update(value); - } - else - { - boundAggregators_[labelset]->update(value); - } - this->mu_.unlock(); + UpDownSumObserver() = default; + + UpDownSumObserver(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult)) + : AsynchronousInstrument(name, + description, + unit, + enabled, + callback, + metrics_api::InstrumentKind::UpDownSumObserver) + {} + + /* + * Updates the instruments aggregator with the new value. The labels should + * contain the keys and values to be associated with this value. + * + * @param value is the numerical representation of the metric being captured + * @param labels the set of labels, as key-value pairs + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override + { + this->mu_.lock(); + std::string labelset = KvToString(labels); + if (boundAggregators_.find(labelset) == boundAggregators_.end()) + { + auto sp1 = std::shared_ptr>(new CounterAggregator(this->kind_)); + boundAggregators_.insert(std::make_pair(labelset, sp1)); + sp1->update(value); } - - /* - * Activate the intsrument's callback function to record a measurement. This - * function will be called by the specified controller at a regular interval. - * - * @param none - * @return none - */ - virtual void run() override { - metrics_api::ObserverResult res(this); - this->callback_(res); + else + { + boundAggregators_[labelset]->update(value); } - - virtual std::vector GetRecords() override { - std::vector ret; - for (auto x : boundAggregators_){ - x.second->checkpoint(); - ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); - } - boundAggregators_.clear(); - return ret; + this->mu_.unlock(); + } + + /* + * Activate the intsrument's callback function to record a measurement. This + * function will be called by the specified controller at a regular interval. + * + * @param none + * @return none + */ + virtual void run() override + { + metrics_api::ObserverResult res(this); + this->callback_(res); + } + + virtual std::vector GetRecords() override + { + std::vector ret; + for (auto x : boundAggregators_) + { + x.second->checkpoint(); + ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); } - - // Public mapping from labels (stored as strings) to their respective aggregators - std::unordered_map>> boundAggregators_; + boundAggregators_.clear(); + return ret; + } + + // Public mapping from labels (stored as strings) to their respective aggregators + std::unordered_map>> boundAggregators_; }; -} -} +} // namespace metrics +} // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/metrics/instrument.h b/sdk/include/opentelemetry/sdk/metrics/instrument.h index daf32351c9..64973aa485 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instrument.h +++ b/sdk/include/opentelemetry/sdk/metrics/instrument.h @@ -3,18 +3,17 @@ #include #include #include -#include "opentelemetry/metrics/instrument.h" -#include "opentelemetry/sdk/metrics/aggregator/aggregator.h" -#include "opentelemetry/sdk/metrics/record.h" -#include "opentelemetry/version.h" #include #include #include #include - +#include "opentelemetry/metrics/instrument.h" +#include "opentelemetry/sdk/metrics/aggregator/aggregator.h" +#include "opentelemetry/sdk/metrics/record.h" +#include "opentelemetry/version.h" namespace metrics_api = opentelemetry::metrics; -namespace trace_api = opentelemetry::trace; +namespace trace_api = opentelemetry::trace; OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk @@ -24,198 +23,200 @@ namespace metrics class Instrument : virtual public metrics_api::Instrument { - + public: - Instrument() = default; - - Instrument(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - metrics_api::InstrumentKind kind) - : name_(name), description_(description), unit_(unit), enabled_(enabled), kind_(kind) - {} - - // Returns true if the instrument is enabled and collecting data - virtual bool IsEnabled() override { return enabled_; } - - // Return the instrument name - virtual nostd::string_view GetName() override { return name_; } - - // Return the instrument description - virtual nostd::string_view GetDescription() override { return description_; } - - // Return the insrument's units of measurement - virtual nostd::string_view GetUnits() override { return unit_; } - - virtual metrics_api::InstrumentKind GetKind() override { return this->kind_; } - + Instrument() = default; + + Instrument(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + metrics_api::InstrumentKind kind) + : name_(name), description_(description), unit_(unit), enabled_(enabled), kind_(kind) + {} + + // Returns true if the instrument is enabled and collecting data + virtual bool IsEnabled() override { return enabled_; } + + // Return the instrument name + virtual nostd::string_view GetName() override { return name_; } + + // Return the instrument description + virtual nostd::string_view GetDescription() override { return description_; } + + // Return the insrument's units of measurement + virtual nostd::string_view GetUnits() override { return unit_; } + + virtual metrics_api::InstrumentKind GetKind() override { return this->kind_; } + protected: - std::string name_; - std::string description_; - std::string unit_; - bool enabled_; - std::mutex mu_; - metrics_api::InstrumentKind kind_; + std::string name_; + std::string description_; + std::string unit_; + bool enabled_; + std::mutex mu_; + metrics_api::InstrumentKind kind_; }; template class BoundSynchronousInstrument : public Instrument, -virtual public metrics_api::BoundSynchronousInstrument + virtual public metrics_api::BoundSynchronousInstrument { - + public: - BoundSynchronousInstrument() = default; - - BoundSynchronousInstrument(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - metrics_api::InstrumentKind kind, - std::shared_ptr> agg) - : Instrument(name, description, unit, enabled, kind), agg_(agg) - { - this->inc_ref(); // increase reference count when instantiated - } - - /** - * Frees the resources associated with this Bound Instrument. - * The Metric from which this instrument was created is not impacted. - * - * @param none - * @return void - */ - virtual void unbind() override { ref_ -= 1; } - - /** - * Increments the reference count. This function is used when binding or instantiating. - * - * @param none - * @return void - */ - virtual void inc_ref() override { ref_ += 1; } - - /** - * Returns the current reference count of the instrument. This value is used to - * later in the pipeline remove stale instruments. - * - * @param none - * @return current ref count of the instrument - */ - virtual int get_ref() override { return ref_; } - - /** - * Records a single synchronous metric event via a call to the aggregator. - * Since this is a bound synchronous instrument, labels are not required in - * metric capture calls. - * - * @param value is the numerical representation of the metric being captured - * @return void - */ - virtual void update(T value) override - { - this->mu_.lock(); - agg_->update(value); - this->mu_.unlock(); - } - - /** - * Returns the aggregator responsible for meaningfully combining update values. - * - * @param none - * @return the aggregator assigned to this instrument - */ - virtual std::shared_ptr> GetAggregator() final { return agg_; } - + BoundSynchronousInstrument() = default; + + BoundSynchronousInstrument(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + metrics_api::InstrumentKind kind, + std::shared_ptr> agg) + : Instrument(name, description, unit, enabled, kind), agg_(agg) + { + this->inc_ref(); // increase reference count when instantiated + } + + /** + * Frees the resources associated with this Bound Instrument. + * The Metric from which this instrument was created is not impacted. + * + * @param none + * @return void + */ + virtual void unbind() override { ref_ -= 1; } + + /** + * Increments the reference count. This function is used when binding or instantiating. + * + * @param none + * @return void + */ + virtual void inc_ref() override { ref_ += 1; } + + /** + * Returns the current reference count of the instrument. This value is used to + * later in the pipeline remove stale instruments. + * + * @param none + * @return current ref count of the instrument + */ + virtual int get_ref() override { return ref_; } + + /** + * Records a single synchronous metric event via a call to the aggregator. + * Since this is a bound synchronous instrument, labels are not required in + * metric capture calls. + * + * @param value is the numerical representation of the metric being captured + * @return void + */ + virtual void update(T value) override + { + this->mu_.lock(); + agg_->update(value); + this->mu_.unlock(); + } + + /** + * Returns the aggregator responsible for meaningfully combining update values. + * + * @param none + * @return the aggregator assigned to this instrument + */ + virtual std::shared_ptr> GetAggregator() final { return agg_; } + private: - std::shared_ptr> agg_; - int ref_ = 0; + std::shared_ptr> agg_; + int ref_ = 0; }; template class SynchronousInstrument : public Instrument, -virtual public metrics_api::SynchronousInstrument + virtual public metrics_api::SynchronousInstrument { - + public: - SynchronousInstrument() = default; - - SynchronousInstrument(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - metrics_api::InstrumentKind kind) - : Instrument(name, description, unit, enabled, kind) - {} - - /** - * Returns a Bound Instrument associated with the specified labels. Multiples requests - * with the same set of labels may return the same Bound Instrument instance. - * - * It is recommended that callers keep a reference to the Bound Instrument - * instead of repeatedly calling this operation. - * - * @param labels the set of labels, as key-value pairs - * @return a Bound Instrument - */ - virtual nostd::shared_ptr> bind( - const trace::KeyValueIterable &labels) override - { - return nostd::shared_ptr>(); - } - - virtual void update(T value, const trace::KeyValueIterable &labels) override = 0; - - /** - * Checkpoints instruments and returns a set of records which are ready for processing. - * This method should ONLY be called by the Meter Class as part of the export pipeline - * as it also prunes bound instruments with no active references. - * - * @param none - * @return vector of Records which hold the data attached to this synchronous instrument - */ - virtual std::vector GetRecords() = 0; + SynchronousInstrument() = default; + + SynchronousInstrument(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + metrics_api::InstrumentKind kind) + : Instrument(name, description, unit, enabled, kind) + {} + + /** + * Returns a Bound Instrument associated with the specified labels. Multiples requests + * with the same set of labels may return the same Bound Instrument instance. + * + * It is recommended that callers keep a reference to the Bound Instrument + * instead of repeatedly calling this operation. + * + * @param labels the set of labels, as key-value pairs + * @return a Bound Instrument + */ + virtual nostd::shared_ptr> bind( + const trace::KeyValueIterable &labels) override + { + return nostd::shared_ptr>(); + } + + virtual void update(T value, const trace::KeyValueIterable &labels) override = 0; + + /** + * Checkpoints instruments and returns a set of records which are ready for processing. + * This method should ONLY be called by the Meter Class as part of the export pipeline + * as it also prunes bound instruments with no active references. + * + * @param none + * @return vector of Records which hold the data attached to this synchronous instrument + */ + virtual std::vector GetRecords() = 0; }; template -class AsynchronousInstrument : public Instrument, virtual public metrics_api::AsynchronousInstrument { - +class AsynchronousInstrument : public Instrument, + virtual public metrics_api::AsynchronousInstrument +{ + public: - AsynchronousInstrument() = default; - - AsynchronousInstrument(nostd::string_view name, - nostd::string_view description, - nostd::string_view unit, - bool enabled, - void (*callback)(metrics_api::ObserverResult), - metrics_api::InstrumentKind kind): - Instrument(name, description, unit, enabled, kind) - { - this->callback_ = callback; - } - - /** - * Captures data through a manual call rather than the automatic collection process instituted - * in the run function. Asynchronous instruments are generally expected to obtain data from - * their callbacks rather than direct calls. This function is used by the callback to store data. - * - * @param value is the numerical representation of the metric being captured - * @param labels is the numerical representation of the metric being captured - * @return none - */ - virtual void observe(T value, const trace::KeyValueIterable &labels) override = 0; - - virtual std::vector GetRecords() = 0; - - /** - * Captures data by activating the callback function associated with the - * instrument and storing its return value. Callbacks for asynchronous - * instruments are defined during construction. - * - * @param none - * @return none - */ - virtual void run() override = 0; + AsynchronousInstrument() = default; + + AsynchronousInstrument(nostd::string_view name, + nostd::string_view description, + nostd::string_view unit, + bool enabled, + void (*callback)(metrics_api::ObserverResult), + metrics_api::InstrumentKind kind) + : Instrument(name, description, unit, enabled, kind) + { + this->callback_ = callback; + } + + /** + * Captures data through a manual call rather than the automatic collection process instituted + * in the run function. Asynchronous instruments are generally expected to obtain data from + * their callbacks rather than direct calls. This function is used by the callback to store data. + * + * @param value is the numerical representation of the metric being captured + * @param labels is the numerical representation of the metric being captured + * @return none + */ + virtual void observe(T value, const trace::KeyValueIterable &labels) override = 0; + + virtual std::vector GetRecords() = 0; + + /** + * Captures data by activating the callback function associated with the + * instrument and storing its return value. Callbacks for asynchronous + * instruments are defined during construction. + * + * @param none + * @return none + */ + virtual void run() override = 0; }; // Helper functions for turning a trace::KeyValueIterable into a string @@ -223,61 +224,61 @@ inline void print_value(std::stringstream &ss, common::AttributeValue &value, bool jsonTypes = false) { - switch (value.index()) - { - case common::AttributeType::TYPE_STRING: - if (jsonTypes) - ss << '"'; - ss << nostd::get(value); - if (jsonTypes) - ss << '"'; - break; - default: + switch (value.index()) + { + case common::AttributeType::TYPE_STRING: + if (jsonTypes) + ss << '"'; + ss << nostd::get(value); + if (jsonTypes) + ss << '"'; + break; + default: #if __EXCEPTIONS - throw std::invalid_argument("Labels must be strings"); + throw std::invalid_argument("Labels must be strings"); #else - std::terminate(); + std::terminate(); #endif - break; - } + break; + } }; // Utility function which converts maps to strings for better performance inline std::string mapToString(const std::map &conv) { - std::stringstream ss; - ss << "{"; - for (auto i : conv) - { - ss << i.first << ':' << i.second << ','; - } - ss << "}"; - return ss.str(); + std::stringstream ss; + ss << "{"; + for (auto i : conv) + { + ss << i.first << ':' << i.second << ','; + } + ss << "}"; + return ss.str(); } inline std::string KvToString(const trace::KeyValueIterable &kv) noexcept { - std::stringstream ss; - ss << "{"; - size_t size = kv.size(); - if (size) - { - size_t i = 1; - kv.ForEachKeyValue([&](nostd::string_view key, common::AttributeValue value) noexcept { - ss << "\"" << key << "\":"; - print_value(ss, value, true); - if (size != i) - { - ss << ","; - } - i++; - return true; - }); - }; - ss << "}"; - return ss.str(); + std::stringstream ss; + ss << "{"; + size_t size = kv.size(); + if (size) + { + size_t i = 1; + kv.ForEachKeyValue([&](nostd::string_view key, common::AttributeValue value) noexcept { + ss << "\"" << key << "\":"; + print_value(ss, value, true); + if (size != i) + { + ss << ","; + } + i++; + return true; + }); + }; + ss << "}"; + return ss.str(); } -} // namespace metrics -} // namespace sdk +} // namespace metrics +} // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/metrics/metric_instrument_test.cc b/sdk/test/metrics/metric_instrument_test.cc index ca16a0179d..97fe53278c 100644 --- a/sdk/test/metrics/metric_instrument_test.cc +++ b/sdk/test/metrics/metric_instrument_test.cc @@ -1,163 +1,194 @@ -#include #include +#include #include #include #include -#include "opentelemetry/sdk/metrics/async_instruments.h" -#include "opentelemetry/sdk/metrics/sync_instruments.h" #include #include +#include "opentelemetry/sdk/metrics/async_instruments.h" +#include "opentelemetry/sdk/metrics/sync_instruments.h" namespace metrics_api = opentelemetry::metrics; - OPENTELEMETRY_BEGIN_NAMESPACE namespace sdk { namespace metrics { -void ObserverConstructorCallback(metrics_api::ObserverResult result){ - std::map labels = {{"key", "value"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - result.observe(1,labelkv); +void ObserverConstructorCallback(metrics_api::ObserverResult result) +{ + std::map labels = {{"key", "value"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + result.observe(1, labelkv); } -TEST(ApiSdkConversion, async){ - nostd::shared_ptr> alpha = nostd::shared_ptr>(new ValueObserver("ankit","none","unitles",true, &ObserverConstructorCallback)); - - std::map labels = {{"key587", "value264"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - - alpha->observe(123456,labelkv); - EXPECT_EQ(dynamic_cast*>(alpha.get())->GetRecords()[0].GetLabels(),"{\"key587\":\"value264\"}"); - - alpha->observe(123456,labelkv); - AggregatorVariant canCollect = dynamic_cast*>(alpha.get())->GetRecords()[0].GetAggregator(); - EXPECT_EQ(nostd::holds_alternative>>(canCollect), false); - EXPECT_EQ(nostd::holds_alternative>>(canCollect), true); - EXPECT_EQ(nostd::get>>(canCollect)->get_checkpoint()[0], 123456); +TEST(ApiSdkConversion, async) +{ + nostd::shared_ptr> alpha = + nostd::shared_ptr>( + new ValueObserver("ankit", "none", "unitles", true, &ObserverConstructorCallback)); + + std::map labels = {{"key587", "value264"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + + alpha->observe(123456, labelkv); + EXPECT_EQ(dynamic_cast *>(alpha.get())->GetRecords()[0].GetLabels(), + "{\"key587\":\"value264\"}"); + + alpha->observe(123456, labelkv); + AggregatorVariant canCollect = + dynamic_cast *>(alpha.get())->GetRecords()[0].GetAggregator(); + EXPECT_EQ(nostd::holds_alternative>>(canCollect), false); + EXPECT_EQ(nostd::holds_alternative>>(canCollect), true); + EXPECT_EQ(nostd::get>>(canCollect)->get_checkpoint()[0], 123456); } TEST(IntValueObserver, InstrumentFunctions) { - ValueObserver alpha("enabled", "no description", "unitless", true, &ObserverConstructorCallback); - std::map labels = {{"key", "value"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - - EXPECT_EQ(alpha.GetName(), "enabled"); - EXPECT_EQ(alpha.GetDescription(), "no description"); - EXPECT_EQ(alpha.GetUnits(), "unitless"); - EXPECT_EQ(alpha.IsEnabled(), true); - EXPECT_EQ(alpha.GetKind(), metrics_api::InstrumentKind::ValueObserver); - - alpha.run(); - EXPECT_EQ(alpha.boundAggregators_[KvToString(labelkv)]->get_values()[0], 1); // min + ValueObserver alpha("enabled", "no description", "unitless", true, + &ObserverConstructorCallback); + std::map labels = {{"key", "value"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + + EXPECT_EQ(alpha.GetName(), "enabled"); + EXPECT_EQ(alpha.GetDescription(), "no description"); + EXPECT_EQ(alpha.GetUnits(), "unitless"); + EXPECT_EQ(alpha.IsEnabled(), true); + EXPECT_EQ(alpha.GetKind(), metrics_api::InstrumentKind::ValueObserver); + + alpha.run(); + EXPECT_EQ(alpha.boundAggregators_[KvToString(labelkv)]->get_values()[0], 1); // min } -void ObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ - for (int i=0; iobserve(i, labels); - } +void ObserverCallback(std::shared_ptr> in, + int freq, + const trace::KeyValueIterable &labels) +{ + for (int i = 0; i < freq; i++) + { + in->observe(i, labels); + } } -void NegObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ - for (int i=0; iobserve(-i, labels); - } +void NegObserverCallback(std::shared_ptr> in, + int freq, + const trace::KeyValueIterable &labels) +{ + for (int i = 0; i < freq; i++) + { + in->observe(-i, labels); + } } TEST(IntValueObserver, StressObserve) { - std::shared_ptr> alpha(new ValueObserver("enabled", "no description", "unitless", true, &ObserverConstructorCallback)); - - std::map labels = {{"key", "value"}}; - std::map labels1 = {{"key1", "value1"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - auto labelkv1 = trace::KeyValueIterableView{labels1}; - - std::thread first (ObserverCallback, alpha, 25, labelkv); // spawn new threads that call the callback - std::thread second (ObserverCallback, alpha, 50, labelkv); - std::thread third (ObserverCallback, alpha, 25, labelkv1); - std::thread fourth (NegObserverCallback, alpha, 100, labelkv1); // negative values - - first.join(); - second.join(); - third.join(); - fourth.join(); - - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 0); // min - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[1], 49); // max - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[2], 1525); // sum - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[3], 75); // count - - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], -99); // min - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[1], 24); // max - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[2], -4650); // sum - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[3], 125); // count + std::shared_ptr> alpha(new ValueObserver( + "enabled", "no description", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first(ObserverCallback, alpha, 25, + labelkv); // spawn new threads that call the callback + std::thread second(ObserverCallback, alpha, 50, labelkv); + std::thread third(ObserverCallback, alpha, 25, labelkv1); + std::thread fourth(NegObserverCallback, alpha, 100, labelkv1); // negative values + + first.join(); + second.join(); + third.join(); + fourth.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 0); // min + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[1], 49); // max + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[2], 1525); // sum + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[3], 75); // count + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], -99); // min + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[1], 24); // max + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[2], -4650); // sum + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[3], 125); // count } -void SumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ - for (int i=0; iobserve(1, labels); - } +void SumObserverCallback(std::shared_ptr> in, + int freq, + const trace::KeyValueIterable &labels) +{ + for (int i = 0; i < freq; i++) + { + in->observe(1, labels); + } } TEST(IntSumObserver, StressObserve) { - std::shared_ptr> alpha(new SumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); - - std::map labels = {{"key", "value"}}; - std::map labels1 = {{"key1", "value1"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - auto labelkv1 = trace::KeyValueIterableView{labels1}; - - std::thread first (SumObserverCallback, alpha, 100000, labelkv); - std::thread second (SumObserverCallback, alpha, 100000, labelkv); - std::thread third (SumObserverCallback, alpha, 300000, labelkv1); - - first.join(); - second.join(); - third.join(); - - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 200000); - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 300000); -} + std::shared_ptr> alpha( + new SumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + std::thread first(SumObserverCallback, alpha, 100000, labelkv); + std::thread second(SumObserverCallback, alpha, 100000, labelkv); + std::thread third(SumObserverCallback, alpha, 300000, labelkv1); -void UpDownSumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ - for (int i=0; iobserve(1, labels); - } + first.join(); + second.join(); + third.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 200000); + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 300000); +} + +void UpDownSumObserverCallback(std::shared_ptr> in, + int freq, + const trace::KeyValueIterable &labels) +{ + for (int i = 0; i < freq; i++) + { + in->observe(1, labels); + } } -void NegUpDownSumObserverCallback(std::shared_ptr> in, int freq, const trace::KeyValueIterable &labels){ - for (int i=0; iobserve(-1, labels); - } +void NegUpDownSumObserverCallback(std::shared_ptr> in, + int freq, + const trace::KeyValueIterable &labels) +{ + for (int i = 0; i < freq; i++) + { + in->observe(-1, labels); + } } -TEST(IntUpDownObserver, StressAdd){ - std::shared_ptr> alpha(new UpDownSumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); - - std::map labels = {{"key", "value"}}; - std::map labels1 = {{"key1", "value1"}}; - auto labelkv = trace::KeyValueIterableView{labels}; - auto labelkv1 = trace::KeyValueIterableView{labels1}; - - std::thread first (UpDownSumObserverCallback, alpha, 123400, labelkv); // spawn new threads that call the callback - std::thread second (UpDownSumObserverCallback, alpha, 123400, labelkv); - std::thread third (UpDownSumObserverCallback, alpha, 567800, labelkv1); - std::thread fourth (NegUpDownSumObserverCallback, alpha, 123400, labelkv1); // negative values - - first.join(); - second.join(); - third.join(); - fourth.join(); - - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 123400*2); - EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 567800-123400); +TEST(IntUpDownObserver, StressAdd) +{ + std::shared_ptr> alpha( + new UpDownSumObserver("test", "none", "unitless", true, &ObserverConstructorCallback)); + + std::map labels = {{"key", "value"}}; + std::map labels1 = {{"key1", "value1"}}; + auto labelkv = trace::KeyValueIterableView{labels}; + auto labelkv1 = trace::KeyValueIterableView{labels1}; + + std::thread first(UpDownSumObserverCallback, alpha, 123400, + labelkv); // spawn new threads that call the callback + std::thread second(UpDownSumObserverCallback, alpha, 123400, labelkv); + std::thread third(UpDownSumObserverCallback, alpha, 567800, labelkv1); + std::thread fourth(NegUpDownSumObserverCallback, alpha, 123400, labelkv1); // negative values + + first.join(); + second.join(); + third.join(); + fourth.join(); + + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv)]->get_values()[0], 123400 * 2); + EXPECT_EQ(alpha->boundAggregators_[KvToString(labelkv1)]->get_values()[0], 567800 - 123400); } TEST(Counter, InstrumentFunctions) From 7b76bc49c529bd82e3cdc2c23280f9852bb5de78 Mon Sep 17 00:00:00 2001 From: Ankit Bhargava Date: Tue, 4 Aug 2020 10:32:45 -0400 Subject: [PATCH 8/8] locking and formatting --- .../sdk/metrics/async_instruments.h | 6 +++ .../opentelemetry/sdk/metrics/instrument.h | 15 +++++- .../sdk/metrics/sync_instruments.h | 46 +++++++++---------- 3 files changed, 42 insertions(+), 25 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h index 64541df9e7..9eccb86466 100644 --- a/sdk/include/opentelemetry/sdk/metrics/async_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/async_instruments.h @@ -78,6 +78,7 @@ class ValueObserver : public AsynchronousInstrument, virtual public metrics_a virtual std::vector GetRecords() override { + this->mu_.lock(); std::vector ret; for (auto x : boundAggregators_) { @@ -85,6 +86,7 @@ class ValueObserver : public AsynchronousInstrument, virtual public metrics_a ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); } boundAggregators_.clear(); + this->mu_.unlock(); return ret; } @@ -173,6 +175,7 @@ class SumObserver : public AsynchronousInstrument, virtual public metrics_api virtual std::vector GetRecords() override { + this->mu_.lock(); std::vector ret; for (auto x : boundAggregators_) { @@ -180,6 +183,7 @@ class SumObserver : public AsynchronousInstrument, virtual public metrics_api ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); } boundAggregators_.clear(); + this->mu_.unlock(); return ret; } @@ -247,6 +251,7 @@ class UpDownSumObserver : public AsynchronousInstrument, virtual std::vector GetRecords() override { + this->mu_.lock(); std::vector ret; for (auto x : boundAggregators_) { @@ -254,6 +259,7 @@ class UpDownSumObserver : public AsynchronousInstrument, ret.push_back(Record(this->GetName(), this->GetDescription(), x.first, x.second)); } boundAggregators_.clear(); + this->mu_.unlock(); return ret; } diff --git a/sdk/include/opentelemetry/sdk/metrics/instrument.h b/sdk/include/opentelemetry/sdk/metrics/instrument.h index 64973aa485..1d06cbec4f 100644 --- a/sdk/include/opentelemetry/sdk/metrics/instrument.h +++ b/sdk/include/opentelemetry/sdk/metrics/instrument.h @@ -84,7 +84,12 @@ class BoundSynchronousInstrument : public Instrument, * @param none * @return void */ - virtual void unbind() override { ref_ -= 1; } + virtual void unbind() override + { + this->mu_.lock(); + ref_ -= 1; + this->mu_.unlock(); + } /** * Increments the reference count. This function is used when binding or instantiating. @@ -92,7 +97,12 @@ class BoundSynchronousInstrument : public Instrument, * @param none * @return void */ - virtual void inc_ref() override { ref_ += 1; } + virtual void inc_ref() override + { + this->mu_.lock(); + ref_ += 1; + this->mu_.unlock(); + } /** * Returns the current reference count of the instrument. This value is used to @@ -163,6 +173,7 @@ class SynchronousInstrument : public Instrument, return nostd::shared_ptr>(); } + // This function is necessary for batch recording and should NOT be called by the user virtual void update(T value, const trace::KeyValueIterable &labels) override = 0; /** diff --git a/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h b/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h index d8b1f83d24..2c0931eeb2 100644 --- a/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h +++ b/sdk/include/opentelemetry/sdk/metrics/sync_instruments.h @@ -48,7 +48,6 @@ class BoundCounter final : public BoundSynchronousInstrument, public metrics_ */ virtual void add(T value) override { - this->mu_.lock(); if (value < 0) { #if __EXCEPTIONS @@ -61,7 +60,6 @@ class BoundCounter final : public BoundSynchronousInstrument, public metrics_ { this->update(value); } - this->mu_.unlock(); } }; @@ -94,18 +92,22 @@ class Counter final : public SynchronousInstrument, public metrics_api::Count virtual nostd::shared_ptr> bindCounter( const trace::KeyValueIterable &labels) override { + this->mu_.lock(); std::string labelset = KvToString(labels); if (boundInstruments_.find(labelset) == boundInstruments_.end()) { auto sp1 = nostd::shared_ptr>( new BoundCounter(this->name_, this->description_, this->unit_, this->enabled_)); boundInstruments_[labelset] = sp1; + this->mu_.unlock(); return sp1; } else { boundInstruments_[labelset]->inc_ref(); - return boundInstruments_[labelset]; + auto ret = boundInstruments_[labelset]; + this->mu_.unlock(); + return ret; } } @@ -119,7 +121,6 @@ class Counter final : public SynchronousInstrument, public metrics_api::Count */ virtual void add(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); if (value < 0) { #if __EXCEPTIONS @@ -134,11 +135,11 @@ class Counter final : public SynchronousInstrument, public metrics_api::Count sp->update(value); sp->unbind(); } - this->mu_.unlock(); } virtual std::vector GetRecords() override { + this->mu_.lock(); std::vector ret; std::vector toDelete; for (const auto &x : boundInstruments_) @@ -155,6 +156,7 @@ class Counter final : public SynchronousInstrument, public metrics_api::Count { boundInstruments_.erase(x); } + this->mu_.unlock(); return ret; } @@ -194,12 +196,7 @@ class BoundUpDownCounter final : public BoundSynchronousInstrument, * @param value the numerical representation of the metric being captured * @param labels the set of labels, as key-value pairs */ - virtual void add(T value) override - { - this->mu_.lock(); - this->update(value); - this->mu_.unlock(); - } + virtual void add(T value) override { this->update(value); } }; template @@ -230,18 +227,22 @@ class UpDownCounter final : public SynchronousInstrument, public metrics_api: nostd::shared_ptr> bindUpDownCounter( const trace::KeyValueIterable &labels) override { + this->mu_.lock(); std::string labelset = KvToString(labels); if (boundInstruments_.find(labelset) == boundInstruments_.end()) { auto sp1 = nostd::shared_ptr>( new BoundUpDownCounter(this->name_, this->description_, this->unit_, this->enabled_)); boundInstruments_[labelset] = sp1; + this->mu_.unlock(); return sp1; } else { boundInstruments_[labelset]->inc_ref(); - return boundInstruments_[labelset]; + auto ret = boundInstruments_[labelset]; + this->mu_.unlock(); + return ret; } } @@ -255,15 +256,14 @@ class UpDownCounter final : public SynchronousInstrument, public metrics_api: */ void add(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); auto sp = bindUpDownCounter(labels); sp->update(value); sp->unbind(); - this->mu_.unlock(); } virtual std::vector GetRecords() override { + this->mu_.lock(); std::vector ret; std::vector toDelete; for (const auto &x : boundInstruments_) @@ -280,6 +280,7 @@ class UpDownCounter final : public SynchronousInstrument, public metrics_api: { boundInstruments_.erase(x); } + this->mu_.unlock(); return ret; } @@ -318,12 +319,7 @@ class BoundValueRecorder final : public BoundSynchronousInstrument, * @param value the numerical representation of the metric being captured * @param labels the set of labels, as key-value pairs */ - void record(T value) - { - this->mu_.lock(); - this->update(value); - this->mu_.unlock(); - } + void record(T value) { this->update(value); } }; template @@ -354,18 +350,22 @@ class ValueRecorder final : public SynchronousInstrument, public metrics_api: nostd::shared_ptr> bindValueRecorder( const trace::KeyValueIterable &labels) override { + this->mu_.lock(); std::string labelset = KvToString(labels); if (boundInstruments_.find(labelset) == boundInstruments_.end()) { auto sp1 = nostd::shared_ptr>( new BoundValueRecorder(this->name_, this->description_, this->unit_, this->enabled_)); boundInstruments_[labelset] = sp1; + this->mu_.unlock(); return sp1; } else { boundInstruments_[labelset]->inc_ref(); - return boundInstruments_[labelset]; + auto ret = boundInstruments_[labelset]; + this->mu_.unlock(); + return ret; } } @@ -379,15 +379,14 @@ class ValueRecorder final : public SynchronousInstrument, public metrics_api: */ void record(T value, const trace::KeyValueIterable &labels) override { - this->mu_.lock(); auto sp = bindValueRecorder(labels); sp->update(value); sp->unbind(); - this->mu_.unlock(); } virtual std::vector GetRecords() override { + this->mu_.lock(); std::vector ret; std::vector toDelete; for (const auto &x : boundInstruments_) @@ -404,6 +403,7 @@ class ValueRecorder final : public SynchronousInstrument, public metrics_api: { boundInstruments_.erase(x); } + this->mu_.unlock(); return ret; }