Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file modified installer/conf/container.conf
100755 → 100644
Empty file.
164 changes: 101 additions & 63 deletions source/code/plugin/in_kube_podinventory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,21 @@ class Kube_PodInventory_Input < Input
def initialize
super
require "yaml"
require 'yajl/json_gem'
require "yajl/json_gem"
require "yajl"
require "set"
require "time"

require_relative "KubernetesApiClient"
require_relative "ApplicationInsightsUtility"
require_relative "oms_common"
require_relative "omslog"

@PODS_CHUNK_SIZE = "1500"
@podCount = 0
@controllerSet = Set.new []
@winContainerCount = 0
@controllerData = {}
end

config_param :run_interval, :time, :default => 60
Expand Down Expand Up @@ -50,37 +57,89 @@ def shutdown
end
end

def enumerate(podList = nil)
podInventory = podList
currentTime = Time.now
$log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}")
podInfo = KubernetesApiClient.getKubeResourceInfo("pods")
$log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}")

if !podInfo.nil?
podInventory = JSON.parse(podInfo.body)
end

def processPodChunks(podInventory, serviceList, batchTime)
begin
if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?)
batchTime = currentTime.utc.iso8601
#get pod inventory & services
$log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}")
serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body)
$log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}")
parse_and_emit_records(podInventory, serviceList, batchTime )
parse_and_emit_records(podInventory, serviceList, batchTime)
else
$log.warn "Received empty podInventory"
$log.warn "in_kube_podinventory::processPodChunks:Received empty podInventory"
end
podInfo = nil
podInventory = nil
rescue => errorStr
$log.warn "Failed in enumerate pod inventory: #{errorStr}"
$log.warn "in_kube_podinventory::processPodChunks:Failed in process pod chunks: #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
end

def parsePodsJsonAndProcess(podInfo, serviceList, batchTime)
if !podInfo.nil?
$log.info("in_kube_podinventory::parsePodsJsonAndProcess:Start:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}")
podInventory = Yajl::Parser.parse(StringIO.new(podInfo.body))
$log.info("in_kube_podinventory::parsePodsJsonAndProcess:End:Parsing chunked data using yajl @ #{Time.now.utc.iso8601}")
end
if (!podInventory.nil? && !podInventory["metadata"].nil?)
continuationToken = podInventory["metadata"]["continue"]
end
processPodChunks(podInventory, serviceList, batchTime)
return continuationToken
end

def enumerate(podList = nil)
podInventory = podList
telemetryFlush = false
@podCount = 0
@controllerSet = Set.new []
@winContainerCount = 0
@controllerData = {}
currentTime = Time.now
batchTime = currentTime.utc.iso8601

# Get services first so that we dont need to make a call for very chunk
$log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}")
serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body)
$log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}")

# Initializing continuation token to nil
continuationToken = nil
$log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}")
podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}")
$log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}")

continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime)

#If we receive a continuation token, make calls, process and flush data until we have processed all data
while (!continuationToken.nil? && !continuationToken.empty?)
$log.info("in_kube_podinventory::enumerate : Getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}")
podInfo = KubernetesApiClient.getKubeResourceInfo("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}")
$log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API using continuation token @ #{Time.now.utc.iso8601}")
continuationToken = parsePodsJsonAndProcess(podInfo, serviceList, batchTime)
end

# Adding telemetry to send pod telemetry every 5 minutes
timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs
timeDifferenceInMinutes = timeDifference / 60
if (timeDifferenceInMinutes >= 5)
telemetryFlush = true
end

# Flush AppInsights telemetry once all the processing is done
if telemetryFlush == true
telemetryProperties = {}
telemetryProperties["Computer"] = @@hostName
ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties)
ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {})
telemetryProperties["ControllerData"] = @controllerData.to_json
ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties)
if @winContainerCount > 0
telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount
ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties)
end
@@podTelemetryTimeTracker = DateTime.now.to_time.to_i
end
end

def populateWindowsContainerInventoryRecord(container, record, containerEnvVariableHash, batchTime)
begin
containerInventoryRecord = {}
Expand Down Expand Up @@ -193,15 +252,12 @@ def getContainerEnvironmentVariables(pod, clusterCollectEnvironmentVar)
end
end

