diff --git a/.gitignore b/.gitignore index 92c8c0cf2..e58d69f7b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,6 @@ /test/code/providers/TestScriptPath.h /test/code/providers/providertestutils.cpp +source/code/go/src/plugins/profiling +.vscode/launch.json +source/code/go/src/plugins/vendor/ \ No newline at end of file diff --git a/README.md b/README.md index a822f6f97..0c543e716 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,14 @@ additional questions or comments. ## Release History +### 10/16/2018 - Version microsoft/oms:ciprod10162018 +- Fix for containerID being 00000-00000-00000 +- Move from fluentD to fluentbit for container log collection +- Seg fault fixes in json parsing for container inventory & container image inventory +- Telemetry enablement +- Remove ContainerPerf, ContainerServiceLog, ContainerProcess fluentd-->OMI workflows +- Update log level for all fluentD based workflows + ### 7/31/2018 - Version microsoft/oms:ciprod07312018 - Changes for node lost scenario (roll-up pod & container statuses as Unknown) - Discover unscheduled pods @@ -32,4 +40,4 @@ additional questions or comments. - Kubernetes RBAC enablement - Latest released omsagent (1.6.0-42) - Bug fix so that we do not collect kube-system namespace container logs when kube api calls fail occasionally (Bug #215107) -- .yaml changes (for RBAC) +- .yaml changes (for RBAC) \ No newline at end of file diff --git a/installer/conf/container.conf b/installer/conf/container.conf index 1916300cb..17317871c 100755 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -46,7 +46,7 @@ # Filter for correct format to endpoint - + type filter_container @@ -63,19 +63,6 @@ max_retry_wait 9m - - type out_oms_api - log_level debug - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_containerprocess*.buffer - buffer_queue_limit 20 - flush_interval 20s - retry_limit 10 - retry_wait 15s - max_retry_wait 9m - - type out_oms log_level debug @@ -102,19 +89,6 @@ max_retry_wait 9m - - type out_oms - log_level debug - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_servicelog*.buffer - buffer_queue_limit 20 - flush_interval 20s - retry_limit 10 - retry_wait 15s - max_retry_wait 9m - - type out_oms log_level debug diff --git a/installer/conf/td-agent-bit.conf b/installer/conf/td-agent-bit.conf index b5d2309e1..b39587a97 100644 --- a/installer/conf/td-agent-bit.conf +++ b/installer/conf/td-agent-bit.conf @@ -12,10 +12,11 @@ Parser docker Mem_Buf_Limit 30m Path_Key filepath - Buffer_Chunk_Size 1m - Buffer_Max_Size 1m Skip_Long_Lines On [OUTPUT] - Name oms - Match oms.container.log.* \ No newline at end of file + Name oms + EnableTelemetry true + TelemetryPushIntervalSeconds 300 + Match oms.container.log.* + AgentVersion ciprod10162018-2 diff --git a/source/code/go/src/plugins/glide.lock b/source/code/go/src/plugins/glide.lock index 4597b594a..fc147fe74 100644 --- a/source/code/go/src/plugins/glide.lock +++ b/source/code/go/src/plugins/glide.lock @@ -1,5 +1,5 @@ -hash: bb32415f402ab29751f29b8e394bc974cbc31861453d817aaeb94ef83dacc488 -updated: 2018-09-14T18:14:28.748047598Z +hash: a6a873d09ed9c3d890a70122e61efba992ead9850fe48f6fcb020d86800d4ade +updated: 2018-10-10T13:37:51.9703908-07:00 imports: - name: github.com/fluent/fluent-bit-go version: c4a158a6e3a793166c6ecfa2d5c80d71eada8959 @@ -38,8 +38,10 @@ imports: - diskcache - name: github.com/json-iterator/go version: f2b4162afba35581b6d4a50d3b8f34e33c144682 -- name: github.com/mitchellh/mapstructure - version: fa473d140ef3c6adf42d6b391fe76707f1f243c8 +- name: github.com/Microsoft/ApplicationInsights-Go + version: d2df5d440eda5372f24fcac03839a64d6cb5f7e5 + subpackages: + - appinsights - name: github.com/modern-go/concurrent version: bacd9c7ef1dd9b15be4a9909b8ac7a4e313eec94 - name: github.com/modern-go/reflect2 diff --git a/source/code/go/src/plugins/glide.yaml b/source/code/go/src/plugins/glide.yaml index 403e1efc4..b2829391b 100644 --- a/source/code/go/src/plugins/glide.yaml +++ b/source/code/go/src/plugins/glide.yaml @@ -1,10 +1,8 @@ -package: plugins +package: . import: - package: github.com/fluent/fluent-bit-go subpackages: - output -- package: github.com/mitchellh/mapstructure - version: ^1.0.0 - package: gopkg.in/natefinch/lumberjack.v2 version: ^2.1.0 - package: k8s.io/apimachinery @@ -15,3 +13,7 @@ import: subpackages: - kubernetes - rest +- package: github.com/Microsoft/ApplicationInsights-Go + version: ^0.4.2 + subpackages: + - appinsights diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index d20f11d57..807e00937 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -42,6 +42,8 @@ var ( OMSEndpoint string // Computer (Hostname) when ingesting into ContainerLog table Computer string + // WorkspaceID log analytics workspace id + WorkspaceID string ) var ( @@ -170,6 +172,7 @@ func updateKubeSystemContainerIDs() { pods, err := ClientSet.CoreV1().Pods("kube-system").List(metav1.ListOptions{}) if err != nil { Log("Error getting pods %s\nIt is ok to log here and continue. Kube-system logs will be collected", err.Error()) + continue } _ignoreIDSet := make(map[string]bool) @@ -269,7 +272,10 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { return output.FLB_RETRY } - Log("Successfully flushed %d records in %s", len(dataItems), elapsed) + numRecords := len(dataItems) + Log("Successfully flushed %d records in %s", numRecords, elapsed) + FlushedRecordsCount += float64(numRecords) + FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond) } return output.FLB_OK @@ -322,6 +328,7 @@ func InitializePlugin(pluginConfPath string) { log.Fatalf("Error Reading omsadmin configuration %s\n", err.Error()) } OMSEndpoint = omsadminConf["OMS_ENDPOINT"] + WorkspaceID = omsadminConf["WORKSPACE_ID"] Log("OMSEndpoint %s", OMSEndpoint) // Initialize image,name map refresh ticker diff --git a/source/code/go/src/plugins/out_oms.go b/source/code/go/src/plugins/out_oms.go index 0efc1242d..732ae5216 100644 --- a/source/code/go/src/plugins/out_oms.go +++ b/source/code/go/src/plugins/out_oms.go @@ -5,6 +5,7 @@ import ( ) import ( "C" + "strings" "unsafe" ) @@ -19,6 +20,15 @@ func FLBPluginRegister(ctx unsafe.Pointer) int { func FLBPluginInit(ctx unsafe.Pointer) int { Log("Initializing out_oms go plugin for fluentbit") InitializePlugin(ContainerLogPluginConfFilePath) + enableTelemetry := output.FLBPluginConfigKey(ctx, "EnableTelemetry") + if strings.Compare(strings.ToLower(enableTelemetry), "true") == 0 { + telemetryPushInterval := output.FLBPluginConfigKey(ctx, "TelemetryPushIntervalSeconds") + agentVersion := output.FLBPluginConfigKey(ctx, "AgentVersion") + go SendContainerLogFlushRateMetric(telemetryPushInterval, agentVersion) + } else { + Log("Telemetry is not enabled for the plugin %s \n", output.FLBPluginConfigKey(ctx, "Name")) + return output.FLB_OK + } return output.FLB_OK } @@ -48,6 +58,7 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { // FLBPluginExit exits the plugin func FLBPluginExit() int { + ContainerLogTelemetryTicker.Stop() KubeSystemContainersRefreshTicker.Stop() ContainerImageNameRefreshTicker.Stop() return output.FLB_OK diff --git a/source/code/go/src/plugins/telemetry.go b/source/code/go/src/plugins/telemetry.go new file mode 100644 index 000000000..b1bc4439b --- /dev/null +++ b/source/code/go/src/plugins/telemetry.go @@ -0,0 +1,140 @@ +package main + +import ( + "encoding/base64" + "errors" + "os" + "runtime" + "strconv" + "strings" + "time" + + "github.com/Microsoft/ApplicationInsights-Go/appinsights" +) + +var ( + // FlushedRecordsCount indicates the number of flushed records in the current period + FlushedRecordsCount float64 + // FlushedRecordsTimeTaken indicates the cumulative time taken to flush the records for the current period + FlushedRecordsTimeTaken float64 + // CommonProperties indicates the dimensions that are sent with every event/metric + CommonProperties map[string]string + // TelemetryClient is the client used to send the telemetry + TelemetryClient appinsights.TelemetryClient + // ContainerLogTelemetryTicker sends telemetry periodically + ContainerLogTelemetryTicker *time.Ticker +) + +const ( + clusterTypeACS = "ACS" + clusterTypeAKS = "AKS" + controllerTypeDaemonSet = "DaemonSet" + controllerTypeReplicaSet = "ReplicaSet" + envAKSResourceID = "AKS_RESOURCE_ID" + envACSResourceName = "ACS_RESOURCE_NAME" + envAppInsightsAuth = "APPLICATIONINSIGHTS_AUTH" + metricNameAvgFlushRate = "ContainerLogAvgRecordsFlushedPerSec" + defaultTelemetryPushIntervalSeconds = 300 + + eventNameContainerLogInit = "ContainerLogPluginInitialized" + eventNameDaemonSetHeartbeat = "ContainerLogDaemonSetHeartbeatEvent" +) + +// Initialize initializes the telemetry artifacts +func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, error) { + + telemetryPushInterval, err := strconv.Atoi(telemetryPushIntervalProperty) + if err != nil { + Log("Error Converting telemetryPushIntervalProperty %s. Using Default Interval... %d \n", telemetryPushIntervalProperty, defaultTelemetryPushIntervalSeconds) + telemetryPushInterval = defaultTelemetryPushIntervalSeconds + } + + ContainerLogTelemetryTicker = time.NewTicker(time.Second * time.Duration(telemetryPushInterval)) + + encodedIkey := os.Getenv(envAppInsightsAuth) + if encodedIkey == "" { + Log("Environment Variable Missing \n") + return -1, errors.New("Missing Environment Variable") + } + + decIkey, err := base64.StdEncoding.DecodeString(encodedIkey) + if err != nil { + Log("Decoding Error %s", err.Error()) + return -1, err + } + + TelemetryClient = appinsights.NewTelemetryClient(string(decIkey)) + + CommonProperties = make(map[string]string) + CommonProperties["Computer"] = Computer + CommonProperties["WorkspaceID"] = WorkspaceID + CommonProperties["ControllerType"] = controllerTypeDaemonSet + CommonProperties["AgentVersion"] = agentVersion + + aksResourceID := os.Getenv(envAKSResourceID) + // if the aks resource id is not defined, it is most likely an ACS Cluster + if aksResourceID == "" { + CommonProperties["ACSResourceName"] = os.Getenv(envACSResourceName) + CommonProperties["ClusterType"] = clusterTypeACS + + CommonProperties["SubscriptionID"] = "" + CommonProperties["ResourceGroupName"] = "" + CommonProperties["ClusterName"] = "" + CommonProperties["Region"] = "" + CommonProperties["AKS_RESOURCE_ID"] = "" + + } else { + CommonProperties["ACSResourceName"] = "" + CommonProperties["AKS_RESOURCE_ID"] = aksResourceID + splitStrings := strings.Split(aksResourceID, "/") + if len(aksResourceID) > 0 && len(aksResourceID) < 10 { + CommonProperties["SubscriptionID"] = splitStrings[2] + CommonProperties["ResourceGroupName"] = splitStrings[4] + CommonProperties["ClusterName"] = splitStrings[8] + } + CommonProperties["ClusterType"] = clusterTypeAKS + + region := os.Getenv("AKS_REGION") + CommonProperties["Region"] = region + } + + TelemetryClient.Context().CommonProperties = CommonProperties + return 0, nil +} + +// SendContainerLogFlushRateMetric is a go-routine that flushes the data periodically (every 5 mins to App Insights) +func SendContainerLogFlushRateMetric(telemetryPushIntervalProperty string, agentVersion string) { + + ret, err := initialize(telemetryPushIntervalProperty, agentVersion) + if ret != 0 || err != nil { + Log("Error During Telemetry Initialization :%s", err.Error()) + runtime.Goexit() + } + + SendEvent(eventNameContainerLogInit, make(map[string]string)) + + for ; true; <-ContainerLogTelemetryTicker.C { + SendEvent(eventNameDaemonSetHeartbeat, make(map[string]string)) + DataUpdateMutex.Lock() + flushRate := FlushedRecordsCount / FlushedRecordsTimeTaken * 1000 + Log("Flushed Records : %f Time Taken : %f flush Rate : %f", FlushedRecordsCount, FlushedRecordsTimeTaken, flushRate) + FlushedRecordsCount = 0.0 + FlushedRecordsTimeTaken = 0.0 + DataUpdateMutex.Unlock() + metric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate) + TelemetryClient.Track(metric) + } +} + +// SendEvent sends an event to App Insights +func SendEvent(eventName string, dimensions map[string]string) { + Log("Sending Event : %s\n", eventName) + event := appinsights.NewEventTelemetry(eventName) + + // add any extra Properties + for k, v := range dimensions { + event.Properties[k] = v + } + + TelemetryClient.Track(event) +}