Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions installer/conf/container.conf
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,6 @@
log_level debug
</source>

# Container host inventory
<source>
type omi
run_interval 60s
tag oms.api.ContainerNodeInventory
items [
["root/cimv2","Container_HostInventory"]
]
</source>

#cadvisor perf
<source>
type cadvisorperf
Expand All @@ -33,19 +23,6 @@
log_level debug
</source>

<match oms.api.ContainerNodeInventory**>
type out_oms_api
log_level debug
buffer_chunk_limit 20m
buffer_type file
buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer
buffer_queue_limit 20
flush_interval 20s
retry_limit 10
retry_wait 15s
max_retry_wait 9m
</match>

<match oms.containerinsights.containerinventory**>
type out_oms
log_level debug
Expand Down
13 changes: 13 additions & 0 deletions installer/conf/kube.conf
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,19 @@
max_retry_wait 9m
</match>

<match oms.api.ContainerNodeInventory**>
type out_oms_api
log_level debug
buffer_chunk_limit 20m
buffer_type file
buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer
buffer_queue_limit 20
flush_interval 20s
retry_limit 10
retry_wait 15s
max_retry_wait 9m
</match>

<match oms.api.KubePerf**>
type out_oms
log_level debug
Expand Down
6 changes: 3 additions & 3 deletions source/code/plugin/ApplicationInsightsUtility.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def sendHeartBeatEvent(pluginName)
end
end

def sendCustomEvent(pluginName, properties)
def sendCustomMetric(pluginName, properties)
begin
if !(@@Tc.nil?)
@@Tc.track_metric 'LastProcessedContainerInventoryCount', properties['ContainerCount'],
Expand All @@ -93,7 +93,7 @@ def sendCustomEvent(pluginName, properties)
$log.info("AppInsights Container Count Telemetry sent successfully")
end
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}")
$log.warn("Exception in AppInsightsUtility: sendCustomMetric - error: #{errorStr}")
end
end

Expand All @@ -120,7 +120,7 @@ def sendTelemetry(pluginName, properties)
end
@@CustomProperties['Computer'] = properties['Computer']
sendHeartBeatEvent(pluginName)
sendCustomEvent(pluginName, properties)
sendCustomMetric(pluginName, properties)
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}")
end
Expand Down
45 changes: 40 additions & 5 deletions source/code/plugin/in_kube_nodes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@ module Fluent
class Kube_nodeInventory_Input < Input
Plugin.register_input('kubenodeinventory', self)

@@ContainerNodeInventoryTag = 'oms.api.ContainerNodeInventory'

def initialize
super
require 'yaml'
require 'json'

require_relative 'KubernetesApiClient'
require_relative 'ApplicationInsightsUtility'
require_relative 'oms_common'
require_relative 'omslog'
end
Expand All @@ -29,6 +32,7 @@ def start
@condition = ConditionVariable.new
@mutex = Mutex.new
@thread = Thread.new(&method(:run_periodic))
@@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i
end
end

Expand All @@ -46,15 +50,22 @@ def enumerate
currentTime = Time.now
emitTime = currentTime.to_f
batchTime = currentTime.utc.iso8601
$log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}")
nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body)
$log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}")
telemetrySent = false
$log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}")
nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body)
$log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}")
begin
if(!nodeInventory.empty?)
eventStream = MultiEventStream.new
containerNodeInventoryEventStream = MultiEventStream.new
#get node inventory
nodeInventory['items'].each do |items|
record = {}
# Sending records for ContainerNodeInventory
containerNodeInventoryRecord = {}
containerNodeInventoryRecord['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated
containerNodeInventoryRecord['Computer'] = items['metadata']['name']

record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated
record['Computer'] = items['metadata']['name']
record['ClusterName'] = KubernetesApiClient.getClusterName
Expand Down Expand Up @@ -89,16 +100,40 @@ def enumerate

end

record['KubeletVersion'] = items['status']['nodeInfo']['kubeletVersion']
record['KubeProxyVersion'] = items['status']['nodeInfo']['kubeProxyVersion']
nodeInfo = items['status']['nodeInfo']
record['KubeletVersion'] = nodeInfo['kubeletVersion']
record['KubeProxyVersion'] = nodeInfo['kubeProxyVersion']
containerNodeInventoryRecord['OperatingSystem'] = nodeInfo['osImage']
dockerVersion = nodeInfo['containerRuntimeVersion']
dockerVersion.slice! "docker://"
containerNodeInventoryRecord['DockerVersion'] = dockerVersion
# ContainerNodeInventory data for docker version and operating system.
containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryRecord) if containerNodeInventoryRecord

