From 062019618b377923ca213932770b3a7c9e12b8ef Mon Sep 17 00:00:00 2001 From: Ganga Mahesh Siddem Date: Fri, 30 Jul 2021 13:27:20 -0700 Subject: [PATCH 1/2] emit log entries when more than 0 after exclusion --- source/plugins/go/src/oms.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/plugins/go/src/oms.go b/source/plugins/go/src/oms.go index 026d36d6c..8123889e8 100644 --- a/source/plugins/go/src/oms.go +++ b/source/plugins/go/src/oms.go @@ -1357,7 +1357,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { numContainerLogRecords = len(dataItemsADX) Log("Success::ADX::Successfully wrote %d container log records to ADX in %s", numContainerLogRecords, elapsed) - } else { //ODS + } else if (ContainerLogSchemaV2 == true && len(dataItemsLAv2) > 0) || len(dataItemsLAv1) > 0) { //ODS var logEntry interface{} recordType := "" loglinesCount := 0 @@ -1437,7 +1437,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { numContainerLogRecords = loglinesCount Log("PostDataHelper::Info::Successfully flushed %d %s records to ODS in %s", numContainerLogRecords, recordType, elapsed) - } + } ContainerLogTelemetryMutex.Lock() defer ContainerLogTelemetryMutex.Unlock() From 5d99de9c8b1b62c88061f461d600257381672480 Mon Sep 17 00:00:00 2001 From: Ganga Mahesh Siddem Date: Mon, 2 Aug 2021 09:39:35 -0700 Subject: [PATCH 2/2] fix build error --- source/plugins/go/src/oms.go | 168 +++++++++++++++++------------------ 1 file changed, 84 insertions(+), 84 deletions(-) diff --git a/source/plugins/go/src/oms.go b/source/plugins/go/src/oms.go index 8123889e8..32ea0f67f 100644 --- a/source/plugins/go/src/oms.go +++ b/source/plugins/go/src/oms.go @@ -165,17 +165,17 @@ var ( // ADX tenantID AdxTenantID string //ADX client secret - AdxClientSecret string + AdxClientSecret string // container log or container log v2 tag name for oneagent route - MdsdContainerLogTagName string + MdsdContainerLogTagName string // kubemonagent events tag name for oneagent route MdsdKubeMonAgentEventsTagName string // InsightsMetrics tag name for oneagent route - MdsdInsightsMetricsTagName string + MdsdInsightsMetricsTagName string // flag to check if its Windows OS IsWindows bool - // container type - ContainerType string + // container type + ContainerType string // flag to check whether LA AAD MSI Auth Enabled or not IsAADMSIAuthMode bool ) @@ -206,7 +206,7 @@ var ( // IngestionAuthTokenUpdateMutex read and write mutex access for ODSIngestionAuthToken IngestionAuthTokenUpdateMutex = &sync.Mutex{} // ODSIngestionAuthToken for windows agent AAD MSI Auth - ODSIngestionAuthToken string + ODSIngestionAuthToken string ) var ( @@ -355,12 +355,12 @@ const ( ) // DataType to be used as enum per data type socket client creation -type DataType int +type DataType int const ( // DataType to be used as enum per data type socket client creation ContainerLogV2 DataType = iota - KubeMonAgentEvents - InsightsMetrics + KubeMonAgentEvents + InsightsMetrics ) func createLogger() *log.Logger { @@ -608,7 +608,7 @@ func flushKubeMonAgentEventRecords() { Message: k, Tags: fmt.Sprintf("%s", tagJson), } - laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord) + laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord) var stringMap map[string]string jsonBytes, err := json.Marshal(&laKubeMonAgentEventsRecord) if err != nil { @@ -621,10 +621,10 @@ func flushKubeMonAgentEventRecords() { Log(message) SendException(message) } else { - msgPackEntry := MsgPackEntry{ + msgPackEntry := MsgPackEntry{ Record: stringMap, } - msgPackEntries = append(msgPackEntries, msgPackEntry) + msgPackEntries = append(msgPackEntries, msgPackEntry) } } } @@ -647,23 +647,23 @@ func flushKubeMonAgentEventRecords() { Message: k, Tags: fmt.Sprintf("%s", tagJson), } - laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord) + laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord) var stringMap map[string]string jsonBytes, err := json.Marshal(&laKubeMonAgentEventsRecord) if err != nil { message := fmt.Sprintf("Error while Marshalling laKubeMonAgentEventsRecord to json bytes: %s", err.Error()) Log(message) SendException(message) - } else { - if err := json.Unmarshal(jsonBytes, &stringMap); err != nil { + } else { + if err := json.Unmarshal(jsonBytes, &stringMap); err != nil { message := fmt.Sprintf("Error while UnMarhalling json bytes to stringmap: %s", err.Error()) Log(message) SendException(message) } else { - msgPackEntry := MsgPackEntry{ + msgPackEntry := MsgPackEntry{ Record: stringMap, - } - msgPackEntries = append(msgPackEntries, msgPackEntry) + } + msgPackEntries = append(msgPackEntries, msgPackEntry) } } } @@ -696,66 +696,66 @@ func flushKubeMonAgentEventRecords() { Message: "No errors", Tags: fmt.Sprintf("%s", tagJson), } - laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord) + laKubeMonAgentEventsRecords = append(laKubeMonAgentEventsRecords, laKubeMonAgentEventsRecord) var stringMap map[string]string jsonBytes, err := json.Marshal(&laKubeMonAgentEventsRecord) - if err != nil { + if err != nil { message := fmt.Sprintf("Error while Marshalling laKubeMonAgentEventsRecord to json bytes: %s", err.Error()) Log(message) SendException(message) } else { - if err := json.Unmarshal(jsonBytes, &stringMap); err != nil { + if err := json.Unmarshal(jsonBytes, &stringMap); err != nil { message := fmt.Sprintf("Error while UnMarshalling json bytes to stringmap: %s", err.Error()) Log(message) SendException(message) - } else { - msgPackEntry := MsgPackEntry{ + } else { + msgPackEntry := MsgPackEntry{ Record: stringMap, } - msgPackEntries = append(msgPackEntries, msgPackEntry) + msgPackEntries = append(msgPackEntries, msgPackEntry) } } } } - if (IsWindows == false && len(msgPackEntries) > 0) { //for linux, mdsd route + if (IsWindows == false && len(msgPackEntries) > 0) { //for linux, mdsd route if IsAADMSIAuthMode == true && strings.HasPrefix(MdsdKubeMonAgentEventsTagName, MdsdOutputStreamIdTagPrefix) == false { Log("Info::mdsd::obtaining output stream id for data type: %s", KubeMonAgentEventDataType) MdsdKubeMonAgentEventsTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(KubeMonAgentEventDataType) - } + } Log("Info::mdsd:: using mdsdsource name for KubeMonAgentEvents: %s", MdsdKubeMonAgentEventsTagName) - msgpBytes := convertMsgPackEntriesToMsgpBytes(MdsdKubeMonAgentEventsTagName, msgPackEntries) + msgpBytes := convertMsgPackEntriesToMsgpBytes(MdsdKubeMonAgentEventsTagName, msgPackEntries) if MdsdKubeMonMsgpUnixSocketClient == nil { Log("Error::mdsd::mdsd connection for KubeMonAgentEvents does not exist. re-connecting ...") CreateMDSDClient(KubeMonAgentEvents, ContainerType) if MdsdKubeMonMsgpUnixSocketClient == nil { - Log("Error::mdsd::Unable to create mdsd client for KubeMonAgentEvents. Please check error log.") + Log("Error::mdsd::Unable to create mdsd client for KubeMonAgentEvents. Please check error log.") ContainerLogTelemetryMutex.Lock() defer ContainerLogTelemetryMutex.Unlock() - KubeMonEventsMDSDClientCreateErrors += 1 - } + KubeMonEventsMDSDClientCreateErrors += 1 + } } - if MdsdKubeMonMsgpUnixSocketClient != nil { + if MdsdKubeMonMsgpUnixSocketClient != nil { deadline := 10 * time.Second - MdsdKubeMonMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse + MdsdKubeMonMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse bts, er := MdsdKubeMonMsgpUnixSocketClient.Write(msgpBytes) - elapsed = time.Since(start) + elapsed = time.Since(start) if er != nil { message := fmt.Sprintf("Error::mdsd::Failed to write to kubemonagent mdsd %d records after %s. Will retry ... error : %s", len(msgPackEntries), elapsed, er.Error()) Log(message) if MdsdKubeMonMsgpUnixSocketClient != nil { MdsdKubeMonMsgpUnixSocketClient.Close() MdsdKubeMonMsgpUnixSocketClient = nil - } + } SendException(message) } else { numRecords := len(msgPackEntries) Log("FlushKubeMonAgentEventRecords::Info::Successfully flushed %d records that was %d bytes in %s", numRecords, bts, elapsed) // Send telemetry to AppInsights resource SendEvent(KubeMonAgentEventsFlushedEvent, telemetryDimensions) - } + } } else { - Log("Error::mdsd::Unable to create mdsd client for KubeMonAgentEvents. Please check error log.") - } + Log("Error::mdsd::Unable to create mdsd client for KubeMonAgentEvents. Please check error log.") + } } else if len(laKubeMonAgentEventsRecords) > 0 { //for windows, ODS direct kubeMonAgentEventEntry := KubeMonAgentEventBlob{ DataType: KubeMonAgentEventDataType, @@ -782,10 +782,10 @@ func flushKubeMonAgentEventRecords() { if IsAADMSIAuthMode == true { IngestionAuthTokenUpdateMutex.Lock() ingestionAuthToken := ODSIngestionAuthToken - IngestionAuthTokenUpdateMutex.Unlock() - if ingestionAuthToken == "" { - Log("Error::ODS Ingestion Auth Token is empty. Please check error log.") - } + IngestionAuthTokenUpdateMutex.Unlock() + if ingestionAuthToken == "" { + Log("Error::ODS Ingestion Auth Token is empty. Please check error log.") + } req.Header.Set("Authorization", "Bearer "+ingestionAuthToken) } @@ -898,15 +898,15 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int message := fmt.Sprintf("PostTelegrafMetricsToLA::Info:derived %v metrics from %v timeseries", len(laMetrics), len(telegrafRecords)) Log(message) } - + if IsWindows == false { //for linux, mdsd route - var msgPackEntries []MsgPackEntry + var msgPackEntries []MsgPackEntry var i int start := time.Now() var elapsed time.Duration - for i = 0; i < len(laMetrics); i++ { - var interfaceMap map[string]interface{} + for i = 0; i < len(laMetrics); i++ { + var interfaceMap map[string]interface{} stringMap := make(map[string]string) jsonBytes, err := json.Marshal(*laMetrics[i]) if err != nil { @@ -915,35 +915,35 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int SendException(message) return output.FLB_OK } else { - if err := json.Unmarshal(jsonBytes, &interfaceMap); err != nil { + if err := json.Unmarshal(jsonBytes, &interfaceMap); err != nil { message := fmt.Sprintf("Error while UnMarshalling json bytes to interfaceMap: %s", err.Error()) Log(message) SendException(message) return output.FLB_OK - } else { + } else { for key, value := range interfaceMap { strKey := fmt.Sprintf("%v", key) strValue := fmt.Sprintf("%v", value) stringMap[strKey] = strValue - } - msgPackEntry := MsgPackEntry{ + } + msgPackEntry := MsgPackEntry{ Record: stringMap, } - msgPackEntries = append(msgPackEntries, msgPackEntry) - } + msgPackEntries = append(msgPackEntries, msgPackEntry) + } } } - if (len(msgPackEntries) > 0) { + if (len(msgPackEntries) > 0) { if IsAADMSIAuthMode == true && (strings.HasPrefix(MdsdInsightsMetricsTagName, MdsdOutputStreamIdTagPrefix) == false) { Log("Info::mdsd::obtaining output stream id for InsightsMetricsDataType since Log Analytics AAD MSI Auth Enabled") MdsdInsightsMetricsTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(InsightsMetricsDataType) - } - msgpBytes := convertMsgPackEntriesToMsgpBytes(MdsdInsightsMetricsTagName, msgPackEntries) + } + msgpBytes := convertMsgPackEntriesToMsgpBytes(MdsdInsightsMetricsTagName, msgPackEntries) if MdsdInsightsMetricsMsgpUnixSocketClient == nil { Log("Error::mdsd::mdsd connection does not exist. re-connecting ...") CreateMDSDClient(InsightsMetrics, ContainerType) if MdsdInsightsMetricsMsgpUnixSocketClient == nil { - Log("Error::mdsd::Unable to create mdsd client for insights metrics. Please check error log.") + Log("Error::mdsd::Unable to create mdsd client for insights metrics. Please check error log.") ContainerLogTelemetryMutex.Lock() defer ContainerLogTelemetryMutex.Unlock() InsightsMetricsMDSDClientCreateErrors += 1 @@ -952,7 +952,7 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int } deadline := 10 * time.Second - MdsdInsightsMetricsMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse + MdsdInsightsMetricsMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse bts, er := MdsdInsightsMetricsMsgpUnixSocketClient.Write(msgpBytes) elapsed = time.Since(start) @@ -967,7 +967,7 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int ContainerLogTelemetryMutex.Lock() defer ContainerLogTelemetryMutex.Unlock() - InsightsMetricsMDSDClientCreateErrors += 1 + InsightsMetricsMDSDClientCreateErrors += 1 return output.FLB_RETRY } else { numTelegrafMetricsRecords := len(msgPackEntries) @@ -975,7 +975,7 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int Log("Success::mdsd::Successfully flushed %d telegraf metrics records that was %d bytes to mdsd in %s ", numTelegrafMetricsRecords, bts, elapsed) } } - + } else { // for windows, ODS direct var metrics []laTelegrafMetric @@ -1017,9 +1017,9 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int if IsAADMSIAuthMode == true { IngestionAuthTokenUpdateMutex.Lock() ingestionAuthToken := ODSIngestionAuthToken - IngestionAuthTokenUpdateMutex.Unlock() - if ingestionAuthToken == "" { - message := "Error::ODS Ingestion Auth Token is empty. Please check error log." + IngestionAuthTokenUpdateMutex.Unlock() + if ingestionAuthToken == "" { + message := "Error::ODS Ingestion Auth Token is empty. Please check error log." Log(message) return output.FLB_RETRY } @@ -1230,7 +1230,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { numContainerLogRecords := 0 if len(msgPackEntries) > 0 && ContainerLogsRouteV2 == true { - //flush to mdsd + //flush to mdsd if IsAADMSIAuthMode == true && strings.HasPrefix(MdsdContainerLogTagName, MdsdOutputStreamIdTagPrefix) == false { Log("Info::mdsd::obtaining output stream id") if ContainerLogSchemaV2 == true { @@ -1240,7 +1240,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { } Log("Info::mdsd:: using mdsdsource name: %s", MdsdContainerLogTagName) } - + fluentForward := MsgPackForward{ Tag: MdsdContainerLogTagName, Entries: msgPackEntries, @@ -1357,7 +1357,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { numContainerLogRecords = len(dataItemsADX) Log("Success::ADX::Successfully wrote %d container log records to ADX in %s", numContainerLogRecords, elapsed) - } else if (ContainerLogSchemaV2 == true && len(dataItemsLAv2) > 0) || len(dataItemsLAv1) > 0) { //ODS + } else if ((ContainerLogSchemaV2 == true && len(dataItemsLAv2) > 0) || len(dataItemsLAv1) > 0) { //ODS var logEntry interface{} recordType := "" loglinesCount := 0 @@ -1399,19 +1399,19 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { if ResourceCentric == true { req.Header.Set("x-ms-AzureResourceId", ResourceID) } - + if IsAADMSIAuthMode == true { IngestionAuthTokenUpdateMutex.Lock() ingestionAuthToken := ODSIngestionAuthToken IngestionAuthTokenUpdateMutex.Unlock() - if ingestionAuthToken == "" { - Log("Error::ODS Ingestion Auth Token is empty. Please check error log.") + if ingestionAuthToken == "" { + Log("Error::ODS Ingestion Auth Token is empty. Please check error log.") return output.FLB_RETRY } // add authorization header to the req req.Header.Set("Authorization", "Bearer "+ingestionAuthToken) - } - + } + resp, err := HTTPClient.Do(req) elapsed = time.Since(start) @@ -1420,7 +1420,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { Log(message) // Commenting this out for now. TODO - Add better telemetry for ods errors using aggregation //SendException(message) - + Log("Failed to flush %d records after %s", loglinesCount, elapsed) return output.FLB_RETRY @@ -1508,7 +1508,7 @@ func GetContainerIDK8sNamespacePodNameFromFileName(filename string) (string, str } // InitializePlugin reads and populates plugin configuration -func InitializePlugin(pluginConfPath string, agentVersion string) { +func InitializePlugin(pluginConfPath string, agentVersion string) { go func() { isTest := os.Getenv("ISTEST") if strings.Compare(strings.ToLower(strings.TrimSpace(isTest)), "true") == 0 { @@ -1548,10 +1548,10 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { } ContainerType = os.Getenv(ContainerTypeEnv) - Log("Container Type %s", ContainerType) + Log("Container Type %s", ContainerType) osType := os.Getenv("OS_TYPE") - IsWindows = false + IsWindows = false // Linux if strings.Compare(strings.ToLower(osType), "windows") != 0 { Log("Reading configuration for Linux from %s", pluginConfPath) @@ -1570,7 +1570,7 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { SendException(message) time.Sleep(30 * time.Second) log.Fatalln(message) - } + } OMSEndpoint = "https://" + WorkspaceID + ".ods." + LogAnalyticsWorkspaceDomain + "/OperationalData.svc/PostJsonDataItems" // Populate Computer field containerHostName, err1 := ioutil.ReadFile(pluginConfig["container_host_file_path"]) @@ -1600,7 +1600,7 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { } } else { // windows - IsWindows = true + IsWindows = true Computer = os.Getenv("HOSTNAME") WorkspaceID = os.Getenv("WSID") logAnalyticsDomain := os.Getenv("DOMAIN") @@ -1612,7 +1612,7 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { IsAADMSIAuthMode = false if strings.Compare(strings.ToLower(os.Getenv(AADMSIAuthMode)), "true") == 0 { IsAADMSIAuthMode = true - Log("AAD MSI Auth Mode Configured") + Log("AAD MSI Auth Mode Configured") } ResourceID = os.Getenv(envAKSResourceID) @@ -1687,13 +1687,13 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { Log(message) } - PluginConfiguration = pluginConfig + PluginConfiguration = pluginConfig ContainerLogsRoute := strings.TrimSpace(strings.ToLower(os.Getenv("AZMON_CONTAINER_LOGS_ROUTE"))) Log("AZMON_CONTAINER_LOGS_ROUTE:%s", ContainerLogsRoute) - ContainerLogsRouteV2 = false - ContainerLogsRouteADX = false + ContainerLogsRouteV2 = false + ContainerLogsRouteADX = false if strings.Compare(ContainerLogsRoute, ContainerLogsADXRoute) == 0 { //check if adx clusteruri, clientid & secret are set @@ -1726,14 +1726,14 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { Log("Routing container logs thru %s route...", ContainerLogsADXRoute) fmt.Fprintf(os.Stdout, "Routing container logs thru %s route...\n", ContainerLogsADXRoute) } - } else if strings.Compare(strings.ToLower(osType), "windows") != 0 { //for linux, oneagent will be default route + } else if strings.Compare(strings.ToLower(osType), "windows") != 0 { //for linux, oneagent will be default route ContainerLogsRouteV2 = true //default is mdsd route - if strings.Compare(ContainerLogsRoute, ContainerLogsV1Route) == 0 { + if strings.Compare(ContainerLogsRoute, ContainerLogsV1Route) == 0 { ContainerLogsRouteV2 = false //fallback option when hiddensetting set } Log("Routing container logs thru %s route...", ContainerLogsRoute) fmt.Fprintf(os.Stdout, "Routing container logs thru %s route... \n", ContainerLogsRoute) - } + } if ContainerLogsRouteV2 == true { CreateMDSDClient(ContainerLogV2, ContainerType) @@ -1746,7 +1746,7 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { if IsWindows == false { // mdsd linux specific Log("Creating MDSD clients for KubeMonAgentEvents & InsightsMetrics") - CreateMDSDClient(KubeMonAgentEvents, ContainerType) + CreateMDSDClient(KubeMonAgentEvents, ContainerType) CreateMDSDClient(InsightsMetrics, ContainerType) } @@ -1785,7 +1785,7 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { } MdsdInsightsMetricsTagName = MdsdInsightsMetricsSourceName - MdsdKubeMonAgentEventsTagName = MdsdKubeMonAgentEventsSourceName + MdsdKubeMonAgentEventsTagName = MdsdKubeMonAgentEventsSourceName Log("ContainerLogsRouteADX: %v, IsWindows: %v, IsAADMSIAuthMode = %v \n", ContainerLogsRouteADX, IsWindows, IsAADMSIAuthMode) if !ContainerLogsRouteADX && IsWindows && IsAADMSIAuthMode { Log("defaultIngestionAuthTokenRefreshIntervalSeconds = %d \n", defaultIngestionAuthTokenRefreshIntervalSeconds)