From 2b1866f195e528dbd47dca832183e9accd81afe5 Mon Sep 17 00:00:00 2001 From: rashmy Date: Tue, 4 Dec 2018 18:33:11 -0800 Subject: [PATCH 1/6] containernodeinventory changes --- source/code/plugin/in_kube_nodes.rb | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index edbbdd37f..f23d2eec7 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -55,6 +55,11 @@ def enumerate #get node inventory nodeInventory['items'].each do |items| record = {} + # Sending records for ContainerNodeInventory + containerInventory = {} + containerInventory['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated + containerInventory['Computer'] = items['metadata']['name'] + record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated record['Computer'] = items['metadata']['name'] record['ClusterName'] = KubernetesApiClient.getClusterName @@ -89,8 +94,14 @@ def enumerate end - record['KubeletVersion'] = items['status']['nodeInfo']['kubeletVersion'] - record['KubeProxyVersion'] = items['status']['nodeInfo']['kubeProxyVersion'] + nodeInfo = items['status']['nodeInfo'] + record['KubeletVersion'] = nodeInfo['kubeletVersion'] + record['KubeProxyVersion'] = nodeInfo['kubeProxyVersion'] + containerInventory['OperatingSystem'] = nodeInfo['osImage'] + dockerVersion = nodeInfo['containerRuntimeVersion'] + dockerVersion.slice! "docker://" + containerInventory['DockerVersion'] = dockerVersion + wrapper = { "DataType"=>"KUBE_NODE_INVENTORY_BLOB", "IPName"=>"ContainerInsights", From b5b6c0eed6ba72ea046beb924565aa5f0cce9aca Mon Sep 17 00:00:00 2001 From: rashmy Date: Thu, 6 Dec 2018 13:10:07 -0800 Subject: [PATCH 2/6] changes for containernodeinventory --- installer/conf/container.conf | 23 ----------------------- installer/conf/kube.conf | 13 +++++++++++++ source/code/plugin/in_kube_nodes.rb | 16 +++++++++++----- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/installer/conf/container.conf b/installer/conf/container.conf index 798bd8eb6..091753230 100755 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -15,16 +15,6 @@ log_level debug -# Container host inventory - - type omi - run_interval 60s - tag oms.api.ContainerNodeInventory - items [ - ["root/cimv2","Container_HostInventory"] - ] - - #cadvisor perf type cadvisorperf @@ -33,19 +23,6 @@ log_level debug - - type out_oms_api - log_level debug - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer - buffer_queue_limit 20 - flush_interval 20s - retry_limit 10 - retry_wait 15s - max_retry_wait 9m - - type out_oms log_level debug diff --git a/installer/conf/kube.conf b/installer/conf/kube.conf index 94fe2ef0b..22c51ad0e 100644 --- a/installer/conf/kube.conf +++ b/installer/conf/kube.conf @@ -118,6 +118,19 @@ max_retry_wait 9m + + type out_oms_api + log_level debug + buffer_chunk_limit 20m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer + buffer_queue_limit 20 + flush_interval 20s + retry_limit 10 + retry_wait 15s + max_retry_wait 9m + + type out_oms log_level debug diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index f23d2eec7..a621b5bc0 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -6,6 +6,8 @@ module Fluent class Kube_nodeInventory_Input < Input Plugin.register_input('kubenodeinventory', self) + @@ContainerNodeInventoryTag = 'oms.api.ContainerNodeInventory' + def initialize super require 'yaml' @@ -52,13 +54,14 @@ def enumerate begin if(!nodeInventory.empty?) eventStream = MultiEventStream.new + containerNodeInventoryEventStream = MultiEventStream.new #get node inventory nodeInventory['items'].each do |items| record = {} # Sending records for ContainerNodeInventory - containerInventory = {} - containerInventory['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated - containerInventory['Computer'] = items['metadata']['name'] + containerNodeInventoryRecord = {} + containerNodeInventoryRecord['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated + containerNodeInventoryRecord['Computer'] = items['metadata']['name'] record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated record['Computer'] = items['metadata']['name'] @@ -97,10 +100,12 @@ def enumerate nodeInfo = items['status']['nodeInfo'] record['KubeletVersion'] = nodeInfo['kubeletVersion'] record['KubeProxyVersion'] = nodeInfo['kubeProxyVersion'] - containerInventory['OperatingSystem'] = nodeInfo['osImage'] + containerNodeInventoryRecord['OperatingSystem'] = nodeInfo['osImage'] dockerVersion = nodeInfo['containerRuntimeVersion'] dockerVersion.slice! "docker://" - containerInventory['DockerVersion'] = dockerVersion + containerNodeInventoryRecord['DockerVersion'] = dockerVersion + # ContainerNodeInventory data for docker version and operating system. + containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryRecord) if containerNodeInventoryRecord wrapper = { "DataType"=>"KUBE_NODE_INVENTORY_BLOB", @@ -110,6 +115,7 @@ def enumerate eventStream.add(emitTime, wrapper) if wrapper end router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream @@istestvar = ENV['ISTEST'] if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") From f48c26a64ed046ba58e1bf1869ca0b533794700f Mon Sep 17 00:00:00 2001 From: rashmy Date: Fri, 14 Dec 2018 17:23:37 -0800 Subject: [PATCH 3/6] changes to add node telemetry --- .../code/plugin/ApplicationInsightsUtility.rb | 6 ++--- source/code/plugin/in_kube_nodes.rb | 24 ++++++++++++++++--- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/source/code/plugin/ApplicationInsightsUtility.rb b/source/code/plugin/ApplicationInsightsUtility.rb index 78553a83f..76e0b2926 100644 --- a/source/code/plugin/ApplicationInsightsUtility.rb +++ b/source/code/plugin/ApplicationInsightsUtility.rb @@ -83,7 +83,7 @@ def sendHeartBeatEvent(pluginName) end end - def sendCustomEvent(pluginName, properties) + def sendCustomMetric(pluginName, properties) begin if !(@@Tc.nil?) @@Tc.track_metric 'LastProcessedContainerInventoryCount', properties['ContainerCount'], @@ -93,7 +93,7 @@ def sendCustomEvent(pluginName, properties) $log.info("AppInsights Container Count Telemetry sent successfully") end rescue => errorStr - $log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}") + $log.warn("Exception in AppInsightsUtility: sendCustomMetric - error: #{errorStr}") end end @@ -120,7 +120,7 @@ def sendTelemetry(pluginName, properties) end @@CustomProperties['Computer'] = properties['Computer'] sendHeartBeatEvent(pluginName) - sendCustomEvent(pluginName, properties) + sendCustomMetric(pluginName, properties) rescue => errorStr $log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}") end diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index a621b5bc0..ee7440e5f 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -14,6 +14,7 @@ def initialize require 'json' require_relative 'KubernetesApiClient' + require_relative 'ApplicationInsightsUtility' require_relative 'oms_common' require_relative 'omslog' end @@ -31,6 +32,7 @@ def start @condition = ConditionVariable.new @mutex = Mutex.new @thread = Thread.new(&method(:run_periodic)) + @@telemetryTimeTracker = DateTime.now.to_time.to_i end end @@ -48,9 +50,10 @@ def enumerate currentTime = Time.now emitTime = currentTime.to_f batchTime = currentTime.utc.iso8601 - $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body) - $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + @@telemetrySent = false + $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body) + $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") begin if(!nodeInventory.empty?) eventStream = MultiEventStream.new @@ -113,9 +116,24 @@ def enumerate "DataItems"=>[record.each{|k,v| record[k]=v}] } eventStream.add(emitTime, wrapper) if wrapper + # Adding telemetry to send node telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@telemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference/60 + if (timeDifferenceInMinutes >= 5) + properties = {} + properties["Computer"] = record["Computer"] + ApplicationInsightsUtility.sendMetricTelemetry("KubeletVersion", record["KubeletVersion"] , properties) + capacityInfo = items['status']['capacity'] + ApplicationInsightsUtility.sendMetricTelemetry("CoreCapacity", capacityInfo["cpu"] , properties) + ApplicationInsightsUtility.sendMetricTelemetry("Memory", capacityInfo["memory"] , properties) + @@telemetrySent = true + end end router.emit_stream(@tag, eventStream) if eventStream router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream + if @@telemetrySent == true + @@telemetryTimeTracker = DateTime.now.to_time.to_i + end @@istestvar = ENV['ISTEST'] if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") From 5336e55be74096197532ebff2401e6713c0de805 Mon Sep 17 00:00:00 2001 From: rashmy Date: Fri, 14 Dec 2018 17:58:27 -0800 Subject: [PATCH 4/6] pod telemetry cahnges --- source/code/plugin/in_kube_nodes.rb | 6 +++--- source/code/plugin/in_kube_podinventory.rb | 22 ++++++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index ee7440e5f..a8c4b3f6f 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -32,7 +32,7 @@ def start @condition = ConditionVariable.new @mutex = Mutex.new @thread = Thread.new(&method(:run_periodic)) - @@telemetryTimeTracker = DateTime.now.to_time.to_i + @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i end end @@ -117,7 +117,7 @@ def enumerate } eventStream.add(emitTime, wrapper) if wrapper # Adding telemetry to send node telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@telemetryTimeTracker).abs + timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs timeDifferenceInMinutes = timeDifference/60 if (timeDifferenceInMinutes >= 5) properties = {} @@ -132,7 +132,7 @@ def enumerate router.emit_stream(@tag, eventStream) if eventStream router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream if @@telemetrySent == true - @@telemetryTimeTracker = DateTime.now.to_time.to_i + @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i end @@istestvar = ENV['ISTEST'] if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index ec76bac61..51b1d56e0 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -10,8 +10,10 @@ def initialize super require 'yaml' require 'json' + require 'set' require_relative 'KubernetesApiClient' + require_relative 'ApplicationInsightsUtility' require_relative 'oms_common' require_relative 'omslog' end @@ -29,6 +31,7 @@ def start @condition = ConditionVariable.new @mutex = Mutex.new @thread = Thread.new(&method(:run_periodic)) + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i end end @@ -71,6 +74,8 @@ def parse_and_emit_records(podInventory, serviceList) emitTime = currentTime.to_f batchTime = currentTime.utc.iso8601 eventStream = MultiEventStream.new + controllerSet = Set.new [] + telemetryFlush = false begin #begin block start podInventory['items'].each do |items| #podInventory block start records = [] @@ -78,6 +83,12 @@ def parse_and_emit_records(podInventory, serviceList) record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated record['Name'] = items['metadata']['name'] podNameSpace = items['metadata']['namespace'] + # Adding telemetry to send node telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference/60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end if podNameSpace.eql?("kube-system") && !items['metadata'].key?("ownerReferences") # The above case seems to be the only case where you have horizontal scaling of pods # but no controller, in which case cAdvisor picks up kubernetes.io/config.hash @@ -85,8 +96,14 @@ def parse_and_emit_records(podInventory, serviceList) # its ok to use this. # Use kubernetes.io/config.hash to be able to correlate with cadvisor data podUid = items['metadata']['annotations']['kubernetes.io/config.hash'] + if telemetryFlush == true + controllerSet.add(podUid) + end else podUid = items['metadata']['uid'] + if telemetryFlush == true + controllerSet.add(podUid) + end end record['PodUid'] = podUid record['PodLabel'] = [items['metadata']['labels']] @@ -189,6 +206,11 @@ def parse_and_emit_records(podInventory, serviceList) eventStream.add(emitTime, wrapper) if wrapper end end + if telemetryFlush == true + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", records.count , {}) + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length , {}) + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i + end end #podInventory block end router.emit_stream(@tag, eventStream) if eventStream @@istestvar = ENV['ISTEST'] From 7c67c046dc76573deddecf854ea140b5802a84c7 Mon Sep 17 00:00:00 2001 From: rashmy Date: Sat, 15 Dec 2018 19:02:42 -0800 Subject: [PATCH 5/6] updated telemetry changes --- source/code/plugin/in_kube_nodes.rb | 10 +++++----- source/code/plugin/in_kube_podinventory.rb | 14 +++++++------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index a8c4b3f6f..1c792d0da 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -50,7 +50,7 @@ def enumerate currentTime = Time.now emitTime = currentTime.to_f batchTime = currentTime.utc.iso8601 - @@telemetrySent = false + telemetrySent = false $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body) $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") @@ -124,14 +124,14 @@ def enumerate properties["Computer"] = record["Computer"] ApplicationInsightsUtility.sendMetricTelemetry("KubeletVersion", record["KubeletVersion"] , properties) capacityInfo = items['status']['capacity'] - ApplicationInsightsUtility.sendMetricTelemetry("CoreCapacity", capacityInfo["cpu"] , properties) - ApplicationInsightsUtility.sendMetricTelemetry("Memory", capacityInfo["memory"] , properties) - @@telemetrySent = true + ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"] , properties) + ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"] , properties) + telemetrySent = true end end router.emit_stream(@tag, eventStream) if eventStream router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream - if @@telemetrySent == true + if telemetrySent == true @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i end @@istestvar = ENV['ISTEST'] diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 51b1d56e0..2b8b2625d 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -83,8 +83,8 @@ def parse_and_emit_records(podInventory, serviceList) record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated record['Name'] = items['metadata']['name'] podNameSpace = items['metadata']['namespace'] - # Adding telemetry to send node telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs timeDifferenceInMinutes = timeDifference/60 if (timeDifferenceInMinutes >= 5) telemetryFlush = true @@ -206,13 +206,13 @@ def parse_and_emit_records(podInventory, serviceList) eventStream.add(emitTime, wrapper) if wrapper end end - if telemetryFlush == true - ApplicationInsightsUtility.sendMetricTelemetry("PodCount", records.count , {}) - ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length , {}) - @@podTelemetryTimeTracker = DateTime.now.to_time.to_i - end end #podInventory block end router.emit_stream(@tag, eventStream) if eventStream + if telemetryFlush == true + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory['items'].length , {}) + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length , {}) + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i + end @@istestvar = ENV['ISTEST'] if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) $log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") From 0c3ccef8eb119a36dd24e47ffb3b4f3c4ac05286 Mon Sep 17 00:00:00 2001 From: rashmy Date: Mon, 17 Dec 2018 13:50:17 -0800 Subject: [PATCH 6/6] changes to get uid of owner references as controller id --- source/code/plugin/in_kube_podinventory.rb | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 2b8b2625d..c6873e8fe 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -83,12 +83,7 @@ def parse_and_emit_records(podInventory, serviceList) record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated record['Name'] = items['metadata']['name'] podNameSpace = items['metadata']['namespace'] - # Adding telemetry to send pod telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference/60 - if (timeDifferenceInMinutes >= 5) - telemetryFlush = true - end + if podNameSpace.eql?("kube-system") && !items['metadata'].key?("ownerReferences") # The above case seems to be the only case where you have horizontal scaling of pods # but no controller, in which case cAdvisor picks up kubernetes.io/config.hash @@ -96,14 +91,8 @@ def parse_and_emit_records(podInventory, serviceList) # its ok to use this. # Use kubernetes.io/config.hash to be able to correlate with cadvisor data podUid = items['metadata']['annotations']['kubernetes.io/config.hash'] - if telemetryFlush == true - controllerSet.add(podUid) - end else podUid = items['metadata']['uid'] - if telemetryFlush == true - controllerSet.add(podUid) - end end record['PodUid'] = podUid record['PodLabel'] = [items['metadata']['labels']] @@ -146,9 +135,18 @@ def parse_and_emit_records(podInventory, serviceList) record['ClusterId'] = KubernetesApiClient.getClusterId record['ClusterName'] = KubernetesApiClient.getClusterName record['ServiceName'] = getServiceNameFromLabels(items['metadata']['namespace'], items['metadata']['labels'], serviceList) + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference/60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end if !items['metadata']['ownerReferences'].nil? record['ControllerKind'] = items['metadata']['ownerReferences'][0]['kind'] record['ControllerName'] = items['metadata']['ownerReferences'][0]['name'] + if telemetryFlush == true + controllerSet.add(record['ControllerKind'] + record['ControllerName']) + end end podRestartCount = 0 record['PodRestartCount'] = 0