Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions sdk/include/opentelemetry/sdk/logs/batch_log_processor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "opentelemetry/sdk/common/circular_buffer.h"
#include "opentelemetry/sdk/logs/exporter.h"
#include "opentelemetry/sdk/logs/processor.h"

#include <atomic>
#include <condition_variable>
#include <thread>

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{

namespace logs
{

/**
* This is an implementation of the LogProcessor which creates batches of finished logs and passes
* the export-friendly log data representations to the configured LogExporter.
*/
class BatchLogProcessor : public LogProcessor
{
public:
/**
* Creates a batch log processor by configuring the specified exporter and other parameters
* as per the official, language-agnostic opentelemetry specs.
*
* @param exporter - The backend exporter to pass the logs to
* @param max_queue_size - The maximum buffer/queue size. After the size is reached, logs are
* dropped.
* @param scheduled_delay_millis - The time interval between two consecutive exports.
* @param max_export_batch_size - The maximum batch size of every export. It must be smaller or
* equal to max_queue_size
*/
explicit BatchLogProcessor(
std::unique_ptr<LogExporter> &&exporter,
const size_t max_queue_size = 2048,
const std::chrono::milliseconds scheduled_delay_millis = std::chrono::milliseconds(5000),
const size_t max_export_batch_size = 512);

/** Makes a new recordable **/
std::unique_ptr<Recordable> MakeRecordable() noexcept override;

/**
* Called when the Logger's log method creates a log record
* @param record the log record
*/

Comment thread
kxyr marked this conversation as resolved.
void OnReceive(std::unique_ptr<Recordable> &&record) noexcept override;

/**
* Export all log records that have not been exported yet.
*
* NOTE: Timeout functionality not supported yet.
*/
bool ForceFlush(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its logs and passes them to the exporter. Any subsequent calls to
* ForceFlush or Shutdown will return immediately without doing anything.
*
* NOTE: Timeout functionality not supported yet.
*/
bool Shutdown(
std::chrono::microseconds timeout = std::chrono::microseconds::max()) noexcept override;

/**
* Class destructor which invokes the Shutdown() method.
*/
virtual ~BatchLogProcessor() override;

private:
/**
* The background routine performed by the worker thread.
*/
void DoBackgroundWork();

/**
* Exports all logs to the configured exporter.
*
* @param was_force_flush_called - A flag to check if the current export is the result
* of a call to ForceFlush method. If true, then we have to
* notify the main thread to wake it up in the ForceFlush
* method.
*/
void Export(const bool was_for_flush_called);

/**
* Called when Shutdown() is invoked. Completely drains the queue of all log records and
* passes them to the exporter.
*/
void DrainQueue();

/* The configured backend log exporter */
std::unique_ptr<LogExporter> exporter_;

/* Configurable parameters as per the official *trace* specs */
const size_t max_queue_size_;
const std::chrono::milliseconds scheduled_delay_millis_;
const size_t max_export_batch_size_;

/* Synchronization primitives */
std::condition_variable cv_, force_flush_cv_;
std::mutex cv_m_, force_flush_cv_m_;

/* The buffer/queue to which the ended logs are added */
common::CircularBuffer<Recordable> buffer_;

/* Important boolean flags to handle the workflow of the processor */
std::atomic<bool> is_shutdown_{false};
std::atomic<bool> is_force_flush_{false};
std::atomic<bool> is_force_flush_notified_{false};

/* The background worker thread */
std::thread worker_thread_;
};

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
2 changes: 1 addition & 1 deletion sdk/src/logs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
add_library(opentelemetry_logs logger_provider.cc logger.cc
simple_log_processor.cc)
simple_log_processor.cc batch_log_processor.cc)

target_link_libraries(opentelemetry_logs opentelemetry_common)
213 changes: 213 additions & 0 deletions sdk/src/logs/batch_log_processor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "opentelemetry/sdk/logs/batch_log_processor.h"

#include <vector>
using opentelemetry::sdk::common::AtomicUniquePtr;
using opentelemetry::sdk::common::CircularBufferRange;

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace logs
{
BatchLogProcessor::BatchLogProcessor(std::unique_ptr<LogExporter> &&exporter,
const size_t max_queue_size,
const std::chrono::milliseconds scheduled_delay_millis,
const size_t max_export_batch_size)
: exporter_(std::move(exporter)),
max_queue_size_(max_queue_size),
scheduled_delay_millis_(scheduled_delay_millis),
max_export_batch_size_(max_export_batch_size),
buffer_(max_queue_size_),
worker_thread_(&BatchLogProcessor::DoBackgroundWork, this)
{}

std::unique_ptr<Recordable> BatchLogProcessor::MakeRecordable() noexcept
{
return exporter_->MakeRecordable();
}

void BatchLogProcessor::OnReceive(std::unique_ptr<Recordable> &&record) noexcept
{
if (is_shutdown_.load() == true)
{
return;
}

if (buffer_.Add(record) == false)
{
return;
}

// If the queue gets at least half full a preemptive notification is
// sent to the worker thread to start a new export cycle.
if (buffer_.size() >= max_queue_size_ / 2)
{
// signal the worker thread
cv_.notify_one();
}
}

bool BatchLogProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
Comment thread
maxgolov marked this conversation as resolved.
if (is_shutdown_.load() == true)
{
return false;
}

is_force_flush_ = true;

// Keep attempting to wake up the worker thread
Comment thread
maxgolov marked this conversation as resolved.
while (is_force_flush_.load() == true)
{
cv_.notify_one();
}

// Now wait for the worker thread to signal back from the Export method
std::unique_lock<std::mutex> lk(force_flush_cv_m_);
while (is_force_flush_notified_.load() == false)
{
force_flush_cv_.wait(lk);
}

// Notify the worker thread
is_force_flush_notified_ = false;

return true;
}

void BatchLogProcessor::DoBackgroundWork()
{
auto timeout = scheduled_delay_millis_;

while (true)
{
// Wait for `timeout` milliseconds
std::unique_lock<std::mutex> lk(cv_m_);
cv_.wait_for(lk, timeout);

if (is_shutdown_.load() == true)
{
DrainQueue();
return;
}

bool was_force_flush_called = is_force_flush_.load();

// Check if this export was the result of a force flush.
if (was_force_flush_called == true)
{
// Since this export was the result of a force flush, signal the
// main thread that the worker thread has been notified
is_force_flush_ = false;
}
else
{
// If the buffer was empty during the entire `timeout` time interval,
// go back to waiting. If this was a spurious wake-up, we export only if
// `buffer_` is not empty. This is acceptable because batching is a best
// mechanism effort here.
if (buffer_.empty() == true)
{
continue;
}
}

auto start = std::chrono::steady_clock::now();
Export(was_force_flush_called);
auto end = std::chrono::steady_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(end - start);

// Subtract the duration of this export call from the next `timeout`.
timeout = scheduled_delay_millis_ - duration;
}
}

Comment thread
kxyr marked this conversation as resolved.
void BatchLogProcessor::Export(const bool was_force_flush_called)
{
std::vector<std::unique_ptr<Recordable>> records_arr;

size_t num_records_to_export;

if (was_force_flush_called == true)
{
num_records_to_export = buffer_.size();
}
else
{
num_records_to_export =
buffer_.size() >= max_export_batch_size_ ? max_export_batch_size_ : buffer_.size();
}

buffer_.Consume(
num_records_to_export, [&](CircularBufferRange<AtomicUniquePtr<Recordable>> range) noexcept {
range.ForEach([&](AtomicUniquePtr<Recordable> &ptr) {
std::unique_ptr<Recordable> swap_ptr = std::unique_ptr<Recordable>(nullptr);
ptr.Swap(swap_ptr);
records_arr.push_back(std::unique_ptr<Recordable>(swap_ptr.release()));
return true;
});
});

exporter_->Export(
nostd::span<std::unique_ptr<Recordable>>(records_arr.data(), records_arr.size()));

// Notify the main thread in case this export was the result of a force flush.
if (was_force_flush_called == true)
{
is_force_flush_notified_ = true;
while (is_force_flush_notified_.load() == true)
Comment thread
maxgolov marked this conversation as resolved.
{
force_flush_cv_.notify_one();
}
}
}

void BatchLogProcessor::DrainQueue()
{
while (buffer_.empty() == false)
{
Export(false);
}
}

bool BatchLogProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
is_shutdown_.store(true);

cv_.notify_one();
worker_thread_.join();
if (exporter_ != nullptr)
{
return exporter_->Shutdown();
}

return true;
}

BatchLogProcessor::~BatchLogProcessor()
{
if (is_shutdown_.load() == false)
Comment thread
kxyr marked this conversation as resolved.
{
Shutdown();
}
}

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
11 changes: 11 additions & 0 deletions sdk/test/logs/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,14 @@ cc_test(
"@com_google_googletest//:gtest_main",
],
)

cc_test(
name = "batch_log_processor_test",
srcs = [
"batch_log_processor_test.cc",
],
deps = [
"//sdk/src/logs",
"@com_google_googletest//:gtest_main",
],
)
4 changes: 2 additions & 2 deletions sdk/test/logs/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
foreach(testname logger_provider_sdk_test logger_sdk_test
simple_log_processor_test log_record_test)
foreach(testname logger_provider_sdk_test logger_sdk_test log_record_test
simple_log_processor_test batch_log_processor_test)
add_executable(${testname} "${testname}.cc")
target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES}
${CMAKE_THREAD_LIBS_INIT} opentelemetry_logs)
Expand Down
Loading