Skip to content
2 changes: 2 additions & 0 deletions installer/datafiles/base_container.data
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ MAINTAINER: 'Microsoft Corporation'
/opt/microsoft/omsagent/plugin/in_win_cadvisor_perf.rb; source/code/plugin/in_win_cadvisor_perf.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_nodes.rb; source/code/plugin/in_kube_nodes.rb; 644; root; root
/opt/microsoft/omsagent/plugin/filter_inventory2mdm.rb; source/code/plugin/filter_inventory2mdm.rb; 644; root; root
/opt/microsoft/omsagent/plugin/podinventory_to_mdm.rb; source/code/plugin/podinventory_to_mdm.rb; 644; root; root
/opt/microsoft/omsagent/plugin/kubelet_utils.rb; source/code/plugin/kubelet_utils.rb; 644; root; root
/opt/microsoft/omsagent/plugin/CustomMetricsUtils.rb; source/code/plugin/CustomMetricsUtils.rb; 644; root; root


Expand Down
141 changes: 80 additions & 61 deletions source/code/plugin/CAdvisorMetricsAPIClient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,85 +55,58 @@ class CAdvisorMetricsAPIClient
# Keeping track of containers so that can delete the container from the container cpu cache when the container is deleted
# as a part of the cleanup routine
@@winContainerIdCache = []

#cadvisor ports
@@CADVISOR_SECURE_PORT = "10250"
@@CADVISOR_NON_SECURE_PORT = "10255"
def initialize
end

class << self
def getSummaryStatsFromCAdvisor(winNode)
headers = {}
response = nil
@Log.info "Getting CAdvisor Uri"
begin
cAdvisorSecurePort = false
# Check to see if omsagent needs to use 10255(insecure) port or 10250(secure) port
if !@cAdvisorMetricsSecurePort.nil? && @cAdvisorMetricsSecurePort == "true"
cAdvisorSecurePort = true
end

cAdvisorUri = getCAdvisorUri(winNode, cAdvisorSecurePort)
bearerToken = File.read("/var/run/secrets/kubernetes.io/serviceaccount/token")
@Log.info "cAdvisorUri: #{cAdvisorUri}"
relativeUri = "/stats/summary"
return getResponse(winNode, relativeUri)
end

if !cAdvisorUri.nil?
uri = URI.parse(cAdvisorUri)
if !!cAdvisorSecurePort == true
Net::HTTP.start(uri.host, uri.port,
:use_ssl => true, :open_timeout => 20, :read_timeout => 40,
:ca_file => "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
:verify_mode => OpenSSL::SSL::VERIFY_NONE) do |http|
cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri)
cAdvisorApiRequest["Authorization"] = "Bearer #{bearerToken}"
response = http.request(cAdvisorApiRequest)
@Log.info "Got response code #{response.code} from #{uri.request_uri}"
end
else
Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40) do |http|
cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri)
response = http.request(cAdvisorApiRequest)
@Log.info "Got response code #{response.code} from #{uri.request_uri}"
end
end
end
rescue => error
@Log.warn("CAdvisor api request failed: #{error}")
telemetryProps = {}
telemetryProps["Computer"] = winNode["Hostname"]
ApplicationInsightsUtility.sendExceptionTelemetry(error, telemetryProps)
end
return response
def getNodeCapacityFromCAdvisor(winNode: nil)
relativeUri = "/spec/"
return getResponse(winNode, relativeUri)
end

def getCAdvisorUri(winNode, cAdvisorSecurePort)
begin
def getBaseCAdvisorUri(winNode)
cAdvisorSecurePort = isCAdvisorOnSecurePort()

if !!cAdvisorSecurePort == true
defaultHost = "https://localhost:10250"
defaultHost = "https://localhost:#{@@CADVISOR_SECURE_PORT}"
else
defaultHost = "http://localhost:10255"
defaultHost = "http://localhost:#{@@CADVISOR_NON_SECURE_PORT}"
end

relativeUri = "/stats/summary"
if !winNode.nil?
nodeIP = winNode["InternalIP"]
nodeIP = winNode["InternalIP"]
else
nodeIP = ENV["NODE_IP"]
nodeIP = ENV["NODE_IP"]
end

if !nodeIP.nil?
@Log.info("Using #{nodeIP + relativeUri} for CAdvisor Uri")
if !!cAdvisorSecurePort == true
return "https://#{nodeIP}:10250" + relativeUri
else
return "http://#{nodeIP}:10255" + relativeUri
end
@Log.info("Using #{nodeIP} for CAdvisor Host")
if !!cAdvisorSecurePort == true
return "https://#{nodeIP}:#{@@CADVISOR_SECURE_PORT}"
else
return "http://#{nodeIP}:#{@@CADVISOR_NON_SECURE_PORT}"
end
else
@Log.warn ("NODE_IP environment variable not set. Using default as : #{defaultHost + relativeUri} ")
if !winNode.nil?
return nil
else
return defaultHost + relativeUri
end
@Log.warn ("NODE_IP environment variable not set. Using default as : #{defaultHost}")
if !winNode.nil?
return nil
else
return defaultHost
end
end
end
end

