diff --git a/installer/conf/container.conf b/installer/conf/container.conf
old mode 100755
new mode 100644
index 696ffdb6b..93c250fbb
--- a/installer/conf/container.conf
+++ b/installer/conf/container.conf
@@ -11,7 +11,7 @@
type containerinventory
tag oms.containerinsights.containerinventory
- run_interval 60s
+ run_interval 60
log_level debug
@@ -19,7 +19,7 @@
type cadvisorperf
tag oms.api.cadvisorperf
- run_interval 60s
+ run_interval 60
log_level debug
@@ -45,30 +45,28 @@
type out_oms
log_level debug
num_threads 5
- buffer_chunk_limit 20m
buffer_type file
buffer_path %STATE_DIR_WS%/out_oms_containerinventory*.buffer
- buffer_queue_limit 20
buffer_queue_full_action drop_oldest_chunk
+ buffer_chunk_limit 4m
flush_interval 20s
retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
+ retry_wait 5s
+ max_retry_wait 5m
type out_oms
log_level debug
num_threads 5
- buffer_chunk_limit 20m
buffer_type file
buffer_path %STATE_DIR_WS%/out_oms_cadvisorperf*.buffer
- buffer_queue_limit 20
buffer_queue_full_action drop_oldest_chunk
+ buffer_chunk_limit 4m
flush_interval 20s
retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
+ retry_wait 5s
+ max_retry_wait 5m
@@ -80,6 +78,14 @@
heartbeat_type tcp
skip_network_error_at_init true
expire_dns_cache 600s
+ buffer_queue_full_action drop_oldest_chunk
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_health_forward*.buffer
+ buffer_chunk_limit 3m
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
host "#{ENV['HEALTHMODEL_REPLICASET_SERVICE_SERVICE_HOST']}"
@@ -96,14 +102,13 @@
type out_mdm
log_level debug
num_threads 5
- buffer_chunk_limit 20m
buffer_type file
buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer
- buffer_queue_limit 20
buffer_queue_full_action drop_oldest_chunk
+ buffer_chunk_limit 4m
flush_interval 20s
retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
+ retry_wait 5s
+ max_retry_wait 5m
retry_mdm_post_wait_minutes 60
diff --git a/installer/conf/kube.conf b/installer/conf/kube.conf
index 49d0bf62e..207780442 100644
--- a/installer/conf/kube.conf
+++ b/installer/conf/kube.conf
@@ -1,250 +1,218 @@
# Fluentd config file for OMS Docker - cluster components (kubeAPI)
-
- type forward
- port "#{ENV['HEALTHMODEL_REPLICASET_SERVICE_SERVICE_PORT']}"
- bind 0.0.0.0
-
-
-#Kubernetes pod inventory
-
- type kubepodinventory
- tag oms.containerinsights.KubePodInventory
- run_interval 60s
- log_level debug
-
-
-#Kubernetes events
-
- type kubeevents
- tag oms.containerinsights.KubeEvents
- run_interval 60s
- log_level debug
-
-
-#Kubernetes logs
-
- type kubelogs
- tag oms.api.KubeLogs
- run_interval 60s
-
-
-#Kubernetes services
-
- type kubeservices
- tag oms.containerinsights.KubeServices
- run_interval 60s
- log_level debug
-
-
-#Kubernetes Nodes
-
- type kubenodeinventory
- tag oms.containerinsights.KubeNodeInventory
- run_interval 60s
- log_level debug
-
-
-#Kubernetes perf
-
- type kubeperf
- tag oms.api.KubePerf
- run_interval 60s
- log_level debug
-
-
-#Kubernetes health
-
- type kubehealth
- tag kubehealth.ReplicaSet
- run_interval 60s
- log_level debug
-
-
-#cadvisor perf- Windows nodes
-
- type wincadvisorperf
- tag oms.api.wincadvisorperf
- run_interval 60s
- log_level debug
-
-
-
- type filter_inventory2mdm
- custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast
- log_level info
-
-
-#custom_metrics_mdm filter plugin for perf data from windows nodes
-
- type filter_cadvisor2mdm
- custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast
- metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes
- log_level info
-
-
-
- type filter_health_model_builder
-
-
- type out_mdm
- log_level debug
- num_threads 5
- buffer_chunk_limit 20m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer
- buffer_queue_limit 20
- buffer_queue_full_action drop_oldest_chunk
- flush_interval 20s
- retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
- retry_mdm_post_wait_minutes 60
-
-
-
- type out_oms
- log_level debug
- num_threads 5
- buffer_chunk_limit 20m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_oms_kubepods*.buffer
- buffer_queue_limit 20
- buffer_queue_full_action drop_oldest_chunk
- flush_interval 20s
- retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
-
-
-
- type out_oms
- log_level debug
- num_threads 5
- buffer_chunk_limit 5m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_oms_kubeevents*.buffer
- buffer_queue_limit 10
- buffer_queue_full_action drop_oldest_chunk
- flush_interval 20s
- retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
-
-
-
- type out_oms_api
- log_level debug
- buffer_chunk_limit 10m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_oms_api_kubernetes_logs*.buffer
- buffer_queue_limit 10
- flush_interval 20s
- retry_limit 10
- retry_wait 30s
-
-
-
-
-
- type out_oms
- log_level debug
- num_threads 5
- buffer_chunk_limit 20m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_oms_kubeservices*.buffer
- buffer_queue_limit 20
- buffer_queue_full_action drop_oldest_chunk
- flush_interval 20s
- retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
-
-
-
- type out_oms
- log_level debug
- num_threads 5
- buffer_chunk_limit 20m
- buffer_type file
- buffer_path %STATE_DIR_WS%/state/out_oms_kubenodes*.buffer
- buffer_queue_limit 20
- buffer_queue_full_action drop_oldest_chunk
- flush_interval 20s
- retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
-
-
-
- type out_oms
- log_level debug
- buffer_chunk_limit 20m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer
- buffer_queue_limit 20
- flush_interval 20s
- retry_limit 10
- retry_wait 15s
- max_retry_wait 9m
-
-
-
- type out_oms
- log_level debug
- num_threads 5
- buffer_chunk_limit 20m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_oms_kubeperf*.buffer
- buffer_queue_limit 20
- buffer_queue_full_action drop_oldest_chunk
- flush_interval 20s
- retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
-
-
-
- type out_mdm
- log_level debug
- num_threads 5
- buffer_chunk_limit 20m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_mdm_*.buffer
- buffer_queue_limit 20
- buffer_queue_full_action drop_oldest_chunk
- flush_interval 20s
- retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
- retry_mdm_post_wait_minutes 60
-
-
-
- type out_oms
- log_level debug
- num_threads 5
- buffer_chunk_limit 20m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_oms_api_wincadvisorperf*.buffer
- buffer_queue_limit 20
- buffer_queue_full_action drop_oldest_chunk
- flush_interval 20s
- retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
-
-
-
- type out_oms
- log_level debug
- num_threads 5
- buffer_chunk_limit 20m
- buffer_type file
- buffer_path %STATE_DIR_WS%/out_oms_kubehealth*.buffer
- buffer_queue_limit 20
- buffer_queue_full_action drop_oldest_chunk
- flush_interval 20s
- retry_limit 10
- retry_wait 30s
- max_retry_wait 9m
-
+ #fluent forward plugin
+
+ type forward
+ port "#{ENV['HEALTHMODEL_REPLICASET_SERVICE_SERVICE_PORT']}"
+ bind 0.0.0.0
+ chunk_size_limit 4m
+
+
+ #Kubernetes pod inventory
+
+ type kubepodinventory
+ tag oms.containerinsights.KubePodInventory
+ run_interval 60
+ log_level debug
+
+
+ #Kubernetes events
+
+ type kubeevents
+ tag oms.containerinsights.KubeEvents
+ run_interval 60
+ log_level debug
+
+
+ #Kubernetes Nodes
+
+ type kubenodeinventory
+ tag oms.containerinsights.KubeNodeInventory
+ run_interval 60
+ log_level debug
+
+
+ #Kubernetes health
+
+ type kubehealth
+ tag kubehealth.ReplicaSet
+ run_interval 60
+ log_level debug
+
+
+ #cadvisor perf- Windows nodes
+
+ type wincadvisorperf
+ tag oms.api.wincadvisorperf
+ run_interval 60
+ log_level debug
+
+
+
+ type filter_inventory2mdm
+ custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast
+ log_level info
+
+
+ #custom_metrics_mdm filter plugin for perf data from windows nodes
+
+ type filter_cadvisor2mdm
+ custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast
+ metrics_to_collect cpuUsageNanoCores,memoryWorkingSetBytes
+ log_level info
+
+
+ #health model aggregation filter
+
+ type filter_health_model_builder
+
+
+
+ type out_oms
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_oms_kubepods*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+
+
+
+ type out_oms
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_oms_kubeevents*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+
+
+
+ type out_oms
+ log_level debug
+ num_threads 2
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_oms_kubeservices*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+
+
+
+ type out_oms
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/state/out_oms_kubenodes*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+
+
+
+ type out_oms
+ log_level debug
+ num_threads 3
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_oms_containernodeinventory*.buffer
+ buffer_queue_limit 20
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+
+
+
+ type out_oms
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_oms_kubeperf*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+
+
+
+ type out_mdm
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_mdm_*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+ retry_mdm_post_wait_minutes 60
+
+
+
+ type out_oms
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_oms_api_wincadvisorperf*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+
+
+
+ type out_mdm
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_mdm_cdvisorperf*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+ retry_mdm_post_wait_minutes 60
+
+
+
+ type out_oms
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/out_oms_kubehealth*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+
\ No newline at end of file
diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data
index 4ebc4f338..60de5af18 100644
--- a/installer/datafiles/base_container.data
+++ b/installer/datafiles/base_container.data
@@ -26,16 +26,13 @@ MAINTAINER: 'Microsoft Corporation'
/opt/microsoft/omsagent/plugin/in_kube_podinventory.rb; source/code/plugin/in_kube_podinventory.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_events.rb; source/code/plugin/in_kube_events.rb; 644; root; root
-/opt/microsoft/omsagent/plugin/in_kube_logs.rb; source/code/plugin/in_kube_logs.rb; 644; root; root
/opt/microsoft/omsagent/plugin/KubernetesApiClient.rb; source/code/plugin/KubernetesApiClient.rb; 644; root; root
/etc/opt/microsoft/docker-cimprov/container.conf; installer/conf/container.conf; 644; root; root
/opt/microsoft/omsagent/plugin/CAdvisorMetricsAPIClient.rb; source/code/plugin/CAdvisorMetricsAPIClient.rb; 644; root; root
-/opt/microsoft/omsagent/plugin/in_kube_perf.rb; source/code/plugin/in_kube_perf.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_cadvisor_perf.rb; source/code/plugin/in_cadvisor_perf.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_win_cadvisor_perf.rb; source/code/plugin/in_win_cadvisor_perf.rb; 644; root; root
-/opt/microsoft/omsagent/plugin/in_kube_services.rb; source/code/plugin/in_kube_services.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_nodes.rb; source/code/plugin/in_kube_nodes.rb; 644; root; root
/opt/microsoft/omsagent/plugin/filter_inventory2mdm.rb; source/code/plugin/filter_inventory2mdm.rb; 644; root; root
/opt/microsoft/omsagent/plugin/CustomMetricsUtils.rb; source/code/plugin/CustomMetricsUtils.rb; 644; root; root
@@ -143,12 +140,10 @@ MAINTAINER: 'Microsoft Corporation'
/opt/microsoft/omsagent/plugin/health/health_model_definition_parser.rb; source/code/plugin/health/health_model_definition_parser.rb; 644; root; root
/opt/microsoft/omsagent/plugin/health/health_monitor_helpers.rb; source/code/plugin/health/health_monitor_helpers.rb; 644; root; root
/opt/microsoft/omsagent/plugin/health/health_monitor_optimizer.rb; source/code/plugin/health/health_monitor_optimizer.rb; 644; root; root
-/opt/microsoft/omsagent/plugin/health/health_monitor_helpers.rb; source/code/plugin/health/health_monitor_helpers.rb; 644; root; root
/opt/microsoft/omsagent/plugin/health/health_monitor_provider.rb; source/code/plugin/health/health_monitor_provider.rb; 644; root; root
/opt/microsoft/omsagent/plugin/health/health_monitor_record.rb; source/code/plugin/health/health_monitor_record.rb; 644; root; root
/opt/microsoft/omsagent/plugin/health/health_monitor_state.rb; source/code/plugin/health/health_monitor_state.rb; 644; root; root
/opt/microsoft/omsagent/plugin/health/health_monitor_telemetry.rb; source/code/plugin/health/health_monitor_telemetry.rb; 644; root; root
-/opt/microsoft/omsagent/plugin/health/health_monitor_helpers.rb; source/code/plugin/health/health_monitor_helpers.rb; 644; root; root
/opt/microsoft/omsagent/plugin/health/health_monitor_utils.rb; source/code/plugin/health/health_monitor_utils.rb; 644; root; root
/opt/microsoft/omsagent/plugin/health/health_signal_reducer.rb; source/code/plugin/health/health_signal_reducer.rb; 644; root; root
/opt/microsoft/omsagent/plugin/health/monitor_factory.rb; source/code/plugin/health/monitor_factory.rb; 644; root; root
diff --git a/installer/scripts/livenessprobe.sh b/installer/scripts/livenessprobe.sh
index cb7e8a0ba..e957b4bdf 100644
--- a/installer/scripts/livenessprobe.sh
+++ b/installer/scripts/livenessprobe.sh
@@ -1,7 +1,7 @@
#!/bin/bash
#test to exit non zero value
-(ps -ef | grep omsagent | grep -v "grep") && (ps -ef | grep td-agent-bit | grep -v "grep")
+(ps -ef | grep omsagent- | grep -v "grep") && (ps -ef | grep td-agent-bit | grep -v "grep")
if [ $? -eq 0 ] && [ ! -s "inotifyoutput.txt" ]
then
# inotifyoutput file is empty and the grep commands for omsagent and td-agent-bit succeeded
diff --git a/installer/scripts/tomlparser.rb b/installer/scripts/tomlparser.rb
index cd16cbf9b..ba67d023a 100644
--- a/installer/scripts/tomlparser.rb
+++ b/installer/scripts/tomlparser.rb
@@ -15,6 +15,7 @@
@logTailPath = "/var/log/containers/*.log"
@logExclusionRegexPattern = "(^((?!stdout|stderr).)*$)"
@excludePath = "*.csv2" #some invalid path
+@enrichContainerLogs = false
# Use parser to parse the configmap toml file to a ruby structure
def parseConfigMap
@@ -117,6 +118,16 @@ def populateSettingValuesFromConfigMap(parsedConfig)
rescue => errorStr
ConfigParseErrorLogger.logError("Exception while reading config map settings for cluster level environment variable collection - #{errorStr}, using defaults, please check config map for errors")
end
+
+ #Get container log enrichment setting
+ begin
+ if !parsedConfig[:log_collection_settings][:enrich_container_logs].nil? && !parsedConfig[:log_collection_settings][:enrich_container_logs][:enabled].nil?
+ @enrichContainerLogs = parsedConfig[:log_collection_settings][:enrich_container_logs][:enabled]
+ puts "config::Using config map setting for cluster level container log enrichment"
+ end
+ rescue => errorStr
+ ConfigParseErrorLogger.logError("Exception while reading config map settings for cluster level container log enrichment - #{errorStr}, using defaults, please check config map for errors")
+ end
end
end
@@ -156,6 +167,7 @@ def populateSettingValuesFromConfigMap(parsedConfig)
file.write("export AZMON_STDERR_EXCLUDED_NAMESPACES=#{@stderrExcludeNamespaces}\n")
file.write("export AZMON_CLUSTER_COLLECT_ENV_VAR=#{@collectClusterEnvVariables}\n")
file.write("export AZMON_CLUSTER_LOG_TAIL_EXCLUDE_PATH=#{@excludePath}\n")
+ file.write("export AZMON_CLUSTER_CONTAINER_LOG_ENRICH=#{@enrichContainerLogs}\n")
# Close file after writing all environment variables
file.close
puts "Both stdout & stderr log collection are turned off for namespaces: '#{@excludePath}' "
diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go
index 5a323d7e0..834726c93 100644
--- a/source/code/go/src/plugins/oms.go
+++ b/source/code/go/src/plugins/oms.go
@@ -92,6 +92,8 @@ var (
ResourceName string
//KubeMonAgentEvents skip first flush
skipKubeMonEventsFlush bool
+ // enrich container logs (when true this will add the fields - timeofcommand, containername & containerimage)
+ enrichContainerLogs bool
)
var (
@@ -746,16 +748,30 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
stringMap["Name"] = val
}
- dataItem := DataItem{
- ID: stringMap["Id"],
- LogEntry: stringMap["LogEntry"],
- LogEntrySource: stringMap["LogEntrySource"],
- LogEntryTimeStamp: stringMap["LogEntryTimeStamp"],
- LogEntryTimeOfCommand: start.Format(time.RFC3339),
- SourceSystem: stringMap["SourceSystem"],
- Computer: Computer,
- Image: stringMap["Image"],
- Name: stringMap["Name"],
+ var dataItem DataItem
+ if enrichContainerLogs == true {
+ dataItem = DataItem{
+ ID: stringMap["Id"],
+ LogEntry: stringMap["LogEntry"],
+ LogEntrySource: stringMap["LogEntrySource"],
+ LogEntryTimeStamp: stringMap["LogEntryTimeStamp"],
+ LogEntryTimeOfCommand: start.Format(time.RFC3339),
+ SourceSystem: stringMap["SourceSystem"],
+ Computer: Computer,
+ Image: stringMap["Image"],
+ Name: stringMap["Name"],
+ }
+ } else { // don't collect the timeofcommand field, as it's part of container log enrichment
+ dataItem = DataItem{
+ ID: stringMap["Id"],
+ LogEntry: stringMap["LogEntry"],
+ LogEntrySource: stringMap["LogEntrySource"],
+ LogEntryTimeStamp: stringMap["LogEntryTimeStamp"],
+ SourceSystem: stringMap["SourceSystem"],
+ Computer: Computer,
+ Image: stringMap["Image"],
+ Name: stringMap["Name"],
+ }
}
FlushedRecordsSize += float64(len(stringMap["LogEntry"]))
@@ -892,6 +908,15 @@ func InitializePlugin(pluginConfPath string, agentVersion string) {
// Initilizing this to true to skip the first kubemonagentevent flush since the errors are not populated at this time
skipKubeMonEventsFlush = true
+ enrichContainerLogsSetting := os.Getenv("AZMON_CLUSTER_CONTAINER_LOG_ENRICH")
+ if (strings.Compare(enrichContainerLogsSetting, "true") == 0) {
+ enrichContainerLogs = true
+ Log("ContainerLogEnrichment=true \n")
+ } else {
+ enrichContainerLogs = false
+ Log("ContainerLogEnrichment=false \n")
+ }
+
pluginConfig, err := ReadConfiguration(pluginConfPath)
if err != nil {
message := fmt.Sprintf("Error Reading plugin config path : %s \n", err.Error())
@@ -989,7 +1014,12 @@ func InitializePlugin(pluginConfPath string, agentVersion string) {
if strings.Compare(strings.ToLower(os.Getenv("CONTROLLER_TYPE")), "daemonset") == 0 {
populateExcludedStdoutNamespaces()
populateExcludedStderrNamespaces()
- go updateContainerImageNameMaps()
+ if enrichContainerLogs == true {
+ Log("ContainerLogEnrichment=true; starting goroutine to update containerimagenamemaps \n")
+ go updateContainerImageNameMaps()
+ } else {
+ Log("ContainerLogEnrichment=false \n")
+ }
// Flush config error records every hour
go flushKubeMonAgentEventRecords()
diff --git a/source/code/plugin/ApplicationInsightsUtility.rb b/source/code/plugin/ApplicationInsightsUtility.rb
index 85b424e69..f7bd806a0 100644
--- a/source/code/plugin/ApplicationInsightsUtility.rb
+++ b/source/code/plugin/ApplicationInsightsUtility.rb
@@ -6,7 +6,7 @@ class ApplicationInsightsUtility
require_relative "omslog"
require_relative "DockerApiClient"
require_relative "oms_common"
- require "json"
+ require 'yajl/json_gem'
require "base64"
@@HeartBeat = "HeartBeatEvent"
@@ -73,16 +73,37 @@ def initializeUtility()
@@Tc = ApplicationInsights::TelemetryClient.new
elsif !encodedAppInsightsKey.nil?
decodedAppInsightsKey = Base64.decode64(encodedAppInsightsKey)
+
#override ai endpoint if its available otherwise use default.
if appInsightsEndpoint && !appInsightsEndpoint.nil? && !appInsightsEndpoint.empty?
$log.info("AppInsightsUtility: Telemetry client uses overrided endpoint url : #{appInsightsEndpoint}")
- telemetrySynchronousSender = ApplicationInsights::Channel::SynchronousSender.new appInsightsEndpoint
- telemetrySynchronousQueue = ApplicationInsights::Channel::SynchronousQueue.new(telemetrySynchronousSender)
- telemetryChannel = ApplicationInsights::Channel::TelemetryChannel.new nil, telemetrySynchronousQueue
+ #telemetrySynchronousSender = ApplicationInsights::Channel::SynchronousSender.new appInsightsEndpoint
+ #telemetrySynchronousQueue = ApplicationInsights::Channel::SynchronousQueue.new(telemetrySynchronousSender)
+ #telemetryChannel = ApplicationInsights::Channel::TelemetryChannel.new nil, telemetrySynchronousQueue
+ sender = ApplicationInsights::Channel::AsynchronousSender.new appInsightsEndpoint
+ queue = ApplicationInsights::Channel::AsynchronousQueue.new sender
+ channel = ApplicationInsights::Channel::TelemetryChannel.new nil, queue
- @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey, telemetryChannel
+ @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey, channel
else
- @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey
+ sender = ApplicationInsights::Channel::AsynchronousSender.new
+ queue = ApplicationInsights::Channel::AsynchronousQueue.new sender
+ channel = ApplicationInsights::Channel::TelemetryChannel.new nil, queue
+ @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey, channel
end
+ # The below are default recommended values. If you change these, ensure you test telemetry flow fully
+
+ # flush telemetry if we have 10 or more telemetry items in our queue
+ #@@Tc.channel.queue.max_queue_length = 10
+
+ # send telemetry to the service in batches of 5
+ #@@Tc.channel.sender.send_buffer_size = 5
+
+ # the background worker thread will be active for 5 seconds before it shuts down. if
+ # during this time items are picked up from the queue, the timer is reset.
+ #@@Tc.channel.sender.send_time = 5
+
+ # the background worker thread will poll the queue every 0.5 seconds for new items
+ #@@Tc.channel.sender.send_interval = 0.5
end
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: initilizeUtility - error: #{errorStr}")
@@ -102,8 +123,7 @@ def sendHeartBeatEvent(pluginName)
eventName = pluginName + @@HeartBeat
if !(@@Tc.nil?)
@@Tc.track_event eventName, :properties => @@CustomProperties
- @@Tc.flush
- $log.info("AppInsights Heartbeat Telemetry sent successfully")
+ $log.info("AppInsights Heartbeat Telemetry put successfully into the queue")
end
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: sendHeartBeatEvent - error: #{errorStr}")
@@ -116,8 +136,7 @@ def sendLastProcessedContainerInventoryCountMetric(pluginName, properties)
@@Tc.track_metric "LastProcessedContainerInventoryCount", properties["ContainerCount"],
:kind => ApplicationInsights::Channel::Contracts::DataPointType::MEASUREMENT,
:properties => @@CustomProperties
- @@Tc.flush
- $log.info("AppInsights Container Count Telemetry sent successfully")
+ $log.info("AppInsights Container Count Telemetry put successfully into the queue")
end
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: sendCustomMetric - error: #{errorStr}")
@@ -138,7 +157,6 @@ def sendCustomEvent(eventName, properties)
end
if !(@@Tc.nil?)
@@Tc.track_event eventName, :properties => telemetryProps
- @@Tc.flush
$log.info("AppInsights Custom Event #{eventName} sent successfully")
end
rescue => errorStr
@@ -162,8 +180,7 @@ def sendExceptionTelemetry(errorStr, properties = nil)
end
if !(@@Tc.nil?)
@@Tc.track_exception errorStr, :properties => telemetryProps
- @@Tc.flush
- $log.info("AppInsights Exception Telemetry sent successfully")
+ $log.info("AppInsights Exception Telemetry put successfully into the queue")
end
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: sendExceptionTelemetry - error: #{errorStr}")
@@ -209,8 +226,7 @@ def sendMetricTelemetry(metricName, metricValue, properties)
@@Tc.track_metric metricName, metricValue,
:kind => ApplicationInsights::Channel::Contracts::DataPointType::MEASUREMENT,
:properties => telemetryProps
- @@Tc.flush
- $log.info("AppInsights metric Telemetry #{metricName} sent successfully")
+ $log.info("AppInsights metric Telemetry #{metricName} put successfully into the queue")
end
rescue => errorStr
$log.warn("Exception in AppInsightsUtility: sendMetricTelemetry - error: #{errorStr}")
diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb
index 09499b4cf..be61b8b8f 100644
--- a/source/code/plugin/CAdvisorMetricsAPIClient.rb
+++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb
@@ -2,12 +2,13 @@
# frozen_string_literal: true
class CAdvisorMetricsAPIClient
- require "json"
+ require 'yajl/json_gem'
require "logger"
require "net/http"
require "net/https"
require "uri"
require "date"
+ require "time"
require_relative "oms_common"
require_relative "KubernetesApiClient"
@@ -21,6 +22,7 @@ class CAdvisorMetricsAPIClient
@clusterLogTailExcludPath = ENV["AZMON_CLUSTER_LOG_TAIL_EXCLUDE_PATH"]
@clusterLogTailPath = ENV["AZMON_LOG_TAIL_PATH"]
@clusterAgentSchemaVersion = ENV["AZMON_AGENT_CFG_SCHEMA_VERSION"]
+ @clusterContainerLogEnrich = ENV["AZMON_CLUSTER_CONTAINER_LOG_ENRICH"]
@dsPromInterval = ENV["TELEMETRY_DS_PROM_INTERVAL"]
@dsPromFieldPassCount = ENV["TELEMETRY_DS_PROM_FIELDPASS_LENGTH"]
@@ -64,12 +66,11 @@ def getSummaryStatsFromCAdvisor(winNode)
cAdvisorUri = getCAdvisorUri(winNode)
if !cAdvisorUri.nil?
uri = URI.parse(cAdvisorUri)
- http = Net::HTTP.new(uri.host, uri.port)
- http.use_ssl = false
-
- cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri)
- response = http.request(cAdvisorApiRequest)
- @Log.info "Got response code #{response.code} from #{uri.request_uri}"
+ Net::HTTP.start(uri.host, uri.port, :use_ssl => false, :open_timeout => 20, :read_timeout => 40 ) do |http|
+ cAdvisorApiRequest = Net::HTTP::Get.new(uri.request_uri)
+ response = http.request(cAdvisorApiRequest)
+ @Log.info "Got response code #{response.code} from #{uri.request_uri}"
+ end
end
rescue => error
@Log.warn("CAdvisor api request failed: #{error}")
@@ -103,7 +104,7 @@ def getCAdvisorUri(winNode)
end
end
- def getMetrics(winNode = nil)
+ def getMetrics(winNode: nil, metricTime: Time.now.utc.iso8601 )
metricDataItems = []
begin
cAdvisorStats = getSummaryStatsFromCAdvisor(winNode)
@@ -122,27 +123,27 @@ def getMetrics(winNode = nil)
operatingSystem = "Linux"
end
if !metricInfo.nil?
- metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "workingSetBytes", "memoryWorkingSetBytes"))
- metricDataItems.concat(getContainerStartTimeMetricItems(metricInfo, hostName, "restartTimeEpoch"))
+ metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "workingSetBytes", "memoryWorkingSetBytes", metricTime))
+ metricDataItems.concat(getContainerStartTimeMetricItems(metricInfo, hostName, "restartTimeEpoch", metricTime))
if operatingSystem == "Linux"
- metricDataItems.concat(getContainerCpuMetricItems(metricInfo, hostName, "usageNanoCores", "cpuUsageNanoCores"))
- metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "rssBytes", "memoryRssBytes"))
- metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "rssBytes", "memoryRssBytes"))
+ metricDataItems.concat(getContainerCpuMetricItems(metricInfo, hostName, "usageNanoCores", "cpuUsageNanoCores", metricTime))
+ metricDataItems.concat(getContainerMemoryMetricItems(metricInfo, hostName, "rssBytes", "memoryRssBytes", metricTime))
+ metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "rssBytes", "memoryRssBytes", metricTime))
elsif operatingSystem == "Windows"
- containerCpuUsageNanoSecondsRate = getContainerCpuMetricItemRate(metricInfo, hostName, "usageCoreNanoSeconds", "cpuUsageNanoCores")
+ containerCpuUsageNanoSecondsRate = getContainerCpuMetricItemRate(metricInfo, hostName, "usageCoreNanoSeconds", "cpuUsageNanoCores", metricTime)
if containerCpuUsageNanoSecondsRate && !containerCpuUsageNanoSecondsRate.empty? && !containerCpuUsageNanoSecondsRate.nil?
metricDataItems.concat(containerCpuUsageNanoSecondsRate)
end
end
- cpuUsageNanoSecondsRate = getNodeMetricItemRate(metricInfo, hostName, "cpu", "usageCoreNanoSeconds", "cpuUsageNanoCores", operatingSystem)
+ cpuUsageNanoSecondsRate = getNodeMetricItemRate(metricInfo, hostName, "cpu", "usageCoreNanoSeconds", "cpuUsageNanoCores", operatingSystem, metricTime)
if cpuUsageNanoSecondsRate && !cpuUsageNanoSecondsRate.empty? && !cpuUsageNanoSecondsRate.nil?
metricDataItems.push(cpuUsageNanoSecondsRate)
end
- metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "workingSetBytes", "memoryWorkingSetBytes"))
+ metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "memory", "workingSetBytes", "memoryWorkingSetBytes", metricTime))
- metricDataItems.push(getNodeLastRebootTimeMetric(metricInfo, hostName, "restartTimeEpoch"))
+ metricDataItems.push(getNodeLastRebootTimeMetric(metricInfo, hostName, "restartTimeEpoch", metricTime))
# Disabling networkRxRate and networkTxRate since we dont use it as of now.
#metricDataItems.push(getNodeMetricItem(metricInfo, hostName, "network", "rxBytes", "networkRxBytes"))
@@ -165,7 +166,7 @@ def getMetrics(winNode = nil)
return metricDataItems
end
- def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn)
+ def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn, metricPollTime)
metricItems = []
clusterId = KubernetesApiClient.getClusterId
timeDifference = (DateTime.now.to_time.to_i - @@telemetryCpuMetricTimeTracker).abs
@@ -182,7 +183,7 @@ def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, met
#cpu metric
containerName = container["name"]
metricValue = container["cpu"][cpuMetricNameToCollect]
- metricTime = container["cpu"]["time"]
+ metricTime = metricPollTime #container["cpu"]["time"]
metricItem = {}
metricItem["DataItems"] = []
@@ -219,6 +220,7 @@ def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, met
telemetryProps["clusterlogtailexcludepath"] = @clusterLogTailExcludPath
telemetryProps["clusterLogTailPath"] = @clusterLogTailPath
telemetryProps["clusterAgentSchemaVersion"] = @clusterAgentSchemaVersion
+ telemetryProps["clusterCLEnrich"] = @clusterContainerLogEnrich
end
#telemetry about prometheus metric collections settings for daemonset
if (File.file?(@promConfigMountPath))
@@ -272,7 +274,7 @@ def resetWinContainerIdCache
end
# usageNanoCores doesnt exist for windows nodes. Hence need to compute this from usageCoreNanoSeconds
- def getContainerCpuMetricItemRate(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn)
+ def getContainerCpuMetricItemRate(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn, metricPollTime)
metricItems = []
clusterId = KubernetesApiClient.getClusterId
timeDifference = (DateTime.now.to_time.to_i - @@telemetryCpuMetricTimeTracker).abs
@@ -292,7 +294,7 @@ def getContainerCpuMetricItemRate(metricJSON, hostName, cpuMetricNameToCollect,
containerCount += 1
containerName = container["name"]
metricValue = container["cpu"][cpuMetricNameToCollect]
- metricTime = container["cpu"]["time"]
+ metricTime = metricPollTime #container["cpu"]["time"]
metricItem = {}
metricItem["DataItems"] = []
@@ -366,7 +368,7 @@ def getContainerCpuMetricItemRate(metricJSON, hostName, cpuMetricNameToCollect,
return metricItems
end
- def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollect, metricNametoReturn)
+ def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollect, metricNametoReturn, metricPollTime)
metricItems = []
clusterId = KubernetesApiClient.getClusterId
timeDifference = (DateTime.now.to_time.to_i - @@telemetryMemoryMetricTimeTracker).abs
@@ -381,7 +383,7 @@ def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollec
pod["containers"].each do |container|
containerName = container["name"]
metricValue = container["memory"][memoryMetricNameToCollect]
- metricTime = container["memory"]["time"]
+ metricTime = metricPollTime #container["memory"]["time"]
metricItem = {}
metricItem["DataItems"] = []
@@ -431,7 +433,7 @@ def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollec
return metricItems
end
- def getNodeMetricItem(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn)
+ def getNodeMetricItem(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn, metricPollTime)
metricItem = {}
clusterId = KubernetesApiClient.getClusterId
begin
@@ -441,7 +443,7 @@ def getNodeMetricItem(metricJSON, hostName, metricCategory, metricNameToCollect,
if !node[metricCategory].nil?
metricValue = node[metricCategory][metricNameToCollect]
- metricTime = node[metricCategory]["time"]
+ metricTime = metricPollTime #node[metricCategory]["time"]
metricItem["DataItems"] = []
@@ -467,7 +469,7 @@ def getNodeMetricItem(metricJSON, hostName, metricCategory, metricNameToCollect,
return metricItem
end
- def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn, operatingSystem)
+ def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToCollect, metricNametoReturn, operatingSystem, metricPollTime)
metricItem = {}
clusterId = KubernetesApiClient.getClusterId
begin
@@ -477,7 +479,7 @@ def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToColl
if !node[metricCategory].nil?
metricValue = node[metricCategory][metricNameToCollect]
- metricTime = node[metricCategory]["time"]
+ metricTime = metricPollTime #node[metricCategory]["time"]
# if !(metricNameToCollect == "rxBytes" || metricNameToCollect == "txBytes" || metricNameToCollect == "usageCoreNanoSeconds")
# @Log.warn("getNodeMetricItemRate : rateMetric is supported only for rxBytes, txBytes & usageCoreNanoSeconds and not for #{metricNameToCollect}")
@@ -584,7 +586,7 @@ def getNodeMetricItemRate(metricJSON, hostName, metricCategory, metricNameToColl
return metricItem
end
- def getNodeLastRebootTimeMetric(metricJSON, hostName, metricNametoReturn)
+ def getNodeLastRebootTimeMetric(metricJSON, hostName, metricNametoReturn, metricPollTime)
metricItem = {}
clusterId = KubernetesApiClient.getClusterId
@@ -594,7 +596,7 @@ def getNodeLastRebootTimeMetric(metricJSON, hostName, metricNametoReturn)
nodeName = node["nodeName"]
metricValue = node["startTime"]
- metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z
+ metricTime = metricPollTime #Time.now.utc.iso8601 #2018-01-30T19:36:14Z
metricItem["DataItems"] = []
@@ -620,10 +622,10 @@ def getNodeLastRebootTimeMetric(metricJSON, hostName, metricNametoReturn)
return metricItem
end
- def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn)
+ def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn, metricPollTime)
metricItems = []
clusterId = KubernetesApiClient.getClusterId
- currentTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z
+ #currentTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z
begin
metricInfo = metricJSON
metricInfo["pods"].each do |pod|
@@ -632,7 +634,7 @@ def getContainerStartTimeMetricItems(metricJSON, hostName, metricNametoReturn)
pod["containers"].each do |container|
containerName = container["name"]
metricValue = container["startTime"]
- metricTime = currentTime
+ metricTime = metricPollTime #currentTime
metricItem = {}
metricItem["DataItems"] = []
diff --git a/source/code/plugin/ContainerInventoryState.rb b/source/code/plugin/ContainerInventoryState.rb
index 7e5ca18e8..170fa65e3 100644
--- a/source/code/plugin/ContainerInventoryState.rb
+++ b/source/code/plugin/ContainerInventoryState.rb
@@ -2,7 +2,7 @@
# frozen_string_literal: true
class ContainerInventoryState
- require 'json'
+ require 'yajl/json_gem'
require_relative 'omslog'
@@InventoryDirectory = "/var/opt/microsoft/docker-cimprov/state/ContainerInventory/"
diff --git a/source/code/plugin/DockerApiClient.rb b/source/code/plugin/DockerApiClient.rb
index ee2742dd4..f2828b357 100644
--- a/source/code/plugin/DockerApiClient.rb
+++ b/source/code/plugin/DockerApiClient.rb
@@ -3,7 +3,7 @@
class DockerApiClient
require "socket"
- require "json"
+ require "yajl/json_gem"
require "timeout"
require_relative "omslog"
require_relative "DockerApiRestHelper"
@@ -40,7 +40,6 @@ def getResponse(request, isMultiJson, isVersion)
end
break if (isVersion) ? (responseChunk.length < @@ChunkSize) : (responseChunk.end_with? "0\r\n\r\n")
end
- socket.close
return (isTimeOut) ? nil : parseResponse(dockerResponse, isMultiJson)
rescue => errorStr
$log.warn("Socket call failed for request: #{request} error: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}")
@@ -49,6 +48,10 @@ def getResponse(request, isMultiJson, isVersion)
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
return nil
+ ensure
+ if !socket.nil?
+ socket.close
+ end
end
end
diff --git a/source/code/plugin/KubernetesApiClient.rb b/source/code/plugin/KubernetesApiClient.rb
index 6bfdc06f1..e52c77884 100644
--- a/source/code/plugin/KubernetesApiClient.rb
+++ b/source/code/plugin/KubernetesApiClient.rb
@@ -2,7 +2,7 @@
# frozen_string_literal: true
class KubernetesApiClient
- require "json"
+ require "yajl/json_gem"
require "logger"
require "net/http"
require "net/https"
@@ -40,20 +40,17 @@ def getKubeResourceInfo(resource, api_group: nil)
resourceUri = getResourceUri(resource, api_group)
if !resourceUri.nil?
uri = URI.parse(resourceUri)
- http = Net::HTTP.new(uri.host, uri.port)
- http.use_ssl = true
if !File.exist?(@@CaFile)
raise "#{@@CaFile} doesnt exist"
else
- http.ca_file = @@CaFile if File.exist?(@@CaFile)
+ Net::HTTP.start(uri.host, uri.port, :use_ssl => true, :ca_file => @@CaFile, :verify_mode => OpenSSL::SSL::VERIFY_PEER, :open_timeout => 20, :read_timeout => 40) do |http|
+ kubeApiRequest = Net::HTTP::Get.new(uri.request_uri)
+ kubeApiRequest["Authorization"] = "Bearer " + getTokenStr
+ @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}"
+ response = http.request(kubeApiRequest)
+ @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}"
+ end
end
- http.verify_mode = OpenSSL::SSL::VERIFY_PEER
-
- kubeApiRequest = Net::HTTP::Get.new(uri.request_uri)
- kubeApiRequest["Authorization"] = "Bearer " + getTokenStr
- @Log.info "KubernetesAPIClient::getKubeResourceInfo : Making request to #{uri.request_uri} @ #{Time.now.utc.iso8601}"
- response = http.request(kubeApiRequest)
- @Log.info "KubernetesAPIClient::getKubeResourceInfo : Got response of #{response.code} for #{uri.request_uri} @ #{Time.now.utc.iso8601}"
end
rescue => error
@Log.warn("kubernetes api request failed: #{error} for #{resource} @ #{Time.now.utc.iso8601}")
@@ -338,7 +335,7 @@ def getContainerLogsSinceTime(namespace, pod, container, since, showTimeStamp)
return containerLogs
end
- def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn)
+ def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601)
metricItems = []
begin
clusterId = getClusterId
@@ -373,7 +370,7 @@ def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricName
nodeName = pod["spec"]["nodeName"]
podContainers.each do |container|
containerName = container["name"]
- metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z
+ #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z
if (!container["resources"].nil? && !container["resources"].empty? && !container["resources"][metricCategory].nil? && !container["resources"][metricCategory][metricNameToCollect].nil?)
metricValue = getMetricNumericValue(metricNameToCollect, container["resources"][metricCategory][metricNameToCollect])
@@ -433,14 +430,14 @@ def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricName
return metricItems
end #getContainerResourceRequestAndLimits
- def parseNodeLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn)
+ def parseNodeLimits(metricJSON, metricCategory, metricNameToCollect, metricNametoReturn, metricTime = Time.now.utc.iso8601)
metricItems = []
begin
metricInfo = metricJSON
clusterId = getClusterId
#Since we are getting all node data at the same time and kubernetes doesnt specify a timestamp for the capacity and allocation metrics,
#if we are coming up with the time it should be same for all nodes
- metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z
+ #metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z
metricInfo["items"].each do |node|
if (!node["status"][metricCategory].nil?)
@@ -551,5 +548,29 @@ def getMetricNumericValue(metricName, metricVal)
end
return metricValue
end # getMetricNumericValue
+
+ def getResourcesAndContinuationToken(uri)
+ continuationToken = nil
+ resourceInventory = nil
+ begin
+ @Log.info "KubernetesApiClient::getResourcesAndContinuationToken : Getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}"
+ resourceInfo = getKubeResourceInfo(uri)
+ @Log.info "KubernetesApiClient::getResourcesAndContinuationToken : Done getting resources from Kube API using url: #{uri} @ #{Time.now.utc.iso8601}"
+ if !resourceInfo.nil?
+ @Log.info "KubernetesApiClient::getResourcesAndContinuationToken:Start:Parsing data for #{uri} using yajl @ #{Time.now.utc.iso8601}"
+ resourceInventory = Yajl::Parser.parse(StringIO.new(resourceInfo.body))
+ @Log.info "KubernetesApiClient::getResourcesAndContinuationToken:End:Parsing data for #{uri} using yajl @ #{Time.now.utc.iso8601}"
+ resourceInfo = nil
+ end
+ if (!resourceInventory.nil? && !resourceInventory["metadata"].nil?)
+ continuationToken = resourceInventory["metadata"]["continue"]
+ end
+ rescue => errorStr
+ @Log.warn "KubernetesApiClient::getResourcesAndContinuationToken:Failed in get resources for #{uri} and continuation token: #{errorStr}"
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
+ resourceInventory = nil
+ end
+ return continuationToken, resourceInventory
+ end #getResourcesAndContinuationToken
end
end
diff --git a/source/code/plugin/filter_cadvisor2mdm.rb b/source/code/plugin/filter_cadvisor2mdm.rb
index a6e643e45..f14a1369b 100644
--- a/source/code/plugin/filter_cadvisor2mdm.rb
+++ b/source/code/plugin/filter_cadvisor2mdm.rb
@@ -4,7 +4,7 @@
module Fluent
require 'logger'
- require 'json'
+ require 'yajl/json_gem'
require_relative 'oms_common'
require_relative 'CustomMetricsUtils'
diff --git a/source/code/plugin/filter_cadvisor_health_container.rb b/source/code/plugin/filter_cadvisor_health_container.rb
index 2eccd125f..93d50e20f 100644
--- a/source/code/plugin/filter_cadvisor_health_container.rb
+++ b/source/code/plugin/filter_cadvisor_health_container.rb
@@ -3,7 +3,7 @@
module Fluent
require 'logger'
- require 'json'
+ require 'yajl/json_gem'
require_relative 'oms_common'
require_relative "ApplicationInsightsUtility"
Dir[File.join(__dir__, './health', '*.rb')].each { |file| require file }
diff --git a/source/code/plugin/filter_cadvisor_health_node.rb b/source/code/plugin/filter_cadvisor_health_node.rb
index d2f735cd1..c6280db60 100644
--- a/source/code/plugin/filter_cadvisor_health_node.rb
+++ b/source/code/plugin/filter_cadvisor_health_node.rb
@@ -3,7 +3,7 @@
module Fluent
require 'logger'
- require 'json'
+ require 'yajl/json_gem'
require_relative 'oms_common'
require_relative "ApplicationInsightsUtility"
require_relative "KubernetesApiClient"
diff --git a/source/code/plugin/filter_docker_log.rb b/source/code/plugin/filter_docker_log.rb
index 7ffd333e3..b80f4c204 100644
--- a/source/code/plugin/filter_docker_log.rb
+++ b/source/code/plugin/filter_docker_log.rb
@@ -5,6 +5,7 @@
module Fluent
require 'logger'
require 'socket'
+ require 'yajl/json_gem'
class DockerLogFilter < Filter
Plugin.register_filter('filter_docker_log', self)
diff --git a/source/code/plugin/filter_health_model_builder.rb b/source/code/plugin/filter_health_model_builder.rb
index 1724065fe..1c451ea38 100644
--- a/source/code/plugin/filter_health_model_builder.rb
+++ b/source/code/plugin/filter_health_model_builder.rb
@@ -4,7 +4,7 @@
module Fluent
require 'logger'
- require 'json'
+ require 'yajl/json_gem'
Dir[File.join(__dir__, './health', '*.rb')].each { |file| require file }
diff --git a/source/code/plugin/filter_inventory2mdm.rb b/source/code/plugin/filter_inventory2mdm.rb
index 30f6f911a..422b4b54a 100644
--- a/source/code/plugin/filter_inventory2mdm.rb
+++ b/source/code/plugin/filter_inventory2mdm.rb
@@ -4,7 +4,7 @@
module Fluent
require 'logger'
- require 'json'
+ require 'yajl/json_gem'
require_relative 'oms_common'
require_relative 'CustomMetricsUtils'
diff --git a/source/code/plugin/health/aggregate_monitor.rb b/source/code/plugin/health/aggregate_monitor.rb
index 10dbdc705..a774478e7 100644
--- a/source/code/plugin/health/aggregate_monitor.rb
+++ b/source/code/plugin/health/aggregate_monitor.rb
@@ -1,7 +1,7 @@
# frozen_string_literal: true
require_relative 'health_model_constants'
-require 'json'
+require 'yajl/json_gem'
# Require only when running inside container.
# otherwise unit tests will fail due to ApplicationInsightsUtility dependency on base omsagent ruby files. If you have your dev machine starting with omsagent-rs, then GOOD LUCK!
@@ -218,7 +218,7 @@ def sort_filter_member_monitors(monitor_set)
member_monitors.push(member_monitor)
}
- filtered = member_monitors.select{|monitor| monitor.state != MonitorState::NONE}
+ filtered = member_monitors.keep_if{|monitor| monitor.state != MonitorState::NONE}
sorted = filtered.sort_by{ |monitor| [@@sort_key_order[monitor.state]] }
return sorted
diff --git a/source/code/plugin/health/cluster_health_state.rb b/source/code/plugin/health/cluster_health_state.rb
index fa9cb42b2..e46d0bf5f 100644
--- a/source/code/plugin/health/cluster_health_state.rb
+++ b/source/code/plugin/health/cluster_health_state.rb
@@ -3,6 +3,7 @@
require "net/http"
require "net/https"
require "uri"
+require 'yajl/json_gem'
module HealthModel
class ClusterHealthState
diff --git a/source/code/plugin/health/health_container_cpu_memory_aggregator.rb b/source/code/plugin/health/health_container_cpu_memory_aggregator.rb
index 29ac91bde..e93c66c14 100644
--- a/source/code/plugin/health/health_container_cpu_memory_aggregator.rb
+++ b/source/code/plugin/health/health_container_cpu_memory_aggregator.rb
@@ -64,8 +64,8 @@ def initialize(resources, provider)
def dedupe_records(container_records)
cpu_deduped_instances = {}
memory_deduped_instances = {}
- container_records = container_records.select{|record| record['CounterName'] == @@memory_counter_name || record['CounterName'] == @@cpu_counter_name}
-
+ container_records = container_records.keep_if{|record| record['CounterName'] == @@memory_counter_name || record['CounterName'] == @@cpu_counter_name}
+
container_records.each do |record|
begin
instance_name = record["InstanceName"]
@@ -98,7 +98,7 @@ def dedupe_records(container_records)
def aggregate(container_records)
#filter and select only cpuUsageNanoCores and memoryRssBytes
- container_records = container_records.select{|record| record['CounterName'] == @@memory_counter_name || record['CounterName'] == @@cpu_counter_name}
+ container_records = container_records.keep_if{|record| record['CounterName'] == @@memory_counter_name || record['CounterName'] == @@cpu_counter_name}
# poduid lookup has poduid/cname --> workload_name, namespace, cpu_limit, memory limit mapping
# from the container records, extract the poduid/cname, get the values from poduid_lookup, and aggregate based on namespace_workload_cname
container_records.each do |record|
diff --git a/source/code/plugin/health/health_container_cpu_memory_record_formatter.rb b/source/code/plugin/health/health_container_cpu_memory_record_formatter.rb
index 0c3f061f1..12c72a120 100644
--- a/source/code/plugin/health/health_container_cpu_memory_record_formatter.rb
+++ b/source/code/plugin/health/health_container_cpu_memory_record_formatter.rb
@@ -1,5 +1,7 @@
# frozen_string_literal: true
+require 'yajl/json_gem'
+
module HealthModel
class HealthContainerCpuMemoryRecordFormatter
diff --git a/source/code/plugin/health/health_hierarchy_builder.rb b/source/code/plugin/health/health_hierarchy_builder.rb
index bb48e083b..a59020996 100644
--- a/source/code/plugin/health/health_hierarchy_builder.rb
+++ b/source/code/plugin/health/health_hierarchy_builder.rb
@@ -1,6 +1,6 @@
# frozen_string_literal: true
+require 'yajl/json_gem'
-require 'json'
module HealthModel
class HealthHierarchyBuilder
diff --git a/source/code/plugin/health/health_model_definition_parser.rb b/source/code/plugin/health/health_model_definition_parser.rb
index 91f8cd24f..c185e5389 100644
--- a/source/code/plugin/health/health_model_definition_parser.rb
+++ b/source/code/plugin/health/health_model_definition_parser.rb
@@ -3,7 +3,7 @@
Class to parse the health model definition. The definition expresses the relationship between monitors, how to roll up to an aggregate monitor,
and what labels to "pass on" to the parent monitor
=end
-require 'json'
+require 'yajl/json_gem'
module HealthModel
class HealthModelDefinitionParser
diff --git a/source/code/plugin/health/health_monitor_optimizer.rb b/source/code/plugin/health/health_monitor_optimizer.rb
index a63d59abf..d87540941 100644
--- a/source/code/plugin/health/health_monitor_optimizer.rb
+++ b/source/code/plugin/health/health_monitor_optimizer.rb
@@ -1,4 +1,5 @@
# frozen_string_literal: true
+require 'yajl/json_gem'
module HealthModel
class HealthMonitorOptimizer
#ctor
diff --git a/source/code/plugin/health/health_monitor_provider.rb b/source/code/plugin/health/health_monitor_provider.rb
index b36c46370..8e1d11143 100644
--- a/source/code/plugin/health/health_monitor_provider.rb
+++ b/source/code/plugin/health/health_monitor_provider.rb
@@ -1,5 +1,6 @@
# frozen_string_literal: true
require_relative 'health_model_constants'
+require 'yajl/json_gem'
module HealthModel
class HealthMonitorProvider
diff --git a/source/code/plugin/health/health_monitor_state.rb b/source/code/plugin/health/health_monitor_state.rb
index 16f8bedc4..110793eeb 100644
--- a/source/code/plugin/health/health_monitor_state.rb
+++ b/source/code/plugin/health/health_monitor_state.rb
@@ -1,5 +1,6 @@
# frozen_string_literal: true
require_relative 'health_model_constants'
+require 'yajl/json_gem'
module HealthModel
diff --git a/source/code/plugin/health/health_monitor_utils.rb b/source/code/plugin/health/health_monitor_utils.rb
index 2fa2d3a52..13d1416b1 100644
--- a/source/code/plugin/health/health_monitor_utils.rb
+++ b/source/code/plugin/health/health_monitor_utils.rb
@@ -2,6 +2,7 @@
require 'logger'
require 'digest'
require_relative 'health_model_constants'
+require 'yajl/json_gem'
module HealthModel
# static class that provides a bunch of utility methods
diff --git a/source/code/plugin/health/unit_monitor.rb b/source/code/plugin/health/unit_monitor.rb
index 6454007b6..8e2de210b 100644
--- a/source/code/plugin/health/unit_monitor.rb
+++ b/source/code/plugin/health/unit_monitor.rb
@@ -1,6 +1,6 @@
# frozen_string_literal: true
require_relative 'health_model_constants'
-require 'json'
+require 'yajl/json_gem'
module HealthModel
class UnitMonitor
diff --git a/source/code/plugin/in_cadvisor_perf.rb b/source/code/plugin/in_cadvisor_perf.rb
index 810fb512f..96aa66aa1 100644
--- a/source/code/plugin/in_cadvisor_perf.rb
+++ b/source/code/plugin/in_cadvisor_perf.rb
@@ -9,14 +9,15 @@ class CAdvisor_Perf_Input < Input
def initialize
super
require "yaml"
- require "json"
+ require 'yajl/json_gem'
+ require "time"
require_relative "CAdvisorMetricsAPIClient"
require_relative "oms_common"
require_relative "omslog"
end
- config_param :run_interval, :time, :default => "1m"
+ config_param :run_interval, :time, :default => 60
config_param :tag, :string, :default => "oms.api.cadvisorperf"
config_param :mdmtag, :string, :default => "mdm.cadvisorperf"
config_param :nodehealthtag, :string, :default => "kubehealth.DaemonSet.Node"
@@ -46,10 +47,12 @@ def shutdown
end
def enumerate()
- time = Time.now.to_f
+ currentTime = Time.now
+ time = currentTime.to_f
+ batchTime = currentTime.utc.iso8601
begin
eventStream = MultiEventStream.new
- metricData = CAdvisorMetricsAPIClient.getMetrics()
+ metricData = CAdvisorMetricsAPIClient.getMetrics(winNode: nil, metricTime: batchTime )
metricData.each do |record|
record["DataType"] = "LINUX_PERF_BLOB"
record["IPName"] = "LogManagement"
@@ -74,14 +77,25 @@ def enumerate()
def run_periodic
@mutex.lock
done = @finished
+ @nextTimeToRun = Time.now
+ @waitTimeout = @run_interval
until done
- @condition.wait(@mutex, @run_interval)
+ @nextTimeToRun = @nextTimeToRun + @run_interval
+ @now = Time.now
+ if @nextTimeToRun <= @now
+ @waitTimeout = 1
+ @nextTimeToRun = @now
+ else
+ @waitTimeout = @nextTimeToRun - @now
+ end
+ @condition.wait(@mutex, @waitTimeout)
done = @finished
@mutex.unlock
if !done
begin
- $log.info("in_cadvisor_perf::run_periodic @ #{Time.now.utc.iso8601}")
+ $log.info("in_cadvisor_perf::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}")
enumerate
+ $log.info("in_cadvisor_perf::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}")
rescue => errorStr
$log.warn "in_cadvisor_perf::run_periodic: enumerate Failed to retrieve cadvisor perf metrics: #{errorStr}"
end
diff --git a/source/code/plugin/in_containerinventory.rb b/source/code/plugin/in_containerinventory.rb
index ccf61ab2e..d107047b4 100644
--- a/source/code/plugin/in_containerinventory.rb
+++ b/source/code/plugin/in_containerinventory.rb
@@ -13,14 +13,15 @@ class Container_Inventory_Input < Input
def initialize
super
- require "json"
+ require 'yajl/json_gem'
+ require "time"
require_relative "DockerApiClient"
require_relative "ContainerInventoryState"
require_relative "ApplicationInsightsUtility"
require_relative "omslog"
end
- config_param :run_interval, :time, :default => "1m"
+ config_param :run_interval, :time, :default => 60
config_param :tag, :string, :default => "oms.containerinsights.containerinventory"
def configure(conf)
@@ -259,14 +260,25 @@ def enumerate
def run_periodic
@mutex.lock
done = @finished
+ @nextTimeToRun = Time.now
+ @waitTimeout = @run_interval
until done
- @condition.wait(@mutex, @run_interval)
+ @nextTimeToRun = @nextTimeToRun + @run_interval
+ @now = Time.now
+ if @nextTimeToRun <= @now
+ @waitTimeout = 1
+ @nextTimeToRun = @now
+ else
+ @waitTimeout = @nextTimeToRun - @now
+ end
+ @condition.wait(@mutex, @waitTimeout)
done = @finished
@mutex.unlock
if !done
begin
- $log.info("in_container_inventory::run_periodic @ #{Time.now.utc.iso8601}")
+ $log.info("in_container_inventory::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}")
enumerate
+ $log.info("in_container_inventory::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}")
rescue => errorStr
$log.warn "in_container_inventory::run_periodic: Failed in enumerate container inventory: #{errorStr}"
end
diff --git a/source/code/plugin/in_containerlog_sudo_tail.rb b/source/code/plugin/in_containerlog_sudo_tail.rb
deleted file mode 100644
index 8faa260d0..000000000
--- a/source/code/plugin/in_containerlog_sudo_tail.rb
+++ /dev/null
@@ -1,189 +0,0 @@
-
-require 'yajl'
-require 'fluent/input'
-require 'fluent/event'
-require 'fluent/config/error'
-require 'fluent/parser'
-require 'open3'
-require 'json'
-require_relative 'omslog'
-require_relative 'KubernetesApiClient'
-
-module Fluent
- class ContainerLogSudoTail < Input
- Plugin.register_input('containerlog_sudo_tail', self)
-
- def initialize
- super
- @command = nil
- @paths = []
- #Using this to construct the file path for all every container json log file.
- #Example container log file path -> /var/lib/docker/containers/{ContainerID}/{ContainerID}-json.log
- #We have read permission on this file but don't have execute permission on the below mentioned path. Hence wildcard character searches to find the container ID's doesn't work.
- @containerLogFilePath = "/var/lib/docker/containers/"
- #This folder contains a list of all the containers running/stopped and we're using it to get all the container ID's which will be needed for the log file path below
- #TODO : Use generic path from docker REST endpoint and find a way to mount the correct folder in the omsagent.yaml
- @containerIDFilePath = "/var/opt/microsoft/docker-cimprov/state/ContainerInventory/*"
- @@systemPodsNamespace = 'kube-system'
- @@getSystemPodsTimeIntervalSecs = 300 #refresh system container list every 5 minutes
- @@lastSystemPodsGetTime = nil;
- @@systemContainerIDList = Hash.new
- @@disableKubeSystemLogCollection = ENV['DISABLE_KUBE_SYSTEM_LOG_COLLECTION']
- if !@@disableKubeSystemLogCollection.nil? && !@@disableKubeSystemLogCollection.empty? && @@disableKubeSystemLogCollection.casecmp('true') == 0
- @@disableKubeSystemLogCollection = 'true'
- $log.info("in_container_sudo_tail : System container log collection is disabled")
- else
- @@disableKubeSystemLogCollection = 'false'
- $log.info("in_container_sudo_tail : System container log collection is enabled")
- end
- end
-
- attr_accessor :command
-
- #The format used to map the program output to the incoming event.
- config_param :format, :string, default: 'none'
-
- #Tag of the event.
- config_param :tag, :string, default: nil
-
- #Fluentd will record the position it last read into this file.
- config_param :pos_file, :string, default: nil
-
- #The interval time between periodic program runs.
- config_param :run_interval, :time, default: nil
-
- BASE_DIR = File.dirname(File.expand_path('..', __FILE__))
- RUBY_DIR = BASE_DIR + '/ruby/bin/ruby '
- TAILSCRIPT = BASE_DIR + '/plugin/containerlogtailfilereader.rb '
-
- def configure(conf)
- super
- unless @pos_file
- raise ConfigError, "'pos_file' is required to keep track of file"
- end
-
- unless @tag
- raise ConfigError, "'tag' is required on sudo tail"
- end
-
- unless @run_interval
- raise ConfigError, "'run_interval' is required for periodic tailing"
- end
-
- @parser = Plugin.new_parser(conf['format'])
- @parser.configure(conf)
- end
-
- def start
- @finished = false
- @thread = Thread.new(&method(:run_periodic))
- end
-
- def shutdown
- @finished = true
- @thread.join
- end
-
- def receive_data(line)
- es = MultiEventStream.new
- begin
- line.chomp! # remove \n
- @parser.parse(line) { |time, record|
- if time && record
- es.add(time, record)
- else
- $log.warn "pattern doesn't match: #{line.inspect}"
- end
- unless es.empty?
- tag=@tag
- router.emit_stream(tag, es)
- end
- }
- rescue => e
- $log.warn line.dump, error: e.to_s
- $log.debug_backtrace(e.backtrace)
- end
- end
-
- def receive_log(line)
- $log.warn "#{line}" if line.start_with?('WARN')
- $log.error "#{line}" if line.start_with?('ERROR')
- $log.info "#{line}" if line.start_with?('INFO')
- end
-
- def readable_path(path)
- if system("sudo test -r #{path}")
- OMS::Log.info_once("Following tail of #{path}")
- return path
- else
- OMS::Log.warn_once("#{path} is not readable. Cannot tail the file.")
- return ""
- end
- end
-
- def set_system_command
- timeNow = DateTime.now
- cName = "Unkown"
- tempContainerInfo = {}
- paths = ""
-
- #if we are on agent & system containers log collection is disabled, get system containerIDs to exclude logs from containers in system containers namespace from being tailed
- if !KubernetesApiClient.isNodeMaster && @@disableKubeSystemLogCollection.casecmp('true') == 0
- if @@lastSystemPodsGetTime.nil? || ((timeNow - @@lastSystemPodsGetTime)*24*60*60).to_i >= @@getSystemPodsTimeIntervalSecs
- $log.info("in_container_sudo_tail : System Container list last refreshed at #{@@lastSystemPodsGetTime} - refreshing now at #{timeNow}")
- sysContainers = KubernetesApiClient.getContainerIDs(@@systemPodsNamespace)
- #BugBug - https://msecg.visualstudio.com/OMS/_workitems/edit/215107 - we get 200 with empty payloaf from time to time
- if (!sysContainers.nil? && !sysContainers.empty?)
- @@systemContainerIDList = sysContainers
- else
- $log.info("in_container_sudo_tail : System Container ID List is empty!!!! Continuing to use currently cached list.")
- end
- @@lastSystemPodsGetTime = timeNow
- $log.info("in_container_sudo_tail : System Container ID List: #{@@systemContainerIDList}")
- end
- end
-
- Dir.glob(@containerIDFilePath).select { |p|
- cName = p.split('/').last;
- if !@@systemContainerIDList.key?("docker://" + cName)
- p = @containerLogFilePath + cName + "/" + cName + "-json.log"
- paths += readable_path(p) + " "
- else
- $log.info("in_container_sudo_tail : Excluding system container with ID #{cName} from tailng for log collection")
- end
- }
- if !system("sudo test -r #{@pos_file}")
- system("sudo touch #{@pos_file}")
- end
- @command = "sudo " << RUBY_DIR << TAILSCRIPT << paths << " -p #{@pos_file}"
- end
-
- def run_periodic
- until @finished
- begin
- sleep @run_interval
- #if we are on master & system containers log collection is disabled, collect nothing (i.e NO COntainer log collection for ANY container)
- #we will be not collection omsagent log as well in this case, but its insignificant & okay!
- if !KubernetesApiClient.isNodeMaster || @@disableKubeSystemLogCollection.casecmp('true') != 0
- set_system_command
- Open3.popen3(@command) {|writeio, readio, errio, wait_thread|
- writeio.close
- while line = readio.gets
- receive_data(line)
- end
- while line = errio.gets
- receive_log(line)
- end
-
- wait_thread.value #wait until child process terminates
- }
- end
- rescue
- $log.error "containerlog_sudo_tail failed to run or shutdown child proces", error => $!.to_s, :error_class => $!.class.to_s
- $log.warn_backtrace $!.backtrace
- end
- end
- end
- end
-
-end
diff --git a/source/code/plugin/in_kube_events.rb b/source/code/plugin/in_kube_events.rb
index e1fdc5df6..6116cb62d 100644
--- a/source/code/plugin/in_kube_events.rb
+++ b/source/code/plugin/in_kube_events.rb
@@ -9,15 +9,20 @@ class Kube_Event_Input < Input
def initialize
super
- require "json"
+ require "yajl/json_gem"
+ require "yajl"
+ require "time"
require_relative "KubernetesApiClient"
require_relative "oms_common"
require_relative "omslog"
require_relative "ApplicationInsightsUtility"
+
+ # 30000 events amount to approximately 5MB
+ @EVENTS_CHUNK_SIZE = 30000
end
- config_param :run_interval, :time, :default => "1m"
+ config_param :run_interval, :time, :default => 60
config_param :tag, :string, :default => "oms.containerinsights.KubeEvents"
def configure(conf)
@@ -43,79 +48,114 @@ def shutdown
end
end
- def enumerate(eventList = nil)
- currentTime = Time.now
- emitTime = currentTime.to_f
- batchTime = currentTime.utc.iso8601
+ def enumerate
+ begin
+ eventList = nil
+ currentTime = Time.now
+ batchTime = currentTime.utc.iso8601
+ eventQueryState = getEventQueryState
+ newEventQueryState = []
+
+ # Initializing continuation token to nil
+ continuationToken = nil
+ $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}")
+ continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}")
+ $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}")
+ if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?)
+ newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime)
+ else
+ $log.warn "in_kube_events::enumerate:Received empty eventList"
+ end
- events = eventList
- $log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}")
- eventInfo = KubernetesApiClient.getKubeResourceInfo("events")
- $log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}")
+ #If we receive a continuation token, make calls, process and flush data until we have processed all data
+ while (!continuationToken.nil? && !continuationToken.empty?)
+ continuationToken, eventList = KubernetesApiClient.getResourcesAndContinuationToken("events?fieldSelector=type!=Normal&limit=#{@EVENTS_CHUNK_SIZE}&continue=#{continuationToken}")
+ if (!eventList.nil? && !eventList.empty? && eventList.key?("items") && !eventList["items"].nil? && !eventList["items"].empty?)
+ newEventQueryState = parse_and_emit_records(eventList, eventQueryState, newEventQueryState, batchTime)
+ else
+ $log.warn "in_kube_events::enumerate:Received empty eventList"
+ end
+ end
- if !eventInfo.nil?
- events = JSON.parse(eventInfo.body)
+ # Setting this to nil so that we don't hold memory until GC kicks in
+ eventList = nil
+ writeEventQueryState(newEventQueryState)
+ rescue => errorStr
+ $log.warn "in_kube_events::enumerate:Failed in enumerate: #{errorStr}"
+ $log.debug_backtrace(errorStr.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
+ end # end enumerate
- eventQueryState = getEventQueryState
- newEventQueryState = []
+ def parse_and_emit_records(events, eventQueryState, newEventQueryState, batchTime = Time.now.utc.iso8601)
+ currentTime = Time.now
+ emitTime = currentTime.to_f
begin
- if (!events.nil? && !events.empty? && !events["items"].nil?)
- eventStream = MultiEventStream.new
- events["items"].each do |items|
- record = {}
- # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion
- record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated
- eventId = items["metadata"]["uid"] + "/" + items["count"].to_s
- newEventQueryState.push(eventId)
- if !eventQueryState.empty? && eventQueryState.include?(eventId)
- next
- end
- record["ObjectKind"] = items["involvedObject"]["kind"]
- record["Namespace"] = items["involvedObject"]["namespace"]
- record["Name"] = items["involvedObject"]["name"]
- record["Reason"] = items["reason"]
- record["Message"] = items["message"]
- record["Type"] = items["type"]
- record["TimeGenerated"] = items["metadata"]["creationTimestamp"]
- record["SourceComponent"] = items["source"]["component"]
- record["FirstSeen"] = items["firstTimestamp"]
- record["LastSeen"] = items["lastTimestamp"]
- record["Count"] = items["count"]
- if items["source"].key?("host")
- record["Computer"] = items["source"]["host"]
- else
- record["Computer"] = (OMS::Common.get_hostname)
- end
- record['ClusterName'] = KubernetesApiClient.getClusterName
- record["ClusterId"] = KubernetesApiClient.getClusterId
- wrapper = {
- "DataType" => "KUBE_EVENTS_BLOB",
- "IPName" => "ContainerInsights",
- "DataItems" => [record.each { |k, v| record[k] = v }],
- }
- eventStream.add(emitTime, wrapper) if wrapper
+ eventStream = MultiEventStream.new
+ events["items"].each do |items|
+ record = {}
+ # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion
+ record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated
+ eventId = items["metadata"]["uid"] + "/" + items["count"].to_s
+ newEventQueryState.push(eventId)
+ if !eventQueryState.empty? && eventQueryState.include?(eventId)
+ next
end
- router.emit_stream(@tag, eventStream) if eventStream
- end
- writeEventQueryState(newEventQueryState)
+ record["ObjectKind"] = items["involvedObject"]["kind"]
+ record["Namespace"] = items["involvedObject"]["namespace"]
+ record["Name"] = items["involvedObject"]["name"]
+ record["Reason"] = items["reason"]
+ record["Message"] = items["message"]
+ record["Type"] = items["type"]
+ record["TimeGenerated"] = items["metadata"]["creationTimestamp"]
+ record["SourceComponent"] = items["source"]["component"]
+ record["FirstSeen"] = items["firstTimestamp"]
+ record["LastSeen"] = items["lastTimestamp"]
+ record["Count"] = items["count"]
+ if items["source"].key?("host")
+ record["Computer"] = items["source"]["host"]
+ else
+ record["Computer"] = (OMS::Common.get_hostname)
+ end
+ record["ClusterName"] = KubernetesApiClient.getClusterName
+ record["ClusterId"] = KubernetesApiClient.getClusterId
+ wrapper = {
+ "DataType" => "KUBE_EVENTS_BLOB",
+ "IPName" => "ContainerInsights",
+ "DataItems" => [record.each { |k, v| record[k] = v }],
+ }
+ eventStream.add(emitTime, wrapper) if wrapper
+ end
+ router.emit_stream(@tag, eventStream) if eventStream
rescue => errorStr
$log.debug_backtrace(errorStr.backtrace)
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
- end
+ end
+ return newEventQueryState
end
def run_periodic
@mutex.lock
done = @finished
+ @nextTimeToRun = Time.now
+ @waitTimeout = @run_interval
until done
- @condition.wait(@mutex, @run_interval)
+ @nextTimeToRun = @nextTimeToRun + @run_interval
+ @now = Time.now
+ if @nextTimeToRun <= @now
+ @waitTimeout = 1
+ @nextTimeToRun = @now
+ else
+ @waitTimeout = @nextTimeToRun - @now
+ end
+ @condition.wait(@mutex, @waitTimeout)
done = @finished
@mutex.unlock
if !done
begin
- $log.info("in_kube_events::run_periodic @ #{Time.now.utc.iso8601}")
+ $log.info("in_kube_events::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}")
enumerate
+ $log.info("in_kube_events::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}")
rescue => errorStr
$log.warn "in_kube_events::run_periodic: enumerate Failed to retrieve kube events: #{errorStr}"
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
diff --git a/source/code/plugin/in_kube_health.rb b/source/code/plugin/in_kube_health.rb
index 57ca07f64..0eebf395b 100644
--- a/source/code/plugin/in_kube_health.rb
+++ b/source/code/plugin/in_kube_health.rb
@@ -21,19 +21,22 @@ def initialize
begin
super
require "yaml"
- require "json"
+ require 'yajl/json_gem'
+ require "yajl"
+ require "time"
@@cluster_id = KubernetesApiClient.getClusterId
@resources = HealthKubernetesResources.instance
@provider = HealthMonitorProvider.new(@@cluster_id, HealthMonitorUtils.get_cluster_labels, @resources, @health_monitor_config_path)
@@ApiGroupApps = "apps"
+ @@KubeInfraNamespace = "kube-system"
rescue => e
ApplicationInsightsUtility.sendExceptionTelemetry(e, {"FeatureArea" => "Health"})
end
end
include HealthModel
- config_param :run_interval, :time, :default => "1m"
+ config_param :run_interval, :time, :default => 60
config_param :tag, :string, :default => "kubehealth.ReplicaSet"
def configure(conf)
@@ -83,10 +86,11 @@ def enumerate
#HealthMonitorUtils.refresh_kubernetes_api_data(@@hmlog, nil)
# we do this so that if the call fails, we get a response code/header etc.
node_inventory_response = KubernetesApiClient.getKubeResourceInfo("nodes")
- node_inventory = JSON.parse(node_inventory_response.body)
- pod_inventory_response = KubernetesApiClient.getKubeResourceInfo("pods")
- pod_inventory = JSON.parse(pod_inventory_response.body)
- replicaset_inventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("replicasets", api_group: @@ApiGroupApps).body)
+ node_inventory = Yajl::Parser.parse(StringIO.new(node_inventory_response.body))
+ pod_inventory_response = KubernetesApiClient.getKubeResourceInfo("pods?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}")
+ pod_inventory = Yajl::Parser.parse(StringIO.new(pod_inventory_response.body))
+ replicaset_inventory_response = KubernetesApiClient.getKubeResourceInfo("replicasets?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}", api_group: @@ApiGroupApps)
+ replicaset_inventory = Yajl::Parser.parse(StringIO.new(replicaset_inventory_response.body))
@resources.node_inventory = node_inventory
@resources.pod_inventory = pod_inventory
@@ -108,8 +112,8 @@ def enumerate
health_monitor_records.push(record) if record
pods_ready_hash = HealthMonitorUtils.get_pods_ready_hash(@resources)
- system_pods = pods_ready_hash.select { |k, v| v["namespace"] == "kube-system" }
- workload_pods = pods_ready_hash.select { |k, v| v["namespace"] != "kube-system" }
+ system_pods = pods_ready_hash.keep_if { |k, v| v["namespace"] == @@KubeInfraNamespace }
+ workload_pods = Hash.new # pods_ready_hash.select{ |k, v| v["namespace"] != @@KubeInfraNamespace }
system_pods_ready_percentage_records = process_pods_ready_percentage(system_pods, MonitorId::SYSTEM_WORKLOAD_PODS_READY_MONITOR_ID)
system_pods_ready_percentage_records.each do |record|
@@ -225,28 +229,28 @@ def process_pods_ready_percentage(pods_hash, config_monitor_id)
hmlog = HealthMonitorUtils.get_log_handle
records = []
- pods_hash.keys.each do |key|
- workload_name = key
- total_pods = pods_hash[workload_name]["totalPods"]
- pods_ready = pods_hash[workload_name]["podsReady"]
- namespace = pods_hash[workload_name]["namespace"]
- workload_kind = pods_hash[workload_name]["kind"]
- percent = pods_ready / total_pods * 100
- timestamp = Time.now.utc.iso8601
-
- state = HealthMonitorUtils.compute_percentage_state(percent, monitor_config)
- health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"totalPods" => total_pods, "podsReady" => pods_ready, "workload_name" => workload_name, "namespace" => namespace, "workload_kind" => workload_kind}}
- monitor_instance_id = HealthMonitorUtils.get_monitor_instance_id(config_monitor_id, [@@cluster_id, namespace, workload_name])
- health_record = {}
- time_now = Time.now.utc.iso8601
- health_record[HealthMonitorRecordFields::MONITOR_ID] = config_monitor_id
- health_record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID] = monitor_instance_id
- health_record[HealthMonitorRecordFields::DETAILS] = health_monitor_record
- health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now
- health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now
- health_record[HealthMonitorRecordFields::CLUSTER_ID] = @@cluster_id
- records.push(health_record)
- end
+ pods_hash.keys.each do |key|
+ workload_name = key
+ total_pods = pods_hash[workload_name]["totalPods"]
+ pods_ready = pods_hash[workload_name]["podsReady"]
+ namespace = pods_hash[workload_name]["namespace"]
+ workload_kind = pods_hash[workload_name]["kind"]
+ percent = pods_ready / total_pods * 100
+ timestamp = Time.now.utc.iso8601
+
+ state = HealthMonitorUtils.compute_percentage_state(percent, monitor_config)
+ health_monitor_record = {"timestamp" => timestamp, "state" => state, "details" => {"totalPods" => total_pods, "podsReady" => pods_ready, "workload_name" => workload_name, "namespace" => namespace, "workload_kind" => workload_kind}}
+ monitor_instance_id = HealthMonitorUtils.get_monitor_instance_id(config_monitor_id, [@@cluster_id, namespace, workload_name])
+ health_record = {}
+ time_now = Time.now.utc.iso8601
+ health_record[HealthMonitorRecordFields::MONITOR_ID] = config_monitor_id
+ health_record[HealthMonitorRecordFields::MONITOR_INSTANCE_ID] = monitor_instance_id
+ health_record[HealthMonitorRecordFields::DETAILS] = health_monitor_record
+ health_record[HealthMonitorRecordFields::TIME_GENERATED] = time_now
+ health_record[HealthMonitorRecordFields::TIME_FIRST_OBSERVED] = time_now
+ health_record[HealthMonitorRecordFields::CLUSTER_ID] = @@cluster_id
+ records.push(health_record)
+ end
#@@hmlog.info "Successfully processed pods_ready_percentage for #{config_monitor_id} #{records.size}"
return records
end
@@ -296,10 +300,11 @@ def process_node_condition_monitor(node_inventory)
def initialize_inventory
#this is required because there are other components, like the container cpu memory aggregator, that depends on the mapping being initialized
node_inventory_response = KubernetesApiClient.getKubeResourceInfo("nodes")
- node_inventory = JSON.parse(node_inventory_response.body)
- pod_inventory_response = KubernetesApiClient.getKubeResourceInfo("pods")
- pod_inventory = JSON.parse(pod_inventory_response.body)
- replicaset_inventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("replicasets", api_group: @@ApiGroupApps).body)
+ node_inventory = Yajl::Parser.parse(StringIO.new(node_inventory_response.body))
+ pod_inventory_response = KubernetesApiClient.getKubeResourceInfo("pods?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}")
+ pod_inventory = Yajl::Parser.parse(StringIO.new(pod_inventory_response.body))
+ replicaset_inventory_response = KubernetesApiClient.getKubeResourceInfo("replicasets?fieldSelector=metadata.namespace%3D#{@@KubeInfraNamespace}", api_group: @@ApiGroupApps)
+ replicaset_inventory = Yajl::Parser.parse(StringIO.new(replicaset_inventory_response.body))
@resources.node_inventory = node_inventory
@resources.pod_inventory = pod_inventory
@@ -310,14 +315,25 @@ def initialize_inventory
def run_periodic
@mutex.lock
done = @finished
+ @nextTimeToRun = Time.now
+ @waitTimeout = @run_interval
until done
- @condition.wait(@mutex, @run_interval)
+ @nextTimeToRun = @nextTimeToRun + @run_interval
+ @now = Time.now
+ if @nextTimeToRun <= @now
+ @waitTimeout = 1
+ @nextTimeToRun = @now
+ else
+ @waitTimeout = @nextTimeToRun - @now
+ end
+ @condition.wait(@mutex, @waitTimeout)
done = @finished
@mutex.unlock
if !done
begin
- @@hmlog.info("in_kube_health::run_periodic @ #{Time.now.utc.iso8601}")
+ @@hmlog.info("in_kube_health::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}")
enumerate
+ @@hmlog.info("in_kube_health::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}")
rescue => errorStr
@@hmlog.warn "in_kube_health::run_periodic: enumerate Failed for kubeapi sourced data health: #{errorStr}"
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
diff --git a/source/code/plugin/in_kube_logs.rb b/source/code/plugin/in_kube_logs.rb
deleted file mode 100644
index 119473819..000000000
--- a/source/code/plugin/in_kube_logs.rb
+++ /dev/null
@@ -1,181 +0,0 @@
-#!/usr/local/bin/ruby
-# frozen_string_literal: true
-
-module Fluent
-
- class Kube_Logs_Input < Input
- Plugin.register_input('kubelogs', self)
-
- @@KubeLogsStateFile = "/var/opt/microsoft/docker-cimprov/state/KubeLogQueryState.yaml"
-
- def initialize
- super
- require 'yaml'
- require 'date'
- require 'time'
- require 'json'
-
- require_relative 'KubernetesApiClient'
- require_relative 'oms_common'
- require_relative 'omslog'
- end
-
- config_param :run_interval, :time, :default => '1m'
- config_param :tag, :string, :default => "oms.api.KubeLogs"
-
- def configure (conf)
- super
- end
-
- def start
- if @run_interval
- @finished = false
- @condition = ConditionVariable.new
- @mutex = Mutex.new
- @thread = Thread.new(&method(:run_periodic))
- end
- end
-
- def shutdown
- if @run_interval
- @mutex.synchronize {
- @finished = true
- @condition.signal
- }
- @thread.join
- end
- end
-
- def enumerate(podList = nil)
-
- namespace = ENV['OMS_KUBERNETES_LOGS_NAMESPACE']
- if namespace.nil? || namespace.empty?
- return
- end
-
- time = Time.now.to_f
- if podList.nil?
- pods = KubernetesApiClient.getPods(namespace)
- else
- pods = podList
- end
- logQueryState = getLogQueryState
- newLogQueryState = {}
-
- pods.each do |pod|
- record = {}
- begin
- pod['status']['containerStatuses'].each do |container|
-
- # if container['state']['running']
- # puts container['name'] + ' is running'
- # end
-
- timeStamp = DateTime.now
-
- containerId = pod['metadata']['namespace'] + "_" + pod['metadata']['name'] + "_" + container['name']
- if !logQueryState.empty? && logQueryState[containerId]
- timeStamp = DateTime.parse(logQueryState[containerId])
- end
-
- # Try to get logs for the container
- begin
- $log.debug "Getting logs for #{container['name']}"
- logs = KubernetesApiClient.getContainerLogsSinceTime(pod['metadata']['namespace'], pod['metadata']['name'], container['name'], timeStamp.rfc3339(9), true)
- $log.debug "got something back"
-
- # By default we don't change the timestamp (if no logs were returned or if there was a (hopefully transient) error in retrieval
- newLogQueryState[containerId] = timeStamp.rfc3339(9)
-
- if !logs || logs.empty?
- $log.info "no logs returned"
- else
- $log.debug "response size is #{logs.length}"
- lines = logs.split("\n")
- index = -1
-
- # skip duplicates
- for i in 0...lines.count
- dateTime = DateTime.parse(lines[i].split(" ").first)
- if (dateTime.to_time - timeStamp.to_time) > 0.0
- index = i
- break
- end
- end
-
- if index >= 0
- $log.debug "starting from line #{index}"
- for i in index...lines.count
- record['Namespace'] = pod['metadata']['namespace']
- record['Pod'] = pod['metadata']['name']
- record['Container'] = container['name']
- record['Message'] = lines[i][(lines[i].index(' ') + 1)..(lines[i].length - 1)]
- record['TimeGenerated'] = lines[i].split(" ").first
- record['Node'] = pod['spec']['nodeName']
- record['Computer'] = OMS::Common.get_hostname
- record['ClusterName'] = KubernetesApiClient.getClusterName
- router.emit(@tag, time, record) if record
- end
- newLogQueryState[containerId] = lines.last.split(" ").first
- else
- newLogQueryState[containerId] = DateTime.now.rfc3339(9)
- end
- end
- rescue => logException
- $log.warn "Failed to retrieve logs for container: #{logException}"
- $log.debug_backtrace(logException.backtrace)
- end
- end
- # Update log query state only if logging was succesfful.
- # TODO: May have a few duplicate lines in case of
- writeLogQueryState(newLogQueryState)
- rescue => errorStr
- $log.warn "Exception raised in enumerate: #{errorStr}"
- $log.debug_backtrace(errorStr.backtrace)
- end
- end
- end
-
- def run_periodic
- @mutex.lock
- done = @finished
- until done
- @condition.wait(@mutex, @run_interval)
- done = @finished
- @mutex.unlock
- if !done
- $log.debug "calling enumerate for KubeLogs"
- enumerate
- $log.debug "done with enumerate for KubeLogs"
- end
- @mutex.lock
- end
- @mutex.unlock
- end
-
- def getLogQueryState
- logQueryState = {}
- begin
- if File.file?(@@KubeLogsStateFile)
- logQueryState = YAML.load_file(@@KubeLogsStateFile, {})
- end
- rescue => errorStr
- $log.warn "Failed to load query state #{errorStr}"
- $log.debug_backtrace(errorStr.backtrace)
- end
- return logQueryState
- end
-
- def writeLogQueryState(logQueryState)
- begin
- File.write(@@KubeLogsStateFile, logQueryState.to_yaml)
- rescue => errorStr
- $log.warn "Failed to write query state #{errorStr.to_s}"
- $log.debug_backtrace(errorStr.backtrace)
- end
- end
-
- end # Kube_Log_Input
-
-end # module
-
diff --git a/source/code/plugin/in_kube_nodes.rb b/source/code/plugin/in_kube_nodes.rb
index 0a0fd9d2e..fa0994f43 100644
--- a/source/code/plugin/in_kube_nodes.rb
+++ b/source/code/plugin/in_kube_nodes.rb
@@ -9,6 +9,7 @@ class Kube_nodeInventory_Input < Input
@@MDMKubeNodeInventoryTag = "mdm.kubenodeinventory"
@@promConfigMountPath = "/etc/config/settings/prometheus-data-collection-settings"
@@AzStackCloudFileName = "/etc/kubernetes/host/azurestackcloud.json"
+ @@kubeperfTag = "oms.api.KubePerf"
@@rsPromInterval = ENV["TELEMETRY_RS_PROM_INTERVAL"]
@@rsPromFieldPassCount = ENV["TELEMETRY_RS_PROM_FIELDPASS_LENGTH"]
@@ -21,15 +22,18 @@ class Kube_nodeInventory_Input < Input
def initialize
super
require "yaml"
- require "json"
+ require "yajl/json_gem"
+ require "yajl"
+ require "time"
require_relative "KubernetesApiClient"
require_relative "ApplicationInsightsUtility"
require_relative "oms_common"
require_relative "omslog"
+ @NODES_CHUNK_SIZE = "400"
end
- config_param :run_interval, :time, :default => "1m"
+ config_param :run_interval, :time, :default => 60
config_param :tag, :string, :default => "oms.containerinsights.KubeNodeInventory"
def configure(conf)
@@ -57,158 +61,217 @@ def shutdown
end
def enumerate
- currentTime = Time.now
- emitTime = currentTime.to_f
- batchTime = currentTime.utc.iso8601
- telemetrySent = false
+ begin
+ nodeInventory = nil
+ currentTime = Time.now
+ batchTime = currentTime.utc.iso8601
- nodeInventory = nil
+ # Initializing continuation token to nil
+ continuationToken = nil
+ $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}")
+ continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}")
+ $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}")
+ if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?)
+ parse_and_emit_records(nodeInventory, batchTime)
+ else
+ $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory"
+ end
- $log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}")
- nodeInfo = KubernetesApiClient.getKubeResourceInfo("nodes")
- $log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}")
+ #If we receive a continuation token, make calls, process and flush data until we have processed all data
+ while (!continuationToken.nil? && !continuationToken.empty?)
+ continuationToken, nodeInventory = KubernetesApiClient.getResourcesAndContinuationToken("nodes?limit=#{@NODES_CHUNK_SIZE}&continue=#{continuationToken}")
+ if (!nodeInventory.nil? && !nodeInventory.empty? && nodeInventory.key?("items") && !nodeInventory["items"].nil? && !nodeInventory["items"].empty?)
+ parse_and_emit_records(nodeInventory, batchTime)
+ else
+ $log.warn "in_kube_nodes::enumerate:Received empty nodeInventory"
+ end
+ end
- if !nodeInfo.nil?
- nodeInventory = JSON.parse(nodeInfo.body)
+ # Setting this to nil so that we don't hold memory until GC kicks in
+ nodeInventory = nil
+ rescue => errorStr
+ $log.warn "in_kube_nodes::enumerate:Failed in enumerate: #{errorStr}"
+ $log.debug_backtrace(errorStr.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
+ end # end enumerate
+ def parse_and_emit_records(nodeInventory, batchTime = Time.now.utc.iso8601)
begin
- if (!nodeInventory.nil? && !nodeInventory.empty?)
- eventStream = MultiEventStream.new
- containerNodeInventoryEventStream = MultiEventStream.new
- if !nodeInventory["items"].nil?
- #get node inventory
- nodeInventory["items"].each do |items|
- record = {}
- # Sending records for ContainerNodeInventory
- containerNodeInventoryRecord = {}
- containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated
- containerNodeInventoryRecord["Computer"] = items["metadata"]["name"]
-
- record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated
- record["Computer"] = items["metadata"]["name"]
- record["ClusterName"] = KubernetesApiClient.getClusterName
- record["ClusterId"] = KubernetesApiClient.getClusterId
- record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"]
- record["Labels"] = [items["metadata"]["labels"]]
- record["Status"] = ""
-
- if !items["spec"]["providerID"].nil? && !items["spec"]["providerID"].empty?
- if File.file?(@@AzStackCloudFileName) # existence of this file indicates agent running on azstack
- record["KubernetesProviderID"] = "azurestack"
- else
- record["KubernetesProviderID"] = items["spec"]["providerID"]
- end
- else
- record["KubernetesProviderID"] = "onprem"
- end
+ currentTime = Time.now
+ emitTime = currentTime.to_f
+ telemetrySent = false
+ eventStream = MultiEventStream.new
+ containerNodeInventoryEventStream = MultiEventStream.new
+ #get node inventory
+ nodeInventory["items"].each do |items|
+ record = {}
+ # Sending records for ContainerNodeInventory
+ containerNodeInventoryRecord = {}
+ containerNodeInventoryRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated
+ containerNodeInventoryRecord["Computer"] = items["metadata"]["name"]
- # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions.
- # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we
- # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk"
- # implying that the node is ready for hosting pods, however its out of disk.
-
- if items["status"].key?("conditions") && !items["status"]["conditions"].empty?
- allNodeConditions = ""
- items["status"]["conditions"].each do |condition|
- if condition["status"] == "True"
- if !allNodeConditions.empty?
- allNodeConditions = allNodeConditions + "," + condition["type"]
- else
- allNodeConditions = condition["type"]
- end
- end
- #collect last transition to/from ready (no matter ready is true/false)
- if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil?
- record["LastTransitionTimeReady"] = condition["lastTransitionTime"]
- end
- end
+ record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated
+ record["Computer"] = items["metadata"]["name"]
+ record["ClusterName"] = KubernetesApiClient.getClusterName
+ record["ClusterId"] = KubernetesApiClient.getClusterId
+ record["CreationTimeStamp"] = items["metadata"]["creationTimestamp"]
+ record["Labels"] = [items["metadata"]["labels"]]
+ record["Status"] = ""
+
+ if !items["spec"]["providerID"].nil? && !items["spec"]["providerID"].empty?
+ if File.file?(@@AzStackCloudFileName) # existence of this file indicates agent running on azstack
+ record["KubernetesProviderID"] = "azurestack"
+ else
+ record["KubernetesProviderID"] = items["spec"]["providerID"]
+ end
+ else
+ record["KubernetesProviderID"] = "onprem"
+ end
+
+ # Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions.
+ # We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we
+ # populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk"
+ # implying that the node is ready for hosting pods, however its out of disk.
+
+ if items["status"].key?("conditions") && !items["status"]["conditions"].empty?
+ allNodeConditions = ""
+ items["status"]["conditions"].each do |condition|
+ if condition["status"] == "True"
if !allNodeConditions.empty?
- record["Status"] = allNodeConditions
+ allNodeConditions = allNodeConditions + "," + condition["type"]
+ else
+ allNodeConditions = condition["type"]
end
end
-
- nodeInfo = items["status"]["nodeInfo"]
- record["KubeletVersion"] = nodeInfo["kubeletVersion"]
- record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"]
- containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"]
- dockerVersion = nodeInfo["containerRuntimeVersion"]
- dockerVersion.slice! "docker://"
- containerNodeInventoryRecord["DockerVersion"] = dockerVersion
- # ContainerNodeInventory data for docker version and operating system.
- containerNodeInventoryWrapper = {
- "DataType" => "CONTAINER_NODE_INVENTORY_BLOB",
- "IPName" => "ContainerInsights",
- "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }],
- }
- containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper
-
- wrapper = {
- "DataType" => "KUBE_NODE_INVENTORY_BLOB",
- "IPName" => "ContainerInsights",
- "DataItems" => [record.each { |k, v| record[k] = v }],
- }
- eventStream.add(emitTime, wrapper) if wrapper
- # Adding telemetry to send node telemetry every 5 minutes
- timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs
- timeDifferenceInMinutes = timeDifference / 60
- if (timeDifferenceInMinutes >= 10)
- properties = {}
- properties["Computer"] = record["Computer"]
- properties["KubeletVersion"] = record["KubeletVersion"]
- properties["OperatingSystem"] = nodeInfo["operatingSystem"]
- properties["DockerVersion"] = dockerVersion
- properties["KubernetesProviderID"] = record["KubernetesProviderID"]
- properties["KernelVersion"] = nodeInfo["kernelVersion"]
- properties["OSImage"] = nodeInfo["osImage"]
-
- capacityInfo = items["status"]["capacity"]
- ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties)
-
- #telemetry about prometheus metric collections settings for replicaset
- if (File.file?(@@promConfigMountPath))
- properties["rsPromInt"] = @@rsPromInterval
- properties["rsPromFPC"] = @@rsPromFieldPassCount
- properties["rsPromFDC"] = @@rsPromFieldDropCount
- properties["rsPromServ"] = @@rsPromK8sServiceCount
- properties["rsPromUrl"] = @@rsPromUrlCount
- properties["rsPromMonPods"] = @@rsPromMonitorPods
- properties["rsPromMonPodsNs"] = @@rsPromMonitorPodsNamespaceLength
- end
- ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties)
- telemetrySent = true
+ #collect last transition to/from ready (no matter ready is true/false)
+ if condition["type"] == "Ready" && !condition["lastTransitionTime"].nil?
+ record["LastTransitionTimeReady"] = condition["lastTransitionTime"]
end
end
+ if !allNodeConditions.empty?
+ record["Status"] = allNodeConditions
+ end
end
- router.emit_stream(@tag, eventStream) if eventStream
- router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream
- router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream
- if telemetrySent == true
- @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i
+
+ nodeInfo = items["status"]["nodeInfo"]
+ record["KubeletVersion"] = nodeInfo["kubeletVersion"]
+ record["KubeProxyVersion"] = nodeInfo["kubeProxyVersion"]
+ containerNodeInventoryRecord["OperatingSystem"] = nodeInfo["osImage"]
+ dockerVersion = nodeInfo["containerRuntimeVersion"]
+ dockerVersion.slice! "docker://"
+ containerNodeInventoryRecord["DockerVersion"] = dockerVersion
+ # ContainerNodeInventory data for docker version and operating system.
+ containerNodeInventoryWrapper = {
+ "DataType" => "CONTAINER_NODE_INVENTORY_BLOB",
+ "IPName" => "ContainerInsights",
+ "DataItems" => [containerNodeInventoryRecord.each { |k, v| containerNodeInventoryRecord[k] = v }],
+ }
+ containerNodeInventoryEventStream.add(emitTime, containerNodeInventoryWrapper) if containerNodeInventoryWrapper
+
+ wrapper = {
+ "DataType" => "KUBE_NODE_INVENTORY_BLOB",
+ "IPName" => "ContainerInsights",
+ "DataItems" => [record.each { |k, v| record[k] = v }],
+ }
+ eventStream.add(emitTime, wrapper) if wrapper
+ # Adding telemetry to send node telemetry every 10 minutes
+ timeDifference = (DateTime.now.to_time.to_i - @@nodeTelemetryTimeTracker).abs
+ timeDifferenceInMinutes = timeDifference / 60
+ if (timeDifferenceInMinutes >= 10)
+ properties = {}
+ properties["Computer"] = record["Computer"]
+ properties["KubeletVersion"] = record["KubeletVersion"]
+ properties["OperatingSystem"] = nodeInfo["operatingSystem"]
+ properties["DockerVersion"] = dockerVersion
+ properties["KubernetesProviderID"] = record["KubernetesProviderID"]
+ properties["KernelVersion"] = nodeInfo["kernelVersion"]
+ properties["OSImage"] = nodeInfo["osImage"]
+
+ capacityInfo = items["status"]["capacity"]
+ ApplicationInsightsUtility.sendMetricTelemetry("NodeMemory", capacityInfo["memory"], properties)
+
+ #telemetry about prometheus metric collections settings for replicaset
+ if (File.file?(@@promConfigMountPath))
+ properties["rsPromInt"] = @@rsPromInterval
+ properties["rsPromFPC"] = @@rsPromFieldPassCount
+ properties["rsPromFDC"] = @@rsPromFieldDropCount
+ properties["rsPromServ"] = @@rsPromK8sServiceCount
+ properties["rsPromUrl"] = @@rsPromUrlCount
+ properties["rsPromMonPods"] = @@rsPromMonitorPods
+ properties["rsPromMonPodsNs"] = @@rsPromMonitorPodsNamespaceLength
+ end
+ ApplicationInsightsUtility.sendMetricTelemetry("NodeCoreCapacity", capacityInfo["cpu"], properties)
+ telemetrySent = true
end
- @@istestvar = ENV["ISTEST"]
- if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0)
- $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
+ end
+ router.emit_stream(@tag, eventStream) if eventStream
+ router.emit_stream(@@MDMKubeNodeInventoryTag, eventStream) if eventStream
+ router.emit_stream(@@ContainerNodeInventoryTag, containerNodeInventoryEventStream) if containerNodeInventoryEventStream
+ if telemetrySent == true
+ @@nodeTelemetryTimeTracker = DateTime.now.to_time.to_i
+ end
+ @@istestvar = ENV["ISTEST"]
+ if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0)
+ $log.info("kubeNodeInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
+ end
+ #:optimize:kubeperf merge
+ begin
+ #if(!nodeInventory.empty?)
+ nodeMetricDataItems = []
+ #allocatable metrics @ node level
+ nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores", batchTime))
+ nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "memory", "memoryAllocatableBytes", batchTime))
+ #capacity metrics @ node level
+ nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores", batchTime))
+ nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes", batchTime))
+
+ kubePerfEventStream = MultiEventStream.new
+
+ nodeMetricDataItems.each do |record|
+ record["DataType"] = "LINUX_PERF_BLOB"
+ record["IPName"] = "LogManagement"
+ kubePerfEventStream.add(emitTime, record) if record
end
+ #end
+ router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream
+ rescue => errorStr
+ $log.warn "Failed in enumerate for KubePerf from in_kube_nodes : #{errorStr}"
+ $log.debug_backtrace(errorStr.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
+ #:optimize:end kubeperf merge
+
rescue => errorStr
$log.warn "Failed to retrieve node inventory: #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
+ $log.warn "in_kube_nodes::parse_and_emit_records:End #{Time.now.utc.iso8601}"
end
def run_periodic
@mutex.lock
done = @finished
+ @nextTimeToRun = Time.now
+ @waitTimeout = @run_interval
until done
- @condition.wait(@mutex, @run_interval)
+ @nextTimeToRun = @nextTimeToRun + @run_interval
+ @now = Time.now
+ if @nextTimeToRun <= @now
+ @waitTimeout = 1
+ @nextTimeToRun = @now
+ else
+ @waitTimeout = @nextTimeToRun - @now
+ end
+ @condition.wait(@mutex, @waitTimeout)
done = @finished
@mutex.unlock
if !done
begin
- $log.info("in_kube_nodes::run_periodic @ #{Time.now.utc.iso8601}")
+ $log.info("in_kube_nodes::run_periodic.enumerate.start #{Time.now.utc.iso8601}")
enumerate
+ $log.info("in_kube_nodes::run_periodic.enumerate.end #{Time.now.utc.iso8601}")
rescue => errorStr
$log.warn "in_kube_nodes::run_periodic: enumerate Failed to retrieve node inventory: #{errorStr}"
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
diff --git a/source/code/plugin/in_kube_perf.rb b/source/code/plugin/in_kube_perf.rb
deleted file mode 100644
index 8b571139d..000000000
--- a/source/code/plugin/in_kube_perf.rb
+++ /dev/null
@@ -1,120 +0,0 @@
-#!/usr/local/bin/ruby
-# frozen_string_literal: true
-
-module Fluent
-
- class Kube_Perf_Input < Input
- Plugin.register_input('kubeperf', self)
-
- def initialize
- super
- require 'yaml'
- require 'json'
-
- require_relative 'KubernetesApiClient'
- require_relative 'oms_common'
- require_relative 'omslog'
- end
-
- config_param :run_interval, :time, :default => '1m'
- config_param :tag, :string, :default => "oms.api.KubePerf"
-
- def configure (conf)
- super
- end
-
- def start
- if @run_interval
- @finished = false
- @condition = ConditionVariable.new
- @mutex = Mutex.new
- @thread = Thread.new(&method(:run_periodic))
- end
- end
-
- def shutdown
- if @run_interval
- @mutex.synchronize {
- @finished = true
- @condition.signal
- }
- @thread.join
- end
- end
-
- def enumerate()
- time = Time.now.to_f
- begin
- eventStream = MultiEventStream.new
-
- $log.info("in_kube_perf::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}")
- #get resource requests & resource limits per container as perf data
- podInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('pods').body)
- $log.info("in_kube_perf::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}")
- if(!podInventory.empty?)
- containerMetricDataItems = []
- hostName = (OMS::Common.get_hostname)
- containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu","cpuRequestNanoCores"))
- containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory","memoryRequestBytes"))
- containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu","cpuLimitNanoCores"))
- containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory","memoryLimitBytes"))
-
- containerMetricDataItems.each do |record|
- record['DataType'] = "LINUX_PERF_BLOB"
- record['IPName'] = "LogManagement"
- eventStream.add(time, record) if record
- #router.emit(@tag, time, record) if record
- end
- end
-
- #get allocatable limits per node as perf data
- # Node capacity is different from node allocatable. Allocatable is what is avaialble for allocating pods.
- # In theory Capacity = Allocatable + kube-reserved + system-reserved + eviction-threshold
- # For more details refer to https://kubernetes.io/docs/tasks/administer-cluster/reserve-compute-resources/#node-allocatable
- $log.info("in_kube_perf::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}")
- nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo('nodes').body)
- $log.info("in_kube_perf::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}")
- if(!nodeInventory.empty?)
- nodeMetricDataItems = []
- #allocatable metrics @ node level
- nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "cpu", "cpuAllocatableNanoCores"))
- nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "allocatable", "memory", "memoryAllocatableBytes"))
- #capacity metrics @ node level
- nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "cpu", "cpuCapacityNanoCores"))
- nodeMetricDataItems.concat(KubernetesApiClient.parseNodeLimits(nodeInventory, "capacity", "memory", "memoryCapacityBytes"))
-
- nodeMetricDataItems.each do |record|
- record['DataType'] = "LINUX_PERF_BLOB"
- record['IPName'] = "LogManagement"
- eventStream.add(time, record) if record
- #router.emit(@tag, time, record) if record
- end
- end
- router.emit_stream(@tag, eventStream) if eventStream
- rescue => errorStr
- $log.warn "Failed to retrieve metric data: #{errorStr}"
- $log.debug_backtrace(errorStr.backtrace)
- end
- end
-
- def run_periodic
- @mutex.lock
- done = @finished
- until done
- @condition.wait(@mutex, @run_interval)
- done = @finished
- @mutex.unlock
- if !done
- begin
- $log.info("in_kube_perf::run_periodic @ #{Time.now.utc.iso8601}")
- enumerate
- rescue => errorStr
- $log.warn "in_kube_perf::run_periodic: enumerate Failed to retrieve kube perf metrics: #{errorStr}"
- end
- end
- @mutex.lock
- end
- @mutex.unlock
- end
- end # Kube_Perf_Input
-end # module
diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb
index 1dd029b22..28b20bfc0 100644
--- a/source/code/plugin/in_kube_podinventory.rb
+++ b/source/code/plugin/in_kube_podinventory.rb
@@ -7,20 +7,30 @@ class Kube_PodInventory_Input < Input
@@MDMKubePodInventoryTag = "mdm.kubepodinventory"
@@hostName = (OMS::Common.get_hostname)
+ @@kubeperfTag = "oms.api.KubePerf"
+ @@kubeservicesTag = "oms.containerinsights.KubeServices"
def initialize
super
require "yaml"
- require "json"
+ require "yajl/json_gem"
+ require "yajl"
require "set"
+ require "time"
require_relative "KubernetesApiClient"
require_relative "ApplicationInsightsUtility"
require_relative "oms_common"
require_relative "omslog"
+
+ @PODS_CHUNK_SIZE = "1500"
+ @podCount = 0
+ @controllerSet = Set.new []
+ @winContainerCount = 0
+ @controllerData = {}
end
- config_param :run_interval, :time, :default => "1m"
+ config_param :run_interval, :time, :default => 60
config_param :tag, :string, :default => "oms.containerinsights.KubePodInventory"
def configure(conf)
@@ -48,33 +58,77 @@ def shutdown
end
def enumerate(podList = nil)
- podInventory = podList
- $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}")
- podInfo = KubernetesApiClient.getKubeResourceInfo("pods")
- $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}")
+ begin
+ podInventory = podList
+ telemetryFlush = false
+ @podCount = 0
+ @controllerSet = Set.new []
+ @winContainerCount = 0
+ @controllerData = {}
+ currentTime = Time.now
+ batchTime = currentTime.utc.iso8601
- if !podInfo.nil?
- podInventory = JSON.parse(podInfo.body)
- end
+      # Get services first so that we don't need to make a call for every chunk
+ $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}")
+ serviceInfo = KubernetesApiClient.getKubeResourceInfo("services")
+ # serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body)
+ $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}")
- begin
- if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?)
- #get pod inventory & services
- $log.info("in_kube_podinventory::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}")
- serviceList = nil
- serviceInfo = KubernetesApiClient.getKubeResourceInfo("services")
-
- if !serviceInfo.nil?
- serviceList = JSON.parse(serviceInfo.body)
- end
-
- $log.info("in_kube_podinventory::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}")
- parse_and_emit_records(podInventory, serviceList)
+ if !serviceInfo.nil?
+ $log.info("in_kube_podinventory::enumerate:Start:Parsing services data using yajl @ #{Time.now.utc.iso8601}")
+ serviceList = Yajl::Parser.parse(StringIO.new(serviceInfo.body))
+ $log.info("in_kube_podinventory::enumerate:End:Parsing services data using yajl @ #{Time.now.utc.iso8601}")
+ serviceInfo = nil
+ end
+
+ # Initializing continuation token to nil
+ continuationToken = nil
+ $log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}")
+ continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}")
+ $log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}")
+ if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?)
+ parse_and_emit_records(podInventory, serviceList, batchTime)
else
- $log.warn "Received empty podInventory"
+ $log.warn "in_kube_podinventory::enumerate:Received empty podInventory"
+ end
+
+ #If we receive a continuation token, make calls, process and flush data until we have processed all data
+ while (!continuationToken.nil? && !continuationToken.empty?)
+ continuationToken, podInventory = KubernetesApiClient.getResourcesAndContinuationToken("pods?limit=#{@PODS_CHUNK_SIZE}&continue=#{continuationToken}")
+ if (!podInventory.nil? && !podInventory.empty? && podInventory.key?("items") && !podInventory["items"].nil? && !podInventory["items"].empty?)
+ parse_and_emit_records(podInventory, serviceList, batchTime)
+ else
+ $log.warn "in_kube_podinventory::enumerate:Received empty podInventory"
+ end
+ end
+
+      # Setting these to nil so that we don't hold memory until GC kicks in
+ podInventory = nil
+ serviceList = nil
+
+ # Adding telemetry to send pod telemetry every 5 minutes
+ timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs
+ timeDifferenceInMinutes = timeDifference / 60
+ if (timeDifferenceInMinutes >= 5)
+ telemetryFlush = true
+ end
+
+ # Flush AppInsights telemetry once all the processing is done
+ if telemetryFlush == true
+ telemetryProperties = {}
+ telemetryProperties["Computer"] = @@hostName
+ ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties)
+ ApplicationInsightsUtility.sendMetricTelemetry("PodCount", @podCount, {})
+ telemetryProperties["ControllerData"] = @controllerData.to_json
+ ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", @controllerSet.length, telemetryProperties)
+ if @winContainerCount > 0
+ telemetryProperties["ClusterWideWindowsContainersCount"] = @winContainerCount
+ ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties)
+ end
+ @@podTelemetryTimeTracker = DateTime.now.to_time.to_i
end
rescue => errorStr
- $log.warn "Failed in enumerate pod inventory: #{errorStr}"
+ $log.warn "in_kube_podinventory::enumerate:Failed in enumerate: #{errorStr}"
$log.debug_backtrace(errorStr.backtrace)
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
@@ -192,15 +246,12 @@ def getContainerEnvironmentVariables(pod, clusterCollectEnvironmentVar)
end
end
- def parse_and_emit_records(podInventory, serviceList)
+ def parse_and_emit_records(podInventory, serviceList, batchTime = Time.utc.iso8601)
currentTime = Time.now
emitTime = currentTime.to_f
- batchTime = currentTime.utc.iso8601
+ #batchTime = currentTime.utc.iso8601
eventStream = MultiEventStream.new
- controllerSet = Set.new []
- controllerData = {}
- telemetryFlush = false
- winContainerCount = 0
+
begin #begin block start
# Getting windows nodes from kubeapi
winNodes = KubernetesApiClient.getWindowsNodesArray
@@ -283,24 +334,17 @@ def parse_and_emit_records(podInventory, serviceList)
record["ClusterId"] = KubernetesApiClient.getClusterId
record["ClusterName"] = KubernetesApiClient.getClusterName
record["ServiceName"] = getServiceNameFromLabels(items["metadata"]["namespace"], items["metadata"]["labels"], serviceList)
- # Adding telemetry to send pod telemetry every 5 minutes
- timeDifference = (DateTime.now.to_time.to_i - @@podTelemetryTimeTracker).abs
- timeDifferenceInMinutes = timeDifference / 60
- if (timeDifferenceInMinutes >= 5)
- telemetryFlush = true
- end
+
if !items["metadata"]["ownerReferences"].nil?
record["ControllerKind"] = items["metadata"]["ownerReferences"][0]["kind"]
record["ControllerName"] = items["metadata"]["ownerReferences"][0]["name"]
- if telemetryFlush == true
- controllerSet.add(record["ControllerKind"] + record["ControllerName"])
- #Adding controller kind to telemetry ro information about customer workload
- if (controllerData[record["ControllerKind"]].nil?)
- controllerData[record["ControllerKind"]] = 1
- else
- controllerValue = controllerData[record["ControllerKind"]]
- controllerData[record["ControllerKind"]] += 1
- end
+ @controllerSet.add(record["ControllerKind"] + record["ControllerName"])
+            #Adding controller kind to telemetry to provide information about customer workload
+ if (@controllerData[record["ControllerKind"]].nil?)
+ @controllerData[record["ControllerKind"]] = 1
+ else
+ controllerValue = @controllerData[record["ControllerKind"]]
+ @controllerData[record["ControllerKind"]] += 1
end
end
podRestartCount = 0
@@ -418,7 +462,7 @@ def parse_and_emit_records(podInventory, serviceList)
end
end
# Send container inventory records for containers on windows nodes
- winContainerCount += containerInventoryRecords.length
+ @winContainerCount += containerInventoryRecords.length
containerInventoryRecords.each do |cirecord|
if !cirecord.nil?
ciwrapper = {
@@ -433,19 +477,66 @@ def parse_and_emit_records(podInventory, serviceList)
router.emit_stream(@tag, eventStream) if eventStream
router.emit_stream(@@MDMKubePodInventoryTag, eventStream) if eventStream
- if telemetryFlush == true
- telemetryProperties = {}
- telemetryProperties["Computer"] = @@hostName
- ApplicationInsightsUtility.sendCustomEvent("KubePodInventoryHeartBeatEvent", telemetryProperties)
- ApplicationInsightsUtility.sendMetricTelemetry("PodCount", podInventory["items"].length, {})
- telemetryProperties["ControllerData"] = controllerData.to_json
- ApplicationInsightsUtility.sendMetricTelemetry("ControllerCount", controllerSet.length, telemetryProperties)
- if winContainerCount > 0
- telemetryProperties["ClusterWideWindowsContainersCount"] = winContainerCount
- ApplicationInsightsUtility.sendCustomEvent("WindowsContainerInventoryEvent", telemetryProperties)
+ #:optimize:kubeperf merge
+ begin
+ #if(!podInventory.empty?)
+ containerMetricDataItems = []
+ #hostName = (OMS::Common.get_hostname)
+ containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "cpu", "cpuRequestNanoCores", batchTime))
+ containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "requests", "memory", "memoryRequestBytes", batchTime))
+ containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "cpu", "cpuLimitNanoCores", batchTime))
+ containerMetricDataItems.concat(KubernetesApiClient.getContainerResourceRequestsAndLimits(podInventory, "limits", "memory", "memoryLimitBytes", batchTime))
+
+ kubePerfEventStream = MultiEventStream.new
+
+ containerMetricDataItems.each do |record|
+ record["DataType"] = "LINUX_PERF_BLOB"
+ record["IPName"] = "LogManagement"
+ kubePerfEventStream.add(emitTime, record) if record
end
- @@podTelemetryTimeTracker = DateTime.now.to_time.to_i
+ #end
+ router.emit_stream(@@kubeperfTag, kubePerfEventStream) if kubePerfEventStream
+ rescue => errorStr
+ $log.warn "Failed in parse_and_emit_record for KubePerf from in_kube_podinventory : #{errorStr}"
+ $log.debug_backtrace(errorStr.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
end
+ #:optimize:end kubeperf merge
+
+ #:optimize:start kubeservices merge
+ begin
+ if (!serviceList.nil? && !serviceList.empty?)
+ kubeServicesEventStream = MultiEventStream.new
+ serviceList["items"].each do |items|
+ kubeServiceRecord = {}
+ kubeServiceRecord["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated
+ kubeServiceRecord["ServiceName"] = items["metadata"]["name"]
+ kubeServiceRecord["Namespace"] = items["metadata"]["namespace"]
+ kubeServiceRecord["SelectorLabels"] = [items["spec"]["selector"]]
+ kubeServiceRecord["ClusterId"] = KubernetesApiClient.getClusterId
+ kubeServiceRecord["ClusterName"] = KubernetesApiClient.getClusterName
+ kubeServiceRecord["ClusterIP"] = items["spec"]["clusterIP"]
+ kubeServiceRecord["ServiceType"] = items["spec"]["type"]
+            # TODO: Add ports and status fields
+ kubeServicewrapper = {
+ "DataType" => "KUBE_SERVICES_BLOB",
+ "IPName" => "ContainerInsights",
+ "DataItems" => [kubeServiceRecord.each { |k, v| kubeServiceRecord[k] = v }],
+ }
+ kubeServicesEventStream.add(emitTime, kubeServicewrapper) if kubeServicewrapper
+ end
+ router.emit_stream(@@kubeservicesTag, kubeServicesEventStream) if kubeServicesEventStream
+ end
+ rescue => errorStr
+ $log.warn "Failed in parse_and_emit_record for KubeServices from in_kube_podinventory : #{errorStr}"
+ $log.debug_backtrace(errorStr.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
+ end
+ #:optimize:end kubeservices merge
+
+ #Updating value for AppInsights telemetry
+ @podCount += podInventory["items"].length
+
@@istestvar = ENV["ISTEST"]
if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp("true") == 0 && eventStream.count > 0)
$log.info("kubePodInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}")
@@ -460,14 +551,25 @@ def parse_and_emit_records(podInventory, serviceList)
def run_periodic
@mutex.lock
done = @finished
+ @nextTimeToRun = Time.now
+ @waitTimeout = @run_interval
until done
- @condition.wait(@mutex, @run_interval)
+ @nextTimeToRun = @nextTimeToRun + @run_interval
+ @now = Time.now
+ if @nextTimeToRun <= @now
+ @waitTimeout = 1
+ @nextTimeToRun = @now
+ else
+ @waitTimeout = @nextTimeToRun - @now
+ end
+ @condition.wait(@mutex, @waitTimeout)
done = @finished
@mutex.unlock
if !done
begin
- $log.info("in_kube_podinventory::run_periodic @ #{Time.now.utc.iso8601}")
+ $log.info("in_kube_podinventory::run_periodic.enumerate.start #{Time.now.utc.iso8601}")
enumerate
+ $log.info("in_kube_podinventory::run_periodic.enumerate.end #{Time.now.utc.iso8601}")
rescue => errorStr
$log.warn "in_kube_podinventory::run_periodic: enumerate Failed to retrieve pod inventory: #{errorStr}"
ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
diff --git a/source/code/plugin/in_kube_services.rb b/source/code/plugin/in_kube_services.rb
deleted file mode 100644
index 7cd703620..000000000
--- a/source/code/plugin/in_kube_services.rb
+++ /dev/null
@@ -1,110 +0,0 @@
-#!/usr/local/bin/ruby
-# frozen_string_literal: true
-
-module Fluent
- class Kube_Services_Input < Input
- Plugin.register_input("kubeservices", self)
-
- def initialize
- super
- require "yaml"
- require "json"
-
- require_relative "KubernetesApiClient"
- require_relative "oms_common"
- require_relative "omslog"
- require_relative "ApplicationInsightsUtility"
- end
-
- config_param :run_interval, :time, :default => "1m"
- config_param :tag, :string, :default => "oms.containerinsights.KubeServices"
-
- def configure(conf)
- super
- end
-
- def start
- if @run_interval
- @finished = false
- @condition = ConditionVariable.new
- @mutex = Mutex.new
- @thread = Thread.new(&method(:run_periodic))
- end
- end
-
- def shutdown
- if @run_interval
- @mutex.synchronize {
- @finished = true
- @condition.signal
- }
- @thread.join
- end
- end
-
- def enumerate
- currentTime = Time.now
- emitTime = currentTime.to_f
- batchTime = currentTime.utc.iso8601
-
- serviceList = nil
-
- $log.info("in_kube_services::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}")
- serviceInfo = KubernetesApiClient.getKubeResourceInfo("services")
- $log.info("in_kube_services::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}")
-
- if !serviceInfo.nil?
- serviceList = JSON.parse(serviceInfo.body)
- end
-
- begin
- if (!serviceList.nil? && !serviceList.empty?)
- eventStream = MultiEventStream.new
- serviceList["items"].each do |items|
- record = {}
- record["CollectionTime"] = batchTime #This is the time that is mapped to become TimeGenerated
- record["ServiceName"] = items["metadata"]["name"]
- record["Namespace"] = items["metadata"]["namespace"]
- record["SelectorLabels"] = [items["spec"]["selector"]]
- record["ClusterId"] = KubernetesApiClient.getClusterId
- record["ClusterName"] = KubernetesApiClient.getClusterName
- record["ClusterIP"] = items["spec"]["clusterIP"]
- record["ServiceType"] = items["spec"]["type"]
- # : Add ports and status fields
- wrapper = {
- "DataType" => "KUBE_SERVICES_BLOB",
- "IPName" => "ContainerInsights",
- "DataItems" => [record.each { |k, v| record[k] = v }],
- }
- eventStream.add(emitTime, wrapper) if wrapper
- end
- router.emit_stream(@tag, eventStream) if eventStream
- end
- rescue => errorStr
- $log.debug_backtrace(errorStr.backtrace)
- ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
- end
- end
-
- def run_periodic
- @mutex.lock
- done = @finished
- until done
- @condition.wait(@mutex, @run_interval)
- done = @finished
- @mutex.unlock
- if !done
- begin
- $log.info("in_kube_services::run_periodic @ #{Time.now.utc.iso8601}")
- enumerate
- rescue => errorStr
- $log.warn "in_kube_services::run_periodic: enumerate Failed to kube services: #{errorStr}"
- ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
- end
- end
- @mutex.lock
- end
- @mutex.unlock
- end
- end # Kube_Services_Input
-end # module
diff --git a/source/code/plugin/in_win_cadvisor_perf.rb b/source/code/plugin/in_win_cadvisor_perf.rb
index 2e5f839e6..695a686cf 100644
--- a/source/code/plugin/in_win_cadvisor_perf.rb
+++ b/source/code/plugin/in_win_cadvisor_perf.rb
@@ -10,7 +10,8 @@ class Win_CAdvisor_Perf_Input < Input
def initialize
super
require "yaml"
- require "json"
+ require 'yajl/json_gem'
+ require "time"
require_relative "CAdvisorMetricsAPIClient"
require_relative "KubernetesApiClient"
@@ -18,7 +19,7 @@ def initialize
require_relative "omslog"
end
- config_param :run_interval, :time, :default => "1m"
+ config_param :run_interval, :time, :default => 60
config_param :tag, :string, :default => "oms.api.wincadvisorperf"
config_param :mdmtag, :string, :default => "mdm.cadvisorperf"
@@ -60,13 +61,13 @@ def enumerate()
$log.info "in_win_cadvisor_perf: Getting windows nodes"
nodes = KubernetesApiClient.getWindowsNodes()
if !nodes.nil?
- @@winNodes = KubernetesApiClient.getWindowsNodes()
+ @@winNodes = nodes
end
$log.info "in_win_cadvisor_perf : Successuly got windows nodes after 5 minute interval"
@@winNodeQueryTimeTracker = DateTime.now.to_time.to_i
end
@@winNodes.each do |winNode|
- metricData = CAdvisorMetricsAPIClient.getMetrics(winNode)
+ metricData = CAdvisorMetricsAPIClient.getMetrics(winNode: winNode, metricTime: Time.now.utc.iso8601)
metricData.each do |record|
if !record.empty?
record["DataType"] = "LINUX_PERF_BLOB"
@@ -100,14 +101,25 @@ def enumerate()
def run_periodic
@mutex.lock
done = @finished
+ @nextTimeToRun = Time.now
+ @waitTimeout = @run_interval
until done
- @condition.wait(@mutex, @run_interval)
+ @nextTimeToRun = @nextTimeToRun + @run_interval
+ @now = Time.now
+ if @nextTimeToRun <= @now
+ @waitTimeout = 1
+ @nextTimeToRun = @now
+ else
+ @waitTimeout = @nextTimeToRun - @now
+ end
+ @condition.wait(@mutex, @waitTimeout)
done = @finished
@mutex.unlock
if !done
begin
- $log.info("in_win_cadvisor_perf::run_periodic @ #{Time.now.utc.iso8601}")
+ $log.info("in_win_cadvisor_perf::run_periodic.enumerate.start @ #{Time.now.utc.iso8601}")
enumerate
+ $log.info("in_win_cadvisor_perf::run_periodic.enumerate.end @ #{Time.now.utc.iso8601}")
rescue => errorStr
$log.warn "in_win_cadvisor_perf::run_periodic: enumerate Failed to retrieve cadvisor perf metrics for windows nodes: #{errorStr}"
end
diff --git a/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb b/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb
index 8f4677044..60838e215 100644
--- a/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb
+++ b/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb
@@ -1,4 +1,4 @@
-require 'json'
+require 'yajl/json_gem'
module ApplicationInsights
module Channel
diff --git a/source/code/plugin/lib/application_insights/channel/sender_base.rb b/source/code/plugin/lib/application_insights/channel/sender_base.rb
index 2431bf748..004b4722f 100644
--- a/source/code/plugin/lib/application_insights/channel/sender_base.rb
+++ b/source/code/plugin/lib/application_insights/channel/sender_base.rb
@@ -1,4 +1,4 @@
-require 'json'
+require 'yajl/json_gem'
require 'net/http'
require 'openssl'
require 'stringio'
diff --git a/source/code/plugin/out_mdm.rb b/source/code/plugin/out_mdm.rb
index b8d10090d..0a4e601b2 100644
--- a/source/code/plugin/out_mdm.rb
+++ b/source/code/plugin/out_mdm.rb
@@ -12,7 +12,7 @@ def initialize
require "net/http"
require "net/https"
require "uri"
- require "json"
+ require 'yajl/json_gem'
require_relative "KubernetesApiClient"
require_relative "ApplicationInsightsUtility"