Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions source/code/plugin/KubernetesApiClient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# frozen_string_literal: true

class KubernetesApiClient
require 'yajl/json_gem'
require "yajl/json_gem"
require "logger"
require "net/http"
require "net/https"
Expand Down Expand Up @@ -43,7 +43,7 @@ def getKubeResourceInfo(resource, api_group: nil)
if !File.exist?(@@CaFile)
raise "#{@@CaFile} doesnt exist"
else
Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER ) do |http|
Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER, :open_timeout => 20, :read_timeout => 40) do |http|
kubeApiRequest = Net::HTTP::Get.new(uri.request_uri)
kubeApiRequest["Authorization"] = "Bearer " + getTokenStr
@Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}"
Expand Down Expand Up @@ -333,7 +333,7 @@ def getContainerLogsSinceTime(namespace, pod, container, since, showTimeStamp)
return containerLogs
end

def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601 )
def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601)
metricItems = []
begin
clusterId = getClusterId
Expand Down Expand Up @@ -546,5 +546,29 @@ def getMetricNumericValue(metricName, metricVal)
end
return metricValue
end # getMetricNumericValue

def getResourcesAndContinuationToken(uri)
continuationToken = nil
resourceInventory = nil
begin
@Log.info "KubernetesApiClient::getResourcesAndContinuationToken : Getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}"
resourceInfo = getKubeResourceInfo(uri)
@Log.info "KubernetesApiClient::getResourcesAndContinuationToken : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}"
if !resourceInfo.nil?
@Log.info "KubernetesApiClient::getResourcesAndContinuationToken:Start:Parsing data for #{uri} using yajl @ #{Time.now.utc.iso8601}"
resourceInventory = Yajl::Parser.parse(StringIO.new(resourceInfo.body))
@Log.info "KubernetesApiClient::getResourcesAndContinuationToken:End:Parsing data for #{uri} using yajl @ #{Time.now.utc.iso8601}"
resourceInfo = nil
end
if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?)
continuationToken = resourceInventory["metadata"]["continue"]
end
rescue => errorStr
@Log.warn "KubernetesApiClient::getResourcesAndContinuationToken:Failed in get resources for #{uri} and continuation token: #{errorStr}"
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
resourceInventory = nil
end
return continuationToken, resourceInventory
end #getResourcesAndContinuationToken
end
end
133 changes: 80 additions & 53 deletions source/code/plugin/in_kube_events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ class Kube_Event_Input < Input

def initialize
super
require 'yajl/json_gem'
require 'yajl'
require "yajl/json_gem"
require "yajl"
require "time"

require_relative "KubernetesApiClient"
require_relative "oms_common"
require_relative "omslog"
require_relative "ApplicationInsightsUtility"

# 30000 events account to approximately 5MB
@EVENTS_CHUNK_SIZE = 30000
end

config_param :run_interval, :time, :default => 60
Expand Down Expand Up @@ -45,66 +48,90 @@ def shutdown
end
end

def enumerate(eventList = nil)
currentTime = Time.now
emitTime = currentTime.to_f
batchTime = currentTime.utc.iso8601
def enumerate
begin
eventList = nil
currentTime = Time.now
batchTime = currentTime.utc.iso8601
eventQueryState = getEventQueryState
newEventQueryState = []

# Initializing continuation token to nil
continuationToken = nil
$log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}")
continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}")
$log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}")
if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?)
newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime)
else
$log.warn "in_kube_events::enumerate:Received empty eventList"
end

events = eventList
$log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}")
eventInfo = KubernetesApiClient.getKubeResourceInfo("events?fieldSelector=type!=Normal")
$log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}")
#If we receive a continuation token, make calls, process and flush data until we have processed all data
while (!continuationToken.nil? && !continuationToken.empty?)
continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}&continue=#{continuationToken}")
if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?)
newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime)
else
$log.warn "in_kube_events::enumerate:Received empty eventList"
end
end

if !eventInfo.nil?
events = Yajl::Parser.parse(StringIO.new(eventInfo.body))
# Setting this to nil so that we dont hold memory until GC kicks in
eventList = nil
writeEventQueryState(newEventQueryState)
rescue => errorStr
$log.warn "in_kube_events::enumerate:Failed in enumerate: #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
end # end enumerate

eventQueryState = getEventQueryState
newEventQueryState = []
def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTime = Time.utc.iso8601)
currentTime = Time.now
emitTime = currentTime.to_f
begin
if (!events.nil? && !events.empty? && !events["items"].nil?)
eventStream = MultiEventStream.new
events["items"].each do |items|
record = {}
#<BUGBUG> - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion
record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated
eventId = items["metadata"]["uid"] + "/" + items["count"].to_s
newEventQueryState.push(eventId)
if !eventQueryState.empty? && eventQueryState.include?(eventId)
next
end
record["ObjectKind"] = items["involvedObject"]["kind"]
record["Namespace"] = items["involvedObject"]["namespace"]
record["Name"] = items["involvedObject"]["name"]
record["Reason"] = items["reason"]
record["Message"] = items["message"]
record["Type"] = items["type"]
record["TimeGenerated"] = items["metadata"]["creationTimestamp"]
record["SourceComponent"] = items["source"]["component"]
record["FirstSeen"] = items["firstTimestamp"]
record["LastSeen"] = items["lastTimestamp"]
record["Count"] = items["count"]
if items["source"].key?("host")
record["Computer"] = items["source"]["host"]
else
record["Computer"] = (OMS::Common.get_hostname)
end
record['ClusterName'] = KubernetesApiClient.getClusterName
record["ClusterId"] = KubernetesApiClient.getClusterId
wrapper = {
"DataType" => "KUBE_EVENTS_BLOB",
"IPName" => "ContainerInsights",
"DataItems" => [record.each { |k, v| record[k] = v }],
}
eventStream.add(emitTime, wrapper) if wrapper
eventStream = MultiEventStream.new
events["items"].each do |items|
record = {}
#<BUGBUG> - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion
record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated
eventId = items["metadata"]["uid"] + "/" + items["count"].to_s
newEventQueryState.push(eventId)
if !eventQueryState.empty? && eventQueryState.include?(eventId)
next
end
router.emit_stream(@tag, eventStream) if eventStream
end
writeEventQueryState(newEventQueryState)
record["ObjectKind"] = items["involvedObject"]["kind"]
record["Namespace"] = items["involvedObject"]["namespace"]
record["Name"] = items["involvedObject"]["name"]
record["Reason"] = items["reason"]
record["Message"] = items["message"]
record["Type"] = items["type"]
record["TimeGenerated"] = items["metadata"]["creationTimestamp"]
record["SourceComponent"] = items["source"]["component"]
record["FirstSeen"] = items["firstTimestamp"]
record["LastSeen"] = items["lastTimestamp"]
record["Count"] = items["count"]
if items["source"].key?("host")
record["Computer"] = items["source"]["host"]
else
record["Computer"] = (OMS::Common.get_hostname)
end
record["ClusterName"] = KubernetesApiClient.getClusterName
record["ClusterId"] = KubernetesApiClient.getClusterId
wrapper = {
"DataType" => "KUBE_EVENTS_BLOB",
"IPName" => "ContainerInsights",
"DataItems" => [record.each { |k, v| record[k] = v }],
}
eventStream.add(emitTime, wrapper) if wrapper
end
router.emit_stream(@tag, eventStream) if eventStream
rescue => errorStr
$log.debug_backtrace(errorStr.backtrace)
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
end
return newEventQueryState
end

def run_periodic
Expand Down
Loading