Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion installer/conf/td-agent-bit.conf
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
Name tail
Tag oms.container.log.*
Path /var/log/containers/*.log
DB /var/opt/microsoft/docker-cimprov/state/fblogs.db
DB /var/log/omsagent-fblogs.db
Parser docker
Mem_Buf_Limit 30m
Path_Key filepath
Expand Down
43 changes: 32 additions & 11 deletions source/code/go/src/plugins/oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ var (

// DataItem represents the object corresponding to the json that is sent by fluentbit tail plugin
type DataItem struct {
LogEntry string `json:"LogEntry"`
LogEntrySource string `json:"LogEntrySource"`
LogEntryTimeStamp string `json:"LogEntryTimeStamp"`
LogEntry string `json:"LogEntry"`
LogEntrySource string `json:"LogEntrySource"`
LogEntryTimeStamp string `json:"LogEntryTimeStamp"`
LogEntryTimeOfCommand string `json:"TimeOfCommand"`
ID string `json:"Id"`
Image string `json:"Image"`
Name string `json:"Name"`
Expand Down Expand Up @@ -204,6 +205,8 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {

start := time.Now()
var dataItems []DataItem
var maxLatency float64
var maxLatencyContainer string
ignoreIDSet := make(map[string]bool)
imageIDMap := make(map[string]string)
nameIDMap := make(map[string]string)
Expand Down Expand Up @@ -248,18 +251,32 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
Log("ContainerId %s not present in Map ", containerID)
}


dataItem := DataItem{
ID: stringMap["Id"],
LogEntry: stringMap["LogEntry"],
LogEntrySource: stringMap["LogEntrySource"],
LogEntryTimeStamp: stringMap["LogEntryTimeStamp"],
SourceSystem: stringMap["SourceSystem"],
Computer: Computer,
Image: stringMap["Image"],
Name: stringMap["Name"],
ID: stringMap["Id"],
LogEntry: stringMap["LogEntry"],
LogEntrySource: stringMap["LogEntrySource"],
LogEntryTimeStamp: stringMap["LogEntryTimeStamp"],
LogEntryTimeOfCommand: start.Format(time.RFC3339),
SourceSystem: stringMap["SourceSystem"],
Computer: Computer,
Image: stringMap["Image"],
Name: stringMap["Name"],
}

dataItems = append(dataItems, dataItem)
loggedTime, e := time.Parse(time.RFC3339, dataItem.LogEntryTimeStamp)
if e!= nil {
message := fmt.Sprintf("Error while converting LogEntryTimeStamp for telemetry purposes: %s", e.Error())
Log(message)
SendException(message)
} else {
ltncy := float64(start.Sub(loggedTime) / time.Millisecond)
if ltncy >= maxLatency {
maxLatency = ltncy
maxLatencyContainer = dataItem.Name + "=" + dataItem.ID
}
}
}

if len(dataItems) > 0 {
Expand Down Expand Up @@ -302,6 +319,10 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
ContainerLogTelemetryMutex.Lock()
FlushedRecordsCount += float64(numRecords)
FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond)
if maxLatency >= AgentLogProcessingMaxLatencyMs {
AgentLogProcessingMaxLatencyMs = maxLatency
AgentLogProcessingMaxLatencyMsContainer = maxLatencyContainer
}
ContainerLogTelemetryMutex.Unlock()
}

Expand Down
12 changes: 12 additions & 0 deletions source/code/go/src/plugins/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ var (
FlushedRecordsCount float64
// FlushedRecordsTimeTaken indicates the cumulative time taken to flush the records for the current period
FlushedRecordsTimeTaken float64
// This is telemetry for how old/latent logs we are processing in milliseconds (max over a period of time)
AgentLogProcessingMaxLatencyMs float64
// This is telemetry for which container logs were latent (max over a period of time)
AgentLogProcessingMaxLatencyMsContainer string
// CommonProperties indicates the dimensions that are sent with every event/metric
CommonProperties map[string]string
// TelemetryClient is the client used to send the telemetry
Expand All @@ -35,6 +39,7 @@ const (
envAppInsightsAuth = "APPLICATIONINSIGHTS_AUTH"
metricNameAvgFlushRate = "ContainerLogAvgRecordsFlushedPerSec"
metricNameAvgLogGenerationRate = "ContainerLogsGeneratedPerSec"
metricNameAgentLogProcessingMaxLatencyMs = "ContainerLogsAgentSideLatencyMs"
defaultTelemetryPushIntervalSeconds = 300

eventNameContainerLogInit = "ContainerLogPluginInitialized"
Expand Down Expand Up @@ -62,12 +67,19 @@ func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) {
logRate := FlushedRecordsCount / float64(elapsed/time.Second)
FlushedRecordsCount = 0.0
FlushedRecordsTimeTaken = 0.0
logLatencyMs := AgentLogProcessingMaxLatencyMs
logLatencyMsContainer := AgentLogProcessingMaxLatencyMsContainer
AgentLogProcessingMaxLatencyMs = 0
AgentLogProcessingMaxLatencyMsContainer = ""
ContainerLogTelemetryMutex.Unlock()

flushRateMetric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate)
TelemetryClient.Track(flushRateMetric)
logRateMetric := appinsights.NewMetricTelemetry(metricNameAvgLogGenerationRate, logRate)
TelemetryClient.Track(logRateMetric)
logLatencyMetric := appinsights.NewMetricTelemetry(metricNameAgentLogProcessingMaxLatencyMs, logLatencyMs)
logLatencyMetric.Properties["Container"] = logLatencyMsContainer
TelemetryClient.Track(logLatencyMetric)
start = time.Now()
}
}
Expand Down