diff --git a/installer/conf/container.conf b/installer/conf/container.conf index 93c250fbb..16acd6353 100644 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -110,5 +110,19 @@ retry_limit 10 retry_wait 5s max_retry_wait 5m - retry_mdm_post_wait_minutes 60 + retry_mdm_post_wait_minutes 30 + + + + type out_oms + log_level debug + num_threads 5 + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_insightsmetrics*.buffer + buffer_queue_full_action drop_oldest_chunk + buffer_chunk_limit 4m + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m diff --git a/installer/conf/kube.conf b/installer/conf/kube.conf index 207780442..98a2fbb63 100644 --- a/installer/conf/kube.conf +++ b/installer/conf/kube.conf @@ -13,6 +13,7 @@ tag oms.containerinsights.KubePodInventory run_interval 60 log_level debug + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast #Kubernetes events @@ -47,7 +48,7 @@ log_level debug - + type filter_inventory2mdm custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast log_level info @@ -140,7 +141,7 @@ max_retry_wait 5m - + type out_oms log_level debug num_threads 5 @@ -215,4 +216,19 @@ retry_limit 10 retry_wait 5s max_retry_wait 5m + + + + type out_oms + log_level debug + num_threads 5 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_insightsmetrics*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m \ No newline at end of file diff --git a/installer/datafiles/base_container.data 
b/installer/datafiles/base_container.data index 60de5af18..e011dddf9 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -35,8 +35,10 @@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/in_win_cadvisor_perf.rb; source/code/plugin/in_win_cadvisor_perf.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_nodes.rb; source/code/plugin/in_kube_nodes.rb; 644; root; root /opt/microsoft/omsagent/plugin/filter_inventory2mdm.rb; source/code/plugin/filter_inventory2mdm.rb; 644; root; root +/opt/microsoft/omsagent/plugin/podinventory_to_mdm.rb; source/code/plugin/podinventory_to_mdm.rb; 644; root; root +/opt/microsoft/omsagent/plugin/kubelet_utils.rb; source/code/plugin/kubelet_utils.rb; 644; root; root /opt/microsoft/omsagent/plugin/CustomMetricsUtils.rb; source/code/plugin/CustomMetricsUtils.rb; 644; root; root - +/opt/microsoft/omsagent/plugin/constants.rb; source/code/plugin/constants.rb; 644; root; root /opt/microsoft/omsagent/plugin/ApplicationInsightsUtility.rb; source/code/plugin/ApplicationInsightsUtility.rb; 644; root; root /opt/microsoft/omsagent/plugin/ContainerInventoryState.rb; source/code/plugin/ContainerInventoryState.rb; 644; root; root diff --git a/installer/scripts/tomlparser.rb b/installer/scripts/tomlparser.rb index ba67d023a..5f2596bca 100644 --- a/installer/scripts/tomlparser.rb +++ b/installer/scripts/tomlparser.rb @@ -16,6 +16,7 @@ @logExclusionRegexPattern = "(^((?!stdout|stderr).)*$)" @excludePath = "*.csv2" #some invalid path @enrichContainerLogs = false +@collectAllKubeEvents = false # Use parser to parse the configmap toml file to a ruby structure def parseConfigMap @@ -128,6 +129,16 @@ def populateSettingValuesFromConfigMap(parsedConfig) rescue => errorStr ConfigParseErrorLogger.logError("Exception while reading config map settings for cluster level container log enrichment - #{errorStr}, using defaults, please check config map for errors") end + + #Get kube events 
enrichment setting + begin + if !parsedConfig[:log_collection_settings][:collect_all_kube_events].nil? && !parsedConfig[:log_collection_settings][:collect_all_kube_events][:enabled].nil? + @collectAllKubeEvents = parsedConfig[:log_collection_settings][:collect_all_kube_events][:enabled] + puts "config::Using config map setting for kube event collection" + end + rescue => errorStr + ConfigParseErrorLogger.logError("Exception while reading config map settings for kube event collection - #{errorStr}, using defaults, please check config map for errors") + end end end @@ -168,6 +179,7 @@ def populateSettingValuesFromConfigMap(parsedConfig) file.write("export AZMON_CLUSTER_COLLECT_ENV_VAR=#{@collectClusterEnvVariables}\n") file.write("export AZMON_CLUSTER_LOG_TAIL_EXCLUDE_PATH=#{@excludePath}\n") file.write("export AZMON_CLUSTER_CONTAINER_LOG_ENRICH=#{@enrichContainerLogs}\n") + file.write("export AZMON_CLUSTER_COLLECT_ALL_KUBE_EVENTS=#{@collectAllKubeEvents}\n") # Close file after writing all environment variables file.close puts "Both stdout & stderr log collection are turned off for namespaces: '#{@excludePath}' " diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb index 8b0105a6f..53139ea4e 100644 --- a/source/code/plugin/CAdvisorMetricsAPIClient.rb +++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb @@ -13,6 +13,7 @@ class CAdvisorMetricsAPIClient require_relative "oms_common" require_relative "KubernetesApiClient" require_relative "ApplicationInsightsUtility" + require_relative "constants" @configMapMountPath = "/etc/config/settings/log-data-collection-settings" @promConfigMountPath = "/etc/config/settings/prometheus-data-collection-settings" @@ -55,85 +56,58 @@ class CAdvisorMetricsAPIClient # Keeping track of containers so that can delete the container from the container cpu cache when the container is deleted # as a part of the cleanup routine @@winContainerIdCache = [] - + #cadvisor ports + 
@@CADVISOR_SECURE_PORT = "10250" + @@CADVISOR_NON_SECURE_PORT = "10255" def initialize end class << self def getSummaryStatsFromCAdvisor(winNode) - headers = {} - response = nil - @Log.info "Getting CAdvisor Uri" - begin - cAdvisorSecurePort = false - # Check to see if omsagent needs to use 10255(insecure) port or 10250(secure) port - if !@cAdvisorMetricsSecurePort.nil? && @cAdvisorMetricsSecurePort == "true" - cAdvisorSecurePort = true - end - - cAdvisorUri = getCAdvisorUri(winNode, cAdvisorSecurePort) - bearerToken = File.read("/var/run/secrets/kubernetes.io/serviceaccount/token") - @Log.info "cAdvisorUri: #{cAdvisorUri}" + relativeUri = "/stats/summary" + return getResponse(winNode, relativeUri) + end - if !cAdvisorUri.nil? - uri = URI.parse(cAdvisorUri) - if !!cAdvisorSecurePort == true - Net::HTTP.start(uri.host, uri.port, - :use_ssl => true, :open_timeout => 20, :read_timeout => 40, - :ca_file => "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", - :verify_mode => OpenSSL::SSL::VERIFY_NONE) do |http| - cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) - cAdvisorApiRequest["Authorization"] = "Bearer #{bearerToken}" - response = http.request(cAdvisorApiRequest) - @Log.info "Got response code #{response.code} from #{uri.request_uri}" - end - else - Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40) do |http| - cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) - response = http.request(cAdvisorApiRequest) - @Log.info "Got response code #{response.code} from #{uri.request_uri}" - end - end - end - rescue => error - @Log.warn("CAdvisor api request failed: #{error}") - telemetryProps = {} - telemetryProps["Computer"] = winNode["Hostname"] - ApplicationInsightsUtility.sendExceptionTelemetry(error, telemetryProps) - end - return response + def getNodeCapacityFromCAdvisor(winNode: nil) + relativeUri = "/spec/" + return getResponse(winNode, relativeUri) end - def getCAdvisorUri(winNode, cAdvisorSecurePort) 
- begin + def getBaseCAdvisorUri(winNode) + cAdvisorSecurePort = isCAdvisorOnSecurePort() + if !!cAdvisorSecurePort == true - defaultHost = "https://localhost:10250" + defaultHost = "https://localhost:#{@@CADVISOR_SECURE_PORT}" else - defaultHost = "http://localhost:10255" + defaultHost = "http://localhost:#{@@CADVISOR_NON_SECURE_PORT}" end - relativeUri = "/stats/summary" if !winNode.nil? - nodeIP = winNode["InternalIP"] + nodeIP = winNode["InternalIP"] else - nodeIP = ENV["NODE_IP"] + nodeIP = ENV["NODE_IP"] end + if !nodeIP.nil? - @Log.info("Using #{nodeIP + relativeUri} for CAdvisor Uri") - if !!cAdvisorSecurePort == true - return "https://#{nodeIP}:10250" + relativeUri - else - return "http://#{nodeIP}:10255" + relativeUri - end + @Log.info("Using #{nodeIP} for CAdvisor Host") + if !!cAdvisorSecurePort == true + return "https://#{nodeIP}:#{@@CADVISOR_SECURE_PORT}" + else + return "http://#{nodeIP}:#{@@CADVISOR_NON_SECURE_PORT}" + end else - @Log.warn ("NODE_IP environment variable not set. Using default as : #{defaultHost + relativeUri} ") - if !winNode.nil? - return nil - else - return defaultHost + relativeUri - end + @Log.warn ("NODE_IP environment variable not set. Using default as : #{defaultHost}") + if !winNode.nil? + return nil + else + return defaultHost + end end - end + end + + def getCAdvisorUri(winNode, relativeUri) + baseUri = getBaseCAdvisorUri(winNode) + return baseUri + relativeUri end def getMetrics(winNode: nil, metricTime: Time.now.utc.iso8601) @@ -282,6 +256,101 @@ def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, met return metricItems end + def getInsightsMetrics(winNode: nil, metricTime: Time.now.utc.iso8601) + metricDataItems = [] + begin + cAdvisorStats = getSummaryStatsFromCAdvisor(winNode) + if !cAdvisorStats.nil? + metricInfo = JSON.parse(cAdvisorStats.body) + end + if !winNode.nil? + hostName = winNode["Hostname"] + operatingSystem = "Windows" + else + if !metricInfo.nil? && !metricInfo["node"].nil? 
&& !metricInfo["node"]["nodeName"].nil? + hostName = metricInfo["node"]["nodeName"] + else + hostName = (OMS::Common.get_hostname) + end + operatingSystem = "Linux" + end + if !metricInfo.nil? + metricDataItems.concat(getContainerGpuMetricsAsInsightsMetrics(metricInfo, hostName, "memoryTotal", "containerGpumemoryTotalBytes", metricTime)) + metricDataItems.concat(getContainerGpuMetricsAsInsightsMetrics(metricInfo, hostName, "memoryUsed","containerGpumemoryUsedBytes", metricTime)) + metricDataItems.concat(getContainerGpuMetricsAsInsightsMetrics(metricInfo, hostName, "dutyCycle","containerGpuDutyCycle", metricTime)) + else + @Log.warn("Couldn't get Insights metrics information for host: #{hostName} os:#{operatingSystem}") + end + rescue => error + @Log.warn("CAdvisorMetricsAPIClient::getInsightsMetrics failed: #{error}") + return metricDataItems + end + return metricDataItems + end + + def getContainerGpuMetricsAsInsightsMetrics(metricJSON, hostName, metricNameToCollect, metricNametoReturn, metricPollTime) + metricItems = [] + clusterId = KubernetesApiClient.getClusterId + clusterName = KubernetesApiClient.getClusterName + begin + metricInfo = metricJSON + metricInfo["pods"].each do |pod| + podUid = pod["podRef"]["uid"] + podName = pod["podRef"]["name"] + podNamespace = pod["podRef"]["namespace"] + + if (!pod["containers"].nil?) + pod["containers"].each do |container| + #gpu metrics + if (!container["accelerators"].nil?) + container["accelerators"].each do |accelerator| + if (!accelerator[metricNameToCollect].nil?) 
#empty check is invalid for non-strings + containerName = container["name"] + metricValue = accelerator[metricNameToCollect] + + + metricItem = {} + metricItem["CollectionTime"] = metricPollTime + metricItem["Computer"] = hostName + metricItem["Name"] = metricNametoReturn + metricItem["Value"] = metricValue + metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN + metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_GPU_NAMESPACE + + metricTags = {} + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID ] = clusterId + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = clusterName + metricTags[Constants::INSIGHTSMETRICS_TAGS_CONTAINER_NAME] = podUid + "/" + containerName + #metricTags[Constants::INSIGHTSMETRICS_TAGS_K8SNAMESPACE] = podNameSpace + + if (!accelerator["make"].nil? && !accelerator["make"].empty?) + metricTags[Constants::INSIGHTSMETRICS_TAGS_GPU_VENDOR] = accelerator["make"] + end + + if (!accelerator["model"].nil? && !accelerator["model"].empty?) + metricTags[Constants::INSIGHTSMETRICS_TAGS_GPU_MODEL] = accelerator["model"] + end + + if (!accelerator["id"].nil? && !accelerator["id"].empty?) 
+ metricTags[Constants::INSIGHTSMETRICS_TAGS_GPU_ID] = accelerator["id"] + end + + metricItem["Tags"] = metricTags + + metricItems.push(metricItem) + end + end + end + end + end + end + rescue => errorStr + @Log.warn("getContainerGpuMetricsAsInsightsMetrics failed: #{errorStr} for metric #{metricNameToCollect}") + return metricItems + end + return metricItems + end + def clearDeletedWinContainersFromCache() begin winCpuUsageNanoSecondsKeys = @@winContainerCpuUsageNanoSecondsLast.keys @@ -696,5 +765,51 @@ def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn, m end return metricItems end + + def getResponse(winNode, relativeUri) + response = nil + @Log.info "Getting CAdvisor Uri Response" + bearerToken = File.read("/var/run/secrets/kubernetes.io/serviceaccount/token") + begin + cAdvisorUri = getCAdvisorUri(winNode, relativeUri) + @Log.info "cAdvisorUri: #{cAdvisorUri}" + + if !cAdvisorUri.nil? + uri = URI.parse(cAdvisorUri) + if isCAdvisorOnSecurePort() + Net::HTTP.start(uri.host, uri.port, + :use_ssl => true, :open_timeout => 20, :read_timeout => 40, + :ca_file => "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt", + :verify_mode => OpenSSL::SSL::VERIFY_NONE) do |http| + cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) + cAdvisorApiRequest["Authorization"] = "Bearer #{bearerToken}" + response = http.request(cAdvisorApiRequest) + @Log.info "Got response code #{response.code} from #{uri.request_uri}" + end + else + Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40) do |http| + cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) + response = http.request(cAdvisorApiRequest) + @Log.info "Got response code #{response.code} from #{uri.request_uri}" + end + end + end + rescue => error + @Log.warn("CAdvisor api request for #{cAdvisorUri} failed: #{error}") + telemetryProps = {} + telemetryProps["Computer"] = winNode["Hostname"] + ApplicationInsightsUtility.sendExceptionTelemetry(error, 
telemetryProps) + end + return response + end + + def isCAdvisorOnSecurePort + cAdvisorSecurePort = false + # Check to see whether omsagent needs to use 10255(insecure) port or 10250(secure) port + if !@cAdvisorMetricsSecurePort.nil? && @cAdvisorMetricsSecurePort == "true" + cAdvisorSecurePort = true + end + return cAdvisorSecurePort + end end end diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index e52c77884..b864ef718 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -10,6 +10,7 @@ class KubernetesApiClient require "time" require_relative "oms_common" + require_relative "constants" @@ApiVersion = "v1" @@ApiVersionApps = "v1" @@ -18,6 +19,7 @@ class KubernetesApiClient @@ClusterName = nil @@ClusterId = nil @@IsNodeMaster = nil + @@IsAROV3Cluster = nil #@@IsValidRunningNode = nil #@@IsLinuxCluster = nil @@KubeSystemNamespace = "kube-system" @@ -152,6 +154,20 @@ def getClusterId return @@ClusterId end + def isAROV3Cluster + return @@IsAROV3Cluster if !@@IsAROV3Cluster.nil? + @@IsAROV3Cluster = false + begin + cluster = getClusterId + if !cluster.nil? && !cluster.empty? && cluster.downcase.include?("/microsoft.containerservice/openshiftmanagedclusters") + @@IsAROV3Cluster = true + end + rescue => error + @Log.warn("KubernetesApiClient::IsAROV3Cluster : IsAROV3Cluster failed #{error}") + end + return @@IsAROV3Cluster + end + def isNodeMaster return @@IsNodeMaster if !@@IsNodeMaster.nil? @@IsNodeMaster = false @@ -177,6 +193,22 @@ def isNodeMaster return @@IsNodeMaster end + def getNodesResourceUri(nodesResourceUri) + begin + # For ARO v3 cluster, filter out all other node roles other than compute + if isAROV3Cluster() + if !nodesResourceUri.nil? && !nodesResourceUri.index("?").nil? 
+ nodesResourceUri = nodesResourceUri + "&labelSelector=node-role.kubernetes.io%2Fcompute%3Dtrue" + else + nodesResourceUri = nodesResourceUri + "labelSelector=node-role.kubernetes.io%2Fcompute%3Dtrue" + end + end + rescue => error + @Log.warn("getNodesResourceUri failed: #{error}") + end + return nodesResourceUri + end + #def isValidRunningNode # return @@IsValidRunningNode if !@@IsValidRunningNode.nil? # @@IsValidRunningNode = false @@ -240,7 +272,8 @@ def getPods(namespace) def getWindowsNodes winNodes = [] begin - nodeInventory = JSON.parse(getKubeResourceInfo("nodes").body) + resourceUri = getNodesResourceUri("nodes") + nodeInventory = JSON.parse(getKubeResourceInfo(resourceUri).body) @Log.info "KubernetesAPIClient::getWindowsNodes : Got nodes from kube api" # Resetting the windows node cache @@WinNodeArray.clear @@ -357,6 +390,14 @@ def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricName podUid = pod["metadata"]["uid"] end + # For ARO, skip the pods scheduled on to master or infra nodes to ingest + if isAROV3Cluster() && !pod["spec"].nil? && !pod["spec"]["nodeName"].nil? && + ( pod["spec"]["nodeName"].downcase.start_with?("infra-") || + pod["spec"]["nodeName"].downcase.start_with?("master-") ) + next + end + + podContainers = [] if !pod["spec"]["containers"].nil? && !pod["spec"]["containers"].empty? 
podContainers = podContainers + pod["spec"]["containers"] @@ -430,6 +471,87 @@ def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricName return metricItems end #getContainerResourceRequestAndLimits + def getContainerResourceRequestsAndLimitsAsInsightsMetrics(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) + metricItems = [] + begin + clusterId = getClusterId + clusterName = getClusterName + + metricInfo = metricJSON + metricInfo["items"].each do |pod| + podNameSpace = pod["metadata"]["namespace"] + if podNameSpace.eql?("kube-system") && !pod["metadata"].key?("ownerReferences") + # The above case seems to be the only case where you have horizontal scaling of pods + # but no controller, in which case cAdvisor picks up kubernetes.io/config.hash + # instead of the actual poduid. Since this uid is not being surface into the UX + # its ok to use this. + # Use kubernetes.io/config.hash to be able to correlate with cadvisor data + if pod["metadata"]["annotations"].nil? + next + else + podUid = pod["metadata"]["annotations"]["kubernetes.io/config.hash"] + end + else + podUid = pod["metadata"]["uid"] + end + + podContainers = [] + if !pod["spec"]["containers"].nil? && !pod["spec"]["containers"].empty? + podContainers = podContainers + pod["spec"]["containers"] + end + # Adding init containers to the record list as well. + if !pod["spec"]["initContainers"].nil? && !pod["spec"]["initContainers"].empty? + podContainers = podContainers + pod["spec"]["initContainers"] + end + + if (!podContainers.nil? && !podContainers.empty?) + if (!pod["spec"]["nodeName"].nil?) + nodeName = pod["spec"]["nodeName"] + else + nodeName = "" #unscheduled pod. We still want to collect limits & requests for GPU + end + podContainers.each do |container| + metricValue = nil + containerName = container["name"] + #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + if (!container["resources"].nil? 
&& !container["resources"].empty? && !container["resources"][metricCategory].nil? && !container["resources"][metricCategory][metricNameToCollect].nil?) + metricValue = getMetricNumericValue(metricNameToCollect, container["resources"][metricCategory][metricNameToCollect]) + else + #No container level limit for the given metric, so default to node level limit for non-gpu metrics + if (metricNameToCollect.downcase != "nvidia.com/gpu") && (metricNameToCollect.downcase != "amd.com/gpu") + nodeMetricsHashKey = clusterId + "/" + nodeName + "_" + "allocatable" + "_" + metricNameToCollect + metricValue = @@NodeMetrics[nodeMetricsHashKey] + end + end + if (!metricValue.nil?) + metricItem = {} + metricItem["CollectionTime"] = metricTime + metricItem["Computer"] = nodeName + metricItem["Name"] = metricNametoReturn + metricItem["Value"] = metricValue + metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN + metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_GPU_NAMESPACE + + metricTags = {} + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID ] = clusterId + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = clusterName + metricTags[Constants::INSIGHTSMETRICS_TAGS_CONTAINER_NAME] = podUid + "/" + containerName + #metricTags[Constants::INSIGHTSMETRICS_TAGS_K8SNAMESPACE] = podNameSpace + + metricItem["Tags"] = metricTags + + metricItems.push(metricItem) + end + end + end + end + rescue => error + @Log.warn("getcontainerResourceRequestsAndLimitsAsInsightsMetrics failed: #{error} for metric #{metricCategory} #{metricNameToCollect}") + return metricItems + end + return metricItems + end #getContainerResourceRequestAndLimitsAsInsightsMetrics + def parseNodeLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) metricItems = [] begin @@ -473,6 +595,51 @@ def parseNodeLimits(metricJSON, metricCategory, metricNameToCollect, metricNamet return metricItems end #parseNodeLimits + def 
parseNodeLimitsAsInsightsMetrics(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) + metricItems = [] + begin + metricInfo = metricJSON + clusterId = getClusterId + clusterName = getClusterName + #Since we are getting all node data at the same time and kubernetes doesnt specify a timestamp for the capacity and allocation metrics, + #if we are coming up with the time it should be same for all nodes + #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + metricInfo["items"].each do |node| + if (!node["status"][metricCategory].nil?) && (!node["status"][metricCategory][metricNameToCollect].nil?) + + # metricCategory can be "capacity" or "allocatable" and metricNameToCollect can be "cpu" or "memory" or "amd.com/gpu" or "nvidia.com/gpu" + metricValue = getMetricNumericValue(metricNameToCollect, node["status"][metricCategory][metricNameToCollect]) + + metricItem = {} + metricItem["CollectionTime"] = metricTime + metricItem["Computer"] = node["metadata"]["name"] + metricItem["Name"] = metricNametoReturn + metricItem["Value"] = metricValue + metricItem["Origin"] = Constants::INSIGHTSMETRICS_TAGS_ORIGIN + metricItem["Namespace"] = Constants::INSIGHTSMETRICS_TAGS_GPU_NAMESPACE + + metricTags = {} + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERID ] = clusterId + metricTags[Constants::INSIGHTSMETRICS_TAGS_CLUSTERNAME] = clusterName + metricTags[Constants::INSIGHTSMETRICS_TAGS_GPU_VENDOR] = metricNameToCollect + + metricItem["Tags"] = metricTags + + metricItems.push(metricItem) + #push node level metrics (except gpu ones) to a inmem hash so that we can use it looking up at container level. 
+ #Currently if container level cpu & memory limits are not defined we default to node level limits + if (metricNameToCollect.downcase != "nvidia.com/gpu") && (metricNameToCollect.downcase != "amd.com/gpu") + @@NodeMetrics[clusterId + "/" + node["metadata"]["name"] + "_" + metricCategory + "_" + metricNameToCollect] = metricValue + #@Log.info ("Node metric hash: #{@@NodeMetrics}") + end + end + end + rescue => error + @Log.warn("parseNodeLimitsAsInsightsMetrics failed: #{error} for metric #{metricCategory} #{metricNameToCollect}") + end + return metricItems + end + def getMetricNumericValue(metricName, metricVal) metricValue = metricVal.downcase begin @@ -538,6 +705,10 @@ def getMetricNumericValue(metricName, metricVal) else #assuming no units specified, it is cores that we are converting to nanocores (the below conversion will fail for other unsupported 'units') metricValue = Float(metricValue) * 1000.0 ** 3 end + when "nvidia.com/gpu" + metricValue = Float(metricValue) * 1.0 + when "amd.com/gpu" + metricValue = Float(metricValue) * 1.0 else @Log.warn("getMetricNumericValue: Unsupported metric #{metricName}. 
Returning 0 for metric value") metricValue = 0 diff --git a/source/code/plugin/constants.rb b/source/code/plugin/constants.rb new file mode 100644 index 000000000..20114ea2b --- /dev/null +++ b/source/code/plugin/constants.rb @@ -0,0 +1,15 @@ +class Constants + INSIGHTSMETRICS_TAGS_ORIGIN = "container.azm.ms" + INSIGHTSMETRICS_TAGS_CLUSTERID = "container.azm.ms/clusterId" + INSIGHTSMETRICS_TAGS_CLUSTERNAME = "container.azm.ms/clusterName" + INSIGHTSMETRICS_TAGS_GPU_VENDOR = "gpuVendor" + INSIGHTSMETRICS_TAGS_GPU_NAMESPACE = "container.azm.ms/gpu" + INSIGHTSMETRICS_TAGS_GPU_MODEL = "gpuModel" + INSIGHTSMETRICS_TAGS_GPU_ID = "gpuId" + INSIGHTSMETRICS_TAGS_CONTAINER_NAME = "containerName" + INSIGHTSMETRICS_TAGS_CONTAINER_ID = "containerName" + INSIGHTSMETRICS_TAGS_K8SNAMESPACE = "k8sNamespace" + INSIGHTSMETRICS_TAGS_CONTROLLER_NAME = "controllerName" + INSIGHTSMETRICS_TAGS_CONTROLLER_KIND = "controllerKind" + INSIGHTSMETRICS_FLUENT_TAG = "oms.api.InsightsMetrics" +end \ No newline at end of file diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb index f14a1369b..45f0d9d6f 100644 --- a/source/code/plugin/filter_cadvisor2mdm.rb +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -7,6 +7,7 @@ module Fluent require 'yajl/json_gem' require_relative 'oms_common' require_relative 'CustomMetricsUtils' + require_relative 'kubelet_utils' class CAdvisor2MdmFilter < Filter Fluent::Plugin.register_filter('filter_cadvisor2mdm', self) @@ -110,9 +111,10 @@ def filter(tag, time, record) metric_value = record['DataItems'][0]['Collections'][0]['Value'] if counter_name.downcase == @@cpu_usage_nano_cores metric_name = @@cpu_usage_milli_cores - metric_value = metric_value/1000000 + metric_value /= 1000000 #cadvisor record is in nanocores. 
Convert to mc + @log.info "Metric_value: #{metric_value} CPU Capacity #{@cpu_capacity}" if @cpu_capacity != 0.0 - percentage_metric_value = (metric_value*1000000)*100/@cpu_capacity + percentage_metric_value = (metric_value)*100/@cpu_capacity end end @@ -138,33 +140,42 @@ def filter(tag, time, record) def ensure_cpu_memory_capacity_set - @log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}" if @cpu_capacity != 0.0 && @memory_capacity != 0.0 @log.info "CPU And Memory Capacity are already set" return end - begin - nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes?fieldSelector=metadata.name%3D#{@@hostName}").body) - rescue Exception => e - @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} " - ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) - end - if !nodeInventory.nil? - cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") - if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? - @cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] - @log.info "CPU Limit #{@cpu_capacity}" - else - @log.info "Error getting cpu_capacity" + controller_type = ENV["CONTROLLER_TYPE"] + if controller_type.downcase == 'replicaset' + @log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}" + + begin + resourceUri = KubernetesApiClient.getNodesResourceUri("nodes?fieldSelector=metadata.name%3D#{@@hostName}") + nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo(resourceUri).body) + rescue Exception => e + @log.info "Error when getting nodeInventory from kube API. 
Exception: #{e.class} Message: #{e.message} " + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) end - memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes") - if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? - @memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] - @log.info "Memory Limit #{@memory_capacity}" - else - @log.info "Error getting memory_capacity" + if !nodeInventory.nil? + cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") + if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? + @cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "CPU Limit #{@cpu_capacity}" + else + @log.info "Error getting cpu_capacity" + end + memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes") + if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? 
+ @memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "Memory Limit #{@memory_capacity}" + else + @log.info "Error getting memory_capacity" + end end + elsif controller_type.downcase == 'daemonset' + capacity_from_kubelet = KubeletUtils.get_node_capacity + @cpu_capacity = capacity_from_kubelet[0] + @memory_capacity = capacity_from_kubelet[1] end end diff --git a/source/code/plugin/filter_cadvisor_health_node.rb b/source/code/plugin/filter_cadvisor_health_node.rb index c6280db60..4106b4d82 100644 --- a/source/code/plugin/filter_cadvisor_health_node.rb +++ b/source/code/plugin/filter_cadvisor_health_node.rb @@ -131,13 +131,13 @@ def process_node_cpu_record(record, metric_value) else instance_name = record['DataItems'][0]['InstanceName'] #@log.info "CPU capacity #{@cpu_capacity}" - + metric_value /= 1000000 percent = (metric_value.to_f/@cpu_capacity*100).round(2) #@log.debug "Percentage of CPU limit: #{percent}" state = HealthMonitorUtils.compute_percentage_state(percent, @provider.get_config(MonitorId::NODE_CPU_MONITOR_ID)) #@log.debug "Computed State : #{state}" timestamp = record['DataItems'][0]['Timestamp'] - health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"cpuUsageMillicores" => metric_value/1000000.to_f, "cpuUtilizationPercentage" => percent}} + health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"cpuUsageMillicores" => metric_value, "cpuUtilizationPercentage" => percent}} monitor_instance_id = HealthMonitorUtils.get_monitor_instance_id(monitor_id, [@@clusterId, @@hostName]) # temp = record.nil? ? 
"Nil" : record["MonitorInstanceId"] diff --git a/source/code/plugin/filter_inventory2mdm.rb b/source/code/plugin/filter_inventory2mdm.rb index 422b4b54a..16f2bb148 100644 --- a/source/code/plugin/filter_inventory2mdm.rb +++ b/source/code/plugin/filter_inventory2mdm.rb @@ -156,7 +156,7 @@ def process_pod_inventory_records(es) no_phase_dim_values_hash = Hash.new total_pod_count = 0 pod_count_by_phase = {} - podUids = {} + podUids = {} record_count = 0 begin records = [] @@ -165,7 +165,7 @@ def process_pod_inventory_records(es) timestamp = record['DataItems'][0]['CollectionTime'] podUid = record['DataItems'][0]['PodUid'] - if podUids.key?(podUid) + if podUids.key?(podUid) #@log.info "pod with #{podUid} already counted" next end diff --git a/source/code/plugin/health/health_monitor_utils.rb b/source/code/plugin/health/health_monitor_utils.rb index 13d1416b1..c23d8824a 100644 --- a/source/code/plugin/health/health_monitor_utils.rb +++ b/source/code/plugin/health/health_monitor_utils.rb @@ -3,6 +3,7 @@ require 'digest' require_relative 'health_model_constants' require 'yajl/json_gem' +require_relative '../kubelet_utils' module HealthModel # static class that provides a bunch of utility methods @@ -161,7 +162,8 @@ def get_resource_subscription(pod_inventory, metric_name, metric_capacity) def get_cluster_cpu_memory_capacity(log, node_inventory: nil) begin if node_inventory.nil? 
- node_inventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes").body) + resourceUri = KubernetesApiClient.getNodesResourceUri("nodes") + node_inventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo(resourceUri).body) end cluster_cpu_capacity = 0.0 cluster_memory_capacity = 0.0 @@ -207,7 +209,8 @@ def refresh_kubernetes_api_data(log, hostName, force: false) end begin - @@nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes").body) + resourceUri = KubernetesApiClient.getNodesResourceUri("nodes") + @@nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo(resourceUri).body) if !hostName.nil? podInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("pods?fieldSelector=spec.nodeName%3D#{hostName}").body) else @@ -263,49 +266,13 @@ def get_monitor_instance_id(monitor_id, args = []) end def ensure_cpu_memory_capacity_set(log, cpu_capacity, memory_capacity, hostname) - log.info "ensure_cpu_memory_capacity_set cpu_capacity #{cpu_capacity} memory_capacity #{memory_capacity}" if cpu_capacity != 1.0 && memory_capacity != 1.0 log.info "CPU And Memory Capacity are already set" return [cpu_capacity, memory_capacity] end - log.info "CPU and Memory Capacity Not set" - begin - @@nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes").body) - rescue Exception => e - log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} " - ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) - end - if !@@nodeInventory.nil? - cpu_capacity_json = KubernetesApiClient.parseNodeLimits(@@nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") - if !cpu_capacity_json.nil? - cpu_capacity_json.each do |cpu_info_node| - if !cpu_info_node['DataItems'][0]['Host'].nil? && cpu_info_node['DataItems'][0]['Host'] == hostname - if !cpu_info_node['DataItems'][0]['Collections'][0]['Value'].nil? 
- cpu_capacity = cpu_info_node['DataItems'][0]['Collections'][0]['Value'] - end - end - end - log.info "CPU Limit #{cpu_capacity}" - else - log.info "Error getting cpu_capacity" - end - memory_capacity_json = KubernetesApiClient.parseNodeLimits(@@nodeInventory, "capacity", "memory", "memoryCapacityBytes") - if !memory_capacity_json.nil? - memory_capacity_json.each do |memory_info_node| - if !memory_info_node['DataItems'][0]['Host'].nil? && memory_info_node['DataItems'][0]['Host'] == hostname - if !memory_info_node['DataItems'][0]['Collections'][0]['Value'].nil? - memory_capacity = memory_info_node['DataItems'][0]['Collections'][0]['Value'] - end - end - end - log.info "memory Limit #{memory_capacity}" - else - log.info "Error getting memory_capacity" - end - return [cpu_capacity, memory_capacity] - end + return KubeletUtils.get_node_capacity end def build_metrics_hash(metrics_to_collect) diff --git a/source/code/plugin/in_cadvisor_perf.rb b/source/code/plugin/in_cadvisor_perf.rb index 96aa66aa1..a44365e9d 100644 --- a/source/code/plugin/in_cadvisor_perf.rb +++ b/source/code/plugin/in_cadvisor_perf.rb @@ -15,6 +15,7 @@ def initialize require_relative "CAdvisorMetricsAPIClient" require_relative "oms_common" require_relative "omslog" + require_relative "constants" end config_param :run_interval, :time, :default => 60 @@ -50,8 +51,10 @@ def enumerate() currentTime = Time.now time = currentTime.to_f batchTime = currentTime.utc.iso8601 + @@istestvar = ENV["ISTEST"] begin eventStream = MultiEventStream.new + insightsMetricsEventStream = MultiEventStream.new metricData = CAdvisorMetricsAPIClient.getMetrics(winNode: nil, metricTime: batchTime ) metricData.each do |record| record["DataType"] = "LINUX_PERF_BLOB" @@ -64,10 +67,38 @@ def enumerate() router.emit_stream(@containerhealthtag, eventStream) if eventStream router.emit_stream(@nodehealthtag, eventStream) if eventStream - @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? 
&& @@istestvar.casecmp("true") == 0 && eventStream.count > 0) $log.info("cAdvisorPerfEmitStreamSuccess @ #{Time.now.utc.iso8601}") end + + #start GPU InsightsMetrics items + begin + containerGPUusageInsightsMetricsDataItems = [] + containerGPUusageInsightsMetricsDataItems.concat(CAdvisorMetricsAPIClient.getInsightsMetrics(winNode: nil, metricTime: batchTime)) + + + containerGPUusageInsightsMetricsDataItems.each do |insightsMetricsRecord| + wrapper = { + "DataType" => "INSIGHTS_METRICS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], + } + insightsMetricsEventStream.add(time, wrapper) if wrapper + end + + router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream + + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0) + $log.info("cAdvisorInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + rescue => errorStr + $log.warn "Failed when processing GPU Usage metrics in_cadvisor_perf : #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + #end GPU InsightsMetrics items + rescue => errorStr $log.warn "Failed to retrieve cadvisor metric data: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) diff --git a/source/code/plugin/in_kube_events.rb b/source/code/plugin/in_kube_events.rb index 6116cb62d..bb0ab6f05 100644 --- a/source/code/plugin/in_kube_events.rb +++ b/source/code/plugin/in_kube_events.rb @@ -4,7 +4,6 @@ module Fluent class Kube_Event_Input < Input Plugin.register_input("kubeevents", self) - @@KubeEventsStateFile = "/var/opt/microsoft/docker-cimprov/state/KubeEventQueryState.yaml" def initialize @@ -17,9 +16,15 @@ def initialize require_relative "oms_common" require_relative "omslog" require_relative "ApplicationInsightsUtility" - + # 30000 events account to approximately 
5MB @EVENTS_CHUNK_SIZE = 30000 + + # Initializing events count for telemetry + @eventsCount = 0 + + # Initialize enable/disable normal event collection + @collectAllKubeEvents = false end config_param :run_interval, :time, :default => 60 @@ -35,6 +40,16 @@ def start @condition = ConditionVariable.new @mutex = Mutex.new @thread = Thread.new(&method(:run_periodic)) + collectAllKubeEventsSetting = ENV["AZMON_CLUSTER_COLLECT_ALL_KUBE_EVENTS"] + if !collectAllKubeEventsSetting.nil? && !collectAllKubeEventsSetting.empty? + if collectAllKubeEventsSetting.casecmp("false") == 0 + @collectAllKubeEvents = false + $log.warn("Normal kube events collection disabled for cluster") + else + @collectAllKubeEvents = true + $log.warn("Normal kube events collection enabled for cluster") + end + end end end @@ -55,11 +70,16 @@ def enumerate batchTime = currentTime.utc.iso8601 eventQueryState = getEventQueryState newEventQueryState = [] + @eventsCount = 0 # Initializing continuation token to nil continuationToken = nil $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") - continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}") + if @collectAllKubeEvents + continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?limit=#{@EVENTS_CHUNK_SIZE}") + else + continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}") + end $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?) 
newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime) @@ -80,6 +100,13 @@ def enumerate # Setting this to nil so that we dont hold memory until GC kicks in eventList = nil writeEventQueryState(newEventQueryState) + + # Flush AppInsights telemetry once all the processing is done, only if the number of events flushed is greater than 0 + if (@eventsCount > 0) + telemetryProperties = {} + telemetryProperties["CollectAllKubeEvents"] = @collectAllKubeEvents + ApplicationInsightsUtility.sendMetricTelemetry("EventCount", @eventsCount, {}) + end rescue => errorStr $log.warn "in_kube_events::enumerate:Failed in enumerate: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) @@ -101,6 +128,14 @@ def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTim if !eventQueryState.empty? && eventQueryState.include?(eventId) next end + + nodeName = items["source"].key?("host") ? items["source"]["host"] : (OMS::Common.get_hostname) + # For ARO v3 cluster, drop the master and infra node sourced events to ingest + if KubernetesApiClient.isAROV3Cluster && !nodeName.nil? && !nodeName.empty? 
&& + (nodeName.downcase.start_with?("infra-") || nodeName.downcase.start_with?("master-")) + next + end + record["ObjectKind"] = items["involvedObject"]["kind"] record["Namespace"] = items["involvedObject"]["namespace"] record["Name"] = items["involvedObject"]["name"] @@ -112,11 +147,7 @@ def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTim record["FirstSeen"] = items["firstTimestamp"] record["LastSeen"] = items["lastTimestamp"] record["Count"] = items["count"] - if items["source"].key?("host") - record["Computer"] = items["source"]["host"] - else - record["Computer"] = (OMS::Common.get_hostname) - end + record["Computer"] = nodeName record["ClusterName"] = KubernetesApiClient.getClusterName record["ClusterId"] = KubernetesApiClient.getClusterId wrapper = { @@ -125,6 +156,7 @@ def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTim "DataItems" => [record.each { |k, v| record[k] = v }], } eventStream.add(emitTime, wrapper) if wrapper + @eventsCount += 1 end router.emit_stream(@tag, eventStream) if eventStream rescue => errorStr diff --git a/source/code/plugin/in_kube_health.rb b/source/code/plugin/in_kube_health.rb index 0eebf395b..f9b211f11 100644 --- a/source/code/plugin/in_kube_health.rb +++ b/source/code/plugin/in_kube_health.rb @@ -85,17 +85,26 @@ def enumerate #HealthMonitorUtils.refresh_kubernetes_api_data(@@hmlog, nil) # we do this so that if the call fails, we get a response code/header etc. - node_inventory_response = KubernetesApiClient.getKubeResourceInfo("nodes") - node_inventory = Yajl::Parser.parse(StringIO.new(node_inventory_response.body)) + resourceUri = KubernetesApiClient.getNodesResourceUri("nodes") + node_inventory_response = KubernetesApiClient.getKubeResourceInfo(resourceUri) + if !node_inventory_response.nil? && !node_inventory_response.body.nil? 
+ node_inventory = Yajl::Parser.parse(StringIO.new(node_inventory_response.body)) + @resources.node_inventory = node_inventory + end + pod_inventory_response = KubernetesApiClient.getKubeResourceInfo("pods?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}") - pod_inventory = Yajl::Parser.parse(StringIO.new(pod_inventory_response.body)) + if !pod_inventory_response.nil? && !pod_inventory_response.body.nil? + pod_inventory = Yajl::Parser.parse(StringIO.new(pod_inventory_response.body)) + @resources.pod_inventory = pod_inventory + @resources.build_pod_uid_lookup + end + replicaset_inventory_response = KubernetesApiClient.getKubeResourceInfo("replicasets?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}", api_group: @@ApiGroupApps) - replicaset_inventory = Yajl::Parser.parse(StringIO.new(replicaset_inventory_response.body)) + if !replicaset_inventory_response.nil? && !replicaset_inventory_response.body.nil? + replicaset_inventory = Yajl::Parser.parse(StringIO.new(replicaset_inventory_response.body)) + @resources.set_replicaset_inventory(replicaset_inventory) + end - @resources.node_inventory = node_inventory - @resources.pod_inventory = pod_inventory - @resources.set_replicaset_inventory(replicaset_inventory) - @resources.build_pod_uid_lookup if node_inventory_response.code.to_i != 200 record = process_kube_api_up_monitor("fail", node_inventory_response) @@ -299,7 +308,8 @@ def process_node_condition_monitor(node_inventory) def initialize_inventory #this is required because there are other components, like the container cpu memory aggregator, that depends on the mapping being initialized - node_inventory_response = KubernetesApiClient.getKubeResourceInfo("nodes") + resourceUri = KubernetesApiClient.getNodesResourceUri("nodes") + node_inventory_response = KubernetesApiClient.getKubeResourceInfo(resourceUri) node_inventory = Yajl::Parser.parse(StringIO.new(node_inventory_response.body)) pod_inventory_response = 
KubernetesApiClient.getKubeResourceInfo("pods?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}") pod_inventory = Yajl::Parser.parse(StringIO.new(pod_inventory_response.body)) diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index fa0994f43..4242a8dba 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -31,6 +31,7 @@ def initialize require_relative "oms_common" require_relative "omslog" @NODES_CHUNK_SIZE = "400" + require_relative "constants" end config_param :run_interval, :time, :default => 60 @@ -69,7 +70,9 @@ def enumerate # Initializing continuation token to nil continuationToken = nil $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}") + resourceUri = KubernetesApiClient.getNodesResourceUri("nodes?limit=#{@NODES_CHUNK_SIZE}") + continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken(resourceUri) + $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) parse_and_emit_records(nodeInventory, batchTime) @@ -79,7 +82,7 @@ def enumerate #If we receive a continuation token, make calls, process and flush data until we have processed all data while (!continuationToken.nil? && !continuationToken.empty?) - continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}&continue=#{continuationToken}") + continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken(resourceUri + "&continue=#{continuationToken}") if (!nodeInventory.nil? && !nodeInventory.empty? 
&& nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) parse_and_emit_records(nodeInventory, batchTime) else @@ -103,6 +106,8 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) telemetrySent = false eventStream = MultiEventStream.new containerNodeInventoryEventStream = MultiEventStream.new + insightsMetricsEventStream = MultiEventStream.new + @@istestvar = ENV["ISTEST"] #get node inventory nodeInventory["items"].each do |items| record = {} @@ -191,6 +196,20 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) capacityInfo = items["status"]["capacity"] ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) + begin + if (!capacityInfo["nvidia.com/gpu"].nil?) && (!capacityInfo["nvidia.com/gpu"].empty?) + properties["nvigpus"] = capacityInfo["nvidia.com/gpu"] + end + + if (!capacityInfo["amd.com/gpu"].nil?) && (!capacityInfo["amd.com/gpu"].empty?) + properties["amdgpus"] = capacityInfo["amd.com/gpu"] + end + rescue => errorStr + $log.warn "Failed in getting GPU telemetry in_kube_nodes : #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + #telemetry about prometheus metric collections settings for replicaset if (File.file?(@@promConfigMountPath)) properties["rsPromInt"] = @@rsPromInterval @@ -211,7 +230,7 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) if telemetrySent == true @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i end - @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? 
&& @@istestvar.casecmp("true") == 0 && eventStream.count > 0) $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") end @@ -235,6 +254,35 @@ def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) end #end router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + + #start GPU InsightsMetrics items + begin + nodeGPUInsightsMetricsDataItems = [] + nodeGPUInsightsMetricsDataItems.concat(KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(nodeInventory, "allocatable", "nvidia.com/gpu", "nodeGpuAllocatable", batchTime)) + nodeGPUInsightsMetricsDataItems.concat(KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(nodeInventory, "capacity", "nvidia.com/gpu", "nodeGpuCapacity", batchTime)) + + nodeGPUInsightsMetricsDataItems.concat(KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(nodeInventory, "allocatable", "amd.com/gpu", "nodeGpuAllocatable", batchTime)) + nodeGPUInsightsMetricsDataItems.concat(KubernetesApiClient.parseNodeLimitsAsInsightsMetrics(nodeInventory, "capacity", "amd.com/gpu", "nodeGpuCapacity", batchTime)) + + nodeGPUInsightsMetricsDataItems.each do |insightsMetricsRecord| + wrapper = { + "DataType" => "INSIGHTS_METRICS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], + } + insightsMetricsEventStream.add(emitTime, wrapper) if wrapper + end + + router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream + if (!@@istestvar.nil? && !@@istestvar.empty? 
&& @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0) + $log.info("kubeNodeInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + rescue => errorStr + $log.warn "Failed when processing GPU metrics in_kube_nodes : #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + #end GPU InsightsMetrics items rescue => errorStr $log.warn "Failed in enumerate for KubePerf from in_kube_nodes : #{errorStr}" $log.debug_backtrace(errorStr.backtrace) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 28b20bfc0..29438d076 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -2,6 +2,8 @@ # frozen_string_literal: true module Fluent + require_relative "podinventory_to_mdm" + class Kube_PodInventory_Input < Input Plugin.register_input("kubepodinventory", self) @@ -22,6 +24,7 @@ def initialize require_relative "ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" + require_relative "constants" @PODS_CHUNK_SIZE = "1500" @podCount = 0 @@ -32,9 +35,11 @@ def initialize config_param :run_interval, :time, :default => 60 config_param :tag, :string, :default => "oms.containerinsights.KubePodInventory" + config_param :custom_metrics_azure_regions, :string def configure(conf) super + @inventoryToMdmConvertor = Inventory2MdmConvertor.new(@custom_metrics_azure_regions) end def start @@ -87,7 +92,7 @@ def enumerate(podList = nil) continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) 
- parse_and_emit_records(podInventory, serviceList, batchTime) + parse_and_emit_records(podInventory, serviceList, continuationToken, batchTime) else $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" end @@ -96,7 +101,7 @@ def enumerate(podList = nil) while (!continuationToken.nil? && !continuationToken.empty?) continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) - parse_and_emit_records(podInventory, serviceList, batchTime) + parse_and_emit_records(podInventory, serviceList, continuationToken, batchTime) else $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" end @@ -143,18 +148,25 @@ def populateWindowsContainerInventoryRecord(container, record, containerEnvVaria containerInventoryRecord["Computer"] = record["Computer"] containerInventoryRecord["ContainerHostname"] = record["Computer"] containerInventoryRecord["ElementName"] = containerName - image = container["image"] - repoInfo = image.split("/") - if !repoInfo.nil? - containerInventoryRecord["Repository"] = repoInfo[0] - if !repoInfo[1].nil? - imageInfo = repoInfo[1].split(":") - if !imageInfo.nil? - containerInventoryRecord["Image"] = imageInfo[0] - containerInventoryRecord["ImageTag"] = imageInfo[1] + + # Find delimiters in the string of format repository/image:imagetag + imageValue = container["image"] + if !imageValue.empty? + slashLocation = imageValue.index("/") + colonLocation = imageValue.index(":") + if !colonLocation.nil? + if slashLocation.nil? 
+ # image:imagetag + containerInventoryRecord["Image"] = imageValue[0..(colonLocation - 1)] + else + # repository/image:imagetag + containerInventoryRecord["Repository"] = imageValue[0..(slashLocation - 1)] + containerInventoryRecord["Image"] = imageValue[(slashLocation + 1)..(colonLocation - 1)] end + containerInventoryRecord["ImageTag"] = imageValue[(colonLocation + 1)..-1] end end + imageIdInfo = container["imageID"] imageIdSplitInfo = imageIdInfo.split("@") if !imageIdSplitInfo.nil? @@ -246,11 +258,12 @@ def getContainerEnvironmentVariables(pod, clusterCollectEnvironmentVar) end end - def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8601) + def parse_and_emit_records(podInventory, serviceList, continuationToken, batchTime = Time.utc.iso8601) currentTime = Time.now emitTime = currentTime.to_f #batchTime = currentTime.utc.iso8601 eventStream = MultiEventStream.new + @@istestvar = ENV["ISTEST"] begin #begin block start # Getting windows nodes from kubeapi @@ -265,6 +278,13 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso86 record["Name"] = items["metadata"]["name"] podNameSpace = items["metadata"]["namespace"] + # For ARO v3 cluster, skip the pods scheduled on to master or infra nodes + if KubernetesApiClient.isAROV3Cluster && !items["spec"].nil? && !items["spec"]["nodeName"].nil? 
&& + (items["spec"]["nodeName"].downcase.start_with?("infra-") || + items["spec"]["nodeName"].downcase.start_with?("master-")) + next + end + if podNameSpace.eql?("kube-system") && !items["metadata"].key?("ownerReferences") # The above case seems to be the only case where you have horizontal scaling of pods # but no controller, in which case cAdvisor picks up kubernetes.io/config.hash @@ -459,6 +479,7 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso86 "DataItems" => [record.each { |k, v| record[k] = v }], } eventStream.add(emitTime, wrapper) if wrapper + @inventoryToMdmConvertor.process_pod_inventory_record(wrapper) end end # Send container inventory records for containers on windows nodes @@ -476,7 +497,18 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso86 end #podInventory block end router.emit_stream(@tag, eventStream) if eventStream - router.emit_stream(@@MDMKubePodInventoryTag, eventStream) if eventStream + + if continuationToken.nil? #no more chunks in this batch to be sent, get all pod inventory records to send + @log.info "Sending pod inventory mdm records to out_mdm" + pod_inventory_mdm_records = @inventoryToMdmConvertor.get_pod_inventory_mdm_records(batchTime) + @log.info "pod_inventory_mdm_records.size #{pod_inventory_mdm_records.size}" + mdm_pod_inventory_es = MultiEventStream.new + pod_inventory_mdm_records.each { |pod_inventory_mdm_record| + mdm_pod_inventory_es.add(batchTime, pod_inventory_mdm_record) if pod_inventory_mdm_record + } if pod_inventory_mdm_records + router.emit_stream(@@MDMKubePodInventoryTag, mdm_pod_inventory_es) if mdm_pod_inventory_es + end + #:optimize:kubeperf merge begin #if(!podInventory.empty?) 
@@ -488,6 +520,7 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso86 containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory", "memoryLimitBytes", batchTime)) kubePerfEventStream = MultiEventStream.new + insightsMetricsEventStream = MultiEventStream.new containerMetricDataItems.each do |record| record["DataType"] = "LINUX_PERF_BLOB" @@ -496,6 +529,38 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso86 end #end router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + + begin + #start GPU InsightsMetrics items + + containerGPUInsightsMetricsDataItems = [] + containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(podInventory, "requests", "nvidia.com/gpu", "containerGpuRequests", batchTime)) + containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(podInventory, "limits", "nvidia.com/gpu", "containerGpuLimits", batchTime)) + + containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(podInventory, "requests", "amd.com/gpu", "containerGpuRequests", batchTime)) + containerGPUInsightsMetricsDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimitsAsInsightsMetrics(podInventory, "limits", "amd.com/gpu", "containerGpuLimits", batchTime)) + + containerGPUInsightsMetricsDataItems.each do |insightsMetricsRecord| + wrapper = { + "DataType" => "INSIGHTS_METRICS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], + } + insightsMetricsEventStream.add(emitTime, wrapper) if wrapper + + if (!@@istestvar.nil? && !@@istestvar.empty? 
&& @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0) + $log.info("kubePodInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + + end + + router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream + #end GPU InsightsMetrics items + rescue => errorStr + $log.warn "Failed when processing GPU metrics in_kube_podinventory : #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end rescue => errorStr $log.warn "Failed in parse_and_emit_record for KubePerf from in_kube_podinventory : #{errorStr}" $log.debug_backtrace(errorStr.backtrace) @@ -537,7 +602,7 @@ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso86 #Updating value for AppInsights telemetry @podCount += podInventory["items"].length - @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) $log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") end diff --git a/source/code/plugin/in_win_cadvisor_perf.rb b/source/code/plugin/in_win_cadvisor_perf.rb index 695a686cf..38868f2f5 100644 --- a/source/code/plugin/in_win_cadvisor_perf.rb +++ b/source/code/plugin/in_win_cadvisor_perf.rb @@ -17,6 +17,7 @@ def initialize require_relative "KubernetesApiClient" require_relative "oms_common" require_relative "omslog" + require_relative "constants" end config_param :run_interval, :time, :default => 60 @@ -52,8 +53,10 @@ def enumerate() time = Time.now.to_f begin eventStream = MultiEventStream.new + insightsMetricsEventStream = MultiEventStream.new timeDifference = (DateTime.now.to_time.to_i - @@winNodeQueryTimeTracker).abs timeDifferenceInMinutes = timeDifference / 60 + @@istestvar = ENV["ISTEST"] #Resetting this cache so that it is populated with the current set of containers with every call CAdvisorMetricsAPIClient.resetWinContainerIdCache() @@ 
-78,10 +81,36 @@ def enumerate() router.emit_stream(@tag, eventStream) if eventStream router.emit_stream(@mdmtag, eventStream) if eventStream - @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) $log.info("winCAdvisorPerfEmitStreamSuccess @ #{Time.now.utc.iso8601}") end + + #start GPU InsightsMetrics items + begin + containerGPUusageInsightsMetricsDataItems = [] + containerGPUusageInsightsMetricsDataItems.concat(CAdvisorMetricsAPIClient.getInsightsMetrics(winNode: winNode, metricTime: Time.now.utc.iso8601)) + + containerGPUusageInsightsMetricsDataItems.each do |insightsMetricsRecord| + wrapper = { + "DataType" => "INSIGHTS_METRICS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [insightsMetricsRecord.each { |k, v| insightsMetricsRecord[k] = v }], + } + insightsMetricsEventStream.add(time, wrapper) if wrapper + end + + router.emit_stream(Constants::INSIGHTSMETRICS_FLUENT_TAG, insightsMetricsEventStream) if insightsMetricsEventStream + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && insightsMetricsEventStream.count > 0) + $log.info("winCAdvisorInsightsMetricsEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + rescue => errorStr + $log.warn "Failed when processing GPU Usage metrics in_win_cadvisor_perf : #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + #end GPU InsightsMetrics items + end # Cleanup routine to clear deleted containers from cache diff --git a/source/code/plugin/kubelet_utils.rb b/source/code/plugin/kubelet_utils.rb new file mode 100644 index 000000000..6d97e30a9 --- /dev/null +++ b/source/code/plugin/kubelet_utils.rb @@ -0,0 +1,23 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. 
+#!/usr/local/bin/ruby + frozen_string_literal: true + +require_relative 'CAdvisorMetricsAPIClient' + +class KubeletUtils + class << self + def get_node_capacity + + cpu_capacity = 1.0 + memory_capacity = 1.0 + + response = CAdvisorMetricsAPIClient.getNodeCapacityFromCAdvisor(winNode: nil) + if !response.nil? && !response.body.nil? + cpu_capacity = JSON.parse(response.body)["num_cores"].nil? ? 1.0 : (JSON.parse(response.body)["num_cores"] * 1000.0) + memory_capacity = JSON.parse(response.body)["memory_capacity"].nil? ? 1.0 : JSON.parse(response.body)["memory_capacity"].to_f + $log.info "CPU = #{cpu_capacity}mc Memory = #{memory_capacity/1024/1024}MB" + return [cpu_capacity, memory_capacity] + end + end + end +end \ No newline at end of file diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb index 0a4e601b2..243251bca 100644 --- a/source/code/plugin/out_mdm.rb +++ b/source/code/plugin/out_mdm.rb @@ -12,7 +12,7 @@ def initialize require "net/http" require "net/https" require "uri" - require 'yajl/json_gem' + require "yajl/json_gem" require_relative "KubernetesApiClient" require_relative "ApplicationInsightsUtility" @@ -20,11 +20,19 @@ @@grant_type = "client_credentials" @@azure_json_path = "/etc/kubernetes/host/azure.json" @@post_request_url_template = "https://%{aks_region}.monitoring.azure.com%{aks_resource_id}/metrics" - @@token_url_template = "https://login.microsoftonline.com/%{tenant_id}/oauth2/token" + @@aad_token_url_template = "https://login.microsoftonline.com/%{tenant_id}/oauth2/token" + + # msiEndpoint is the well known endpoint for getting MSI authentication tokens + @@msi_endpoint_template = "http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&client_id=%{user_assigned_client_id}&resource=%{resource}" + @@userAssignedClientId = ENV["USER_ASSIGNED_IDENTITY_CLIENT_ID"] + + @@plugin_name = "AKSCustomMetricsMDM" + @@record_batch_size = 2600 + + @@tokenRefreshBackoffInterval = 30 @data_hash = {} 
-    @token_url = nil
+    @parsed_token_uri = nil
     @http_client = nil
     @token_expiry_time = Time.now
     @cached_access_token = String.new
