From df1a04cbffd1e3753bc90803c2a29fe77d60abb8 Mon Sep 17 00:00:00 2001 From: Ganga Mahesh Siddem Date: Thu, 27 Jan 2022 14:18:50 -0800 Subject: [PATCH] remove v1 fallback hidden option --- source/plugins/go/src/oms.go | 277 +++++++++++++++++------------------ 1 file changed, 136 insertions(+), 141 deletions(-) diff --git a/source/plugins/go/src/oms.go b/source/plugins/go/src/oms.go index ee221a60b..8c7695346 100644 --- a/source/plugins/go/src/oms.go +++ b/source/plugins/go/src/oms.go @@ -21,9 +21,10 @@ import ( "github.com/google/uuid" "github.com/tinylib/msgp/msgp" - lumberjack "gopkg.in/natefinch/lumberjack.v2" "Docker-Provider/source/plugins/go/src/extension" + lumberjack "gopkg.in/natefinch/lumberjack.v2" + "github.com/Azure/azure-kusto-go/kusto/ingest" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -85,7 +86,6 @@ const WindowsContainerLogPluginConfFilePath = "/etc/omsagentwindows/out_oms.conf // IPName const IPName = "ContainerInsights" - const defaultContainerInventoryRefreshInterval = 60 const kubeMonAgentConfigEventFlushInterval = 60 @@ -102,9 +102,6 @@ const ContainerLogsV2Route = "v2" const ContainerLogsADXRoute = "adx" -//fallback option v1 route i.e. ODS direct if required in any case -const ContainerLogsV1Route = "v1" - //container logs schema (v2=ContainerLogsV2 table in LA, anything else ContainerLogs table in LA. This is applicable only if Container logs route is NOT ADX) const ContainerLogV2SchemaVersion = "v2" @@ -252,29 +249,29 @@ type DataItemLAv1 struct { // DataItemLAv2 == ContainerLogV2 table in LA // Please keep the names same as destination column names, to avoid transforming one to another in the pipeline type DataItemLAv2 struct { - TimeGenerated string `json:"TimeGenerated"` - Computer string `json:"Computer"` - ContainerId string `json:"ContainerId"` - ContainerName string `json:"ContainerName"` - PodName string `json:"PodName"` - PodNamespace string `json:"PodNamespace"` - LogMessage string `json:"LogMessage"` - LogSource string `json:"LogSource"` + TimeGenerated string `json:"TimeGenerated"` + Computer string `json:"Computer"` + ContainerId string `json:"ContainerId"` + ContainerName string `json:"ContainerName"` + PodName string `json:"PodName"` + PodNamespace string `json:"PodNamespace"` + LogMessage string `json:"LogMessage"` + LogSource string `json:"LogSource"` //PodLabels string `json:"PodLabels"` } // DataItemADX == ContainerLogV2 table in ADX type DataItemADX struct { - TimeGenerated string `json:"TimeGenerated"` - Computer string `json:"Computer"` - ContainerId string `json:"ContainerId"` - ContainerName string `json:"ContainerName"` - PodName string `json:"PodName"` - PodNamespace string `json:"PodNamespace"` - LogMessage string `json:"LogMessage"` - LogSource string `json:"LogSource"` + TimeGenerated string `json:"TimeGenerated"` + Computer string `json:"Computer"` + ContainerId string `json:"ContainerId"` + ContainerName string `json:"ContainerName"` + PodName string `json:"PodName"` + PodNamespace string `json:"PodNamespace"` + LogMessage string `json:"LogMessage"` + LogSource string `json:"LogSource"` //PodLabels string `json:"PodLabels"` - AzureResourceId string `json:"AzureResourceId"` + AzureResourceId string `json:"AzureResourceId"` } // telegraf metric DataItem represents the object corresponding to the json that is sent by fluentbit tail plugin @@ -299,15 +296,15 @@ type InsightsMetricsBlob struct { // ContainerLogBlob represents the object corresponding to the payload that is sent to the ODS end point type ContainerLogBlobLAv1 struct { - DataType string `json:"DataType"` - IPName string `json:"IPName"` + DataType string `json:"DataType"` + IPName string `json:"IPName"` DataItems []DataItemLAv1 `json:"DataItems"` } // ContainerLogBlob represents the object corresponding to the payload that is sent to the ODS end point type ContainerLogBlobLAv2 struct { - DataType string `json:"DataType"` - IPName string `json:"IPName"` + DataType string `json:"DataType"` + IPName string `json:"IPName"` DataItems []DataItemLAv2 `json:"DataItems"` } @@ -361,6 +358,7 @@ const ( // DataType to be used as enum per data type socket client creation type DataType int + const ( // DataType to be used as enum per data type socket client creation ContainerLogV2 DataType = iota @@ -628,12 +626,12 @@ func flushKubeMonAgentEventRecords() { Log(message) SendException(message) } else { - msgPackEntry := MsgPackEntry{ + msgPackEntry := MsgPackEntry{ Record: stringMap, } - msgPackEntries = append(msgPackEntries, msgPackEntry) - } - } + msgPackEntries = append(msgPackEntries, msgPackEntry) + } + } } } @@ -670,8 +668,8 @@ func flushKubeMonAgentEventRecords() { msgPackEntry := MsgPackEntry{ Record: stringMap, } - msgPackEntries = append(msgPackEntries, msgPackEntry) - } + msgPackEntries = append(msgPackEntries, msgPackEntry) + } } } } @@ -713,18 +711,18 @@ func flushKubeMonAgentEventRecords() { } else { if err := json.Unmarshal(jsonBytes, &stringMap); err != nil { message := fmt.Sprintf("Error while UnMarshalling json bytes to stringmap: %s", err.Error()) - Log(message) - SendException(message) + Log(message) + SendException(message) } else { msgPackEntry := MsgPackEntry{ Record: stringMap, - } - msgPackEntries = append(msgPackEntries, msgPackEntry) + } + msgPackEntries = append(msgPackEntries, msgPackEntry) } } } } - if (IsWindows == false && len(msgPackEntries) > 0) { //for linux, mdsd route + if IsWindows == false && len(msgPackEntries) > 0 { //for linux, mdsd route if IsAADMSIAuthMode == true && strings.HasPrefix(MdsdKubeMonAgentEventsTagName, MdsdOutputStreamIdTagPrefix) == false { Log("Info::mdsd::obtaining output stream id for data type: %s", KubeMonAgentEventDataType) MdsdKubeMonAgentEventsTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(KubeMonAgentEventDataType) @@ -757,7 +755,7 @@ func flushKubeMonAgentEventRecords() { } else { numRecords := len(msgPackEntries) Log("FlushKubeMonAgentEventRecords::Info::Successfully flushed %d records that was %d bytes in %s", numRecords, bts, elapsed) - // Send telemetry to AppInsights resource + // Send telemetry to AppInsights resource SendEvent(KubeMonAgentEventsFlushedEvent, telemetryDimensions) } } else { @@ -788,8 +786,8 @@ func flushKubeMonAgentEventRecords() { if IsAADMSIAuthMode == true { IngestionAuthTokenUpdateMutex.Lock() - ingestionAuthToken := ODSIngestionAuthToken - IngestionAuthTokenUpdateMutex.Unlock() + ingestionAuthToken := ODSIngestionAuthToken + IngestionAuthTokenUpdateMutex.Unlock() if ingestionAuthToken == "" { Log("Error::ODS Ingestion Auth Token is empty. Please check error log.") } @@ -910,77 +908,77 @@ func PostTelegrafMetricsToLA(telegrafRecords []map[interface{}]interface{}) int var msgPackEntries []MsgPackEntry var i int start := time.Now() - var elapsed time.Duration + var elapsed time.Duration for i = 0; i < len(laMetrics); i++ { - var interfaceMap map[string]interface{} - stringMap := make(map[string]string) - jsonBytes, err := json.Marshal(*laMetrics[i]) - if err != nil { - message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:when marshalling json %q", err) + var interfaceMap map[string]interface{} + stringMap := make(map[string]string) + jsonBytes, err := json.Marshal(*laMetrics[i]) + if err != nil { + message := fmt.Sprintf("PostTelegrafMetricsToLA::Error:when marshalling json %q", err) + Log(message) + SendException(message) + return output.FLB_OK + } else { + if err := json.Unmarshal(jsonBytes, &interfaceMap); err != nil { + message := fmt.Sprintf("Error while UnMarshalling json bytes to interfaceMap: %s", err.Error()) Log(message) SendException(message) return output.FLB_OK } else { - if err := json.Unmarshal(jsonBytes, &interfaceMap); err != nil { - message := fmt.Sprintf("Error while UnMarshalling json bytes to interfaceMap: %s", err.Error()) - Log(message) - SendException(message) - return output.FLB_OK - } else { - for key, value := range interfaceMap { - strKey := fmt.Sprintf("%v", key) - strValue := fmt.Sprintf("%v", value) - stringMap[strKey] = strValue - } - msgPackEntry := MsgPackEntry{ - Record: stringMap, - } - msgPackEntries = append(msgPackEntries, msgPackEntry) + for key, value := range interfaceMap { + strKey := fmt.Sprintf("%v", key) + strValue := fmt.Sprintf("%v", value) + stringMap[strKey] = strValue + } + msgPackEntry := MsgPackEntry{ + Record: stringMap, } + msgPackEntries = append(msgPackEntries, msgPackEntry) } + } } - if (len(msgPackEntries) > 0) { - if IsAADMSIAuthMode == true && (strings.HasPrefix(MdsdInsightsMetricsTagName, MdsdOutputStreamIdTagPrefix) == false) { - Log("Info::mdsd::obtaining output stream id for InsightsMetricsDataType since Log Analytics AAD MSI Auth Enabled") - MdsdInsightsMetricsTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(InsightsMetricsDataType) - } - msgpBytes := convertMsgPackEntriesToMsgpBytes(MdsdInsightsMetricsTagName, msgPackEntries) + if len(msgPackEntries) > 0 { + if IsAADMSIAuthMode == true && (strings.HasPrefix(MdsdInsightsMetricsTagName, MdsdOutputStreamIdTagPrefix) == false) { + Log("Info::mdsd::obtaining output stream id for InsightsMetricsDataType since Log Analytics AAD MSI Auth Enabled") + MdsdInsightsMetricsTagName = extension.GetInstance(FLBLogger, ContainerType).GetOutputStreamId(InsightsMetricsDataType) + } + msgpBytes := convertMsgPackEntriesToMsgpBytes(MdsdInsightsMetricsTagName, msgPackEntries) + if MdsdInsightsMetricsMsgpUnixSocketClient == nil { + Log("Error::mdsd::mdsd connection does not exist. re-connecting ...") + CreateMDSDClient(InsightsMetrics, ContainerType) if MdsdInsightsMetricsMsgpUnixSocketClient == nil { - Log("Error::mdsd::mdsd connection does not exist. re-connecting ...") - CreateMDSDClient(InsightsMetrics, ContainerType) - if MdsdInsightsMetricsMsgpUnixSocketClient == nil { - Log("Error::mdsd::Unable to create mdsd client for insights metrics. Please check error log.") - ContainerLogTelemetryMutex.Lock() - defer ContainerLogTelemetryMutex.Unlock() - InsightsMetricsMDSDClientCreateErrors += 1 - return output.FLB_RETRY - } - } - - deadline := 10 * time.Second - MdsdInsightsMetricsMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse - bts, er := MdsdInsightsMetricsMsgpUnixSocketClient.Write(msgpBytes) - - elapsed = time.Since(start) - - if er != nil { - Log("Error::mdsd::Failed to write to mdsd %d records after %s. Will retry ... error : %s", len(msgPackEntries), elapsed, er.Error()) - UpdateNumTelegrafMetricsSentTelemetry(0, 1, 0) - if MdsdInsightsMetricsMsgpUnixSocketClient != nil { - MdsdInsightsMetricsMsgpUnixSocketClient.Close() - MdsdInsightsMetricsMsgpUnixSocketClient = nil - } - + Log("Error::mdsd::Unable to create mdsd client for insights metrics. Please check error log.") ContainerLogTelemetryMutex.Lock() defer ContainerLogTelemetryMutex.Unlock() InsightsMetricsMDSDClientCreateErrors += 1 return output.FLB_RETRY - } else { - numTelegrafMetricsRecords := len(msgPackEntries) - UpdateNumTelegrafMetricsSentTelemetry(numTelegrafMetricsRecords, 0, 0) - Log("Success::mdsd::Successfully flushed %d telegraf metrics records that was %d bytes to mdsd in %s ", numTelegrafMetricsRecords, bts, elapsed) } + } + + deadline := 10 * time.Second + MdsdInsightsMetricsMsgpUnixSocketClient.SetWriteDeadline(time.Now().Add(deadline)) //this is based of clock time, so cannot reuse + bts, er := MdsdInsightsMetricsMsgpUnixSocketClient.Write(msgpBytes) + + elapsed = time.Since(start) + + if er != nil { + Log("Error::mdsd::Failed to write to mdsd %d records after %s. Will retry ... error : %s", len(msgPackEntries), elapsed, er.Error()) + UpdateNumTelegrafMetricsSentTelemetry(0, 1, 0) + if MdsdInsightsMetricsMsgpUnixSocketClient != nil { + MdsdInsightsMetricsMsgpUnixSocketClient.Close() + MdsdInsightsMetricsMsgpUnixSocketClient = nil + } + + ContainerLogTelemetryMutex.Lock() + defer ContainerLogTelemetryMutex.Unlock() + InsightsMetricsMDSDClientCreateErrors += 1 + return output.FLB_RETRY + } else { + numTelegrafMetricsRecords := len(msgPackEntries) + UpdateNumTelegrafMetricsSentTelemetry(numTelegrafMetricsRecords, 0, 0) + Log("Success::mdsd::Successfully flushed %d telegraf metrics records that was %d bytes to mdsd in %s ", numTelegrafMetricsRecords, bts, elapsed) + } } } else { // for windows, ODS direct @@ -1117,12 +1115,12 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { stringMap = make(map[string]string) //below id & name are used by latency telemetry in both v1 & v2 LA schemas id := "" - name := "" + name := "" logEntry := ToString(record["log"]) logEntryTimeStamp := ToString(record["time"]) //ADX Schema & LAv2 schema are almost the same (except resourceId) - if (ContainerLogSchemaV2 == true || ContainerLogsRouteADX == true) { + if ContainerLogSchemaV2 == true || ContainerLogsRouteADX == true { stringMap["Computer"] = Computer stringMap["ContainerId"] = containerID stringMap["ContainerName"] = containerName @@ -1171,29 +1169,29 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { stringMap["AzureResourceId"] = "" } dataItemADX = DataItemADX{ - TimeGenerated: stringMap["TimeGenerated"], - Computer: stringMap["Computer"], - ContainerId: stringMap["ContainerId"], - ContainerName: stringMap["ContainerName"], - PodName: stringMap["PodName"], - PodNamespace: stringMap["PodNamespace"], - LogMessage: stringMap["LogMessage"], - LogSource: stringMap["LogSource"], - AzureResourceId: stringMap["AzureResourceId"], + TimeGenerated: stringMap["TimeGenerated"], + Computer: stringMap["Computer"], + ContainerId: stringMap["ContainerId"], + ContainerName: stringMap["ContainerName"], + PodName: stringMap["PodName"], + PodNamespace: stringMap["PodNamespace"], + LogMessage: stringMap["LogMessage"], + LogSource: stringMap["LogSource"], + AzureResourceId: stringMap["AzureResourceId"], } //ADX dataItemsADX = append(dataItemsADX, dataItemADX) } else { - if (ContainerLogSchemaV2 == true) { + if ContainerLogSchemaV2 == true { dataItemLAv2 = DataItemLAv2{ - TimeGenerated: stringMap["TimeGenerated"], - Computer: stringMap["Computer"], - ContainerId: stringMap["ContainerId"], - ContainerName: stringMap["ContainerName"], - PodName: stringMap["PodName"], - PodNamespace: stringMap["PodNamespace"], - LogMessage: stringMap["LogMessage"], - LogSource: stringMap["LogSource"], + TimeGenerated: stringMap["TimeGenerated"], + Computer: stringMap["Computer"], + ContainerId: stringMap["ContainerId"], + ContainerName: stringMap["ContainerName"], + PodName: stringMap["PodName"], + PodNamespace: stringMap["PodNamespace"], + LogMessage: stringMap["LogMessage"], + LogSource: stringMap["LogSource"], } //ODS-v2 schema dataItemsLAv2 = append(dataItemsLAv2, dataItemLAv2) @@ -1211,10 +1209,10 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { Image: stringMap["Image"], Name: stringMap["Name"], } - //ODS-v1 schema - dataItemsLAv1 = append(dataItemsLAv1, dataItemLAv1) - name = stringMap["Name"] - id = stringMap["Id"] + //ODS-v1 schema + dataItemsLAv1 = append(dataItemsLAv1, dataItemLAv1) + name = stringMap["Name"] + id = stringMap["Id"] } } @@ -1364,18 +1362,18 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { numContainerLogRecords = len(dataItemsADX) Log("Success::ADX::Successfully wrote %d container log records to ADX in %s", numContainerLogRecords, elapsed) - } else if ((ContainerLogSchemaV2 == true && len(dataItemsLAv2) > 0) || len(dataItemsLAv1) > 0) { //ODS + } else if (ContainerLogSchemaV2 == true && len(dataItemsLAv2) > 0) || len(dataItemsLAv1) > 0 { //ODS var logEntry interface{} recordType := "" loglinesCount := 0 //schema v2 - if (len(dataItemsLAv2) > 0 && ContainerLogSchemaV2 == true) { + if len(dataItemsLAv2) > 0 && ContainerLogSchemaV2 == true { logEntry = ContainerLogBlobLAv2{ DataType: ContainerLogV2DataType, IPName: IPName, DataItems: dataItemsLAv2} - loglinesCount = len(dataItemsLAv2) - recordType = "ContainerLogV2" + loglinesCount = len(dataItemsLAv2) + recordType = "ContainerLogV2" } else { //schema v1 if len(dataItemsLAv1) > 0 { @@ -1383,8 +1381,8 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { DataType: ContainerLogDataType, IPName: IPName, DataItems: dataItemsLAv1} - loglinesCount = len(dataItemsLAv1) - recordType = "ContainerLog" + loglinesCount = len(dataItemsLAv1) + recordType = "ContainerLog" } } @@ -1416,7 +1414,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { return output.FLB_RETRY } // add authorization header to the req - req.Header.Set("Authorization", "Bearer "+ingestionAuthToken) + req.Header.Set("Authorization", "Bearer "+ingestionAuthToken) } resp, err := HTTPClient.Do(req) @@ -1444,7 +1442,7 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { numContainerLogRecords = loglinesCount Log("PostDataHelper::Info::Successfully flushed %d %s records to ODS in %s", numContainerLogRecords, recordType, elapsed) - } + } ContainerLogTelemetryMutex.Lock() defer ContainerLogTelemetryMutex.Unlock() @@ -1558,7 +1556,7 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { Log("Container Type %s", ContainerType) osType := os.Getenv("OS_TYPE") - IsWindows = false + IsWindows = false // Linux if strings.Compare(strings.ToLower(osType), "windows") != 0 { Log("Reading configuration for Linux from %s", pluginConfPath) @@ -1703,7 +1701,7 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { ContainerLogsRouteADX = false if strings.Compare(ContainerLogsRoute, ContainerLogsADXRoute) == 0 { - // Try to read the ADX database name from environment variables. Default to DefaultAdsDatabaseName if not set. + // Try to read the ADX database name from environment variables. Default to DefaultAdsDatabaseName if not set. // This SHOULD be set by tomlparser.rb so it's a highly unexpected event if it isn't. // It should be set by the logic in tomlparser.rb EVEN if ADX logging isn't enabled AdxDatabaseName := strings.TrimSpace(os.Getenv("AZMON_ADX_DATABASE_NAME")) @@ -1747,10 +1745,7 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { fmt.Fprintf(os.Stdout, "Routing container logs thru %s route...\n", ContainerLogsADXRoute) } } else if strings.Compare(strings.ToLower(osType), "windows") != 0 { //for linux, oneagent will be default route - ContainerLogsRouteV2 = true //default is mdsd route - if strings.Compare(ContainerLogsRoute, ContainerLogsV1Route) == 0 { - ContainerLogsRouteV2 = false //fallback option when hiddensetting set - } + ContainerLogsRouteV2 = true //default is mdsd route Log("Routing container logs thru %s route...", ContainerLogsRoute) fmt.Fprintf(os.Stdout, "Routing container logs thru %s route... \n", ContainerLogsRoute) } @@ -1768,14 +1763,14 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { Log("Creating MDSD clients for KubeMonAgentEvents & InsightsMetrics") CreateMDSDClient(KubeMonAgentEvents, ContainerType) CreateMDSDClient(InsightsMetrics, ContainerType) - } + } ContainerLogSchemaVersion := strings.TrimSpace(strings.ToLower(os.Getenv("AZMON_CONTAINER_LOG_SCHEMA_VERSION"))) Log("AZMON_CONTAINER_LOG_SCHEMA_VERSION:%s", ContainerLogSchemaVersion) - ContainerLogSchemaV2 = false //default is v1 schema + ContainerLogSchemaV2 = false //default is v1 schema - if strings.Compare(ContainerLogSchemaVersion, ContainerLogV2SchemaVersion) == 0 && ContainerLogsRouteADX != true { + if strings.Compare(ContainerLogSchemaVersion, ContainerLogV2SchemaVersion) == 0 && ContainerLogsRouteADX != true { ContainerLogSchemaV2 = true Log("Container logs schema=%s", ContainerLogV2SchemaVersion) fmt.Fprintf(os.Stdout, "Container logs schema=%s... \n", ContainerLogV2SchemaVersion) @@ -1801,15 +1796,15 @@ func InitializePlugin(pluginConfPath string, agentVersion string) { if ContainerLogSchemaV2 == true { MdsdContainerLogTagName = MdsdContainerLogV2SourceName } else { - MdsdContainerLogTagName = MdsdContainerLogSourceName - } + MdsdContainerLogTagName = MdsdContainerLogSourceName + } MdsdInsightsMetricsTagName = MdsdInsightsMetricsSourceName - MdsdKubeMonAgentEventsTagName = MdsdKubeMonAgentEventsSourceName + MdsdKubeMonAgentEventsTagName = MdsdKubeMonAgentEventsSourceName Log("ContainerLogsRouteADX: %v, IsWindows: %v, IsAADMSIAuthMode = %v \n", ContainerLogsRouteADX, IsWindows, IsAADMSIAuthMode) if !ContainerLogsRouteADX && IsWindows && IsAADMSIAuthMode { Log("defaultIngestionAuthTokenRefreshIntervalSeconds = %d \n", defaultIngestionAuthTokenRefreshIntervalSeconds) - IngestionAuthTokenRefreshTicker = time.NewTicker(time.Second * time.Duration(defaultIngestionAuthTokenRefreshIntervalSeconds)) + IngestionAuthTokenRefreshTicker = time.NewTicker(time.Second * time.Duration(defaultIngestionAuthTokenRefreshIntervalSeconds)) go refreshIngestionAuthToken() } }