def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8601)
def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8601)
currentTime = Time.now
emitTime = currentTime.to_f
#batchTime = currentTime.utc.iso8601
eventStream = MultiEventStream.new
controllerSet = Set.new []
controllerData = {}
telemetryFlush = false
winContainerCount = 0

begin #begin block start
# Getting windows nodes from kubeapi
winNodes = KubernetesApiClient.getWindowsNodesArray
Expand Down Expand Up @@ -284,24 +340,17 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8
record["ClusterId"] = KubernetesApiClient.getClusterId
record["ClusterName"] = KubernetesApiClient.getClusterName
record["ServiceName"] = getServiceNameFromLabels(items["metadata"]["namespace"], items["metadata"]["labels"], serviceList)
# Adding telemetry to send pod telemetry every 5 minutes
timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs
timeDifferenceInMinutes = timeDifference / 60
if (timeDifferenceInMinutes >= 5)
telemetryFlush = true
end

if !items["metadata"]["ownerReferences"].nil?
record["ControllerKind"] = items["metadata"]["ownerReferences"][0]["kind"]
record["ControllerName"] = items["metadata"]["ownerReferences"][0]["name"]
if telemetryFlush == true
controllerSet.add(record["ControllerKind"] + record["ControllerName"])
#Adding controller kind to telemetry ro information about customer workload
if (controllerData[record["ControllerKind"]].nil?)
controllerData[record["ControllerKind"]] = 1
else
controllerValue = controllerData[record["ControllerKind"]]
controllerData[record["ControllerKind"]] += 1
end
@controllerSet.add(record["ControllerKind"] + record["ControllerName"])
#Adding controller kind to telemetry ro information about customer workload
if (@controllerData[record["ControllerKind"]].nil?)
@controllerData[record["ControllerKind"]] = 1
else
controllerValue = @controllerData[record["ControllerKind"]]
@controllerData[record["ControllerKind"]] += 1
end
end
podRestartCount = 0
Expand Down Expand Up @@ -419,7 +468,7 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8
end
end
# Send container inventory records for containers on windows nodes
winContainerCount += containerInventoryRecords.length
@winContainerCount += containerInventoryRecords.length
containerInventoryRecords.each do |cirecord|
if !cirecord.nil?
ciwrapper = {
Expand All @@ -436,25 +485,24 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8
router.emit_stream(@@MDMKubePodInventoryTag, eventStream) if eventStream
#:optimize:kubeperf merge
begin
#if(!podInventory.empty?)
#if(!podInventory.empty?)
containerMetricDataItems = []
#hostName = (OMS::Common.get_hostname)
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu","cpuRequestNanoCores", batchTime))
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory","memoryRequestBytes", batchTime))
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu","cpuLimitNanoCores", batchTime))
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory","memoryLimitBytes", batchTime))
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu", "cpuRequestNanoCores", batchTime))
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory", "memoryRequestBytes", batchTime))
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu", "cpuLimitNanoCores", batchTime))
containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory", "memoryLimitBytes", batchTime))

kubePerfEventStream = MultiEventStream.new

containerMetricDataItems.each do |record|
record['DataType'] = "LINUX_PERF_BLOB"
record['IPName'] = "LogManagement"
record["DataType"] = "LINUX_PERF_BLOB"
record["IPName"] = "LogManagement"
kubePerfEventStream.add(emitTime, record) if record
#router.emit(@tag, time, record) if record
#router.emit(@tag, time, record) if record
end
#end
router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream

rescue => errorStr
$log.warn "Failed in parse_and_emit_record for KubePerf from in_kube_podinventory : #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
Expand Down Expand Up @@ -493,19 +541,9 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8
end
#:optimize:end kubeservices merge

if telemetryFlush == true
telemetryProperties = {}
telemetryProperties["Computer"] = @@hostName
ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties)
ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory["items"].length, {})
telemetryProperties["ControllerData"] = controllerData.to_json
ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length, telemetryProperties)
if winContainerCount > 0
telemetryProperties["ClusterWideWindowsContainersCount"] = winContainerCount
ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties)
end
@@podTelemetryTimeTracker = DateTime.now.to_time.to_i
end
#Updating value for AppInsights telemetry
@podCount += podInventory["items"].length

@@istestvar = ENV["ISTEST"]
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0)
$log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
Expand Down