Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@

/test/code/providers/TestScriptPath.h
/test/code/providers/providertestutils.cpp
source/code/go/src/plugins/profiling
.vscode/launch.json
source/code/go/src/plugins/vendor/
10 changes: 9 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ additional questions or comments.

## Release History

### 10/16/2018 - Version microsoft/oms:ciprod10162018
- Fix for containerID being 00000-00000-00000
- Move from fluentD to fluentbit for container log collection
- Seg fault fixes in json parsing for container inventory & container image inventory
- Telemetry enablement
- Remove ContainerPerf, ContainerServiceLog, ContainerProcess fluentd-->OMI workflows
- Update log level for all fluentD based workflows

### 7/31/2018 - Version microsoft/oms:ciprod07312018
- Changes for node lost scenario (roll-up pod & container statuses as Unknown)
- Discover unscheduled pods
Expand All @@ -32,4 +40,4 @@ additional questions or comments.
- Kubernetes RBAC enablement
- Latest released omsagent (1.6.0-42)
- Bug fix so that we do not collect kube-system namespace container logs when kube api calls fail occasionally (Bug #215107)
- .yaml changes (for RBAC)
- .yaml changes (for RBAC)
28 changes: 1 addition & 27 deletions installer/conf/container.conf
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
</source>

# Filter for correct format to endpoint
<filter oms.container.containerinventory oms.container.imageinventory oms.container.servicelog>
<filter oms.container.containerinventory oms.container.imageinventory>
type filter_container
</filter>

Expand All @@ -63,19 +63,6 @@
max_retry_wait 9m
</match>

<match oms.api.ContainerProcess**>
type out_oms_api
log_level debug
buffer_chunk_limit 20m
buffer_type file
buffer_path %STATE_DIR_WS%/out_oms_containerprocess*.buffer
buffer_queue_limit 20
flush_interval 20s
retry_limit 10
retry_wait 15s
max_retry_wait 9m
</match>

<match oms.container.containerinventory**>
type out_oms
log_level debug
Expand All @@ -102,19 +89,6 @@
max_retry_wait 9m
</match>

<match oms.container.servicelog**>
type out_oms
log_level debug
buffer_chunk_limit 20m
buffer_type file
buffer_path %STATE_DIR_WS%/out_oms_servicelog*.buffer
buffer_queue_limit 20
flush_interval 20s
retry_limit 10
retry_wait 15s
max_retry_wait 9m
</match>

<match oms.api.cadvisorperf**>
type out_oms
log_level debug
Expand Down
9 changes: 5 additions & 4 deletions installer/conf/td-agent-bit.conf
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
Parser docker
Mem_Buf_Limit 30m
Path_Key filepath
Buffer_Chunk_Size 1m
Buffer_Max_Size 1m
Skip_Long_Lines On

[OUTPUT]
Name oms
Match oms.container.log.*
Name oms
EnableTelemetry true
TelemetryPushIntervalSeconds 300
Match oms.container.log.*
AgentVersion ciprod10162018-2
10 changes: 6 additions & 4 deletions source/code/go/src/plugins/glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions source/code/go/src/plugins/glide.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package: plugins
package: .
import:
- package: github.com/fluent/fluent-bit-go
subpackages:
- output
- package: github.com/mitchellh/mapstructure
version: ^1.0.0
- package: gopkg.in/natefinch/lumberjack.v2
version: ^2.1.0
- package: k8s.io/apimachinery
Expand All @@ -15,3 +13,7 @@ import:
subpackages:
- kubernetes
- rest
- package: github.com/Microsoft/ApplicationInsights-Go
version: ^0.4.2
subpackages:
- appinsights
9 changes: 8 additions & 1 deletion source/code/go/src/plugins/oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ var (
OMSEndpoint string
// Computer (Hostname) when ingesting into ContainerLog table
Computer string
// WorkspaceID log analytics workspace id
WorkspaceID string
)

var (
Expand Down Expand Up @@ -170,6 +172,7 @@ func updateKubeSystemContainerIDs() {
pods, err := ClientSet.CoreV1().Pods("kube-system").List(metav1.ListOptions{})
if err != nil {
Log("Error getting pods %s\nIt is ok to log here and continue. Kube-system logs will be collected", err.Error())
continue
}

_ignoreIDSet := make(map[string]bool)
Expand Down Expand Up @@ -269,7 +272,10 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
return output.FLB_RETRY
}

Log("Successfully flushed %d records in %s", len(dataItems), elapsed)
numRecords := len(dataItems)
Log("Successfully flushed %d records in %s", numRecords, elapsed)
FlushedRecordsCount += float64(numRecords)
FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond)
}

