diff --git a/installer/conf/container.conf b/installer/conf/container.conf
index 798bd8eb6..091753230 100755
--- a/installer/conf/container.conf
+++ b/installer/conf/container.conf
@@ -15,16 +15,6 @@
log_level debug
-# Container host inventory
-
- type omi
- run_interval 60s
- tag oms.api.ContainerNodeInventory
- items [
- ["root/cimv2","Container_HostInventory"]
- ]
-
-
#cadvisor perf
type cadvisorperf
@@ -33,19 +23,6 @@
log_level debug
-
- type out_oms_api
- log_level debug
- buffer_chunk_limit 20m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer
- buffer_queue_limit 20
- flush_interval 20s
- retry_limit 10
- retry_wait 15s
- max_retry_wait 9m
-
-
type out_oms
log_level debug
diff --git a/installer/conf/kube.conf b/installer/conf/kube.conf
index 94fe2ef0b..22c51ad0e 100644
--- a/installer/conf/kube.conf
+++ b/installer/conf/kube.conf
@@ -118,6 +118,19 @@
max_retry_wait 9m
+
+ type out_oms_api
+ log_level debug
+ buffer_chunk_limit 20m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer
+ buffer_queue_limit 20
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 15s
+ max_retry_wait 9m
+
+
type out_oms
log_level debug
diff --git a/source/code/plugin/ApplicationInsightsUtility.rb b/source/code/plugin/ApplicationInsightsUtility.rb
index 78553a83f..76e0b2926 100644
--- a/source/code/plugin/ApplicationInsightsUtility.rb
+++ b/source/code/plugin/ApplicationInsightsUtility.rb
@@ -83,7 +83,7 @@ def sendHeartBeatEvent(pluginName)
end
end
- def sendCustomEvent(pluginName, properties)
+ def sendCustomMetric(pluginName, properties)
begin
if !(@@Tc.nil?)
@@Tc.track_metric 'LastProcessedContainerInventoryCount', properties['ContainerCount'],
@@ -93,7 +93,7 @@ def sendCustomEvent(pluginName, properties)
$log.info("AppInsights Container Count Telemetry sent successfully")
end
rescue => errorStr
- $log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}")
+ $log.warn("Exception in AppInsightsUtility: sendCustomMetric - error: #{errorStr}")
end
end
@@ -120,7 +120,7 @@ def sendTelemetry(pluginName, properties)
end
@@CustomProperties['Computer'] = properties['Computer']
sendHeartBeatEvent(pluginName)
- sendCustomEvent(pluginName, properties)
+ sendCustomMetric(pluginName, properties)
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}")
end
diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb
index edbbdd37f..1c792d0da 100644
--- a/source/code/plugin/in_kube_nodes.rb
+++ b/source/code/plugin/in_kube_nodes.rb
@@ -6,12 +6,15 @@ module Fluent
class Kube_nodeInventory_Input < Input
Plugin.register_input('kubenodeinventory', self)
+ @@ContainerNodeInventoryTag = 'oms.api.ContainerNodeInventory'
+
def initialize
super
require 'yaml'
require 'json'
require_relative 'KubernetesApiClient'
+ require_relative 'ApplicationInsightsUtility'
require_relative 'oms_common'
require_relative 'omslog'
end
@@ -29,6 +32,7 @@ def start
@condition = ConditionVariable.new
@mutex = Mutex.new
@thread = Thread.new(&method(:run_periodic))
+ @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i
end
end
@@ -46,15 +50,22 @@ def enumerate
currentTime = Time.now
emitTime = currentTime.to_f
batchTime = currentTime.utc.iso8601
- $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}")
- nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body)
- $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}")
+ telemetrySent = false
+ $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}")
+ nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body)
+ $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}")
begin
if(!nodeInventory.empty?)
eventStream = MultiEventStream.new
+ containerNodeInventoryEventStream = MultiEventStream.new
#get node inventory
nodeInventory['items'].each do |items|
record = {}
+ # Sending records for ContainerNodeInventory
+ containerNodeInventoryRecord = {}
+ containerNodeInventoryRecord['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated
+ containerNodeInventoryRecord['Computer'] = items['metadata']['name']
+
record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated
record['Computer'] = items['metadata']['name']
record['ClusterName'] = KubernetesApiClient.getClusterName
@@ -89,16 +100,40 @@ def enumerate
end
- record['KubeletVersion'] = items['status']['nodeInfo']['kubeletVersion']
- record['KubeProxyVersion'] = items['status']['nodeInfo']['kubeProxyVersion']
+ nodeInfo = items['status']['nodeInfo']
+ record['KubeletVersion'] = nodeInfo['kubeletVersion']
+ record['KubeProxyVersion'] = nodeInfo['kubeProxyVersion']
+ containerNodeInventoryRecord['OperatingSystem'] = nodeInfo['osImage']
+ dockerVersion = nodeInfo['containerRuntimeVersion']
+ dockerVersion.slice! "docker://"
+ containerNodeInventoryRecord['DockerVersion'] = dockerVersion
+ # ContainerNodeInventory data for docker version and operating system.
+ containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryRecord) if containerNodeInventoryRecord
+
wrapper = {
"DataType"=>"KUBE_NODE_INVENTORY_BLOB",
"IPName"=>"ContainerInsights",
"DataItems"=>[record.each{|k,v| record[k]=v}]
}
eventStream.add(emitTime, wrapper) if wrapper
+ # Adding telemetry to send node telemetry every 5 minutes
+ timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs
+ timeDifferenceInMinutes = timeDifference/60
+ if (timeDifferenceInMinutes >= 5)
+ properties = {}
+ properties["Computer"] = record["Computer"]
+ ApplicationInsightsUtility.sendMetricTelemetry("KubeletVersion", record["KubeletVersion"] , properties)
+ capacityInfo = items['status']['capacity']
+ ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"] , properties)
+ ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"] , properties)
+ telemetrySent = true
+ end
end
router.emit_stream(@tag, eventStream) if eventStream
+ router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream
+ if telemetrySent == true
+ @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i
+ end
@@istestvar = ENV['ISTEST']
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0)
$log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb
index ec76bac61..c6873e8fe 100644
--- a/source/code/plugin/in_kube_podinventory.rb
+++ b/source/code/plugin/in_kube_podinventory.rb
@@ -10,8 +10,10 @@ def initialize
super
require 'yaml'
require 'json'
+ require 'set'
require_relative 'KubernetesApiClient'
+ require_relative 'ApplicationInsightsUtility'
require_relative 'oms_common'
require_relative 'omslog'
end
@@ -29,6 +31,7 @@ def start
@condition = ConditionVariable.new
@mutex = Mutex.new
@thread = Thread.new(&method(:run_periodic))
+ @@podTelemetryTimeTracker = DateTime.now.to_time.to_i
end
end
@@ -71,6 +74,8 @@ def parse_and_emit_records(podInventory, serviceList)
emitTime = currentTime.to_f
batchTime = currentTime.utc.iso8601
eventStream = MultiEventStream.new
+ controllerSet = Set.new []
+ telemetryFlush = false
begin #begin block start
podInventory['items'].each do |items| #podInventory block start
records = []
@@ -78,6 +83,7 @@ def parse_and_emit_records(podInventory, serviceList)
record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated
record['Name'] = items['metadata']['name']
podNameSpace = items['metadata']['namespace']
+
if podNameSpace.eql?("kube-system") && !items['metadata'].key?("ownerReferences")
# The above case seems to be the only case where you have horizontal scaling of pods
# but no controller, in which case cAdvisor picks up kubernetes.io/config.hash
@@ -129,9 +135,18 @@ def parse_and_emit_records(podInventory, serviceList)
record['ClusterId'] = KubernetesApiClient.getClusterId
record['ClusterName'] = KubernetesApiClient.getClusterName
record['ServiceName'] = getServiceNameFromLabels(items['metadata']['namespace'], items['metadata']['labels'], serviceList)
+ # Adding telemetry to send pod telemetry every 5 minutes
+ timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs
+ timeDifferenceInMinutes = timeDifference/60
+ if (timeDifferenceInMinutes >= 5)
+ telemetryFlush = true
+ end
if !items['metadata']['ownerReferences'].nil?
record['ControllerKind'] = items['metadata']['ownerReferences'][0]['kind']
record['ControllerName'] = items['metadata']['ownerReferences'][0]['name']
+ if telemetryFlush == true
+ controllerSet.add(record['ControllerKind'] + record['ControllerName'])
+ end
end
podRestartCount = 0
record['PodRestartCount'] = 0
@@ -191,6 +206,11 @@ def parse_and_emit_records(podInventory, serviceList)
end
end #podInventory block end
router.emit_stream(@tag, eventStream) if eventStream
+ if telemetryFlush == true
+ ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory['items'].length , {})
+ ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length , {})
+ @@podTelemetryTimeTracker = DateTime.now.to_time.to_i
+ end
@@istestvar = ENV['ISTEST']
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0)
$log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")