@@ -32,6 +40,10 @@ def initialize
     @first_post_attempt_made = false
     @can_send_data_to_mdm = true
     @last_telemetry_sent_time = nil
+    # Setting useMsi to false by default
+    @useMsi = false
+
+    @get_access_token_backoff_expiry = Time.now
   end
 
   def configure(conf)
@@ -56,51 +68,102 @@ def start
       @log.info "Environment Variable AKS_REGION is not set.. "
       @can_send_data_to_mdm = false
     else
-      aks_region = aks_region.gsub(" ","")
+      aks_region = aks_region.gsub(" ", "")
     end
 
     if @can_send_data_to_mdm
       @log.info "MDM Metrics supported in #{aks_region} region"
-      @token_url = @@token_url_template % {tenant_id: @data_hash["tenantId"]}
-      @cached_access_token = get_access_token
+      @@post_request_url = @@post_request_url_template % {aks_region: aks_region, aks_resource_id: aks_resource_id}
       @post_request_uri = URI.parse(@@post_request_url)
       @http_client = Net::HTTP.new(@post_request_uri.host, @post_request_uri.port)
       @http_client.use_ssl = true
       @log.info "POST Request url: #{@@post_request_url}"
       ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMPluginStart", {})
+
+      # Check to see if SP exists, if it does use SP. Else, use msi
+      sp_client_id = @data_hash["aadClientId"]
+      sp_client_secret = @data_hash["aadClientSecret"]
+
+      if (!sp_client_id.nil? && !sp_client_id.empty? && sp_client_id.downcase != "msi")
+        @useMsi = false
+        aad_token_url = @@aad_token_url_template % {tenant_id: @data_hash["tenantId"]}
+        @parsed_token_uri = URI.parse(aad_token_url)
+      else
+        @useMsi = true
+        msi_endpoint = @@msi_endpoint_template % {user_assigned_client_id: @@userAssignedClientId, resource: @@token_resource_url}
+        @parsed_token_uri = URI.parse(msi_endpoint)
+      end
+
+      @cached_access_token = get_access_token
     end
   rescue => e
     @log.info "exception when initializing out_mdm #{e}"
     ApplicationInsightsUtility.sendExceptionTelemetry(e, {"FeatureArea" => "MDM"})
