Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 60 additions & 18 deletions source/code/go/src/plugins/oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,18 @@ var (
IgnoreIDSet map[string]bool
// DataUpdateMutex read and write mutex access to the container id set
DataUpdateMutex = &sync.Mutex{}
// ContainerLogTelemetryMutex read and write mutex access to the Container Log Telemetry
ContainerLogTelemetryMutex = &sync.Mutex{}

// ClientSet for querying KubeAPIs
ClientSet *kubernetes.Clientset
)

var (
// KubeSystemContainersRefreshTicker updates the kube-system containers
KubeSystemContainersRefreshTicker = time.NewTicker(time.Second * 300)
KubeSystemContainersRefreshTicker *time.Ticker
// ContainerImageNameRefreshTicker updates the container image and names periodically
ContainerImageNameRefreshTicker = time.NewTicker(time.Second * 60)
ContainerImageNameRefreshTicker *time.Ticker
)

var (
Expand Down Expand Up @@ -99,6 +102,7 @@ func createLogger() *log.Logger {
fmt.Printf("File Exists. Opening file in append mode...\n")
logfile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
SendException(err.Error())
fmt.Printf(err.Error())
}
}
Expand All @@ -107,6 +111,7 @@ func createLogger() *log.Logger {
fmt.Printf("File Doesnt Exist. Creating file...\n")
logfile, err = os.Create(path)
if err != nil {
SendException(err.Error())
fmt.Printf(err.Error())
}
}
Expand Down Expand Up @@ -134,7 +139,9 @@ func updateContainerImageNameMaps() {

pods, err := ClientSet.CoreV1().Pods("").List(metav1.ListOptions{})
if err != nil {
Log("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
message := fmt.Sprintf("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
Log(message)
SendException(message)
continue
}

Expand Down Expand Up @@ -171,7 +178,9 @@ func updateKubeSystemContainerIDs() {

pods, err := ClientSet.CoreV1().Pods("kube-system").List(metav1.ListOptions{})
if err != nil {
Log("Error getting pods %s\nIt is ok to log here and continue. Kube-system logs will be collected", err.Error())
message := fmt.Sprintf("Error getting pods %s\nIt is ok to log here and continue. Kube-system logs will be collected", err.Error())
SendException(message)
Log(message)
continue
}

Expand All @@ -194,17 +203,29 @@ func updateKubeSystemContainerIDs() {
// PostDataHelper sends data to the OMS endpoint
func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {

defer DataUpdateMutex.Unlock()

start := time.Now()
var dataItems []DataItem
ignoreIDSet := make(map[string]bool)
imageIDMap := make(map[string]string)
nameIDMap := make(map[string]string)

DataUpdateMutex.Lock()
for k, v := range IgnoreIDSet {
ignoreIDSet[k] = v
}
for k, v := range ImageIDMap {
imageIDMap[k] = v
}
for k, v := range NameIDMap {
nameIDMap[k] = v
}
DataUpdateMutex.Unlock()

for _, record := range tailPluginRecords {

containerID := GetContainerIDFromFilePath(toString(record["filepath"]))

if containerID == "" || containsKey(IgnoreIDSet, containerID) {
if containerID == "" || containsKey(ignoreIDSet, containerID) {
continue
}

Expand All @@ -216,13 +237,13 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
stringMap["SourceSystem"] = "Containers"
stringMap["Id"] = containerID

if val, ok := ImageIDMap[containerID]; ok {
if val, ok := imageIDMap[containerID]; ok {
stringMap["Image"] = val
} else {
Log("ContainerId %s not present in Map ", containerID)
}

if val, ok := NameIDMap[containerID]; ok {
if val, ok := nameIDMap[containerID]; ok {
stringMap["Name"] = val
} else {
Log("ContainerId %s not present in Map ", containerID)
Expand Down Expand Up @@ -250,7 +271,9 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {

marshalled, err := json.Marshal(logEntry)
if err != nil {
Log("Error while Marshalling log Entry: %s", err.Error())
message := fmt.Sprintf("Error while Marshalling log Entry: %s", err.Error())
Log(message)
SendException(message)
return output.FLB_OK
}
req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled))
Expand All @@ -260,8 +283,11 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
elapsed := time.Since(start)

if err != nil {
Log("Error when sending request %s \n", err.Error())
message := fmt.Sprintf("Error when sending request %s \n", err.Error())
Log(message)
SendException(message)
Log("Failed to flush %d records after %s", len(dataItems), elapsed)

return output.FLB_RETRY
}

Expand All @@ -274,8 +300,10 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {

numRecords := len(dataItems)
Log("Successfully flushed %d records in %s", numRecords, elapsed)
ContainerLogTelemetryMutex.Lock()
FlushedRecordsCount += float64(numRecords)
FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond)
ContainerLogTelemetryMutex.Unlock()
}

return output.FLB_OK
Expand Down Expand Up @@ -318,13 +346,17 @@ func InitializePlugin(pluginConfPath string) {

pluginConfig, err := ReadConfiguration(pluginConfPath)
if err != nil {
Log("Error Reading plugin config path : %s \n", err.Error())
log.Fatalf("Error Reading plugin config path : %s \n", err.Error())
message := fmt.Sprintf("Error Reading plugin config path : %s \n", err.Error())
Log(message)
SendException(message)
time.Sleep(30 * time.Second)
log.Fatalln(message)
}

omsadminConf, err := ReadConfiguration(pluginConfig["omsadmin_conf_path"])
if err != nil {
Log(err.Error())
SendException(err.Error())
log.Fatalf("Error Reading omsadmin configuration %s\n", err.Error())
}
OMSEndpoint = omsadminConf["OMS_ENDPOINT"]
Expand All @@ -334,7 +366,9 @@ func InitializePlugin(pluginConfPath string) {
// Initialize image,name map refresh ticker
containerInventoryRefreshInterval, err := strconv.Atoi(pluginConfig["container_inventory_refresh_interval"])
if err != nil {
Log("Error Reading Container Inventory Refresh Interval %s", err.Error())
message := fmt.Sprintf("Error Reading Container Inventory Refresh Interval %s", err.Error())
Log(message)
SendException(message)
Log("Using Default Refresh Interval of %d s\n", defaultContainerInventoryRefreshInterval)
containerInventoryRefreshInterval = defaultContainerInventoryRefreshInterval
}
Expand All @@ -344,7 +378,9 @@ func InitializePlugin(pluginConfPath string) {
// Initialize Kube System Refresh Ticker
kubeSystemContainersRefreshInterval, err := strconv.Atoi(pluginConfig["kube_system_containers_refresh_interval"])
if err != nil {
Log("Error Reading Kube System Container Ids Refresh Interval %s", err.Error())
message := fmt.Sprintf("Error Reading Kube System Container Ids Refresh Interval %s", err.Error())
Log(message)
SendException(message)
Log("Using Default Refresh Interval of %d s\n", defaultKubeSystemContainersRefreshInterval)
kubeSystemContainersRefreshInterval = defaultKubeSystemContainersRefreshInterval
}
Expand All @@ -356,20 +392,26 @@ func InitializePlugin(pluginConfPath string) {
if err != nil {
// It is ok to log here and continue, because only the Computer column will be missing,
// which can be deduced from a combination of containerId, and docker logs on the node
Log("Error when reading containerHostName file %s.\n It is ok to log here and continue, because only the Computer column will be missing, which can be deduced from a combination of containerId, and docker logs on the nodes\n", err.Error())
message := fmt.Sprintf("Error when reading containerHostName file %s.\n It is ok to log here and continue, because only the Computer column will be missing, which can be deduced from a combination of containerId, and docker logs on the nodes\n", err.Error())
Log(message)
SendException(message)
}
Computer = strings.TrimSuffix(toString(containerHostName), "\n")
Log("Computer == %s \n", Computer)

// Initialize KubeAPI Client
config, err := rest.InClusterConfig()
if err != nil {
Log("Error getting config %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
message := fmt.Sprintf("Error getting config %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
Log(message)
SendException(message)
}

ClientSet, err = kubernetes.NewForConfig(config)
if err != nil {
Log("Error getting clientset %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
message := fmt.Sprintf("Error getting clientset %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error())
SendException(message)
Log(message)
}

PluginConfiguration = pluginConfig
Expand Down
3 changes: 0 additions & 3 deletions source/code/go/src/plugins/out_oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ func FLBPluginInit(ctx unsafe.Pointer) int {

//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
var count int
var ret int
var record map[interface{}]interface{}
var records []map[interface{}]interface{}
Expand All @@ -43,15 +42,13 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
dec := output.NewDecoder(data, int(length))

// Iterate Records
count = 0
for {
// Extract Record
ret, _, record = output.GetRecord(dec)
if ret != 0 {
break
}
records = append(records, record)
count++
}
return PostDataHelper(records)
}
Expand Down
29 changes: 21 additions & 8 deletions source/code/go/src/plugins/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,14 @@ const (
envACSResourceName = "ACS_RESOURCE_NAME"
envAppInsightsAuth = "APPLICATIONINSIGHTS_AUTH"
metricNameAvgFlushRate = "ContainerLogAvgRecordsFlushedPerSec"
metricNameAvgLogGenerationRate = "ContainerLogsGeneratedPerSec"
defaultTelemetryPushIntervalSeconds = 300

eventNameContainerLogInit = "ContainerLogPluginInitialized"
eventNameDaemonSetHeartbeat = "ContainerLogDaemonSetHeartbeatEvent"
)

// Initialize initializes the telemetry artifacts
// initialize initializes the telemetry artifacts
func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, error) {

telemetryPushInterval, err := strconv.Atoi(telemetryPushIntervalProperty)
Expand Down Expand Up @@ -87,7 +88,7 @@ func initialize(telemetryPushIntervalProperty string, agentVersion string) (int,
CommonProperties["ACSResourceName"] = ""
CommonProperties["AKS_RESOURCE_ID"] = aksResourceID
splitStrings := strings.Split(aksResourceID, "/")
if len(aksResourceID) > 0 && len(aksResourceID) < 10 {
if len(splitStrings) > 0 && len(splitStrings) < 10 {
CommonProperties["SubscriptionID"] = splitStrings[2]
CommonProperties["ResourceGroupName"] = splitStrings[4]
CommonProperties["ClusterName"] = splitStrings[8]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check the bounds of splitStrings before indexing, rather than assuming splitStrings[8] exists.

Expand All @@ -110,19 +111,24 @@ func SendContainerLogFlushRateMetric(telemetryPushIntervalProperty string, agent
Log("Error During Telemetry Initialization :%s", err.Error())
runtime.Goexit()
}

start := time.Now()
SendEvent(eventNameContainerLogInit, make(map[string]string))

for ; true; <-ContainerLogTelemetryTicker.C {
SendEvent(eventNameDaemonSetHeartbeat, make(map[string]string))
DataUpdateMutex.Lock()
elapsed := time.Since(start)
ContainerLogTelemetryMutex.Lock()
flushRate := FlushedRecordsCount / FlushedRecordsTimeTaken * 1000
Log("Flushed Records : %f Time Taken : %f flush Rate : %f", FlushedRecordsCount, FlushedRecordsTimeTaken, flushRate)
logRate := FlushedRecordsCount / float64(elapsed/time.Second)
FlushedRecordsCount = 0.0
FlushedRecordsTimeTaken = 0.0
DataUpdateMutex.Unlock()
metric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate)
TelemetryClient.Track(metric)
ContainerLogTelemetryMutex.Unlock()

flushRateMetric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate)
TelemetryClient.Track(flushRateMetric)
logRateMetric := appinsights.NewMetricTelemetry(metricNameAvgLogGenerationRate, logRate)
TelemetryClient.Track(logRateMetric)
start = time.Now()
}
}

Expand All @@ -138,3 +144,10 @@ func SendEvent(eventName string, dimensions map[string]string) {

TelemetryClient.Track(event)
}

// SendException reports err as an exception through TelemetryClient.
// It does nothing when TelemetryClient is nil.
func SendException(err interface{}) {
	if TelemetryClient == nil {
		return
	}
	TelemetryClient.TrackException(err)
}
8 changes: 7 additions & 1 deletion source/code/go/src/plugins/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"bufio"
"crypto/tls"
"fmt"
"log"
"net/http"
"os"
Expand All @@ -19,7 +20,9 @@ func ReadConfiguration(filename string) (map[string]string, error) {

file, err := os.Open(filename)
if err != nil {
SendException(err)
log.Fatal(err)

return nil, err
}
defer file.Close()
Expand All @@ -39,6 +42,7 @@ func ReadConfiguration(filename string) (map[string]string, error) {
}

if err := scanner.Err(); err != nil {
SendException(err)
log.Fatal(err)
return nil, err
}
Expand All @@ -51,7 +55,9 @@ func CreateHTTPClient() {

cert, err := tls.LoadX509KeyPair(PluginConfiguration["cert_file_path"], PluginConfiguration["key_file_path"])
if err != nil {
Log("Error when loading cert %s", err.Error())
message := fmt.Sprintf("Error when loading cert %s", err.Error())
SendException(message)
Log(message)
log.Fatalf("Error when loading cert %s", err.Error())
}

Expand Down