diff --git a/README.md b/README.md index 698d54cd1..5c65308fb 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,31 @@ information see the [Code of Conduct FAQ](https://opensource.microsoft.com/codeo additional questions or comments. ## Release History + Note : The agent version(s) below has dates (ciprod), which indicate the agent build dates (not release dates) + +### 10/09/2018 - Version microsoft/oms:ciprod01092019 +- Omsagent - 1.8.1.256 (nov 2018 release) +- Persist fluentbit state between container restarts +- Populate 'TimeOfCommand' for agent ingest time for container logs +- Get node cpu usage from cpuusagenanoseconds (and convert to cpuusgaenanocores) +- Container Node Inventory - move to fluentD from OMI +- Mount docker.sock (Daemon set) as /var/run/host +- Liveness probe (Daemon set) - check for omsagent user permissions in docker.sock and update as necessary (required when docker daemon gets restarted) +- Move to fixed type for kubeevents & kubeservices +- Disable collecting ENV for our oms agent container (daemonset & replicaset) +- Disable container inventory collection for 'sandbox' containers & non kubernetes managed containers +- Agent telemetry - ContainerLogsAgentSideLatencyMs +- Agent telemetry - PodCount +- Agent telemetry - ControllerCount +- Agent telemetry - K8S Version +- Agent telemetry - NodeCoreCapacity +- Agent telemetry - NodeMemoryCapacity +- Agent telemetry - KubeEvents (exceptions) +- Agent telemetry - Kubenodes (exceptions) +- Agent telemetry - kubepods (exceptions) +- Agent telemetry - kubeservices (exceptions) +- Agent telemetry - Daemonset , Replicaset as dimensions (bug fix) ### 11/29/2018 - Version microsoft/oms:ciprod11292018 - Disable Container Image inventory workflow diff --git a/installer/conf/container.conf b/installer/conf/container.conf index 798bd8eb6..091753230 100755 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -15,16 +15,6 @@ log_level debug -# Container host inventory - - type omi - run_interval 60s - tag oms.api.ContainerNodeInventory - items [ - ["root/cimv2","Container_HostInventory"] - ] - - #cadvisor perf type cadvisorperf @@ -33,19 +23,6 @@ log_level debug - - type out_oms_api - log_level debug - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer - buffer_queue_limit 20 - flush_interval 20s - retry_limit 10 - retry_wait 15s - max_retry_wait 9m - - type out_oms log_level debug diff --git a/installer/conf/kube.conf b/installer/conf/kube.conf index 94fe2ef0b..6331d257e 100644 --- a/installer/conf/kube.conf +++ b/installer/conf/kube.conf @@ -11,7 +11,7 @@ #Kubernetes events type kubeevents - tag oms.api.KubeEvents.CollectionTime + tag oms.containerinsights.KubeEvents run_interval 60s log_level debug @@ -26,7 +26,7 @@ #Kubernetes services type kubeservices - tag oms.api.KubeServices.CollectionTime + tag oms.containerinsights.KubeServices run_interval 60s log_level debug @@ -62,18 +62,19 @@ max_retry_wait 9m - - type out_oms_api + + type out_oms log_level debug - num_threads 5 + num_threads 5 buffer_chunk_limit 5m buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_api_kubeevents*.buffer + buffer_path %STATE_DIR_WS%/out_oms_kubeevents*.buffer buffer_queue_limit 10 - buffer_queue_full_action drop_oldest_chunk + buffer_queue_full_action drop_oldest_chunk flush_interval 20s retry_limit 10 retry_wait 30s + max_retry_wait 9m @@ -88,8 +89,8 @@ retry_wait 30s - - type out_oms_api + + type out_oms log_level debug num_threads 5 buffer_chunk_limit 20m @@ -118,6 +119,19 @@ max_retry_wait 9m + + type out_oms_api + log_level debug + buffer_chunk_limit 20m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer + buffer_queue_limit 20 + flush_interval 20s + retry_limit 10 + retry_wait 15s + max_retry_wait 9m + + type out_oms log_level debug diff --git a/installer/conf/td-agent-bit.conf b/installer/conf/td-agent-bit.conf index c3252a185..29c98bdf1 100644 --- a/installer/conf/td-agent-bit.conf +++ b/installer/conf/td-agent-bit.conf @@ -8,7 +8,7 @@ Name tail Tag oms.container.log.* Path /var/log/containers/*.log - DB /var/opt/microsoft/docker-cimprov/state/fblogs.db + DB /var/log/omsagent-fblogs.db Parser docker Mem_Buf_Limit 30m Path_Key filepath @@ -28,5 +28,5 @@ EnableTelemetry true TelemetryPushIntervalSeconds 300 Match oms.container.log.* - AgentVersion ciprod11292018 + AgentVersion ciprod01092019 diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index 9876acc42..5d9269d1e 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -77,9 +77,10 @@ var ( // DataItem represents the object corresponding to the json that is sent by fluentbit tail plugin type DataItem struct { - LogEntry string `json:"LogEntry"` - LogEntrySource string `json:"LogEntrySource"` - LogEntryTimeStamp string `json:"LogEntryTimeStamp"` + LogEntry string `json:"LogEntry"` + LogEntrySource string `json:"LogEntrySource"` + LogEntryTimeStamp string `json:"LogEntryTimeStamp"` + LogEntryTimeOfCommand string `json:"TimeOfCommand"` ID string `json:"Id"` Image string `json:"Image"` Name string `json:"Name"` @@ -204,6 +205,10 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { start := time.Now() var dataItems []DataItem + + var maxLatency float64 + var maxLatencyContainer string + ignoreIDSet := make(map[string]bool) imageIDMap := make(map[string]string) nameIDMap := make(map[string]string) @@ -248,18 +253,32 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { Log("ContainerId %s not present in Map ", containerID) } + dataItem := DataItem{ - ID: stringMap["Id"], - LogEntry: stringMap["LogEntry"], - LogEntrySource: stringMap["LogEntrySource"], - LogEntryTimeStamp: stringMap["LogEntryTimeStamp"], - SourceSystem: stringMap["SourceSystem"], - Computer: Computer, - Image: stringMap["Image"], - Name: stringMap["Name"], + ID: stringMap["Id"], + LogEntry: stringMap["LogEntry"], + LogEntrySource: stringMap["LogEntrySource"], + LogEntryTimeStamp: stringMap["LogEntryTimeStamp"], + LogEntryTimeOfCommand: start.Format(time.RFC3339), + SourceSystem: stringMap["SourceSystem"], + Computer: Computer, + Image: stringMap["Image"], + Name: stringMap["Name"], } dataItems = append(dataItems, dataItem) + loggedTime, e := time.Parse(time.RFC3339, dataItem.LogEntryTimeStamp) + if e!= nil { + message := fmt.Sprintf("Error while converting LogEntryTimeStamp for telemetry purposes: %s", e.Error()) + Log(message) + SendException(message) + } else { + ltncy := float64(start.Sub(loggedTime) / time.Millisecond) + if ltncy >= maxLatency { + maxLatency = ltncy + maxLatencyContainer = dataItem.Name + "=" + dataItem.ID + } + } } if len(dataItems) > 0 { @@ -302,6 +321,12 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { ContainerLogTelemetryMutex.Lock() FlushedRecordsCount += float64(numRecords) FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond) + + if maxLatency >= AgentLogProcessingMaxLatencyMs { + AgentLogProcessingMaxLatencyMs = maxLatency + AgentLogProcessingMaxLatencyMsContainer = maxLatencyContainer + } + ContainerLogTelemetryMutex.Unlock() } diff --git a/source/code/go/src/plugins/telemetry.go b/source/code/go/src/plugins/telemetry.go index 5952ac9ac..82f970d3a 100644 --- a/source/code/go/src/plugins/telemetry.go +++ b/source/code/go/src/plugins/telemetry.go @@ -17,6 +17,10 @@ var ( FlushedRecordsCount float64 // FlushedRecordsTimeTaken indicates the cumulative time taken to flush the records for the current period FlushedRecordsTimeTaken float64 + // This is telemetry for how old/latent logs we are processing in milliseconds (max over a period of time) + AgentLogProcessingMaxLatencyMs float64 + // This is telemetry for which container logs were latent (max over a period of time) + AgentLogProcessingMaxLatencyMsContainer string // CommonProperties indicates the dimensions that are sent with every event/metric CommonProperties map[string]string // TelemetryClient is the client used to send the telemetry @@ -35,6 +39,8 @@ const ( envAppInsightsAuth = "APPLICATIONINSIGHTS_AUTH" metricNameAvgFlushRate = "ContainerLogAvgRecordsFlushedPerSec" metricNameAvgLogGenerationRate = "ContainerLogsGeneratedPerSec" + metricNameAgentLogProcessingMaxLatencyMs = "ContainerLogsAgentSideLatencyMs" + defaultTelemetryPushIntervalSeconds = 300 eventNameContainerLogInit = "ContainerLogPluginInitialized" @@ -62,12 +68,19 @@ func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) { logRate := FlushedRecordsCount / float64(elapsed/time.Second) FlushedRecordsCount = 0.0 FlushedRecordsTimeTaken = 0.0 + logLatencyMs := AgentLogProcessingMaxLatencyMs + logLatencyMsContainer := AgentLogProcessingMaxLatencyMsContainer + AgentLogProcessingMaxLatencyMs = 0 + AgentLogProcessingMaxLatencyMsContainer = "" ContainerLogTelemetryMutex.Unlock() flushRateMetric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate) TelemetryClient.Track(flushRateMetric) logRateMetric := appinsights.NewMetricTelemetry(metricNameAvgLogGenerationRate, logRate) TelemetryClient.Track(logRateMetric) + logLatencyMetric := appinsights.NewMetricTelemetry(metricNameAgentLogProcessingMaxLatencyMs, logLatencyMs) + logLatencyMetric.Properties["Container"] = logLatencyMsContainer + TelemetryClient.Track(logLatencyMetric) start = time.Now() } } diff --git a/source/code/plugin/ApplicationInsightsUtility.rb b/source/code/plugin/ApplicationInsightsUtility.rb index 78553a83f..27660d708 100644 --- a/source/code/plugin/ApplicationInsightsUtility.rb +++ b/source/code/plugin/ApplicationInsightsUtility.rb @@ -13,12 +13,13 @@ class ApplicationInsightsUtility @@Exception = 'ExceptionEvent' @@AcsClusterType = 'ACS' @@AksClusterType = 'AKS' - @@DaemonsetControllerType = 'DaemonSet' @OmsAdminFilePath = '/etc/opt/microsoft/omsagent/conf/omsadmin.conf' @@EnvAcsResourceName = 'ACS_RESOURCE_NAME' @@EnvAksRegion = 'AKS_REGION' @@EnvAgentVersion = 'AGENT_VERSION' @@EnvApplicationInsightsKey = 'APPLICATIONINSIGHTS_AUTH' + @@EnvControllerType = 'CONTROLLER_TYPE' + @@CustomProperties = {} @@Tc = nil @@hostName = (OMS::Common.get_hostname) @@ -54,12 +55,11 @@ def initializeUtility() @@CustomProperties["ClusterName"] = clusterName @@CustomProperties["Region"] = ENV[@@EnvAksRegion] end - @@CustomProperties['ControllerType'] = @@DaemonsetControllerType - dockerInfo = DockerApiClient.dockerInfo - @@CustomProperties['DockerVersion'] = dockerInfo['Version'] - @@CustomProperties['DockerApiVersion'] = dockerInfo['ApiVersion'] + + getDockerInfo() @@CustomProperties['WorkspaceID'] = getWorkspaceId @@CustomProperties['AgentVersion'] = ENV[@@EnvAgentVersion] + @@CustomProperties['ControllerType'] = ENV[@@EnvControllerType] encodedAppInsightsKey = ENV[@@EnvApplicationInsightsKey] if !encodedAppInsightsKey.nil? decodedAppInsightsKey = Base64.decode64(encodedAppInsightsKey) @@ -70,6 +70,14 @@ def initializeUtility() end end + def getDockerInfo() + dockerInfo = DockerApiClient.dockerInfo + if (!dockerInfo.nil? && !dockerInfo.empty?) + @@CustomProperties['DockerVersion'] = dockerInfo['Version'] + @@CustomProperties['DockerApiVersion'] = dockerInfo['ApiVersion'] + end + end + def sendHeartBeatEvent(pluginName) begin eventName = pluginName + @@HeartBeat @@ -83,7 +91,7 @@ def sendHeartBeatEvent(pluginName) end end - def sendCustomEvent(pluginName, properties) + def sendCustomMetric(pluginName, properties) begin if !(@@Tc.nil?) @@Tc.track_metric 'LastProcessedContainerInventoryCount', properties['ContainerCount'], @@ -93,14 +101,16 @@ def sendCustomEvent(pluginName, properties) $log.info("AppInsights Container Count Telemetry sent successfully") end rescue => errorStr - $log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}") + $log.warn("Exception in AppInsightsUtility: sendCustomMetric - error: #{errorStr}") end end def sendExceptionTelemetry(errorStr) begin if @@CustomProperties.empty? || @@CustomProperties.nil? - initializeUtility + initializeUtility() + elsif @@CustomProperties['DockerVersion'].nil? + getDockerInfo() end if !(@@Tc.nil?) @@Tc.track_exception errorStr , :properties => @@CustomProperties @@ -116,11 +126,13 @@ def sendExceptionTelemetry(errorStr) def sendTelemetry(pluginName, properties) begin if @@CustomProperties.empty? || @@CustomProperties.nil? - initializeUtility + initializeUtility() + elsif @@CustomProperties['DockerVersion'].nil? + getDockerInfo() end @@CustomProperties['Computer'] = properties['Computer'] sendHeartBeatEvent(pluginName) - sendCustomEvent(pluginName, properties) + sendCustomMetric(pluginName, properties) rescue => errorStr $log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}") end @@ -134,7 +146,9 @@ def sendMetricTelemetry(metricName, metricValue, properties) return end if @@CustomProperties.empty? || @@CustomProperties.nil? - initializeUtility + initializeUtility() + elsif @@CustomProperties['DockerVersion'].nil? + getDockerInfo() end telemetryProps = {} telemetryProps["Computer"] = @@hostName diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb index 9e47e5a9e..3c36775af 100644 --- a/source/code/plugin/CAdvisorMetricsAPIClient.rb +++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb @@ -20,9 +20,12 @@ class CAdvisorMetricsAPIClient @@rxBytesTimeLast = nil @@txBytesLast = nil @@txBytesTimeLast = nil + @@nodeCpuUsageNanoSecondsLast = nil + @@nodeCpuUsageNanoSecondsTimeLast = nil @@telemetryCpuMetricTimeTracker = DateTime.now.to_time.to_i @@telemetryMemoryMetricTimeTracker = DateTime.now.to_time.to_i - + + def initialize end @@ -73,7 +76,10 @@ def getMetrics() metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "rssBytes", "memoryRssBytes")) metricDataItems.concat(getContainerStartTimeMetricItems(metricInfo, hostName, "restartTimeEpoch")) - metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "cpu", "usageNanoCores", "cpuUsageNanoCores")) + cpuUsageNanoSecondsRate = getNodeMetricItemRate(metricInfo, hostName, "cpu", "usageCoreNanoSeconds", "cpuUsageNanoCores") + if cpuUsageNanoSecondsRate && !cpuUsageNanoSecondsRate.empty? && !cpuUsageNanoSecondsRate.nil? + metricDataItems.push(cpuUsageNanoSecondsRate) + end metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "workingSetBytes", "memoryWorkingSetBytes")) metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "rssBytes", "memoryRssBytes")) metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "network", "rxBytes", "networkRxBytes")) @@ -274,24 +280,41 @@ def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToColl metricValue = node[metricCategory][metricNameToCollect] metricTime = node[metricCategory]['time'] - if !(metricNameToCollect == "rxBytes" || metricNameToCollect == "txBytes" ) - @Log.warn("getNodeMetricItemRate : rateMetric is supported only for rxBytes & txBytes and not for #{metricNameToCollect}") + if !(metricNameToCollect == "rxBytes" || metricNameToCollect == "txBytes" || metricNameToCollect == "usageCoreNanoSeconds" ) + @Log.warn("getNodeMetricItemRate : rateMetric is supported only for rxBytes, txBytes & usageCoreNanoSeconds and not for #{metricNameToCollect}") return nil elsif metricNameToCollect == "rxBytes" - if @@rxBytesLast.nil? || @@rxBytesTimeLast.nil? + if @@rxBytesLast.nil? || @@rxBytesTimeLast.nil? || @@rxBytesLast > metricValue #when kubelet is restarted the last condition will be true @@rxBytesLast = metricValue @@rxBytesTimeLast = metricTime return nil else - metricValue = ((metricValue - @@rxBytesLast) * 1.0)/(DateTime.parse(metricTime).to_time - DateTime.parse(@@rxBytesTimeLast).to_time) + metricRateValue = ((metricValue - @@rxBytesLast) * 1.0)/(DateTime.parse(metricTime).to_time - DateTime.parse(@@rxBytesTimeLast).to_time) + @@rxBytesLast = metricValue + @@rxBytesTimeLast = metricTime + metricValue = metricRateValue end - else - if @@txBytesLast.nil? || @@txBytesTimeLast.nil? + elsif metricNameToCollect == "txBytes" + if @@txBytesLast.nil? || @@txBytesTimeLast.nil? || @@txBytesLast > metricValue #when kubelet is restarted the last condition will be true @@txBytesLast = metricValue @@txBytesTimeLast = metricTime return nil else - metricValue = ((metricValue - @@txBytesLast) * 1.0)/(DateTime.parse(metricTime).to_time - DateTime.parse(@@txBytesTimeLast).to_time) + metricRateValue = ((metricValue - @@txBytesLast) * 1.0)/(DateTime.parse(metricTime).to_time - DateTime.parse(@@txBytesTimeLast).to_time) + @@txBytesLast = metricValue + @@txBytesTimeLast = metricTime + metricValue = metricRateValue + end + else + if @@nodeCpuUsageNanoSecondsLast.nil? || @@nodeCpuUsageNanoSecondsTimeLast.nil? || @@nodeCpuUsageNanoSecondsLast > metricValue #when kubelet is restarted the last condition will be true + @@nodeCpuUsageNanoSecondsLast = metricValue + @@nodeCpuUsageNanoSecondsTimeLast = metricTime + return nil + else + metricRateValue = ((metricValue - @@nodeCpuUsageNanoSecondsLast) * 1.0)/(DateTime.parse(metricTime).to_time - DateTime.parse(@@nodeCpuUsageNanoSecondsTimeLast).to_time) + @@nodeCpuUsageNanoSecondsLast = metricValue + @@nodeCpuUsageNanoSecondsTimeLast = metricTime + metricValue = metricRateValue end end diff --git a/source/code/plugin/DockerApiClient.rb b/source/code/plugin/DockerApiClient.rb index e12ef13ec..5a46b5fdb 100644 --- a/source/code/plugin/DockerApiClient.rb +++ b/source/code/plugin/DockerApiClient.rb @@ -10,10 +10,11 @@ class DockerApiClient require_relative 'DockerApiRestHelper' require_relative 'ApplicationInsightsUtility' - @@SocketPath = "/var/run/docker.sock" + @@SocketPath = "/var/run/host/docker.sock" @@ChunkSize = 4096 @@TimeoutInSeconds = 5 @@PluginName = 'ContainerInventory' + def initialize end @@ -85,7 +86,23 @@ def listContainers() containers = getResponse(request, true, false) if !containers.nil? && !containers.empty? containers.each do |container| - ids.push(container['Id']) + labels = (!container['Labels'].nil?)? container['Labels'] : container['labels'] + if !labels.nil? + labelKeys = labels.keys + dockerTypeLabel = labelKeys.find {|k| 'io.kubernetes.docker.type'.downcase == k.downcase} + if !dockerTypeLabel.nil? + dockerTypeLabelValue = labels[dockerTypeLabel] + # Checking for 'io.kubernetes.docker.type' label for docker containers to exclude the pause-amd64 containers + if !(dockerTypeLabelValue.downcase == "podsandbox".downcase) + # Case insensitive lookup for pod uid label - This is to exclude containers created using docker run and only include containers that + # are created in the pods for ContainerInventory + keyValue = labelKeys.find {|k| 'io.kubernetes.pod.uid'.downcase == k.downcase} + if !labels[keyValue].nil? + ids.push(container['Id']) + end + end + end + end end end return ids diff --git a/source/code/plugin/in_kube_events.rb b/source/code/plugin/in_kube_events.rb index 5df31df95..309dd8034 100644 --- a/source/code/plugin/in_kube_events.rb +++ b/source/code/plugin/in_kube_events.rb @@ -15,10 +15,12 @@ def initialize require_relative 'KubernetesApiClient' require_relative 'oms_common' require_relative 'omslog' + require_relative 'ApplicationInsightsUtility' + end config_param :run_interval, :time, :default => '1m' - config_param :tag, :string, :default => "oms.api.KubeEvents.CollectionTime" + config_param :tag, :string, :default => "oms.containerinsights.KubeEvents" def configure (conf) super @@ -86,7 +88,12 @@ def enumerate(eventList = nil) end record['ClusterName'] = KubernetesApiClient.getClusterName record['ClusterId'] = KubernetesApiClient.getClusterId - eventStream.add(emitTime, record) if record + wrapper = { + "DataType"=>"KUBE_EVENTS_BLOB", + "IPName"=>"ContainerInsights", + "DataItems"=>[record.each{|k,v| record[k]=v}] + } + eventStream.add(emitTime, wrapper) if wrapper end router.emit_stream(@tag, eventStream) if eventStream end @@ -94,6 +101,7 @@ def enumerate(eventList = nil) rescue => errorStr $log.warn line.dump, error: errorStr.to_s $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end @@ -110,6 +118,7 @@ def run_periodic enumerate rescue => errorStr $log.warn "in_kube_events::run_periodic: enumerate Failed to retrieve kube events: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end @mutex.lock @@ -129,6 +138,7 @@ def getEventQueryState rescue => errorStr $log.warn $log.warn line.dump, error: errorStr.to_s $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end return eventQueryState end @@ -144,6 +154,7 @@ def writeEventQueryState(eventQueryState) rescue => errorStr $log.warn $log.warn line.dump, error: errorStr.to_s $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index edbbdd37f..a6908fc99 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -6,14 +6,18 @@ module Fluent class Kube_nodeInventory_Input < Input Plugin.register_input('kubenodeinventory', self) + @@ContainerNodeInventoryTag = 'oms.api.ContainerNodeInventory' + def initialize super require 'yaml' require 'json' require_relative 'KubernetesApiClient' + require_relative 'ApplicationInsightsUtility' require_relative 'oms_common' require_relative 'omslog' + end config_param :run_interval, :time, :default => '1m' @@ -29,6 +33,7 @@ def start @condition = ConditionVariable.new @mutex = Mutex.new @thread = Thread.new(&method(:run_periodic)) + @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i end end @@ -46,15 +51,22 @@ def enumerate currentTime = Time.now emitTime = currentTime.to_f batchTime = currentTime.utc.iso8601 - $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body) - $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + telemetrySent = false + $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body) + $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") begin if(!nodeInventory.empty?) eventStream = MultiEventStream.new + containerNodeInventoryEventStream = MultiEventStream.new #get node inventory nodeInventory['items'].each do |items| record = {} + # Sending records for ContainerNodeInventory + containerNodeInventoryRecord = {} + containerNodeInventoryRecord['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated + containerNodeInventoryRecord['Computer'] = items['metadata']['name'] + record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated record['Computer'] = items['metadata']['name'] record['ClusterName'] = KubernetesApiClient.getClusterName @@ -89,16 +101,40 @@ def enumerate end - record['KubeletVersion'] = items['status']['nodeInfo']['kubeletVersion'] - record['KubeProxyVersion'] = items['status']['nodeInfo']['kubeProxyVersion'] + nodeInfo = items['status']['nodeInfo'] + record['KubeletVersion'] = nodeInfo['kubeletVersion'] + record['KubeProxyVersion'] = nodeInfo['kubeProxyVersion'] + containerNodeInventoryRecord['OperatingSystem'] = nodeInfo['osImage'] + dockerVersion = nodeInfo['containerRuntimeVersion'] + dockerVersion.slice! "docker://" + containerNodeInventoryRecord['DockerVersion'] = dockerVersion + # ContainerNodeInventory data for docker version and operating system. + containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryRecord) if containerNodeInventoryRecord + wrapper = { "DataType"=>"KUBE_NODE_INVENTORY_BLOB", "IPName"=>"ContainerInsights", "DataItems"=>[record.each{|k,v| record[k]=v}] } eventStream.add(emitTime, wrapper) if wrapper + # Adding telemetry to send node telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference/60 + if (timeDifferenceInMinutes >= 5) + properties = {} + properties["Computer"] = record["Computer"] + properties["KubeletVersion"] = record["KubeletVersion"] + capacityInfo = items['status']['capacity'] + ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"] , properties) + ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"] , properties) + telemetrySent = true + end end router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream + if telemetrySent == true + @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i + end @@istestvar = ENV['ISTEST'] if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") @@ -107,6 +143,7 @@ def enumerate rescue => errorStr $log.warn "Failed to retrieve node inventory: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end @@ -123,6 +160,7 @@ def run_periodic enumerate rescue => errorStr $log.warn "in_kube_nodes::run_periodic: enumerate Failed to retrieve node inventory: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end @mutex.lock diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index ec76bac61..eaf14b035 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -10,10 +10,13 @@ def initialize super require 'yaml' require 'json' + require 'set' require_relative 'KubernetesApiClient' + require_relative 'ApplicationInsightsUtility' require_relative 'oms_common' require_relative 'omslog' + end config_param :run_interval, :time, :default => '1m' @@ -29,6 +32,7 @@ def start @condition = ConditionVariable.new @mutex = Mutex.new @thread = Thread.new(&method(:run_periodic)) + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i end end @@ -63,6 +67,7 @@ def enumerate(podList = nil) rescue => errorStr $log.warn "Failed in enumerate pod inventory: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end @@ -71,6 +76,8 @@ def parse_and_emit_records(podInventory, serviceList) emitTime = currentTime.to_f batchTime = currentTime.utc.iso8601 eventStream = MultiEventStream.new + controllerSet = Set.new [] + telemetryFlush = false begin #begin block start podInventory['items'].each do |items| #podInventory block start records = [] @@ -78,6 +85,7 @@ def parse_and_emit_records(podInventory, serviceList) record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated record['Name'] = items['metadata']['name'] podNameSpace = items['metadata']['namespace'] + if podNameSpace.eql?("kube-system") && !items['metadata'].key?("ownerReferences") # The above case seems to be the only case where you have horizontal scaling of pods # but no controller, in which case cAdvisor picks up kubernetes.io/config.hash @@ -129,9 +137,18 @@ def parse_and_emit_records(podInventory, serviceList) record['ClusterId'] = KubernetesApiClient.getClusterId record['ClusterName'] = KubernetesApiClient.getClusterName record['ServiceName'] = getServiceNameFromLabels(items['metadata']['namespace'], items['metadata']['labels'], serviceList) + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference/60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end if !items['metadata']['ownerReferences'].nil? record['ControllerKind'] = items['metadata']['ownerReferences'][0]['kind'] record['ControllerName'] = items['metadata']['ownerReferences'][0]['name'] + if telemetryFlush == true + controllerSet.add(record['ControllerKind'] + record['ControllerName']) + end end podRestartCount = 0 record['PodRestartCount'] = 0 @@ -191,6 +208,12 @@ def parse_and_emit_records(podInventory, serviceList) end end #podInventory block end router.emit_stream(@tag, eventStream) if eventStream + if telemetryFlush == true + ApplicationInsightsUtility.sendHeartBeatEvent("KubePodInventory") + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory['items'].length , {}) + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length , {}) + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i + end @@istestvar = ENV['ISTEST'] if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) $log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") @@ -198,6 +221,7 @@ def parse_and_emit_records(podInventory, serviceList) rescue => errorStr $log.warn "Failed in parse_and_emit_record pod inventory: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end #begin block end end @@ -214,6 +238,7 @@ def run_periodic enumerate rescue => errorStr $log.warn "in_kube_podinventory::run_periodic: enumerate Failed to retrieve pod inventory: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end @mutex.lock @@ -248,6 +273,7 @@ def getServiceNameFromLabels(namespace, labels, serviceList) rescue => errorStr $log.warn "Failed to retrieve service name from labels: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end return serviceName end diff --git a/source/code/plugin/in_kube_services.rb b/source/code/plugin/in_kube_services.rb index 9a33f4581..e1bb93f30 100644 --- a/source/code/plugin/in_kube_services.rb +++ b/source/code/plugin/in_kube_services.rb @@ -14,10 +14,12 @@ def initialize require_relative 'KubernetesApiClient' require_relative 'oms_common' require_relative 'omslog' + require_relative 'ApplicationInsightsUtility' + end config_param :run_interval, :time, :default => '1m' - config_param :tag, :string, :default => "oms.api.KubeServices.CollectionTime" + config_param :tag, :string, :default => "oms.containerinsights.KubeServices" def configure (conf) super @@ -63,13 +65,19 @@ def enumerate record['ClusterIP'] = items['spec']['clusterIP'] record['ServiceType'] = items['spec']['type'] # : Add ports and status fields - eventStream.add(emitTime, record) if record + wrapper = { + "DataType"=>"KUBE_SERVICES_BLOB", + "IPName"=>"ContainerInsights", + "DataItems"=>[record.each{|k,v| record[k]=v}] + } + eventStream.add(emitTime, wrapper) if wrapper end router.emit_stream(@tag, eventStream) if eventStream end rescue => errorStr $log.warn line.dump, error: errorStr.to_s $log.debug_backtrace(e.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end @@ -86,6 +94,7 @@ def run_periodic enumerate rescue => errorStr $log.warn "in_kube_services::run_periodic: enumerate Failed to kube services: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end @mutex.lock