def getCAdvisorUri(winNode, relativeUri)
baseUri = getBaseCAdvisorUri(winNode)
return baseUri + relativeUri
end

def getMetrics(winNode: nil, metricTime: Time.now.utc.iso8601)
Expand Down Expand Up @@ -696,5 +669,51 @@ def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn, m
end
return metricItems
end

def getResponse(winNode, relativeUri)
response = nil
@Log.info "Getting CAdvisor Uri Response"
bearerToken = File.read("/var/run/secrets/kubernetes.io/serviceaccount/token")
begin
cAdvisorUri = getCAdvisorUri(winNode, relativeUri)
@Log.info "cAdvisorUri: #{cAdvisorUri}"

if !cAdvisorUri.nil?
uri = URI.parse(cAdvisorUri)
if isCAdvisorOnSecurePort()
Net::HTTP.start(uri.host, uri.port,
:use_ssl => true, :open_timeout => 20, :read_timeout => 40,
:ca_file => "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
:verify_mode => OpenSSL::SSL::VERIFY_NONE) do |http|
cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri)
cAdvisorApiRequest["Authorization"] = "Bearer #{bearerToken}"
response = http.request(cAdvisorApiRequest)
@Log.info "Got response code #{response.code} from #{uri.request_uri}"
end
else
Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40) do |http|
cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri)
response = http.request(cAdvisorApiRequest)
@Log.info "Got response code #{response.code} from #{uri.request_uri}"
end
end
end
rescue => error
@Log.warn("CAdvisor api request for #{cAdvisorUri} failed: #{error}")
telemetryProps = {}
telemetryProps["Computer"] = winNode["Hostname"]
ApplicationInsightsUtility.sendExceptionTelemetry(error, telemetryProps)
end
return response
end

def isCAdvisorOnSecurePort
cAdvisorSecurePort = false
# Check to see whether omsagent needs to use 10255(insecure) port or 10250(secure) port
if !@cAdvisorMetricsSecurePort.nil? && @cAdvisorMetricsSecurePort == "true"
cAdvisorSecurePort = true
end
return cAdvisorSecurePort
end
end
end
52 changes: 31 additions & 21 deletions source/code/plugin/filter_cadvisor2mdm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ module Fluent
require 'yajl/json_gem'
require_relative 'oms_common'
require_relative 'CustomMetricsUtils'
require_relative 'kubelet_utils'

class CAdvisor2MdmFilter < Filter
Fluent::Plugin.register_filter('filter_cadvisor2mdm', self)
Expand Down Expand Up @@ -110,9 +111,10 @@ def filter(tag, time, record)
metric_value = record['DataItems'][0]['Collections'][0]['Value']
if counter_name.downcase == @@cpu_usage_nano_cores
metric_name = @@cpu_usage_milli_cores
metric_value = metric_value/1000000
metric_value /= 1000000 #cadvisor record is in nanocores. Convert to mc
@log.info "Metric_value: #{metric_value} CPU Capacity #{@cpu_capacity}"
if @cpu_capacity != 0.0
percentage_metric_value = (metric_value*1000000)*100/@cpu_capacity
percentage_metric_value = (metric_value)*100/@cpu_capacity
end
end

Expand All @@ -138,34 +140,42 @@ def filter(tag, time, record)

def ensure_cpu_memory_capacity_set

@log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}"
if @cpu_capacity != 0.0 && @memory_capacity != 0.0
@log.info "CPU And Memory Capacity are already set"
return
end

begin
controller_type = ENV["CONTROLLER_TYPE"]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this plugin is used in RS as well for windows AKS clusters. Hence this check

if controller_type.downcase == 'replicaset'
@log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}"

begin
resourceUri = KubernetesApiClient.getNodesResourceUri("nodes?fieldSelector=metadata.name%3D#{@@hostName}")
nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo(resourceUri).body)
rescue Exception => e
@log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} "
ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
end
if !nodeInventory.nil?
cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores")
if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil?
@cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value']
@log.info "CPU Limit #{@cpu_capacity}"
else
@log.info "Error getting cpu_capacity"
rescue Exception => e
@log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} "
ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
end
memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes")
if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil?
@memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value']
@log.info "Memory Limit #{@memory_capacity}"
else
@log.info "Error getting memory_capacity"
if !nodeInventory.nil?
cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores")
if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil?
@cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value']
@log.info "CPU Limit #{@cpu_capacity}"
else
@log.info "Error getting cpu_capacity"
end
memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes")
if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil?
@memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value']
@log.info "Memory Limit #{@memory_capacity}"
else
@log.info "Error getting memory_capacity"
end
end
elsif controller_type.downcase == 'daemonset'
capacity_from_kubelet = KubeletUtils.get_node_capacity
@cpu_capacity = capacity_from_kubelet[0]
@memory_capacity = capacity_from_kubelet[1]
end
end

