Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 53 additions & 49 deletions source/code/plugin/filter_cadvisor2mdm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,45 +10,45 @@ module Fluent

class CAdvisor2MdmFilter < Filter
Fluent::Plugin.register_filter('filter_cadvisor2mdm', self)

config_param :enable_log, :integer, :default => 0
config_param :log_path, :string, :default => '/var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log'
config_param :custom_metrics_azure_regions, :string
config_param :metrics_to_collect, :string, :default => 'cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes'

@@cpu_usage_milli_cores = 'cpuUsageMillicores'
@@cpu_usage_nano_cores = 'cpuusagenanocores'
@@object_name_k8s_node = 'K8SNode'
@@hostName = (OMS::Common.get_hostname)
@@custom_metrics_template = '
{
"time": "%{timestamp}",
"data": {
"baseData": {
"metric": "%{metricName}",
"namespace": "Insights.Container/nodes",
"dimNames": [
{
"time": "%{timestamp}",
"data": {
"baseData": {
"metric": "%{metricName}",
"namespace": "Insights.Container/nodes",
"dimNames": [
"host"
],
"series": [
{
"dimValues": [
],
"series": [
{
"dimValues": [
"%{hostvalue}"
],
],
"min": %{metricminvalue},
"max": %{metricmaxvalue},
"sum": %{metricsumvalue},
"count": 1
}
]
}
}
"max": %{metricmaxvalue},
"sum": %{metricsumvalue},
"count": 1
}
]
}
}
}'

@@metric_name_metric_percentage_name_hash = {
@@cpu_usage_milli_cores => "cpuUsagePercentage",
@@cpu_usage_milli_cores => "cpuUsagePercentage",
"memoryRssBytes" => "memoryRssPercentage",
"memoryWorkingSetBytes" => "memoryWorkingSetPercentage"
"memoryWorkingSetBytes" => "memoryWorkingSetPercentage"
}

@process_incoming_stream = true
Expand All @@ -61,7 +61,7 @@ def initialize
def configure(conf)
super
@log = nil

if @enable_log
@log = Logger.new(@log_path, 1, 5000000)
@log.debug {'Starting filter_cadvisor2mdm plugin'}
Expand All @@ -70,15 +70,19 @@ def configure(conf)

def start
super
@process_incoming_stream = CustomMetricsUtils.check_custom_metrics_availability(@custom_metrics_azure_regions)
@metrics_to_collect_hash = build_metrics_hash
@log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}"

# initialize cpu and memory limit
if @process_incoming_stream
@cpu_capacity = 0.0
@memory_capacity = 0.0
ensure_cpu_memory_capacity_set
begin
@process_incoming_stream = CustomMetricsUtils.check_custom_metrics_availability(@custom_metrics_azure_regions)
@metrics_to_collect_hash = build_metrics_hash
@log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}"

# initialize cpu and memory limit
if @process_incoming_stream
@cpu_capacity = 0.0
@memory_capacity = 0.0
ensure_cpu_memory_capacity_set
end
rescue => e
@log.info "Error initializing plugin #{e}"
end
end

Expand Down Expand Up @@ -117,9 +121,9 @@ def filter(tag, time, record)
if @memory_capacity != 0.0
percentage_metric_value = metric_value*100/@memory_capacity
end
end
end
return get_metric_records(record, metric_name, metric_value, percentage_metric_value)
else
else
return []
end
else
Expand All @@ -140,13 +144,13 @@ def ensure_cpu_memory_capacity_set
return
end

begin
begin
nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes?fieldSelector=metadata.name%3D#{@@hostName}").body)
rescue Exception => e
@log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} "
ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
end
if !nodeInventory.nil?
if !nodeInventory.nil?
cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores")
if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil?
@cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value']
Expand All @@ -163,7 +167,7 @@ def ensure_cpu_memory_capacity_set
end
end
end

def get_metric_records(record, metric_name, metric_value, percentage_metric_value)
records = []
custommetricrecord = @@custom_metrics_template % {
Expand Down Expand Up @@ -194,20 +198,20 @@ def get_metric_records(record, metric_name, metric_value, percentage_metric_valu
return records
end


def filter_stream(tag, es)
new_es = MultiEventStream.new
ensure_cpu_memory_capacity_set
es.each { |time, record|
begin
begin
ensure_cpu_memory_capacity_set
es.each { |time, record|
filtered_records = filter(tag, time, record)
filtered_records.each {|filtered_record|
filtered_records.each {|filtered_record|
new_es.add(time, filtered_record) if filtered_record
} if filtered_records
rescue => e
router.emit_error_event(tag, time, record, e)
end
}
} if filtered_records
}
rescue => e
@log.info "Error in filter_stream #{e.message}"
end
new_es
end
end
Expand Down
2 changes: 1 addition & 1 deletion source/code/plugin/out_mdm.rb
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ def write(chunk)
end
end
rescue Exception => e
ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
@log.info "Exception when writing to MDM: #{e}"
raise e
end
Expand All @@ -163,7 +164,6 @@ def send_to_mdm(post_body)
@log.info "Response Code #{response.code} Updating @last_post_attempt_time"
@last_post_attempt_time = Time.now
@first_post_attempt_made = true
ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
# Not raising exception, as that will cause retries to happen
elsif !response.code.empty? && response.code.start_with?("4")
# Log 400 errors and continue
Expand Down