From 7d63c95ee046d8e384c50be01e09f9e97eba7d51 Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 09:31:56 -0800 Subject: [PATCH 01/15] changes --- source/code/plugin/in_kube_nodes.rb | 86 ++++++++++++++++++++++++++++- 1 file changed, 84 insertions(+), 2 deletions(-) diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index 92fece728..e635b6d2c 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -30,6 +30,7 @@ def initialize require_relative "ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" + @NODES_CHUNK_SIZE = "350" end config_param :run_interval, :time, :default => 60 @@ -59,13 +60,94 @@ def shutdown end end + def processNodeChunks(nodeInventory, batchTime) + begin + if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + parse_and_emit_records(podInventory, serviceList, batchTime) + else + $log.warn "in_kube_podinventory::processPodChunks:Received empty podInventory" + end + podInfo = nil + podInventory = nil + rescue => errorStr + $log.warn "in_kube_podinventory::processPodChunks:Failed in process pod chunks: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + end + + def parseNodesJsonAndProcess(nodeInfo, batchTime) + if !nodeInfo.nil? + $log.info("in_kube_podinventory::parsePodsJsonAndProcess:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + podInventory = Yajl::Parser.parse(StringIO.new(nodeInfo.body)) + $log.info("in_kube_podinventory::parsePodsJsonAndProcess:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + end + if (!podInventory.nil? && !podInventory["metadata"].nil?) + continuationToken = podInventory["metadata"]["continue"] + end + processPodChunks(podInventory, serviceList, batchTime) + return continuationToken + end + + def enumerate + telemetryFlush = false + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} currentTime = Time.now - emitTime = currentTime.to_f batchTime = currentTime.utc.iso8601 + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + nodeInfo = KubernetesApiClient.getKubeResourceInfo("nodes?limit=#{@NODES_CHUNK_SIZE}") + $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + + + continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) + + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@NODES_CHUNK_SIZE}&continue=#{continuationToken}") + $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) + end + + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end + + # Flush AppInsights telemetry once all the processing is done + if telemetryFlush == true + telemetryProperties = {} + telemetryProperties["Computer"] = @@hostName + ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) + telemetryProperties["ControllerData"] = @controllerData.to_json + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) + if @winContainerCount > 0 + telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount + ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + end + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i + end + end + + + # def enumerate + def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) + # currentTime = Time.now + emitTime = currentTime.to_f + # batchTime = currentTime.utc.iso8601 telemetrySent = false - nodeInventory = nil + # nodeInventory = nil $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") nodeInfo = KubernetesApiClient.getKubeResourceInfo("nodes") From 965e19b0b7735282a62e3f140296099aa40937ca Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 11:52:24 -0800 Subject: [PATCH 02/15] changes --- source/code/plugin/KubernetesApiClient.rb | 34 +++++++- source/code/plugin/in_kube_nodes.rb | 6 +- source/code/plugin/in_kube_podinventory.rb | 90 ++++++++++++++-------- 3 files changed, 91 insertions(+), 39 deletions(-) diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index 8adf3f6b7..a4503c033 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -2,7 +2,7 @@ # frozen_string_literal: true class KubernetesApiClient - require 'yajl/json_gem' + require "yajl/json_gem" require "logger" require "net/http" require "net/https" @@ -43,7 +43,7 @@ def getKubeResourceInfo(resource, api_group: nil) if !File.exist?(@@CaFile) raise "#{@@CaFile} doesnt exist" else - Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER ) do |http| + Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER) do |http| kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) kubeApiRequest["Authorization"] = "Bearer " + getTokenStr @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" @@ -333,7 +333,7 @@ def getContainerLogsSinceTime(namespace, pod, container, since, showTimeStamp) return containerLogs end - def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601 ) + def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) metricItems = [] begin clusterId = getClusterId @@ -546,5 +546,33 @@ def getMetricNumericValue(metricName, metricVal) end return metricValue end # getMetricNumericValue + + # def parseJsonForContinuationToken(resourceInfo) + def getInventoryAndContinuationToken(uri) + continuationToken = nil + resourceInventory = nil + begin + $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + resourceInfo = KubernetesApiClient.getKubeResourceInfo(uri) + $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + if !resourceInfo.nil? + $log.info("in_kube_podinventory::getInventoryAndContinuationToken:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + resourceInventory = Yajl::Parser.parse(StringIO.new(resourceInfo.body)) + $log.info("in_kube_podinventory::getInventoryAndContinuationToken:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + end + if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?) + continuationToken = resourceInventory["metadata"]["continue"] + end + rescue => errorStr + $log.warn "KubernetesApiClient::getInventoryAndContinuationToken:Failed in parse json for continuation token: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + resourceInventory = nil + end + # ensure + return continuationToken, resourceInventory + # resourceInventory = nil + # end + end #getInventoryAndContinuationToken end end diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index e635b6d2c..fc2fc752c 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -62,10 +62,10 @@ def shutdown def processNodeChunks(nodeInventory, batchTime) begin - if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) - parse_and_emit_records(podInventory, serviceList, batchTime) + if (!nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].empty?) + parse_and_emit_records(nodeInventory, serviceList, batchTime) else - $log.warn "in_kube_podinventory::processPodChunks:Received empty podInventory" + $log.warn "in_kube_podinventory::processNodeChunks:Received empty nodeInventory" end podInfo = nil podInventory = nil diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index c9ae75a03..2aa52860d 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -57,34 +57,50 @@ def shutdown end end - def processPodChunks(podInventory, serviceList, batchTime) - begin - if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) - parse_and_emit_records(podInventory, serviceList, batchTime) - else - $log.warn "in_kube_podinventory::processPodChunks:Received empty podInventory" - end - podInfo = nil - podInventory = nil - rescue => errorStr - $log.warn "in_kube_podinventory::processPodChunks:Failed in process pod chunks: #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - end - - def parsePodsJsonAndProcess(podInfo, serviceList, batchTime) - if !podInfo.nil? - $log.info("in_kube_podinventory::parsePodsJsonAndProcess:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") - podInventory = Yajl::Parser.parse(StringIO.new(podInfo.body)) - $log.info("in_kube_podinventory::parsePodsJsonAndProcess:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") - end - if (!podInventory.nil? && !podInventory["metadata"].nil?) - continuationToken = podInventory["metadata"]["continue"] - end - processPodChunks(podInventory, serviceList, batchTime) - return continuationToken - end + # def processPodChunks(podInventory, serviceList, batchTime) + # begin + # if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + # parse_and_emit_records(podInventory, serviceList, batchTime) + # else + # $log.warn "in_kube_podinventory::processPodChunks:Received empty podInventory" + # end + # podInfo = nil + # podInventory = nil + # rescue => errorStr + # $log.warn "in_kube_podinventory::processPodChunks:Failed in process pod chunks: #{errorStr}" + # $log.debug_backtrace(errorStr.backtrace) + # ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + # end + # end + + # def parseJsonForContinuationToken(podInfo, serviceList, batchTime) + # def parseJsonForContinuationToken(podInfo) + # continuationToken = nil + # podInventory = nil + # begin + # if !podInfo.nil? + # $log.info("in_kube_podinventory::parsePodsJsonAndProcess:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + # podInventory = Yajl::Parser.parse(StringIO.new(podInfo.body)) + # $log.info("in_kube_podinventory::parsePodsJsonAndProcess:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + # end + # if (!podInventory.nil? && !podInventory["metadata"].nil?) + # continuationToken = podInventory["metadata"]["continue"] + # end + # # processPodChunks(podInventory, serviceList, batchTime) + # # if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + # # parse_and_emit_records(podInventory, serviceList, batchTime) + # # else + # # $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" + # # end + # # podInfo = nil + # # podInventory = nil + # rescue => errorStr + # $log.warn "KubernetesApiClient::parseJsonForContinuationToken:Failed in parse pods and process: #{errorStr}" + # $log.debug_backtrace(errorStr.backtrace) + # ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + # end + # return continuationToken, podInventory + # end def enumerate(podList = nil) podInventory = podList @@ -107,14 +123,22 @@ def enumerate(podList = nil) podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}") $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) + # continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) + continuationToken = parseJsonForContinuationToken(podInfo) #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) - $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") - podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") - $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") - continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) + # $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + # podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + # $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + # # continuationToken, podInventory = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) + continuationToken, podInventory = getInventoryAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + parse_and_emit_records(podInventory, serviceList, batchTime) + podInventory = nil + else + $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" + end end # Adding telemetry to send pod telemetry every 5 minutes From 2f13c22dd3c879e399793fd12fa28a25ced7cfd6 Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 14:18:56 -0800 Subject: [PATCH 03/15] refactor changes --- source/code/plugin/KubernetesApiClient.rb | 16 +++--- source/code/plugin/in_kube_podinventory.rb | 57 ++-------------------- 2 files changed, 10 insertions(+), 63 deletions(-) diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index a4503c033..fc9f499bb 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -548,31 +548,29 @@ def getMetricNumericValue(metricName, metricVal) end # getMetricNumericValue # def parseJsonForContinuationToken(resourceInfo) - def getInventoryAndContinuationToken(uri) + def getResourcesAndContinuationToken(uri) continuationToken = nil resourceInventory = nil begin - $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + $log.info("KubernetesApiClient::getResourcesAndContinuationToken : Getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}") resourceInfo = KubernetesApiClient.getKubeResourceInfo(uri) - $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + $log.info("KubernetesApiClient::getResourcesAndContinuationToken : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}") if !resourceInfo.nil? - $log.info("in_kube_podinventory::getInventoryAndContinuationToken:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + $log.info("KubernetesApiClient::getResourcesAndContinuationToken:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") resourceInventory = Yajl::Parser.parse(StringIO.new(resourceInfo.body)) - $log.info("in_kube_podinventory::getInventoryAndContinuationToken:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + $log.info("KubernetesApiClient::getResourcesAndContinuationToken:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") end if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?) continuationToken = resourceInventory["metadata"]["continue"] end rescue => errorStr - $log.warn "KubernetesApiClient::getInventoryAndContinuationToken:Failed in parse json for continuation token: #{errorStr}" + $log.warn "KubernetesApiClient::getResourcesAndContinuationToken:Failed in parse json for continuation token: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) resourceInventory = nil end # ensure return continuationToken, resourceInventory - # resourceInventory = nil - # end - end #getInventoryAndContinuationToken + end #getResourcesAndContinuationToken end end diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 2aa52860d..c62f787dc 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -57,51 +57,6 @@ def shutdown end end - # def processPodChunks(podInventory, serviceList, batchTime) - # begin - # if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) - # parse_and_emit_records(podInventory, serviceList, batchTime) - # else - # $log.warn "in_kube_podinventory::processPodChunks:Received empty podInventory" - # end - # podInfo = nil - # podInventory = nil - # rescue => errorStr - # $log.warn "in_kube_podinventory::processPodChunks:Failed in process pod chunks: #{errorStr}" - # $log.debug_backtrace(errorStr.backtrace) - # ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - # end - # end - - # def parseJsonForContinuationToken(podInfo, serviceList, batchTime) - # def parseJsonForContinuationToken(podInfo) - # continuationToken = nil - # podInventory = nil - # begin - # if !podInfo.nil? - # $log.info("in_kube_podinventory::parsePodsJsonAndProcess:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") - # podInventory = Yajl::Parser.parse(StringIO.new(podInfo.body)) - # $log.info("in_kube_podinventory::parsePodsJsonAndProcess:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") - # end - # if (!podInventory.nil? && !podInventory["metadata"].nil?) - # continuationToken = podInventory["metadata"]["continue"] - # end - # # processPodChunks(podInventory, serviceList, batchTime) - # # if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) - # # parse_and_emit_records(podInventory, serviceList, batchTime) - # # else - # # $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" - # # end - # # podInfo = nil - # # podInventory = nil - # rescue => errorStr - # $log.warn "KubernetesApiClient::parseJsonForContinuationToken:Failed in parse pods and process: #{errorStr}" - # $log.debug_backtrace(errorStr.backtrace) - # ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - # end - # return continuationToken, podInventory - # end - def enumerate(podList = nil) podInventory = podList telemetryFlush = false @@ -120,22 +75,16 @@ def enumerate(podList = nil) # Initializing continuation token to nil continuationToken = nil $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}") + continuationToken, podInventory = getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - # continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) - continuationToken = parseJsonForContinuationToken(podInfo) - #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) - # $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") - # podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") - # $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") - # # continuationToken, podInventory = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) - continuationToken, podInventory = getInventoryAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + continuationToken, podInventory = getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) parse_and_emit_records(podInventory, serviceList, batchTime) podInventory = nil + serviceList = nil else $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" end From 7765dd0bb2cccf798899ed1febbe51dc2ebd7be0 Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 14:50:48 -0800 Subject: [PATCH 04/15] changes --- source/code/plugin/KubernetesApiClient.rb | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index fc9f499bb..0440e145d 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -556,20 +556,19 @@ def getResourcesAndContinuationToken(uri) resourceInfo = KubernetesApiClient.getKubeResourceInfo(uri) $log.info("KubernetesApiClient::getResourcesAndContinuationToken : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}") if !resourceInfo.nil? - $log.info("KubernetesApiClient::getResourcesAndContinuationToken:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + $log.info("KubernetesApiClient::getResourcesAndContinuationToken:Start:Parsing data using yajl @ #{Time.now.utc.iso8601}") resourceInventory = Yajl::Parser.parse(StringIO.new(resourceInfo.body)) - $log.info("KubernetesApiClient::getResourcesAndContinuationToken:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + $log.info("KubernetesApiClient::getResourcesAndContinuationToken:End:Parsing data using yajl @ #{Time.now.utc.iso8601}") end if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?) continuationToken = resourceInventory["metadata"]["continue"] end rescue => errorStr - $log.warn "KubernetesApiClient::getResourcesAndContinuationToken:Failed in parse json for continuation token: #{errorStr}" + $log.warn "KubernetesApiClient::getResourcesAndContinuationToken:Failed in get resources and continuation token: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) resourceInventory = nil end - # ensure return continuationToken, resourceInventory end #getResourcesAndContinuationToken end From a9614652f6092ce48f4f2276fe5a94c169d18cf1 Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 15:19:06 -0800 Subject: [PATCH 05/15] changes --- source/code/plugin/in_kube_podinventory.rb | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index c62f787dc..40081f508 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -75,21 +75,28 @@ def enumerate(podList = nil) # Initializing continuation token to nil continuationToken = nil $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - continuationToken, podInventory = getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") + continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + parse_and_emit_records(podInventory, serviceList, batchTime) + else + $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" + end #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) - continuationToken, podInventory = getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") - if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) parse_and_emit_records(podInventory, serviceList, batchTime) - podInventory = nil - serviceList = nil else $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" end end + # Setting these to nil so that we dont hold memory until GC kicks in + podInventory = nil + serviceList = nil + # Adding telemetry to send pod telemetry every 5 minutes timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs timeDifferenceInMinutes = timeDifference / 60 From 9f47b48728adfc14987bd110377d3e99303f65d9 Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 15:22:37 -0800 Subject: [PATCH 06/15] changes --- source/code/plugin/in_kube_nodes.rb | 30 ------ source/code/plugin/in_kube_podinventory.rb | 106 +++++++++++---------- 2 files changed, 56 insertions(+), 80 deletions(-) diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index fc2fc752c..445c4e1bc 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -60,36 +60,6 @@ def shutdown end end - def processNodeChunks(nodeInventory, batchTime) - begin - if (!nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].empty?) - parse_and_emit_records(nodeInventory, serviceList, batchTime) - else - $log.warn "in_kube_podinventory::processNodeChunks:Received empty nodeInventory" - end - podInfo = nil - podInventory = nil - rescue => errorStr - $log.warn "in_kube_podinventory::processPodChunks:Failed in process pod chunks: #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - end - - def parseNodesJsonAndProcess(nodeInfo, batchTime) - if !nodeInfo.nil? - $log.info("in_kube_podinventory::parsePodsJsonAndProcess:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") - podInventory = Yajl::Parser.parse(StringIO.new(nodeInfo.body)) - $log.info("in_kube_podinventory::parsePodsJsonAndProcess:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") - end - if (!podInventory.nil? && !podInventory["metadata"].nil?) - continuationToken = podInventory["metadata"]["continue"] - end - processPodChunks(podInventory, serviceList, batchTime) - return continuationToken - end - - def enumerate telemetryFlush = false @podCount = 0 diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 40081f508..8564f5f25 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -58,65 +58,71 @@ def shutdown end def enumerate(podList = nil) - podInventory = podList - telemetryFlush = false - @podCount = 0 - @controllerSet = Set.new [] - @winContainerCount = 0 - @controllerData = {} - currentTime = Time.now - batchTime = currentTime.utc.iso8601 - - # Get services first so that we dont need to make a call for very chunk - $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") - serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) - $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") - - # Initializing continuation token to nil - continuationToken = nil - $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") - $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) - parse_and_emit_records(podInventory, serviceList, batchTime) - else - $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" - end - - #If we receive a continuation token, make calls, process and flush data until we have processed all data - while (!continuationToken.nil? && !continuationToken.empty?) - continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + begin + podInventory = podList + telemetryFlush = false + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} + currentTime = Time.now + batchTime = currentTime.utc.iso8601 + + # Get services first so that we dont need to make a call for very chunk + $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") + serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) + $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") + $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) parse_and_emit_records(podInventory, serviceList, batchTime) else $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" end - end - # Setting these to nil so that we dont hold memory until GC kicks in - podInventory = nil - serviceList = nil + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + parse_and_emit_records(podInventory, serviceList, batchTime) + else + $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" + end + end + + # Setting these to nil so that we dont hold memory until GC kicks in + podInventory = nil + serviceList = nil - # Adding telemetry to send pod telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 5) - telemetryFlush = true - end + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end - # Flush AppInsights telemetry once all the processing is done - if telemetryFlush == true - telemetryProperties = {} - telemetryProperties["Computer"] = @@hostName - ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) - ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) - telemetryProperties["ControllerData"] = @controllerData.to_json - ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) - if @winContainerCount > 0 - telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount - ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + # Flush AppInsights telemetry once all the processing is done + if telemetryFlush == true + telemetryProperties = {} + telemetryProperties["Computer"] = @@hostName + ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) + telemetryProperties["ControllerData"] = @controllerData.to_json + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) + if @winContainerCount > 0 + telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount + ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + end + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i end - @@podTelemetryTimeTracker = DateTime.now.to_time.to_i + rescue => errorStr + $log.warn "in_kube_podinventory::enumerate:Failed in enumerate: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end From 8827332cfe908f3d136fc4467c76c1939a037d2f Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 15:34:16 -0800 Subject: [PATCH 07/15] changes --- source/code/plugin/KubernetesApiClient.rb | 1 + source/code/plugin/in_kube_nodes.rb | 111 ++++++++++++--------- source/code/plugin/in_kube_podinventory.rb | 14 ++- 3 files changed, 74 insertions(+), 52 deletions(-) diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index 0440e145d..8f5382c02 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -559,6 +559,7 @@ def getResourcesAndContinuationToken(uri) $log.info("KubernetesApiClient::getResourcesAndContinuationToken:Start:Parsing data using yajl @ #{Time.now.utc.iso8601}") resourceInventory = Yajl::Parser.parse(StringIO.new(resourceInfo.body)) $log.info("KubernetesApiClient::getResourcesAndContinuationToken:End:Parsing data using yajl @ #{Time.now.utc.iso8601}") + resourceInfo = nil end if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?) continuationToken = resourceInventory["metadata"]["continue"] diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index 445c4e1bc..2aae3e51b 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -61,56 +61,69 @@ def shutdown end def enumerate - telemetryFlush = false - @podCount = 0 - @controllerSet = Set.new [] - @winContainerCount = 0 - @controllerData = {} - currentTime = Time.now - batchTime = currentTime.utc.iso8601 - - # Initializing continuation token to nil - continuationToken = nil - $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - nodeInfo = KubernetesApiClient.getKubeResourceInfo("nodes?limit=#{@NODES_CHUNK_SIZE}") - $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + begin + nodeInventory = nil + telemetryFlush = false + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} + currentTime = Time.now + batchTime = currentTime.utc.iso8601 + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}") + $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].empty?) + parse_and_emit_records(nodeInventory, batchTime) + else + $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" + end - - continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}&continue=#{continuationToken}") + if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].empty?) + parse_and_emit_records(nodeInventory, batchTime) + else + $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" + end + end - #If we receive a continuation token, make calls, process and flush data until we have processed all data - while (!continuationToken.nil? && !continuationToken.empty?) - $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") - podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@NODES_CHUNK_SIZE}&continue=#{continuationToken}") - $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") - continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) - end + # Setting this to nil so that we dont hold memory until GC kicks in + nodeInventory = nil - # Adding telemetry to send pod telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 5) - telemetryFlush = true - end + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end - # Flush AppInsights telemetry once all the processing is done - if telemetryFlush == true - telemetryProperties = {} - telemetryProperties["Computer"] = @@hostName - ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) - ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) - telemetryProperties["ControllerData"] = @controllerData.to_json - ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) - if @winContainerCount > 0 - telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount - ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + # Flush AppInsights telemetry once all the processing is done + if telemetryFlush == true + telemetryProperties = {} + telemetryProperties["Computer"] = @@hostName + ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) + telemetryProperties["ControllerData"] = @controllerData.to_json + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) + if @winContainerCount > 0 + telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount + ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + end + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i end - @@podTelemetryTimeTracker = DateTime.now.to_time.to_i + rescue => errorStr + $log.warn "in_kube_podinventory::enumerate:Failed in enumerate: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end - end + end # end enumerate - - # def enumerate + # def parse_and_emit_records def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) # currentTime = Time.now emitTime = currentTime.to_f @@ -119,13 +132,13 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) # nodeInventory = nil - $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - nodeInfo = KubernetesApiClient.getKubeResourceInfo("nodes") - $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + # $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + # nodeInfo = KubernetesApiClient.getKubeResourceInfo("nodes") + # $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") - if !nodeInfo.nil? - nodeInventory = Yajl::Parser.parse(StringIO.new(nodeInfo.body)) - end + # if !nodeInfo.nil? + # nodeInventory = Yajl::Parser.parse(StringIO.new(nodeInfo.body)) + # end begin if (!nodeInventory.nil? && !nodeInventory.empty?) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 8564f5f25..b7cb1f649 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -70,9 +70,17 @@ def enumerate(podList = nil) # Get services first so that we dont need to make a call for very chunk $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") - serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) + serviceInfo = KubernetesApiClient.getKubeResourceInfo("services") + # serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") + if !serviceInfo.nil? + $log.info("in_kube_podinventory::enumerate:Start:Parsing services data using yajl @ #{Time.now.utc.iso8601}") + serviceList = Yajl::Parser.parse(StringIO.new(serviceInfo.body)) + $log.info("in_kube_podinventory::enumerate:End:Parsing services data using yajl @ #{Time.now.utc.iso8601}") + serviceInfo = nil + end + # Initializing continuation token to nil continuationToken = nil $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") @@ -81,7 +89,7 @@ def enumerate(podList = nil) if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) parse_and_emit_records(podInventory, serviceList, batchTime) else - $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" + $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" end #If we receive a continuation token, make calls, process and flush data until we have processed all data @@ -90,7 +98,7 @@ def enumerate(podList = nil) if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) parse_and_emit_records(podInventory, serviceList, batchTime) else - $log.warn "in_kube_podinventory::parsePodsJsonAndProcess:Received empty podInventory" + $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" end end From ccc072ab443c6b1c3d78745127f3df2d7e06d218 Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 15:44:56 -0800 Subject: [PATCH 08/15] node changes --- source/code/plugin/in_kube_nodes.rb | 322 +++++++++------------ source/code/plugin/in_kube_podinventory.rb | 4 +- 2 files changed, 142 insertions(+), 184 deletions(-) diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index 2aae3e51b..c2f8af2cd 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -22,8 +22,8 @@ class Kube_nodeInventory_Input < Input def initialize super require "yaml" - require 'yajl/json_gem' - require 'yajl' + require "yajl/json_gem" + require "yajl" require "time" require_relative "KubernetesApiClient" @@ -63,11 +63,6 @@ def shutdown def enumerate begin nodeInventory = nil - telemetryFlush = false - @podCount = 0 - @controllerSet = Set.new [] - @winContainerCount = 0 - @controllerData = {} currentTime = Time.now batchTime = currentTime.utc.iso8601 @@ -85,7 +80,7 @@ def enumerate #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}&continue=#{continuationToken}") - if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].empty?) + if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) parse_and_emit_records(nodeInventory, batchTime) else $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" @@ -95,27 +90,6 @@ def enumerate # Setting this to nil so that we dont hold memory until GC kicks in nodeInventory = nil - # Adding telemetry to send pod telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 5) - telemetryFlush = true - end - - # Flush AppInsights telemetry once all the processing is done - if telemetryFlush == true - telemetryProperties = {} - telemetryProperties["Computer"] = @@hostName - ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) - ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) - telemetryProperties["ControllerData"] = @controllerData.to_json - ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) - if @winContainerCount > 0 - telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount - ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) - end - @@podTelemetryTimeTracker = DateTime.now.to_time.to_i - end rescue => errorStr $log.warn "in_kube_podinventory::enumerate:Failed in enumerate: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) @@ -125,167 +99,151 @@ def enumerate # def parse_and_emit_records def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) - # currentTime = Time.now emitTime = currentTime.to_f - # batchTime = currentTime.utc.iso8601 telemetrySent = false - # nodeInventory = nil - - # $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - # nodeInfo = KubernetesApiClient.getKubeResourceInfo("nodes") - # $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") - - # if !nodeInfo.nil? - # nodeInventory = Yajl::Parser.parse(StringIO.new(nodeInfo.body)) - # end - begin - if (!nodeInventory.nil? && !nodeInventory.empty?) - eventStream = MultiEventStream.new - containerNodeInventoryEventStream = MultiEventStream.new - if !nodeInventory["items"].nil? - #get node inventory - nodeInventory["items"].each do |items| - record = {} - # Sending records for ContainerNodeInventory - containerNodeInventoryRecord = {} - containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - containerNodeInventoryRecord["Computer"] = items["metadata"]["name"] + eventStream = MultiEventStream.new + containerNodeInventoryEventStream = MultiEventStream.new + #get node inventory + nodeInventory["items"].each do |items| + record = {} + # Sending records for ContainerNodeInventory + containerNodeInventoryRecord = {} + containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + containerNodeInventoryRecord["Computer"] = items["metadata"]["name"] - record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - record["Computer"] = items["metadata"]["name"] - record["ClusterName"] = KubernetesApiClient.getClusterName - record["ClusterId"] = KubernetesApiClient.getClusterId - record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"] - record["Labels"] = [items["metadata"]["labels"]] - record["Status"] = "" + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + record["Computer"] = items["metadata"]["name"] + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterId"] = KubernetesApiClient.getClusterId + record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"] + record["Labels"] = [items["metadata"]["labels"]] + record["Status"] = "" - if !items["spec"]["providerID"].nil? && !items["spec"]["providerID"].empty? - if File.file?(@@AzStackCloudFileName) # existence of this file indicates agent running on azstack - record["KubernetesProviderID"] = "azurestack" - else - record["KubernetesProviderID"] = items["spec"]["providerID"] - end - else - record["KubernetesProviderID"] = "onprem" - end + if !items["spec"]["providerID"].nil? && !items["spec"]["providerID"].empty? + if File.file?(@@AzStackCloudFileName) # existence of this file indicates agent running on azstack + record["KubernetesProviderID"] = "azurestack" + else + record["KubernetesProviderID"] = items["spec"]["providerID"] + end + else + record["KubernetesProviderID"] = "onprem" + end - # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. - # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we - # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" - # implying that the node is ready for hosting pods, however its out of disk. + # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. + # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we + # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" + # implying that the node is ready for hosting pods, however its out of disk. - if items["status"].key?("conditions") && !items["status"]["conditions"].empty? - allNodeConditions = "" - items["status"]["conditions"].each do |condition| - if condition["status"] == "True" - if !allNodeConditions.empty? - allNodeConditions = allNodeConditions + "," + condition["type"] - else - allNodeConditions = condition["type"] - end - end - #collect last transition to/from ready (no matter ready is true/false) - if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? - record["LastTransitionTimeReady"] = condition["lastTransitionTime"] - end - end + if items["status"].key?("conditions") && !items["status"]["conditions"].empty? + allNodeConditions = "" + items["status"]["conditions"].each do |condition| + if condition["status"] == "True" if !allNodeConditions.empty? - record["Status"] = allNodeConditions + allNodeConditions = allNodeConditions + "," + condition["type"] + else + allNodeConditions = condition["type"] end end + #collect last transition to/from ready (no matter ready is true/false) + if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? + record["LastTransitionTimeReady"] = condition["lastTransitionTime"] + end + end + if !allNodeConditions.empty? + record["Status"] = allNodeConditions + end + end - nodeInfo = items["status"]["nodeInfo"] - record["KubeletVersion"] = nodeInfo["kubeletVersion"] - record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] - containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] - dockerVersion = nodeInfo["containerRuntimeVersion"] - dockerVersion.slice! "docker://" - containerNodeInventoryRecord["DockerVersion"] = dockerVersion - # ContainerNodeInventory data for docker version and operating system. - containerNodeInventoryWrapper = { - "DataType" => "CONTAINER_NODE_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }], - } - containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper + nodeInfo = items["status"]["nodeInfo"] + record["KubeletVersion"] = nodeInfo["kubeletVersion"] + record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] + containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] + dockerVersion = nodeInfo["containerRuntimeVersion"] + dockerVersion.slice! "docker://" + containerNodeInventoryRecord["DockerVersion"] = dockerVersion + # ContainerNodeInventory data for docker version and operating system. + containerNodeInventoryWrapper = { + "DataType" => "CONTAINER_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }], + } + containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper - wrapper = { - "DataType" => "KUBE_NODE_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [record.each { |k, v| record[k] = v }], - } - eventStream.add(emitTime, wrapper) if wrapper - # Adding telemetry to send node telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 10) - properties = {} - properties["Computer"] = record["Computer"] - properties["KubeletVersion"] = record["KubeletVersion"] - properties["OperatingSystem"] = nodeInfo["operatingSystem"] - properties["DockerVersion"] = dockerVersion - properties["KubernetesProviderID"] = record["KubernetesProviderID"] - properties["KernelVersion"] = nodeInfo["kernelVersion"] - properties["OSImage"] = nodeInfo["osImage"] + wrapper = { + "DataType" => "KUBE_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper + # Adding telemetry to send node telemetry every 10 minutes + timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 10) + properties = {} + properties["Computer"] = record["Computer"] + properties["KubeletVersion"] = record["KubeletVersion"] + properties["OperatingSystem"] = nodeInfo["operatingSystem"] + properties["DockerVersion"] = dockerVersion + properties["KubernetesProviderID"] = record["KubernetesProviderID"] + properties["KernelVersion"] = nodeInfo["kernelVersion"] + properties["OSImage"] = nodeInfo["osImage"] - capacityInfo = items["status"]["capacity"] - ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) + capacityInfo = items["status"]["capacity"] + ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) - #telemetry about prometheus metric collections settings for replicaset - if (File.file?(@@promConfigMountPath)) - properties["rsPromInt"] = @@rsPromInterval - properties["rsPromFPC"] = @@rsPromFieldPassCount - properties["rsPromFDC"] = @@rsPromFieldDropCount - properties["rsPromServ"] = @@rsPromK8sServiceCount - properties["rsPromUrl"] = @@rsPromUrlCount - properties["rsPromMonPods"] = @@rsPromMonitorPods - properties["rsPromMonPodsNs"] = @@rsPromMonitorPodsNamespaceLength - end - ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties) - telemetrySent = true - end + #telemetry about prometheus metric collections settings for replicaset + if (File.file?(@@promConfigMountPath)) + properties["rsPromInt"] = @@rsPromInterval + properties["rsPromFPC"] = @@rsPromFieldPassCount + properties["rsPromFDC"] = @@rsPromFieldDropCount + properties["rsPromServ"] = @@rsPromK8sServiceCount + properties["rsPromUrl"] = @@rsPromUrlCount + properties["rsPromMonPods"] = @@rsPromMonitorPods + properties["rsPromMonPodsNs"] = @@rsPromMonitorPodsNamespaceLength end + ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties) + telemetrySent = true end - router.emit_stream(@tag, eventStream) if eventStream - router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream - router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream - if telemetrySent == true - @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i - end - @@istestvar = ENV["ISTEST"] - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) - $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") - end - #:optimize:kubeperf merge - begin - #if(!nodeInventory.empty?) - nodeMetricDataItems = [] - #allocatable metrics @ node level - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime)) - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "memory", "memoryAllocatableBytes", batchTime)) - #capacity metrics @ node level - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores", batchTime)) - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes", batchTime)) + end + router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream + router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream + if telemetrySent == true + @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i + end + @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) + $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + #:optimize:kubeperf merge + begin + #if(!nodeInventory.empty?) + nodeMetricDataItems = [] + #allocatable metrics @ node level + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime)) + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "memory", "memoryAllocatableBytes", batchTime)) + #capacity metrics @ node level + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores", batchTime)) + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes", batchTime)) - kubePerfEventStream = MultiEventStream.new + kubePerfEventStream = MultiEventStream.new - nodeMetricDataItems.each do |record| - record['DataType'] = "LINUX_PERF_BLOB" - record['IPName'] = "LogManagement" - kubePerfEventStream.add(emitTime, record) if record - end - #end - router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream - rescue => errorStr - $log.warn "Failed in enumerate for KubePerf from in_kube_nodes : #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - #:optimize:end kubeperf merge + nodeMetricDataItems.each do |record| + record["DataType"] = "LINUX_PERF_BLOB" + record["IPName"] = "LogManagement" + kubePerfEventStream.add(emitTime, record) if record + end + #end + router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + rescue => errorStr + $log.warn "Failed in enumerate for KubePerf from in_kube_nodes : #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end - + #:optimize:end kubeperf merge + rescue => errorStr $log.warn "Failed to retrieve node inventory: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) @@ -299,22 +257,22 @@ def run_periodic @nextTimeToRun = Time.now @waitTimeout = @run_interval until done - @nextTimeToRun = @nextTimeToRun + @run_interval - @now = Time.now - if @nextTimeToRun <= @now - @waitTimeout = 1 - @nextTimeToRun = @now - else - @waitTimeout = @nextTimeToRun - @now - end - @condition.wait(@mutex, @waitTimeout) + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished @mutex.unlock if !done begin - $log.info("in_kube_nodes::run_periodic.enumerate.start #{Time.now.utc.iso8601}") - enumerate - $log.info("in_kube_nodes::run_periodic.enumerate.end #{Time.now.utc.iso8601}") + $log.info("in_kube_nodes::run_periodic.enumerate.start #{Time.now.utc.iso8601}") + enumerate + $log.info("in_kube_nodes::run_periodic.enumerate.end #{Time.now.utc.iso8601}") rescue => errorStr $log.warn "in_kube_nodes::run_periodic: enumerate Failed to retrieve node inventory: #{errorStr}" ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index b7cb1f649..28b20bfc0 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -86,7 +86,7 @@ def enumerate(podList = nil) $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) parse_and_emit_records(podInventory, serviceList, batchTime) else $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" @@ -95,7 +95,7 @@ def enumerate(podList = nil) #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") - if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) parse_and_emit_records(podInventory, serviceList, batchTime) else $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" From 54f98a335222a1f282eaee94904693303d1e1a69 Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 17:47:45 -0800 Subject: [PATCH 09/15] changes --- source/code/plugin/KubernetesApiClient.rb | 13 ++++++------- source/code/plugin/in_kube_nodes.rb | 12 ++++++++++-- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index 8f5382c02..a1ff0720e 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -552,21 +552,20 @@ def getResourcesAndContinuationToken(uri) continuationToken = nil resourceInventory = nil begin - $log.info("KubernetesApiClient::getResourcesAndContinuationToken : Getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}") - resourceInfo = KubernetesApiClient.getKubeResourceInfo(uri) - $log.info("KubernetesApiClient::getResourcesAndContinuationToken : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}") + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken : Getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}" + resourceInfo = getKubeResourceInfo(uri) + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}" if !resourceInfo.nil? - $log.info("KubernetesApiClient::getResourcesAndContinuationToken:Start:Parsing data using yajl @ #{Time.now.utc.iso8601}") + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken:Start:Parsing data for #{uri} using yajl @ #{Time.now.utc.iso8601}" resourceInventory = Yajl::Parser.parse(StringIO.new(resourceInfo.body)) - $log.info("KubernetesApiClient::getResourcesAndContinuationToken:End:Parsing data using yajl @ #{Time.now.utc.iso8601}") + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken:End:Parsing data for #{uri} using yajl @ #{Time.now.utc.iso8601}" resourceInfo = nil end if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?) continuationToken = resourceInventory["metadata"]["continue"] end rescue => errorStr - $log.warn "KubernetesApiClient::getResourcesAndContinuationToken:Failed in get resources and continuation token: #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) + @Log.warn "KubernetesApiClient::getResourcesAndContinuationToken:Failed in get resources for #{uri} and continuation token: #{errorStr}" ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) resourceInventory = nil end diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index c2f8af2cd..3d31f659c 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -71,7 +71,7 @@ def enumerate $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}") $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") - if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].empty?) + if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) parse_and_emit_records(nodeInventory, batchTime) else $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" @@ -79,19 +79,22 @@ def enumerate #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) + $log.warn "in_kube_nodes: in continuation token block: start" + $log.warn "in_kube_nodes: continuation token: #{continuationToken}" continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}&continue=#{continuationToken}") if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) parse_and_emit_records(nodeInventory, batchTime) else $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" end + $log.warn "in_kube_nodes: in continuation token block: end" end # Setting this to nil so that we dont hold memory until GC kicks in nodeInventory = nil rescue => errorStr - $log.warn "in_kube_podinventory::enumerate:Failed in enumerate: #{errorStr}" + $log.warn "in_kube_nodes::enumerate:Failed in enumerate: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end @@ -99,6 +102,7 @@ def enumerate # def parse_and_emit_records def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) + $log.warn "in_kube_nodes::parse_and_emit_records:Start #{Time.now.utc.iso8601}" emitTime = currentTime.to_f telemetrySent = false @@ -207,6 +211,7 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) telemetrySent = true end end + $log.warn "in_kube_nodes::Done iterating through all the nodes - emitting" router.emit_stream(@tag, eventStream) if eventStream router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream @@ -220,6 +225,7 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) #:optimize:kubeperf merge begin #if(!nodeInventory.empty?) + $log.warn "in_kube_nodes::Perf Start" nodeMetricDataItems = [] #allocatable metrics @ node level nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime)) @@ -237,6 +243,7 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) end #end router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + $log.warn "in_kube_nodes::Perf End" rescue => errorStr $log.warn "Failed in enumerate for KubePerf from in_kube_nodes : #{errorStr}" $log.debug_backtrace(errorStr.backtrace) @@ -249,6 +256,7 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end + $log.warn "in_kube_nodes::parse_and_emit_records:End #{Time.now.utc.iso8601}" end def run_periodic From df9279f625f6f7e200434a64bf694463a289caf8 Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 18:10:47 -0800 Subject: [PATCH 10/15] changes --- source/code/plugin/in_kube_nodes.rb | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index 3d31f659c..1ad59b0e3 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -79,20 +79,16 @@ def enumerate #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) - $log.warn "in_kube_nodes: in continuation token block: start" - $log.warn "in_kube_nodes: continuation token: #{continuationToken}" continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}&continue=#{continuationToken}") if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) parse_and_emit_records(nodeInventory, batchTime) else $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" end - $log.warn "in_kube_nodes: in continuation token block: end" end # Setting this to nil so that we dont hold memory until GC kicks in nodeInventory = nil - rescue => errorStr $log.warn "in_kube_nodes::enumerate:Failed in enumerate: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) @@ -102,11 +98,10 @@ def enumerate # def parse_and_emit_records def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) - $log.warn "in_kube_nodes::parse_and_emit_records:Start #{Time.now.utc.iso8601}" - emitTime = currentTime.to_f - telemetrySent = false - begin + currentTime = Time.now + emitTime = currentTime.to_f + telemetrySent = false eventStream = MultiEventStream.new containerNodeInventoryEventStream = MultiEventStream.new #get node inventory @@ -211,7 +206,6 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) telemetrySent = true end end - $log.warn "in_kube_nodes::Done iterating through all the nodes - emitting" router.emit_stream(@tag, eventStream) if eventStream router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream @@ -225,7 +219,6 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) #:optimize:kubeperf merge begin #if(!nodeInventory.empty?) - $log.warn "in_kube_nodes::Perf Start" nodeMetricDataItems = [] #allocatable metrics @ node level nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime)) @@ -243,7 +236,6 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) end #end router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream - $log.warn "in_kube_nodes::Perf End" rescue => errorStr $log.warn "Failed in enumerate for KubePerf from in_kube_nodes : #{errorStr}" $log.debug_backtrace(errorStr.backtrace) From e976df064b1a6e7303f24edbb9210745a349ff77 Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 19:40:55 -0800 Subject: [PATCH 11/15] changes --- source/code/plugin/in_kube_events.rb | 146 +++++++++++++++++---------- 1 file changed, 94 insertions(+), 52 deletions(-) diff --git a/source/code/plugin/in_kube_events.rb b/source/code/plugin/in_kube_events.rb index 82fb88b70..fac54c6eb 100644 --- a/source/code/plugin/in_kube_events.rb +++ b/source/code/plugin/in_kube_events.rb @@ -9,14 +9,15 @@ class Kube_Event_Input < Input def initialize super - require 'yajl/json_gem' - require 'yajl' + require "yajl/json_gem" + require "yajl" require "time" require_relative "KubernetesApiClient" require_relative "oms_common" require_relative "omslog" require_relative "ApplicationInsightsUtility" + @EVENTS_CHUNK_SIZE = 1000 end config_param :run_interval, :time, :default => 60 @@ -45,66 +46,107 @@ def shutdown end end - def enumerate(eventList = nil) + def enumerate + begin + eventList = nil + currentTime = Time.now + batchTime = currentTime.utc.iso8601 + eventQueryState = getEventQueryState + newEventQueryState = [] + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_events::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}") + $log.info("in_kube_events::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?) + newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime) + else + $log.warn "in_kube_events::enumerate:Received empty eventList" + end + + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}&continue=#{continuationToken}") + if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?) + newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime) + else + $log.warn "in_kube_events::enumerate:Received empty eventList" + end + end + + # Setting this to nil so that we dont hold memory until GC kicks in + eventList = nil + writeEventQueryState(newEventQueryState) + rescue => errorStr + $log.warn "in_kube_events::enumerate:Failed in enumerate: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + end # end enumerate + + # def enumerate(eventList = nil) + def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTime = Time.utc.iso8601) currentTime = Time.now emitTime = currentTime.to_f - batchTime = currentTime.utc.iso8601 + # batchTime = currentTime.utc.iso8601 - events = eventList - $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") - eventInfo = KubernetesApiClient.getKubeResourceInfo("events?fieldSelector=type!=Normal") - $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") + # events = eventList + # $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") + # eventInfo = KubernetesApiClient.getKubeResourceInfo("events?fieldSelector=type!=Normal") + # $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") - if !eventInfo.nil? - events = Yajl::Parser.parse(StringIO.new(eventInfo.body)) - end + # if !eventInfo.nil? + # events = Yajl::Parser.parse(StringIO.new(eventInfo.body)) + # end - eventQueryState = getEventQueryState - newEventQueryState = [] + # eventQueryState = getEventQueryState + # newEventQueryState = [] begin - if (!events.nil? && !events.empty? && !events["items"].nil?) - eventStream = MultiEventStream.new - events["items"].each do |items| - record = {} - # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion - record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - eventId = items["metadata"]["uid"] + "/" + items["count"].to_s - newEventQueryState.push(eventId) - if !eventQueryState.empty? && eventQueryState.include?(eventId) - next - end - record["ObjectKind"] = items["involvedObject"]["kind"] - record["Namespace"] = items["involvedObject"]["namespace"] - record["Name"] = items["involvedObject"]["name"] - record["Reason"] = items["reason"] - record["Message"] = items["message"] - record["Type"] = items["type"] - record["TimeGenerated"] = items["metadata"]["creationTimestamp"] - record["SourceComponent"] = items["source"]["component"] - record["FirstSeen"] = items["firstTimestamp"] - record["LastSeen"] = items["lastTimestamp"] - record["Count"] = items["count"] - if items["source"].key?("host") - record["Computer"] = items["source"]["host"] - else - record["Computer"] = (OMS::Common.get_hostname) - end - record['ClusterName'] = KubernetesApiClient.getClusterName - record["ClusterId"] = KubernetesApiClient.getClusterId - wrapper = { - "DataType" => "KUBE_EVENTS_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [record.each { |k, v| record[k] = v }], - } - eventStream.add(emitTime, wrapper) if wrapper + # if (!events.nil? && !events.empty? && !events["items"].nil?) + eventStream = MultiEventStream.new + events["items"].each do |items| + record = {} + # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + eventId = items["metadata"]["uid"] + "/" + items["count"].to_s + newEventQueryState.push(eventId) + if !eventQueryState.empty? && eventQueryState.include?(eventId) + next end - router.emit_stream(@tag, eventStream) if eventStream - end - writeEventQueryState(newEventQueryState) + record["ObjectKind"] = items["involvedObject"]["kind"] + record["Namespace"] = items["involvedObject"]["namespace"] + record["Name"] = items["involvedObject"]["name"] + record["Reason"] = items["reason"] + record["Message"] = items["message"] + record["Type"] = items["type"] + record["TimeGenerated"] = items["metadata"]["creationTimestamp"] + record["SourceComponent"] = items["source"]["component"] + record["FirstSeen"] = items["firstTimestamp"] + record["LastSeen"] = items["lastTimestamp"] + record["Count"] = items["count"] + if items["source"].key?("host") + record["Computer"] = items["source"]["host"] + else + record["Computer"] = (OMS::Common.get_hostname) + end + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterId"] = KubernetesApiClient.getClusterId + wrapper = { + "DataType" => "KUBE_EVENTS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper + end + router.emit_stream(@tag, eventStream) if eventStream + # end + # writeEventQueryState(newEventQueryState) rescue => errorStr $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end + end + return newEventQueryState end def run_periodic From b2d72df2d97a2c06dc79e853ef6f2f098121871a Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 26 Nov 2019 19:45:12 -0800 Subject: [PATCH 12/15] changes --- source/code/plugin/in_kube_events.rb | 21 ++------------------- 1 file changed, 2 insertions(+), 19 deletions(-) diff --git a/source/code/plugin/in_kube_events.rb b/source/code/plugin/in_kube_events.rb index fac54c6eb..b55bad141 100644 --- a/source/code/plugin/in_kube_events.rb +++ b/source/code/plugin/in_kube_events.rb @@ -56,9 +56,9 @@ def enumerate # Initializing continuation token to nil continuationToken = nil - $log.info("in_kube_events::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}") - $log.info("in_kube_events::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?) newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime) else @@ -85,25 +85,10 @@ def enumerate end end # end enumerate - # def enumerate(eventList = nil) def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTime = Time.utc.iso8601) currentTime = Time.now emitTime = currentTime.to_f - # batchTime = currentTime.utc.iso8601 - - # events = eventList - # $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") - # eventInfo = KubernetesApiClient.getKubeResourceInfo("events?fieldSelector=type!=Normal") - # $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") - - # if !eventInfo.nil? - # events = Yajl::Parser.parse(StringIO.new(eventInfo.body)) - # end - - # eventQueryState = getEventQueryState - # newEventQueryState = [] begin - # if (!events.nil? && !events.empty? && !events["items"].nil?) eventStream = MultiEventStream.new events["items"].each do |items| record = {} @@ -140,8 +125,6 @@ def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTim eventStream.add(emitTime, wrapper) if wrapper end router.emit_stream(@tag, eventStream) if eventStream - # end - # writeEventQueryState(newEventQueryState) rescue => errorStr $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) From c926ede83a3b56daccfa30c39ad6589cb4b7d380 Mon Sep 17 00:00:00 2001 From: rashmy Date: Mon, 2 Dec 2019 11:47:56 -0800 Subject: [PATCH 13/15] adding open and read timeouts for api client --- source/code/plugin/KubernetesApiClient.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index a1ff0720e..02a81f9c0 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -43,7 +43,7 @@ def getKubeResourceInfo(resource, api_group: nil) if !File.exist?(@@CaFile) raise "#{@@CaFile} doesnt exist" else - Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER) do |http| + Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER, :open_timeout => 20, :read_timeout => 40) do |http| kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) kubeApiRequest["Authorization"] = "Bearer " + getTokenStr @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" From 3a69461bf04e966df5fc6cd8061433d90f0aee45 Mon Sep 17 00:00:00 2001 From: rashmy Date: Mon, 2 Dec 2019 14:25:41 -0800 Subject: [PATCH 14/15] removing comments --- source/code/plugin/KubernetesApiClient.rb | 1 - source/code/plugin/in_kube_nodes.rb | 1 - 2 files changed, 2 deletions(-) diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index 02a81f9c0..43c4c20d3 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -547,7 +547,6 @@ def getMetricNumericValue(metricName, metricVal) return metricValue end # getMetricNumericValue - # def parseJsonForContinuationToken(resourceInfo) def getResourcesAndContinuationToken(uri) continuationToken = nil resourceInventory = nil diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index 1ad59b0e3..0e02a68fa 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -96,7 +96,6 @@ def enumerate end end # end enumerate - # def parse_and_emit_records def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) begin currentTime = Time.now From 9f020387903c5d76bf031c43ceeb1c87426f145a Mon Sep 17 00:00:00 2001 From: rashmy Date: Mon, 2 Dec 2019 21:58:23 -0800 Subject: [PATCH 15/15] updating chunk size --- source/code/plugin/in_kube_events.rb | 4 +++- source/code/plugin/in_kube_nodes.rb | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/code/plugin/in_kube_events.rb b/source/code/plugin/in_kube_events.rb index b55bad141..6116cb62d 100644 --- a/source/code/plugin/in_kube_events.rb +++ b/source/code/plugin/in_kube_events.rb @@ -17,7 +17,9 @@ def initialize require_relative "oms_common" require_relative "omslog" require_relative "ApplicationInsightsUtility" - @EVENTS_CHUNK_SIZE = 1000 + + # 30000 events account to approximately 5MB + @EVENTS_CHUNK_SIZE = 30000 end config_param :run_interval, :time, :default => 60 diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index 0e02a68fa..fa0994f43 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -30,7 +30,7 @@ def initialize require_relative "ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" - @NODES_CHUNK_SIZE = "350" + @NODES_CHUNK_SIZE = "400" end config_param :run_interval, :time, :default => 60