From 4bb5b4c00a58412600c753a5b3abfb6552686db1 Mon Sep 17 00:00:00 2001
From: Stefan Sundin
Date: Sun, 22 Dec 2019 20:58:02 -0800
Subject: [PATCH 1/5] Add :most_recent aggregation to DirectFileStore

This reports the value that was set by a process most recently.

The way this works is by tagging each value in the files with the timestamp
of when it was set. For all existing aggregations, we ignore that timestamp
and do what we've been doing so far. For `:most_recent`, we take the
"maximum" entry according to its timestamp (i.e. the latest) and then
return its value.

Signed-off-by: Stefan Sundin
Signed-off-by: Daniel Magliola
---
 README.md                                     | 10 ++--
 .../client/data_stores/direct_file_store.rb   | 52 ++++++++++++-------
 .../data_stores/direct_file_store_spec.rb     | 35 +++++++++++++
 3 files changed, 72 insertions(+), 25 deletions(-)

diff --git a/README.md b/README.md
index 9be8a520..e19c1f41 100644
--- a/README.md
+++ b/README.md
@@ -313,9 +313,9 @@ When instantiating metrics, there is an optional `store_settings` attribute. Thi
 to set up store-specific settings for each metric. For most stores, this is not used,
 but for multi-process stores, this is used to specify how to aggregate the values of
 each metric across multiple processes. For the most part, this is used for Gauges, to specify
-whether you want to report the `SUM`, `MAX` or `MIN` value observed across all processes.
-For almost all other cases, you'd leave the default (`SUM`). More on this on the
-*Aggregation* section below.
+whether you want to report the `SUM`, `MAX`, `MIN`, or `MOST_RECENT` value observed across
+all processes. For almost all other cases, you'd leave the default (`SUM`). More on this
+in the *Aggregation* section below.
 
 Custom stores may also accept extra parameters besides `:aggregation`. See the
 documentation of each store for more details.
@@ -348,8 +348,8 @@ use case, you may need to control how this works.
 When using this store, each Metric allows you to specify an `:aggregation` setting, defining
 how to aggregate the multiple possible values we can get for each labelset. By default,
 Counters, Histograms and Summaries are `SUM`med, and Gauges report all their values (one
-for each process), tagged with a `pid` label. You can also select `SUM`, `MAX` or `MIN`
-for your gauges, depending on your use case.
+for each process), tagged with a `pid` label. You can also select `SUM`, `MAX`, `MIN`, or
+`MOST_RECENT` for your gauges, depending on your use case.
 
 **Memory Usage**: When scraped by Prometheus, this store will read all these files, get
 all the values and aggregate them. We have notice this can have a noticeable effect on memory
diff --git a/lib/prometheus/client/data_stores/direct_file_store.rb b/lib/prometheus/client/data_stores/direct_file_store.rb
index de8d9784..cd995f76 100644
--- a/lib/prometheus/client/data_stores/direct_file_store.rb
+++ b/lib/prometheus/client/data_stores/direct_file_store.rb
@@ -29,7 +29,7 @@ module DataStores
       class DirectFileStore
         class InvalidStoreSettingsError < StandardError; end
-        AGGREGATION_MODES = [MAX = :max, MIN = :min, SUM = :sum, ALL = :all]
+        AGGREGATION_MODES = [MAX = :max, MIN = :min, SUM = :sum, ALL = :all, MOST_RECENT = :most_recent]
         DEFAULT_METRIC_SETTINGS = { aggregation: SUM }
         DEFAULT_GAUGE_SETTINGS = { aggregation: ALL }
 
@@ -121,7 +121,7 @@ def all_values
             stores_for_metric.each do |file_path|
               begin
                 store = FileMappedDict.new(file_path, true)
-                store.all_values.each do |(labelset_qs, v)|
+                store.all_values.each do |(labelset_qs, v, ts)|
                   # Labels come as a query string, and CGI::parse returns arrays for each key
                   # "foo=bar&x=y" => { "foo" => ["bar"], "x" => ["y"] }
                   # Turn the keys back into symbols, and remove the arrays
                     [k.to_sym, vs.first]
                   end.to_h
 
-                  stores_data[label_set] << v
+                  stores_data[label_set] << [v, ts]
                 end
               ensure
                 store.close if store
@@ -181,30 +181,41 @@ def process_id
          end
 
          def aggregate_values(values)
-           if @values_aggregation_mode == SUM
-             values.inject { |sum, element| sum + element }
-           elsif @values_aggregation_mode == MAX
-             values.max
-           elsif @values_aggregation_mode == MIN
-             values.min
-           elsif @values_aggregation_mode == ALL
-             values.first
+           # Each entry in the `values` array is a tuple of `value` and `timestamp`,
+           # so for all aggregations except `MOST_RECENT`, we need to only take the
+           # first value in each entry and ignore the second.
+           if @values_aggregation_mode == MOST_RECENT
+             latest_tuple = values.max { |a,b| a[1] <=> b[1] }
+             latest_tuple.first # return the value without the timestamp
            else
-             raise InvalidStoreSettingsError,
-                   "Invalid Aggregation Mode: #{ @values_aggregation_mode }"
+             values = values.map(&:first) # Discard timestamps
+
+             if @values_aggregation_mode == SUM
+               values.inject { |sum, element| sum + element }
+             elsif @values_aggregation_mode == MAX
+               values.max
+             elsif @values_aggregation_mode == MIN
+               values.min
+             elsif @values_aggregation_mode == ALL
+               values.first
+             else
+               raise InvalidStoreSettingsError,
+                     "Invalid Aggregation Mode: #{ @values_aggregation_mode }"
+             end
            end
          end
        end
 
        private_constant :MetricStore
 
-       # A dict of doubles, backed by an file we access directly a a byte array.
+       # A dict of doubles, backed by a file we access directly as a byte array.
        #
        # The file starts with a 4 byte int, indicating how much of it is used.
        # Then 4 bytes of padding.
        # There's then a number of entries, consisting of a 4 byte int which is the
        # size of the next field, a utf-8 encoded string key, padding to an 8 byte
-       # alignment, and then a 8 byte float which is the value.
+       # alignment, and then an 8 byte float which is the value, and then an 8 byte
+       # float which is the unix timestamp when the value was set.
        class FileMappedDict
          INITIAL_FILE_SIZE = 1024*1024
@@ -236,7 +247,8 @@ def all_values
              @positions.map do |key, pos|
                @f.seek(pos)
                value = @f.read(8).unpack('d')[0]
-               [key, value]
+               timestamp = @f.read(8).unpack('d')[0]
+               [key, value, timestamp]
              end
            end
          end
@@ -258,7 +270,7 @@ def write_value(key, value)
 
            pos = @positions[key]
            @f.seek(pos)
-           @f.write([value].pack('d'))
+           @f.write([value, Time.now.to_f].pack('dd'))
            @f.flush
          end
@@ -299,7 +311,7 @@ def resize_file(new_capacity)
          def init_value(key)
            # Pad to be 8-byte aligned.
            padded = key + (' ' * (8 - (key.length + 4) % 8))
-           value = [padded.length, padded, 0.0].pack("lA#{padded.length}d")
+           value = [padded.length, padded, 0.0, 0.0].pack("lA#{padded.length}dd")
            while @used + value.length > @capacity
              @capacity *= 2
              resize_file(@capacity)
@@ -310,7 +322,7 @@ def init_value(key)
            @f.seek(0)
            @f.write([@used].pack('l'))
            @f.flush
-           @positions[key] = @used - 8
+           @positions[key] = @used - 16
          end
 
          # Read position of all keys. No locking is performed.
          def populate_positions
            @f.seek(8)
            while @f.pos < @used do
              padded_len = @f.read(4).unpack('l')[0]
              key = @f.read(padded_len).unpack("A#{padded_len}")[0].strip
              @positions[key] = @f.pos
-             @f.seek(8, :CUR)
+             @f.seek(16, :CUR)
            end
          end
        end
diff --git a/spec/prometheus/client/data_stores/direct_file_store_spec.rb b/spec/prometheus/client/data_stores/direct_file_store_spec.rb
index f39ff10e..8fedd4a2 100644
--- a/spec/prometheus/client/data_stores/direct_file_store_spec.rb
+++ b/spec/prometheus/client/data_stores/direct_file_store_spec.rb
@@ -267,6 +267,41 @@
     end
   end
 
+  context "with a metric that takes MOST_RECENT instead of SUM" do
+    it "reports the most recently written value from different processes" do
+      metric_store1 = subject.for_metric(
+        :metric_name,
+        metric_type: :gauge,
+        metric_settings: { aggregation: :most_recent }
+      )
+      metric_store2 = subject.for_metric(
+        :metric_name,
+        metric_type: :gauge,
+        metric_settings: { aggregation: :most_recent }
+      )
+
+      allow(Process).to receive(:pid).and_return(12345)
+      metric_store1.set(labels: { foo: "bar" }, val: 1)
+
+      allow(Process).to receive(:pid).and_return(23456)
+      metric_store2.set(labels: { foo: "bar" }, val: 3) # Supersedes 'bar' in PID 12345
+      metric_store2.set(labels: { foo: "baz" }, val: 2)
+      metric_store2.set(labels: { foo: "zzz" }, val: 1)
+
+      allow(Process).to receive(:pid).and_return(12345)
+      metric_store1.set(labels: { foo: "baz" }, val: 4) # Supersedes 'baz' in PID 23456
+
+      expect(metric_store1.all_values).to eq(
+        { foo: "bar" } => 3.0,
+        { foo: "baz" } => 4.0,
+        { foo: "zzz" } => 1.0,
+      )
+
+      # Both processes should return the same value
+      expect(metric_store1.all_values).to eq(metric_store2.all_values)
+    end
+  end
+
   it "resizes the File if metrics get too big" do
     truncate_calls_count = 0
     allow_any_instance_of(Prometheus::Client::DataStores::DirectFileStore::FileMappedDict).

From f40a541d145832d41a0103ab0cf4cd0e96c518c8 Mon Sep 17 00:00:00 2001
From: Daniel Magliola
Date: Mon, 24 Feb 2020 15:37:39 -0300
Subject: [PATCH 2/5] Only allow Gauges to use :most_recent aggregation

Using this aggregation with any other metric type is almost certainly not
what the user intended, and it'll result in counters that go up and down,
and completely inconsistent histograms.

Signed-off-by: Daniel Magliola
---
 .../client/data_stores/direct_file_store.rb       |  9 ++++--
 .../data_stores/direct_file_store_spec.rb         | 31 ++++++++++++++++++-
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/lib/prometheus/client/data_stores/direct_file_store.rb b/lib/prometheus/client/data_stores/direct_file_store.rb
index cd995f76..87d1b3eb 100644
--- a/lib/prometheus/client/data_stores/direct_file_store.rb
+++ b/lib/prometheus/client/data_stores/direct_file_store.rb
@@ -45,7 +45,7 @@ def for_metric(metric_name, metric_type:, metric_settings: {})
          end
 
          settings = default_settings.merge(metric_settings)
-         validate_metric_settings(settings)
+         validate_metric_settings(metric_type, settings)
 
          MetricStore.new(metric_name: metric_name,
                          store_settings: @store_settings,
@@ -54,7 +54,7 @@ def for_metric(metric_name, metric_type:, metric_settings: {})
 
        private
 
-       def validate_metric_settings(metric_settings)
+       def validate_metric_settings(metric_type, metric_settings)
          unless metric_settings.has_key?(:aggregation) &&
            AGGREGATION_MODES.include?(metric_settings[:aggregation])
            raise InvalidStoreSettingsError,
@@ -65,6 +65,11 @@ def validate_metric_settings(metric_settings)
            raise InvalidStoreSettingsError,
                  "Only :aggregation setting can be specified"
          end
+
+         if metric_settings[:aggregation] == MOST_RECENT && metric_type != :gauge
+           raise InvalidStoreSettingsError,
+                 "Only :gauge metrics support :most_recent aggregation"
+         end
        end
 
        class MetricStore
diff --git a/spec/prometheus/client/data_stores/direct_file_store_spec.rb b/spec/prometheus/client/data_stores/direct_file_store_spec.rb
index 8fedd4a2..20efd567 100644
--- a/spec/prometheus/client/data_stores/direct_file_store_spec.rb
+++ b/spec/prometheus/client/data_stores/direct_file_store_spec.rb
@@ -14,7 +14,7 @@
 
   it_behaves_like Prometheus::Client::DataStores
 
-  it "only accepts valid :aggregation as Metric Settings" do
+  it "only accepts valid :aggregation values as Metric Settings" do
     expect do
       subject.for_metric(:metric_name,
                          metric_type: :counter,
@@ -26,7 +26,10 @@ metric_type: :counter,
                          metric_settings: { aggregation: :invalid })
     end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
+  end
 
+  it "only accepts valid keys as Metric Settings" do
+    # the only valid key at the moment is :aggregation
     expect do
       subject.for_metric(:metric_name,
                          metric_type: :counter,
@@ -34,6 +37,32 @@
     end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
   end
 
+  it "only accepts :most_recent aggregation for gauges" do
+    expect do
+      subject.for_metric(:metric_name,
+                         metric_type: :gauge,
+                         metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
+    end.not_to raise_error
+
+    expect do
+      subject.for_metric(:metric_name,
+                         metric_type: :counter,
+                         metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
+    end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
+
+    expect do
+      subject.for_metric(:metric_name,
+                         metric_type: :histogram,
+                         metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
+    end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
+
+    expect do
+      subject.for_metric(:metric_name,
+                         metric_type: :summary,
+                         metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
+    end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
+  end
+
   it "raises when aggregating if we get to that that point with an invalid aggregation mode" do
     # This is basically just for coverage of a safety clause that can never be reached
     allow(subject).to receive(:validate_metric_settings) # turn off validation

From dddba2f43532262a802b25ddd40932ec4dec8127 Mon Sep 17 00:00:00 2001
From: Daniel Magliola
Date: Mon, 24 Feb 2020 15:44:28 -0300
Subject: [PATCH 3/5] Do not allow `DirectFileStore#increment` when using :most_recent aggregation

If we do this, we'd be incrementing the value for *this* process, not the
global one, which is almost certainly not what the user wants to do.

This is not very pretty because we may end up raising an exception in
production (as test/dev tend to not use DirectFileStore), but we consider
it better than letting the user mangle their numbers and end up with
incorrect metrics.

Signed-off-by: Daniel Magliola
---
 README.md                                         |  3 +++
 .../client/data_stores/direct_file_store.rb       |  6 ++++++
 .../client/data_stores/direct_file_store_spec.rb  | 12 ++++++++++++
 3 files changed, 21 insertions(+)

diff --git a/README.md b/README.md
index e19c1f41..ecdcc68f 100644
--- a/README.md
+++ b/README.md
@@ -351,6 +351,9 @@ Counters, Histograms and Summaries are `SUM`med, and Gauges report all their val
 for each process), tagged with a `pid` label. You can also select `SUM`, `MAX`, `MIN`, or
 `MOST_RECENT` for your gauges, depending on your use case.
 
+Please note that the `MOST_RECENT` aggregation only works for gauges, and it does not
+allow the use of `increment` / `decrement`; you can only use `set`.
+
 **Memory Usage**: When scraped by Prometheus, this store will read all these files, get
 all the values and aggregate them. We have notice this can have a noticeable effect on memory
 usage for your app. We recommend you test this in a realistic usage scenario to make sure
diff --git a/lib/prometheus/client/data_stores/direct_file_store.rb b/lib/prometheus/client/data_stores/direct_file_store.rb
index 87d1b3eb..fd1886a7 100644
--- a/lib/prometheus/client/data_stores/direct_file_store.rb
+++ b/lib/prometheus/client/data_stores/direct_file_store.rb
@@ -105,6 +105,12 @@ def set(labels:, val:)
          end
 
          def increment(labels:, by: 1)
+           if @values_aggregation_mode == DirectFileStore::MOST_RECENT
+             raise InvalidStoreSettingsError,
+                   "The :most_recent aggregation does not support the use of increment"\
+                   "/decrement"
+           end
+
            key = store_key(labels)
            in_process_sync do
              value = internal_store.read_value(key)
diff --git a/spec/prometheus/client/data_stores/direct_file_store_spec.rb b/spec/prometheus/client/data_stores/direct_file_store_spec.rb
index 20efd567..c140546e 100644
--- a/spec/prometheus/client/data_stores/direct_file_store_spec.rb
+++ b/spec/prometheus/client/data_stores/direct_file_store_spec.rb
@@ -329,6 +329,18 @@
       # Both processes should return the same value
       expect(metric_store1.all_values).to eq(metric_store2.all_values)
     end
+
+    it "does not allow `increment`, only `set`" do
+      metric_store1 = subject.for_metric(
+        :metric_name,
+        metric_type: :gauge,
+        metric_settings: { aggregation: :most_recent }
+      )
+
+      expect do
+        metric_store1.increment(labels: {})
+      end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
+    end
   end
 
   it "resizes the File if metrics get too big" do

From 6bb714422bc38c8ae70b58ac1050916dbb1aa54e Mon Sep 17 00:00:00 2001
From: Daniel Magliola
Date: Tue, 25 Feb 2020 08:20:20 -0300
Subject: [PATCH 4/5] Use monotonic clock when timestamping observed values

The monotonic clock is going to be more accurate in the few cases where the
distinction matters and, somewhat surprisingly, it's also faster than
`Time.now`.

Signed-off-by: Daniel Magliola
---
 lib/prometheus/client/data_stores/direct_file_store.rb | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lib/prometheus/client/data_stores/direct_file_store.rb b/lib/prometheus/client/data_stores/direct_file_store.rb
index fd1886a7..717d8d5c 100644
--- a/lib/prometheus/client/data_stores/direct_file_store.rb
+++ b/lib/prometheus/client/data_stores/direct_file_store.rb
@@ -279,9 +279,10 @@ def write_value(key, value)
              init_value(key)
            end
 
+           now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
            pos = @positions[key]
            @f.seek(pos)
-           @f.write([value, Time.now.to_f].pack('dd'))
+           @f.write([value, now].pack('dd'))
            @f.flush
          end

From 826b32e34e8bc6e4ca40c88b5144c745bc86b07f Mon Sep 17 00:00:00 2001
From: Daniel Magliola
Date: Tue, 25 Feb 2020 08:32:51 -0300
Subject: [PATCH 5/5] Small file read performance improvement for DirectFileStore

Instead of two `read` operations, we can read both values in a single call.

Signed-off-by: Daniel Magliola
---
 lib/prometheus/client/data_stores/direct_file_store.rb | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/lib/prometheus/client/data_stores/direct_file_store.rb b/lib/prometheus/client/data_stores/direct_file_store.rb
index 717d8d5c..7bb5c1f4 100644
--- a/lib/prometheus/client/data_stores/direct_file_store.rb
+++ b/lib/prometheus/client/data_stores/direct_file_store.rb
@@ -257,8 +257,7 @@ def all_values
            with_file_lock do
              @positions.map do |key, pos|
                @f.seek(pos)
-               value = @f.read(8).unpack('d')[0]
-               timestamp = @f.read(8).unpack('d')[0]
+               value, timestamp = @f.read(16).unpack('dd')
                [key, value, timestamp]
              end
            end
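
For context, a minimal usage sketch of the new setting (not part of the patches above; it assumes the gem's documented `DirectFileStore` configuration and `store_settings` API, and the directory and metric name are illustrative):

```ruby
require 'prometheus/client'
require 'prometheus/client/data_stores/direct_file_store'

# Every process writes its samples to per-process files under this directory.
Prometheus::Client.config.data_store =
  Prometheus::Client::DataStores::DirectFileStore.new(dir: '/tmp/prometheus_metrics')

# A gauge that should report whatever value was written last across all
# processes, instead of the SUM/MAX/MIN of the per-process values.
last_reload = Prometheus::Client::Gauge.new(
  :last_config_reload_timestamp,
  docstring: 'Unix timestamp of the last config reload',
  store_settings: { aggregation: :most_recent }
)

last_reload.set(Time.now.to_f)
# last_reload.increment would raise InvalidStoreSettingsError under :most_recent (see patch 3).
```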