Expand Down
4 changes: 2 additions & 2 deletions source/code/plugin/filter_cadvisor_health_node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,13 @@ def process_node_cpu_record(record, metric_value)
else
instance_name = record['DataItems'][0]['InstanceName']
#@log.info "CPU capacity #{@cpu_capacity}"

metric_value /= 1000000
percent = (metric_value.to_f/@cpu_capacity*100).round(2)
#@log.debug "Percentage of CPU limit: #{percent}"
state = HealthMonitorUtils.compute_percentage_state(percent, @provider.get_config(MonitorId::NODE_CPU_MONITOR_ID))
#@log.debug "Computed State : #{state}"
timestamp = record['DataItems'][0]['Timestamp']
health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"cpuUsageMillicores" => metric_value/1000000.to_f, "cpuUtilizationPercentage" => percent}}
health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"cpuUsageMillicores" => metric_value, "cpuUtilizationPercentage" => percent}}

monitor_instance_id = HealthMonitorUtils.get_monitor_instance_id(monitor_id, [@@clusterId, @@hostName])
# temp = record.nil? ? "Nil" : record["MonitorInstanceId"]
Expand Down
4 changes: 2 additions & 2 deletions source/code/plugin/filter_inventory2mdm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def process_pod_inventory_records(es)
no_phase_dim_values_hash = Hash.new
total_pod_count = 0
pod_count_by_phase = {}
podUids = {}
podUids = {}
record_count = 0
begin
records = []
Expand All @@ -165,7 +165,7 @@ def process_pod_inventory_records(es)
timestamp = record['DataItems'][0]['CollectionTime']
podUid = record['DataItems'][0]['PodUid']

if podUids.key?(podUid)
if podUids.key?(podUid)
#@log.info "pod with #{podUid} already counted"
next
end
Expand Down
40 changes: 2 additions & 38 deletions source/code/plugin/health/health_monitor_utils.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require 'digest'
require_relative 'health_model_constants'
require 'yajl/json_gem'
require_relative '../kubelet_utils'

module HealthModel
# static class that provides a bunch of utility methods
Expand Down Expand Up @@ -265,50 +266,13 @@ def get_monitor_instance_id(monitor_id, args = [])
end

def ensure_cpu_memory_capacity_set(log, cpu_capacity, memory_capacity, hostname)

log.info "ensure_cpu_memory_capacity_set cpu_capacity #{cpu_capacity} memory_capacity #{memory_capacity}"
if cpu_capacity != 1.0 && memory_capacity != 1.0
log.info "CPU And Memory Capacity are already set"
return [cpu_capacity, memory_capacity]
end

log.info "CPU and Memory Capacity Not set"
begin
resourceUri = KubernetesApiClient.getNodesResourceUri("nodes")
@@nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo(resourceUri).body)
rescue Exception => e
log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} "
ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
end
if !@@nodeInventory.nil?
cpu_capacity_json = KubernetesApiClient.parseNodeLimits(@@nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores")
if !cpu_capacity_json.nil?
cpu_capacity_json.each do |cpu_info_node|
if !cpu_info_node['DataItems'][0]['Host'].nil? && cpu_info_node['DataItems'][0]['Host'] == hostname
if !cpu_info_node['DataItems'][0]['Collections'][0]['Value'].nil?
cpu_capacity = cpu_info_node['DataItems'][0]['Collections'][0]['Value']
end
end
end
log.info "CPU Limit #{cpu_capacity}"
else
log.info "Error getting cpu_capacity"
end
memory_capacity_json = KubernetesApiClient.parseNodeLimits(@@nodeInventory, "capacity", "memory", "memoryCapacityBytes")
if !memory_capacity_json.nil?
memory_capacity_json.each do |memory_info_node|
if !memory_info_node['DataItems'][0]['Host'].nil? && memory_info_node['DataItems'][0]['Host'] == hostname
if !memory_info_node['DataItems'][0]['Collections'][0]['Value'].nil?
memory_capacity = memory_info_node['DataItems'][0]['Collections'][0]['Value']
end
end
end
log.info "memory Limit #{memory_capacity}"
else
log.info "Error getting memory_capacity"
end
return [cpu_capacity, memory_capacity]
end
return KubeletUtils.get_node_capacity
end

def build_metrics_hash(metrics_to_collect)
Expand Down
Loading