Skip to content
Merged
3 changes: 2 additions & 1 deletion installer/conf/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -566,7 +566,8 @@
## Use TLS but skip chain & host verification
insecure_skip_verify = true
#tagexclude = ["AgentVersion","AKS_RESOURCE_ID","ACS_RESOURCE_NAME", "Region", "ClusterName", "ClusterType", "Computer", "ControllerType"]

[inputs.prometheus.tagpass]
operation_type = ["create_container", "remove_container", "pull_image"]

## prometheus custom metrics
[[inputs.prometheus]]
Expand Down
2 changes: 1 addition & 1 deletion installer/scripts/tomlparser-prom-customconfig.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def parseConfigMap
end

def checkForTypeArray(arrayValue, arrayType)
if (arrayValue.nil? || (arrayValue.kind_of?(Array) && arrayValue.length > 0 && arrayValue[0].kind_of?(arrayType)))
if (arrayValue.nil? || (arrayValue.kind_of?(Array) && ((arrayValue.length == 0) || (arrayValue.length > 0 && arrayValue[0].kind_of?(arrayType)))))
return true
else
return false
Expand Down
85 changes: 47 additions & 38 deletions source/code/go/src/plugins/oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ type laKubeMonAgentEvents struct {
}

type KubeMonAgentEventTags struct {
PodName string
ContainerId string
FirstOccurance string
LastOccurance string
Count int
PodName string
ContainerId string
FirstOccurrence string
LastOccurrence string
Count int
}

