35 changes: 35 additions & 0 deletions sdk/src/common/BUILD
@@ -14,6 +14,17 @@

package(default_visibility = ["//visibility:public"])

cc_library(
name = "atomic_unique_ptr",
hdrs = [
"atomic_unique_ptr.h",
],
include_prefix = "src/common",
deps = [
"//api",
],
)

cc_library(
name = "random",
srcs = ["random.cc"],
@@ -27,3 +38,27 @@ cc_library(
"//sdk/src/common/platform:fork",
],
)

cc_library(
name = "circular_buffer_range",
hdrs = [
"circular_buffer_range.h",
],
include_prefix = "src/common",
deps = [
"//api",
],
)

cc_library(
name = "circular_buffer",
hdrs = [
"circular_buffer.h",
],
include_prefix = "src/common",
deps = [
":atomic_unique_ptr",
":circular_buffer_range",
"//api",
],
)
82 changes: 82 additions & 0 deletions sdk/src/common/atomic_unique_ptr.h
@@ -0,0 +1,82 @@
#pragma once

#include <atomic>
#include <memory>

#include "opentelemetry/version.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace common
{
/**
* An owning pointer similar to std::unique_ptr but with methods for atomic
* operations.
*/
template <class T>
class AtomicUniquePtr
{
public:
AtomicUniquePtr() noexcept {}

~AtomicUniquePtr() noexcept { Reset(); }

T &operator*() const noexcept { return *Get(); }

T *operator->() const noexcept { return Get(); }

/**
* @return the underlying pointer that is managed.
*/
T *Get() const noexcept { return ptr_; }

/**
* @return true if the pointer is null
*/
bool IsNull() const noexcept { return ptr_ == nullptr; }

/**
* Atomically swap the pointer only if it's null.
* @param owner the pointer to swap with
* @return true if the swap was successful
*/
bool SwapIfNull(std::unique_ptr<T> &owner) noexcept
{
auto ptr = owner.get();
T *expected = nullptr;
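// compare_exchange_weak may fail spuriously even when the slot is null; on
// failure, ownership stays with the caller, who may retry.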
auto was_successful = ptr_.compare_exchange_weak(expected, ptr, std::memory_order_release,
std::memory_order_relaxed);
if (was_successful)
{
owner.release();
return true;
}
return false;
}

/**
* Atomically swap the pointer with another.
* @param other the pointer to swap with
*/
void Swap(std::unique_ptr<T> &other) noexcept { other.reset(ptr_.exchange(other.release())); }

/**
* Set the pointer to a new value and delete the current value if non-null.
* @param ptr the new pointer value to set
*/
void Reset(T *ptr = nullptr) noexcept
{
ptr = ptr_.exchange(ptr);
if (ptr != nullptr)
{
delete ptr;
}
}

private:
std::atomic<T *> ptr_{nullptr};
};
} // namespace common
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
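
A quick, single-threaded sketch of how the class above behaves; it is illustrative only and not part of this change. It assumes the src/common include prefix declared in the BUILD target and that the class is reachable as opentelemetry::sdk::common::AtomicUniquePtr through the version-namespace macros.

#include <cassert>
#include <memory>

#include "src/common/atomic_unique_ptr.h"

using opentelemetry::sdk::common::AtomicUniquePtr;

int main()
{
  AtomicUniquePtr<int> slot;
  assert(slot.IsNull());

  // SwapIfNull uses compare_exchange_weak, which may fail spuriously, so the
  // install is retried; on success the slot takes ownership from `first`.
  std::unique_ptr<int> first{new int{1}};
  while (!slot.SwapIfNull(first))
  {
  }
  assert(first == nullptr && *slot == 1);

  // A second install fails because the slot is occupied; the caller keeps
  // ownership of the value it tried to install.
  std::unique_ptr<int> second{new int{2}};
  assert(!slot.SwapIfNull(second) && second != nullptr);

  // Swap exchanges ownership unconditionally: the slot now holds 2 and the
  // caller receives the previously stored 1.
  slot.Swap(second);
  assert(*slot == 2 && *second == 1);

  // Reset deletes the stored value (the destructor would do the same).
  slot.Reset();
  assert(slot.IsNull());
  return 0;
}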
183 changes: 183 additions & 0 deletions sdk/src/common/circular_buffer.h
@@ -0,0 +1,183 @@
#pragma once

#include <atomic>
#include <cstdint>
#include <memory>

#include "opentelemetry/version.h"
#include "src/common/atomic_unique_ptr.h"
#include "src/common/circular_buffer_range.h"

OPENTELEMETRY_BEGIN_NAMESPACE
namespace sdk
{
namespace common
{
/*
* A lock-free circular buffer that supports multiple concurrent producers
* and a single consumer.
*/
template <class T>
class CircularBuffer
{
public:
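// One extra slot is allocated so that a full buffer can be distinguished from
// an empty one when head and tail indices are compared (see PeekImpl).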
explicit CircularBuffer(size_t max_size)
: data_{new AtomicUniquePtr<T>[max_size + 1]}, capacity_{max_size + 1}
{}

/**
* @return a range of the elements in the circular buffer
*
* Note: This method must only be called from the consumer thread.
*/
CircularBufferRange<const AtomicUniquePtr<T>> Peek() const noexcept
{
return const_cast<CircularBuffer *>(this)->PeekImpl();
}

/**
* Consume elements from the circular buffer's tail.
* @param n the number of elements to consume
* @param callback the callback to invoke with a CircularBufferRange covering
* the consumed elements.
*
* Note: The callback must set every AtomicUniquePtr in the range to null.
*
* Note: This method must only be called from the consumer thread.
*/
template <class Callback>
void Consume(size_t n, Callback callback) noexcept
{
assert(n <= static_cast<size_t>(head_ - tail_));
Contributor: Is it intended that users of CircularBuffer have to check the buffer size each time before calling Consume, instead of n being the maximum number of elements consumed?

Contributor (author): You may not always want to consume the maximum number of elements. For example, you might first call Peek to get a range of elements in the buffer, then call Consume to remove those elements once they have been processed successfully. Passing the size is necessary because this is a concurrent multi-producer, single-consumer buffer, and elements could have been added between the calls to Peek and Consume. A sketch of this pattern follows the diff below.

auto range = PeekImpl().Take(n);
static_assert(noexcept(callback(range)), "callback not allowed to throw");
tail_ += n;
callback(range);
}

/**
* Consume elements from the circular buffer's tail.
* @param n the number of elements to consume
*
* Note: This method must only be called from the consumer thread.
*/
void Consume(size_t n) noexcept
{
Consume(
n, [](CircularBufferRange<AtomicUniquePtr<T>> & range) noexcept {
range.ForEach([](AtomicUniquePtr<T> & ptr) noexcept {
ptr.Reset();
return true;
});
});
}

/**
* Adds an element into the circular buffer.
* @param ptr a pointer to the element to add
* @return true if the element was successfully added; false otherwise.
*/
bool Add(std::unique_ptr<T> &ptr) noexcept
{
while (true)
{
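// Snapshot the current tail and head; other producers and the consumer may
// advance them concurrently, which the compare-exchange below handles.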
uint64_t tail = tail_;
uint64_t head = head_;

// The circular buffer is full, so return false.
if (head - tail >= capacity_ - 1)
{
return false;
}

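// Try to claim the slot at the current head. SwapIfNull fails if another
// producer has already installed an element there.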
uint64_t head_index = head % capacity_;
if (data_[head_index].SwapIfNull(ptr))
{
auto new_head = head + 1;
auto expected_head = head;
if (head_.compare_exchange_weak(expected_head, new_head, std::memory_order_release,
std::memory_order_relaxed))
{
// free the swapped out value
ptr.reset();

return true;
}

// If we reach this point (unlikely), elements were added and then consumed
// from the circular buffer since we read head and tail, so we undo the swap
// and attempt to add again.
data_[head_index].Swap(ptr);
}
}
return true;
}

/**
* Clear the circular buffer.
*
* Note: This method must only be called from the consumer thread.
*/
void Clear() noexcept { Consume(size()); }

/**
* @return the maximum number of elements that can be stored in the buffer.
*/
size_t max_size() const noexcept { return capacity_ - 1; }

/**
* @return true if the buffer is empty.
*/
bool empty() const noexcept { return head_ == tail_; }

/**
* @return the number of elements stored in the circular buffer.
*
* Note: this method will only return a correct snapshot of the size if called
* from the consumer thread.
*/
size_t size() const noexcept
{
uint64_t tail = tail_;
uint64_t head = head_;
assert(tail <= head);
return head - tail;
}

/**
* @return the number of elements consumed from the circular buffer.
*/
uint64_t consumption_count() const noexcept { return tail_; }

/**
* @return the number of elements added to the circular buffer.
*/
uint64_t production_count() const noexcept { return head_; }

private:
std::unique_ptr<AtomicUniquePtr<T>[]> data_;
size_t capacity_;
std::atomic<uint64_t> head_{0};
std::atomic<uint64_t> tail_{0};

CircularBufferRange<AtomicUniquePtr<T>> PeekImpl() noexcept
{
uint64_t tail_index = tail_ % capacity_;
uint64_t head_index = head_ % capacity_;
if (head_index == tail_index)
{
return {};
}
auto data = data_.get();
if (tail_index < head_index)
{
return CircularBufferRange<AtomicUniquePtr<T>>{
nostd::span<AtomicUniquePtr<T>>{data + tail_index, head_index - tail_index}};
}
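// The occupied region wraps past the end of the backing array, so it is
// returned as two spans.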
return {nostd::span<AtomicUniquePtr<T>>{data + tail_index, capacity_ - tail_index},
nostd::span<AtomicUniquePtr<T>>{data, head_index}};
}
};
} // namespace common
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
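
The two headers combine into the multi-producer, single-consumer workflow discussed in the review exchange above: producers hand elements to Add, while the single consumer snapshots the size (or calls Peek), processes the elements, and then Consumes exactly that many. Below is a minimal sketch of that pattern, not part of the change, under a few assumptions: the opentelemetry::sdk::common namespace spelling, the src/common include prefix, and CircularBufferRange::ForEach behaving as it is used by the Consume(n) overload above.

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <thread>
#include <vector>

#include "src/common/circular_buffer.h"

using opentelemetry::sdk::common::AtomicUniquePtr;
using opentelemetry::sdk::common::CircularBuffer;
using opentelemetry::sdk::common::CircularBufferRange;

int main()
{
  CircularBuffer<int> buffer{128};
  std::atomic<bool> done{false};
  uint64_t total = 0;

  // Producer threads: Add transfers ownership into the buffer on success;
  // when the buffer is full the caller keeps the element, so it is retried.
  std::vector<std::thread> producers;
  for (int t = 0; t < 2; ++t)
  {
    producers.emplace_back([&buffer] {
      for (int i = 0; i < 1000; ++i)
      {
        std::unique_ptr<int> element{new int{1}};
        while (!buffer.Add(element))
        {
          std::this_thread::yield();
        }
      }
    });
  }

  // Single consumer thread: snapshot the current size, then consume exactly
  // that many elements (the Peek/Consume pattern from the review discussion).
  std::thread consumer{[&] {
    while (!done || !buffer.empty())
    {
      size_t n = buffer.size();
      if (n == 0)
      {
        std::this_thread::yield();
        continue;
      }
      buffer.Consume(
          n, [&](CircularBufferRange<AtomicUniquePtr<int>> &range) noexcept {
            range.ForEach([&](AtomicUniquePtr<int> &ptr) noexcept {
              total += static_cast<uint64_t>(*ptr);
              ptr.Reset();  // the callback must leave every consumed slot null
              return true;
            });
          });
    }
  }};

  for (auto &p : producers)
  {
    p.join();
  }
  done = true;
  consumer.join();

  std::printf("elements consumed: %llu, sum: %llu\n",
              static_cast<unsigned long long>(buffer.consumption_count()),
              static_cast<unsigned long long>(total));
  return 0;
}

A real exporter would more likely drop an element (and count the drop) when Add reports a full buffer, rather than spin as the producers do here.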