diff --git a/build/linux/installer/conf/kube.conf b/build/linux/installer/conf/kube.conf
index dbb4db0da..b3055a0b9 100644
--- a/build/linux/installer/conf/kube.conf
+++ b/build/linux/installer/conf/kube.conf
@@ -16,6 +16,14 @@
custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast,eastus2,westus,australiasoutheast,brazilsouth,germanywestcentral,northcentralus,switzerlandnorth
+ #Kubernetes Persistent Volume inventory
+
+ type kubepvinventory
+ tag oms.containerinsights.KubePVInventory
+ run_interval 60
+ log_level debug
+
+
#Kubernetes events
type kubeevents
@@ -98,6 +106,21 @@
max_retry_wait 5m
+
+ type out_oms
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/state/out_oms_kubepv*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+
+
type out_oms
log_level debug
diff --git a/build/linux/installer/datafiles/base_container.data b/build/linux/installer/datafiles/base_container.data
index ca2538b79..ec42d5967 100644
--- a/build/linux/installer/datafiles/base_container.data
+++ b/build/linux/installer/datafiles/base_container.data
@@ -22,6 +22,7 @@ MAINTAINER: 'Microsoft Corporation'
/opt/microsoft/omsagent/plugin/filter_container.rb; source/plugins/ruby/filter_container.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_podinventory.rb; source/plugins/ruby/in_kube_podinventory.rb; 644; root; root
+/opt/microsoft/omsagent/plugin/in_kube_pvinventory.rb; source/plugins/ruby/in_kube_pvinventory.rb; 644; root; root
/opt/microsoft/omsagent/plugin/in_kube_events.rb; source/plugins/ruby/in_kube_events.rb; 644; root; root
/opt/microsoft/omsagent/plugin/KubernetesApiClient.rb; source/plugins/ruby/KubernetesApiClient.rb; 644; root; root
diff --git a/kubernetes/omsagent.yaml b/kubernetes/omsagent.yaml
index e8352e020..6a7466f44 100644
--- a/kubernetes/omsagent.yaml
+++ b/kubernetes/omsagent.yaml
@@ -21,6 +21,7 @@ rules:
"nodes/proxy",
"namespaces",
"services",
+ "persistentvolumes"
]
verbs: ["list", "get", "watch"]
- apiGroups: ["apps", "extensions", "autoscaling"]
@@ -67,6 +68,14 @@ data:
custom_metrics_azure_regions eastus,southcentralus,westcentralus,westus2,southeastasia,northeurope,westeurope,southafricanorth,centralus,northcentralus,eastus2,koreacentral,eastasia,centralindia,uksouth,canadacentral,francecentral,japaneast,australiaeast,eastus2,westus,australiasoutheast,brazilsouth,germanywestcentral,northcentralus,switzerlandnorth
+ #Kubernetes Persistent Volume inventory
+
+ type kubepvinventory
+ tag oms.containerinsights.KubePVInventory
+ run_interval 60
+ log_level debug
+
+
#Kubernetes events
type kubeevents
@@ -149,6 +158,21 @@ data:
max_retry_wait 5m
+
+ type out_oms
+ log_level debug
+ num_threads 5
+ buffer_chunk_limit 4m
+ buffer_type file
+ buffer_path %STATE_DIR_WS%/state/out_oms_kubepv*.buffer
+ buffer_queue_limit 20
+ buffer_queue_full_action drop_oldest_chunk
+ flush_interval 20s
+ retry_limit 10
+ retry_wait 5s
+ max_retry_wait 5m
+
+
type out_oms
log_level debug
diff --git a/source/plugins/ruby/constants.rb b/source/plugins/ruby/constants.rb
index 35e5f9334..375d1a029 100644
--- a/source/plugins/ruby/constants.rb
+++ b/source/plugins/ruby/constants.rb
@@ -77,6 +77,9 @@ class Constants
OMSAGENT_ZERO_FILL = "omsagent"
KUBESYSTEM_NAMESPACE_ZERO_FILL = "kube-system"
VOLUME_NAME_ZERO_FILL = "-"
+ PV_TYPES =["awsElasticBlockStore", "azureDisk", "azureFile", "cephfs", "cinder", "csi", "fc", "flexVolume",
+ "flocker", "gcePersistentDisk", "glusterfs", "hostPath", "iscsi", "local", "nfs",
+ "photonPersistentDisk", "portworxVolume", "quobyte", "rbd", "scaleIO", "storageos", "vsphereVolume"]
#Telemetry constants
CONTAINER_METRICS_HEART_BEAT_EVENT = "ContainerMetricsMdmHeartBeatEvent"
@@ -84,6 +87,7 @@ class Constants
CONTAINER_RESOURCE_UTIL_HEART_BEAT_EVENT = "ContainerResourceUtilMdmHeartBeatEvent"
PV_USAGE_HEART_BEAT_EVENT = "PVUsageMdmHeartBeatEvent"
PV_KUBE_SYSTEM_METRICS_ENABLED_EVENT = "CollectPVKubeSystemMetricsEnabled"
+ PV_INVENTORY_HEART_BEAT_EVENT = "KubePVInventoryHeartBeatEvent"
TELEMETRY_FLUSH_INTERVAL_IN_MINUTES = 10
KUBE_STATE_TELEMETRY_FLUSH_INTERVAL_IN_MINUTES = 15
ZERO_FILL_METRICS_INTERVAL_IN_MINUTES = 30
diff --git a/source/plugins/ruby/in_kube_pvinventory.rb b/source/plugins/ruby/in_kube_pvinventory.rb
new file mode 100644
index 000000000..b0e09c85b
--- /dev/null
+++ b/source/plugins/ruby/in_kube_pvinventory.rb
@@ -0,0 +1,253 @@
+module Fluent
+ class Kube_PVInventory_Input < Input
+ Plugin.register_input("kubepvinventory", self)
+
+ @@hostName = (OMS::Common.get_hostname)
+
+ def initialize
+ super
+ require "yaml"
+ require "yajl/json_gem"
+ require "yajl"
+ require "time"
+ require_relative "KubernetesApiClient"
+ require_relative "ApplicationInsightsUtility"
+ require_relative "oms_common"
+ require_relative "omslog"
+ require_relative "constants"
+
+ # Response size is around 1500 bytes per PV
+ @PV_CHUNK_SIZE = "5000"
+ @pvTypeToCountHash = {}
+ end
+
+ config_param :run_interval, :time, :default => 60
+ config_param :tag, :string, :default => "oms.containerinsights.KubePVInventory"
+
+ def configure(conf)
+ super
+ end
+
+ def start
+ if @run_interval
+ @finished = false
+ @condition = ConditionVariable.new
+ @mutex = Mutex.new
+ @thread = Thread.new(&method(:run_periodic))
+ @@pvTelemetryTimeTracker = DateTime.now.to_time.to_i
+ end
+ end
+
+ def shutdown
+ if @run_interval
+ @mutex.synchronize {
+ @finished = true
+ @condition.signal
+ }
+ @thread.join
+ end
+ end
+
+ def enumerate
+ begin
+ pvInventory = nil
+ telemetryFlush = false
+ @pvTypeToCountHash = {}
+ currentTime = Time.now
+ batchTime = currentTime.utc.iso8601
+
+ continuationToken = nil
+ $log.info("in_kube_pvinventory::enumerate : Getting PVs from Kube API @ #{Time.now.utc.iso8601}")
+ continuationToken, pvInventory = KubernetesApiClient.getResourcesAndContinuationToken("persistentvolumes?limit=#{@PV_CHUNK_SIZE}")
+ $log.info("in_kube_pvinventory::enumerate : Done getting PVs from Kube API @ #{Time.now.utc.iso8601}")
+
+ if (!pvInventory.nil? && !pvInventory.empty? && pvInventory.key?("items") && !pvInventory["items"].nil? && !pvInventory["items"].empty?)
+ parse_and_emit_records(pvInventory, batchTime)
+ else
+ $log.warn "in_kube_pvinventory::enumerate:Received empty pvInventory"
+ end
+
+ # If we receive a continuation token, make calls, process and flush data until we have processed all data
+ while (!continuationToken.nil? && !continuationToken.empty?)
+ continuationToken, pvInventory = KubernetesApiClient.getResourcesAndContinuationToken("persistentvolumes?limit=#{@PV_CHUNK_SIZE}&continue=#{continuationToken}")
+ if (!pvInventory.nil? && !pvInventory.empty? && pvInventory.key?("items") && !pvInventory["items"].nil? && !pvInventory["items"].empty?)
+ parse_and_emit_records(pvInventory, batchTime)
+ else
+ $log.warn "in_kube_pvinventory::enumerate:Received empty pvInventory"
+ end
+ end
+
+ # Setting this to nil so that we dont hold memory until GC kicks in
+ pvInventory = nil
+
+ # Adding telemetry to send pod telemetry every 10 minutes
+ timeDifference = (DateTime.now.to_time.to_i - @@pvTelemetryTimeTracker).abs
+ timeDifferenceInMinutes = timeDifference / 60
+ if (timeDifferenceInMinutes >= Constants::TELEMETRY_FLUSH_INTERVAL_IN_MINUTES)
+ telemetryFlush = true
+ end
+
+ # Flush AppInsights telemetry once all the processing is done
+ if telemetryFlush == true
+ telemetryProperties = {}
+ telemetryProperties["CountsOfPVTypes"] = @pvTypeToCountHash
+ ApplicationInsightsUtility.sendCustomEvent(Constants::PV_INVENTORY_HEART_BEAT_EVENT, telemetryProperties)
+ @@pvTelemetryTimeTracker = DateTime.now.to_time.to_i
+ end
+
+ rescue => errorStr
+ $log.warn "in_kube_pvinventory::enumerate:Failed in enumerate: #{errorStr}"
+ $log.debug_backtrace(errorStr.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
+ end
+ end # end enumerate
+
+ def parse_and_emit_records(pvInventory, batchTime = Time.utc.iso8601)
+ currentTime = Time.now
+ emitTime = currentTime.to_f
+ eventStream = MultiEventStream.new
+
+ begin
+ records = []
+ pvInventory["items"].each do |item|
+
+ # Node, pod, & usage info can be found by joining with pvUsedBytes metric using PVCNamespace/PVCName
+ record = {}
+ record["CollectionTime"] = batchTime
+ record["ClusterId"] = KubernetesApiClient.getClusterId
+ record["ClusterName"] = KubernetesApiClient.getClusterName
+ record["PVName"] = item["metadata"]["name"]
+ record["PVStatus"] = item["status"]["phase"]
+ record["PVAccessModes"] = item["spec"]["accessModes"].join(', ')
+ record["PVStorageClassName"] = item["spec"]["storageClassName"]
+ record["PVCapacityBytes"] = KubernetesApiClient.getMetricNumericValue("memory", item["spec"]["capacity"]["storage"])
+ record["PVCreationTimeStamp"] = item["metadata"]["creationTimestamp"]
+
+ # Optional values
+ pvcNamespace, pvcName = getPVCInfo(item)
+ type, typeInfo = getTypeInfo(item)
+ record["PVCNamespace"] = pvcNamespace
+ record["PVCName"] = pvcName
+ record["PVType"] = type
+ record["PVTypeInfo"] = typeInfo
+
+ records.push(record)
+
+ # Record telemetry
+ if type == nil
+ type = "empty"
+ end
+ if (@pvTypeToCountHash.has_key? type)
+ @pvTypeToCountHash[type] += 1
+ else
+ @pvTypeToCountHash[type] = 1
+ end
+ end
+
+ records.each do |record|
+ if !record.nil?
+ wrapper = {
+ "DataType" => "KUBE_PV_INVENTORY_BLOB",
+ "IPName" => "ContainerInsights",
+ "DataItems" => [record.each { |k, v| record[k] = v }],
+ }
+ eventStream.add(emitTime, wrapper) if wrapper
+ end
+ end
+
+ router.emit_stream(@tag, eventStream) if eventStream
+
+ rescue => errorStr
+ $log.warn "Failed in parse_and_emit_record for in_kube_pvinventory: #{errorStr}"
+ $log.debug_backtrace(errorStr.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
+ end
+ end
+
+ def getPVCInfo(item)
+ begin
+ if !item["spec"].nil? && !item["spec"]["claimRef"].nil?
+ claimRef = item["spec"]["claimRef"]
+ pvcNamespace = claimRef["namespace"]
+ pvcName = claimRef["name"]
+ return pvcNamespace, pvcName
+ end
+ rescue => errorStr
+ $log.warn "Failed in getPVCInfo for in_kube_pvinventory: #{errorStr}"
+ $log.debug_backtrace(errorStr.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
+ end
+
+ # No PVC or an error
+ return nil, nil
+ end
+
+ def getTypeInfo(item)
+ begin
+ if !item["spec"].nil?
+ (Constants::PV_TYPES).each do |pvType|
+
+ # PV is this type
+ if !item["spec"][pvType].nil?
+
+ # Get additional info if azure disk/file
+ typeInfo = {}
+ if pvType == "azureDisk"
+ azureDisk = item["spec"]["azureDisk"]
+ typeInfo["DiskName"] = azureDisk["diskName"]
+ typeInfo["DiskUri"] = azureDisk["diskURI"]
+ elsif pvType == "azureFile"
+ typeInfo["FileShareName"] = item["spec"]["azureFile"]["shareName"]
+ end
+
+ # Can only have one type: return right away when found
+ return pvType, typeInfo
+
+ end
+ end
+ end
+ rescue => errorStr
+ $log.warn "Failed in getTypeInfo for in_kube_pvinventory: #{errorStr}"
+ $log.debug_backtrace(errorStr.backtrace)
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
+ end
+
+ # No matches from list of types or an error
+ return nil, {}
+ end
+
+
+ def run_periodic
+ @mutex.lock
+ done = @finished
+ @nextTimeToRun = Time.now
+ @waitTimeout = @run_interval
+ until done
+ @nextTimeToRun = @nextTimeToRun + @run_interval
+ @now = Time.now
+ if @nextTimeToRun <= @now
+ @waitTimeout = 1
+ @nextTimeToRun = @now
+ else
+ @waitTimeout = @nextTimeToRun - @now
+ end
+ @condition.wait(@mutex, @waitTimeout)
+ done = @finished
+ @mutex.unlock
+ if !done
+ begin
+ $log.info("in_kube_pvinventory::run_periodic.enumerate.start #{Time.now.utc.iso8601}")
+ enumerate
+ $log.info("in_kube_pvinventory::run_periodic.enumerate.end #{Time.now.utc.iso8601}")
+ rescue => errorStr
+ $log.warn "in_kube_pvinventory::run_periodic: enumerate Failed to retrieve pod inventory: #{errorStr}"
+ ApplicationInsightsUtility.sendExceptionTelemetry(errorStr)
+ end
+ end
+ @mutex.lock
+ end
+ @mutex.unlock
+ end
+
+ end # Kube_PVInventory_Input
+end # module
\ No newline at end of file