type KubeMonAgentEventBlob struct {
Expand Down Expand Up @@ -259,7 +259,14 @@ func updateContainerImageNameMaps() {
}

for _, pod := range pods.Items {
for _, status := range pod.Status.ContainerStatuses {
podContainerStatuses := pod.Status.ContainerStatuses

// Doing this to include init container logs as well
podInitContainerStatuses := pod.Status.InitContainerStatuses
if (podInitContainerStatuses != nil) && (len(podInitContainerStatuses) > 0) {
podContainerStatuses = append(podContainerStatuses, podInitContainerStatuses...)
}
for _, status := range podContainerStatuses {
lastSlashIndex := strings.LastIndex(status.ContainerID, "/")
containerID := status.ContainerID[lastSlashIndex+1 : len(status.ContainerID)]
image := status.Image
Expand Down Expand Up @@ -344,22 +351,22 @@ func populateKubeMonAgentEventHash(record map[interface{}]interface{}, errType K
if val, ok := ConfigErrorEvent[logRecordString]; ok {
Log("In config error existing hash update\n")
eventCount := val.Count
eventFirstOccurance := val.FirstOccurance
eventFirstOccurrence := val.FirstOccurrence

ConfigErrorEvent[logRecordString] = KubeMonAgentEventTags{
PodName: podName,
ContainerId: containerID,
FirstOccurance: eventFirstOccurance,
LastOccurance: eventTimeStamp,
Count: eventCount + 1,
PodName: podName,
ContainerId: containerID,
FirstOccurrence: eventFirstOccurrence,
LastOccurrence: eventTimeStamp,
Count: eventCount + 1,
}
} else {
ConfigErrorEvent[logRecordString] = KubeMonAgentEventTags{
PodName: podName,
ContainerId: containerID,
FirstOccurance: eventTimeStamp,
LastOccurance: eventTimeStamp,
Count: 1,
PodName: podName,
ContainerId: containerID,
FirstOccurrence: eventTimeStamp,
LastOccurrence: eventTimeStamp,
Count: 1,
}
}

Expand All @@ -374,22 +381,22 @@ func populateKubeMonAgentEventHash(record map[interface{}]interface{}, errType K
if val, ok := PromScrapeErrorEvent[splitString]; ok {
Log("In config error existing hash update\n")
eventCount := val.Count
eventFirstOccurance := val.FirstOccurance
eventFirstOccurrence := val.FirstOccurrence

PromScrapeErrorEvent[splitString] = KubeMonAgentEventTags{
PodName: podName,
ContainerId: containerID,
FirstOccurance: eventFirstOccurance,
LastOccurance: eventTimeStamp,
Count: eventCount + 1,
PodName: podName,
ContainerId: containerID,
FirstOccurrence: eventFirstOccurrence,
LastOccurrence: eventTimeStamp,
Count: eventCount + 1,
}
} else {
PromScrapeErrorEvent[splitString] = KubeMonAgentEventTags{
PodName: podName,
ContainerId: containerID,
FirstOccurance: eventTimeStamp,
LastOccurance: eventTimeStamp,
Count: 1,
PodName: podName,
ContainerId: containerID,
FirstOccurrence: eventTimeStamp,
LastOccurrence: eventTimeStamp,
Count: 1,
}
}
}
Expand Down Expand Up @@ -756,16 +763,18 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
FlushedRecordsSize += float64(len(stringMap["LogEntry"]))

dataItems = append(dataItems, dataItem)
loggedTime, e := time.Parse(time.RFC3339, dataItem.LogEntryTimeStamp)
if e != nil {
message := fmt.Sprintf("Error while converting LogEntryTimeStamp for telemetry purposes: %s", e.Error())
Log(message)
SendException(message)
} else {
ltncy := float64(start.Sub(loggedTime) / time.Millisecond)
if ltncy >= maxLatency {
maxLatency = ltncy
maxLatencyContainer = dataItem.Name + "=" + dataItem.ID
if dataItem.LogEntryTimeStamp != "" {
loggedTime, e := time.Parse(time.RFC3339, dataItem.LogEntryTimeStamp)
if e != nil {
message := fmt.Sprintf("Error while converting LogEntryTimeStamp for telemetry purposes: %s", e.Error())
Log(message)
SendException(message)
} else {
ltncy := float64(start.Sub(loggedTime) / time.Millisecond)
if ltncy >= maxLatency {
maxLatency = ltncy
maxLatencyContainer = dataItem.Name + "=" + dataItem.ID
}
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions source/code/plugin/KubernetesApiClient.rb
Original file line number Diff line number Diff line change
Expand Up @@ -356,9 +356,19 @@ def getContainerResourceRequestsAndLimits(metricJSON, metricCategory, metricName
else
podUid = pod["metadata"]["uid"]
end
if (!pod["spec"]["containers"].nil? && !pod["spec"]["nodeName"].nil?)

podContainers = []
if !pod["spec"]["containers"].nil? && !pod["spec"]["containers"].empty?
podContainers = podContainers + pod["spec"]["containers"]
end
# Adding init containers to the record list as well.
if !pod["spec"]["initContainers"].nil? && !pod["spec"]["initContainers"].empty?
podContainers = podContainers + pod["spec"]["initContainers"]
end

if (!podContainers.nil? && !podContainers.empty? && !pod["spec"]["nodeName"].nil?)
nodeName = pod["spec"]["nodeName"]
pod["spec"]["containers"].each do |container|
podContainers.each do |container|
containerName = container["name"]
metricTime = Time.now.utc.iso8601 #2018-01-30T19:36:14Z
if (!container["resources"].nil? && !container["resources"].empty? && !container["resources"][metricCategory].nil? && !container["resources"][metricCategory][metricNameToCollect].nil?)
Expand Down
17 changes: 10 additions & 7 deletions source/code/plugin/in_kube_events.rb
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,20 @@ def enumerate(eventList = nil)
currentTime = Time.now
emitTime = currentTime.to_f
batchTime = currentTime.utc.iso8601
if eventList.nil?
$log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}")
events = JSON.parse(KubernetesApiClient.getKubeResourceInfo("events").body)
$log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}")
else
events = eventList

events = eventList
$log.info("in_kube_events::enumerate : Getting events from Kube API @ #{Time.now.utc.iso8601}")
eventInfo = KubernetesApiClient.getKubeResourceInfo("events")
$log.info("in_kube_events::enumerate : Done getting events from Kube API @ #{Time.now.utc.iso8601}")

if !eventInfo.nil?
events = JSON.parse(eventInfo.body)
end

eventQueryState = getEventQueryState
newEventQueryState = []
begin
if (!events.empty? && !events["items"].nil?)
if (!events.nil? && !events.empty? && !events["items"].nil?)
eventStream = MultiEventStream.new
events["items"].each do |items|
record = {}
Expand Down
13 changes: 10 additions & 3 deletions source/code/plugin/in_kube_nodes.rb
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,19 @@ def enumerate
emitTime = currentTime.to_f
batchTime = currentTime.utc.iso8601
telemetrySent = false

nodeInventory = nil

$log.info("in_kube_nodes::enumerate : Getting nodes from Kube API @ #{Time.now.utc.iso8601}")
nodeInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("nodes").body)
nodeInfo = KubernetesApiClient.getKubeResourceInfo("nodes")
$log.info("in_kube_nodes::enumerate : Done getting nodes from Kube API @ #{Time.now.utc.iso8601}")

if !nodeInfo.nil?
nodeInventory = JSON.parse(nodeInfo.body)
end

begin
if (!nodeInventory.empty?)
if (!nodeInventory.nil? && !nodeInventory.empty?)
eventStream = MultiEventStream.new
containerNodeInventoryEventStream = MultiEventStream.new
if !nodeInventory["items"].nil?
Expand Down Expand Up @@ -95,7 +103,6 @@ def enumerate
record["KubernetesProviderID"] = "onprem"
end


# Refer to https://kubernetes.io/docs/concepts/architecture/nodes/#condition for possible node conditions.
# We check the status of each condition e.g. {"type": "OutOfDisk","status": "False"} . Based on this we
# populate the KubeNodeInventory Status field. A possible value for this field could be "Ready OutofDisk"
Expand Down
41 changes: 31 additions & 10 deletions source/code/plugin/in_kube_podinventory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,15 @@ def shutdown
end

def enumerate(podList = nil)
if podList.nil?
$log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}")
podInventory = JSON.parse(KubernetesApiClient.getKubeResourceInfo("pods").body)
$log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}")
else
podInventory = podList
podInventory = podList
$log.info("in_kube_podinventory::enumerate : Getting pods from Kube API @ #{Time.now.utc.iso8601}")
podInfo = KubernetesApiClient.getKubeResourceInfo("pods")
$log.info("in_kube_podinventory::enumerate : Done getting pods from Kube API @ #{Time.now.utc.iso8601}")

if !podInfo.nil?
podInventory = JSON.parse(podInfo.body)
end

begin
if (!podInventory.empty? && podInventory.key?("items") && !podInventory["items"].empty?)
#get pod inventory & services
Expand Down Expand Up @@ -137,8 +139,16 @@ def getContainerEnvironmentVariables(pod, clusterCollectEnvironmentVar)
begin
podSpec = pod["spec"]
containerEnvHash = {}
if !podSpec.nil? && !podSpec["containers"].nil?
podSpec["containers"].each do |container|
podContainersEnv = []
if !podSpec["containers"].nil? && !podSpec["containers"].empty?
podContainersEnv = podContainersEnv + podSpec["containers"]
end
# Adding init containers to the record list as well.
if !podSpec["initContainers"].nil? && !podSpec["initContainers"].empty?
podContainersEnv = podContainersEnv + podSpec["initContainers"]
end
if !podContainersEnv.nil? && !podContainersEnv.empty?
podContainersEnv.each do |container|
if !clusterCollectEnvironmentVar.nil? && !clusterCollectEnvironmentVar.empty? && clusterCollectEnvironmentVar.casecmp("false") == 0
containerEnvHash[container["name"]] = ["AZMON_CLUSTER_COLLECT_ENV_VAR=FALSE"]
else
Expand Down Expand Up @@ -289,8 +299,19 @@ def parse_and_emit_records(podInventory, serviceList)
end
podRestartCount = 0
record["PodRestartCount"] = 0
if items["status"].key?("containerStatuses") && !items["status"]["containerStatuses"].empty? #container status block start
items["status"]["containerStatuses"].each do |container|

podContainers = []
if items["status"].key?("containerStatuses") && !items["status"]["containerStatuses"].empty?
podContainers = podContainers + items["status"]["containerStatuses"]
end
# Adding init containers to the record list as well.
if items["status"].key?("initContainerStatuses") && !items["status"]["initContainerStatuses"].empty?
podContainers = podContainers + items["status"]["initContainerStatuses"]
end

# if items["status"].key?("containerStatuses") && !items["status"]["containerStatuses"].empty? #container status block start
if !podContainers.empty? #container status block start
podContainers.each do |container|
containerRestartCount = 0
#container Id is of the form
#docker://dfd9da983f1fd27432fb2c1fe3049c0a1d25b1c697b2dc1a530c986e58b16527
Expand Down
12 changes: 10 additions & 2 deletions source/code/plugin/in_kube_services.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,19 @@ def enumerate
currentTime = Time.now
emitTime = currentTime.to_f
batchTime = currentTime.utc.iso8601

serviceList = nil

$log.info("in_kube_services::enumerate : Getting services from Kube API @ #{Time.now.utc.iso8601}")
serviceList = JSON.parse(KubernetesApiClient.getKubeResourceInfo("services").body)
serviceInfo = KubernetesApiClient.getKubeResourceInfo("services")
$log.info("in_kube_services::enumerate : Done getting services from Kube API @ #{Time.now.utc.iso8601}")

if !serviceInfo.nil?
serviceList = JSON.parse(serviceInfo.body)
end

begin
if (!serviceList.empty?)
if (!serviceList.nil? && !serviceList.empty?)
eventStream = MultiEventStream.new
serviceList["items"].each do |items|
record = {}
Expand Down