diff --git a/README.md b/README.md index 92684ba5d..4313de5c0 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,22 @@ additional questions or comments. Note : The agent version(s) below has dates (ciprod), which indicate the agent build dates (not release dates) +### 02/20/2019 - Version microsoft/oms:ciprod02202019 +- Container logs enrichment optimization + * Get container meta data only for containers in current node (vs cluster before) +- Update fluent bit 0.13.7 => 0.14.4 + * This fixes the escaping issue in the container logs +- Mooncake cloud support for agent (AKS only) + * Ability to disable agent telemetry + * Ability to onboard and ingest to mooncake cloud +- Add & populate 'ContainerStatusReason' column to KubePodInventory +- Alertable (custom) metrics (to AzureMonitor - only for AKS clusters) + * Cpuusagenanocores & % metric + * MemoryWorkingsetBytes & % metric + * MemoryRssBytes & % metric + * Podcount by node, phase & namespace metric + * Nodecount metric + ### 01/09/2018 - Version microsoft/oms:ciprod01092019 - Omsagent - 1.8.1.256 (nov 2018 release) - Persist fluentbit state between container restarts @@ -25,7 +41,7 @@ Note : The agent version(s) below has dates (ciprod), which indicate t - Agent telemetry - ContainerLogsAgentSideLatencyMs - Agent telemetry - PodCount - Agent telemetry - ControllerCount -- Agent telemetry - K8S Version +- Agent telemetry - K8S Version - Agent telemetry - NodeCoreCapacity - Agent telemetry - NodeMemoryCapacity - Agent telemetry - KubeEvents (exceptions) diff --git a/installer/conf/container.conf b/installer/conf/container.conf index 091753230..f41bd6f98 100755 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -23,6 +23,14 @@ log_level debug +#custom_metrics_mdm filter plugin + + type filter_cadvisor2mdm + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westEurope + metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes + 
log_level info + + type out_oms log_level debug @@ -52,3 +60,19 @@ retry_wait 30s max_retry_wait 9m + + + type out_mdm + log_level debug + num_threads 5 + buffer_chunk_limit 20m + buffer_type file + buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 30s + max_retry_wait 9m + retry_mdm_post_wait_minutes 60 + diff --git a/installer/conf/kube.conf b/installer/conf/kube.conf index 6331d257e..31a0778d3 100644 --- a/installer/conf/kube.conf +++ b/installer/conf/kube.conf @@ -47,6 +47,12 @@ log_level debug + + type filter_inventory2mdm + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westEurope + log_level info + + type out_oms log_level debug @@ -119,8 +125,8 @@ max_retry_wait 9m - - type out_oms_api + + type out_oms log_level debug buffer_chunk_limit 20m buffer_type file @@ -146,3 +152,19 @@ retry_wait 30s max_retry_wait 9m + + + type out_mdm + log_level debug + num_threads 5 + buffer_chunk_limit 20m + buffer_type file + buffer_path /var/opt/microsoft/omsagent/6bb1e963-b08c-43a8-b708-1628305e964a/state/out_mdm_*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 30s + max_retry_wait 9m + retry_mdm_post_wait_minutes 60 + \ No newline at end of file diff --git a/installer/conf/td-agent-bit.conf b/installer/conf/td-agent-bit.conf index 29c98bdf1..467489d1c 100644 --- a/installer/conf/td-agent-bit.conf +++ b/installer/conf/td-agent-bit.conf @@ -28,5 +28,5 @@ EnableTelemetry true TelemetryPushIntervalSeconds 300 Match oms.container.log.* - AgentVersion ciprod01092019 + AgentVersion ciprod02202019 diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index 7181929e2..c263aa505 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -36,6 +36,9 
@@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/in_cadvisor_perf.rb; source/code/plugin/in_cadvisor_perf.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_services.rb; source/code/plugin/in_kube_services.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_nodes.rb; source/code/plugin/in_kube_nodes.rb; 644; root; root +/opt/microsoft/omsagent/plugin/filter_inventory2mdm.rb; source/code/plugin/filter_inventory2mdm.rb; 644; root; root +/opt/microsoft/omsagent/plugin/CustomMetricsUtils.rb; source/code/plugin/CustomMetricsUtils.rb; 644; root; root + /opt/microsoft/omsagent/plugin/ApplicationInsightsUtility.rb; source/code/plugin/ApplicationInsightsUtility.rb; 644; root; root /opt/microsoft/omsagent/plugin/ContainerInventoryState.rb; source/code/plugin/ContainerInventoryState.rb; 644; root; root @@ -43,6 +46,9 @@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/DockerApiRestHelper.rb; source/code/plugin/DockerApiRestHelper.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_containerinventory.rb; source/code/plugin/in_containerinventory.rb; 644; root; root +/opt/microsoft/omsagent/plugin/out_mdm.rb; source/code/plugin/out_mdm.rb; 644; root; root +/opt/microsoft/omsagent/plugin/filter_cadvisor2mdm.rb; source/code/plugin/filter_cadvisor2mdm.rb; 644; root; root + /opt/microsoft/omsagent/plugin/lib/application_insights/version.rb; source/code/plugin/lib/application_insights/version.rb; 644; root; root /opt/microsoft/omsagent/plugin/lib/application_insights/rack/track_request.rb; source/code/plugin/lib/application_insights/rack/track_request.rb; 644; root; root /opt/microsoft/omsagent/plugin/lib/application_insights/unhandled_exception.rb; source/code/plugin/lib/application_insights/unhandled_exception.rb; 644; root; root @@ -170,6 +176,14 @@ touch /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt chmod 666 /var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt chown omsagent:omiusers 
/var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt +touch /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log +chmod 666 /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log +chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log + +touch /var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log +chmod 666 /var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log +chown omsagent:omiusers /var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log + mv /etc/opt/microsoft/docker-cimprov/container.conf /etc/opt/microsoft/omsagent/sysconf/omsagent.d/container.conf chown omsagent:omsagent /etc/opt/microsoft/omsagent/sysconf/omsagent.d/container.conf diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index 5d9269d1e..d913c6c32 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -77,15 +77,15 @@ var ( // DataItem represents the object corresponding to the json that is sent by fluentbit tail plugin type DataItem struct { - LogEntry string `json:"LogEntry"` - LogEntrySource string `json:"LogEntrySource"` - LogEntryTimeStamp string `json:"LogEntryTimeStamp"` - LogEntryTimeOfCommand string `json:"TimeOfCommand"` - ID string `json:"Id"` - Image string `json:"Image"` - Name string `json:"Name"` - SourceSystem string `json:"SourceSystem"` - Computer string `json:"Computer"` + LogEntry string `json:"LogEntry"` + LogEntrySource string `json:"LogEntrySource"` + LogEntryTimeStamp string `json:"LogEntryTimeStamp"` + LogEntryTimeOfCommand string `json:"TimeOfCommand"` + ID string `json:"Id"` + Image string `json:"Image"` + Name string `json:"Name"` + SourceSystem string `json:"SourceSystem"` + Computer string `json:"Computer"` } // ContainerLogBlob represents the object corresponding to the payload that is sent to the ODS end point @@ -137,7 +137,10 @@ func updateContainerImageNameMaps() { _imageIDMap := make(map[string]string) _nameIDMap := 
make(map[string]string) - pods, err := ClientSet.CoreV1().Pods("").List(metav1.ListOptions{}) + listOptions := metav1.ListOptions{} + listOptions.FieldSelector = fmt.Sprintf("spec.nodeName=%s", Computer) + pods, err := ClientSet.CoreV1().Pods("").List(listOptions) + if err != nil { message := fmt.Sprintf("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) Log(message) @@ -244,31 +247,31 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { if val, ok := imageIDMap[containerID]; ok { stringMap["Image"] = val } else { - Log("ContainerId %s not present in Map ", containerID) + Log("ContainerId %s not present in Image Map ", containerID) } if val, ok := nameIDMap[containerID]; ok { stringMap["Name"] = val } else { - Log("ContainerId %s not present in Map ", containerID) + Log("ContainerId %s not present in Name Map ", containerID) } dataItem := DataItem{ - ID: stringMap["Id"], - LogEntry: stringMap["LogEntry"], - LogEntrySource: stringMap["LogEntrySource"], - LogEntryTimeStamp: stringMap["LogEntryTimeStamp"], - LogEntryTimeOfCommand: start.Format(time.RFC3339), - SourceSystem: stringMap["SourceSystem"], - Computer: Computer, - Image: stringMap["Image"], - Name: stringMap["Name"], + ID: stringMap["Id"], + LogEntry: stringMap["LogEntry"], + LogEntrySource: stringMap["LogEntrySource"], + LogEntryTimeStamp: stringMap["LogEntryTimeStamp"], + LogEntryTimeOfCommand: start.Format(time.RFC3339), + SourceSystem: stringMap["SourceSystem"], + Computer: Computer, + Image: stringMap["Image"], + Name: stringMap["Name"], } dataItems = append(dataItems, dataItem) loggedTime, e := time.Parse(time.RFC3339, dataItem.LogEntryTimeStamp) - if e!= nil { + if e != nil { message := fmt.Sprintf("Error while converting LogEntryTimeStamp for telemetry purposes: %s", e.Error()) Log(message) SendException(message) diff --git 
a/source/code/go/src/plugins/telemetry.go b/source/code/go/src/plugins/telemetry.go index 82f970d3a..a64ca2218 100644 --- a/source/code/go/src/plugins/telemetry.go +++ b/source/code/go/src/plugins/telemetry.go @@ -120,6 +120,11 @@ func InitializeTelemetryClient(agentVersion string) (int, error) { } TelemetryClient = appinsights.NewTelemetryClient(string(decIkey)) + telemetryOffSwitch := os.Getenv("DISABLE_TELEMETRY") + if strings.Compare(strings.ToLower(telemetryOffSwitch), "true") == 0 { + Log("Appinsights telemetry is disabled \n") + TelemetryClient.SetIsEnabled(false) + } CommonProperties = make(map[string]string) CommonProperties["Computer"] = Computer diff --git a/source/code/plugin/ApplicationInsightsUtility.rb b/source/code/plugin/ApplicationInsightsUtility.rb index 27660d708..5c5e92a6c 100644 --- a/source/code/plugin/ApplicationInsightsUtility.rb +++ b/source/code/plugin/ApplicationInsightsUtility.rb @@ -61,9 +61,16 @@ def initializeUtility() @@CustomProperties['AgentVersion'] = ENV[@@EnvAgentVersion] @@CustomProperties['ControllerType'] = ENV[@@EnvControllerType] encodedAppInsightsKey = ENV[@@EnvApplicationInsightsKey] - if !encodedAppInsightsKey.nil? + + #Check if telemetry is turned off + telemetryOffSwitch = ENV['DISABLE_TELEMETRY'] + if telemetryOffSwitch && !telemetryOffSwitch.nil? && !telemetryOffSwitch.empty? && telemetryOffSwitch.downcase == "true".downcase + $log.warn("AppInsightsUtility: Telemetry is disabled") + @@Tc = ApplicationInsights::TelemetryClient.new + elsif !encodedAppInsightsKey.nil? decodedAppInsightsKey = Base64.decode64(encodedAppInsightsKey) @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey + end rescue => errorStr $log.warn("Exception in AppInsightsUtility: initilizeUtility - error: #{errorStr}") @@ -91,7 +98,7 @@ def sendHeartBeatEvent(pluginName) end end - def sendCustomMetric(pluginName, properties) + def sendLastProcessedContainerInventoryCountMetric(pluginName, properties) begin if !(@@Tc.nil?) 
@@Tc.track_metric 'LastProcessedContainerInventoryCount', properties['ContainerCount'], @@ -105,6 +112,21 @@ def sendCustomMetric(pluginName, properties) end end + def sendCustomEvent(eventName, properties) + begin + if @@CustomProperties.empty? || @@CustomProperties.nil? + initializeUtility() + end + if !(@@Tc.nil?) + @@Tc.track_event eventName, :properties => @@CustomProperties + @@Tc.flush + $log.info("AppInsights Custom Event #{eventName} sent successfully") + end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}") + end + end + def sendExceptionTelemetry(errorStr) begin if @@CustomProperties.empty? || @@CustomProperties.nil? @@ -132,7 +154,7 @@ def sendTelemetry(pluginName, properties) end @@CustomProperties['Computer'] = properties['Computer'] sendHeartBeatEvent(pluginName) - sendCustomMetric(pluginName, properties) + sendLastProcessedContainerInventoryCountMetric(pluginName, properties) rescue => errorStr $log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}") end diff --git a/source/code/plugin/CustomMetricsUtils.rb b/source/code/plugin/CustomMetricsUtils.rb new file mode 100644 index 000000000..d06c9ad91 --- /dev/null +++ b/source/code/plugin/CustomMetricsUtils.rb @@ -0,0 +1,26 @@ +#!/usr/local/bin/ruby +# frozen_string_literal: true + +class CustomMetricsUtils + def initialize + end + + class << self + def check_custom_metrics_availability(custom_metric_regions) + aks_region = ENV['AKS_REGION'] + aks_resource_id = ENV['AKS_RESOURCE_ID'] + if aks_region.to_s.empty? && aks_resource_id.to_s.empty? + false # This will also take care of AKS-Engine Scenario. AKS_REGION/AKS_RESOURCE_ID is not set for AKS-Engine. 
Only ACS_RESOURCE_NAME is set + end + + custom_metrics_regions_arr = custom_metric_regions.split(',') + custom_metrics_regions_hash = custom_metrics_regions_arr.map {|x| [x.downcase,true]}.to_h + + if custom_metrics_regions_hash.key?(aks_region.downcase) + true + else + false + end + end + end +end \ No newline at end of file diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb new file mode 100644 index 000000000..85f9f688e --- /dev/null +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -0,0 +1,215 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. + +# frozen_string_literal: true + +module Fluent + require 'logger' + require 'json' + require_relative 'oms_common' + require_relative 'CustomMetricsUtils' + + class CAdvisor2MdmFilter < Filter + Fluent::Plugin.register_filter('filter_cadvisor2mdm', self) + + config_param :enable_log, :integer, :default => 0 + config_param :log_path, :string, :default => '/var/opt/microsoft/docker-cimprov/log/filter_cadvisor2mdm.log' + config_param :custom_metrics_azure_regions, :string + config_param :metrics_to_collect, :string, :default => 'cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes' + + @@cpu_usage_milli_cores = 'cpuUsageMillicores' + @@cpu_usage_nano_cores = 'cpuusagenanocores' + @@object_name_k8s_node = 'K8SNode' + @@hostName = (OMS::Common.get_hostname) + @@custom_metrics_template = ' + { + "time": "%{timestamp}", + "data": { + "baseData": { + "metric": "%{metricName}", + "namespace": "Insights.Container/nodes", + "dimNames": [ + "host" + ], + "series": [ + { + "dimValues": [ + "%{hostvalue}" + ], + "min": %{metricminvalue}, + "max": %{metricmaxvalue}, + "sum": %{metricsumvalue}, + "count": 1 + } + ] + } + } + }' + + @@metric_name_metric_percentage_name_hash = { + @@cpu_usage_milli_cores => "cpuUsagePercentage", + "memoryRssBytes" => "memoryRssPercentage", + "memoryWorkingSetBytes" => "memoryWorkingSetPercentage" + } + + @process_incoming_stream = true + 
@metrics_to_collect_hash = {} + + def initialize + super + end + + def configure(conf) + super + @log = nil + + if @enable_log + @log = Logger.new(@log_path, 'weekly') + @log.debug {'Starting filter_cadvisor2mdm plugin'} + end + end + + def start + super + @process_incoming_stream = CustomMetricsUtils.check_custom_metrics_availability(@custom_metrics_azure_regions) + @metrics_to_collect_hash = build_metrics_hash + @log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}" + + # initialize cpu and memory limit + if @process_incoming_stream + @cpu_capacity = 0.0 + @memory_capacity = 0.0 + ensure_cpu_memory_capacity_set + end + end + + def build_metrics_hash + @log.debug "Building Hash of Metrics to Collect" + metrics_to_collect_arr = @metrics_to_collect.split(',').map(&:strip) + metrics_hash = metrics_to_collect_arr.map {|x| [x.downcase,true]}.to_h + @log.info "Metrics Collected : #{metrics_hash}" + return metrics_hash + end + + def shutdown + super + end + + def filter(tag, time, record) + begin + if @process_incoming_stream + object_name = record['DataItems'][0]['ObjectName'] + counter_name = record['DataItems'][0]['Collections'][0]['CounterName'] + if object_name == @@object_name_k8s_node && @metrics_to_collect_hash.key?(counter_name.downcase) + percentage_metric_value = 0.0 + + # Compute and send % CPU and Memory + metric_value = record['DataItems'][0]['Collections'][0]['Value'] + if counter_name.downcase == @@cpu_usage_nano_cores + metric_name = @@cpu_usage_milli_cores + metric_value = metric_value/1000000 + if @cpu_capacity != 0.0 + percentage_metric_value = (metric_value*1000000)*100/@cpu_capacity + end + end + + if counter_name.start_with?("memory") + metric_name = counter_name + if @memory_capacity != 0.0 + percentage_metric_value = metric_value*100/@memory_capacity + end + end + return get_metric_records(record, metric_name, metric_value, percentage_metric_value) + else + return [] + end + else + return [] + end 
+ rescue Exception => e + @log.info "Error processing cadvisor record Exception: #{e.class} Message: #{e.message}" + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + return [] + end + end + + def ensure_cpu_memory_capacity_set + + @log.info "ensure_cpu_memory_capacity_set @cpu_capacity #{@cpu_capacity} @memory_capacity #{@memory_capacity}" + if @cpu_capacity != 0.0 && @memory_capacity != 0.0 + @log.info "CPU And Memory Capacity are already set" + return + end + + begin + nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes?fieldSelector=metadata.name%3D#{@@hostName}").body) + rescue Exception => e + @log.info "Error when getting nodeInventory from kube API. Exception: #{e.class} Message: #{e.message} " + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + end + if !nodeInventory.nil? + cpu_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores") + if !cpu_capacity_json.nil? && !cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? + @cpu_capacity = cpu_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "CPU Limit #{@cpu_capacity}" + else + @log.info "Error getting cpu_capacity" + end + memory_capacity_json = KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes") + if !memory_capacity_json.nil? && !memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'].to_s.nil? 
+ @memory_capacity = memory_capacity_json[0]['DataItems'][0]['Collections'][0]['Value'] + @log.info "Memory Limit #{@memory_capacity}" + else + @log.info "Error getting memory_capacity" + end + end + end + + def get_metric_records(record, metric_name, metric_value, percentage_metric_value) + records = [] + custommetricrecord = @@custom_metrics_template % { + timestamp: record['DataItems'][0]['Timestamp'], + metricName: metric_name, + hostvalue: record['DataItems'][0]['Host'], + objectnamevalue: record['DataItems'][0]['ObjectName'], + instancenamevalue: record['DataItems'][0]['InstanceName'], + metricminvalue: metric_value, + metricmaxvalue: metric_value, + metricsumvalue: metric_value + } + records.push(JSON.parse(custommetricrecord)) + + if !percentage_metric_value.nil? + additional_record = @@custom_metrics_template % { + timestamp: record['DataItems'][0]['Timestamp'], + metricName: @@metric_name_metric_percentage_name_hash[metric_name], + hostvalue: record['DataItems'][0]['Host'], + objectnamevalue: record['DataItems'][0]['ObjectName'], + instancenamevalue: record['DataItems'][0]['InstanceName'], + metricminvalue: percentage_metric_value, + metricmaxvalue: percentage_metric_value, + metricsumvalue: percentage_metric_value + } + records.push(JSON.parse(additional_record)) + end + @log.info "Metric Name: #{metric_name} Metric Value: #{metric_value} Percentage Metric Value: #{percentage_metric_value}" + return records + end + + + def filter_stream(tag, es) + new_es = MultiEventStream.new + ensure_cpu_memory_capacity_set + es.each { |time, record| + begin + filtered_records = filter(tag, time, record) + filtered_records.each {|filtered_record| + new_es.add(time, filtered_record) if filtered_record + } if filtered_records + rescue => e + router.emit_error_event(tag, time, record, e) + end + } + new_es + end + end +end diff --git a/source/code/plugin/filter_inventory2mdm.rb b/source/code/plugin/filter_inventory2mdm.rb new file mode 100644 index 000000000..8aaa5ff01 
--- /dev/null +++ b/source/code/plugin/filter_inventory2mdm.rb @@ -0,0 +1,260 @@ +# Copyright (c) Microsoft Corporation. All rights reserved. + +# frozen_string_literal: true + +module Fluent + require 'logger' + require 'json' + require_relative 'oms_common' + require_relative 'CustomMetricsUtils' + + class Inventory2MdmFilter < Filter + Fluent::Plugin.register_filter('filter_inventory2mdm', self) + + config_param :enable_log, :integer, :default => 0 + config_param :log_path, :string, :default => '/var/opt/microsoft/docker-cimprov/log/filter_inventory2mdm.log' + config_param :custom_metrics_azure_regions, :string + + @@node_count_metric_name = 'nodesCount' + @@pod_count_metric_name = 'podCount' + @@pod_inventory_tag = 'mdm.kubepodinventory' + @@node_inventory_tag = 'mdm.kubenodeinventory' + @@node_status_ready = 'Ready' + @@node_status_not_ready = 'NotReady' + + @@node_inventory_custom_metrics_template = ' + { + "time": "%{timestamp}", + "data": { + "baseData": { + "metric": "%{metricName}", + "namespace": "insights.container/nodes", + "dimNames": [ + "status" + ], + "series": [ + { + "dimValues": [ + "%{statusValue}" + ], + "min": %{node_status_count}, + "max": %{node_status_count}, + "sum": %{node_status_count}, + "count": 1 + } + ] + } + } + }' + + @@pod_inventory_custom_metrics_template = ' + { + "time": "%{timestamp}", + "data": { + "baseData": { + "metric": "%{metricName}", + "namespace": "insights.container/pods", + "dimNames": [ + "phase", + "Kubernetes namespace", + "node", + "controllerName" + ], + "series": [ + { + "dimValues": [ + "%{phaseDimValue}", + "%{namespaceDimValue}", + "%{nodeDimValue}", + "%{controllerNameDimValue}" + ], + "min": %{podCountMetricValue}, + "max": %{podCountMetricValue}, + "sum": %{podCountMetricValue}, + "count": 1 + } + ] + } + } + }' + + @@pod_phase_values = ['Running', 'Pending', 'Succeeded', 'Failed', 'Unknown'] + + @process_incoming_stream = true + + def initialize + super + end + + def configure(conf) + super + @log = 
nil + + if @enable_log + @log = Logger.new(@log_path, 'weekly') + @log.debug {'Starting filter_inventory2mdm plugin'} + end + end + + def start + super + @process_incoming_stream = CustomMetricsUtils.check_custom_metrics_availability(@custom_metrics_azure_regions) + @log.debug "After check_custom_metrics_availability process_incoming_stream #{@process_incoming_stream}" + end + + def shutdown + super + end + + def process_node_inventory_records(es) + timestamp = DateTime.now + + begin + node_ready_count = 0 + node_not_ready_count = 0 + records = [] + + es.each{|time,record| + begin + timestamp = record['DataItems'][0]['CollectionTime'] + node_status = record['DataItems'][0]['Status'] + if node_status.downcase == @@node_status_ready.downcase + node_ready_count = node_ready_count+1 + else + node_not_ready_count = node_not_ready_count + 1 + end + rescue => e + end + } + + ready_record = @@node_inventory_custom_metrics_template % { + timestamp: timestamp, + metricName: @@node_count_metric_name, + statusValue: @@node_status_ready, + node_status_count: node_ready_count + } + records.push(JSON.parse(ready_record)) + + not_ready_record = @@node_inventory_custom_metrics_template % { + timestamp: timestamp, + metricName: @@node_count_metric_name, + statusValue: @@node_status_not_ready, + node_status_count: node_not_ready_count + } + records.push(JSON.parse(not_ready_record)) + rescue Exception => e + @log.info "Error processing node inventory records Exception: #{e.class} Message: #{e.message}" + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + return [],timestamp + end + return records,timestamp + end + + def process_pod_inventory_records(es) + timestamp = DateTime.now + pod_count_hash = Hash.new + no_phase_dim_values_hash = Hash.new + begin + records = [] + es.each{|time,record| + + timestamp = record['DataItems'][0]['CollectionTime'] + podPhaseDimValue = record['DataItems'][0]['PodStatus'] + podNamespaceDimValue = record['DataItems'][0]['Namespace'] + 
podControllerNameDimValue = record['DataItems'][0]['ControllerName'] + podNodeDimValue = record['DataItems'][0]['Computer'] + + # group by distinct dimension values + pod_key = [podNodeDimValue, podNamespaceDimValue, podControllerNameDimValue, podPhaseDimValue].join('~~') + + if pod_count_hash.key?(pod_key) + pod_count = pod_count_hash[pod_key] + pod_count = pod_count + 1 + pod_count_hash[pod_key] = pod_count + else + pod_count = 1 + pod_count_hash[pod_key] = pod_count + end + + # Collect all possible combinations of dimension values other than pod phase + key_without_phase_dim_value = [podNodeDimValue, podNamespaceDimValue, podControllerNameDimValue].join('~~') + if no_phase_dim_values_hash.key?(key_without_phase_dim_value) + @log.info "#{key_without_phase_dim_value} already present in #{no_phase_dim_values_hash}" + next + else + @log.info "Adding #{key_without_phase_dim_value} to #{no_phase_dim_values_hash}" + no_phase_dim_values_hash[key_without_phase_dim_value] = true + end + } + + # generate all possible values of non_phase_dim_values X pod Phases and zero-fill the ones that are not already present + no_phase_dim_values_hash.each {|key, value| + @@pod_phase_values.each{|phase| + pod_key = [key, phase].join('~~') + if !pod_count_hash.key?(pod_key) + pod_count_hash[pod_key] = 0 + @log.info "Zero filled #{pod_key}" + else + next + end + } + } + + pod_count_hash.each {|key, value| + + key_elements = key.split('~~') + if key_elements.length != 4 + next + end + + # get dimension values by key + podNodeDimValue = key_elements[0] + podNamespaceDimValue = key_elements[1] + podControllerNameDimValue = key_elements[2] + podPhaseDimValue = key_elements[3] + + record = @@pod_inventory_custom_metrics_template % { + timestamp: timestamp, + metricName: @@pod_count_metric_name, + phaseDimValue: podPhaseDimValue, + namespaceDimValue: podNamespaceDimValue, + nodeDimValue: podNodeDimValue, + controllerNameDimValue: podControllerNameDimValue, + podCountMetricValue: value + } + 
records.push(JSON.parse(record)) + } + rescue Exception => e + @log.info "Error processing pod inventory record Exception: #{e.class} Message: #{e.message}" + ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace) + return [],timestamp + end + return records, timestamp + end + + def filter_stream(tag, es) + new_es = MultiEventStream.new + filtered_records = [] + time = DateTime.now + begin + if @process_incoming_stream + @log.info 'Processing NODE inventory records in filter plugin to send to MDM' + if tag.downcase.start_with?(@@node_inventory_tag) + filtered_records, time = process_node_inventory_records(es) + elsif tag.downcase.start_with?(@@pod_inventory_tag) + @log.info 'Processing POD inventory records in filter plugin to send to MDM' + filtered_records, time = process_pod_inventory_records(es) + else + filtered_records = [] + end + end + filtered_records.each {|filtered_record| + new_es.add(time, filtered_record) if filtered_record + } if filtered_records + rescue => e + @log.info "Exception in filter_stream #{e}" + end + new_es + end + end +end diff --git a/source/code/plugin/in_cadvisor_perf.rb b/source/code/plugin/in_cadvisor_perf.rb index 5b551f74e..a857aa6b9 100644 --- a/source/code/plugin/in_cadvisor_perf.rb +++ b/source/code/plugin/in_cadvisor_perf.rb @@ -18,6 +18,7 @@ def initialize config_param :run_interval, :time, :default => '1m' config_param :tag, :string, :default => "oms.api.cadvisorperf" + config_param :mdmtag, :string, :default => "mdm.cadvisorperf" def configure (conf) super @@ -55,6 +56,7 @@ def enumerate() end router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@mdmtag, eventStream) if eventStream @@istestvar = ENV['ISTEST'] if (!@@istestvar.nil? && !@@istestvar.empty? 
&& @@istestvar.casecmp('true') == 0 && eventStream.count > 0) $log.info("cAdvisorPerfEmitStreamSuccess @ #{Time.now.utc.iso8601}") diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index a6908fc99..ba1dacbe0 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -6,7 +6,8 @@ module Fluent class Kube_nodeInventory_Input < Input Plugin.register_input('kubenodeinventory', self) - @@ContainerNodeInventoryTag = 'oms.api.ContainerNodeInventory' + @@ContainerNodeInventoryTag = 'oms.containerinsights.ContainerNodeInventory' + @@MDMKubeNodeInventoryTag = 'mdm.kubenodeinventory' def initialize super @@ -109,7 +110,12 @@ def enumerate dockerVersion.slice! "docker://" containerNodeInventoryRecord['DockerVersion'] = dockerVersion # ContainerNodeInventory data for docker version and operating system. - containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryRecord) if containerNodeInventoryRecord + containerNodeInventoryWrapper = { + "DataType"=>"CONTAINER_NODE_INVENTORY_BLOB", + "IPName"=>"ContainerInsights", + "DataItems"=>[containerNodeInventoryRecord.each{|k,v| containerNodeInventoryRecord[k]=v}] + } + containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper wrapper = { "DataType"=>"KUBE_NODE_INVENTORY_BLOB", @@ -131,6 +137,7 @@ def enumerate end end router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream if telemetrySent == true @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index eaf14b035..3d026b05f 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -6,6 +6,8 @@ module Fluent 
module Fluent

  # Buffered fluentd output plugin that POSTs alertable custom metrics to
  # the Azure Monitor (MDM) ingestion endpoint for an AKS cluster, using an
  # AAD service-principal token read from the node's azure.json.
  class OutputMDM < BufferedOutput

    # Minutes to back off after a non-retryable POST failure (e.g. HTTP 403).
    config_param :retry_mdm_post_wait_minutes, :integer

    Plugin.register_output('out_mdm', self)

    def initialize
      super
      require 'net/http'
      require 'net/https'
      require 'uri'
      require 'json'
      require_relative 'KubernetesApiClient'
      require_relative 'ApplicationInsightsUtility'

      @@token_resource_url = 'https://monitoring.azure.com/'
      @@grant_type = 'client_credentials'
      @@azure_json_path = '/etc/kubernetes/host/azure.json'
      @@post_request_url_template = "https://%{aks_region}.monitoring.azure.com%{aks_resource_id}/metrics"
      @@token_url_template = "https://login.microsoftonline.com/%{tenant_id}/oauth2/token"
      @@plugin_name = "AKSCustomMetricsMDM"

      @data_hash = {}
      @token_url = nil
      @http_client = nil
      @token_expiry_time = Time.now
      @cached_access_token = String.new
      @last_post_attempt_time = Time.now
      @first_post_attempt_made = false
    end

    def configure(conf)
      # Register the chunk error handler as the secondary output so buffer
      # chunks that exhaust retries are routed to per-tag error handling
      # instead of being dropped silently.
      s = conf.add_element("secondary")
      s["type"] = ChunkErrorHandler::SecondaryName
      super
    end

    def start
      super
      # azure.json carries the service principal (tenantId/aadClientId/
      # aadClientSecret) used to acquire AAD tokens for the MDM endpoint.
      # NOTE(review): a read failure here raises and prevents plugin start;
      # consider sending telemetry before bailing out.
      file = File.read(@@azure_json_path)
      @data_hash = JSON.parse(file)
      @token_url = @@token_url_template % {tenant_id: @data_hash['tenantId']}
      @cached_access_token = get_access_token
      aks_resource_id = ENV['AKS_RESOURCE_ID']
      aks_region = ENV['AKS_REGION']
      if aks_resource_id.to_s.empty?
        @log.info "Environment Variable AKS_RESOURCE_ID is not set.. "
        raise Exception.new "Environment Variable AKS_RESOURCE_ID is not set!!"
      end
      if aks_region.to_s.empty?
        @log.info "Environment Variable AKS_REGION is not set.. "
        raise Exception.new "Environment Variable AKS_REGION is not set!!"
      end

      # Instance variable (was a class variable @@post_request_url): the URL
      # is per-instance state and class variables are shared across instances.
      @post_request_url = @@post_request_url_template % {aks_region: aks_region, aks_resource_id: aks_resource_id}
      @post_request_uri = URI.parse(@post_request_url)
      @http_client = Net::HTTP.new(@post_request_uri.host, @post_request_uri.port)
      @http_client.use_ssl = true
      @log.info "POST Request url: #{@post_request_url}"
      ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMPluginStart", {})
    end

    # Returns a valid AAD access token for the MDM endpoint. Tokens are valid
    # for 60 minutes; the cached token is refreshed once within 5 minutes of
    # its expiry (or when no token is cached yet).
    # Raises when the token endpoint returns a non-2xx response or the body
    # carries no access_token (previously such a response was parsed blindly
    # and a nil token was cached, surfacing later as a confusing MDM 401).
    def get_access_token
      if @cached_access_token.to_s.empty? || (Time.now + 5*60 > @token_expiry_time)
        @log.info "Refreshing access token for out_mdm plugin.."
        token_uri = URI.parse(@token_url)
        http_access_token = Net::HTTP.new(token_uri.host, token_uri.port)
        http_access_token.use_ssl = true
        token_request = Net::HTTP::Post.new(token_uri.request_uri)
        token_request.set_form_data(
          {
            'grant_type' => @@grant_type,
            'client_id' => @data_hash['aadClientId'],
            'client_secret' => @data_hash['aadClientSecret'],
            'resource' => @@token_resource_url
          }
        )

        token_response = http_access_token.request(token_request)
        token_response.value # raises Net::HTTPServerException for non-2xx responses
        parsed_json = JSON.parse(token_response.body)
        access_token = parsed_json['access_token']
        if access_token.to_s.empty?
          raise Exception.new "AAD token response did not contain an access_token"
        end
        @token_expiry_time = Time.now + 59*60 # set the expiry time to be ~one hour from current time
        @cached_access_token = access_token
      end
      @cached_access_token
    end

    # Best-effort write of the last MDM ingestion outcome to the agent status
    # file; failures are logged and reported as telemetry, never raised.
    def write_status_file(success, message)
      fn = '/var/opt/microsoft/omsagent/log/MDMIngestion.status'
      status = '{ "operation": "MDMIngestion", "success": "%s", "message": "%s" }' % [success, message]
      begin
        File.open(fn,'w') { |file| file.write(status) }
      rescue => e
        @log.debug "Error:'#{e}'"
        ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
      end
    end

    # This method is called when an event reaches Fluentd.
    # Serializes non-empty records into the buffer as msgpack; empty records
    # contribute nothing to the chunk.
    def format(tag, time, record)
      if record != {}
        @log.trace "Buffering #{tag}"
        return [tag, record].to_msgpack
      else
        return ""
      end
    end

    # Called every flush interval with a buffer chunk of formatted records.
    # Skips the POST entirely while inside the back-off window opened by a
    # previous non-retryable failure (403); otherwise sends the whole chunk
    # as newline-delimited JSON.
    def write(chunk)
      begin
        if !@first_post_attempt_made || (Time.now > @last_post_attempt_time + @retry_mdm_post_wait_minutes*60)
          post_body = []
          chunk.msgpack_each {|(tag, record)|
            post_body.push(record.to_json)
          }
          send_to_mdm post_body
        else
          @log.info "Last Failed POST attempt to MDM was made #{((Time.now - @last_post_attempt_time)/60).round(1)} min ago. This is less than the current retry threshold of #{@retry_mdm_post_wait_minutes} min. NO-OP"
        end
      rescue Exception => e
        @log.info "Exception when writing to MDM: #{e}"
        raise e # re-raise so fluentd retries the chunk
      end
    end

    # POSTs the batched metric records (application/x-ndjson) to MDM.
    # 403 opens the retry back-off window without raising; other 4xx are
    # logged and swallowed (non-retryable); everything else is raised so
    # fluentd's buffer retry machinery takes over.
    def send_to_mdm(post_body)
      begin
        access_token = get_access_token
        request = Net::HTTP::Post.new(@post_request_uri.request_uri)
        request['Content-Type'] = "application/x-ndjson"
        request['Authorization'] = "Bearer #{access_token}"
        request.body = post_body.join("\n")
        response = @http_client.request(request)
        response.value # this throws for non 200 HTTP response code
        @log.info "HTTP Post Response Code : #{response.code}"
        ApplicationInsightsUtility.sendCustomEvent("AKSCustomMetricsMDMSendSuccessful", {})
      rescue Net::HTTPServerException => e
        @log.info "Failed to Post Metrics to MDM : #{e} Response: #{response}"
        @log.debug_backtrace(e.backtrace)
        if !response.code.empty? && response.code == "403"
          @log.info "Response Code #{response.code} Updating @last_post_attempt_time"
          @last_post_attempt_time = Time.now
          @first_post_attempt_made = true
          ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
          # Not raising exception, as that will cause retries to happen
        elsif !response.code.empty? && response.code.start_with?('4')
          # Log 400 errors and continue
          @log.info "Non-retryable HTTPServerException when POSTing Metrics to MDM #{e} Response: #{response}"
        else
          # raise if the response code is non-400
          @log.info "HTTPServerException when POSTing Metrics to MDM #{e} Response: #{response}"
          raise e
        end
      rescue Errno::ETIMEDOUT => e
        @log.info "Timed out when POSTing Metrics to MDM : #{e} Response: #{response}"
        @log.debug_backtrace(e.backtrace)
        ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
        raise e
      rescue Exception => e
        @log.info "Exception POSTing Metrics to MDM : #{e} Response: #{response}"
        @log.debug_backtrace(e.backtrace)
        ApplicationInsightsUtility.sendExceptionTelemetry(e.backtrace)
        raise e
      end
    end

    private

    # Secondary output that receives buffer chunks which exhausted their
    # retries, and re-emits each record on the OMS error tag derived from its
    # original tag (or drops it when no output matches that error tag).
    class ChunkErrorHandler
      include Configurable
      include PluginId
      include PluginLoggerMixin

      SecondaryName = "__ChunkErrorHandler__"

      Plugin.register_output(SecondaryName, self)

      def initialize
        @router = nil
      end

      def secondary_init(primary)
        @error_handlers = create_error_handlers @router
      end

      def start
        # NOP
      end

      def shutdown
        # NOP
      end

      def router=(r)
        @router = r
      end

      def write(chunk)
        chunk.msgpack_each {|(tag, record)|
          @error_handlers[tag].emit(record)
        }
      end

      private

      # Lazily builds one handler per tag: an ErrorHandler when the router
      # has a match for the derived error tag, otherwise a shared no-op.
      def create_error_handlers(router)
        nop_handler = NopErrorHandler.new
        Hash.new() { |hash, tag|
          etag = OMS::Common.create_error_tag tag
          hash[tag] = router.match?(etag) ?
                        ErrorHandler.new(router, etag) :
                        nop_handler
        }
      end

      # Re-emits failed records on the error tag.
      class ErrorHandler
        def initialize(router, etag)
          @router = router
          @etag = etag
        end

        def emit(record)
          @router.emit(@etag, Fluent::Engine.now, record)
        end
      end

      # Drops failed records when nothing listens on the error tag.
      class NopErrorHandler
        def emit(record)
          # NOP
        end
      end

    end

  end # class OutputMDM

end # module Fluent