-    @can_send_data_to_mdm = false
     return
   end
-  end
 
-  # get the access token only if the time to expiry is less than 5 minutes
+  # get the access token only if the time to expiry is less than 5 minutes and get_access_token_backoff has expired
   def get_access_token
-    if @cached_access_token.to_s.empty? || (Time.now + 5 * 60 > @token_expiry_time) # token is valid for 60 minutes. Refresh token 5 minutes from expiration
-      @log.info "Refreshing access token for out_mdm plugin.."
-      token_uri = URI.parse(@token_url)
-      http_access_token = Net::HTTP.new(token_uri.host, token_uri.port)
-      http_access_token.use_ssl = true
-      token_request = Net::HTTP::Post.new(token_uri.request_uri)
-      token_request.set_form_data(
-        {
-          "grant_type" => @@grant_type,
-          "client_id" => @data_hash["aadClientId"],
-          "client_secret" => @data_hash["aadClientSecret"],
-          "resource" => @@token_resource_url,
-        }
-      )
-
-      token_response = http_access_token.request(token_request)
-      # Handle the case where the response is not 200
-      parsed_json = JSON.parse(token_response.body)
-      @token_expiry_time = Time.now + 59 * 60 # set the expiry time to be ~one hour from current time
-      @cached_access_token = parsed_json["access_token"]
+    if (Time.now > @get_access_token_backoff_expiry)
+      http_access_token = nil
+      retries = 0
+      begin
+        if @cached_access_token.to_s.empty? || (Time.now + 5 * 60 > @token_expiry_time) # Refresh token 5 minutes from expiration
+          @log.info "Refreshing access token for out_mdm plugin.."
+
+          if (!!@useMsi)
+            @log.info "Using msi to get the token to post MDM data"
+            ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMToken-MSI", {})
+            @log.info "Opening TCP connection"
+            http_access_token = Net::HTTP.start(@parsed_token_uri.host, @parsed_token_uri.port, :use_ssl => false)
+            # :use_ssl => false above: the link-local IMDS/MSI endpoint does not speak TLS
+            token_request = Net::HTTP::Get.new(@parsed_token_uri.request_uri)
+            token_request["Metadata"] = true
+          else
+            @log.info "Using SP to get the token to post MDM data"
+            ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMToken-SP", {})
+            @log.info "Opening TCP connection"
+            http_access_token = Net::HTTP.start(@parsed_token_uri.host, @parsed_token_uri.port, :use_ssl => true)
+            # :use_ssl => true above: this is the public AAD login endpoint
+            token_request = Net::HTTP::Post.new(@parsed_token_uri.request_uri)
+            token_request.set_form_data(
+              {
+                "grant_type" => @@grant_type,
+                "client_id" => @data_hash["aadClientId"],
+                "client_secret" => @data_hash["aadClientSecret"],
+                "resource" => @@token_resource_url,
+              }
+            )
+          end
+
+          @log.info "making request to get token.."
+          token_response = http_access_token.request(token_request)
+          # Handle the case where the response is not 200
+          parsed_json = JSON.parse(token_response.body)
+          @token_expiry_time = Time.now + @@tokenRefreshBackoffInterval * 60 # set the expiry time to be ~thirty minutes from current time
+          @cached_access_token = parsed_json["access_token"]
+          @log.info "Successfully got access token"
+        end
+      rescue => err
+        @log.info "Exception in get_access_token: #{err}"
+        if (retries < 2)
+          retries += 1
+          @log.info "Retrying request to get token - retry number: #{retries}"
+          sleep(retries)
+          retry
+        else
+          @get_access_token_backoff_expiry = Time.now + @@tokenRefreshBackoffInterval * 60
+          @log.info "@get_access_token_backoff_expiry set to #{@get_access_token_backoff_expiry}"
+          ApplicationInsightsUtility.sendExceptionTelemetry(err, {"FeatureArea" => "MDM"})
+        end
+      ensure
+        if http_access_token
+          @log.info "Closing http connection"
+          http_access_token.finish
+        end
+      end
     end
     @cached_access_token
   end
