Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions api/include/opentelemetry/common/spin_lock_mutex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#pragma once

#include <atomic>

#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace common
{

/**
* A Mutex which uses atomic flags and spin-locks instead of halting threads.
*
* This class implements the `BasicLockable` specification:
* https://en.cppreference.com/w/cpp/named_req/BasicLockable
*/
class SpinLockMutex
{
public:
SpinLockMutex() noexcept {}
~SpinLockMutex() noexcept = default;
SpinLockMutex(const SpinLockMutex &) = delete;
SpinLockMutex &operator=(const SpinLockMutex &) = delete;
SpinLockMutex &operator=(const SpinLockMutex &) volatile = delete;

/**
* Blocks until a lock can be obtained for the current thread.
*
* This mutex will spin the current CPU waiting for the lock to be available. This can have
* decent performance in scenarios where there is low lock contention and lock-holders acheive
Comment thread
jsuereth marked this conversation as resolved.
* their work quickly. It degrades in scenarios where locked tasks take a long time.
*/
void lock() noexcept
{
/* Note: We expect code protected by this lock to be "fast", i.e. we do NOT incrementally
* back-off and wait/notify here, we just loop until we have access, then try again.
*
* This has the downside that we could be spinning a long time if the exporter is slow.
* Note: in C++20x we could use `.wait` to make this slightly better. This should move to
* an exponential-back-off / wait strategy.
*/
while (flag_.test_and_set(std::memory_order_acquire))
Comment thread
jsuereth marked this conversation as resolved.
/** TODO - We should immmediately yield if the machine is single processor. */
;
}
/** Releases the lock held by the execution agent. Throws no exceptions. */
void unlock() noexcept { flag_.clear(std::memory_order_release); }

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We can also add a try_lock() method (as provided by std::mutex ) which returns immediately with status. It will provide flexibility to caller whether try acquiring in tight loop, or lazy way . This mayn't be usable through lock_guard, but still useful if caller is not concerned about RAII. Something like:
bool try_lock() { return !flag_.test_and_set(std::memory_order_acquire)) }

private:
std::atomic_flag flag_{ATOMIC_FLAG_INIT};
};

} // namespace common
OPENTELEMETRY_END_NAMESPACE
19 changes: 7 additions & 12 deletions api/include/opentelemetry/metrics/provider.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include <atomic>
#include <mutex>

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/metrics/meter_provider.h"
#include "opentelemetry/metrics/noop.h"
#include "opentelemetry/nostd/shared_ptr.h"
Expand All @@ -23,23 +24,17 @@ class Provider
*/
static nostd::shared_ptr<MeterProvider> GetMeterProvider() noexcept
{
while (GetLock().test_and_set(std::memory_order_acquire))
;
auto provider = nostd::shared_ptr<MeterProvider>(GetProvider());
GetLock().clear(std::memory_order_release);

return provider;
std::lock_guard<common::SpinLockMutex> guard(GetLock());
return nostd::shared_ptr<MeterProvider>(GetProvider());
}

/**
* Changes the singleton MeterProvider.
*/
static void SetMeterProvider(nostd::shared_ptr<MeterProvider> tp) noexcept
{
while (GetLock().test_and_set(std::memory_order_acquire))
;
std::lock_guard<common::SpinLockMutex> guard(GetLock());
GetProvider() = tp;
GetLock().clear(std::memory_order_release);
}

private:
Expand All @@ -49,9 +44,9 @@ class Provider
return provider;
}

static std::atomic_flag &GetLock() noexcept
static common::SpinLockMutex &GetLock() noexcept
{
static std::atomic_flag lock = ATOMIC_FLAG_INIT;
static common::SpinLockMutex lock;
return lock;
}
};
Expand Down
19 changes: 7 additions & 12 deletions api/include/opentelemetry/trace/provider.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
#pragma once

#include <atomic>
#include <mutex>

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/nostd/shared_ptr.h"
#include "opentelemetry/trace/noop.h"
#include "opentelemetry/trace/tracer_provider.h"
Expand All @@ -23,23 +24,17 @@ class Provider
*/
static nostd::shared_ptr<TracerProvider> GetTracerProvider() noexcept
{
while (GetLock().test_and_set(std::memory_order_acquire))
;
auto provider = nostd::shared_ptr<TracerProvider>(GetProvider());
GetLock().clear(std::memory_order_release);

return provider;
std::lock_guard<common::SpinLockMutex> guard(GetLock());
return nostd::shared_ptr<TracerProvider>(GetProvider());
}

