diff --git a/installer/conf/td-agent-bit.conf b/installer/conf/td-agent-bit.conf index b39587a97..2a6199987 100644 --- a/installer/conf/td-agent-bit.conf +++ b/installer/conf/td-agent-bit.conf @@ -14,6 +14,15 @@ Path_Key filepath Skip_Long_Lines On +[INPUT] + Name tail + Tag oms.container.log.flbplugin.* + Path /var/log/containers/omsagent*.log + DB /var/opt/microsoft/docker-cimprov/state/omsagent-ai.db + Mem_Buf_Limit 30m + Path_Key filepath + Skip_Long_Lines On + [OUTPUT] Name oms EnableTelemetry true diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index 665c3f9f2..e0abaea1f 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -223,7 +223,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { for _, record := range tailPluginRecords { - containerID := GetContainerIDFromFilePath(toString(record["filepath"])) + containerID := GetContainerIDFromFilePath(ToString(record["filepath"])) if containerID == "" || containsKey(ignoreIDSet, containerID) { continue @@ -231,9 +231,9 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { stringMap := make(map[string]string) - stringMap["LogEntry"] = toString(record["log"]) - stringMap["LogEntrySource"] = toString(record["stream"]) - stringMap["LogEntryTimeStamp"] = toString(record["time"]) + stringMap["LogEntry"] = ToString(record["log"]) + stringMap["LogEntrySource"] = ToString(record["stream"]) + stringMap["LogEntryTimeStamp"] = ToString(record["time"]) stringMap["SourceSystem"] = "Containers" stringMap["Id"] = containerID @@ -314,16 +314,6 @@ func containsKey(currentMap map[string]bool, key string) bool { return c } -func toString(s interface{}) string { - switch t := s.(type) { - case []byte: - // prevent encoding to base64 - return string(t) - default: - return "" - } -} - // GetContainerIDFromFilePath Gets the container ID From the file Path func GetContainerIDFromFilePath(filepath string) string { start := strings.LastIndex(filepath, "-") @@ -338,12 +328,19 @@ func GetContainerIDFromFilePath(filepath string) string { } // InitializePlugin reads and populates plugin configuration -func InitializePlugin(pluginConfPath string) { +func InitializePlugin(pluginConfPath string, agentVersion string) { IgnoreIDSet = make(map[string]bool) ImageIDMap = make(map[string]string) NameIDMap = make(map[string]string) + ret, err := InitializeTelemetryClient(agentVersion) + if ret != 0 || err != nil { + message := fmt.Sprintf("Error During Telemetry Initialization :%s", err.Error()) + fmt.Printf(message) + Log(message) + } + pluginConfig, err := ReadConfiguration(pluginConfPath) if err != nil { message := fmt.Sprintf("Error Reading plugin config path : %s \n", err.Error()) @@ -355,9 +352,11 @@ func InitializePlugin(pluginConfPath string) { omsadminConf, err := ReadConfiguration(pluginConfig["omsadmin_conf_path"]) if err != nil { - Log(err.Error()) - SendException(err.Error()) - log.Fatalf("Error Reading omsadmin configuration %s\n", err.Error()) + message := fmt.Sprintf("Error Reading omsadmin configuration %s\n", err.Error()) + Log(message) + SendException(message) + time.Sleep(30 * time.Second) + log.Fatalln(message) } OMSEndpoint = omsadminConf["OMS_ENDPOINT"] WorkspaceID = omsadminConf["WORKSPACE_ID"] @@ -396,7 +395,7 @@ func InitializePlugin(pluginConfPath string) { Log(message) SendException(message) } - Computer = strings.TrimSuffix(toString(containerHostName), "\n") + Computer = strings.TrimSuffix(ToString(containerHostName), "\n") Log("Computer == %s \n", Computer) // Initialize KubeAPI Client diff --git a/source/code/go/src/plugins/out_oms.go b/source/code/go/src/plugins/out_oms.go index e2ee324e7..133e0f039 100644 --- a/source/code/go/src/plugins/out_oms.go +++ b/source/code/go/src/plugins/out_oms.go @@ -19,12 +19,12 @@ func FLBPluginRegister(ctx unsafe.Pointer) int { // ctx (context) pointer to fluentbit context (state/ c code) func FLBPluginInit(ctx unsafe.Pointer) int { Log("Initializing out_oms go plugin for fluentbit") - InitializePlugin(ContainerLogPluginConfFilePath) + agentVersion := output.FLBPluginConfigKey(ctx, "AgentVersion") + InitializePlugin(ContainerLogPluginConfFilePath, agentVersion) enableTelemetry := output.FLBPluginConfigKey(ctx, "EnableTelemetry") if strings.Compare(strings.ToLower(enableTelemetry), "true") == 0 { telemetryPushInterval := output.FLBPluginConfigKey(ctx, "TelemetryPushIntervalSeconds") - agentVersion := output.FLBPluginConfigKey(ctx, "AgentVersion") - go SendContainerLogFlushRateMetric(telemetryPushInterval, agentVersion) + go SendContainerLogPluginMetrics(telemetryPushInterval) } else { Log("Telemetry is not enabled for the plugin %s \n", output.FLBPluginConfigKey(ctx, "Name")) return output.FLB_OK @@ -50,6 +50,12 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { } records = append(records, record) } + + incomingTag := C.GoString(tag) + if strings.Contains(strings.ToLower(incomingTag), "oms.container.log.flbplugin") { + return PushToAppInsightsTraces(records) + } + return PostDataHelper(records) } diff --git a/source/code/go/src/plugins/telemetry.go b/source/code/go/src/plugins/telemetry.go index 72454948d..d943c8eda 100644 --- a/source/code/go/src/plugins/telemetry.go +++ b/source/code/go/src/plugins/telemetry.go @@ -4,12 +4,12 @@ import ( "encoding/base64" "errors" "os" - "runtime" "strconv" "strings" "time" "github.com/Microsoft/ApplicationInsights-Go/appinsights" + "github.com/fluent/fluent-bit-go/output" ) var ( @@ -41,8 +41,8 @@ const ( eventNameDaemonSetHeartbeat = "ContainerLogDaemonSetHeartbeatEvent" ) -// initialize initializes the telemetry artifacts -func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, error) { +// SendContainerLogPluginMetrics is a go-routine that flushes the data periodically (every 5 mins to App Insights) +func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) { telemetryPushInterval, err := strconv.Atoi(telemetryPushIntervalProperty) if err != nil { @@ -52,6 +52,49 @@ func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, ContainerLogTelemetryTicker = time.NewTicker(time.Second * time.Duration(telemetryPushInterval)) + start := time.Now() + SendEvent(eventNameContainerLogInit, make(map[string]string)) + + for ; true; <-ContainerLogTelemetryTicker.C { + SendEvent(eventNameDaemonSetHeartbeat, make(map[string]string)) + elapsed := time.Since(start) + ContainerLogTelemetryMutex.Lock() + flushRate := FlushedRecordsCount / FlushedRecordsTimeTaken * 1000 + logRate := FlushedRecordsCount / float64(elapsed/time.Second) + FlushedRecordsCount = 0.0 + FlushedRecordsTimeTaken = 0.0 + ContainerLogTelemetryMutex.Unlock() + + flushRateMetric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate) + TelemetryClient.Track(flushRateMetric) + logRateMetric := appinsights.NewMetricTelemetry(metricNameAvgLogGenerationRate, logRate) + TelemetryClient.Track(logRateMetric) + start = time.Now() + } +} + +// SendEvent sends an event to App Insights +func SendEvent(eventName string, dimensions map[string]string) { + Log("Sending Event : %s\n", eventName) + event := appinsights.NewEventTelemetry(eventName) + + // add any extra Properties + for k, v := range dimensions { + event.Properties[k] = v + } + + TelemetryClient.Track(event) +} + +// SendException send an event to the configured app insights instance +func SendException(err interface{}) { + if TelemetryClient != nil { + TelemetryClient.TrackException(err) + } +} + +// InitializeTelemetryClient sets up the telemetry client to send telemetry to the App Insights instance +func InitializeTelemetryClient(agentVersion string) (int, error) { encodedIkey := os.Getenv(envAppInsightsAuth) if encodedIkey == "" { Log("Environment Variable Missing \n") @@ -103,51 +146,14 @@ func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, return 0, nil } -// SendContainerLogFlushRateMetric is a go-routine that flushes the data periodically (every 5 mins to App Insights) -func SendContainerLogFlushRateMetric(telemetryPushIntervalProperty string, agentVersion string) { - - ret, err := initialize(telemetryPushIntervalProperty, agentVersion) - if ret != 0 || err != nil { - Log("Error During Telemetry Initialization :%s", err.Error()) - runtime.Goexit() - } - start := time.Now() - SendEvent(eventNameContainerLogInit, make(map[string]string)) - - for ; true; <-ContainerLogTelemetryTicker.C { - SendEvent(eventNameDaemonSetHeartbeat, make(map[string]string)) - elapsed := time.Since(start) - ContainerLogTelemetryMutex.Lock() - flushRate := FlushedRecordsCount / FlushedRecordsTimeTaken * 1000 - logRate := FlushedRecordsCount / float64(elapsed/time.Second) - FlushedRecordsCount = 0.0 - FlushedRecordsTimeTaken = 0.0 - ContainerLogTelemetryMutex.Unlock() - - flushRateMetric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate) - TelemetryClient.Track(flushRateMetric) - logRateMetric := appinsights.NewMetricTelemetry(metricNameAvgLogGenerationRate, logRate) - TelemetryClient.Track(logRateMetric) - start = time.Now() - } -} - -// SendEvent sends an event to App Insights -func SendEvent(eventName string, dimensions map[string]string) { - Log("Sending Event : %s\n", eventName) - event := appinsights.NewEventTelemetry(eventName) - - // add any extra Properties - for k, v := range dimensions { - event.Properties[k] = v +// PushToAppInsightsTraces sends the log lines as trace messages to the configured App Insights Instance +func PushToAppInsightsTraces(records []map[interface{}]interface{}) int { + var logLines []string + for _, record := range records { + logLines = append(logLines, ToString(record["log"])) } - TelemetryClient.Track(event) -} - -// SendException send an event to the configured app insights instance -func SendException(err interface{}) { - if TelemetryClient != nil { - TelemetryClient.TrackException(err) - } + traceEntry := strings.Join(logLines, "\n") + TelemetryClient.TrackTrace(traceEntry, 1) + return output.FLB_OK } diff --git a/source/code/go/src/plugins/utils.go b/source/code/go/src/plugins/utils.go index 94db033bd..91e433a0f 100644 --- a/source/code/go/src/plugins/utils.go +++ b/source/code/go/src/plugins/utils.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "strings" + "time" ) // ReadConfiguration reads a property file @@ -21,8 +22,8 @@ func ReadConfiguration(filename string) (map[string]string, error) { file, err := os.Open(filename) if err != nil { SendException(err) - log.Fatal(err) - + time.Sleep(30 * time.Second) + fmt.Printf("%s", err.Error()) return nil, err } defer file.Close() @@ -43,7 +44,8 @@ func ReadConfiguration(filename string) (map[string]string, error) { if err := scanner.Err(); err != nil { SendException(err) - log.Fatal(err) + time.Sleep(30 * time.Second) + log.Fatalf("%s", err.Error()) return nil, err } @@ -52,11 +54,11 @@ func ReadConfiguration(filename string) (map[string]string, error) { // CreateHTTPClient used to create the client for sending post requests to OMSEndpoint func CreateHTTPClient() { - cert, err := tls.LoadX509KeyPair(PluginConfiguration["cert_file_path"], PluginConfiguration["key_file_path"]) if err != nil { message := fmt.Sprintf("Error when loading cert %s", err.Error()) SendException(message) + time.Sleep(30 * time.Second) Log(message) log.Fatalf("Error when loading cert %s", err.Error()) } @@ -72,3 +74,14 @@ func CreateHTTPClient() { Log("Successfully created HTTP Client") } + +// ToString converts an interface into a string +func ToString(s interface{}) string { + switch t := s.(type) { + case []byte: + // prevent encoding to base64 + return string(t) + default: + return "" + } +}