From 647a0cb93b201030ec5c13b8d60e3733f5bf5023 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Tue, 3 Nov 2020 15:48:58 -0500 Subject: [PATCH 1/6] Add simple log processor and log exporter interface --- sdk/include/opentelemetry/sdk/logs/exporter.h | 72 ++++++++ .../opentelemetry/sdk/logs/processor.h | 31 +++- .../sdk/logs/simple_log_processor.h | 64 +++++++ sdk/src/logs/CMakeLists.txt | 3 +- sdk/src/logs/simple_log_processor.cc | 83 +++++++++ sdk/test/logs/BUILD | 11 ++ sdk/test/logs/CMakeLists.txt | 3 +- sdk/test/logs/logger_provider_sdk_test.cc | 11 +- sdk/test/logs/logger_sdk_test.cc | 11 +- sdk/test/logs/simple_log_processor_test.cc | 161 ++++++++++++++++++ 10 files changed, 440 insertions(+), 10 deletions(-) create mode 100644 sdk/include/opentelemetry/sdk/logs/exporter.h create mode 100644 sdk/include/opentelemetry/sdk/logs/simple_log_processor.h create mode 100644 sdk/src/logs/simple_log_processor.cc create mode 100644 sdk/test/logs/simple_log_processor_test.cc diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h new file mode 100644 index 0000000000..1b2c2d828f --- /dev/null +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -0,0 +1,72 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include "opentelemetry/logs/log_record.h" +#include "opentelemetry/sdk/logs/processor.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ +/** + * ExportResult is returned as result of exporting a batch of Log Records. + */ +enum class ExportResult +{ + // The batch was exported successfully + kSuccess = 0, + // The batch was exported unsuccessfully and was dropped + kFailure +}; + +/** + * LogExporter defines the interface that log exporters must implement. + */ +class LogExporter +{ +public: + virtual ~LogExporter() = default; + + /** + * Exporters that implement this should typically format each LogRecord into the format + * required by the exporter destination (e.g. JSON), then send the LogRecord to the exporter. + * The exporter may retry logs a maximum of 3 times before dropping and returning kFailure. + * If this exporter is already shut down, should return kFailure. + * @param records: a vector of unique pointers to log records + * @returns an ExportResult code (whether export was success or failure) + * + * TODO: This method should not block indefinitely. Should abort within timeout. + */ + virtual ExportResult Export( + const std::vector> &records) noexcept = 0; + + /** + * Marks the exporter as ShutDown and cleans up any resources as required. + * Shutdown should be called only once for each Exporter instance. + * @param timeout this method should not block indefinitely; should abort within timeout. + * @return a ShutDownResult code (if it succeeded, failed or timed out) + */ + virtual ShutdownResult Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; +}; +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/include/opentelemetry/sdk/logs/processor.h b/sdk/include/opentelemetry/sdk/logs/processor.h index d77b7a65d0..76481d45a8 100644 --- a/sdk/include/opentelemetry/sdk/logs/processor.h +++ b/sdk/include/opentelemetry/sdk/logs/processor.h @@ -25,21 +25,44 @@ namespace sdk { namespace logs { +enum class ShutdownResult +{ + kSuccess = 0, + kFailure = 1, + kTimeout = 2 +}; /** - * This Log Processor is responsible for conversion of logs to exportable - * representation and passing them to exporters. + * This Log Processor is responsible for the batching of log records + * and passing them to exporters. */ class LogProcessor { public: virtual ~LogProcessor() = default; + /** + * OnReceive is responsible for batching every log record that is created by the SDK + * @param record a log record that has all the user data and injected data + */ virtual void OnReceive(std::unique_ptr &&record) noexcept = 0; - virtual void ForceFlush( + /** + * Exports all log records that have not yet been exported to the configured Exporter. + * @param timeout that the forceflush is required to finish within. + * A default timeout of 0 mean no timeout is applied. + * @return a result code indicating whether it succeeded, failed or timed out + */ + virtual ShutdownResult ForceFlush( std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; - virtual void Shutdown( + /** + * Shuts down the processor and does any cleanup required. + * ShutDown should only be called once for each processor. + * @param timeout that the shutdown should finish within. + * A default timeout of 0 means no timeout is applied. + * @return a ShutDown result (if it succeeded, failed or timed out) + */ + virtual ShutdownResult Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; }; } // namespace logs diff --git a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h new file mode 100644 index 0000000000..83039ddd47 --- /dev/null +++ b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h @@ -0,0 +1,64 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "opentelemetry/common/spin_lock_mutex.h" +#include "opentelemetry/sdk/logs/exporter.h" +#include "opentelemetry/sdk/logs/processor.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ +/** + * The simple log processor passes all log records + * in a batch of 1 to the configured + * LogExporter. + * + * All calls to the configured LogExporter are synchronized using a + * spin-lock on an atomic_flag. + */ +class SimpleLogProcessor : public LogProcessor +{ + +public: + explicit SimpleLogProcessor(std::unique_ptr &&exporter); + virtual ~SimpleLogProcessor() = default; + + void OnReceive(std::unique_ptr &&record) noexcept override; + + ShutdownResult ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; + + ShutdownResult Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; + +private: + // The configured exporter + std::unique_ptr exporter_; + // The lock used to ensure the exporter is not called concurrently + opentelemetry::common::SpinLockMutex lock_; + // The atomic boolean flag to ensure the ShutDown() function is only called once + std::atomic_flag shutdown_latch_{ATOMIC_FLAG_INIT}; +}; +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/logs/CMakeLists.txt b/sdk/src/logs/CMakeLists.txt index 22260f059d..e2e7c2c915 100644 --- a/sdk/src/logs/CMakeLists.txt +++ b/sdk/src/logs/CMakeLists.txt @@ -1,3 +1,4 @@ -add_library(opentelemetry_logs logger_provider.cc logger.cc) +add_library(opentelemetry_logs logger_provider.cc logger.cc + simple_log_processor.cc) target_link_libraries(opentelemetry_logs opentelemetry_common) diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc new file mode 100644 index 0000000000..9e78ed697d --- /dev/null +++ b/sdk/src/logs/simple_log_processor.cc @@ -0,0 +1,83 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "opentelemetry/sdk/logs/simple_log_processor.h" + +#include +#include + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace logs +{ +/** + * Initialize a simple log processor. + * @param exporter the configured exporter where log records are sent + */ +SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter) + : exporter_(std::move(exporter)) +{} + +/** + * Batches the log record it receives in a batch of 1 and immediately sends it + * to the configured exporter + */ +void SimpleLogProcessor::OnReceive( + std::unique_ptr &&record) noexcept +{ + std::vector> batch; + batch.emplace_back(std::move(record)); + + // Get lock to ensure Export() is never called concurrently + std::lock_guard locked(lock_); + + if (exporter_->Export(batch) == ExportResult::kFailure) + { + /* TODO: alert user of the failed or timedout export result */ + } +} + +/** + * The simple processor does not have any log records to flush so this method is not used + */ +ShutdownResult SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept +{ + return ShutdownResult::kSuccess; +} + +/** + * TODO: This method should not block indefinitely. Should abort within timeout. + */ +ShutdownResult SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept +{ + if (timeout < std::chrono::microseconds(0)) + { + // TODO: alert caller of invalid timeout? + return ShutdownResult::kFailure; + } + + // Should only shutdown exporter ONCE. + if (!shutdown_latch_.test_and_set(std::memory_order_acquire)) + { + return exporter_->Shutdown(timeout); + } + + return ShutdownResult::kFailure; +} +} // namespace logs +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/logs/BUILD b/sdk/test/logs/BUILD index b58eac0eee..ccbbfd8c78 100644 --- a/sdk/test/logs/BUILD +++ b/sdk/test/logs/BUILD @@ -20,3 +20,14 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) + +cc_test( + name = "simple_log_processor_test", + srcs = [ + "simple_log_processor_test.cc", + ], + deps = [ + "//sdk/src/logs", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/sdk/test/logs/CMakeLists.txt b/sdk/test/logs/CMakeLists.txt index c1342a16b9..024c42c45a 100644 --- a/sdk/test/logs/CMakeLists.txt +++ b/sdk/test/logs/CMakeLists.txt @@ -1,4 +1,5 @@ -foreach(testname logger_provider_sdk_test logger_sdk_test) +foreach(testname logger_provider_sdk_test logger_sdk_test + simple_log_processor_test) add_executable(${testname} "${testname}.cc") target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs) diff --git a/sdk/test/logs/logger_provider_sdk_test.cc b/sdk/test/logs/logger_provider_sdk_test.cc index a2a020c838..41d464cec7 100644 --- a/sdk/test/logs/logger_provider_sdk_test.cc +++ b/sdk/test/logs/logger_provider_sdk_test.cc @@ -70,8 +70,15 @@ TEST(LoggerProviderSDK, LoggerProviderLoggerArguments) class DummyProcessor : public LogProcessor { void OnReceive(std::unique_ptr &&record) noexcept {} - void ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {} - void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {} + ShutdownResult ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + { + return ShutdownResult::kSuccess; + } + ShutdownResult Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + { + return ShutdownResult::kSuccess; + } }; TEST(LoggerProviderSDK, GetAndSetProcessor) diff --git a/sdk/test/logs/logger_sdk_test.cc b/sdk/test/logs/logger_sdk_test.cc index 6c36b68c0c..86c8759d21 100644 --- a/sdk/test/logs/logger_sdk_test.cc +++ b/sdk/test/logs/logger_sdk_test.cc @@ -38,8 +38,15 @@ TEST(LoggerSDK, LogToNullProcessor) class DummyProcessor : public LogProcessor { void OnReceive(std::unique_ptr &&record) noexcept {} - void ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {} - void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept {} + ShutdownResult ForceFlush( + std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + { + return ShutdownResult::kSuccess; + } + ShutdownResult Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + { + return ShutdownResult::kSuccess; + } }; TEST(LoggerSDK, LogToAProcessor) diff --git a/sdk/test/logs/simple_log_processor_test.cc b/sdk/test/logs/simple_log_processor_test.cc new file mode 100644 index 0000000000..ccc79124fa --- /dev/null +++ b/sdk/test/logs/simple_log_processor_test.cc @@ -0,0 +1,161 @@ +#include "opentelemetry/sdk/logs/simple_log_processor.h" +#include + +#include +#include + +using namespace opentelemetry::sdk::logs; +using opentelemetry::logs::LogRecord; + +/* + * A test exporter that can return a vector of all the records it has received, + * and keep track of the number of times its Shutdown() function was called. + */ +class TestExporter final : public LogExporter +{ +public: + TestExporter(int *shutdown_counter, + std::shared_ptr> logs_received, + int *batch_size_received) + : shutdown_counter_(shutdown_counter), + logs_received_(logs_received), + batch_size_received(batch_size_received) + {} + + // Stores the names of the log records this exporter receives to an internal list + ExportResult Export(const std::vector> &records) noexcept override + { + *batch_size_received = records.size(); + for (auto &record : records) + { + logs_received_->push_back(record->name.data()); + } + return ExportResult::kSuccess; + } + + // Increment the shutdown counter everytime this method is called + ShutdownResult Shutdown(std::chrono::microseconds timeout) noexcept override + { + *shutdown_counter_ += 1; + return ShutdownResult::kSuccess; + } + +private: + int *shutdown_counter_; + std::shared_ptr> logs_received_; + int *batch_size_received; +}; + +// Tests whether the simple processor successfully creates a batch of size 1 +// and whether the contents of the record is sent to the exporter correctly +TEST(SimpleLogProcessorTest, SendReceivedLogsToExporter) +{ + // Create a simple processor with a TestExporter attached + std::shared_ptr> logs_received(new std::vector); + int batch_size_received = 0; + + std::unique_ptr exporter( + new TestExporter(nullptr, logs_received, &batch_size_received)); + + SimpleLogProcessor processor(std::move(exporter)); + + // Send some log records to the processor (which should then send to the TestExporter) + const int num_logs = 5; + for (int i = 0; i < num_logs; i++) + { + auto record = std::unique_ptr(new LogRecord()); + std::string s("Log name"); + s += std::to_string(i); + record->name = s; + + processor.OnReceive(std::move(record)); + + // Verify that the batch of 1 log record sent by processor matches what exporter received + EXPECT_EQ(1, batch_size_received); + } + + // Test whether the processor's log sent matches the log record received by the exporter + EXPECT_EQ(logs_received->size(), num_logs); + for (int i = 0; i < num_logs; i++) + { + std::string s("Log name"); + s += std::to_string(i); + EXPECT_EQ(s, logs_received->at(i)); + } +} + +// Tests behavior when calling the processor's ShutDown() multiple times +TEST(SimpleLogProcessorTest, ShutdownCalledOnce) +{ + // Create a TestExporter + int num_shutdowns = 0; + + std::unique_ptr exporter(new TestExporter(&num_shutdowns, nullptr, nullptr)); + + // Create a processor with the previous test exporter + SimpleLogProcessor processor(std::move(exporter)); + + // The first time processor shutdown is called + EXPECT_EQ(0, num_shutdowns); + EXPECT_EQ(ShutdownResult::kSuccess, processor.Shutdown()); + EXPECT_EQ(1, num_shutdowns); + + // The second time processor shutdown is called + EXPECT_EQ(ShutdownResult::kFailure, processor.Shutdown()); + // Processor::ShutDown(), even if called more than once, should only shutdown exporter once + EXPECT_EQ(1, num_shutdowns); +} + +class SlowShutDownExporter final : public LogExporter +{ +public: + SlowShutDownExporter() {} + + ExportResult Export(const std::vector> &records) noexcept override + { + return ExportResult::kSuccess; + } + + ShutdownResult Shutdown(std::chrono::microseconds timeout) noexcept override + { + return ShutdownResult::kTimeout; + } +}; + +// Tests whether processor shutdown times out when exporter shutdown times out +TEST(SimpleLogProcessorTest, ShutDownTimeout) +{ + std::unique_ptr exporter(new SlowShutDownExporter()); + SimpleLogProcessor processor(std::move(exporter)); + EXPECT_EQ(ShutdownResult::kTimeout, processor.Shutdown(std::chrono::microseconds(1))); +} + +// A test exporter that always returns failure when shut down +class FailShutDownExporter final : public LogExporter +{ +public: + FailShutDownExporter() {} + + ExportResult Export(const std::vector> &records) noexcept override + { + return ExportResult::kSuccess; + } + + ShutdownResult Shutdown(std::chrono::microseconds timeout) noexcept override + { + return ShutdownResult::kFailure; + } +}; + +// Tests for when when processor should fail to shutdown +TEST(SimpleLogProcessorTest, ShutDownFail) +{ + std::unique_ptr exporter(new FailShutDownExporter()); + SimpleLogProcessor processor(std::move(exporter)); + + // Expect failure result when exporter fails to shutdown + EXPECT_EQ(ShutdownResult::kFailure, processor.Shutdown()); + + // Expect failure result when processor given a negative timeout allowed to shutdown + EXPECT_EQ(ShutdownResult::kFailure, processor.Shutdown(std::chrono::microseconds(-1))); +} From 219da60949433035a552087fd2d762aca407b57b Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Tue, 24 Nov 2020 16:42:52 -0500 Subject: [PATCH 2/6] review comments --- sdk/include/opentelemetry/sdk/logs/exporter.h | 18 ++++---- .../opentelemetry/sdk/logs/processor.h | 23 ++++------- .../sdk/logs/simple_log_processor.h | 5 +-- sdk/src/logs/simple_log_processor.cc | 13 +++--- sdk/test/logs/logger_provider_sdk_test.cc | 9 ++-- sdk/test/logs/logger_sdk_test.cc | 9 ++-- sdk/test/logs/simple_log_processor_test.cc | 41 ++++--------------- 7 files changed, 40 insertions(+), 78 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h index 1b2c2d828f..39e15b3557 100644 --- a/sdk/include/opentelemetry/sdk/logs/exporter.h +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -46,14 +46,11 @@ class LogExporter virtual ~LogExporter() = default; /** - * Exporters that implement this should typically format each LogRecord into the format - * required by the exporter destination (e.g. JSON), then send the LogRecord to the exporter. - * The exporter may retry logs a maximum of 3 times before dropping and returning kFailure. - * If this exporter is already shut down, should return kFailure. - * @param records: a vector of unique pointers to log records + * This method does any formatting required, then sends the log records + * to their export destination. The exporter may attempt to retry sending + * logs, but should drop them and return kFailure after a timeout. + * @param records a vector of unique pointers to log records * @returns an ExportResult code (whether export was success or failure) - * - * TODO: This method should not block indefinitely. Should abort within timeout. */ virtual ExportResult Export( const std::vector> &records) noexcept = 0; @@ -61,10 +58,11 @@ class LogExporter /** * Marks the exporter as ShutDown and cleans up any resources as required. * Shutdown should be called only once for each Exporter instance. - * @param timeout this method should not block indefinitely; should abort within timeout. - * @return a ShutDownResult code (if it succeeded, failed or timed out) + * @param timeout minimum amount of microseconds to wait for shutdown before giving up and + * returning failure. + * @return true if the exporter shutdown succeeded, false otherwise */ - virtual ShutdownResult Shutdown( + virtual bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; }; } // namespace logs diff --git a/sdk/include/opentelemetry/sdk/logs/processor.h b/sdk/include/opentelemetry/sdk/logs/processor.h index 76481d45a8..64614f8113 100644 --- a/sdk/include/opentelemetry/sdk/logs/processor.h +++ b/sdk/include/opentelemetry/sdk/logs/processor.h @@ -25,15 +25,9 @@ namespace sdk { namespace logs { -enum class ShutdownResult -{ - kSuccess = 0, - kFailure = 1, - kTimeout = 2 -}; /** - * This Log Processor is responsible for the batching of log records - * and passing them to exporters. + * The Log Processor is responsible for passing log records + * to the configured exporter. */ class LogProcessor { @@ -41,8 +35,8 @@ class LogProcessor virtual ~LogProcessor() = default; /** - * OnReceive is responsible for batching every log record that is created by the SDK - * @param record a log record that has all the user data and injected data + * OnReceive is called by the SDK once a log record has been successfully created. + * @param record the log record */ virtual void OnReceive(std::unique_ptr &&record) noexcept = 0; @@ -52,17 +46,18 @@ class LogProcessor * A default timeout of 0 mean no timeout is applied. * @return a result code indicating whether it succeeded, failed or timed out */ - virtual ShutdownResult ForceFlush( + virtual bool ForceFlush( std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; /** * Shuts down the processor and does any cleanup required. * ShutDown should only be called once for each processor. - * @param timeout that the shutdown should finish within. + * @param timeout minimum amount of microseconds to wait for + * shutdown before giving up and returning failure. * A default timeout of 0 means no timeout is applied. - * @return a ShutDown result (if it succeeded, failed or timed out) + * @return true if the shutdown succeeded, false otherwise */ - virtual ShutdownResult Shutdown( + virtual bool Shutdown( std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; }; } // namespace logs diff --git a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h index 83039ddd47..58a41db380 100644 --- a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h @@ -45,11 +45,10 @@ class SimpleLogProcessor : public LogProcessor void OnReceive(std::unique_ptr &&record) noexcept override; - ShutdownResult ForceFlush( + bool ForceFlush( std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; - ShutdownResult Shutdown( - std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; private: // The configured exporter diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc index 9e78ed697d..3e473f61c2 100644 --- a/sdk/src/logs/simple_log_processor.cc +++ b/sdk/src/logs/simple_log_processor.cc @@ -45,29 +45,28 @@ void SimpleLogProcessor::OnReceive( // Get lock to ensure Export() is never called concurrently std::lock_guard locked(lock_); - if (exporter_->Export(batch) == ExportResult::kFailure) + if (exporter_->Export(batch) != ExportResult::kSuccess) { /* TODO: alert user of the failed or timedout export result */ } } - /** * The simple processor does not have any log records to flush so this method is not used */ -ShutdownResult SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept +bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept { - return ShutdownResult::kSuccess; + return true; } /** * TODO: This method should not block indefinitely. Should abort within timeout. */ -ShutdownResult SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept +bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { if (timeout < std::chrono::microseconds(0)) { // TODO: alert caller of invalid timeout? - return ShutdownResult::kFailure; + return false; } // Should only shutdown exporter ONCE. @@ -76,7 +75,7 @@ ShutdownResult SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) n return exporter_->Shutdown(timeout); } - return ShutdownResult::kFailure; + return false; } } // namespace logs } // namespace sdk diff --git a/sdk/test/logs/logger_provider_sdk_test.cc b/sdk/test/logs/logger_provider_sdk_test.cc index 41d464cec7..02e7c47275 100644 --- a/sdk/test/logs/logger_provider_sdk_test.cc +++ b/sdk/test/logs/logger_provider_sdk_test.cc @@ -70,14 +70,13 @@ TEST(LoggerProviderSDK, LoggerProviderLoggerArguments) class DummyProcessor : public LogProcessor { void OnReceive(std::unique_ptr &&record) noexcept {} - ShutdownResult ForceFlush( - std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept { - return ShutdownResult::kSuccess; + return true; } - ShutdownResult Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept { - return ShutdownResult::kSuccess; + return true; } }; diff --git a/sdk/test/logs/logger_sdk_test.cc b/sdk/test/logs/logger_sdk_test.cc index 86c8759d21..d4825d0032 100644 --- a/sdk/test/logs/logger_sdk_test.cc +++ b/sdk/test/logs/logger_sdk_test.cc @@ -38,14 +38,13 @@ TEST(LoggerSDK, LogToNullProcessor) class DummyProcessor : public LogProcessor { void OnReceive(std::unique_ptr &&record) noexcept {} - ShutdownResult ForceFlush( - std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + bool ForceFlush(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept { - return ShutdownResult::kSuccess; + return true; } - ShutdownResult Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept { - return ShutdownResult::kSuccess; + return true; } }; diff --git a/sdk/test/logs/simple_log_processor_test.cc b/sdk/test/logs/simple_log_processor_test.cc index ccc79124fa..077c1c274f 100644 --- a/sdk/test/logs/simple_log_processor_test.cc +++ b/sdk/test/logs/simple_log_processor_test.cc @@ -34,10 +34,10 @@ class TestExporter final : public LogExporter } // Increment the shutdown counter everytime this method is called - ShutdownResult Shutdown(std::chrono::microseconds timeout) noexcept override + bool Shutdown(std::chrono::microseconds timeout) noexcept override { *shutdown_counter_ += 1; - return ShutdownResult::kSuccess; + return true; } private: @@ -97,39 +97,15 @@ TEST(SimpleLogProcessorTest, ShutdownCalledOnce) // The first time processor shutdown is called EXPECT_EQ(0, num_shutdowns); - EXPECT_EQ(ShutdownResult::kSuccess, processor.Shutdown()); + EXPECT_EQ(true, processor.Shutdown()); EXPECT_EQ(1, num_shutdowns); // The second time processor shutdown is called - EXPECT_EQ(ShutdownResult::kFailure, processor.Shutdown()); + EXPECT_EQ(false, processor.Shutdown()); // Processor::ShutDown(), even if called more than once, should only shutdown exporter once EXPECT_EQ(1, num_shutdowns); } -class SlowShutDownExporter final : public LogExporter -{ -public: - SlowShutDownExporter() {} - - ExportResult Export(const std::vector> &records) noexcept override - { - return ExportResult::kSuccess; - } - - ShutdownResult Shutdown(std::chrono::microseconds timeout) noexcept override - { - return ShutdownResult::kTimeout; - } -}; - -// Tests whether processor shutdown times out when exporter shutdown times out -TEST(SimpleLogProcessorTest, ShutDownTimeout) -{ - std::unique_ptr exporter(new SlowShutDownExporter()); - SimpleLogProcessor processor(std::move(exporter)); - EXPECT_EQ(ShutdownResult::kTimeout, processor.Shutdown(std::chrono::microseconds(1))); -} - // A test exporter that always returns failure when shut down class FailShutDownExporter final : public LogExporter { @@ -141,10 +117,7 @@ class FailShutDownExporter final : public LogExporter return ExportResult::kSuccess; } - ShutdownResult Shutdown(std::chrono::microseconds timeout) noexcept override - { - return ShutdownResult::kFailure; - } + bool Shutdown(std::chrono::microseconds timeout) noexcept override { return false; } }; // Tests for when when processor should fail to shutdown @@ -154,8 +127,8 @@ TEST(SimpleLogProcessorTest, ShutDownFail) SimpleLogProcessor processor(std::move(exporter)); // Expect failure result when exporter fails to shutdown - EXPECT_EQ(ShutdownResult::kFailure, processor.Shutdown()); + EXPECT_EQ(false, processor.Shutdown()); // Expect failure result when processor given a negative timeout allowed to shutdown - EXPECT_EQ(ShutdownResult::kFailure, processor.Shutdown(std::chrono::microseconds(-1))); + EXPECT_EQ(false, processor.Shutdown(std::chrono::microseconds(-1))); } From dfde0eac9cf85bfbb23b50fcc6832f89187201b0 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Wed, 25 Nov 2020 16:46:31 -0500 Subject: [PATCH 3/6] update default duration and description --- sdk/include/opentelemetry/sdk/logs/exporter.h | 11 ++++++----- sdk/include/opentelemetry/sdk/logs/processor.h | 6 ++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h index 39e15b3557..6e2585f87e 100644 --- a/sdk/include/opentelemetry/sdk/logs/exporter.h +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -33,7 +33,7 @@ enum class ExportResult { // The batch was exported successfully kSuccess = 0, - // The batch was exported unsuccessfully and was dropped + // The batch was exported unsuccessfully and was dropped, but can not be retried kFailure }; @@ -46,9 +46,10 @@ class LogExporter virtual ~LogExporter() = default; /** - * This method does any formatting required, then sends the log records - * to their export destination. The exporter may attempt to retry sending - * logs, but should drop them and return kFailure after a timeout. + * Exports the batch of log records to their export destination. + * This method must not be called concurrently for the same exporter instance. + * The exporter may attempt to retry sending the batch, but should drop + * and return kFailure after a certain timeout. * @param records a vector of unique pointers to log records * @returns an ExportResult code (whether export was success or failure) */ @@ -63,7 +64,7 @@ class LogExporter * @return true if the exporter shutdown succeeded, false otherwise */ virtual bool Shutdown( - std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0; }; } // namespace logs } // namespace sdk diff --git a/sdk/include/opentelemetry/sdk/logs/processor.h b/sdk/include/opentelemetry/sdk/logs/processor.h index 64614f8113..7468548dcf 100644 --- a/sdk/include/opentelemetry/sdk/logs/processor.h +++ b/sdk/include/opentelemetry/sdk/logs/processor.h @@ -43,22 +43,20 @@ class LogProcessor /** * Exports all log records that have not yet been exported to the configured Exporter. * @param timeout that the forceflush is required to finish within. - * A default timeout of 0 mean no timeout is applied. * @return a result code indicating whether it succeeded, failed or timed out */ virtual bool ForceFlush( - std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0; /** * Shuts down the processor and does any cleanup required. * ShutDown should only be called once for each processor. * @param timeout minimum amount of microseconds to wait for * shutdown before giving up and returning failure. - * A default timeout of 0 means no timeout is applied. * @return true if the shutdown succeeded, false otherwise */ virtual bool Shutdown( - std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept = 0; + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept = 0; }; } // namespace logs } // namespace sdk From efe013b0588f2b240302ddc1bc11d7d808160002 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Fri, 27 Nov 2020 18:09:41 -0500 Subject: [PATCH 4/6] Use span instead of vector for Export() --- sdk/include/opentelemetry/sdk/logs/exporter.h | 5 +++-- sdk/src/logs/simple_log_processor.cc | 6 ++---- sdk/test/logs/simple_log_processor_test.cc | 8 ++++++-- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/exporter.h b/sdk/include/opentelemetry/sdk/logs/exporter.h index 6e2585f87e..6f44d32a28 100644 --- a/sdk/include/opentelemetry/sdk/logs/exporter.h +++ b/sdk/include/opentelemetry/sdk/logs/exporter.h @@ -19,6 +19,7 @@ #include #include #include "opentelemetry/logs/log_record.h" +#include "opentelemetry/nostd/span.h" #include "opentelemetry/sdk/logs/processor.h" OPENTELEMETRY_BEGIN_NAMESPACE @@ -50,11 +51,11 @@ class LogExporter * This method must not be called concurrently for the same exporter instance. * The exporter may attempt to retry sending the batch, but should drop * and return kFailure after a certain timeout. - * @param records a vector of unique pointers to log records + * @param records a span of unique pointers to log records * @returns an ExportResult code (whether export was success or failure) */ virtual ExportResult Export( - const std::vector> &records) noexcept = 0; + const nostd::span> &records) noexcept = 0; /** * Marks the exporter as ShutDown and cleans up any resources as required. diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc index 3e473f61c2..30293b9e8a 100644 --- a/sdk/src/logs/simple_log_processor.cc +++ b/sdk/src/logs/simple_log_processor.cc @@ -39,11 +39,9 @@ SimpleLogProcessor::SimpleLogProcessor(std::unique_ptr &&exporter) void SimpleLogProcessor::OnReceive( std::unique_ptr &&record) noexcept { - std::vector> batch; - batch.emplace_back(std::move(record)); - + nostd::span> batch(&record, 1); // Get lock to ensure Export() is never called concurrently - std::lock_guard locked(lock_); + const std::lock_guard locked(lock_); if (exporter_->Export(batch) != ExportResult::kSuccess) { diff --git a/sdk/test/logs/simple_log_processor_test.cc b/sdk/test/logs/simple_log_processor_test.cc index 077c1c274f..834cba7ad8 100644 --- a/sdk/test/logs/simple_log_processor_test.cc +++ b/sdk/test/logs/simple_log_processor_test.cc @@ -1,4 +1,6 @@ #include "opentelemetry/sdk/logs/simple_log_processor.h" +#include "opentelemetry/sdk/logs/exporter.h" + #include #include @@ -23,7 +25,8 @@ class TestExporter final : public LogExporter {} // Stores the names of the log records this exporter receives to an internal list - ExportResult Export(const std::vector> &records) noexcept override + ExportResult Export( + const opentelemetry::nostd::span> &records) noexcept override { *batch_size_received = records.size(); for (auto &record : records) @@ -112,7 +115,8 @@ class FailShutDownExporter final : public LogExporter public: FailShutDownExporter() {} - ExportResult Export(const std::vector> &records) noexcept override + ExportResult Export( + const opentelemetry::nostd::span> &records) noexcept override { return ExportResult::kSuccess; } From d35f906fa413279a63bfaa47c2ed3e3fed7e02c4 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Wed, 2 Dec 2020 15:51:19 -0500 Subject: [PATCH 5/6] Minor: timeout changed (for consistency) --- sdk/include/opentelemetry/sdk/logs/simple_log_processor.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h index 58a41db380..fb1eb6d87d 100644 --- a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h @@ -46,9 +46,9 @@ class SimpleLogProcessor : public LogProcessor void OnReceive(std::unique_ptr &&record) noexcept override; bool ForceFlush( - std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; - bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override; + bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; private: // The configured exporter From e04d63f2b50840a0716a8f782fb3ba0364a9b665 Mon Sep 17 00:00:00 2001 From: Karen Xu Date: Thu, 3 Dec 2020 18:19:42 -0500 Subject: [PATCH 6/6] Remove TODOs, run format CI, and rebase master --- .../opentelemetry/sdk/logs/simple_log_processor.h | 3 ++- sdk/src/logs/simple_log_processor.cc | 11 +---------- 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h index fb1eb6d87d..18ed28b2cf 100644 --- a/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h +++ b/sdk/include/opentelemetry/sdk/logs/simple_log_processor.h @@ -48,7 +48,8 @@ class SimpleLogProcessor : public LogProcessor bool ForceFlush( std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; - bool Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; + bool Shutdown( + std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override; private: // The configured exporter diff --git a/sdk/src/logs/simple_log_processor.cc b/sdk/src/logs/simple_log_processor.cc index 30293b9e8a..d88fc9eb92 100644 --- a/sdk/src/logs/simple_log_processor.cc +++ b/sdk/src/logs/simple_log_processor.cc @@ -45,7 +45,7 @@ void SimpleLogProcessor::OnReceive( if (exporter_->Export(batch) != ExportResult::kSuccess) { - /* TODO: alert user of the failed or timedout export result */ + /* Alert user of the failed export */ } } /** @@ -56,17 +56,8 @@ bool SimpleLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept return true; } -/** - * TODO: This method should not block indefinitely. Should abort within timeout. - */ bool SimpleLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept { - if (timeout < std::chrono::microseconds(0)) - { - // TODO: alert caller of invalid timeout? - return false; - } - // Should only shutdown exporter ONCE. if (!shutdown_latch_.test_and_set(std::memory_order_acquire)) {