From 04c434e2965c6e390b8e7820107e868ffd80c409 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Mon, 9 Nov 2020 20:42:33 -0500 Subject: [PATCH 01/12] Add batch log processor implementation with test coverage --- .../sdk/logs/batch_log_processor.h | 136 +++++++++ sdk/src/logs/CMakeLists.txt | 2 +- sdk/src/logs/batch_log_processor.cc | 225 +++++++++++++++ sdk/test/logs/BUILD | 11 + sdk/test/logs/CMakeLists.txt | 4 +- sdk/test/logs/batch_log_processor_test.cc | 273 ++++++++++++++++++ 6 files changed, 648 insertions(+), 3 deletions(-) create mode 100644 sdk/include/opentelemetry/sdk/logs/batch_log_processor.h create mode 100644 sdk/src/logs/batch_log_processor.cc create mode 100644 sdk/test/logs/batch_log_processor_test.cc diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h new file mode 100644 index 0000000000..2e75074a91 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -0,0 +1,136 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "opentelemetry/sdk/common/circular_buffer.h" +#include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/sdk/logs/processor.h" + +#include +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ + +namespace logs +{ + +/** + * This is an implementation of the LogProcessor which creates batches of finished logs and passes + * the export-friendly log data representations to the configured LogExporter. + */ +class BatchLogProcessor : public LogProcessor +{ +public: + /** + * Creates a batch log processor by configuring the specified exporter and other parameters + * as per the official, language-agnostic opentelemetry specs. + * + * @param exporter - The backend exporter to pass the logs to + * @param max_queue_size - The maximum buffer/queue size. After the size is reached, logs are + * dropped. + * @param schedule_delay_millis - The time interval between two consecutive exports. + * @param max_export_batch_size - The maximum batch size of every export. It must be smaller or + * equal to max_queue_size + */ + explicit BatchLogProcessor( + std::unique_ptr &&exporter, + const size_t max_queue_size = 2048, + const std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000), + const size_t max_export_batch_size = 512); + + /** + * Called when the Logger's log method creates a log record + * @param record the log record + */ + + void OnReceive(std::unique_ptr &&record) noexcept override; + + /** + * Export all log records that have not been exported yet. + * + * NOTE: Timeout functionality not supported yet. + */ + bool ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + + /** + * Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of + * all its logs and passes them to the exporter. Any subsequent calls to + * ForceFlush or Shutdown will return immediately without doing anything. + * + * NOTE: Timeout functionality not supported yet. + */ + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + + /** + * Class destructor which invokes the Shutdown() method. + */ + ~BatchLogProcessor(); + +private: + /** + * The background routine performed by the worker thread. + */ + void DoBackgroundWork(); + + /** + * Exports all logs to the configured exporter. + * + * @param was_force_flush_called - A flag to check if the current export is the result + * of a call to ForceFlush method. If true, then we have to + * notify the main thread to wake it up in the ForceFlush + * method. + */ + void Export(const bool was_for_flush_called); + + /** + * Called when Shutdown() is invoked. Completely drains the queue of all log records and + * passes them to the exporter. + */ + void DrainQueue(); + + /* The configured backend log exporter */ + std::unique_ptr exporter_; + + /* Configurable parameters as per the official *trace* specs */ + const size_t max_queue_size_; + const std::chrono::milliseconds schedule_delay_millis_; + const size_t max_export_batch_size_; + + /* Synchronization primitives */ + std::condition_variable cv_, force_flush_cv_; + std::mutex cv_m_, force_flush_cv_m_; + + /* The buffer/queue to which the ended logs are added */ + common::CircularBuffer buffer_; + + /* Important boolean flags to handle the workflow of the processor */ + std::atomic is_shutdown_{false}; + std::atomic is_force_flush_{false}; + std::atomic is_force_flush_notified_{false}; + + /* The background worker thread */ + std::thread worker_thread_; +}; + +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/logs/CMakeLists.txt b/sdk/src/logs/CMakeLists.txt index e2e7c2c915..44d8909472 100644 --- a/sdk/src/logs/CMakeLists.txt +++ b/sdk/src/logs/CMakeLists.txt @@ -1,4 +1,4 @@ add_library(opentelemetry_logs logger_provider.cc logger.cc - simple_log_processor.cc) + simple_log_processor.cc batch_log_processor.cc) target_link_libraries(opentelemetry_logs opentelemetry_common) diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc new file mode 100644 index 0000000000..34a61da89c --- /dev/null +++ b/sdk/src/logs/batch_log_processor.cc @@ -0,0 +1,225 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "opentelemetry/sdk/logs/batch_log_processor.h" + +#include +#include + +using opentelemetry::logs::LogRecord; +using opentelemetry::sdk::common::AtomicUniquePtr; +using opentelemetry::sdk::common::CircularBuffer; +using opentelemetry::sdk::common::CircularBufferRange; + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ +BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, + const size_t max_queue_size, + const std::chrono::milliseconds schedule_delay_millis, + const size_t max_export_batch_size) + : exporter_(std::move(exporter)), + max_queue_size_(max_queue_size), + schedule_delay_millis_(schedule_delay_millis), + max_export_batch_size_(max_export_batch_size), + buffer_(max_queue_size_), + worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) +{} + +void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept +{ + if (is_shutdown_.load() == true) + { + return; + } + + if (buffer_.Add(record) == false) + { + std::cerr << "[Batch Log Processor] Adding record to circular buffer failed \n"; + return; + } + + // If the queue gets at least half full a preemptive notification is + // sent to the worker thread to start a new export cycle. + if (buffer_.size() >= max_queue_size_ / 2) + { + // signal the worker thread + cv_.notify_one(); + } +} + +bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + if (is_shutdown_.load() == true) + { + return false; + } + + is_force_flush_ = true; + + // Keep attempting to wake up the worker thread + while (is_force_flush_.load() == true) + { + cv_.notify_one(); + } + + // Now wait for the worker thread to signal back from the Export method + std::unique_lock lk(force_flush_cv_m_); + while (is_force_flush_notified_.load() == false) + { + force_flush_cv_.wait(lk); + } + + // Notify the worker thread + is_force_flush_notified_ = false; + return true; +} + +void BatchLogProcessor::DoBackgroundWork() +{ + auto timeout = schedule_delay_millis_; + + while (true) + { + // Wait for `timeout` milliseconds + std::unique_lock lk(cv_m_); + cv_.wait_for(lk, timeout); + + if (is_shutdown_.load() == true) + { + DrainQueue(); + return; + } + + bool was_force_flush_called = is_force_flush_.load(); + + // Check if this export was the result of a force flush. + if (was_force_flush_called == true) + { + // Since this export was the result of a force flush, signal the + // main thread that the worker thread has been notified + is_force_flush_ = false; + } + else + { + // If the buffer was empty during the entire `timeout` time interval, + // go back to waiting. If this was a spurious wake-up, we export only if + // `buffer_` is not empty. This is acceptable because batching is a best + // mechanism effort here. + if (buffer_.empty() == true) + { + continue; + } + } + + auto start = std::chrono::steady_clock::now(); + + Export(was_force_flush_called); + + auto end = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast(end - start); + + // Subtract the duration of this export call from the next `timeout`. + timeout = schedule_delay_millis_ - duration; + } +} + +void BatchLogProcessor::Export(const bool was_force_flush_called) +{ + std::vector> records_arr; + + size_t num_records_to_export; + + if (was_force_flush_called == true) + { + num_records_to_export = buffer_.size(); + } + else + { + num_records_to_export = + buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + } + + buffer_.Consume( + num_records_to_export, [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + ptr.Swap(swap_ptr); + records_arr.push_back(std::unique_ptr(swap_ptr.release())); + return true; + }); + }); + + ExportResult export_status = exporter_->Export(records_arr); + if (export_status != ExportResult::kSuccess) + { + std::cerr << "[Batch Log Processor] Export failed \n"; + } + + // Notify the main thread in case this export was the result of a force flush. + if (was_force_flush_called == true) + { + is_force_flush_notified_ = true; + while (is_force_flush_notified_.load() == true) + { + force_flush_cv_.notify_one(); + } + } +} + +void BatchLogProcessor::DrainQueue() +{ + while (buffer_.empty() == false) + { + Export(false); + } +} + +// Note: Timeout functionality is currently not implemented +bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept +{ + if (is_shutdown_.load() == true) + { + return false; + } + + is_shutdown_ = true; + + // notifies worker thread that shutdown has been called + cv_.notify_one(); + // wait until worker thread completes current export + worker_thread_.join(); + // calls the exporter to shutdown + if (exporter_ != nullptr) + { + return exporter_->Shutdown(); + } + return true; +} + +BatchLogProcessor::~BatchLogProcessor() +{ + if (is_shutdown_.load() == false) + { + Shutdown(); + } +} + +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/logs/BUILD b/sdk/test/logs/BUILD index 9e12e08ab9..e70527243f 100644 --- a/sdk/test/logs/BUILD +++ b/sdk/test/logs/BUILD @@ -42,3 +42,14 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) + +cc_test( + name = "batch_log_processor_test", + srcs = [ + "batch_log_processor_test.cc", + ], + deps = [ + "//sdk/src/logs", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/sdk/test/logs/CMakeLists.txt b/sdk/test/logs/CMakeLists.txt index f59c6a1926..84b865d226 100644 --- a/sdk/test/logs/CMakeLists.txt +++ b/sdk/test/logs/CMakeLists.txt @@ -1,5 +1,5 @@ -foreach(testname logger_provider_sdk_test logger_sdk_test - simple_log_processor_test log_record_test) +foreach(testname logger_provider_sdk_test logger_sdk_test log_record_test + simple_log_processor_test batch_log_processor_test) add_executable(${testname} "${testname}.cc") target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs) diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc new file mode 100644 index 0000000000..56a8e9aded --- /dev/null +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -0,0 +1,273 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "opentelemetry/sdk/logs/batch_log_processor.h" +#include "opentelemetry/sdk/logs/exporter.h" + +#include +#include +#include + +using namespace opentelemetry::sdk::logs; +using opentelemetry::logs::LogRecord; + +/** + * A sample log exporter + * for testing the batch log processor + */ +class TestLogExporter final : public LogExporter +{ +private: + std::shared_ptr> logs_received_; + std::shared_ptr> is_shutdown_; + std::shared_ptr> is_export_completed_; + const std::chrono::milliseconds export_delay_; + +public: + TestLogExporter(std::shared_ptr> logs_received, + std::shared_ptr> is_shutdown, + std::shared_ptr> is_export_completed, + const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) + : logs_received_(logs_received), + is_shutdown_(is_shutdown), + is_export_completed_(is_export_completed), + export_delay_(export_delay) + {} + + // Export method stores the logs received into a shared list of record names + ExportResult Export( + const opentelemetry::nostd::span> &records) noexcept override + { + *is_export_completed_ = false; // Meant exclusively to test force flush timeout + std::this_thread::sleep_for(export_delay_); // give time for the "export" to complete + + for (auto &record : records) + { + logs_received_->push_back(record->name.data()); + } + + *is_export_completed_ = true; + return ExportResult::kSuccess; + } + + // toggles the boolean flag marking this exporter as shut down + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override + { + *is_shutdown_ = true; + return true; + } +}; + +/** + * A fixture class for testing the BatchLogProcessor class that uses the TestExporter defined above. + */ +class BatchLogProcessorTest : public testing::Test // ::testing::Test +{ +public: + // returns a batch log processor that received a batch of log records, a shared pointer to a + // is_shutdown flag, and the processor configuration options (default if unspecified) + std::shared_ptr GetTestProcessor( + std::shared_ptr> logs_received, + std::shared_ptr> is_shutdown, + std::shared_ptr> is_export_completed = + std::shared_ptr>(new std::atomic(false)), + const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), + const std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000), + const size_t max_queue_size = 2048, + const size_t max_export_batch_size = 512) + { + return std::shared_ptr( + new BatchLogProcessor(std::unique_ptr(new TestLogExporter( + logs_received, is_shutdown, is_export_completed, export_delay)), + max_queue_size, schedule_delay_millis, max_export_batch_size)); + } +}; + +TEST_F(BatchLogProcessorTest, TestShutdown) +{ + // initialize a batch log processor with the test exporter + std::shared_ptr> logs_received(new std::vector); + std::shared_ptr> is_shutdown(new std::atomic(false)); + + auto batch_processor = GetTestProcessor(logs_received, is_shutdown); + + // create a few test log records and send them to the processor + const int num_logs = 3; + + for (int i = 0; i < num_logs; ++i) + { + auto log = std::unique_ptr(new LogRecord()); + log->name = "Log name"; + batch_processor->OnReceive(std::move(log)); + } + + // Test that shutting down the processor will first wait for the + // current batch of logs to be sent to the log exporter + // by checking the number of logs sent and the names of the logs sent + EXPECT_EQ(true, batch_processor->Shutdown()); + + EXPECT_EQ(num_logs, logs_received->size()); + + // Assume logs are received by exporter in same order as sent by processor + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log name", logs_received->at(i)); + } + + // Also check that the processor is shut down at the end + EXPECT_TRUE(is_shutdown->load()); +} + +TEST_F(BatchLogProcessorTest, TestForceFlush) +{ + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> logs_received(new std::vector); + + auto batch_processor = GetTestProcessor(logs_received, is_shutdown); + const int num_logs = 2048; + + for (int i = 0; i < num_logs; ++i) + { + auto log = std::unique_ptr(new LogRecord()); + log->name = "Log name"; + batch_processor->OnReceive(std::move(log)); + } + + // Give some time to export + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + + batch_processor->ForceFlush(); + + EXPECT_EQ(num_logs, logs_received->size()); + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log name", logs_received->at(i)); + } + + // Create some more logs to make sure that the processor still works + for (int i = 0; i < num_logs; ++i) + { + auto log = std::unique_ptr(new LogRecord()); + log->name = "Log name"; + batch_processor->OnReceive(std::move(log)); + } + + // Give some time to export the logs + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + batch_processor->ForceFlush(); + + EXPECT_EQ(num_logs * 2, logs_received->size()); + for (int i = 0; i < num_logs * 2; ++i) + { + EXPECT_EQ("Log name", logs_received->at(i)); + } +} + +TEST_F(BatchLogProcessorTest, TestManyLogsLoss) +{ + /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> logs_received(new std::vector); + + const int max_queue_size = 4096; + + auto batch_processor = GetTestProcessor(logs_received, is_shutdown); + + // Create max_queue_size log records + for (int i = 0; i < max_queue_size; ++i) + { + auto log = std::unique_ptr(new LogRecord()); + log->name = "Log name"; + batch_processor->OnReceive(std::move(log)); + } + + // Give some time to export the logs + std::this_thread::sleep_for(std::chrono::milliseconds(700)); + + batch_processor->ForceFlush(); + + // Log should be exported by now + EXPECT_GE(max_queue_size, logs_received->size()); +} + +TEST_F(BatchLogProcessorTest, TestManyLogsLossLess) +{ + /* Test that no logs are lost when sending max_queue_size logs */ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> logs_received(new std::vector); + auto batch_processor = GetTestProcessor(logs_received, is_shutdown); + + const int num_logs = 2048; + + for (int i = 0; i < num_logs; ++i) + { + auto log = std::unique_ptr(new LogRecord()); + log->name = "Log name"; + batch_processor->OnReceive(std::move(log)); + } + + // Give some time to export the logs + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + batch_processor->ForceFlush(); + + EXPECT_EQ(num_logs, logs_received->size()); + for (int i = 0; i < num_logs; ++i) + { + EXPECT_EQ("Log name", logs_received->at(i)); + } +} + +TEST_F(BatchLogProcessorTest, TestScheduleDelayMillis) +{ + /* Test that max_export_batch_size logs are exported every schedule_delay_millis + seconds */ + + std::shared_ptr> is_shutdown(new std::atomic(false)); + std::shared_ptr> is_export_completed(new std::atomic(false)); + std::shared_ptr> logs_received(new std::vector); + + const std::chrono::milliseconds export_delay(0); + const std::chrono::milliseconds schedule_delay_millis(2000); + const size_t max_export_batch_size = 512; + + auto batch_processor = GetTestProcessor(logs_received, is_shutdown, is_export_completed, + export_delay, schedule_delay_millis); + + for (int i = 0; i < max_export_batch_size; ++i) + { + auto log = std::unique_ptr(new LogRecord()); + log->name = "Log name"; + batch_processor->OnReceive(std::move(log)); + } + // Sleep for schedule_delay_millis milliseconds + std::this_thread::sleep_for(schedule_delay_millis); + + // small delay to give time to export + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + + // Logs should be exported by now + EXPECT_TRUE(is_export_completed->load()); + EXPECT_EQ(max_export_batch_size, logs_received->size()); + for (size_t i = 0; i < max_export_batch_size; ++i) + { + EXPECT_EQ("Log name", logs_received->at(i)); + } +} From 175280bbe61d16e7ea78d6a4fdb821c86b4c97e0 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Thu, 10 Dec 2020 14:30:13 -0500 Subject: [PATCH 02/12] Rename --- sdk/test/logs/batch_log_processor_test.cc | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index 56a8e9aded..ef505b857b 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -28,7 +28,7 @@ using opentelemetry::logs::LogRecord; * A sample log exporter * for testing the batch log processor */ -class TestLogExporter final : public LogExporter +class MockLogExporter final : public LogExporter { private: std::shared_ptr> logs_received_; @@ -37,7 +37,7 @@ class TestLogExporter final : public LogExporter const std::chrono::milliseconds export_delay_; public: - TestLogExporter(std::shared_ptr> logs_received, + MockLogExporter(std::shared_ptr> logs_received, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed, const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) @@ -80,7 +80,7 @@ class BatchLogProcessorTest : public testing::Test // ::testing::Test public: // returns a batch log processor that received a batch of log records, a shared pointer to a // is_shutdown flag, and the processor configuration options (default if unspecified) - std::shared_ptr GetTestProcessor( + std::shared_ptr GetMockProcessor( std::shared_ptr> logs_received, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed = @@ -91,7 +91,7 @@ class BatchLogProcessorTest : public testing::Test // ::testing::Test const size_t max_export_batch_size = 512) { return std::shared_ptr( - new BatchLogProcessor(std::unique_ptr(new TestLogExporter( + new BatchLogProcessor(std::unique_ptr(new MockLogExporter( logs_received, is_shutdown, is_export_completed, export_delay)), max_queue_size, schedule_delay_millis, max_export_batch_size)); } @@ -103,7 +103,7 @@ TEST_F(BatchLogProcessorTest, TestShutdown) std::shared_ptr> logs_received(new std::vector); std::shared_ptr> is_shutdown(new std::atomic(false)); - auto batch_processor = GetTestProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, is_shutdown); // create a few test log records and send them to the processor const int num_logs = 3; @@ -137,7 +137,7 @@ TEST_F(BatchLogProcessorTest, TestForceFlush) std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr> logs_received(new std::vector); - auto batch_processor = GetTestProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, is_shutdown); const int num_logs = 2048; for (int i = 0; i < num_logs; ++i) @@ -187,7 +187,7 @@ TEST_F(BatchLogProcessorTest, TestManyLogsLoss) const int max_queue_size = 4096; - auto batch_processor = GetTestProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, is_shutdown); // Create max_queue_size log records for (int i = 0; i < max_queue_size; ++i) @@ -212,7 +212,7 @@ TEST_F(BatchLogProcessorTest, TestManyLogsLossLess) std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr> logs_received(new std::vector); - auto batch_processor = GetTestProcessor(logs_received, is_shutdown); + auto batch_processor = GetMockProcessor(logs_received, is_shutdown); const int num_logs = 2048; @@ -248,7 +248,7 @@ TEST_F(BatchLogProcessorTest, TestScheduleDelayMillis) const std::chrono::milliseconds schedule_delay_millis(2000); const size_t max_export_batch_size = 512; - auto batch_processor = GetTestProcessor(logs_received, is_shutdown, is_export_completed, + auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, export_delay, schedule_delay_millis); for (int i = 0; i < max_export_batch_size; ++i) From 977e3fcfc9934462eaed4c3c177cc2f64c8269bc Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Thu, 10 Dec 2020 14:34:22 -0500 Subject: [PATCH 03/12] Remove error printing --- sdk/src/logs/batch_log_processor.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 34a61da89c..8705bf2a7f 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -50,7 +50,6 @@ void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept if (buffer_.Add(record) == false) { - std::cerr << "[Batch Log Processor] Adding record to circular buffer failed \n"; return; } @@ -168,7 +167,7 @@ void BatchLogProcessor::Export(const bool was_force_flush_called) ExportResult export_status = exporter_->Export(records_arr); if (export_status != ExportResult::kSuccess) { - std::cerr << "[Batch Log Processor] Export failed \n"; + // Error } // Notify the main thread in case this export was the result of a force flush. From 30f97318b9dc51f15ae41787c6536da84894b2bd Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Sat, 12 Dec 2020 22:42:59 -0500 Subject: [PATCH 04/12] Update with Recordable --- .../sdk/logs/batch_log_processor.h | 7 +- sdk/src/logs/batch_log_processor.cc | 18 +++-- sdk/test/logs/batch_log_processor_test.cc | 79 +++++++++++-------- 3 files changed, 62 insertions(+), 42 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 2e75074a91..c5606b3218 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -55,12 +55,15 @@ class BatchLogProcessor : public LogProcessor const std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000), const size_t max_export_batch_size = 512); + /** Makes a new recordable **/ + std::unique_ptr MakeRecordable() noexcept override; + /** * Called when the Logger's log method creates a log record * @param record the log record */ - void OnReceive(std::unique_ptr &&record) noexcept override; + void OnReceive(std::unique_ptr &&record) noexcept override; /** * Export all log records that have not been exported yet. @@ -120,7 +123,7 @@ class BatchLogProcessor : public LogProcessor std::mutex cv_m_, force_flush_cv_m_; /* The buffer/queue to which the ended logs are added */ - common::CircularBuffer buffer_; + common::CircularBuffer buffer_; /* Important boolean flags to handle the workflow of the processor */ std::atomic is_shutdown_{false}; diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 8705bf2a7f..3ddeb0722a 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -19,7 +19,6 @@ #include #include -using opentelemetry::logs::LogRecord; using opentelemetry::sdk::common::AtomicUniquePtr; using opentelemetry::sdk::common::CircularBuffer; using opentelemetry::sdk::common::CircularBufferRange; @@ -41,7 +40,12 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) {} -void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept +std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept +{ + return exporter_->MakeRecordable(); +} + +void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept { if (is_shutdown_.load() == true) { @@ -140,7 +144,7 @@ void BatchLogProcessor::DoBackgroundWork() void BatchLogProcessor::Export(const bool was_force_flush_called) { - std::vector> records_arr; + std::vector> records_arr; size_t num_records_to_export; @@ -155,11 +159,11 @@ void BatchLogProcessor::Export(const bool was_force_flush_called) } buffer_.Consume( - num_records_to_export, [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + num_records_to_export, [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); ptr.Swap(swap_ptr); - records_arr.push_back(std::unique_ptr(swap_ptr.release())); + records_arr.push_back(std::unique_ptr(swap_ptr.release())); return true; }); }); diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index ef505b857b..1c74341a65 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -16,13 +16,13 @@ #include "opentelemetry/sdk/logs/batch_log_processor.h" #include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/sdk/logs/log_record.h" #include #include #include using namespace opentelemetry::sdk::logs; -using opentelemetry::logs::LogRecord; /** * A sample log exporter @@ -30,14 +30,8 @@ using opentelemetry::logs::LogRecord; */ class MockLogExporter final : public LogExporter { -private: - std::shared_ptr> logs_received_; - std::shared_ptr> is_shutdown_; - std::shared_ptr> is_export_completed_; - const std::chrono::milliseconds export_delay_; - public: - MockLogExporter(std::shared_ptr> logs_received, + MockLogExporter(std::shared_ptr>> logs_received, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed, const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0)) @@ -47,16 +41,24 @@ class MockLogExporter final : public LogExporter export_delay_(export_delay) {} + std::unique_ptr MakeRecordable() noexcept + { + return std::unique_ptr(new LogRecord()); + } // Export method stores the logs received into a shared list of record names ExportResult Export( - const opentelemetry::nostd::span> &records) noexcept override + const opentelemetry::nostd::span> &records) noexcept override { *is_export_completed_ = false; // Meant exclusively to test force flush timeout std::this_thread::sleep_for(export_delay_); // give time for the "export" to complete for (auto &record : records) { - logs_received_->push_back(record->name.data()); + auto log = std::unique_ptr(static_cast(record.release())); + if (log != nullptr) + { + logs_received_->push_back(std::move(log)); + } } *is_export_completed_ = true; @@ -70,6 +72,12 @@ class MockLogExporter final : public LogExporter *is_shutdown_ = true; return true; } + +private: + std::shared_ptr>> logs_received_; + std::shared_ptr> is_shutdown_; + std::shared_ptr> is_export_completed_; + const std::chrono::milliseconds export_delay_; }; /** @@ -81,7 +89,7 @@ class BatchLogProcessorTest : public testing::Test // ::testing::Test // returns a batch log processor that received a batch of log records, a shared pointer to a // is_shutdown flag, and the processor configuration options (default if unspecified) std::shared_ptr GetMockProcessor( - std::shared_ptr> logs_received, + std::shared_ptr>> logs_received, std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed = std::shared_ptr>(new std::atomic(false)), @@ -100,7 +108,8 @@ class BatchLogProcessorTest : public testing::Test // ::testing::Test TEST_F(BatchLogProcessorTest, TestShutdown) { // initialize a batch log processor with the test exporter - std::shared_ptr> logs_received(new std::vector); + std::shared_ptr>> logs_received( + new std::vector>); std::shared_ptr> is_shutdown(new std::atomic(false)); auto batch_processor = GetMockProcessor(logs_received, is_shutdown); @@ -110,8 +119,8 @@ TEST_F(BatchLogProcessorTest, TestShutdown) for (int i = 0; i < num_logs; ++i) { - auto log = std::unique_ptr(new LogRecord()); - log->name = "Log name"; + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); batch_processor->OnReceive(std::move(log)); } @@ -125,7 +134,7 @@ TEST_F(BatchLogProcessorTest, TestShutdown) // Assume logs are received by exporter in same order as sent by processor for (int i = 0; i < num_logs; ++i) { - EXPECT_EQ("Log name", logs_received->at(i)); + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); } // Also check that the processor is shut down at the end @@ -135,15 +144,16 @@ TEST_F(BatchLogProcessorTest, TestShutdown) TEST_F(BatchLogProcessorTest, TestForceFlush) { std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr> logs_received(new std::vector); + std::shared_ptr>> logs_received( + new std::vector>); auto batch_processor = GetMockProcessor(logs_received, is_shutdown); const int num_logs = 2048; for (int i = 0; i < num_logs; ++i) { - auto log = std::unique_ptr(new LogRecord()); - log->name = "Log name"; + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); batch_processor->OnReceive(std::move(log)); } @@ -155,14 +165,14 @@ TEST_F(BatchLogProcessorTest, TestForceFlush) EXPECT_EQ(num_logs, logs_received->size()); for (int i = 0; i < num_logs; ++i) { - EXPECT_EQ("Log name", logs_received->at(i)); + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); } // Create some more logs to make sure that the processor still works for (int i = 0; i < num_logs; ++i) { - auto log = std::unique_ptr(new LogRecord()); - log->name = "Log name"; + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); batch_processor->OnReceive(std::move(log)); } @@ -174,7 +184,7 @@ TEST_F(BatchLogProcessorTest, TestForceFlush) EXPECT_EQ(num_logs * 2, logs_received->size()); for (int i = 0; i < num_logs * 2; ++i) { - EXPECT_EQ("Log name", logs_received->at(i)); + EXPECT_EQ("Log" + std::to_string(i % num_logs), logs_received->at(i)->GetName()); } } @@ -183,7 +193,8 @@ TEST_F(BatchLogProcessorTest, TestManyLogsLoss) /* Test that when exporting more than max_queue_size logs, some are most likely lost*/ std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr> logs_received(new std::vector); + std::shared_ptr>> logs_received( + new std::vector>); const int max_queue_size = 4096; @@ -192,8 +203,8 @@ TEST_F(BatchLogProcessorTest, TestManyLogsLoss) // Create max_queue_size log records for (int i = 0; i < max_queue_size; ++i) { - auto log = std::unique_ptr(new LogRecord()); - log->name = "Log name"; + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); batch_processor->OnReceive(std::move(log)); } @@ -211,15 +222,16 @@ TEST_F(BatchLogProcessorTest, TestManyLogsLossLess) /* Test that no logs are lost when sending max_queue_size logs */ std::shared_ptr> is_shutdown(new std::atomic(false)); - std::shared_ptr> logs_received(new std::vector); + std::shared_ptr>> logs_received( + new std::vector>); auto batch_processor = GetMockProcessor(logs_received, is_shutdown); const int num_logs = 2048; for (int i = 0; i < num_logs; ++i) { - auto log = std::unique_ptr(new LogRecord()); - log->name = "Log name"; + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); batch_processor->OnReceive(std::move(log)); } @@ -231,7 +243,7 @@ TEST_F(BatchLogProcessorTest, TestManyLogsLossLess) EXPECT_EQ(num_logs, logs_received->size()); for (int i = 0; i < num_logs; ++i) { - EXPECT_EQ("Log name", logs_received->at(i)); + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); } } @@ -242,7 +254,8 @@ TEST_F(BatchLogProcessorTest, TestScheduleDelayMillis) std::shared_ptr> is_shutdown(new std::atomic(false)); std::shared_ptr> is_export_completed(new std::atomic(false)); - std::shared_ptr> logs_received(new std::vector); + std::shared_ptr>> logs_received( + new std::vector>); const std::chrono::milliseconds export_delay(0); const std::chrono::milliseconds schedule_delay_millis(2000); @@ -253,8 +266,8 @@ TEST_F(BatchLogProcessorTest, TestScheduleDelayMillis) for (int i = 0; i < max_export_batch_size; ++i) { - auto log = std::unique_ptr(new LogRecord()); - log->name = "Log name"; + auto log = batch_processor->MakeRecordable(); + log->SetName("Log" + std::to_string(i)); batch_processor->OnReceive(std::move(log)); } // Sleep for schedule_delay_millis milliseconds @@ -268,6 +281,6 @@ TEST_F(BatchLogProcessorTest, TestScheduleDelayMillis) EXPECT_EQ(max_export_batch_size, logs_received->size()); for (size_t i = 0; i < max_export_batch_size; ++i) { - EXPECT_EQ("Log name", logs_received->at(i)); + EXPECT_EQ("Log" + std::to_string(i), logs_received->at(i)->GetName()); } } From 14033b514096017b57d9c8cd00217e383b2798e4 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Tue, 15 Dec 2020 02:41:41 -0500 Subject: [PATCH 05/12] Add lock to Shutdown(), Replace while loop with cv.wait with predicate Remove while loop around cv.notify Replace operator= with .store() for clarity --- .../sdk/logs/batch_log_processor.h | 5 +-- sdk/src/logs/batch_log_processor.cc | 32 +++++++++---------- 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index c5606b3218..2f503b9e22 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -120,13 +120,14 @@ class BatchLogProcessor : public LogProcessor /* Synchronization primitives */ std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_; + std::mutex cv_m_, force_flush_cv_m_, shutdown_mutex_; /* The buffer/queue to which the ended logs are added */ common::CircularBuffer buffer_; /* Important boolean flags to handle the workflow of the processor */ - std::atomic is_shutdown_{false}; + std::atomic is_shutdown_{false}; // once is_shutdown is set to true, there shouldn't be a + // case it's ever set to false again std::atomic is_force_flush_{false}; std::atomic is_force_flush_notified_{false}; diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 3ddeb0722a..3815d5c89b 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -20,7 +20,6 @@ #include using opentelemetry::sdk::common::AtomicUniquePtr; -using opentelemetry::sdk::common::CircularBuffer; using opentelemetry::sdk::common::CircularBufferRange; OPENTELEMETRY_BEGIN_NAMESPACE @@ -73,7 +72,7 @@ bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept return false; } - is_force_flush_ = true; + is_force_flush_.store(true); // Keep attempting to wake up the worker thread while (is_force_flush_.load() == true) @@ -83,16 +82,15 @@ bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept // Now wait for the worker thread to signal back from the Export method std::unique_lock lk(force_flush_cv_m_); - while (is_force_flush_notified_.load() == false) - { - force_flush_cv_.wait(lk); - } + force_flush_cv_.wait(lk, [this] { return is_force_flush_notified_.load(); }); // Notify the worker thread - is_force_flush_notified_ = false; + is_force_flush_notified_.store(false); return true; } +// Note this thread will only be called once by the worker thread (which there is only 1 of) +// in the constructor, thus it will not be called concurently. void BatchLogProcessor::DoBackgroundWork() { auto timeout = schedule_delay_millis_; @@ -116,7 +114,7 @@ void BatchLogProcessor::DoBackgroundWork() { // Since this export was the result of a force flush, signal the // main thread that the worker thread has been notified - is_force_flush_ = false; + is_force_flush_.store(false); } else { @@ -168,20 +166,17 @@ void BatchLogProcessor::Export(const bool was_force_flush_called) }); }); - ExportResult export_status = exporter_->Export(records_arr); - if (export_status != ExportResult::kSuccess) + if (exporter_->Export(records_arr) != ExportResult::kSuccess) { - // Error + // Indicate Error: "[Batch Log Processor]: Failed to export a batch" } // Notify the main thread in case this export was the result of a force flush. if (was_force_flush_called == true) { - is_force_flush_notified_ = true; - while (is_force_flush_notified_.load() == true) - { - force_flush_cv_.notify_one(); - } + is_force_flush_notified_.store(true); + // Notifies the thread + force_flush_cv_.notify_one(); } } @@ -196,12 +191,15 @@ void BatchLogProcessor::DrainQueue() // Note: Timeout functionality is currently not implemented bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { + // Use a lock to ensure only one thread executes the shutdown function at a time + std::unique_lock lock(shutdown_mutex_); + if (is_shutdown_.load() == true) { return false; } - is_shutdown_ = true; + is_shutdown_.store(true); // notifies worker thread that shutdown has been called cv_.notify_one(); From 227ee38326fb25cef61fecd18e8de13c9e90e3f0 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Tue, 15 Dec 2020 15:26:12 -0500 Subject: [PATCH 06/12] Atomic exchange for shutdown --- .../sdk/logs/batch_log_processor.h | 2 +- sdk/src/logs/batch_log_processor.cc | 32 ++++++++----------- 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 2f503b9e22..6818460cbb 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -120,7 +120,7 @@ class BatchLogProcessor : public LogProcessor /* Synchronization primitives */ std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_, shutdown_mutex_; + std::mutex cv_m_, force_flush_cv_m_; /* The buffer/queue to which the ended logs are added */ common::CircularBuffer buffer_; diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 3815d5c89b..6d9aa3bf2f 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -191,26 +191,22 @@ void BatchLogProcessor::DrainQueue() // Note: Timeout functionality is currently not implemented bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { - // Use a lock to ensure only one thread executes the shutdown function at a time - std::unique_lock lock(shutdown_mutex_); - - if (is_shutdown_.load() == true) + // Atomically checking whether value of is_shutdown_ is false + // and setting it to true + if (is_shutdown_.exchange(true) == false) { - return false; - } - - is_shutdown_.store(true); - - // notifies worker thread that shutdown has been called - cv_.notify_one(); - // wait until worker thread completes current export - worker_thread_.join(); - // calls the exporter to shutdown - if (exporter_ != nullptr) - { - return exporter_->Shutdown(); + // notifies worker thread that shutdown has been called + cv_.notify_one(); + // wait until worker thread completes current export + worker_thread_.join(); + // calls the exporter to shutdown + if (exporter_ != nullptr) + { + return exporter_->Shutdown(); + } + return true; } - return true; + return false; } BatchLogProcessor::~BatchLogProcessor() From b3a1f2f13ed5cffc428461a938bcb302d0f5c145 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Wed, 16 Dec 2020 19:06:04 -0500 Subject: [PATCH 07/12] Revert to using lock for Shutdown(), virtual override dtor, no if --- sdk/include/opentelemetry/sdk/logs/batch_log_processor.h | 4 ++-- sdk/src/logs/batch_log_processor.cc | 7 +++---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 6818460cbb..6859cbcdca 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -86,7 +86,7 @@ class BatchLogProcessor : public LogProcessor /** * Class destructor which invokes the Shutdown() method. */ - ~BatchLogProcessor(); + virtual ~BatchLogProcessor() override; private: /** @@ -120,7 +120,7 @@ class BatchLogProcessor : public LogProcessor /* Synchronization primitives */ std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_; + std::mutex cv_m_, force_flush_cv_m_, shutdown_mutex_; /* The buffer/queue to which the ended logs are added */ common::CircularBuffer buffer_; diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 6d9aa3bf2f..83d071925e 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -191,6 +191,8 @@ void BatchLogProcessor::DrainQueue() // Note: Timeout functionality is currently not implemented bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { + std::unique_lock lock(shutdown_mutex_); + // Atomically checking whether value of is_shutdown_ is false // and setting it to true if (is_shutdown_.exchange(true) == false) @@ -211,10 +213,7 @@ bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept BatchLogProcessor::~BatchLogProcessor() { - if (is_shutdown_.load() == false) - { - Shutdown(); - } + Shutdown(); } } // namespace logs From 49039a138a20e529511aeb50e98e9d6b47dd3f15 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Wed, 16 Dec 2020 19:59:15 -0500 Subject: [PATCH 08/12] Remove thread sleep, simplify DoBackgroundWork(), reorg --- sdk/src/logs/batch_log_processor.cc | 195 ++++++++++++---------- sdk/test/logs/batch_log_processor_test.cc | 29 +--- 2 files changed, 115 insertions(+), 109 deletions(-) diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 83d071925e..7956a5fb99 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -27,6 +27,8 @@ namespace sdk { namespace logs { +/************************** Constructor *******************************/ + BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, const size_t max_queue_size, const std::chrono::milliseconds schedule_delay_millis, @@ -39,58 +41,69 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) {} -std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept -{ - return exporter_->MakeRecordable(); -} +/************************** Helper methods used by the worker thread ***************/ -void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept +void BatchLogProcessor::Export(const bool was_force_flush_called) { - if (is_shutdown_.load() == true) + // Figure out the number of records to export + size_t num_records_to_export; + + if (was_force_flush_called == true) { - return; + num_records_to_export = buffer_.size(); } - - if (buffer_.Add(record) == false) + else { - return; + // Export the min of max_export_batch_size and buffer_size + num_records_to_export = + buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); } - // If the queue gets at least half full a preemptive notification is - // sent to the worker thread to start a new export cycle. - if (buffer_.size() >= max_queue_size_ / 2) + // Get records from the circular buffer and put into a vector + std::vector> records_arr; + + buffer_.Consume( + num_records_to_export, [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + ptr.Swap(swap_ptr); + records_arr.push_back(std::unique_ptr(swap_ptr.release())); + return true; + }); + }); + + // Call exporter with the records, and get export status + if (exporter_->Export(nostd::span>( + records_arr.data(), records_arr.size())) != ExportResult::kSuccess) { - // signal the worker thread - cv_.notify_one(); + // Indicate Error: "[Batch Log Processor]: Failed to export a batch" } -} -bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept -{ - if (is_shutdown_.load() == true) + // In case this export was the result of a force flush, notify main thread that force flush is + // complete + if (was_force_flush_called == true) { - return false; - } + is_force_flush_notified_.store(true); - is_force_flush_.store(true); + while (is_force_flush_notified_.load() == true) + { + force_flush_cv_.notify_one(); + } + } +} - // Keep attempting to wake up the worker thread - while (is_force_flush_.load() == true) +void BatchLogProcessor::DrainQueue() +{ + while (buffer_.empty() == false) { - cv_.notify_one(); + Export(false); } - - // Now wait for the worker thread to signal back from the Export method - std::unique_lock lk(force_flush_cv_m_); - force_flush_cv_.wait(lk, [this] { return is_force_flush_notified_.load(); }); - - // Notify the worker thread - is_force_flush_notified_.store(false); - return true; } -// Note this thread will only be called once by the worker thread (which there is only 1 of) -// in the constructor, thus it will not be called concurently. +/** + * Note this method will not be called concurrently, as it is only called once in the processor's + * constructor + */ void BatchLogProcessor::DoBackgroundWork() { auto timeout = schedule_delay_millis_; @@ -101,31 +114,23 @@ void BatchLogProcessor::DoBackgroundWork() std::unique_lock lk(cv_m_); cv_.wait_for(lk, timeout); + // Check if shutown was called, drain queue if (is_shutdown_.load() == true) { DrainQueue(); return; } - bool was_force_flush_called = is_force_flush_.load(); + // Check if ForceFlush was called, notify main thread that worker thread has been notified + bool was_force_flush_called = is_force_flush_.exchange(false); - // Check if this export was the result of a force flush. - if (was_force_flush_called == true) + // If the buffer was empty during the entire `timeout` time interval, + // go back to waiting. If this was a spurious wake-up, we export only if + // `buffer_` is not empty. This is acceptable because batching is a best + // mechanism effort here. + if (buffer_.empty() == true) { - // Since this export was the result of a force flush, signal the - // main thread that the worker thread has been notified - is_force_flush_.store(false); - } - else - { - // If the buffer was empty during the entire `timeout` time interval, - // go back to waiting. If this was a spurious wake-up, we export only if - // `buffer_` is not empty. This is acceptable because batching is a best - // mechanism effort here. - if (buffer_.empty() == true) - { - continue; - } + continue; } auto start = std::chrono::steady_clock::now(); @@ -140,68 +145,80 @@ void BatchLogProcessor::DoBackgroundWork() } } -void BatchLogProcessor::Export(const bool was_force_flush_called) -{ - std::vector> records_arr; +/****************************** Overloaded processor methods ******************************/ - size_t num_records_to_export; +std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept +{ + return exporter_->MakeRecordable(); +} - if (was_force_flush_called == true) - { - num_records_to_export = buffer_.size(); - } - else +void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept +{ + if (is_shutdown_.load() == true) { - num_records_to_export = - buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + return; } - buffer_.Consume( - num_records_to_export, [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - std::unique_ptr swap_ptr = std::unique_ptr(nullptr); - ptr.Swap(swap_ptr); - records_arr.push_back(std::unique_ptr(swap_ptr.release())); - return true; - }); - }); - - if (exporter_->Export(records_arr) != ExportResult::kSuccess) + if (buffer_.Add(record) == false) { - // Indicate Error: "[Batch Log Processor]: Failed to export a batch" + return; } - // Notify the main thread in case this export was the result of a force flush. - if (was_force_flush_called == true) + // If the queue gets at least half full a preemptive notification is + // sent to the worker thread to start a new export cycle. + if (buffer_.size() >= max_queue_size_ / 2) { - is_force_flush_notified_.store(true); - // Notifies the thread - force_flush_cv_.notify_one(); + // signal the worker thread + cv_.notify_one(); } } -void BatchLogProcessor::DrainQueue() +/** + * Notifies the worker thread that ForceFlush() has been called, + * and waits for worker to complete the force flush. + * + * Note: timeout functionality is currently not implemented. + */ +bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept { - while (buffer_.empty() == false) + if (is_shutdown_.load() == true) { - Export(false); + return false; } + + is_force_flush_.store(true); + + // Keep attempting to notify the worker thread that force flush has been caled + while (is_force_flush_.load() == true) + { + cv_.notify_one(); + } + + // Wait for the worker thread complete the force flush in the Export method + std::unique_lock lk(force_flush_cv_m_); + force_flush_cv_.wait(lk, [this] { return is_force_flush_notified_.load(); }); + + // Notify the worker thread + is_force_flush_notified_.store(false); + return true; } -// Note: Timeout functionality is currently not implemented +/** + * Drains the current buffer of logs before shutting down. + * Note: Timeout functionality is currently not implemented + */ bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { std::unique_lock lock(shutdown_mutex_); - // Atomically checking whether value of is_shutdown_ is false - // and setting it to true if (is_shutdown_.exchange(true) == false) { - // notifies worker thread that shutdown has been called + // Notifies worker thread that shutdown has been called cv_.notify_one(); - // wait until worker thread completes current export + // Wait until worker thread completes current export worker_thread_.join(); - // calls the exporter to shutdown + + // Shuts down the exporter if (exporter_ != nullptr) { return exporter_->Shutdown(); diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index 1c74341a65..e6a44b77bf 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -49,8 +49,7 @@ class MockLogExporter final : public LogExporter ExportResult Export( const opentelemetry::nostd::span> &records) noexcept override { - *is_export_completed_ = false; // Meant exclusively to test force flush timeout - std::this_thread::sleep_for(export_delay_); // give time for the "export" to complete + *is_export_completed_ = false; // Meant exclusively to test schedule_delay_millis for (auto &record : records) { @@ -114,7 +113,7 @@ TEST_F(BatchLogProcessorTest, TestShutdown) auto batch_processor = GetMockProcessor(logs_received, is_shutdown); - // create a few test log records and send them to the processor + // Create a few test log records and send them to the processor const int num_logs = 3; for (int i = 0; i < num_logs; ++i) @@ -157,10 +156,7 @@ TEST_F(BatchLogProcessorTest, TestForceFlush) batch_processor->OnReceive(std::move(log)); } - // Give some time to export - std::this_thread::sleep_for(std::chrono::milliseconds(500)); - - batch_processor->ForceFlush(); + EXPECT_TRUE(batch_processor->ForceFlush()); EXPECT_EQ(num_logs, logs_received->size()); for (int i = 0; i < num_logs; ++i) @@ -176,10 +172,7 @@ TEST_F(BatchLogProcessorTest, TestForceFlush) batch_processor->OnReceive(std::move(log)); } - // Give some time to export the logs - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - batch_processor->ForceFlush(); + EXPECT_TRUE(batch_processor->ForceFlush()); EXPECT_EQ(num_logs * 2, logs_received->size()); for (int i = 0; i < num_logs * 2; ++i) @@ -208,10 +201,7 @@ TEST_F(BatchLogProcessorTest, TestManyLogsLoss) batch_processor->OnReceive(std::move(log)); } - // Give some time to export the logs - std::this_thread::sleep_for(std::chrono::milliseconds(700)); - - batch_processor->ForceFlush(); + EXPECT_TRUE(batch_processor->ForceFlush()); // Log should be exported by now EXPECT_GE(max_queue_size, logs_received->size()); @@ -235,10 +225,7 @@ TEST_F(BatchLogProcessorTest, TestManyLogsLossLess) batch_processor->OnReceive(std::move(log)); } - // Give some time to export the logs - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - - batch_processor->ForceFlush(); + EXPECT_TRUE(batch_processor->ForceFlush()); EXPECT_EQ(num_logs, logs_received->size()); for (int i = 0; i < num_logs; ++i) @@ -273,7 +260,9 @@ TEST_F(BatchLogProcessorTest, TestScheduleDelayMillis) // Sleep for schedule_delay_millis milliseconds std::this_thread::sleep_for(schedule_delay_millis); - // small delay to give time to export + // small delay to give time to export, which is being performed + // asynchronously by the worker thread (this thread will not + // forcibly join() the main thread unless processor's shutdown() is called). std::this_thread::sleep_for(std::chrono::milliseconds(50)); // Logs should be exported by now From bc3a1af417d12e7f8d9d188a203d871faac23aed Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Wed, 16 Dec 2020 23:03:32 -0500 Subject: [PATCH 09/12] Rename typo (scheduled_delay_millis) to match spec --- .../sdk/logs/batch_log_processor.h | 10 ++++---- sdk/src/logs/batch_log_processor.cc | 8 +++---- sdk/test/logs/batch_log_processor_test.cc | 24 +++++++++---------- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 6859cbcdca..9d068c9915 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -45,15 +45,15 @@ class BatchLogProcessor : public LogProcessor * @param exporter - The backend exporter to pass the logs to * @param max_queue_size - The maximum buffer/queue size. After the size is reached, logs are * dropped. - * @param schedule_delay_millis - The time interval between two consecutive exports. + * @param scheduled_delay_millis - The time interval between two consecutive exports. * @param max_export_batch_size - The maximum batch size of every export. It must be smaller or * equal to max_queue_size */ explicit BatchLogProcessor( std::unique_ptr &&exporter, - const size_t max_queue_size = 2048, - const std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000), - const size_t max_export_batch_size = 512); + const size_t max_queue_size = 2048, + const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), + const size_t max_export_batch_size = 512); /** Makes a new recordable **/ std::unique_ptr MakeRecordable() noexcept override; @@ -115,7 +115,7 @@ class BatchLogProcessor : public LogProcessor /* Configurable parameters as per the official *trace* specs */ const size_t max_queue_size_; - const std::chrono::milliseconds schedule_delay_millis_; + const std::chrono::milliseconds scheduled_delay_millis_; const size_t max_export_batch_size_; /* Synchronization primitives */ diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 7956a5fb99..55c97742f9 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -31,11 +31,11 @@ namespace logs BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, const size_t max_queue_size, - const std::chrono::milliseconds schedule_delay_millis, + const std::chrono::milliseconds scheduled_delay_millis, const size_t max_export_batch_size) : exporter_(std::move(exporter)), max_queue_size_(max_queue_size), - schedule_delay_millis_(schedule_delay_millis), + scheduled_delay_millis_(scheduled_delay_millis), max_export_batch_size_(max_export_batch_size), buffer_(max_queue_size_), worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) @@ -106,7 +106,7 @@ void BatchLogProcessor::DrainQueue() */ void BatchLogProcessor::DoBackgroundWork() { - auto timeout = schedule_delay_millis_; + auto timeout = scheduled_delay_millis_; while (true) { @@ -141,7 +141,7 @@ void BatchLogProcessor::DoBackgroundWork() auto duration = std::chrono::duration_cast(end - start); // Subtract the duration of this export call from the next `timeout`. - timeout = schedule_delay_millis_ - duration; + timeout = scheduled_delay_millis_ - duration; } } diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index e6a44b77bf..d7fc8af0d6 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -49,7 +49,7 @@ class MockLogExporter final : public LogExporter ExportResult Export( const opentelemetry::nostd::span> &records) noexcept override { - *is_export_completed_ = false; // Meant exclusively to test schedule_delay_millis + *is_export_completed_ = false; // Meant exclusively to test scheduled_delay_millis for (auto &record : records) { @@ -92,15 +92,15 @@ class BatchLogProcessorTest : public testing::Test // ::testing::Test std::shared_ptr> is_shutdown, std::shared_ptr> is_export_completed = std::shared_ptr>(new std::atomic(false)), - const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), - const std::chrono::milliseconds schedule_delay_millis = std::chrono::milliseconds(5000), - const size_t max_queue_size = 2048, - const size_t max_export_batch_size = 512) + const std::chrono::milliseconds export_delay = std::chrono::milliseconds(0), + const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000), + const size_t max_queue_size = 2048, + const size_t max_export_batch_size = 512) { return std::shared_ptr( new BatchLogProcessor(std::unique_ptr(new MockLogExporter( logs_received, is_shutdown, is_export_completed, export_delay)), - max_queue_size, schedule_delay_millis, max_export_batch_size)); + max_queue_size, scheduled_delay_millis, max_export_batch_size)); } }; @@ -234,9 +234,9 @@ TEST_F(BatchLogProcessorTest, TestManyLogsLossLess) } } -TEST_F(BatchLogProcessorTest, TestScheduleDelayMillis) +TEST_F(BatchLogProcessorTest, TestScheduledDelayMillis) { - /* Test that max_export_batch_size logs are exported every schedule_delay_millis + /* Test that max_export_batch_size logs are exported every scheduled_delay_millis seconds */ std::shared_ptr> is_shutdown(new std::atomic(false)); @@ -245,11 +245,11 @@ TEST_F(BatchLogProcessorTest, TestScheduleDelayMillis) new std::vector>); const std::chrono::milliseconds export_delay(0); - const std::chrono::milliseconds schedule_delay_millis(2000); + const std::chrono::milliseconds scheduled_delay_millis(2000); const size_t max_export_batch_size = 512; auto batch_processor = GetMockProcessor(logs_received, is_shutdown, is_export_completed, - export_delay, schedule_delay_millis); + export_delay, scheduled_delay_millis); for (int i = 0; i < max_export_batch_size; ++i) { @@ -257,8 +257,8 @@ TEST_F(BatchLogProcessorTest, TestScheduleDelayMillis) log->SetName("Log" + std::to_string(i)); batch_processor->OnReceive(std::move(log)); } - // Sleep for schedule_delay_millis milliseconds - std::this_thread::sleep_for(schedule_delay_millis); + // Sleep for scheduled_delay_millis milliseconds + std::this_thread::sleep_for(scheduled_delay_millis); // small delay to give time to export, which is being performed // asynchronously by the worker thread (this thread will not From 1204bdcf5bd684e3785578ec4dbc78b4d2594c77 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Fri, 18 Dec 2020 20:45:27 -0500 Subject: [PATCH 10/12] Revert to batch span processor implementation --- .../sdk/logs/batch_log_processor.h | 5 +- sdk/src/logs/batch_log_processor.cc | 217 ++++++++---------- 2 files changed, 98 insertions(+), 124 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h index 9d068c9915..f27bf231fc 100644 --- a/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/batch_log_processor.h @@ -120,14 +120,13 @@ class BatchLogProcessor : public LogProcessor /* Synchronization primitives */ std::condition_variable cv_, force_flush_cv_; - std::mutex cv_m_, force_flush_cv_m_, shutdown_mutex_; + std::mutex cv_m_, force_flush_cv_m_; /* The buffer/queue to which the ended logs are added */ common::CircularBuffer buffer_; /* Important boolean flags to handle the workflow of the processor */ - std::atomic is_shutdown_{false}; // once is_shutdown is set to true, there shouldn't be a - // case it's ever set to false again + std::atomic is_shutdown_{false}; std::atomic is_force_flush_{false}; std::atomic is_force_flush_notified_{false}; diff --git a/sdk/src/logs/batch_log_processor.cc b/sdk/src/logs/batch_log_processor.cc index 55c97742f9..e61f9a31fd 100644 --- a/sdk/src/logs/batch_log_processor.cc +++ b/sdk/src/logs/batch_log_processor.cc @@ -16,9 +16,7 @@ #include "opentelemetry/sdk/logs/batch_log_processor.h" -#include #include - using opentelemetry::sdk::common::AtomicUniquePtr; using opentelemetry::sdk::common::CircularBufferRange; @@ -27,8 +25,6 @@ namespace sdk { namespace logs { -/************************** Constructor *******************************/ - BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, const size_t max_queue_size, const std::chrono::milliseconds scheduled_delay_millis, @@ -41,69 +37,60 @@ BatchLogProcessor::BatchLogProcessor(std::unique_ptr &&exporter, worker_thread_(&BatchLogProcessor::DoBackgroundWork, this) {} -/************************** Helper methods used by the worker thread ***************/ - -void BatchLogProcessor::Export(const bool was_force_flush_called) +std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept { - // Figure out the number of records to export - size_t num_records_to_export; + return exporter_->MakeRecordable(); +} - if (was_force_flush_called == true) +void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept +{ + if (is_shutdown_.load() == true) { - num_records_to_export = buffer_.size(); + return; } - else + + if (buffer_.Add(record) == false) { - // Export the min of max_export_batch_size and buffer_size - num_records_to_export = - buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); + return; } - // Get records from the circular buffer and put into a vector - std::vector> records_arr; - - buffer_.Consume( - num_records_to_export, [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - std::unique_ptr swap_ptr = std::unique_ptr(nullptr); - ptr.Swap(swap_ptr); - records_arr.push_back(std::unique_ptr(swap_ptr.release())); - return true; - }); - }); - - // Call exporter with the records, and get export status - if (exporter_->Export(nostd::span>( - records_arr.data(), records_arr.size())) != ExportResult::kSuccess) + // If the queue gets at least half full a preemptive notification is + // sent to the worker thread to start a new export cycle. + if (buffer_.size() >= max_queue_size_ / 2) { - // Indicate Error: "[Batch Log Processor]: Failed to export a batch" + // signal the worker thread + cv_.notify_one(); } +} - // In case this export was the result of a force flush, notify main thread that force flush is - // complete - if (was_force_flush_called == true) +bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + if (is_shutdown_.load() == true) { - is_force_flush_notified_.store(true); + return false; + } - while (is_force_flush_notified_.load() == true) - { - force_flush_cv_.notify_one(); - } + is_force_flush_ = true; + + // Keep attempting to wake up the worker thread + while (is_force_flush_.load() == true) + { + cv_.notify_one(); } -} -void BatchLogProcessor::DrainQueue() -{ - while (buffer_.empty() == false) + // Now wait for the worker thread to signal back from the Export method + std::unique_lock lk(force_flush_cv_m_); + while (is_force_flush_notified_.load() == false) { - Export(false); + force_flush_cv_.wait(lk); } + + // Notify the worker thread + is_force_flush_notified_ = false; + + return true; } -/** - * Note this method will not be called concurrently, as it is only called once in the processor's - * constructor - */ void BatchLogProcessor::DoBackgroundWork() { auto timeout = scheduled_delay_millis_; @@ -114,29 +101,35 @@ void BatchLogProcessor::DoBackgroundWork() std::unique_lock lk(cv_m_); cv_.wait_for(lk, timeout); - // Check if shutown was called, drain queue if (is_shutdown_.load() == true) { DrainQueue(); return; } - // Check if ForceFlush was called, notify main thread that worker thread has been notified - bool was_force_flush_called = is_force_flush_.exchange(false); + bool was_force_flush_called = is_force_flush_.load(); - // If the buffer was empty during the entire `timeout` time interval, - // go back to waiting. If this was a spurious wake-up, we export only if - // `buffer_` is not empty. This is acceptable because batching is a best - // mechanism effort here. - if (buffer_.empty() == true) + // Check if this export was the result of a force flush. + if (was_force_flush_called == true) + { + // Since this export was the result of a force flush, signal the + // main thread that the worker thread has been notified + is_force_flush_ = false; + } + else { - continue; + // If the buffer was empty during the entire `timeout` time interval, + // go back to waiting. If this was a spurious wake-up, we export only if + // `buffer_` is not empty. This is acceptable because batching is a best + // mechanism effort here. + if (buffer_.empty() == true) + { + continue; + } } auto start = std::chrono::steady_clock::now(); - Export(was_force_flush_called); - auto end = std::chrono::steady_clock::now(); auto duration = std::chrono::duration_cast(end - start); @@ -145,92 +138,74 @@ void BatchLogProcessor::DoBackgroundWork() } } -/****************************** Overloaded processor methods ******************************/ - -std::unique_ptr BatchLogProcessor::MakeRecordable() noexcept +void BatchLogProcessor::Export(const bool was_force_flush_called) { - return exporter_->MakeRecordable(); -} + std::vector> records_arr; -void BatchLogProcessor::OnReceive(std::unique_ptr &&record) noexcept -{ - if (is_shutdown_.load() == true) + size_t num_records_to_export; + + if (was_force_flush_called == true) { - return; + num_records_to_export = buffer_.size(); } - - if (buffer_.Add(record) == false) + else { - return; + num_records_to_export = + buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size(); } - // If the queue gets at least half full a preemptive notification is - // sent to the worker thread to start a new export cycle. - if (buffer_.size() >= max_queue_size_ / 2) + buffer_.Consume( + num_records_to_export, [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + std::unique_ptr swap_ptr = std::unique_ptr(nullptr); + ptr.Swap(swap_ptr); + records_arr.push_back(std::unique_ptr(swap_ptr.release())); + return true; + }); + }); + + exporter_->Export( + nostd::span>(records_arr.data(), records_arr.size())); + + // Notify the main thread in case this export was the result of a force flush. + if (was_force_flush_called == true) { - // signal the worker thread - cv_.notify_one(); + is_force_flush_notified_ = true; + while (is_force_flush_notified_.load() == true) + { + force_flush_cv_.notify_one(); + } } } -/** - * Notifies the worker thread that ForceFlush() has been called, - * and waits for worker to complete the force flush. - * - * Note: timeout functionality is currently not implemented. - */ -bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept +void BatchLogProcessor::DrainQueue() { - if (is_shutdown_.load() == true) - { - return false; - } - - is_force_flush_.store(true); - - // Keep attempting to notify the worker thread that force flush has been caled - while (is_force_flush_.load() == true) + while (buffer_.empty() == false) { - cv_.notify_one(); + Export(false); } - - // Wait for the worker thread complete the force flush in the Export method - std::unique_lock lk(force_flush_cv_m_); - force_flush_cv_.wait(lk, [this] { return is_force_flush_notified_.load(); }); - - // Notify the worker thread - is_force_flush_notified_.store(false); - return true; } -/** - * Drains the current buffer of logs before shutting down. - * Note: Timeout functionality is currently not implemented - */ bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { - std::unique_lock lock(shutdown_mutex_); + is_shutdown_.store(true); - if (is_shutdown_.exchange(true) == false) + cv_.notify_one(); + worker_thread_.join(); + if (exporter_ != nullptr) { - // Notifies worker thread that shutdown has been called - cv_.notify_one(); - // Wait until worker thread completes current export - worker_thread_.join(); - - // Shuts down the exporter - if (exporter_ != nullptr) - { - return exporter_->Shutdown(); - } - return true; + return exporter_->Shutdown(); } - return false; + + return true; } BatchLogProcessor::~BatchLogProcessor() { - Shutdown(); + if (is_shutdown_.load() == false) + { + Shutdown(); + } } } // namespace logs From 24e7463088326eab60e129841127fcb1808270ea Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Sun, 20 Dec 2020 16:31:57 -0500 Subject: [PATCH 11/12] Fix tests --- sdk/test/logs/batch_log_processor_test.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index d7fc8af0d6..7266ae076d 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -45,6 +45,7 @@ class MockLogExporter final : public LogExporter { return std::unique_ptr(new LogRecord()); } + // Export method stores the logs received into a shared list of record names ExportResult Export( const opentelemetry::nostd::span> &records) noexcept override From dfb2bbab67f63ff6690ec88942f97d36ac3f191d Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Mon, 21 Dec 2020 11:13:17 -0500 Subject: [PATCH 12/12] Format --- sdk/test/logs/batch_log_processor_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/test/logs/batch_log_processor_test.cc b/sdk/test/logs/batch_log_processor_test.cc index 7266ae076d..baf27de839 100644 --- a/sdk/test/logs/batch_log_processor_test.cc +++ b/sdk/test/logs/batch_log_processor_test.cc @@ -45,7 +45,7 @@ class MockLogExporter final : public LogExporter { return std::unique_ptr(new LogRecord()); } - + // Export method stores the logs received into a shared list of record names ExportResult Export( const opentelemetry::nostd::span> &records) noexcept override