diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index 8adf3f6b7..43c4c20d3 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -2,7 +2,7 @@ # frozen_string_literal: true class KubernetesApiClient - require 'yajl/json_gem' + require "yajl/json_gem" require "logger" require "net/http" require "net/https" @@ -43,7 +43,7 @@ def getKubeResourceInfo(resource, api_group: nil) if !File.exist?(@@CaFile) raise "#{@@CaFile} doesnt exist" else - Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER ) do |http| + Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER, :open_timeout => 20, :read_timeout => 40) do |http| kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) kubeApiRequest["Authorization"] = "Bearer " + getTokenStr @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" @@ -333,7 +333,7 @@ def getContainerLogsSinceTime(namespace, pod, container, since, showTimeStamp) return containerLogs end - def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601 ) + def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) metricItems = [] begin clusterId = getClusterId @@ -546,5 +546,29 @@ def getMetricNumericValue(metricName, metricVal) end return metricValue end # getMetricNumericValue + + def getResourcesAndContinuationToken(uri) + continuationToken = nil + resourceInventory = nil + begin + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken : Getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}" + resourceInfo = getKubeResourceInfo(uri) + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}" + if !resourceInfo.nil? + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken:Start:Parsing data for #{uri} using yajl @ #{Time.now.utc.iso8601}" + resourceInventory = Yajl::Parser.parse(StringIO.new(resourceInfo.body)) + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken:End:Parsing data for #{uri} using yajl @ #{Time.now.utc.iso8601}" + resourceInfo = nil + end + if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?) + continuationToken = resourceInventory["metadata"]["continue"] + end + rescue => errorStr + @Log.warn "KubernetesApiClient::getResourcesAndContinuationToken:Failed in get resources for #{uri} and continuation token: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + resourceInventory = nil + end + return continuationToken, resourceInventory + end #getResourcesAndContinuationToken end end diff --git a/source/code/plugin/in_kube_events.rb b/source/code/plugin/in_kube_events.rb index 82fb88b70..6116cb62d 100644 --- a/source/code/plugin/in_kube_events.rb +++ b/source/code/plugin/in_kube_events.rb @@ -9,14 +9,17 @@ class Kube_Event_Input < Input def initialize super - require 'yajl/json_gem' - require 'yajl' + require "yajl/json_gem" + require "yajl" require "time" require_relative "KubernetesApiClient" require_relative "oms_common" require_relative "omslog" require_relative "ApplicationInsightsUtility" + + # 30000 events account to approximately 5MB + @EVENTS_CHUNK_SIZE = 30000 end config_param :run_interval, :time, :default => 60 @@ -45,66 +48,90 @@ def shutdown end end - def enumerate(eventList = nil) - currentTime = Time.now - emitTime = currentTime.to_f - batchTime = currentTime.utc.iso8601 + def enumerate + begin + eventList = nil + currentTime = Time.now + batchTime = currentTime.utc.iso8601 + eventQueryState = getEventQueryState + newEventQueryState = [] + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}") + $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") + if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?) + newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime) + else + $log.warn "in_kube_events::enumerate:Received empty eventList" + end - events = eventList - $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") - eventInfo = KubernetesApiClient.getKubeResourceInfo("events?fieldSelector=type!=Normal") - $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}&continue=#{continuationToken}") + if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?) + newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime) + else + $log.warn "in_kube_events::enumerate:Received empty eventList" + end + end - if !eventInfo.nil? - events = Yajl::Parser.parse(StringIO.new(eventInfo.body)) + # Setting this to nil so that we dont hold memory until GC kicks in + eventList = nil + writeEventQueryState(newEventQueryState) + rescue => errorStr + $log.warn "in_kube_events::enumerate:Failed in enumerate: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end + end # end enumerate - eventQueryState = getEventQueryState - newEventQueryState = [] + def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTime = Time.utc.iso8601) + currentTime = Time.now + emitTime = currentTime.to_f begin - if (!events.nil? && !events.empty? && !events["items"].nil?) - eventStream = MultiEventStream.new - events["items"].each do |items| - record = {} - # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion - record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - eventId = items["metadata"]["uid"] + "/" + items["count"].to_s - newEventQueryState.push(eventId) - if !eventQueryState.empty? && eventQueryState.include?(eventId) - next - end - record["ObjectKind"] = items["involvedObject"]["kind"] - record["Namespace"] = items["involvedObject"]["namespace"] - record["Name"] = items["involvedObject"]["name"] - record["Reason"] = items["reason"] - record["Message"] = items["message"] - record["Type"] = items["type"] - record["TimeGenerated"] = items["metadata"]["creationTimestamp"] - record["SourceComponent"] = items["source"]["component"] - record["FirstSeen"] = items["firstTimestamp"] - record["LastSeen"] = items["lastTimestamp"] - record["Count"] = items["count"] - if items["source"].key?("host") - record["Computer"] = items["source"]["host"] - else - record["Computer"] = (OMS::Common.get_hostname) - end - record['ClusterName'] = KubernetesApiClient.getClusterName - record["ClusterId"] = KubernetesApiClient.getClusterId - wrapper = { - "DataType" => "KUBE_EVENTS_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [record.each { |k, v| record[k] = v }], - } - eventStream.add(emitTime, wrapper) if wrapper + eventStream = MultiEventStream.new + events["items"].each do |items| + record = {} + # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + eventId = items["metadata"]["uid"] + "/" + items["count"].to_s + newEventQueryState.push(eventId) + if !eventQueryState.empty? && eventQueryState.include?(eventId) + next end - router.emit_stream(@tag, eventStream) if eventStream - end - writeEventQueryState(newEventQueryState) + record["ObjectKind"] = items["involvedObject"]["kind"] + record["Namespace"] = items["involvedObject"]["namespace"] + record["Name"] = items["involvedObject"]["name"] + record["Reason"] = items["reason"] + record["Message"] = items["message"] + record["Type"] = items["type"] + record["TimeGenerated"] = items["metadata"]["creationTimestamp"] + record["SourceComponent"] = items["source"]["component"] + record["FirstSeen"] = items["firstTimestamp"] + record["LastSeen"] = items["lastTimestamp"] + record["Count"] = items["count"] + if items["source"].key?("host") + record["Computer"] = items["source"]["host"] + else + record["Computer"] = (OMS::Common.get_hostname) + end + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterId"] = KubernetesApiClient.getClusterId + wrapper = { + "DataType" => "KUBE_EVENTS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper + end + router.emit_stream(@tag, eventStream) if eventStream rescue => errorStr $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end + end + return newEventQueryState end def run_periodic diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index 92fece728..fa0994f43 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -22,14 +22,15 @@ class Kube_nodeInventory_Input < Input def initialize super require "yaml" - require 'yajl/json_gem' - require 'yajl' + require "yajl/json_gem" + require "yajl" require "time" require_relative "KubernetesApiClient" require_relative "ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" + @NODES_CHUNK_SIZE = "400" end config_param :run_interval, :time, :default => 60 @@ -60,172 +61,193 @@ def shutdown end def enumerate - currentTime = Time.now - emitTime = currentTime.to_f - batchTime = currentTime.utc.iso8601 - telemetrySent = false + begin + nodeInventory = nil + currentTime = Time.now + batchTime = currentTime.utc.iso8601 - nodeInventory = nil + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}") + $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) + parse_and_emit_records(nodeInventory, batchTime) + else + $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" + end - $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - nodeInfo = KubernetesApiClient.getKubeResourceInfo("nodes") - $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}&continue=#{continuationToken}") + if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) + parse_and_emit_records(nodeInventory, batchTime) + else + $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" + end + end - if !nodeInfo.nil? - nodeInventory = Yajl::Parser.parse(StringIO.new(nodeInfo.body)) + # Setting this to nil so that we dont hold memory until GC kicks in + nodeInventory = nil + rescue => errorStr + $log.warn "in_kube_nodes::enumerate:Failed in enumerate: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end + end # end enumerate + def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) begin - if (!nodeInventory.nil? && !nodeInventory.empty?) - eventStream = MultiEventStream.new - containerNodeInventoryEventStream = MultiEventStream.new - if !nodeInventory["items"].nil? - #get node inventory - nodeInventory["items"].each do |items| - record = {} - # Sending records for ContainerNodeInventory - containerNodeInventoryRecord = {} - containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - containerNodeInventoryRecord["Computer"] = items["metadata"]["name"] + currentTime = Time.now + emitTime = currentTime.to_f + telemetrySent = false + eventStream = MultiEventStream.new + containerNodeInventoryEventStream = MultiEventStream.new + #get node inventory + nodeInventory["items"].each do |items| + record = {} + # Sending records for ContainerNodeInventory + containerNodeInventoryRecord = {} + containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + containerNodeInventoryRecord["Computer"] = items["metadata"]["name"] - record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - record["Computer"] = items["metadata"]["name"] - record["ClusterName"] = KubernetesApiClient.getClusterName - record["ClusterId"] = KubernetesApiClient.getClusterId - record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"] - record["Labels"] = [items["metadata"]["labels"]] - record["Status"] = "" + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + record["Computer"] = items["metadata"]["name"] + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterId"] = KubernetesApiClient.getClusterId + record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"] + record["Labels"] = [items["metadata"]["labels"]] + record["Status"] = "" - if !items["spec"]["providerID"].nil? && !items["spec"]["providerID"].empty? - if File.file?(@@AzStackCloudFileName) # existence of this file indicates agent running on azstack - record["KubernetesProviderID"] = "azurestack" - else - record["KubernetesProviderID"] = items["spec"]["providerID"] - end - else - record["KubernetesProviderID"] = "onprem" - end + if !items["spec"]["providerID"].nil? && !items["spec"]["providerID"].empty? + if File.file?(@@AzStackCloudFileName) # existence of this file indicates agent running on azstack + record["KubernetesProviderID"] = "azurestack" + else + record["KubernetesProviderID"] = items["spec"]["providerID"] + end + else + record["KubernetesProviderID"] = "onprem" + end - # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. - # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we - # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" - # implying that the node is ready for hosting pods, however its out of disk. + # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. + # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we + # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" + # implying that the node is ready for hosting pods, however its out of disk. - if items["status"].key?("conditions") && !items["status"]["conditions"].empty? - allNodeConditions = "" - items["status"]["conditions"].each do |condition| - if condition["status"] == "True" - if !allNodeConditions.empty? - allNodeConditions = allNodeConditions + "," + condition["type"] - else - allNodeConditions = condition["type"] - end - end - #collect last transition to/from ready (no matter ready is true/false) - if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? - record["LastTransitionTimeReady"] = condition["lastTransitionTime"] - end - end + if items["status"].key?("conditions") && !items["status"]["conditions"].empty? + allNodeConditions = "" + items["status"]["conditions"].each do |condition| + if condition["status"] == "True" if !allNodeConditions.empty? - record["Status"] = allNodeConditions + allNodeConditions = allNodeConditions + "," + condition["type"] + else + allNodeConditions = condition["type"] end end + #collect last transition to/from ready (no matter ready is true/false) + if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? + record["LastTransitionTimeReady"] = condition["lastTransitionTime"] + end + end + if !allNodeConditions.empty? + record["Status"] = allNodeConditions + end + end - nodeInfo = items["status"]["nodeInfo"] - record["KubeletVersion"] = nodeInfo["kubeletVersion"] - record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] - containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] - dockerVersion = nodeInfo["containerRuntimeVersion"] - dockerVersion.slice! "docker://" - containerNodeInventoryRecord["DockerVersion"] = dockerVersion - # ContainerNodeInventory data for docker version and operating system. - containerNodeInventoryWrapper = { - "DataType" => "CONTAINER_NODE_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }], - } - containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper + nodeInfo = items["status"]["nodeInfo"] + record["KubeletVersion"] = nodeInfo["kubeletVersion"] + record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] + containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] + dockerVersion = nodeInfo["containerRuntimeVersion"] + dockerVersion.slice! "docker://" + containerNodeInventoryRecord["DockerVersion"] = dockerVersion + # ContainerNodeInventory data for docker version and operating system. + containerNodeInventoryWrapper = { + "DataType" => "CONTAINER_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }], + } + containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper - wrapper = { - "DataType" => "KUBE_NODE_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [record.each { |k, v| record[k] = v }], - } - eventStream.add(emitTime, wrapper) if wrapper - # Adding telemetry to send node telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 10) - properties = {} - properties["Computer"] = record["Computer"] - properties["KubeletVersion"] = record["KubeletVersion"] - properties["OperatingSystem"] = nodeInfo["operatingSystem"] - properties["DockerVersion"] = dockerVersion - properties["KubernetesProviderID"] = record["KubernetesProviderID"] - properties["KernelVersion"] = nodeInfo["kernelVersion"] - properties["OSImage"] = nodeInfo["osImage"] + wrapper = { + "DataType" => "KUBE_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper + # Adding telemetry to send node telemetry every 10 minutes + timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 10) + properties = {} + properties["Computer"] = record["Computer"] + properties["KubeletVersion"] = record["KubeletVersion"] + properties["OperatingSystem"] = nodeInfo["operatingSystem"] + properties["DockerVersion"] = dockerVersion + properties["KubernetesProviderID"] = record["KubernetesProviderID"] + properties["KernelVersion"] = nodeInfo["kernelVersion"] + properties["OSImage"] = nodeInfo["osImage"] - capacityInfo = items["status"]["capacity"] - ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) + capacityInfo = items["status"]["capacity"] + ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) - #telemetry about prometheus metric collections settings for replicaset - if (File.file?(@@promConfigMountPath)) - properties["rsPromInt"] = @@rsPromInterval - properties["rsPromFPC"] = @@rsPromFieldPassCount - properties["rsPromFDC"] = @@rsPromFieldDropCount - properties["rsPromServ"] = @@rsPromK8sServiceCount - properties["rsPromUrl"] = @@rsPromUrlCount - properties["rsPromMonPods"] = @@rsPromMonitorPods - properties["rsPromMonPodsNs"] = @@rsPromMonitorPodsNamespaceLength - end - ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties) - telemetrySent = true - end + #telemetry about prometheus metric collections settings for replicaset + if (File.file?(@@promConfigMountPath)) + properties["rsPromInt"] = @@rsPromInterval + properties["rsPromFPC"] = @@rsPromFieldPassCount + properties["rsPromFDC"] = @@rsPromFieldDropCount + properties["rsPromServ"] = @@rsPromK8sServiceCount + properties["rsPromUrl"] = @@rsPromUrlCount + properties["rsPromMonPods"] = @@rsPromMonitorPods + properties["rsPromMonPodsNs"] = @@rsPromMonitorPodsNamespaceLength end + ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties) + telemetrySent = true end - router.emit_stream(@tag, eventStream) if eventStream - router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream - router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream - if telemetrySent == true - @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i - end - @@istestvar = ENV["ISTEST"] - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) - $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") - end - #:optimize:kubeperf merge - begin - #if(!nodeInventory.empty?) - nodeMetricDataItems = [] - #allocatable metrics @ node level - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime)) - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "memory", "memoryAllocatableBytes", batchTime)) - #capacity metrics @ node level - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores", batchTime)) - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes", batchTime)) + end + router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream + router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream + if telemetrySent == true + @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i + end + @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) + $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + #:optimize:kubeperf merge + begin + #if(!nodeInventory.empty?) + nodeMetricDataItems = [] + #allocatable metrics @ node level + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime)) + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "memory", "memoryAllocatableBytes", batchTime)) + #capacity metrics @ node level + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores", batchTime)) + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes", batchTime)) - kubePerfEventStream = MultiEventStream.new + kubePerfEventStream = MultiEventStream.new - nodeMetricDataItems.each do |record| - record['DataType'] = "LINUX_PERF_BLOB" - record['IPName'] = "LogManagement" - kubePerfEventStream.add(emitTime, record) if record - end - #end - router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream - rescue => errorStr - $log.warn "Failed in enumerate for KubePerf from in_kube_nodes : #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - #:optimize:end kubeperf merge + nodeMetricDataItems.each do |record| + record["DataType"] = "LINUX_PERF_BLOB" + record["IPName"] = "LogManagement" + kubePerfEventStream.add(emitTime, record) if record + end + #end + router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + rescue => errorStr + $log.warn "Failed in enumerate for KubePerf from in_kube_nodes : #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end - + #:optimize:end kubeperf merge + rescue => errorStr $log.warn "Failed to retrieve node inventory: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end + $log.warn "in_kube_nodes::parse_and_emit_records:End #{Time.now.utc.iso8601}" end def run_periodic @@ -234,22 +256,22 @@ def run_periodic @nextTimeToRun = Time.now @waitTimeout = @run_interval until done - @nextTimeToRun = @nextTimeToRun + @run_interval - @now = Time.now - if @nextTimeToRun <= @now - @waitTimeout = 1 - @nextTimeToRun = @now - else - @waitTimeout = @nextTimeToRun - @now - end - @condition.wait(@mutex, @waitTimeout) + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished @mutex.unlock if !done begin - $log.info("in_kube_nodes::run_periodic.enumerate.start #{Time.now.utc.iso8601}") - enumerate - $log.info("in_kube_nodes::run_periodic.enumerate.end #{Time.now.utc.iso8601}") + $log.info("in_kube_nodes::run_periodic.enumerate.start #{Time.now.utc.iso8601}") + enumerate + $log.info("in_kube_nodes::run_periodic.enumerate.end #{Time.now.utc.iso8601}") rescue => errorStr $log.warn "in_kube_nodes::run_periodic: enumerate Failed to retrieve node inventory: #{errorStr}" ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index c9ae75a03..28b20bfc0 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -57,86 +57,80 @@ def shutdown end end - def processPodChunks(podInventory, serviceList, batchTime) + def enumerate(podList = nil) begin - if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + podInventory = podList + telemetryFlush = false + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} + currentTime = Time.now + batchTime = currentTime.utc.iso8601 + + # Get services first so that we dont need to make a call for very chunk + $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") + serviceInfo = KubernetesApiClient.getKubeResourceInfo("services") + # serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) + $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") + + if !serviceInfo.nil? + $log.info("in_kube_podinventory::enumerate:Start:Parsing services data using yajl @ #{Time.now.utc.iso8601}") + serviceList = Yajl::Parser.parse(StringIO.new(serviceInfo.body)) + $log.info("in_kube_podinventory::enumerate:End:Parsing services data using yajl @ #{Time.now.utc.iso8601}") + serviceInfo = nil + end + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") + $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) parse_and_emit_records(podInventory, serviceList, batchTime) else - $log.warn "in_kube_podinventory::processPodChunks:Received empty podInventory" + $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" end - podInfo = nil - podInventory = nil - rescue => errorStr - $log.warn "in_kube_podinventory::processPodChunks:Failed in process pod chunks: #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - end - def parsePodsJsonAndProcess(podInfo, serviceList, batchTime) - if !podInfo.nil? - $log.info("in_kube_podinventory::parsePodsJsonAndProcess:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") - podInventory = Yajl::Parser.parse(StringIO.new(podInfo.body)) - $log.info("in_kube_podinventory::parsePodsJsonAndProcess:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}") - end - if (!podInventory.nil? && !podInventory["metadata"].nil?) - continuationToken = podInventory["metadata"]["continue"] - end - processPodChunks(podInventory, serviceList, batchTime) - return continuationToken - end + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) + parse_and_emit_records(podInventory, serviceList, batchTime) + else + $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" + end + end - def enumerate(podList = nil) - podInventory = podList - telemetryFlush = false - @podCount = 0 - @controllerSet = Set.new [] - @winContainerCount = 0 - @controllerData = {} - currentTime = Time.now - batchTime = currentTime.utc.iso8601 - - # Get services first so that we dont need to make a call for very chunk - $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") - serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) - $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") - - # Initializing continuation token to nil - continuationToken = nil - $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}") - $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - - continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) - - #If we receive a continuation token, make calls, process and flush data until we have processed all data - while (!continuationToken.nil? && !continuationToken.empty?) - $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") - podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") - $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}") - continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime) - end + # Setting these to nil so that we dont hold memory until GC kicks in + podInventory = nil + serviceList = nil - # Adding telemetry to send pod telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 5) - telemetryFlush = true - end + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end - # Flush AppInsights telemetry once all the processing is done - if telemetryFlush == true - telemetryProperties = {} - telemetryProperties["Computer"] = @@hostName - ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) - ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) - telemetryProperties["ControllerData"] = @controllerData.to_json - ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) - if @winContainerCount > 0 - telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount - ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + # Flush AppInsights telemetry once all the processing is done + if telemetryFlush == true + telemetryProperties = {} + telemetryProperties["Computer"] = @@hostName + ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) + telemetryProperties["ControllerData"] = @controllerData.to_json + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) + if @winContainerCount > 0 + telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount + ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + end + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i end - @@podTelemetryTimeTracker = DateTime.now.to_time.to_i + rescue => errorStr + $log.warn "in_kube_podinventory::enumerate:Failed in enumerate: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end