From 9063b6bc481a2aefdeceec35f12a57fe326a2e2c Mon Sep 17 00:00:00 2001 From: rashmy Date: Wed, 13 Nov 2019 17:26:54 -0800 Subject: [PATCH 01/13] changes for http connection close --- .../code/plugin/CAdvisorMetricsAPIClient.rb | 13 +++++-------- source/code/plugin/KubernetesApiClient.rb | 19 ++++++------------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb index cb10992d5..7b13fb736 100644 --- a/source/code/plugin/CAdvisorMetricsAPIClient.rb +++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb @@ -65,14 +65,11 @@ def getSummaryStatsFromCAdvisor(winNode) cAdvisorUri = getCAdvisorUri(winNode) if !cAdvisorUri.nil? uri = URI.parse(cAdvisorUri) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = false - http.open_timeout = 20 - http.read_timeout = 40 - - cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) - response = http.request(cAdvisorApiRequest) - @Log.info "Got response code #{response.code} from #{uri.request_uri}" + Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40 ) do |http| + cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) + response = http.request(cAdvisorApiRequest) + @Log.info "Got response code #{response.code} from #{uri.request_uri}" + end end rescue => error @Log.warn("CAdvisor api request failed: #{error}") diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index 0cff50752..040373afe 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -40,23 +40,16 @@ def getKubeResourceInfo(resource, api_group: nil) resourceUri = getResourceUri(resource, api_group) if !resourceUri.nil? uri = URI.parse(resourceUri) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = true - http.open_timeout = 20 - http.read_timeout = 40 - if !File.exist?(@@CaFile) raise "#{@@CaFile} doesnt exist" else - http.ca_file = @@CaFile if File.exist?(@@CaFile) + Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER ) do |http| + kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) + kubeApiRequest["Authorization"] = "Bearer " + getTokenStr + @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" + response = http.request(kubeApiRequest) + @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}" end - http.verify_mode = OpenSSL::SSL::VERIFY_PEER - - kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) - kubeApiRequest["Authorization"] = "Bearer " + getTokenStr - @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" - response = http.request(kubeApiRequest) - @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}" end rescue => error @Log.warn("kubernetes api request failed: #{error} for #{resource} @ #{Time.now.utc.iso8601}") From 050d405093b9a78b6d8de57f95200e84e3c9ffed Mon Sep 17 00:00:00 2001 From: rashmy Date: Wed, 13 Nov 2019 17:54:22 -0800 Subject: [PATCH 02/13] close socket in ensure --- source/code/plugin/DockerApiClient.rb | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/code/plugin/DockerApiClient.rb b/source/code/plugin/DockerApiClient.rb index fb2148ec9..ab9d67d43 100644 --- a/source/code/plugin/DockerApiClient.rb +++ b/source/code/plugin/DockerApiClient.rb @@ -40,7 +40,6 @@ def getResponse(request, isMultiJson, isVersion) end break if (isVersion) ? (responseChunk.length < @@ChunkSize) : (responseChunk.end_with? "0\r\n\r\n") end - socket.close return (isTimeOut) ? nil : parseResponse(dockerResponse, isMultiJson) rescue => errorStr $log.warn("Socket call failed for request: #{request} error: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") @@ -49,6 +48,10 @@ def getResponse(request, isMultiJson, isVersion) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end return nil + ensure + ## REMOVE LOG BEFORE MERGE + $log.warn "Closing docker socket connection" + socket.close end end From 16aa96beea658bf14b8b72f34eb7ba7828073bab Mon Sep 17 00:00:00 2001 From: rashmy Date: Thu, 14 Nov 2019 10:39:40 -0800 Subject: [PATCH 03/13] adding nil check --- source/code/plugin/DockerApiClient.rb | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/code/plugin/DockerApiClient.rb b/source/code/plugin/DockerApiClient.rb index ab9d67d43..cc0d0e722 100644 --- a/source/code/plugin/DockerApiClient.rb +++ b/source/code/plugin/DockerApiClient.rb @@ -3,7 +3,7 @@ class DockerApiClient require "socket" - require 'yajl/json_gem' + require "yajl/json_gem" require "timeout" require_relative "omslog" require_relative "DockerApiRestHelper" @@ -51,7 +51,9 @@ def getResponse(request, isMultiJson, isVersion) ensure ## REMOVE LOG BEFORE MERGE $log.warn "Closing docker socket connection" - socket.close + if !socket.nil? + socket.close + end end end From 9a8f0f8b58d28aee68cf680bebf8094c8e1b8ea6 Mon Sep 17 00:00:00 2001 From: bragi92 Date: Thu, 14 Nov 2019 10:42:50 -0800 Subject: [PATCH 04/13] Update MDM region list to include francecentral, japaneast and australiaeast --- installer/conf/container.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/installer/conf/container.conf b/installer/conf/container.conf index f9540bde8..696ffdb6b 100755 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -36,7 +36,7 @@ #custom_metrics_mdm filter plugin type filter_cadvisor2mdm - custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes log_level info From 597b2fb3dd9a4e9a7f4f4ec8cef3a855526abbe0 Mon Sep 17 00:00:00 2001 From: bragi92 Date: Thu, 14 Nov 2019 10:48:48 -0800 Subject: [PATCH 05/13] Update MDM region list to include francecentral, japaneast and australiaeast --- installer/conf/kube.conf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/installer/conf/kube.conf b/installer/conf/kube.conf index 40f4ac880..49d0bf62e 100644 --- a/installer/conf/kube.conf +++ b/installer/conf/kube.conf @@ -70,14 +70,14 @@ type filter_inventory2mdm - custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast log_level info #custom_metrics_mdm filter plugin for perf data from windows nodes type filter_cadvisor2mdm - custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes log_level info @@ -247,4 +247,4 @@ retry_limit 10 retry_wait 30s max_retry_wait 9m - \ No newline at end of file + From 258818efba56728ddf6e93253df7a107dba8d119 Mon Sep 17 00:00:00 2001 From: rashmy Date: Thu, 14 Nov 2019 14:14:02 -0800 Subject: [PATCH 06/13] adding missing end --- source/code/plugin/KubernetesApiClient.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index 040373afe..8adf3f6b7 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -49,6 +49,7 @@ def getKubeResourceInfo(resource, api_group: nil) @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" response = http.request(kubeApiRequest) @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}" + end end end rescue => error From cd1a37b72b1911eb657012668319763e0b3770da Mon Sep 17 00:00:00 2001 From: Dilip Raghunathan Date: Thu, 14 Nov 2019 18:18:54 -0800 Subject: [PATCH 07/13] Send telemetry when there is error in calculation of state in percentage aggregation, and send state as unknown (#300) --- .../code/plugin/health/aggregate_monitor.rb | 38 ++++++++++++++++++- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/source/code/plugin/health/aggregate_monitor.rb b/source/code/plugin/health/aggregate_monitor.rb index 794f716ce..10dbdc705 100644 --- a/source/code/plugin/health/aggregate_monitor.rb +++ b/source/code/plugin/health/aggregate_monitor.rb @@ -3,6 +3,12 @@ require_relative 'health_model_constants' require 'json' +# Require only when running inside container. +# otherwise unit tests will fail due to ApplicationInsightsUtility dependency on base omsagent ruby files. If you have your dev machine starting with omsagent-rs, then GOOD LUCK! +if Socket.gethostname.start_with?('omsagent-rs') + require_relative '../ApplicationInsightsUtility' +end + module HealthModel class AggregateMonitor attr_accessor :monitor_id, :monitor_instance_id, :state, :transition_date_time, :aggregation_algorithm, :aggregation_algorithm_params, :labels, :is_aggregate_monitor, :details @@ -16,6 +22,8 @@ class AggregateMonitor MonitorState::NONE => 5 } + @@telemetry_sent_hash = {} + # constructor def initialize( monitor_id, @@ -127,17 +135,43 @@ def calculate_percentage_state(monitor_set) #sort #TODO: What if sorted_filtered is empty? is that even possible? + log = HealthMonitorHelpers.get_log_handle sorted_filtered = sort_filter_member_monitors(monitor_set) state_threshold = @aggregation_algorithm_params['state_threshold'].to_f - size = sorted_filtered.size + if sorted_filtered.nil? + size = 0 + else + size = sorted_filtered.size + end + if size == 1 @state = sorted_filtered[0].state else count = ((state_threshold*size)/100).ceil index = size - count - @state = sorted_filtered[index].state + if sorted_filtered.nil? || sorted_filtered[index].nil? + @state = HealthMonitorStates::UNKNOWN + if !@@telemetry_sent_hash.key?(@monitor_instance_id) + log.debug "Adding to telemetry sent hash #{@monitor_instance_id}" + @@telemetry_sent_hash[@monitor_instance_id] = true + log.info "Index: #{index} size: #{size} Count: #{count}" + custom_error_event_map = {} + custom_error_event_map["count"] = count + custom_error_event_map["index"] = index + custom_error_event_map["size"] = size + if !sorted_filtered.nil? + sorted_filtered.each_index{|i| + custom_error_event_map[i] = sorted_filtered[i].state + } + end + ApplicationInsightsUtility.sendCustomEvent("PercentageStateCalculationErrorEvent", custom_error_event_map) + end + else + @state = sorted_filtered[index].state + end + @state end end From 6c705b2291236209b91e0b16701295866e169a64 Mon Sep 17 00:00:00 2001 From: rashmy Date: Fri, 15 Nov 2019 16:38:27 -0800 Subject: [PATCH 08/13] changes for chunking --- source/code/plugin/in_kube_podinventory.rb | 157 +++++++++++++-------- 1 file changed, 96 insertions(+), 61 deletions(-) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index e912ea6ef..d040b8cba 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -9,11 +9,16 @@ class Kube_PodInventory_Input < Input @@hostName = (OMS::Common.get_hostname) @@kubeperfTag = "oms.api.KubePerf" @@kubeservicesTag = "oms.containerinsights.KubeServices" + @PODS_CHUNK_SIZE = "1500" + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} def initialize super require "yaml" - require 'yajl/json_gem' + require "yajl/json_gem" require "set" require "time" @@ -50,37 +55,88 @@ def shutdown end end - def enumerate(podList = nil) - podInventory = podList - currentTime = Time.now - $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - podInfo = KubernetesApiClient.getKubeResourceInfo("pods") - $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - - if !podInfo.nil? - podInventory = JSON.parse(podInfo.body) - end - + def processPodChunks(podInventory, serviceList) begin if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) batchTime = currentTime.utc.iso8601 - #get pod inventory & services - $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") - serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) - $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") - parse_and_emit_records(podInventory, serviceList, batchTime ) + parse_and_emit_records(podInventory, serviceList, batchTime) else $log.warn "Received empty podInventory" end podInfo = nil podInventory = nil rescue => errorStr - $log.warn "Failed in enumerate pod inventory: #{errorStr}" + $log.warn "Failed in process pod chunks: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end + def parsePodsJsonAndProcess(podInfo, serviceList) + if !podInfo.nil? + # podInventory = JSON.parse(podInfo.body) + podInventory = Yajl::Parser.parse(StringIO.new(podInfo.body)) + end + if (!podInventory.nil? && !podInventory["metadata"].nil?) + continuationToken = podInventory["metadata"]["continue"] + end + processPodChunks(podInventory, serviceList) + return continuationToken + end + + def enumerate(podList = nil) + podInventory = podList + currentTime = Time.now + telemetryFlush = false + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} + + # Get services first so that we dont need to make a call for very chunk + $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") + serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) + $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") + podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}") + $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + + continuationToken = parsePodsJsonAndProcess(podInfo, serviceList) + + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + parsePodsJsonAndProcess(podInfo, serviceList) + end + + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end + + # Flush AppInsights telemetry once all the processing is done + if telemetryFlush == true + telemetryProperties = {} + telemetryProperties["Computer"] = @@hostName + ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) + telemetryProperties["ControllerData"] = @controllerData.to_json + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) + if @winContainerCount > 0 + telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount + ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + end + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i + end + end + def populateWindowsContainerInventoryRecord(container, record, containerEnvVariableHash, batchTime) begin containerInventoryRecord = {} @@ -193,15 +249,12 @@ def getContainerEnvironmentVariables(pod, clusterCollectEnvironmentVar) end end - def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8601) + def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8601) currentTime = Time.now emitTime = currentTime.to_f #batchTime = currentTime.utc.iso8601 eventStream = MultiEventStream.new - controllerSet = Set.new [] - controllerData = {} - telemetryFlush = false - winContainerCount = 0 + begin #begin block start # Getting windows nodes from kubeapi winNodes = KubernetesApiClient.getWindowsNodesArray @@ -284,24 +337,17 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8 record["ClusterId"] = KubernetesApiClient.getClusterId record["ClusterName"] = KubernetesApiClient.getClusterName record["ServiceName"] = getServiceNameFromLabels(items["metadata"]["namespace"], items["metadata"]["labels"], serviceList) - # Adding telemetry to send pod telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 5) - telemetryFlush = true - end + if !items["metadata"]["ownerReferences"].nil? record["ControllerKind"] = items["metadata"]["ownerReferences"][0]["kind"] record["ControllerName"] = items["metadata"]["ownerReferences"][0]["name"] - if telemetryFlush == true - controllerSet.add(record["ControllerKind"] + record["ControllerName"]) - #Adding controller kind to telemetry ro information about customer workload - if (controllerData[record["ControllerKind"]].nil?) - controllerData[record["ControllerKind"]] = 1 - else - controllerValue = controllerData[record["ControllerKind"]] - controllerData[record["ControllerKind"]] += 1 - end + @controllerSet.add(record["ControllerKind"] + record["ControllerName"]) + #Adding controller kind to telemetry ro information about customer workload + if (@controllerData[record["ControllerKind"]].nil?) + @controllerData[record["ControllerKind"]] = 1 + else + controllerValue = @controllerData[record["ControllerKind"]] + @controllerData[record["ControllerKind"]] += 1 end end podRestartCount = 0 @@ -419,7 +465,7 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8 end end # Send container inventory records for containers on windows nodes - winContainerCount += containerInventoryRecords.length + @winContainerCount += containerInventoryRecords.length containerInventoryRecords.each do |cirecord| if !cirecord.nil? ciwrapper = { @@ -436,25 +482,24 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8 router.emit_stream(@@MDMKubePodInventoryTag, eventStream) if eventStream #:optimize:kubeperf merge begin - #if(!podInventory.empty?) + #if(!podInventory.empty?) containerMetricDataItems = [] #hostName = (OMS::Common.get_hostname) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu","cpuRequestNanoCores", batchTime)) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory","memoryRequestBytes", batchTime)) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu","cpuLimitNanoCores", batchTime)) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory","memoryLimitBytes", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu", "cpuRequestNanoCores", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory", "memoryRequestBytes", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu", "cpuLimitNanoCores", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory", "memoryLimitBytes", batchTime)) kubePerfEventStream = MultiEventStream.new containerMetricDataItems.each do |record| - record['DataType'] = "LINUX_PERF_BLOB" - record['IPName'] = "LogManagement" + record["DataType"] = "LINUX_PERF_BLOB" + record["IPName"] = "LogManagement" kubePerfEventStream.add(emitTime, record) if record - #router.emit(@tag, time, record) if record + #router.emit(@tag, time, record) if record end #end router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream - rescue => errorStr $log.warn "Failed in parse_and_emit_record for KubePerf from in_kube_podinventory : #{errorStr}" $log.debug_backtrace(errorStr.backtrace) @@ -493,19 +538,9 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8 end #:optimize:end kubeservices merge - if telemetryFlush == true - telemetryProperties = {} - telemetryProperties["Computer"] = @@hostName - ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) - ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory["items"].length, {}) - telemetryProperties["ControllerData"] = controllerData.to_json - ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length, telemetryProperties) - if winContainerCount > 0 - telemetryProperties["ClusterWideWindowsContainersCount"] = winContainerCount - ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) - end - @@podTelemetryTimeTracker = DateTime.now.to_time.to_i - end + #Updating value for AppInsights telemetry + @podCount += podInventory["items"].length + @@istestvar = ENV["ISTEST"] if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) $log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") From a8e3e76e47035835f459c63356b22a770a5c59ee Mon Sep 17 00:00:00 2001 From: rashmy Date: Fri, 15 Nov 2019 16:51:40 -0800 Subject: [PATCH 09/13] telemetry changes --- source/code/plugin/in_kube_podinventory.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index d040b8cba..58a4ef327 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -74,8 +74,9 @@ def processPodChunks(podInventory, serviceList) def parsePodsJsonAndProcess(podInfo, serviceList) if !podInfo.nil? - # podInventory = JSON.parse(podInfo.body) + $log.info("in_kube_podinventory::parsePodsJsonAndProcess : Start::Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") podInventory = Yajl::Parser.parse(StringIO.new(podInfo.body)) + $log.info("in_kube_podinventory::parsePodsJsonAndProcess : End::Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") end if (!podInventory.nil? && !podInventory["metadata"].nil?) continuationToken = podInventory["metadata"]["continue"] From b4ab80060e470e5f7e4ea354356465857024a61d Mon Sep 17 00:00:00 2001 From: rashmy Date: Fri, 15 Nov 2019 17:30:49 -0800 Subject: [PATCH 10/13] some fixes --- source/code/plugin/in_kube_podinventory.rb | 34 ++++++++++++---------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 58a4ef327..d9d856c52 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -9,16 +9,12 @@ class Kube_PodInventory_Input < Input @@hostName = (OMS::Common.get_hostname) @@kubeperfTag = "oms.api.KubePerf" @@kubeservicesTag = "oms.containerinsights.KubeServices" - @PODS_CHUNK_SIZE = "1500" - @podCount = 0 - @controllerSet = Set.new [] - @winContainerCount = 0 - @controllerData = {} def initialize super require "yaml" require "yajl/json_gem" + require "yajl" require "set" require "time" @@ -26,6 +22,12 @@ def initialize require_relative "ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" + + @PODS_CHUNK_SIZE = "1500" + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} end config_param :run_interval, :time, :default => 60 @@ -55,44 +57,44 @@ def shutdown end end - def processPodChunks(podInventory, serviceList) + def processPodChunks(podInventory, serviceList, batchTime) begin + currentTime = Time.now if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) - batchTime = currentTime.utc.iso8601 parse_and_emit_records(podInventory, serviceList, batchTime) else - $log.warn "Received empty podInventory" + $log.warn "in_kube_podinventory::processPodChunks:Received empty podInventory" end podInfo = nil podInventory = nil rescue => errorStr - $log.warn "Failed in process pod chunks: #{errorStr}" + $log.warn "in_kube_podinventory::processPodChunks:Failed in process pod chunks: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end - def parsePodsJsonAndProcess(podInfo, serviceList) + def parsePodsJsonAndProcess(podInfo, serviceList, batchTime) if !podInfo.nil? - $log.info("in_kube_podinventory::parsePodsJsonAndProcess : Start::Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + $log.info("in_kube_podinventory::parsePodsJsonAndProcess:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") podInventory = Yajl::Parser.parse(StringIO.new(podInfo.body)) - $log.info("in_kube_podinventory::parsePodsJsonAndProcess : End::Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + $log.info("in_kube_podinventory::parsePodsJsonAndProcess:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") end if (!podInventory.nil? && !podInventory["metadata"].nil?) continuationToken = podInventory["metadata"]["continue"] end - processPodChunks(podInventory, serviceList) + processPodChunks(podInventory, serviceList, batchTime) return continuationToken end def enumerate(podList = nil) podInventory = podList - currentTime = Time.now telemetryFlush = false @podCount = 0 @controllerSet = Set.new [] @winContainerCount = 0 @controllerData = {} + batchTime = currentTime.utc.iso8601 # Get services first so that we dont need to make a call for very chunk $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") @@ -105,14 +107,14 @@ def enumerate(podList = nil) podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}") $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - continuationToken = parsePodsJsonAndProcess(podInfo, serviceList) + continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") - parsePodsJsonAndProcess(podInfo, serviceList) + continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) end # Adding telemetry to send pod telemetry every 5 minutes From 4a75a190069ab9bf2e625c2725315d037fc5f3a1 Mon Sep 17 00:00:00 2001 From: rashmy Date: Fri, 15 Nov 2019 17:34:04 -0800 Subject: [PATCH 11/13] bug fix --- source/code/plugin/in_kube_podinventory.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index d9d856c52..cc1ec35f8 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -59,7 +59,6 @@ def shutdown def processPodChunks(podInventory, serviceList, batchTime) begin - currentTime = Time.now if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) parse_and_emit_records(podInventory, serviceList, batchTime) else @@ -94,6 +93,7 @@ def enumerate(podList = nil) @controllerSet = Set.new [] @winContainerCount = 0 @controllerData = {} + currentTime = Time.now batchTime = currentTime.utc.iso8601 # Get services first so that we dont need to make a call for very chunk From 311fdbda66e43377d520faff820c38295cfa4422 Mon Sep 17 00:00:00 2001 From: rashmy Date: Fri, 15 Nov 2019 17:42:18 -0800 Subject: [PATCH 12/13] changing to have morgan changes only --- installer/conf/container.conf | 58 +++++++++---------- .../code/plugin/health/aggregate_monitor.rb | 40 +------------ 2 files changed, 32 insertions(+), 66 deletions(-) diff --git a/installer/conf/container.conf b/installer/conf/container.conf index 696ffdb6b..ee1e79f8a 100644 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -11,7 +11,7 @@ type containerinventory tag oms.containerinsights.containerinventory - run_interval 60s + run_interval 60 log_level debug @@ -19,40 +19,40 @@ type cadvisorperf tag oms.api.cadvisorperf - run_interval 60s + run_interval 60 log_level debug - - type filter_cadvisor_health_node - log_level debug - +# +# type filter_cadvisor_health_node +# log_level debug +# - - type filter_cadvisor_health_container - log_level debug - +# +# type filter_cadvisor_health_container +# log_level debug +# #custom_metrics_mdm filter plugin - - type filter_cadvisor2mdm - custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast - metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes - log_level info - +# +# type filter_cadvisor2mdm +# custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral +# metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes +# log_level info +# type out_oms log_level debug num_threads 5 - buffer_chunk_limit 20m + #buffer_chunk_limit 20m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_containerinventory*.buffer - buffer_queue_limit 20 + #buffer_queue_limit 20 buffer_queue_full_action drop_oldest_chunk - flush_interval 20s + flush_interval 10s retry_limit 10 - retry_wait 30s + retry_wait 10s max_retry_wait 9m @@ -60,14 +60,14 @@ type out_oms log_level debug num_threads 5 - buffer_chunk_limit 20m + #buffer_chunk_limit 20m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_cadvisorperf*.buffer - buffer_queue_limit 20 + #buffer_queue_limit 20 buffer_queue_full_action drop_oldest_chunk - flush_interval 20s + flush_interval 10s retry_limit 10 - retry_wait 30s + retry_wait 10s max_retry_wait 9m @@ -96,14 +96,14 @@ type out_mdm log_level debug num_threads 5 - buffer_chunk_limit 20m + #buffer_chunk_limit 20m buffer_type file buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer - buffer_queue_limit 20 + #buffer_queue_limit 20 buffer_queue_full_action drop_oldest_chunk - flush_interval 20s + flush_interval 10s retry_limit 10 - retry_wait 30s + retry_wait 10s max_retry_wait 9m retry_mdm_post_wait_minutes 60 - + \ No newline at end of file diff --git a/source/code/plugin/health/aggregate_monitor.rb b/source/code/plugin/health/aggregate_monitor.rb index 8ffc15ddc..19e457d69 100644 --- a/source/code/plugin/health/aggregate_monitor.rb +++ b/source/code/plugin/health/aggregate_monitor.rb @@ -3,12 +3,6 @@ require_relative 'health_model_constants' require 'yajl/json_gem' -# Require only when running inside container. -# otherwise unit tests will fail due to ApplicationInsightsUtility dependency on base omsagent ruby files. If you have your dev machine starting with omsagent-rs, then GOOD LUCK! -if Socket.gethostname.start_with?('omsagent-rs') - require_relative '../ApplicationInsightsUtility' -end - module HealthModel class AggregateMonitor attr_accessor :monitor_id, :monitor_instance_id, :state, :transition_date_time, :aggregation_algorithm, :aggregation_algorithm_params, :labels, :is_aggregate_monitor, :details @@ -22,8 +16,6 @@ class AggregateMonitor MonitorState::NONE => 5 } - @@telemetry_sent_hash = {} - # constructor def initialize( monitor_id, @@ -135,43 +127,17 @@ def calculate_percentage_state(monitor_set) #sort #TODO: What if sorted_filtered is empty? is that even possible? - log = HealthMonitorHelpers.get_log_handle sorted_filtered = sort_filter_member_monitors(monitor_set) state_threshold = @aggregation_algorithm_params['state_threshold'].to_f - if sorted_filtered.nil? - size = 0 - else - size = sorted_filtered.size - end - + size = sorted_filtered.size if size == 1 @state = sorted_filtered[0].state else count = ((state_threshold*size)/100).ceil index = size - count - if sorted_filtered.nil? || sorted_filtered[index].nil? - @state = HealthMonitorStates::UNKNOWN - if !@@telemetry_sent_hash.key?(@monitor_instance_id) - log.debug "Adding to telemetry sent hash #{@monitor_instance_id}" - @@telemetry_sent_hash[@monitor_instance_id] = true - log.info "Index: #{index} size: #{size} Count: #{count}" - custom_error_event_map = {} - custom_error_event_map["count"] = count - custom_error_event_map["index"] = index - custom_error_event_map["size"] = size - if !sorted_filtered.nil? - sorted_filtered.each_index{|i| - custom_error_event_map[i] = sorted_filtered[i].state - } - end - ApplicationInsightsUtility.sendCustomEvent("PercentageStateCalculationErrorEvent", custom_error_event_map) - end - else - @state = sorted_filtered[index].state - end - @state + @state = sorted_filtered[index].state end end @@ -224,4 +190,4 @@ def sort_filter_member_monitors(monitor_set) return sorted end end -end +end \ No newline at end of file From b013e12fcd4eb79597491f1fa13c15fcca0ffeef Mon Sep 17 00:00:00 2001 From: rashmy Date: Fri, 15 Nov 2019 17:43:28 -0800 Subject: [PATCH 13/13] add new line --- installer/conf/container.conf | 2 +- source/code/plugin/health/aggregate_monitor.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/installer/conf/container.conf b/installer/conf/container.conf index ee1e79f8a..e1877a576 100644 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -106,4 +106,4 @@ retry_wait 10s max_retry_wait 9m retry_mdm_post_wait_minutes 60 - \ No newline at end of file + diff --git a/source/code/plugin/health/aggregate_monitor.rb b/source/code/plugin/health/aggregate_monitor.rb index 19e457d69..00ee9aecd 100644 --- a/source/code/plugin/health/aggregate_monitor.rb +++ b/source/code/plugin/health/aggregate_monitor.rb @@ -190,4 +190,4 @@ def sort_filter_member_monitors(monitor_set) return sorted end end -end \ No newline at end of file +end