diff --git a/LICENSE.txt b/LICENSE.txt index bd0770bbc7874d..4c9f595cc008a6 100644 --- a/LICENSE.txt +++ b/LICENSE.txt @@ -647,3 +647,15 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- + +be/src/util/histogram* : GPLv2, Apache 2.0 License + +Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +This source code is licensed under both the GPLv2 (found in the +COPYING file in the root directory) and Apache 2.0 License +(found in the LICENSE.Apache file in the root directory). + +Copyright (c) 2011 The LevelDB Authors. All rights reserved. +Use of this source code is governed by a BSD-style license that can be +found in the LICENSE file. See the AUTHORS file for names of contributors. diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index adb21a66980f4f..a149e08943e4ed 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -37,6 +37,7 @@ set(UTIL_FILES disk_info.cpp errno.cpp hash_util.hpp + histogram.cpp json_util.cpp doris_metrics.cpp mem_info.cpp diff --git a/be/src/util/histogram.cpp b/be/src/util/histogram.cpp new file mode 100644 index 00000000000000..b3397114b5b98b --- /dev/null +++ b/be/src/util/histogram.cpp @@ -0,0 +1,230 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/histogram.h" + +#include +#include +#include +#include + +namespace doris { + +HistogramBucketMapper::HistogramBucketMapper() { + // If you change this, you also need to change + // size of array buckets_ in HistogramStat + _bucket_values = {1, 2}; + _value_index_map = {{1, 0}, {2, 1}}; + double bucket_val = static_cast(_bucket_values.back()); + while ((bucket_val = 1.5 * bucket_val) <= static_cast(std::numeric_limits::max())) { + _bucket_values.push_back(static_cast(bucket_val)); + // Extracts two most significant digits to make histogram buckets more + // human-readable. E.g., 172 becomes 170. + uint64_t pow_of_ten = 1; + while (_bucket_values.back() / 10 > 10) { + _bucket_values.back() /= 10; + pow_of_ten *= 10; + } + _bucket_values.back() *= pow_of_ten; + _value_index_map[_bucket_values.back()] = _bucket_values.size() - 1; + } + _max_bucket_value = _bucket_values.back(); + _min_bucket_value = _bucket_values.front(); +} + +size_t HistogramBucketMapper::index_for_value(const uint64_t& value) const { + if (value >= _max_bucket_value) { + return _bucket_values.size() - 1; + } else if (value >= _min_bucket_value) { + std::map::const_iterator lowerBound = + _value_index_map.lower_bound(value); + if (lowerBound != _value_index_map.end()) { + return static_cast(lowerBound->second); + } else { + return 0; + } + } else { + return 0; + } +} + +namespace { + const HistogramBucketMapper bucket_mapper; +} + +HistogramStat::HistogramStat() : _num_buckets(bucket_mapper.bucket_count()) { + DCHECK(_num_buckets == sizeof(_buckets) / sizeof(*_buckets)); + clear(); +} + +void HistogramStat::clear() { + _min.store(bucket_mapper.last_value(), std::memory_order_relaxed); + _max.store(0, std::memory_order_relaxed); + _num.store(0, std::memory_order_relaxed); + _sum.store(0, std::memory_order_relaxed); + _sum_squares.store(0, std::memory_order_relaxed); + for (unsigned int b = 0; b < _num_buckets; b++) { + _buckets[b].store(0, std::memory_order_relaxed); + } +}; + +bool HistogramStat::is_empty() const { return num() == 0; } + +void HistogramStat::add(const uint64_t& value) { + // This function is designed to be lock free, as it's in the critical path + // of any operation. Each individual value is atomic and the order of updates + // by concurrent threads is tolerable. + const size_t index = bucket_mapper.index_for_value(value); + DCHECK(index < _num_buckets); + _buckets[index].store(_buckets[index].load(std::memory_order_relaxed) + 1, + std::memory_order_relaxed); + + uint64_t old_min = min(); + if (value < old_min) { + _min.store(value, std::memory_order_relaxed); + } + + uint64_t old_max = max(); + if (value > old_max) { + _max.store(value, std::memory_order_relaxed); + } + + _num.store(_num.load(std::memory_order_relaxed) + 1, + std::memory_order_relaxed); + _sum.store(_sum.load(std::memory_order_relaxed) + value, + std::memory_order_relaxed); + _sum_squares.store( + _sum_squares.load(std::memory_order_relaxed) + value * value, + std::memory_order_relaxed); +} + +void HistogramStat::merge(const HistogramStat& other) { + // This function needs to be performned with the outer lock acquired + // However, atomic operation on every member is still need, since Add() + // requires no lock and value update can still happen concurrently + uint64_t old_min = min(); + uint64_t other_min = other.min(); + while (other_min < old_min && + !_min.compare_exchange_weak(old_min, other_min)) {} + + uint64_t old_max = max(); + uint64_t other_max = other.max(); + while (other_max > old_max && + !_max.compare_exchange_weak(old_max, other_max)) {} + + _num.fetch_add(other.num(), std::memory_order_relaxed); + _sum.fetch_add(other.sum(), std::memory_order_relaxed); + _sum_squares.fetch_add(other.sum_squares(), std::memory_order_relaxed); + for (unsigned int b = 0; b < _num_buckets; b++) { + _buckets[b].fetch_add(other.bucket_at(b), std::memory_order_relaxed); + } +} + +double HistogramStat::median() const { + return percentile(50.0); +} + +double HistogramStat::percentile(double p) const { + double threshold = num() * (p / 100.0); + uint64_t cumulative_sum = 0; + for (unsigned int b = 0; b < _num_buckets; b++) { + uint64_t bucket_value = bucket_at(b); + cumulative_sum += bucket_value; + if (cumulative_sum >= threshold) { + // Scale linearly within this bucket + uint64_t left_point = (b == 0) ? 0 : bucket_mapper.bucket_limit(b-1); + uint64_t right_point = bucket_mapper.bucket_limit(b); + uint64_t left_sum = cumulative_sum - bucket_value; + uint64_t right_sum = cumulative_sum; + double pos = 0; + uint64_t right_left_diff = right_sum - left_sum; + if (right_left_diff != 0) { + pos = (threshold - left_sum) / right_left_diff; + } + double r = left_point + (right_point - left_point) * pos; + uint64_t cur_min = min(); + uint64_t cur_max = max(); + if (r < cur_min) r = static_cast(cur_min); + if (r > cur_max) r = static_cast(cur_max); + return r; + } + } + return static_cast(max()); +} + +double HistogramStat::average() const { + uint64_t cur_num = num(); + uint64_t cur_sum = sum(); + if (cur_num == 0) return 0; + return static_cast(cur_sum) / static_cast(cur_num); +} + +double HistogramStat::standard_deviation() const { + uint64_t cur_num = num(); + uint64_t cur_sum = sum(); + uint64_t cur_sum_squares = sum_squares(); + if (cur_num == 0) return 0; + double variance = + static_cast(cur_sum_squares * cur_num - cur_sum * cur_sum) / + static_cast(cur_num * cur_num); + return std::sqrt(variance); +} +std::string HistogramStat::to_string() const { + uint64_t cur_num = num(); + std::string r; + char buf[1650]; + snprintf(buf, sizeof(buf), + "Count: %" PRIu64 " Average: %.4f StdDev: %.2f\n", + cur_num, average(), standard_deviation()); + r.append(buf); + snprintf(buf, sizeof(buf), + "Min: %" PRIu64 " Median: %.4f Max: %" PRIu64 "\n", + (cur_num == 0 ? 0 : min()), median(), (cur_num == 0 ? 0 : max())); + r.append(buf); + snprintf(buf, sizeof(buf), + "Percentiles: " + "P50: %.2f P75: %.2f P99: %.2f P99.9: %.2f P99.99: %.2f\n", + percentile(50), percentile(75), percentile(99), percentile(99.9), + percentile(99.99)); + r.append(buf); + r.append("------------------------------------------------------\n"); + if (cur_num == 0) return r; // all buckets are empty + const double mult = 100.0 / cur_num; + uint64_t cumulative_sum = 0; + for (unsigned int b = 0; b < _num_buckets; b++) { + uint64_t bucket_value = bucket_at(b); + if (bucket_value <= 0.0) continue; + cumulative_sum += bucket_value; + snprintf(buf, sizeof(buf), + "%c %7" PRIu64 ", %7" PRIu64 " ] %8" PRIu64 " %7.3f%% %7.3f%% ", + (b == 0) ? '[' : '(', + (b == 0) ? 0 : bucket_mapper.bucket_limit(b-1), // left + bucket_mapper.bucket_limit(b), // right + bucket_value, // count + (mult * bucket_value), // percentage + (mult * cumulative_sum)); // cumulative percentage + r.append(buf); + + // Add hash marks based on percentage; 20 marks for 100%. + size_t marks = static_cast(mult * bucket_value / 5 + 0.5); + r.append(marks, '#'); + r.push_back('\n'); + } + return r; +} + +} // namespace doris diff --git a/be/src/util/histogram.h b/be/src/util/histogram.h new file mode 100644 index 00000000000000..0a7ea47726dd78 --- /dev/null +++ b/be/src/util/histogram.h @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include +#include + +#include "common/logging.h" + +namespace doris { + +// Histogram data structure implementation: +// +// After construction, the 'value_index_map' will be set to: +// +// BucketValue: | 1 | 2 | 2*1.5 |2*1.5^2|2*1.5^3| ... |2*1.5^n| ... |UINT64MAX| +// Index: | 0 | 1 | 2 | 3 | 4 | ... | n-1 | ... | 108 | +// +// The width of bucket is growing by 1.5 times and its initial values are 1 and 2. +// The input value will be add to the bucket which lower bound is just greater than +// input value. For example, input value A > 2*1.5^n and A <= 2*1.5^(n+1), A will be added +// to the latter bucket. +class HistogramBucketMapper { +public: + HistogramBucketMapper(); + + // converts a value to the bucket index. + size_t index_for_value(const uint64_t& value) const; + // number of buckets required. + + size_t bucket_count() const { + return _bucket_values.size(); + } + + uint64_t last_value() const { + return _max_bucket_value; + } + + uint64_t first_value() const { + return _min_bucket_value; + } + + uint64_t bucket_limit(const size_t bucket_number) const { + DCHECK(bucket_number < bucket_count()); + return _bucket_values[bucket_number]; + } + +private: + std::vector _bucket_values; + uint64_t _max_bucket_value; + uint64_t _min_bucket_value; + std::map _value_index_map; +}; + +struct HistogramStat { + HistogramStat(); + ~HistogramStat() {} + + HistogramStat(const HistogramStat&) = delete; + HistogramStat& operator=(const HistogramStat&) = delete; + + void clear(); + bool is_empty() const; + void add(const uint64_t& value); + void merge(const HistogramStat& other); + + inline uint64_t min() const { return _min.load(std::memory_order_relaxed); } + inline uint64_t max() const { return _max.load(std::memory_order_relaxed); } + inline uint64_t num() const { return _num.load(std::memory_order_relaxed); } + inline uint64_t sum() const { return _sum.load(std::memory_order_relaxed); } + inline uint64_t sum_squares() const { + return _sum_squares.load(std::memory_order_relaxed); + } + inline uint64_t bucket_at(size_t b) const { + return _buckets[b].load(std::memory_order_relaxed); + } + + double median() const; + double percentile(double p) const; + double average() const; + double standard_deviation() const; + std::string to_string() const; + + // To be able to use HistogramStat as thread local variable, it + // cannot have dynamic allocated member. That's why we're + // using manually values from BucketMapper + std::atomic _min; + std::atomic _max; + std::atomic _num; + std::atomic _sum; + std::atomic _sum_squares; + std::atomic _buckets[109]; // 109==BucketMapper::bucket_count() + const uint64_t _num_buckets; +}; + +} // namespace doris diff --git a/be/src/util/metrics.cpp b/be/src/util/metrics.cpp index 19e265f81d5025..402b169a2e2322 100644 --- a/be/src/util/metrics.cpp +++ b/be/src/util/metrics.cpp @@ -102,6 +102,65 @@ std::string labels_to_string(const Labels& entity_labels, const Labels& metric_l return ss.str(); } +void HistogramMetric::clear() { + std::lock_guard l(_lock); + _stats.clear(); +} + +bool HistogramMetric::is_empty() const { + return _stats.is_empty(); +} + +void HistogramMetric::add(const uint64_t& value) { + _stats.add(value); +} + +void HistogramMetric::merge(const HistogramMetric& other) { + std::lock_guard l(_lock); + _stats.merge(other._stats); +} + +double HistogramMetric::median() const { + return _stats.median(); +} + +double HistogramMetric::percentile(double p) const { + return _stats.percentile(p); +} + +double HistogramMetric::average() const { + return _stats.average(); +} + +double HistogramMetric::standard_deviation() const { + return _stats.standard_deviation(); +} + +std::string HistogramMetric::to_string() const { + return _stats.to_string(); +} + +rj::Value HistogramMetric::to_json_value() const { + rj::Document document; + rj::Document::AllocatorType& allocator = document.GetAllocator(); + rj::Value json_value(rj::kObjectType); + + json_value.AddMember("total_count", rj::Value(_stats.num()), allocator); + json_value.AddMember("min", rj::Value(_stats.min()), allocator); + json_value.AddMember("average", rj::Value(_stats.average()), allocator); + json_value.AddMember("median", rj::Value(_stats.median()), allocator); + json_value.AddMember("percentile_75", rj::Value(_stats.percentile(75.0)), allocator); + json_value.AddMember("percentile_95", rj::Value(_stats.percentile(95)), allocator); + json_value.AddMember("percentile_99", rj::Value(_stats.percentile(99)), allocator); + json_value.AddMember("percentile_99_9", rj::Value(_stats.percentile(99.9)), allocator); + json_value.AddMember("percentile_99_99", rj::Value(_stats.percentile(99.99)), allocator); + json_value.AddMember("standard_deviation", rj::Value(_stats.standard_deviation()), allocator); + json_value.AddMember("max", rj::Value(_stats.max()), allocator); + json_value.AddMember("total_sum", rj::Value(_stats.sum()), allocator); + + return json_value; +} + std::string MetricPrototype::simple_name() const { return group_name.empty() ? name : group_name; } diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h index 6d88621dee7c03..732f532e3b8d35 100644 --- a/be/src/util/metrics.h +++ b/be/src/util/metrics.h @@ -33,6 +33,7 @@ #include "common/config.h" #include "util/core_local.h" #include "util/spinlock.h" +#include "util/histogram.h" namespace doris { @@ -159,6 +160,35 @@ class CoreLocalCounter : public Metric { CoreLocalValue _value; }; +class HistogramMetric : public Metric { +public: + HistogramMetric() {} + virtual ~HistogramMetric() {} + + HistogramMetric(const HistogramMetric&) = delete; + HistogramMetric& operator=(const HistogramMetric&) = delete; + + void clear(); + bool is_empty() const; + void add(const uint64_t& value); + void merge(const HistogramMetric& other); + + uint64_t min() const { return _stats.min(); } + uint64_t max() const { return _stats.max(); } + uint64_t num() const { return _stats.num(); } + uint64_t sum() const { return _stats.sum(); } + double median() const; + double percentile(double p) const; + double average() const; + double standard_deviation() const; + std::string to_string() const override; + rj::Value to_json_value() const override; + +protected: + mutable SpinLock _lock; + HistogramStat _stats; +}; + template class AtomicCounter : public AtomicMetric { public: diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index df3c9f87be0457..25d3122d158682 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -66,3 +66,4 @@ ADD_BE_TEST(threadpool_test) ADD_BE_TEST(trace_test) ADD_BE_TEST(easy_json-test) ADD_BE_TEST(http_channel_test) +ADD_BE_TEST(histogram_test) diff --git a/be/test/util/histogram_test.cpp b/be/test/util/histogram_test.cpp new file mode 100644 index 00000000000000..b883ca7a9297f1 --- /dev/null +++ b/be/test/util/histogram_test.cpp @@ -0,0 +1,100 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/histogram.h" + +#include +#include + +namespace doris { + +class HistogramTest : public testing::Test { +public: + HistogramTest() {} + virtual ~HistogramTest() {} +}; + +namespace { + const HistogramBucketMapper bucket_mapper; + const double delta = 0.1; +} + +void populate_histogram(HistogramStat& hist, uint64_t low, + uint64_t high, uint64_t loop = 1) { + for (; loop > 0; loop--) { + for (uint64_t i = low; i <= high; i++) { + hist.add(i); + } + } +} + +TEST_F(HistogramTest, Normal) { + HistogramStat hist; + ASSERT_TRUE(hist.is_empty()); + populate_histogram(hist, 1, 110, 10); + ASSERT_EQ(hist.num(), 1100); + + ASSERT_LE(fabs(hist.percentile(100.0) - 110.0), delta); + ASSERT_LE(fabs(hist.percentile(99.0) - 108.9), delta); + ASSERT_LE(fabs(hist.percentile(95.0) - 104.5), delta); + ASSERT_LE(fabs(hist.median() - 55.0), delta); + ASSERT_EQ(hist.average(), 55.5); +} + +TEST_F(HistogramTest, Merge) { + HistogramStat hist; + HistogramStat other; + + populate_histogram(hist, 1, 100); + populate_histogram(other, 101, 250); + hist.merge(other); + + ASSERT_LE(fabs(hist.percentile(100.0) - 250.0), delta); + ASSERT_LE(fabs(hist.percentile(99.0) - 247.5), delta); + ASSERT_LE(fabs(hist.percentile(95.0) - 237.5), delta); + ASSERT_LE(fabs(hist.median() - 125.0), delta); + ASSERT_EQ(hist.average(), 125.5); +} + +TEST_F(HistogramTest, Empty) { + HistogramStat hist; + ASSERT_EQ(hist.min(), bucket_mapper.last_value()); + ASSERT_EQ(hist.max(), 0); + ASSERT_EQ(hist.num(), 0); + ASSERT_EQ(hist.median(), 0.0); + ASSERT_EQ(hist.percentile(85.0), 0.0); + ASSERT_EQ(hist.average(), 0.0); + ASSERT_EQ(hist.standard_deviation(), 0.0); +} + +TEST_F(HistogramTest, Clear) { + HistogramStat hist; + populate_histogram(hist, 1, 100); + + hist.clear(); + ASSERT_TRUE(hist.is_empty()); + ASSERT_EQ(hist.median(), 0); + ASSERT_EQ(hist.percentile(85.0), 0.0); + ASSERT_EQ(hist.average(), 0.0); +} + +} // namespace doris + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}