-
Notifications
You must be signed in to change notification settings - Fork 115
grwehner/pv inventory #455
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
62 commits
Select commit
Hold shift + click to select a range
04826d0
Add in pv metrics from cadvisor
e0fbdef
Merge branch 'ci_dev' into grwehner/pv
a459794
changed to send only pv usage & add kube-system toggle config
0ec8ef9
variable name fixes
fb8a214
Added kube-system config
0b2f9dc
mdm filter
1bad74f
add pv_used_bytes to mdm filter metrics conf
7068629
filter fixes
94348cd
more filter fixes
58230fd
end statement fix
f68c04a
log fixes
46c1b50
all pv records to mdm
db24b0f
different mdm generator method
9d6874f
out_mdm log path
cdf96a0
try to get out_mdm logging path
c902df6
pv metric now sending to ME
0f41269
add in threshold condition
d4148cc
constants and consistent naming
1d6cee6
comments and code cleanup
357914a
remove container name, add pod name/uid
9377262
log fixes and constnat change
ee14b2b
naming fix
c1d46e8
cleanup
130e5d7
add pvUsedBytes as metric to collect
gracewehner d0f8d58
more cleanup
7cce941
Merge branch 'grwehner/pv' of https://github.com/microsoft/Docker-Pro…
0e7593b
Merge remote-tracking branch 'origin/ci_dev' into grwehner/pv
f0885e4
boolean fix
62b84ba
set threshold to 60
gracewehner 9da59bb
add pv inventory fluent plugin structure
gracewehner 9163a90
structure fixes
gracewehner 6f705b7
send as insights metrics
gracewehner 2e1a2cd
include in pod inventory
gracewehner 8fca127
add check that pvUsedBytes is a configured metric to collect
gracewehner da0a34d
code review feedback changes
gracewehner 68404bf
after testing changes
gracewehner 4505b45
whitespace fix
gracewehner c08054b
variable name fix
gracewehner c88b9ab
naming changes
gracewehner d2c6c4a
Merge remote-tracking branch 'origin/grwehner/pv' into grwehner/pv-in…
gracewehner 4447cdd
make call for pv instead of pvc
gracewehner dc3351d
disk info and telemetry
gracewehner 4d5e228
logging and pvcs in pod inventory
gracewehner 8312caf
parsing fixes
gracewehner 610cc71
pv inventory in pod inventory and more telemetry
gracewehner 7c4a547
cleanup and add logging for kube api response size
gracewehner a578ba3
payload investigation
gracewehner 4578b80
getting more resposne size info
gracewehner bc6b8cc
use continuation token, get rid of mdm path
gracewehner 7545214
use kubepvinventory path
gracewehner 4ae76a1
add back in parse_and_emit
gracewehner a9218a1
additions for PV Type
gracewehner bc4a943
updated schema, sending to insights metrics for testing
gracewehner 948bf9a
bug fixes
gracewehner 75d1e75
Merge remote-tracking branch 'origin/ci_dev' into grwehner/pv-inventory
gracewehner 9016f14
route to new LA table
gracewehner cc8c023
refactoring
gracewehner 062306a
add back in pv type list
gracewehner b1ff023
after testing fixes
gracewehner 59108b4
comments and rescues
gracewehner 677d938
remove extra logging
gracewehner db84280
fix variable naming
gracewehner File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,253 @@ | ||
| module Fluent | ||
| class Kube_PVInventory_Input < Input | ||
| Plugin.register_input("kubepvinventory", self) | ||
|
|
||
| @@hostName = (OMS::Common.get_hostname) | ||
|
|
||
| def initialize | ||
| super | ||
| require "yaml" | ||
| require "yajl/json_gem" | ||
| require "yajl" | ||
| require "time" | ||
| require_relative "KubernetesApiClient" | ||
| require_relative "ApplicationInsightsUtility" | ||
| require_relative "oms_common" | ||
| require_relative "omslog" | ||
| require_relative "constants" | ||
|
|
||
| # Response size is around 1500 bytes per PV | ||
| @PV_CHUNK_SIZE = "5000" | ||
vishiy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| @pvTypeToCountHash = {} | ||
| end | ||
|
|
||
| config_param :run_interval, :time, :default => 60 | ||
| config_param :tag, :string, :default => "oms.containerinsights.KubePVInventory" | ||
|
|
||
| def configure(conf) | ||
| super | ||
| end | ||
|
|
||
| def start | ||
| if @run_interval | ||
| @finished = false | ||
| @condition = ConditionVariable.new | ||
| @mutex = Mutex.new | ||
| @thread = Thread.new(&method(:run_periodic)) | ||
| @@pvTelemetryTimeTracker = DateTime.now.to_time.to_i | ||
| end | ||
| end | ||
|
|
||
| def shutdown | ||
| if @run_interval | ||
| @mutex.synchronize { | ||
| @finished = true | ||
| @condition.signal | ||
| } | ||
| @thread.join | ||
| end | ||
| end | ||
|
|
||
| def enumerate | ||
| begin | ||
| pvInventory = nil | ||
| telemetryFlush = false | ||
| @pvTypeToCountHash = {} | ||
| currentTime = Time.now | ||
| batchTime = currentTime.utc.iso8601 | ||
|
|
||
| continuationToken = nil | ||
| $log.info("in_kube_pvinventory::enumerate : Getting PVs from Kube API @ #{Time.now.utc.iso8601}") | ||
| continuationToken, pvInventory = KubernetesApiClient.getResourcesAndContinuationToken("persistentvolumes?limit=#{@PV_CHUNK_SIZE}") | ||
| $log.info("in_kube_pvinventory::enumerate : Done getting PVs from Kube API @ #{Time.now.utc.iso8601}") | ||
|
|
||
| if (!pvInventory.nil? && !pvInventory.empty? && pvInventory.key?("items") && !pvInventory["items"].nil? && !pvInventory["items"].empty?) | ||
| parse_and_emit_records(pvInventory, batchTime) | ||
| else | ||
| $log.warn "in_kube_pvinventory::enumerate:Received empty pvInventory" | ||
| end | ||
|
|
||
| # If we receive a continuation token, make calls, process and flush data until we have processed all data | ||
| while (!continuationToken.nil? && !continuationToken.empty?) | ||
| continuationToken, pvInventory = KubernetesApiClient.getResourcesAndContinuationToken("persistentvolumes?limit=#{@PV_CHUNK_SIZE}&continue=#{continuationToken}") | ||
| if (!pvInventory.nil? && !pvInventory.empty? && pvInventory.key?("items") && !pvInventory["items"].nil? && !pvInventory["items"].empty?) | ||
| parse_and_emit_records(pvInventory, batchTime) | ||
| else | ||
| $log.warn "in_kube_pvinventory::enumerate:Received empty pvInventory" | ||
| end | ||
| end | ||
|
|
||
| # Setting this to nil so that we dont hold memory until GC kicks in | ||
| pvInventory = nil | ||
|
|
||
| # Adding telemetry to send pod telemetry every 10 minutes | ||
| timeDifference = (DateTime.now.to_time.to_i - @@pvTelemetryTimeTracker).abs | ||
| timeDifferenceInMinutes = timeDifference / 60 | ||
| if (timeDifferenceInMinutes >= Constants::TELEMETRY_FLUSH_INTERVAL_IN_MINUTES) | ||
| telemetryFlush = true | ||
| end | ||
|
|
||
| # Flush AppInsights telemetry once all the processing is done | ||
| if telemetryFlush == true | ||
| telemetryProperties = {} | ||
| telemetryProperties["CountsOfPVTypes"] = @pvTypeToCountHash | ||
| ApplicationInsightsUtility.sendCustomEvent(Constants::PV_INVENTORY_HEART_BEAT_EVENT, telemetryProperties) | ||
| @@pvTelemetryTimeTracker = DateTime.now.to_time.to_i | ||
| end | ||
|
|
||
| rescue => errorStr | ||
| $log.warn "in_kube_pvinventory::enumerate:Failed in enumerate: #{errorStr}" | ||
| $log.debug_backtrace(errorStr.backtrace) | ||
| ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) | ||
| end | ||
| end # end enumerate | ||
|
|
||
| def parse_and_emit_records(pvInventory, batchTime = Time.utc.iso8601) | ||
| currentTime = Time.now | ||
| emitTime = currentTime.to_f | ||
| eventStream = MultiEventStream.new | ||
|
|
||
| begin | ||
| records = [] | ||
| pvInventory["items"].each do |item| | ||
|
|
||
| # Node, pod, & usage info can be found by joining with pvUsedBytes metric using PVCNamespace/PVCName | ||
| record = {} | ||
| record["CollectionTime"] = batchTime | ||
| record["ClusterId"] = KubernetesApiClient.getClusterId | ||
| record["ClusterName"] = KubernetesApiClient.getClusterName | ||
| record["PVName"] = item["metadata"]["name"] | ||
| record["PVStatus"] = item["status"]["phase"] | ||
| record["PVAccessModes"] = item["spec"]["accessModes"].join(', ') | ||
| record["PVStorageClassName"] = item["spec"]["storageClassName"] | ||
| record["PVCapacityBytes"] = KubernetesApiClient.getMetricNumericValue("memory", item["spec"]["capacity"]["storage"]) | ||
| record["PVCreationTimeStamp"] = item["metadata"]["creationTimestamp"] | ||
|
|
||
| # Optional values | ||
| pvcNamespace, pvcName = getPVCInfo(item) | ||
| type, typeInfo = getTypeInfo(item) | ||
| record["PVCNamespace"] = pvcNamespace | ||
| record["PVCName"] = pvcName | ||
| record["PVType"] = type | ||
| record["PVTypeInfo"] = typeInfo | ||
|
|
||
| records.push(record) | ||
|
|
||
| # Record telemetry | ||
| if type == nil | ||
| type = "empty" | ||
| end | ||
| if (@pvTypeToCountHash.has_key? type) | ||
| @pvTypeToCountHash[type] += 1 | ||
| else | ||
| @pvTypeToCountHash[type] = 1 | ||
| end | ||
| end | ||
|
|
||
| records.each do |record| | ||
| if !record.nil? | ||
| wrapper = { | ||
| "DataType" => "KUBE_PV_INVENTORY_BLOB", | ||
| "IPName" => "ContainerInsights", | ||
| "DataItems" => [record.each { |k, v| record[k] = v }], | ||
| } | ||
| eventStream.add(emitTime, wrapper) if wrapper | ||
| end | ||
| end | ||
|
|
||
| router.emit_stream(@tag, eventStream) if eventStream | ||
|
|
||
| rescue => errorStr | ||
| $log.warn "Failed in parse_and_emit_record for in_kube_pvinventory: #{errorStr}" | ||
| $log.debug_backtrace(errorStr.backtrace) | ||
| ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) | ||
| end | ||
| end | ||
|
|
||
| def getPVCInfo(item) | ||
| begin | ||
| if !item["spec"].nil? && !item["spec"]["claimRef"].nil? | ||
| claimRef = item["spec"]["claimRef"] | ||
| pvcNamespace = claimRef["namespace"] | ||
vishiy marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| pvcName = claimRef["name"] | ||
| return pvcNamespace, pvcName | ||
| end | ||
| rescue => errorStr | ||
| $log.warn "Failed in getPVCInfo for in_kube_pvinventory: #{errorStr}" | ||
| $log.debug_backtrace(errorStr.backtrace) | ||
| ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) | ||
| end | ||
|
|
||
| # No PVC or an error | ||
| return nil, nil | ||
| end | ||
|
|
||
| def getTypeInfo(item) | ||
| begin | ||
| if !item["spec"].nil? | ||
| (Constants::PV_TYPES).each do |pvType| | ||
|
|
||
| # PV is this type | ||
| if !item["spec"][pvType].nil? | ||
|
|
||
| # Get additional info if azure disk/file | ||
| typeInfo = {} | ||
| if pvType == "azureDisk" | ||
| azureDisk = item["spec"]["azureDisk"] | ||
| typeInfo["DiskName"] = azureDisk["diskName"] | ||
| typeInfo["DiskUri"] = azureDisk["diskURI"] | ||
| elsif pvType == "azureFile" | ||
| typeInfo["FileShareName"] = item["spec"]["azureFile"]["shareName"] | ||
| end | ||
|
|
||
| # Can only have one type: return right away when found | ||
| return pvType, typeInfo | ||
|
|
||
| end | ||
| end | ||
| end | ||
| rescue => errorStr | ||
| $log.warn "Failed in getTypeInfo for in_kube_pvinventory: #{errorStr}" | ||
| $log.debug_backtrace(errorStr.backtrace) | ||
| ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) | ||
| end | ||
|
|
||
| # No matches from list of types or an error | ||
| return nil, {} | ||
| end | ||
|
|
||
|
|
||
| def run_periodic | ||
| @mutex.lock | ||
| done = @finished | ||
| @nextTimeToRun = Time.now | ||
| @waitTimeout = @run_interval | ||
| until done | ||
| @nextTimeToRun = @nextTimeToRun + @run_interval | ||
| @now = Time.now | ||
| if @nextTimeToRun <= @now | ||
| @waitTimeout = 1 | ||
| @nextTimeToRun = @now | ||
| else | ||
| @waitTimeout = @nextTimeToRun - @now | ||
| end | ||
| @condition.wait(@mutex, @waitTimeout) | ||
| done = @finished | ||
| @mutex.unlock | ||
| if !done | ||
| begin | ||
| $log.info("in_kube_pvinventory::run_periodic.enumerate.start #{Time.now.utc.iso8601}") | ||
| enumerate | ||
| $log.info("in_kube_pvinventory::run_periodic.enumerate.end #{Time.now.utc.iso8601}") | ||
| rescue => errorStr | ||
| $log.warn "in_kube_pvinventory::run_periodic: enumerate Failed to retrieve pod inventory: #{errorStr}" | ||
| ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) | ||
| end | ||
| end | ||
| @mutex.lock | ||
| end | ||
| @mutex.unlock | ||
| end | ||
|
|
||
| end # Kube_PVInventory_Input | ||
| end # module | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.