diff --git a/README.md b/README.md
index 9be8a520..ecdcc68f 100644
--- a/README.md
+++ b/README.md
@@ -313,9 +313,9 @@ When instantiating metrics, there is an optional `store_settings` attribute. Thi
 to set up store-specific settings for each metric. For most stores, this is not used, but
 for multi-process stores, this is used to specify how to aggregate the values of each
 metric across multiple processes. For the most part, this is used for Gauges, to specify
-whether you want to report the `SUM`, `MAX` or `MIN` value observed across all processes.
-For almost all other cases, you'd leave the default (`SUM`). More on this on the
-*Aggregation* section below.
+whether you want to report the `SUM`, `MAX`, `MIN`, or `MOST_RECENT` value observed across
+all processes. For almost all other cases, you'd leave the default (`SUM`). More on this
+in the *Aggregation* section below.
 
 Custom stores may also accept extra parameters besides `:aggregation`. See the
 documentation of each store for more details.
@@ -348,8 +348,11 @@ use case, you may need to control how this works.
 When using this store, each Metric allows you to specify an `:aggregation` setting,
 defining how to aggregate the multiple possible values we can get for each labelset. By
 default, Counters, Histograms and Summaries are `SUM`med, and Gauges report all their values (one
-for each process), tagged with a `pid` label. You can also select `SUM`, `MAX` or `MIN`
-for your gauges, depending on your use case.
+for each process), tagged with a `pid` label. You can also select `SUM`, `MAX`, `MIN`, or
+`MOST_RECENT` for your gauges, depending on your use case.
+
+Please note that the `MOST_RECENT` aggregation only works for gauges, and it does not
+allow the use of `increment` / `decrement`; you can only use `set`.
 
 **Memory Usage**: When scraped by Prometheus, this store will read all these files, get all
 the values and aggregate them. We have noticed this can have a noticeable effect on memory
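For readers following along, wiring this up from application code would look roughly like the sketch below. It is illustrative only: it assumes the `DirectFileStore` is configured as the client's data store, and the metric name, directory, and value are made up.

```ruby
require 'prometheus/client'
require 'prometheus/client/data_stores/direct_file_store'

# Use the multi-process, file-backed store (the directory is illustrative).
Prometheus::Client.config.data_store =
  Prometheus::Client::DataStores::DirectFileStore.new(dir: '/tmp/prometheus_direct_file_store')

# A gauge that reports only the most recently written value across all processes.
# With :most_recent you can only `set` this gauge; increment/decrement will raise.
last_refresh = Prometheus::Client::Gauge.new(
  :last_refresh_timestamp_seconds,
  docstring: 'Unix timestamp of the last cache refresh',
  store_settings: { aggregation: :most_recent }
)
Prometheus::Client.registry.register(last_refresh)

last_refresh.set(Time.now.to_i)
```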
diff --git a/lib/prometheus/client/data_stores/direct_file_store.rb b/lib/prometheus/client/data_stores/direct_file_store.rb
index de8d9784..7bb5c1f4 100644
--- a/lib/prometheus/client/data_stores/direct_file_store.rb
+++ b/lib/prometheus/client/data_stores/direct_file_store.rb
@@ -29,7 +29,7 @@ module DataStores
 
     class DirectFileStore
       class InvalidStoreSettingsError < StandardError; end
-      AGGREGATION_MODES = [MAX = :max, MIN = :min, SUM = :sum, ALL = :all]
+      AGGREGATION_MODES = [MAX = :max, MIN = :min, SUM = :sum, ALL = :all, MOST_RECENT = :most_recent]
       DEFAULT_METRIC_SETTINGS = { aggregation: SUM }
       DEFAULT_GAUGE_SETTINGS = { aggregation: ALL }
 
@@ -45,7 +45,7 @@ def for_metric(metric_name, metric_type:, metric_settings: {})
        end
 
        settings = default_settings.merge(metric_settings)
-       validate_metric_settings(settings)
+       validate_metric_settings(metric_type, settings)
 
        MetricStore.new(metric_name: metric_name,
                        store_settings: @store_settings,
@@ -54,7 +54,7 @@ def for_metric(metric_name, metric_type:, metric_settings: {})
 
      private
 
-     def validate_metric_settings(metric_settings)
+     def validate_metric_settings(metric_type, metric_settings)
        unless metric_settings.has_key?(:aggregation) &&
            AGGREGATION_MODES.include?(metric_settings[:aggregation])
          raise InvalidStoreSettingsError,
@@ -65,6 +65,11 @@ def validate_metric_settings(metric_settings)
          raise InvalidStoreSettingsError,
                "Only :aggregation setting can be specified"
        end
+
+       if metric_settings[:aggregation] == MOST_RECENT && metric_type != :gauge
+         raise InvalidStoreSettingsError,
+               "Only :gauge metrics support :most_recent aggregation"
+       end
      end
 
      class MetricStore
@@ -100,6 +105,12 @@ def set(labels:, val:)
        end
 
        def increment(labels:, by: 1)
+         if @values_aggregation_mode == DirectFileStore::MOST_RECENT
+           raise InvalidStoreSettingsError,
+                 "The :most_recent aggregation does not support the use of increment"\
+                 "/decrement"
+         end
+
          key = store_key(labels)
          in_process_sync do
            value = internal_store.read_value(key)
@@ -121,7 +132,7 @@ def all_values
        stores_for_metric.each do |file_path|
          begin
            store = FileMappedDict.new(file_path, true)
-           store.all_values.each do |(labelset_qs, v)|
+           store.all_values.each do |(labelset_qs, v, ts)|
              # Labels come as a query string, and CGI::parse returns arrays for each key
              # "foo=bar&x=y" => { "foo" => ["bar"], "x" => ["y"] }
              # Turn the keys back into symbols, and remove the arrays
@@ -129,7 +140,7 @@ def all_values
                [k.to_sym, vs.first]
              end.to_h
 
-             stores_data[label_set] << v
+             stores_data[label_set] << [v, ts]
            end
          ensure
            store.close if store
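Before reading the `aggregate_values` change in the next hunk, it may help to see the shape of the data it now receives: after this patch, each labelset maps to an array of `[value, timestamp]` pairs, one per process file. A tiny worked example (the numbers are invented):

```ruby
# One [value, monotonic_timestamp] pair per process file, for a single labelset.
values = [
  [3.0, 101.25], # written by process A (most recent write)
  [4.0,  97.50], # written by process B (older write)
]

values.map(&:first).sum                # => 7.0, what the SUM aggregation reports
values.max_by { |(_v, ts)| ts }.first  # => 3.0, what MOST_RECENT reports
```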
@@ -181,30 +192,41 @@ def process_id
        end
 
        def aggregate_values(values)
-         if @values_aggregation_mode == SUM
-           values.inject { |sum, element| sum + element }
-         elsif @values_aggregation_mode == MAX
-           values.max
-         elsif @values_aggregation_mode == MIN
-           values.min
-         elsif @values_aggregation_mode == ALL
-           values.first
+         # Each entry in the `values` array is a tuple of `value` and `timestamp`,
+         # so for all aggregations except `MOST_RECENT`, we need to only take the
+         # first value in each entry and ignore the second.
+         if @values_aggregation_mode == MOST_RECENT
+           latest_tuple = values.max { |a,b| a[1] <=> b[1] }
+           latest_tuple.first # return the value without the timestamp
          else
-           raise InvalidStoreSettingsError,
-                 "Invalid Aggregation Mode: #{ @values_aggregation_mode }"
+           values = values.map(&:first) # Discard timestamps
+
+           if @values_aggregation_mode == SUM
+             values.inject { |sum, element| sum + element }
+           elsif @values_aggregation_mode == MAX
+             values.max
+           elsif @values_aggregation_mode == MIN
+             values.min
+           elsif @values_aggregation_mode == ALL
+             values.first
+           else
+             raise InvalidStoreSettingsError,
+                   "Invalid Aggregation Mode: #{ @values_aggregation_mode }"
+           end
          end
        end
      end
 
      private_constant :MetricStore
 
-     # A dict of doubles, backed by an file we access directly a a byte array.
+     # A dict of doubles, backed by a file we access directly as a byte array.
      #
      # The file starts with a 4 byte int, indicating how much of it is used.
      # Then 4 bytes of padding.
      # There's then a number of entries, consisting of a 4 byte int which is the
      # size of the next field, a utf-8 encoded string key, padding to an 8 byte
-     # alignment, and then a 8 byte float which is the value.
+     # alignment, and then an 8 byte float which is the value, and then an 8 byte
+     # float which is a timestamp (from a monotonic clock) of when the value was set.
      class FileMappedDict
 
        INITIAL_FILE_SIZE = 1024*1024
@@ -235,8 +257,8 @@ def all_values
        with_file_lock do
          @positions.map do |key, pos|
            @f.seek(pos)
-           value = @f.read(8).unpack('d')[0]
-           [key, value]
+           value, timestamp = @f.read(16).unpack('dd')
+           [key, value, timestamp]
          end
        end
      end
@@ -256,9 +278,10 @@ def write_value(key, value)
          init_value(key)
        end
 
+       now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
        pos = @positions[key]
        @f.seek(pos)
-       @f.write([value].pack('d'))
+       @f.write([value, now].pack('dd'))
        @f.flush
      end
 
@@ -299,7 +322,7 @@ def resize_file(new_capacity)
      def init_value(key)
        # Pad to be 8-byte aligned.
        padded = key + (' ' * (8 - (key.length + 4) % 8))
-       value = [padded.length, padded, 0.0].pack("lA#{padded.length}d")
+       value = [padded.length, padded, 0.0, 0.0].pack("lA#{padded.length}dd")
        while @used + value.length > @capacity
          @capacity *= 2
          resize_file(@capacity)
@@ -310,7 +333,7 @@ def init_value(key)
        @f.seek(0)
        @f.write([@used].pack('l'))
        @f.flush
-       @positions[key] = @used - 8
+       @positions[key] = @used - 16
      end
 
      # Read position of all keys. No locking is performed.
@@ -320,7 +343,7 @@ def populate_positions
          padded_len = @f.read(4).unpack('l')[0]
          key = @f.read(padded_len).unpack("A#{padded_len}")[0].strip
          @positions[key] = @f.pos
-         @f.seek(8, :CUR)
+         @f.seek(16, :CUR)
        end
      end
    end
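As a rough sketch of the widened on-disk entry described in the `FileMappedDict` comment above (4-byte length, padded key, and now two 8-byte doubles: the value plus a write timestamp), here is a standalone pack/unpack round trip. The key and value are invented, and this is not the library's API, just the byte layout the patched `init_value`, `write_value` and `all_values` agree on:

```ruby
key    = "some_metric:foo=bar"                     # illustrative labelset key
padded = key + (' ' * (8 - (key.length + 4) % 8))  # pad so the doubles stay 8-byte aligned

# Entry layout after this patch: 4-byte length, padded key, 8-byte value, 8-byte timestamp.
now   = Process.clock_gettime(Process::CLOCK_MONOTONIC)
entry = [padded.length, padded, 1.5, now].pack("lA#{padded.length}dd")

# Reading it back, mirroring all_values' @f.read(16).unpack('dd'):
len              = entry.unpack1('l')
value, timestamp = entry[4 + len, 16].unpack('dd')
# value == 1.5; timestamp is the monotonic clock reading captured above
```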
diff --git a/spec/prometheus/client/data_stores/direct_file_store_spec.rb b/spec/prometheus/client/data_stores/direct_file_store_spec.rb
index f39ff10e..c140546e 100644
--- a/spec/prometheus/client/data_stores/direct_file_store_spec.rb
+++ b/spec/prometheus/client/data_stores/direct_file_store_spec.rb
@@ -14,7 +14,7 @@
 
   it_behaves_like Prometheus::Client::DataStores
 
-  it "only accepts valid :aggregation as Metric Settings" do
+  it "only accepts valid :aggregation values as Metric Settings" do
     expect do
       subject.for_metric(:metric_name,
                          metric_type: :counter,
@@ -26,7 +26,10 @@
                          metric_type: :counter,
                          metric_settings: { aggregation: :invalid })
     end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
+  end
 
+  it "only accepts valid keys as Metric Settings" do
+    # the only valid key at the moment is :aggregation
     expect do
       subject.for_metric(:metric_name,
                          metric_type: :counter,
@@ -34,6 +37,32 @@
     end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
   end
 
+  it "only accepts :most_recent aggregation for gauges" do
+    expect do
+      subject.for_metric(:metric_name,
+                         metric_type: :gauge,
+                         metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
+    end.not_to raise_error
+
+    expect do
+      subject.for_metric(:metric_name,
+                         metric_type: :counter,
+                         metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
+    end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
+
+    expect do
+      subject.for_metric(:metric_name,
+                         metric_type: :histogram,
+                         metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
+    end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
+
+    expect do
+      subject.for_metric(:metric_name,
+                         metric_type: :summary,
+                         metric_settings: { aggregation: Prometheus::Client::DataStores::DirectFileStore::MOST_RECENT })
+    end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
+  end
+
   it "raises when aggregating if we get to that point with an invalid aggregation mode" do
     # This is basically just for coverage of a safety clause that can never be reached
     allow(subject).to receive(:validate_metric_settings) # turn off validation
@@ -267,6 +296,53 @@
       end
     end
 
+    context "with a metric that takes MOST_RECENT instead of SUM" do
+      it "reports the most recently written value from different processes" do
+        metric_store1 = subject.for_metric(
+          :metric_name,
+          metric_type: :gauge,
+          metric_settings: { aggregation: :most_recent }
+        )
+        metric_store2 = subject.for_metric(
+          :metric_name,
+          metric_type: :gauge,
+          metric_settings: { aggregation: :most_recent }
+        )
+
+        allow(Process).to receive(:pid).and_return(12345)
+        metric_store1.set(labels: { foo: "bar" }, val: 1)
+
+        allow(Process).to receive(:pid).and_return(23456)
+        metric_store2.set(labels: { foo: "bar" }, val: 3) # Supersedes 'bar' in PID 12345
+        metric_store2.set(labels: { foo: "baz" }, val: 2)
+        metric_store2.set(labels: { foo: "zzz" }, val: 1)
+
+        allow(Process).to receive(:pid).and_return(12345)
+        metric_store1.set(labels: { foo: "baz" }, val: 4) # Supersedes 'baz' in PID 23456
+
+        expect(metric_store1.all_values).to eq(
+          { foo: "bar" } => 3.0,
+          { foo: "baz" } => 4.0,
+          { foo: "zzz" } => 1.0,
+        )
+
+        # Both processes should return the same value
+        expect(metric_store1.all_values).to eq(metric_store2.all_values)
+      end
+
+      it "does not allow `increment`, only `set`" do
+        metric_store1 = subject.for_metric(
+          :metric_name,
+          metric_type: :gauge,
+          metric_settings: { aggregation: :most_recent }
+        )
+
+        expect do
+          metric_store1.increment(labels: {})
+        end.to raise_error(Prometheus::Client::DataStores::DirectFileStore::InvalidStoreSettingsError)
+      end
+    end
+
     it "resizes the File if metrics get too big" do
       truncate_calls_count = 0
       allow_any_instance_of(Prometheus::Client::DataStores::DirectFileStore::FileMappedDict).