From cdd5ddee34c0a6c0b5c9aca59ec60f7cd624e5e1 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Tue, 17 Mar 2020 16:05:52 -0700 Subject: [PATCH 01/12] Add AtomicUniquePtr --- sdk/src/common/BUILD | 26 ++++++++ sdk/src/common/atomic_unique_ptr.h | 77 +++++++++++++++++++++++ sdk/test/common/BUILD | 11 ++++ sdk/test/common/atomic_unique_ptr_test.cc | 38 +++++++++++ 4 files changed, 152 insertions(+) create mode 100644 sdk/src/common/BUILD create mode 100644 sdk/src/common/atomic_unique_ptr.h create mode 100644 sdk/test/common/BUILD create mode 100644 sdk/test/common/atomic_unique_ptr_test.cc diff --git a/sdk/src/common/BUILD b/sdk/src/common/BUILD new file mode 100644 index 0000000000..c2f5c8e210 --- /dev/null +++ b/sdk/src/common/BUILD @@ -0,0 +1,26 @@ +# Copyright 2020, OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +package(default_visibility = ["//visibility:public"]) + +cc_library( + name = "atomic_unique_ptr", + hdrs = [ + "atomic_unique_ptr.h", + ], + include_prefix = "src/common", + deps = [ + "//api", + ] +) diff --git a/sdk/src/common/atomic_unique_ptr.h b/sdk/src/common/atomic_unique_ptr.h new file mode 100644 index 0000000000..7d1c43a692 --- /dev/null +++ b/sdk/src/common/atomic_unique_ptr.h @@ -0,0 +1,77 @@ +#pragma once + +#include +#include + +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk { +namespace common { +/** + * An owning pointer similar to std::unique_ptr but with methods for atomic + * operations. + */ +template +class AtomicUniquePtr { + public: + AtomicUniquePtr() noexcept = default; + + ~AtomicUniquePtr() noexcept { Reset(); } + + T& operator*() const noexcept { return *Get(); } + + T* operator->() const noexcept { return Get(); } + + /** + * @return the underly pointer managed. + */ + T* Get() const noexcept { return ptr_; } + + /** + * @return true if the pointer is null + */ + bool IsNull() const noexcept { return ptr_ == nullptr; } + + /** + * Atomically swap the pointer only if it's null. + * @param owner the pointer to swap with + * @return true if the swap was successful + */ + bool SwapIfNull(std::unique_ptr& owner) noexcept { + auto ptr = owner.get(); + T* expected = nullptr; + auto was_successful = ptr_.compare_exchange_weak( + expected, ptr, std::memory_order_release, std::memory_order_relaxed); + if (was_successful) { + owner.release(); + return true; + } + return false; + } + + /** + * Atomically swap the pointer with another. + * @param ptr the pointer to swap with + */ + void Swap(std::unique_ptr& ptr) noexcept { + ptr.reset(ptr_.exchange(ptr.release())); + } + + /** + * Set the pointer to a new value and delete the current value if non-null. + * @param ptr the new pointer value to set + */ + void Reset(T* ptr = nullptr) noexcept { + ptr = ptr_.exchange(ptr); + if (ptr != nullptr) { + delete ptr; + } + } + + private: + std::atomic ptr_{nullptr}; +}; +} // namespace common +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/common/BUILD b/sdk/test/common/BUILD new file mode 100644 index 0000000000..0ab35aab7f --- /dev/null +++ b/sdk/test/common/BUILD @@ -0,0 +1,11 @@ +cc_test( + name = "atomic_unique_ptr_test", + srcs = [ + "atomic_unique_ptr_test.cc", + ], + deps = [ + "//sdk/src/common:atomic_unique_ptr", + "@com_google_googletest//:gtest_main", + ], +) + diff --git a/sdk/test/common/atomic_unique_ptr_test.cc b/sdk/test/common/atomic_unique_ptr_test.cc new file mode 100644 index 0000000000..8c92157ad1 --- /dev/null +++ b/sdk/test/common/atomic_unique_ptr_test.cc @@ -0,0 +1,38 @@ +#include "src/common/atomic_unique_ptr.h" + +#include +using opentelemetry::sdk::common::AtomicUniquePtr; + +TEST(AtomicUniquePtrTest, SwapIfNullWithNull) { + AtomicUniquePtr ptr; + EXPECT_TRUE(ptr.IsNull()); + + std::unique_ptr x{new int{33}}; + EXPECT_TRUE(ptr.SwapIfNull(x)); + EXPECT_EQ(x, nullptr); +} + +TEST(AtomicUniquePtrTest, SwapIfNullWithNonNull) +{ + AtomicUniquePtr ptr; + ptr.Reset(new int{11}); + std::unique_ptr x{new int{33}}; + EXPECT_TRUE(!ptr.SwapIfNull(x)); + EXPECT_NE(x, nullptr); + EXPECT_EQ(*x, 33); + EXPECT_EQ(*ptr, 11); +} + +TEST(AtomicUniquePtrTest, Swap) +{ + AtomicUniquePtr ptr; + EXPECT_TRUE(ptr.IsNull()); + + ptr.Reset(new int{11}); + std::unique_ptr x{new int{33}}; + ptr.Swap(x); + EXPECT_FALSE(ptr.IsNull()); + EXPECT_NE(x, nullptr); + EXPECT_EQ(*x, 11); + EXPECT_EQ(*ptr, 33); +} From c8fdee4a2686a5a121e5b12f589e6e53acfefc25 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Tue, 17 Mar 2020 22:11:41 -0700 Subject: [PATCH 02/12] Add CircularBuffer --- sdk/src/common/BUILD | 24 +++ sdk/src/common/circular_buffer.h | 174 ++++++++++++++++++ sdk/src/common/circular_buffer_range.h | 81 ++++++++ sdk/test/common/BUILD | 21 +++ sdk/test/common/circular_buffer_range_test.cc | 56 ++++++ sdk/test/common/circular_buffer_test.cc | 145 +++++++++++++++ 6 files changed, 501 insertions(+) create mode 100644 sdk/src/common/circular_buffer.h create mode 100644 sdk/src/common/circular_buffer_range.h create mode 100644 sdk/test/common/circular_buffer_range_test.cc create mode 100644 sdk/test/common/circular_buffer_test.cc diff --git a/sdk/src/common/BUILD b/sdk/src/common/BUILD index c2f5c8e210..3e9ed0c875 100644 --- a/sdk/src/common/BUILD +++ b/sdk/src/common/BUILD @@ -24,3 +24,27 @@ cc_library( "//api", ] ) + +cc_library( + name = "circular_buffer_range", + hdrs = [ + "circular_buffer_range.h", + ], + include_prefix = "src/common", + deps = [ + "//api", + ] +) + +cc_library( + name = "circular_buffer", + hdrs = [ + "circular_buffer.h", + ], + include_prefix = "src/common", + deps = [ + "//api", + ":atomic_unique_ptr", + ":circular_buffer_range", + ] +) diff --git a/sdk/src/common/circular_buffer.h b/sdk/src/common/circular_buffer.h new file mode 100644 index 0000000000..8820679d97 --- /dev/null +++ b/sdk/src/common/circular_buffer.h @@ -0,0 +1,174 @@ +#pragma once + +#include +#include +#include + +#include "opentelemetry/version.h" +#include "src/common/atomic_unique_ptr.h" +#include "src/common/circular_buffer_range.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk { +namespace common { +/* + * A lock-free circular buffer that supports multiple concurrent producers + * and a single consumer. + */ +template +class CircularBuffer { + public: + explicit CircularBuffer(size_t max_size) + : data_{new AtomicUniquePtr[max_size + 1]}, capacity_{max_size + 1} + {} + + /** + * @return a range of the elements in the circular buffer + * + * Note: This method must only be called from the consumer thread. + */ + CircularBufferRange> Peek() const noexcept + { + return const_cast(this)->PeekImpl(); + } + + /** + * Consume elements from the circular buffer's tail. + * @param n the number of elements to consume + * @param callback the callback to invoke with a AtomicUniquePtr to each + * consumed element. + * + * Note: The callback must set the passed AtomicUniquePtr to null. + * + * Note: This method must only be called from the consumer thread. + */ + template + void Consume(size_t n, Callback callback) noexcept { + assert(n <= static_cast(head_ - tail_)); + auto range = PeekImpl().Take(n); + static_assert(noexcept(callback(range)), "callback not allowed to throw"); + tail_ += n; + callback(range); + } + + /** + * Consume elements from the circular buffer's tail. + * @param n the number of elements to consume + * + * Note: This method must only be called from the consumer thread. + */ + void Consume(size_t n) noexcept { + Consume( + n, [](CircularBufferRange> & range) noexcept { + range.ForEach([](AtomicUniquePtr & ptr) noexcept { + ptr.Reset(); + return true; + }); + }); + } + + /** + * Adds an element into the circular buffer. + * @param ptr a pointer to the element to add + * @return true if the element was successfully added; false, otherwise. + */ + bool Add(std::unique_ptr& ptr) noexcept { + while (true) { + uint64_t tail = tail_; + uint64_t head = head_; + + // The circular buffer is full, so return false. + if (head - tail >= capacity_ - 1) + { + return false; + } + + uint64_t head_index = head % capacity_; + if (data_[head_index].SwapIfNull(ptr)) { + auto new_head = head + 1; + auto expected_head = head; + if (head_.compare_exchange_weak(expected_head, new_head, + std::memory_order_release, + std::memory_order_relaxed)) { + // free the swapped out value + ptr.reset(); + + return true; + } + + // If we reached this point (unlikely), it means that between the last + // iteration elements were added and then consumed from the circular + // buffer, so we undo the swap and attempt to add again. + data_[head_index].Swap(ptr); + } + } + return true; + } + + /** + * Clear the circular buffer. + * + * Note: This method must only be called from the consumer thread. + */ + void Clear() noexcept { Consume(size()); } + + /** + * @return the maximum number of bytes that can be stored in the buffer. + */ + size_t max_size() const noexcept { return capacity_ - 1; } + + /** + * @return true if the buffer is empty. + */ + bool empty() const noexcept { return head_ == tail_; } + + /** + * @return the number of bytes stored in the circular buffer. + * + * Note: this method will only return a correct snapshot of the size if called + * from the consumer thread. + */ + size_t size() const noexcept { + uint64_t tail = tail_; + uint64_t head = head_; + assert(tail <= head); + return head - tail; + } + + /** + * @return the number of elements consumed from the circular buffer. + */ + uint64_t consumption_count() const noexcept { return tail_; } + + /** + * @return the number of elements added to the circular buffer. + */ + uint64_t production_count() const noexcept { return head_; } + + private: + std::unique_ptr[]> data_; + size_t capacity_; + std::atomic head_{0}; + std::atomic tail_{0}; + + CircularBufferRange> PeekImpl() noexcept + { + uint64_t tail_index = tail_ % capacity_; + uint64_t head_index = head_ % capacity_; + if (head_index == tail_index) + { + return {}; + } + auto data = data_.get(); + if (tail_index < head_index) + { + return CircularBufferRange>{ + nostd::span>{data + tail_index, head_index - tail_index}}; + } + return {nostd::span>{data + tail_index, capacity_ - tail_index}, + nostd::span>{data, head_index}}; + } +}; +} // namespace common +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/common/circular_buffer_range.h b/sdk/src/common/circular_buffer_range.h new file mode 100644 index 0000000000..d195b4db88 --- /dev/null +++ b/sdk/src/common/circular_buffer_range.h @@ -0,0 +1,81 @@ +#pragma once + +#include +#include +#include +#include + +#include "opentelemetry/nostd/span.h" +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace sdk { +namespace common { +/** + * A non-owning view into a range of elements in a circular buffer. + */ +template +class CircularBufferRange { + public: + CircularBufferRange() noexcept = default; + + explicit CircularBufferRange(nostd::span first) noexcept : first_{first} {} + + CircularBufferRange(nostd::span first, nostd::span second) noexcept + : first_{first}, second_{second} {} + + operator CircularBufferRange() const noexcept { return {first_, second_}; } + + /** + * Iterate over the elements in the range. + * @param callback the callback to call for each element + * @return true if we iterated over all elements + */ + template + bool ForEach(Callback callback) const + noexcept(noexcept(std::declval()(std::declval()))) { + for (auto& value : first_) { + if (!callback(value)) { + return false; + } + } + for (auto& value : second_) { + if (!callback(value)) { + return false; + } + } + return true; + } + + /** + * @return the number of elements in the range + */ + size_t size() const noexcept { return first_.size() + second_.size(); } + + /** + * @return true if the range is empty + */ + bool empty() const noexcept { return first_.empty(); } + + /** + * Return a subrange taken from the start of this range. + * @param n the number of element to take in the subrange + * @return a subrange of the first n elements in this range + */ + CircularBufferRange Take(size_t n) const noexcept + { + assert(n <= size()); + if (first_.size() >= n) + { + return CircularBufferRange{nostd::span{first_.data(), n}}; + } + return {first_, nostd::span{second_.data(), n - first_.size()}}; + } + + private: + nostd::span first_; + nostd::span second_; +}; +} // namespace common +} // namespace sdk +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/common/BUILD b/sdk/test/common/BUILD index 0ab35aab7f..af8291e075 100644 --- a/sdk/test/common/BUILD +++ b/sdk/test/common/BUILD @@ -9,3 +9,24 @@ cc_test( ], ) +cc_test( + name = "circular_buffer_range_test", + srcs = [ + "circular_buffer_range_test.cc", + ], + deps = [ + "//sdk/src/common:circular_buffer_range", + "@com_google_googletest//:gtest_main", + ], +) + +cc_test( + name = "circular_buffer_test", + srcs = [ + "circular_buffer_test.cc", + ], + deps = [ + "//sdk/src/common:circular_buffer", + "@com_google_googletest//:gtest_main", + ], +) diff --git a/sdk/test/common/circular_buffer_range_test.cc b/sdk/test/common/circular_buffer_range_test.cc new file mode 100644 index 0000000000..0ffb619b2b --- /dev/null +++ b/sdk/test/common/circular_buffer_range_test.cc @@ -0,0 +1,56 @@ +#include "src/common/circular_buffer_range.h" + +#include + +#include +using opentelemetry::sdk::common::CircularBufferRange; + +TEST(CircularBufferRangeTest, ForEach) +{ + int array1[] = {1, 2, 3, 4}; + int array2[] = {5, 6, 7}; + CircularBufferRange range{array1, array2}; + + int x = 0; + range.ForEach([&](int y) { + EXPECT_EQ(++x, y); + return true; + }); + EXPECT_EQ(x, 7); +} + +TEST(CircularBufferRangeTest, ForEachWithExit) +{ + int array1[] = {1, 2, 3, 4}; + int array2[] = {5, 6, 7}; + CircularBufferRange range{array1, array2}; + + int x = 0; + range.ForEach([&](int y) { + EXPECT_EQ(++x, y); + return false; + }); + EXPECT_EQ(x, 1); + + x = 0; + range.ForEach([&](int y) { + EXPECT_EQ(++x, y); + return y != 5; + }); + EXPECT_EQ(x, 5); +} + +TEST(CircularBufferRangeTest, Conversion) +{ + int array1[] = {1, 2, 3, 4}; + int array2[] = {5, 6, 7}; + CircularBufferRange range{array1, array2}; + + CircularBufferRange range2{range}; + int x = 0; + range2.ForEach([&](int y) { + EXPECT_EQ(++x, y); + return true; + }); + EXPECT_EQ(x, 7); +} diff --git a/sdk/test/common/circular_buffer_test.cc b/sdk/test/common/circular_buffer_test.cc new file mode 100644 index 0000000000..973f43f44f --- /dev/null +++ b/sdk/test/common/circular_buffer_test.cc @@ -0,0 +1,145 @@ +#include "src/common/circular_buffer.h" + +#include +#include +#include + +#include +using opentelemetry::sdk::common::AtomicUniquePtr; +using opentelemetry::sdk::common::CircularBuffer; +using opentelemetry::sdk::common::CircularBufferRange; + +static thread_local std::mt19937 RandomNumberGenerator{std::random_device{}()}; + +static void GenerateRandomNumbers(CircularBuffer& buffer, + std::vector& numbers, int n) { + for (int i = 0; i < n; ++i) { + auto value = static_cast(RandomNumberGenerator()); + std::unique_ptr x{new uint32_t{value}}; + if (buffer.Add(x)) { + numbers.push_back(value); + } + } +} + +static void RunNumberProducers(CircularBuffer& buffer, + std::vector& numbers, int num_threads, + int n) { + std::vector> thread_numbers(num_threads); + std::vector threads(num_threads); + for (int thread_index = 0; thread_index < num_threads; ++thread_index) { + threads[thread_index] = + std::thread{GenerateRandomNumbers, std::ref(buffer), + std::ref(thread_numbers[thread_index]), n}; + } + for (auto& thread : threads) { + thread.join(); + } + for (int thread_index = 0; thread_index < num_threads; ++thread_index) { + numbers.insert(numbers.end(), thread_numbers[thread_index].begin(), + thread_numbers[thread_index].end()); + } +} + +void RunNumberConsumer(CircularBuffer& buffer, + std::atomic& exit, + std::vector& numbers) { + while (true) { + auto allotment = buffer.Peek(); + if (exit && allotment.empty()) { + return; + } + auto n = std::uniform_int_distribution{ + 0, allotment.size()}(RandomNumberGenerator); + buffer.Consume( + n, [&](CircularBufferRange> range) noexcept { + assert(range.size() == n); + range.ForEach([&](AtomicUniquePtr & ptr) noexcept { + assert(!ptr.IsNull()); + numbers.push_back(*ptr); + ptr.Reset(); + return true; + }); + }); + } +} + +TEST(CircularBufferTest, Add) +{ + CircularBuffer buffer{10}; + + std::unique_ptr x{new int{11}}; + EXPECT_TRUE(buffer.Add(x)); + EXPECT_EQ(x, nullptr); + auto range = buffer.Peek(); + EXPECT_EQ(range.size(), 1); + range.ForEach([](const AtomicUniquePtr &y) { + EXPECT_EQ(*y, 11); + return true; + }); +} + +TEST(CircularBufferTest, Clear) +{ + CircularBuffer buffer{10}; + + std::unique_ptr x{new int{11}}; + EXPECT_TRUE(buffer.Add(x)); + EXPECT_EQ(x, nullptr); + buffer.Clear(); + EXPECT_TRUE(buffer.empty()); +} + +TEST(CircularBufferTest, AddOnFull) +{ + CircularBuffer buffer{10}; + for (int i = 0; i < static_cast(buffer.max_size()); ++i) + { + std::unique_ptr x{new int{i}}; + EXPECT_TRUE(buffer.Add(x)); + } + std::unique_ptr x{new int{33}}; + EXPECT_FALSE(buffer.Add(x)); + EXPECT_NE(x, nullptr); + EXPECT_EQ(*x, 33); +} + +TEST(CircularBufferTest, Consume) { + CircularBuffer buffer{10}; + for (int i = 0; i < static_cast(buffer.max_size()); ++i) { + std::unique_ptr x{new int{i}}; + EXPECT_TRUE(buffer.Add(x)); + } + int count = 0; + buffer.Consume( + 5, [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + EXPECT_EQ(*ptr, count++); + ptr.Reset(); + return true; + }); + }); + EXPECT_EQ(count, 5); +} + +TEST(CircularBufferTest, Simulation) { + const int num_producer_threads = 4; + const int n = 25000; + for (size_t max_size : {1, 2, 10, 50, 100, 1000}) { + CircularBuffer buffer{max_size}; + std::vector producer_numbers; + std::vector consumer_numbers; + auto producers = + std::thread{RunNumberProducers, std::ref(buffer), + std::ref(producer_numbers), num_producer_threads, n}; + std::atomic exit{false}; + auto consumer = std::thread{RunNumberConsumer, std::ref(buffer), + std::ref(exit), std::ref(consumer_numbers)}; + producers.join(); + exit = true; + consumer.join(); + std::sort(producer_numbers.begin(), producer_numbers.end()); + std::sort(consumer_numbers.begin(), consumer_numbers.end()); + EXPECT_EQ(producer_numbers, consumer_numbers); + } +} From 92fdb17a8ecc3610dd081fdd524aabb9f6b39eca Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 18 Mar 2020 16:06:10 -0700 Subject: [PATCH 03/12] Add circular buffer benchmark --- sdk/test/common/BUILD | 21 +++ sdk/test/common/baseline_circular_buffer.h | 75 ++++++++++ sdk/test/common/circular_buffer_benchmark.cc | 144 +++++++++++++++++++ 3 files changed, 240 insertions(+) create mode 100644 sdk/test/common/baseline_circular_buffer.h create mode 100644 sdk/test/common/circular_buffer_benchmark.cc diff --git a/sdk/test/common/BUILD b/sdk/test/common/BUILD index af8291e075..b67359832a 100644 --- a/sdk/test/common/BUILD +++ b/sdk/test/common/BUILD @@ -1,3 +1,5 @@ +load("//bazel:otel_cc_benchmark.bzl", "otel_cc_benchmark") + cc_test( name = "atomic_unique_ptr_test", srcs = [ @@ -30,3 +32,22 @@ cc_test( "@com_google_googletest//:gtest_main", ], ) + +cc_library( + name = "baseline_circular_buffer", + hdrs = [ + "baseline_circular_buffer.h", + ], + deps = [ + "//api", + ], +) + +otel_cc_benchmark( + name = "circular_buffer_benchmark", + srcs = ["circular_buffer_benchmark.cc"], + deps = [ + "//sdk/src/common:circular_buffer", + ":baseline_circular_buffer", + ], +) diff --git a/sdk/test/common/baseline_circular_buffer.h b/sdk/test/common/baseline_circular_buffer.h new file mode 100644 index 0000000000..082436614c --- /dev/null +++ b/sdk/test/common/baseline_circular_buffer.h @@ -0,0 +1,75 @@ +#pragma once + +#include +#include +#include +#include + +#include "opentelemetry/version.h" + +OPENTELEMETRY_BEGIN_NAMESPACE +namespace testing { +/** + * A locking circular buffer. + * + * Used as a baseline in benchmarking. + */ +template +class BaselineCircularBuffer { + public: + explicit BaselineCircularBuffer(size_t max_size) : data_{max_size} {} + + /** + * Add an element to the circular buffer. + * @param element the element to add + * @return true if the element was added successfully + */ + bool Add(std::unique_ptr& element) noexcept { + return this->Add(std::move(element)); + } + + bool Add(std::unique_ptr&& element) noexcept { + std::lock_guard lock_gaurd{mutex_}; + if (tail_ + data_.size() == head_) { + return false; + } + data_[head_ % data_.size()] = std::move(element); + head_ += 1; + return true; + } + + /** + * Consume elements in the circular buffer. + * @param f the callback to call for each element + */ + template + void Consume(F f) noexcept { + std::lock_guard lock_guard{mutex_}; + if (head_ == tail_) { + return; + } + auto tail_index = tail_ % data_.size(); + auto head_index = head_ % data_.size(); + if (tail_index < head_index) { + for (auto i = tail_index; i < head_index; ++i) { + f(std::move(data_[i])); + } + } else { + for (auto i = tail_index; i < data_.size(); ++i) { + f(std::move(data_[i])); + } + for (auto i = 0ull; i < head_index; ++i) { + f(std::move(data_[i])); + } + } + tail_ = head_; + } + + private: + std::mutex mutex_; + uint64_t head_{0}; + uint64_t tail_{0}; + std::vector> data_; +}; +} // namespace testing +OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/common/circular_buffer_benchmark.cc b/sdk/test/common/circular_buffer_benchmark.cc new file mode 100644 index 0000000000..1d8a1f700e --- /dev/null +++ b/sdk/test/common/circular_buffer_benchmark.cc @@ -0,0 +1,144 @@ +#include "benchmark/benchmark.h" + +#include +#include +#include +#include +#include +#include +#include + +#include "src/common/circular_buffer.h" +#include "sdk/test/common/baseline_circular_buffer.h" +using opentelemetry::sdk::common::AtomicUniquePtr; +using opentelemetry::sdk::common::CircularBuffer; +using opentelemetry::sdk::common::CircularBufferRange; +using opentelemetry::testing::BaselineCircularBuffer; + +const int N = 10000; + +//-------------------------------------------------------------------------------------------------- +// ConsumeBufferNumbers +//-------------------------------------------------------------------------------------------------- +uint64_t ConsumeBufferNumbers( + BaselineCircularBuffer& buffer) noexcept { + uint64_t result = 0; + buffer.Consume([&](std::unique_ptr&& x) { + result += *x; + x.reset(); + }); + return result; +} + +uint64_t ConsumeBufferNumbers(CircularBuffer& buffer) noexcept { + uint64_t result = 0; + buffer.Consume( + buffer.size(), [&](CircularBufferRange> & + range) noexcept { + range.ForEach([&](AtomicUniquePtr & ptr) noexcept { + result += *ptr; + ptr.Reset(); + return true; + }); + }); + return result; +} + +//-------------------------------------------------------------------------------------------------- +// GenerateNumbersForThread +//-------------------------------------------------------------------------------------------------- +template +static void GenerateNumbersForThread(Buffer& buffer, int n, + std::atomic& sum) noexcept { + thread_local std::mt19937_64 random_number_generator{std::random_device{}()}; + for (int i = 0; i < n; ++i) { + auto x = random_number_generator(); + std::unique_ptr element{new uint64_t{x}}; + if (buffer.Add(element)) { + sum += x; + } + } +} + +//-------------------------------------------------------------------------------------------------- +// GenerateNumbers +//-------------------------------------------------------------------------------------------------- +template +static uint64_t GenerateNumbers(Buffer& buffer, int num_threads, + int n) noexcept { + std::atomic sum{0}; + std::vector threads(num_threads); + for (auto& thread : threads) { + thread = std::thread{GenerateNumbersForThread, std::ref(buffer), n, + std::ref(sum)}; + } + for (auto& thread : threads) { + thread.join(); + } + return sum; +} + +//-------------------------------------------------------------------------------------------------- +// ConsumeNumbers +//-------------------------------------------------------------------------------------------------- +template +static void ConsumeNumbers(Buffer& buffer, uint64_t& sum, + std::atomic& finished) noexcept { + while (!finished) { + sum += ConsumeBufferNumbers(buffer); + } + sum += ConsumeBufferNumbers(buffer); +} + +//-------------------------------------------------------------------------------------------------- +// RunSimulation +//-------------------------------------------------------------------------------------------------- +template +static void RunSimulation(Buffer& buffer, int num_threads, int n) noexcept { + std::atomic finished{false}; + uint64_t consumer_sum{0}; + std::thread consumer_thread{ConsumeNumbers, std::ref(buffer), + std::ref(consumer_sum), std::ref(finished)}; + uint64_t producer_sum = GenerateNumbers(buffer, num_threads, n); + finished = true; + consumer_thread.join(); + if (consumer_sum != producer_sum) { + std::cerr << "Sumulation failed: consumer_sum != producer_sum\n"; + std::terminate(); + } +} + +//-------------------------------------------------------------------------------------------------- +// BM_BaselineBuffer +//-------------------------------------------------------------------------------------------------- +static void BM_BaselineBuffer(benchmark::State& state) { + const size_t max_elements = 500; + auto num_threads = state.range(0); + const int n = N / num_threads; + BaselineCircularBuffer buffer{max_elements}; + for (auto _ : state) { + RunSimulation(buffer, num_threads, n); + } +} + +BENCHMARK(BM_BaselineBuffer)->Arg(1)->Arg(2)->Arg(4); + +//-------------------------------------------------------------------------------------------------- +// BM_LockFreeBuffer +//-------------------------------------------------------------------------------------------------- +static void BM_LockFreeBuffer(benchmark::State& state) { + const size_t max_elements = 500; + auto num_threads = state.range(0); + const int n = N / num_threads; + CircularBuffer buffer{max_elements}; + for (auto _ : state) { + RunSimulation(buffer, num_threads, n); + } +} + +BENCHMARK(BM_LockFreeBuffer)->Arg(1)->Arg(2)->Arg(4); + +//-------------------------------------------------------------------------------------------------- +// BENCHMARK_MAIN +//-------------------------------------------------------------------------------------------------- +BENCHMARK_MAIN(); From 5d61e93ac4d89995d6fc668ee0757376d96fd3bb Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 18 Mar 2020 16:07:55 -0700 Subject: [PATCH 04/12] Reformat --- sdk/src/common/BUILD | 52 ++++----- sdk/src/common/atomic_unique_ptr.h | 49 ++++---- sdk/src/common/circular_buffer.h | 80 +++++++------ sdk/src/common/circular_buffer_range.h | 35 +++--- sdk/test/common/BUILD | 2 +- sdk/test/common/atomic_unique_ptr_test.cc | 3 +- sdk/test/common/baseline_circular_buffer.h | 44 ++++--- sdk/test/common/circular_buffer_benchmark.cc | 76 ++++++------ sdk/test/common/circular_buffer_test.cc | 117 ++++++++++--------- 9 files changed, 256 insertions(+), 202 deletions(-) diff --git a/sdk/src/common/BUILD b/sdk/src/common/BUILD index 3e9ed0c875..e4e84b31eb 100644 --- a/sdk/src/common/BUILD +++ b/sdk/src/common/BUILD @@ -15,36 +15,36 @@ package(default_visibility = ["//visibility:public"]) cc_library( - name = "atomic_unique_ptr", - hdrs = [ - "atomic_unique_ptr.h", - ], - include_prefix = "src/common", - deps = [ - "//api", - ] + name = "atomic_unique_ptr", + hdrs = [ + "atomic_unique_ptr.h", + ], + include_prefix = "src/common", + deps = [ + "//api", + ], ) cc_library( - name = "circular_buffer_range", - hdrs = [ - "circular_buffer_range.h", - ], - include_prefix = "src/common", - deps = [ - "//api", - ] + name = "circular_buffer_range", + hdrs = [ + "circular_buffer_range.h", + ], + include_prefix = "src/common", + deps = [ + "//api", + ], ) cc_library( - name = "circular_buffer", - hdrs = [ - "circular_buffer.h", - ], - include_prefix = "src/common", - deps = [ - "//api", - ":atomic_unique_ptr", - ":circular_buffer_range", - ] + name = "circular_buffer", + hdrs = [ + "circular_buffer.h", + ], + include_prefix = "src/common", + deps = [ + ":atomic_unique_ptr", + ":circular_buffer_range", + "//api", + ], ) diff --git a/sdk/src/common/atomic_unique_ptr.h b/sdk/src/common/atomic_unique_ptr.h index 7d1c43a692..2d57a2a2ae 100644 --- a/sdk/src/common/atomic_unique_ptr.h +++ b/sdk/src/common/atomic_unique_ptr.h @@ -6,27 +6,30 @@ #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE -namespace sdk { -namespace common { +namespace sdk +{ +namespace common +{ /** * An owning pointer similar to std::unique_ptr but with methods for atomic * operations. */ template -class AtomicUniquePtr { - public: +class AtomicUniquePtr +{ +public: AtomicUniquePtr() noexcept = default; ~AtomicUniquePtr() noexcept { Reset(); } - T& operator*() const noexcept { return *Get(); } + T &operator*() const noexcept { return *Get(); } - T* operator->() const noexcept { return Get(); } + T *operator->() const noexcept { return Get(); } /** * @return the underly pointer managed. */ - T* Get() const noexcept { return ptr_; } + T *Get() const noexcept { return ptr_; } /** * @return true if the pointer is null @@ -38,12 +41,14 @@ class AtomicUniquePtr { * @param owner the pointer to swap with * @return true if the swap was successful */ - bool SwapIfNull(std::unique_ptr& owner) noexcept { - auto ptr = owner.get(); - T* expected = nullptr; - auto was_successful = ptr_.compare_exchange_weak( - expected, ptr, std::memory_order_release, std::memory_order_relaxed); - if (was_successful) { + bool SwapIfNull(std::unique_ptr &owner) noexcept + { + auto ptr = owner.get(); + T *expected = nullptr; + auto was_successful = ptr_.compare_exchange_weak(expected, ptr, std::memory_order_release, + std::memory_order_relaxed); + if (was_successful) + { owner.release(); return true; } @@ -54,24 +59,24 @@ class AtomicUniquePtr { * Atomically swap the pointer with another. * @param ptr the pointer to swap with */ - void Swap(std::unique_ptr& ptr) noexcept { - ptr.reset(ptr_.exchange(ptr.release())); - } + void Swap(std::unique_ptr &ptr) noexcept { ptr.reset(ptr_.exchange(ptr.release())); } /** * Set the pointer to a new value and delete the current value if non-null. * @param ptr the new pointer value to set */ - void Reset(T* ptr = nullptr) noexcept { + void Reset(T *ptr = nullptr) noexcept + { ptr = ptr_.exchange(ptr); - if (ptr != nullptr) { + if (ptr != nullptr) + { delete ptr; } } - private: - std::atomic ptr_{nullptr}; +private: + std::atomic ptr_{nullptr}; }; -} // namespace common -} // namespace sdk +} // namespace common +} // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/common/circular_buffer.h b/sdk/src/common/circular_buffer.h index 8820679d97..9b690669fc 100644 --- a/sdk/src/common/circular_buffer.h +++ b/sdk/src/common/circular_buffer.h @@ -9,27 +9,30 @@ #include "src/common/circular_buffer_range.h" OPENTELEMETRY_BEGIN_NAMESPACE -namespace sdk { -namespace common { +namespace sdk +{ +namespace common +{ /* * A lock-free circular buffer that supports multiple concurrent producers * and a single consumer. */ template -class CircularBuffer { - public: - explicit CircularBuffer(size_t max_size) - : data_{new AtomicUniquePtr[max_size + 1]}, capacity_{max_size + 1} - {} - - /** - * @return a range of the elements in the circular buffer - * - * Note: This method must only be called from the consumer thread. - */ - CircularBufferRange> Peek() const noexcept - { - return const_cast(this)->PeekImpl(); +class CircularBuffer +{ +public: + explicit CircularBuffer(size_t max_size) + : data_{new AtomicUniquePtr[max_size + 1]}, capacity_{max_size + 1} + {} + + /** + * @return a range of the elements in the circular buffer + * + * Note: This method must only be called from the consumer thread. + */ + CircularBufferRange> Peek() const noexcept + { + return const_cast(this)->PeekImpl(); } /** @@ -43,7 +46,8 @@ class CircularBuffer { * Note: This method must only be called from the consumer thread. */ template - void Consume(size_t n, Callback callback) noexcept { + void Consume(size_t n, Callback callback) noexcept + { assert(n <= static_cast(head_ - tail_)); auto range = PeekImpl().Take(n); static_assert(noexcept(callback(range)), "callback not allowed to throw"); @@ -57,14 +61,14 @@ class CircularBuffer { * * Note: This method must only be called from the consumer thread. */ - void Consume(size_t n) noexcept { - Consume( - n, [](CircularBufferRange> & range) noexcept { - range.ForEach([](AtomicUniquePtr & ptr) noexcept { - ptr.Reset(); - return true; - }); - }); + void Consume(size_t n) noexcept + { + Consume(n, [](CircularBufferRange> & range) noexcept { + range.ForEach([](AtomicUniquePtr & ptr) noexcept { + ptr.Reset(); + return true; + }); + }); } /** @@ -72,8 +76,10 @@ class CircularBuffer { * @param ptr a pointer to the element to add * @return true if the element was successfully added; false, otherwise. */ - bool Add(std::unique_ptr& ptr) noexcept { - while (true) { + bool Add(std::unique_ptr &ptr) noexcept + { + while (true) + { uint64_t tail = tail_; uint64_t head = head_; @@ -84,12 +90,13 @@ class CircularBuffer { } uint64_t head_index = head % capacity_; - if (data_[head_index].SwapIfNull(ptr)) { - auto new_head = head + 1; + if (data_[head_index].SwapIfNull(ptr)) + { + auto new_head = head + 1; auto expected_head = head; - if (head_.compare_exchange_weak(expected_head, new_head, - std::memory_order_release, - std::memory_order_relaxed)) { + if (head_.compare_exchange_weak(expected_head, new_head, std::memory_order_release, + std::memory_order_relaxed)) + { // free the swapped out value ptr.reset(); @@ -128,7 +135,8 @@ class CircularBuffer { * Note: this method will only return a correct snapshot of the size if called * from the consumer thread. */ - size_t size() const noexcept { + size_t size() const noexcept + { uint64_t tail = tail_; uint64_t head = head_; assert(tail <= head); @@ -145,7 +153,7 @@ class CircularBuffer { */ uint64_t production_count() const noexcept { return head_; } - private: +private: std::unique_ptr[]> data_; size_t capacity_; std::atomic head_{0}; @@ -169,6 +177,6 @@ class CircularBuffer { nostd::span>{data, head_index}}; } }; -} // namespace common -} // namespace sdk +} // namespace common +} // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/src/common/circular_buffer_range.h b/sdk/src/common/circular_buffer_range.h index d195b4db88..d26026690b 100644 --- a/sdk/src/common/circular_buffer_range.h +++ b/sdk/src/common/circular_buffer_range.h @@ -9,20 +9,24 @@ #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE -namespace sdk { -namespace common { +namespace sdk +{ +namespace common +{ /** * A non-owning view into a range of elements in a circular buffer. */ template -class CircularBufferRange { - public: +class CircularBufferRange +{ +public: CircularBufferRange() noexcept = default; explicit CircularBufferRange(nostd::span first) noexcept : first_{first} {} CircularBufferRange(nostd::span first, nostd::span second) noexcept - : first_{first}, second_{second} {} + : first_{first}, second_{second} + {} operator CircularBufferRange() const noexcept { return {first_, second_}; } @@ -33,14 +37,19 @@ class CircularBufferRange { */ template bool ForEach(Callback callback) const - noexcept(noexcept(std::declval()(std::declval()))) { - for (auto& value : first_) { - if (!callback(value)) { + noexcept(noexcept(std::declval()(std::declval()))) + { + for (auto &value : first_) + { + if (!callback(value)) + { return false; } } - for (auto& value : second_) { - if (!callback(value)) { + for (auto &value : second_) + { + if (!callback(value)) + { return false; } } @@ -72,10 +81,10 @@ class CircularBufferRange { return {first_, nostd::span{second_.data(), n - first_.size()}}; } - private: +private: nostd::span first_; nostd::span second_; }; -} // namespace common -} // namespace sdk +} // namespace common +} // namespace sdk OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/common/BUILD b/sdk/test/common/BUILD index b67359832a..e5351a6da3 100644 --- a/sdk/test/common/BUILD +++ b/sdk/test/common/BUILD @@ -47,7 +47,7 @@ otel_cc_benchmark( name = "circular_buffer_benchmark", srcs = ["circular_buffer_benchmark.cc"], deps = [ - "//sdk/src/common:circular_buffer", ":baseline_circular_buffer", + "//sdk/src/common:circular_buffer", ], ) diff --git a/sdk/test/common/atomic_unique_ptr_test.cc b/sdk/test/common/atomic_unique_ptr_test.cc index 8c92157ad1..f2c75a151f 100644 --- a/sdk/test/common/atomic_unique_ptr_test.cc +++ b/sdk/test/common/atomic_unique_ptr_test.cc @@ -3,7 +3,8 @@ #include using opentelemetry::sdk::common::AtomicUniquePtr; -TEST(AtomicUniquePtrTest, SwapIfNullWithNull) { +TEST(AtomicUniquePtrTest, SwapIfNullWithNull) +{ AtomicUniquePtr ptr; EXPECT_TRUE(ptr.IsNull()); diff --git a/sdk/test/common/baseline_circular_buffer.h b/sdk/test/common/baseline_circular_buffer.h index 082436614c..f3f591ad9b 100644 --- a/sdk/test/common/baseline_circular_buffer.h +++ b/sdk/test/common/baseline_circular_buffer.h @@ -8,15 +8,17 @@ #include "opentelemetry/version.h" OPENTELEMETRY_BEGIN_NAMESPACE -namespace testing { +namespace testing +{ /** * A locking circular buffer. * * Used as a baseline in benchmarking. */ template -class BaselineCircularBuffer { - public: +class BaselineCircularBuffer +{ +public: explicit BaselineCircularBuffer(size_t max_size) : data_{max_size} {} /** @@ -24,13 +26,13 @@ class BaselineCircularBuffer { * @param element the element to add * @return true if the element was added successfully */ - bool Add(std::unique_ptr& element) noexcept { - return this->Add(std::move(element)); - } + bool Add(std::unique_ptr &element) noexcept { return this->Add(std::move(element)); } - bool Add(std::unique_ptr&& element) noexcept { + bool Add(std::unique_ptr &&element) noexcept + { std::lock_guard lock_gaurd{mutex_}; - if (tail_ + data_.size() == head_) { + if (tail_ + data_.size() == head_) + { return false; } data_[head_ % data_.size()] = std::move(element); @@ -43,33 +45,41 @@ class BaselineCircularBuffer { * @param f the callback to call for each element */ template - void Consume(F f) noexcept { + void Consume(F f) noexcept + { std::lock_guard lock_guard{mutex_}; - if (head_ == tail_) { + if (head_ == tail_) + { return; } auto tail_index = tail_ % data_.size(); auto head_index = head_ % data_.size(); - if (tail_index < head_index) { - for (auto i = tail_index; i < head_index; ++i) { + if (tail_index < head_index) + { + for (auto i = tail_index; i < head_index; ++i) + { f(std::move(data_[i])); } - } else { - for (auto i = tail_index; i < data_.size(); ++i) { + } + else + { + for (auto i = tail_index; i < data_.size(); ++i) + { f(std::move(data_[i])); } - for (auto i = 0ull; i < head_index; ++i) { + for (auto i = 0ull; i < head_index; ++i) + { f(std::move(data_[i])); } } tail_ = head_; } - private: +private: std::mutex mutex_; uint64_t head_{0}; uint64_t tail_{0}; std::vector> data_; }; -} // namespace testing +} // namespace testing OPENTELEMETRY_END_NAMESPACE diff --git a/sdk/test/common/circular_buffer_benchmark.cc b/sdk/test/common/circular_buffer_benchmark.cc index 1d8a1f700e..f99f783bda 100644 --- a/sdk/test/common/circular_buffer_benchmark.cc +++ b/sdk/test/common/circular_buffer_benchmark.cc @@ -8,8 +8,8 @@ #include #include -#include "src/common/circular_buffer.h" #include "sdk/test/common/baseline_circular_buffer.h" +#include "src/common/circular_buffer.h" using opentelemetry::sdk::common::AtomicUniquePtr; using opentelemetry::sdk::common::CircularBuffer; using opentelemetry::sdk::common::CircularBufferRange; @@ -20,21 +20,21 @@ const int N = 10000; //-------------------------------------------------------------------------------------------------- // ConsumeBufferNumbers //-------------------------------------------------------------------------------------------------- -uint64_t ConsumeBufferNumbers( - BaselineCircularBuffer& buffer) noexcept { +uint64_t ConsumeBufferNumbers(BaselineCircularBuffer &buffer) noexcept +{ uint64_t result = 0; - buffer.Consume([&](std::unique_ptr&& x) { + buffer.Consume([&](std::unique_ptr &&x) { result += *x; x.reset(); }); return result; } -uint64_t ConsumeBufferNumbers(CircularBuffer& buffer) noexcept { +uint64_t ConsumeBufferNumbers(CircularBuffer &buffer) noexcept +{ uint64_t result = 0; buffer.Consume( - buffer.size(), [&](CircularBufferRange> & - range) noexcept { + buffer.size(), [&](CircularBufferRange> & range) noexcept { range.ForEach([&](AtomicUniquePtr & ptr) noexcept { result += *ptr; ptr.Reset(); @@ -48,13 +48,15 @@ uint64_t ConsumeBufferNumbers(CircularBuffer& buffer) noexcept { // GenerateNumbersForThread //-------------------------------------------------------------------------------------------------- template -static void GenerateNumbersForThread(Buffer& buffer, int n, - std::atomic& sum) noexcept { +static void GenerateNumbersForThread(Buffer &buffer, int n, std::atomic &sum) noexcept +{ thread_local std::mt19937_64 random_number_generator{std::random_device{}()}; - for (int i = 0; i < n; ++i) { + for (int i = 0; i < n; ++i) + { auto x = random_number_generator(); std::unique_ptr element{new uint64_t{x}}; - if (buffer.Add(element)) { + if (buffer.Add(element)) + { sum += x; } } @@ -64,15 +66,16 @@ static void GenerateNumbersForThread(Buffer& buffer, int n, // GenerateNumbers //-------------------------------------------------------------------------------------------------- template -static uint64_t GenerateNumbers(Buffer& buffer, int num_threads, - int n) noexcept { +static uint64_t GenerateNumbers(Buffer &buffer, int num_threads, int n) noexcept +{ std::atomic sum{0}; std::vector threads(num_threads); - for (auto& thread : threads) { - thread = std::thread{GenerateNumbersForThread, std::ref(buffer), n, - std::ref(sum)}; + for (auto &thread : threads) + { + thread = std::thread{GenerateNumbersForThread, std::ref(buffer), n, std::ref(sum)}; } - for (auto& thread : threads) { + for (auto &thread : threads) + { thread.join(); } return sum; @@ -82,9 +85,10 @@ static uint64_t GenerateNumbers(Buffer& buffer, int num_threads, // ConsumeNumbers //-------------------------------------------------------------------------------------------------- template -static void ConsumeNumbers(Buffer& buffer, uint64_t& sum, - std::atomic& finished) noexcept { - while (!finished) { +static void ConsumeNumbers(Buffer &buffer, uint64_t &sum, std::atomic &finished) noexcept +{ + while (!finished) + { sum += ConsumeBufferNumbers(buffer); } sum += ConsumeBufferNumbers(buffer); @@ -94,15 +98,17 @@ static void ConsumeNumbers(Buffer& buffer, uint64_t& sum, // RunSimulation //-------------------------------------------------------------------------------------------------- template -static void RunSimulation(Buffer& buffer, int num_threads, int n) noexcept { +static void RunSimulation(Buffer &buffer, int num_threads, int n) noexcept +{ std::atomic finished{false}; uint64_t consumer_sum{0}; - std::thread consumer_thread{ConsumeNumbers, std::ref(buffer), - std::ref(consumer_sum), std::ref(finished)}; + std::thread consumer_thread{ConsumeNumbers, std::ref(buffer), std::ref(consumer_sum), + std::ref(finished)}; uint64_t producer_sum = GenerateNumbers(buffer, num_threads, n); - finished = true; + finished = true; consumer_thread.join(); - if (consumer_sum != producer_sum) { + if (consumer_sum != producer_sum) + { std::cerr << "Sumulation failed: consumer_sum != producer_sum\n"; std::terminate(); } @@ -111,12 +117,14 @@ static void RunSimulation(Buffer& buffer, int num_threads, int n) noexcept { //-------------------------------------------------------------------------------------------------- // BM_BaselineBuffer //-------------------------------------------------------------------------------------------------- -static void BM_BaselineBuffer(benchmark::State& state) { +static void BM_BaselineBuffer(benchmark::State &state) +{ const size_t max_elements = 500; - auto num_threads = state.range(0); - const int n = N / num_threads; + auto num_threads = state.range(0); + const int n = N / num_threads; BaselineCircularBuffer buffer{max_elements}; - for (auto _ : state) { + for (auto _ : state) + { RunSimulation(buffer, num_threads, n); } } @@ -126,12 +134,14 @@ BENCHMARK(BM_BaselineBuffer)->Arg(1)->Arg(2)->Arg(4); //-------------------------------------------------------------------------------------------------- // BM_LockFreeBuffer //-------------------------------------------------------------------------------------------------- -static void BM_LockFreeBuffer(benchmark::State& state) { +static void BM_LockFreeBuffer(benchmark::State &state) +{ const size_t max_elements = 500; - auto num_threads = state.range(0); - const int n = N / num_threads; + auto num_threads = state.range(0); + const int n = N / num_threads; CircularBuffer buffer{max_elements}; - for (auto _ : state) { + for (auto _ : state) + { RunSimulation(buffer, num_threads, n); } } diff --git a/sdk/test/common/circular_buffer_test.cc b/sdk/test/common/circular_buffer_test.cc index 973f43f44f..37fbedbad5 100644 --- a/sdk/test/common/circular_buffer_test.cc +++ b/sdk/test/common/circular_buffer_test.cc @@ -11,56 +11,65 @@ using opentelemetry::sdk::common::CircularBufferRange; static thread_local std::mt19937 RandomNumberGenerator{std::random_device{}()}; -static void GenerateRandomNumbers(CircularBuffer& buffer, - std::vector& numbers, int n) { - for (int i = 0; i < n; ++i) { +static void GenerateRandomNumbers(CircularBuffer &buffer, + std::vector &numbers, + int n) +{ + for (int i = 0; i < n; ++i) + { auto value = static_cast(RandomNumberGenerator()); std::unique_ptr x{new uint32_t{value}}; - if (buffer.Add(x)) { + if (buffer.Add(x)) + { numbers.push_back(value); } } } -static void RunNumberProducers(CircularBuffer& buffer, - std::vector& numbers, int num_threads, - int n) { +static void RunNumberProducers(CircularBuffer &buffer, + std::vector &numbers, + int num_threads, + int n) +{ std::vector> thread_numbers(num_threads); std::vector threads(num_threads); - for (int thread_index = 0; thread_index < num_threads; ++thread_index) { - threads[thread_index] = - std::thread{GenerateRandomNumbers, std::ref(buffer), - std::ref(thread_numbers[thread_index]), n}; + for (int thread_index = 0; thread_index < num_threads; ++thread_index) + { + threads[thread_index] = std::thread{GenerateRandomNumbers, std::ref(buffer), + std::ref(thread_numbers[thread_index]), n}; } - for (auto& thread : threads) { + for (auto &thread : threads) + { thread.join(); } - for (int thread_index = 0; thread_index < num_threads; ++thread_index) { + for (int thread_index = 0; thread_index < num_threads; ++thread_index) + { numbers.insert(numbers.end(), thread_numbers[thread_index].begin(), thread_numbers[thread_index].end()); } } -void RunNumberConsumer(CircularBuffer& buffer, - std::atomic& exit, - std::vector& numbers) { - while (true) { +void RunNumberConsumer(CircularBuffer &buffer, + std::atomic &exit, + std::vector &numbers) +{ + while (true) + { auto allotment = buffer.Peek(); - if (exit && allotment.empty()) { + if (exit && allotment.empty()) + { return; } - auto n = std::uniform_int_distribution{ - 0, allotment.size()}(RandomNumberGenerator); - buffer.Consume( - n, [&](CircularBufferRange> range) noexcept { - assert(range.size() == n); - range.ForEach([&](AtomicUniquePtr & ptr) noexcept { - assert(!ptr.IsNull()); - numbers.push_back(*ptr); - ptr.Reset(); - return true; - }); - }); + auto n = std::uniform_int_distribution{0, allotment.size()}(RandomNumberGenerator); + buffer.Consume(n, [&](CircularBufferRange> range) noexcept { + assert(range.size() == n); + range.ForEach([&](AtomicUniquePtr & ptr) noexcept { + assert(!ptr.IsNull()); + numbers.push_back(*ptr); + ptr.Reset(); + return true; + }); + }); } } @@ -104,37 +113,39 @@ TEST(CircularBufferTest, AddOnFull) EXPECT_EQ(*x, 33); } -TEST(CircularBufferTest, Consume) { +TEST(CircularBufferTest, Consume) +{ CircularBuffer buffer{10}; - for (int i = 0; i < static_cast(buffer.max_size()); ++i) { - std::unique_ptr x{new int{i}}; - EXPECT_TRUE(buffer.Add(x)); - } - int count = 0; - buffer.Consume( - 5, [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - EXPECT_EQ(*ptr, count++); - ptr.Reset(); - return true; - }); - }); - EXPECT_EQ(count, 5); + for (int i = 0; i < static_cast(buffer.max_size()); ++i) + { + std::unique_ptr x{new int{i}}; + EXPECT_TRUE(buffer.Add(x)); + } + int count = 0; + buffer.Consume(5, [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + EXPECT_EQ(*ptr, count++); + ptr.Reset(); + return true; + }); + }); + EXPECT_EQ(count, 5); } -TEST(CircularBufferTest, Simulation) { +TEST(CircularBufferTest, Simulation) +{ const int num_producer_threads = 4; - const int n = 25000; - for (size_t max_size : {1, 2, 10, 50, 100, 1000}) { + const int n = 25000; + for (size_t max_size : {1, 2, 10, 50, 100, 1000}) + { CircularBuffer buffer{max_size}; std::vector producer_numbers; std::vector consumer_numbers; - auto producers = - std::thread{RunNumberProducers, std::ref(buffer), - std::ref(producer_numbers), num_producer_threads, n}; + auto producers = std::thread{RunNumberProducers, std::ref(buffer), std::ref(producer_numbers), + num_producer_threads, n}; std::atomic exit{false}; - auto consumer = std::thread{RunNumberConsumer, std::ref(buffer), - std::ref(exit), std::ref(consumer_numbers)}; + auto consumer = std::thread{RunNumberConsumer, std::ref(buffer), std::ref(exit), + std::ref(consumer_numbers)}; producers.join(); exit = true; consumer.join(); From dd98471fce8f1ae75dede2928035d3fe7e7c3e41 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 18 Mar 2020 16:50:17 -0700 Subject: [PATCH 05/12] Add cmake support --- sdk/test/CMakeLists.txt | 1 + sdk/test/common/BUILD | 1 + sdk/test/common/CMakeLists.txt | 10 ++++++++++ sdk/test/common/circular_buffer_benchmark.cc | 2 +- 4 files changed, 13 insertions(+), 1 deletion(-) create mode 100644 sdk/test/common/CMakeLists.txt diff --git a/sdk/test/CMakeLists.txt b/sdk/test/CMakeLists.txt index 2cea47fa7b..8c8b199946 100644 --- a/sdk/test/CMakeLists.txt +++ b/sdk/test/CMakeLists.txt @@ -1 +1,2 @@ +add_subdirectory(common) add_subdirectory(trace) diff --git a/sdk/test/common/BUILD b/sdk/test/common/BUILD index e5351a6da3..eb8e806463 100644 --- a/sdk/test/common/BUILD +++ b/sdk/test/common/BUILD @@ -38,6 +38,7 @@ cc_library( hdrs = [ "baseline_circular_buffer.h", ], + include_prefix = "test/common", deps = [ "//api", ], diff --git a/sdk/test/common/CMakeLists.txt b/sdk/test/common/CMakeLists.txt new file mode 100644 index 0000000000..c3c69169be --- /dev/null +++ b/sdk/test/common/CMakeLists.txt @@ -0,0 +1,10 @@ +foreach(testname atomic_unique_ptr_test circular_buffer_range_test circular_buffer_test) + add_executable(${testname} "${testname}.cc") + target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} + ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace) + gtest_add_tests(TARGET ${testname} TEST_PREFIX trace. TEST_LIST ${testname}) +endforeach() + +add_executable(circular_buffer_benchmark circular_buffer_benchmark.cc) +target_link_libraries(circular_buffer_benchmark benchmark::benchmark + ${CMAKE_THREAD_LIBS_INIT} opentelemetry_api) diff --git a/sdk/test/common/circular_buffer_benchmark.cc b/sdk/test/common/circular_buffer_benchmark.cc index f99f783bda..30f530adfd 100644 --- a/sdk/test/common/circular_buffer_benchmark.cc +++ b/sdk/test/common/circular_buffer_benchmark.cc @@ -8,7 +8,7 @@ #include #include -#include "sdk/test/common/baseline_circular_buffer.h" +#include "test/common/baseline_circular_buffer.h" #include "src/common/circular_buffer.h" using opentelemetry::sdk::common::AtomicUniquePtr; using opentelemetry::sdk::common::CircularBuffer; From 4f758ee86ca2de42cbac147df8e4ba2f73e7db62 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 18 Mar 2020 17:46:45 -0700 Subject: [PATCH 06/12] Reformat --- sdk/src/common/circular_buffer.h | 13 ++++---- sdk/test/common/CMakeLists.txt | 3 +- sdk/test/common/circular_buffer_benchmark.cc | 30 ++--------------- sdk/test/common/circular_buffer_test.cc | 34 +++++++++++--------- 4 files changed, 30 insertions(+), 50 deletions(-) diff --git a/sdk/src/common/circular_buffer.h b/sdk/src/common/circular_buffer.h index 9b690669fc..8f14af6f96 100644 --- a/sdk/src/common/circular_buffer.h +++ b/sdk/src/common/circular_buffer.h @@ -63,12 +63,13 @@ class CircularBuffer */ void Consume(size_t n) noexcept { - Consume(n, [](CircularBufferRange> & range) noexcept { - range.ForEach([](AtomicUniquePtr & ptr) noexcept { - ptr.Reset(); - return true; - }); - }); + Consume( + n, [](CircularBufferRange> & range) noexcept { + range.ForEach([](AtomicUniquePtr & ptr) noexcept { + ptr.Reset(); + return true; + }); + }); } /** diff --git a/sdk/test/common/CMakeLists.txt b/sdk/test/common/CMakeLists.txt index c3c69169be..8071722f69 100644 --- a/sdk/test/common/CMakeLists.txt +++ b/sdk/test/common/CMakeLists.txt @@ -1,4 +1,5 @@ -foreach(testname atomic_unique_ptr_test circular_buffer_range_test circular_buffer_test) +foreach(testname atomic_unique_ptr_test circular_buffer_range_test + circular_buffer_test) add_executable(${testname} "${testname}.cc") target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace) diff --git a/sdk/test/common/circular_buffer_benchmark.cc b/sdk/test/common/circular_buffer_benchmark.cc index 30f530adfd..568cb99174 100644 --- a/sdk/test/common/circular_buffer_benchmark.cc +++ b/sdk/test/common/circular_buffer_benchmark.cc @@ -8,8 +8,8 @@ #include #include -#include "test/common/baseline_circular_buffer.h" #include "src/common/circular_buffer.h" +#include "test/common/baseline_circular_buffer.h" using opentelemetry::sdk::common::AtomicUniquePtr; using opentelemetry::sdk::common::CircularBuffer; using opentelemetry::sdk::common::CircularBufferRange; @@ -17,10 +17,7 @@ using opentelemetry::testing::BaselineCircularBuffer; const int N = 10000; -//-------------------------------------------------------------------------------------------------- -// ConsumeBufferNumbers -//-------------------------------------------------------------------------------------------------- -uint64_t ConsumeBufferNumbers(BaselineCircularBuffer &buffer) noexcept +static uint64_t ConsumeBufferNumbers(BaselineCircularBuffer &buffer) noexcept { uint64_t result = 0; buffer.Consume([&](std::unique_ptr &&x) { @@ -30,7 +27,7 @@ uint64_t ConsumeBufferNumbers(BaselineCircularBuffer &buffer) noexcept return result; } -uint64_t ConsumeBufferNumbers(CircularBuffer &buffer) noexcept +static uint64_t ConsumeBufferNumbers(CircularBuffer &buffer) noexcept { uint64_t result = 0; buffer.Consume( @@ -44,9 +41,6 @@ uint64_t ConsumeBufferNumbers(CircularBuffer &buffer) noexcept return result; } -//-------------------------------------------------------------------------------------------------- -// GenerateNumbersForThread -//-------------------------------------------------------------------------------------------------- template static void GenerateNumbersForThread(Buffer &buffer, int n, std::atomic &sum) noexcept { @@ -62,9 +56,6 @@ static void GenerateNumbersForThread(Buffer &buffer, int n, std::atomic static uint64_t GenerateNumbers(Buffer &buffer, int num_threads, int n) noexcept { @@ -81,9 +72,6 @@ static uint64_t GenerateNumbers(Buffer &buffer, int num_threads, int n) noexcept return sum; } -//-------------------------------------------------------------------------------------------------- -// ConsumeNumbers -//-------------------------------------------------------------------------------------------------- template static void ConsumeNumbers(Buffer &buffer, uint64_t &sum, std::atomic &finished) noexcept { @@ -94,9 +82,6 @@ static void ConsumeNumbers(Buffer &buffer, uint64_t &sum, std::atomic &fin sum += ConsumeBufferNumbers(buffer); } -//-------------------------------------------------------------------------------------------------- -// RunSimulation -//-------------------------------------------------------------------------------------------------- template static void RunSimulation(Buffer &buffer, int num_threads, int n) noexcept { @@ -114,9 +99,6 @@ static void RunSimulation(Buffer &buffer, int num_threads, int n) noexcept } } -//-------------------------------------------------------------------------------------------------- -// BM_BaselineBuffer -//-------------------------------------------------------------------------------------------------- static void BM_BaselineBuffer(benchmark::State &state) { const size_t max_elements = 500; @@ -131,9 +113,6 @@ static void BM_BaselineBuffer(benchmark::State &state) BENCHMARK(BM_BaselineBuffer)->Arg(1)->Arg(2)->Arg(4); -//-------------------------------------------------------------------------------------------------- -// BM_LockFreeBuffer -//-------------------------------------------------------------------------------------------------- static void BM_LockFreeBuffer(benchmark::State &state) { const size_t max_elements = 500; @@ -148,7 +127,4 @@ static void BM_LockFreeBuffer(benchmark::State &state) BENCHMARK(BM_LockFreeBuffer)->Arg(1)->Arg(2)->Arg(4); -//-------------------------------------------------------------------------------------------------- -// BENCHMARK_MAIN -//-------------------------------------------------------------------------------------------------- BENCHMARK_MAIN(); diff --git a/sdk/test/common/circular_buffer_test.cc b/sdk/test/common/circular_buffer_test.cc index 37fbedbad5..7b5fc1c0f2 100644 --- a/sdk/test/common/circular_buffer_test.cc +++ b/sdk/test/common/circular_buffer_test.cc @@ -61,15 +61,16 @@ void RunNumberConsumer(CircularBuffer &buffer, return; } auto n = std::uniform_int_distribution{0, allotment.size()}(RandomNumberGenerator); - buffer.Consume(n, [&](CircularBufferRange> range) noexcept { - assert(range.size() == n); - range.ForEach([&](AtomicUniquePtr & ptr) noexcept { - assert(!ptr.IsNull()); - numbers.push_back(*ptr); - ptr.Reset(); - return true; - }); - }); + buffer.Consume( + n, [&](CircularBufferRange> range) noexcept { + assert(range.size() == n); + range.ForEach([&](AtomicUniquePtr & ptr) noexcept { + assert(!ptr.IsNull()); + numbers.push_back(*ptr); + ptr.Reset(); + return true; + }); + }); } } @@ -122,13 +123,14 @@ TEST(CircularBufferTest, Consume) EXPECT_TRUE(buffer.Add(x)); } int count = 0; - buffer.Consume(5, [&](CircularBufferRange> range) noexcept { - range.ForEach([&](AtomicUniquePtr &ptr) { - EXPECT_EQ(*ptr, count++); - ptr.Reset(); - return true; - }); - }); + buffer.Consume( + 5, [&](CircularBufferRange> range) noexcept { + range.ForEach([&](AtomicUniquePtr &ptr) { + EXPECT_EQ(*ptr, count++); + ptr.Reset(); + return true; + }); + }); EXPECT_EQ(count, 5); } From 730b9ffbd35e89557bd559690f464a2ed1452c6a Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 18 Mar 2020 17:49:40 -0700 Subject: [PATCH 07/12] Fix for gcc48 --- sdk/src/common/atomic_unique_ptr.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/common/atomic_unique_ptr.h b/sdk/src/common/atomic_unique_ptr.h index 2d57a2a2ae..49b594f48a 100644 --- a/sdk/src/common/atomic_unique_ptr.h +++ b/sdk/src/common/atomic_unique_ptr.h @@ -18,7 +18,7 @@ template class AtomicUniquePtr { public: - AtomicUniquePtr() noexcept = default; + AtomicUniquePtr() noexcept {} ~AtomicUniquePtr() noexcept { Reset(); } From de2aa291bf7cd57a55f2ef416d4c78c173ed6ff4 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 18 Mar 2020 21:38:24 -0700 Subject: [PATCH 08/12] Fix typos --- sdk/src/common/atomic_unique_ptr.h | 2 +- sdk/test/common/baseline_circular_buffer.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/src/common/atomic_unique_ptr.h b/sdk/src/common/atomic_unique_ptr.h index 49b594f48a..c737532e2a 100644 --- a/sdk/src/common/atomic_unique_ptr.h +++ b/sdk/src/common/atomic_unique_ptr.h @@ -59,7 +59,7 @@ class AtomicUniquePtr * Atomically swap the pointer with another. * @param ptr the pointer to swap with */ - void Swap(std::unique_ptr &ptr) noexcept { ptr.reset(ptr_.exchange(ptr.release())); } + void Swap(std::unique_ptr &other) noexcept { ptr.reset(ptr_.exchange(other.release())); } /** * Set the pointer to a new value and delete the current value if non-null. diff --git a/sdk/test/common/baseline_circular_buffer.h b/sdk/test/common/baseline_circular_buffer.h index f3f591ad9b..4c77990844 100644 --- a/sdk/test/common/baseline_circular_buffer.h +++ b/sdk/test/common/baseline_circular_buffer.h @@ -30,7 +30,7 @@ class BaselineCircularBuffer bool Add(std::unique_ptr &&element) noexcept { - std::lock_guard lock_gaurd{mutex_}; + std::lock_guard lock_guard{mutex_}; if (tail_ + data_.size() == head_) { return false; From 8971e384a16fe27cd9d06bcce7bfb3bc29ef8636 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Wed, 18 Mar 2020 21:40:33 -0700 Subject: [PATCH 09/12] Fix misname --- sdk/src/common/atomic_unique_ptr.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/common/atomic_unique_ptr.h b/sdk/src/common/atomic_unique_ptr.h index c737532e2a..1fdfee618c 100644 --- a/sdk/src/common/atomic_unique_ptr.h +++ b/sdk/src/common/atomic_unique_ptr.h @@ -59,7 +59,7 @@ class AtomicUniquePtr * Atomically swap the pointer with another. * @param ptr the pointer to swap with */ - void Swap(std::unique_ptr &other) noexcept { ptr.reset(ptr_.exchange(other.release())); } + void Swap(std::unique_ptr &other) noexcept { other.reset(ptr_.exchange(other.release())); } /** * Set the pointer to a new value and delete the current value if non-null. From 1db19c043e4b2405754f48528cb35883619e12ab Mon Sep 17 00:00:00 2001 From: Ryan Date: Wed, 18 Mar 2020 22:29:23 -0700 Subject: [PATCH 10/12] Update sdk/src/common/circular_buffer.h Co-Authored-By: Reiley Yang --- sdk/src/common/circular_buffer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/src/common/circular_buffer.h b/sdk/src/common/circular_buffer.h index 8f14af6f96..a5a104bc34 100644 --- a/sdk/src/common/circular_buffer.h +++ b/sdk/src/common/circular_buffer.h @@ -38,7 +38,7 @@ class CircularBuffer /** * Consume elements from the circular buffer's tail. * @param n the number of elements to consume - * @param callback the callback to invoke with a AtomicUniquePtr to each + * @param callback the callback to invoke with an AtomicUniquePtr to each * consumed element. * * Note: The callback must set the passed AtomicUniquePtr to null. From ed5aaebda5c2f63f8e9185bf345f1edf9d737e33 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Fri, 3 Apr 2020 18:17:12 -0700 Subject: [PATCH 11/12] Reformat --- sdk/src/common/platform/BUILD | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/src/common/platform/BUILD b/sdk/src/common/platform/BUILD index 9d5eff0604..c96a1417a0 100644 --- a/sdk/src/common/platform/BUILD +++ b/sdk/src/common/platform/BUILD @@ -24,11 +24,11 @@ cc_library( "fork.h", ], include_prefix = "src/common/platform", - deps = [ - "//api", - ], linkopts = select({ "//bazel:windows": [], "//conditions:default": ["-pthread"], }), + deps = [ + "//api", + ], ) From 40198fa860dfe85f92d0d9279e1f8286b7b531c8 Mon Sep 17 00:00:00 2001 From: Ryan Burn Date: Fri, 3 Apr 2020 18:28:01 -0700 Subject: [PATCH 12/12] Fix cmake linkage --- sdk/test/common/CMakeLists.txt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sdk/test/common/CMakeLists.txt b/sdk/test/common/CMakeLists.txt index 24e153a304..7a181c90c4 100644 --- a/sdk/test/common/CMakeLists.txt +++ b/sdk/test/common/CMakeLists.txt @@ -2,8 +2,9 @@ foreach(testname random_test fast_random_number_generator_test atomic_unique_ptr_test circular_buffer_range_test circular_buffer_test) add_executable(${testname} "${testname}.cc") - target_link_libraries(${testname} ${GTEST_BOTH_LIBRARIES} - ${CMAKE_THREAD_LIBS_INIT} opentelemetry_trace) + target_link_libraries( + ${testname} ${GTEST_BOTH_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} + opentelemetry_common opentelemetry_trace) gtest_add_tests(TARGET ${testname} TEST_PREFIX trace. TEST_LIST ${testname}) endforeach()