diff --git a/installer/conf/td-agent-bit-rs.conf b/installer/conf/td-agent-bit-rs.conf index 7945261aa..7839b0eee 100644 --- a/installer/conf/td-agent-bit-rs.conf +++ b/installer/conf/td-agent-bit-rs.conf @@ -4,16 +4,6 @@ Parsers_File /etc/td-agent-bit/parsers.conf Log_File /var/opt/microsoft/docker-cimprov/log/fluent-bit.log -[INPUT] - Name tail - Tag oms.container.log.telegraf.err.* - Path /var/opt/microsoft/docker-cimprov/log/telegraf.log - DB /var/opt/microsoft/docker-cimprov/state/telegraf-log-state.db - Mem_Buf_Limit 2m - Path_Key filepath - Skip_Long_Lines On - Ignore_Older 5m - [INPUT] Name tcp Tag oms.container.perf.telegraf.* diff --git a/installer/conf/td-agent-bit.conf b/installer/conf/td-agent-bit.conf index 2dee26234..e7aabd242 100644 --- a/installer/conf/td-agent-bit.conf +++ b/installer/conf/td-agent-bit.conf @@ -6,7 +6,7 @@ [INPUT] Name tail - Tag oms.container.log.* + Tag oms.container.log.la.* Path ${AZMON_LOG_TAIL_PATH} DB /var/log/omsagent-fblogs.db DB.Sync Off @@ -32,17 +32,6 @@ Skip_Long_Lines On Ignore_Older 2m -[INPUT] - Name tail - Tag oms.container.log.telegraf.err.* - Path /var/opt/microsoft/docker-cimprov/log/telegraf.log - DB /var/opt/microsoft/docker-cimprov/state/telegraf-log-state.db - DB.Sync Off - Mem_Buf_Limit 1m - Path_Key filepath - Skip_Long_Lines On - Ignore_Older 2m - [INPUT] Name tcp Tag oms.container.perf.telegraf.* @@ -53,9 +42,16 @@ [FILTER] Name grep - Match oms.container.log.* + Match oms.container.log.la.* Exclude stream ${AZMON_LOG_EXCLUSION_REGEX_PATTERN} +# Exclude prometheus plugin exceptions that might be caused due to invalid config.(Logs which contain - E! [inputs.prometheus]) +# Excluding these logs from being sent to AI since it can result in high volume of data in telemetry due to invalid config. +[FILTER] + Name grep + Match oms.container.log.flbplugin.* + Exclude log E! [\[]inputs.prometheus[\]] + [OUTPUT] Name oms EnableTelemetry true diff --git a/installer/conf/telegraf-rs.conf b/installer/conf/telegraf-rs.conf index cb9a36685..ce60bfa04 100644 --- a/installer/conf/telegraf-rs.conf +++ b/installer/conf/telegraf-rs.conf @@ -77,7 +77,7 @@ ## Run telegraf in quiet mode (error log messages only). quiet = true ## Specify the log file name. The empty string means to log to stderr. - logfile = "/var/opt/microsoft/docker-cimprov/log/telegraf.log" + logfile = "" ## Override default hostname, if empty use os.Hostname() #hostname = "placeholder_hostname" @@ -536,32 +536,75 @@ #tagexclude = ["AgentVersion","AKS_RESOURCE_ID","ACS_RESOURCE_NAME", "Region", "ClusterName", "ClusterType", "Computer", "ControllerType"] # [inputs.prometheus.tagpass] -[[inputs.exec]] - ## Commands array - interval = "15m" - commands = [ - "/opt/microsoft/docker-cimprov/bin/TelegrafTCPErrorTelemetry.sh" - ] +#Prometheus Custom Metrics +[[inputs.prometheus]] + interval = "$AZMON_RS_PROM_INTERVAL" - ## Timeout for each command to complete. - timeout = "15s" + ## An array of urls to scrape metrics from. + urls = $AZMON_RS_PROM_URLS + + ## An array of Kubernetes services to scrape metrics from. + kubernetes_services = $AZMON_RS_PROM_K8S_SERVICES + + ## Scrape Kubernetes pods for the following prometheus annotations: + ## - prometheus.io/scrape: Enable scraping for this pod + ## - prometheus.io/scheme: If the metrics endpoint is secured then you will need to + ## set this to `https` & most likely set the tls config. + ## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation. + ## - prometheus.io/port: If port is not 9102 use this annotation + monitor_kubernetes_pods = $AZMON_RS_PROM_MONITOR_PODS - ## measurement name suffix (for separating different commands) - name_suffix = "_telemetry" + fieldpass = $AZMON_RS_PROM_FIELDPASS + fielddrop = $AZMON_RS_PROM_FIELDDROP - ## Data format to consume. - ## Each data format has its own unique set of configuration options, read - ## more about them here: - ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "influx" - #tagexclude = ["hostName"] - [inputs.exec.tags] - AgentVersion = "$AGENT_VERSION" - AKS_RESOURCE_ID = "$TELEMETRY_AKS_RESOURCE_ID" - ACS_RESOURCE_NAME = "$TELEMETRY_ACS_RESOURCE_NAME" - Region = "$TELEMETRY_AKS_REGION" - ClusterName = "$TELEMETRY_CLUSTER_NAME" - ClusterType = "$TELEMETRY_CLUSTER_TYPE" - Computer = "placeholder_hostname" - ControllerType = "$CONTROLLER_TYPE" + metric_version = 2 + url_tag = "scrapeUrl" + + ## Kubernetes config file to create client from. + # kube_config = "/path/to/kubernetes.config" + + ## Use bearer token for authorization. ('bearer_token' takes priority) + bearer_token = "/var/run/secrets/kubernetes.io/serviceaccount/token" + ## OR + # bearer_token_string = "abc_123" + + ## Specify timeout duration for slower prometheus clients (default is 3s) + response_timeout = "15s" + + ## Optional TLS Config + tls_ca = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + #tls_cert = /path/to/certfile + # tls_key = /path/to/keyfile + ## Use TLS but skip chain & host verification + insecure_skip_verify = true + #tagexclude = ["AgentVersion","AKS_RESOURCE_ID","ACS_RESOURCE_NAME", "Region", "ClusterName", "ClusterType", "Computer", "ControllerType"] + +# [[inputs.exec]] +# ## Commands array +# interval = "15m" +# commands = [ +# "/opt/microsoft/docker-cimprov/bin/TelegrafTCPErrorTelemetry.sh" +# ] + +# ## Timeout for each command to complete. +# timeout = "15s" + +# ## measurement name suffix (for separating different commands) +# name_suffix = "_telemetry" + +# ## Data format to consume. +# ## Each data format has its own unique set of configuration options, read +# ## more about them here: +# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md +# data_format = "influx" +# #tagexclude = ["hostName"] +# [inputs.exec.tags] +# AgentVersion = "$AGENT_VERSION" +# AKS_RESOURCE_ID = "$TELEMETRY_AKS_RESOURCE_ID" +# ACS_RESOURCE_NAME = "$TELEMETRY_ACS_RESOURCE_NAME" +# Region = "$TELEMETRY_AKS_REGION" +# ClusterName = "$TELEMETRY_CLUSTER_NAME" +# ClusterType = "$TELEMETRY_CLUSTER_TYPE" +# Computer = "placeholder_hostname" +# ControllerType = "$CONTROLLER_TYPE" diff --git a/installer/conf/telegraf.conf b/installer/conf/telegraf.conf index 06b1c55eb..4883de81b 100644 --- a/installer/conf/telegraf.conf +++ b/installer/conf/telegraf.conf @@ -77,8 +77,7 @@ ## Run telegraf in quiet mode (error log messages only). quiet = true ## Specify the log file name. The empty string means to log to stderr. - logfile = "/var/opt/microsoft/docker-cimprov/log/telegraf.log" - + logfile = "" ## Override default hostname, if empty use os.Hostname() #hostname = "placeholder_hostname" ## If set to true, do no set the "host" tag in the telegraf agent. @@ -568,31 +567,66 @@ insecure_skip_verify = true #tagexclude = ["AgentVersion","AKS_RESOURCE_ID","ACS_RESOURCE_NAME", "Region", "ClusterName", "ClusterType", "Computer", "ControllerType"] -[[inputs.exec]] - ## Commands array - interval = "15m" - commands = [ - "/opt/microsoft/docker-cimprov/bin/TelegrafTCPErrorTelemetry.sh" - ] - ## Timeout for each command to complete. - timeout = "15s" +## prometheus custom metrics +[[inputs.prometheus]] - ## measurement name suffix (for separating different commands) - name_suffix = "_telemetry" + interval = "$AZMON_DS_PROM_INTERVAL" - ## Data format to consume. - ## Each data format has its own unique set of configuration options, read - ## more about them here: - ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md - data_format = "influx" - tagexclude = ["hostName"] - [inputs.exec.tags] - AgentVersion = "$AGENT_VERSION" - AKS_RESOURCE_ID = "$TELEMETRY_AKS_RESOURCE_ID" - ACS_RESOURCE_NAME = "$TELEMETRY_ACS_RESOURCE_NAME" - Region = "$TELEMETRY_AKS_REGION" - ClusterName = "$TELEMETRY_CLUSTER_NAME" - ClusterType = "$TELEMETRY_CLUSTER_TYPE" - Computer = "placeholder_hostname" - ControllerType = "$CONTROLLER_TYPE" \ No newline at end of file + ## An array of urls to scrape metrics from. + urls = $AZMON_DS_PROM_URLS + + fieldpass = $AZMON_DS_PROM_FIELDPASS + + fielddrop = $AZMON_DS_PROM_FIELDDROP + + metric_version = 2 + url_tag = "scrapeUrl" + + ## Kubernetes config file to create client from. + # kube_config = "/path/to/kubernetes.config" + + ## Use bearer token for authorization. ('bearer_token' takes priority) + bearer_token = "/var/run/secrets/kubernetes.io/serviceaccount/token" + ## OR + # bearer_token_string = "abc_123" + + ## Specify timeout duration for slower prometheus clients (default is 3s) + response_timeout = "15s" + + ## Optional TLS Config + tls_ca = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" + #tls_cert = /path/to/certfile + # tls_key = /path/to/keyfile + ## Use TLS but skip chain & host verification + insecure_skip_verify = true + #tagexclude = ["AgentVersion","AKS_RESOURCE_ID","ACS_RESOURCE_NAME", "Region", "ClusterName", "ClusterType", "Computer", "ControllerType"] + +# [[inputs.exec]] +# ## Commands array +# interval = "15m" +# commands = [ +# "/opt/microsoft/docker-cimprov/bin/TelegrafTCPErrorTelemetry.sh" +# ] + +# ## Timeout for each command to complete. +# timeout = "15s" + +# ## measurement name suffix (for separating different commands) +# name_suffix = "_telemetry" + +# ## Data format to consume. +# ## Each data format has its own unique set of configuration options, read +# ## more about them here: +# ## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md +# data_format = "influx" +# tagexclude = ["hostName"] +# [inputs.exec.tags] +# AgentVersion = "$AGENT_VERSION" +# AKS_RESOURCE_ID = "$TELEMETRY_AKS_RESOURCE_ID" +# ACS_RESOURCE_NAME = "$TELEMETRY_ACS_RESOURCE_NAME" +# Region = "$TELEMETRY_AKS_REGION" +# ClusterName = "$TELEMETRY_CLUSTER_NAME" +# ClusterType = "$TELEMETRY_CLUSTER_TYPE" +# Computer = "placeholder_hostname" +# ControllerType = "$CONTROLLER_TYPE" \ No newline at end of file diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index 58a74aa0a..fe1635335 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -110,9 +110,10 @@ MAINTAINER: 'Microsoft Corporation' /etc/opt/microsoft/docker-cimprov/out_oms.conf; installer/conf/out_oms.conf; 644; root; root /etc/opt/microsoft/docker-cimprov/telegraf.conf; installer/conf/telegraf.conf; 644; root; root /etc/opt/microsoft/docker-cimprov/telegraf-rs.conf; installer/conf/telegraf-rs.conf; 644; root; root -/opt/microsoft/docker-cimprov/bin/TelegrafTCPErrorTelemetry.sh; installer/scripts/TelegrafTCPErrorTelemetry.sh; 755; root; root +/opt/microsoft/docker-cimprov/bin/TelegrafTCPErrorTelemetry.sh; installer/scripts/TelegrafTCPErrorTelemetry.sh; 755; root; root /opt/livenessprobe.sh; installer/scripts/livenessprobe.sh; 755; root; root /opt/tomlparser.rb; installer/scripts/tomlparser.rb; 755; root; root +/opt/tomlparser-prom-customconfig.rb; installer/scripts/tomlparser-prom-customconfig.rb; 755; root; root %Links /opt/omi/lib/libcontainer.${{SHLIB_EXT}}; /opt/microsoft/docker-cimprov/lib/libcontainer.${{SHLIB_EXT}}; 644; root; root diff --git a/installer/scripts/tomlparser-prom-customconfig.rb b/installer/scripts/tomlparser-prom-customconfig.rb new file mode 100644 index 000000000..d9fdf1cc2 --- /dev/null +++ b/installer/scripts/tomlparser-prom-customconfig.rb @@ -0,0 +1,200 @@ +#!/usr/local/bin/ruby + +require_relative "tomlrb" +require "fileutils" + +@promConfigMapMountPath = "/etc/config/settings/prometheus-data-collection-settings" +@replicaset = "replicaset" +@daemonset = "daemonset" +@configSchemaVersion = "" +@defaultDsInterval = "1m" +@defaultDsPromUrls = [] +@defaultDsFieldPass = [] +@defaultDsFieldDrop = [] +@defaultRsInterval = "1m" +@defaultRsPromUrls = [] +@defaultRsFieldPass = [] +@defaultRsFieldDrop = [] +@defaultRsK8sServices = [] +@defaultRsMonitorPods = false + +# Use parser to parse the configmap toml file to a ruby structure +def parseConfigMap + begin + # Check to see if config map is created + if (File.file?(@promConfigMapMountPath)) + puts "config::configmap container-azm-ms-agentconfig for settings mounted, parsing values for prometheus config map" + parsedConfig = Tomlrb.load_file(@promConfigMapMountPath, symbolize_keys: true) + puts "config::Successfully parsed mounted prometheus config map" + return parsedConfig + else + puts "config::configmap container-azm-ms-agentconfig for settings not mounted, using defaults for prometheus scraping" + return nil + end + rescue => errorStr + puts "config::error::Exception while parsing toml config file for prometheus config: #{errorStr}, using defaults" + return nil + end +end + +def checkForTypeArray(arrayValue, arrayType) + if (arrayValue.nil? || (arrayValue.kind_of?(Array) && arrayValue.length > 0 && arrayValue[0].kind_of?(arrayType))) + return true + else + return false + end +end + +def checkForType(variable, varType) + if variable.nil? || variable.kind_of?(varType) + return true + else + return false + end +end + +# Use the ruby structure created after config parsing to set the right values to be used as environment variables +def populateSettingValuesFromConfigMap(parsedConfig) + # Checking to see if this is the daemonset or replicaset to parse config accordingly + controller = ENV["CONTROLLER_TYPE"] + if !controller.nil? + if !parsedConfig.nil? && !parsedConfig[:prometheus_data_collection_settings].nil? + if controller.casecmp(@replicaset) == 0 && !parsedConfig[:prometheus_data_collection_settings][:cluster].nil? + #Get prometheus replicaset custom config settings + begin + interval = parsedConfig[:prometheus_data_collection_settings][:cluster][:interval] + fieldPass = parsedConfig[:prometheus_data_collection_settings][:cluster][:fieldpass] + fieldDrop = parsedConfig[:prometheus_data_collection_settings][:cluster][:fielddrop] + urls = parsedConfig[:prometheus_data_collection_settings][:cluster][:urls] + kubernetesServices = parsedConfig[:prometheus_data_collection_settings][:cluster][:kubernetes_services] + monitorKubernetesPods = parsedConfig[:prometheus_data_collection_settings][:cluster][:monitor_kubernetes_pods] + + # Check for the right datattypes to enforce right setting values + if checkForType(interval, String) && + checkForTypeArray(fieldPass, String) && + checkForTypeArray(fieldDrop, String) && + checkForTypeArray(kubernetesServices, String) && + checkForTypeArray(urls, String) && + !monitorKubernetesPods.nil? && (!!monitorKubernetesPods == monitorKubernetesPods) #Checking for Boolean type, since 'Boolean' is not defined as a type in ruby + puts "config::Successfully passed typecheck for config settings for replicaset" + #if setting is nil assign default values + interval = (interval.nil?) ? @defaultRsInterval : interval + fieldPass = (fieldPass.nil?) ? @defaultRsFieldPass : fieldPass + fieldDrop = (fieldDrop.nil?) ? @defaultRsFieldDrop : fieldDrop + kubernetesServices = (kubernetesServices.nil?) ? @defaultRsK8sServices : kubernetesServices + urls = (urls.nil?) ? @defaultRsPromUrls : urls + monitorKubernetesPods = (kubernetesServices.nil?) ? @defaultRsMonitorPods : monitorKubernetesPods + + file_name = "/opt/telegraf-test-rs.conf" + # Copy the telegraf config file to a temp file to run telegraf in test mode with this config + FileUtils.cp("/etc/opt/microsoft/docker-cimprov/telegraf-rs.conf", file_name) + + puts "config::Starting to substitute the placeholders in telegraf conf copy file for replicaset" + #Replace the placeholder config values with values from custom config + text = File.read(file_name) + new_contents = text.gsub("$AZMON_RS_PROM_INTERVAL", interval) + new_contents = new_contents.gsub("$AZMON_RS_PROM_FIELDPASS", ((fieldPass.length > 0) ? ("[\"" + fieldPass.join("\",\"") + "\"]") : "[]")) + new_contents = new_contents.gsub("$AZMON_RS_PROM_FIELDDROP", ((fieldDrop.length > 0) ? ("[\"" + fieldDrop.join("\",\"") + "\"]") : "[]")) + new_contents = new_contents.gsub("$AZMON_RS_PROM_URLS", ((urls.length > 0) ? ("[\"" + urls.join("\",\"") + "\"]") : "[]")) + new_contents = new_contents.gsub("$AZMON_RS_PROM_K8S_SERVICES", ((kubernetesServices.length > 0) ? ("[\"" + kubernetesServices.join("\",\"") + "\"]") : "[]")) + new_contents = new_contents.gsub("$AZMON_RS_PROM_MONITOR_PODS", (monitorKubernetesPods ? "true" : "false")) + File.open(file_name, "w") { |file| file.puts new_contents } + puts "config::Successfully substituted the placeholders in telegraf conf file for replicaset" + #Set environment variables for telemetry + file = File.open("telemetry_prom_config_env_var", "w") + if !file.nil? + file.write("export TELEMETRY_RS_PROM_INTERVAL=\"#{interval}\"\n") + #Setting array lengths as environment variables for telemetry purposes + file.write("export TELEMETRY_RS_PROM_FIELDPASS_LENGTH=\"#{fieldPass.length}\"\n") + file.write("export TELEMETRY_RS_PROM_FIELDDROP_LENGTH=\"#{fieldDrop.length}\"\n") + file.write("export TELEMETRY_RS_PROM_K8S_SERVICES_LENGTH=#{kubernetesServices.length}\n") + file.write("export TELEMETRY_RS_PROM_URLS_LENGTH=#{urls.length}\n") + file.write("export TELEMETRY_RS_PROM_MONITOR_PODS=\"#{monitorKubernetesPods}\"\n") + # Close file after writing all environment variables + file.close + puts "config::Successfully created telemetry file for replicaset" + end + else + puts "config::Typecheck failed for prometheus config settings for replicaset, using defaults" + end # end of type check condition + rescue => errorStr + puts "config::error::Exception while parsing config file for prometheus config for replicaset: #{errorStr}, using defaults" + setRsPromDefaults + puts "****************End Prometheus Config Processing********************" + end + elsif controller.casecmp(@daemonset) == 0 && !parsedConfig[:prometheus_data_collection_settings][:node].nil? + #Get prometheus daemonset custom config settings + begin + interval = parsedConfig[:prometheus_data_collection_settings][:node][:interval] + fieldPass = parsedConfig[:prometheus_data_collection_settings][:node][:fieldpass] + fieldDrop = parsedConfig[:prometheus_data_collection_settings][:node][:fielddrop] + urls = parsedConfig[:prometheus_data_collection_settings][:node][:urls] + + # Check for the right datattypes to enforce right setting values + if checkForType(interval, String) && + checkForTypeArray(fieldPass, String) && + checkForTypeArray(fieldDrop, String) && + checkForTypeArray(urls, String) + puts "config::Successfully passed typecheck for config settings for daemonset" + + #if setting is nil assign default values + interval = (interval.nil?) ? @defaultDsInterval : interval + fieldPass = (fieldPass.nil?) ? @defaultDsFieldPass : fieldPass + fieldDrop = (fieldDrop.nil?) ? @defaultDsFieldDrop : fieldDrop + urls = (urls.nil?) ? @defaultDsPromUrls : urls + + file_name = "/opt/telegraf-test.conf" + # Copy the telegraf config file to a temp file to run telegraf in test mode with this config + FileUtils.cp("/etc/opt/microsoft/docker-cimprov/telegraf.conf", file_name) + + puts "config::Starting to substitute the placeholders in telegraf conf copy file for daemonset" + #Replace the placeholder config values with values from custom config + text = File.read(file_name) + new_contents = text.gsub("$AZMON_DS_PROM_INTERVAL", interval) + new_contents = new_contents.gsub("$AZMON_DS_PROM_FIELDPASS", ((fieldPass.length > 0) ? ("[\"" + fieldPass.join("\",\"") + "\"]") : "[]")) + new_contents = new_contents.gsub("$AZMON_DS_PROM_FIELDDROP", ((fieldDrop.length > 0) ? ("[\"" + fieldDrop.join("\",\"") + "\"]") : "[]")) + new_contents = new_contents.gsub("$AZMON_DS_PROM_URLS", ((urls.length > 0) ? ("[\"" + urls.join("\",\"") + "\"]") : "[]")) + File.open(file_name, "w") { |file| file.puts new_contents } + puts "config::Successfully substituted the placeholders in telegraf conf file for daemonset" + + #Set environment variables for telemetry + file = File.open("telemetry_prom_config_env_var", "w") + if !file.nil? + file.write("export TELEMETRY_DS_PROM_INTERVAL=\"#{interval}\"\n") + #Setting array lengths as environment variables for telemetry purposes + file.write("export TELEMETRY_DS_PROM_FIELDPASS_LENGTH=\"#{fieldPass.length}\"\n") + file.write("export TELEMETRY_DS_PROM_FIELDDROP_LENGTH=\"#{fieldDrop.length}\"\n") + file.write("export TELEMETRY_DS_PROM_URLS_LENGTH=#{urls.length}\n") + # Close file after writing all environment variables + file.close + puts "config::Successfully created telemetry file for daemonset" + end + else + puts "config::Typecheck failed for prometheus config settings for daemonset, using defaults" + end # end of type check condition + rescue => errorStr + puts "config::error::Exception while parsing config file for prometheus config for daemonset: #{errorStr}, using defaults" + puts "****************End Prometheus Config Processing********************" + end + end # end of controller type check + end + else + puts "config::error:: Controller undefined while processing prometheus config, using defaults" + end +end + +@configSchemaVersion = ENV["AZMON_AGENT_CFG_SCHEMA_VERSION"] +puts "****************Start Prometheus Config Processing********************" +if !@configSchemaVersion.nil? && !@configSchemaVersion.empty? && @configSchemaVersion.strip.casecmp("v1") == 0 #note v1 is the only supported schema version , so hardcoding it + configMapSettings = parseConfigMap + if !configMapSettings.nil? + populateSettingValuesFromConfigMap(configMapSettings) + end +else + if (File.file?(@promConfigMapMountPath)) + puts "config::unsupported/missing config schema version - '#{@configSchemaVersion}' , using defaults" + else + puts "config::No configmap mounted for prometheus custom config, using defaults" + end +end +puts "****************End Prometheus Config Processing********************" diff --git a/installer/scripts/tomlparser.rb b/installer/scripts/tomlparser.rb index 3e7f48045..c72e64127 100644 --- a/installer/scripts/tomlparser.rb +++ b/installer/scripts/tomlparser.rb @@ -82,7 +82,7 @@ def populateSettingValuesFromConfigMap(parsedConfig) if @collectStderrLogs && !stderrNamespaces.nil? if stderrNamespaces.kind_of?(Array) if !@stdoutExcludeNamespaces.nil? && !@stdoutExcludeNamespaces.empty? - stdoutNamespaces = @stdoutExcludeNamespaces.split(',') + stdoutNamespaces = @stdoutExcludeNamespaces.split(",") end # Checking only for the first element to be string because toml enforces the arrays to contain elements of same type if stderrNamespaces.length > 0 && stderrNamespaces[0].kind_of?(String) @@ -119,47 +119,47 @@ def populateSettingValuesFromConfigMap(parsedConfig) end end - @configSchemaVersion = ENV['AZMON_AGENT_CFG_SCHEMA_VERSION'] - puts "****************Start Config Processing********************" - if !@configSchemaVersion.nil? && !@configSchemaVersion.empty? && @configSchemaVersion.strip.casecmp('v1') == 0 #note v1 is the only supported schema version , so hardcoding it - configMapSettings = parseConfigMap - if !configMapSettings.nil? - populateSettingValuesFromConfigMap(configMapSettings) - end - else - if (File.file?(@configMapMountPath)) - puts "config::unsupported/missing config schema version - '#{@configSchemaVersion}' , using defaults" - end - @excludePath = "*_kube-system_*.log" +@configSchemaVersion = ENV["AZMON_AGENT_CFG_SCHEMA_VERSION"] +puts "****************Start Config Processing********************" +if !@configSchemaVersion.nil? && !@configSchemaVersion.empty? && @configSchemaVersion.strip.casecmp("v1") == 0 #note v1 is the only supported schema version , so hardcoding it + configMapSettings = parseConfigMap + if !configMapSettings.nil? + populateSettingValuesFromConfigMap(configMapSettings) + end +else + if (File.file?(@configMapMountPath)) + puts "config::unsupported/missing config schema version - '#{@configSchemaVersion}' , using defaults" end + @excludePath = "*_kube-system_*.log" +end - # Write the settings to file, so that they can be set as environment variables - file = File.open("config_env_var", "w") +# Write the settings to file, so that they can be set as environment variables +file = File.open("config_env_var", "w") - if !file.nil? - # This will be used in td-agent-bit.conf file to filter out logs - if (!@collectStdoutLogs && !@collectStderrLogs) - #Stop log tailing completely - @logTailPath = "/opt/nolog*.log" - @logExclusionRegexPattern = "stdout|stderr" - elsif !@collectStdoutLogs - @logExclusionRegexPattern = "stdout" - elsif !@collectStderrLogs - @logExclusionRegexPattern = "stderr" - end - file.write("export AZMON_COLLECT_STDOUT_LOGS=#{@collectStdoutLogs}\n") - file.write("export AZMON_LOG_TAIL_PATH=#{@logTailPath}\n") - file.write("export AZMON_LOG_EXCLUSION_REGEX_PATTERN=\"#{@logExclusionRegexPattern}\"\n") - file.write("export AZMON_STDOUT_EXCLUDED_NAMESPACES=#{@stdoutExcludeNamespaces}\n") - file.write("export AZMON_COLLECT_STDERR_LOGS=#{@collectStderrLogs}\n") - file.write("export AZMON_STDERR_EXCLUDED_NAMESPACES=#{@stderrExcludeNamespaces}\n") - file.write("export AZMON_CLUSTER_COLLECT_ENV_VAR=#{@collectClusterEnvVariables}\n") - file.write("export AZMON_CLUSTER_LOG_TAIL_EXCLUDE_PATH=#{@excludePath}\n") - # Close file after writing all environment variables - file.close - puts "Both stdout & stderr log collection are turned off for namespaces: '#{@excludePath}' " - puts "****************End Config Processing********************" - else - puts "config::error::Exception while opening file for writing config environment variables" - puts "****************End Config Processing********************" +if !file.nil? + # This will be used in td-agent-bit.conf file to filter out logs + if (!@collectStdoutLogs && !@collectStderrLogs) + #Stop log tailing completely + @logTailPath = "/opt/nolog*.log" + @logExclusionRegexPattern = "stdout|stderr" + elsif !@collectStdoutLogs + @logExclusionRegexPattern = "stdout" + elsif !@collectStderrLogs + @logExclusionRegexPattern = "stderr" end + file.write("export AZMON_COLLECT_STDOUT_LOGS=#{@collectStdoutLogs}\n") + file.write("export AZMON_LOG_TAIL_PATH=#{@logTailPath}\n") + file.write("export AZMON_LOG_EXCLUSION_REGEX_PATTERN=\"#{@logExclusionRegexPattern}\"\n") + file.write("export AZMON_STDOUT_EXCLUDED_NAMESPACES=#{@stdoutExcludeNamespaces}\n") + file.write("export AZMON_COLLECT_STDERR_LOGS=#{@collectStderrLogs}\n") + file.write("export AZMON_STDERR_EXCLUDED_NAMESPACES=#{@stderrExcludeNamespaces}\n") + file.write("export AZMON_CLUSTER_COLLECT_ENV_VAR=#{@collectClusterEnvVariables}\n") + file.write("export AZMON_CLUSTER_LOG_TAIL_EXCLUDE_PATH=#{@excludePath}\n") + # Close file after writing all environment variables + file.close + puts "Both stdout & stderr log collection are turned off for namespaces: '#{@excludePath}' " + puts "****************End Config Processing********************" +else + puts "config::error::Exception while opening file for writing config environment variables" + puts "****************End Config Processing********************" +end diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index b925e7145..319ff3551 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -34,14 +34,12 @@ const ResourceIdEnv = "AKS_RESOURCE_ID" //env variable which has ResourceName for NON-AKS const ResourceNameEnv = "ACS_RESOURCE_NAME" -// Origin prefix for telegraf Metrics (used as prefix for origin field & prefix for azure monitor specific tags) +// Origin prefix for telegraf Metrics (used as prefix for origin field & prefix for azure monitor specific tags and also for custom-metrics telemetry ) const TelegrafMetricOriginPrefix = "container.azm.ms" // Origin suffix for telegraf Metrics (used as suffix for origin field) const TelegrafMetricOriginSuffix = "telegraf" -// Namespace prefix for telegraf Metrics (used as prefix for Namespace field) -//const TelegrafMetricNamespacePrefix = "plugin" // clusterName tag const TelegrafTagClusterName = "clusterName" @@ -193,7 +191,6 @@ func updateContainerImageNameMaps() { if err != nil { message := fmt.Sprintf("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) Log(message) - SendException(message) continue } @@ -226,7 +223,7 @@ func populateExcludedStdoutNamespaces() { if (strings.Compare(collectStdoutLogs, "true") == 0) && (len(excludeList) > 0) { stdoutNSExcludeList = strings.Split(excludeList, ",") for _, ns := range stdoutNSExcludeList { - Log ("Excluding namespace %s for stdout log collection", ns) + Log("Excluding namespace %s for stdout log collection", ns) StdoutIgnoreNsSet[strings.TrimSpace(ns)] = true } } @@ -239,7 +236,7 @@ func populateExcludedStderrNamespaces() { if (strings.Compare(collectStderrLogs, "true") == 0) && (len(excludeList) > 0) { stderrNSExcludeList = strings.Split(excludeList, ",") for _, ns := range stderrNSExcludeList { - Log ("Excluding namespace %s for stderr log collection", ns) + Log("Excluding namespace %s for stderr log collection", ns) StderrIgnoreNsSet[strings.TrimSpace(ns)] = true } } @@ -384,7 +381,6 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int if err != nil { message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:(retriable) when sending %v metrics. duration:%v err:%q \n", len(laMetrics), elapsed, err.Error()) Log(message) - SendException(message) UpdateNumTelegrafMetricsSentTelemetry(0, 1) return output.FLB_RETRY } @@ -425,7 +421,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { nameIDMap := make(map[string]string) DataUpdateMutex.Lock() - + for k, v := range ImageIDMap { imageIDMap[k] = v } @@ -517,7 +513,8 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { if err != nil { message := fmt.Sprintf("Error when sending request %s \n", err.Error()) Log(message) - SendException(message) + // Commenting this out for now. TODO - Add better telemetry for ods errors using aggregation + //SendException(message) Log("Failed to flush %d records after %s", len(dataItems), elapsed) return output.FLB_RETRY @@ -561,7 +558,7 @@ func GetContainerIDK8sNamespaceFromFileName(filename string) (string, string) { start := strings.LastIndex(filename, "-") end := strings.LastIndex(filename, ".") - + if start >= end || start == -1 || end == -1 { id = "" } else { @@ -641,7 +638,6 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { Log("containerInventoryRefreshInterval = %d \n", containerInventoryRefreshInterval) ContainerImageNameRefreshTicker = time.NewTicker(time.Second * time.Duration(containerInventoryRefreshInterval)) - // Populate Computer field containerHostName, err := ioutil.ReadFile(pluginConfig["container_host_file_path"]) if err != nil { @@ -680,11 +676,11 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { CreateHTTPClient() - if strings.Compare(strings.ToLower(os.Getenv("CONTROLLER_TYPE")), "daemonset") == 0 { + if strings.Compare(strings.ToLower(os.Getenv("CONTROLLER_TYPE")), "daemonset") == 0 { populateExcludedStdoutNamespaces() populateExcludedStderrNamespaces() - go updateContainerImageNameMaps() - } else { + go updateContainerImageNameMaps() + } else { Log("Running in replicaset. Disabling container enrichment caching & updates \n") } } diff --git a/source/code/go/src/plugins/out_oms.go b/source/code/go/src/plugins/out_oms.go index 0fa2ddd4b..e9e7124b7 100644 --- a/source/code/go/src/plugins/out_oms.go +++ b/source/code/go/src/plugins/out_oms.go @@ -64,8 +64,6 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { return PushToAppInsightsTraces(records, appinsights.Information, incomingTag) } else if strings.Contains(incomingTag, "oms.container.perf.telegraf") { return PostTelegrafMetricsToLA(records) - } else if strings.Contains(incomingTag, "oms.container.log.telegraf.err") { - return PushToAppInsightsTraces(records, appinsights.Error, incomingTag) } return PostDataHelper(records) diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb index b842edb29..ec38bcbb5 100644 --- a/source/code/plugin/CAdvisorMetricsAPIClient.rb +++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb @@ -14,12 +14,31 @@ class CAdvisorMetricsAPIClient require_relative "ApplicationInsightsUtility" @configMapMountPath = "/etc/config/settings/log-data-collection-settings" + @promConfigMountPath = "/etc/config/settings/prometheus-data-collection-settings" @clusterEnvVarCollectionEnabled = ENV["AZMON_CLUSTER_COLLECT_ENV_VAR"] @clusterStdErrLogCollectionEnabled = ENV["AZMON_COLLECT_STDERR_LOGS"] @clusterStdOutLogCollectionEnabled = ENV["AZMON_COLLECT_STDOUT_LOGS"] @clusterLogTailExcludPath = ENV["AZMON_CLUSTER_LOG_TAIL_EXCLUDE_PATH"] @clusterLogTailPath = ENV["AZMON_LOG_TAIL_PATH"] @clusterAgentSchemaVersion = ENV["AZMON_AGENT_CFG_SCHEMA_VERSION"] + + @rsPromInterval = ENV["TELEMETRY_RS_PROM_INTERVAL"] + @dsPromInterval = ENV["TELEMETRY_DS_PROM_INTERVAL"] + + @rsPromFieldPassCount = ENV["TELEMETRY_RS_PROM_FIELDPASS_LENGTH"] + @dsPromFieldPassCount = ENV["TELEMETRY_DS_PROM_FIELDPASS_LENGTH"] + + @rsPromFieldDropCount = ENV["TELEMETRY_RS_PROM_FIELDDROP_LENGTH"] + @dsPromFieldDropCount = ENV["TELEMETRY_DS_PROM_FIELDDROP_LENGTH"] + + @rsPromK8sServiceCount = ENV["TELEMETRY_RS_PROM_K8S_SERVICES_LENGTH"] + + @rsPromUrlCount = ENV["TELEMETRY_RS_PROM_URLS_LENGTH"] + @dsPromUrlCount = ENV["TELEMETRY_DS_PROM_URLS_LENGTH"] + + @rsPromMonitorPods = ENV["TELEMETRY_RS_PROM_MONITOR_PODS"] + + @LogPath = "/var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt" @Log = Logger.new(@LogPath, 2, 10 * 1048576) #keep last 2 files, max log file size = 10M # @@rxBytesLast = nil @@ -199,7 +218,7 @@ def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, met telemetryProps["PodName"] = podName telemetryProps["ContainerName"] = containerName telemetryProps["Computer"] = hostName - #telemetry about custom log collections setting + #telemetry about log collections settings if (File.file?(@configMapMountPath)) telemetryProps["clustercustomsettings"] = true telemetryProps["clusterenvvars"] = @clusterEnvVarCollectionEnabled @@ -209,6 +228,19 @@ def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, met telemetryProps["clusterLogTailPath"] = @clusterLogTailPath telemetryProps["clusterAgentSchemaVersion"] = @clusterAgentSchemaVersion end + #telemetry about prometheus metric collections settings + if (File.file?(@promConfigMountPath)) + telemetryProps["rsPromInt"] = @rsPromInterval + telemetryProps["dsPromInt"] = @dsPromInterval + telemetryProps["rsPromFPC"] = @rsPromFieldPassCount + telemetryProps["dsPromFPC"] = @dsPromFieldPassCount + telemetryProps["rsPromFDC"] = @rsPromFieldDropCount + telemetryProps["dsPromFDC"] = @dsPromFieldDropCount + telemetryProps["rsPromServ"] = @rsPromK8sServiceCount + telemetryProps["rsPromUrl"] = @rsPromUrlCount + telemetryProps["dsPromUrl"] = @dsPromUrlCount + telemetryProps["rsPromMonPods"] = @rsPromMonitorPods + end ApplicationInsightsUtility.sendMetricTelemetry(metricNametoReturn, metricValue, telemetryProps) end end diff --git a/source/code/plugin/DockerApiClient.rb b/source/code/plugin/DockerApiClient.rb index 5a46b5fdb..eb9d74531 100644 --- a/source/code/plugin/DockerApiClient.rb +++ b/source/code/plugin/DockerApiClient.rb @@ -2,179 +2,196 @@ # frozen_string_literal: true class DockerApiClient + require "socket" + require "json" + require "timeout" + require_relative "omslog" + require_relative "DockerApiRestHelper" + require_relative "ApplicationInsightsUtility" - require 'socket' - require 'json' - require 'timeout' - require_relative 'omslog' - require_relative 'DockerApiRestHelper' - require_relative 'ApplicationInsightsUtility' + @@SocketPath = "/var/run/host/docker.sock" + @@ChunkSize = 4096 + @@TimeoutInSeconds = 5 + @@PluginName = "ContainerInventory" - @@SocketPath = "/var/run/host/docker.sock" - @@ChunkSize = 4096 - @@TimeoutInSeconds = 5 - @@PluginName = 'ContainerInventory' + def initialize + end - def initialize - end - - class << self - # Make docker socket call for requests - def getResponse(request, isMultiJson, isVersion) - begin - socket = UNIXSocket.new(@@SocketPath) - dockerResponse = "" - isTimeOut = false - socket.write(request) - # iterate through the response until the last chunk is less than the chunk size so that we can read all data in socket. - loop do - begin - responseChunk = "" - timeout(@@TimeoutInSeconds) do - responseChunk = socket.recv(@@ChunkSize) - end - dockerResponse += responseChunk - rescue Timeout::Error - $log.warn("Socket read timedout for request: #{request} @ #{Time.now.utc.iso8601}") - isTimeOut = true - break - end - break if (isVersion)? (responseChunk.length < @@ChunkSize) : (responseChunk.end_with? "0\r\n\r\n") - end - socket.close - return (isTimeOut)? nil : parseResponse(dockerResponse, isMultiJson) - rescue => errorStr - $log.warn("Socket call failed for request: #{request} error: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + class << self + # Make docker socket call for requests + def getResponse(request, isMultiJson, isVersion) + begin + socket = UNIXSocket.new(@@SocketPath) + dockerResponse = "" + isTimeOut = false + socket.write(request) + # iterate through the response until the last chunk is less than the chunk size so that we can read all data in socket. + loop do + begin + responseChunk = "" + timeout(@@TimeoutInSeconds) do + responseChunk = socket.recv(@@ChunkSize) end + dockerResponse += responseChunk + rescue Timeout::Error + $log.warn("Socket read timedout for request: #{request} @ #{Time.now.utc.iso8601}") + isTimeOut = true + break + end + break if (isVersion) ? (responseChunk.length < @@ChunkSize) : (responseChunk.end_with? "0\r\n\r\n") end + socket.close + return (isTimeOut) ? nil : parseResponse(dockerResponse, isMultiJson) + rescue => errorStr + $log.warn("Socket call failed for request: #{request} error: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + end - def parseResponse(dockerResponse, isMultiJson) - # Doing this because the response is in the raw format and includes headers. - # Need to do a regex match to extract the json part of the response - Anything between [{}] in response - parsedJsonResponse = nil - begin - jsonResponse = isMultiJson ? dockerResponse[/\[{.+}\]/] : dockerResponse[/{.+}/] - rescue => errorStr - $log.warn("Regex match for docker response failed: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") - end - begin - if jsonResponse != nil - parsedJsonResponse = JSON.parse(jsonResponse) - end - rescue => errorStr - $log.warn("Json parsing for docker response failed: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - return parsedJsonResponse - end + def parseResponse(dockerResponse, isMultiJson) + # Doing this because the response is in the raw format and includes headers. + # Need to do a regex match to extract the json part of the response - Anything between [{}] in response + parsedJsonResponse = nil + begin + jsonResponse = isMultiJson ? dockerResponse[/\[{.+}\]/] : dockerResponse[/{.+}/] + rescue => errorStr + $log.warn("Regex match for docker response failed: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") + end + begin + if jsonResponse != nil + parsedJsonResponse = JSON.parse(jsonResponse) + end + rescue => errorStr + $log.warn("Json parsing for docker response failed: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + return parsedJsonResponse + end + def getDockerHostName() + dockerHostName = "" + request = DockerApiRestHelper.restDockerInfo + response = getResponse(request, false, false) + if (response != nil) + dockerHostName = response["Name"] + end + return dockerHostName + end - def getDockerHostName() - dockerHostName = "" - request = DockerApiRestHelper.restDockerInfo - response = getResponse(request, false, false) - if (response != nil) - dockerHostName = response['Name'] + def listContainers() + ids = [] + request = DockerApiRestHelper.restDockerPs + containers = getResponse(request, true, false) + if !containers.nil? && !containers.empty? + containers.each do |container| + labels = (!container["Labels"].nil?) ? container["Labels"] : container["labels"] + if !labels.nil? + labelKeys = labels.keys + dockerTypeLabel = labelKeys.find { |k| "io.kubernetes.docker.type".downcase == k.downcase } + if !dockerTypeLabel.nil? + dockerTypeLabelValue = labels[dockerTypeLabel] + # Checking for 'io.kubernetes.docker.type' label for docker containers to exclude the pause-amd64 containers + if !(dockerTypeLabelValue.downcase == "podsandbox".downcase) + # Case insensitive lookup for pod uid label - This is to exclude containers created using docker run and only include containers that + # are created in the pods for ContainerInventory + keyValue = labelKeys.find { |k| "io.kubernetes.pod.uid".downcase == k.downcase } + if !labels[keyValue].nil? + ids.push(container["Id"]) + end + end end - return dockerHostName + end end + end + return ids + end - def listContainers() - ids = [] - request = DockerApiRestHelper.restDockerPs - containers = getResponse(request, true, false) - if !containers.nil? && !containers.empty? - containers.each do |container| - labels = (!container['Labels'].nil?)? container['Labels'] : container['labels'] - if !labels.nil? - labelKeys = labels.keys - dockerTypeLabel = labelKeys.find {|k| 'io.kubernetes.docker.type'.downcase == k.downcase} - if !dockerTypeLabel.nil? - dockerTypeLabelValue = labels[dockerTypeLabel] - # Checking for 'io.kubernetes.docker.type' label for docker containers to exclude the pause-amd64 containers - if !(dockerTypeLabelValue.downcase == "podsandbox".downcase) - # Case insensitive lookup for pod uid label - This is to exclude containers created using docker run and only include containers that - # are created in the pods for ContainerInventory - keyValue = labelKeys.find {|k| 'io.kubernetes.pod.uid'.downcase == k.downcase} - if !labels[keyValue].nil? - ids.push(container['Id']) - end - end - end - end - end - end - return ids + # This method splits the tag value into an array - repository, image, tag, repodigest-imageid + def getImageRepositoryImageTag(tagValue, digestValue) + result = ["", "", "", ""] + atLocation = nil + begin + if !digestValue.empty? + # digest is of the format - repo@sha256:imageid + atLocation = digestValue.index("@") + if !atLocation.nil? + result[3] = digestValue[(atLocation + 1)..-1] + end end - # This method splits the tag value into an array - repository, image and tag - def getImageRepositoryImageTag(tagValue) - result = ["", "", ""] - begin - if !tagValue.empty? - # Find delimiters in the string of format repository/image:imagetag - slashLocation = tagValue.index('/') - colonLocation = tagValue.index(':') - if !colonLocation.nil? - if slashLocation.nil? - # image:imagetag - result[1] = tagValue[0..(colonLocation-1)] - else - # repository/image:imagetag - result[0] = tagValue[0..(slashLocation-1)] - result[1] = tagValue[(slashLocation + 1)..(colonLocation - 1)] - end - result[2] = tagValue[(colonLocation + 1)..-1] - end - end - rescue => errorStr - $log.warn("Exception at getImageRepositoryImageTag: #{errorStr} @ #{Time.now.utc.iso8601}") + if !tagValue.empty? + # Find delimiters in the string of format repository/image:imagetag + slashLocation = tagValue.index("/") + colonLocation = tagValue.index(":") + if !colonLocation.nil? + if slashLocation.nil? + # image:imagetag + result[1] = tagValue[0..(colonLocation - 1)] + else + # repository/image:imagetag + result[0] = tagValue[0..(slashLocation - 1)] + result[1] = tagValue[(slashLocation + 1)..(colonLocation - 1)] end - return result + result[2] = tagValue[(colonLocation + 1)..-1] + end + elsif !digestValue.empty? + # Getting repo information from repodigests when repotags is empty + if !atLocation.nil? + result[0] = digestValue[0..(atLocation - 1)] + end end + rescue => errorStr + $log.warn("Exception at getImageRepositoryImageTag: #{errorStr} @ #{Time.now.utc.iso8601}") + end + return result + end - # Image is in the format repository/image:imagetag - This method creates a hash of image id and repository, image and tag - def getImageIdMap() - result = nil - begin - request = DockerApiRestHelper.restDockerImages - images = getResponse(request, true, false) - if !images.nil? && !images.empty? - result = {} - images.each do |image| - tagValue = "" - tags = image['RepoTags'] - if !tags.nil? && tags.kind_of?(Array) && tags.length > 0 - tagValue = tags[0] - end - idValue = image['Id'] - if !idValue.nil? - result[idValue] = getImageRepositoryImageTag(tagValue) - end - end - end - rescue => errorStr - $log.warn("Exception at getImageIdMap: #{errorStr} @ #{Time.now.utc.iso8601}") + # Image is in the format repository/image:imagetag - This method creates a hash of image id and repository, image and tag + def getImageIdMap() + result = nil + begin + request = DockerApiRestHelper.restDockerImages + images = getResponse(request, true, false) + if !images.nil? && !images.empty? + result = {} + images.each do |image| + tagValue = "" + tags = image["RepoTags"] + if !tags.nil? && tags.kind_of?(Array) && tags.length > 0 + tagValue = tags[0] + end + digestValue = "" + digests = image["RepoDigests"] + if !digests.nil? && digests.kind_of?(Array) && digests.length > 0 + digestValue = digests[0] + end + idValue = image["Id"] + if !idValue.nil? + result[idValue] = getImageRepositoryImageTag(tagValue, digestValue) end - return result + end end + rescue => errorStr + $log.warn("Exception at getImageIdMap: #{errorStr} @ #{Time.now.utc.iso8601}") + end + return result + end - def dockerInspectContainer(id) - request = DockerApiRestHelper.restDockerInspect(id) - return getResponse(request, false, false) - end + def dockerInspectContainer(id) + request = DockerApiRestHelper.restDockerInspect(id) + return getResponse(request, false, false) + end - # This method returns docker version and docker api version for telemetry - def dockerInfo() - request = DockerApiRestHelper.restDockerVersion - response = getResponse(request, false, true) - dockerInfo = {} - if (response != nil) - dockerInfo['Version'] = response['Version'] - dockerInfo['ApiVersion'] = response['ApiVersion'] - end - return dockerInfo - end + # This method returns docker version and docker api version for telemetry + def dockerInfo() + request = DockerApiRestHelper.restDockerVersion + response = getResponse(request, false, true) + dockerInfo = {} + if (response != nil) + dockerInfo["Version"] = response["Version"] + dockerInfo["ApiVersion"] = response["ApiVersion"] + end + return dockerInfo end + end end diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index 3c6b4f203..58a276cfd 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -57,7 +57,7 @@ def getKubeResourceInfo(resource) rescue => error @Log.warn("kubernetes api request failed: #{error} for #{resource} @ #{Time.now.utc.iso8601}") end - if (response.body.empty?) + if (!response.nil? && !response.body.nil? && response.body.empty?) @Log.warn("KubernetesAPIClient::getKubeResourceInfo : Got empty response from Kube API for #{resource} @ #{Time.now.utc.iso8601}") end return response diff --git a/source/code/plugin/in_containerinventory.rb b/source/code/plugin/in_containerinventory.rb index 05e5bc9ea..4392de280 100644 --- a/source/code/plugin/in_containerinventory.rb +++ b/source/code/plugin/in_containerinventory.rb @@ -170,12 +170,13 @@ def inspectContainer(id, nameMap, clusterCollectEnvironmentVar) end imageValue = container["Image"] if !imageValue.nil? && !imageValue.empty? - containerInstance["ImageId"] = imageValue repoImageTagArray = nameMap[imageValue] if nameMap.has_key? imageValue containerInstance["Repository"] = repoImageTagArray[0] containerInstance["Image"] = repoImageTagArray[1] containerInstance["ImageTag"] = repoImageTagArray[2] + # Setting the image id to the id in the remote repository + containerInstance["ImageId"] = repoImageTagArray[3] end end obtainContainerConfig(containerInstance, container, clusterCollectEnvironmentVar) @@ -200,7 +201,7 @@ def enumerate if !containerIds.empty? eventStream = MultiEventStream.new nameMap = DockerApiClient.getImageIdMap - clusterCollectEnvironmentVar = ENV['AZMON_CLUSTER_COLLECT_ENV_VAR'] + clusterCollectEnvironmentVar = ENV["AZMON_CLUSTER_COLLECT_ENV_VAR"] if !clusterCollectEnvironmentVar.nil? && !clusterCollectEnvironmentVar.empty? && clusterCollectEnvironmentVar.casecmp("false") == 0 $log.warn("Environment Variable collection disabled for cluster") end diff --git a/source/code/plugin/in_kube_events.rb b/source/code/plugin/in_kube_events.rb index 309dd8034..3a0e04c67 100644 --- a/source/code/plugin/in_kube_events.rb +++ b/source/code/plugin/in_kube_events.rb @@ -2,27 +2,25 @@ # frozen_string_literal: true module Fluent - class Kube_Event_Input < Input - Plugin.register_input('kubeevents', self) + Plugin.register_input("kubeevents", self) @@KubeEventsStateFile = "/var/opt/microsoft/docker-cimprov/state/KubeEventQueryState.yaml" def initialize super - require 'json' - - require_relative 'KubernetesApiClient' - require_relative 'oms_common' - require_relative 'omslog' - require_relative 'ApplicationInsightsUtility' + require "json" + require_relative "KubernetesApiClient" + require_relative "oms_common" + require_relative "omslog" + require_relative "ApplicationInsightsUtility" end - config_param :run_interval, :time, :default => '1m' + config_param :run_interval, :time, :default => "1m" config_param :tag, :string, :default => "oms.containerinsights.KubeEvents" - def configure (conf) + def configure(conf) super end @@ -46,63 +44,62 @@ def shutdown end def enumerate(eventList = nil) - currentTime = Time.now - emitTime = currentTime.to_f - batchTime = currentTime.utc.iso8601 - if eventList.nil? - $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") - events = JSON.parse(KubernetesApiClient.getKubeResourceInfo('events').body) - $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") - else - events = eventList + currentTime = Time.now + emitTime = currentTime.to_f + batchTime = currentTime.utc.iso8601 + if eventList.nil? + $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") + events = JSON.parse(KubernetesApiClient.getKubeResourceInfo("events").body) + $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") + else + events = eventList + end + eventQueryState = getEventQueryState + newEventQueryState = [] + begin + if (!events.empty? && !events["items"].nil?) + eventStream = MultiEventStream.new + events["items"].each do |items| + record = {} + # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + eventId = items["metadata"]["uid"] + "/" + items["count"].to_s + newEventQueryState.push(eventId) + if !eventQueryState.empty? && eventQueryState.include?(eventId) + next + end + record["ObjectKind"] = items["involvedObject"]["kind"] + record["Namespace"] = items["involvedObject"]["namespace"] + record["Name"] = items["involvedObject"]["name"] + record["Reason"] = items["reason"] + record["Message"] = items["message"] + record["Type"] = items["type"] + record["TimeGenerated"] = items["metadata"]["creationTimestamp"] + record["SourceComponent"] = items["source"]["component"] + record["FirstSeen"] = items["firstTimestamp"] + record["LastSeen"] = items["lastTimestamp"] + record["Count"] = items["count"] + if items["source"].key?("host") + record["Computer"] = items["source"]["host"] + else + record["Computer"] = (OMS::Common.get_hostname) + end + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterId"] = KubernetesApiClient.getClusterId + wrapper = { + "DataType" => "KUBE_EVENTS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper end - eventQueryState = getEventQueryState - newEventQueryState = [] - begin - if(!events.empty?) - eventStream = MultiEventStream.new - events['items'].each do |items| - record = {} - # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion - record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated - eventId = items['metadata']['uid'] + "/" + items['count'].to_s - newEventQueryState.push(eventId) - if !eventQueryState.empty? && eventQueryState.include?(eventId) - next - end - record['ObjectKind']= items['involvedObject']['kind'] - record['Namespace'] = items['involvedObject']['namespace'] - record['Name'] = items['involvedObject']['name'] - record['Reason'] = items['reason'] - record['Message'] = items['message'] - record['Type'] = items['type'] - record['TimeGenerated'] = items['metadata']['creationTimestamp'] - record['SourceComponent'] = items['source']['component'] - record['FirstSeen'] = items['firstTimestamp'] - record['LastSeen'] = items['lastTimestamp'] - record['Count'] = items['count'] - if items['source'].key?('host') - record['Computer'] = items['source']['host'] - else - record['Computer'] = (OMS::Common.get_hostname) - end - record['ClusterName'] = KubernetesApiClient.getClusterName - record['ClusterId'] = KubernetesApiClient.getClusterId - wrapper = { - "DataType"=>"KUBE_EVENTS_BLOB", - "IPName"=>"ContainerInsights", - "DataItems"=>[record.each{|k,v| record[k]=v}] - } - eventStream.add(emitTime, wrapper) if wrapper - end - router.emit_stream(@tag, eventStream) if eventStream - end - writeEventQueryState(newEventQueryState) - rescue => errorStr - $log.warn line.dump, error: errorStr.to_s - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end + router.emit_stream(@tag, eventStream) if eventStream + end + writeEventQueryState(newEventQueryState) + rescue => errorStr + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end end def run_periodic @@ -135,7 +132,7 @@ def getEventQueryState eventQueryState.push(line.chomp) #puts will append newline which needs to be removed end end - rescue => errorStr + rescue => errorStr $log.warn $log.warn line.dump, error: errorStr.to_s $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) @@ -145,20 +142,17 @@ def getEventQueryState def writeEventQueryState(eventQueryState) begin - if(!eventQueryState.nil? && !eventQueryState.empty?) + if (!eventQueryState.nil? && !eventQueryState.empty?) # No need to close file handle (f) due to block scope File.open(@@KubeEventsStateFile, "w") do |f| f.puts(eventQueryState) end end - rescue => errorStr + rescue => errorStr $log.warn $log.warn line.dump, error: errorStr.to_s $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end end - end # Kube_Event_Input - end # module - diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index aabda441e..0310fa419 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -58,81 +58,83 @@ def enumerate if (!nodeInventory.empty?) eventStream = MultiEventStream.new containerNodeInventoryEventStream = MultiEventStream.new - #get node inventory - nodeInventory["items"].each do |items| - record = {} - # Sending records for ContainerNodeInventory - containerNodeInventoryRecord = {} - containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - containerNodeInventoryRecord["Computer"] = items["metadata"]["name"] + if !nodeInventory["items"].nil? + #get node inventory + nodeInventory["items"].each do |items| + record = {} + # Sending records for ContainerNodeInventory + containerNodeInventoryRecord = {} + containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + containerNodeInventoryRecord["Computer"] = items["metadata"]["name"] - record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - record["Computer"] = items["metadata"]["name"] - record["ClusterName"] = KubernetesApiClient.getClusterName - record["ClusterId"] = KubernetesApiClient.getClusterId - record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"] - record["Labels"] = [items["metadata"]["labels"]] - record["Status"] = "" + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + record["Computer"] = items["metadata"]["name"] + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterId"] = KubernetesApiClient.getClusterId + record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"] + record["Labels"] = [items["metadata"]["labels"]] + record["Status"] = "" - # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. - # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we - # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" - # implying that the node is ready for hosting pods, however its out of disk. + # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. + # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we + # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" + # implying that the node is ready for hosting pods, however its out of disk. - if items["status"].key?("conditions") && !items["status"]["conditions"].empty? - allNodeConditions = "" - items["status"]["conditions"].each do |condition| - if condition["status"] == "True" - if !allNodeConditions.empty? - allNodeConditions = allNodeConditions + "," + condition["type"] - else - allNodeConditions = condition["type"] + if items["status"].key?("conditions") && !items["status"]["conditions"].empty? + allNodeConditions = "" + items["status"]["conditions"].each do |condition| + if condition["status"] == "True" + if !allNodeConditions.empty? + allNodeConditions = allNodeConditions + "," + condition["type"] + else + allNodeConditions = condition["type"] + end + end + #collect last transition to/from ready (no matter ready is true/false) + if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? + record["LastTransitionTimeReady"] = condition["lastTransitionTime"] end end - #collect last transition to/from ready (no matter ready is true/false) - if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? - record["LastTransitionTimeReady"] = condition["lastTransitionTime"] + if !allNodeConditions.empty? + record["Status"] = allNodeConditions end end - if !allNodeConditions.empty? - record["Status"] = allNodeConditions - end - end - nodeInfo = items["status"]["nodeInfo"] - record["KubeletVersion"] = nodeInfo["kubeletVersion"] - record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] - containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] - dockerVersion = nodeInfo["containerRuntimeVersion"] - dockerVersion.slice! "docker://" - containerNodeInventoryRecord["DockerVersion"] = dockerVersion - # ContainerNodeInventory data for docker version and operating system. - containerNodeInventoryWrapper = { - "DataType" => "CONTAINER_NODE_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }], - } - containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper + nodeInfo = items["status"]["nodeInfo"] + record["KubeletVersion"] = nodeInfo["kubeletVersion"] + record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] + containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] + dockerVersion = nodeInfo["containerRuntimeVersion"] + dockerVersion.slice! "docker://" + containerNodeInventoryRecord["DockerVersion"] = dockerVersion + # ContainerNodeInventory data for docker version and operating system. + containerNodeInventoryWrapper = { + "DataType" => "CONTAINER_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }], + } + containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper - wrapper = { - "DataType" => "KUBE_NODE_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [record.each { |k, v| record[k] = v }], - } - eventStream.add(emitTime, wrapper) if wrapper - # Adding telemetry to send node telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 5) - properties = {} - properties["Computer"] = record["Computer"] - properties["KubeletVersion"] = record["KubeletVersion"] - properties["OperatingSystem"] = nodeInfo["operatingSystem"] - properties["DockerVersion"] = dockerVersion - capacityInfo = items["status"]["capacity"] - ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties) - ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) - telemetrySent = true + wrapper = { + "DataType" => "KUBE_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper + # Adding telemetry to send node telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + properties = {} + properties["Computer"] = record["Computer"] + properties["KubeletVersion"] = record["KubeletVersion"] + properties["OperatingSystem"] = nodeInfo["operatingSystem"] + properties["DockerVersion"] = dockerVersion + capacityInfo = items["status"]["capacity"] + ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties) + ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) + telemetrySent = true + end end end router.emit_stream(@tag, eventStream) if eventStream diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 79490ba7d..d0056fb14 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -152,8 +152,10 @@ def getContainerEnvironmentVariables(pod, clusterCollectEnvironmentVar) containerEnvArray.each do |envVarHash| envName = envVarHash["name"] envValue = envVarHash["value"] - envArrayElement = envName + "=" + envValue - envVarsArray.push(envArrayElement) + if !envName.nil? && !envValue.nil? + envArrayElement = envName + "=" + envValue + envVarsArray.push(envArrayElement) + end end end # Skip environment variable processing if it contains the flag AZMON_COLLECT_ENV=FALSE @@ -201,7 +203,11 @@ def parse_and_emit_records(podInventory, serviceList) # instead of the actual poduid. Since this uid is not being surface into the UX # its ok to use this. # Use kubernetes.io/config.hash to be able to correlate with cadvisor data - podUid = items["metadata"]["annotations"]["kubernetes.io/config.hash"] + if items["metadata"]["annotations"].nil? + next + else + podUid = items["metadata"]["annotations"]["kubernetes.io/config.hash"] + end else podUid = items["metadata"]["uid"] end @@ -287,7 +293,11 @@ def parse_and_emit_records(podInventory, serviceList) record["ContainerID"] = "" end #keeping this as which is same as InstanceName in perf table - record["ContainerName"] = podUid + "/" + container["name"] + if podUid.nil? || container["name"].nil? + next + else + record["ContainerName"] = podUid + "/" + container["name"] + end #Pod restart count is a sumtotal of restart counts of individual containers #within the pod. The restart count of a container is maintained by kubernetes #itself in the form of a container label. diff --git a/source/code/plugin/in_kube_services.rb b/source/code/plugin/in_kube_services.rb index e1bb93f30..8b0a013e4 100644 --- a/source/code/plugin/in_kube_services.rb +++ b/source/code/plugin/in_kube_services.rb @@ -2,108 +2,101 @@ # frozen_string_literal: true module Fluent - - class Kube_Services_Input < Input - Plugin.register_input('kubeservices', self) - - def initialize - super - require 'yaml' - require 'json' - - require_relative 'KubernetesApiClient' - require_relative 'oms_common' - require_relative 'omslog' - require_relative 'ApplicationInsightsUtility' + class Kube_Services_Input < Input + Plugin.register_input("kubeservices", self) - end - - config_param :run_interval, :time, :default => '1m' - config_param :tag, :string, :default => "oms.containerinsights.KubeServices" - - def configure (conf) - super - end - - def start - if @run_interval - @finished = false - @condition = ConditionVariable.new - @mutex = Mutex.new - @thread = Thread.new(&method(:run_periodic)) - end - end - - def shutdown - if @run_interval - @mutex.synchronize { - @finished = true - @condition.signal + def initialize + super + require "yaml" + require "json" + + require_relative "KubernetesApiClient" + require_relative "oms_common" + require_relative "omslog" + require_relative "ApplicationInsightsUtility" + end + + config_param :run_interval, :time, :default => "1m" + config_param :tag, :string, :default => "oms.containerinsights.KubeServices" + + def configure(conf) + super + end + + def start + if @run_interval + @finished = false + @condition = ConditionVariable.new + @mutex = Mutex.new + @thread = Thread.new(&method(:run_periodic)) + end + end + + def shutdown + if @run_interval + @mutex.synchronize { + @finished = true + @condition.signal + } + @thread.join + end + end + + def enumerate + currentTime = Time.now + emitTime = currentTime.to_f + batchTime = currentTime.utc.iso8601 + $log.info("in_kube_services::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") + serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) + $log.info("in_kube_services::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") + begin + if (!serviceList.empty?) + eventStream = MultiEventStream.new + serviceList["items"].each do |items| + record = {} + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + record["ServiceName"] = items["metadata"]["name"] + record["Namespace"] = items["metadata"]["namespace"] + record["SelectorLabels"] = [items["spec"]["selector"]] + record["ClusterId"] = KubernetesApiClient.getClusterId + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterIP"] = items["spec"]["clusterIP"] + record["ServiceType"] = items["spec"]["type"] + # : Add ports and status fields + wrapper = { + "DataType" => "KUBE_SERVICES_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], } - @thread.join + eventStream.add(emitTime, wrapper) if wrapper end + router.emit_stream(@tag, eventStream) if eventStream end - - def enumerate - currentTime = Time.now - emitTime = currentTime.to_f - batchTime = currentTime.utc.iso8601 - $log.info("in_kube_services::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") - serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo('services').body) - $log.info("in_kube_services::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") - begin - if(!serviceList.empty?) - eventStream = MultiEventStream.new - serviceList['items'].each do |items| - record = {} - record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated - record['ServiceName'] = items['metadata']['name'] - record['Namespace'] = items['metadata']['namespace'] - record['SelectorLabels'] = [items['spec']['selector']] - record['ClusterId'] = KubernetesApiClient.getClusterId - record['ClusterName'] = KubernetesApiClient.getClusterName - record['ClusterIP'] = items['spec']['clusterIP'] - record['ServiceType'] = items['spec']['type'] - # : Add ports and status fields - wrapper = { - "DataType"=>"KUBE_SERVICES_BLOB", - "IPName"=>"ContainerInsights", - "DataItems"=>[record.each{|k,v| record[k]=v}] - } - eventStream.add(emitTime, wrapper) if wrapper - end - router.emit_stream(@tag, eventStream) if eventStream - end - rescue => errorStr - $log.warn line.dump, error: errorStr.to_s - $log.debug_backtrace(e.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - end - - def run_periodic - @mutex.lock - done = @finished - until done - @condition.wait(@mutex, @run_interval) - done = @finished - @mutex.unlock - if !done - begin - $log.info("in_kube_services::run_periodic @ #{Time.now.utc.iso8601}") - enumerate - rescue => errorStr - $log.warn "in_kube_services::run_periodic: enumerate Failed to kube services: #{errorStr}" - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - end - @mutex.lock + rescue => errorStr + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + end + + def run_periodic + @mutex.lock + done = @finished + until done + @condition.wait(@mutex, @run_interval) + done = @finished + @mutex.unlock + if !done + begin + $log.info("in_kube_services::run_periodic @ #{Time.now.utc.iso8601}") + enumerate + rescue => errorStr + $log.warn "in_kube_services::run_periodic: enumerate Failed to kube services: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end - @mutex.unlock end - - end # Kube_Services_Input - - end # module - - \ No newline at end of file + @mutex.lock + end + @mutex.unlock + end + end # Kube_Services_Input +end # module