/**
* Changes the singleton TracerProvider.
*/
static void SetTracerProvider(nostd::shared_ptr<TracerProvider> tp) noexcept
{
while (GetLock().test_and_set(std::memory_order_acquire))
;
std::lock_guard<common::SpinLockMutex> guard(GetLock());
GetProvider() = tp;
GetLock().clear(std::memory_order_release);
}

private:
Expand All @@ -49,9 +44,9 @@ class Provider
return provider;
}

static std::atomic_flag &GetLock() noexcept
static common::SpinLockMutex &GetLock() noexcept
{
static std::atomic_flag lock = ATOMIC_FLAG_INIT;
static common::SpinLockMutex lock;
return lock;
}
};
Expand Down
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ class SpanExporter
* custom recordables or use the default SpanData recordable provided by the
* SDK.
* @return a newly initialized Recordable object
*
* Note: This method must be callable from multiple threads.
*/
virtual std::unique_ptr<Recordable> MakeRecordable() noexcept = 0;

Expand Down
2 changes: 2 additions & 0 deletions sdk/include/opentelemetry/sdk/trace/processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class SpanProcessor
* Create a span recordable. This requests a new span recordable from the
* associated exporter.
* @return a newly initialized recordable
*
* Note: This method must be callable from multiple threads.
*/
virtual std::unique_ptr<Recordable> MakeRecordable() noexcept = 0;

Expand Down
16 changes: 15 additions & 1 deletion sdk/include/opentelemetry/sdk/trace/simple_processor.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#pragma once

#include <atomic>
#include <mutex>

#include "opentelemetry/common/spin_lock_mutex.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/processor.h"

Expand All @@ -13,6 +17,9 @@ namespace trace
* SpanExporter, as soon as they are finished.
*
* OnEnd and ForceFlush are no-ops.
*
* All calls to the configured SpanExporter are synchronized using a
* spin-lock on an atomic_flag.
*/
class SimpleSpanProcessor : public SpanProcessor
{
Expand All @@ -35,6 +42,7 @@ class SimpleSpanProcessor : public SpanProcessor
void OnEnd(std::unique_ptr<Recordable> &&span) noexcept override
{
nostd::span<std::unique_ptr<Recordable>> batch(&span, 1);
const std::lock_guard<opentelemetry::common::SpinLockMutex> locked(lock_);
if (exporter_->Export(batch) == ExportResult::kFailure)
{
/* Once it is defined how the SDK does logging, an error should be
Expand All @@ -48,11 +56,17 @@ class SimpleSpanProcessor : public SpanProcessor

void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
{
exporter_->Shutdown(timeout);
// We only call shutdown ONCE.
if (!shutdown_latch_.test_and_set(std::memory_order_acquire))
{
exporter_->Shutdown(timeout);
}
}

private:
std::unique_ptr<SpanExporter> exporter_;
opentelemetry::common::SpinLockMutex lock_;
std::atomic_flag shutdown_latch_{ATOMIC_FLAG_INIT};
};
} // namespace trace
} // namespace sdk
Expand Down
39 changes: 39 additions & 0 deletions sdk/test/trace/simple_processor_test.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "opentelemetry/sdk/trace/simple_processor.h"
#include "opentelemetry/exporters/memory/in_memory_span_exporter.h"
#include "opentelemetry/nostd/span.h"
#include "opentelemetry/sdk/trace/exporter.h"
#include "opentelemetry/sdk/trace/span_data.h"

#include <gtest/gtest.h>
Expand All @@ -27,3 +28,41 @@ TEST(SimpleProcessor, ToInMemorySpanExporter)

processor.Shutdown();
}

// An exporter that does nothing but record (and give back ) the # of times Shutdown was called.
class RecordShutdownExporter final : public SpanExporter
{
public:
RecordShutdownExporter(int *shutdown_counter) : shutdown_counter_(shutdown_counter) {}

std::unique_ptr<Recordable> MakeRecordable() noexcept override
{
return std::unique_ptr<Recordable>(new SpanData());
}

ExportResult Export(
const opentelemetry::nostd::span<std::unique_ptr<Recordable>> &recordables) noexcept override
{
return ExportResult::kSuccess;
}

void Shutdown(std::chrono::microseconds timeout = std::chrono::microseconds(0)) noexcept override
{
*shutdown_counter_ += 1;
}

private:
int *shutdown_counter_;
};

TEST(SimpleSpanProcessor, ShutdownCalledOnce)
{
int shutdowns = 0;
std::unique_ptr<RecordShutdownExporter> exporter(new RecordShutdownExporter(&shutdowns));
SimpleSpanProcessor processor(std::move(exporter));
EXPECT_EQ(0, shutdowns);
processor.Shutdown();
EXPECT_EQ(1, shutdowns);
processor.Shutdown();
EXPECT_EQ(1, shutdowns);
}