From 6057b8dcb2d844a8d9be871bbc261e4c06bc6c5e Mon Sep 17 00:00:00 2001 From: Vishwanath Narasimhan Date: Fri, 14 Dec 2018 15:27:38 -0800 Subject: [PATCH] Changes - * use /var/log for state * new metric ContainerLogsAgentSideLatencyMs * new field 'TimeOfCommand' --- installer/conf/td-agent-bit.conf | 2 +- source/code/go/src/plugins/oms.go | 43 ++++++++++++++++++------- source/code/go/src/plugins/telemetry.go | 12 +++++++ 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/installer/conf/td-agent-bit.conf b/installer/conf/td-agent-bit.conf index c3252a185..b6b9bcc44 100644 --- a/installer/conf/td-agent-bit.conf +++ b/installer/conf/td-agent-bit.conf @@ -8,7 +8,7 @@ Name tail Tag oms.container.log.* Path /var/log/containers/*.log - DB /var/opt/microsoft/docker-cimprov/state/fblogs.db + DB /var/log/omsagent-fblogs.db Parser docker Mem_Buf_Limit 30m Path_Key filepath diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index 9876acc42..30e844915 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -77,9 +77,10 @@ var ( // DataItem represents the object corresponding to the json that is sent by fluentbit tail plugin type DataItem struct { - LogEntry string `json:"LogEntry"` - LogEntrySource string `json:"LogEntrySource"` - LogEntryTimeStamp string `json:"LogEntryTimeStamp"` + LogEntry string `json:"LogEntry"` + LogEntrySource string `json:"LogEntrySource"` + LogEntryTimeStamp string `json:"LogEntryTimeStamp"` + LogEntryTimeOfCommand string `json:"TimeOfCommand"` ID string `json:"Id"` Image string `json:"Image"` Name string `json:"Name"` @@ -204,6 +205,8 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { start := time.Now() var dataItems []DataItem + var maxLatency float64 + var maxLatencyContainer string ignoreIDSet := make(map[string]bool) imageIDMap := make(map[string]string) nameIDMap := make(map[string]string) @@ -248,18 +251,32 @@ func
PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { Log("ContainerId %s not present in Map ", containerID) } + dataItem := DataItem{ - ID: stringMap["Id"], - LogEntry: stringMap["LogEntry"], - LogEntrySource: stringMap["LogEntrySource"], - LogEntryTimeStamp: stringMap["LogEntryTimeStamp"], - SourceSystem: stringMap["SourceSystem"], - Computer: Computer, - Image: stringMap["Image"], - Name: stringMap["Name"], + ID: stringMap["Id"], + LogEntry: stringMap["LogEntry"], + LogEntrySource: stringMap["LogEntrySource"], + LogEntryTimeStamp: stringMap["LogEntryTimeStamp"], + LogEntryTimeOfCommand: start.Format(time.RFC3339), + SourceSystem: stringMap["SourceSystem"], + Computer: Computer, + Image: stringMap["Image"], + Name: stringMap["Name"], } dataItems = append(dataItems, dataItem) + loggedTime, e := time.Parse(time.RFC3339, dataItem.LogEntryTimeStamp) + if e!= nil { + message := fmt.Sprintf("Error while converting LogEntryTimeStamp for telemetry purposes: %s", e.Error()) + Log(message) + SendException(message) + } else { + ltncy := float64(start.Sub(loggedTime) / time.Millisecond) + if ltncy >= maxLatency { + maxLatency = ltncy + maxLatencyContainer = dataItem.Name + "=" + dataItem.ID + } + } } if len(dataItems) > 0 { @@ -302,6 +319,10 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { ContainerLogTelemetryMutex.Lock() FlushedRecordsCount += float64(numRecords) FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond) + if maxLatency >= AgentLogProcessingMaxLatencyMs { + AgentLogProcessingMaxLatencyMs = maxLatency + AgentLogProcessingMaxLatencyMsContainer = maxLatencyContainer + } ContainerLogTelemetryMutex.Unlock() } diff --git a/source/code/go/src/plugins/telemetry.go b/source/code/go/src/plugins/telemetry.go index 5952ac9ac..0d5513362 100644 --- a/source/code/go/src/plugins/telemetry.go +++ b/source/code/go/src/plugins/telemetry.go @@ -17,6 +17,10 @@ var ( FlushedRecordsCount float64 // FlushedRecordsTimeTaken 
indicates the cumulative time taken to flush the records for the current period FlushedRecordsTimeTaken float64 + // This is telemetry for how old/latent logs we are processing in milliseconds (max over a period of time) + AgentLogProcessingMaxLatencyMs float64 + // This is telemetry for which container logs were latent (max over a period of time) + AgentLogProcessingMaxLatencyMsContainer string // CommonProperties indicates the dimensions that are sent with every event/metric CommonProperties map[string]string // TelemetryClient is the client used to send the telemetry @@ -35,6 +39,7 @@ const ( envAppInsightsAuth = "APPLICATIONINSIGHTS_AUTH" metricNameAvgFlushRate = "ContainerLogAvgRecordsFlushedPerSec" metricNameAvgLogGenerationRate = "ContainerLogsGeneratedPerSec" + metricNameAgentLogProcessingMaxLatencyMs = "ContainerLogsAgentSideLatencyMs" defaultTelemetryPushIntervalSeconds = 300 eventNameContainerLogInit = "ContainerLogPluginInitialized" @@ -62,12 +67,19 @@ func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) { logRate := FlushedRecordsCount / float64(elapsed/time.Second) FlushedRecordsCount = 0.0 FlushedRecordsTimeTaken = 0.0 + logLatencyMs := AgentLogProcessingMaxLatencyMs + logLatencyMsContainer := AgentLogProcessingMaxLatencyMsContainer + AgentLogProcessingMaxLatencyMs = 0 + AgentLogProcessingMaxLatencyMsContainer = "" ContainerLogTelemetryMutex.Unlock() flushRateMetric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate) TelemetryClient.Track(flushRateMetric) logRateMetric := appinsights.NewMetricTelemetry(metricNameAvgLogGenerationRate, logRate) TelemetryClient.Track(logRateMetric) + logLatencyMetric := appinsights.NewMetricTelemetry(metricNameAgentLogProcessingMaxLatencyMs, logLatencyMs) + logLatencyMetric.Properties["Container"] = logLatencyMsContainer + TelemetryClient.Track(logLatencyMetric) start = time.Now() } }