Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion installer/conf/telegraf-rs.conf
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "60s"
flush_interval = "15s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
Expand Down
2 changes: 1 addition & 1 deletion installer/conf/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@

## Default flushing interval for all outputs. You shouldn't set this below
## interval. Maximum flush_interval will be flush_interval + flush_jitter
flush_interval = "60s"
flush_interval = "15s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
Expand Down
11 changes: 9 additions & 2 deletions source/code/go/src/plugins/glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions source/code/go/src/plugins/glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import:
- package: github.com/fluent/fluent-bit-go
subpackages:
- output
- package: github.com/google/uuid
version: ^1.1.0
- package: gopkg.in/natefinch/lumberjack.v2
version: ^2.1.0
- package: k8s.io/apimachinery
Expand Down
43 changes: 36 additions & 7 deletions source/code/go/src/plugins/oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/fluent/fluent-bit-go/output"
"github.com/google/uuid"

lumberjack "gopkg.in/natefinch/lumberjack.v2"

Expand Down Expand Up @@ -133,6 +134,12 @@ var (
Log = FLBLogger.Printf
)

var (
dockerCimprovVersion = "9.0.0.0"
agentName = "ContainerAgent"
userAgent = ""
)

// DataItem represents the object corresponding to the json that is sent by fluentbit tail plugin
type DataItem struct {
LogEntry string `json:"LogEntry"`
Expand Down Expand Up @@ -513,6 +520,9 @@ func flushKubeMonAgentEventRecords() {
} else {
req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", userAgent )
reqId := uuid.New().String()
req.Header.Set("X-Request-ID", reqId)
//expensive to do string len for every request, so use a flag
if ResourceCentric == true {
req.Header.Set("x-ms-AzureResourceId", ResourceID)
Expand All @@ -527,7 +537,7 @@ func flushKubeMonAgentEventRecords() {
Log("Failed to flush %d records after %s", len(laKubeMonAgentEventsRecords), elapsed)
} else if resp == nil || resp.StatusCode != 200 {
if resp != nil {
Log("Status %s Status Code %d", resp.Status, resp.StatusCode)
Log(" RequestId %s Status %s Status Code %d", reqId, resp.Status, resp.StatusCode)
}
Log("Failed to flush %d records after %s", len(laKubeMonAgentEventsRecords), elapsed)
} else {
Expand Down Expand Up @@ -656,6 +666,9 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int

//set headers
req.Header.Set("x-ms-date", time.Now().Format(time.RFC3339))
req.Header.Set("User-Agent", userAgent )
reqId := uuid.New().String()
req.Header.Set("X-Request-ID", reqId)

//expensive to do string len for every request, so use a flag
if ResourceCentric == true {
Expand All @@ -669,31 +682,34 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int
if err != nil {
message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:(retriable) when sending %v metrics. duration:%v err:%q \n", len(laMetrics), elapsed, err.Error())
Log(message)
UpdateNumTelegrafMetricsSentTelemetry(0, 1)
UpdateNumTelegrafMetricsSentTelemetry(0, 1, 0)
return output.FLB_RETRY
}

if resp == nil || resp.StatusCode != 200 {
if resp != nil {
Log("PostTelegrafMetricsToLA::Error:(retriable) Response Status %v Status Code %v", resp.Status, resp.StatusCode)
Log("PostTelegrafMetricsToLA::Error:(retriable) RequestID %s Response Status %v Status Code %v", reqId, resp.Status, resp.StatusCode)
}
if resp != nil && resp.StatusCode == 429 {
UpdateNumTelegrafMetricsSentTelemetry(0, 1, 1)
}
UpdateNumTelegrafMetricsSentTelemetry(0, 1)
return output.FLB_RETRY
}

defer resp.Body.Close()

numMetrics := len(laMetrics)
UpdateNumTelegrafMetricsSentTelemetry(numMetrics, 0)
UpdateNumTelegrafMetricsSentTelemetry(numMetrics, 0, 0)
Log("PostTelegrafMetricsToLA::Info:Successfully flushed %v records in %v", numMetrics, elapsed)

return output.FLB_OK
}

func UpdateNumTelegrafMetricsSentTelemetry(numMetricsSent int, numSendErrors int) {
func UpdateNumTelegrafMetricsSentTelemetry(numMetricsSent int, numSendErrors int, numSend429Errors int) {
ContainerLogTelemetryMutex.Lock()
TelegrafMetricsSentCount += float64(numMetricsSent)
TelegrafMetricsSendErrorCount += float64(numSendErrors)
TelegrafMetricsSend429ErrorCount += float64(numSend429Errors)
ContainerLogTelemetryMutex.Unlock()
}

Expand Down Expand Up @@ -810,6 +826,9 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {

req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled))
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", userAgent )
reqId := uuid.New().String()
req.Header.Set("X-Request-ID", reqId)
//expensive to do string len for every request, so use a flag
if ResourceCentric == true {
req.Header.Set("x-ms-AzureResourceId", ResourceID)
Expand All @@ -830,7 +849,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {

if resp == nil || resp.StatusCode != 200 {
if resp != nil {
Log("Status %s Status Code %d", resp.Status, resp.StatusCode)
Log("RequestId %s Status %s Status Code %d", reqId, resp.Status, resp.StatusCode)
}
return output.FLB_RETRY
}
Expand Down Expand Up @@ -959,6 +978,16 @@ func InitializePlugin(pluginConfPath string, agentVersion string) {
Log("ResourceName=%s", ResourceName)
}

//set useragent to be used by ingestion
docker_cimprov_version := strings.TrimSpace(os.Getenv("DOCKER_CIMPROV_VERSION"))
if len(docker_cimprov_version) > 0 {
dockerCimprovVersion = docker_cimprov_version
}

userAgent = fmt.Sprintf("%s/%s", agentName, dockerCimprovVersion)

Log("Usage-Agent = %s \n", userAgent)

// Initialize image,name map refresh ticker
containerInventoryRefreshInterval, err := strconv.Atoi(pluginConfig["container_inventory_refresh_interval"])
if err != nil {
Expand Down
12 changes: 11 additions & 1 deletion source/code/go/src/plugins/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ var (
TelegrafMetricsSentCount float64
//Tracks the number of send errors between telemetry ticker periods (uses ContainerLogTelemetryTicker)
TelegrafMetricsSendErrorCount float64
//Tracks the number of 429 (throttle) errors between telemetry ticker periods (uses ContainerLogTelemetryTicker)
TelegrafMetricsSend429ErrorCount float64
)

const (
Expand All @@ -49,6 +51,7 @@ const (
metricNameAgentLogProcessingMaxLatencyMs = "ContainerLogsAgentSideLatencyMs"
metricNameNumberofTelegrafMetricsSentSuccessfully = "TelegrafMetricsSentCount"
metricNameNumberofSendErrorsTelegrafMetrics = "TelegrafMetricsSendErrorCount"
metricNameNumberofSend429ErrorsTelegrafMetrics = "TelegrafMetricsSend429ErrorCount"

defaultTelemetryPushIntervalSeconds = 300

Expand Down Expand Up @@ -78,8 +81,10 @@ func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) {
logSizeRate := FlushedRecordsSize / float64(elapsed/time.Second)
telegrafMetricsSentCount := TelegrafMetricsSentCount
telegrafMetricsSendErrorCount := TelegrafMetricsSendErrorCount
telegrafMetricsSend429ErrorCount := TelegrafMetricsSend429ErrorCount
TelegrafMetricsSentCount = 0.0
TelegrafMetricsSendErrorCount = 0.0
TelegrafMetricsSend429ErrorCount = 0.0
FlushedRecordsCount = 0.0
FlushedRecordsSize = 0.0
FlushedRecordsTimeTaken = 0.0
Expand All @@ -103,7 +108,12 @@ func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) {
TelemetryClient.Track(logLatencyMetric)
}
TelemetryClient.Track(appinsights.NewMetricTelemetry(metricNameNumberofTelegrafMetricsSentSuccessfully, telegrafMetricsSentCount))
TelemetryClient.Track(appinsights.NewMetricTelemetry(metricNameNumberofSendErrorsTelegrafMetrics, telegrafMetricsSendErrorCount))
if telegrafMetricsSendErrorCount > 0.0 {
TelemetryClient.Track(appinsights.NewMetricTelemetry(metricNameNumberofSendErrorsTelegrafMetrics, telegrafMetricsSendErrorCount))
}
if telegrafMetricsSend429ErrorCount > 0.0 {
TelemetryClient.Track(appinsights.NewMetricTelemetry(metricNameNumberofSend429ErrorsTelegrafMetrics, telegrafMetricsSend429ErrorCount))
}
start = time.Now()
}
}
Expand Down