From 8efa6789f4e8482fd7dc23f8e4766588301f66eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=9E=20Vishwanath=20Narasimhan=20=F0=9F=8C=A4?= Date: Mon, 16 Mar 2020 23:41:52 -0700 Subject: [PATCH 1/2] Add header for throttling Add telemetry for throttling Flush 15secs for telegraf metrics --- installer/conf/telegraf-rs.conf | 2 +- installer/conf/telegraf.conf | 2 +- source/code/go/src/plugins/oms.go | 29 +++++++++++++++++++++---- source/code/go/src/plugins/telemetry.go | 12 +++++++++- 4 files changed, 38 insertions(+), 7 deletions(-) diff --git a/installer/conf/telegraf-rs.conf b/installer/conf/telegraf-rs.conf index 3450ab88f..64b43d0a9 100644 --- a/installer/conf/telegraf-rs.conf +++ b/installer/conf/telegraf-rs.conf @@ -56,7 +56,7 @@ ## Default flushing interval for all outputs. You shouldn't set this below ## interval. Maximum flush_interval will be flush_interval + flush_jitter - flush_interval = "60s" + flush_interval = "15s" ## Jitter the flush interval by a random amount. This is primarily to avoid ## large write spikes for users running a large number of telegraf instances. ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s diff --git a/installer/conf/telegraf.conf b/installer/conf/telegraf.conf index f9dc3fb6a..77cde0086 100644 --- a/installer/conf/telegraf.conf +++ b/installer/conf/telegraf.conf @@ -56,7 +56,7 @@ ## Default flushing interval for all outputs. You shouldn't set this below ## interval. Maximum flush_interval will be flush_interval + flush_jitter - flush_interval = "60s" + flush_interval = "15s" ## Jitter the flush interval by a random amount. This is primarily to avoid ## large write spikes for users running a large number of telegraf instances. ## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index 8dfaf0e7e..e7cbbbc86 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -133,6 +133,12 @@ var ( Log = FLBLogger.Printf ) +var ( + dockerCimprovVersion = "9.0.0.0" + agentName = "ContainerAgent" + userAgent = "" +) + // DataItem represents the object corresponding to the json that is sent by fluentbit tail plugin type DataItem struct { LogEntry string `json:"LogEntry"` @@ -513,6 +519,7 @@ func flushKubeMonAgentEventRecords() { } else { req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled)) req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", userAgent ) //expensive to do string len for every request, so use a flag if ResourceCentric == true { req.Header.Set("x-ms-AzureResourceId", ResourceID) @@ -656,6 +663,7 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int //set headers req.Header.Set("x-ms-date", time.Now().Format(time.RFC3339)) + req.Header.Set("User-Agent", userAgent ) //expensive to do string len for every request, so use a flag if ResourceCentric == true { @@ -669,7 +677,7 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int if err != nil { message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:(retriable) when sending %v metrics. duration:%v err:%q \n", len(laMetrics), elapsed, err.Error()) Log(message) - UpdateNumTelegrafMetricsSentTelemetry(0, 1) + UpdateNumTelegrafMetricsSentTelemetry(0, 1, 0) return output.FLB_RETRY } @@ -677,23 +685,26 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int if resp != nil { Log("PostTelegrafMetricsToLA::Error:(retriable) Response Status %v Status Code %v", resp.Status, resp.StatusCode) } - UpdateNumTelegrafMetricsSentTelemetry(0, 1) + if resp != nil && resp.StatusCode == 429 { + UpdateNumTelegrafMetricsSentTelemetry(0, 1, 1) + } return output.FLB_RETRY } defer resp.Body.Close() numMetrics := len(laMetrics) - UpdateNumTelegrafMetricsSentTelemetry(numMetrics, 0) + UpdateNumTelegrafMetricsSentTelemetry(numMetrics, 0, 0) Log("PostTelegrafMetricsToLA::Info:Successfully flushed %v records in %v", numMetrics, elapsed) return output.FLB_OK } -func UpdateNumTelegrafMetricsSentTelemetry(numMetricsSent int, numSendErrors int) { +func UpdateNumTelegrafMetricsSentTelemetry(numMetricsSent int, numSendErrors int, numSend429Errors int) { ContainerLogTelemetryMutex.Lock() TelegrafMetricsSentCount += float64(numMetricsSent) TelegrafMetricsSendErrorCount += float64(numSendErrors) + TelegrafMetricsSend429ErrorCount += float64(numSend429Errors) ContainerLogTelemetryMutex.Unlock() } @@ -810,6 +821,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled)) req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", userAgent ) //expensive to do string len for every request, so use a flag if ResourceCentric == true { req.Header.Set("x-ms-AzureResourceId", ResourceID) @@ -959,6 +971,15 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { Log("ResourceName=%s", ResourceName) } + //set useragent to be used by ingestion + if len(strings.TrimSpace(os.Getenv("DOCKER_CIMPROV_VERSION"))) > 0 { + dockerCimprovVersion = agentVersion + } + + userAgent = fmt.Sprintf("%s/%s", agentName, dockerCimprovVersion) + + Log("Usage-Agent = %s \n", userAgent) + // Initialize image,name map refresh ticker containerInventoryRefreshInterval, err := strconv.Atoi(pluginConfig["container_inventory_refresh_interval"]) if err != nil { diff --git a/source/code/go/src/plugins/telemetry.go b/source/code/go/src/plugins/telemetry.go index d5675187f..b31bac1c7 100644 --- a/source/code/go/src/plugins/telemetry.go +++ b/source/code/go/src/plugins/telemetry.go @@ -34,6 +34,8 @@ var ( TelegrafMetricsSentCount float64 //Tracks the number of send errors between telemetry ticker periods (uses ContainerLogTelemetryTicker) TelegrafMetricsSendErrorCount float64 + //Tracks the number of 429 (throttle) errors between telemetry ticker periods (uses ContainerLogTelemetryTicker) + TelegrafMetricsSend429ErrorCount float64 ) const ( @@ -49,6 +51,7 @@ const ( metricNameAgentLogProcessingMaxLatencyMs = "ContainerLogsAgentSideLatencyMs" metricNameNumberofTelegrafMetricsSentSuccessfully = "TelegrafMetricsSentCount" metricNameNumberofSendErrorsTelegrafMetrics = "TelegrafMetricsSendErrorCount" + metricNameNumberofSend429ErrorsTelegrafMetrics = "TelegrafMetricsSend429ErrorCount" defaultTelemetryPushIntervalSeconds = 300 @@ -78,8 +81,10 @@ func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) { logSizeRate := FlushedRecordsSize / float64(elapsed/time.Second) telegrafMetricsSentCount := TelegrafMetricsSentCount telegrafMetricsSendErrorCount := TelegrafMetricsSendErrorCount + telegrafMetricsSend429ErrorCount := TelegrafMetricsSend429ErrorCount TelegrafMetricsSentCount = 0.0 TelegrafMetricsSendErrorCount = 0.0 + TelegrafMetricsSend429ErrorCount = 0.0 FlushedRecordsCount = 0.0 FlushedRecordsSize = 0.0 FlushedRecordsTimeTaken = 0.0 @@ -103,7 +108,12 @@ func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) { TelemetryClient.Track(logLatencyMetric) } TelemetryClient.Track(appinsights.NewMetricTelemetry(metricNameNumberofTelegrafMetricsSentSuccessfully, telegrafMetricsSentCount)) - TelemetryClient.Track(appinsights.NewMetricTelemetry(metricNameNumberofSendErrorsTelegrafMetrics, telegrafMetricsSendErrorCount)) + if telegrafMetricsSendErrorCount > 0.0 { + TelemetryClient.Track(appinsights.NewMetricTelemetry(metricNameNumberofSendErrorsTelegrafMetrics, telegrafMetricsSendErrorCount)) + } + if telegrafMetricsSend429ErrorCount > 0.0 { + TelemetryClient.Track(appinsights.NewMetricTelemetry(metricNameNumberofSend429ErrorsTelegrafMetrics, telegrafMetricsSend429ErrorCount)) + } start = time.Now() } } From e58ca75a8d764addf0385ae8f7a8a68403ee5b8f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=F0=9F=8C=9E=20Vishwanath=20Narasimhan=20=F0=9F=8C=A4?= Date: Tue, 17 Mar 2020 17:35:09 -0700 Subject: [PATCH 2/2] * Add requestid & log it for errors * Fix bug in semver --- source/code/go/src/plugins/glide.lock | 11 +++++++++-- source/code/go/src/plugins/glide.yaml | 2 ++ source/code/go/src/plugins/oms.go | 18 +++++++++++++----- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/source/code/go/src/plugins/glide.lock b/source/code/go/src/plugins/glide.lock index fc147fe74..b322898e5 100644 --- a/source/code/go/src/plugins/glide.lock +++ b/source/code/go/src/plugins/glide.lock @@ -1,6 +1,8 @@ -hash: a6a873d09ed9c3d890a70122e61efba992ead9850fe48f6fcb020d86800d4ade -updated: 2018-10-10T13:37:51.9703908-07:00 +hash: b00ffe7239c0f8d9d87e0afe8512c75ff9952c41cd2b3b79dc30fe6d3f0e3ec8 +updated: 2020-03-17T15:46:37.8805517-07:00 imports: +- name: code.cloudfoundry.org/clock + version: 86534f4ca3a5cbac1685968132dfaefeb8e8cad2 - name: github.com/fluent/fluent-bit-go version: c4a158a6e3a793166c6ecfa2d5c80d71eada8959 subpackages: @@ -26,6 +28,8 @@ imports: version: 7d79101e329e5a3adf994758c578dab82b90c017 - name: github.com/google/gofuzz version: 44d81051d367757e1c7c6a5a86423ece9afcf63c +- name: github.com/google/uuid + version: 0cd6bf5da1e1c83f8b45653022c74f71af0538a4 - name: github.com/googleapis/gnostic version: 0c5108395e2debce0d731cf0287ddf7242066aba subpackages: @@ -42,12 +46,15 @@ imports: version: d2df5d440eda5372f24fcac03839a64d6cb5f7e5 subpackages: - appinsights + - appinsights/contracts - name: github.com/modern-go/concurrent version: bacd9c7ef1dd9b15be4a9909b8ac7a4e313eec94 - name: github.com/modern-go/reflect2 version: 05fbef0ca5da472bbf96c9322b84a53edc03c9fd - name: github.com/peterbourgon/diskv version: 5f041e8faa004a95c88a202771f4cc3e991971e6 +- name: github.com/satori/go.uuid + version: b2ce2384e17bbe0c6d34077efa39dbab3e09123b - name: github.com/ugorji/go version: 00b869d2f4a5e27445c2d916fa106fc72c106d4c subpackages: diff --git a/source/code/go/src/plugins/glide.yaml b/source/code/go/src/plugins/glide.yaml index b2829391b..9d0f9d2ac 100644 --- a/source/code/go/src/plugins/glide.yaml +++ b/source/code/go/src/plugins/glide.yaml @@ -3,6 +3,8 @@ import: - package: github.com/fluent/fluent-bit-go subpackages: - output +- package: github.com/google/uuid + version: ^1.1.0 - package: gopkg.in/natefinch/lumberjack.v2 version: ^2.1.0 - package: k8s.io/apimachinery diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index e7cbbbc86..69825f006 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -14,6 +14,7 @@ import ( "time" "github.com/fluent/fluent-bit-go/output" + "github.com/google/uuid" lumberjack "gopkg.in/natefinch/lumberjack.v2" @@ -520,6 +521,8 @@ func flushKubeMonAgentEventRecords() { req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled)) req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", userAgent ) + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) //expensive to do string len for every request, so use a flag if ResourceCentric == true { req.Header.Set("x-ms-AzureResourceId", ResourceID) @@ -534,7 +537,7 @@ func flushKubeMonAgentEventRecords() { Log("Failed to flush %d records after %s", len(laKubeMonAgentEventsRecords), elapsed) } else if resp == nil || resp.StatusCode != 200 { if resp != nil { - Log("Status %s Status Code %d", resp.Status, resp.StatusCode) + Log(" RequestId %s Status %s Status Code %d", reqId, resp.Status, resp.StatusCode) } Log("Failed to flush %d records after %s", len(laKubeMonAgentEventsRecords), elapsed) } else { @@ -664,6 +667,8 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int //set headers req.Header.Set("x-ms-date", time.Now().Format(time.RFC3339)) req.Header.Set("User-Agent", userAgent ) + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) //expensive to do string len for every request, so use a flag if ResourceCentric == true { @@ -683,7 +688,7 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int if resp == nil || resp.StatusCode != 200 { if resp != nil { - Log("PostTelegrafMetricsToLA::Error:(retriable) Response Status %v Status Code %v", resp.Status, resp.StatusCode) + Log("PostTelegrafMetricsToLA::Error:(retriable) RequestID %s Response Status %v Status Code %v", reqId, resp.Status, resp.StatusCode) } if resp != nil && resp.StatusCode == 429 { UpdateNumTelegrafMetricsSentTelemetry(0, 1, 1) @@ -822,6 +827,8 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled)) req.Header.Set("Content-Type", "application/json") req.Header.Set("User-Agent", userAgent ) + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) //expensive to do string len for every request, so use a flag if ResourceCentric == true { req.Header.Set("x-ms-AzureResourceId", ResourceID) @@ -842,7 +849,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { if resp == nil || resp.StatusCode != 200 { if resp != nil { - Log("Status %s Status Code %d", resp.Status, resp.StatusCode) + Log("RequestId %s Status %s Status Code %d", reqId, resp.Status, resp.StatusCode) } return output.FLB_RETRY } @@ -972,8 +979,9 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { } //set useragent to be used by ingestion - if len(strings.TrimSpace(os.Getenv("DOCKER_CIMPROV_VERSION"))) > 0 { - dockerCimprovVersion = agentVersion + docker_cimprov_version := strings.TrimSpace(os.Getenv("DOCKER_CIMPROV_VERSION")) + if len(docker_cimprov_version) > 0 { + dockerCimprovVersion = docker_cimprov_version } userAgent = fmt.Sprintf("%s/%s", agentName, dockerCimprovVersion)