From 52e86f9abfa8752b74786b51187f763451ee349f Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 7 Oct 2022 15:26:01 -0700 Subject: [PATCH 1/7] fix race --- sdk/include/opentelemetry/sdk/metrics/meter.h | 1 + sdk/src/metrics/meter.cc | 3 +++ 2 files changed, 4 insertions(+) diff --git a/sdk/include/opentelemetry/sdk/metrics/meter.h b/sdk/include/opentelemetry/sdk/metrics/meter.h index ef0d5ffb4e..b2c86a7e49 100644 --- a/sdk/include/opentelemetry/sdk/metrics/meter.h +++ b/sdk/include/opentelemetry/sdk/metrics/meter.h @@ -120,6 +120,7 @@ class Meter final : public opentelemetry::metrics::Meter InstrumentDescriptor &instrument_descriptor); std::unique_ptr RegisterAsyncMetricStorage( InstrumentDescriptor &instrument_descriptor); + opentelemetry::common::SpinLockMutex storage_lock_; }; } // namespace metrics } // namespace sdk diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index 4d6595dd7f..a13dc62c84 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -208,6 +208,7 @@ const sdk::instrumentationscope::InstrumentationScope *Meter::GetInstrumentation std::unique_ptr Meter::RegisterSyncMetricStorage( InstrumentDescriptor &instrument_descriptor) { + std::lock_guard guard(storage_lock_); auto ctx = meter_context_.lock(); if (!ctx) { @@ -251,6 +252,7 @@ std::unique_ptr Meter::RegisterSyncMetricStorage( std::unique_ptr Meter::RegisterAsyncMetricStorage( InstrumentDescriptor &instrument_descriptor) { + std::lock_guard guard(storage_lock_); auto ctx = meter_context_.lock(); if (!ctx) { @@ -293,6 +295,7 @@ std::unique_ptr Meter::RegisterAsyncMetricStorage( std::vector Meter::Collect(CollectorHandle *collector, opentelemetry::common::SystemTimestamp collect_ts) noexcept { + std::lock_guard guard(storage_lock_); observable_registry_->Observe(collect_ts); std::vector metric_data_list; auto ctx = meter_context_.lock(); From 6adbedde8f6fa0a7fd0c9a298f0e623c02126e33 Mon Sep 17 00:00:00 2001 From: Lalit Date: Fri, 7 Oct 2022 23:43:06 -0700 Subject: [PATCH 2/7] stress test metrics --- sdk/test/metrics/meter_test.cc | 51 ++++++++++++++++++++++++++++++++-- 1 file changed, 49 insertions(+), 2 deletions(-) diff --git a/sdk/test/metrics/meter_test.cc b/sdk/test/metrics/meter_test.cc index 9108297624..dfa51fe00f 100644 --- a/sdk/test/metrics/meter_test.cc +++ b/sdk/test/metrics/meter_test.cc @@ -30,14 +30,15 @@ class MockMetricReader : public MetricReader namespace { -nostd::shared_ptr InitMeter(MetricReader **metricReaderPtr) +nostd::shared_ptr InitMeter(MetricReader **metricReaderPtr, + std::string meter_name = "meter_name") { static std::shared_ptr provider(new MeterProvider()); std::unique_ptr metric_reader(new MockMetricReader()); *metricReaderPtr = metric_reader.get(); auto p = std::static_pointer_cast(provider); p->AddMetricReader(std::move(metric_reader)); - auto meter = provider->GetMeter("meter_name"); + auto meter = provider->GetMeter(meter_name); return meter; } } // namespace @@ -72,4 +73,50 @@ TEST(MeterTest, BasicAsyncTests) }); } +constexpr static unsigned MAX_THREADS = 25; +constexpr static unsigned MAX_ITERATIONS_MT = 1000; + +TEST(MeterTest, StressMultiThread) +{ + MetricReader *metric_reader_ptr = nullptr; + auto meter = InitMeter(&metric_reader_ptr, "stress_test_meter"); + std::atomic threadCount(0); + size_t numIterations = MAX_ITERATIONS_MT; + std::atomic do_collect{false}, do_sync_create{true}, do_async_create{false}; + std::vector> + observable_instruments; + while (numIterations--) + { + for (size_t i = 0; i < MAX_THREADS; i++) + { + if (threadCount++ < MAX_THREADS) + { + auto t = std::thread([&]() { + std::this_thread::yield(); + if (do_sync_create.exchange(false)) + { + std::string instrument_name = "test_couter_" + std::to_string(numIterations); + meter->CreateLongCounter(instrument_name, "", ""); + do_async_create.store(true); + } + if (do_async_create.exchange(false)) + { + auto observable_instrument = + meter->CreateLongObservableGauge("test_gauge" + std::to_string(numIterations)); + observable_instrument->AddCallback(asyc_generate_measurements, nullptr); + observable_instruments.push_back(std::move(observable_instrument)); + do_sync_create.store(true); + } + if (do_collect.exchange(false)) + { + metric_reader_ptr->Collect([](ResourceMetrics &metric_data) { return true; }); + do_sync_create.store(true); + } + }); + t.detach(); + } + } + } +} + #endif From 4502675cf473c746e077153be5348e41ae982c5e Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 9 Oct 2022 12:20:46 -0700 Subject: [PATCH 3/7] Fix --- sdk/test/metrics/meter_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/test/metrics/meter_test.cc b/sdk/test/metrics/meter_test.cc index dfa51fe00f..c97e58012c 100644 --- a/sdk/test/metrics/meter_test.cc +++ b/sdk/test/metrics/meter_test.cc @@ -105,7 +105,7 @@ TEST(MeterTest, StressMultiThread) meter->CreateLongObservableGauge("test_gauge" + std::to_string(numIterations)); observable_instrument->AddCallback(asyc_generate_measurements, nullptr); observable_instruments.push_back(std::move(observable_instrument)); - do_sync_create.store(true); + do_collect.store(true); } if (do_collect.exchange(false)) { From 30697185b982aa9e857b45616bd463c153b18c34 Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 9 Oct 2022 12:49:55 -0700 Subject: [PATCH 4/7] fix --- sdk/test/metrics/meter_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/test/metrics/meter_test.cc b/sdk/test/metrics/meter_test.cc index c97e58012c..d89b0ea083 100644 --- a/sdk/test/metrics/meter_test.cc +++ b/sdk/test/metrics/meter_test.cc @@ -71,6 +71,7 @@ TEST(MeterTest, BasicAsyncTests) } return true; }); + observable_counter->RemoveCallback(asyc_generate_measurements, nullptr); } constexpr static unsigned MAX_THREADS = 25; From 3346e2b96756fdc3b3de01e6c2c07cb866c43a57 Mon Sep 17 00:00:00 2001 From: Lalit Date: Sun, 9 Oct 2022 19:03:59 -0700 Subject: [PATCH 5/7] fix --- sdk/test/metrics/meter_test.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/test/metrics/meter_test.cc b/sdk/test/metrics/meter_test.cc index d89b0ea083..65c44157fb 100644 --- a/sdk/test/metrics/meter_test.cc +++ b/sdk/test/metrics/meter_test.cc @@ -118,6 +118,8 @@ TEST(MeterTest, StressMultiThread) } } } + // random wait for all callbacks to complete + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } #endif From 8f7601cc84be97321f7b7d06df28be1b40d7b127 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 10 Oct 2022 09:56:22 -0700 Subject: [PATCH 6/7] fix --- sdk/test/metrics/meter_test.cc | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/sdk/test/metrics/meter_test.cc b/sdk/test/metrics/meter_test.cc index 65c44157fb..77fde74d03 100644 --- a/sdk/test/metrics/meter_test.cc +++ b/sdk/test/metrics/meter_test.cc @@ -86,6 +86,8 @@ TEST(MeterTest, StressMultiThread) std::atomic do_collect{false}, do_sync_create{true}, do_async_create{false}; std::vector> observable_instruments; + std::vector meter_operation_threads; + size_t instrument_id = 0; while (numIterations--) { for (size_t i = 0; i < MAX_THREADS; i++) @@ -96,17 +98,20 @@ TEST(MeterTest, StressMultiThread) std::this_thread::yield(); if (do_sync_create.exchange(false)) { - std::string instrument_name = "test_couter_" + std::to_string(numIterations); + std::string instrument_name = "test_couter_" + std::to_string(instrument_id); meter->CreateLongCounter(instrument_name, "", ""); do_async_create.store(true); + instrument_id++; } if (do_async_create.exchange(false)) { + std::cout << "\n creating async thread " << std::to_string(numIterations); auto observable_instrument = - meter->CreateLongObservableGauge("test_gauge" + std::to_string(numIterations)); + meter->CreateLongObservableGauge("test_gauge_" + std::to_string(instrument_id)); observable_instrument->AddCallback(asyc_generate_measurements, nullptr); observable_instruments.push_back(std::move(observable_instrument)); do_collect.store(true); + instrument_id++; } if (do_collect.exchange(false)) { @@ -114,12 +119,17 @@ TEST(MeterTest, StressMultiThread) do_sync_create.store(true); } }); - t.detach(); + meter_operation_threads.push_back(std::move(t)); } } } - // random wait for all callbacks to complete - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + for (auto &t : meter_operation_threads) + { + if (t.joinable()) + { + t.join(); + } + } } #endif From 5fc08129da6549207eefc2cb5073516c33e5d530 Mon Sep 17 00:00:00 2001 From: Lalit Date: Mon, 10 Oct 2022 15:27:01 -0700 Subject: [PATCH 7/7] review comment --- sdk/src/metrics/meter.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/metrics/meter.cc b/sdk/src/metrics/meter.cc index ac728db3c2..8d36509c34 100644 --- a/sdk/src/metrics/meter.cc +++ b/sdk/src/metrics/meter.cc @@ -295,7 +295,6 @@ std::unique_ptr Meter::RegisterAsyncMetricStorage( std::vector Meter::Collect(CollectorHandle *collector, opentelemetry::common::SystemTimestamp collect_ts) noexcept { - std::lock_guard guard(storage_lock_); observable_registry_->Observe(collect_ts); std::vector metric_data_list; auto ctx = meter_context_.lock(); @@ -305,6 +304,7 @@ std::vector Meter::Collect(CollectorHandle *collector, << "The metric context is invalid"); return std::vector{}; } + std::lock_guard guard(storage_lock_); for (auto &metric_storage : storage_registry_) { metric_storage.second->Collect(collector, ctx->GetCollectors(), ctx->GetSDKStartTime(),