Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions installer/conf/td-agent-bit.conf
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@
Path_Key filepath
Skip_Long_Lines On

[INPUT]
Name tail
Tag oms.container.log.flbplugin.*
Path /var/log/containers/omsagent*.log
DB /var/opt/microsoft/docker-cimprov/state/omsagent-ai.db
Mem_Buf_Limit 30m
Path_Key filepath
Skip_Long_Lines On

[OUTPUT]
Name oms
EnableTelemetry true
Expand Down
37 changes: 18 additions & 19 deletions source/code/go/src/plugins/oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,17 +223,17 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {

for _, record := range tailPluginRecords {

containerID := GetContainerIDFromFilePath(toString(record["filepath"]))
containerID := GetContainerIDFromFilePath(ToString(record["filepath"]))

if containerID == "" || containsKey(ignoreIDSet, containerID) {
continue
}

stringMap := make(map[string]string)

stringMap["LogEntry"] = toString(record["log"])
stringMap["LogEntrySource"] = toString(record["stream"])
stringMap["LogEntryTimeStamp"] = toString(record["time"])
stringMap["LogEntry"] = ToString(record["log"])
stringMap["LogEntrySource"] = ToString(record["stream"])
stringMap["LogEntryTimeStamp"] = ToString(record["time"])
stringMap["SourceSystem"] = "Containers"
stringMap["Id"] = containerID

Expand Down Expand Up @@ -314,16 +314,6 @@ func containsKey(currentMap map[string]bool, key string) bool {
return c
}

func toString(s interface{}) string {
switch t := s.(type) {
case []byte:
// prevent encoding to base64
return string(t)
default:
return ""
}
}

// GetContainerIDFromFilePath Gets the container ID From the file Path
func GetContainerIDFromFilePath(filepath string) string {
start := strings.LastIndex(filepath, "-")
Expand All @@ -338,12 +328,19 @@ func GetContainerIDFromFilePath(filepath string) string {
}

// InitializePlugin reads and populates plugin configuration
func InitializePlugin(pluginConfPath string) {
func InitializePlugin(pluginConfPath string, agentVersion string) {

IgnoreIDSet = make(map[string]bool)
ImageIDMap = make(map[string]string)
NameIDMap = make(map[string]string)

ret, err := InitializeTelemetryClient(agentVersion)
if ret != 0 || err != nil {
message := fmt.Sprintf("Error During Telemetry Initialization :%s", err.Error())
fmt.Printf(message)
Log(message)
}

pluginConfig, err := ReadConfiguration(pluginConfPath)
if err != nil {
message := fmt.Sprintf("Error Reading plugin config path : %s \n", err.Error())
Expand All @@ -355,9 +352,11 @@ func InitializePlugin(pluginConfPath string) {

omsadminConf, err := ReadConfiguration(pluginConfig["omsadmin_conf_path"])
if err != nil {
Log(err.Error())
SendException(err.Error())
log.Fatalf("Error Reading omsadmin configuration %s\n", err.Error())
message := fmt.Sprintf("Error Reading omsadmin configuration %s\n", err.Error())
Log(message)
SendException(message)
time.Sleep(30 * time.Second)
log.Fatalln(message)
}
OMSEndpoint = omsadminConf["OMS_ENDPOINT"]
WorkspaceID = omsadminConf["WORKSPACE_ID"]
Expand Down Expand Up @@ -396,7 +395,7 @@ func InitializePlugin(pluginConfPath string) {
Log(message)
SendException(message)
}
Computer = strings.TrimSuffix(toString(containerHostName), "\n")
Computer = strings.TrimSuffix(ToString(containerHostName), "\n")
Log("Computer == %s \n", Computer)

// Initialize KubeAPI Client
Expand Down
12 changes: 9 additions & 3 deletions source/code/go/src/plugins/out_oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ func FLBPluginRegister(ctx unsafe.Pointer) int {
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {
Log("Initializing out_oms go plugin for fluentbit")
InitializePlugin(ContainerLogPluginConfFilePath)
agentVersion := output.FLBPluginConfigKey(ctx, "AgentVersion")
InitializePlugin(ContainerLogPluginConfFilePath, agentVersion)
enableTelemetry := output.FLBPluginConfigKey(ctx, "EnableTelemetry")
if strings.Compare(strings.ToLower(enableTelemetry), "true") == 0 {
telemetryPushInterval := output.FLBPluginConfigKey(ctx, "TelemetryPushIntervalSeconds")
agentVersion := output.FLBPluginConfigKey(ctx, "AgentVersion")
go SendContainerLogFlushRateMetric(telemetryPushInterval, agentVersion)
go SendContainerLogPluginMetrics(telemetryPushInterval)
} else {
Log("Telemetry is not enabled for the plugin %s \n", output.FLBPluginConfigKey(ctx, "Name"))
return output.FLB_OK
Expand All @@ -50,6 +50,12 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
}
records = append(records, record)
}

incomingTag := C.GoString(tag)
if strings.Contains(strings.ToLower(incomingTag), "oms.container.log.flbplugin") {
return PushToAppInsightsTraces(records)
}

return PostDataHelper(records)
}

Expand Down
102 changes: 54 additions & 48 deletions source/code/go/src/plugins/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ import (
"encoding/base64"
"errors"
"os"
"runtime"
"strconv"
"strings"
"time"

"github.com/Microsoft/ApplicationInsights-Go/appinsights"
"github.com/fluent/fluent-bit-go/output"
)

var (
Expand Down Expand Up @@ -41,8 +41,8 @@ const (
eventNameDaemonSetHeartbeat = "ContainerLogDaemonSetHeartbeatEvent"
)

// initialize initializes the telemetry artifacts
func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, error) {
// SendContainerLogPluginMetrics is a go-routine that flushes the data periodically (every 5 mins to App Insights)
func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) {

telemetryPushInterval, err := strconv.Atoi(telemetryPushIntervalProperty)
if err != nil {
Expand All @@ -52,6 +52,49 @@ func initialize(telemetryPushIntervalProperty string, agentVersion string) (int,

ContainerLogTelemetryTicker = time.NewTicker(time.Second * time.Duration(telemetryPushInterval))

start := time.Now()
SendEvent(eventNameContainerLogInit, make(map[string]string))

for ; true; <-ContainerLogTelemetryTicker.C {
SendEvent(eventNameDaemonSetHeartbeat, make(map[string]string))
elapsed := time.Since(start)
ContainerLogTelemetryMutex.Lock()
flushRate := FlushedRecordsCount / FlushedRecordsTimeTaken * 1000
logRate := FlushedRecordsCount / float64(elapsed/time.Second)
FlushedRecordsCount = 0.0
FlushedRecordsTimeTaken = 0.0
ContainerLogTelemetryMutex.Unlock()

flushRateMetric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate)
TelemetryClient.Track(flushRateMetric)
logRateMetric := appinsights.NewMetricTelemetry(metricNameAvgLogGenerationRate, logRate)
TelemetryClient.Track(logRateMetric)
start = time.Now()
}
}

// SendEvent sends an event to App Insights
func SendEvent(eventName string, dimensions map[string]string) {
Log("Sending Event : %s\n", eventName)
event := appinsights.NewEventTelemetry(eventName)

// add any extra Properties
for k, v := range dimensions {
event.Properties[k] = v
}

TelemetryClient.Track(event)
}

// SendException send an event to the configured app insights instance
func SendException(err interface{}) {
if TelemetryClient != nil {
TelemetryClient.TrackException(err)
}
}

// InitializeTelemetryClient sets up the telemetry client to send telemetry to the App Insights instance
func InitializeTelemetryClient(agentVersion string) (int, error) {
encodedIkey := os.Getenv(envAppInsightsAuth)
if encodedIkey == "" {
Log("Environment Variable Missing \n")
Expand Down Expand Up @@ -103,51 +146,14 @@ func initialize(telemetryPushIntervalProperty string, agentVersion string) (int,
return 0, nil
}

// SendContainerLogFlushRateMetric is a go-routine that flushes the data periodically (every 5 mins to App Insights)
func SendContainerLogFlushRateMetric(telemetryPushIntervalProperty string, agentVersion string) {

ret, err := initialize(telemetryPushIntervalProperty, agentVersion)
if ret != 0 || err != nil {
Log("Error During Telemetry Initialization :%s", err.Error())
runtime.Goexit()
}
start := time.Now()
SendEvent(eventNameContainerLogInit, make(map[string]string))

for ; true; <-ContainerLogTelemetryTicker.C {
SendEvent(eventNameDaemonSetHeartbeat, make(map[string]string))
elapsed := time.Since(start)
ContainerLogTelemetryMutex.Lock()
flushRate := FlushedRecordsCount / FlushedRecordsTimeTaken * 1000
logRate := FlushedRecordsCount / float64(elapsed/time.Second)
FlushedRecordsCount = 0.0
FlushedRecordsTimeTaken = 0.0
ContainerLogTelemetryMutex.Unlock()

flushRateMetric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate)
TelemetryClient.Track(flushRateMetric)
logRateMetric := appinsights.NewMetricTelemetry(metricNameAvgLogGenerationRate, logRate)
TelemetryClient.Track(logRateMetric)
start = time.Now()
}
}

// SendEvent sends an event to App Insights
func SendEvent(eventName string, dimensions map[string]string) {
Log("Sending Event : %s\n", eventName)
event := appinsights.NewEventTelemetry(eventName)

// add any extra Properties
for k, v := range dimensions {
event.Properties[k] = v
// PushToAppInsightsTraces sends the log lines as trace messages to the configured App Insights Instance
func PushToAppInsightsTraces(records []map[interface{}]interface{}) int {
var logLines []string
for _, record := range records {
logLines = append(logLines, ToString(record["log"]))
}

TelemetryClient.Track(event)
}

// SendException send an event to the configured app insights instance
func SendException(err interface{}) {
if TelemetryClient != nil {
TelemetryClient.TrackException(err)
}
traceEntry := strings.Join(logLines, "\n")
TelemetryClient.TrackTrace(traceEntry, 1)
return output.FLB_OK
}
21 changes: 17 additions & 4 deletions source/code/go/src/plugins/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"
"os"
"strings"
"time"
)

// ReadConfiguration reads a property file
Expand All @@ -21,8 +22,8 @@ func ReadConfiguration(filename string) (map[string]string, error) {
file, err := os.Open(filename)
if err != nil {
SendException(err)
log.Fatal(err)

time.Sleep(30 * time.Second)
fmt.Printf("%s", err.Error())
return nil, err
}
defer file.Close()
Expand All @@ -43,7 +44,8 @@ func ReadConfiguration(filename string) (map[string]string, error) {

if err := scanner.Err(); err != nil {
SendException(err)
log.Fatal(err)
time.Sleep(30 * time.Second)
log.Fatalf("%s", err.Error())
return nil, err
}

Expand All @@ -52,11 +54,11 @@ func ReadConfiguration(filename string) (map[string]string, error) {

// CreateHTTPClient used to create the client for sending post requests to OMSEndpoint
func CreateHTTPClient() {

cert, err := tls.LoadX509KeyPair(PluginConfiguration["cert_file_path"], PluginConfiguration["key_file_path"])
if err != nil {
message := fmt.Sprintf("Error when loading cert %s", err.Error())
SendException(message)
time.Sleep(30 * time.Second)
Log(message)
log.Fatalf("Error when loading cert %s", err.Error())
}
Expand All @@ -72,3 +74,14 @@ func CreateHTTPClient() {

Log("Successfully created HTTP Client")
}

// ToString converts an interface into a string
func ToString(s interface{}) string {
switch t := s.(type) {
case []byte:
// prevent encoding to base64
return string(t)
default:
return ""
}
}