diff --git a/installer/conf/kube.conf b/installer/conf/kube.conf index 454df6e91..0dfa3710e 100644 --- a/installer/conf/kube.conf +++ b/installer/conf/kube.conf @@ -47,12 +47,44 @@ log_level debug +#cadvisor perf- Windows nodes + + type wincadvisorperf + tag oms.api.wincadvisorperf + run_interval 60s + log_level debug + + type filter_inventory2mdm custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westEurope log_level info +#custom_metrics_mdm filter plugin for perf data from windows nodes + + type filter_cadvisor2mdm + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westEurope + metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes + log_level info + + + + type out_mdm + log_level debug + num_threads 5 + buffer_chunk_limit 20m + buffer_type file + buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 30s + max_retry_wait 9m + retry_mdm_post_wait_minutes 60 + + type out_oms log_level debug @@ -168,3 +200,18 @@ max_retry_wait 9m retry_mdm_post_wait_minutes 60 + + + type out_oms + log_level debug + num_threads 5 + buffer_chunk_limit 20m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_api_wincadvisorperf*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 30s + max_retry_wait 9m + \ No newline at end of file diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index c263aa505..9c4d563f8 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -34,6 +34,7 @@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/CAdvisorMetricsAPIClient.rb; source/code/plugin/CAdvisorMetricsAPIClient.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_perf.rb; source/code/plugin/in_kube_perf.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_cadvisor_perf.rb; source/code/plugin/in_cadvisor_perf.rb; 644; root; root +/opt/microsoft/omsagent/plugin/in_win_cadvisor_perf.rb; source/code/plugin/in_win_cadvisor_perf.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_services.rb; source/code/plugin/in_kube_services.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_nodes.rb; source/code/plugin/in_kube_nodes.rb; 644; root; root /opt/microsoft/omsagent/plugin/filter_inventory2mdm.rb; source/code/plugin/filter_inventory2mdm.rb; 644; root; root diff --git a/source/code/plugin/ApplicationInsightsUtility.rb b/source/code/plugin/ApplicationInsightsUtility.rb index 5c5e92a6c..5dc2bfab8 100644 --- a/source/code/plugin/ApplicationInsightsUtility.rb +++ b/source/code/plugin/ApplicationInsightsUtility.rb @@ -2,209 +2,222 @@ # frozen_string_literal: true class ApplicationInsightsUtility - require_relative 'lib/application_insights' - require_relative 'omslog' - require_relative 'DockerApiClient' - require_relative 'oms_common' - require 'json' - require 'base64' + require_relative "lib/application_insights" + require_relative "omslog" + require_relative "DockerApiClient" + require_relative "oms_common" + require "json" + require "base64" - @@HeartBeat = 'HeartBeatEvent' - @@Exception = 'ExceptionEvent' - @@AcsClusterType = 'ACS' - @@AksClusterType = 'AKS' - @OmsAdminFilePath = '/etc/opt/microsoft/omsagent/conf/omsadmin.conf' - @@EnvAcsResourceName = 'ACS_RESOURCE_NAME' - @@EnvAksRegion = 'AKS_REGION' - @@EnvAgentVersion = 'AGENT_VERSION' - @@EnvApplicationInsightsKey = 'APPLICATIONINSIGHTS_AUTH' - @@EnvControllerType = 'CONTROLLER_TYPE' + @@HeartBeat = "HeartBeatEvent" + @@Exception = "ExceptionEvent" + @@AcsClusterType = "ACS" + @@AksClusterType = "AKS" + @OmsAdminFilePath = "/etc/opt/microsoft/omsagent/conf/omsadmin.conf" + @@EnvAcsResourceName = "ACS_RESOURCE_NAME" + @@EnvAksRegion = "AKS_REGION" + @@EnvAgentVersion = "AGENT_VERSION" + @@EnvApplicationInsightsKey = "APPLICATIONINSIGHTS_AUTH" + @@EnvControllerType = "CONTROLLER_TYPE" - @@CustomProperties = {} - @@Tc = nil - @@hostName = (OMS::Common.get_hostname) + @@CustomProperties = {} + @@Tc = nil + @@hostName = (OMS::Common.get_hostname) - def initialize - end + def initialize + end - class << self - #Set default properties for telemetry event - def initializeUtility() - begin - resourceInfo = ENV['AKS_RESOURCE_ID'] - if resourceInfo.nil? || resourceInfo.empty? - @@CustomProperties["ACSResourceName"] = ENV[@@EnvAcsResourceName] - @@CustomProperties["ClusterType"] = @@AcsClusterType - @@CustomProperties["SubscriptionID"] = "" - @@CustomProperties["ResourceGroupName"] = "" - @@CustomProperties["ClusterName"] = "" - @@CustomProperties["Region"] = "" - else - @@CustomProperties["AKS_RESOURCE_ID"] = resourceInfo - begin - splitStrings = resourceInfo.split('/') - subscriptionId = splitStrings[2] - resourceGroupName = splitStrings[4] - clusterName = splitStrings[8] - rescue => errorStr - $log.warn("Exception in AppInsightsUtility: parsing AKS resourceId: #{resourceInfo}, error: #{errorStr}") - end - @@CustomProperties["ClusterType"] = @@AksClusterType - @@CustomProperties["SubscriptionID"] = subscriptionId - @@CustomProperties["ResourceGroupName"] = resourceGroupName - @@CustomProperties["ClusterName"] = clusterName - @@CustomProperties["Region"] = ENV[@@EnvAksRegion] - end + class << self + #Set default properties for telemetry event + def initializeUtility() + begin + resourceInfo = ENV["AKS_RESOURCE_ID"] + if resourceInfo.nil? || resourceInfo.empty? + @@CustomProperties["ACSResourceName"] = ENV[@@EnvAcsResourceName] + @@CustomProperties["ClusterType"] = @@AcsClusterType + @@CustomProperties["SubscriptionID"] = "" + @@CustomProperties["ResourceGroupName"] = "" + @@CustomProperties["ClusterName"] = "" + @@CustomProperties["Region"] = "" + else + @@CustomProperties["AKS_RESOURCE_ID"] = resourceInfo + begin + splitStrings = resourceInfo.split("/") + subscriptionId = splitStrings[2] + resourceGroupName = splitStrings[4] + clusterName = splitStrings[8] + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: parsing AKS resourceId: #{resourceInfo}, error: #{errorStr}") + end + @@CustomProperties["ClusterType"] = @@AksClusterType + @@CustomProperties["SubscriptionID"] = subscriptionId + @@CustomProperties["ResourceGroupName"] = resourceGroupName + @@CustomProperties["ClusterName"] = clusterName + @@CustomProperties["Region"] = ENV[@@EnvAksRegion] + end - getDockerInfo() - @@CustomProperties['WorkspaceID'] = getWorkspaceId - @@CustomProperties['AgentVersion'] = ENV[@@EnvAgentVersion] - @@CustomProperties['ControllerType'] = ENV[@@EnvControllerType] - encodedAppInsightsKey = ENV[@@EnvApplicationInsightsKey] + #Commenting it for now from initilize method, we need to pivot all telemetry off of kubenode docker version + #getDockerInfo() + @@CustomProperties["WorkspaceID"] = getWorkspaceId + @@CustomProperties["AgentVersion"] = ENV[@@EnvAgentVersion] + @@CustomProperties["ControllerType"] = ENV[@@EnvControllerType] + encodedAppInsightsKey = ENV[@@EnvApplicationInsightsKey] - #Check if telemetry is turned off - telemetryOffSwitch = ENV['DISABLE_TELEMETRY'] - if telemetryOffSwitch && !telemetryOffSwitch.nil? && !telemetryOffSwitch.empty? && telemetryOffSwitch.downcase == "true".downcase - $log.warn("AppInsightsUtility: Telemetry is disabled") - @@Tc = ApplicationInsights::TelemetryClient.new - elsif !encodedAppInsightsKey.nil? - decodedAppInsightsKey = Base64.decode64(encodedAppInsightsKey) - @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey - - end - rescue => errorStr - $log.warn("Exception in AppInsightsUtility: initilizeUtility - error: #{errorStr}") - end + #Check if telemetry is turned off + telemetryOffSwitch = ENV["DISABLE_TELEMETRY"] + if telemetryOffSwitch && !telemetryOffSwitch.nil? && !telemetryOffSwitch.empty? && telemetryOffSwitch.downcase == "true".downcase + $log.warn("AppInsightsUtility: Telemetry is disabled") + @@Tc = ApplicationInsights::TelemetryClient.new + elsif !encodedAppInsightsKey.nil? + decodedAppInsightsKey = Base64.decode64(encodedAppInsightsKey) + @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: initilizeUtility - error: #{errorStr}") + end + end - def getDockerInfo() - dockerInfo = DockerApiClient.dockerInfo - if (!dockerInfo.nil? && !dockerInfo.empty?) - @@CustomProperties['DockerVersion'] = dockerInfo['Version'] - @@CustomProperties['DockerApiVersion'] = dockerInfo['ApiVersion'] - end - end + def getDockerInfo() + dockerInfo = DockerApiClient.dockerInfo + if (!dockerInfo.nil? && !dockerInfo.empty?) + @@CustomProperties["DockerVersion"] = dockerInfo["Version"] + #@@CustomProperties["DockerApiVersion"] = dockerInfo["ApiVersion"] + end + end - def sendHeartBeatEvent(pluginName) - begin - eventName = pluginName + @@HeartBeat - if !(@@Tc.nil?) - @@Tc.track_event eventName , :properties => @@CustomProperties - @@Tc.flush - $log.info("AppInsights Heartbeat Telemetry sent successfully") - end - rescue =>errorStr - $log.warn("Exception in AppInsightsUtility: sendHeartBeatEvent - error: #{errorStr}") - end + def sendHeartBeatEvent(pluginName) + begin + eventName = pluginName + @@HeartBeat + if !(@@Tc.nil?) + @@Tc.track_event eventName, :properties => @@CustomProperties + @@Tc.flush + $log.info("AppInsights Heartbeat Telemetry sent successfully") end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendHeartBeatEvent - error: #{errorStr}") + end + end - def sendLastProcessedContainerInventoryCountMetric(pluginName, properties) - begin - if !(@@Tc.nil?) - @@Tc.track_metric 'LastProcessedContainerInventoryCount', properties['ContainerCount'], - :kind => ApplicationInsights::Channel::Contracts::DataPointType::MEASUREMENT, - :properties => @@CustomProperties - @@Tc.flush - $log.info("AppInsights Container Count Telemetry sent successfully") - end - rescue => errorStr - $log.warn("Exception in AppInsightsUtility: sendCustomMetric - error: #{errorStr}") - end + def sendLastProcessedContainerInventoryCountMetric(pluginName, properties) + begin + if !(@@Tc.nil?) + @@Tc.track_metric "LastProcessedContainerInventoryCount", properties["ContainerCount"], + :kind => ApplicationInsights::Channel::Contracts::DataPointType::MEASUREMENT, + :properties => @@CustomProperties + @@Tc.flush + $log.info("AppInsights Container Count Telemetry sent successfully") end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendCustomMetric - error: #{errorStr}") + end + end - def sendCustomEvent(eventName, properties) - begin - if @@CustomProperties.empty? || @@CustomProperties.nil? - initializeUtility() - end - if !(@@Tc.nil?) - @@Tc.track_event eventName, :properties => @@CustomProperties - @@Tc.flush - $log.info("AppInsights Custom Event #{eventName} sent successfully") - end - rescue => errorStr - $log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}") - end + def sendCustomEvent(eventName, properties) + begin + if @@CustomProperties.empty? || @@CustomProperties.nil? + initializeUtility() + end + telemetryProps = {} + # add common dimensions + @@CustomProperties.each { |k, v| telemetryProps[k] = v } + # add passed-in dimensions if any + if (!properties.nil? && !properties.empty?) + properties.each { |k, v| telemetryProps[k] = v } + end + if !(@@Tc.nil?) + @@Tc.track_event eventName, :properties => telemetryProps + @@Tc.flush + $log.info("AppInsights Custom Event #{eventName} sent successfully") end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}") + end + end - def sendExceptionTelemetry(errorStr) - begin - if @@CustomProperties.empty? || @@CustomProperties.nil? - initializeUtility() - elsif @@CustomProperties['DockerVersion'].nil? - getDockerInfo() - end - if !(@@Tc.nil?) - @@Tc.track_exception errorStr , :properties => @@CustomProperties - @@Tc.flush - $log.info("AppInsights Exception Telemetry sent successfully") - end - rescue => errorStr - $log.warn("Exception in AppInsightsUtility: sendExceptionTelemetry - error: #{errorStr}") - end + def sendExceptionTelemetry(errorStr, properties = nil) + begin + if @@CustomProperties.empty? || @@CustomProperties.nil? + initializeUtility() + elsif @@CustomProperties["DockerVersion"].nil? + getDockerInfo() + end + telemetryProps = {} + # add common dimensions + @@CustomProperties.each { |k, v| telemetryProps[k] = v } + # add passed-in dimensions if any + if (!properties.nil? && !properties.empty?) + properties.each { |k, v| telemetryProps[k] = v } + end + if !(@@Tc.nil?) + @@Tc.track_exception errorStr, :properties => telemetryProps + @@Tc.flush + $log.info("AppInsights Exception Telemetry sent successfully") end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendExceptionTelemetry - error: #{errorStr}") + end + end - #Method to send heartbeat and container inventory count - def sendTelemetry(pluginName, properties) - begin - if @@CustomProperties.empty? || @@CustomProperties.nil? - initializeUtility() - elsif @@CustomProperties['DockerVersion'].nil? - getDockerInfo() - end - @@CustomProperties['Computer'] = properties['Computer'] - sendHeartBeatEvent(pluginName) - sendLastProcessedContainerInventoryCountMetric(pluginName, properties) - rescue => errorStr - $log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}") - end + #Method to send heartbeat and container inventory count + def sendTelemetry(pluginName, properties) + begin + if @@CustomProperties.empty? || @@CustomProperties.nil? + initializeUtility() + elsif @@CustomProperties["DockerVersion"].nil? + getDockerInfo() end + @@CustomProperties["Computer"] = properties["Computer"] + sendHeartBeatEvent(pluginName) + sendLastProcessedContainerInventoryCountMetric(pluginName, properties) + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}") + end + end - #Method to send metric. It will merge passed-in properties with common custom properties - def sendMetricTelemetry(metricName, metricValue, properties) - begin - if (metricName.empty? || metricName.nil?) - $log.warn("SendMetricTelemetry: metricName is missing") - return - end - if @@CustomProperties.empty? || @@CustomProperties.nil? - initializeUtility() - elsif @@CustomProperties['DockerVersion'].nil? - getDockerInfo() - end - telemetryProps = {} - telemetryProps["Computer"] = @@hostName - # add common dimensions - @@CustomProperties.each{ |k,v| telemetryProps[k]=v} - # add passed-in dimensions if any - if (!properties.nil? && !properties.empty?) - properties.each{ |k,v| telemetryProps[k]=v} - end - if !(@@Tc.nil?) - @@Tc.track_metric metricName, metricValue, - :kind => ApplicationInsights::Channel::Contracts::DataPointType::MEASUREMENT, - :properties => telemetryProps - @@Tc.flush - $log.info("AppInsights metric Telemetry #{metricName} sent successfully") - end - rescue => errorStr - $log.warn("Exception in AppInsightsUtility: sendMetricTelemetry - error: #{errorStr}") - end + #Method to send metric. It will merge passed-in properties with common custom properties + def sendMetricTelemetry(metricName, metricValue, properties) + begin + if (metricName.empty? || metricName.nil?) + $log.warn("SendMetricTelemetry: metricName is missing") + return end + if @@CustomProperties.empty? || @@CustomProperties.nil? + initializeUtility() + elsif @@CustomProperties["DockerVersion"].nil? + getDockerInfo() + end + telemetryProps = {} + # add common dimensions + @@CustomProperties.each { |k, v| telemetryProps[k] = v } + # add passed-in dimensions if any + if (!properties.nil? && !properties.empty?) + properties.each { |k, v| telemetryProps[k] = v } + end + if !(@@Tc.nil?) + @@Tc.track_metric metricName, metricValue, + :kind => ApplicationInsights::Channel::Contracts::DataPointType::MEASUREMENT, + :properties => telemetryProps + @@Tc.flush + $log.info("AppInsights metric Telemetry #{metricName} sent successfully") + end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendMetricTelemetry - error: #{errorStr}") + end + end - def getWorkspaceId() - begin - adminConf = {} - confFile = File.open(@OmsAdminFilePath, "r") - confFile.each_line do |line| - splitStrings = line.split('=') - adminConf[splitStrings[0]] = splitStrings[1] - end - workspaceId = adminConf['WORKSPACE_ID'] - return workspaceId - rescue => errorStr - $log.warn("Exception in AppInsightsUtility: getWorkspaceId - error: #{errorStr}") - end + def getWorkspaceId() + begin + adminConf = {} + confFile = File.open(@OmsAdminFilePath, "r") + confFile.each_line do |line| + splitStrings = line.split("=") + adminConf[splitStrings[0]] = splitStrings[1] end + workspaceId = adminConf["WORKSPACE_ID"] + return workspaceId + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: getWorkspaceId - error: #{errorStr}") + end end -end \ No newline at end of file + end +end diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb index 3c36775af..8b4fd9fcf 100644 --- a/source/code/plugin/CAdvisorMetricsAPIClient.rb +++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb @@ -2,424 +2,628 @@ # frozen_string_literal: true class CAdvisorMetricsAPIClient - - require 'json' - require 'logger' - require 'net/http' - require 'net/https' - require 'uri' - require 'date' - - require_relative 'oms_common' - require_relative 'KubernetesApiClient' - require_relative 'ApplicationInsightsUtility' - - @LogPath = "/var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt" - @Log = Logger.new(@LogPath, 2, 10*1048576) #keep last 2 files, max log file size = 10M - @@rxBytesLast = nil - @@rxBytesTimeLast = nil - @@txBytesLast = nil - @@txBytesTimeLast = nil - @@nodeCpuUsageNanoSecondsLast = nil - @@nodeCpuUsageNanoSecondsTimeLast = nil - @@telemetryCpuMetricTimeTracker = DateTime.now.to_time.to_i - @@telemetryMemoryMetricTimeTracker = DateTime.now.to_time.to_i - - - def initialize + require "json" + require "logger" + require "net/http" + require "net/https" + require "uri" + require "date" + + require_relative "oms_common" + require_relative "KubernetesApiClient" + require_relative "ApplicationInsightsUtility" + + @LogPath = "/var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt" + @Log = Logger.new(@LogPath, 2, 10 * 1048576) #keep last 2 files, max log file size = 10M + # @@rxBytesLast = nil + # @@rxBytesTimeLast = nil + # @@txBytesLast = nil + # @@txBytesTimeLast = nil + @@nodeCpuUsageNanoSecondsLast = nil + @@nodeCpuUsageNanoSecondsTimeLast = nil + @@winNodeCpuUsageNanoSecondsLast = {} + @@winNodeCpuUsageNanoSecondsTimeLast = {} + @@winContainerCpuUsageNanoSecondsLast = {} + @@winContainerCpuUsageNanoSecondsTimeLast = {} + @@winContainerPrevMetricRate = {} + @@linuxNodePrevMetricRate = nil + @@winNodePrevMetricRate = {} + @@telemetryCpuMetricTimeTracker = DateTime.now.to_time.to_i + @@telemetryMemoryMetricTimeTracker = DateTime.now.to_time.to_i + + #Containers a hash of node name and the last time telemetry was sent for this node + @@nodeTelemetryTimeTracker = {} + + # Keeping track of containers so that can delete the container from the container cpu cache when the container is deleted + # as a part of the cleanup routine + @@winContainerIdCache = [] + + def initialize + end + + class << self + def getSummaryStatsFromCAdvisor(winNode) + headers = {} + response = nil + @Log.info "Getting CAdvisor Uri" + begin + cAdvisorUri = getCAdvisorUri(winNode) + if !cAdvisorUri.nil? + uri = URI.parse(cAdvisorUri) + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = false + + cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) + response = http.request(cAdvisorApiRequest) + @Log.info "Got response code #{response.code} from #{uri.request_uri}" + end + rescue => error + @Log.warn("CAdvisor api request failed: #{error}") + telemetryProps = {} + telemetryProps["Computer"] = winNode["Hostname"] + ApplicationInsightsUtility.sendExceptionTelemetry(error, telemetryProps) + end + return response + end + + def getCAdvisorUri(winNode) + begin + defaultHost = "http://localhost:10255" + relativeUri = "/stats/summary" + if !winNode.nil? + nodeIP = winNode["InternalIP"] + else + nodeIP = ENV["NODE_IP"] + end + if !nodeIP.nil? + @Log.info("Using #{nodeIP + relativeUri} for CAdvisor Uri") + return "http://#{nodeIP}:10255" + relativeUri + else + @Log.warn ("NODE_IP environment variable not set. Using default as : #{defaultHost + relativeUri} ") + if !winNode.nil? + return nil + else + return defaultHost + relativeUri + end + end + end + end + + def getMetrics(winNode = nil) + metricDataItems = [] + begin + if !winNode.nil? + hostName = winNode["Hostname"] + operatingSystem = "Windows" + else + hostName = (OMS::Common.get_hostname) + operatingSystem = "Linux" + end + cAdvisorStats = getSummaryStatsFromCAdvisor(winNode) + if !cAdvisorStats.nil? + metricInfo = JSON.parse(cAdvisorStats.body) + end + if !metricInfo.nil? + metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "workingSetBytes", "memoryWorkingSetBytes")) + metricDataItems.concat(getContainerStartTimeMetricItems(metricInfo, hostName, "restartTimeEpoch")) + + if operatingSystem == "Linux" + metricDataItems.concat(getContainerCpuMetricItems(metricInfo, hostName, "usageNanoCores", "cpuUsageNanoCores")) + metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "rssBytes", "memoryRssBytes")) + metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "rssBytes", "memoryRssBytes")) + elsif operatingSystem == "Windows" + containerCpuUsageNanoSecondsRate = getContainerCpuMetricItemRate(metricInfo, hostName, "usageCoreNanoSeconds", "cpuUsageNanoCores") + if containerCpuUsageNanoSecondsRate && !containerCpuUsageNanoSecondsRate.empty? && !containerCpuUsageNanoSecondsRate.nil? + metricDataItems.concat(containerCpuUsageNanoSecondsRate) end - - class << self - def getSummaryStatsFromCAdvisor() - headers = {} - response = nil - @Log.info 'Getting CAdvisor Uri' - begin - cAdvisorUri = getCAdvisorUri() - if !cAdvisorUri.nil? - uri = URI.parse(cAdvisorUri) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = false - - cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) - response = http.request(cAdvisorApiRequest) - @Log.info "Got response code #{response.code} from #{uri.request_uri}" - end - rescue => error - @Log.warn("CAdvisor api request failed: #{error}") - end - return response - end - - def getCAdvisorUri() - begin - defaultHost = "http://localhost:10255" - relativeUri = "/stats/summary" - nodeIP = ENV['NODE_IP'] - if !nodeIP.nil? - @Log.info("Using #{nodeIP + relativeUri} for CAdvisor Uri") - return "http://#{nodeIP}:10255" + relativeUri - else - @Log.warn ("NODE_IP environment variable not set. Using default as : #{defaultHost + relativeUri} ") - return defaultHost + relativeUri - end - end - end - - def getMetrics() - metricDataItems = [] - begin - hostName = (OMS::Common.get_hostname) - metricInfo = JSON.parse(getSummaryStatsFromCAdvisor().body) - metricDataItems.concat(getContainerCpuMetricItems(metricInfo, hostName, "usageNanoCores","cpuUsageNanoCores")) - metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "workingSetBytes", "memoryWorkingSetBytes")) - metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "rssBytes", "memoryRssBytes")) - metricDataItems.concat(getContainerStartTimeMetricItems(metricInfo, hostName, "restartTimeEpoch")) - - cpuUsageNanoSecondsRate = getNodeMetricItemRate(metricInfo, hostName, "cpu", "usageCoreNanoSeconds", "cpuUsageNanoCores") - if cpuUsageNanoSecondsRate && !cpuUsageNanoSecondsRate.empty? && !cpuUsageNanoSecondsRate.nil? - metricDataItems.push(cpuUsageNanoSecondsRate) - end - metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "workingSetBytes", "memoryWorkingSetBytes")) - metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "rssBytes", "memoryRssBytes")) - metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "network", "rxBytes", "networkRxBytes")) - metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "network", "txBytes", "networkTxBytes")) - metricDataItems.push(getNodeLastRebootTimeMetric(metricInfo, hostName, "restartTimeEpoch")) - - networkRxRate = getNodeMetricItemRate(metricInfo, hostName, "network", "rxBytes", "networkRxBytesPerSec") - if networkRxRate && !networkRxRate.empty? && !networkRxRate.nil? - metricDataItems.push(networkRxRate) - end - networkTxRate = getNodeMetricItemRate(metricInfo, hostName, "network", "txBytes", "networkTxBytesPerSec") - if networkTxRate && !networkTxRate.empty? && !networkTxRate.nil? - metricDataItems.push(networkTxRate) - end - - - rescue => error - @Log.warn("getContainerMetrics failed: #{error}") - return metricDataItems - end - return metricDataItems - end + end - def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn) - metricItems = [] - clusterId = KubernetesApiClient.getClusterId - timeDifference = (DateTime.now.to_time.to_i - @@telemetryCpuMetricTimeTracker).abs - timeDifferenceInMinutes = timeDifference/60 - begin - metricInfo = metricJSON - metricInfo['pods'].each do |pod| - podUid = pod['podRef']['uid'] - podName = pod['podRef']['name'] - podNamespace = pod['podRef']['namespace'] - - if (!pod['containers'].nil?) - pod['containers'].each do |container| - #cpu metric - containerName = container['name'] - metricValue = container['cpu'][cpuMetricNameToCollect] - metricTime = container['cpu']['time'] - metricItem = {} - metricItem['DataItems'] = [] - - metricProps = {} - metricProps['Timestamp'] = metricTime - metricProps['Host'] = hostName - metricProps['ObjectName'] = "K8SContainer" - metricProps['InstanceName'] = clusterId + "/" + podUid + "/" + containerName - - metricProps['Collections'] = [] - metricCollections = {} - metricCollections['CounterName'] = metricNametoReturn - metricCollections['Value'] = metricValue - - metricProps['Collections'].push(metricCollections) - metricItem['DataItems'].push(metricProps) - metricItems.push(metricItem) - #Telemetry about agent performance - begin - # we can only do this much now. Ideally would like to use the docker image repository to find our pods/containers - # cadvisor does not have pod/container metadata. so would need more work to cache as pv & use - if (podName.downcase.start_with?('omsagent-') && podNamespace.eql?("kube-system") && containerName.downcase.start_with?('omsagent') && metricNametoReturn.eql?("cpuUsageNanoCores")) - - if (timeDifferenceInMinutes >= 10) - telemetryProps = {} - telemetryProps['PodName'] = podName - telemetryProps['ContainerName'] = containerName - ApplicationInsightsUtility.sendMetricTelemetry(metricNametoReturn, metricValue, telemetryProps) - end - end - rescue => errorStr - $log.warn("Exception while generating Telemetry from getcontainerCpuMetricItems failed: #{errorStr} for metric #{cpuMetricNameToCollect}") - end - end - end - end - # reset time outside pod iterator as we use one timer per metric for 2 pods (ds & rs) - if (timeDifferenceInMinutes >= 10 && metricNametoReturn.eql?("cpuUsageNanoCores")) - @@telemetryCpuMetricTimeTracker = DateTime.now.to_time.to_i - end - rescue => error - @Log.warn("getcontainerCpuMetricItems failed: #{error} for metric #{cpuMetricNameToCollect}") - return metricItems - end - return metricItems - end + cpuUsageNanoSecondsRate = getNodeMetricItemRate(metricInfo, hostName, "cpu", "usageCoreNanoSeconds", "cpuUsageNanoCores", operatingSystem) + if cpuUsageNanoSecondsRate && !cpuUsageNanoSecondsRate.empty? && !cpuUsageNanoSecondsRate.nil? + metricDataItems.push(cpuUsageNanoSecondsRate) + end + metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "workingSetBytes", "memoryWorkingSetBytes")) - def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollect, metricNametoReturn) - metricItems = [] - clusterId = KubernetesApiClient.getClusterId - timeDifference = (DateTime.now.to_time.to_i - @@telemetryMemoryMetricTimeTracker).abs - timeDifferenceInMinutes = timeDifference/60 - begin - metricInfo = metricJSON - metricInfo['pods'].each do |pod| - podUid = pod['podRef']['uid'] - podName = pod['podRef']['name'] - podNamespace = pod['podRef']['namespace'] - if (!pod['containers'].nil?) - pod['containers'].each do |container| - containerName = container['name'] - metricValue = container['memory'][memoryMetricNameToCollect] - metricTime = container['memory']['time'] - - metricItem = {} - metricItem['DataItems'] = [] - - metricProps = {} - metricProps['Timestamp'] = metricTime - metricProps['Host'] = hostName - metricProps['ObjectName'] = "K8SContainer" - metricProps['InstanceName'] = clusterId + "/" + podUid + "/" + containerName - - metricProps['Collections'] = [] - metricCollections = {} - metricCollections['CounterName'] = metricNametoReturn - metricCollections['Value'] = metricValue - - metricProps['Collections'].push(metricCollections) - metricItem['DataItems'].push(metricProps) - metricItems.push(metricItem) - #Telemetry about agent performance - begin - # we can only do this much now. Ideally would like to use the docker image repository to find our pods/containers - # cadvisor does not have pod/container metadata. so would need more work to cache as pv & use - if (podName.downcase.start_with?('omsagent-') && podNamespace.eql?("kube-system") && containerName.downcase.start_with?('omsagent') && metricNametoReturn.eql?("memoryRssBytes")) - if (timeDifferenceInMinutes >= 10) - telemetryProps = {} - telemetryProps['PodName'] = podName - telemetryProps['ContainerName'] = containerName - ApplicationInsightsUtility.sendMetricTelemetry(metricNametoReturn, metricValue, telemetryProps) - end - end - rescue => errorStr - $log.warn("Exception while generating Telemetry from getcontainerMemoryMetricItems failed: #{errorStr} for metric #{memoryMetricNameToCollect}") - end - end - end - end - # reset time outside pod iterator as we use one timer per metric for 2 pods (ds & rs) - if (timeDifferenceInMinutes >= 10 && metricNametoReturn.eql?("memoryRssBytes")) - @@telemetryMemoryMetricTimeTracker = DateTime.now.to_time.to_i - end - rescue => error - @Log.warn("getcontainerMemoryMetricItems failed: #{error} for metric #{memoryMetricNameToCollect}") - @Log.warn metricJSON - return metricItems - end - return metricItems - end + metricDataItems.push(getNodeLastRebootTimeMetric(metricInfo, hostName, "restartTimeEpoch")) + + # Disabling networkRxRate and networkTxRate since we dont use it as of now. + #metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "network", "rxBytes", "networkRxBytes")) + #metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "network", "txBytes", "networkTxBytes")) + # networkRxRate = getNodeMetricItemRate(metricInfo, hostName, "network", "rxBytes", "networkRxBytesPerSec") + # if networkRxRate && !networkRxRate.empty? && !networkRxRate.nil? + # metricDataItems.push(networkRxRate) + # end + # networkTxRate = getNodeMetricItemRate(metricInfo, hostName, "network", "txBytes", "networkTxBytesPerSec") + # if networkTxRate && !networkTxRate.empty? && !networkTxRate.nil? + # metricDataItems.push(networkTxRate) + # end + else + @Log.warn("Couldn't get metric information for host: #{hostName}") + end + rescue => error + @Log.warn("getContainerMetrics failed: #{error}") + return metricDataItems + end + return metricDataItems + end + + def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn) + metricItems = [] + clusterId = KubernetesApiClient.getClusterId + timeDifference = (DateTime.now.to_time.to_i - @@telemetryCpuMetricTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + begin + metricInfo = metricJSON + metricInfo["pods"].each do |pod| + podUid = pod["podRef"]["uid"] + podName = pod["podRef"]["name"] + podNamespace = pod["podRef"]["namespace"] + + if (!pod["containers"].nil?) + pod["containers"].each do |container| + #cpu metric + containerName = container["name"] + metricValue = container["cpu"][cpuMetricNameToCollect] + metricTime = container["cpu"]["time"] + metricItem = {} + metricItem["DataItems"] = [] - def getNodeMetricItem(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn) - metricItem = {} - clusterId = KubernetesApiClient.getClusterId - begin - metricInfo = metricJSON - node = metricInfo['node'] - nodeName = node['nodeName'] - - - metricValue = node[metricCategory][metricNameToCollect] - metricTime = node[metricCategory]['time'] - - metricItem['DataItems'] = [] - - metricProps = {} - metricProps['Timestamp'] = metricTime - metricProps['Host'] = hostName - metricProps['ObjectName'] = "K8SNode" - metricProps['InstanceName'] = clusterId + "/" + nodeName - - metricProps['Collections'] = [] - metricCollections = {} - metricCollections['CounterName'] = metricNametoReturn - metricCollections['Value'] = metricValue - - metricProps['Collections'].push(metricCollections) - metricItem['DataItems'].push(metricProps) - - rescue => error - @Log.warn("getNodeMetricItem failed: #{error} for metric #{metricNameToCollect}") - @Log.warn metricJSON - return metricItem - end - return metricItem + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = hostName + metricProps["ObjectName"] = "K8SContainer" + metricProps["InstanceName"] = clusterId + "/" + podUid + "/" + containerName + + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + metricCollections["Value"] = metricValue + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + metricItems.push(metricItem) + #Telemetry about agent performance + begin + # we can only do this much now. Ideally would like to use the docker image repository to find our pods/containers + # cadvisor does not have pod/container metadata. so would need more work to cache as pv & use + if (podName.downcase.start_with?("omsagent-") && podNamespace.eql?("kube-system") && containerName.downcase.start_with?("omsagent") && metricNametoReturn.eql?("cpuUsageNanoCores")) + if (timeDifferenceInMinutes >= 10) + telemetryProps = {} + telemetryProps["PodName"] = podName + telemetryProps["ContainerName"] = containerName + telemetryProps["Computer"] = hostName + ApplicationInsightsUtility.sendMetricTelemetry(metricNametoReturn, metricValue, telemetryProps) + end end + rescue => errorStr + $log.warn("Exception while generating Telemetry from getcontainerCpuMetricItems failed: #{errorStr} for metric #{cpuMetricNameToCollect}") + end + end + end + end + # reset time outside pod iterator as we use one timer per metric for 2 pods (ds & rs) + if (timeDifferenceInMinutes >= 10 && metricNametoReturn.eql?("cpuUsageNanoCores")) + @@telemetryCpuMetricTimeTracker = DateTime.now.to_time.to_i + end + rescue => error + @Log.warn("getcontainerCpuMetricItems failed: #{error} for metric #{cpuMetricNameToCollect}") + return metricItems + end + return metricItems + end + + def clearDeletedWinContainersFromCache() + begin + winCpuUsageNanoSecondsKeys = @@winContainerCpuUsageNanoSecondsLast.keys + winCpuUsageNanoSecondsTimeKeys = @@winContainerCpuUsageNanoSecondsTimeLast.keys + + # Find the container ids to be deleted from cache + winContainersToBeCleared = winCpuUsageNanoSecondsKeys - @@winContainerIdCache + if winContainersToBeCleared.length > 0 + @Log.warn "Stale containers found in cache, clearing...: #{winContainersToBeCleared}" + end + winContainersToBeCleared.each do |containerId| + @@winContainerCpuUsageNanoSecondsLast.delete(containerId) + @@winContainerCpuUsageNanoSecondsTimeLast.delete(containerId) + end + rescue => errorStr + @Log.warn("clearDeletedWinContainersFromCache failed: #{errorStr}") + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + end + + def resetWinContainerIdCache + @@winContainerIdCache = [] + end + + # usageNanoCores doesnt exist for windows nodes. Hence need to compute this from usageCoreNanoSeconds + def getContainerCpuMetricItemRate(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn) + metricItems = [] + clusterId = KubernetesApiClient.getClusterId + timeDifference = (DateTime.now.to_time.to_i - @@telemetryCpuMetricTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + @Log.warn "in host: #{hostName}" + begin + metricInfo = metricJSON + containerCount = 0 + metricInfo["pods"].each do |pod| + podUid = pod["podRef"]["uid"] + podName = pod["podRef"]["name"] + podNamespace = pod["podRef"]["namespace"] + + if (!pod["containers"].nil?) + pod["containers"].each do |container| + #cpu metric + containerCount += 1 + containerName = container["name"] + metricValue = container["cpu"][cpuMetricNameToCollect] + metricTime = container["cpu"]["time"] + metricItem = {} + metricItem["DataItems"] = [] + + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = hostName + metricProps["ObjectName"] = "K8SContainer" + metricProps["InstanceName"] = clusterId + "/" + podUid + "/" + containerName + + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn - def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn) - metricItem = {} - clusterId = KubernetesApiClient.getClusterId - begin - - metricInfo = metricJSON - node = metricInfo['node'] - nodeName = node['nodeName'] - - metricValue = node[metricCategory][metricNameToCollect] - metricTime = node[metricCategory]['time'] - - if !(metricNameToCollect == "rxBytes" || metricNameToCollect == "txBytes" || metricNameToCollect == "usageCoreNanoSeconds" ) - @Log.warn("getNodeMetricItemRate : rateMetric is supported only for rxBytes, txBytes & usageCoreNanoSeconds and not for #{metricNameToCollect}") - return nil - elsif metricNameToCollect == "rxBytes" - if @@rxBytesLast.nil? || @@rxBytesTimeLast.nil? || @@rxBytesLast > metricValue #when kubelet is restarted the last condition will be true - @@rxBytesLast = metricValue - @@rxBytesTimeLast = metricTime - return nil - else - metricRateValue = ((metricValue - @@rxBytesLast) * 1.0)/(DateTime.parse(metricTime).to_time - DateTime.parse(@@rxBytesTimeLast).to_time) - @@rxBytesLast = metricValue - @@rxBytesTimeLast = metricTime - metricValue = metricRateValue - end - elsif metricNameToCollect == "txBytes" - if @@txBytesLast.nil? || @@txBytesTimeLast.nil? || @@txBytesLast > metricValue #when kubelet is restarted the last condition will be true - @@txBytesLast = metricValue - @@txBytesTimeLast = metricTime - return nil - else - metricRateValue = ((metricValue - @@txBytesLast) * 1.0)/(DateTime.parse(metricTime).to_time - DateTime.parse(@@txBytesTimeLast).to_time) - @@txBytesLast = metricValue - @@txBytesTimeLast = metricTime - metricValue = metricRateValue - end - else - if @@nodeCpuUsageNanoSecondsLast.nil? || @@nodeCpuUsageNanoSecondsTimeLast.nil? || @@nodeCpuUsageNanoSecondsLast > metricValue #when kubelet is restarted the last condition will be true - @@nodeCpuUsageNanoSecondsLast = metricValue - @@nodeCpuUsageNanoSecondsTimeLast = metricTime - return nil - else - metricRateValue = ((metricValue - @@nodeCpuUsageNanoSecondsLast) * 1.0)/(DateTime.parse(metricTime).to_time - DateTime.parse(@@nodeCpuUsageNanoSecondsTimeLast).to_time) - @@nodeCpuUsageNanoSecondsLast = metricValue - @@nodeCpuUsageNanoSecondsTimeLast = metricTime - metricValue = metricRateValue - end - end - - metricItem['DataItems'] = [] - - metricProps = {} - metricProps['Timestamp'] = metricTime - metricProps['Host'] = hostName - metricProps['ObjectName'] = "K8SNode" - metricProps['InstanceName'] = clusterId + "/" + nodeName - - metricProps['Collections'] = [] - metricCollections = {} - metricCollections['CounterName'] = metricNametoReturn - metricCollections['Value'] = metricValue - - metricProps['Collections'].push(metricCollections) - metricItem['DataItems'].push(metricProps) - - rescue => error - @Log.warn("getNodeMetricItemRate failed: #{error} for metric #{metricNameToCollect}") - @Log.warn metricJSON - return nil - end - return metricItem + containerId = podUid + "/" + containerName + # Adding the containers to the winContainerIdCache so that it can be used by the cleanup routine + # to clear the delted containers every 5 minutes + @@winContainerIdCache.push(containerId) + if @@winContainerCpuUsageNanoSecondsLast[containerId].nil? || @@winContainerCpuUsageNanoSecondsTimeLast[containerId].nil? || @@winContainerCpuUsageNanoSecondsLast[containerId] > metricValue #when kubelet is restarted the last condition will be true + @@winContainerCpuUsageNanoSecondsLast[containerId] = metricValue + @@winContainerCpuUsageNanoSecondsTimeLast[containerId] = metricTime + next + else + timeDifference = DateTime.parse(metricTime).to_time - DateTime.parse(@@winContainerCpuUsageNanoSecondsTimeLast[containerId]).to_time + containerCpuUsageDifference = metricValue - @@winContainerCpuUsageNanoSecondsLast[containerId] + # containerCpuUsageDifference check is added to make sure we report non zero values when cadvisor returns same values for subsequent calls + if timeDifference != 0 && containerCpuUsageDifference != 0 + metricRateValue = (containerCpuUsageDifference * 1.0) / timeDifference + else + @Log.info "container - cpu usage difference / time difference is 0, hence using previous cached value" + if !@@winContainerPrevMetricRate[containerId].nil? + metricRateValue = @@winContainerPrevMetricRate[containerId] + else + # This can happen when the metric value returns same values for subsequent calls when the plugin first starts + metricRateValue = 0 + end end + @@winContainerCpuUsageNanoSecondsLast[containerId] = metricValue + @@winContainerCpuUsageNanoSecondsTimeLast[containerId] = metricTime + metricValue = metricRateValue + @@winContainerPrevMetricRate[containerId] = metricRateValue + end - def getNodeLastRebootTimeMetric(metricJSON, hostName, metricNametoReturn) - metricItem = {} - clusterId = KubernetesApiClient.getClusterId - - begin - metricInfo = metricJSON - node = metricInfo['node'] - nodeName = node['nodeName'] - - - metricValue = node['startTime'] - metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z - - metricItem['DataItems'] = [] - - metricProps = {} - metricProps['Timestamp'] = metricTime - metricProps['Host'] = hostName - metricProps['ObjectName'] = "K8SNode" - metricProps['InstanceName'] = clusterId + "/" + nodeName - - metricProps['Collections'] = [] - metricCollections = {} - metricCollections['CounterName'] = metricNametoReturn - #Read it from /proc/uptime - metricCollections['Value'] = DateTime.parse(metricTime).to_time.to_i - IO.read("/proc/uptime").split[0].to_f - - metricProps['Collections'].push(metricCollections) - metricItem['DataItems'].push(metricProps) - - rescue => error - @Log.warn("getNodeLastRebootTimeMetric failed: #{error} ") - @Log.warn metricJSON - return metricItem - end - return metricItem + metricCollections["Value"] = metricValue + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + metricItems.push(metricItem) + end + end + end + #Sending ContainerInventoryTelemetry from replicaset for telemetry purposes + if @@nodeTelemetryTimeTracker[hostName].nil? + @@nodeTelemetryTimeTracker[hostName] = DateTime.now.to_time.to_i + else + timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker[hostName]).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + @@nodeTelemetryTimeTracker[hostName] = DateTime.now.to_time.to_i + telemetryProperties = {} + telemetryProperties["Computer"] = hostName + telemetryProperties["ContainerCount"] = containerCount + # Hardcoding the event to ContainerInventory hearbeat event since the telemetry is pivoted off of this event. + @Log.info "sending container inventory heartbeat telemetry" + ApplicationInsightsUtility.sendCustomEvent("ContainerInventoryHeartBeatEvent", telemetryProperties) + end + end + rescue => error + @Log.warn("getcontainerCpuMetricItemRate failed: #{error} for metric #{cpuMetricNameToCollect}") + return metricItems + end + return metricItems + end + + def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollect, metricNametoReturn) + metricItems = [] + clusterId = KubernetesApiClient.getClusterId + timeDifference = (DateTime.now.to_time.to_i - @@telemetryMemoryMetricTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + begin + metricInfo = metricJSON + metricInfo["pods"].each do |pod| + podUid = pod["podRef"]["uid"] + podName = pod["podRef"]["name"] + podNamespace = pod["podRef"]["namespace"] + if (!pod["containers"].nil?) + pod["containers"].each do |container| + containerName = container["name"] + metricValue = container["memory"][memoryMetricNameToCollect] + metricTime = container["memory"]["time"] + + metricItem = {} + metricItem["DataItems"] = [] + + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = hostName + metricProps["ObjectName"] = "K8SContainer" + metricProps["InstanceName"] = clusterId + "/" + podUid + "/" + containerName + + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + metricCollections["Value"] = metricValue + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + metricItems.push(metricItem) + #Telemetry about agent performance + begin + # we can only do this much now. Ideally would like to use the docker image repository to find our pods/containers + # cadvisor does not have pod/container metadata. so would need more work to cache as pv & use + if (podName.downcase.start_with?("omsagent-") && podNamespace.eql?("kube-system") && containerName.downcase.start_with?("omsagent") && metricNametoReturn.eql?("memoryRssBytes")) + if (timeDifferenceInMinutes >= 10) + telemetryProps = {} + telemetryProps["PodName"] = podName + telemetryProps["ContainerName"] = containerName + telemetryProps["Computer"] = hostName + ApplicationInsightsUtility.sendMetricTelemetry(metricNametoReturn, metricValue, telemetryProps) + end end + rescue => errorStr + $log.warn("Exception while generating Telemetry from getcontainerMemoryMetricItems failed: #{errorStr} for metric #{memoryMetricNameToCollect}") + end + end + end + end + # reset time outside pod iterator as we use one timer per metric for 2 pods (ds & rs) + if (timeDifferenceInMinutes >= 10 && metricNametoReturn.eql?("memoryRssBytes")) + @@telemetryMemoryMetricTimeTracker = DateTime.now.to_time.to_i + end + rescue => error + @Log.warn("getcontainerMemoryMetricItems failed: #{error} for metric #{memoryMetricNameToCollect}") + @Log.warn metricJSON + return metricItems + end + return metricItems + end + + def getNodeMetricItem(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn) + metricItem = {} + clusterId = KubernetesApiClient.getClusterId + begin + metricInfo = metricJSON + node = metricInfo["node"] + nodeName = node["nodeName"] + + if !node[metricCategory].nil? + metricValue = node[metricCategory][metricNameToCollect] + metricTime = node[metricCategory]["time"] + + metricItem["DataItems"] = [] + + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = hostName + metricProps["ObjectName"] = "K8SNode" + metricProps["InstanceName"] = clusterId + "/" + nodeName + + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + metricCollections["Value"] = metricValue + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + end + rescue => error + @Log.warn("getNodeMetricItem failed: #{error} for metric #{metricNameToCollect}") + @Log.warn metricJSON + return metricItem + end + return metricItem + end + + def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn, operatingSystem) + metricItem = {} + clusterId = KubernetesApiClient.getClusterId + begin + metricInfo = metricJSON + node = metricInfo["node"] + nodeName = node["nodeName"] - def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn) - metricItems = [] - clusterId = KubernetesApiClient.getClusterId - currentTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z - begin - metricInfo = metricJSON - metricInfo['pods'].each do |pod| - podUid = pod['podRef']['uid'] - if (!pod['containers'].nil?) - pod['containers'].each do |container| - containerName = container['name'] - metricValue = container['startTime'] - metricTime = currentTime - - metricItem = {} - metricItem['DataItems'] = [] - - metricProps = {} - metricProps['Timestamp'] = metricTime - metricProps['Host'] = hostName - metricProps['ObjectName'] = "K8SContainer" - metricProps['InstanceName'] = clusterId + "/" + podUid + "/" + containerName - - metricProps['Collections'] = [] - metricCollections = {} - metricCollections['CounterName'] = metricNametoReturn - metricCollections['Value'] = DateTime.parse(metricValue).to_time.to_i - - metricProps['Collections'].push(metricCollections) - metricItem['DataItems'].push(metricProps) - metricItems.push(metricItem) - end - end - end - rescue => error - @Log.warn("getContainerStartTimeMetric failed: #{error} for metric #{metricNametoReturn}") - @Log.warn metricJSON - return metricItems - end - return metricItems + if !node[metricCategory].nil? + metricValue = node[metricCategory][metricNameToCollect] + metricTime = node[metricCategory]["time"] + + # if !(metricNameToCollect == "rxBytes" || metricNameToCollect == "txBytes" || metricNameToCollect == "usageCoreNanoSeconds") + # @Log.warn("getNodeMetricItemRate : rateMetric is supported only for rxBytes, txBytes & usageCoreNanoSeconds and not for #{metricNameToCollect}") + if !(metricNameToCollect == "usageCoreNanoSeconds") + @Log.warn("getNodeMetricItemRate : rateMetric is supported only for usageCoreNanoSeconds and not for #{metricNameToCollect}") + return nil + # elsif metricNameToCollect == "rxBytes" + # if @@rxBytesLast.nil? || @@rxBytesTimeLast.nil? || @@rxBytesLast > metricValue #when kubelet is restarted the last condition will be true + # @@rxBytesLast = metricValue + # @@rxBytesTimeLast = metricTime + # return nil + # else + # metricRateValue = ((metricValue - @@rxBytesLast) * 1.0) / (DateTime.parse(metricTime).to_time - DateTime.parse(@@rxBytesTimeLast).to_time) + # @@rxBytesLast = metricValue + # @@rxBytesTimeLast = metricTime + # metricValue = metricRateValue + # end + # elsif metricNameToCollect == "txBytes" + # if @@txBytesLast.nil? || @@txBytesTimeLast.nil? || @@txBytesLast > metricValue #when kubelet is restarted the last condition will be true + # @@txBytesLast = metricValue + # @@txBytesTimeLast = metricTime + # return nil + # else + # metricRateValue = ((metricValue - @@txBytesLast) * 1.0) / (DateTime.parse(metricTime).to_time - DateTime.parse(@@txBytesTimeLast).to_time) + # @@txBytesLast = metricValue + # @@txBytesTimeLast = metricTime + # metricValue = metricRateValue + # end + else + if operatingSystem == "Linux" + if @@nodeCpuUsageNanoSecondsLast.nil? || @@nodeCpuUsageNanoSecondsTimeLast.nil? || @@nodeCpuUsageNanoSecondsLast > metricValue #when kubelet is restarted the last condition will be true + @@nodeCpuUsageNanoSecondsLast = metricValue + @@nodeCpuUsageNanoSecondsTimeLast = metricTime + return nil + else + timeDifference = DateTime.parse(metricTime).to_time - DateTime.parse(@@nodeCpuUsageNanoSecondsTimeLast).to_time + nodeCpuUsageDifference = metricValue - @@nodeCpuUsageNanoSecondsLast + # nodeCpuUsageDifference check is added to make sure we report non zero values when cadvisor returns same values for subsequent calls + if timeDifference != 0 && nodeCpuUsageDifference != 0 + metricRateValue = (nodeCpuUsageDifference * 1.0) / timeDifference + else + @Log.info "linux node - cpu usage difference / time difference is 0, hence using previous cached value" + if !@@linuxNodePrevMetricRate.nil? + metricRateValue = @@linuxNodePrevMetricRate + else + # This can happen when the metric value returns same values for subsequent calls when the plugin first starts + metricRateValue = 0 + end + end + @@nodeCpuUsageNanoSecondsLast = metricValue + @@nodeCpuUsageNanoSecondsTimeLast = metricTime + @@linuxNodePrevMetricRate = metricRateValue + metricValue = metricRateValue + end + elsif operatingSystem == "Windows" + # Using the hash for windows nodes since this is running in replica set and there can be multiple nodes + if @@winNodeCpuUsageNanoSecondsLast[hostName].nil? || @@winNodeCpuUsageNanoSecondsTimeLast[hostName].nil? || @@winNodeCpuUsageNanoSecondsLast[hostName] > metricValue #when kubelet is restarted the last condition will be true + @@winNodeCpuUsageNanoSecondsLast[hostName] = metricValue + @@winNodeCpuUsageNanoSecondsTimeLast[hostName] = metricTime + return nil + else + timeDifference = DateTime.parse(metricTime).to_time - DateTime.parse(@@winNodeCpuUsageNanoSecondsTimeLast[hostName]).to_time + nodeCpuUsageDifference = metricValue - @@winNodeCpuUsageNanoSecondsLast[hostName] + # nodeCpuUsageDifference check is added to make sure we report non zero values when cadvisor returns same values for subsequent calls + if timeDifference != 0 && nodeCpuUsageDifference != 0 + metricRateValue = (nodeCpuUsageDifference * 1.0) / timeDifference + else + @Log.info "windows node - cpu usage difference / time difference is 0, hence using previous cached value" + if !@@winNodePrevMetricRate[hostName].nil? + metricRateValue = @@winNodePrevMetricRate[hostName] + else + # This can happen when the metric value returns same values for subsequent calls when the plugin first starts + metricRateValue = 0 + end end + @@winNodeCpuUsageNanoSecondsLast[hostName] = metricValue + @@winNodeCpuUsageNanoSecondsTimeLast[hostName] = metricTime + @@winNodePrevMetricRate[hostName] = metricRateValue + metricValue = metricRateValue + end + end + end + metricItem["DataItems"] = [] + + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = hostName + metricProps["ObjectName"] = "K8SNode" + metricProps["InstanceName"] = clusterId + "/" + nodeName + + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + metricCollections["Value"] = metricValue + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + end + rescue => error + @Log.warn("getNodeMetricItemRate failed: #{error} for metric #{metricNameToCollect}") + @Log.warn metricJSON + return nil + end + return metricItem + end + + def getNodeLastRebootTimeMetric(metricJSON, hostName, metricNametoReturn) + metricItem = {} + clusterId = KubernetesApiClient.getClusterId + + begin + metricInfo = metricJSON + node = metricInfo["node"] + nodeName = node["nodeName"] + + metricValue = node["startTime"] + metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + + metricItem["DataItems"] = [] + + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = hostName + metricProps["ObjectName"] = "K8SNode" + metricProps["InstanceName"] = clusterId + "/" + nodeName + + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + #Read it from /proc/uptime + metricCollections["Value"] = DateTime.parse(metricTime).to_time.to_i - IO.read("/proc/uptime").split[0].to_f + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + rescue => error + @Log.warn("getNodeLastRebootTimeMetric failed: #{error} ") + @Log.warn metricJSON + return metricItem + end + return metricItem + end + + def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn) + metricItems = [] + clusterId = KubernetesApiClient.getClusterId + currentTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + begin + metricInfo = metricJSON + metricInfo["pods"].each do |pod| + podUid = pod["podRef"]["uid"] + if (!pod["containers"].nil?) + pod["containers"].each do |container| + containerName = container["name"] + metricValue = container["startTime"] + metricTime = currentTime + + metricItem = {} + metricItem["DataItems"] = [] + + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = hostName + metricProps["ObjectName"] = "K8SContainer" + metricProps["InstanceName"] = clusterId + "/" + podUid + "/" + containerName + + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + metricCollections["Value"] = DateTime.parse(metricValue).to_time.to_i + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + metricItems.push(metricItem) end + end end + rescue => error + @Log.warn("getContainerStartTimeMetric failed: #{error} for metric #{metricNametoReturn}") + @Log.warn metricJSON + return metricItems + end + return metricItems + end + end +end diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index a1e143b15..4ed85025f 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -2,474 +2,516 @@ # frozen_string_literal: true class KubernetesApiClient + require "json" + require "logger" + require "net/http" + require "net/https" + require "uri" + require "time" - require 'json' - require 'logger' - require 'net/http' - require 'net/https' - require 'uri' - require 'time' - - require_relative 'oms_common' - - @@ApiVersion = "v1" - @@CaFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" - @@ClusterName = nil - @@ClusterId = nil - @@IsNodeMaster = nil - #@@IsValidRunningNode = nil - #@@IsLinuxCluster = nil - @@KubeSystemNamespace = "kube-system" - @LogPath = "/var/opt/microsoft/docker-cimprov/log/kubernetes_client_log.txt" - @Log = Logger.new(@LogPath, 2, 10*1048576) #keep last 2 files, max log file size = 10M - @@TokenFileName = "/var/run/secrets/kubernetes.io/serviceaccount/token" - @@TokenStr = nil - @@NodeMetrics = Hash.new - - def initialize + require_relative "oms_common" + + @@ApiVersion = "v1" + @@CaFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + @@ClusterName = nil + @@ClusterId = nil + @@IsNodeMaster = nil + #@@IsValidRunningNode = nil + #@@IsLinuxCluster = nil + @@KubeSystemNamespace = "kube-system" + @LogPath = "/var/opt/microsoft/docker-cimprov/log/kubernetes_client_log.txt" + @Log = Logger.new(@LogPath, 2, 10 * 1048576) #keep last 2 files, max log file size = 10M + @@TokenFileName = "/var/run/secrets/kubernetes.io/serviceaccount/token" + @@TokenStr = nil + @@NodeMetrics = Hash.new + @@WinNodeArray = [] + + def initialize + end + + class << self + def getKubeResourceInfo(resource) + headers = {} + response = nil + @Log.info "Getting Kube resource" + @Log.info resource + begin + resourceUri = getResourceUri(resource) + if !resourceUri.nil? + uri = URI.parse(resourceUri) + http = Net::HTTP.new(uri.host, uri.port) + http.use_ssl = true + if !File.exist?(@@CaFile) + raise "#{@@CaFile} doesnt exist" + else + http.ca_file = @@CaFile if File.exist?(@@CaFile) + end + http.verify_mode = OpenSSL::SSL::VERIFY_PEER + + kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) + kubeApiRequest["Authorization"] = "Bearer " + getTokenStr + @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" + response = http.request(kubeApiRequest) + @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}" end + rescue => error + @Log.warn("kubernetes api request failed: #{error} for #{resource} @ #{Time.now.utc.iso8601}") + end + if (response.body.empty?) + @Log.warn("KubernetesAPIClient::getKubeResourceInfo : Got empty response from Kube API for #{resource} @ #{Time.now.utc.iso8601}") + end + return response + end - class << self - def getKubeResourceInfo(resource) - headers = {} - response = nil - @Log.info 'Getting Kube resource' - @Log.info resource - begin - resourceUri = getResourceUri(resource) - if !resourceUri.nil? - uri = URI.parse(resourceUri) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = true - if !File.exist?(@@CaFile) - raise "#{@@CaFile} doesnt exist" - else - http.ca_file = @@CaFile if File.exist?(@@CaFile) - end - http.verify_mode = OpenSSL::SSL::VERIFY_PEER - - kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) - kubeApiRequest['Authorization'] = "Bearer " + getTokenStr - @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" - response = http.request(kubeApiRequest) - @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}" - end - rescue => error - @Log.warn("kubernetes api request failed: #{error} for #{resource} @ #{Time.now.utc.iso8601}") - end - if (response.body.empty?) - @Log.warn("KubernetesAPIClient::getKubeResourceInfo : Got empty response from Kube API for #{resource} @ #{Time.now.utc.iso8601}") - end - return response - end + def getTokenStr + return @@TokenStr if !@@TokenStr.nil? + begin + if File.exist?(@@TokenFileName) && File.readable?(@@TokenFileName) + @@TokenStr = File.read(@@TokenFileName).strip + return @@TokenStr + else + @Log.warn("Unable to read token string from #{@@TokenFileName}: #{error}") + return nil + end + end + end - def getTokenStr - return @@TokenStr if !@@TokenStr.nil? - begin - if File.exist?(@@TokenFileName) && File.readable?(@@TokenFileName) - @@TokenStr = File.read(@@TokenFileName).strip - return @@TokenStr - else - @Log.warn("Unable to read token string from #{@@TokenFileName}: #{error}") - return nil - end - end - end + def getResourceUri(resource) + begin + if ENV["KUBERNETES_SERVICE_HOST"] && ENV["KUBERNETES_PORT_443_TCP_PORT"] + return "https://#{ENV["KUBERNETES_SERVICE_HOST"]}:#{ENV["KUBERNETES_PORT_443_TCP_PORT"]}/api/" + @@ApiVersion + "/" + resource + else + @Log.warn ("Kubernetes environment variable not set KUBERNETES_SERVICE_HOST: #{ENV["KUBERNETES_SERVICE_HOST"]} KUBERNETES_PORT_443_TCP_PORT: #{ENV["KUBERNETES_PORT_443_TCP_PORT"]}. Unable to form resourceUri") + return nil + end + end + end - def getResourceUri(resource) - begin - if ENV['KUBERNETES_SERVICE_HOST'] && ENV['KUBERNETES_PORT_443_TCP_PORT'] - return "https://#{ENV['KUBERNETES_SERVICE_HOST']}:#{ENV['KUBERNETES_PORT_443_TCP_PORT']}/api/" + @@ApiVersion + "/" + resource - else - @Log.warn ("Kubernetes environment variable not set KUBERNETES_SERVICE_HOST: #{ENV['KUBERNETES_SERVICE_HOST']} KUBERNETES_PORT_443_TCP_PORT: #{ENV['KUBERNETES_PORT_443_TCP_PORT']}. Unable to form resourceUri") - return nil - end + def getClusterName + return @@ClusterName if !@@ClusterName.nil? + @@ClusterName = "None" + begin + #try getting resource ID for aks + cluster = ENV["AKS_RESOURCE_ID"] + if cluster && !cluster.nil? && !cluster.empty? + @@ClusterName = cluster.split("/").last + else + cluster = ENV["ACS_RESOURCE_NAME"] + if cluster && !cluster.nil? && !cluster.empty? + @@ClusterName = cluster + else + kubesystemResourceUri = "namespaces/" + @@KubeSystemNamespace + "/pods" + @Log.info("KubernetesApiClient::getClusterName : Getting pods from Kube API @ #{Time.now.utc.iso8601}") + podInfo = JSON.parse(getKubeResourceInfo(kubesystemResourceUri).body) + @Log.info("KubernetesApiClient::getClusterName : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + podInfo["items"].each do |items| + if items["metadata"]["name"].include? "kube-controller-manager" + items["spec"]["containers"][0]["command"].each do |command| + if command.include? "--cluster-name" + @@ClusterName = command.split("=")[1] + end end + end end + end + end + rescue => error + @Log.warn("getClusterName failed: #{error}") + end + return @@ClusterName + end - def getClusterName - return @@ClusterName if !@@ClusterName.nil? - @@ClusterName = "None" - begin - #try getting resource ID for aks - cluster = ENV['AKS_RESOURCE_ID'] - if cluster && !cluster.nil? && !cluster.empty? - @@ClusterName = cluster.split("/").last - else - cluster = ENV['ACS_RESOURCE_NAME'] - if cluster && !cluster.nil? && !cluster.empty? - @@ClusterName = cluster - else - kubesystemResourceUri = "namespaces/" + @@KubeSystemNamespace + "/pods" - @Log.info("KubernetesApiClient::getClusterName : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - podInfo = JSON.parse(getKubeResourceInfo(kubesystemResourceUri).body) - @Log.info("KubernetesApiClient::getClusterName : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - podInfo['items'].each do |items| - if items['metadata']['name'].include? "kube-controller-manager" - items['spec']['containers'][0]['command'].each do |command| - if command.include? "--cluster-name" - @@ClusterName = command.split('=')[1] - end - end - end - end - end - end - rescue => error - @Log.warn("getClusterName failed: #{error}") - end - return @@ClusterName - end + def getClusterId + return @@ClusterId if !@@ClusterId.nil? + #By default initialize ClusterId to ClusterName. + # In ACS/On-prem, we need to figure out how we can generate ClusterId + @@ClusterId = getClusterName + begin + cluster = ENV["AKS_RESOURCE_ID"] + if cluster && !cluster.nil? && !cluster.empty? + @@ClusterId = cluster + end + rescue => error + @Log.warn("getClusterId failed: #{error}") + end + return @@ClusterId + end - def getClusterId - return @@ClusterId if !@@ClusterId.nil? - #By default initialize ClusterId to ClusterName. - # In ACS/On-prem, we need to figure out how we can generate ClusterId - @@ClusterId = getClusterName - begin - cluster = ENV['AKS_RESOURCE_ID'] - if cluster && !cluster.nil? && !cluster.empty? - @@ClusterId = cluster - end - rescue => error - @Log.warn("getClusterId failed: #{error}") - end - return @@ClusterId + def isNodeMaster + return @@IsNodeMaster if !@@IsNodeMaster.nil? + @@IsNodeMaster = false + begin + @Log.info("KubernetesApiClient::isNodeMaster : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + allNodesInfo = JSON.parse(getKubeResourceInfo("nodes").body) + @Log.info("KubernetesApiClient::isNodeMaster : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + if !allNodesInfo.nil? && !allNodesInfo.empty? + thisNodeName = OMS::Common.get_hostname + allNodesInfo["items"].each do |item| + if item["metadata"]["name"].casecmp(thisNodeName) == 0 + if item["metadata"]["labels"]["kubernetes.io/role"].to_s.include?("master") || item["metadata"]["labels"]["role"].to_s.include?("master") + @@IsNodeMaster = true + end + break end + end + end + rescue => error + @Log.warn("KubernetesApiClient::isNodeMaster : node role request failed: #{error}") + end - def isNodeMaster - return @@IsNodeMaster if !@@IsNodeMaster.nil? - @@IsNodeMaster = false - begin - @Log.info("KubernetesApiClient::isNodeMaster : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - allNodesInfo = JSON.parse(getKubeResourceInfo('nodes').body) - @Log.info("KubernetesApiClient::isNodeMaster : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") - if !allNodesInfo.nil? && !allNodesInfo.empty? - thisNodeName = OMS::Common.get_hostname - allNodesInfo['items'].each do |item| - if item['metadata']['name'].casecmp(thisNodeName) == 0 - if item['metadata']['labels']["kubernetes.io/role"].to_s.include?("master") || item['metadata']['labels']["role"].to_s.include?("master") - @@IsNodeMaster = true - end - break - end - end - end - rescue => error - @Log.warn("KubernetesApiClient::isNodeMaster : node role request failed: #{error}") - end - - return @@IsNodeMaster - end + return @@IsNodeMaster + end - #def isValidRunningNode - # return @@IsValidRunningNode if !@@IsValidRunningNode.nil? - # @@IsValidRunningNode = false - # begin - # thisNodeName = OMS::Common.get_hostname - # if isLinuxCluster - # # Run on agent node [0] - # @@IsValidRunningNode = !isNodeMaster && thisNodeName.to_s.split('-').last == '0' - # else - # # Run on master node [0] - # @@IsValidRunningNode = isNodeMaster && thisNodeName.to_s.split('-').last == '0' - # end - # rescue => error - # @Log.warn("Checking Node Type failed: #{error}") - # end - # if(@@IsValidRunningNode == true) - # @Log.info("Electing current node to talk to k8 api") - # else - # @Log.info("Not Electing current node to talk to k8 api") - # end - # return @@IsValidRunningNode - #end - - #def isLinuxCluster - # return @@IsLinuxCluster if !@@IsLinuxCluster.nil? - # @@IsLinuxCluster = true - # begin - # @Log.info("KubernetesApiClient::isLinuxCluster : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - # allNodesInfo = JSON.parse(getKubeResourceInfo('nodes').body) - # @Log.info("KubernetesApiClient::isLinuxCluster : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") - # if !allNodesInfo.nil? && !allNodesInfo.empty? - # allNodesInfo['items'].each do |item| - # if !(item['status']['nodeInfo']['operatingSystem'].casecmp('linux') == 0) - # @@IsLinuxCluster = false - # break - # end - # end - # end - # rescue => error - # @Log.warn("KubernetesApiClient::isLinuxCluster : node role request failed: #{error}") - # end - # return @@IsLinuxCluster - #end - - # returns an arry of pods (json) - def getPods(namespace) - pods = [] - begin - kubesystemResourceUri = "namespaces/" + namespace + "/pods" - podInfo = JSON.parse(getKubeResourceInfo(kubesystemResourceUri).body) - podInfo['items'].each do |items| - pods.push items - end - rescue => error - @Log.warn("List pods request failed: #{error}") - end - return pods - end + #def isValidRunningNode + # return @@IsValidRunningNode if !@@IsValidRunningNode.nil? + # @@IsValidRunningNode = false + # begin + # thisNodeName = OMS::Common.get_hostname + # if isLinuxCluster + # # Run on agent node [0] + # @@IsValidRunningNode = !isNodeMaster && thisNodeName.to_s.split('-').last == '0' + # else + # # Run on master node [0] + # @@IsValidRunningNode = isNodeMaster && thisNodeName.to_s.split('-').last == '0' + # end + # rescue => error + # @Log.warn("Checking Node Type failed: #{error}") + # end + # if(@@IsValidRunningNode == true) + # @Log.info("Electing current node to talk to k8 api") + # else + # @Log.info("Not Electing current node to talk to k8 api") + # end + # return @@IsValidRunningNode + #end + + #def isLinuxCluster + # return @@IsLinuxCluster if !@@IsLinuxCluster.nil? + # @@IsLinuxCluster = true + # begin + # @Log.info("KubernetesApiClient::isLinuxCluster : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + # allNodesInfo = JSON.parse(getKubeResourceInfo('nodes').body) + # @Log.info("KubernetesApiClient::isLinuxCluster : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + # if !allNodesInfo.nil? && !allNodesInfo.empty? + # allNodesInfo['items'].each do |item| + # if !(item['status']['nodeInfo']['operatingSystem'].casecmp('linux') == 0) + # @@IsLinuxCluster = false + # break + # end + # end + # end + # rescue => error + # @Log.warn("KubernetesApiClient::isLinuxCluster : node role request failed: #{error}") + # end + # return @@IsLinuxCluster + #end + + # returns an arry of pods (json) + def getPods(namespace) + pods = [] + begin + kubesystemResourceUri = "namespaces/" + namespace + "/pods" + podInfo = JSON.parse(getKubeResourceInfo(kubesystemResourceUri).body) + podInfo["items"].each do |items| + pods.push items + end + rescue => error + @Log.warn("List pods request failed: #{error}") + end + return pods + end - def getContainerIDs(namespace) - containers = Hash.new - begin - kubesystemResourceUri = "namespaces/" + namespace + "/pods" - @Log.info("KubernetesApiClient::getContainerIDs : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - podInfo = JSON.parse(getKubeResourceInfo(kubesystemResourceUri).body) - @Log.info("KubernetesApiClient::getContainerIDs : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - podInfo['items'].each do |item| - if (!item['status'].nil? && !item['status'].empty? && !item['status']['containerStatuses'].nil? && !item['status']['containerStatuses'].empty?) - item['status']['containerStatuses'].each do |cntr| - containers[cntr['containerID']] = "kube-system" - end - end - end - rescue => error - @Log.warn("KubernetesApiClient::getContainerIDs : List ContainerIDs request failed: #{error}") + # returns a hash of windows node names and their internal IPs + def getWindowsNodes + winNodes = [] + begin + nodeInventory = JSON.parse(getKubeResourceInfo("nodes").body) + @Log.info "KubernetesAPIClient::getWindowsNodes : Got nodes from kube api" + # Resetting the windows node cache + @@WinNodeArray.clear + if (!nodeInventory.empty?) + nodeInventory["items"].each do |item| + # check for windows operating system in node metadata + winNode = {} + nodeStatus = item["status"] + nodeMetadata = item["metadata"] + if !nodeStatus.nil? && !nodeStatus["nodeInfo"].nil? && !nodeStatus["nodeInfo"]["operatingSystem"].nil? + operatingSystem = nodeStatus["nodeInfo"]["operatingSystem"] + if (operatingSystem.is_a?(String) && operatingSystem.casecmp("windows") == 0) + # Adding windows nodes to winNodeArray so that it can be used in kubepodinventory to send ContainerInventory data + # to get images and image tags for containers in windows nodes + if !nodeMetadata.nil? && !nodeMetadata["name"].nil? + @@WinNodeArray.push(nodeMetadata["name"]) end - return containers + nodeStatusAddresses = nodeStatus["addresses"] + if !nodeStatusAddresses.nil? + nodeStatusAddresses.each do |address| + winNode[address["type"]] = address["address"] + end + winNodes.push(winNode) + end + end end + end + end + return winNodes + rescue => error + @Log.warn("Error in get windows nodes: #{error}") + return nil + end + end - def getContainerLogs(namespace, pod, container, showTimeStamp) - containerLogs = "" - begin - kubesystemResourceUri = "namespaces/" + namespace + "/pods/" + pod + "/log" + "?container=" + container - if showTimeStamp - kubesystemResourceUri += "×tamps=true" - end - @Log.info("KubernetesApiClient::getContainerLogs : Getting logs from Kube API @ #{Time.now.utc.iso8601}") - containerLogs = getKubeResourceInfo(kubesystemResourceUri).body - @Log.info("KubernetesApiClient::getContainerLogs : Done getting logs from Kube API @ #{Time.now.utc.iso8601}") - rescue => error - @Log.warn("Pod logs request failed: #{error}") - end - return containerLogs + def getWindowsNodesArray + return @@WinNodeArray + end + + def getContainerIDs(namespace) + containers = Hash.new + begin + kubesystemResourceUri = "namespaces/" + namespace + "/pods" + @Log.info("KubernetesApiClient::getContainerIDs : Getting pods from Kube API @ #{Time.now.utc.iso8601}") + podInfo = JSON.parse(getKubeResourceInfo(kubesystemResourceUri).body) + @Log.info("KubernetesApiClient::getContainerIDs : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + podInfo["items"].each do |item| + if (!item["status"].nil? && !item["status"].empty? && !item["status"]["containerStatuses"].nil? && !item["status"]["containerStatuses"].empty?) + item["status"]["containerStatuses"].each do |cntr| + containers[cntr["containerID"]] = "kube-system" end + end + end + rescue => error + @Log.warn("KubernetesApiClient::getContainerIDs : List ContainerIDs request failed: #{error}") + end + return containers + end + + def getContainerLogs(namespace, pod, container, showTimeStamp) + containerLogs = "" + begin + kubesystemResourceUri = "namespaces/" + namespace + "/pods/" + pod + "/log" + "?container=" + container + if showTimeStamp + kubesystemResourceUri += "×tamps=true" + end + @Log.info("KubernetesApiClient::getContainerLogs : Getting logs from Kube API @ #{Time.now.utc.iso8601}") + containerLogs = getKubeResourceInfo(kubesystemResourceUri).body + @Log.info("KubernetesApiClient::getContainerLogs : Done getting logs from Kube API @ #{Time.now.utc.iso8601}") + rescue => error + @Log.warn("Pod logs request failed: #{error}") + end + return containerLogs + end + + def getContainerLogsSinceTime(namespace, pod, container, since, showTimeStamp) + containerLogs = "" + begin + kubesystemResourceUri = "namespaces/" + namespace + "/pods/" + pod + "/log" + "?container=" + container + "&sinceTime=" + since + kubesystemResourceUri = URI.escape(kubesystemResourceUri, ":.+") # HTML URL Encoding for date + + if showTimeStamp + kubesystemResourceUri += "×tamps=true" + end + @Log.info("calling #{kubesystemResourceUri}") + @Log.info("KubernetesApiClient::getContainerLogsSinceTime : Getting logs from Kube API @ #{Time.now.utc.iso8601}") + containerLogs = getKubeResourceInfo(kubesystemResourceUri).body + @Log.info("KubernetesApiClient::getContainerLogsSinceTime : Done getting logs from Kube API @ #{Time.now.utc.iso8601}") + rescue => error + @Log.warn("Pod logs request failed: #{error}") + end + return containerLogs + end - def getContainerLogsSinceTime(namespace, pod, container, since, showTimeStamp) - containerLogs = "" - begin - kubesystemResourceUri = "namespaces/" + namespace + "/pods/" + pod + "/log" + "?container=" + container + "&sinceTime=" + since - kubesystemResourceUri = URI.escape(kubesystemResourceUri, ":.+") # HTML URL Encoding for date - - if showTimeStamp - kubesystemResourceUri += "×tamps=true" - end - @Log.info("calling #{kubesystemResourceUri}") - @Log.info("KubernetesApiClient::getContainerLogsSinceTime : Getting logs from Kube API @ #{Time.now.utc.iso8601}") - containerLogs = getKubeResourceInfo(kubesystemResourceUri).body - @Log.info("KubernetesApiClient::getContainerLogsSinceTime : Done getting logs from Kube API @ #{Time.now.utc.iso8601}") - rescue => error - @Log.warn("Pod logs request failed: #{error}") + def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn) + metricItems = [] + begin + clusterId = getClusterId + metricInfo = metricJSON + metricInfo["items"].each do |pod| + podNameSpace = pod["metadata"]["namespace"] + if podNameSpace.eql?("kube-system") && !pod["metadata"].key?("ownerReferences") + # The above case seems to be the only case where you have horizontal scaling of pods + # but no controller, in which case cAdvisor picks up kubernetes.io/config.hash + # instead of the actual poduid. Since this uid is not being surface into the UX + # its ok to use this. + # Use kubernetes.io/config.hash to be able to correlate with cadvisor data + podUid = pod["metadata"]["annotations"]["kubernetes.io/config.hash"] + else + podUid = pod["metadata"]["uid"] + end + if (!pod["spec"]["containers"].nil? && !pod["spec"]["nodeName"].nil?) + nodeName = pod["spec"]["nodeName"] + pod["spec"]["containers"].each do |container| + containerName = container["name"] + metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + if (!container["resources"].nil? && !container["resources"].empty? && !container["resources"][metricCategory].nil? && !container["resources"][metricCategory][metricNameToCollect].nil?) + metricValue = getMetricNumericValue(metricNameToCollect, container["resources"][metricCategory][metricNameToCollect]) + + metricItem = {} + metricItem["DataItems"] = [] + + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = nodeName + metricProps["ObjectName"] = "K8SContainer" + metricProps["InstanceName"] = clusterId + "/" + podUid + "/" + containerName + + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + metricCollections["Value"] = metricValue + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + metricItems.push(metricItem) + #No container level limit for the given metric, so default to node level limit + else + nodeMetricsHashKey = clusterId + "/" + nodeName + "_" + "allocatable" + "_" + metricNameToCollect + if (metricCategory == "limits" && @@NodeMetrics.has_key?(nodeMetricsHashKey)) + metricValue = @@NodeMetrics[nodeMetricsHashKey] + #@Log.info("Limits not set for container #{clusterId + "/" + podUid + "/" + containerName} using node level limits: #{nodeMetricsHashKey}=#{metricValue} ") + metricItem = {} + metricItem["DataItems"] = [] + + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = nodeName + metricProps["ObjectName"] = "K8SContainer" + metricProps["InstanceName"] = clusterId + "/" + podUid + "/" + containerName + + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + metricCollections["Value"] = metricValue + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + metricItems.push(metricItem) end - return containerLogs + end end + end + end + rescue => error + @Log.warn("getcontainerResourceRequestsAndLimits failed: #{error} for metric #{metricCategory} #{metricNameToCollect}") + return metricItems + end + return metricItems + end #getContainerResourceRequestAndLimits - def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn) - metricItems = [] - begin - clusterId = getClusterId - metricInfo = metricJSON - metricInfo['items'].each do |pod| - podNameSpace = pod['metadata']['namespace'] - if podNameSpace.eql?("kube-system") && !pod['metadata'].key?("ownerReferences") - # The above case seems to be the only case where you have horizontal scaling of pods - # but no controller, in which case cAdvisor picks up kubernetes.io/config.hash - # instead of the actual poduid. Since this uid is not being surface into the UX - # its ok to use this. - # Use kubernetes.io/config.hash to be able to correlate with cadvisor data - podUid = pod['metadata']['annotations']['kubernetes.io/config.hash'] - else - podUid = pod['metadata']['uid'] - end - if (!pod['spec']['containers'].nil? && !pod['spec']['nodeName'].nil?) - nodeName = pod['spec']['nodeName'] - pod['spec']['containers'].each do |container| - containerName = container['name'] - metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z - if (!container['resources'].nil? && !container['resources'].empty? && !container['resources'][metricCategory].nil? && !container['resources'][metricCategory][metricNameToCollect].nil?) - metricValue = getMetricNumericValue(metricNameToCollect, container['resources'][metricCategory][metricNameToCollect]) - - metricItem = {} - metricItem['DataItems'] = [] - - metricProps = {} - metricProps['Timestamp'] = metricTime - metricProps['Host'] = nodeName - metricProps['ObjectName'] = "K8SContainer" - metricProps['InstanceName'] = clusterId + "/" + podUid + "/" + containerName - - metricProps['Collections'] = [] - metricCollections = {} - metricCollections['CounterName'] = metricNametoReturn - metricCollections['Value'] = metricValue - - metricProps['Collections'].push(metricCollections) - metricItem['DataItems'].push(metricProps) - metricItems.push(metricItem) - #No container level limit for the given metric, so default to node level limit - else - nodeMetricsHashKey = clusterId + "/" + nodeName + "_" + "allocatable" + "_" + metricNameToCollect - if (metricCategory == "limits" && @@NodeMetrics.has_key?(nodeMetricsHashKey)) - - metricValue = @@NodeMetrics[nodeMetricsHashKey] - #@Log.info("Limits not set for container #{clusterId + "/" + podUid + "/" + containerName} using node level limits: #{nodeMetricsHashKey}=#{metricValue} ") - metricItem = {} - metricItem['DataItems'] = [] - - metricProps = {} - metricProps['Timestamp'] = metricTime - metricProps['Host'] = nodeName - metricProps['ObjectName'] = "K8SContainer" - metricProps['InstanceName'] = clusterId + "/" + podUid + "/" + containerName - - metricProps['Collections'] = [] - metricCollections = {} - metricCollections['CounterName'] = metricNametoReturn - metricCollections['Value'] = metricValue - - metricProps['Collections'].push(metricCollections) - metricItem['DataItems'].push(metricProps) - metricItems.push(metricItem) - end - end - end - end - end - rescue => error - @Log.warn("getcontainerResourceRequestsAndLimits failed: #{error} for metric #{metricCategory} #{metricNameToCollect}") - return metricItems - end - return metricItems - end #getContainerResourceRequestAndLimits - - def parseNodeLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn) - metricItems = [] - begin - metricInfo = metricJSON - clusterId = getClusterId - #Since we are getting all node data at the same time and kubernetes doesnt specify a timestamp for the capacity and allocation metrics, - #if we are coming up with the time it should be same for all nodes - metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z - metricInfo['items'].each do |node| - if (!node['status'][metricCategory].nil?) - - # metricCategory can be "capacity" or "allocatable" and metricNameToCollect can be "cpu" or "memory" - metricValue = getMetricNumericValue(metricNameToCollect, node['status'][metricCategory][metricNameToCollect]) - - metricItem = {} - metricItem['DataItems'] = [] - metricProps = {} - metricProps['Timestamp'] = metricTime - metricProps['Host'] = node['metadata']['name'] - metricProps['ObjectName'] = "K8SNode" - metricProps['InstanceName'] = clusterId + "/" + node['metadata']['name'] - metricProps['Collections'] = [] - metricCollections = {} - metricCollections['CounterName'] = metricNametoReturn - metricCollections['Value'] = metricValue - - metricProps['Collections'].push(metricCollections) - metricItem['DataItems'].push(metricProps) - metricItems.push(metricItem) - #push node level metrics to a inmem hash so that we can use it looking up at container level. - #Currently if container level cpu & memory limits are not defined we default to node level limits - @@NodeMetrics[clusterId + "/" + node['metadata']['name'] + "_" + metricCategory + "_" + metricNameToCollect] = metricValue - #@Log.info ("Node metric hash: #{@@NodeMetrics}") - end - end - rescue => error - @Log.warn("parseNodeLimits failed: #{error} for metric #{metricCategory} #{metricNameToCollect}") - end - return metricItems - end #parseNodeLimits - - def getMetricNumericValue(metricName, metricVal) - metricValue = metricVal - begin - case metricName - when "memory" #convert to bytes for memory - #https://kubernetes.io/docs/tasks/configure-pod-container/assign-memory-resource/ - if (metricValue.end_with?("Ki")) - metricValue.chomp!("Ki") - metricValue = Float(metricValue) * 1024.0 ** 1 - elsif (metricValue.end_with?("Mi")) - metricValue.chomp!("Mi") - metricValue = Float(metricValue) * 1024.0 ** 2 - elsif (metricValue.end_with?("Gi")) - metricValue.chomp!("Gi") - metricValue = Float(metricValue) * 1024.0 ** 3 - elsif (metricValue.end_with?("Ti")) - metricValue.chomp!("Ti") - metricValue = Float(metricValue) * 1024.0 ** 4 - elsif (metricValue.end_with?("Pi")) - metricValue.chomp!("Pi") - metricValue = Float(metricValue) * 1024.0 ** 5 - elsif (metricValue.end_with?("Ei")) - metricValue.chomp!("Ei") - metricValue = Float(metricValue) * 1024.0 ** 6 - elsif (metricValue.end_with?("Zi")) - metricValue.chomp!("Zi") - metricValue = Float(metricValue) * 1024.0 ** 7 - elsif (metricValue.end_with?("Yi")) - metricValue.chomp!("Yi") - metricValue = Float(metricValue) * 1024.0 ** 8 - elsif (metricValue.end_with?("K")) - metricValue.chomp!("K") - metricValue = Float(metricValue) * 1000.0 ** 1 - elsif (metricValue.end_with?("M")) - metricValue.chomp!("M") - metricValue = Float(metricValue) * 1000.0 ** 2 - elsif (metricValue.end_with?("G")) - metricValue.chomp!("G") - metricValue = Float(metricValue) * 1000.0 ** 3 - elsif (metricValue.end_with?("T")) - metricValue.chomp!("T") - metricValue = Float(metricValue) * 1000.0 ** 4 - elsif (metricValue.end_with?("P")) - metricValue.chomp!("P") - metricValue = Float(metricValue) * 1000.0 ** 5 - elsif (metricValue.end_with?("E")) - metricValue.chomp!("E") - metricValue = Float(metricValue) * 1000.0 ** 6 - elsif (metricValue.end_with?("Z")) - metricValue.chomp!("Z") - metricValue = Float(metricValue) * 1000.0 ** 7 - elsif (metricValue.end_with?("Y")) - metricValue.chomp!("Y") - metricValue = Float(metricValue) * 1000.0 ** 8 - else #assuming there are no units specified, it is bytes (the below conversion will fail for other unsupported 'units') - metricValue = Float(metricValue) - end - when "cpu" #convert to nanocores for cpu - #https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/ - if (metricValue.end_with?("m")) - metricValue.chomp!("m") - metricValue = Float(metricValue) * 1000.0 ** 2 - else #assuming no units specified, it is cores that we are converting to nanocores (the below conversion will fail for other unsupported 'units') - metricValue = Float(metricValue) * 1000.0 ** 3 - end - else - @Log.warn("getMetricNumericValue: Unsupported metric #{metricName}. Returning 0 for metric value") - metricValue = 0 - end #case statement - rescue => error - @Log.warn("getMetricNumericValue failed: #{error} for metric #{metricName} with value #{metricVal}. Returning 0 formetric value") - return 0 - end - return metricValue - end # getMetricNumericValue + def parseNodeLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn) + metricItems = [] + begin + metricInfo = metricJSON + clusterId = getClusterId + #Since we are getting all node data at the same time and kubernetes doesnt specify a timestamp for the capacity and allocation metrics, + #if we are coming up with the time it should be same for all nodes + metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + metricInfo["items"].each do |node| + if (!node["status"][metricCategory].nil?) + + # metricCategory can be "capacity" or "allocatable" and metricNameToCollect can be "cpu" or "memory" + metricValue = getMetricNumericValue(metricNameToCollect, node["status"][metricCategory][metricNameToCollect]) + + metricItem = {} + metricItem["DataItems"] = [] + metricProps = {} + metricProps["Timestamp"] = metricTime + metricProps["Host"] = node["metadata"]["name"] + metricProps["ObjectName"] = "K8SNode" + metricProps["InstanceName"] = clusterId + "/" + node["metadata"]["name"] + metricProps["Collections"] = [] + metricCollections = {} + metricCollections["CounterName"] = metricNametoReturn + metricCollections["Value"] = metricValue + + metricProps["Collections"].push(metricCollections) + metricItem["DataItems"].push(metricProps) + metricItems.push(metricItem) + #push node level metrics to a inmem hash so that we can use it looking up at container level. + #Currently if container level cpu & memory limits are not defined we default to node level limits + @@NodeMetrics[clusterId + "/" + node["metadata"]["name"] + "_" + metricCategory + "_" + metricNameToCollect] = metricValue + #@Log.info ("Node metric hash: #{@@NodeMetrics}") + end end - end + rescue => error + @Log.warn("parseNodeLimits failed: #{error} for metric #{metricCategory} #{metricNameToCollect}") + end + return metricItems + end #parseNodeLimits + def getMetricNumericValue(metricName, metricVal) + metricValue = metricVal + begin + case metricName + when "memory" #convert to bytes for memory + #https://kubernetes.io/docs/tasks/configure-pod-container/assign-memory-resource/ + if (metricValue.end_with?("Ki")) + metricValue.chomp!("Ki") + metricValue = Float(metricValue) * 1024.0 ** 1 + elsif (metricValue.end_with?("Mi")) + metricValue.chomp!("Mi") + metricValue = Float(metricValue) * 1024.0 ** 2 + elsif (metricValue.end_with?("Gi")) + metricValue.chomp!("Gi") + metricValue = Float(metricValue) * 1024.0 ** 3 + elsif (metricValue.end_with?("Ti")) + metricValue.chomp!("Ti") + metricValue = Float(metricValue) * 1024.0 ** 4 + elsif (metricValue.end_with?("Pi")) + metricValue.chomp!("Pi") + metricValue = Float(metricValue) * 1024.0 ** 5 + elsif (metricValue.end_with?("Ei")) + metricValue.chomp!("Ei") + metricValue = Float(metricValue) * 1024.0 ** 6 + elsif (metricValue.end_with?("Zi")) + metricValue.chomp!("Zi") + metricValue = Float(metricValue) * 1024.0 ** 7 + elsif (metricValue.end_with?("Yi")) + metricValue.chomp!("Yi") + metricValue = Float(metricValue) * 1024.0 ** 8 + elsif (metricValue.end_with?("K")) + metricValue.chomp!("K") + metricValue = Float(metricValue) * 1000.0 ** 1 + elsif (metricValue.end_with?("M")) + metricValue.chomp!("M") + metricValue = Float(metricValue) * 1000.0 ** 2 + elsif (metricValue.end_with?("G")) + metricValue.chomp!("G") + metricValue = Float(metricValue) * 1000.0 ** 3 + elsif (metricValue.end_with?("T")) + metricValue.chomp!("T") + metricValue = Float(metricValue) * 1000.0 ** 4 + elsif (metricValue.end_with?("P")) + metricValue.chomp!("P") + metricValue = Float(metricValue) * 1000.0 ** 5 + elsif (metricValue.end_with?("E")) + metricValue.chomp!("E") + metricValue = Float(metricValue) * 1000.0 ** 6 + elsif (metricValue.end_with?("Z")) + metricValue.chomp!("Z") + metricValue = Float(metricValue) * 1000.0 ** 7 + elsif (metricValue.end_with?("Y")) + metricValue.chomp!("Y") + metricValue = Float(metricValue) * 1000.0 ** 8 + else #assuming there are no units specified, it is bytes (the below conversion will fail for other unsupported 'units') + metricValue = Float(metricValue) + end + when "cpu" #convert to nanocores for cpu + #https://kubernetes.io/docs/tasks/configure-pod-container/assign-cpu-resource/ + if (metricValue.end_with?("m")) + metricValue.chomp!("m") + metricValue = Float(metricValue) * 1000.0 ** 2 + else #assuming no units specified, it is cores that we are converting to nanocores (the below conversion will fail for other unsupported 'units') + metricValue = Float(metricValue) * 1000.0 ** 3 + end + else + @Log.warn("getMetricNumericValue: Unsupported metric #{metricName}. Returning 0 for metric value") + metricValue = 0 + end #case statement + rescue => error + @Log.warn("getMetricNumericValue failed: #{error} for metric #{metricName} with value #{metricVal}. Returning 0 formetric value") + return 0 + end + return metricValue + end # getMetricNumericValue + end +end diff --git a/source/code/plugin/in_cadvisor_perf.rb b/source/code/plugin/in_cadvisor_perf.rb index a857aa6b9..f5f65f01b 100644 --- a/source/code/plugin/in_cadvisor_perf.rb +++ b/source/code/plugin/in_cadvisor_perf.rb @@ -2,90 +2,88 @@ # frozen_string_literal: true module Fluent - - class CAdvisor_Perf_Input < Input - Plugin.register_input('cadvisorperf', self) - - def initialize - super - require 'yaml' - require 'json' - - require_relative 'CAdvisorMetricsAPIClient' - require_relative 'oms_common' - require_relative 'omslog' - end - - config_param :run_interval, :time, :default => '1m' - config_param :tag, :string, :default => "oms.api.cadvisorperf" - config_param :mdmtag, :string, :default => "mdm.cadvisorperf" - - def configure (conf) - super + class CAdvisor_Perf_Input < Input + Plugin.register_input("cadvisorperf", self) + + def initialize + super + require "yaml" + require "json" + + require_relative "CAdvisorMetricsAPIClient" + require_relative "oms_common" + require_relative "omslog" + end + + config_param :run_interval, :time, :default => "1m" + config_param :tag, :string, :default => "oms.api.cadvisorperf" + config_param :mdmtag, :string, :default => "mdm.cadvisorperf" + + def configure(conf) + super + end + + def start + if @run_interval + @finished = false + @condition = ConditionVariable.new + @mutex = Mutex.new + @thread = Thread.new(&method(:run_periodic)) end - - def start - if @run_interval - @finished = false - @condition = ConditionVariable.new - @mutex = Mutex.new - @thread = Thread.new(&method(:run_periodic)) - end + end + + def shutdown + if @run_interval + @mutex.synchronize { + @finished = true + @condition.signal + } + @thread.join end - - def shutdown - if @run_interval - @mutex.synchronize { - @finished = true - @condition.signal - } - @thread.join + end + + def enumerate() + time = Time.now.to_f + begin + eventStream = MultiEventStream.new + metricData = CAdvisorMetricsAPIClient.getMetrics() + metricData.each do |record| + record["DataType"] = "LINUX_PERF_BLOB" + record["IPName"] = "LogManagement" + eventStream.add(time, record) if record + #router.emit(@tag, time, record) if record end - end - - def enumerate() - time = Time.now.to_f - begin - eventStream = MultiEventStream.new - metricData = CAdvisorMetricsAPIClient.getMetrics() - metricData.each do |record| - record['DataType'] = "LINUX_PERF_BLOB" - record['IPName'] = "LogManagement" - eventStream.add(time, record) if record - #router.emit(@tag, time, record) if record - end - - router.emit_stream(@tag, eventStream) if eventStream - router.emit_stream(@mdmtag, eventStream) if eventStream - @@istestvar = ENV['ISTEST'] - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) - $log.info("cAdvisorPerfEmitStreamSuccess @ #{Time.now.utc.iso8601}") - end - rescue => errorStr - $log.warn "Failed to retrieve cadvisor metric data: #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) + + router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@mdmtag, eventStream) if eventStream + @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) + $log.info("cAdvisorPerfEmitStreamSuccess @ #{Time.now.utc.iso8601}") end + rescue => errorStr + $log.warn "Failed to retrieve cadvisor metric data: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) end - - def run_periodic - @mutex.lock + end + + def run_periodic + @mutex.lock + done = @finished + until done + @condition.wait(@mutex, @run_interval) done = @finished - until done - @condition.wait(@mutex, @run_interval) - done = @finished - @mutex.unlock - if !done - begin - $log.info("in_cadvisor_perf::run_periodic @ #{Time.now.utc.iso8601}") - enumerate - rescue => errorStr - $log.warn "in_cadvisor_perf::run_periodic: enumerate Failed to retrieve cadvisor perf metrics: #{errorStr}" - end + @mutex.unlock + if !done + begin + $log.info("in_cadvisor_perf::run_periodic @ #{Time.now.utc.iso8601}") + enumerate + rescue => errorStr + $log.warn "in_cadvisor_perf::run_periodic: enumerate Failed to retrieve cadvisor perf metrics: #{errorStr}" end - @mutex.lock end - @mutex.unlock + @mutex.lock end - end # CAdvisor_Perf_Input + @mutex.unlock + end + end # CAdvisor_Perf_Input end # module - diff --git a/source/code/plugin/in_containerinventory.rb b/source/code/plugin/in_containerinventory.rb index f501421a2..4d83278a9 100644 --- a/source/code/plugin/in_containerinventory.rb +++ b/source/code/plugin/in_containerinventory.rb @@ -2,29 +2,28 @@ # frozen_string_literal: true module Fluent - class Container_Inventory_Input < Input - Plugin.register_input('containerinventory', self) + Plugin.register_input("containerinventory", self) - @@PluginName = 'ContainerInventory' - @@RunningState = 'Running' - @@FailedState = 'Failed' - @@StoppedState = 'Stopped' - @@PausedState = 'Paused' + @@PluginName = "ContainerInventory" + @@RunningState = "Running" + @@FailedState = "Failed" + @@StoppedState = "Stopped" + @@PausedState = "Paused" def initialize super - require 'json' - require_relative 'DockerApiClient' - require_relative 'ContainerInventoryState' - require_relative 'ApplicationInsightsUtility' - require_relative 'omslog' + require "json" + require_relative "DockerApiClient" + require_relative "ContainerInventoryState" + require_relative "ApplicationInsightsUtility" + require_relative "omslog" end - config_param :run_interval, :time, :default => '1m' + config_param :run_interval, :time, :default => "1m" config_param :tag, :string, :default => "oms.containerinsights.containerinventory" - - def configure (conf) + + def configure(conf) super end @@ -50,16 +49,16 @@ def shutdown def obtainContainerConfig(instance, container) begin - configValue = container['Config'] + configValue = container["Config"] if !configValue.nil? - instance['ContainerHostname'] = configValue['Hostname'] + instance["ContainerHostname"] = configValue["Hostname"] - envValue = configValue['Env'] + envValue = configValue["Env"] envValueString = (envValue.nil?) ? "" : envValue.to_s # Skip environment variable processing if it contains the flag AZMON_COLLECT_ENV=FALSE if /AZMON_COLLECT_ENV=FALSE/i.match(envValueString) envValueString = ["AZMON_COLLECT_ENV=FALSE"] - $log.warn("Environment Variable collection for container: #{container['Id']} skipped because AZMON_COLLECT_ENV is set to false") + $log.warn("Environment Variable collection for container: #{container["Id"]} skipped because AZMON_COLLECT_ENV is set to false") end # Restricting the ENV string value to 200kb since the size of this string can go very high if envValueString.length > 200000 @@ -68,88 +67,88 @@ def obtainContainerConfig(instance, container) if !lastIndex.nil? envValueStringTruncated = envValueStringTruncated.slice(0..lastIndex) + "]" end - instance['EnvironmentVar'] = envValueStringTruncated + instance["EnvironmentVar"] = envValueStringTruncated else - instance['EnvironmentVar'] = envValueString + instance["EnvironmentVar"] = envValueString end - cmdValue = configValue['Cmd'] + cmdValue = configValue["Cmd"] cmdValueString = (cmdValue.nil?) ? "" : cmdValue.to_s - instance['Command'] = cmdValueString + instance["Command"] = cmdValueString - instance['ComposeGroup'] = "" - labelsValue = configValue['Labels'] + instance["ComposeGroup"] = "" + labelsValue = configValue["Labels"] if !labelsValue.nil? && !labelsValue.empty? - instance['ComposeGroup'] = labelsValue['com.docker.compose.project'] + instance["ComposeGroup"] = labelsValue["com.docker.compose.project"] end else - $log.warn("Attempt in ObtainContainerConfig to get container: #{container['Id']} config information returned null") - end - rescue => errorStr - $log.warn("Exception in obtainContainerConfig: #{errorStr}") + $log.warn("Attempt in ObtainContainerConfig to get container: #{container["Id"]} config information returned null") end + rescue => errorStr + $log.warn("Exception in obtainContainerConfig: #{errorStr}") + end end def obtainContainerState(instance, container) begin - stateValue = container['State'] + stateValue = container["State"] if !stateValue.nil? - exitCodeValue = stateValue['ExitCode'] + exitCodeValue = stateValue["ExitCode"] # Exit codes less than 0 are not supported by the engine if exitCodeValue < 0 - exitCodeValue = 128 - $log.info("obtainContainerState::Container: #{container['Id']} returned negative exit code") + exitCodeValue = 128 + $log.info("obtainContainerState::Container: #{container["Id"]} returned negative exit code") end - instance['ExitCode'] = exitCodeValue + instance["ExitCode"] = exitCodeValue if exitCodeValue > 0 - instance['State'] = @@FailedState + instance["State"] = @@FailedState else # Set the Container status : Running/Paused/Stopped - runningValue = stateValue['Running'] + runningValue = stateValue["Running"] if runningValue - pausedValue = stateValue['Paused'] + pausedValue = stateValue["Paused"] # Checking for paused within running is true state because docker returns true for both Running and Paused fields when the container is paused if pausedValue - instance['State'] = @@PausedState + instance["State"] = @@PausedState else - instance['State'] = @@RunningState + instance["State"] = @@RunningState end else - instance['State'] = @@StoppedState + instance["State"] = @@StoppedState end end - instance['StartedTime'] = stateValue['StartedAt'] - instance['FinishedTime'] = stateValue['FinishedAt'] + instance["StartedTime"] = stateValue["StartedAt"] + instance["FinishedTime"] = stateValue["FinishedAt"] else - $log.info("Attempt in ObtainContainerState to get container: #{container['Id']} state information returned null") + $log.info("Attempt in ObtainContainerState to get container: #{container["Id"]} state information returned null") end - rescue => errorStr - $log.warn("Exception in obtainContainerState: #{errorStr}") + rescue => errorStr + $log.warn("Exception in obtainContainerState: #{errorStr}") end end def obtainContainerHostConfig(instance, container) begin - hostConfig = container['HostConfig'] + hostConfig = container["HostConfig"] if !hostConfig.nil? - links = hostConfig['Links'] - instance['Links'] = "" + links = hostConfig["Links"] + instance["Links"] = "" if !links.nil? linksString = links.to_s - instance['Links'] = (linksString == "null")? "" : linksString + instance["Links"] = (linksString == "null") ? "" : linksString end - portBindings = hostConfig['PortBindings'] - instance['Ports'] = "" + portBindings = hostConfig["PortBindings"] + instance["Ports"] = "" if !portBindings.nil? portBindingsString = portBindings.to_s - instance['Ports'] = (portBindingsString == "null")? "" : portBindingsString + instance["Ports"] = (portBindingsString == "null") ? "" : portBindingsString end else - $log.info("Attempt in ObtainContainerHostConfig to get container: #{container['Id']} host config information returned null") - end - rescue => errorStr - $log.warn("Exception in obtainContainerHostConfig: #{errorStr}") + $log.info("Attempt in ObtainContainerHostConfig to get container: #{container["Id"]} host config information returned null") end + rescue => errorStr + $log.warn("Exception in obtainContainerHostConfig: #{errorStr}") + end end def inspectContainer(id, nameMap) @@ -157,29 +156,29 @@ def inspectContainer(id, nameMap) begin container = DockerApiClient.dockerInspectContainer(id) if !container.nil? && !container.empty? - containerInstance['InstanceID'] = container['Id'] - containerInstance['CreatedTime'] = container['Created'] - containerName = container['Name'] + containerInstance["InstanceID"] = container["Id"] + containerInstance["CreatedTime"] = container["Created"] + containerName = container["Name"] if !containerName.nil? && !containerName.empty? # Remove the leading / from the name if it exists (this is an API issue) - containerInstance['ElementName'] = (containerName[0] == '/') ? containerName[1..-1] : containerName + containerInstance["ElementName"] = (containerName[0] == "/") ? containerName[1..-1] : containerName end - imageValue = container['Image'] + imageValue = container["Image"] if !imageValue.nil? && !imageValue.empty? - containerInstance['ImageId'] = imageValue + containerInstance["ImageId"] = imageValue repoImageTagArray = nameMap[imageValue] if nameMap.has_key? imageValue - containerInstance['Repository'] = repoImageTagArray[0] - containerInstance['Image'] = repoImageTagArray[1] - containerInstance['ImageTag'] = repoImageTagArray[2] + containerInstance["Repository"] = repoImageTagArray[0] + containerInstance["Image"] = repoImageTagArray[1] + containerInstance["ImageTag"] = repoImageTagArray[2] end end - obtainContainerConfig(containerInstance, container); - obtainContainerState(containerInstance, container); - obtainContainerHostConfig(containerInstance, container); + obtainContainerConfig(containerInstance, container) + obtainContainerState(containerInstance, container) + obtainContainerHostConfig(containerInstance, container) end rescue => errorStr - $log.warn("Exception in inspectContainer: #{errorStr} for container: #{id}") + $log.warn("Exception in inspectContainer: #{errorStr} for container: #{id}") end return containerInstance end @@ -199,8 +198,8 @@ def enumerate containerIds.each do |containerId| inspectedContainer = {} inspectedContainer = inspectContainer(containerId, nameMap) - inspectedContainer['Computer'] = hostname - inspectedContainer['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated + inspectedContainer["Computer"] = hostname + inspectedContainer["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated containerInventory.push inspectedContainer ContainerInventoryState.writeContainerState(inspectedContainer) end @@ -210,8 +209,8 @@ def enumerate deletedContainers.each do |deletedContainer| container = ContainerInventoryState.readContainerState(deletedContainer) if !container.nil? - container.each{|k,v| container[k]=v} - container['State'] = "Deleted" + container.each { |k, v| container[k] = v } + container["State"] = "Deleted" containerInventory.push container end end @@ -219,28 +218,28 @@ def enumerate containerInventory.each do |record| wrapper = { - "DataType"=>"CONTAINER_INVENTORY_BLOB", - "IPName"=>"ContainerInsights", - "DataItems"=>[record.each{|k,v| record[k]=v}] + "DataType" => "CONTAINER_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], } eventStream.add(emitTime, wrapper) if wrapper end router.emit_stream(@tag, eventStream) if eventStream - @@istestvar = ENV['ISTEST'] - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) + @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) $log.info("containerInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") end - timeDifference = (DateTime.now.to_time.to_i - @@telemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference/60 - if (timeDifferenceInMinutes >= 5) - @@telemetryTimeTracker = DateTime.now.to_time.to_i - telemetryProperties = {} - telemetryProperties['Computer'] = hostname - telemetryProperties['ContainerCount'] = containerInventory.length - ApplicationInsightsUtility.sendTelemetry(@@PluginName, telemetryProperties) - end $log.info("in_container_inventory::enumerate : Processing complete - emitted stream @ #{Time.now.utc.iso8601}") end + timeDifference = (DateTime.now.to_time.to_i - @@telemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + @@telemetryTimeTracker = DateTime.now.to_time.to_i + telemetryProperties = {} + telemetryProperties["Computer"] = hostname + telemetryProperties["ContainerCount"] = containerInventory.length + ApplicationInsightsUtility.sendTelemetry(@@PluginName, telemetryProperties) + end rescue => errorStr $log.warn("Exception in enumerate container inventory: #{errorStr}") end @@ -265,7 +264,5 @@ def run_periodic end @mutex.unlock end - end # Container_Inventory_Input - -end # module \ No newline at end of file +end # module diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index ba1dacbe0..aabda441e 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -2,181 +2,176 @@ # frozen_string_literal: true module Fluent + class Kube_nodeInventory_Input < Input + Plugin.register_input("kubenodeinventory", self) - class Kube_nodeInventory_Input < Input - Plugin.register_input('kubenodeinventory', self) - - @@ContainerNodeInventoryTag = 'oms.containerinsights.ContainerNodeInventory' - @@MDMKubeNodeInventoryTag = 'mdm.kubenodeinventory' + @@ContainerNodeInventoryTag = "oms.containerinsights.ContainerNodeInventory" + @@MDMKubeNodeInventoryTag = "mdm.kubenodeinventory" - def initialize - super - require 'yaml' - require 'json' - - require_relative 'KubernetesApiClient' - require_relative 'ApplicationInsightsUtility' - require_relative 'oms_common' - require_relative 'omslog' + def initialize + super + require "yaml" + require "json" + require_relative "KubernetesApiClient" + require_relative "ApplicationInsightsUtility" + require_relative "oms_common" + require_relative "omslog" + end + + config_param :run_interval, :time, :default => "1m" + config_param :tag, :string, :default => "oms.containerinsights.KubeNodeInventory" + + def configure(conf) + super + end + + def start + if @run_interval + @finished = false + @condition = ConditionVariable.new + @mutex = Mutex.new + @thread = Thread.new(&method(:run_periodic)) + @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i end - - config_param :run_interval, :time, :default => '1m' - config_param :tag, :string, :default => "oms.containerinsights.KubeNodeInventory" - - def configure (conf) - super - end - - def start - if @run_interval - @finished = false - @condition = ConditionVariable.new - @mutex = Mutex.new - @thread = Thread.new(&method(:run_periodic)) - @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i - end - end - - def shutdown - if @run_interval - @mutex.synchronize { - @finished = true - @condition.signal - } - @thread.join - end - end - - def enumerate - currentTime = Time.now - emitTime = currentTime.to_f - batchTime = currentTime.utc.iso8601 - telemetrySent = false - $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body) - $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") - begin - if(!nodeInventory.empty?) - eventStream = MultiEventStream.new - containerNodeInventoryEventStream = MultiEventStream.new - #get node inventory - nodeInventory['items'].each do |items| - record = {} - # Sending records for ContainerNodeInventory - containerNodeInventoryRecord = {} - containerNodeInventoryRecord['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated - containerNodeInventoryRecord['Computer'] = items['metadata']['name'] + end - record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated - record['Computer'] = items['metadata']['name'] - record['ClusterName'] = KubernetesApiClient.getClusterName - record['ClusterId'] = KubernetesApiClient.getClusterId - record['CreationTimeStamp'] = items['metadata']['creationTimestamp'] - record['Labels'] = [items['metadata']['labels']] - record['Status'] = "" + def shutdown + if @run_interval + @mutex.synchronize { + @finished = true + @condition.signal + } + @thread.join + end + end - # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. - # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we - # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" - # implying that the node is ready for hosting pods, however its out of disk. - - if items['status'].key?("conditions") && !items['status']['conditions'].empty? - allNodeConditions="" - items['status']['conditions'].each do |condition| - if condition['status'] == "True" - if !allNodeConditions.empty? - allNodeConditions = allNodeConditions + "," + condition['type'] - else - allNodeConditions = condition['type'] - end - end - #collect last transition to/from ready (no matter ready is true/false) - if condition['type'] == "Ready" && !condition['lastTransitionTime'].nil? - record['LastTransitionTimeReady'] = condition['lastTransitionTime'] - end - end - if !allNodeConditions.empty? - record['Status'] = allNodeConditions - end + def enumerate + currentTime = Time.now + emitTime = currentTime.to_f + batchTime = currentTime.utc.iso8601 + telemetrySent = false + $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes").body) + $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + begin + if (!nodeInventory.empty?) + eventStream = MultiEventStream.new + containerNodeInventoryEventStream = MultiEventStream.new + #get node inventory + nodeInventory["items"].each do |items| + record = {} + # Sending records for ContainerNodeInventory + containerNodeInventoryRecord = {} + containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + containerNodeInventoryRecord["Computer"] = items["metadata"]["name"] - end + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + record["Computer"] = items["metadata"]["name"] + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterId"] = KubernetesApiClient.getClusterId + record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"] + record["Labels"] = [items["metadata"]["labels"]] + record["Status"] = "" - nodeInfo = items['status']['nodeInfo'] - record['KubeletVersion'] = nodeInfo['kubeletVersion'] - record['KubeProxyVersion'] = nodeInfo['kubeProxyVersion'] - containerNodeInventoryRecord['OperatingSystem'] = nodeInfo['osImage'] - dockerVersion = nodeInfo['containerRuntimeVersion'] - dockerVersion.slice! "docker://" - containerNodeInventoryRecord['DockerVersion'] = dockerVersion - # ContainerNodeInventory data for docker version and operating system. - containerNodeInventoryWrapper = { - "DataType"=>"CONTAINER_NODE_INVENTORY_BLOB", - "IPName"=>"ContainerInsights", - "DataItems"=>[containerNodeInventoryRecord.each{|k,v| containerNodeInventoryRecord[k]=v}] - } - containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper + # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. + # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we + # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" + # implying that the node is ready for hosting pods, however its out of disk. - wrapper = { - "DataType"=>"KUBE_NODE_INVENTORY_BLOB", - "IPName"=>"ContainerInsights", - "DataItems"=>[record.each{|k,v| record[k]=v}] - } - eventStream.add(emitTime, wrapper) if wrapper - # Adding telemetry to send node telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference/60 - if (timeDifferenceInMinutes >= 5) - properties = {} - properties["Computer"] = record["Computer"] - properties["KubeletVersion"] = record["KubeletVersion"] - capacityInfo = items['status']['capacity'] - ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"] , properties) - ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"] , properties) - telemetrySent = true - end - end - router.emit_stream(@tag, eventStream) if eventStream - router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream - router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream - if telemetrySent == true - @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i + if items["status"].key?("conditions") && !items["status"]["conditions"].empty? + allNodeConditions = "" + items["status"]["conditions"].each do |condition| + if condition["status"] == "True" + if !allNodeConditions.empty? + allNodeConditions = allNodeConditions + "," + condition["type"] + else + allNodeConditions = condition["type"] + end end - @@istestvar = ENV['ISTEST'] - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) - $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") + #collect last transition to/from ready (no matter ready is true/false) + if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? + record["LastTransitionTimeReady"] = condition["lastTransitionTime"] end - end - rescue => errorStr - $log.warn "Failed to retrieve node inventory: #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - end - - def run_periodic - @mutex.lock - done = @finished - until done - @condition.wait(@mutex, @run_interval) - done = @finished - @mutex.unlock - if !done - begin - $log.info("in_kube_nodes::run_periodic @ #{Time.now.utc.iso8601}") - enumerate - rescue => errorStr - $log.warn "in_kube_nodes::run_periodic: enumerate Failed to retrieve node inventory: #{errorStr}" - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + if !allNodeConditions.empty? + record["Status"] = allNodeConditions + end end + + nodeInfo = items["status"]["nodeInfo"] + record["KubeletVersion"] = nodeInfo["kubeletVersion"] + record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] + containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] + dockerVersion = nodeInfo["containerRuntimeVersion"] + dockerVersion.slice! "docker://" + containerNodeInventoryRecord["DockerVersion"] = dockerVersion + # ContainerNodeInventory data for docker version and operating system. + containerNodeInventoryWrapper = { + "DataType" => "CONTAINER_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }], + } + containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper + + wrapper = { + "DataType" => "KUBE_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper + # Adding telemetry to send node telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + properties = {} + properties["Computer"] = record["Computer"] + properties["KubeletVersion"] = record["KubeletVersion"] + properties["OperatingSystem"] = nodeInfo["operatingSystem"] + properties["DockerVersion"] = dockerVersion + capacityInfo = items["status"]["capacity"] + ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties) + ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) + telemetrySent = true + end + end + router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream + router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream + if telemetrySent == true + @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i + end + @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) + $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") end - @mutex.lock end + rescue => errorStr + $log.warn "Failed to retrieve node inventory: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + end + + def run_periodic + @mutex.lock + done = @finished + until done + @condition.wait(@mutex, @run_interval) + done = @finished @mutex.unlock + if !done + begin + $log.info("in_kube_nodes::run_periodic @ #{Time.now.utc.iso8601}") + enumerate + rescue => errorStr + $log.warn "in_kube_nodes::run_periodic: enumerate Failed to retrieve node inventory: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + end + @mutex.lock end - - end # Kube_Node_Input - - end # module - - \ No newline at end of file + @mutex.unlock + end + end # Kube_Node_Input +end # module diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 3d026b05f..65573673c 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -2,29 +2,28 @@ # frozen_string_literal: true module Fluent - class Kube_PodInventory_Input < Input - Plugin.register_input('kubepodinventory', self) + Plugin.register_input("kubepodinventory", self) - @@MDMKubePodInventoryTag = 'mdm.kubepodinventory' + @@MDMKubePodInventoryTag = "mdm.kubepodinventory" + @@hostName = (OMS::Common.get_hostname) def initialize super - require 'yaml' - require 'json' - require 'set' - - require_relative 'KubernetesApiClient' - require_relative 'ApplicationInsightsUtility' - require_relative 'oms_common' - require_relative 'omslog' + require "yaml" + require "json" + require "set" + require_relative "KubernetesApiClient" + require_relative "ApplicationInsightsUtility" + require_relative "oms_common" + require_relative "omslog" end - config_param :run_interval, :time, :default => '1m' + config_param :run_interval, :time, :default => "1m" config_param :tag, :string, :default => "oms.containerinsights.KubePodInventory" - def configure (conf) + def configure(conf) super end @@ -48,29 +47,126 @@ def shutdown end end - def enumerate(podList = nil) - if podList.nil? - $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - podInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('pods').body) - $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + def enumerate(podList = nil) + if podList.nil? + $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") + podInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("pods").body) + $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + else + podInventory = podList + end + begin + if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) + #get pod inventory & services + $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") + serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) + $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") + parse_and_emit_records(podInventory, serviceList) else - podInventory = podList + $log.warn "Received empty podInventory" + end + rescue => errorStr + $log.warn "Failed in enumerate pod inventory: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + end + + def populateWindowsContainerInventoryRecord(container, record, containerEnvVariableHash, batchTime) + begin + containerInventoryRecord = {} + containerName = container["name"] + containerInventoryRecord["InstanceID"] = record["ContainerID"] + containerInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + containerInventoryRecord["Computer"] = record["Computer"] + containerInventoryRecord["ContainerHostname"] = record["Computer"] + containerInventoryRecord["ElementName"] = containerName + image = container["image"] + repoInfo = image.split("/") + if !repoInfo.nil? + containerInventoryRecord["Repository"] = repoInfo[0] + if !repoInfo[1].nil? + imageInfo = repoInfo[1].split(":") + if !imageInfo.nil? + containerInventoryRecord["Image"] = imageInfo[0] + containerInventoryRecord["ImageTag"] = imageInfo[1] + end + end + end + imageIdInfo = container["imageID"] + imageIdSplitInfo = imageIdInfo.split("@") + if !imageIdSplitInfo.nil? + containerInventoryRecord["ImageId"] = imageIdSplitInfo[1] + end + # Get container state + containerStatus = container["state"] + if containerStatus.keys[0] == "running" + containerInventoryRecord["State"] = "Running" + containerInventoryRecord["StartedTime"] = container["state"]["running"]["startedAt"] + elsif containerStatus.keys[0] == "terminated" + containerExitCode = container["state"]["terminated"]["exitCode"] + containerStartTime = container["state"]["terminated"]["startedAt"] + containerFinishTime = container["state"]["terminated"]["finishedAt"] + if containerExitCode < 0 + # Exit codes less than 0 are not supported by the engine + containerExitCode = 128 + end + if containerExitCode > 0 + containerInventoryRecord["State"] = "Failed" + else + containerInventoryRecord["State"] = "Stopped" + end + containerInventoryRecord["ExitCode"] = containerExitCode + containerInventoryRecord["StartedTime"] = containerStartTime + containerInventoryRecord["FinishedTime"] = containerFinishTime + elsif containerStatus.keys[0] == "waiting" + containerInventoryRecord["State"] = "Waiting" + end + if !containerEnvVariableHash.nil? && !containerEnvVariableHash.empty? + containerInventoryRecord["EnvironmentVar"] = containerEnvVariableHash[containerName] end - begin - if(!podInventory.empty? && podInventory.key?("items") && !podInventory['items'].empty?) - #get pod inventory & services - $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") - serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo('services').body) - $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") - parse_and_emit_records(podInventory, serviceList) - else - $log.warn "Received empty podInventory" - end - rescue => errorStr - $log.warn "Failed in enumerate pod inventory: #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end + return containerInventoryRecord + rescue => errorStr + $log.warn "Failed in populateWindowsContainerInventoryRecord: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + end + + def getContainerEnvironmentVariables(pod) + begin + podSpec = pod["spec"] + containerEnvHash = {} + if !podSpec.nil? && !podSpec["containers"].nil? + podSpec["containers"].each do |container| + envVarsArray = [] + containerEnvArray = container["env"] + # Parsing the environment variable array of hashes to a string value + # since that is format being sent by container inventory workflow in daemonset + # Keeping it in the same format because the workflow expects it in this format + # and the UX expects an array of string for environment variables + if !containerEnvArray.nil? && !containerEnvArray.empty? + containerEnvArray.each do |envVarHash| + envName = envVarHash["name"] + envValue = envVarHash["value"] + envArrayElement = envName + "=" + envValue + envVarsArray.push(envArrayElement) + end + end + # Skip environment variable processing if it contains the flag AZMON_COLLECT_ENV=FALSE + envValueString = envVarsArray.to_s + if /AZMON_COLLECT_ENV=FALSE/i.match(envValueString) + envValueString = ["AZMON_COLLECT_ENV=FALSE"] + end + containerEnvHash[container["name"]] = envValueString + end + end + return containerEnvHash + rescue => errorStr + $log.warn "Failed in getContainerEnvironmentVariables: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end end def parse_and_emit_records(podInventory, serviceList) @@ -80,100 +176,116 @@ def parse_and_emit_records(podInventory, serviceList) eventStream = MultiEventStream.new controllerSet = Set.new [] telemetryFlush = false + winContainerCount = 0 begin #begin block start - podInventory['items'].each do |items| #podInventory block start + # Getting windows nodes from kubeapi + winNodes = KubernetesApiClient.getWindowsNodesArray + + podInventory["items"].each do |items| #podInventory block start + sendWindowsContainerInventoryRecord = false + containerInventoryRecords = [] records = [] record = {} - record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated - record['Name'] = items['metadata']['name'] - podNameSpace = items['metadata']['namespace'] - - if podNameSpace.eql?("kube-system") && !items['metadata'].key?("ownerReferences") + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + record["Name"] = items["metadata"]["name"] + podNameSpace = items["metadata"]["namespace"] + + if podNameSpace.eql?("kube-system") && !items["metadata"].key?("ownerReferences") # The above case seems to be the only case where you have horizontal scaling of pods # but no controller, in which case cAdvisor picks up kubernetes.io/config.hash # instead of the actual poduid. Since this uid is not being surface into the UX # its ok to use this. # Use kubernetes.io/config.hash to be able to correlate with cadvisor data - podUid = items['metadata']['annotations']['kubernetes.io/config.hash'] + podUid = items["metadata"]["annotations"]["kubernetes.io/config.hash"] else - podUid = items['metadata']['uid'] + podUid = items["metadata"]["uid"] end - record['PodUid'] = podUid - record['PodLabel'] = [items['metadata']['labels']] - record['Namespace'] = podNameSpace - record['PodCreationTimeStamp'] = items['metadata']['creationTimestamp'] + record["PodUid"] = podUid + record["PodLabel"] = [items["metadata"]["labels"]] + record["Namespace"] = podNameSpace + record["PodCreationTimeStamp"] = items["metadata"]["creationTimestamp"] #for unscheduled (non-started) pods startTime does NOT exist - if !items['status']['startTime'].nil? - record['PodStartTime'] = items['status']['startTime'] + if !items["status"]["startTime"].nil? + record["PodStartTime"] = items["status"]["startTime"] else - record['PodStartTime'] = "" + record["PodStartTime"] = "" end #podStatus # the below is for accounting 'NodeLost' scenario, where-in the pod(s) in the lost node is still being reported as running podReadyCondition = true - if !items['status']['reason'].nil? && items['status']['reason'] == "NodeLost" && !items['status']['conditions'].nil? - items['status']['conditions'].each do |condition| - if condition['type'] == "Ready" && condition['status'] == "False" + if !items["status"]["reason"].nil? && items["status"]["reason"] == "NodeLost" && !items["status"]["conditions"].nil? + items["status"]["conditions"].each do |condition| + if condition["type"] == "Ready" && condition["status"] == "False" podReadyCondition = false break end end end if podReadyCondition == false - record['PodStatus'] = "Unknown" + record["PodStatus"] = "Unknown" else - record['PodStatus'] = items['status']['phase'] + record["PodStatus"] = items["status"]["phase"] end #for unscheduled (non-started) pods podIP does NOT exist - if !items['status']['podIP'].nil? - record['PodIp'] =items['status']['podIP'] + if !items["status"]["podIP"].nil? + record["PodIp"] = items["status"]["podIP"] else - record['PodIp'] = "" + record["PodIp"] = "" end #for unscheduled (non-started) pods nodeName does NOT exist - if !items['spec']['nodeName'].nil? - record['Computer'] = items['spec']['nodeName'] + if !items["spec"]["nodeName"].nil? + record["Computer"] = items["spec"]["nodeName"] else - record['Computer'] = "" - end - record['ClusterId'] = KubernetesApiClient.getClusterId - record['ClusterName'] = KubernetesApiClient.getClusterName - record['ServiceName'] = getServiceNameFromLabels(items['metadata']['namespace'], items['metadata']['labels'], serviceList) - # Adding telemetry to send pod telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference/60 - if (timeDifferenceInMinutes >= 5) - telemetryFlush = true - end - if !items['metadata']['ownerReferences'].nil? - record['ControllerKind'] = items['metadata']['ownerReferences'][0]['kind'] - record['ControllerName'] = items['metadata']['ownerReferences'][0]['name'] + record["Computer"] = "" + end + + # Setting this flag to true so that we can send ContainerInventory records for containers + # on windows nodes and parse environment variables for these containers + if winNodes.length > 0 + if (!record["Computer"].empty? && (winNodes.include? record["Computer"])) + sendWindowsContainerInventoryRecord = true + containerEnvVariableHash = getContainerEnvironmentVariables(items) + end + end + + record["ClusterId"] = KubernetesApiClient.getClusterId + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ServiceName"] = getServiceNameFromLabels(items["metadata"]["namespace"], items["metadata"]["labels"], serviceList) + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end + if !items["metadata"]["ownerReferences"].nil? + record["ControllerKind"] = items["metadata"]["ownerReferences"][0]["kind"] + record["ControllerName"] = items["metadata"]["ownerReferences"][0]["name"] if telemetryFlush == true - controllerSet.add(record['ControllerKind'] + record['ControllerName']) + controllerSet.add(record["ControllerKind"] + record["ControllerName"]) end end podRestartCount = 0 - record['PodRestartCount'] = 0 - if items['status'].key?("containerStatuses") && !items['status']['containerStatuses'].empty? #container status block start - items['status']['containerStatuses'].each do |container| - containerRestartCount = 0 - #container Id is of the form - #docker://dfd9da983f1fd27432fb2c1fe3049c0a1d25b1c697b2dc1a530c986e58b16527 - if !container['containerID'].nil? - record['ContainerID'] = container['containerID'].split("//")[1] - else + record["PodRestartCount"] = 0 + if items["status"].key?("containerStatuses") && !items["status"]["containerStatuses"].empty? #container status block start + items["status"]["containerStatuses"].each do |container| + containerRestartCount = 0 + #container Id is of the form + #docker://dfd9da983f1fd27432fb2c1fe3049c0a1d25b1c697b2dc1a530c986e58b16527 + if !container["containerID"].nil? + record["ContainerID"] = container["containerID"].split("//")[1] + else # for containers that have image issues (like invalid image/tag etc..) this will be empty. do not make it all 0 - record['ContainerID'] = "" + record["ContainerID"] = "" end - #keeping this as which is same as InstanceName in perf table - record['ContainerName'] = podUid + "/" +container['name'] - #Pod restart count is a sumtotal of restart counts of individual containers - #within the pod. The restart count of a container is maintained by kubernetes - #itself in the form of a container label. - containerRestartCount = container['restartCount'] - record['ContainerRestartCount'] = containerRestartCount - containerStatus = container['state'] - record['ContainerStatusReason'] = '' + #keeping this as which is same as InstanceName in perf table + record["ContainerName"] = podUid + "/" + container["name"] + #Pod restart count is a sumtotal of restart counts of individual containers + #within the pod. The restart count of a container is maintained by kubernetes + #itself in the form of a container label. + containerRestartCount = container["restartCount"] + record["ContainerRestartCount"] = containerRestartCount + containerStatus = container["state"] + record["ContainerStatusReason"] = "" # state is of the following form , so just picking up the first key name # "state": { # "waiting": { @@ -183,55 +295,80 @@ def parse_and_emit_records(podInventory, serviceList) # }, # the below is for accounting 'NodeLost' scenario, where-in the containers in the lost node/pod(s) is still being reported as running if podReadyCondition == false - record['ContainerStatus'] = "Unknown" + record["ContainerStatus"] = "Unknown" else - record['ContainerStatus'] = containerStatus.keys[0] + record["ContainerStatus"] = containerStatus.keys[0] end #TODO : Remove ContainerCreationTimeStamp from here since we are sending it as a metric #Picking up both container and node start time from cAdvisor to be consistent if containerStatus.keys[0] == "running" - record['ContainerCreationTimeStamp'] = container['state']['running']['startedAt'] + record["ContainerCreationTimeStamp"] = container["state"]["running"]["startedAt"] else - if !containerStatus[containerStatus.keys[0]]['reason'].nil? && !containerStatus[containerStatus.keys[0]]['reason'].empty? - record['ContainerStatusReason'] = containerStatus[containerStatus.keys[0]]['reason'] + if !containerStatus[containerStatus.keys[0]]["reason"].nil? && !containerStatus[containerStatus.keys[0]]["reason"].empty? + record["ContainerStatusReason"] = containerStatus[containerStatus.keys[0]]["reason"] end end - podRestartCount += containerRestartCount - records.push(record.dup) - end + podRestartCount += containerRestartCount + records.push(record.dup) + + #Generate ContainerInventory records for windows nodes so that we can get image and image tag in property panel + if sendWindowsContainerInventoryRecord == true + containerInventoryRecord = populateWindowsContainerInventoryRecord(container, record, containerEnvVariableHash, batchTime) + containerInventoryRecords.push(containerInventoryRecord) + end + end else # for unscheduled pods there are no status.containerStatuses, in this case we still want the pod - records.push(record) + records.push(record) end #container status block end records.each do |record| if !record.nil? - record['PodRestartCount'] = podRestartCount + record["PodRestartCount"] = podRestartCount wrapper = { - "DataType"=>"KUBE_POD_INVENTORY_BLOB", - "IPName"=>"ContainerInsights", - "DataItems"=>[record.each{|k,v| record[k]=v}] + "DataType" => "KUBE_POD_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], } eventStream.add(emitTime, wrapper) if wrapper - end - end + end + end + # Send container inventory records for containers on windows nodes + winContainerCount += containerInventoryRecords.length + containerInventoryRecords.each do |cirecord| + if !cirecord.nil? + ciwrapper = { + "DataType" => "CONTAINER_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [cirecord.each { |k, v| cirecord[k] = v }], + } + eventStream.add(emitTime, ciwrapper) if ciwrapper + end + end end #podInventory block end + router.emit_stream(@tag, eventStream) if eventStream router.emit_stream(@@MDMKubePodInventoryTag, eventStream) if eventStream if telemetryFlush == true - ApplicationInsightsUtility.sendHeartBeatEvent("KubePodInventory") - ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory['items'].length , {}) - ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length , {}) + telemetryProperties = {} + telemetryProperties["Computer"] = @@hostName + ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory["items"].length, {}) + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length, {}) + if winContainerCount > 0 + telemetryProperties["ClusterWideWindowsContainersCount"] = winContainerCount + ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + end @@podTelemetryTimeTracker = DateTime.now.to_time.to_i end - @@istestvar = ENV['ISTEST'] - if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) + @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) $log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") end - rescue => errorStr + rescue => errorStr $log.warn "Failed in parse_and_emit_record pod inventory: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end #begin block end - end + end #begin block end + end def run_periodic @mutex.lock @@ -257,37 +394,33 @@ def run_periodic def getServiceNameFromLabels(namespace, labels, serviceList) serviceName = "" begin - if !labels.nil? && !labels.empty? - if( !serviceList.nil? && !serviceList.empty? && serviceList.key?("items") && !serviceList['items'].empty?) - serviceList['items'].each do |item| + if !labels.nil? && !labels.empty? + if (!serviceList.nil? && !serviceList.empty? && serviceList.key?("items") && !serviceList["items"].empty?) + serviceList["items"].each do |item| found = 0 - if !item['spec'].nil? && !item['spec']['selector'].nil? && item['metadata']['namespace'] == namespace - selectorLabels = item['spec']['selector'] + if !item["spec"].nil? && !item["spec"]["selector"].nil? && item["metadata"]["namespace"] == namespace + selectorLabels = item["spec"]["selector"] if !selectorLabels.empty? - selectorLabels.each do |key,value| - if !(labels.select {|k,v| k==key && v==value}.length > 0) + selectorLabels.each do |key, value| + if !(labels.select { |k, v| k == key && v == value }.length > 0) break end found = found + 1 end - end + end if found == selectorLabels.length - return item['metadata']['name'] + return item["metadata"]["name"] end - end + end end - end + end end - rescue => errorStr + rescue => errorStr $log.warn "Failed to retrieve service name from labels: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end return serviceName end - end # Kube_Pod_Input - end # module - - diff --git a/source/code/plugin/in_win_cadvisor_perf.rb b/source/code/plugin/in_win_cadvisor_perf.rb new file mode 100644 index 000000000..2e5f839e6 --- /dev/null +++ b/source/code/plugin/in_win_cadvisor_perf.rb @@ -0,0 +1,120 @@ +#!/usr/local/bin/ruby +# frozen_string_literal: true + +module Fluent + class Win_CAdvisor_Perf_Input < Input + Plugin.register_input("wincadvisorperf", self) + + @@winNodes = [] + + def initialize + super + require "yaml" + require "json" + + require_relative "CAdvisorMetricsAPIClient" + require_relative "KubernetesApiClient" + require_relative "oms_common" + require_relative "omslog" + end + + config_param :run_interval, :time, :default => "1m" + config_param :tag, :string, :default => "oms.api.wincadvisorperf" + config_param :mdmtag, :string, :default => "mdm.cadvisorperf" + + def configure(conf) + super + end + + def start + if @run_interval + @finished = false + @condition = ConditionVariable.new + @mutex = Mutex.new + @thread = Thread.new(&method(:run_periodic)) + @@winNodeQueryTimeTracker = DateTime.now.to_time.to_i + @@cleanupRoutineTimeTracker = DateTime.now.to_time.to_i + end + end + + def shutdown + if @run_interval + @mutex.synchronize { + @finished = true + @condition.signal + } + @thread.join + end + end + + def enumerate() + time = Time.now.to_f + begin + eventStream = MultiEventStream.new + timeDifference = (DateTime.now.to_time.to_i - @@winNodeQueryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + + #Resetting this cache so that it is populated with the current set of containers with every call + CAdvisorMetricsAPIClient.resetWinContainerIdCache() + if (timeDifferenceInMinutes >= 5) + $log.info "in_win_cadvisor_perf: Getting windows nodes" + nodes = KubernetesApiClient.getWindowsNodes() + if !nodes.nil? + @@winNodes = KubernetesApiClient.getWindowsNodes() + end + $log.info "in_win_cadvisor_perf : Successuly got windows nodes after 5 minute interval" + @@winNodeQueryTimeTracker = DateTime.now.to_time.to_i + end + @@winNodes.each do |winNode| + metricData = CAdvisorMetricsAPIClient.getMetrics(winNode) + metricData.each do |record| + if !record.empty? + record["DataType"] = "LINUX_PERF_BLOB" + record["IPName"] = "LogManagement" + eventStream.add(time, record) if record + end + end + router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@mdmtag, eventStream) if eventStream + + @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) + $log.info("winCAdvisorPerfEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + end + + # Cleanup routine to clear deleted containers from cache + cleanupTimeDifference = (DateTime.now.to_time.to_i - @@cleanupRoutineTimeTracker).abs + cleanupTimeDifferenceInMinutes = cleanupTimeDifference / 60 + if (cleanupTimeDifferenceInMinutes >= 5) + $log.info "in_win_cadvisor_perf : Cleanup routine kicking in to clear deleted containers from cache" + CAdvisorMetricsAPIClient.clearDeletedWinContainersFromCache() + @@cleanupRoutineTimeTracker = DateTime.now.to_time.to_i + end + rescue => errorStr + $log.warn "Failed to retrieve cadvisor metric data for windows nodes: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + end + end + + def run_periodic + @mutex.lock + done = @finished + until done + @condition.wait(@mutex, @run_interval) + done = @finished + @mutex.unlock + if !done + begin + $log.info("in_win_cadvisor_perf::run_periodic @ #{Time.now.utc.iso8601}") + enumerate + rescue => errorStr + $log.warn "in_win_cadvisor_perf::run_periodic: enumerate Failed to retrieve cadvisor perf metrics for windows nodes: #{errorStr}" + end + end + @mutex.lock + end + @mutex.unlock + end + end # Win_CAdvisor_Perf_Input +end # module diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb index 93b32ef50..963069858 100644 --- a/source/code/plugin/out_mdm.rb +++ b/source/code/plugin/out_mdm.rb @@ -2,29 +2,27 @@ # frozen_string_literal: true module Fluent - class OutputMDM < BufferedOutput - config_param :retry_mdm_post_wait_minutes, :integer - Plugin.register_output('out_mdm', self) + Plugin.register_output("out_mdm", self) def initialize super - require 'net/http' - require 'net/https' - require 'uri' - require 'json' - require_relative 'KubernetesApiClient' - require_relative 'ApplicationInsightsUtility' + require "net/http" + require "net/https" + require "uri" + require "json" + require_relative "KubernetesApiClient" + require_relative "ApplicationInsightsUtility" - @@token_resource_url = 'https://monitoring.azure.com/' - @@grant_type = 'client_credentials' - @@azure_json_path = '/etc/kubernetes/host/azure.json' + @@token_resource_url = "https://monitoring.azure.com/" + @@grant_type = "client_credentials" + @@azure_json_path = "/etc/kubernetes/host/azure.json" @@post_request_url_template = "https://%{aks_region}.monitoring.azure.com%{aks_resource_id}/metrics" @@token_url_template = "https://login.microsoftonline.com/%{tenant_id}/oauth2/token" @@plugin_name = "AKSCustomMetricsMDM" - + @data_hash = {} @token_url = nil @http_client = nil @@ -50,12 +48,13 @@ def start @can_send_data_to_mdm = false return end - # Handle the case where the file read fails. Send Telemetry and exit the plugin? + # Handle the case where the file read fails. Send Telemetry and exit the plugin? @data_hash = JSON.parse(file) - @token_url = @@token_url_template % {tenant_id: @data_hash['tenantId']} + @token_url = @@token_url_template % {tenant_id: @data_hash["tenantId"]} @cached_access_token = get_access_token - aks_resource_id = ENV['AKS_RESOURCE_ID'] - aks_region = ENV['AKS_REGION'] + aks_resource_id = ENV["AKS_RESOURCE_ID"] + aks_region = ENV["AKS_REGION"] + if aks_resource_id.to_s.empty? @log.info "Environment Variable AKS_RESOURCE_ID is not set.. " @can_send_data_to_mdm = false @@ -77,7 +76,7 @@ def start # get the access token only if the time to expiry is less than 5 minutes def get_access_token - if @cached_access_token.to_s.empty? || (Time.now + 5*60 > @token_expiry_time) # token is valid for 60 minutes. Refresh token 5 minutes from expiration + if @cached_access_token.to_s.empty? || (Time.now + 5 * 60 > @token_expiry_time) # token is valid for 60 minutes. Refresh token 5 minutes from expiration @log.info "Refreshing access token for out_mdm plugin.." token_uri = URI.parse(@token_url) http_access_token = Net::HTTP.new(token_uri.host, token_uri.port) @@ -85,27 +84,27 @@ def get_access_token token_request = Net::HTTP::Post.new(token_uri.request_uri) token_request.set_form_data( { - 'grant_type' => @@grant_type, - 'client_id' => @data_hash['aadClientId'], - 'client_secret' => @data_hash['aadClientSecret'], - 'resource' => @@token_resource_url - } + "grant_type" => @@grant_type, + "client_id" => @data_hash["aadClientId"], + "client_secret" => @data_hash["aadClientSecret"], + "resource" => @@token_resource_url, + } ) - + token_response = http_access_token.request(token_request) - # Handle the case where the response is not 200 + # Handle the case where the response is not 200 parsed_json = JSON.parse(token_response.body) - @token_expiry_time = Time.now + 59*60 # set the expiry time to be ~one hour from current time - @cached_access_token = parsed_json['access_token'] + @token_expiry_time = Time.now + 59 * 60 # set the expiry time to be ~one hour from current time + @cached_access_token = parsed_json["access_token"] end @cached_access_token - end + end def write_status_file(success, message) - fn = '/var/opt/microsoft/omsagent/log/MDMIngestion.status' + fn = "/var/opt/microsoft/omsagent/log/MDMIngestion.status" status = '{ "operation": "MDMIngestion", "success": "%s", "message": "%s" }' % [success, message] begin - File.open(fn,'w') { |file| file.write(status) } + File.open(fn, "w") { |file| file.write(status) } rescue => e @log.debug "Error:'#{e}'" ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) @@ -123,13 +122,13 @@ def format(tag, time, record) end end - # This method is called every flush interval. Send the buffer chunk to MDM. + # This method is called every flush interval. Send the buffer chunk to MDM. # 'chunk' is a buffer chunk that includes multiple formatted records def write(chunk) begin - if (!@first_post_attempt_made || (Time.now > @last_post_attempt_time + retry_mdm_post_wait_minutes*60)) && @can_send_data_to_mdm + if (!@first_post_attempt_made || (Time.now > @last_post_attempt_time + retry_mdm_post_wait_minutes * 60)) && @can_send_data_to_mdm post_body = [] - chunk.msgpack_each {|(tag, record)| + chunk.msgpack_each { |(tag, record)| post_body.push(record.to_json) } send_to_mdm post_body @@ -137,7 +136,7 @@ def write(chunk) if !@can_send_data_to_mdm @log.info "Cannot send data to MDM since all required conditions were not met" else - @log.info "Last Failed POST attempt to MDM was made #{((Time.now - @last_post_attempt_time)/60).round(1)} min ago. This is less than the current retry threshold of #{@retry_mdm_post_wait_minutes} min. NO-OP" + @log.info "Last Failed POST attempt to MDM was made #{((Time.now - @last_post_attempt_time) / 60).round(1)} min ago. This is less than the current retry threshold of #{@retry_mdm_post_wait_minutes} min. NO-OP" end end rescue Exception => e @@ -146,12 +145,12 @@ def write(chunk) end end - def send_to_mdm(post_body) + def send_to_mdm(post_body) begin access_token = get_access_token request = Net::HTTP::Post.new(@post_request_uri.request_uri) - request['Content-Type'] = "application/x-ndjson" - request['Authorization'] = "Bearer #{access_token}" + request["Content-Type"] = "application/x-ndjson" + request["Authorization"] = "Bearer #{access_token}" request.body = post_body.join("\n") response = @http_client.request(request) response.value # this throws for non 200 HTTP response code @@ -166,10 +165,10 @@ def send_to_mdm(post_body) @first_post_attempt_made = true ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) # Not raising exception, as that will cause retries to happen - elsif !response.code.empty? && response.code.start_with?('4') + elsif !response.code.empty? && response.code.start_with?("4") # Log 400 errors and continue @log.info "Non-retryable HTTPServerException when POSTing Metrics to MDM #{e} Response: #{response}" - else + else # raise if the response code is non-400 @log.info "HTTPServerException when POSTing Metrics to MDM #{e} Response: #{response}" raise e @@ -186,7 +185,8 @@ def send_to_mdm(post_body) raise e end end - private + + private class ChunkErrorHandler include Configurable @@ -218,20 +218,20 @@ def router=(r) end def write(chunk) - chunk.msgpack_each {|(tag, record)| + chunk.msgpack_each { |(tag, record)| @error_handlers[tag].emit(record) } end - - private + + private def create_error_handlers(router) nop_handler = NopErrorHandler.new Hash.new() { |hash, tag| etag = OMS::Common.create_error_tag tag hash[tag] = router.match?(etag) ? - ErrorHandler.new(router, etag) : - nop_handler + ErrorHandler.new(router, etag) : + nop_handler } end @@ -251,10 +251,6 @@ def emit(record) # NOP end end - end - end # class OutputMDM - end # module Fluent -