@@ -136,7 +199,14 @@ def write(chunk)
       chunk.msgpack_each { |(tag, record)|
         post_body.push(record.to_json)
       }
-      send_to_mdm post_body
+      # the limit of the payload is 1MB. Each record is ~300 bytes. using a batch size of 2600, so that
+      # the pay load size becomes approximately 800 Kb.
+      count = post_body.size
+      while count > 0
+        # shift (not first): remove each batch as it is sent, otherwise the
+        # first 2600 records would be re-sent every iteration and the rest
+        # of the chunk would never be posted at all
+        current_batch = post_body.shift(@@record_batch_size)
+        count -= current_batch.size
+        send_to_mdm current_batch
+      end
     else
       if !@can_send_data_to_mdm
         @log.info "Cannot send data to MDM since all required conditions were not met"
       end
     end
@@ -157,15 +227,19 @@ def send_to_mdm(post_body)
     request = Net::HTTP::Post.new(@post_request_uri.request_uri)
     request["Content-Type"] = "application/x-ndjson"
     request["Authorization"] = "Bearer #{access_token}"
+    request.body = post_body.join("\n")
+    @log.info "REQUEST BODY SIZE #{request.body.bytesize/1024}"
     response = @http_client.request(request)
     response.value # this throws for non 200 HTTP response code
     @log.info "HTTP Post Response Code : #{response.code}"
     if @last_telemetry_sent_time.nil? || @last_telemetry_sent_time + 60 * 60 < Time.now
