From c1761766f3c1897c6c163d54072d851ba53feda5 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Tue, 23 Oct 2018 10:00:33 -0700 Subject: [PATCH 1/2] Telemetry Fixes 1. Added Log Generation Rate 2. Fixed parsing bugs 3. Added code to send Exceptions/errors --- source/code/go/src/plugins/oms.go | 44 +++++++++++++++++++++---- source/code/go/src/plugins/out_oms.go | 3 -- source/code/go/src/plugins/telemetry.go | 29 +++++++++++----- source/code/go/src/plugins/utils.go | 3 ++ 4 files changed, 61 insertions(+), 18 deletions(-) diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index 807e00937..274a5a1d2 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -55,15 +55,18 @@ var ( IgnoreIDSet map[string]bool // DataUpdateMutex read and write mutex access to the container id set DataUpdateMutex = &sync.Mutex{} + // ContainerLogTelemetryMutex read and write mutex access to the Container Log Telemetry + ContainerLogTelemetryMutex = &sync.Mutex{} + // ClientSet for querying KubeAPIs ClientSet *kubernetes.Clientset ) var ( // KubeSystemContainersRefreshTicker updates the kube-system containers - KubeSystemContainersRefreshTicker = time.NewTicker(time.Second * 300) + KubeSystemContainersRefreshTicker *time.Ticker // ContainerImageNameRefreshTicker updates the container image and names periodically - ContainerImageNameRefreshTicker = time.NewTicker(time.Second * 60) + ContainerImageNameRefreshTicker *time.Ticker ) var ( @@ -99,6 +102,7 @@ func createLogger() *log.Logger { fmt.Printf("File Exists. Opening file in append mode...\n") logfile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0600) if err != nil { + SendException(err) fmt.Printf(err.Error()) } } @@ -107,6 +111,7 @@ func createLogger() *log.Logger { fmt.Printf("File Doesnt Exist. 
Creating file...\n") logfile, err = os.Create(path) if err != nil { + SendException(err) fmt.Printf(err.Error()) } } @@ -134,6 +139,7 @@ func updateContainerImageNameMaps() { pods, err := ClientSet.CoreV1().Pods("").List(metav1.ListOptions{}) if err != nil { + SendException(err) Log("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) continue } @@ -171,6 +177,7 @@ func updateKubeSystemContainerIDs() { pods, err := ClientSet.CoreV1().Pods("kube-system").List(metav1.ListOptions{}) if err != nil { + SendException(err) Log("Error getting pods %s\nIt is ok to log here and continue. Kube-system logs will be collected", err.Error()) continue } @@ -194,17 +201,29 @@ func updateKubeSystemContainerIDs() { // PostDataHelper sends data to the OMS endpoint func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { - defer DataUpdateMutex.Unlock() - start := time.Now() var dataItems []DataItem + ignoreIDSet := make(map[string]bool) + imageIDMap := make(map[string]string) + nameIDMap := make(map[string]string) + DataUpdateMutex.Lock() + for k, v := range IgnoreIDSet { + ignoreIDSet[k] = v + } + for k, v := range ImageIDMap { + imageIDMap[k] = v + } + for k, v := range NameIDMap { + nameIDMap[k] = v + } + DataUpdateMutex.Unlock() for _, record := range tailPluginRecords { containerID := GetContainerIDFromFilePath(toString(record["filepath"])) - if containerID == "" || containsKey(IgnoreIDSet, containerID) { + if containerID == "" || containsKey(ignoreIDSet, containerID) { continue } @@ -216,13 +235,13 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { stringMap["SourceSystem"] = "Containers" stringMap["Id"] = containerID - if val, ok := ImageIDMap[containerID]; ok { + if val, ok := imageIDMap[containerID]; ok { stringMap["Image"] = val } else { Log("ContainerId %s not present in Map ", containerID) } - if val, ok := 
NameIDMap[containerID]; ok { + if val, ok := nameIDMap[containerID]; ok { stringMap["Name"] = val } else { Log("ContainerId %s not present in Map ", containerID) @@ -251,6 +270,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { marshalled, err := json.Marshal(logEntry) if err != nil { Log("Error while Marshalling log Entry: %s", err.Error()) + SendException(err) return output.FLB_OK } req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled)) @@ -262,6 +282,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { if err != nil { Log("Error when sending request %s \n", err.Error()) Log("Failed to flush %d records after %s", len(dataItems), elapsed) + SendException(err) return output.FLB_RETRY } @@ -274,8 +295,10 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { numRecords := len(dataItems) Log("Successfully flushed %d records in %s", numRecords, elapsed) + ContainerLogTelemetryMutex.Lock() FlushedRecordsCount += float64(numRecords) FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond) + ContainerLogTelemetryMutex.Unlock() } return output.FLB_OK @@ -319,12 +342,14 @@ func InitializePlugin(pluginConfPath string) { pluginConfig, err := ReadConfiguration(pluginConfPath) if err != nil { Log("Error Reading plugin config path : %s \n", err.Error()) + SendException(err) log.Fatalf("Error Reading plugin config path : %s \n", err.Error()) } omsadminConf, err := ReadConfiguration(pluginConfig["omsadmin_conf_path"]) if err != nil { Log(err.Error()) + SendException(err) log.Fatalf("Error Reading omsadmin configuration %s\n", err.Error()) } OMSEndpoint = omsadminConf["OMS_ENDPOINT"] @@ -335,6 +360,7 @@ func InitializePlugin(pluginConfPath string) { containerInventoryRefreshInterval, err := strconv.Atoi(pluginConfig["container_inventory_refresh_interval"]) if err != nil { Log("Error Reading Container Inventory Refresh Interval %s", err.Error()) + SendException(err) Log("Using 
Default Refresh Interval of %d s\n", defaultContainerInventoryRefreshInterval) containerInventoryRefreshInterval = defaultContainerInventoryRefreshInterval } @@ -345,6 +371,7 @@ func InitializePlugin(pluginConfPath string) { kubeSystemContainersRefreshInterval, err := strconv.Atoi(pluginConfig["kube_system_containers_refresh_interval"]) if err != nil { Log("Error Reading Kube System Container Ids Refresh Interval %s", err.Error()) + SendException(err) Log("Using Default Refresh Interval of %d s\n", defaultKubeSystemContainersRefreshInterval) kubeSystemContainersRefreshInterval = defaultKubeSystemContainersRefreshInterval } @@ -356,6 +383,7 @@ func InitializePlugin(pluginConfPath string) { if err != nil { // It is ok to log here and continue, because only the Computer column will be missing, // which can be deduced from a combination of containerId, and docker logs on the node + SendException(err) Log("Error when reading containerHostName file %s.\n It is ok to log here and continue, because only the Computer column will be missing, which can be deduced from a combination of containerId, and docker logs on the nodes\n", err.Error()) } Computer = strings.TrimSuffix(toString(containerHostName), "\n") @@ -364,11 +392,13 @@ func InitializePlugin(pluginConfPath string) { // Initialize KubeAPI Client config, err := rest.InClusterConfig() if err != nil { + SendException(err) Log("Error getting config %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) } ClientSet, err = kubernetes.NewForConfig(config) if err != nil { + SendException(err) Log("Error getting clientset %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) } diff --git a/source/code/go/src/plugins/out_oms.go b/source/code/go/src/plugins/out_oms.go index 732ae5216..e2ee324e7 100644 --- 
a/source/code/go/src/plugins/out_oms.go +++ b/source/code/go/src/plugins/out_oms.go @@ -34,7 +34,6 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlush func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { - var count int var ret int var record map[interface{}]interface{} var records []map[interface{}]interface{} @@ -43,7 +42,6 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { dec := output.NewDecoder(data, int(length)) // Iterate Records - count = 0 for { // Extract Record ret, _, record = output.GetRecord(dec) @@ -51,7 +49,6 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { break } records = append(records, record) - count++ } return PostDataHelper(records) } diff --git a/source/code/go/src/plugins/telemetry.go b/source/code/go/src/plugins/telemetry.go index b1bc4439b..72454948d 100644 --- a/source/code/go/src/plugins/telemetry.go +++ b/source/code/go/src/plugins/telemetry.go @@ -34,13 +34,14 @@ const ( envACSResourceName = "ACS_RESOURCE_NAME" envAppInsightsAuth = "APPLICATIONINSIGHTS_AUTH" metricNameAvgFlushRate = "ContainerLogAvgRecordsFlushedPerSec" + metricNameAvgLogGenerationRate = "ContainerLogsGeneratedPerSec" defaultTelemetryPushIntervalSeconds = 300 eventNameContainerLogInit = "ContainerLogPluginInitialized" eventNameDaemonSetHeartbeat = "ContainerLogDaemonSetHeartbeatEvent" ) -// Initialize initializes the telemetry artifacts +// initialize initializes the telemetry artifacts func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, error) { telemetryPushInterval, err := strconv.Atoi(telemetryPushIntervalProperty) @@ -87,7 +88,7 @@ func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, CommonProperties["ACSResourceName"] = "" CommonProperties["AKS_RESOURCE_ID"] = aksResourceID splitStrings := strings.Split(aksResourceID, "/") - if len(aksResourceID) > 0 && len(aksResourceID) < 10 { + if len(splitStrings) > 0 && 
len(splitStrings) < 10 { CommonProperties["SubscriptionID"] = splitStrings[2] CommonProperties["ResourceGroupName"] = splitStrings[4] CommonProperties["ClusterName"] = splitStrings[8] @@ -110,19 +111,24 @@ func SendContainerLogFlushRateMetric(telemetryPushIntervalProperty string, agent Log("Error During Telemetry Initialization :%s", err.Error()) runtime.Goexit() } - + start := time.Now() SendEvent(eventNameContainerLogInit, make(map[string]string)) for ; true; <-ContainerLogTelemetryTicker.C { SendEvent(eventNameDaemonSetHeartbeat, make(map[string]string)) - DataUpdateMutex.Lock() + elapsed := time.Since(start) + ContainerLogTelemetryMutex.Lock() flushRate := FlushedRecordsCount / FlushedRecordsTimeTaken * 1000 - Log("Flushed Records : %f Time Taken : %f flush Rate : %f", FlushedRecordsCount, FlushedRecordsTimeTaken, flushRate) + logRate := FlushedRecordsCount / float64(elapsed/time.Second) FlushedRecordsCount = 0.0 FlushedRecordsTimeTaken = 0.0 - DataUpdateMutex.Unlock() - metric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate) - TelemetryClient.Track(metric) + ContainerLogTelemetryMutex.Unlock() + + flushRateMetric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate) + TelemetryClient.Track(flushRateMetric) + logRateMetric := appinsights.NewMetricTelemetry(metricNameAvgLogGenerationRate, logRate) + TelemetryClient.Track(logRateMetric) + start = time.Now() } } @@ -138,3 +144,10 @@ func SendEvent(eventName string, dimensions map[string]string) { TelemetryClient.Track(event) } + +// SendException sends an exception to the configured app insights instance +func SendException(err interface{}) { + if TelemetryClient != nil { + TelemetryClient.TrackException(err) + } +} diff --git a/source/code/go/src/plugins/utils.go b/source/code/go/src/plugins/utils.go index 1ac9b05a9..a22935d9c 100644 --- a/source/code/go/src/plugins/utils.go +++ b/source/code/go/src/plugins/utils.go @@ -20,6 +20,7 @@ func ReadConfiguration(filename string) 
(map[string]string, error) { file, err := os.Open(filename) if err != nil { log.Fatal(err) + SendException(err) return nil, err } defer file.Close() @@ -40,6 +41,7 @@ func ReadConfiguration(filename string) (map[string]string, error) { if err := scanner.Err(); err != nil { log.Fatal(err) + SendException(err) return nil, err } @@ -51,6 +53,7 @@ func CreateHTTPClient() { cert, err := tls.LoadX509KeyPair(PluginConfiguration["cert_file_path"], PluginConfiguration["key_file_path"]) if err != nil { + SendException(err) Log("Error when loading cert %s", err.Error()) log.Fatalf("Error when loading cert %s", err.Error()) } From 3d49afb7b77b74adaed77ede0e9f725202919624 Mon Sep 17 00:00:00 2001 From: r-dilip Date: Wed, 24 Oct 2018 14:48:54 -0700 Subject: [PATCH 2/2] PR Feedback --- source/code/go/src/plugins/oms.go | 60 +++++++++++++++++------------ source/code/go/src/plugins/utils.go | 11 ++++-- 2 files changed, 43 insertions(+), 28 deletions(-) diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index 274a5a1d2..665c3f9f2 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -102,7 +102,7 @@ func createLogger() *log.Logger { fmt.Printf("File Exists. Opening file in append mode...\n") logfile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0600) if err != nil { - SendException(err) + SendException(err.Error()) fmt.Printf(err.Error()) } } @@ -111,7 +111,7 @@ func createLogger() *log.Logger { fmt.Printf("File Doesnt Exist. 
Creating file...\n") logfile, err = os.Create(path) if err != nil { - SendException(err) + SendException(err.Error()) fmt.Printf(err.Error()) } } @@ -139,8 +139,9 @@ func updateContainerImageNameMaps() { pods, err := ClientSet.CoreV1().Pods("").List(metav1.ListOptions{}) if err != nil { - SendException(err) - Log("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + message := fmt.Sprintf("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + Log(message) + SendException(message) continue } @@ -177,8 +178,9 @@ func updateKubeSystemContainerIDs() { pods, err := ClientSet.CoreV1().Pods("kube-system").List(metav1.ListOptions{}) if err != nil { - SendException(err) - Log("Error getting pods %s\nIt is ok to log here and continue. Kube-system logs will be collected", err.Error()) + message := fmt.Sprintf("Error getting pods %s\nIt is ok to log here and continue. 
Kube-system logs will be collected", err.Error()) + SendException(message) + Log(message) continue } @@ -269,8 +271,9 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { marshalled, err := json.Marshal(logEntry) if err != nil { - Log("Error while Marshalling log Entry: %s", err.Error()) - SendException(err) + message := fmt.Sprintf("Error while Marshalling log Entry: %s", err.Error()) + Log(message) + SendException(message) return output.FLB_OK } req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled)) @@ -280,9 +283,11 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { elapsed := time.Since(start) if err != nil { - Log("Error when sending request %s \n", err.Error()) + message := fmt.Sprintf("Error when sending request %s \n", err.Error()) + Log(message) + SendException(message) Log("Failed to flush %d records after %s", len(dataItems), elapsed) - SendException(err) + return output.FLB_RETRY } @@ -341,15 +346,17 @@ func InitializePlugin(pluginConfPath string) { pluginConfig, err := ReadConfiguration(pluginConfPath) if err != nil { - Log("Error Reading plugin config path : %s \n", err.Error()) - SendException(err) - log.Fatalf("Error Reading plugin config path : %s \n", err.Error()) + message := fmt.Sprintf("Error Reading plugin config path : %s \n", err.Error()) + Log(message) + SendException(message) + time.Sleep(30 * time.Second) + log.Fatalln(message) } omsadminConf, err := ReadConfiguration(pluginConfig["omsadmin_conf_path"]) if err != nil { Log(err.Error()) - SendException(err) + SendException(err.Error()) log.Fatalf("Error Reading omsadmin configuration %s\n", err.Error()) } OMSEndpoint = omsadminConf["OMS_ENDPOINT"] @@ -359,8 +366,9 @@ func InitializePlugin(pluginConfPath string) { // Initialize image,name map refresh ticker containerInventoryRefreshInterval, err := strconv.Atoi(pluginConfig["container_inventory_refresh_interval"]) if err != nil { - Log("Error Reading Container 
Inventory Refresh Interval %s", err.Error()) - SendException(err) + message := fmt.Sprintf("Error Reading Container Inventory Refresh Interval %s", err.Error()) + Log(message) + SendException(message) Log("Using Default Refresh Interval of %d s\n", defaultContainerInventoryRefreshInterval) containerInventoryRefreshInterval = defaultContainerInventoryRefreshInterval } @@ -370,8 +378,9 @@ func InitializePlugin(pluginConfPath string) { // Initialize Kube System Refresh Ticker kubeSystemContainersRefreshInterval, err := strconv.Atoi(pluginConfig["kube_system_containers_refresh_interval"]) if err != nil { - Log("Error Reading Kube System Container Ids Refresh Interval %s", err.Error()) - SendException(err) + message := fmt.Sprintf("Error Reading Kube System Container Ids Refresh Interval %s", err.Error()) + Log(message) + SendException(message) Log("Using Default Refresh Interval of %d s\n", defaultKubeSystemContainersRefreshInterval) kubeSystemContainersRefreshInterval = defaultKubeSystemContainersRefreshInterval } @@ -383,8 +392,9 @@ func InitializePlugin(pluginConfPath string) { if err != nil { // It is ok to log here and continue, because only the Computer column will be missing, // which can be deduced from a combination of containerId, and docker logs on the node - SendException(err) - Log("Error when reading containerHostName file %s.\n It is ok to log here and continue, because only the Computer column will be missing, which can be deduced from a combination of containerId, and docker logs on the nodes\n", err.Error()) + message := fmt.Sprintf("Error when reading containerHostName file %s.\n It is ok to log here and continue, because only the Computer column will be missing, which can be deduced from a combination of containerId, and docker logs on the nodes\n", err.Error()) + Log(message) + SendException(message) } Computer = strings.TrimSuffix(toString(containerHostName), "\n") Log("Computer == %s \n", Computer) @@ -392,14 +402,16 @@ func 
InitializePlugin(pluginConfPath string) { // Initialize KubeAPI Client config, err := rest.InClusterConfig() if err != nil { - SendException(err) - Log("Error getting config %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + message := fmt.Sprintf("Error getting config %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + Log(message) + SendException(message) } ClientSet, err = kubernetes.NewForConfig(config) if err != nil { - SendException(err) - Log("Error getting clientset %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + message := fmt.Sprintf("Error getting clientset %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + SendException(message) + Log(message) } PluginConfiguration = pluginConfig diff --git a/source/code/go/src/plugins/utils.go b/source/code/go/src/plugins/utils.go index a22935d9c..94db033bd 100644 --- a/source/code/go/src/plugins/utils.go +++ b/source/code/go/src/plugins/utils.go @@ -3,6 +3,7 @@ package main import ( "bufio" "crypto/tls" + "fmt" "log" "net/http" "os" @@ -19,8 +20,9 @@ func ReadConfiguration(filename string) (map[string]string, error) { file, err := os.Open(filename) if err != nil { - log.Fatal(err) SendException(err) + log.Fatal(err) + return nil, err } defer file.Close() @@ -40,8 +42,8 @@ func ReadConfiguration(filename string) (map[string]string, error) { } if err := scanner.Err(); err != nil { - log.Fatal(err) SendException(err) + log.Fatal(err) return nil, err } @@ -53,8 +55,9 @@ func CreateHTTPClient() { cert, err := tls.LoadX509KeyPair(PluginConfiguration["cert_file_path"], PluginConfiguration["key_file_path"]) if err != nil { - 
SendException(err) - Log("Error when loading cert %s", err.Error()) + message := fmt.Sprintf("Error when loading cert %s", err.Error()) + SendException(message) + Log(message) log.Fatalf("Error when loading cert %s", err.Error()) }