wrapper = {
"DataType"=>"KUBE_NODE_INVENTORY_BLOB",
"IPName"=>"ContainerInsights",
"DataItems"=>[record.each{|k,v| record[k]=v}]
}
eventStream.add(emitTime, wrapper) if wrapper
# Adding telemetry to send node telemetry every 5 minutes
timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs
timeDifferenceInMinutes = timeDifference/60
if (timeDifferenceInMinutes >= 5)
properties = {}
properties["Computer"] = record["Computer"]
ApplicationInsightsUtility.sendMetricTelemetry("KubeletVersion", record["KubeletVersion"] , properties)
capacityInfo = items['status']['capacity']
ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"] , properties)
ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"] , properties)
telemetrySent = true
end
end
router.emit_stream(@tag, eventStream) if eventStream
router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream
if telemetrySent == true
@@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i
end
@@istestvar = ENV['ISTEST']
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0)
$log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
Expand Down
20 changes: 20 additions & 0 deletions source/code/plugin/in_kube_podinventory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ def initialize
super
require 'yaml'
require 'json'
require 'set'

require_relative 'KubernetesApiClient'
require_relative 'ApplicationInsightsUtility'
require_relative 'oms_common'
require_relative 'omslog'
end
Expand All @@ -29,6 +31,7 @@ def start
@condition = ConditionVariable.new
@mutex = Mutex.new
@thread = Thread.new(&method(:run_periodic))
@@podTelemetryTimeTracker = DateTime.now.to_time.to_i
end
end

Expand Down Expand Up @@ -71,13 +74,16 @@ def parse_and_emit_records(podInventory, serviceList)
emitTime = currentTime.to_f
batchTime = currentTime.utc.iso8601
eventStream = MultiEventStream.new
controllerSet = Set.new []
telemetryFlush = false
begin #begin block start
podInventory['items'].each do |items| #podInventory block start
records = []
record = {}
record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated
record['Name'] = items['metadata']['name']
podNameSpace = items['metadata']['namespace']

if podNameSpace.eql?("kube-system") && !items['metadata'].key?("ownerReferences")
# The above case seems to be the only case where you have horizontal scaling of pods
# but no controller, in which case cAdvisor picks up kubernetes.io/config.hash
Expand Down Expand Up @@ -129,9 +135,18 @@ def parse_and_emit_records(podInventory, serviceList)
record['ClusterId'] = KubernetesApiClient.getClusterId
record['ClusterName'] = KubernetesApiClient.getClusterName
record['ServiceName'] = getServiceNameFromLabels(items['metadata']['namespace'], items['metadata']['labels'], serviceList)
# Adding telemetry to send pod telemetry every 5 minutes
timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs
timeDifferenceInMinutes = timeDifference/60
if (timeDifferenceInMinutes >= 5)
telemetryFlush = true
end
if !items['metadata']['ownerReferences'].nil?
record['ControllerKind'] = items['metadata']['ownerReferences'][0]['kind']
record['ControllerName'] = items['metadata']['ownerReferences'][0]['name']
if telemetryFlush == true
controllerSet.add(record['ControllerKind'] + record['ControllerName'])
end
end
podRestartCount = 0
record['PodRestartCount'] = 0
Expand Down Expand Up @@ -191,6 +206,11 @@ def parse_and_emit_records(podInventory, serviceList)
end
end #podInventory block end
router.emit_stream(@tag, eventStream) if eventStream
if telemetryFlush == true
ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory['items'].length , {})
ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length , {})
@@podTelemetryTimeTracker = DateTime.now.to_time.to_i
end
@@istestvar = ENV['ISTEST']
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0)
$log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
Expand Down