From 560e0a07433c9345e3076e38c90609dd9f15d0c8 Mon Sep 17 00:00:00 2001 From: owentou Date: Fri, 4 Mar 2022 23:59:27 +0800 Subject: [PATCH 1/8] Reorder the destructor of members in LoggerProvider and TracerProvider. It's safe to shutdown multiple times.(Fix #1244) Signed-off-by: owentou --- .../opentelemetry/sdk/_metrics/controller.h | 5 +++- sdk/include/opentelemetry/sdk/logs/logger.h | 5 ++-- .../opentelemetry/sdk/logs/logger_provider.h | 20 +++++++++----- .../opentelemetry/sdk/trace/tracer_provider.h | 2 ++ sdk/src/logs/batch_log_processor.cc | 13 ++++++--- sdk/src/logs/logger.cc | 7 +++-- sdk/src/logs/logger_provider.cc | 21 +++++++++++++++ sdk/src/logs/multi_log_processor.cc | 6 ++--- sdk/src/logs/simple_log_processor.cc | 2 +- sdk/src/trace/batch_span_processor.cc | 13 ++++++--- sdk/src/trace/tracer_provider.cc | 11 ++++++++ sdk/test/logs/batch_log_processor_test.cc | 2 ++ sdk/test/logs/logger_provider_sdk_test.cc | 27 +++++++++++++++++++ sdk/test/trace/batch_span_processor_test.cc | 2 ++ sdk/test/trace/tracer_provider_test.cc | 3 +++ 15 files changed, 113 insertions(+), 26 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/_metrics/controller.h b/sdk/include/opentelemetry/sdk/_metrics/controller.h index 72b53eda5d..9d80787bcd 100644 --- a/sdk/include/opentelemetry/sdk/_metrics/controller.h +++ b/sdk/include/opentelemetry/sdk/_metrics/controller.h @@ -80,7 +80,10 @@ class PushController { if (active_.exchange(false)) { - runner_.join(); + if (runner_.joinable()) + { + runner_.join(); + } tick(); // flush metrics sitting in the processor } } diff --git a/sdk/include/opentelemetry/sdk/logs/logger.h b/sdk/include/opentelemetry/sdk/logs/logger.h index f6c9e0a226..7242f4e624 100644 --- a/sdk/include/opentelemetry/sdk/logs/logger.h +++ b/sdk/include/opentelemetry/sdk/logs/logger.h @@ -67,9 +67,10 @@ class Logger final : public opentelemetry::logs::Logger // The name of this logger std::string logger_name_; - // The logger context of this Logger. Uses a weak_ptr to avoid cyclic dependency issues the with - std::weak_ptr context_; std::unique_ptr instrumentation_library_; + + // The logger context of this Logger. Uses a weak_ptr to avoid cyclic dependency issues the with + std::shared_ptr context_; }; } // namespace logs diff --git a/sdk/include/opentelemetry/sdk/logs/logger_provider.h b/sdk/include/opentelemetry/sdk/logs/logger_provider.h index fd0c163b8b..9afb67ba0c 100755 --- a/sdk/include/opentelemetry/sdk/logs/logger_provider.h +++ b/sdk/include/opentelemetry/sdk/logs/logger_provider.h @@ -61,6 +61,8 @@ class LoggerProvider final : public opentelemetry::logs::LoggerProvider */ explicit LoggerProvider(std::shared_ptr context) noexcept; + ~LoggerProvider(); + /** * Creates a logger with the given name, and returns a shared pointer to it. * If a logger with that name already exists, return a shared pointer to it @@ -107,14 +109,20 @@ class LoggerProvider final : public opentelemetry::logs::LoggerProvider */ const opentelemetry::sdk::resource::Resource &GetResource() const noexcept; -private: - // A pointer to the processor stored by this logger provider - std::shared_ptr context_; + /** + * Shutdown the log processor associated with this log provider. + */ + bool Shutdown() noexcept; - // A vector of pointers to all the loggers that have been created - std::vector> loggers_; + /** + * Force flush the log processor associated with this log provider. + */ + bool ForceFlush(std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept; - // A mutex that ensures only one thread is using the map of loggers +private: + // order of declaration is important here - loggers should destroy only after context. + std::vector> loggers_; + std::shared_ptr context_; std::mutex lock_; }; } // namespace logs diff --git a/sdk/include/opentelemetry/sdk/trace/tracer_provider.h b/sdk/include/opentelemetry/sdk/trace/tracer_provider.h index 77a9b713ba..aa0f69a2ba 100644 --- a/sdk/include/opentelemetry/sdk/trace/tracer_provider.h +++ b/sdk/include/opentelemetry/sdk/trace/tracer_provider.h @@ -59,6 +59,8 @@ class TracerProvider final : public opentelemetry::trace::TracerProvider */ explicit TracerProvider(std::shared_ptr context) noexcept; + ~TracerProvider(); + opentelemetry::nostd::shared_ptr GetTracer( nostd::string_view library_name, nostd::string_view library_version = "", diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 4628bb1449..8b6597eb50 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -176,11 +176,16 @@ void BatchLogProcessor::DrainQueue() bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { - is_shutdown_.store(true); + bool already_shutdown = is_shutdown_.exchange(true); - cv_.notify_one(); - worker_thread_.join(); - if (exporter_ != nullptr) + if (worker_thread_.joinable()) + { + cv_.notify_one(); + worker_thread_.join(); + } + + // Should only shutdown exporter ONCE. + if (!already_shutdown && exporter_ != nullptr) { return exporter_->Shutdown(); } diff --git a/sdk/src/logs/logger.cc b/sdk/src/logs/logger.cc index 3c1d38d03e..6f78f87ce2 100644 --- a/sdk/src/logs/logger.cc +++ b/sdk/src/logs/logger.cc @@ -45,12 +45,11 @@ void Logger::Log(opentelemetry::logs::Severity severity, common::SystemTimestamp timestamp) noexcept { // If this logger does not have a processor, no need to create a log record - auto context = context_.lock(); - if (!context) + if (!context_) { return; } - auto &processor = context->GetProcessor(); + auto &processor = context_->GetProcessor(); // TODO: Sampler (should include check for minSeverity) @@ -68,7 +67,7 @@ void Logger::Log(opentelemetry::logs::Severity severity, recordable->SetBody(body); recordable->SetInstrumentationLibrary(GetInstrumentationLibrary()); - recordable->SetResource(context->GetResource()); + recordable->SetResource(context_->GetResource()); attributes.ForEachKeyValue([&](nostd::string_view key, common::AttributeValue value) noexcept { recordable->SetAttribute(key, value); diff --git a/sdk/src/logs/logger_provider.cc b/sdk/src/logs/logger_provider.cc index 08edce783d..3115b7fac8 100644 --- a/sdk/src/logs/logger_provider.cc +++ b/sdk/src/logs/logger_provider.cc @@ -43,6 +43,17 @@ LoggerProvider::LoggerProvider(std::shared_ptr context : context_{context} {} +LoggerProvider::~LoggerProvider() +{ + // Logger hold the shared pointer to the context. So we can not use destructor of LoggerContext to + // Shutdown and flush all pending recordables when we hasve more than one loggers.These + // recordables may use the raw pointer of instrumentation_library_ in Logger + if (context_) + { + context_->Shutdown(); + } +} + nostd::shared_ptr LoggerProvider::GetLogger( nostd::string_view logger_name, nostd::string_view options, @@ -105,6 +116,16 @@ const opentelemetry::sdk::resource::Resource &LoggerProvider::GetResource() cons return context_->GetResource(); } +bool LoggerProvider::Shutdown() noexcept +{ + return context_->Shutdown(); +} + +bool LoggerProvider::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + return context_->ForceFlush(timeout); +} + } // namespace logs } // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/logs/multi_log_processor.cc b/sdk/src/logs/multi_log_processor.cc index b73cc9e7a6..5c7fe400f9 100644 --- a/sdk/src/logs/multi_log_processor.cc +++ b/sdk/src/logs/multi_log_processor.cc @@ -129,10 +129,8 @@ bool MultiLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept } for (auto &processor : processors_) { - if (!processor->Shutdown(std::chrono::duration_cast(timeout_ns))) - { - result = false; - } + result |= + processor->Shutdown(std::chrono::duration_cast(timeout_ns)); start_time = std::chrono::system_clock::now(); if (expire_time > start_time) { diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc index ebcd6a4642..a347468841 100644 --- a/sdk/src/logs/simple_log_processor.cc +++ b/sdk/src/logs/simple_log_processor.cc @@ -56,7 +56,7 @@ bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept return exporter_->Shutdown(timeout); } - return false; + return true; } } // namespace logs } // namespace sdk diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index e9fdef7647..c3f20737c5 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -180,11 +180,16 @@ void BatchSpanProcessor::DrainQueue() bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { - is_shutdown_.store(true); + bool already_shutdown = is_shutdown_.exchange(true); - cv_.notify_one(); - worker_thread_.join(); - if (exporter_ != nullptr) + if (worker_thread_.joinable()) + { + cv_.notify_one(); + worker_thread_.join(); + } + + // Should only shutdown exporter ONCE. + if (!already_shutdown && exporter_ != nullptr) { return exporter_->Shutdown(); } diff --git a/sdk/src/trace/tracer_provider.cc b/sdk/src/trace/tracer_provider.cc index 5182fd1d22..64997d7d0c 100644 --- a/sdk/src/trace/tracer_provider.cc +++ b/sdk/src/trace/tracer_provider.cc @@ -36,6 +36,17 @@ TracerProvider::TracerProvider(std::vector> &&pro std::move(id_generator)); } +TracerProvider::~TracerProvider() +{ + // Tracer hold the shared pointer to the context. So we can not use destructor of TracerContext to + // Shutdown and flush all pending recordables when we have more than one tracers.These recordables + // may use the raw pointer of instrumentation_library_ in Tracer + if (context_) + { + context_->Shutdown(); + } +} + nostd::shared_ptr TracerProvider::GetTracer( nostd::string_view library_name, nostd::string_view library_version, diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index ebd0be069e..df503cb2aa 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -118,6 +118,8 @@ TEST_F(BatchLogProcessorTest, TestShutdown) // current batch of logs to be sent to the log exporter // by checking the number of logs sent and the names of the logs sent EXPECT_EQ(true, batch_processor->Shutdown()); + // It's safe to shutdown again + EXPECT_TRUE(batch_processor->Shutdown()); EXPECT_EQ(num_logs, logs_received->size()); diff --git a/sdk/test/logs/logger_provider_sdk_test.cc b/sdk/test/logs/logger_provider_sdk_test.cc index e31c1b6825..5948a9dfd2 100644 --- a/sdk/test/logs/logger_provider_sdk_test.cc +++ b/sdk/test/logs/logger_provider_sdk_test.cc @@ -10,6 +10,7 @@ # include "opentelemetry/sdk/logs/log_record.h" # include "opentelemetry/sdk/logs/logger.h" # include "opentelemetry/sdk/logs/logger_provider.h" +# include "opentelemetry/sdk/logs/simple_log_processor.h" # include @@ -103,4 +104,30 @@ TEST(LoggerProviderSDK, GetResource) LoggerProvider lp{nullptr, resource}; ASSERT_EQ(nostd::get(lp.GetResource().GetAttributes().at("key")), "value"); } + +TEST(LoggerProviderSDK, Shutdown) +{ + std::unique_ptr processor(new SimpleLogProcessor(nullptr)); + std::vector> processors; + processors.push_back(std::move(processor)); + + LoggerProvider lp(std::make_shared(std::move(processors))); + + EXPECT_TRUE(lp.Shutdown()); + + // It's safe to shutdown again + EXPECT_TRUE(lp.Shutdown()); +} + +TEST(LoggerProviderSDK, ForceFlush) +{ + std::unique_ptr processor(new SimpleLogProcessor(nullptr)); + std::vector> processors; + processors.push_back(std::move(processor)); + + LoggerProvider lp(std::make_shared(std::move(processors))); + + EXPECT_TRUE(lp.ForceFlush()); +} + #endif diff --git a/sdk/test/trace/batch_span_processor_test.cc b/sdk/test/trace/batch_span_processor_test.cc index 72a742a68b..0e6f9c35aa 100644 --- a/sdk/test/trace/batch_span_processor_test.cc +++ b/sdk/test/trace/batch_span_processor_test.cc @@ -118,6 +118,8 @@ TEST_F(BatchSpanProcessorTestPeer, TestShutdown) batch_processor->OnEnd(std::move(test_spans->at(i))); } + EXPECT_TRUE(batch_processor->Shutdown()); + // It's safe to shutdown again EXPECT_TRUE(batch_processor->Shutdown()); EXPECT_EQ(num_spans, spans_received->size()); diff --git a/sdk/test/trace/tracer_provider_test.cc b/sdk/test/trace/tracer_provider_test.cc index 25304bc1ba..d72d6bd40f 100644 --- a/sdk/test/trace/tracer_provider_test.cc +++ b/sdk/test/trace/tracer_provider_test.cc @@ -84,6 +84,9 @@ TEST(TracerProvider, Shutdown) TracerProvider tp1(std::make_shared(std::move(processors))); EXPECT_TRUE(tp1.Shutdown()); + + // It's safe to shutdown again + EXPECT_TRUE(tp1.Shutdown()); } TEST(TracerProvider, ForceFlush) From 4b0f856d6fe0912522c124748845fa5d499d9559 Mon Sep 17 00:00:00 2001 From: owentou Date: Sat, 5 Mar 2022 00:19:20 +0800 Subject: [PATCH 2/8] Fix warning. `exporters/ostream/src/log_exporter.cc:154:24: warning: comparison of integer expressions of different signedness: 'int' and 'const long unsigned int' [-Wsign-compare]` Signed-off-by: owentou --- exporters/ostream/src/log_exporter.cc | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/exporters/ostream/src/log_exporter.cc b/exporters/ostream/src/log_exporter.cc index c11ec13786..ef103bb6b2 100644 --- a/exporters/ostream/src/log_exporter.cc +++ b/exporters/ostream/src/log_exporter.cc @@ -146,12 +146,11 @@ sdk::common::ExportResult OStreamLogExporter::Export( // into severity_num and severity_text sout_ << "{\n" << " timestamp : " << log_record->GetTimestamp().time_since_epoch().count() << "\n" - << " severity_num : " << static_cast(log_record->GetSeverity()) << "\n" + << " severity_num : " << static_cast(log_record->GetSeverity()) << "\n" << " severity_text : "; - int severity_index = static_cast(log_record->GetSeverity()); - if (severity_index < 0 || - severity_index >= std::extent::value) + std::uint32_t severity_index = static_cast(log_record->GetSeverity()); + if (severity_index >= std::extent::value) { sout_ << "Invalid severity(" << severity_index << ")\n"; } From 6470fb60031c5e38c79ed6bb7328a9e89b336588 Mon Sep 17 00:00:00 2001 From: owentou Date: Sat, 5 Mar 2022 00:49:58 +0800 Subject: [PATCH 3/8] Fix unit test, keep the same behaviour between `SimpleLogProcessor` and `SimpleSpanProcessor` Signed-off-by: owentou --- sdk/src/logs/simple_log_processor.cc | 2 +- sdk/test/logs/simple_log_processor_test.cc | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc index a347468841..6e2fde9f14 100644 --- a/sdk/src/logs/simple_log_processor.cc +++ b/sdk/src/logs/simple_log_processor.cc @@ -51,7 +51,7 @@ bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { // Should only shutdown exporter ONCE. - if (!shutdown_latch_.test_and_set(std::memory_order_acquire)) + if (!shutdown_latch_.test_and_set(std::memory_order_acquire) && exporter_ != nullptr) { return exporter_->Shutdown(timeout); } diff --git a/sdk/test/logs/simple_log_processor_test.cc b/sdk/test/logs/simple_log_processor_test.cc index 3b89d60994..0a0c04cfc6 100644 --- a/sdk/test/logs/simple_log_processor_test.cc +++ b/sdk/test/logs/simple_log_processor_test.cc @@ -116,8 +116,6 @@ TEST(SimpleLogProcessorTest, ShutdownCalledOnce) EXPECT_EQ(true, processor.Shutdown()); EXPECT_EQ(1, num_shutdowns); - // The second time processor shutdown is called - EXPECT_EQ(false, processor.Shutdown()); // Processor::ShutDown(), even if called more than once, should only shutdown exporter once EXPECT_EQ(1, num_shutdowns); } @@ -149,8 +147,5 @@ TEST(SimpleLogProcessorTest, ShutDownFail) // Expect failure result when exporter fails to shutdown EXPECT_EQ(false, processor.Shutdown()); - - // Expect failure result when processor given a negative timeout allowed to shutdown - EXPECT_EQ(false, processor.Shutdown(std::chrono::microseconds(-1))); } #endif From 0322126f7f7d4decddd91408a4185d6438eb9103 Mon Sep 17 00:00:00 2001 From: owentou Date: Sat, 5 Mar 2022 01:00:24 +0800 Subject: [PATCH 4/8] Fix comment Signed-off-by: owentou --- sdk/include/opentelemetry/sdk/logs/logger.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/logger.h b/sdk/include/opentelemetry/sdk/logs/logger.h index 7242f4e624..604eb0d8f9 100644 --- a/sdk/include/opentelemetry/sdk/logs/logger.h +++ b/sdk/include/opentelemetry/sdk/logs/logger.h @@ -67,9 +67,9 @@ class Logger final : public opentelemetry::logs::Logger // The name of this logger std::string logger_name_; + // order of declaration is important here - instrumentation library should destroy after + // logger-context. std::unique_ptr instrumentation_library_; - - // The logger context of this Logger. Uses a weak_ptr to avoid cyclic dependency issues the with std::shared_ptr context_; }; From f9b1b3a7500dfa04fc6b070da5e956ec9e22cd66 Mon Sep 17 00:00:00 2001 From: owentou Date: Sat, 5 Mar 2022 01:01:52 +0800 Subject: [PATCH 5/8] Fix initialization order. Signed-off-by: owentou --- sdk/src/logs/logger.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/src/logs/logger.cc b/sdk/src/logs/logger.cc index 6f78f87ce2..0222b786bc 100644 --- a/sdk/src/logs/logger.cc +++ b/sdk/src/logs/logger.cc @@ -21,8 +21,8 @@ Logger::Logger(nostd::string_view name, std::unique_ptr instrumentation_library) noexcept : logger_name_(std::string(name)), - context_(context), - instrumentation_library_{std::move(instrumentation_library)} + instrumentation_library_(std::move(instrumentation_library)), + context_(context) {} const nostd::string_view Logger::GetName() noexcept From ee91478757c85ba082f6a753724719c0453e9ea9 Mon Sep 17 00:00:00 2001 From: owent Date: Sat, 5 Mar 2022 14:06:59 +0800 Subject: [PATCH 6/8] + Fix warning when comparing signed and unsigned integer. + Temporary fix the `Export()` may be called too many times when we shutdown or just wakeup worker thread once.This problem is completely fixed in #1209 but not merged yet. + Fix http client may has a different error code when our network is under a proxy. --- .../exporters/elasticsearch/es_log_recordable.h | 5 ++--- exporters/otlp/src/otlp_grpc_exporter.cc | 5 +++++ exporters/otlp/src/otlp_grpc_log_exporter.cc | 5 +++++ exporters/otlp/src/otlp_http_exporter.cc | 5 +++++ exporters/otlp/src/otlp_http_log_exporter.cc | 4 ++++ exporters/otlp/test/otlp_http_log_exporter_test.cc | 4 ++-- ext/test/http/curl_http_test.cc | 4 +++- 7 files changed, 26 insertions(+), 6 deletions(-) diff --git a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h index 7cc552d2d4..5c223cd0d4 100755 --- a/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h +++ b/exporters/elasticsearch/include/opentelemetry/exporters/elasticsearch/es_log_recordable.h @@ -111,9 +111,8 @@ class ElasticSearchRecordable final : public sdk::logs::Recordable void SetSeverity(opentelemetry::logs::Severity severity) noexcept override { // Convert the severity enum to a string - int severity_index = static_cast(severity); - if (severity_index < 0 || - severity_index >= std::extent::value) + std::uint32_t severity_index = static_cast(severity); + if (severity_index >= std::extent::value) { std::stringstream sout; sout << "Invalid severity(" << severity_index << ")"; diff --git a/exporters/otlp/src/otlp_grpc_exporter.cc b/exporters/otlp/src/otlp_grpc_exporter.cc index 0cd6bb8cda..32f4a60a52 100644 --- a/exporters/otlp/src/otlp_grpc_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_exporter.cc @@ -110,6 +110,11 @@ sdk::common::ExportResult OtlpGrpcExporter::Export( << " span(s) failed, exporter is shutdown"); return sdk::common::ExportResult::kFailure; } + if (spans.empty()) + { + return sdk::common::ExportResult::kSuccess; + } + proto::collector::trace::v1::ExportTraceServiceRequest request; OtlpRecordableUtils::PopulateRequest(spans, &request); diff --git a/exporters/otlp/src/otlp_grpc_log_exporter.cc b/exporters/otlp/src/otlp_grpc_log_exporter.cc index 4dd84cff11..38bfb0a5bb 100644 --- a/exporters/otlp/src/otlp_grpc_log_exporter.cc +++ b/exporters/otlp/src/otlp_grpc_log_exporter.cc @@ -131,6 +131,11 @@ opentelemetry::sdk::common::ExportResult OtlpGrpcLogExporter::Export( << " log(s) failed, exporter is shutdown"); return sdk::common::ExportResult::kFailure; } + if (logs.empty()) + { + return sdk::common::ExportResult::kSuccess; + } + proto::collector::logs::v1::ExportLogsServiceRequest request; OtlpRecordableUtils::PopulateRequest(logs, &request); diff --git a/exporters/otlp/src/otlp_http_exporter.cc b/exporters/otlp/src/otlp_http_exporter.cc index ca705d2e30..92155dd00d 100644 --- a/exporters/otlp/src/otlp_http_exporter.cc +++ b/exporters/otlp/src/otlp_http_exporter.cc @@ -46,6 +46,11 @@ std::unique_ptr OtlpHttpExporter::MakeRec opentelemetry::sdk::common::ExportResult OtlpHttpExporter::Export( const nostd::span> &spans) noexcept { + if (spans.empty()) + { + return opentelemetry::sdk::common::ExportResult::kSuccess; + } + proto::collector::trace::v1::ExportTraceServiceRequest service_request; OtlpRecordableUtils::PopulateRequest(spans, &service_request); return http_client_->Export(service_request); diff --git a/exporters/otlp/src/otlp_http_log_exporter.cc b/exporters/otlp/src/otlp_http_log_exporter.cc index 3b64ee4eda..436c77beaa 100644 --- a/exporters/otlp/src/otlp_http_log_exporter.cc +++ b/exporters/otlp/src/otlp_http_log_exporter.cc @@ -48,6 +48,10 @@ std::unique_ptr OtlpHttpLogExporter::MakeR opentelemetry::sdk::common::ExportResult OtlpHttpLogExporter::Export( const nostd::span> &logs) noexcept { + if (logs.empty()) + { + return opentelemetry::sdk::common::ExportResult::kSuccess; + } proto::collector::logs::v1::ExportLogsServiceRequest service_request; OtlpRecordableUtils::PopulateRequest(logs, &service_request); return http_client_->Export(service_request); diff --git a/exporters/otlp/test/otlp_http_log_exporter_test.cc b/exporters/otlp/test/otlp_http_log_exporter_test.cc index 9be4af0983..ffd1a9a0f3 100644 --- a/exporters/otlp/test/otlp_http_log_exporter_test.cc +++ b/exporters/otlp/test/otlp_http_log_exporter_test.cc @@ -103,7 +103,7 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportJsonIntegrationTest) auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); provider->AddProcessor(std::unique_ptr( - new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 1))); + new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5))); std::string report_trace_id; std::string report_span_id; @@ -192,7 +192,7 @@ TEST_F(OtlpHttpLogExporterTestPeer, ExportBinaryIntegrationTest) auto provider = nostd::shared_ptr(new sdk::logs::LoggerProvider()); provider->AddProcessor(std::unique_ptr( - new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 1))); + new sdk::logs::BatchLogProcessor(std::move(exporter), 5, std::chrono::milliseconds(256), 5))); std::string report_trace_id; std::string report_span_id; diff --git a/ext/test/http/curl_http_test.cc b/ext/test/http/curl_http_test.cc index c9a6312888..72d0530a21 100644 --- a/ext/test/http/curl_http_test.cc +++ b/ext/test/http/curl_http_test.cc @@ -291,7 +291,9 @@ TEST_F(BasicCurlHttpTests, SendGetRequestSyncTimeout) auto result = http_client.Get("http://222.222.222.200:19000/get/", m1); EXPECT_EQ(result, false); - EXPECT_EQ(result.GetSessionState(), http_client::SessionState::ConnectFailed); + // When network is under proxy, it may cennect success but closed by peer when send data + EXPECT_TRUE(result.GetSessionState() == http_client::SessionState::ConnectFailed || + result.GetSessionState() == http_client::SessionState::SendFailed); } TEST_F(BasicCurlHttpTests, SendPostRequestSync) From bcf6aa0f3b437aacf1cc1a35c59ce011e2fed23a Mon Sep 17 00:00:00 2001 From: owent Date: Sat, 5 Mar 2022 21:26:31 +0800 Subject: [PATCH 7/8] Add the missing checking of double calling to `SimpleLogProcessor::Shutdown` --- sdk/test/logs/simple_log_processor_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/test/logs/simple_log_processor_test.cc b/sdk/test/logs/simple_log_processor_test.cc index 0a0c04cfc6..0bb6ba2667 100644 --- a/sdk/test/logs/simple_log_processor_test.cc +++ b/sdk/test/logs/simple_log_processor_test.cc @@ -116,6 +116,7 @@ TEST(SimpleLogProcessorTest, ShutdownCalledOnce) EXPECT_EQ(true, processor.Shutdown()); EXPECT_EQ(1, num_shutdowns); + EXPECT_EQ(true, processor.Shutdown()); // Processor::ShutDown(), even if called more than once, should only shutdown exporter once EXPECT_EQ(1, num_shutdowns); } From 67963f14d0948a247bb928574b6fdf89a8e4bb69 Mon Sep 17 00:00:00 2001 From: owentou Date: Tue, 8 Mar 2022 10:23:19 +0800 Subject: [PATCH 8/8] Do not return when shutdown not finished.Fix a spell error. Signed-off-by: owentou --- ext/test/http/curl_http_test.cc | 2 +- sdk/include/opentelemetry/sdk/logs/batch_log_processor.h | 2 +- sdk/include/opentelemetry/sdk/trace/batch_span_processor.h | 2 +- sdk/src/logs/batch_log_processor.cc | 1 + sdk/src/trace/batch_span_processor.cc | 1 + 5 files changed, 5 insertions(+), 3 deletions(-) diff --git a/ext/test/http/curl_http_test.cc b/ext/test/http/curl_http_test.cc index 72d0530a21..f8d248bae4 100644 --- a/ext/test/http/curl_http_test.cc +++ b/ext/test/http/curl_http_test.cc @@ -291,7 +291,7 @@ TEST_F(BasicCurlHttpTests, SendGetRequestSyncTimeout) auto result = http_client.Get("http://222.222.222.200:19000/get/", m1); EXPECT_EQ(result, false); - // When network is under proxy, it may cennect success but closed by peer when send data + // When network is under proxy, it may connect success but closed by peer when send data EXPECT_TRUE(result.GetSessionState() == http_client::SessionState::ConnectFailed || result.GetSessionState() == http_client::SessionState::SendFailed); } diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 6d5d7fde3c..1b6d443c8a 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -108,7 +108,7 @@ class BatchLogProcessor : public LogProcessor /* Synchronization primitives */ std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_; + std::mutex cv_m_, force_flush_cv_m_, shutdown_m_; /* The buffer/queue to which the ended logs are added */ common::CircularBuffer buffer_; diff --git a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h index 6b47d993d8..d25ff2d950 100644 --- a/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h +++ b/sdk/include/opentelemetry/sdk/trace/batch_span_processor.h @@ -139,7 +139,7 @@ class BatchSpanProcessor : public SpanProcessor /* Synchronization primitives */ std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_; + std::mutex cv_m_, force_flush_cv_m_, shutdown_m_; /* The buffer/queue to which the ended spans are added */ common::CircularBuffer buffer_; diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 8b6597eb50..9b20705b0a 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -176,6 +176,7 @@ void BatchLogProcessor::DrainQueue() bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { + std::lock_guard shutdown_guard{shutdown_m_}; bool already_shutdown = is_shutdown_.exchange(true); if (worker_thread_.joinable()) diff --git a/sdk/src/trace/batch_span_processor.cc b/sdk/src/trace/batch_span_processor.cc index c3f20737c5..0ab042b9ab 100644 --- a/sdk/src/trace/batch_span_processor.cc +++ b/sdk/src/trace/batch_span_processor.cc @@ -180,6 +180,7 @@ void BatchSpanProcessor::DrainQueue() bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { + std::lock_guard shutdown_guard{shutdown_m_}; bool already_shutdown = is_shutdown_.exchange(true); if (worker_thread_.joinable())