diff --git a/installer/conf/container.conf b/installer/conf/container.conf old mode 100755 new mode 100644 diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index e912ea6ef..cc1ec35f8 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -13,7 +13,8 @@ class Kube_PodInventory_Input < Input def initialize super require "yaml" - require 'yajl/json_gem' + require "yajl/json_gem" + require "yajl" require "set" require "time" @@ -21,6 +22,12 @@ def initialize require_relative "ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" + + @PODS_CHUNK_SIZE = "1500" + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} end config_param :run_interval, :time, :default => 60 @@ -50,37 +57,89 @@ def shutdown end end - def enumerate(podList = nil) - podInventory = podList - currentTime = Time.now - $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - podInfo = KubernetesApiClient.getKubeResourceInfo("pods") - $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - - if !podInfo.nil? - podInventory = JSON.parse(podInfo.body) - end - + def processPodChunks(podInventory, serviceList, batchTime) begin if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) - batchTime = currentTime.utc.iso8601 - #get pod inventory & services - $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") - serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) - $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") - parse_and_emit_records(podInventory, serviceList, batchTime ) + parse_and_emit_records(podInventory, serviceList, batchTime) else - $log.warn "Received empty podInventory" + $log.warn "in_kube_podinventory::processPodChunks:Received empty podInventory" end podInfo = nil podInventory = nil rescue => errorStr - $log.warn "Failed in enumerate pod inventory: #{errorStr}" + $log.warn "in_kube_podinventory::processPodChunks:Failed in process pod chunks: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end + def parsePodsJsonAndProcess(podInfo, serviceList, batchTime) + if !podInfo.nil? + $log.info("in_kube_podinventory::parsePodsJsonAndProcess:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + podInventory = Yajl::Parser.parse(StringIO.new(podInfo.body)) + $log.info("in_kube_podinventory::parsePodsJsonAndProcess:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") + end + if (!podInventory.nil? && !podInventory["metadata"].nil?) + continuationToken = podInventory["metadata"]["continue"] + end + processPodChunks(podInventory, serviceList, batchTime) + return continuationToken + end + + def enumerate(podList = nil) + podInventory = podList + telemetryFlush = false + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} + currentTime = Time.now + batchTime = currentTime.utc.iso8601 + + # Get services first so that we dont need to make a call for very chunk + $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") + serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) + $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") + podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}") + $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + + continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) + + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") + continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) + end + + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end + + # Flush AppInsights telemetry once all the processing is done + if telemetryFlush == true + telemetryProperties = {} + telemetryProperties["Computer"] = @@hostName + ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) + telemetryProperties["ControllerData"] = @controllerData.to_json + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) + if @winContainerCount > 0 + telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount + ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + end + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i + end + end + def populateWindowsContainerInventoryRecord(container, record, containerEnvVariableHash, batchTime) begin containerInventoryRecord = {} @@ -193,15 +252,12 @@ def getContainerEnvironmentVariables(pod, clusterCollectEnvironmentVar) end end - def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8601) + def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8601) currentTime = Time.now emitTime = currentTime.to_f #batchTime = currentTime.utc.iso8601 eventStream = MultiEventStream.new - controllerSet = Set.new [] - controllerData = {} - telemetryFlush = false - winContainerCount = 0 + begin #begin block start # Getting windows nodes from kubeapi winNodes = KubernetesApiClient.getWindowsNodesArray @@ -284,24 +340,17 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8 record["ClusterId"] = KubernetesApiClient.getClusterId record["ClusterName"] = KubernetesApiClient.getClusterName record["ServiceName"] = getServiceNameFromLabels(items["metadata"]["namespace"], items["metadata"]["labels"], serviceList) - # Adding telemetry to send pod telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 5) - telemetryFlush = true - end + if !items["metadata"]["ownerReferences"].nil? record["ControllerKind"] = items["metadata"]["ownerReferences"][0]["kind"] record["ControllerName"] = items["metadata"]["ownerReferences"][0]["name"] - if telemetryFlush == true - controllerSet.add(record["ControllerKind"] + record["ControllerName"]) - #Adding controller kind to telemetry ro information about customer workload - if (controllerData[record["ControllerKind"]].nil?) - controllerData[record["ControllerKind"]] = 1 - else - controllerValue = controllerData[record["ControllerKind"]] - controllerData[record["ControllerKind"]] += 1 - end + @controllerSet.add(record["ControllerKind"] + record["ControllerName"]) + #Adding controller kind to telemetry ro information about customer workload + if (@controllerData[record["ControllerKind"]].nil?) + @controllerData[record["ControllerKind"]] = 1 + else + controllerValue = @controllerData[record["ControllerKind"]] + @controllerData[record["ControllerKind"]] += 1 end end podRestartCount = 0 @@ -419,7 +468,7 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8 end end # Send container inventory records for containers on windows nodes - winContainerCount += containerInventoryRecords.length + @winContainerCount += containerInventoryRecords.length containerInventoryRecords.each do |cirecord| if !cirecord.nil? ciwrapper = { @@ -436,25 +485,24 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8 router.emit_stream(@@MDMKubePodInventoryTag, eventStream) if eventStream #:optimize:kubeperf merge begin - #if(!podInventory.empty?) + #if(!podInventory.empty?) containerMetricDataItems = [] #hostName = (OMS::Common.get_hostname) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu","cpuRequestNanoCores", batchTime)) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory","memoryRequestBytes", batchTime)) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu","cpuLimitNanoCores", batchTime)) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory","memoryLimitBytes", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu", "cpuRequestNanoCores", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory", "memoryRequestBytes", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu", "cpuLimitNanoCores", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory", "memoryLimitBytes", batchTime)) kubePerfEventStream = MultiEventStream.new containerMetricDataItems.each do |record| - record['DataType'] = "LINUX_PERF_BLOB" - record['IPName'] = "LogManagement" + record["DataType"] = "LINUX_PERF_BLOB" + record["IPName"] = "LogManagement" kubePerfEventStream.add(emitTime, record) if record - #router.emit(@tag, time, record) if record + #router.emit(@tag, time, record) if record end #end router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream - rescue => errorStr $log.warn "Failed in parse_and_emit_record for KubePerf from in_kube_podinventory : #{errorStr}" $log.debug_backtrace(errorStr.backtrace) @@ -493,19 +541,9 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8 end #:optimize:end kubeservices merge - if telemetryFlush == true - telemetryProperties = {} - telemetryProperties["Computer"] = @@hostName - ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) - ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory["items"].length, {}) - telemetryProperties["ControllerData"] = controllerData.to_json - ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length, telemetryProperties) - if winContainerCount > 0 - telemetryProperties["ClusterWideWindowsContainersCount"] = winContainerCount - ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) - end - @@podTelemetryTimeTracker = DateTime.now.to_time.to_i - end + #Updating value for AppInsights telemetry + @podCount += podInventory["items"].length + @@istestvar = ENV["ISTEST"] if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) $log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")