#!/usr/local/bin/ruby
# frozen_string_literal: true

# Writes config-map parsing errors to stderr in a fixed, machine-recognisable
# form: the message is prefixed with "config::error::" and serialised with
# to_json, so the whole message is emitted as one quoted JSON string.
class ConfigParseErrorLogger
  require "json"

  def initialize
  end

  # Emits "config::error::<message>" as a JSON string on STDERR.
  #
  # message - the String describing the configuration problem.
  #
  # Any failure while building or writing the line is reported on stdout
  # instead of being raised to the caller.
  def self.logError(message)
    STDERR.puts(("config::error::" + message).to_json)
  rescue => failure
    puts "Error in ConfigParserErrorLogger::logError: #{failure}"
  end
end
a/installer/scripts/td-agent-bit-conf-customizer.rb +++ b/installer/scripts/td-agent-bit-conf-customizer.rb @@ -1,4 +1,5 @@ #!/usr/local/bin/ruby +require_relative "ConfigParseErrorLogger" @td_agent_bit_conf_path = "/etc/opt/microsoft/docker-cimprov/td-agent-bit.conf" @@ -40,7 +41,7 @@ def substituteFluentBitPlaceHolders File.open(@td_agent_bit_conf_path, "w") { |file| file.puts new_contents } puts "config::Successfully substituted the placeholders in td-agent-bit.conf file" rescue => errorStr - puts "td-agent-bit-config-customizer: error while substituting values: #{errorStr}" + ConfigParseErrorLogger.logError("td-agent-bit-config-customizer: error while substituting values in td-agent-bit.conf file: #{errorStr}") end end diff --git a/installer/scripts/tomlparser-prom-customconfig.rb b/installer/scripts/tomlparser-prom-customconfig.rb index d44bf3342..ab868f1a9 100644 --- a/installer/scripts/tomlparser-prom-customconfig.rb +++ b/installer/scripts/tomlparser-prom-customconfig.rb @@ -1,6 +1,7 @@ #!/usr/local/bin/ruby require_relative "tomlrb" +require_relative "ConfigParseErrorLogger" require "fileutils" @promConfigMapMountPath = "/etc/config/settings/prometheus-data-collection-settings" @@ -40,7 +41,7 @@ def parseConfigMap return nil end rescue => errorStr - puts "config::error::Exception while parsing toml config file for prometheus config: #{errorStr}, using defaults" + ConfigParseErrorLogger.logError("Exception while parsing config map for prometheus config: #{errorStr}, using defaults, please check config map for errors") return nil end end @@ -66,7 +67,7 @@ def replaceDefaultMonitorPodSettings(new_contents, monitorKubernetesPods) new_contents = new_contents.gsub("$AZMON_RS_PROM_MONITOR_PODS", ("monitor_kubernetes_pods = #{monitorKubernetesPods}")) new_contents = new_contents.gsub("$AZMON_RS_PROM_PLUGINS_WITH_NAMESPACE_FILTER", "") rescue => errorStr - puts "config::error::Exception while replacing default pod monitor settings: #{errorStr}" + puts "Exception 
while replacing default pod monitor settings: #{errorStr}" end return new_contents end @@ -98,7 +99,7 @@ def createPrometheusPluginsWithNamespaceSetting(monitorKubernetesPods, monitorKu new_contents = new_contents.gsub("$AZMON_RS_PROM_PLUGINS_WITH_NAMESPACE_FILTER", pluginConfigsWithNamespaces) return new_contents rescue => errorStr - puts "config::error::Exception while creating prometheus input plugins to filter namespaces: #{errorStr}, using defaults" + puts "Exception while creating prometheus input plugins to filter namespaces: #{errorStr}, using defaults" replaceDefaultMonitorPodSettings(new_contents, monitorKubernetesPods) end end @@ -181,10 +182,10 @@ def populateSettingValuesFromConfigMap(parsedConfig) puts "config::Successfully created telemetry file for replicaset" end else - puts "config::Typecheck failed for prometheus config settings for replicaset, using defaults" + ConfigParseErrorLogger.logError("Typecheck failed for prometheus config settings for replicaset, using defaults, please use right types for all settings") end # end of type check condition rescue => errorStr - puts "config::error::Exception while parsing config file for prometheus config for replicaset: #{errorStr}, using defaults" + ConfigParseErrorLogger.logError("Exception while parsing config file for prometheus config for replicaset: #{errorStr}, using defaults") setRsPromDefaults puts "****************End Prometheus Config Processing********************" end @@ -236,16 +237,16 @@ def populateSettingValuesFromConfigMap(parsedConfig) puts "config::Successfully created telemetry file for daemonset" end else - puts "config::Typecheck failed for prometheus config settings for daemonset, using defaults" + ConfigParseErrorLogger.logError("Typecheck failed for prometheus config settings for daemonset, using defaults, please use right types for all settings") end # end of type check condition rescue => errorStr - puts "config::error::Exception while parsing config file for prometheus config 
for daemonset: #{errorStr}, using defaults" + ConfigParseErrorLogger.logError("Exception while parsing config file for prometheus config for daemonset: #{errorStr}, using defaults, please check correctness of configmap") puts "****************End Prometheus Config Processing********************" end end # end of controller type check end else - puts "config::error:: Controller undefined while processing prometheus config, using defaults" + ConfigParseErrorLogger.logError("Controller undefined while processing prometheus config, using defaults") end end @@ -258,7 +259,7 @@ def populateSettingValuesFromConfigMap(parsedConfig) end else if (File.file?(@promConfigMapMountPath)) - puts "config::unsupported/missing config schema version - '#{@configSchemaVersion}' , using defaults" + ConfigParseErrorLogger.logError("config::unsupported/missing config schema version - '#{@configSchemaVersion}' , using defaults, please use supported version") else puts "config::No configmap mounted for prometheus custom config, using defaults" end diff --git a/installer/scripts/tomlparser.rb b/installer/scripts/tomlparser.rb index b66e1257e..523f8c307 100644 --- a/installer/scripts/tomlparser.rb +++ b/installer/scripts/tomlparser.rb @@ -1,7 +1,8 @@ #!/usr/local/bin/ruby require_relative "tomlrb" -require 'json' +require_relative "ConfigParseErrorLogger" +require "json" @log_settings_config_map_mount_path = "/etc/config/settings/log-data-collection-settings" @agent_settings_config_map_mount_path = "/etc/config/settings/agent-settings" @@ -33,7 +34,7 @@ def parseConfigMap(path) return nil end rescue => errorStr - puts "config::error::Exception while parsing toml config file: #{errorStr}, using defaults" + ConfigParseErrorLogger.logError("Exception while parsing config map for log collection/env variable settings: #{errorStr}, using defaults, please check config map for errors") @excludePath = "*_kube-system_*.log" return nil end @@ -70,7 +71,7 @@ def 
populateSettingValuesFromConfigMap(parsedConfig) end end rescue => errorStr - puts "config::error::Exception while reading config settings for stdout log collection - #{errorStr}, using defaults" + ConfigParseErrorLogger.logError("Exception while reading config map settings for stdout log collection - #{errorStr}, using defaults, please check config map for errors") end #Get stderr log config settings @@ -107,7 +108,7 @@ def populateSettingValuesFromConfigMap(parsedConfig) end end rescue => errorStr - puts "config::error:Exception while reading config settings for stderr log collection - #{errorStr}, using defaults" + ConfigParseErrorLogger.logError("Exception while reading config map settings for stderr log collection - #{errorStr}, using defaults, please check config map for errors") end #Get environment variables log config settings @@ -117,42 +118,43 @@ def populateSettingValuesFromConfigMap(parsedConfig) puts "config::Using config map setting for cluster level environment variable collection" end rescue => errorStr - puts "config::error::Exception while reading config settings for cluster level environment variable collection - #{errorStr}, using defaults" + ConfigParseErrorLogger.logError("Exception while reading config map settings for cluster level environment variable collection - #{errorStr}, using defaults, please check config map for errors") end end begin if !parsedConfig.nil? && !parsedConfig[:agent_settings].nil? && !parsedConfig[:agent_settings][:health_model].nil? && !parsedConfig[:agent_settings][:health_model][:enabled].nil? 
- @enable_health_model = parsedConfig[:agent_settings][:health_model][:enabled] + @enable_health_model = parsedConfig[:agent_settings][:health_model][:enabled] else - @enable_health_model = false + @enable_health_model = false end puts "enable_health_model = #{@enable_health_model}" rescue => errorStr - puts "config::error:Exception while reading config settings for health_model enabled setting - #{errorStr}, using defaults" + ConfigParseErrorLogger.logError("Exception while reading config map settings for health_model enabled setting - #{errorStr}, using defaults, please check config map for errors") @enable_health_model = false end end @configSchemaVersion = ENV["AZMON_AGENT_CFG_SCHEMA_VERSION"] puts "****************Start Config Processing********************" + if !@configSchemaVersion.nil? && !@configSchemaVersion.empty? && @configSchemaVersion.strip.casecmp("v1") == 0 #note v1 is the only supported schema version , so hardcoding it - configMapSettings = {} + configMapSettings = {} - #iterate over every *settings file and build a hash of settings - Dir["/etc/config/settings/*settings"].each{|file| - puts "Parsing File #{file}" - settings = parseConfigMap(file) - if !settings.nil? - configMapSettings = configMapSettings.merge(settings) - end - } + #iterate over every *settings file and build a hash of settings + Dir["/etc/config/settings/*settings"].each { |file| + puts "Parsing File #{file}" + settings = parseConfigMap(file) + if !settings.nil? + configMapSettings = configMapSettings.merge(settings) + end + } if !configMapSettings.nil? 
populateSettingValuesFromConfigMap(configMapSettings) end else - puts "config::unsupported/missing config schema version - '#{@configSchemaVersion}' , using defaults" + ConfigParseErrorLogger.logError("config::unsupported/missing config schema version - '#{@configSchemaVersion}' , using defaults, please use supported schema version") @excludePath = "*_kube-system_*.log" end @@ -178,13 +180,13 @@ def populateSettingValuesFromConfigMap(parsedConfig) file.write("export AZMON_STDERR_EXCLUDED_NAMESPACES=#{@stderrExcludeNamespaces}\n") file.write("export AZMON_CLUSTER_COLLECT_ENV_VAR=#{@collectClusterEnvVariables}\n") file.write("export AZMON_CLUSTER_LOG_TAIL_EXCLUDE_PATH=#{@excludePath}\n") - #health_model settings + #health_model settings file.write("export AZMON_CLUSTER_ENABLE_HEALTH_MODEL=#{@enable_health_model}\n") # Close file after writing all environment variables file.close puts "Both stdout & stderr log collection are turned off for namespaces: '#{@excludePath}' " puts "****************End Config Processing********************" else - puts "config::error::Exception while opening file for writing config environment variables" + puts "Exception while opening file for writing config environment variables" puts "****************End Config Processing********************" end diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index c5ad307d8..6d78455bd 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -28,6 +28,9 @@ const ContainerLogDataType = "CONTAINER_LOG_BLOB" // DataType for Insights metric const InsightsMetricsDataType = "INSIGHTS_METRICS_BLOB" +// DataType for KubeMonAgentEvent +const KubeMonAgentEventDataType = "KUBE_MON_AGENT_EVENTS_BLOB" + //env varibale which has ResourceId for LA const ResourceIdEnv = "AKS_RESOURCE_ID" @@ -46,6 +49,20 @@ const TelegrafTagClusterName = "clusterName" // clusterId tag const TelegrafTagClusterID = "clusterId" +const ConfigErrorEventCategory = 
"container.azm.ms/configmap" + +const PromScrapingErrorEventCategory = "container.azm.ms/promscraping" + +const NoErrorEventCategory = "container.azm.ms/noerror" + +const KubeMonAgentEventError = "Error" + +const KubeMonAgentEventWarning = "Warning" + +const KubeMonAgentEventInfo = "Info" + +const KubeMonAgentEventsFlushedEvent = "KubeMonAgentEventsFlushed" + // ContainerLogPluginConfFilePath --> config file path for container log plugin const DaemonSetContainerLogPluginConfFilePath = "/etc/opt/microsoft/docker-cimprov/out_oms.conf" const ReplicaSetContainerLogPluginConfFilePath = "/etc/opt/microsoft/docker-cimprov/out_oms.conf" @@ -54,6 +71,8 @@ const ReplicaSetContainerLogPluginConfFilePath = "/etc/opt/microsoft/docker-cimp const IPName = "Containers" const defaultContainerInventoryRefreshInterval = 60 +const kubeMonAgentConfigEventFlushInterval = 60 + var ( // PluginConfiguration the plugins configuration PluginConfiguration map[string]string @@ -71,6 +90,8 @@ var ( ResourceCentric bool //ResourceName ResourceName string + //KubeMonAgentEvents skip first flush + skipKubeMonEventsFlush bool ) var ( @@ -88,11 +109,19 @@ var ( ContainerLogTelemetryMutex = &sync.Mutex{} // ClientSet for querying KubeAPIs ClientSet *kubernetes.Clientset + // Config error hash + ConfigErrorEvent map[string]KubeMonAgentEventTags + // Prometheus scraping error hash + PromScrapeErrorEvent map[string]KubeMonAgentEventTags + // EventHashUpdateMutex read and write mutex access to the event hash + EventHashUpdateMutex = &sync.Mutex{} ) var ( // ContainerImageNameRefreshTicker updates the container image and names periodically ContainerImageNameRefreshTicker *time.Ticker + // KubeMonAgentConfigEventsSendTicker to send config events every hour + KubeMonAgentConfigEventsSendTicker *time.Ticker ) var ( @@ -142,6 +171,41 @@ type ContainerLogBlob struct { DataItems []DataItem `json:"DataItems"` } +// Config Error message to be sent to Log Analytics +type laKubeMonAgentEvents struct { + Computer 
string `json:"Computer"` + CollectionTime string `json:"CollectionTime"` //mapped to TimeGenerated + Category string `json:"Category"` + Level string `json:"Level"` + ClusterId string `json:"ClusterId"` + ClusterName string `json:"ClusterName"` + Message string `json:"Message"` + Tags string `json:"Tags"` +} + +type KubeMonAgentEventTags struct { + PodName string + ContainerId string + FirstOccurance string + LastOccurance string + Count int +} + +type KubeMonAgentEventBlob struct { + DataType string `json:"DataType"` + IPName string `json:"IPName"` + DataItems []laKubeMonAgentEvents `json:"DataItems"` +} + +// KubeMonAgentEventType to be used as enum +type KubeMonAgentEventType int + +const ( + // KubeMonAgentEventType to be used as enum for ConfigError and ScrapingError + ConfigError KubeMonAgentEventType = iota + PromScrapingError +) + func createLogger() *log.Logger { var logfile *os.File path := "/var/opt/microsoft/docker-cimprov/log/fluent-bit-out-oms-runtime.log" @@ -262,6 +326,223 @@ func convert(in interface{}) (float64, bool) { } } +// PostConfigErrorstoLA sends config/prometheus scraping error log lines to LA +func populateKubeMonAgentEventHash(record map[interface{}]interface{}, errType KubeMonAgentEventType) { + var logRecordString = ToString(record["log"]) + var eventTimeStamp = ToString(record["time"]) + containerID, _, podName := GetContainerIDK8sNamespacePodNameFromFileName(ToString(record["filepath"])) + + Log("Locked EventHashUpdateMutex for updating hash \n ") + EventHashUpdateMutex.Lock() + switch errType { + case ConfigError: + // Doing this since the error logger library is adding quotes around the string and a newline to the end because + // we are converting string to json to log lines in different lines as one record + logRecordString = strings.TrimSuffix(logRecordString, "\n") + logRecordString = logRecordString[1 : len(logRecordString)-1] + + if val, ok := ConfigErrorEvent[logRecordString]; ok { + Log("In config error existing hash 
update\n") + eventCount := val.Count + eventFirstOccurance := val.FirstOccurance + + ConfigErrorEvent[logRecordString] = KubeMonAgentEventTags{ + PodName: podName, + ContainerId: containerID, + FirstOccurance: eventFirstOccurance, + LastOccurance: eventTimeStamp, + Count: eventCount + 1, + } + } else { + ConfigErrorEvent[logRecordString] = KubeMonAgentEventTags{ + PodName: podName, + ContainerId: containerID, + FirstOccurance: eventTimeStamp, + LastOccurance: eventTimeStamp, + Count: 1, + } + } + + case PromScrapingError: + // Splitting this based on the string 'E! [inputs.prometheus]: ' since the log entry has timestamp and we want to remove that before building the hash + var scrapingSplitString = strings.Split(logRecordString, "E! [inputs.prometheus]: ") + if scrapingSplitString != nil && len(scrapingSplitString) == 2 { + var splitString = scrapingSplitString[1] + // Trimming the newline character at the end since this is being added as the key + splitString = strings.TrimSuffix(splitString, "\n") + if splitString != "" { + if val, ok := PromScrapeErrorEvent[splitString]; ok { + Log("In config error existing hash update\n") + eventCount := val.Count + eventFirstOccurance := val.FirstOccurance + + PromScrapeErrorEvent[splitString] = KubeMonAgentEventTags{ + PodName: podName, + ContainerId: containerID, + FirstOccurance: eventFirstOccurance, + LastOccurance: eventTimeStamp, + Count: eventCount + 1, + } + } else { + PromScrapeErrorEvent[splitString] = KubeMonAgentEventTags{ + PodName: podName, + ContainerId: containerID, + FirstOccurance: eventTimeStamp, + LastOccurance: eventTimeStamp, + Count: 1, + } + } + } + } + } + EventHashUpdateMutex.Unlock() + Log("Unlocked EventHashUpdateMutex after updating hash \n ") +} + +// Function to get config error log records after iterating through the two hashes +func flushKubeMonAgentEventRecords() { + for ; true; <-KubeMonAgentConfigEventsSendTicker.C { + if skipKubeMonEventsFlush != true { + Log("In 
flushConfigErrorRecords\n") + start := time.Now() + var resp *http.Response + var postError error + var elapsed time.Duration + var laKubeMonAgentEventsRecords []laKubeMonAgentEvents + telemetryDimensions := make(map[string]string) + + telemetryDimensions["ConfigErrorEventCount"] = strconv.Itoa(len(ConfigErrorEvent)) + telemetryDimensions["PromScrapeErrorEventCount"] = strconv.Itoa(len(PromScrapeErrorEvent)) + + if (len(ConfigErrorEvent) > 0) || (len(PromScrapeErrorEvent) > 0) { + EventHashUpdateMutex.Lock() + Log("Locked EventHashUpdateMutex for reading hashes\n") + for k, v := range ConfigErrorEvent { + tagJson, err := json.Marshal(v) + + if err != nil { + message := fmt.Sprintf("Error while Marshalling config error event tags: %s", err.Error()) + Log(message) + SendException(message) + } else { + laKubeMonAgentEventsRecord := laKubeMonAgentEvents{ + Computer: Computer, + CollectionTime: start.Format(time.RFC3339), + Category: ConfigErrorEventCategory, + Level: KubeMonAgentEventError, + ClusterId: ResourceID, + ClusterName: ResourceName, + Message: k, + Tags: fmt.Sprintf("%s", tagJson), + } + laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord) + } + } + + for k, v := range PromScrapeErrorEvent { + tagJson, err := json.Marshal(v) + if err != nil { + message := fmt.Sprintf("Error while Marshalling prom scrape error event tags: %s", err.Error()) + Log(message) + SendException(message) + } else { + laKubeMonAgentEventsRecord := laKubeMonAgentEvents{ + Computer: Computer, + CollectionTime: start.Format(time.RFC3339), + Category: PromScrapingErrorEventCategory, + Level: KubeMonAgentEventWarning, + ClusterId: ResourceID, + ClusterName: ResourceName, + Message: k, + Tags: fmt.Sprintf("%s", tagJson), + } + laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord) + } + } + + //Clearing out the prometheus scrape hash so that it can be rebuilt with the errors in the next hour + for k := range 
PromScrapeErrorEvent { + delete(PromScrapeErrorEvent, k) + } + Log("PromScrapeErrorEvent cache cleared\n") + EventHashUpdateMutex.Unlock() + Log("Unlocked EventHashUpdateMutex for reading hashes\n") + } else { + //Sending a record in case there are no errors to be able to differentiate between no data vs no errors + tagsValue := KubeMonAgentEventTags{} + + tagJson, err := json.Marshal(tagsValue) + if err != nil { + message := fmt.Sprintf("Error while Marshalling no error tags: %s", err.Error()) + Log(message) + SendException(message) + } else { + laKubeMonAgentEventsRecord := laKubeMonAgentEvents{ + Computer: Computer, + CollectionTime: start.Format(time.RFC3339), + Category: NoErrorEventCategory, + Level: KubeMonAgentEventInfo, + ClusterId: ResourceID, + ClusterName: ResourceName, + Message: "No errors", + Tags: fmt.Sprintf("%s", tagJson), + } + laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord) + } + } + + if len(laKubeMonAgentEventsRecords) > 0 { + kubeMonAgentEventEntry := KubeMonAgentEventBlob{ + DataType: KubeMonAgentEventDataType, + IPName: IPName, + DataItems: laKubeMonAgentEventsRecords} + + marshalled, err := json.Marshal(kubeMonAgentEventEntry) + + if err != nil { + message := fmt.Sprintf("Error while marshalling kubemonagentevent entry: %s", err.Error()) + Log(message) + SendException(message) + } else { + req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled)) + req.Header.Set("Content-Type", "application/json") + //expensive to do string len for every request, so use a flag + if ResourceCentric == true { + req.Header.Set("x-ms-AzureResourceId", ResourceID) + } + + resp, postError = HTTPClient.Do(req) + elapsed = time.Since(start) + + if postError != nil { + message := fmt.Sprintf("Error when sending kubemonagentevent request %s \n", err.Error()) + Log(message) + Log("Failed to flush %d records after %s", len(laKubeMonAgentEventsRecords), elapsed) + } else if resp == nil || resp.StatusCode 
!= 200 { + if resp != nil { + Log("Status %s Status Code %d", resp.Status, resp.StatusCode) + } + Log("Failed to flush %d records after %s", len(laKubeMonAgentEventsRecords), elapsed) + } else { + numRecords := len(laKubeMonAgentEventsRecords) + Log("Successfully flushed %d records in %s", numRecords, elapsed) + + // Send telemetry to AppInsights resource + SendEvent(KubeMonAgentEventsFlushedEvent, telemetryDimensions) + + } + if resp != nil && resp.Body != nil { + defer resp.Body.Close() + } + } + } + } else { + // Setting this to false to allow for subsequent flushes after the first hour + skipKubeMonEventsFlush = false + } + } +} + //Translates telegraf time series to one or more Azure loganalytics metric(s) func translateTelegrafMetrics(m map[interface{}]interface{}) ([]*laTelegrafMetric, error) { @@ -431,7 +712,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { DataUpdateMutex.Unlock() for _, record := range tailPluginRecords { - containerID, k8sNamespace := GetContainerIDK8sNamespaceFromFileName(ToString(record["filepath"])) + containerID, k8sNamespace, _ := GetContainerIDK8sNamespacePodNameFromFileName(ToString(record["filepath"])) logEntrySource := ToString(record["stream"]) if strings.EqualFold(logEntrySource, "stdout") { @@ -502,6 +783,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { SendException(message) return output.FLB_OK } + req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled)) req.Header.Set("Content-Type", "application/json") //expensive to do string len for every request, so use a flag @@ -552,11 +834,12 @@ func containsKey(currentMap map[string]bool, key string) bool { return c } -// GetContainerIDK8sNamespaceFromFileName Gets the container ID From the file Name +// GetContainerIDK8sNamespacePodNameFromFileName Gets the container ID, k8s namespace and pod name From the file Name // sample filename 
kube-proxy-dgcx7_kube-system_kube-proxy-8df7e49e9028b60b5b0d0547f409c455a9567946cf763267b7e6fa053ab8c182.log -func GetContainerIDK8sNamespaceFromFileName(filename string) (string, string) { +func GetContainerIDK8sNamespacePodNameFromFileName(filename string) (string, string, string) { id := "" ns := "" + podName := "" start := strings.LastIndex(filename, "-") end := strings.LastIndex(filename, ".") @@ -576,7 +859,16 @@ func GetContainerIDK8sNamespaceFromFileName(filename string) (string, string) { ns = filename[start+1 : end] } - return id, ns + start = strings.Index(filename, "/containers/") + end = strings.Index(filename, "_") + + if start >= end || start == -1 || end == -1 { + podName = "" + } else { + podName = filename[(start + len("/containers/")):end] + } + + return id, ns, podName } // InitializePlugin reads and populates plugin configuration @@ -586,6 +878,12 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { StderrIgnoreNsSet = make(map[string]bool) ImageIDMap = make(map[string]string) NameIDMap = make(map[string]string) + // Keeping the two error hashes separate since we need to keep the config error hash for the lifetime of the container + // whereas the prometheus scrape error hash needs to be refreshed every hour + ConfigErrorEvent = make(map[string]KubeMonAgentEventTags) + PromScrapeErrorEvent = make(map[string]KubeMonAgentEventTags) + // Initilizing this to true to skip the first kubemonagentevent flush since the errors are not populated at this time + skipKubeMonEventsFlush = true pluginConfig, err := ReadConfiguration(pluginConfPath) if err != nil { @@ -640,6 +938,9 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { Log("containerInventoryRefreshInterval = %d \n", containerInventoryRefreshInterval) ContainerImageNameRefreshTicker = time.NewTicker(time.Second * time.Duration(containerInventoryRefreshInterval)) + Log("kubeMonAgentConfigEventFlushInterval = %d \n", kubeMonAgentConfigEventFlushInterval) + 
KubeMonAgentConfigEventsSendTicker = time.NewTicker(time.Minute * time.Duration(kubeMonAgentConfigEventFlushInterval)) + // Populate Computer field containerHostName, err := ioutil.ReadFile(pluginConfig["container_host_file_path"]) if err != nil { @@ -682,7 +983,11 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { populateExcludedStdoutNamespaces() populateExcludedStderrNamespaces() go updateContainerImageNameMaps() + + // Flush config error records every hour + go flushKubeMonAgentEventRecords() } else { Log("Running in replicaset. Disabling container enrichment caching & updates \n") } + } diff --git a/source/code/go/src/plugins/out_oms.go b/source/code/go/src/plugins/out_oms.go index e9e7124b7..1f1915798 100644 --- a/source/code/go/src/plugins/out_oms.go +++ b/source/code/go/src/plugins/out_oms.go @@ -1,14 +1,14 @@ package main import ( - "github.com/fluent/fluent-bit-go/output" "github.com/Microsoft/ApplicationInsights-Go/appinsights" + "github.com/fluent/fluent-bit-go/output" ) import ( "C" + "os" "strings" "unsafe" - "os" ) //export FLBPluginRegister @@ -61,6 +61,7 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { incomingTag := strings.ToLower(C.GoString(tag)) if strings.Contains(incomingTag, "oms.container.log.flbplugin") { + // This will also include populating cache to be sent as for config events return PushToAppInsightsTraces(records, appinsights.Information, incomingTag) } else if strings.Contains(incomingTag, "oms.container.perf.telegraf") { return PostTelegrafMetricsToLA(records) diff --git a/source/code/go/src/plugins/telemetry.go b/source/code/go/src/plugins/telemetry.go index 4f22b8c03..d5675187f 100644 --- a/source/code/go/src/plugins/telemetry.go +++ b/source/code/go/src/plugins/telemetry.go @@ -198,7 +198,15 @@ func InitializeTelemetryClient(agentVersion string) (int, error) { func PushToAppInsightsTraces(records []map[interface{}]interface{}, severityLevel contracts.SeverityLevel, tag string) int 
{ var logLines []string for _, record := range records { - logLines = append(logLines, ToString(record["log"])) + // If record contains config error or prometheus scraping errors send it to KubeMonAgentEvents table + var logEntry = ToString(record["log"]) + if strings.Contains(logEntry, "config::error") { + populateKubeMonAgentEventHash(record, ConfigError) + } else if strings.Contains(logEntry, "E! [inputs.prometheus]") { + populateKubeMonAgentEventHash(record, PromScrapingError) + } else { + logLines = append(logLines, logEntry) + } } traceEntry := strings.Join(logLines, "\n")