diff --git a/sdk/src/common/BUILD b/sdk/src/common/BUILD index a8981b4d55..466b075f16 100644 --- a/sdk/src/common/BUILD +++ b/sdk/src/common/BUILD @@ -14,6 +14,17 @@ package(default_visibility = ["//visibility:public"]) +cc_library( + name = "atomic_unique_ptr", + hdrs = [ + "atomic_unique_ptr.h", + ], + include_prefix = "src/common", + deps = [ + "//api", + ], +) + cc_library( name = "random", srcs = ["random.cc"], @@ -27,3 +38,27 @@ cc_library( "//sdk/src/common/platform:fork", ], ) + +cc_library( + name = "circular_buffer_range", + hdrs = [ + "circular_buffer_range.h", + ], + include_prefix = "src/common", + deps = [ + "//api", + ], +) + +cc_library( + name = "circular_buffer", + hdrs = [ + "circular_buffer.h", + ], + include_prefix = "src/common", + deps = [ + ":atomic_unique_ptr", + ":circular_buffer_range", + "//api", + ], +) diff --git a/sdk/src/common/atomic_unique_ptr.h b/sdk/src/common/atomic_unique_ptr.h new file mode 100644 index 0000000000..1fdfee618c --- /dev/null +++ b/sdk/src/common/atomic_unique_ptr.h @@ -0,0 +1,82 @@ +#pragma once + +#include +#include + +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace common +{ +/** + * An owning pointer similar to std::unique_ptr but with methods for atomic + * operations. + */ +template +class AtomicUniquePtr +{ +public: + AtomicUniquePtr() noexcept {} + + ~AtomicUniquePtr() noexcept { Reset(); } + + T &operator*() const noexcept { return *Get(); } + + T *operator->() const noexcept { return Get(); } + + /** + * @return the underly pointer managed. + */ + T *Get() const noexcept { return ptr_; } + + /** + * @return true if the pointer is null + */ + bool IsNull() const noexcept { return ptr_ == nullptr; } + + /** + * Atomically swap the pointer only if it's null. + * @param owner the pointer to swap with + * @return true if the swap was successful + */ + bool SwapIfNull(std::unique_ptr &owner) noexcept + { + auto ptr = owner.get(); + T *expected = nullptr; + auto was_successful = ptr_.compare_exchange_weak(expected, ptr, std::memory_order_release, + std::memory_order_relaxed); + if (was_successful) + { + owner.release(); + return true; + } + return false; + } + + /** + * Atomically swap the pointer with another. + * @param ptr the pointer to swap with + */ + void Swap(std::unique_ptr &other) noexcept { other.reset(ptr_.exchange(other.release())); } + + /** + * Set the pointer to a new value and delete the current value if non-null. + * @param ptr the new pointer value to set + */ + void Reset(T *ptr = nullptr) noexcept + { + ptr = ptr_.exchange(ptr); + if (ptr != nullptr) + { + delete ptr; + } + } + +private: + std::atomic ptr_{nullptr}; +}; +} // namespace common +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/common/circular_buffer.h b/sdk/src/common/circular_buffer.h new file mode 100644 index 0000000000..a5a104bc34 --- /dev/null +++ b/sdk/src/common/circular_buffer.h @@ -0,0 +1,183 @@ +#pragma once + +#include +#include +#include + +#include "opentelemetry/version.h" +#include "src/common/atomic_unique_ptr.h" +#include "src/common/circular_buffer_range.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace common +{ +/* + * A lock-free circular buffer that supports multiple concurrent producers + * and a single consumer. + */ +template +class CircularBuffer +{ +public: + explicit CircularBuffer(size_t max_size) + : data_{new AtomicUniquePtr[max_size + 1]}, capacity_{max_size + 1} + {} + + /** + * @return a range of the elements in the circular buffer + * + * Note: This method must only be called from the consumer thread. + */ + CircularBufferRange> Peek() const noexcept + { + return const_cast(this)->PeekImpl(); + } + + /** + * Consume elements from the circular buffer's tail. + * @param n the number of elements to consume + * @param callback the callback to invoke with an AtomicUniquePtr to each + * consumed element. + * + * Note: The callback must set the passed AtomicUniquePtr to null. + * + * Note: This method must only be called from the consumer thread. + */ + template + void Consume(size_t n, Callback callback) noexcept + { + assert(n <= static_cast(head_ - tail_)); + auto range = PeekImpl().Take(n); + static_assert(noexcept(callback(range)), "callback not allowed to throw"); + tail_ += n; + callback(range); + } + + /** + * Consume elements from the circular buffer's tail. + * @param n the number of elements to consume + * + * Note: This method must only be called from the consumer thread. + */ + void Consume(size_t n) noexcept + { + Consume( + n, [](CircularBufferRange> & range) noexcept { + range.ForEach([](AtomicUniquePtr & ptr) noexcept { + ptr.Reset(); + return true; + }); + }); + } + + /** + * Adds an element into the circular buffer. + * @param ptr a pointer to the element to add + * @return true if the element was successfully added; false, otherwise. + */ + bool Add(std::unique_ptr &ptr) noexcept + { + while (true) + { + uint64_t tail = tail_; + uint64_t head = head_; + + // The circular buffer is full, so return false. + if (head - tail >= capacity_ - 1) + { + return false; + } + + uint64_t head_index = head % capacity_; + if (data_[head_index].SwapIfNull(ptr)) + { + auto new_head = head + 1; + auto expected_head = head; + if (head_.compare_exchange_weak(expected_head, new_head, std::memory_order_release, + std::memory_order_relaxed)) + { + // free the swapped out value + ptr.reset(); + + return true; + } + + // If we reached this point (unlikely), it means that between the last + // iteration elements were added and then consumed from the circular + // buffer, so we undo the swap and attempt to add again. + data_[head_index].Swap(ptr); + } + } + return true; + } + + /** + * Clear the circular buffer. + * + * Note: This method must only be called from the consumer thread. + */ + void Clear() noexcept { Consume(size()); } + + /** + * @return the maximum number of bytes that can be stored in the buffer. + */ + size_t max_size() const noexcept { return capacity_ - 1; } + + /** + * @return true if the buffer is empty. + */ + bool empty() const noexcept { return head_ == tail_; } + + /** + * @return the number of bytes stored in the circular buffer. + * + * Note: this method will only return a correct snapshot of the size if called + * from the consumer thread. + */ + size_t size() const noexcept + { + uint64_t tail = tail_; + uint64_t head = head_; + assert(tail <= head); + return head - tail; + } + + /** + * @return the number of elements consumed from the circular buffer. + */ + uint64_t consumption_count() const noexcept { return tail_; } + + /** + * @return the number of elements added to the circular buffer. + */ + uint64_t production_count() const noexcept { return head_; } + +private: + std::unique_ptr[]> data_; + size_t capacity_; + std::atomic head_{0}; + std::atomic tail_{0}; + + CircularBufferRange> PeekImpl() noexcept + { + uint64_t tail_index = tail_ % capacity_; + uint64_t head_index = head_ % capacity_; + if (head_index == tail_index) + { + return {}; + } + auto data = data_.get(); + if (tail_index < head_index) + { + return CircularBufferRange>{ + nostd::span>{data + tail_index, head_index - tail_index}}; + } + return {nostd::span>{data + tail_index, capacity_ - tail_index}, + nostd::span>{data, head_index}}; + } +}; +} // namespace common +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/common/circular_buffer_range.h b/sdk/src/common/circular_buffer_range.h new file mode 100644 index 0000000000..d26026690b --- /dev/null +++ b/sdk/src/common/circular_buffer_range.h @@ -0,0 +1,90 @@ +#pragma once + +#include +#include +#include +#include + +#include "opentelemetry/nostd/span.h" +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk +{ +namespace common +{ +/** + * A non-owning view into a range of elements in a circular buffer. + */ +template +class CircularBufferRange +{ +public: + CircularBufferRange() noexcept = default; + + explicit CircularBufferRange(nostd::span first) noexcept : first_{first} {} + + CircularBufferRange(nostd::span first, nostd::span second) noexcept + : first_{first}, second_{second} + {} + + operator CircularBufferRange() const noexcept { return {first_, second_}; } + + /** + * Iterate over the elements in the range. + * @param callback the callback to call for each element + * @return true if we iterated over all elements + */ + template + bool ForEach(Callback callback) const + noexcept(noexcept(std::declval()(std::declval()))) + { + for (auto &value : first_) + { + if (!callback(value)) + { + return false; + } + } + for (auto &value : second_) + { + if (!callback(value)) + { + return false; + } + } + return true; + } + + /** + * @return the number of elements in the range + */ + size_t size() const noexcept { return first_.size() + second_.size(); } + + /** + * @return true if the range is empty + */ + bool empty() const noexcept { return first_.empty(); } + + /** + * Return a subrange taken from the start of this range. + * @param n the number of element to take in the subrange + * @return a subrange of the first n elements in this range + */ + CircularBufferRange Take(size_t n) const noexcept + { + assert(n <= size()); + if (first_.size() >= n) + { + return CircularBufferRange{nostd::span{first_.data(), n}}; + } + return {first_, nostd::span{second_.data(), n - first_.size()}}; + } + +private: + nostd::span first_; + nostd::span second_; +}; +} // namespace common +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/common/platform/BUILD b/sdk/src/common/platform/BUILD index 368106425c..c96a1417a0 100644 --- a/sdk/src/common/platform/BUILD +++ b/sdk/src/common/platform/BUILD @@ -24,6 +24,10 @@ cc_library( "fork.h", ], include_prefix = "src/common/platform", + linkopts = select({ + "//bazel:windows": [], + "//conditions:default": ["-pthread"], + }), deps = [ "//api", ], diff --git a/sdk/test/common/BUILD b/sdk/test/common/BUILD index d5aae82c96..a4b778f388 100644 --- a/sdk/test/common/BUILD +++ b/sdk/test/common/BUILD @@ -22,6 +22,34 @@ cc_test( ], ) +otel_cc_benchmark( + name = "random_benchmark", + srcs = ["random_benchmark.cc"], + deps = ["//sdk/src/common:random"], +) + +cc_test( + name = "atomic_unique_ptr_test", + srcs = [ + "atomic_unique_ptr_test.cc", + ], + deps = [ + "//sdk/src/common:atomic_unique_ptr", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "circular_buffer_range_test", + srcs = [ + "circular_buffer_range_test.cc", + ], + deps = [ + "//sdk/src/common:circular_buffer_range", + "@com_google_googletest//:gtest_main", + ], +) + cc_test( name = "random_fork_test", srcs = [ @@ -29,12 +57,25 @@ cc_test( ], deps = [ "//sdk/src/common:random", - "@com_google_googletest//:gtest_main", + ], +) + +cc_library( + name = "baseline_circular_buffer", + hdrs = [ + "baseline_circular_buffer.h", + ], + include_prefix = "test/common", + deps = [ + "//api", ], ) otel_cc_benchmark( - name = "random_benchmark", - srcs = ["random_benchmark.cc"], - deps = ["//sdk/src/common:random"], + name = "circular_buffer_benchmark", + srcs = ["circular_buffer_benchmark.cc"], + deps = [ + ":baseline_circular_buffer", + "//sdk/src/common:circular_buffer", + ], ) diff --git a/sdk/test/common/CMakeLists.txt b/sdk/test/common/CMakeLists.txt index e0b7a3ae75..7a181c90c4 100644 --- a/sdk/test/common/CMakeLists.txt +++ b/sdk/test/common/CMakeLists.txt @@ -1,7 +1,10 @@ -foreach(testname random_test fast_random_number_generator_test) +foreach(testname + random_test fast_random_number_generator_test atomic_unique_ptr_test + circular_buffer_range_test circular_buffer_test) add_executable(${testname} "${testname}.cc") - target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} - ${CMAKE_THREAD_LIBS_INIT} opentelemetry_common) + target_link_libraries( + ${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} + opentelemetry_common opentelemetry_trace) gtest_add_tests(TARGET ${testname} TEST_PREFIX trace. TEST_LIST ${testname}) endforeach() @@ -12,3 +15,7 @@ add_test(random_fork_test random_fork_test) add_executable(random_benchmark random_benchmark.cc) target_link_libraries(random_benchmark benchmark::benchmark ${CMAKE_THREAD_LIBS_INIT} opentelemetry_common) + +add_executable(circular_buffer_benchmark circular_buffer_benchmark.cc) +target_link_libraries(circular_buffer_benchmark benchmark::benchmark + ${CMAKE_THREAD_LIBS_INIT} opentelemetry_api) diff --git a/sdk/test/common/atomic_unique_ptr_test.cc b/sdk/test/common/atomic_unique_ptr_test.cc new file mode 100644 index 0000000000..f2c75a151f --- /dev/null +++ b/sdk/test/common/atomic_unique_ptr_test.cc @@ -0,0 +1,39 @@ +#include "src/common/atomic_unique_ptr.h" + +#include +using opentelemetry::sdk::common::AtomicUniquePtr; + +TEST(AtomicUniquePtrTest, SwapIfNullWithNull) +{ + AtomicUniquePtr ptr; + EXPECT_TRUE(ptr.IsNull()); + + std::unique_ptr x{new int{33}}; + EXPECT_TRUE(ptr.SwapIfNull(x)); + EXPECT_EQ(x, nullptr); +} + +TEST(AtomicUniquePtrTest, SwapIfNullWithNonNull) +{ + AtomicUniquePtr ptr; + ptr.Reset(new int{11}); + std::unique_ptr x{new int{33}}; + EXPECT_TRUE(!ptr.SwapIfNull(x)); + EXPECT_NE(x, nullptr); + EXPECT_EQ(*x, 33); + EXPECT_EQ(*ptr, 11); +} + +TEST(AtomicUniquePtrTest, Swap) +{ + AtomicUniquePtr ptr; + EXPECT_TRUE(ptr.IsNull()); + + ptr.Reset(new int{11}); + std::unique_ptr x{new int{33}}; + ptr.Swap(x); + EXPECT_FALSE(ptr.IsNull()); + EXPECT_NE(x, nullptr); + EXPECT_EQ(*x, 11); + EXPECT_EQ(*ptr, 33); +} diff --git a/sdk/test/common/baseline_circular_buffer.h b/sdk/test/common/baseline_circular_buffer.h new file mode 100644 index 0000000000..4c77990844 --- /dev/null +++ b/sdk/test/common/baseline_circular_buffer.h @@ -0,0 +1,85 @@ +#pragma once + +#include +#include +#include +#include + +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace testing +{ +/** + * A locking circular buffer. + * + * Used as a baseline in benchmarking. + */ +template +class BaselineCircularBuffer +{ +public: + explicit BaselineCircularBuffer(size_t max_size) : data_{max_size} {} + + /** + * Add an element to the circular buffer. + * @param element the element to add + * @return true if the element was added successfully + */ + bool Add(std::unique_ptr &element) noexcept { return this->Add(std::move(element)); } + + bool Add(std::unique_ptr &&element) noexcept + { + std::lock_guard lock_guard{mutex_}; + if (tail_ + data_.size() == head_) + { + return false; + } + data_[head_ % data_.size()] = std::move(element); + head_ += 1; + return true; + } + + /** + * Consume elements in the circular buffer. + * @param f the callback to call for each element + */ + template + void Consume(F f) noexcept + { + std::lock_guard lock_guard{mutex_}; + if (head_ == tail_) + { + return; + } + auto tail_index = tail_ % data_.size(); + auto head_index = head_ % data_.size(); + if (tail_index < head_index) + { + for (auto i = tail_index; i < head_index; ++i) + { + f(std::move(data_[i])); + } + } + else + { + for (auto i = tail_index; i < data_.size(); ++i) + { + f(std::move(data_[i])); + } + for (auto i = 0ull; i < head_index; ++i) + { + f(std::move(data_[i])); + } + } + tail_ = head_; + } + +private: + std::mutex mutex_; + uint64_t head_{0}; + uint64_t tail_{0}; + std::vector> data_; +}; +} // namespace testing +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/common/circular_buffer_benchmark.cc b/sdk/test/common/circular_buffer_benchmark.cc new file mode 100644 index 0000000000..568cb99174 --- /dev/null +++ b/sdk/test/common/circular_buffer_benchmark.cc @@ -0,0 +1,130 @@ +#include "benchmark/benchmark.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "src/common/circular_buffer.h" +#include "test/common/baseline_circular_buffer.h" +using opentelemetry::sdk::common::AtomicUniquePtr; +using opentelemetry::sdk::common::CircularBuffer; +using opentelemetry::sdk::common::CircularBufferRange; +using opentelemetry::testing::BaselineCircularBuffer; + +const int N = 10000; + +static uint64_t ConsumeBufferNumbers(BaselineCircularBuffer &buffer) noexcept +{ + uint64_t result = 0; + buffer.Consume([&](std::unique_ptr &&x) { + result += *x; + x.reset(); + }); + return result; +} + +static uint64_t ConsumeBufferNumbers(CircularBuffer &buffer) noexcept +{ + uint64_t result = 0; + buffer.Consume( + buffer.size(), [&](CircularBufferRange> & range) noexcept { + range.ForEach([&](AtomicUniquePtr & ptr) noexcept { + result += *ptr; + ptr.Reset(); + return true; + }); + }); + return result; +} + +template +static void GenerateNumbersForThread(Buffer &buffer, int n, std::atomic &sum) noexcept +{ + thread_local std::mt19937_64 random_number_generator{std::random_device{}()}; + for (int i = 0; i < n; ++i) + { + auto x = random_number_generator(); + std::unique_ptr element{new uint64_t{x}}; + if (buffer.Add(element)) + { + sum += x; + } + } +} + +template +static uint64_t GenerateNumbers(Buffer &buffer, int num_threads, int n) noexcept +{ + std::atomic sum{0}; + std::vector threads(num_threads); + for (auto &thread : threads) + { + thread = std::thread{GenerateNumbersForThread, std::ref(buffer), n, std::ref(sum)}; + } + for (auto &thread : threads) + { + thread.join(); + } + return sum; +} + +template +static void ConsumeNumbers(Buffer &buffer, uint64_t &sum, std::atomic &finished) noexcept +{ + while (!finished) + { + sum += ConsumeBufferNumbers(buffer); + } + sum += ConsumeBufferNumbers(buffer); +} + +template +static void RunSimulation(Buffer &buffer, int num_threads, int n) noexcept +{ + std::atomic finished{false}; + uint64_t consumer_sum{0}; + std::thread consumer_thread{ConsumeNumbers, std::ref(buffer), std::ref(consumer_sum), + std::ref(finished)}; + uint64_t producer_sum = GenerateNumbers(buffer, num_threads, n); + finished = true; + consumer_thread.join(); + if (consumer_sum != producer_sum) + { + std::cerr << "Sumulation failed: consumer_sum != producer_sum\n"; + std::terminate(); + } +} + +static void BM_BaselineBuffer(benchmark::State &state) +{ + const size_t max_elements = 500; + auto num_threads = state.range(0); + const int n = N / num_threads; + BaselineCircularBuffer buffer{max_elements}; + for (auto _ : state) + { + RunSimulation(buffer, num_threads, n); + } +} + +BENCHMARK(BM_BaselineBuffer)->Arg(1)->Arg(2)->Arg(4); + +static void BM_LockFreeBuffer(benchmark::State &state) +{ + const size_t max_elements = 500; + auto num_threads = state.range(0); + const int n = N / num_threads; + CircularBuffer buffer{max_elements}; + for (auto _ : state) + { + RunSimulation(buffer, num_threads, n); + } +} + +BENCHMARK(BM_LockFreeBuffer)->Arg(1)->Arg(2)->Arg(4); + +BENCHMARK_MAIN(); diff --git a/sdk/test/common/circular_buffer_range_test.cc b/sdk/test/common/circular_buffer_range_test.cc new file mode 100644 index 0000000000..0ffb619b2b --- /dev/null +++ b/sdk/test/common/circular_buffer_range_test.cc @@ -0,0 +1,56 @@ +#include "src/common/circular_buffer_range.h" + +#include + +#include +using opentelemetry::sdk::common::CircularBufferRange; + +TEST(CircularBufferRangeTest, ForEach) +{ + int array1[] = {1, 2, 3, 4}; + int array2[] = {5, 6, 7}; + CircularBufferRange range{array1, array2}; + + int x = 0; + range.ForEach([&](int y) { + EXPECT_EQ(++x, y); + return true; + }); + EXPECT_EQ(x, 7); +} + +TEST(CircularBufferRangeTest, ForEachWithExit) +{ + int array1[] = {1, 2, 3, 4}; + int array2[] = {5, 6, 7}; + CircularBufferRange range{array1, array2}; + + int x = 0; + range.ForEach([&](int y) { + EXPECT_EQ(++x, y); + return false; + }); + EXPECT_EQ(x, 1); + + x = 0; + range.ForEach([&](int y) { + EXPECT_EQ(++x, y); + return y != 5; + }); + EXPECT_EQ(x, 5); +} + +TEST(CircularBufferRangeTest, Conversion) +{ + int array1[] = {1, 2, 3, 4}; + int array2[] = {5, 6, 7}; + CircularBufferRange range{array1, array2}; + + CircularBufferRange range2{range}; + int x = 0; + range2.ForEach([&](int y) { + EXPECT_EQ(++x, y); + return true; + }); + EXPECT_EQ(x, 7); +} diff --git a/sdk/test/common/circular_buffer_test.cc b/sdk/test/common/circular_buffer_test.cc new file mode 100644 index 0000000000..7b5fc1c0f2 --- /dev/null +++ b/sdk/test/common/circular_buffer_test.cc @@ -0,0 +1,158 @@ +#include "src/common/circular_buffer.h" + +#include +#include +#include + +#include +using opentelemetry::sdk::common::AtomicUniquePtr; +using opentelemetry::sdk::common::CircularBuffer; +using opentelemetry::sdk::common::CircularBufferRange; + +static thread_local std::mt19937 RandomNumberGenerator{std::random_device{}()}; + +static void GenerateRandomNumbers(CircularBuffer &buffer, + std::vector &numbers, + int n) +{ + for (int i = 0; i < n; ++i) + { + auto value = static_cast(RandomNumberGenerator()); + std::unique_ptr x{new uint32_t{value}}; + if (buffer.Add(x)) + { + numbers.push_back(value); + } + } +} + +static void RunNumberProducers(CircularBuffer &buffer, + std::vector &numbers, + int num_threads, + int n) +{ + std::vector> thread_numbers(num_threads); + std::vector threads(num_threads); + for (int thread_index = 0; thread_index < num_threads; ++thread_index) + { + threads[thread_index] = std::thread{GenerateRandomNumbers, std::ref(buffer), + std::ref(thread_numbers[thread_index]), n}; + } + for (auto &thread : threads) + { + thread.join(); + } + for (int thread_index = 0; thread_index < num_threads; ++thread_index) + { + numbers.insert(numbers.end(), thread_numbers[thread_index].begin(), + thread_numbers[thread_index].end()); + } +} + +void RunNumberConsumer(CircularBuffer &buffer, + std::atomic &exit, + std::vector &numbers) +{ + while (true) + { + auto allotment = buffer.Peek(); + if (exit && allotment.empty()) + { + return; + } + auto n = std::uniform_int_distribution{0, allotment.size()}(RandomNumberGenerator); + buffer.Consume( + n, [&](CircularBufferRange> range) noexcept { + assert(range.size() == n); + range.ForEach([&](AtomicUniquePtr & ptr) noexcept { + assert(!ptr.IsNull()); + numbers.push_back(*ptr); + ptr.Reset(); + return true; + }); + }); + } +} + +TEST(CircularBufferTest, Add) +{ + CircularBuffer buffer{10}; + + std::unique_ptr x{new int{11}}; + EXPECT_TRUE(buffer.Add(x)); + EXPECT_EQ(x, nullptr); + auto range = buffer.Peek(); + EXPECT_EQ(range.size(), 1); + range.ForEach([](const AtomicUniquePtr &y) { + EXPECT_EQ(*y, 11); + return true; + }); +} + +TEST(CircularBufferTest, Clear) +{ + CircularBuffer buffer{10}; + + std::unique_ptr x{new int{11}}; + EXPECT_TRUE(buffer.Add(x)); + EXPECT_EQ(x, nullptr); + buffer.Clear(); + EXPECT_TRUE(buffer.empty()); +} + +TEST(CircularBufferTest, AddOnFull) +{ + CircularBuffer buffer{10}; + for (int i = 0; i < static_cast(buffer.max_size()); ++i) + { + std::unique_ptr x{new int{i}}; + EXPECT_TRUE(buffer.Add(x)); + } + std::unique_ptr x{new int{33}}; + EXPECT_FALSE(buffer.Add(x)); + EXPECT_NE(x, nullptr); + EXPECT_EQ(*x, 33); +} + +TEST(CircularBufferTest, Consume) +{ + CircularBuffer buffer{10}; + for (int i = 0; i < static_cast(buffer.max_size()); ++i) + { + std::unique_ptr x{new int{i}}; + EXPECT_TRUE(buffer.Add(x)); + } + int count = 0; + buffer.Consume( + 5, [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + EXPECT_EQ(*ptr, count++); + ptr.Reset(); + return true; + }); + }); + EXPECT_EQ(count, 5); +} + +TEST(CircularBufferTest, Simulation) +{ + const int num_producer_threads = 4; + const int n = 25000; + for (size_t max_size : {1, 2, 10, 50, 100, 1000}) + { + CircularBuffer buffer{max_size}; + std::vector producer_numbers; + std::vector consumer_numbers; + auto producers = std::thread{RunNumberProducers, std::ref(buffer), std::ref(producer_numbers), + num_producer_threads, n}; + std::atomic exit{false}; + auto consumer = std::thread{RunNumberConsumer, std::ref(buffer), std::ref(exit), + std::ref(consumer_numbers)}; + producers.join(); + exit = true; + consumer.join(); + std::sort(producer_numbers.begin(), producer_numbers.end()); + std::sort(consumer_numbers.begin(), consumer_numbers.end()); + EXPECT_EQ(producer_numbers, consumer_numbers); + } +}