-        ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMSendSuccessful", {})
-        @last_telemetry_sent_time = Time.now
+      ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMSendSuccessful", {})
+      @last_telemetry_sent_time = Time.now
     end
-
   rescue Net::HTTPServerException => e
     @log.info "Failed to Post Metrics to MDM : #{e} Response: #{response}"
     @log.debug_backtrace(e.backtrace)
 
diff --git a/source/code/plugin/podinventory_to_mdm.rb b/source/code/plugin/podinventory_to_mdm.rb
new file mode 100644
index 000000000..21ef12c34
--- /dev/null
+++ b/source/code/plugin/podinventory_to_mdm.rb
@@ -0,0 +1,190 @@
+# Copyright (c) Microsoft Corporation. All rights reserved.
+
+# frozen_string_literal: true
+
+require 'logger'
+require 'yajl/json_gem'
+require_relative 'oms_common'
+require_relative 'CustomMetricsUtils'
+# required: sendExceptionTelemetry is called from the rescue blocks below
+require_relative 'ApplicationInsightsUtility'
+
+class Inventory2MdmConvertor
+
+    @@node_count_metric_name = 'nodesCount'
+    @@pod_count_metric_name = 'podCount'
+    @@pod_inventory_tag = 'mdm.kubepodinventory'
+    @@node_inventory_tag = 'mdm.kubenodeinventory'
+    @@node_status_ready = 'Ready'
+    @@node_status_not_ready = 'NotReady'
+
+    @@node_inventory_custom_metrics_template = '
+        {
+            "time": "%{timestamp}",
+            "data": {
+                "baseData": {
+                    "metric": "%{metricName}",
+                    "namespace": "insights.container/nodes",
+                    "dimNames": [
+                        "status"
+                    ],
+                    "series": [
+                        {
+                            "dimValues": [
+                                "%{statusValue}"
+                            ],
+                            "min": %{node_status_count},
+                            "max": %{node_status_count},
+                            "sum": %{node_status_count},
+                            "count": 1
+                        }
+                    ]
+                }
+            }
+        }'
+
+    @@pod_inventory_custom_metrics_template = '
+        {
+            "time": "%{timestamp}",
+            "data": {
+                "baseData": {
+                    "metric": "%{metricName}",
+                    "namespace": "insights.container/pods",
+                    "dimNames": [
+                        "phase",
+                        "Kubernetes namespace",
+                        "node",
+                        "controllerName"
+                    ],
+                    "series": [
+                        {
+                            "dimValues": [
+                                "%{phaseDimValue}",
+                                "%{namespaceDimValue}",
+                                "%{nodeDimValue}",
+                                "%{controllerNameDimValue}"
+                            ],
+                            "min": %{podCountMetricValue},
+                            "max": %{podCountMetricValue},
+                            "sum": %{podCountMetricValue},
+                            "count": 1
+                        }
+                    ]
+                }
+            }
+        }'
+
+    @@pod_phase_values = ['Running', 'Pending', 'Succeeded', 'Failed', 'Unknown']
+    @process_incoming_stream = false
+
+    def initialize(custom_metrics_azure_regions)
+        @log_path = '/var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log'
+        @log = Logger.new(@log_path, 1, 5000000)
+        @pod_count_hash = {}
+        @no_phase_dim_values_hash = {}
+        @pod_count_by_phase = {}
+        @pod_uids = {}
+        @process_incoming_stream = CustomMetricsUtils.check_custom_metrics_availability(custom_metrics_azure_regions)
+        @log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}"
+        @log.debug {'Starting filter_inventory2mdm plugin'}
+    end
+
+    def get_pod_inventory_mdm_records(batch_time)
+        begin
+            # generate all possible values of non_phase_dim_values X pod Phases and zero-fill the ones that are not already present
+            @no_phase_dim_values_hash.each {|key, value|
+                @@pod_phase_values.each{|phase|
+                    pod_key = [key, phase].join('~~')
+                    if !@pod_count_hash.key?(pod_key)
+                        @pod_count_hash[pod_key] = 0
+                        #@log.info "Zero filled #{pod_key}"
+                    else
+                        next
+                    end
+                }
+            }
+            records = []
+            @pod_count_hash.each {|key, value|
+                key_elements = key.split('~~')
+                if key_elements.length != 4
+                    next
+                end
+
+                # get dimension values by key
+                podNodeDimValue = key_elements[0]
+                podNamespaceDimValue = key_elements[1]
+                podControllerNameDimValue = key_elements[2]
+                podPhaseDimValue = key_elements[3]
+
+                record = @@pod_inventory_custom_metrics_template % {
+                    timestamp: batch_time,
+                    metricName: @@pod_count_metric_name,
+                    phaseDimValue: podPhaseDimValue,
+                    namespaceDimValue: podNamespaceDimValue,
+                    nodeDimValue: podNodeDimValue,
+                    controllerNameDimValue: podControllerNameDimValue,
+                    podCountMetricValue: value
+                }
+                records.push(JSON.parse(record))
+            }
+        rescue StandardError => e
+            @log.info "Error processing pod inventory record Exception: #{e.class} Message: #{e.message}"
+            ApplicationInsightsUtility.sendExceptionTelemetry(e)
+            return []
+        end
+        @log.info "Pod Count To Phase #{@pod_count_by_phase} "
+        @log.info "resetting convertor state "
+        @pod_count_hash = {}
+        @no_phase_dim_values_hash = {}
+        @pod_count_by_phase = {}
+        @pod_uids = {}
+        return records
+    end
+
+    def process_pod_inventory_record(record)
+        if @process_incoming_stream
+            begin
+                records = []
+
+                podUid = record['DataItems'][0]['PodUid']
+                if @pod_uids.key?(podUid)
+                    #@log.info "pod with #{podUid} already counted"
+                    return
+                end
+
+                @pod_uids[podUid] = true
+                podPhaseDimValue = record['DataItems'][0]['PodStatus']
+                podNamespaceDimValue = record['DataItems'][0]['Namespace']
+                podControllerNameDimValue = record['DataItems'][0]['ControllerName']
+                podNodeDimValue = record['DataItems'][0]['Computer']
+
+                if podControllerNameDimValue.nil? || podControllerNameDimValue.empty?
+                    podControllerNameDimValue = 'No Controller'
+                end
+
+                if podNodeDimValue.empty? && podPhaseDimValue.downcase == 'pending'
+                    podNodeDimValue = 'unscheduled'
+                elsif podNodeDimValue.empty?
+                    podNodeDimValue = 'unknown'
+                end
+
+                # group by distinct dimension values
+                pod_key = [podNodeDimValue, podNamespaceDimValue, podControllerNameDimValue, podPhaseDimValue].join('~~')
+
+                @pod_count_by_phase[podPhaseDimValue] = @pod_count_by_phase.key?(podPhaseDimValue) ? @pod_count_by_phase[podPhaseDimValue] + 1 : 1
+                @pod_count_hash[pod_key] = @pod_count_hash.key?(pod_key) ? @pod_count_hash[pod_key] + 1 : 1
+
+                # Collect all possible combinations of dimension values other than pod phase
+                key_without_phase_dim_value = [podNodeDimValue, podNamespaceDimValue, podControllerNameDimValue].join('~~')
+                if @no_phase_dim_values_hash.key?(key_without_phase_dim_value)
+                    return
+                else
+                    @no_phase_dim_values_hash[key_without_phase_dim_value] = true
+                end
+            rescue StandardError => e
+                @log.info "Error processing pod inventory record Exception: #{e.class} Message: #{e.message}"
+                ApplicationInsightsUtility.sendExceptionTelemetry(e)
+            end
+        end
+    end
+end
+