From bec3e13c3e0bd6ce6c7e27c95bdb2f316639241a Mon Sep 17 00:00:00 2001 From: r-dilip Date: Tue, 12 Nov 2019 14:20:19 -0800 Subject: [PATCH] Fix duplicate records in container memory/cpu samples --- source/code/plugin/filter_health_model_builder.rb | 6 +++--- .../plugin/health/health_container_cpu_memory_aggregator.rb | 3 ++- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/code/plugin/filter_health_model_builder.rb b/source/code/plugin/filter_health_model_builder.rb index 47ce7a631..1724065fe 100644 --- a/source/code/plugin/filter_health_model_builder.rb +++ b/source/code/plugin/filter_health_model_builder.rb @@ -97,12 +97,11 @@ def filter_stream(tag, es) } end container_records_aggregator = HealthContainerCpuMemoryAggregator.new(@resources, @provider) - deduped_records = container_records_aggregator.dedupe_records(container_records) if @container_cpu_memory_records.nil? @log.info "@container_cpu_memory_records was not initialized" @container_cpu_memory_records = [] #in some clusters, this is null, so initialize it again. end - @container_cpu_memory_records.push(*deduped_records) # push the records for aggregation later + @container_cpu_memory_records.push(*container_records) # push the records for aggregation later return MultiEventStream.new elsif tag.start_with?("kubehealth.ReplicaSet") records = [] @@ -114,7 +113,8 @@ def filter_stream(tag, es) aggregated_container_records = [] if !@container_cpu_memory_records.nil? && !@container_cpu_memory_records.empty? container_records_aggregator = HealthContainerCpuMemoryAggregator.new(@resources, @provider) - container_records_aggregator.aggregate(@container_cpu_memory_records) + deduped_records = container_records_aggregator.dedupe_records(@container_cpu_memory_records) + container_records_aggregator.aggregate(deduped_records) container_records_aggregator.compute_state aggregated_container_records = container_records_aggregator.get_records end diff --git a/source/code/plugin/health/health_container_cpu_memory_aggregator.rb b/source/code/plugin/health/health_container_cpu_memory_aggregator.rb index 6d69e0213..29ac91bde 100644 --- a/source/code/plugin/health/health_container_cpu_memory_aggregator.rb +++ b/source/code/plugin/health/health_container_cpu_memory_aggregator.rb @@ -84,12 +84,13 @@ def dedupe_records(container_records) else r = resource_instances[instance_name] if record["Timestamp"] > r["Timestamp"] - @log.info "Dropping older record" + @log.info "Dropping older record for instance #{instance_name} new: #{record["Timestamp"]} old: #{r["Timestamp"]}" resource_instances[instance_name] = record end end rescue => e @log.info "Exception when deduping record #{record}" + next end end return cpu_deduped_instances.values.concat(memory_deduped_instances.values)