From bdc283d4113b9d7b0d04210ad5d43de17449e880 Mon Sep 17 00:00:00 2001 From: Dilip Raghunathan Date: Tue, 7 Jan 2020 14:08:19 -0800 Subject: [PATCH 1/9] Batch Commit --- source/code/plugin/out_mdm.rb | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb index 0a4e601b2..ab3bae4c3 100644 --- a/source/code/plugin/out_mdm.rb +++ b/source/code/plugin/out_mdm.rb @@ -22,6 +22,7 @@ def initialize @@post_request_url_template = "https://%{aks_region}.monitoring.azure.com%{aks_resource_id}/metrics" @@token_url_template = "https://login.microsoftonline.com/%{tenant_id}/oauth2/token" @@plugin_name = "AKSCustomMetricsMDM" + @@record_batch_size = 2600 @data_hash = {} @token_url = nil @@ -136,7 +137,15 @@ def write(chunk) chunk.msgpack_each { |(tag, record)| post_body.push(record.to_json) } - send_to_mdm post_body + # the limit of the payload is 1MB. Each record is ~300 bytes. using a batch size of 2600, so that + # the pay load size becomes approximately 800 Kb. + count = post_body.size + while count > 0 + current_batch = post_body.first(@@record_batch_size) + count -= current_batch.size + send_to_mdm current_batch + end + else if !@can_send_data_to_mdm @log.info "Cannot send data to MDM since all required conditions were not met" @@ -157,6 +166,7 @@ def send_to_mdm(post_body) request = Net::HTTP::Post.new(@post_request_uri.request_uri) request["Content-Type"] = "application/x-ndjson" request["Authorization"] = "Bearer #{access_token}" + request.body = post_body.join("\n") response = @http_client.request(request) response.value # this throws for non 200 HTTP response code From e9f54b99b92932613571ee0d8bd8ab987f254bb6 Mon Sep 17 00:00:00 2001 From: Dilip Raghunathan Date: Thu, 16 Jan 2020 20:48:33 -0800 Subject: [PATCH 2/9] WIP: Committing move logic from filter to input --- source/code/plugin/filter_cadvisor2mdm.rb | 49 +++-- source/code/plugin/filter_inventory2mdm.rb | 4 +- source/code/plugin/in_kube_podinventory.rb | 22 +- source/code/plugin/kubelet_utils.rb | 33 +++ source/code/plugin/podinventory_to_mdm.rb | 238 +++++++++++++++++++++ 5 files changed, 320 insertions(+), 26 deletions(-) create mode 100644 source/code/plugin/kubelet_utils.rb create mode 100644 source/code/plugin/podinventory_to_mdm.rb diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb index f14a1369b..278ab957f 100644 --- a/source/code/plugin/filter_cadvisor2mdm.rb +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -7,6 +7,7 @@ module Fluent require 'yajl/json_gem' require_relative 'oms_common' require_relative 'CustomMetricsUtils' + require_relative 'kubelet_utils' class CAdvisor2MdmFilter < Filter Fluent::Plugin.register_filter('filter_cadvisor2mdm', self) @@ -138,33 +139,41 @@ def filter(tag, time, record) def ensure_cpu_memory_capacity_set - @log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}" if @cpu_capacity != 0.0 && @memory_capacity != 0.0 @log.info "CPU And Memory Capacity are already set" return end - begin - nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes?fieldSelector=metadata.name%3D#{@@hostName}").body) - rescue Exception => e - @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} " - ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) - end - if !nodeInventory.nil? - cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") - if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? - @cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] - @log.info "CPU Limit #{@cpu_capacity}" - else - @log.info "Error getting cpu_capacity" + controller_type = ENV["CONTROLLER_TYPE"] + if controller_type.nil? || controller_type.downcase == 'replicaset' + @log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}" + + begin + nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes?fieldSelector=metadata.name%3D#{@@hostName}").body) + rescue Exception => e + @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} " + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) end - memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes") - if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? - @memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] - @log.info "Memory Limit #{@memory_capacity}" - else - @log.info "Error getting memory_capacity" + if !nodeInventory.nil? + cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") + if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? + @cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "CPU Limit #{@cpu_capacity}" + else + @log.info "Error getting cpu_capacity" + end + memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes") + if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? + @memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "Memory Limit #{@memory_capacity}" + else + @log.info "Error getting memory_capacity" + end end + elsif controller_type.downcase == 'daemonset' + capacity_from_kubelet = KubeletUtils.get_node_capacity + @cpu_capacity = capacity_from_kubelet[0] + @memory_capacity = capacity_from_kubelet[1] end end diff --git a/source/code/plugin/filter_inventory2mdm.rb b/source/code/plugin/filter_inventory2mdm.rb index 422b4b54a..16f2bb148 100644 --- a/source/code/plugin/filter_inventory2mdm.rb +++ b/source/code/plugin/filter_inventory2mdm.rb @@ -156,7 +156,7 @@ def process_pod_inventory_records(es) no_phase_dim_values_hash = Hash.new total_pod_count = 0 pod_count_by_phase = {} - podUids = {} + podUids = {} record_count = 0 begin records = [] @@ -165,7 +165,7 @@ def process_pod_inventory_records(es) timestamp = record['DataItems'][0]['CollectionTime'] podUid = record['DataItems'][0]['PodUid'] - if podUids.key?(podUid) + if podUids.key?(podUid) #@log.info "pod with #{podUid} already counted" next end diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 28b20bfc0..672eb58f8 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -22,16 +22,19 @@ def initialize require_relative "ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" + require_relative "podinventory_to_mdm" @PODS_CHUNK_SIZE = "1500" @podCount = 0 @controllerSet = Set.new [] @winContainerCount = 0 @controllerData = {} + @inventoryToMdmConvertor = Inventory2MdmConvertor.new(@custom_metrics_azure_regions) end config_param :run_interval, :time, :default => 60 config_param :tag, :string, :default => "oms.containerinsights.KubePodInventory" + config_param :custom_metrics_azure_regions, :string def configure(conf) super @@ -87,7 +90,7 @@ def enumerate(podList = nil) continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) - parse_and_emit_records(podInventory, serviceList, batchTime) + parse_and_emit_records(podInventory, serviceList, continuationToken, batchTime) else $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" end @@ -96,7 +99,7 @@ def enumerate(podList = nil) while (!continuationToken.nil? && !continuationToken.empty?) continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) - parse_and_emit_records(podInventory, serviceList, batchTime) + parse_and_emit_records(podInventory, serviceList, continuationToken, batchTime) else $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" end @@ -246,7 +249,7 @@ def getContainerEnvironmentVariables(pod, clusterCollectEnvironmentVar) end end - def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8601) + def parse_and_emit_records(podInventory, serviceList, continuationToken, batchTime = Time.utc.iso8601) currentTime = Time.now emitTime = currentTime.to_f #batchTime = currentTime.utc.iso8601 @@ -476,7 +479,18 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso86 end #podInventory block end router.emit_stream(@tag, eventStream) if eventStream - router.emit_stream(@@MDMKubePodInventoryTag, eventStream) if eventStream + # optimize inventory to mdm conversion. Move to input plugin for pod and node inventory from filter + begin + converted_records = @inventoryToMdmConvertor.process_pod_inventory_records(eventStream, batchTime) + mdm_pod_inventory_es = MultiEventStream.new + converted_records.each {|converted_record| + mdm_pod_inventory_es.add(batchTime, converted_record) if converted_record + } if converted_records + router.emit_stream(@@MDMKubePodInventoryTag, mdm_pod_inventory_es) if mdm_pod_inventory_es + rescue Exception => e + $log.info "Error converting inventory records to mdm custom metrics format #{e.class} Message: #{e.message}" + end + #:optimize:kubeperf merge begin #if(!podInventory.empty?) diff --git a/source/code/plugin/kubelet_utils.rb b/source/code/plugin/kubelet_utils.rb new file mode 100644 index 000000000..861ecfcf8 --- /dev/null +++ b/source/code/plugin/kubelet_utils.rb @@ -0,0 +1,33 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. +#!/usr/local/bin/ruby +# frozen_string_literal: true + +class KubeletUtils + class << self + def get_node_capacity + $log.info "Getting Node capacity from KubeletUtils" + default_host = "http://localhost:10255" + relative_uri = "/spec/" + node_ip = ENV["NODE_IP"] + + if !node_ip.nil? + $log.info("Using #{node_ip + relative_uri} for CAdvisor Uri in Kubelet Utils") + cadvisor_uri = "http://#{node_ip}:10255#{relative_uri}" + + uri = URI.parse(cadvisor_uri) + cpu_capacity = 1.0 + memory_capacity = 1.0 + Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40 ) do |http| + cadvisor_api_request = Net::HTTP::Get.new(uri.request_uri) + response = http.request(cadvisor_api_request) + if !response.nil? && !response.body.nil? + cpu_capacity = JSON.parse(response.body)["num_cores"].nil? ? 1.0 : (JSON.parse(response.body)["num_cores"] * 1000.0) + memory_capacity = JSON.parse(response.body)["memory_capacity"].nil? ? 1.0 : JSON.parse(response.body)["memory_capacity"].to_f + $log.info "CPU = #{cpu_capacity}mc Memory = #{memory_capacity/1024/1024}MB" + return [cpu_capacity, memory_capacity] + end + end + end + end + end +end \ No newline at end of file diff --git a/source/code/plugin/podinventory_to_mdm.rb b/source/code/plugin/podinventory_to_mdm.rb new file mode 100644 index 000000000..b35f125c2 --- /dev/null +++ b/source/code/plugin/podinventory_to_mdm.rb @@ -0,0 +1,238 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. + +# frozen_string_literal: true + +require 'logger' +require 'yajl/json_gem' +require_relative 'oms_common' +require_relative 'CustomMetricsUtils' + + +class Inventory2MdmConvertor + + @@node_count_metric_name = 'nodesCount' + @@pod_count_metric_name = 'podCount' + @@pod_inventory_tag = 'mdm.kubepodinventory' + @@node_inventory_tag = 'mdm.kubenodeinventory' + @@node_status_ready = 'Ready' + @@node_status_not_ready = 'NotReady' + + @@node_inventory_custom_metrics_template = ' + { + "time": "%{timestamp}", + "data": { + "baseData": { + "metric": "%{metricName}", + "namespace": "insights.container/nodes", + "dimNames": [ + "status" + ], + "series": [ + { + "dimValues": [ + "%{statusValue}" + ], + "min": %{node_status_count}, + "max": %{node_status_count}, + "sum": %{node_status_count}, + "count": 1 + } + ] + } + } + }' + + @@pod_inventory_custom_metrics_template = ' + { + "time": "%{timestamp}", + "data": { + "baseData": { + "metric": "%{metricName}", + "namespace": "insights.container/pods", + "dimNames": [ + "phase", + "Kubernetes namespace", + "node", + "controllerName" + ], + "series": [ + { + "dimValues": [ + "%{phaseDimValue}", + "%{namespaceDimValue}", + "%{nodeDimValue}", + "%{controllerNameDimValue}" + ], + "min": %{podCountMetricValue}, + "max": %{podCountMetricValue}, + "sum": %{podCountMetricValue}, + "count": 1 + } + ] + } + } + }' + + @@pod_phase_values = ['Running', 'Pending', 'Succeeded', 'Failed', 'Unknown'] + @process_incoming_stream = false + + def initialize(custom_metrics_azure_regions) + @log = Logger.new(@log_path, 1, 5000000) + @pod_count_hash = {} + @no_phase_dim_values_hash = {} + total_pod_count = 0 + @pod_count_by_phase = {} + @pod_uids = {} + @pod_inventory_records = {} + @process_incoming_stream = CustomMetricsUtils.check_custom_metrics_availability(custom_metrics_azure_regions) + @log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}" + @log.debug {'Starting filter_inventory2mdm plugin'} + end + + def process_node_inventory_records(es, batch_time) + if @process_incoming_stream + begin + node_ready_count = 0 + node_not_ready_count = 0 + records = [] + + es.each{|time,record| + begin + node_status = record['DataItems'][0]['Status'] + if node_status.downcase == @@node_status_ready.downcase + node_ready_count = node_ready_count+1 + else + node_not_ready_count = node_not_ready_count + 1 + end + rescue => e + end + } + + ready_record = @@node_inventory_custom_metrics_template % { + timestamp: batch_time, + metricName: @@node_count_metric_name, + statusValue: @@node_status_ready, + node_status_count: node_ready_count + } + records.push(JSON.parse(ready_record)) + + not_ready_record = @@node_inventory_custom_metrics_template % { + timestamp: batch_time, + metricName: @@node_count_metric_name, + statusValue: @@node_status_not_ready, + node_status_count: node_not_ready_count + } + records.push(JSON.parse(not_ready_record)) + rescue Exception => e + @log.info "Error processing node inventory records Exception: #{e.class} Message: #{e.message}" + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + return [] + end + return records + else + return [] + end + end + + def get_pod_inventory_mdm_records() + begin + @pod_count_hash.each {|key, value| + key_elements = key.split('~~') + if key_elements.length != 4 + next + end + + # get dimension values by key + podNodeDimValue = key_elements[0] + podNamespaceDimValue = key_elements[1] + podControllerNameDimValue = key_elements[2] + podPhaseDimValue = key_elements[3] + + record = @@pod_inventory_custom_metrics_template % { + timestamp: batch_time, + metricName: @@pod_count_metric_name, + phaseDimValue: podPhaseDimValue, + namespaceDimValue: podNamespaceDimValue, + nodeDimValue: podNodeDimValue, + controllerNameDimValue: podControllerNameDimValue, + podCountMetricValue: value + } + records.push(JSON.parse(record)) + } + rescue Exception => e + @log.info "Error processing pod inventory record Exception: #{e.class} Message: #{e.message}" + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + return [] + end + @log.info "Record Count #{record_count} pod count = #{total_pod_count} Pod Count To Phase #{@pod_count_by_phase} " + return records + end + + def process_pod_inventory_record(record) + if @process_incoming_stream + record_count = 0 + begin + records = [] + + record_count += 1 + podUid = record['DataItems'][0]['PodUid'] + + if @pod_uids.key?(podUid) + #@log.info "pod with #{podUid} already counted" + return + end + + @pod_uids[podUid] = true + podPhaseDimValue = record['DataItems'][0]['PodStatus'] + podNamespaceDimValue = record['DataItems'][0]['Namespace'] + podControllerNameDimValue = record['DataItems'][0]['ControllerName'] + podNodeDimValue = record['DataItems'][0]['Computer'] + + if podControllerNameDimValue.nil? || podControllerNameDimValue.empty? + podControllerNameDimValue = 'No Controller' + end + + if podNodeDimValue.empty? && podPhaseDimValue.downcase == 'pending' + podNodeDimValue = 'unscheduled' + elsif podNodeDimValue.empty? + podNodeDimValue = 'unknown' + end + + # group by distinct dimension values + pod_key = [podNodeDimValue, podNamespaceDimValue, podControllerNameDimValue, podPhaseDimValue].join('~~') + + if @pod_count_by_phase.key?(podPhaseDimValue) + phase_count = @pod_count_by_phase[podPhaseDimValue] + phase_count += 1 + @pod_count_by_phase[podPhaseDimValue] = phase_count + else + @pod_count_by_phase[podPhaseDimValue] = 1 + end + + total_pod_count += 1 + + if @pod_count_hash.key?(pod_key) + pod_count = @pod_count_hash[pod_key] + pod_count = pod_count + 1 + @pod_count_hash[pod_key] = pod_count + else + pod_count = 1 + @pod_count_hash[pod_key] = pod_count + end + + # Collect all possible combinations of dimension values other than pod phase + key_without_phase_dim_value = [podNodeDimValue, podNamespaceDimValue, podControllerNameDimValue].join('~~') + if @no_phase_dim_values_hash.key?(key_without_phase_dim_value) + return + else + @no_phase_dim_values_hash[key_without_phase_dim_value] = true + end + rescue Exception => e + @log.info "Error processing pod inventory record Exception: #{e.class} Message: #{e.message}" + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + end + @log.info "Record Count #{record_count} pod count = #{total_pod_count} Pod Count To Phase #{@pod_count_by_phase} " + end + end +end + From 7273508be3e3c36048642ec868b364fcb7538e74 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Fri, 17 Jan 2020 11:32:22 -0800 Subject: [PATCH 3/9] WIP : MDM plugins for scale clusters --- installer/datafiles/base_container.data | 2 + source/code/plugin/in_kube_podinventory.rb | 28 ++++--- source/code/plugin/out_mdm.rb | 7 +- source/code/plugin/podinventory_to_mdm.rb | 97 ++++++---------------- 4 files changed, 47 insertions(+), 87 deletions(-) diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index 60de5af18..f976454f9 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -35,6 +35,8 @@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/in_win_cadvisor_perf.rb; source/code/plugin/in_win_cadvisor_perf.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_nodes.rb; source/code/plugin/in_kube_nodes.rb; 644; root; root /opt/microsoft/omsagent/plugin/filter_inventory2mdm.rb; source/code/plugin/filter_inventory2mdm.rb; 644; root; root +/opt/microsoft/omsagent/plugin/podinventory_to_mdm.rb; source/code/plugin/podinventory_to_mdm.rb; 644; root; root +/opt/microsoft/omsagent/plugin/kubelet_utils.rb; source/code/plugin/kubelet_utils.rb; 644; root; root /opt/microsoft/omsagent/plugin/CustomMetricsUtils.rb; source/code/plugin/CustomMetricsUtils.rb; 644; root; root diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 672eb58f8..1aa5ca4e6 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -2,6 +2,9 @@ # frozen_string_literal: true module Fluent + + require_relative "podinventory_to_mdm" + class Kube_PodInventory_Input < Input Plugin.register_input("kubepodinventory", self) @@ -22,22 +25,22 @@ def initialize require_relative "ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" - require_relative "podinventory_to_mdm" @PODS_CHUNK_SIZE = "1500" @podCount = 0 @controllerSet = Set.new [] @winContainerCount = 0 @controllerData = {} - @inventoryToMdmConvertor = Inventory2MdmConvertor.new(@custom_metrics_azure_regions) end config_param :run_interval, :time, :default => 60 config_param :tag, :string, :default => "oms.containerinsights.KubePodInventory" config_param :custom_metrics_azure_regions, :string + def configure(conf) super + @inventoryToMdmConvertor = Inventory2MdmConvertor.new(@custom_metrics_azure_regions) end def start @@ -462,6 +465,7 @@ def parse_and_emit_records(podInventory, serviceList, continuationToken, batchTi "DataItems" => [record.each { |k, v| record[k] = v }], } eventStream.add(emitTime, wrapper) if wrapper + @inventoryToMdmConvertor.process_pod_inventory_record(wrapper) end end # Send container inventory records for containers on windows nodes @@ -479,16 +483,16 @@ def parse_and_emit_records(podInventory, serviceList, continuationToken, batchTi end #podInventory block end router.emit_stream(@tag, eventStream) if eventStream - # optimize inventory to mdm conversion. Move to input plugin for pod and node inventory from filter - begin - converted_records = @inventoryToMdmConvertor.process_pod_inventory_records(eventStream, batchTime) - mdm_pod_inventory_es = MultiEventStream.new - converted_records.each {|converted_record| - mdm_pod_inventory_es.add(batchTime, converted_record) if converted_record - } if converted_records - router.emit_stream(@@MDMKubePodInventoryTag, mdm_pod_inventory_es) if mdm_pod_inventory_es - rescue Exception => e - $log.info "Error converting inventory records to mdm custom metrics format #{e.class} Message: #{e.message}" + + if continuationToken.nil? #no more chunks in this batch to be sent, get all pod inventory records to send + @log.info "Sending pod inventory mdm records to out_mdm" + pod_inventory_mdm_records = @inventoryToMdmConvertor.get_pod_inventory_mdm_records(batchTime) + @log.info "pod_inventory_mdm_records.size #{pod_inventory_mdm_records.size}" + mdm_pod_inventory_es = MultiEventStream.new + pod_inventory_mdm_records.each {|pod_inventory_mdm_record| + mdm_pod_inventory_es.add(batchTime, pod_inventory_mdm_record) if pod_inventory_mdm_record + } if pod_inventory_mdm_records + router.emit_stream(@@MDMKubePodInventoryTag, mdm_pod_inventory_es) if mdm_pod_inventory_es end #:optimize:kubeperf merge diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb index ab3bae4c3..7999e1036 100644 --- a/source/code/plugin/out_mdm.rb +++ b/source/code/plugin/out_mdm.rb @@ -137,15 +137,15 @@ def write(chunk) chunk.msgpack_each { |(tag, record)| post_body.push(record.to_json) } - # the limit of the payload is 1MB. Each record is ~300 bytes. using a batch size of 2600, so that + # the limit of the payload is 1MB. Each record is ~300 bytes. using a batch size of 2600, so that # the pay load size becomes approximately 800 Kb. count = post_body.size while count > 0 current_batch = post_body.first(@@record_batch_size) + @log.info "Current Batch size #{current_batch.bytesize/1024}" count -= current_batch.size send_to_mdm current_batch - end - + end else if !@can_send_data_to_mdm @log.info "Cannot send data to MDM since all required conditions were not met" @@ -168,6 +168,7 @@ def send_to_mdm(post_body) request["Authorization"] = "Bearer #{access_token}" request.body = post_body.join("\n") + @log.info "REQUEST BODY SIZE #{request.body.bytesize/1024}" response = @http_client.request(request) response.value # this throws for non 200 HTTP response code @log.info "HTTP Post Response Code : #{response.code}" diff --git a/source/code/plugin/podinventory_to_mdm.rb b/source/code/plugin/podinventory_to_mdm.rb index b35f125c2..aa33473f1 100644 --- a/source/code/plugin/podinventory_to_mdm.rb +++ b/source/code/plugin/podinventory_to_mdm.rb @@ -8,7 +8,7 @@ require_relative 'CustomMetricsUtils' -class Inventory2MdmConvertor +class Inventory2MdmConvertor @@node_count_metric_name = 'nodesCount' @@pod_count_metric_name = 'podCount' @@ -77,65 +77,32 @@ class Inventory2MdmConvertor @process_incoming_stream = false def initialize(custom_metrics_azure_regions) + @log_path = '/var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log' @log = Logger.new(@log_path, 1, 5000000) @pod_count_hash = {} @no_phase_dim_values_hash = {} - total_pod_count = 0 @pod_count_by_phase = {} @pod_uids = {} - @pod_inventory_records = {} @process_incoming_stream = CustomMetricsUtils.check_custom_metrics_availability(custom_metrics_azure_regions) @log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}" @log.debug {'Starting filter_inventory2mdm plugin'} end - def process_node_inventory_records(es, batch_time) - if @process_incoming_stream - begin - node_ready_count = 0 - node_not_ready_count = 0 - records = [] - - es.each{|time,record| - begin - node_status = record['DataItems'][0]['Status'] - if node_status.downcase == @@node_status_ready.downcase - node_ready_count = node_ready_count+1 - else - node_not_ready_count = node_not_ready_count + 1 - end - rescue => e + def get_pod_inventory_mdm_records(batch_time) + begin + # generate all possible values of non_phase_dim_values X pod Phases and zero-fill the ones that are not already present + @no_phase_dim_values_hash.each {|key, value| + @@pod_phase_values.each{|phase| + pod_key = [key, phase].join('~~') + if !@pod_count_hash.key?(pod_key) + @pod_count_hash[pod_key] = 0 + #@log.info "Zero filled #{pod_key}" + else + next end } - - ready_record = @@node_inventory_custom_metrics_template % { - timestamp: batch_time, - metricName: @@node_count_metric_name, - statusValue: @@node_status_ready, - node_status_count: node_ready_count - } - records.push(JSON.parse(ready_record)) - - not_ready_record = @@node_inventory_custom_metrics_template % { - timestamp: batch_time, - metricName: @@node_count_metric_name, - statusValue: @@node_status_not_ready, - node_status_count: node_not_ready_count - } - records.push(JSON.parse(not_ready_record)) - rescue Exception => e - @log.info "Error processing node inventory records Exception: #{e.class} Message: #{e.message}" - ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) - return [] - end - return records - else - return [] - end - end - - def get_pod_inventory_mdm_records() - begin + } + records = [] @pod_count_hash.each {|key, value| key_elements = key.split('~~') if key_elements.length != 4 @@ -164,19 +131,22 @@ def get_pod_inventory_mdm_records() ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) return [] end - @log.info "Record Count #{record_count} pod count = #{total_pod_count} Pod Count To Phase #{@pod_count_by_phase} " + @log.info "Pod Count To Phase #{@pod_count_by_phase} " + @log.info "resetting convertor state " + @pod_count_hash = {} + @no_phase_dim_values_hash = {} + @pod_count_by_phase = {} + @pod_uids = {} return records end def process_pod_inventory_record(record) + @log.info "Processing POD Inventory Record" if @process_incoming_stream - record_count = 0 begin records = [] - - record_count += 1 - podUid = record['DataItems'][0]['PodUid'] + podUid = record['DataItems'][0]['PodUid'] if @pod_uids.key?(podUid) #@log.info "pod with #{podUid} already counted" return @@ -201,24 +171,8 @@ def process_pod_inventory_record(record) # group by distinct dimension values pod_key = [podNodeDimValue, podNamespaceDimValue, podControllerNameDimValue, podPhaseDimValue].join('~~') - if @pod_count_by_phase.key?(podPhaseDimValue) - phase_count = @pod_count_by_phase[podPhaseDimValue] - phase_count += 1 - @pod_count_by_phase[podPhaseDimValue] = phase_count - else - @pod_count_by_phase[podPhaseDimValue] = 1 - end - - total_pod_count += 1 - - if @pod_count_hash.key?(pod_key) - pod_count = @pod_count_hash[pod_key] - pod_count = pod_count + 1 - @pod_count_hash[pod_key] = pod_count - else - pod_count = 1 - @pod_count_hash[pod_key] = pod_count - end + @pod_count_by_phase[podPhaseDimValue] = @pod_count_by_phase.key?(podPhaseDimValue) ? @pod_count_by_phase[podPhaseDimValue] + 1 : 1 + @pod_count_hash[pod_key] = @pod_count_hash.key?(pod_key) ? @pod_count_hash[pod_key] + 1 : 1 # Collect all possible combinations of dimension values other than pod phase key_without_phase_dim_value = [podNodeDimValue, podNamespaceDimValue, podControllerNameDimValue].join('~~') @@ -231,7 +185,6 @@ def process_pod_inventory_record(record) @log.info "Error processing pod inventory record Exception: #{e.class} Message: #{e.message}" ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) end - @log.info "Record Count #{record_count} pod count = #{total_pod_count} Pod Count To Phase #{@pod_count_by_phase} " end end end From e1556f7c30183a9442ebb184b19fa279f3e4cde2 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Fri, 17 Jan 2020 15:42:52 -0800 Subject: [PATCH 4/9] Bug fixes 1. cpu percentage 2. bytesize on array. Remove log line --- source/code/plugin/filter_cadvisor2mdm.rb | 1 - source/code/plugin/out_mdm.rb | 1 - source/code/plugin/podinventory_to_mdm.rb | 1 - 3 files changed, 3 deletions(-) diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb index 278ab957f..bfe75d5a3 100644 --- a/source/code/plugin/filter_cadvisor2mdm.rb +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -111,7 +111,6 @@ def filter(tag, time, record) metric_value = record['DataItems'][0]['Collections'][0]['Value'] if counter_name.downcase == @@cpu_usage_nano_cores metric_name = @@cpu_usage_milli_cores - metric_value = metric_value/1000000 if @cpu_capacity != 0.0 percentage_metric_value = (metric_value*1000000)*100/@cpu_capacity end diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb index 7999e1036..308eb6c68 100644 --- a/source/code/plugin/out_mdm.rb +++ b/source/code/plugin/out_mdm.rb @@ -142,7 +142,6 @@ def write(chunk) count = post_body.size while count > 0 current_batch = post_body.first(@@record_batch_size) - @log.info "Current Batch size #{current_batch.bytesize/1024}" count -= current_batch.size send_to_mdm current_batch end diff --git a/source/code/plugin/podinventory_to_mdm.rb b/source/code/plugin/podinventory_to_mdm.rb index aa33473f1..21ef12c34 100644 --- a/source/code/plugin/podinventory_to_mdm.rb +++ b/source/code/plugin/podinventory_to_mdm.rb @@ -141,7 +141,6 @@ def get_pod_inventory_mdm_records(batch_time) end def process_pod_inventory_record(record) - @log.info "Processing POD Inventory Record" if @process_incoming_stream begin records = [] From ca2a0000c5a130f3d1c00b8ebba7945391cd1b11 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Tue, 21 Jan 2020 12:58:31 -0800 Subject: [PATCH 5/9] Fixing metric value in cadvisor2mdm plugin --- source/code/plugin/filter_cadvisor2mdm.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb index bfe75d5a3..cf63db7fc 100644 --- a/source/code/plugin/filter_cadvisor2mdm.rb +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -111,8 +111,9 @@ def filter(tag, time, record) metric_value = record['DataItems'][0]['Collections'][0]['Value'] if counter_name.downcase == @@cpu_usage_nano_cores metric_name = @@cpu_usage_milli_cores + metric_value /= 1000000 #cadvisor record is in nanocores. Convert to mc if @cpu_capacity != 0.0 - percentage_metric_value = (metric_value*1000000)*100/@cpu_capacity + percentage_metric_value = (metric_value)*100/@cpu_capacity end end From 41de1f88c3ddfbd3c2b7a3ea7dfe625e515d1b71 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Wed, 22 Jan 2020 08:58:17 -0800 Subject: [PATCH 6/9] WIP to laptop --- .../code/plugin/CAdvisorMetricsAPIClient.rb | 66 +++++++++++-------- source/code/plugin/filter_cadvisor2mdm.rb | 1 + .../plugin/health/health_monitor_utils.rb | 39 +---------- source/code/plugin/kubelet_utils.rb | 35 +++++----- 4 files changed, 58 insertions(+), 83 deletions(-) diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb index 8b0105a6f..372471102 100644 --- a/source/code/plugin/CAdvisorMetricsAPIClient.rb +++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb @@ -55,7 +55,9 @@ class CAdvisorMetricsAPIClient # Keeping track of containers so that can delete the container from the container cpu cache when the container is deleted # as a part of the cleanup routine @@winContainerIdCache = [] - + #cadvisor ports + @@CADVISOR_SECURE_PORT = "10250" + @@CADVISOR_NON_SECURE_PORT = "10255" def initialize end @@ -65,13 +67,8 @@ def getSummaryStatsFromCAdvisor(winNode) response = nil @Log.info "Getting CAdvisor Uri" begin - cAdvisorSecurePort = false - # Check to see if omsagent needs to use 10255(insecure) port or 10250(secure) port - if !@cAdvisorMetricsSecurePort.nil? && @cAdvisorMetricsSecurePort == "true" - cAdvisorSecurePort = true - end - cAdvisorUri = getCAdvisorUri(winNode, cAdvisorSecurePort) + cAdvisorUri = getCAdvisorUri(winNode) bearerToken = File.read("/var/run/secrets/kubernetes.io/serviceaccount/token") @Log.info "cAdvisorUri: #{cAdvisorUri}" @@ -104,36 +101,46 @@ def getSummaryStatsFromCAdvisor(winNode) return response end - def getCAdvisorUri(winNode, cAdvisorSecurePort) - begin + def getBaseCAdvisorUri(winNode: nil) + + cAdvisorSecurePort = false + # Check to see if omsagent needs to use 10255(insecure) port or 10250(secure) port + if !@cAdvisorMetricsSecurePort.nil? && @cAdvisorMetricsSecurePort == "true" + cAdvisorSecurePort = true + end + if !!cAdvisorSecurePort == true - defaultHost = "https://localhost:10250" + defaultHost = "https://localhost:#{@@CADVISOR_SECURE_PORT}" else - defaultHost = "http://localhost:10255" + defaultHost = "http://localhost:#{@@CADVISOR_NON_SECURE_PORT}" end - relativeUri = "/stats/summary" if !winNode.nil? - nodeIP = winNode["InternalIP"] + nodeIP = winNode["InternalIP"] else - nodeIP = ENV["NODE_IP"] + nodeIP = ENV["NODE_IP"] end + if !nodeIP.nil? - @Log.info("Using #{nodeIP + relativeUri} for CAdvisor Uri") - if !!cAdvisorSecurePort == true - return "https://#{nodeIP}:10250" + relativeUri - else - return "http://#{nodeIP}:10255" + relativeUri - end + @Log.info("Using #{nodeIP} for CAdvisor Base Uri") + if !!cAdvisorSecurePort == true + return "https://#{nodeIP}:#{@@CADVISOR_SECURE_PORT}" + else + return "http://#{nodeIP}:#{@@CADVISOR_NON_SECURE_PORT}" + end else - @Log.warn ("NODE_IP environment variable not set. Using default as : #{defaultHost + relativeUri} ") - if !winNode.nil? - return nil - else - return defaultHost + relativeUri - end + @Log.warn ("NODE_IP environment variable not set. Using default as : #{defaultHost} ") + if !winNode.nil? + return nil + else + return defaultHost + end end - end + end + + def getCAdvisorUri(winNode: nil, relativeUri) + baseUri = getBaseCAdvisorUri(winNode) + return baseUri + relativeUri end def getMetrics(winNode: nil, metricTime: Time.now.utc.iso8601) @@ -696,5 +703,10 @@ def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn, m end return metricItems end + + def getResponse(winNode: nil, relativeUri) + cadvisorUri = getCAdvisorUri(winNode, relativeUri) + + end end end diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb index cf63db7fc..1eed183e9 100644 --- a/source/code/plugin/filter_cadvisor2mdm.rb +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -112,6 +112,7 @@ def filter(tag, time, record) if counter_name.downcase == @@cpu_usage_nano_cores metric_name = @@cpu_usage_milli_cores metric_value /= 1000000 #cadvisor record is in nanocores. Convert to mc + @log.info "Metric_value: #{metric_value} CPU Capacity #{@cpu_capacity}" if @cpu_capacity != 0.0 percentage_metric_value = (metric_value)*100/@cpu_capacity end diff --git a/source/code/plugin/health/health_monitor_utils.rb b/source/code/plugin/health/health_monitor_utils.rb index 13d1416b1..f65bd4a63 100644 --- a/source/code/plugin/health/health_monitor_utils.rb +++ b/source/code/plugin/health/health_monitor_utils.rb @@ -3,6 +3,7 @@ require 'digest' require_relative 'health_model_constants' require 'yajl/json_gem' +require_relative '../kubelet_utils' module HealthModel # static class that provides a bunch of utility methods @@ -263,49 +264,13 @@ def get_monitor_instance_id(monitor_id, args = []) end def ensure_cpu_memory_capacity_set(log, cpu_capacity, memory_capacity, hostname) - log.info "ensure_cpu_memory_capacity_set cpu_capacity #{cpu_capacity} memory_capacity #{memory_capacity}" if cpu_capacity != 1.0 && memory_capacity != 1.0 log.info "CPU And Memory Capacity are already set" return [cpu_capacity, memory_capacity] end - log.info "CPU and Memory Capacity Not set" - begin - @@nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes").body) - rescue Exception => e - log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} " - ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) - end - if !@@nodeInventory.nil? - cpu_capacity_json = KubernetesApiClient.parseNodeLimits(@@nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") - if !cpu_capacity_json.nil? - cpu_capacity_json.each do |cpu_info_node| - if !cpu_info_node['DataItems'][0]['Host'].nil? && cpu_info_node['DataItems'][0]['Host'] == hostname - if !cpu_info_node['DataItems'][0]['Collections'][0]['Value'].nil? - cpu_capacity = cpu_info_node['DataItems'][0]['Collections'][0]['Value'] - end - end - end - log.info "CPU Limit #{cpu_capacity}" - else - log.info "Error getting cpu_capacity" - end - memory_capacity_json = KubernetesApiClient.parseNodeLimits(@@nodeInventory, "capacity", "memory", "memoryCapacityBytes") - if !memory_capacity_json.nil? - memory_capacity_json.each do |memory_info_node| - if !memory_info_node['DataItems'][0]['Host'].nil? && memory_info_node['DataItems'][0]['Host'] == hostname - if !memory_info_node['DataItems'][0]['Collections'][0]['Value'].nil? - memory_capacity = memory_info_node['DataItems'][0]['Collections'][0]['Value'] - end - end - end - log.info "memory Limit #{memory_capacity}" - else - log.info "Error getting memory_capacity" - end - return [cpu_capacity, memory_capacity] - end + return KubeletUtils.get_node_capacity end def build_metrics_hash(metrics_to_collect) diff --git a/source/code/plugin/kubelet_utils.rb b/source/code/plugin/kubelet_utils.rb index 861ecfcf8..2b1ef8e3e 100644 --- a/source/code/plugin/kubelet_utils.rb +++ b/source/code/plugin/kubelet_utils.rb @@ -2,30 +2,27 @@ #!/usr/local/bin/ruby # frozen_string_literal: true +require_relative 'CAdvisorMetricsAPIClient' + class KubeletUtils class << self def get_node_capacity - $log.info "Getting Node capacity from KubeletUtils" - default_host = "http://localhost:10255" + base_uri = CAdvisorMetricsAPIClient.getBaseCAdvisorUri(winNode: nil) relative_uri = "/spec/" - node_ip = ENV["NODE_IP"] - - if !node_ip.nil? - $log.info("Using #{node_ip + relative_uri} for CAdvisor Uri in Kubelet Utils") - cadvisor_uri = "http://#{node_ip}:10255#{relative_uri}" - uri = URI.parse(cadvisor_uri) - cpu_capacity = 1.0 - memory_capacity = 1.0 - Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40 ) do |http| - cadvisor_api_request = Net::HTTP::Get.new(uri.request_uri) - response = http.request(cadvisor_api_request) - if !response.nil? && !response.body.nil? - cpu_capacity = JSON.parse(response.body)["num_cores"].nil? ? 1.0 : (JSON.parse(response.body)["num_cores"] * 1000.0) - memory_capacity = JSON.parse(response.body)["memory_capacity"].nil? ? 1.0 : JSON.parse(response.body)["memory_capacity"].to_f - $log.info "CPU = #{cpu_capacity}mc Memory = #{memory_capacity/1024/1024}MB" - return [cpu_capacity, memory_capacity] - end + cadvisor_uri = "#{base_uri}#{relative_uri}" + $log.info("Using #{cadvisor_uri} for CAdvisor Uri in Kubelet Utils") + uri = URI.parse(cadvisor_uri) + cpu_capacity = 1.0 + memory_capacity = 1.0 + Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40 ) do |http| + cadvisor_api_request = Net::HTTP::Get.new(uri.request_uri) + response = http.request(cadvisor_api_request) + if !response.nil? && !response.body.nil? + cpu_capacity = JSON.parse(response.body)["num_cores"].nil? ? 1.0 : (JSON.parse(response.body)["num_cores"] * 1000.0) + memory_capacity = JSON.parse(response.body)["memory_capacity"].nil? ? 1.0 : JSON.parse(response.body)["memory_capacity"].to_f + $log.info "CPU = #{cpu_capacity}mc Memory = #{memory_capacity/1024/1024}MB" + return [cpu_capacity, memory_capacity] end end end From d02badbfc6f030e9a6e1a5e7ac89e95fb4b7b1c5 Mon Sep 17 00:00:00 2001 From: Dilip Raghunathan Date: Wed, 22 Jan 2020 12:49:16 -0800 Subject: [PATCH 7/9] Working version with cadvisor changes --- .../code/plugin/CAdvisorMetricsAPIClient.rb | 101 ++++++++++-------- source/code/plugin/kubelet_utils.rb | 23 ++-- 2 files changed, 62 insertions(+), 62 deletions(-) diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb index 372471102..5eb4ed884 100644 --- a/source/code/plugin/CAdvisorMetricsAPIClient.rb +++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb @@ -63,51 +63,17 @@ def initialize class << self def getSummaryStatsFromCAdvisor(winNode) - headers = {} - response = nil - @Log.info "Getting CAdvisor Uri" - begin - - cAdvisorUri = getCAdvisorUri(winNode) - bearerToken = File.read("/var/run/secrets/kubernetes.io/serviceaccount/token") - @Log.info "cAdvisorUri: #{cAdvisorUri}" - - if !cAdvisorUri.nil? - uri = URI.parse(cAdvisorUri) - if !!cAdvisorSecurePort == true - Net::HTTP.start(uri.host, uri.port, - :use_ssl => true, :open_timeout => 20, :read_timeout => 40, - :ca_file => "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", - :verify_mode => OpenSSL::SSL::VERIFY_NONE) do |http| - cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) - cAdvisorApiRequest["Authorization"] = "Bearer #{bearerToken}" - response = http.request(cAdvisorApiRequest) - @Log.info "Got response code #{response.code} from #{uri.request_uri}" - end - else - Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40) do |http| - cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) - response = http.request(cAdvisorApiRequest) - @Log.info "Got response code #{response.code} from #{uri.request_uri}" - end - end - end - rescue => error - @Log.warn("CAdvisor api request failed: #{error}") - telemetryProps = {} - telemetryProps["Computer"] = winNode["Hostname"] - ApplicationInsightsUtility.sendExceptionTelemetry(error, telemetryProps) - end - return response + relativeUri = "/stats/summary" + return getResponse(winNode, relativeUri) end - def getBaseCAdvisorUri(winNode: nil) + def getNodeCapacityFromCAdvisor(winNode: nil) + relativeUri = "/spec/" + return getResponse(winNode, relativeUri) + end - cAdvisorSecurePort = false - # Check to see if omsagent needs to use 10255(insecure) port or 10250(secure) port - if !@cAdvisorMetricsSecurePort.nil? && @cAdvisorMetricsSecurePort == "true" - cAdvisorSecurePort = true - end + def getBaseCAdvisorUri(winNode) + cAdvisorSecurePort = isCAdvisorOnSecurePort() if !!cAdvisorSecurePort == true defaultHost = "https://localhost:#{@@CADVISOR_SECURE_PORT}" @@ -122,14 +88,14 @@ def getBaseCAdvisorUri(winNode: nil) end if !nodeIP.nil? - @Log.info("Using #{nodeIP} for CAdvisor Base Uri") + @Log.info("Using #{nodeIP} for CAdvisor Host") if !!cAdvisorSecurePort == true return "https://#{nodeIP}:#{@@CADVISOR_SECURE_PORT}" else return "http://#{nodeIP}:#{@@CADVISOR_NON_SECURE_PORT}" end else - @Log.warn ("NODE_IP environment variable not set. Using default as : #{defaultHost} ") + @Log.warn ("NODE_IP environment variable not set. Using default as : #{defaultHost}") if !winNode.nil? return nil else @@ -138,7 +104,7 @@ def getBaseCAdvisorUri(winNode: nil) end end - def getCAdvisorUri(winNode: nil, relativeUri) + def getCAdvisorUri(winNode, relativeUri) baseUri = getBaseCAdvisorUri(winNode) return baseUri + relativeUri end @@ -704,9 +670,50 @@ def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn, m return metricItems end - def getResponse(winNode: nil, relativeUri) - cadvisorUri = getCAdvisorUri(winNode, relativeUri) + def getResponse(winNode, relativeUri) + response = nil + @Log.info "Getting CAdvisor Uri Response" + bearerToken = File.read("/var/run/secrets/kubernetes.io/serviceaccount/token") + begin + cAdvisorUri = getCAdvisorUri(winNode, relativeUri) + @Log.info "cAdvisorUri: #{cAdvisorUri}" + + if !cAdvisorUri.nil? + uri = URI.parse(cAdvisorUri) + if isCAdvisorOnSecurePort() + Net::HTTP.start(uri.host, uri.port, + :use_ssl => true, :open_timeout => 20, :read_timeout => 40, + :ca_file => "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", + :verify_mode => OpenSSL::SSL::VERIFY_NONE) do |http| + cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) + cAdvisorApiRequest["Authorization"] = "Bearer #{bearerToken}" + response = http.request(cAdvisorApiRequest) + @Log.info "Got response code #{response.code} from #{uri.request_uri}" + end + else + Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40) do |http| + cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) + response = http.request(cAdvisorApiRequest) + @Log.info "Got response code #{response.code} from #{uri.request_uri}" + end + end + end + rescue => error + @Log.warn("CAdvisor api request failed: #{error}") + telemetryProps = {} + telemetryProps["Computer"] = winNode["Hostname"] + ApplicationInsightsUtility.sendExceptionTelemetry(error, telemetryProps) + end + return response + end + def isCAdvisorOnSecurePort + cAdvisorSecurePort = false + # Check to see whether omsagent needs to use 10255(insecure) port or 10250(secure) port + if !@cAdvisorMetricsSecurePort.nil? && @cAdvisorMetricsSecurePort == "true" + cAdvisorSecurePort = true + end + return cAdvisorSecurePort end end end diff --git a/source/code/plugin/kubelet_utils.rb b/source/code/plugin/kubelet_utils.rb index 2b1ef8e3e..6d97e30a9 100644 --- a/source/code/plugin/kubelet_utils.rb +++ b/source/code/plugin/kubelet_utils.rb @@ -7,23 +7,16 @@ class KubeletUtils class << self def get_node_capacity - base_uri = CAdvisorMetricsAPIClient.getBaseCAdvisorUri(winNode: nil) - relative_uri = "/spec/" - - cadvisor_uri = "#{base_uri}#{relative_uri}" - $log.info("Using #{cadvisor_uri} for CAdvisor Uri in Kubelet Utils") - uri = URI.parse(cadvisor_uri) + cpu_capacity = 1.0 memory_capacity = 1.0 - Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40 ) do |http| - cadvisor_api_request = Net::HTTP::Get.new(uri.request_uri) - response = http.request(cadvisor_api_request) - if !response.nil? && !response.body.nil? - cpu_capacity = JSON.parse(response.body)["num_cores"].nil? ? 1.0 : (JSON.parse(response.body)["num_cores"] * 1000.0) - memory_capacity = JSON.parse(response.body)["memory_capacity"].nil? ? 1.0 : JSON.parse(response.body)["memory_capacity"].to_f - $log.info "CPU = #{cpu_capacity}mc Memory = #{memory_capacity/1024/1024}MB" - return [cpu_capacity, memory_capacity] - end + + response = CAdvisorMetricsAPIClient.getNodeCapacityFromCAdvisor(winNode: nil) + if !response.nil? && !response.body.nil? + cpu_capacity = JSON.parse(response.body)["num_cores"].nil? ? 1.0 : (JSON.parse(response.body)["num_cores"] * 1000.0) + memory_capacity = JSON.parse(response.body)["memory_capacity"].nil? ? 1.0 : JSON.parse(response.body)["memory_capacity"].to_f + $log.info "CPU = #{cpu_capacity}mc Memory = #{memory_capacity/1024/1024}MB" + return [cpu_capacity, memory_capacity] end end end From 283bb93242913b84394eb73491443e4a129cc7ba Mon Sep 17 00:00:00 2001 From: r-dilip Date: Wed, 22 Jan 2020 12:51:11 -0800 Subject: [PATCH 8/9] Fix Health cpu usage --- source/code/plugin/filter_cadvisor_health_node.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/code/plugin/filter_cadvisor_health_node.rb b/source/code/plugin/filter_cadvisor_health_node.rb index c6280db60..4106b4d82 100644 --- a/source/code/plugin/filter_cadvisor_health_node.rb +++ b/source/code/plugin/filter_cadvisor_health_node.rb @@ -131,13 +131,13 @@ def process_node_cpu_record(record, metric_value) else instance_name = record['DataItems'][0]['InstanceName'] #@log.info "CPU capacity #{@cpu_capacity}" - + metric_value /= 1000000 percent = (metric_value.to_f/@cpu_capacity*100).round(2) #@log.debug "Percentage of CPU limit: #{percent}" state = HealthMonitorUtils.compute_percentage_state(percent, @provider.get_config(MonitorId::NODE_CPU_MONITOR_ID)) #@log.debug "Computed State : #{state}" timestamp = record['DataItems'][0]['Timestamp'] - health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"cpuUsageMillicores" => metric_value/1000000.to_f, "cpuUtilizationPercentage" => percent}} + health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"cpuUsageMillicores" => metric_value, "cpuUtilizationPercentage" => percent}} monitor_instance_id = HealthMonitorUtils.get_monitor_instance_id(monitor_id, [@@clusterId, @@hostName]) # temp = record.nil? ? "Nil" : record["MonitorInstanceId"] From fc9c222efdc61053e4f4968477a9c0011d4cf180 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Tue, 28 Jan 2020 15:55:21 -0800 Subject: [PATCH 9/9] Added uri for cadvisor failure --- source/code/plugin/CAdvisorMetricsAPIClient.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb index 5eb4ed884..54e7e5fd9 100644 --- a/source/code/plugin/CAdvisorMetricsAPIClient.rb +++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb @@ -699,7 +699,7 @@ def getResponse(winNode, relativeUri) end end rescue => error - @Log.warn("CAdvisor api request failed: #{error}") + @Log.warn("CAdvisor api request for #{cAdvisorUri} failed: #{error}") telemetryProps = {} telemetryProps["Computer"] = winNode["Hostname"] ApplicationInsightsUtility.sendExceptionTelemetry(error, telemetryProps)