diff --git a/README.md b/README.md index 4674700c4..ff3e2890c 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,31 @@ additional questions or comments. Note : The agent version(s) below has dates (ciprod), which indicate the agent build dates (not release dates) +### 12/03/2019 - +##### Version microsoft/oms:ciprod12032019 Version mcr.microsoft.com/azuremonitor/containerinsights/ciprod:ciprod12032019 +- Fix scheduler for all input plugins +- Fix liveness probe +- Reduce chunk sizes for all fluentD buffers to support larger clusters (nodes & pods) +- Chunk Kubernetes API calls (pods,nodes,events) +- Use HTTP.start instead of HTTP.new +- Merge KubePerf into KubePods & KubeNodes +- Merge KubeServices into KubePod +- Use stream based yajl for JSON parsing +- Health - Query only kube-system pods +- Health - Use keep_if instead of select +- Container log enrichment (turned OFF by default for TimeOfCommand, ContainerName & ContainerImage) +- Application Insights Telemetry - Async +- Fix metricTime to be batch time for all metric input plugins +- Close socket connections properly for DockerAPIClient +- Fix top unhandled exceptions in Kubernetes API Client and pod inventory +- Fix retries, wait between retries, chunk size, thread counts to be consistent for all FluentD workflows +- Back-off for containerlog enrichment K8S API calls +- Add new regions (3) for Azure Monitor Custom metrics +- Increase the cpu & memory limits for replica-set to support larger clusters (nodes & pods) +- Move to Ubuntu 18.04 LTS +- Support for Kubernetes 1.16 +- Use ifconfig for detecting network connectivity issues + ### 10/11/2019 - ##### Version microsoft/oms:ciprod10112019 Version mcr.microsoft.com/azuremonitor/containerinsights/ciprod:ciprod10112019 - Update prometheus config scraping capability to restrict collecting metrics from pods in specific namespaces. 
diff --git a/installer/conf/container.conf b/installer/conf/container.conf old mode 100755 new mode 100644 index f9540bde8..93c250fbb --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -11,7 +11,7 @@ type containerinventory tag oms.containerinsights.containerinventory - run_interval 60s + run_interval 60 log_level debug @@ -19,7 +19,7 @@ type cadvisorperf tag oms.api.cadvisorperf - run_interval 60s + run_interval 60 log_level debug @@ -36,7 +36,7 @@ #custom_metrics_mdm filter plugin type filter_cadvisor2mdm - custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes,memoryRssBytes log_level info @@ -45,30 +45,28 @@ type out_oms log_level debug num_threads 5 - buffer_chunk_limit 20m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_containerinventory*.buffer - buffer_queue_limit 20 buffer_queue_full_action drop_oldest_chunk + buffer_chunk_limit 4m flush_interval 20s retry_limit 10 - retry_wait 30s - max_retry_wait 9m + retry_wait 5s + max_retry_wait 5m type out_oms log_level debug num_threads 5 - buffer_chunk_limit 20m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_cadvisorperf*.buffer - buffer_queue_limit 20 buffer_queue_full_action drop_oldest_chunk + buffer_chunk_limit 4m flush_interval 20s retry_limit 10 - retry_wait 30s - max_retry_wait 9m + retry_wait 5s + max_retry_wait 5m @@ -80,6 +78,14 @@ heartbeat_type tcp skip_network_error_at_init true expire_dns_cache 600s + buffer_queue_full_action drop_oldest_chunk + buffer_type file + buffer_path 
%STATE_DIR_WS%/out_health_forward*.buffer + buffer_chunk_limit 3m + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m host "#{ENV['HEALTHMODEL_REPLICASET_SERVICE_SERVICE_HOST']}" @@ -96,14 +102,13 @@ type out_mdm log_level debug num_threads 5 - buffer_chunk_limit 20m buffer_type file buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer - buffer_queue_limit 20 buffer_queue_full_action drop_oldest_chunk + buffer_chunk_limit 4m flush_interval 20s retry_limit 10 - retry_wait 30s - max_retry_wait 9m + retry_wait 5s + max_retry_wait 5m retry_mdm_post_wait_minutes 60 diff --git a/installer/conf/kube.conf b/installer/conf/kube.conf index 40f4ac880..207780442 100644 --- a/installer/conf/kube.conf +++ b/installer/conf/kube.conf @@ -1,250 +1,218 @@ # Fluentd config file for OMS Docker - cluster components (kubeAPI) - - type forward - port "#{ENV['HEALTHMODEL_REPLICASET_SERVICE_SERVICE_PORT']}" - bind 0.0.0.0 - - -#Kubernetes pod inventory - - type kubepodinventory - tag oms.containerinsights.KubePodInventory - run_interval 60s - log_level debug - - -#Kubernetes events - - type kubeevents - tag oms.containerinsights.KubeEvents - run_interval 60s - log_level debug - - -#Kubernetes logs - - type kubelogs - tag oms.api.KubeLogs - run_interval 60s - - -#Kubernetes services - - type kubeservices - tag oms.containerinsights.KubeServices - run_interval 60s - log_level debug - - -#Kubernetes Nodes - - type kubenodeinventory - tag oms.containerinsights.KubeNodeInventory - run_interval 60s - log_level debug - - -#Kubernetes perf - - type kubeperf - tag oms.api.KubePerf - run_interval 60s - log_level debug - - -#Kubernetes health - - type kubehealth - tag kubehealth.ReplicaSet - run_interval 60s - log_level debug - - -#cadvisor perf- Windows nodes - - type wincadvisorperf - tag oms.api.wincadvisorperf - run_interval 60s - log_level debug - - - - type filter_inventory2mdm - custom_metrics_azure_regions 
eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral - log_level info - - -#custom_metrics_mdm filter plugin for perf data from windows nodes - - type filter_cadvisor2mdm - custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral - metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes - log_level info - - - - type filter_health_model_builder - - - type out_mdm - log_level debug - num_threads 5 - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer - buffer_queue_limit 20 - buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 30s - max_retry_wait 9m - retry_mdm_post_wait_minutes 60 - - - - type out_oms - log_level debug - num_threads 5 - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_kubepods*.buffer - buffer_queue_limit 20 - buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 30s - max_retry_wait 9m - - - - type out_oms - log_level debug - num_threads 5 - buffer_chunk_limit 5m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_kubeevents*.buffer - buffer_queue_limit 10 - buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 30s - max_retry_wait 9m - - - - type out_oms_api - log_level debug - buffer_chunk_limit 10m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_api_kubernetes_logs*.buffer - buffer_queue_limit 10 - flush_interval 20s - retry_limit 10 - retry_wait 30s - - - - - - type out_oms - log_level debug - num_threads 5 - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_kubeservices*.buffer - buffer_queue_limit 20 - 
buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 30s - max_retry_wait 9m - - - - type out_oms - log_level debug - num_threads 5 - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/state/out_oms_kubenodes*.buffer - buffer_queue_limit 20 - buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 30s - max_retry_wait 9m - - - - type out_oms - log_level debug - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer - buffer_queue_limit 20 - flush_interval 20s - retry_limit 10 - retry_wait 15s - max_retry_wait 9m - - - - type out_oms - log_level debug - num_threads 5 - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_kubeperf*.buffer - buffer_queue_limit 20 - buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 30s - max_retry_wait 9m - - - - type out_mdm - log_level debug - num_threads 5 - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_mdm_*.buffer - buffer_queue_limit 20 - buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 30s - max_retry_wait 9m - retry_mdm_post_wait_minutes 60 - - - - type out_oms - log_level debug - num_threads 5 - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_api_wincadvisorperf*.buffer - buffer_queue_limit 20 - buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 30s - max_retry_wait 9m - - - - type out_oms - log_level debug - num_threads 5 - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_kubehealth*.buffer - buffer_queue_limit 20 - buffer_queue_full_action drop_oldest_chunk - flush_interval 20s - retry_limit 10 - retry_wait 30s - max_retry_wait 9m - \ No newline at end of file + #fluent forward plugin + + type forward + port 
"#{ENV['HEALTHMODEL_REPLICASET_SERVICE_SERVICE_PORT']}" + bind 0.0.0.0 + chunk_size_limit 4m + + + #Kubernetes pod inventory + + type kubepodinventory + tag oms.containerinsights.KubePodInventory + run_interval 60 + log_level debug + + + #Kubernetes events + + type kubeevents + tag oms.containerinsights.KubeEvents + run_interval 60 + log_level debug + + + #Kubernetes Nodes + + type kubenodeinventory + tag oms.containerinsights.KubeNodeInventory + run_interval 60 + log_level debug + + + #Kubernetes health + + type kubehealth + tag kubehealth.ReplicaSet + run_interval 60 + log_level debug + + + #cadvisor perf- Windows nodes + + type wincadvisorperf + tag oms.api.wincadvisorperf + run_interval 60 + log_level debug + + + + type filter_inventory2mdm + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast + log_level info + + + #custom_metrics_mdm filter plugin for perf data from windows nodes + + type filter_cadvisor2mdm + custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast + metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes + log_level info + + + #health model aggregation filter + + type filter_health_model_builder + + + + type out_oms + log_level debug + num_threads 5 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_kubepods*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m + + + + type out_oms + log_level debug + num_threads 5 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_kubeevents*.buffer + 
buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m + + + + type out_oms + log_level debug + num_threads 2 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_kubeservices*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m + + + + type out_oms + log_level debug + num_threads 5 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/state/out_oms_kubenodes*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m + + + + type out_oms + log_level debug + num_threads 3 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer + buffer_queue_limit 20 + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m + + + + type out_oms + log_level debug + num_threads 5 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_kubeperf*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m + + + + type out_mdm + log_level debug + num_threads 5 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/out_mdm_*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m + retry_mdm_post_wait_minutes 60 + + + + type out_oms + log_level debug + num_threads 5 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_api_wincadvisorperf*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m + + + + type out_mdm + log_level debug + num_threads 5 + 
buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m + retry_mdm_post_wait_minutes 60 + + + + type out_oms + log_level debug + num_threads 5 + buffer_chunk_limit 4m + buffer_type file + buffer_path %STATE_DIR_WS%/out_oms_kubehealth*.buffer + buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk + flush_interval 20s + retry_limit 10 + retry_wait 5s + max_retry_wait 5m + \ No newline at end of file diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index 4ebc4f338..60de5af18 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -26,16 +26,13 @@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/in_kube_podinventory.rb; source/code/plugin/in_kube_podinventory.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_events.rb; source/code/plugin/in_kube_events.rb; 644; root; root -/opt/microsoft/omsagent/plugin/in_kube_logs.rb; source/code/plugin/in_kube_logs.rb; 644; root; root /opt/microsoft/omsagent/plugin/KubernetesApiClient.rb; source/code/plugin/KubernetesApiClient.rb; 644; root; root /etc/opt/microsoft/docker-cimprov/container.conf; installer/conf/container.conf; 644; root; root /opt/microsoft/omsagent/plugin/CAdvisorMetricsAPIClient.rb; source/code/plugin/CAdvisorMetricsAPIClient.rb; 644; root; root -/opt/microsoft/omsagent/plugin/in_kube_perf.rb; source/code/plugin/in_kube_perf.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_cadvisor_perf.rb; source/code/plugin/in_cadvisor_perf.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_win_cadvisor_perf.rb; source/code/plugin/in_win_cadvisor_perf.rb; 644; root; root -/opt/microsoft/omsagent/plugin/in_kube_services.rb; source/code/plugin/in_kube_services.rb; 644; root; root 
/opt/microsoft/omsagent/plugin/in_kube_nodes.rb; source/code/plugin/in_kube_nodes.rb; 644; root; root /opt/microsoft/omsagent/plugin/filter_inventory2mdm.rb; source/code/plugin/filter_inventory2mdm.rb; 644; root; root /opt/microsoft/omsagent/plugin/CustomMetricsUtils.rb; source/code/plugin/CustomMetricsUtils.rb; 644; root; root @@ -143,12 +140,10 @@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/health/health_model_definition_parser.rb; source/code/plugin/health/health_model_definition_parser.rb; 644; root; root /opt/microsoft/omsagent/plugin/health/health_monitor_helpers.rb; source/code/plugin/health/health_monitor_helpers.rb; 644; root; root /opt/microsoft/omsagent/plugin/health/health_monitor_optimizer.rb; source/code/plugin/health/health_monitor_optimizer.rb; 644; root; root -/opt/microsoft/omsagent/plugin/health/health_monitor_helpers.rb; source/code/plugin/health/health_monitor_helpers.rb; 644; root; root /opt/microsoft/omsagent/plugin/health/health_monitor_provider.rb; source/code/plugin/health/health_monitor_provider.rb; 644; root; root /opt/microsoft/omsagent/plugin/health/health_monitor_record.rb; source/code/plugin/health/health_monitor_record.rb; 644; root; root /opt/microsoft/omsagent/plugin/health/health_monitor_state.rb; source/code/plugin/health/health_monitor_state.rb; 644; root; root /opt/microsoft/omsagent/plugin/health/health_monitor_telemetry.rb; source/code/plugin/health/health_monitor_telemetry.rb; 644; root; root -/opt/microsoft/omsagent/plugin/health/health_monitor_helpers.rb; source/code/plugin/health/health_monitor_helpers.rb; 644; root; root /opt/microsoft/omsagent/plugin/health/health_monitor_utils.rb; source/code/plugin/health/health_monitor_utils.rb; 644; root; root /opt/microsoft/omsagent/plugin/health/health_signal_reducer.rb; source/code/plugin/health/health_signal_reducer.rb; 644; root; root /opt/microsoft/omsagent/plugin/health/monitor_factory.rb; source/code/plugin/health/monitor_factory.rb; 644; root; root 
diff --git a/installer/scripts/livenessprobe.sh b/installer/scripts/livenessprobe.sh index cb7e8a0ba..e957b4bdf 100644 --- a/installer/scripts/livenessprobe.sh +++ b/installer/scripts/livenessprobe.sh @@ -1,7 +1,7 @@ #!/bin/bash #test to exit non zero value -(ps -ef | grep omsagent | grep -v "grep") && (ps -ef | grep td-agent-bit | grep -v "grep") +(ps -ef | grep omsagent- | grep -v "grep") && (ps -ef | grep td-agent-bit | grep -v "grep") if [ $? -eq 0 ] && [ ! -s "inotifyoutput.txt" ] then # inotifyoutput file is empty and the grep commands for omsagent and td-agent-bit succeeded diff --git a/installer/scripts/tomlparser.rb b/installer/scripts/tomlparser.rb index cd16cbf9b..ba67d023a 100644 --- a/installer/scripts/tomlparser.rb +++ b/installer/scripts/tomlparser.rb @@ -15,6 +15,7 @@ @logTailPath = "/var/log/containers/*.log" @logExclusionRegexPattern = "(^((?!stdout|stderr).)*$)" @excludePath = "*.csv2" #some invalid path +@enrichContainerLogs = false # Use parser to parse the configmap toml file to a ruby structure def parseConfigMap @@ -117,6 +118,16 @@ def populateSettingValuesFromConfigMap(parsedConfig) rescue => errorStr ConfigParseErrorLogger.logError("Exception while reading config map settings for cluster level environment variable collection - #{errorStr}, using defaults, please check config map for errors") end + + #Get container log enrichment setting + begin + if !parsedConfig[:log_collection_settings][:enrich_container_logs].nil? && !parsedConfig[:log_collection_settings][:enrich_container_logs][:enabled].nil? 
+ @enrichContainerLogs = parsedConfig[:log_collection_settings][:enrich_container_logs][:enabled] + puts "config::Using config map setting for cluster level container log enrichment" + end + rescue => errorStr + ConfigParseErrorLogger.logError("Exception while reading config map settings for cluster level container log enrichment - #{errorStr}, using defaults, please check config map for errors") + end end end @@ -156,6 +167,7 @@ def populateSettingValuesFromConfigMap(parsedConfig) file.write("export AZMON_STDERR_EXCLUDED_NAMESPACES=#{@stderrExcludeNamespaces}\n") file.write("export AZMON_CLUSTER_COLLECT_ENV_VAR=#{@collectClusterEnvVariables}\n") file.write("export AZMON_CLUSTER_LOG_TAIL_EXCLUDE_PATH=#{@excludePath}\n") + file.write("export AZMON_CLUSTER_CONTAINER_LOG_ENRICH=#{@enrichContainerLogs}\n") # Close file after writing all environment variables file.close puts "Both stdout & stderr log collection are turned off for namespaces: '#{@excludePath}' " diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index 5a323d7e0..834726c93 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -92,6 +92,8 @@ var ( ResourceName string //KubeMonAgentEvents skip first flush skipKubeMonEventsFlush bool + // enrich container logs (when true this will add the fields - timeofcommand, containername & containerimage) + enrichContainerLogs bool ) var ( @@ -746,16 +748,30 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { stringMap["Name"] = val } - dataItem := DataItem{ - ID: stringMap["Id"], - LogEntry: stringMap["LogEntry"], - LogEntrySource: stringMap["LogEntrySource"], - LogEntryTimeStamp: stringMap["LogEntryTimeStamp"], - LogEntryTimeOfCommand: start.Format(time.RFC3339), - SourceSystem: stringMap["SourceSystem"], - Computer: Computer, - Image: stringMap["Image"], - Name: stringMap["Name"], + var dataItem DataItem + if enrichContainerLogs == true { + dataItem = DataItem{ + ID: 
stringMap["Id"], + LogEntry: stringMap["LogEntry"], + LogEntrySource: stringMap["LogEntrySource"], + LogEntryTimeStamp: stringMap["LogEntryTimeStamp"], + LogEntryTimeOfCommand: start.Format(time.RFC3339), + SourceSystem: stringMap["SourceSystem"], + Computer: Computer, + Image: stringMap["Image"], + Name: stringMap["Name"], + } + } else { // don't collect timeofcommand field as it's part of container log enrichment + dataItem = DataItem{ + ID: stringMap["Id"], + LogEntry: stringMap["LogEntry"], + LogEntrySource: stringMap["LogEntrySource"], + LogEntryTimeStamp: stringMap["LogEntryTimeStamp"], + SourceSystem: stringMap["SourceSystem"], + Computer: Computer, + Image: stringMap["Image"], + Name: stringMap["Name"], + } } FlushedRecordsSize += float64(len(stringMap["LogEntry"])) @@ -892,6 +908,15 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { // Initilizing this to true to skip the first kubemonagentevent flush since the errors are not populated at this time skipKubeMonEventsFlush = true + enrichContainerLogsSetting := os.Getenv("AZMON_CLUSTER_CONTAINER_LOG_ENRICH") + if (strings.Compare(enrichContainerLogsSetting, "true") == 0) { + enrichContainerLogs = true + Log("ContainerLogEnrichment=true \n") + } else { + enrichContainerLogs = false + Log("ContainerLogEnrichment=false \n") + } + pluginConfig, err := ReadConfiguration(pluginConfPath) if err != nil { message := fmt.Sprintf("Error Reading plugin config path : %s \n", err.Error()) @@ -989,7 +1014,12 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { if strings.Compare(strings.ToLower(os.Getenv("CONTROLLER_TYPE")), "daemonset") == 0 { populateExcludedStdoutNamespaces() populateExcludedStderrNamespaces() - go updateContainerImageNameMaps() + if enrichContainerLogs == true { + Log("ContainerLogEnrichment=true; starting goroutine to update containerimagenamemaps \n") + go updateContainerImageNameMaps() + } else { + Log("ContainerLogEnrichment=false \n") + } // Flush config error 
records every hour go flushKubeMonAgentEventRecords() diff --git a/source/code/plugin/ApplicationInsightsUtility.rb b/source/code/plugin/ApplicationInsightsUtility.rb index 85b424e69..f7bd806a0 100644 --- a/source/code/plugin/ApplicationInsightsUtility.rb +++ b/source/code/plugin/ApplicationInsightsUtility.rb @@ -6,7 +6,7 @@ class ApplicationInsightsUtility require_relative "omslog" require_relative "DockerApiClient" require_relative "oms_common" - require "json" + require 'yajl/json_gem' require "base64" @@HeartBeat = "HeartBeatEvent" @@ -73,16 +73,37 @@ def initializeUtility() @@Tc = ApplicationInsights::TelemetryClient.new elsif !encodedAppInsightsKey.nil? decodedAppInsightsKey = Base64.decode64(encodedAppInsightsKey) + #override ai endpoint if it's available otherwise use default. if appInsightsEndpoint && !appInsightsEndpoint.nil? && !appInsightsEndpoint.empty? $log.info("AppInsightsUtility: Telemetry client uses overrided endpoint url : #{appInsightsEndpoint}") - telemetrySynchronousSender = ApplicationInsights::Channel::SynchronousSender.new appInsightsEndpoint - telemetrySynchronousQueue = ApplicationInsights::Channel::SynchronousQueue.new(telemetrySynchronousSender) - telemetryChannel = ApplicationInsights::Channel::TelemetryChannel.new nil, telemetrySynchronousQueue + #telemetrySynchronousSender = ApplicationInsights::Channel::SynchronousSender.new appInsightsEndpoint + #telemetrySynchronousQueue = ApplicationInsights::Channel::SynchronousQueue.new(telemetrySynchronousSender) + #telemetryChannel = ApplicationInsights::Channel::TelemetryChannel.new nil, telemetrySynchronousQueue + sender = ApplicationInsights::Channel::AsynchronousSender.new appInsightsEndpoint + queue = ApplicationInsights::Channel::AsynchronousQueue.new sender + channel = ApplicationInsights::Channel::TelemetryChannel.new nil, queue - @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey, telemetryChannel + @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey, channel else - @@Tc = ApplicationInsights::TelemetryClient.new 
decodedAppInsightsKey + sender = ApplicationInsights::Channel::AsynchronousSender.new + queue = ApplicationInsights::Channel::AsynchronousQueue.new sender + channel = ApplicationInsights::Channel::TelemetryChannel.new nil, queue + @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey, channel end + # The below are default recommended values. If you change these, ensure you test telemetry flow fully + + # flush telemetry if we have 10 or more telemetry items in our queue + #@@Tc.channel.queue.max_queue_length = 10 + + # send telemetry to the service in batches of 5 + #@@Tc.channel.sender.send_buffer_size = 5 + + # the background worker thread will be active for 5 seconds before it shuts down. if + # during this time items are picked up from the queue, the timer is reset. + #@@Tc.channel.sender.send_time = 5 + + # the background worker thread will poll the queue every 0.5 seconds for new items + #@@Tc.channel.sender.send_interval = 0.5 end rescue => errorStr $log.warn("Exception in AppInsightsUtility: initilizeUtility - error: #{errorStr}") @@ -102,8 +123,7 @@ def sendHeartBeatEvent(pluginName) eventName = pluginName + @@HeartBeat if !(@@Tc.nil?) 
@@Tc.track_event eventName, :properties => @@CustomProperties - @@Tc.flush - $log.info("AppInsights Heartbeat Telemetry sent successfully") + $log.info("AppInsights Heartbeat Telemetry put successfully into the queue") end rescue => errorStr $log.warn("Exception in AppInsightsUtility: sendHeartBeatEvent - error: #{errorStr}") @@ -116,8 +136,7 @@ def sendLastProcessedContainerInventoryCountMetric(pluginName, properties) @@Tc.track_metric "LastProcessedContainerInventoryCount", properties["ContainerCount"], :kind => ApplicationInsights::Channel::Contracts::DataPointType::MEASUREMENT, :properties => @@CustomProperties - @@Tc.flush - $log.info("AppInsights Container Count Telemetry sent successfully") + $log.info("AppInsights Container Count Telemetry put successfully into the queue") end rescue => errorStr $log.warn("Exception in AppInsightsUtility: sendCustomMetric - error: #{errorStr}") @@ -138,7 +157,6 @@ def sendCustomEvent(eventName, properties) end if !(@@Tc.nil?) @@Tc.track_event eventName, :properties => telemetryProps - @@Tc.flush $log.info("AppInsights Custom Event #{eventName} sent successfully") end rescue => errorStr @@ -162,8 +180,7 @@ def sendExceptionTelemetry(errorStr, properties = nil) end if !(@@Tc.nil?) 
@@Tc.track_exception errorStr, :properties => telemetryProps - @@Tc.flush - $log.info("AppInsights Exception Telemetry sent successfully") + $log.info("AppInsights Exception Telemetry put successfully into the queue") end rescue => errorStr $log.warn("Exception in AppInsightsUtility: sendExceptionTelemetry - error: #{errorStr}") @@ -209,8 +226,7 @@ def sendMetricTelemetry(metricName, metricValue, properties) @@Tc.track_metric metricName, metricValue, :kind => ApplicationInsights::Channel::Contracts::DataPointType::MEASUREMENT, :properties => telemetryProps - @@Tc.flush - $log.info("AppInsights metric Telemetry #{metricName} sent successfully") + $log.info("AppInsights metric Telemetry #{metricName} put successfully into the queue") end rescue => errorStr $log.warn("Exception in AppInsightsUtility: sendMetricTelemetry - error: #{errorStr}") diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb index 09499b4cf..be61b8b8f 100644 --- a/source/code/plugin/CAdvisorMetricsAPIClient.rb +++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb @@ -2,12 +2,13 @@ # frozen_string_literal: true class CAdvisorMetricsAPIClient - require "json" + require 'yajl/json_gem' require "logger" require "net/http" require "net/https" require "uri" require "date" + require "time" require_relative "oms_common" require_relative "KubernetesApiClient" @@ -21,6 +22,7 @@ class CAdvisorMetricsAPIClient @clusterLogTailExcludPath = ENV["AZMON_CLUSTER_LOG_TAIL_EXCLUDE_PATH"] @clusterLogTailPath = ENV["AZMON_LOG_TAIL_PATH"] @clusterAgentSchemaVersion = ENV["AZMON_AGENT_CFG_SCHEMA_VERSION"] + @clusterContainerLogEnrich = ENV["AZMON_CLUSTER_CONTAINER_LOG_ENRICH"] @dsPromInterval = ENV["TELEMETRY_DS_PROM_INTERVAL"] @dsPromFieldPassCount = ENV["TELEMETRY_DS_PROM_FIELDPASS_LENGTH"] @@ -64,12 +66,11 @@ def getSummaryStatsFromCAdvisor(winNode) cAdvisorUri = getCAdvisorUri(winNode) if !cAdvisorUri.nil? 
uri = URI.parse(cAdvisorUri) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = false - - cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) - response = http.request(cAdvisorApiRequest) - @Log.info "Got response code #{response.code} from #{uri.request_uri}" + Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40 ) do |http| + cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri) + response = http.request(cAdvisorApiRequest) + @Log.info "Got response code #{response.code} from #{uri.request_uri}" + end end rescue => error @Log.warn("CAdvisor api request failed: #{error}") @@ -103,7 +104,7 @@ def getCAdvisorUri(winNode) end end - def getMetrics(winNode = nil) + def getMetrics(winNode: nil, metricTime: Time.now.utc.iso8601 ) metricDataItems = [] begin cAdvisorStats = getSummaryStatsFromCAdvisor(winNode) @@ -122,27 +123,27 @@ def getMetrics(winNode = nil) operatingSystem = "Linux" end if !metricInfo.nil? - metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "workingSetBytes", "memoryWorkingSetBytes")) - metricDataItems.concat(getContainerStartTimeMetricItems(metricInfo, hostName, "restartTimeEpoch")) + metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "workingSetBytes", "memoryWorkingSetBytes", metricTime)) + metricDataItems.concat(getContainerStartTimeMetricItems(metricInfo, hostName, "restartTimeEpoch", metricTime)) if operatingSystem == "Linux" - metricDataItems.concat(getContainerCpuMetricItems(metricInfo, hostName, "usageNanoCores", "cpuUsageNanoCores")) - metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "rssBytes", "memoryRssBytes")) - metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "rssBytes", "memoryRssBytes")) + metricDataItems.concat(getContainerCpuMetricItems(metricInfo, hostName, "usageNanoCores", "cpuUsageNanoCores", metricTime)) + metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, 
hostName, "rssBytes", "memoryRssBytes", metricTime)) + metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "rssBytes", "memoryRssBytes", metricTime)) elsif operatingSystem == "Windows" - containerCpuUsageNanoSecondsRate = getContainerCpuMetricItemRate(metricInfo, hostName, "usageCoreNanoSeconds", "cpuUsageNanoCores") + containerCpuUsageNanoSecondsRate = getContainerCpuMetricItemRate(metricInfo, hostName, "usageCoreNanoSeconds", "cpuUsageNanoCores", metricTime) if containerCpuUsageNanoSecondsRate && !containerCpuUsageNanoSecondsRate.empty? && !containerCpuUsageNanoSecondsRate.nil? metricDataItems.concat(containerCpuUsageNanoSecondsRate) end end - cpuUsageNanoSecondsRate = getNodeMetricItemRate(metricInfo, hostName, "cpu", "usageCoreNanoSeconds", "cpuUsageNanoCores", operatingSystem) + cpuUsageNanoSecondsRate = getNodeMetricItemRate(metricInfo, hostName, "cpu", "usageCoreNanoSeconds", "cpuUsageNanoCores", operatingSystem, metricTime) if cpuUsageNanoSecondsRate && !cpuUsageNanoSecondsRate.empty? && !cpuUsageNanoSecondsRate.nil? metricDataItems.push(cpuUsageNanoSecondsRate) end - metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "workingSetBytes", "memoryWorkingSetBytes")) + metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "workingSetBytes", "memoryWorkingSetBytes", metricTime)) - metricDataItems.push(getNodeLastRebootTimeMetric(metricInfo, hostName, "restartTimeEpoch")) + metricDataItems.push(getNodeLastRebootTimeMetric(metricInfo, hostName, "restartTimeEpoch", metricTime)) # Disabling networkRxRate and networkTxRate since we dont use it as of now. 
#metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "network", "rxBytes", "networkRxBytes")) @@ -165,7 +166,7 @@ def getMetrics(winNode = nil) return metricDataItems end - def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn) + def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn, metricPollTime) metricItems = [] clusterId = KubernetesApiClient.getClusterId timeDifference = (DateTime.now.to_time.to_i - @@telemetryCpuMetricTimeTracker).abs @@ -182,7 +183,7 @@ def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, met #cpu metric containerName = container["name"] metricValue = container["cpu"][cpuMetricNameToCollect] - metricTime = container["cpu"]["time"] + metricTime = metricPollTime #container["cpu"]["time"] metricItem = {} metricItem["DataItems"] = [] @@ -219,6 +220,7 @@ def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, met telemetryProps["clusterlogtailexcludepath"] = @clusterLogTailExcludPath telemetryProps["clusterLogTailPath"] = @clusterLogTailPath telemetryProps["clusterAgentSchemaVersion"] = @clusterAgentSchemaVersion + telemetryProps["clusterCLEnrich"] = @clusterContainerLogEnrich end #telemetry about prometheus metric collections settings for daemonset if (File.file?(@promConfigMountPath)) @@ -272,7 +274,7 @@ def resetWinContainerIdCache end # usageNanoCores doesnt exist for windows nodes. 
Hence need to compute this from usageCoreNanoSeconds - def getContainerCpuMetricItemRate(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn) + def getContainerCpuMetricItemRate(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn, metricPollTime) metricItems = [] clusterId = KubernetesApiClient.getClusterId timeDifference = (DateTime.now.to_time.to_i - @@telemetryCpuMetricTimeTracker).abs @@ -292,7 +294,7 @@ def getContainerCpuMetricItemRate(metricJSON, hostName, cpuMetricNameToCollect, containerCount += 1 containerName = container["name"] metricValue = container["cpu"][cpuMetricNameToCollect] - metricTime = container["cpu"]["time"] + metricTime = metricPollTime #container["cpu"]["time"] metricItem = {} metricItem["DataItems"] = [] @@ -366,7 +368,7 @@ def getContainerCpuMetricItemRate(metricJSON, hostName, cpuMetricNameToCollect, return metricItems end - def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollect, metricNametoReturn) + def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollect, metricNametoReturn, metricPollTime) metricItems = [] clusterId = KubernetesApiClient.getClusterId timeDifference = (DateTime.now.to_time.to_i - @@telemetryMemoryMetricTimeTracker).abs @@ -381,7 +383,7 @@ def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollec pod["containers"].each do |container| containerName = container["name"] metricValue = container["memory"][memoryMetricNameToCollect] - metricTime = container["memory"]["time"] + metricTime = metricPollTime #container["memory"]["time"] metricItem = {} metricItem["DataItems"] = [] @@ -431,7 +433,7 @@ def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollec return metricItems end - def getNodeMetricItem(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn) + def getNodeMetricItem(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn, metricPollTime) 
metricItem = {} clusterId = KubernetesApiClient.getClusterId begin @@ -441,7 +443,7 @@ def getNodeMetricItem(metricJSON, hostName, metricCategory, metricNameToCollect, if !node[metricCategory].nil? metricValue = node[metricCategory][metricNameToCollect] - metricTime = node[metricCategory]["time"] + metricTime = metricPollTime #node[metricCategory]["time"] metricItem["DataItems"] = [] @@ -467,7 +469,7 @@ def getNodeMetricItem(metricJSON, hostName, metricCategory, metricNameToCollect, return metricItem end - def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn, operatingSystem) + def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn, operatingSystem, metricPollTime) metricItem = {} clusterId = KubernetesApiClient.getClusterId begin @@ -477,7 +479,7 @@ def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToColl if !node[metricCategory].nil? metricValue = node[metricCategory][metricNameToCollect] - metricTime = node[metricCategory]["time"] + metricTime = metricPollTime #node[metricCategory]["time"] # if !(metricNameToCollect == "rxBytes" || metricNameToCollect == "txBytes" || metricNameToCollect == "usageCoreNanoSeconds") # @Log.warn("getNodeMetricItemRate : rateMetric is supported only for rxBytes, txBytes & usageCoreNanoSeconds and not for #{metricNameToCollect}") @@ -584,7 +586,7 @@ def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToColl return metricItem end - def getNodeLastRebootTimeMetric(metricJSON, hostName, metricNametoReturn) + def getNodeLastRebootTimeMetric(metricJSON, hostName, metricNametoReturn, metricPollTime) metricItem = {} clusterId = KubernetesApiClient.getClusterId @@ -594,7 +596,7 @@ def getNodeLastRebootTimeMetric(metricJSON, hostName, metricNametoReturn) nodeName = node["nodeName"] metricValue = node["startTime"] - metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + metricTime = metricPollTime 
#Time.now.utc.iso8601 #2018-01-30T19:36:14Z metricItem["DataItems"] = [] @@ -620,10 +622,10 @@ def getNodeLastRebootTimeMetric(metricJSON, hostName, metricNametoReturn) return metricItem end - def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn) + def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn, metricPollTime) metricItems = [] clusterId = KubernetesApiClient.getClusterId - currentTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + #currentTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z begin metricInfo = metricJSON metricInfo["pods"].each do |pod| @@ -632,7 +634,7 @@ def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn) pod["containers"].each do |container| containerName = container["name"] metricValue = container["startTime"] - metricTime = currentTime + metricTime = metricPollTime #currentTime metricItem = {} metricItem["DataItems"] = [] diff --git a/source/code/plugin/ContainerInventoryState.rb b/source/code/plugin/ContainerInventoryState.rb index 7e5ca18e8..170fa65e3 100644 --- a/source/code/plugin/ContainerInventoryState.rb +++ b/source/code/plugin/ContainerInventoryState.rb @@ -2,7 +2,7 @@ # frozen_string_literal: true class ContainerInventoryState - require 'json' + require 'yajl/json_gem' require_relative 'omslog' @@InventoryDirectory = "/var/opt/microsoft/docker-cimprov/state/ContainerInventory/" diff --git a/source/code/plugin/DockerApiClient.rb b/source/code/plugin/DockerApiClient.rb index ee2742dd4..f2828b357 100644 --- a/source/code/plugin/DockerApiClient.rb +++ b/source/code/plugin/DockerApiClient.rb @@ -3,7 +3,7 @@ class DockerApiClient require "socket" - require "json" + require "yajl/json_gem" require "timeout" require_relative "omslog" require_relative "DockerApiRestHelper" @@ -40,7 +40,6 @@ def getResponse(request, isMultiJson, isVersion) end break if (isVersion) ? (responseChunk.length < @@ChunkSize) : (responseChunk.end_with? 
"0\r\n\r\n") end - socket.close return (isTimeOut) ? nil : parseResponse(dockerResponse, isMultiJson) rescue => errorStr $log.warn("Socket call failed for request: #{request} error: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") @@ -49,6 +48,10 @@ def getResponse(request, isMultiJson, isVersion) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end return nil + ensure + if !socket.nil? + socket.close + end end end diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb index be1a51791..e52c77884 100644 --- a/source/code/plugin/KubernetesApiClient.rb +++ b/source/code/plugin/KubernetesApiClient.rb @@ -2,7 +2,7 @@ # frozen_string_literal: true class KubernetesApiClient - require "json" + require "yajl/json_gem" require "logger" require "net/http" require "net/https" @@ -12,6 +12,8 @@ class KubernetesApiClient require_relative "oms_common" @@ApiVersion = "v1" + @@ApiVersionApps = "v1" + @@ApiGroupApps = "apps" @@CaFile = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt" @@ClusterName = nil @@ClusterId = nil @@ -30,35 +32,33 @@ def initialize end class << self - def getKubeResourceInfo(resource, api_version: nil) + def getKubeResourceInfo(resource, api_group: nil) headers = {} response = nil - @Log.info "Getting Kube resource api_version #{api_version}" - @Log.info resource + @Log.info "Getting Kube resource: #{resource}" begin - resourceUri = getResourceUri(resource, api_version: api_version) + resourceUri = getResourceUri(resource, api_group) if !resourceUri.nil? 
uri = URI.parse(resourceUri) - http = Net::HTTP.new(uri.host, uri.port) - http.use_ssl = true if !File.exist?(@@CaFile) raise "#{@@CaFile} doesnt exist" else - http.ca_file = @@CaFile if File.exist?(@@CaFile) + Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER, :open_timeout => 20, :read_timeout => 40) do |http| + kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) + kubeApiRequest["Authorization"] = "Bearer " + getTokenStr + @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" + response = http.request(kubeApiRequest) + @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}" + end end - http.verify_mode = OpenSSL::SSL::VERIFY_PEER - - kubeApiRequest = Net::HTTP::Get.new(uri.request_uri) - kubeApiRequest["Authorization"] = "Bearer " + getTokenStr - @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}" - response = http.request(kubeApiRequest) - @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}" end rescue => error @Log.warn("kubernetes api request failed: #{error} for #{resource} @ #{Time.now.utc.iso8601}") end - if (!response.nil? && !response.body.nil? && response.body.empty?) - @Log.warn("KubernetesAPIClient::getKubeResourceInfo : Got empty response from Kube API for #{resource} @ #{Time.now.utc.iso8601}") + if (!response.nil?) + if (!response.body.nil? && response.body.empty?) 
+ @Log.warn("KubernetesAPIClient::getKubeResourceInfo : Got empty response from Kube API for #{resource} @ #{Time.now.utc.iso8601}") + end end return response end @@ -85,14 +85,14 @@ def getClusterRegion end end - def getResourceUri(resource, api_version: nil) + def getResourceUri(resource, api_group) begin if ENV["KUBERNETES_SERVICE_HOST"] && ENV["KUBERNETES_PORT_443_TCP_PORT"] - if !api_version.nil? - return "https://#{ENV["KUBERNETES_SERVICE_HOST"]}:#{ENV["KUBERNETES_PORT_443_TCP_PORT"]}/apis/" + api_version + "/" + resource - end - api_version = @@ApiVersion - return "https://#{ENV["KUBERNETES_SERVICE_HOST"]}:#{ENV["KUBERNETES_PORT_443_TCP_PORT"]}/api/" + api_version + "/" + resource + if api_group.nil? + return "https://#{ENV["KUBERNETES_SERVICE_HOST"]}:#{ENV["KUBERNETES_PORT_443_TCP_PORT"]}/api/" + @@ApiVersion + "/" + resource + elsif api_group == @@ApiGroupApps + return "https://#{ENV["KUBERNETES_SERVICE_HOST"]}:#{ENV["KUBERNETES_PORT_443_TCP_PORT"]}/apis/apps/" + @@ApiVersionApps + "/" + resource + end else @Log.warn ("Kubernetes environment variable not set KUBERNETES_SERVICE_HOST: #{ENV["KUBERNETES_SERVICE_HOST"]} KUBERNETES_PORT_443_TCP_PORT: #{ENV["KUBERNETES_PORT_443_TCP_PORT"]}. 
Unable to form resourceUri") return nil @@ -335,7 +335,7 @@ def getContainerLogsSinceTime(namespace, pod, container, since, showTimeStamp) return containerLogs end - def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn) + def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) metricItems = [] begin clusterId = getClusterId @@ -370,7 +370,7 @@ def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricName nodeName = pod["spec"]["nodeName"] podContainers.each do |container| containerName = container["name"] - metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z if (!container["resources"].nil? && !container["resources"].empty? && !container["resources"][metricCategory].nil? && !container["resources"][metricCategory][metricNameToCollect].nil?) metricValue = getMetricNumericValue(metricNameToCollect, container["resources"][metricCategory][metricNameToCollect]) @@ -430,14 +430,14 @@ def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricName return metricItems end #getContainerResourceRequestAndLimits - def parseNodeLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn) + def parseNodeLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601) metricItems = [] begin metricInfo = metricJSON clusterId = getClusterId #Since we are getting all node data at the same time and kubernetes doesnt specify a timestamp for the capacity and allocation metrics, #if we are coming up with the time it should be same for all nodes - metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z + #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z metricInfo["items"].each do |node| if (!node["status"][metricCategory].nil?) 
@@ -548,5 +548,29 @@ def getMetricNumericValue(metricName, metricVal) end return metricValue end # getMetricNumericValue + + def getResourcesAndContinuationToken(uri) + continuationToken = nil + resourceInventory = nil + begin + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken : Getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}" + resourceInfo = getKubeResourceInfo(uri) + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}" + if !resourceInfo.nil? + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken:Start:Parsing data for #{uri} using yajl @ #{Time.now.utc.iso8601}" + resourceInventory = Yajl::Parser.parse(StringIO.new(resourceInfo.body)) + @Log.info "KubernetesApiClient::getResourcesAndContinuationToken:End:Parsing data for #{uri} using yajl @ #{Time.now.utc.iso8601}" + resourceInfo = nil + end + if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?) 
+ continuationToken = resourceInventory["metadata"]["continue"] + end + rescue => errorStr + @Log.warn "KubernetesApiClient::getResourcesAndContinuationToken:Failed in get resources for #{uri} and continuation token: #{errorStr}" + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + resourceInventory = nil + end + return continuationToken, resourceInventory + end #getResourcesAndContinuationToken end end diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb index a6e643e45..f14a1369b 100644 --- a/source/code/plugin/filter_cadvisor2mdm.rb +++ b/source/code/plugin/filter_cadvisor2mdm.rb @@ -4,7 +4,7 @@ module Fluent require 'logger' - require 'json' + require 'yajl/json_gem' require_relative 'oms_common' require_relative 'CustomMetricsUtils' diff --git a/source/code/plugin/filter_cadvisor_health_container.rb b/source/code/plugin/filter_cadvisor_health_container.rb index 2eccd125f..93d50e20f 100644 --- a/source/code/plugin/filter_cadvisor_health_container.rb +++ b/source/code/plugin/filter_cadvisor_health_container.rb @@ -3,7 +3,7 @@ module Fluent require 'logger' - require 'json' + require 'yajl/json_gem' require_relative 'oms_common' require_relative "ApplicationInsightsUtility" Dir[File.join(__dir__, './health', '*.rb')].each { |file| require file } diff --git a/source/code/plugin/filter_cadvisor_health_node.rb b/source/code/plugin/filter_cadvisor_health_node.rb index d2f735cd1..c6280db60 100644 --- a/source/code/plugin/filter_cadvisor_health_node.rb +++ b/source/code/plugin/filter_cadvisor_health_node.rb @@ -3,7 +3,7 @@ module Fluent require 'logger' - require 'json' + require 'yajl/json_gem' require_relative 'oms_common' require_relative "ApplicationInsightsUtility" require_relative "KubernetesApiClient" diff --git a/source/code/plugin/filter_docker_log.rb b/source/code/plugin/filter_docker_log.rb index 7ffd333e3..b80f4c204 100644 --- a/source/code/plugin/filter_docker_log.rb +++ 
b/source/code/plugin/filter_docker_log.rb @@ -5,6 +5,7 @@ module Fluent require 'logger' require 'socket' + require 'yajl/json_gem' class DockerLogFilter < Filter Plugin.register_filter('filter_docker_log', self) diff --git a/source/code/plugin/filter_health_model_builder.rb b/source/code/plugin/filter_health_model_builder.rb index 47ce7a631..1c451ea38 100644 --- a/source/code/plugin/filter_health_model_builder.rb +++ b/source/code/plugin/filter_health_model_builder.rb @@ -4,7 +4,7 @@ module Fluent require 'logger' - require 'json' + require 'yajl/json_gem' Dir[File.join(__dir__, './health', '*.rb')].each { |file| require file } @@ -97,12 +97,11 @@ def filter_stream(tag, es) } end container_records_aggregator = HealthContainerCpuMemoryAggregator.new(@resources, @provider) - deduped_records = container_records_aggregator.dedupe_records(container_records) if @container_cpu_memory_records.nil? @log.info "@container_cpu_memory_records was not initialized" @container_cpu_memory_records = [] #in some clusters, this is null, so initialize it again. end - @container_cpu_memory_records.push(*deduped_records) # push the records for aggregation later + @container_cpu_memory_records.push(*container_records) # push the records for aggregation later return MultiEventStream.new elsif tag.start_with?("kubehealth.ReplicaSet") records = [] @@ -114,7 +113,8 @@ def filter_stream(tag, es) aggregated_container_records = [] if !@container_cpu_memory_records.nil? && !@container_cpu_memory_records.empty? 
container_records_aggregator = HealthContainerCpuMemoryAggregator.new(@resources, @provider) - container_records_aggregator.aggregate(@container_cpu_memory_records) + deduped_records = container_records_aggregator.dedupe_records(@container_cpu_memory_records) + container_records_aggregator.aggregate(deduped_records) container_records_aggregator.compute_state aggregated_container_records = container_records_aggregator.get_records end diff --git a/source/code/plugin/filter_inventory2mdm.rb b/source/code/plugin/filter_inventory2mdm.rb index 30f6f911a..422b4b54a 100644 --- a/source/code/plugin/filter_inventory2mdm.rb +++ b/source/code/plugin/filter_inventory2mdm.rb @@ -4,7 +4,7 @@ module Fluent require 'logger' - require 'json' + require 'yajl/json_gem' require_relative 'oms_common' require_relative 'CustomMetricsUtils' diff --git a/source/code/plugin/health/aggregate_monitor.rb b/source/code/plugin/health/aggregate_monitor.rb index 794f716ce..a774478e7 100644 --- a/source/code/plugin/health/aggregate_monitor.rb +++ b/source/code/plugin/health/aggregate_monitor.rb @@ -1,7 +1,13 @@ # frozen_string_literal: true require_relative 'health_model_constants' -require 'json' +require 'yajl/json_gem' + +# Require only when running inside container. +# otherwise unit tests will fail due to ApplicationInsightsUtility dependency on base omsagent ruby files. If you have your dev machine starting with omsagent-rs, then GOOD LUCK! +if Socket.gethostname.start_with?('omsagent-rs') + require_relative '../ApplicationInsightsUtility' +end module HealthModel class AggregateMonitor @@ -16,6 +22,8 @@ class AggregateMonitor MonitorState::NONE => 5 } + @@telemetry_sent_hash = {} + # constructor def initialize( monitor_id, @@ -127,17 +135,43 @@ def calculate_percentage_state(monitor_set) #sort #TODO: What if sorted_filtered is empty? is that even possible? 
+ log = HealthMonitorHelpers.get_log_handle sorted_filtered = sort_filter_member_monitors(monitor_set) state_threshold = @aggregation_algorithm_params['state_threshold'].to_f - size = sorted_filtered.size + if sorted_filtered.nil? + size = 0 + else + size = sorted_filtered.size + end + if size == 1 @state = sorted_filtered[0].state else count = ((state_threshold*size)/100).ceil index = size - count - @state = sorted_filtered[index].state + if sorted_filtered.nil? || sorted_filtered[index].nil? + @state = HealthMonitorStates::UNKNOWN + if !@@telemetry_sent_hash.key?(@monitor_instance_id) + log.debug "Adding to telemetry sent hash #{@monitor_instance_id}" + @@telemetry_sent_hash[@monitor_instance_id] = true + log.info "Index: #{index} size: #{size} Count: #{count}" + custom_error_event_map = {} + custom_error_event_map["count"] = count + custom_error_event_map["index"] = index + custom_error_event_map["size"] = size + if !sorted_filtered.nil? + sorted_filtered.each_index{|i| + custom_error_event_map[i] = sorted_filtered[i].state + } + end + ApplicationInsightsUtility.sendCustomEvent("PercentageStateCalculationErrorEvent", custom_error_event_map) + end + else + @state = sorted_filtered[index].state + end + @state end end @@ -184,7 +218,7 @@ def sort_filter_member_monitors(monitor_set) member_monitors.push(member_monitor) } - filtered = member_monitors.select{|monitor| monitor.state != MonitorState::NONE} + filtered = member_monitors.keep_if{|monitor| monitor.state != MonitorState::NONE} sorted = filtered.sort_by{ |monitor| [@@sort_key_order[monitor.state]] } return sorted diff --git a/source/code/plugin/health/cluster_health_state.rb b/source/code/plugin/health/cluster_health_state.rb index fa9cb42b2..e46d0bf5f 100644 --- a/source/code/plugin/health/cluster_health_state.rb +++ b/source/code/plugin/health/cluster_health_state.rb @@ -3,6 +3,7 @@ require "net/http" require "net/https" require "uri" +require 'yajl/json_gem' module HealthModel class ClusterHealthState diff 
--git a/source/code/plugin/health/health_container_cpu_memory_aggregator.rb b/source/code/plugin/health/health_container_cpu_memory_aggregator.rb index f6b57e0ae..e93c66c14 100644 --- a/source/code/plugin/health/health_container_cpu_memory_aggregator.rb +++ b/source/code/plugin/health/health_container_cpu_memory_aggregator.rb @@ -49,6 +49,9 @@ class HealthContainerCpuMemoryAggregator @@limit_is_array_event_sent = {} @@WORKLOAD_CONTAINER_COUNT_EMPTY_EVENT = "WorkloadContainerCountEmptyEvent" @@LIMIT_IS_ARRAY_EVENT = "ResourceLimitIsAnArrayEvent" + @@cpu_last_sent_monitors = {} + @@memory_last_sent_monitors = {} + def initialize(resources, provider) @pod_uid_lookup = resources.get_pod_uid_lookup @workload_container_count = resources.get_workload_container_count @@ -61,8 +64,8 @@ def initialize(resources, provider) def dedupe_records(container_records) cpu_deduped_instances = {} memory_deduped_instances = {} - container_records = container_records.select{|record| record['CounterName'] == @@memory_counter_name || record['CounterName'] == @@cpu_counter_name} - + container_records = container_records.keep_if{|record| record['CounterName'] == @@memory_counter_name || record['CounterName'] == @@cpu_counter_name} + container_records.each do |record| begin instance_name = record["InstanceName"] @@ -81,12 +84,13 @@ def dedupe_records(container_records) else r = resource_instances[instance_name] if record["Timestamp"] > r["Timestamp"] - @log.info "Dropping older record" + @log.info "Dropping older record for instance #{instance_name} new: #{record["Timestamp"]} old: #{r["Timestamp"]}" resource_instances[instance_name] = record end end rescue => e @log.info "Exception when deduping record #{record}" + next end end return cpu_deduped_instances.values.concat(memory_deduped_instances.values) @@ -94,7 +98,7 @@ def dedupe_records(container_records) def aggregate(container_records) #filter and select only cpuUsageNanoCores and memoryRssBytes - container_records = 
container_records.select{|record| record['CounterName'] == @@memory_counter_name || record['CounterName'] == @@cpu_counter_name} + container_records = container_records.keep_if{|record| record['CounterName'] == @@memory_counter_name || record['CounterName'] == @@cpu_counter_name} # poduid lookup has poduid/cname --> workload_name, namespace, cpu_limit, memory limit mapping # from the container records, extract the poduid/cname, get the values from poduid_lookup, and aggregate based on namespace_workload_cname container_records.each do |record| @@ -137,7 +141,6 @@ def aggregate(container_records) end container_instance_record = {} - pod_name = @pod_uid_lookup[lookup_key]["pod_name"] #append the record to the hash # append only if the record is not a duplicate record @@ -160,13 +163,14 @@ def compute_state() # if limits not set, set state to warning # if all records present, sort in descending order of metric, compute index based on StateThresholdPercentage, get the state (pass/fail/warn) based on monitor state (Using [Fail/Warn]ThresholdPercentage, and set the state) @memory_records.each{|k,v| + @@memory_last_sent_monitors.delete(k) #remove from last sent list if the record is present in the current set of signals calculate_monitor_state(v, @provider.get_config(MonitorId::CONTAINER_MEMORY_MONITOR_ID)) } @cpu_records.each{|k,v| + @@cpu_last_sent_monitors.delete(k) #remove from last sent list if the record is present in the current set of signals calculate_monitor_state(v, @provider.get_config(MonitorId::CONTAINER_CPU_MONITOR_ID)) } - @log.info "Finished computing state" end @@ -175,7 +179,6 @@ def get_records container_cpu_memory_records = [] @cpu_records.each{|resource_key, record| - cpu_limit_mc = 1.0 if record["limit"].is_a?(Numeric) cpu_limit_mc = record["limit"]/1000000.to_f @@ -221,6 +224,42 @@ def get_records container_cpu_memory_records.push(health_record) } + # If all records that were sent previously are present in current set, this will not be executed + 
if @@cpu_last_sent_monitors.keys.size != 0 + @@cpu_last_sent_monitors.keys.each{|key| + begin + @log.info "Container CPU monitor #{key} not present in current set. Sending none state transition" + tokens = key.split('_') + namespace = tokens[0] + workload_name = "#{tokens[0]}~~#{tokens[1]}" + container = tokens[2] + health_monitor_record = { + "timestamp" => time_now, + "state" => HealthMonitorStates::NONE, + "details" => { + "reason" => "No record received for workload #{workload_name}", + "workload_name" => workload_name, + "namespace" => namespace, + "container" => container + } + } + + monitor_instance_id = HealthMonitorHelpers.get_monitor_instance_id(MonitorId::CONTAINER_CPU_MONITOR_ID, key.split('_')) #container_cpu_utilization-namespace-workload-container + + health_record = {} + health_record[HealthMonitorRecordFields::MONITOR_ID] = MonitorId::CONTAINER_CPU_MONITOR_ID + health_record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID] = monitor_instance_id + health_record[HealthMonitorRecordFields::DETAILS] = health_monitor_record + health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now + health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now + container_cpu_memory_records.push(health_record) + rescue => e + @log.info "Error when trying to create NONE State transition signal for #{key} for monitor #{monitor_instance_id} #{e.message}" + next + end + } + end + @memory_records.each{|resource_key, record| health_monitor_record = { "timestamp" => time_now, @@ -245,6 +284,52 @@ def get_records health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now container_cpu_memory_records.push(health_record) } + + # If all records that were sent previously are present in current set, this will not be executed + if @@memory_last_sent_monitors.keys.size != 0 + @@memory_last_sent_monitors.keys.each{|key| + begin + @log.info "Container Memory monitor #{key} not present in current set. 
Sending none state transition" + tokens = key.split('_') + namespace = tokens[0] + workload_name = "#{tokens[0]}~~#{tokens[1]}" + container = tokens[2] + health_monitor_record = { + "timestamp" => time_now, + "state" => HealthMonitorStates::NONE, + "details" => { + "reason" => "No record received for workload #{workload_name}", + "workload_name" => workload_name, + "namespace" => namespace, + "container" => container + } + } + monitor_instance_id = HealthMonitorHelpers.get_monitor_instance_id(MonitorId::CONTAINER_MEMORY_MONITOR_ID, key.split('_')) #container_cpu_utilization-namespace-workload-container + health_record = {} + health_record[HealthMonitorRecordFields::MONITOR_ID] = MonitorId::CONTAINER_MEMORY_MONITOR_ID + health_record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID] = monitor_instance_id + health_record[HealthMonitorRecordFields::DETAILS] = health_monitor_record + health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now + health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now + container_cpu_memory_records.push(health_record) + rescue => e + @log.info "Error when trying to create NONE State transition signal for #{key} for monitor #{monitor_instance_id} #{e.message}" + next + end + } + end + + #reset the last sent monitors list + @@memory_last_sent_monitors = {} + @@cpu_last_sent_monitors = {} + + # add the current set of signals for comparison in next iteration + @cpu_records.keys.each{|k| + @@cpu_last_sent_monitors[k] = true + } + @memory_records.keys.each{|k| + @@memory_last_sent_monitors[k] = true + } return container_cpu_memory_records end @@ -298,4 +383,4 @@ def calculate_container_instance_state(counter_value, limit, config) end end end -end \ No newline at end of file +end diff --git a/source/code/plugin/health/health_container_cpu_memory_record_formatter.rb b/source/code/plugin/health/health_container_cpu_memory_record_formatter.rb index 0c3f061f1..12c72a120 100644 --- 
a/source/code/plugin/health/health_container_cpu_memory_record_formatter.rb +++ b/source/code/plugin/health/health_container_cpu_memory_record_formatter.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require 'yajl/json_gem' + module HealthModel class HealthContainerCpuMemoryRecordFormatter diff --git a/source/code/plugin/health/health_hierarchy_builder.rb b/source/code/plugin/health/health_hierarchy_builder.rb index bb48e083b..a59020996 100644 --- a/source/code/plugin/health/health_hierarchy_builder.rb +++ b/source/code/plugin/health/health_hierarchy_builder.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true +require 'yajl/json_gem' -require 'json' module HealthModel class HealthHierarchyBuilder diff --git a/source/code/plugin/health/health_model_definition_parser.rb b/source/code/plugin/health/health_model_definition_parser.rb index 907bc1fd1..c185e5389 100644 --- a/source/code/plugin/health/health_model_definition_parser.rb +++ b/source/code/plugin/health/health_model_definition_parser.rb @@ -3,7 +3,7 @@ Class to parse the health model definition. 
The definition expresses the relationship between monitors, how to roll up to an aggregate monitor, and what labels to "pass on" to the parent monitor =end -require 'json' +require 'yajl/json_gem' module HealthModel class HealthModelDefinitionParser @@ -29,6 +29,7 @@ def parse_file labels = entry['labels'] if entry['labels'] aggregation_algorithm = entry['aggregation_algorithm'] if entry['aggregation_algorithm'] aggregation_algorithm_params = entry['aggregation_algorithm_params'] if entry['aggregation_algorithm_params'] + default_parent_monitor_id = entry['default_parent_monitor_id'] if entry['default_parent_monitor_id'] if parent_monitor_id.is_a?(Array) conditions = [] parent_monitor_id.each{|condition| @@ -38,7 +39,7 @@ def parse_file parent_id = condition['id'] conditions.push({"key" => key, "operator" => operator, "value" => value, "parent_id" => parent_id}) } - @health_model_definition[monitor_id] = {"conditions" => conditions, "labels" => labels, "aggregation_algorithm" => aggregation_algorithm, "aggregation_algorithm_params" =>aggregation_algorithm_params} + @health_model_definition[monitor_id] = {"conditions" => conditions, "labels" => labels, "aggregation_algorithm" => aggregation_algorithm, "aggregation_algorithm_params" =>aggregation_algorithm_params, "default_parent_monitor_id" => default_parent_monitor_id} elsif parent_monitor_id.is_a?(String) @health_model_definition[monitor_id] = {"parent_monitor_id" => parent_monitor_id, "labels" => labels, "aggregation_algorithm" => aggregation_algorithm, "aggregation_algorithm_params" =>aggregation_algorithm_params} elsif parent_monitor_id.nil? 
diff --git a/source/code/plugin/health/health_monitor_optimizer.rb b/source/code/plugin/health/health_monitor_optimizer.rb index a63d59abf..d87540941 100644 --- a/source/code/plugin/health/health_monitor_optimizer.rb +++ b/source/code/plugin/health/health_monitor_optimizer.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true +require 'yajl/json_gem' module HealthModel class HealthMonitorOptimizer #ctor diff --git a/source/code/plugin/health/health_monitor_provider.rb b/source/code/plugin/health/health_monitor_provider.rb index b36c46370..8e1d11143 100644 --- a/source/code/plugin/health/health_monitor_provider.rb +++ b/source/code/plugin/health/health_monitor_provider.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true require_relative 'health_model_constants' +require 'yajl/json_gem' module HealthModel class HealthMonitorProvider diff --git a/source/code/plugin/health/health_monitor_state.rb b/source/code/plugin/health/health_monitor_state.rb index 16f8bedc4..110793eeb 100644 --- a/source/code/plugin/health/health_monitor_state.rb +++ b/source/code/plugin/health/health_monitor_state.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true require_relative 'health_model_constants' +require 'yajl/json_gem' module HealthModel diff --git a/source/code/plugin/health/health_monitor_utils.rb b/source/code/plugin/health/health_monitor_utils.rb index 2fa2d3a52..13d1416b1 100644 --- a/source/code/plugin/health/health_monitor_utils.rb +++ b/source/code/plugin/health/health_monitor_utils.rb @@ -2,6 +2,7 @@ require 'logger' require 'digest' require_relative 'health_model_constants' +require 'yajl/json_gem' module HealthModel # static class that provides a bunch of utility methods diff --git a/source/code/plugin/health/unit_monitor.rb b/source/code/plugin/health/unit_monitor.rb index 6454007b6..8e2de210b 100644 --- a/source/code/plugin/health/unit_monitor.rb +++ b/source/code/plugin/health/unit_monitor.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true require_relative 'health_model_constants' 
-require 'json' +require 'yajl/json_gem' module HealthModel class UnitMonitor diff --git a/source/code/plugin/in_cadvisor_perf.rb b/source/code/plugin/in_cadvisor_perf.rb index 810fb512f..96aa66aa1 100644 --- a/source/code/plugin/in_cadvisor_perf.rb +++ b/source/code/plugin/in_cadvisor_perf.rb @@ -9,14 +9,15 @@ class CAdvisor_Perf_Input < Input def initialize super require "yaml" - require "json" + require 'yajl/json_gem' + require "time" require_relative "CAdvisorMetricsAPIClient" require_relative "oms_common" require_relative "omslog" end - config_param :run_interval, :time, :default => "1m" + config_param :run_interval, :time, :default => 60 config_param :tag, :string, :default => "oms.api.cadvisorperf" config_param :mdmtag, :string, :default => "mdm.cadvisorperf" config_param :nodehealthtag, :string, :default => "kubehealth.DaemonSet.Node" @@ -46,10 +47,12 @@ def shutdown end def enumerate() - time = Time.now.to_f + currentTime = Time.now + time = currentTime.to_f + batchTime = currentTime.utc.iso8601 begin eventStream = MultiEventStream.new - metricData = CAdvisorMetricsAPIClient.getMetrics() + metricData = CAdvisorMetricsAPIClient.getMetrics(winNode: nil, metricTime: batchTime ) metricData.each do |record| record["DataType"] = "LINUX_PERF_BLOB" record["IPName"] = "LogManagement" @@ -74,14 +77,25 @@ def enumerate() def run_periodic @mutex.lock done = @finished + @nextTimeToRun = Time.now + @waitTimeout = @run_interval until done - @condition.wait(@mutex, @run_interval) + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished @mutex.unlock if !done begin - $log.info("in_cadvisor_perf::run_periodic @ #{Time.now.utc.iso8601}") + $log.info("in_cadvisor_perf::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}") enumerate + 
$log.info("in_cadvisor_perf::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}") rescue => errorStr $log.warn "in_cadvisor_perf::run_periodic: enumerate Failed to retrieve cadvisor perf metrics: #{errorStr}" end diff --git a/source/code/plugin/in_containerinventory.rb b/source/code/plugin/in_containerinventory.rb index ccf61ab2e..d107047b4 100644 --- a/source/code/plugin/in_containerinventory.rb +++ b/source/code/plugin/in_containerinventory.rb @@ -13,14 +13,15 @@ class Container_Inventory_Input < Input def initialize super - require "json" + require 'yajl/json_gem' + require "time" require_relative "DockerApiClient" require_relative "ContainerInventoryState" require_relative "ApplicationInsightsUtility" require_relative "omslog" end - config_param :run_interval, :time, :default => "1m" + config_param :run_interval, :time, :default => 60 config_param :tag, :string, :default => "oms.containerinsights.containerinventory" def configure(conf) @@ -259,14 +260,25 @@ def enumerate def run_periodic @mutex.lock done = @finished + @nextTimeToRun = Time.now + @waitTimeout = @run_interval until done - @condition.wait(@mutex, @run_interval) + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished @mutex.unlock if !done begin - $log.info("in_container_inventory::run_periodic @ #{Time.now.utc.iso8601}") + $log.info("in_container_inventory::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}") enumerate + $log.info("in_container_inventory::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}") rescue => errorStr $log.warn "in_container_inventory::run_periodic: Failed in enumerate container inventory: #{errorStr}" end diff --git a/source/code/plugin/in_containerlog_sudo_tail.rb b/source/code/plugin/in_containerlog_sudo_tail.rb deleted file mode 100644 index 8faa260d0..000000000 --- 
a/source/code/plugin/in_containerlog_sudo_tail.rb +++ /dev/null @@ -1,189 +0,0 @@ - -require 'yajl' -require 'fluent/input' -require 'fluent/event' -require 'fluent/config/error' -require 'fluent/parser' -require 'open3' -require 'json' -require_relative 'omslog' -require_relative 'KubernetesApiClient' - -module Fluent - class ContainerLogSudoTail < Input - Plugin.register_input('containerlog_sudo_tail', self) - - def initialize - super - @command = nil - @paths = [] - #Using this to construct the file path for all every container json log file. - #Example container log file path -> /var/lib/docker/containers/{ContainerID}/{ContainerID}-json.log - #We have read permission on this file but don't have execute permission on the below mentioned path. Hence wildcard character searches to find the container ID's doesn't work. - @containerLogFilePath = "/var/lib/docker/containers/" - #This folder contains a list of all the containers running/stopped and we're using it to get all the container ID's which will be needed for the log file path below - #TODO : Use generic path from docker REST endpoint and find a way to mount the correct folder in the omsagent.yaml - @containerIDFilePath = "/var/opt/microsoft/docker-cimprov/state/ContainerInventory/*" - @@systemPodsNamespace = 'kube-system' - @@getSystemPodsTimeIntervalSecs = 300 #refresh system container list every 5 minutes - @@lastSystemPodsGetTime = nil; - @@systemContainerIDList = Hash.new - @@disableKubeSystemLogCollection = ENV['DISABLE_KUBE_SYSTEM_LOG_COLLECTION'] - if !@@disableKubeSystemLogCollection.nil? && !@@disableKubeSystemLogCollection.empty? 
&& @@disableKubeSystemLogCollection.casecmp('true') == 0 - @@disableKubeSystemLogCollection = 'true' - $log.info("in_container_sudo_tail : System container log collection is disabled") - else - @@disableKubeSystemLogCollection = 'false' - $log.info("in_container_sudo_tail : System container log collection is enabled") - end - end - - attr_accessor :command - - #The format used to map the program output to the incoming event. - config_param :format, :string, default: 'none' - - #Tag of the event. - config_param :tag, :string, default: nil - - #Fluentd will record the position it last read into this file. - config_param :pos_file, :string, default: nil - - #The interval time between periodic program runs. - config_param :run_interval, :time, default: nil - - BASE_DIR = File.dirname(File.expand_path('..', __FILE__)) - RUBY_DIR = BASE_DIR + '/ruby/bin/ruby ' - TAILSCRIPT = BASE_DIR + '/plugin/containerlogtailfilereader.rb ' - - def configure(conf) - super - unless @pos_file - raise ConfigError, "'pos_file' is required to keep track of file" - end - - unless @tag - raise ConfigError, "'tag' is required on sudo tail" - end - - unless @run_interval - raise ConfigError, "'run_interval' is required for periodic tailing" - end - - @parser = Plugin.new_parser(conf['format']) - @parser.configure(conf) - end - - def start - @finished = false - @thread = Thread.new(&method(:run_periodic)) - end - - def shutdown - @finished = true - @thread.join - end - - def receive_data(line) - es = MultiEventStream.new - begin - line.chomp! # remove \n - @parser.parse(line) { |time, record| - if time && record - es.add(time, record) - else - $log.warn "pattern doesn't match: #{line.inspect}" - end - unless es.empty? 
- tag=@tag - router.emit_stream(tag, es) - end - } - rescue => e - $log.warn line.dump, error: e.to_s - $log.debug_backtrace(e.backtrace) - end - end - - def receive_log(line) - $log.warn "#{line}" if line.start_with?('WARN') - $log.error "#{line}" if line.start_with?('ERROR') - $log.info "#{line}" if line.start_with?('INFO') - end - - def readable_path(path) - if system("sudo test -r #{path}") - OMS::Log.info_once("Following tail of #{path}") - return path - else - OMS::Log.warn_once("#{path} is not readable. Cannot tail the file.") - return "" - end - end - - def set_system_command - timeNow = DateTime.now - cName = "Unkown" - tempContainerInfo = {} - paths = "" - - #if we are on agent & system containers log collection is disabled, get system containerIDs to exclude logs from containers in system containers namespace from being tailed - if !KubernetesApiClient.isNodeMaster && @@disableKubeSystemLogCollection.casecmp('true') == 0 - if @@lastSystemPodsGetTime.nil? || ((timeNow - @@lastSystemPodsGetTime)*24*60*60).to_i >= @@getSystemPodsTimeIntervalSecs - $log.info("in_container_sudo_tail : System Container list last refreshed at #{@@lastSystemPodsGetTime} - refreshing now at #{timeNow}") - sysContainers = KubernetesApiClient.getContainerIDs(@@systemPodsNamespace) - #BugBug - https://msecg.visualstudio.com/OMS/_workitems/edit/215107 - we get 200 with empty payloaf from time to time - if (!sysContainers.nil? && !sysContainers.empty?) - @@systemContainerIDList = sysContainers - else - $log.info("in_container_sudo_tail : System Container ID List is empty!!!! 
Continuing to use currently cached list.") - end - @@lastSystemPodsGetTime = timeNow - $log.info("in_container_sudo_tail : System Container ID List: #{@@systemContainerIDList}") - end - end - - Dir.glob(@containerIDFilePath).select { |p| - cName = p.split('/').last; - if !@@systemContainerIDList.key?("docker://" + cName) - p = @containerLogFilePath + cName + "/" + cName + "-json.log" - paths += readable_path(p) + " " - else - $log.info("in_container_sudo_tail : Excluding system container with ID #{cName} from tailng for log collection") - end - } - if !system("sudo test -r #{@pos_file}") - system("sudo touch #{@pos_file}") - end - @command = "sudo " << RUBY_DIR << TAILSCRIPT << paths << " -p #{@pos_file}" - end - - def run_periodic - until @finished - begin - sleep @run_interval - #if we are on master & system containers log collection is disabled, collect nothing (i.e NO COntainer log collection for ANY container) - #we will be not collection omsagent log as well in this case, but its insignificant & okay! 
- if !KubernetesApiClient.isNodeMaster || @@disableKubeSystemLogCollection.casecmp('true') != 0 - set_system_command - Open3.popen3(@command) {|writeio, readio, errio, wait_thread| - writeio.close - while line = readio.gets - receive_data(line) - end - while line = errio.gets - receive_log(line) - end - - wait_thread.value #wait until child process terminates - } - end - rescue - $log.error "containerlog_sudo_tail failed to run or shutdown child proces", error => $!.to_s, :error_class => $!.class.to_s - $log.warn_backtrace $!.backtrace - end - end - end - end - -end diff --git a/source/code/plugin/in_kube_events.rb b/source/code/plugin/in_kube_events.rb index e1fdc5df6..6116cb62d 100644 --- a/source/code/plugin/in_kube_events.rb +++ b/source/code/plugin/in_kube_events.rb @@ -9,15 +9,20 @@ class Kube_Event_Input < Input def initialize super - require "json" + require "yajl/json_gem" + require "yajl" + require "time" require_relative "KubernetesApiClient" require_relative "oms_common" require_relative "omslog" require_relative "ApplicationInsightsUtility" + + # 30000 events account to approximately 5MB + @EVENTS_CHUNK_SIZE = 30000 end - config_param :run_interval, :time, :default => "1m" + config_param :run_interval, :time, :default => 60 config_param :tag, :string, :default => "oms.containerinsights.KubeEvents" def configure(conf) @@ -43,79 +48,114 @@ def shutdown end end - def enumerate(eventList = nil) - currentTime = Time.now - emitTime = currentTime.to_f - batchTime = currentTime.utc.iso8601 + def enumerate + begin + eventList = nil + currentTime = Time.now + batchTime = currentTime.utc.iso8601 + eventQueryState = getEventQueryState + newEventQueryState = [] + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, eventList = 
KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}") + $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") + if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?) + newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime) + else + $log.warn "in_kube_events::enumerate:Received empty eventList" + end - events = eventList - $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}") - eventInfo = KubernetesApiClient.getKubeResourceInfo("events") - $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}") + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}&continue=#{continuationToken}") + if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?) + newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime) + else + $log.warn "in_kube_events::enumerate:Received empty eventList" + end + end - if !eventInfo.nil? 
- events = JSON.parse(eventInfo.body) + # Setting this to nil so that we dont hold memory until GC kicks in + eventList = nil + writeEventQueryState(newEventQueryState) + rescue => errorStr + $log.warn "in_kube_events::enumerate:Failed in enumerate: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end + end # end enumerate - eventQueryState = getEventQueryState - newEventQueryState = [] + def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTime = Time.utc.iso8601) + currentTime = Time.now + emitTime = currentTime.to_f begin - if (!events.nil? && !events.empty? && !events["items"].nil?) - eventStream = MultiEventStream.new - events["items"].each do |items| - record = {} - # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion - record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - eventId = items["metadata"]["uid"] + "/" + items["count"].to_s - newEventQueryState.push(eventId) - if !eventQueryState.empty? 
&& eventQueryState.include?(eventId) - next - end - record["ObjectKind"] = items["involvedObject"]["kind"] - record["Namespace"] = items["involvedObject"]["namespace"] - record["Name"] = items["involvedObject"]["name"] - record["Reason"] = items["reason"] - record["Message"] = items["message"] - record["Type"] = items["type"] - record["TimeGenerated"] = items["metadata"]["creationTimestamp"] - record["SourceComponent"] = items["source"]["component"] - record["FirstSeen"] = items["firstTimestamp"] - record["LastSeen"] = items["lastTimestamp"] - record["Count"] = items["count"] - if items["source"].key?("host") - record["Computer"] = items["source"]["host"] - else - record["Computer"] = (OMS::Common.get_hostname) - end - record['ClusterName'] = KubernetesApiClient.getClusterName - record["ClusterId"] = KubernetesApiClient.getClusterId - wrapper = { - "DataType" => "KUBE_EVENTS_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [record.each { |k, v| record[k] = v }], - } - eventStream.add(emitTime, wrapper) if wrapper + eventStream = MultiEventStream.new + events["items"].each do |items| + record = {} + # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + eventId = items["metadata"]["uid"] + "/" + items["count"].to_s + newEventQueryState.push(eventId) + if !eventQueryState.empty? 
&& eventQueryState.include?(eventId) + next end - router.emit_stream(@tag, eventStream) if eventStream - end - writeEventQueryState(newEventQueryState) + record["ObjectKind"] = items["involvedObject"]["kind"] + record["Namespace"] = items["involvedObject"]["namespace"] + record["Name"] = items["involvedObject"]["name"] + record["Reason"] = items["reason"] + record["Message"] = items["message"] + record["Type"] = items["type"] + record["TimeGenerated"] = items["metadata"]["creationTimestamp"] + record["SourceComponent"] = items["source"]["component"] + record["FirstSeen"] = items["firstTimestamp"] + record["LastSeen"] = items["lastTimestamp"] + record["Count"] = items["count"] + if items["source"].key?("host") + record["Computer"] = items["source"]["host"] + else + record["Computer"] = (OMS::Common.get_hostname) + end + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterId"] = KubernetesApiClient.getClusterId + wrapper = { + "DataType" => "KUBE_EVENTS_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper + end + router.emit_stream(@tag, eventStream) if eventStream rescue => errorStr $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end + end + return newEventQueryState end def run_periodic @mutex.lock done = @finished + @nextTimeToRun = Time.now + @waitTimeout = @run_interval until done - @condition.wait(@mutex, @run_interval) + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished @mutex.unlock if !done begin - $log.info("in_kube_events::run_periodic @ #{Time.now.utc.iso8601}") + $log.info("in_kube_events::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}") enumerate + 
$log.info("in_kube_events::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}") rescue => errorStr $log.warn "in_kube_events::run_periodic: enumerate Failed to retrieve kube events: #{errorStr}" ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) diff --git a/source/code/plugin/in_kube_health.rb b/source/code/plugin/in_kube_health.rb index 51ffa86d5..0eebf395b 100644 --- a/source/code/plugin/in_kube_health.rb +++ b/source/code/plugin/in_kube_health.rb @@ -7,12 +7,12 @@ require_relative "ApplicationInsightsUtility" module Fluent + Dir[File.join(__dir__, "./health", "*.rb")].each { |file| require file } - Dir[File.join(__dir__, './health', '*.rb')].each { |file| require file } class KubeHealthInput < Input Plugin.register_input("kubehealth", self) - config_param :health_monitor_config_path, :default => '/etc/opt/microsoft/docker-cimprov/health/healthmonitorconfig.json' + config_param :health_monitor_config_path, :default => "/etc/opt/microsoft/docker-cimprov/health/healthmonitorconfig.json" @@clusterCpuCapacity = 0.0 @@clusterMemoryCapacity = 0.0 @@ -21,18 +21,22 @@ def initialize begin super require "yaml" - require "json" + require 'yajl/json_gem' + require "yajl" + require "time" @@cluster_id = KubernetesApiClient.getClusterId @resources = HealthKubernetesResources.instance @provider = HealthMonitorProvider.new(@@cluster_id, HealthMonitorUtils.get_cluster_labels, @resources, @health_monitor_config_path) + @@ApiGroupApps = "apps" + @@KubeInfraNamespace = "kube-system" rescue => e ApplicationInsightsUtility.sendExceptionTelemetry(e, {"FeatureArea" => "Health"}) end end include HealthModel - config_param :run_interval, :time, :default => "1m" + config_param :run_interval, :time, :default => 60 config_param :tag, :string, :default => "kubehealth.ReplicaSet" def configure(conf) @@ -40,25 +44,25 @@ def configure(conf) end def start - begin - if @run_interval - @finished = false - @condition = ConditionVariable.new - @mutex = Mutex.new - @thread = 
Thread.new(&method(:run_periodic)) - - @@hmlog = HealthMonitorUtils.get_log_handle - @@clusterName = KubernetesApiClient.getClusterName - @@clusterRegion = KubernetesApiClient.getClusterRegion - cluster_capacity = HealthMonitorUtils.get_cluster_cpu_memory_capacity(@@hmlog) - @@clusterCpuCapacity = cluster_capacity[0] - @@clusterMemoryCapacity = cluster_capacity[1] - @@hmlog.info "Cluster CPU Capacity: #{@@clusterCpuCapacity} Memory Capacity: #{@@clusterMemoryCapacity}" - initialize_inventory - end - rescue => e - ApplicationInsightsUtility.sendExceptionTelemetry(e, {"FeatureArea" => "Health"}) + begin + if @run_interval + @finished = false + @condition = ConditionVariable.new + @mutex = Mutex.new + @thread = Thread.new(&method(:run_periodic)) + + @@hmlog = HealthMonitorUtils.get_log_handle + @@clusterName = KubernetesApiClient.getClusterName + @@clusterRegion = KubernetesApiClient.getClusterRegion + cluster_capacity = HealthMonitorUtils.get_cluster_cpu_memory_capacity(@@hmlog) + @@clusterCpuCapacity = cluster_capacity[0] + @@clusterMemoryCapacity = cluster_capacity[1] + @@hmlog.info "Cluster CPU Capacity: #{@@clusterCpuCapacity} Memory Capacity: #{@@clusterMemoryCapacity}" + initialize_inventory end + rescue => e + ApplicationInsightsUtility.sendExceptionTelemetry(e, {"FeatureArea" => "Health"}) + end end def shutdown @@ -73,7 +77,6 @@ def shutdown def enumerate begin - currentTime = Time.now emitTime = currentTime.to_f batchTime = currentTime.utc.iso8601 @@ -83,10 +86,11 @@ def enumerate #HealthMonitorUtils.refresh_kubernetes_api_data(@@hmlog, nil) # we do this so that if the call fails, we get a response code/header etc. 
node_inventory_response = KubernetesApiClient.getKubeResourceInfo("nodes") - node_inventory = JSON.parse(node_inventory_response.body) - pod_inventory_response = KubernetesApiClient.getKubeResourceInfo("pods") - pod_inventory = JSON.parse(pod_inventory_response.body) - replicaset_inventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("replicasets", api_version: "extensions/v1beta1").body) + node_inventory = Yajl::Parser.parse(StringIO.new(node_inventory_response.body)) + pod_inventory_response = KubernetesApiClient.getKubeResourceInfo("pods?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}") + pod_inventory = Yajl::Parser.parse(StringIO.new(pod_inventory_response.body)) + replicaset_inventory_response = KubernetesApiClient.getKubeResourceInfo("replicasets?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}", api_group: @@ApiGroupApps) + replicaset_inventory = Yajl::Parser.parse(StringIO.new(replicaset_inventory_response.body)) @resources.node_inventory = node_inventory @resources.pod_inventory = pod_inventory @@ -108,8 +112,8 @@ def enumerate health_monitor_records.push(record) if record pods_ready_hash = HealthMonitorUtils.get_pods_ready_hash(@resources) - system_pods = pods_ready_hash.select{|k,v| v['namespace'] == 'kube-system'} - workload_pods = pods_ready_hash.select{|k,v| v['namespace'] != 'kube-system'} + system_pods = pods_ready_hash.keep_if { |k, v| v["namespace"] == @@KubeInfraNamespace } + workload_pods = Hash.new # pods_ready_hash.select{ |k, v| v["namespace"] != @@KubeInfraNamespace } system_pods_ready_percentage_records = process_pods_ready_percentage(system_pods, MonitorId::SYSTEM_WORKLOAD_PODS_READY_MONITOR_ID) system_pods_ready_percentage_records.each do |record| @@ -147,13 +151,13 @@ def enumerate def process_cpu_oversubscribed_monitor(pod_inventory, node_inventory) timestamp = Time.now.utc.iso8601 @@clusterCpuCapacity = HealthMonitorUtils.get_cluster_cpu_memory_capacity(@@hmlog, node_inventory: node_inventory)[0] - 
subscription = HealthMonitorUtils.get_resource_subscription(pod_inventory,"cpu", @@clusterCpuCapacity) + subscription = HealthMonitorUtils.get_resource_subscription(pod_inventory, "cpu", @@clusterCpuCapacity) @@hmlog.info "Refreshed Cluster CPU Capacity #{@@clusterCpuCapacity}" - state = subscription > @@clusterCpuCapacity ? "fail" : "pass" + state = subscription > @@clusterCpuCapacity ? "fail" : "pass" #CPU monitor_id = MonitorId::WORKLOAD_CPU_OVERSUBSCRIBED_MONITOR_ID - health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"clusterCpuCapacity" => @@clusterCpuCapacity/1000000.to_f, "clusterCpuRequests" => subscription/1000000.to_f}} + health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"clusterCpuCapacity" => @@clusterCpuCapacity / 1000000.to_f, "clusterCpuRequests" => subscription / 1000000.to_f}} # @@hmlog.info health_monitor_record monitor_instance_id = HealthMonitorUtils.get_monitor_instance_id(monitor_id, [@@cluster_id]) @@ -163,8 +167,8 @@ def process_cpu_oversubscribed_monitor(pod_inventory, node_inventory) health_record[HealthMonitorRecordFields::MONITOR_ID] = monitor_id health_record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID] = monitor_instance_id health_record[HealthMonitorRecordFields::DETAILS] = health_monitor_record - health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now - health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now + health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now + health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now health_record[HealthMonitorRecordFields::CLUSTER_ID] = @@cluster_id #@@hmlog.info "Successfully processed process_cpu_oversubscribed_monitor" return health_record @@ -172,10 +176,10 @@ def process_cpu_oversubscribed_monitor(pod_inventory, node_inventory) def process_memory_oversubscribed_monitor(pod_inventory, node_inventory) timestamp = Time.now.utc.iso8601 - @@clusterMemoryCapacity = 
HealthMonitorUtils.get_cluster_cpu_memory_capacity(@@hmlog,node_inventory: node_inventory)[1] + @@clusterMemoryCapacity = HealthMonitorUtils.get_cluster_cpu_memory_capacity(@@hmlog, node_inventory: node_inventory)[1] @@hmlog.info "Refreshed Cluster Memory Capacity #{@@clusterMemoryCapacity}" - subscription = HealthMonitorUtils.get_resource_subscription(pod_inventory,"memory", @@clusterMemoryCapacity) - state = subscription > @@clusterMemoryCapacity ? "fail" : "pass" + subscription = HealthMonitorUtils.get_resource_subscription(pod_inventory, "memory", @@clusterMemoryCapacity) + state = subscription > @@clusterMemoryCapacity ? "fail" : "pass" #@@hmlog.debug "Memory Oversubscribed Monitor State : #{state}" #CPU @@ -189,8 +193,8 @@ def process_memory_oversubscribed_monitor(pod_inventory, node_inventory) health_record[HealthMonitorRecordFields::MONITOR_ID] = monitor_id health_record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID] = monitor_instance_id health_record[HealthMonitorRecordFields::DETAILS] = health_monitor_record - health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now - health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now + health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now + health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now health_record[HealthMonitorRecordFields::CLUSTER_ID] = @@cluster_id #@@hmlog.info "Successfully processed process_memory_oversubscribed_monitor" return health_record @@ -201,7 +205,7 @@ def process_kube_api_up_monitor(state, response) monitor_id = MonitorId::KUBE_API_STATUS details = response.each_header.to_h - details['ResponseCode'] = response.code + details["ResponseCode"] = response.code health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => details} hmlog = HealthMonitorUtils.get_log_handle #hmlog.info health_monitor_record @@ -213,8 +217,8 @@ def process_kube_api_up_monitor(state, response) 
health_record[HealthMonitorRecordFields::MONITOR_ID] = monitor_id health_record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID] = monitor_instance_id health_record[HealthMonitorRecordFields::DETAILS] = health_monitor_record - health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now - health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now + health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now + health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now health_record[HealthMonitorRecordFields::CLUSTER_ID] = @@cluster_id #@@hmlog.info "Successfully processed process_kube_api_up_monitor" return health_record @@ -225,28 +229,28 @@ def process_pods_ready_percentage(pods_hash, config_monitor_id) hmlog = HealthMonitorUtils.get_log_handle records = [] - pods_hash.keys.each do |key| - workload_name = key - total_pods = pods_hash[workload_name]['totalPods'] - pods_ready = pods_hash[workload_name]['podsReady'] - namespace = pods_hash[workload_name]['namespace'] - workload_kind = pods_hash[workload_name]['kind'] - percent = pods_ready / total_pods * 100 - timestamp = Time.now.utc.iso8601 - - state = HealthMonitorUtils.compute_percentage_state(percent, monitor_config) - health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"totalPods" => total_pods, "podsReady" => pods_ready, "workload_name" => workload_name, "namespace" => namespace, "workload_kind" => workload_kind}} - monitor_instance_id = HealthMonitorUtils.get_monitor_instance_id(config_monitor_id, [@@cluster_id, namespace, workload_name]) - health_record = {} - time_now = Time.now.utc.iso8601 - health_record[HealthMonitorRecordFields::MONITOR_ID] = config_monitor_id - health_record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID] = monitor_instance_id - health_record[HealthMonitorRecordFields::DETAILS] = health_monitor_record - health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now - 
health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now - health_record[HealthMonitorRecordFields::CLUSTER_ID] = @@cluster_id - records.push(health_record) - end + pods_hash.keys.each do |key| + workload_name = key + total_pods = pods_hash[workload_name]["totalPods"] + pods_ready = pods_hash[workload_name]["podsReady"] + namespace = pods_hash[workload_name]["namespace"] + workload_kind = pods_hash[workload_name]["kind"] + percent = pods_ready / total_pods * 100 + timestamp = Time.now.utc.iso8601 + + state = HealthMonitorUtils.compute_percentage_state(percent, monitor_config) + health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"totalPods" => total_pods, "podsReady" => pods_ready, "workload_name" => workload_name, "namespace" => namespace, "workload_kind" => workload_kind}} + monitor_instance_id = HealthMonitorUtils.get_monitor_instance_id(config_monitor_id, [@@cluster_id, namespace, workload_name]) + health_record = {} + time_now = Time.now.utc.iso8601 + health_record[HealthMonitorRecordFields::MONITOR_ID] = config_monitor_id + health_record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID] = monitor_instance_id + health_record[HealthMonitorRecordFields::DETAILS] = health_monitor_record + health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now + health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now + health_record[HealthMonitorRecordFields::CLUSTER_ID] = @@cluster_id + records.push(health_record) + end #@@hmlog.info "Successfully processed pods_ready_percentage for #{config_monitor_id} #{records.size}" return records end @@ -296,10 +300,11 @@ def process_node_condition_monitor(node_inventory) def initialize_inventory #this is required because there are other components, like the container cpu memory aggregator, that depends on the mapping being initialized node_inventory_response = KubernetesApiClient.getKubeResourceInfo("nodes") - node_inventory = JSON.parse(node_inventory_response.body) 
- pod_inventory_response = KubernetesApiClient.getKubeResourceInfo("pods") - pod_inventory = JSON.parse(pod_inventory_response.body) - replicaset_inventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("replicasets", api_version: "extensions/v1beta1").body) + node_inventory = Yajl::Parser.parse(StringIO.new(node_inventory_response.body)) + pod_inventory_response = KubernetesApiClient.getKubeResourceInfo("pods?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}") + pod_inventory = Yajl::Parser.parse(StringIO.new(pod_inventory_response.body)) + replicaset_inventory_response = KubernetesApiClient.getKubeResourceInfo("replicasets?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}", api_group: @@ApiGroupApps) + replicaset_inventory = Yajl::Parser.parse(StringIO.new(replicaset_inventory_response.body)) @resources.node_inventory = node_inventory @resources.pod_inventory = pod_inventory @@ -310,14 +315,25 @@ def initialize_inventory def run_periodic @mutex.lock done = @finished + @nextTimeToRun = Time.now + @waitTimeout = @run_interval until done - @condition.wait(@mutex, @run_interval) + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished @mutex.unlock if !done begin - @@hmlog.info("in_kube_health::run_periodic @ #{Time.now.utc.iso8601}") + @@hmlog.info("in_kube_health::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}") enumerate + @@hmlog.info("in_kube_health::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}") rescue => errorStr @@hmlog.warn "in_kube_health::run_periodic: enumerate Failed for kubeapi sourced data health: #{errorStr}" ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) diff --git a/source/code/plugin/in_kube_logs.rb b/source/code/plugin/in_kube_logs.rb deleted file mode 100644 index 119473819..000000000 --- 
a/source/code/plugin/in_kube_logs.rb +++ /dev/null @@ -1,181 +0,0 @@ -#!/usr/local/bin/ruby -# frozen_string_literal: true - -module Fluent - - class Kube_Logs_Input < Input - Plugin.register_input('kubelogs', self) - - @@KubeLogsStateFile = "/var/opt/microsoft/docker-cimprov/state/KubeLogQueryState.yaml" - - def initialize - super - require 'yaml' - require 'date' - require 'time' - require 'json' - - require_relative 'KubernetesApiClient' - require_relative 'oms_common' - require_relative 'omslog' - end - - config_param :run_interval, :time, :default => '1m' - config_param :tag, :string, :default => "oms.api.KubeLogs" - - def configure (conf) - super - end - - def start - if @run_interval - @finished = false - @condition = ConditionVariable.new - @mutex = Mutex.new - @thread = Thread.new(&method(:run_periodic)) - end - end - - def shutdown - if @run_interval - @mutex.synchronize { - @finished = true - @condition.signal - } - @thread.join - end - end - - def enumerate(podList = nil) - - namespace = ENV['OMS_KUBERNETES_LOGS_NAMESPACE'] - if namespace.nil? || namespace.empty? - return - end - - time = Time.now.to_f - if podList.nil? - pods = KubernetesApiClient.getPods(namespace) - else - pods = podList - end - logQueryState = getLogQueryState - newLogQueryState = {} - - pods.each do |pod| - record = {} - begin - pod['status']['containerStatuses'].each do |container| - - # if container['state']['running'] - # puts container['name'] + ' is running' - # end - - timeStamp = DateTime.now - - containerId = pod['metadata']['namespace'] + "_" + pod['metadata']['name'] + "_" + container['name'] - if !logQueryState.empty? 
&& logQueryState[containerId] - timeStamp = DateTime.parse(logQueryState[containerId]) - end - - # Try to get logs for the container - begin - $log.debug "Getting logs for #{container['name']}" - logs = KubernetesApiClient.getContainerLogsSinceTime(pod['metadata']['namespace'], pod['metadata']['name'], container['name'], timeStamp.rfc3339(9), true) - $log.debug "got something back" - - # By default we don't change the timestamp (if no logs were returned or if there was a (hopefully transient) error in retrieval - newLogQueryState[containerId] = timeStamp.rfc3339(9) - - if !logs || logs.empty? - $log.info "no logs returned" - else - $log.debug "response size is #{logs.length}" - lines = logs.split("\n") - index = -1 - - # skip duplicates - for i in 0...lines.count - dateTime = DateTime.parse(lines[i].split(" ").first) - if (dateTime.to_time - timeStamp.to_time) > 0.0 - index = i - break - end - end - - if index >= 0 - $log.debug "starting from line #{index}" - for i in index...lines.count - record['Namespace'] = pod['metadata']['namespace'] - record['Pod'] = pod['metadata']['name'] - record['Container'] = container['name'] - record['Message'] = lines[i][(lines[i].index(' ') + 1)..(lines[i].length - 1)] - record['TimeGenerated'] = lines[i].split(" ").first - record['Node'] = pod['spec']['nodeName'] - record['Computer'] = OMS::Common.get_hostname - record['ClusterName'] = KubernetesApiClient.getClusterName - router.emit(@tag, time, record) if record - end - newLogQueryState[containerId] = lines.last.split(" ").first - else - newLogQueryState[containerId] = DateTime.now.rfc3339(9) - end - end - rescue => logException - $log.warn "Failed to retrieve logs for container: #{logException}" - $log.debug_backtrace(logException.backtrace) - end - end - # Update log query state only if logging was succesfful. 
- # TODO: May have a few duplicate lines in case of - writeLogQueryState(newLogQueryState) - rescue => errorStr - $log.warn "Exception raised in enumerate: #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - end - end - end - - def run_periodic - @mutex.lock - done = @finished - until done - @condition.wait(@mutex, @run_interval) - done = @finished - @mutex.unlock - if !done - $log.debug "calling enumerate for KubeLogs" - enumerate - $log.debug "done with enumerate for KubeLogs" - end - @mutex.lock - end - @mutex.unlock - end - - def getLogQueryState - logQueryState = {} - begin - if File.file?(@@KubeLogsStateFile) - logQueryState = YAML.load_file(@@KubeLogsStateFile, {}) - end - rescue => errorStr - $log.warn "Failed to load query state #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - end - return logQueryState - end - - def writeLogQueryState(logQueryState) - begin - File.write(@@KubeLogsStateFile, logQueryState.to_yaml) - rescue => errorStr - $log.warn "Failed to write query state #{errorStr.to_s}" - $log.debug_backtrace(errorStr.backtrace) - end - end - - end # Kube_Log_Input - -end # module - diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb index 0a0fd9d2e..fa0994f43 100644 --- a/source/code/plugin/in_kube_nodes.rb +++ b/source/code/plugin/in_kube_nodes.rb @@ -9,6 +9,7 @@ class Kube_nodeInventory_Input < Input @@MDMKubeNodeInventoryTag = "mdm.kubenodeinventory" @@promConfigMountPath = "/etc/config/settings/prometheus-data-collection-settings" @@AzStackCloudFileName = "/etc/kubernetes/host/azurestackcloud.json" + @@kubeperfTag = "oms.api.KubePerf" @@rsPromInterval = ENV["TELEMETRY_RS_PROM_INTERVAL"] @@rsPromFieldPassCount = ENV["TELEMETRY_RS_PROM_FIELDPASS_LENGTH"] @@ -21,15 +22,18 @@ class Kube_nodeInventory_Input < Input def initialize super require "yaml" - require "json" + require "yajl/json_gem" + require "yajl" + require "time" require_relative "KubernetesApiClient" require_relative 
"ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" + @NODES_CHUNK_SIZE = "400" end - config_param :run_interval, :time, :default => "1m" + config_param :run_interval, :time, :default => 60 config_param :tag, :string, :default => "oms.containerinsights.KubeNodeInventory" def configure(conf) @@ -57,158 +61,217 @@ def shutdown end def enumerate - currentTime = Time.now - emitTime = currentTime.to_f - batchTime = currentTime.utc.iso8601 - telemetrySent = false + begin + nodeInventory = nil + currentTime = Time.now + batchTime = currentTime.utc.iso8601 - nodeInventory = nil + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}") + $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?) + parse_and_emit_records(nodeInventory, batchTime) + else + $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" + end - $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - nodeInfo = KubernetesApiClient.getKubeResourceInfo("nodes") - $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}&continue=#{continuationToken}") + if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? 
&& !nodeInventory["items"].empty?) + parse_and_emit_records(nodeInventory, batchTime) + else + $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory" + end + end - if !nodeInfo.nil? - nodeInventory = JSON.parse(nodeInfo.body) + # Setting this to nil so that we dont hold memory until GC kicks in + nodeInventory = nil + rescue => errorStr + $log.warn "in_kube_nodes::enumerate:Failed in enumerate: #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end + end # end enumerate + def parse_and_emit_records(nodeInventory, batchTime = Time.utc.iso8601) begin - if (!nodeInventory.nil? && !nodeInventory.empty?) - eventStream = MultiEventStream.new - containerNodeInventoryEventStream = MultiEventStream.new - if !nodeInventory["items"].nil? - #get node inventory - nodeInventory["items"].each do |items| - record = {} - # Sending records for ContainerNodeInventory - containerNodeInventoryRecord = {} - containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - containerNodeInventoryRecord["Computer"] = items["metadata"]["name"] - - record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - record["Computer"] = items["metadata"]["name"] - record["ClusterName"] = KubernetesApiClient.getClusterName - record["ClusterId"] = KubernetesApiClient.getClusterId - record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"] - record["Labels"] = [items["metadata"]["labels"]] - record["Status"] = "" - - if !items["spec"]["providerID"].nil? && !items["spec"]["providerID"].empty? 
- if File.file?(@@AzStackCloudFileName) # existence of this file indicates agent running on azstack - record["KubernetesProviderID"] = "azurestack" - else - record["KubernetesProviderID"] = items["spec"]["providerID"] - end - else - record["KubernetesProviderID"] = "onprem" - end + currentTime = Time.now + emitTime = currentTime.to_f + telemetrySent = false + eventStream = MultiEventStream.new + containerNodeInventoryEventStream = MultiEventStream.new + #get node inventory + nodeInventory["items"].each do |items| + record = {} + # Sending records for ContainerNodeInventory + containerNodeInventoryRecord = {} + containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + containerNodeInventoryRecord["Computer"] = items["metadata"]["name"] - # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. - # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we - # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" - # implying that the node is ready for hosting pods, however its out of disk. - - if items["status"].key?("conditions") && !items["status"]["conditions"].empty? - allNodeConditions = "" - items["status"]["conditions"].each do |condition| - if condition["status"] == "True" - if !allNodeConditions.empty? - allNodeConditions = allNodeConditions + "," + condition["type"] - else - allNodeConditions = condition["type"] - end - end - #collect last transition to/from ready (no matter ready is true/false) - if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? 
- record["LastTransitionTimeReady"] = condition["lastTransitionTime"] - end - end + record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + record["Computer"] = items["metadata"]["name"] + record["ClusterName"] = KubernetesApiClient.getClusterName + record["ClusterId"] = KubernetesApiClient.getClusterId + record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"] + record["Labels"] = [items["metadata"]["labels"]] + record["Status"] = "" + + if !items["spec"]["providerID"].nil? && !items["spec"]["providerID"].empty? + if File.file?(@@AzStackCloudFileName) # existence of this file indicates agent running on azstack + record["KubernetesProviderID"] = "azurestack" + else + record["KubernetesProviderID"] = items["spec"]["providerID"] + end + else + record["KubernetesProviderID"] = "onprem" + end + + # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions. + # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we + # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk" + # implying that the node is ready for hosting pods, however its out of disk. + + if items["status"].key?("conditions") && !items["status"]["conditions"].empty? + allNodeConditions = "" + items["status"]["conditions"].each do |condition| + if condition["status"] == "True" if !allNodeConditions.empty? - record["Status"] = allNodeConditions + allNodeConditions = allNodeConditions + "," + condition["type"] + else + allNodeConditions = condition["type"] end end - - nodeInfo = items["status"]["nodeInfo"] - record["KubeletVersion"] = nodeInfo["kubeletVersion"] - record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] - containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] - dockerVersion = nodeInfo["containerRuntimeVersion"] - dockerVersion.slice! 
"docker://" - containerNodeInventoryRecord["DockerVersion"] = dockerVersion - # ContainerNodeInventory data for docker version and operating system. - containerNodeInventoryWrapper = { - "DataType" => "CONTAINER_NODE_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }], - } - containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper - - wrapper = { - "DataType" => "KUBE_NODE_INVENTORY_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [record.each { |k, v| record[k] = v }], - } - eventStream.add(emitTime, wrapper) if wrapper - # Adding telemetry to send node telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 10) - properties = {} - properties["Computer"] = record["Computer"] - properties["KubeletVersion"] = record["KubeletVersion"] - properties["OperatingSystem"] = nodeInfo["operatingSystem"] - properties["DockerVersion"] = dockerVersion - properties["KubernetesProviderID"] = record["KubernetesProviderID"] - properties["KernelVersion"] = nodeInfo["kernelVersion"] - properties["OSImage"] = nodeInfo["osImage"] - - capacityInfo = items["status"]["capacity"] - ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) - - #telemetry about prometheus metric collections settings for replicaset - if (File.file?(@@promConfigMountPath)) - properties["rsPromInt"] = @@rsPromInterval - properties["rsPromFPC"] = @@rsPromFieldPassCount - properties["rsPromFDC"] = @@rsPromFieldDropCount - properties["rsPromServ"] = @@rsPromK8sServiceCount - properties["rsPromUrl"] = @@rsPromUrlCount - properties["rsPromMonPods"] = @@rsPromMonitorPods - properties["rsPromMonPodsNs"] = @@rsPromMonitorPodsNamespaceLength - end - 
ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties) - telemetrySent = true + #collect last transition to/from ready (no matter ready is true/false) + if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil? + record["LastTransitionTimeReady"] = condition["lastTransitionTime"] end end + if !allNodeConditions.empty? + record["Status"] = allNodeConditions + end end - router.emit_stream(@tag, eventStream) if eventStream - router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream - router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream - if telemetrySent == true - @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i + + nodeInfo = items["status"]["nodeInfo"] + record["KubeletVersion"] = nodeInfo["kubeletVersion"] + record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"] + containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"] + dockerVersion = nodeInfo["containerRuntimeVersion"] + dockerVersion.slice! "docker://" + containerNodeInventoryRecord["DockerVersion"] = dockerVersion + # ContainerNodeInventory data for docker version and operating system. 
+ containerNodeInventoryWrapper = { + "DataType" => "CONTAINER_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }], + } + containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper + + wrapper = { + "DataType" => "KUBE_NODE_INVENTORY_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [record.each { |k, v| record[k] = v }], + } + eventStream.add(emitTime, wrapper) if wrapper + # Adding telemetry to send node telemetry every 10 minutes + timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 10) + properties = {} + properties["Computer"] = record["Computer"] + properties["KubeletVersion"] = record["KubeletVersion"] + properties["OperatingSystem"] = nodeInfo["operatingSystem"] + properties["DockerVersion"] = dockerVersion + properties["KubernetesProviderID"] = record["KubernetesProviderID"] + properties["KernelVersion"] = nodeInfo["kernelVersion"] + properties["OSImage"] = nodeInfo["osImage"] + + capacityInfo = items["status"]["capacity"] + ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties) + + #telemetry about prometheus metric collections settings for replicaset + if (File.file?(@@promConfigMountPath)) + properties["rsPromInt"] = @@rsPromInterval + properties["rsPromFPC"] = @@rsPromFieldPassCount + properties["rsPromFDC"] = @@rsPromFieldDropCount + properties["rsPromServ"] = @@rsPromK8sServiceCount + properties["rsPromUrl"] = @@rsPromUrlCount + properties["rsPromMonPods"] = @@rsPromMonitorPods + properties["rsPromMonPodsNs"] = @@rsPromMonitorPodsNamespaceLength + end + ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties) + telemetrySent = true end - @@istestvar = ENV["ISTEST"] - if (!@@istestvar.nil? 
&& !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) - $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + router.emit_stream(@tag, eventStream) if eventStream + router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream + router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream + if telemetrySent == true + @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i + end + @@istestvar = ENV["ISTEST"] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0) + $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + #:optimize:kubeperf merge + begin + #if(!nodeInventory.empty?) + nodeMetricDataItems = [] + #allocatable metrics @ node level + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime)) + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "memory", "memoryAllocatableBytes", batchTime)) + #capacity metrics @ node level + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores", batchTime)) + nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes", batchTime)) + + kubePerfEventStream = MultiEventStream.new + + nodeMetricDataItems.each do |record| + record["DataType"] = "LINUX_PERF_BLOB" + record["IPName"] = "LogManagement" + kubePerfEventStream.add(emitTime, record) if record end + #end + router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + rescue => errorStr + $log.warn "Failed in enumerate for KubePerf from in_kube_nodes : #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end + #:optimize:end kubeperf 
merge + rescue => errorStr $log.warn "Failed to retrieve node inventory: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end + $log.warn "in_kube_nodes::parse_and_emit_records:End #{Time.now.utc.iso8601}" end def run_periodic @mutex.lock done = @finished + @nextTimeToRun = Time.now + @waitTimeout = @run_interval until done - @condition.wait(@mutex, @run_interval) + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished @mutex.unlock if !done begin - $log.info("in_kube_nodes::run_periodic @ #{Time.now.utc.iso8601}") + $log.info("in_kube_nodes::run_periodic.enumerate.start #{Time.now.utc.iso8601}") enumerate + $log.info("in_kube_nodes::run_periodic.enumerate.end #{Time.now.utc.iso8601}") rescue => errorStr $log.warn "in_kube_nodes::run_periodic: enumerate Failed to retrieve node inventory: #{errorStr}" ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) diff --git a/source/code/plugin/in_kube_perf.rb b/source/code/plugin/in_kube_perf.rb deleted file mode 100644 index 8b571139d..000000000 --- a/source/code/plugin/in_kube_perf.rb +++ /dev/null @@ -1,120 +0,0 @@ -#!/usr/local/bin/ruby -# frozen_string_literal: true - -module Fluent - - class Kube_Perf_Input < Input - Plugin.register_input('kubeperf', self) - - def initialize - super - require 'yaml' - require 'json' - - require_relative 'KubernetesApiClient' - require_relative 'oms_common' - require_relative 'omslog' - end - - config_param :run_interval, :time, :default => '1m' - config_param :tag, :string, :default => "oms.api.KubePerf" - - def configure (conf) - super - end - - def start - if @run_interval - @finished = false - @condition = ConditionVariable.new - @mutex = Mutex.new - @thread = Thread.new(&method(:run_periodic)) - end - end - - def shutdown 
- if @run_interval - @mutex.synchronize { - @finished = true - @condition.signal - } - @thread.join - end - end - - def enumerate() - time = Time.now.to_f - begin - eventStream = MultiEventStream.new - - $log.info("in_kube_perf::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - #get resource requests & resource limits per container as perf data - podInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('pods').body) - $log.info("in_kube_perf::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") - if(!podInventory.empty?) - containerMetricDataItems = [] - hostName = (OMS::Common.get_hostname) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu","cpuRequestNanoCores")) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory","memoryRequestBytes")) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu","cpuLimitNanoCores")) - containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory","memoryLimitBytes")) - - containerMetricDataItems.each do |record| - record['DataType'] = "LINUX_PERF_BLOB" - record['IPName'] = "LogManagement" - eventStream.add(time, record) if record - #router.emit(@tag, time, record) if record - end - end - - #get allocatable limits per node as perf data - # Node capacity is different from node allocatable. Allocatable is what is avaialble for allocating pods. 
- # In theory Capacity = Allocatable + kube-reserved + system-reserved + eviction-threshold - # For more details refer to https://kubernetes.io/docs/tasks/administer-cluster/reserve-compute-resources/#node-allocatable - $log.info("in_kube_perf::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}") - nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body) - $log.info("in_kube_perf::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}") - if(!nodeInventory.empty?) - nodeMetricDataItems = [] - #allocatable metrics @ node level - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores")) - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "memory", "memoryAllocatableBytes")) - #capacity metrics @ node level - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores")) - nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes")) - - nodeMetricDataItems.each do |record| - record['DataType'] = "LINUX_PERF_BLOB" - record['IPName'] = "LogManagement" - eventStream.add(time, record) if record - #router.emit(@tag, time, record) if record - end - end - router.emit_stream(@tag, eventStream) if eventStream - rescue => errorStr - $log.warn "Failed to retrieve metric data: #{errorStr}" - $log.debug_backtrace(errorStr.backtrace) - end - end - - def run_periodic - @mutex.lock - done = @finished - until done - @condition.wait(@mutex, @run_interval) - done = @finished - @mutex.unlock - if !done - begin - $log.info("in_kube_perf::run_periodic @ #{Time.now.utc.iso8601}") - enumerate - rescue => errorStr - $log.warn "in_kube_perf::run_periodic: enumerate Failed to retrieve kube perf metrics: #{errorStr}" - end - end - @mutex.lock - end - @mutex.unlock - end - end # Kube_Perf_Input -end # module 
diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 766831a66..28b20bfc0 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -7,20 +7,30 @@ class Kube_PodInventory_Input < Input @@MDMKubePodInventoryTag = "mdm.kubepodinventory" @@hostName = (OMS::Common.get_hostname) + @@kubeperfTag = "oms.api.KubePerf" + @@kubeservicesTag = "oms.containerinsights.KubeServices" def initialize super require "yaml" - require "json" + require "yajl/json_gem" + require "yajl" require "set" + require "time" require_relative "KubernetesApiClient" require_relative "ApplicationInsightsUtility" require_relative "oms_common" require_relative "omslog" + + @PODS_CHUNK_SIZE = "1500" + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} end - config_param :run_interval, :time, :default => "1m" + config_param :run_interval, :time, :default => 60 config_param :tag, :string, :default => "oms.containerinsights.KubePodInventory" def configure(conf) @@ -48,27 +58,77 @@ def shutdown end def enumerate(podList = nil) - podInventory = podList - $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") - podInfo = KubernetesApiClient.getKubeResourceInfo("pods") - $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + begin + podInventory = podList + telemetryFlush = false + @podCount = 0 + @controllerSet = Set.new [] + @winContainerCount = 0 + @controllerData = {} + currentTime = Time.now + batchTime = currentTime.utc.iso8601 - if !podInfo.nil? 
- podInventory = JSON.parse(podInfo.body) - end + # Get services first so that we dont need to make a call for very chunk + $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") + serviceInfo = KubernetesApiClient.getKubeResourceInfo("services") + # serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) + $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") - begin - if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?) - #get pod inventory & services - $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") - serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body) - $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") - parse_and_emit_records(podInventory, serviceList) + if !serviceInfo.nil? + $log.info("in_kube_podinventory::enumerate:Start:Parsing services data using yajl @ #{Time.now.utc.iso8601}") + serviceList = Yajl::Parser.parse(StringIO.new(serviceInfo.body)) + $log.info("in_kube_podinventory::enumerate:End:Parsing services data using yajl @ #{Time.now.utc.iso8601}") + serviceInfo = nil + end + + # Initializing continuation token to nil + continuationToken = nil + $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}") + continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}") + $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}") + if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) 
+ parse_and_emit_records(podInventory, serviceList, batchTime) else - $log.warn "Received empty podInventory" + $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" + end + + #If we receive a continuation token, make calls, process and flush data until we have processed all data + while (!continuationToken.nil? && !continuationToken.empty?) + continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}") + if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?) + parse_and_emit_records(podInventory, serviceList, batchTime) + else + $log.warn "in_kube_podinventory::enumerate:Received empty podInventory" + end + end + + # Setting these to nil so that we dont hold memory until GC kicks in + podInventory = nil + serviceList = nil + + # Adding telemetry to send pod telemetry every 5 minutes + timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference / 60 + if (timeDifferenceInMinutes >= 5) + telemetryFlush = true + end + + # Flush AppInsights telemetry once all the processing is done + if telemetryFlush == true + telemetryProperties = {} + telemetryProperties["Computer"] = @@hostName + ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) + ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {}) + telemetryProperties["ControllerData"] = @controllerData.to_json + ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties) + if @winContainerCount > 0 + telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount + ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + end + @@podTelemetryTimeTracker = DateTime.now.to_time.to_i end rescue => errorStr - 
$log.warn "Failed in enumerate pod inventory: #{errorStr}" + $log.warn "in_kube_podinventory::enumerate:Failed in enumerate: #{errorStr}" $log.debug_backtrace(errorStr.backtrace) ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end @@ -186,15 +246,12 @@ def getContainerEnvironmentVariables(pod, clusterCollectEnvironmentVar) end end - def parse_and_emit_records(podInventory, serviceList) + def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8601) currentTime = Time.now emitTime = currentTime.to_f - batchTime = currentTime.utc.iso8601 + #batchTime = currentTime.utc.iso8601 eventStream = MultiEventStream.new - controllerSet = Set.new [] - controllerData = {} - telemetryFlush = false - winContainerCount = 0 + begin #begin block start # Getting windows nodes from kubeapi winNodes = KubernetesApiClient.getWindowsNodesArray @@ -277,24 +334,17 @@ def parse_and_emit_records(podInventory, serviceList) record["ClusterId"] = KubernetesApiClient.getClusterId record["ClusterName"] = KubernetesApiClient.getClusterName record["ServiceName"] = getServiceNameFromLabels(items["metadata"]["namespace"], items["metadata"]["labels"], serviceList) - # Adding telemetry to send pod telemetry every 5 minutes - timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs - timeDifferenceInMinutes = timeDifference / 60 - if (timeDifferenceInMinutes >= 5) - telemetryFlush = true - end + if !items["metadata"]["ownerReferences"].nil? record["ControllerKind"] = items["metadata"]["ownerReferences"][0]["kind"] record["ControllerName"] = items["metadata"]["ownerReferences"][0]["name"] - if telemetryFlush == true - controllerSet.add(record["ControllerKind"] + record["ControllerName"]) - #Adding controller kind to telemetry ro information about customer workload - if (controllerData[record["ControllerKind"]].nil?) 
- controllerData[record["ControllerKind"]] = 1 - else - controllerValue = controllerData[record["ControllerKind"]] - controllerData[record["ControllerKind"]] += 1 - end + @controllerSet.add(record["ControllerKind"] + record["ControllerName"]) + #Adding controller kind to telemetry ro information about customer workload + if (@controllerData[record["ControllerKind"]].nil?) + @controllerData[record["ControllerKind"]] = 1 + else + controllerValue = @controllerData[record["ControllerKind"]] + @controllerData[record["ControllerKind"]] += 1 end end podRestartCount = 0 @@ -412,7 +462,7 @@ def parse_and_emit_records(podInventory, serviceList) end end # Send container inventory records for containers on windows nodes - winContainerCount += containerInventoryRecords.length + @winContainerCount += containerInventoryRecords.length containerInventoryRecords.each do |cirecord| if !cirecord.nil? ciwrapper = { @@ -427,19 +477,66 @@ def parse_and_emit_records(podInventory, serviceList) router.emit_stream(@tag, eventStream) if eventStream router.emit_stream(@@MDMKubePodInventoryTag, eventStream) if eventStream - if telemetryFlush == true - telemetryProperties = {} - telemetryProperties["Computer"] = @@hostName - ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties) - ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory["items"].length, {}) - telemetryProperties["ControllerData"] = controllerData.to_json - ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length, telemetryProperties) - if winContainerCount > 0 - telemetryProperties["ClusterWideWindowsContainersCount"] = winContainerCount - ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties) + #:optimize:kubeperf merge + begin + #if(!podInventory.empty?) 
+ containerMetricDataItems = [] + #hostName = (OMS::Common.get_hostname) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu", "cpuRequestNanoCores", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory", "memoryRequestBytes", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu", "cpuLimitNanoCores", batchTime)) + containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory", "memoryLimitBytes", batchTime)) + + kubePerfEventStream = MultiEventStream.new + + containerMetricDataItems.each do |record| + record["DataType"] = "LINUX_PERF_BLOB" + record["IPName"] = "LogManagement" + kubePerfEventStream.add(emitTime, record) if record end - @@podTelemetryTimeTracker = DateTime.now.to_time.to_i + #end + router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream + rescue => errorStr + $log.warn "Failed in parse_and_emit_record for KubePerf from in_kube_podinventory : #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) end + #:optimize:end kubeperf merge + + #:optimize:start kubeservices merge + begin + if (!serviceList.nil? && !serviceList.empty?) 
+ kubeServicesEventStream = MultiEventStream.new + serviceList["items"].each do |items| + kubeServiceRecord = {} + kubeServiceRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated + kubeServiceRecord["ServiceName"] = items["metadata"]["name"] + kubeServiceRecord["Namespace"] = items["metadata"]["namespace"] + kubeServiceRecord["SelectorLabels"] = [items["spec"]["selector"]] + kubeServiceRecord["ClusterId"] = KubernetesApiClient.getClusterId + kubeServiceRecord["ClusterName"] = KubernetesApiClient.getClusterName + kubeServiceRecord["ClusterIP"] = items["spec"]["clusterIP"] + kubeServiceRecord["ServiceType"] = items["spec"]["type"] + # : Add ports and status fields + kubeServicewrapper = { + "DataType" => "KUBE_SERVICES_BLOB", + "IPName" => "ContainerInsights", + "DataItems" => [kubeServiceRecord.each { |k, v| kubeServiceRecord[k] = v }], + } + kubeServicesEventStream.add(emitTime, kubeServicewrapper) if kubeServicewrapper + end + router.emit_stream(@@kubeservicesTag, kubeServicesEventStream) if kubeServicesEventStream + end + rescue => errorStr + $log.warn "Failed in parse_and_emit_record for KubeServices from in_kube_podinventory : #{errorStr}" + $log.debug_backtrace(errorStr.backtrace) + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + #:optimize:end kubeservices merge + + #Updating value for AppInsights telemetry + @podCount += podInventory["items"].length + @@istestvar = ENV["ISTEST"] if (!@@istestvar.nil? && !@@istestvar.empty? 
&& @@istestvar.casecmp("true") == 0 && eventStream.count > 0) $log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") @@ -454,14 +551,25 @@ def parse_and_emit_records(podInventory, serviceList) def run_periodic @mutex.lock done = @finished + @nextTimeToRun = Time.now + @waitTimeout = @run_interval until done - @condition.wait(@mutex, @run_interval) + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished @mutex.unlock if !done begin - $log.info("in_kube_podinventory::run_periodic @ #{Time.now.utc.iso8601}") + $log.info("in_kube_podinventory::run_periodic.enumerate.start #{Time.now.utc.iso8601}") enumerate + $log.info("in_kube_podinventory::run_periodic.enumerate.end #{Time.now.utc.iso8601}") rescue => errorStr $log.warn "in_kube_podinventory::run_periodic: enumerate Failed to retrieve pod inventory: #{errorStr}" ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) diff --git a/source/code/plugin/in_kube_services.rb b/source/code/plugin/in_kube_services.rb deleted file mode 100644 index 7cd703620..000000000 --- a/source/code/plugin/in_kube_services.rb +++ /dev/null @@ -1,110 +0,0 @@ -#!/usr/local/bin/ruby -# frozen_string_literal: true - -module Fluent - class Kube_Services_Input < Input - Plugin.register_input("kubeservices", self) - - def initialize - super - require "yaml" - require "json" - - require_relative "KubernetesApiClient" - require_relative "oms_common" - require_relative "omslog" - require_relative "ApplicationInsightsUtility" - end - - config_param :run_interval, :time, :default => "1m" - config_param :tag, :string, :default => "oms.containerinsights.KubeServices" - - def configure(conf) - super - end - - def start - if @run_interval - @finished = false - @condition = ConditionVariable.new - @mutex = Mutex.new - @thread = 
Thread.new(&method(:run_periodic)) - end - end - - def shutdown - if @run_interval - @mutex.synchronize { - @finished = true - @condition.signal - } - @thread.join - end - end - - def enumerate - currentTime = Time.now - emitTime = currentTime.to_f - batchTime = currentTime.utc.iso8601 - - serviceList = nil - - $log.info("in_kube_services::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}") - serviceInfo = KubernetesApiClient.getKubeResourceInfo("services") - $log.info("in_kube_services::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}") - - if !serviceInfo.nil? - serviceList = JSON.parse(serviceInfo.body) - end - - begin - if (!serviceList.nil? && !serviceList.empty?) - eventStream = MultiEventStream.new - serviceList["items"].each do |items| - record = {} - record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated - record["ServiceName"] = items["metadata"]["name"] - record["Namespace"] = items["metadata"]["namespace"] - record["SelectorLabels"] = [items["spec"]["selector"]] - record["ClusterId"] = KubernetesApiClient.getClusterId - record["ClusterName"] = KubernetesApiClient.getClusterName - record["ClusterIP"] = items["spec"]["clusterIP"] - record["ServiceType"] = items["spec"]["type"] - # : Add ports and status fields - wrapper = { - "DataType" => "KUBE_SERVICES_BLOB", - "IPName" => "ContainerInsights", - "DataItems" => [record.each { |k, v| record[k] = v }], - } - eventStream.add(emitTime, wrapper) if wrapper - end - router.emit_stream(@tag, eventStream) if eventStream - end - rescue => errorStr - $log.debug_backtrace(errorStr.backtrace) - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - end - - def run_periodic - @mutex.lock - done = @finished - until done - @condition.wait(@mutex, @run_interval) - done = @finished - @mutex.unlock - if !done - begin - $log.info("in_kube_services::run_periodic @ #{Time.now.utc.iso8601}") - enumerate - rescue => errorStr - 
$log.warn "in_kube_services::run_periodic: enumerate Failed to kube services: #{errorStr}" - ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) - end - end - @mutex.lock - end - @mutex.unlock - end - end # Kube_Services_Input -end # module diff --git a/source/code/plugin/in_win_cadvisor_perf.rb b/source/code/plugin/in_win_cadvisor_perf.rb index 2e5f839e6..695a686cf 100644 --- a/source/code/plugin/in_win_cadvisor_perf.rb +++ b/source/code/plugin/in_win_cadvisor_perf.rb @@ -10,7 +10,8 @@ class Win_CAdvisor_Perf_Input < Input def initialize super require "yaml" - require "json" + require 'yajl/json_gem' + require "time" require_relative "CAdvisorMetricsAPIClient" require_relative "KubernetesApiClient" @@ -18,7 +19,7 @@ def initialize require_relative "omslog" end - config_param :run_interval, :time, :default => "1m" + config_param :run_interval, :time, :default => 60 config_param :tag, :string, :default => "oms.api.wincadvisorperf" config_param :mdmtag, :string, :default => "mdm.cadvisorperf" @@ -60,13 +61,13 @@ def enumerate() $log.info "in_win_cadvisor_perf: Getting windows nodes" nodes = KubernetesApiClient.getWindowsNodes() if !nodes.nil? - @@winNodes = KubernetesApiClient.getWindowsNodes() + @@winNodes = nodes end $log.info "in_win_cadvisor_perf : Successuly got windows nodes after 5 minute interval" @@winNodeQueryTimeTracker = DateTime.now.to_time.to_i end @@winNodes.each do |winNode| - metricData = CAdvisorMetricsAPIClient.getMetrics(winNode) + metricData = CAdvisorMetricsAPIClient.getMetrics(winNode: winNode, metricTime: Time.now.utc.iso8601) metricData.each do |record| if !record.empty? 
record["DataType"] = "LINUX_PERF_BLOB" @@ -100,14 +101,25 @@ def enumerate() def run_periodic @mutex.lock done = @finished + @nextTimeToRun = Time.now + @waitTimeout = @run_interval until done - @condition.wait(@mutex, @run_interval) + @nextTimeToRun = @nextTimeToRun + @run_interval + @now = Time.now + if @nextTimeToRun <= @now + @waitTimeout = 1 + @nextTimeToRun = @now + else + @waitTimeout = @nextTimeToRun - @now + end + @condition.wait(@mutex, @waitTimeout) done = @finished @mutex.unlock if !done begin - $log.info("in_win_cadvisor_perf::run_periodic @ #{Time.now.utc.iso8601}") + $log.info("in_win_cadvisor_perf::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}") enumerate + $log.info("in_win_cadvisor_perf::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}") rescue => errorStr $log.warn "in_win_cadvisor_perf::run_periodic: enumerate Failed to retrieve cadvisor perf metrics for windows nodes: #{errorStr}" end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb b/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb index 8f4677044..60838e215 100644 --- a/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb +++ b/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb @@ -1,4 +1,4 @@ -require 'json' +require 'yajl/json_gem' module ApplicationInsights module Channel diff --git a/source/code/plugin/lib/application_insights/channel/sender_base.rb b/source/code/plugin/lib/application_insights/channel/sender_base.rb index 2431bf748..004b4722f 100644 --- a/source/code/plugin/lib/application_insights/channel/sender_base.rb +++ b/source/code/plugin/lib/application_insights/channel/sender_base.rb @@ -1,4 +1,4 @@ -require 'json' +require 'yajl/json_gem' require 'net/http' require 'openssl' require 'stringio' diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb index b8d10090d..0a4e601b2 100644 --- 
a/source/code/plugin/out_mdm.rb +++ b/source/code/plugin/out_mdm.rb @@ -12,7 +12,7 @@ def initialize require "net/http" require "net/https" require "uri" - require "json" + require 'yajl/json_gem' require_relative "KubernetesApiClient" require_relative "ApplicationInsightsUtility"