return output.FLB_OK
Expand Down Expand Up @@ -322,6 +328,7 @@ func InitializePlugin(pluginConfPath string) {
log.Fatalf("Error Reading omsadmin configuration %s\n", err.Error())
}
OMSEndpoint = omsadminConf["OMS_ENDPOINT"]
WorkspaceID = omsadminConf["WORKSPACE_ID"]
Log("OMSEndpoint %s", OMSEndpoint)

// Initialize image,name map refresh ticker
Expand Down
11 changes: 11 additions & 0 deletions source/code/go/src/plugins/out_oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
)
import (
"C"
"strings"
"unsafe"
)

Expand All @@ -19,6 +20,15 @@ func FLBPluginRegister(ctx unsafe.Pointer) int {
func FLBPluginInit(ctx unsafe.Pointer) int {
	Log("Initializing out_oms go plugin for fluentbit")
	InitializePlugin(ContainerLogPluginConfFilePath)

	// Telemetry is opt-in: it is started only when the plugin configuration
	// sets EnableTelemetry to "true" (compared case-insensitively).
	telemetrySetting := output.FLBPluginConfigKey(ctx, "EnableTelemetry")
	if !strings.EqualFold(telemetrySetting, "true") {
		Log("Telemetry is not enabled for the plugin %s \n", output.FLBPluginConfigKey(ctx, "Name"))
		return output.FLB_OK
	}

	// Both values are read from the plugin configuration and handed to the
	// telemetry goroutine unparsed.
	pushInterval := output.FLBPluginConfigKey(ctx, "TelemetryPushIntervalSeconds")
	version := output.FLBPluginConfigKey(ctx, "AgentVersion")
	go SendContainerLogFlushRateMetric(pushInterval, version)

	return output.FLB_OK
}

Expand Down Expand Up @@ -48,6 +58,7 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {

// FLBPluginExit exits the plugin
func FLBPluginExit() int {
ContainerLogTelemetryTicker.Stop()
KubeSystemContainersRefreshTicker.Stop()
ContainerImageNameRefreshTicker.Stop()
return output.FLB_OK
Expand Down
140 changes: 140 additions & 0 deletions source/code/go/src/plugins/telemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package main

import (
"encoding/base64"
"errors"
"os"
"runtime"
"strconv"
"strings"
"time"

"github.com/Microsoft/ApplicationInsights-Go/appinsights"
)

// Flush statistics for the current telemetry period. They are added to after
// each successful flush and reset to zero by SendContainerLogFlushRateMetric
// once the period's metric has been computed.
var (
	// FlushedRecordsCount is the number of records flushed in the current period.
	FlushedRecordsCount float64
	// FlushedRecordsTimeTaken is the cumulative time, in milliseconds, spent
	// flushing records in the current period.
	FlushedRecordsTimeTaken float64
)

// Application Insights state populated by initialize.
var (
	// CommonProperties holds the dimensions that are sent with every event/metric.
	CommonProperties map[string]string
	// TelemetryClient is the client through which all telemetry is sent.
	TelemetryClient appinsights.TelemetryClient
	// ContainerLogTelemetryTicker paces the periodic telemetry push.
	ContainerLogTelemetryTicker *time.Ticker
)

// Values reported in the "ClusterType" and "ControllerType" dimensions.
const (
	clusterTypeACS           = "ACS"
	clusterTypeAKS           = "AKS"
	controllerTypeDaemonSet  = "DaemonSet"
	controllerTypeReplicaSet = "ReplicaSet"
)

// Names of the environment variables consulted during telemetry initialization.
const (
	envAKSResourceID   = "AKS_RESOURCE_ID"
	envACSResourceName = "ACS_RESOURCE_NAME"
	// envAppInsightsAuth names the variable holding the base64-encoded
	// Application Insights instrumentation key.
	envAppInsightsAuth = "APPLICATIONINSIGHTS_AUTH"
)

// Names of the metrics and events emitted by this plugin.
const (
	metricNameAvgFlushRate      = "ContainerLogAvgRecordsFlushedPerSec"
	eventNameContainerLogInit   = "ContainerLogPluginInitialized"
	eventNameDaemonSetHeartbeat = "ContainerLogDaemonSetHeartbeatEvent"
)

// defaultTelemetryPushIntervalSeconds is the push interval used when the
// configured interval cannot be parsed.
const defaultTelemetryPushIntervalSeconds = 300

// initialize sets up the telemetry artifacts: the push ticker, the
// Application Insights client and the common properties attached to every
// event/metric.
//
// telemetryPushIntervalProperty is the push interval in seconds as a decimal
// string; when it cannot be parsed or is not positive,
// defaultTelemetryPushIntervalSeconds is used instead. agentVersion is
// recorded verbatim in the "AgentVersion" common property.
//
// It returns (0, nil) on success and (-1, err) when the
// APPLICATIONINSIGHTS_AUTH environment variable is missing or is not valid
// base64.
func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, error) {

	telemetryPushInterval, err := strconv.Atoi(telemetryPushIntervalProperty)
	if err != nil {
		Log("Error Converting telemetryPushIntervalProperty %s. Using Default Interval... %d \n", telemetryPushIntervalProperty, defaultTelemetryPushIntervalSeconds)
		telemetryPushInterval = defaultTelemetryPushIntervalSeconds
	} else if telemetryPushInterval <= 0 {
		// time.NewTicker panics on a non-positive interval, so a value such as
		// "0" or "-5" must not reach it.
		Log("Invalid telemetryPushIntervalProperty %s. Using Default Interval... %d \n", telemetryPushIntervalProperty, defaultTelemetryPushIntervalSeconds)
		telemetryPushInterval = defaultTelemetryPushIntervalSeconds
	}

	// The ticker is created before the fallible steps below, so it is set
	// even when initialization returns an error.
	ContainerLogTelemetryTicker = time.NewTicker(time.Second * time.Duration(telemetryPushInterval))

	encodedIkey := os.Getenv(envAppInsightsAuth)
	if encodedIkey == "" {
		Log("Environment Variable Missing \n")
		return -1, errors.New("Missing Environment Variable")
	}

	decIkey, err := base64.StdEncoding.DecodeString(encodedIkey)
	if err != nil {
		Log("Decoding Error %s", err.Error())
		return -1, err
	}

	TelemetryClient = appinsights.NewTelemetryClient(string(decIkey))

	CommonProperties = make(map[string]string)
	CommonProperties["Computer"] = Computer
	CommonProperties["WorkspaceID"] = WorkspaceID
	CommonProperties["ControllerType"] = controllerTypeDaemonSet
	CommonProperties["AgentVersion"] = agentVersion

	aksResourceID := os.Getenv(envAKSResourceID)
	// if the aks resource id is not defined, it is most likely an ACS Cluster
	if aksResourceID == "" {
		CommonProperties["ACSResourceName"] = os.Getenv(envACSResourceName)
		CommonProperties["ClusterType"] = clusterTypeACS

		CommonProperties["SubscriptionID"] = ""
		CommonProperties["ResourceGroupName"] = ""
		CommonProperties["ClusterName"] = ""
		CommonProperties["Region"] = ""
		CommonProperties["AKS_RESOURCE_ID"] = ""

	} else {
		CommonProperties["ACSResourceName"] = ""
		CommonProperties["AKS_RESOURCE_ID"] = aksResourceID

		// The ID is expected to look like
		// "/subscriptions/<id>/resourceGroups/<rg>/providers/<namespace>/<type>/<name>",
		// which splits on "/" into 9 segments with the subscription, resource
		// group and cluster name at indices 2, 4 and 8. The check is on the
		// number of segments (not the string length) so that every index
		// used below is in range.
		const (
			subscriptionIDIndex = 2
			resourceGroupIndex  = 4
			clusterNameIndex    = 8
		)
		splitStrings := strings.Split(aksResourceID, "/")
		if len(splitStrings) > clusterNameIndex {
			CommonProperties["SubscriptionID"] = splitStrings[subscriptionIDIndex]
			CommonProperties["ResourceGroupName"] = splitStrings[resourceGroupIndex]
			CommonProperties["ClusterName"] = splitStrings[clusterNameIndex]
		} else {
			Log("Unexpected format for %s: %s. SubscriptionID, ResourceGroupName and ClusterName will not be set \n", envAKSResourceID, aksResourceID)
		}
		CommonProperties["ClusterType"] = clusterTypeAKS

		region := os.Getenv("AKS_REGION")
		CommonProperties["Region"] = region
	}

	TelemetryClient.Context().CommonProperties = CommonProperties
	return 0, nil
}

// SendContainerLogFlushRateMetric is meant to run as a go-routine. It
// initializes telemetry, sends a one-time plugin-initialized event and then,
// immediately and on every tick of ContainerLogTelemetryTicker, sends a
// heartbeat event followed by the average flush-rate metric (records per
// second) for the period since the previous push.
//
// telemetryPushIntervalProperty and agentVersion are passed through to
// initialize. If initialization fails, the error is logged and the calling
// goroutine exits without sending anything.
func SendContainerLogFlushRateMetric(telemetryPushIntervalProperty string, agentVersion string) {

	ret, err := initialize(telemetryPushIntervalProperty, agentVersion)
	if ret != 0 || err != nil {
		// Guard against a non-zero return code paired with a nil error so
		// that logging the failure cannot itself panic.
		if err != nil {
			Log("Error During Telemetry Initialization :%s", err.Error())
		} else {
			Log("Error During Telemetry Initialization :return code %d", ret)
		}
		runtime.Goexit()
	}

	SendEvent(eventNameContainerLogInit, make(map[string]string))

	// The post statement blocks on the ticker, so the body runs once right
	// away and then once per tick.
	for ; true; <-ContainerLogTelemetryTicker.C {
		SendEvent(eventNameDaemonSetHeartbeat, make(map[string]string))

		// Snapshot and reset the counters under the lock; compute and log
		// outside of it.
		DataUpdateMutex.Lock()
		recordsFlushed := FlushedRecordsCount
		timeTakenMs := FlushedRecordsTimeTaken
		FlushedRecordsCount = 0.0
		FlushedRecordsTimeTaken = 0.0
		DataUpdateMutex.Unlock()

		// No recorded flush time (first iteration, idle period) would make
		// the division yield NaN or +Inf; report a rate of 0 instead.
		flushRate := 0.0
		if timeTakenMs > 0 {
			// time is in milliseconds, the metric is per second
			flushRate = recordsFlushed / timeTakenMs * 1000
		}
		Log("Flushed Records : %f Time Taken : %f flush Rate : %f", recordsFlushed, timeTakenMs, flushRate)

		metric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate)
		TelemetryClient.Track(metric)
	}
}

// SendEvent logs and tracks an App Insights event named eventName, attaching
// every key/value pair in dimensions as a property of that event.
func SendEvent(eventName string, dimensions map[string]string) {
	Log("Sending Event : %s\n", eventName)

	telemetryEvent := appinsights.NewEventTelemetry(eventName)

	// Copy the caller-supplied dimensions onto the event's own property map.
	properties := telemetryEvent.Properties
	for name := range dimensions {
		properties[name] = dimensions[name]
	}

	TelemetryClient.Track(telemetryEvent)
}