diff --git a/README.md b/README.md index 0c543e716..6e0285fe6 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# AKS Container Health monitoring +# Azure Monitor for Containers ## Code of Conduct @@ -9,7 +9,24 @@ additional questions or comments. ## Release History -### 10/16/2018 - Version microsoft/oms:ciprod10162018 + +### 11/29/2018 - Version microsoft/oms:ciprod11292018 +- Disable Container Image inventory workflow +- Kube_Events memory leak fix for replica-set +- Timeout (30 secs) for outOMS +- Reduce critical lock duration for quicker log processing (for log enrichment) +- Disable OMI based Container Inventory workflow to fluentD based Container Inventory +- Moby support for the new Container Inventory workflow +- Ability to disable environment variables collection by individual container +- Bugfix - No inventory data due to container status(es) not available +- Agent telemetry cpu usage & memory usage (for DaemonSet and ReplicaSet) +- Agent telemetry - log generation rate +- Agent telemetry - container count per node +- Agent telemetry - collect container logs from agent (DaemonSet and ReplicaSet) as AI trace +- Agent telemetry - errors/exceptions for Container Inventory workflow +- Agent telemetry - Container Inventory Heartbeat + +### 10/16/2018 - Version microsoft/oms:ciprod10162018-2 - Fix for containerID being 00000-00000-00000 - Move from fluentD to fluentbit for container log collection - Seg fault fixes in json parsing for container inventory & container image inventory @@ -40,4 +57,4 @@ additional questions or comments. - Kubernetes RBAC enablement - Latest released omsagent (1.6.0-42) - Bug fix so that we do not collect kube-system namespace container logs when kube api calls fail occasionally (Bug #215107) -- .yaml changes (for RBAC) \ No newline at end of file +- .yaml changes (for RBAC) diff --git a/installer/conf/container.conf b/installer/conf/container.conf index 17317871c..798bd8eb6 100755 --- a/installer/conf/container.conf +++ b/installer/conf/container.conf @@ -9,22 +9,10 @@ # Container inventory - type omi - run_interval 60s - tag oms.container.containerinventory - items [ - ["root/cimv2","Container_ContainerInventory"] - ] - - -# Image inventory - - type omi - run_interval 60s - tag oms.container.imageinventory - items [ - ["root/cimv2","Container_ImageInventory"] - ] + type containerinventory + tag oms.containerinsights.containerinventory + run_interval 60s + log_level debug # Container host inventory @@ -45,11 +33,6 @@ log_level debug -# Filter for correct format to endpoint - - type filter_container - - type out_oms_api log_level debug @@ -63,33 +46,22 @@ max_retry_wait 9m - + type out_oms log_level debug + num_threads 5 buffer_chunk_limit 20m buffer_type file buffer_path %STATE_DIR_WS%/out_oms_containerinventory*.buffer buffer_queue_limit 20 + buffer_queue_full_action drop_oldest_chunk flush_interval 20s retry_limit 10 - retry_wait 15s - max_retry_wait 9m - - - - type out_oms - log_level debug - buffer_chunk_limit 20m - buffer_type file - buffer_path %STATE_DIR_WS%/out_oms_imageinventory*.buffer - buffer_queue_limit 20 - flush_interval 20s - retry_limit 10 - retry_wait 15s + retry_wait 30s max_retry_wait 9m - + type out_oms log_level debug num_threads 5 diff --git a/installer/conf/td-agent-bit.conf b/installer/conf/td-agent-bit.conf index b39587a97..c3252a185 100644 --- a/installer/conf/td-agent-bit.conf +++ b/installer/conf/td-agent-bit.conf @@ -14,9 +14,19 @@ Path_Key filepath Skip_Long_Lines On +[INPUT] + Name tail + Tag oms.container.log.flbplugin.* + Path /var/log/containers/omsagent*.log + DB /var/opt/microsoft/docker-cimprov/state/omsagent-ai.db + Mem_Buf_Limit 30m + Path_Key filepath + Skip_Long_Lines On + [OUTPUT] Name oms EnableTelemetry true TelemetryPushIntervalSeconds 300 Match oms.container.log.* - AgentVersion ciprod10162018-2 + AgentVersion ciprod11292018 + diff --git a/installer/datafiles/base_container.data b/installer/datafiles/base_container.data index 85a128b2a..7181929e2 100644 --- a/installer/datafiles/base_container.data +++ b/installer/datafiles/base_container.data @@ -37,6 +37,57 @@ MAINTAINER: 'Microsoft Corporation' /opt/microsoft/omsagent/plugin/in_kube_services.rb; source/code/plugin/in_kube_services.rb; 644; root; root /opt/microsoft/omsagent/plugin/in_kube_nodes.rb; source/code/plugin/in_kube_nodes.rb; 644; root; root +/opt/microsoft/omsagent/plugin/ApplicationInsightsUtility.rb; source/code/plugin/ApplicationInsightsUtility.rb; 644; root; root +/opt/microsoft/omsagent/plugin/ContainerInventoryState.rb; source/code/plugin/ContainerInventoryState.rb; 644; root; root +/opt/microsoft/omsagent/plugin/DockerApiClient.rb; source/code/plugin/DockerApiClient.rb; 644; root; root +/opt/microsoft/omsagent/plugin/DockerApiRestHelper.rb; source/code/plugin/DockerApiRestHelper.rb; 644; root; root +/opt/microsoft/omsagent/plugin/in_containerinventory.rb; source/code/plugin/in_containerinventory.rb; 644; root; root + +/opt/microsoft/omsagent/plugin/lib/application_insights/version.rb; source/code/plugin/lib/application_insights/version.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/rack/track_request.rb; source/code/plugin/lib/application_insights/rack/track_request.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/unhandled_exception.rb; source/code/plugin/lib/application_insights/unhandled_exception.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/telemetry_client.rb; source/code/plugin/lib/application_insights/telemetry_client.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/queue_base.rb; source/code/plugin/lib/application_insights/channel/queue_base.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/asynchronous_queue.rb; source/code/plugin/lib/application_insights/channel/asynchronous_queue.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/synchronous_sender.rb; source/code/plugin/lib/application_insights/channel/synchronous_sender.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/data_point_type.rb; source/code/plugin/lib/application_insights/channel/contracts/data_point_type.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/data_point.rb; source/code/plugin/lib/application_insights/channel/contracts/data_point.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/stack_frame.rb; source/code/plugin/lib/application_insights/channel/contracts/stack_frame.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/request_data.rb; source/code/plugin/lib/application_insights/channel/contracts/request_data.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/session.rb; source/code/plugin/lib/application_insights/channel/contracts/session.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/page_view_data.rb; source/code/plugin/lib/application_insights/channel/contracts/page_view_data.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/remote_dependency_data.rb; source/code/plugin/lib/application_insights/channel/contracts/remote_dependency_data.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/exception_data.rb; source/code/plugin/lib/application_insights/channel/contracts/exception_data.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/location.rb; source/code/plugin/lib/application_insights/channel/contracts/location.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/operation.rb; source/code/plugin/lib/application_insights/channel/contracts/operation.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/data.rb; source/code/plugin/lib/application_insights/channel/contracts/data.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/event_data.rb; source/code/plugin/lib/application_insights/channel/contracts/event_data.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/metric_data.rb; source/code/plugin/lib/application_insights/channel/contracts/metric_data.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/device.rb; source/code/plugin/lib/application_insights/channel/contracts/device.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/message_data.rb; source/code/plugin/lib/application_insights/channel/contracts/message_data.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/dependency_source_type.rb; source/code/plugin/lib/application_insights/channel/contracts/dependency_source_type.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/user.rb; source/code/plugin/lib/application_insights/channel/contracts/user.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/severity_level.rb; source/code/plugin/lib/application_insights/channel/contracts/severity_level.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/application.rb; source/code/plugin/lib/application_insights/channel/contracts/application.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/dependency_kind.rb; source/code/plugin/lib/application_insights/channel/contracts/dependency_kind.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/cloud.rb; source/code/plugin/lib/application_insights/channel/contracts/cloud.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/envelope.rb; source/code/plugin/lib/application_insights/channel/contracts/envelope.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/json_serializable.rb; source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/domain.rb; source/code/plugin/lib/application_insights/channel/contracts/domain.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/base.rb; source/code/plugin/lib/application_insights/channel/contracts/base.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/reopenings.rb; source/code/plugin/lib/application_insights/channel/contracts/reopenings.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/page_view_perf_data.rb; source/code/plugin/lib/application_insights/channel/contracts/page_view_perf_data.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/internal.rb; source/code/plugin/lib/application_insights/channel/contracts/internal.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/availability_data.rb; source/code/plugin/lib/application_insights/channel/contracts/availability_data.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts/exception_details.rb; source/code/plugin/lib/application_insights/channel/contracts/exception_details.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/synchronous_queue.rb; source/code/plugin/lib/application_insights/channel/synchronous_queue.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/sender_base.rb; source/code/plugin/lib/application_insights/channel/sender_base.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/telemetry_context.rb; source/code/plugin/lib/application_insights/channel/telemetry_context.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/asynchronous_sender.rb; source/code/plugin/lib/application_insights/channel/asynchronous_sender.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/telemetry_channel.rb; source/code/plugin/lib/application_insights/channel/telemetry_channel.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/event.rb; source/code/plugin/lib/application_insights/channel/event.rb; 644; root; root +/opt/microsoft/omsagent/plugin/lib/application_insights.rb; source/code/plugin/lib/application_insights.rb; 644; root; root + /opt/td-agent-bit/bin/out_oms.so; intermediate/${{BUILD_CONFIGURATION}}/out_oms.so; 755; root; root /etc/opt/microsoft/docker-cimprov/td-agent-bit.conf; installer/conf/td-agent-bit.conf; 644; root; root /etc/opt/microsoft/docker-cimprov/out_oms.conf; installer/conf/out_oms.conf; 644; root; root @@ -75,12 +126,17 @@ MAINTAINER: 'Microsoft Corporation' /var/opt/microsoft/docker-cimprov; 755; root; root /var/opt/microsoft/docker-cimprov/state; 755; root; root /var/opt/microsoft/docker-cimprov/state/ContainerInventory; 755; root; root -/var/opt/microsoft/docker-cimprov/state/ImageInventory; 755; root; root /var/opt/microsoft/docker-cimprov/log; 755; root; root /opt/td-agent-bit; 755; root; root;sysdir /opt/td-agent-bit/bin; 755; root; root;sysdir +/opt/microsoft/omsagent/plugin/lib; 755; root; root; sysdir +/opt/microsoft/omsagent/plugin/lib/application_insights; 755; root; root; sysdir +/opt/microsoft/omsagent/plugin/lib/application_insights/channel; 755; root; root; sysdir +/opt/microsoft/omsagent/plugin/lib/application_insights/channel/contracts; 755; root; root; sysdir +/opt/microsoft/omsagent/plugin/lib/application_insights/rack; 755; root; root; sysdir + %Dependencies %Postinstall_10 @@ -90,6 +146,9 @@ WriteInstallInfo() { } WriteInstallInfo +#Make omsagent owner for ContainerInventory directory. This is needed for ruby plugin to have access +chown omsagent:omsagent /var/opt/microsoft/docker-cimprov/state/ContainerInventory + # Get the state file in place with proper permissions touch /var/opt/microsoft/docker-cimprov/state/LastEventQueryTime.txt chmod 644 /var/opt/microsoft/docker-cimprov/state/LastEventQueryTime.txt diff --git a/source/code/go/src/plugins/oms.go b/source/code/go/src/plugins/oms.go index 807e00937..9876acc42 100644 --- a/source/code/go/src/plugins/oms.go +++ b/source/code/go/src/plugins/oms.go @@ -55,15 +55,17 @@ var ( IgnoreIDSet map[string]bool // DataUpdateMutex read and write mutex access to the container id set DataUpdateMutex = &sync.Mutex{} + // ContainerLogTelemetryMutex read and write mutex access to the Container Log Telemetry + ContainerLogTelemetryMutex = &sync.Mutex{} // ClientSet for querying KubeAPIs ClientSet *kubernetes.Clientset ) var ( // KubeSystemContainersRefreshTicker updates the kube-system containers - KubeSystemContainersRefreshTicker = time.NewTicker(time.Second * 300) + KubeSystemContainersRefreshTicker *time.Ticker // ContainerImageNameRefreshTicker updates the container image and names periodically - ContainerImageNameRefreshTicker = time.NewTicker(time.Second * 60) + ContainerImageNameRefreshTicker *time.Ticker ) var ( @@ -99,6 +101,7 @@ func createLogger() *log.Logger { fmt.Printf("File Exists. Opening file in append mode...\n") logfile, err = os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0600) if err != nil { + SendException(err.Error()) fmt.Printf(err.Error()) } } @@ -107,6 +110,7 @@ func createLogger() *log.Logger { fmt.Printf("File Doesnt Exist. Creating file...\n") logfile, err = os.Create(path) if err != nil { + SendException(err.Error()) fmt.Printf(err.Error()) } } @@ -134,7 +138,9 @@ func updateContainerImageNameMaps() { pods, err := ClientSet.CoreV1().Pods("").List(metav1.ListOptions{}) if err != nil { - Log("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + message := fmt.Sprintf("Error getting pods %s\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + Log(message) + SendException(message) continue } @@ -171,7 +177,9 @@ func updateKubeSystemContainerIDs() { pods, err := ClientSet.CoreV1().Pods("kube-system").List(metav1.ListOptions{}) if err != nil { - Log("Error getting pods %s\nIt is ok to log here and continue. Kube-system logs will be collected", err.Error()) + message := fmt.Sprintf("Error getting pods %s\nIt is ok to log here and continue. Kube-system logs will be collected", err.Error()) + SendException(message) + Log(message) continue } @@ -194,35 +202,47 @@ func updateKubeSystemContainerIDs() { // PostDataHelper sends data to the OMS endpoint func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { - defer DataUpdateMutex.Unlock() - start := time.Now() var dataItems []DataItem + ignoreIDSet := make(map[string]bool) + imageIDMap := make(map[string]string) + nameIDMap := make(map[string]string) + DataUpdateMutex.Lock() + for k, v := range IgnoreIDSet { + ignoreIDSet[k] = v + } + for k, v := range ImageIDMap { + imageIDMap[k] = v + } + for k, v := range NameIDMap { + nameIDMap[k] = v + } + DataUpdateMutex.Unlock() for _, record := range tailPluginRecords { - containerID := GetContainerIDFromFilePath(toString(record["filepath"])) + containerID := GetContainerIDFromFilePath(ToString(record["filepath"])) - if containerID == "" || containsKey(IgnoreIDSet, containerID) { + if containerID == "" || containsKey(ignoreIDSet, containerID) { continue } stringMap := make(map[string]string) - stringMap["LogEntry"] = toString(record["log"]) - stringMap["LogEntrySource"] = toString(record["stream"]) - stringMap["LogEntryTimeStamp"] = toString(record["time"]) + stringMap["LogEntry"] = ToString(record["log"]) + stringMap["LogEntrySource"] = ToString(record["stream"]) + stringMap["LogEntryTimeStamp"] = ToString(record["time"]) stringMap["SourceSystem"] = "Containers" stringMap["Id"] = containerID - if val, ok := ImageIDMap[containerID]; ok { + if val, ok := imageIDMap[containerID]; ok { stringMap["Image"] = val } else { Log("ContainerId %s not present in Map ", containerID) } - if val, ok := NameIDMap[containerID]; ok { + if val, ok := nameIDMap[containerID]; ok { stringMap["Name"] = val } else { Log("ContainerId %s not present in Map ", containerID) @@ -250,7 +270,9 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { marshalled, err := json.Marshal(logEntry) if err != nil { - Log("Error while Marshalling log Entry: %s", err.Error()) + message := fmt.Sprintf("Error while Marshalling log Entry: %s", err.Error()) + Log(message) + SendException(message) return output.FLB_OK } req, _ := http.NewRequest("POST", OMSEndpoint, bytes.NewBuffer(marshalled)) @@ -260,8 +282,11 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { elapsed := time.Since(start) if err != nil { - Log("Error when sending request %s \n", err.Error()) + message := fmt.Sprintf("Error when sending request %s \n", err.Error()) + Log(message) + SendException(message) Log("Failed to flush %d records after %s", len(dataItems), elapsed) + return output.FLB_RETRY } @@ -274,8 +299,10 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int { numRecords := len(dataItems) Log("Successfully flushed %d records in %s", numRecords, elapsed) + ContainerLogTelemetryMutex.Lock() FlushedRecordsCount += float64(numRecords) FlushedRecordsTimeTaken += float64(elapsed / time.Millisecond) + ContainerLogTelemetryMutex.Unlock() } return output.FLB_OK @@ -286,16 +313,6 @@ func containsKey(currentMap map[string]bool, key string) bool { return c } -func toString(s interface{}) string { - switch t := s.(type) { - case []byte: - // prevent encoding to base64 - return string(t) - default: - return "" - } -} - // GetContainerIDFromFilePath Gets the container ID From the file Path func GetContainerIDFromFilePath(filepath string) string { start := strings.LastIndex(filepath, "-") @@ -310,7 +327,7 @@ func GetContainerIDFromFilePath(filepath string) string { } // InitializePlugin reads and populates plugin configuration -func InitializePlugin(pluginConfPath string) { +func InitializePlugin(pluginConfPath string, agentVersion string) { IgnoreIDSet = make(map[string]bool) ImageIDMap = make(map[string]string) @@ -318,14 +335,20 @@ func InitializePlugin(pluginConfPath string) { pluginConfig, err := ReadConfiguration(pluginConfPath) if err != nil { - Log("Error Reading plugin config path : %s \n", err.Error()) - log.Fatalf("Error Reading plugin config path : %s \n", err.Error()) + message := fmt.Sprintf("Error Reading plugin config path : %s \n", err.Error()) + Log(message) + SendException(message) + time.Sleep(30 * time.Second) + log.Fatalln(message) } omsadminConf, err := ReadConfiguration(pluginConfig["omsadmin_conf_path"]) if err != nil { - Log(err.Error()) - log.Fatalf("Error Reading omsadmin configuration %s\n", err.Error()) + message := fmt.Sprintf("Error Reading omsadmin configuration %s\n", err.Error()) + Log(message) + SendException(message) + time.Sleep(30 * time.Second) + log.Fatalln(message) } OMSEndpoint = omsadminConf["OMS_ENDPOINT"] WorkspaceID = omsadminConf["WORKSPACE_ID"] @@ -334,7 +357,9 @@ func InitializePlugin(pluginConfPath string) { // Initialize image,name map refresh ticker containerInventoryRefreshInterval, err := strconv.Atoi(pluginConfig["container_inventory_refresh_interval"]) if err != nil { - Log("Error Reading Container Inventory Refresh Interval %s", err.Error()) + message := fmt.Sprintf("Error Reading Container Inventory Refresh Interval %s", err.Error()) + Log(message) + SendException(message) Log("Using Default Refresh Interval of %d s\n", defaultContainerInventoryRefreshInterval) containerInventoryRefreshInterval = defaultContainerInventoryRefreshInterval } @@ -344,7 +369,9 @@ func InitializePlugin(pluginConfPath string) { // Initialize Kube System Refresh Ticker kubeSystemContainersRefreshInterval, err := strconv.Atoi(pluginConfig["kube_system_containers_refresh_interval"]) if err != nil { - Log("Error Reading Kube System Container Ids Refresh Interval %s", err.Error()) + message := fmt.Sprintf("Error Reading Kube System Container Ids Refresh Interval %s", err.Error()) + Log(message) + SendException(message) Log("Using Default Refresh Interval of %d s\n", defaultKubeSystemContainersRefreshInterval) kubeSystemContainersRefreshInterval = defaultKubeSystemContainersRefreshInterval } @@ -356,20 +383,33 @@ func InitializePlugin(pluginConfPath string) { if err != nil { // It is ok to log here and continue, because only the Computer column will be missing, // which can be deduced from a combination of containerId, and docker logs on the node - Log("Error when reading containerHostName file %s.\n It is ok to log here and continue, because only the Computer column will be missing, which can be deduced from a combination of containerId, and docker logs on the nodes\n", err.Error()) + message := fmt.Sprintf("Error when reading containerHostName file %s.\n It is ok to log here and continue, because only the Computer column will be missing, which can be deduced from a combination of containerId, and docker logs on the nodes\n", err.Error()) + Log(message) + SendException(message) } - Computer = strings.TrimSuffix(toString(containerHostName), "\n") + Computer = strings.TrimSuffix(ToString(containerHostName), "\n") Log("Computer == %s \n", Computer) + ret, err := InitializeTelemetryClient(agentVersion) + if ret != 0 || err != nil { + message := fmt.Sprintf("Error During Telemetry Initialization :%s", err.Error()) + fmt.Printf(message) + Log(message) + } + // Initialize KubeAPI Client config, err := rest.InClusterConfig() if err != nil { - Log("Error getting config %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + message := fmt.Sprintf("Error getting config %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + Log(message) + SendException(message) } ClientSet, err = kubernetes.NewForConfig(config) if err != nil { - Log("Error getting clientset %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + message := fmt.Sprintf("Error getting clientset %s.\nIt is ok to log here and continue, because the logs will be missing image and Name, but the logs will still have the containerID", err.Error()) + SendException(message) + Log(message) } PluginConfiguration = pluginConfig diff --git a/source/code/go/src/plugins/out_oms.go b/source/code/go/src/plugins/out_oms.go index 732ae5216..133e0f039 100644 --- a/source/code/go/src/plugins/out_oms.go +++ b/source/code/go/src/plugins/out_oms.go @@ -19,12 +19,12 @@ func FLBPluginRegister(ctx unsafe.Pointer) int { // ctx (context) pointer to fluentbit context (state/ c code) func FLBPluginInit(ctx unsafe.Pointer) int { Log("Initializing out_oms go plugin for fluentbit") - InitializePlugin(ContainerLogPluginConfFilePath) + agentVersion := output.FLBPluginConfigKey(ctx, "AgentVersion") + InitializePlugin(ContainerLogPluginConfFilePath, agentVersion) enableTelemetry := output.FLBPluginConfigKey(ctx, "EnableTelemetry") if strings.Compare(strings.ToLower(enableTelemetry), "true") == 0 { telemetryPushInterval := output.FLBPluginConfigKey(ctx, "TelemetryPushIntervalSeconds") - agentVersion := output.FLBPluginConfigKey(ctx, "AgentVersion") - go SendContainerLogFlushRateMetric(telemetryPushInterval, agentVersion) + go SendContainerLogPluginMetrics(telemetryPushInterval) } else { Log("Telemetry is not enabled for the plugin %s \n", output.FLBPluginConfigKey(ctx, "Name")) return output.FLB_OK @@ -34,7 +34,6 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlush func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { - var count int var ret int var record map[interface{}]interface{} var records []map[interface{}]interface{} @@ -43,7 +42,6 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { dec := output.NewDecoder(data, int(length)) // Iterate Records - count = 0 for { // Extract Record ret, _, record = output.GetRecord(dec) @@ -51,8 +49,13 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int { break } records = append(records, record) - count++ } + + incomingTag := C.GoString(tag) + if strings.Contains(strings.ToLower(incomingTag), "oms.container.log.flbplugin") { + return PushToAppInsightsTraces(records) + } + return PostDataHelper(records) } diff --git a/source/code/go/src/plugins/telemetry.go b/source/code/go/src/plugins/telemetry.go index b1bc4439b..5952ac9ac 100644 --- a/source/code/go/src/plugins/telemetry.go +++ b/source/code/go/src/plugins/telemetry.go @@ -4,12 +4,12 @@ import ( "encoding/base64" "errors" "os" - "runtime" "strconv" "strings" "time" "github.com/Microsoft/ApplicationInsights-Go/appinsights" + "github.com/fluent/fluent-bit-go/output" ) var ( @@ -34,15 +34,15 @@ const ( envACSResourceName = "ACS_RESOURCE_NAME" envAppInsightsAuth = "APPLICATIONINSIGHTS_AUTH" metricNameAvgFlushRate = "ContainerLogAvgRecordsFlushedPerSec" + metricNameAvgLogGenerationRate = "ContainerLogsGeneratedPerSec" defaultTelemetryPushIntervalSeconds = 300 eventNameContainerLogInit = "ContainerLogPluginInitialized" eventNameDaemonSetHeartbeat = "ContainerLogDaemonSetHeartbeatEvent" ) -// Initialize initializes the telemetry artifacts -func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, error) { - +// SendContainerLogPluginMetrics is a go-routine that flushes the data periodically (every 5 mins to App Insights) +func SendContainerLogPluginMetrics(telemetryPushIntervalProperty string) { telemetryPushInterval, err := strconv.Atoi(telemetryPushIntervalProperty) if err != nil { Log("Error Converting telemetryPushIntervalProperty %s. Using Default Interval... %d \n", telemetryPushIntervalProperty, defaultTelemetryPushIntervalSeconds) @@ -51,6 +51,49 @@ func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, ContainerLogTelemetryTicker = time.NewTicker(time.Second * time.Duration(telemetryPushInterval)) + start := time.Now() + SendEvent(eventNameContainerLogInit, make(map[string]string)) + + for ; true; <-ContainerLogTelemetryTicker.C { + SendEvent(eventNameDaemonSetHeartbeat, make(map[string]string)) + elapsed := time.Since(start) + ContainerLogTelemetryMutex.Lock() + flushRate := FlushedRecordsCount / FlushedRecordsTimeTaken * 1000 + logRate := FlushedRecordsCount / float64(elapsed/time.Second) + FlushedRecordsCount = 0.0 + FlushedRecordsTimeTaken = 0.0 + ContainerLogTelemetryMutex.Unlock() + + flushRateMetric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate) + TelemetryClient.Track(flushRateMetric) + logRateMetric := appinsights.NewMetricTelemetry(metricNameAvgLogGenerationRate, logRate) + TelemetryClient.Track(logRateMetric) + start = time.Now() + } +} + +// SendEvent sends an event to App Insights +func SendEvent(eventName string, dimensions map[string]string) { + Log("Sending Event : %s\n", eventName) + event := appinsights.NewEventTelemetry(eventName) + + // add any extra Properties + for k, v := range dimensions { + event.Properties[k] = v + } + + TelemetryClient.Track(event) +} + +// SendException send an event to the configured app insights instance +func SendException(err interface{}) { + if TelemetryClient != nil { + TelemetryClient.TrackException(err) + } +} + +// InitializeTelemetryClient sets up the telemetry client to send telemetry to the App Insights instance +func InitializeTelemetryClient(agentVersion string) (int, error) { encodedIkey := os.Getenv(envAppInsightsAuth) if encodedIkey == "" { Log("Environment Variable Missing \n") @@ -87,7 +130,7 @@ func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, CommonProperties["ACSResourceName"] = "" CommonProperties["AKS_RESOURCE_ID"] = aksResourceID splitStrings := strings.Split(aksResourceID, "/") - if len(aksResourceID) > 0 && len(aksResourceID) < 10 { + if len(splitStrings) > 0 && len(splitStrings) < 10 { CommonProperties["SubscriptionID"] = splitStrings[2] CommonProperties["ResourceGroupName"] = splitStrings[4] CommonProperties["ClusterName"] = splitStrings[8] @@ -102,39 +145,14 @@ func initialize(telemetryPushIntervalProperty string, agentVersion string) (int, return 0, nil } -// SendContainerLogFlushRateMetric is a go-routine that flushes the data periodically (every 5 mins to App Insights) -func SendContainerLogFlushRateMetric(telemetryPushIntervalProperty string, agentVersion string) { - - ret, err := initialize(telemetryPushIntervalProperty, agentVersion) - if ret != 0 || err != nil { - Log("Error During Telemetry Initialization :%s", err.Error()) - runtime.Goexit() +// PushToAppInsightsTraces sends the log lines as trace messages to the configured App Insights Instance +func PushToAppInsightsTraces(records []map[interface{}]interface{}) int { + var logLines []string + for _, record := range records { + logLines = append(logLines, ToString(record["log"])) } - SendEvent(eventNameContainerLogInit, make(map[string]string)) - - for ; true; <-ContainerLogTelemetryTicker.C { - SendEvent(eventNameDaemonSetHeartbeat, make(map[string]string)) - DataUpdateMutex.Lock() - flushRate := FlushedRecordsCount / FlushedRecordsTimeTaken * 1000 - Log("Flushed Records : %f Time Taken : %f flush Rate : %f", FlushedRecordsCount, FlushedRecordsTimeTaken, flushRate) - FlushedRecordsCount = 0.0 - FlushedRecordsTimeTaken = 0.0 - DataUpdateMutex.Unlock() - metric := appinsights.NewMetricTelemetry(metricNameAvgFlushRate, flushRate) - TelemetryClient.Track(metric) - } -} - -// SendEvent sends an event to App Insights -func SendEvent(eventName string, dimensions map[string]string) { - Log("Sending Event : %s\n", eventName) - event := appinsights.NewEventTelemetry(eventName) - - // add any extra Properties - for k, v := range dimensions { - event.Properties[k] = v - } - - TelemetryClient.Track(event) + traceEntry := strings.Join(logLines, "\n") + TelemetryClient.TrackTrace(traceEntry, 1) + return output.FLB_OK } diff --git a/source/code/go/src/plugins/utils.go b/source/code/go/src/plugins/utils.go index 1ac9b05a9..85af80d7a 100644 --- a/source/code/go/src/plugins/utils.go +++ b/source/code/go/src/plugins/utils.go @@ -3,10 +3,12 @@ package main import ( "bufio" "crypto/tls" + "fmt" "log" "net/http" "os" "strings" + "time" ) // ReadConfiguration reads a property file @@ -19,7 +21,9 @@ func ReadConfiguration(filename string) (map[string]string, error) { file, err := os.Open(filename) if err != nil { - log.Fatal(err) + SendException(err) + time.Sleep(30 * time.Second) + fmt.Printf("%s", err.Error()) return nil, err } defer file.Close() @@ -39,7 +43,9 @@ func ReadConfiguration(filename string) (map[string]string, error) { } if err := scanner.Err(); err != nil { - log.Fatal(err) + SendException(err) + time.Sleep(30 * time.Second) + log.Fatalf("%s", err.Error()) return nil, err } @@ -48,10 +54,12 @@ func ReadConfiguration(filename string) (map[string]string, error) { // CreateHTTPClient used to create the client for sending post requests to OMSEndpoint func CreateHTTPClient() { - cert, err := tls.LoadX509KeyPair(PluginConfiguration["cert_file_path"], PluginConfiguration["key_file_path"]) if err != nil { - Log("Error when loading cert %s", err.Error()) + message := fmt.Sprintf("Error when loading cert %s", err.Error()) + SendException(message) + time.Sleep(30 * time.Second) + Log(message) log.Fatalf("Error when loading cert %s", err.Error()) } @@ -62,7 +70,21 @@ func CreateHTTPClient() { tlsConfig.BuildNameToCertificate() transport := &http.Transport{TLSClientConfig: tlsConfig} - HTTPClient = http.Client{Transport: transport} + HTTPClient = http.Client{ + Transport: transport, + Timeout: 30 * time.Second, + } Log("Successfully created HTTP Client") } + +// ToString converts an interface into a string +func ToString(s interface{}) string { + switch t := s.(type) { + case []byte: + // prevent encoding to base64 + return string(t) + default: + return "" + } +} diff --git a/source/code/plugin/ApplicationInsightsUtility.rb b/source/code/plugin/ApplicationInsightsUtility.rb new file mode 100644 index 000000000..78553a83f --- /dev/null +++ b/source/code/plugin/ApplicationInsightsUtility.rb @@ -0,0 +1,174 @@ +#!/usr/local/bin/ruby +# frozen_string_literal: true + +class ApplicationInsightsUtility + require_relative 'lib/application_insights' + require_relative 'omslog' + require_relative 'DockerApiClient' + require_relative 'oms_common' + require 'json' + require 'base64' + + @@HeartBeat = 'HeartBeatEvent' + @@Exception = 'ExceptionEvent' + @@AcsClusterType = 'ACS' + @@AksClusterType = 'AKS' + @@DaemonsetControllerType = 'DaemonSet' + @OmsAdminFilePath = '/etc/opt/microsoft/omsagent/conf/omsadmin.conf' + @@EnvAcsResourceName = 'ACS_RESOURCE_NAME' + @@EnvAksRegion = 'AKS_REGION' + @@EnvAgentVersion = 'AGENT_VERSION' + @@EnvApplicationInsightsKey = 'APPLICATIONINSIGHTS_AUTH' + @@CustomProperties = {} + @@Tc = nil + @@hostName = (OMS::Common.get_hostname) + + def initialize + end + + class << self + #Set default properties for telemetry event + def initializeUtility() + begin + resourceInfo = ENV['AKS_RESOURCE_ID'] + if resourceInfo.nil? || resourceInfo.empty? + @@CustomProperties["ACSResourceName"] = ENV[@@EnvAcsResourceName] + @@CustomProperties["ClusterType"] = @@AcsClusterType + @@CustomProperties["SubscriptionID"] = "" + @@CustomProperties["ResourceGroupName"] = "" + @@CustomProperties["ClusterName"] = "" + @@CustomProperties["Region"] = "" + else + @@CustomProperties["AKS_RESOURCE_ID"] = resourceInfo + begin + splitStrings = resourceInfo.split('/') + subscriptionId = splitStrings[2] + resourceGroupName = splitStrings[4] + clusterName = splitStrings[8] + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: parsing AKS resourceId: #{resourceInfo}, error: #{errorStr}") + end + @@CustomProperties["ClusterType"] = @@AksClusterType + @@CustomProperties["SubscriptionID"] = subscriptionId + @@CustomProperties["ResourceGroupName"] = resourceGroupName + @@CustomProperties["ClusterName"] = clusterName + @@CustomProperties["Region"] = ENV[@@EnvAksRegion] + end + @@CustomProperties['ControllerType'] = @@DaemonsetControllerType + dockerInfo = DockerApiClient.dockerInfo + @@CustomProperties['DockerVersion'] = dockerInfo['Version'] + @@CustomProperties['DockerApiVersion'] = dockerInfo['ApiVersion'] + @@CustomProperties['WorkspaceID'] = getWorkspaceId + @@CustomProperties['AgentVersion'] = ENV[@@EnvAgentVersion] + encodedAppInsightsKey = ENV[@@EnvApplicationInsightsKey] + if !encodedAppInsightsKey.nil? + decodedAppInsightsKey = Base64.decode64(encodedAppInsightsKey) + @@Tc = ApplicationInsights::TelemetryClient.new decodedAppInsightsKey + end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: initilizeUtility - error: #{errorStr}") + end + end + + def sendHeartBeatEvent(pluginName) + begin + eventName = pluginName + @@HeartBeat + if !(@@Tc.nil?) + @@Tc.track_event eventName , :properties => @@CustomProperties + @@Tc.flush + $log.info("AppInsights Heartbeat Telemetry sent successfully") + end + rescue =>errorStr + $log.warn("Exception in AppInsightsUtility: sendHeartBeatEvent - error: #{errorStr}") + end + end + + def sendCustomEvent(pluginName, properties) + begin + if !(@@Tc.nil?) + @@Tc.track_metric 'LastProcessedContainerInventoryCount', properties['ContainerCount'], + :kind => ApplicationInsights::Channel::Contracts::DataPointType::MEASUREMENT, + :properties => @@CustomProperties + @@Tc.flush + $log.info("AppInsights Container Count Telemetry sent successfully") + end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendCustomEvent - error: #{errorStr}") + end + end + + def sendExceptionTelemetry(errorStr) + begin + if @@CustomProperties.empty? || @@CustomProperties.nil? + initializeUtility + end + if !(@@Tc.nil?) + @@Tc.track_exception errorStr , :properties => @@CustomProperties + @@Tc.flush + $log.info("AppInsights Exception Telemetry sent successfully") + end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendExceptionTelemetry - error: #{errorStr}") + end + end + + #Method to send heartbeat and container inventory count + def sendTelemetry(pluginName, properties) + begin + if @@CustomProperties.empty? || @@CustomProperties.nil? + initializeUtility + end + @@CustomProperties['Computer'] = properties['Computer'] + sendHeartBeatEvent(pluginName) + sendCustomEvent(pluginName, properties) + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendTelemetry - error: #{errorStr}") + end + end + + #Method to send metric. It will merge passed-in properties with common custom properties + def sendMetricTelemetry(metricName, metricValue, properties) + begin + if (metricName.empty? || metricName.nil?) + $log.warn("SendMetricTelemetry: metricName is missing") + return + end + if @@CustomProperties.empty? || @@CustomProperties.nil? + initializeUtility + end + telemetryProps = {} + telemetryProps["Computer"] = @@hostName + # add common dimensions + @@CustomProperties.each{ |k,v| telemetryProps[k]=v} + # add passed-in dimensions if any + if (!properties.nil? && !properties.empty?) + properties.each{ |k,v| telemetryProps[k]=v} + end + if !(@@Tc.nil?) + @@Tc.track_metric metricName, metricValue, + :kind => ApplicationInsights::Channel::Contracts::DataPointType::MEASUREMENT, + :properties => telemetryProps + @@Tc.flush + $log.info("AppInsights metric Telemetry #{metricName} sent successfully") + end + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: sendMetricTelemetry - error: #{errorStr}") + end + end + + def getWorkspaceId() + begin + adminConf = {} + confFile = File.open(@OmsAdminFilePath, "r") + confFile.each_line do |line| + splitStrings = line.split('=') + adminConf[splitStrings[0]] = splitStrings[1] + end + workspaceId = adminConf['WORKSPACE_ID'] + return workspaceId + rescue => errorStr + $log.warn("Exception in AppInsightsUtility: getWorkspaceId - error: #{errorStr}") + end + end + end +end \ No newline at end of file diff --git a/source/code/plugin/CAdvisorMetricsAPIClient.rb b/source/code/plugin/CAdvisorMetricsAPIClient.rb index c10cbad4a..9e47e5a9e 100644 --- a/source/code/plugin/CAdvisorMetricsAPIClient.rb +++ b/source/code/plugin/CAdvisorMetricsAPIClient.rb @@ -12,6 +12,7 @@ class CAdvisorMetricsAPIClient require_relative 'oms_common' require_relative 'KubernetesApiClient' + require_relative 'ApplicationInsightsUtility' @LogPath = "/var/opt/microsoft/docker-cimprov/log/kubernetes_perf_log.txt" @Log = Logger.new(@LogPath, 2, 10*1048576) #keep last 2 files, max log file size = 10M @@ -19,6 +20,8 @@ class CAdvisorMetricsAPIClient @@rxBytesTimeLast = nil @@txBytesLast = nil @@txBytesTimeLast = nil + @@telemetryCpuMetricTimeTracker = DateTime.now.to_time.to_i + @@telemetryMemoryMetricTimeTracker = DateTime.now.to_time.to_i def initialize end @@ -97,10 +100,15 @@ def getMetrics() def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, metricNametoReturn) metricItems = [] clusterId = KubernetesApiClient.getClusterId + timeDifference = (DateTime.now.to_time.to_i - @@telemetryCpuMetricTimeTracker).abs + timeDifferenceInMinutes = timeDifference/60 begin metricInfo = metricJSON metricInfo['pods'].each do |pod| podUid = pod['podRef']['uid'] + podName = pod['podRef']['name'] + podNamespace = pod['podRef']['namespace'] + if (!pod['containers'].nil?) pod['containers'].each do |container| #cpu metric @@ -124,9 +132,29 @@ def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, met metricProps['Collections'].push(metricCollections) metricItem['DataItems'].push(metricProps) metricItems.push(metricItem) + #Telemetry about agent performance + begin + # we can only do this much now. Ideally would like to use the docker image repository to find our pods/containers + # cadvisor does not have pod/container metadata. so would need more work to cache as pv & use + if (podName.downcase.start_with?('omsagent-') && podNamespace.eql?("kube-system") && containerName.downcase.start_with?('omsagent') && metricNametoReturn.eql?("cpuUsageNanoCores")) + + if (timeDifferenceInMinutes >= 10) + telemetryProps = {} + telemetryProps['PodName'] = podName + telemetryProps['ContainerName'] = containerName + ApplicationInsightsUtility.sendMetricTelemetry(metricNametoReturn, metricValue, telemetryProps) + end + end + rescue => errorStr + $log.warn("Exception while generating Telemetry from getcontainerCpuMetricItems failed: #{errorStr} for metric #{cpuMetricNameToCollect}") + end end end end + # reset time outside pod iterator as we use one timer per metric for 2 pods (ds & rs) + if (timeDifferenceInMinutes >= 10 && metricNametoReturn.eql?("cpuUsageNanoCores")) + @@telemetryCpuMetricTimeTracker = DateTime.now.to_time.to_i + end rescue => error @Log.warn("getcontainerCpuMetricItems failed: #{error} for metric #{cpuMetricNameToCollect}") return metricItems @@ -137,10 +165,14 @@ def getContainerCpuMetricItems(metricJSON, hostName, cpuMetricNameToCollect, met def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollect, metricNametoReturn) metricItems = [] clusterId = KubernetesApiClient.getClusterId + timeDifference = (DateTime.now.to_time.to_i - @@telemetryMemoryMetricTimeTracker).abs + timeDifferenceInMinutes = timeDifference/60 begin metricInfo = metricJSON metricInfo['pods'].each do |pod| podUid = pod['podRef']['uid'] + podName = pod['podRef']['name'] + podNamespace = pod['podRef']['namespace'] if (!pod['containers'].nil?) pod['containers'].each do |container| containerName = container['name'] @@ -164,9 +196,28 @@ def getContainerMemoryMetricItems(metricJSON, hostName, memoryMetricNameToCollec metricProps['Collections'].push(metricCollections) metricItem['DataItems'].push(metricProps) metricItems.push(metricItem) + #Telemetry about agent performance + begin + # we can only do this much now. Ideally would like to use the docker image repository to find our pods/containers + # cadvisor does not have pod/container metadata. so would need more work to cache as pv & use + if (podName.downcase.start_with?('omsagent-') && podNamespace.eql?("kube-system") && containerName.downcase.start_with?('omsagent') && metricNametoReturn.eql?("memoryRssBytes")) + if (timeDifferenceInMinutes >= 10) + telemetryProps = {} + telemetryProps['PodName'] = podName + telemetryProps['ContainerName'] = containerName + ApplicationInsightsUtility.sendMetricTelemetry(metricNametoReturn, metricValue, telemetryProps) + end + end + rescue => errorStr + $log.warn("Exception while generating Telemetry from getcontainerMemoryMetricItems failed: #{errorStr} for metric #{memoryMetricNameToCollect}") + end end end end + # reset time outside pod iterator as we use one timer per metric for 2 pods (ds & rs) + if (timeDifferenceInMinutes >= 10 && metricNametoReturn.eql?("memoryRssBytes")) + @@telemetryMemoryMetricTimeTracker = DateTime.now.to_time.to_i + end rescue => error @Log.warn("getcontainerMemoryMetricItems failed: #{error} for metric #{memoryMetricNameToCollect}") @Log.warn metricJSON diff --git a/source/code/plugin/ContainerInventoryState.rb b/source/code/plugin/ContainerInventoryState.rb new file mode 100644 index 000000000..7e5ca18e8 --- /dev/null +++ b/source/code/plugin/ContainerInventoryState.rb @@ -0,0 +1,65 @@ +#!/usr/local/bin/ruby +# frozen_string_literal: true + +class ContainerInventoryState + require 'json' + require_relative 'omslog' + @@InventoryDirectory = "/var/opt/microsoft/docker-cimprov/state/ContainerInventory/" + + def initialize + end + + class << self + # Write the container information to disk with the data that is obtained from the current plugin execution + def writeContainerState(container) + containerId = container['InstanceID'] + if !containerId.nil? && !containerId.empty? + begin + file = File.open(@@InventoryDirectory + containerId, "w") + if !file.nil? + file.write(container.to_json) + file.close + else + $log.warn("Exception while opening file with id: #{containerId}") + end + rescue => errorStr + $log.warn("Exception in writeContainerState: #{errorStr}") + end + end + end + + # Reads the container state for the deleted container + def readContainerState(containerId) + begin + containerObject = nil + filepath = @@InventoryDirectory + containerId + file = File.open(filepath, "r") + if !file.nil? + fileContents = file.read + containerObject = JSON.parse(fileContents) + file.close + # Delete the file since the state is update to deleted + File.delete(filepath) if File.exist?(filepath) + else + $log.warn("Open file for container with id returned nil: #{containerId}") + end + rescue => errorStr + $log.warn("Exception in readContainerState: #{errorStr}") + end + return containerObject + end + + # Gets the containers that were written to the disk with the previous plugin invocation but do not exist in the current container list + # Doing this because we need to update the container state to deleted. Else this will stay running forever. + def getDeletedContainers(containerIds) + deletedContainers = nil + begin + previousContainerList = Dir.entries(@@InventoryDirectory) - [".", ".."] + deletedContainers = previousContainerList - containerIds + rescue => errorStr + $log.warn("Exception in getDeletedContainers: #{errorStr}") + end + return deletedContainers + end + end +end \ No newline at end of file diff --git a/source/code/plugin/DockerApiClient.rb b/source/code/plugin/DockerApiClient.rb new file mode 100644 index 000000000..e12ef13ec --- /dev/null +++ b/source/code/plugin/DockerApiClient.rb @@ -0,0 +1,163 @@ +#!/usr/local/bin/ruby +# frozen_string_literal: true + +class DockerApiClient + + require 'socket' + require 'json' + require 'timeout' + require_relative 'omslog' + require_relative 'DockerApiRestHelper' + require_relative 'ApplicationInsightsUtility' + + @@SocketPath = "/var/run/docker.sock" + @@ChunkSize = 4096 + @@TimeoutInSeconds = 5 + @@PluginName = 'ContainerInventory' + def initialize + end + + class << self + # Make docker socket call for requests + def getResponse(request, isMultiJson, isVersion) + begin + socket = UNIXSocket.new(@@SocketPath) + dockerResponse = "" + isTimeOut = false + socket.write(request) + # iterate through the response until the last chunk is less than the chunk size so that we can read all data in socket. + loop do + begin + responseChunk = "" + timeout(@@TimeoutInSeconds) do + responseChunk = socket.recv(@@ChunkSize) + end + dockerResponse += responseChunk + rescue Timeout::Error + $log.warn("Socket read timedout for request: #{request} @ #{Time.now.utc.iso8601}") + isTimeOut = true + break + end + break if (isVersion)? (responseChunk.length < @@ChunkSize) : (responseChunk.end_with? "0\r\n\r\n") + end + socket.close + return (isTimeOut)? nil : parseResponse(dockerResponse, isMultiJson) + rescue => errorStr + $log.warn("Socket call failed for request: #{request} error: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + end + + def parseResponse(dockerResponse, isMultiJson) + # Doing this because the response is in the raw format and includes headers. + # Need to do a regex match to extract the json part of the response - Anything between [{}] in response + parsedJsonResponse = nil + begin + jsonResponse = isMultiJson ? dockerResponse[/\[{.+}\]/] : dockerResponse[/{.+}/] + rescue => errorStr + $log.warn("Regex match for docker response failed: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") + end + begin + if jsonResponse != nil + parsedJsonResponse = JSON.parse(jsonResponse) + end + rescue => errorStr + $log.warn("Json parsing for docker response failed: #{errorStr} , isMultiJson: #{isMultiJson} @ #{Time.now.utc.iso8601}") + ApplicationInsightsUtility.sendExceptionTelemetry(errorStr) + end + return parsedJsonResponse + end + + + def getDockerHostName() + dockerHostName = "" + request = DockerApiRestHelper.restDockerInfo + response = getResponse(request, false, false) + if (response != nil) + dockerHostName = response['Name'] + end + return dockerHostName + end + + def listContainers() + ids = [] + request = DockerApiRestHelper.restDockerPs + containers = getResponse(request, true, false) + if !containers.nil? && !containers.empty? + containers.each do |container| + ids.push(container['Id']) + end + end + return ids + end + + # This method splits the tag value into an array - repository, image and tag + def getImageRepositoryImageTag(tagValue) + result = ["", "", ""] + begin + if !tagValue.empty? + # Find delimiters in the string of format repository/image:imagetag + slashLocation = tagValue.index('/') + colonLocation = tagValue.index(':') + if !colonLocation.nil? + if slashLocation.nil? + # image:imagetag + result[1] = tagValue[0..(colonLocation-1)] + else + # repository/image:imagetag + result[0] = tagValue[0..(slashLocation-1)] + result[1] = tagValue[(slashLocation + 1)..(colonLocation - 1)] + end + result[2] = tagValue[(colonLocation + 1)..-1] + end + end + rescue => errorStr + $log.warn("Exception at getImageRepositoryImageTag: #{errorStr} @ #{Time.now.utc.iso8601}") + end + return result + end + + # Image is in the format repository/image:imagetag - This method creates a hash of image id and repository, image and tag + def getImageIdMap() + result = nil + begin + request = DockerApiRestHelper.restDockerImages + images = getResponse(request, true, false) + if !images.nil? && !images.empty? + result = {} + images.each do |image| + tagValue = "" + tags = image['RepoTags'] + if !tags.nil? && tags.kind_of?(Array) && tags.length > 0 + tagValue = tags[0] + end + idValue = image['Id'] + if !idValue.nil? + result[idValue] = getImageRepositoryImageTag(tagValue) + end + end + end + rescue => errorStr + $log.warn("Exception at getImageIdMap: #{errorStr} @ #{Time.now.utc.iso8601}") + end + return result + end + + def dockerInspectContainer(id) + request = DockerApiRestHelper.restDockerInspect(id) + return getResponse(request, false, false) + end + + # This method returns docker version and docker api version for telemetry + def dockerInfo() + request = DockerApiRestHelper.restDockerVersion + response = getResponse(request, false, true) + dockerInfo = {} + if (response != nil) + dockerInfo['Version'] = response['Version'] + dockerInfo['ApiVersion'] = response['ApiVersion'] + end + return dockerInfo + end + end +end diff --git a/source/code/plugin/DockerApiRestHelper.rb b/source/code/plugin/DockerApiRestHelper.rb new file mode 100644 index 000000000..76361b122 --- /dev/null +++ b/source/code/plugin/DockerApiRestHelper.rb @@ -0,0 +1,55 @@ +#!/usr/local/bin/ruby +# frozen_string_literal: true + +class DockerApiRestHelper + def initialize + end + + class << self + # Create the REST request to list images + # https://docs.docker.com/engine/reference/api/docker_remote_api_v1.21/#list-images + # returns Request in string format + def restDockerImages() + begin + return "GET /images/json?all=0 HTTP/1.1\r\nHost: localhost\r\n\r\n"; + end + end + + # Create the REST request to list containers + # https://docs.docker.com/engine/reference/api/docker_remote_api_v1.21/#list-containers + # returns Request in string format + def restDockerPs() + begin + return "GET /containers/json?all=1 HTTP/1.1\r\nHost: localhost\r\n\r\n"; + end + end + + # Create the REST request to inspect a container + # https://docs.docker.com/engine/reference/api/docker_remote_api_v1.21/#inspect-a-container + # parameter - ID of the container to be inspected + # returns Request in string format + def restDockerInspect(id) + begin + return "GET /containers/" + id + "/json HTTP/1.1\r\nHost: localhost\r\n\r\n"; + end + end + + # Create the REST request to get docker info + # https://docs.docker.com/engine/reference/api/docker_remote_api_v1.21/#get-container-stats-based-on-resource-usage + # returns Request in string format + def restDockerInfo() + begin + return "GET /info HTTP/1.1\r\nHost: localhost\r\n\r\n"; + end + end + + # Create the REST request to get docker info + # https://docs.docker.com/engine/api/v1.21/#21-containers + # returns Request in string format + def restDockerVersion() + begin + return "GET /version HTTP/1.1\r\nHost: localhost\r\n\r\n"; + end + end + end +end \ No newline at end of file diff --git a/source/code/plugin/in_containerinventory.rb b/source/code/plugin/in_containerinventory.rb new file mode 100644 index 000000000..f501421a2 --- /dev/null +++ b/source/code/plugin/in_containerinventory.rb @@ -0,0 +1,271 @@ +#!/usr/local/bin/ruby +# frozen_string_literal: true + +module Fluent + + class Container_Inventory_Input < Input + Plugin.register_input('containerinventory', self) + + @@PluginName = 'ContainerInventory' + @@RunningState = 'Running' + @@FailedState = 'Failed' + @@StoppedState = 'Stopped' + @@PausedState = 'Paused' + + def initialize + super + require 'json' + require_relative 'DockerApiClient' + require_relative 'ContainerInventoryState' + require_relative 'ApplicationInsightsUtility' + require_relative 'omslog' + end + + config_param :run_interval, :time, :default => '1m' + config_param :tag, :string, :default => "oms.containerinsights.containerinventory" + + def configure (conf) + super + end + + def start + if @run_interval + @finished = false + @condition = ConditionVariable.new + @mutex = Mutex.new + @thread = Thread.new(&method(:run_periodic)) + @@telemetryTimeTracker = DateTime.now.to_time.to_i + end + end + + def shutdown + if @run_interval + @mutex.synchronize { + @finished = true + @condition.signal + } + @thread.join + end + end + + def obtainContainerConfig(instance, container) + begin + configValue = container['Config'] + if !configValue.nil? + instance['ContainerHostname'] = configValue['Hostname'] + + envValue = configValue['Env'] + envValueString = (envValue.nil?) ? "" : envValue.to_s + # Skip environment variable processing if it contains the flag AZMON_COLLECT_ENV=FALSE + if /AZMON_COLLECT_ENV=FALSE/i.match(envValueString) + envValueString = ["AZMON_COLLECT_ENV=FALSE"] + $log.warn("Environment Variable collection for container: #{container['Id']} skipped because AZMON_COLLECT_ENV is set to false") + end + # Restricting the ENV string value to 200kb since the size of this string can go very high + if envValueString.length > 200000 + envValueStringTruncated = envValueString.slice(0..200000) + lastIndex = envValueStringTruncated.rindex("\", ") + if !lastIndex.nil? + envValueStringTruncated = envValueStringTruncated.slice(0..lastIndex) + "]" + end + instance['EnvironmentVar'] = envValueStringTruncated + else + instance['EnvironmentVar'] = envValueString + end + + cmdValue = configValue['Cmd'] + cmdValueString = (cmdValue.nil?) ? "" : cmdValue.to_s + instance['Command'] = cmdValueString + + instance['ComposeGroup'] = "" + labelsValue = configValue['Labels'] + if !labelsValue.nil? && !labelsValue.empty? + instance['ComposeGroup'] = labelsValue['com.docker.compose.project'] + end + else + $log.warn("Attempt in ObtainContainerConfig to get container: #{container['Id']} config information returned null") + end + rescue => errorStr + $log.warn("Exception in obtainContainerConfig: #{errorStr}") + end + end + + def obtainContainerState(instance, container) + begin + stateValue = container['State'] + if !stateValue.nil? + exitCodeValue = stateValue['ExitCode'] + # Exit codes less than 0 are not supported by the engine + if exitCodeValue < 0 + exitCodeValue = 128 + $log.info("obtainContainerState::Container: #{container['Id']} returned negative exit code") + end + instance['ExitCode'] = exitCodeValue + if exitCodeValue > 0 + instance['State'] = @@FailedState + else + # Set the Container status : Running/Paused/Stopped + runningValue = stateValue['Running'] + if runningValue + pausedValue = stateValue['Paused'] + # Checking for paused within running is true state because docker returns true for both Running and Paused fields when the container is paused + if pausedValue + instance['State'] = @@PausedState + else + instance['State'] = @@RunningState + end + else + instance['State'] = @@StoppedState + end + end + instance['StartedTime'] = stateValue['StartedAt'] + instance['FinishedTime'] = stateValue['FinishedAt'] + else + $log.info("Attempt in ObtainContainerState to get container: #{container['Id']} state information returned null") + end + rescue => errorStr + $log.warn("Exception in obtainContainerState: #{errorStr}") + end + end + + def obtainContainerHostConfig(instance, container) + begin + hostConfig = container['HostConfig'] + if !hostConfig.nil? + links = hostConfig['Links'] + instance['Links'] = "" + if !links.nil? + linksString = links.to_s + instance['Links'] = (linksString == "null")? "" : linksString + end + portBindings = hostConfig['PortBindings'] + instance['Ports'] = "" + if !portBindings.nil? + portBindingsString = portBindings.to_s + instance['Ports'] = (portBindingsString == "null")? "" : portBindingsString + end + else + $log.info("Attempt in ObtainContainerHostConfig to get container: #{container['Id']} host config information returned null") + end + rescue => errorStr + $log.warn("Exception in obtainContainerHostConfig: #{errorStr}") + end + end + + def inspectContainer(id, nameMap) + containerInstance = {} + begin + container = DockerApiClient.dockerInspectContainer(id) + if !container.nil? && !container.empty? + containerInstance['InstanceID'] = container['Id'] + containerInstance['CreatedTime'] = container['Created'] + containerName = container['Name'] + if !containerName.nil? && !containerName.empty? + # Remove the leading / from the name if it exists (this is an API issue) + containerInstance['ElementName'] = (containerName[0] == '/') ? containerName[1..-1] : containerName + end + imageValue = container['Image'] + if !imageValue.nil? && !imageValue.empty? + containerInstance['ImageId'] = imageValue + repoImageTagArray = nameMap[imageValue] + if nameMap.has_key? imageValue + containerInstance['Repository'] = repoImageTagArray[0] + containerInstance['Image'] = repoImageTagArray[1] + containerInstance['ImageTag'] = repoImageTagArray[2] + end + end + obtainContainerConfig(containerInstance, container); + obtainContainerState(containerInstance, container); + obtainContainerHostConfig(containerInstance, container); + end + rescue => errorStr + $log.warn("Exception in inspectContainer: #{errorStr} for container: #{id}") + end + return containerInstance + end + + def enumerate + currentTime = Time.now + emitTime = currentTime.to_f + batchTime = currentTime.utc.iso8601 + containerInventory = Array.new + $log.info("in_container_inventory::enumerate : Begin processing @ #{Time.now.utc.iso8601}") + hostname = DockerApiClient.getDockerHostName + begin + containerIds = DockerApiClient.listContainers + if !containerIds.empty? + eventStream = MultiEventStream.new + nameMap = DockerApiClient.getImageIdMap + containerIds.each do |containerId| + inspectedContainer = {} + inspectedContainer = inspectContainer(containerId, nameMap) + inspectedContainer['Computer'] = hostname + inspectedContainer['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated + containerInventory.push inspectedContainer + ContainerInventoryState.writeContainerState(inspectedContainer) + end + # Update the state for deleted containers + deletedContainers = ContainerInventoryState.getDeletedContainers(containerIds) + if !deletedContainers.nil? && !deletedContainers.empty? + deletedContainers.each do |deletedContainer| + container = ContainerInventoryState.readContainerState(deletedContainer) + if !container.nil? + container.each{|k,v| container[k]=v} + container['State'] = "Deleted" + containerInventory.push container + end + end + end + + containerInventory.each do |record| + wrapper = { + "DataType"=>"CONTAINER_INVENTORY_BLOB", + "IPName"=>"ContainerInsights", + "DataItems"=>[record.each{|k,v| record[k]=v}] + } + eventStream.add(emitTime, wrapper) if wrapper + end + router.emit_stream(@tag, eventStream) if eventStream + @@istestvar = ENV['ISTEST'] + if (!@@istestvar.nil? && !@@istestvar.empty? && @@istestvar.casecmp('true') == 0 && eventStream.count > 0) + $log.info("containerInventoryEmitStreamSuccess @ #{Time.now.utc.iso8601}") + end + timeDifference = (DateTime.now.to_time.to_i - @@telemetryTimeTracker).abs + timeDifferenceInMinutes = timeDifference/60 + if (timeDifferenceInMinutes >= 5) + @@telemetryTimeTracker = DateTime.now.to_time.to_i + telemetryProperties = {} + telemetryProperties['Computer'] = hostname + telemetryProperties['ContainerCount'] = containerInventory.length + ApplicationInsightsUtility.sendTelemetry(@@PluginName, telemetryProperties) + end + $log.info("in_container_inventory::enumerate : Processing complete - emitted stream @ #{Time.now.utc.iso8601}") + end + rescue => errorStr + $log.warn("Exception in enumerate container inventory: #{errorStr}") + end + end + + def run_periodic + @mutex.lock + done = @finished + until done + @condition.wait(@mutex, @run_interval) + done = @finished + @mutex.unlock + if !done + begin + $log.info("in_container_inventory::run_periodic @ #{Time.now.utc.iso8601}") + enumerate + rescue => errorStr + $log.warn "in_container_inventory::run_periodic: Failed in enumerate container inventory: #{errorStr}" + end + end + @mutex.lock + end + @mutex.unlock + end + + end # Container_Inventory_Input + +end # module \ No newline at end of file diff --git a/source/code/plugin/in_kube_events.rb b/source/code/plugin/in_kube_events.rb index 6a6ae9296..5df31df95 100644 --- a/source/code/plugin/in_kube_events.rb +++ b/source/code/plugin/in_kube_events.rb @@ -10,7 +10,6 @@ class Kube_Event_Input < Input def initialize super - require 'yaml' require 'json' require_relative 'KubernetesApiClient' @@ -62,6 +61,7 @@ def enumerate(eventList = nil) eventStream = MultiEventStream.new events['items'].each do |items| record = {} + # - Not sure if ingestion has the below mapping for this custom type. Fix it as part of fixed type conversion record['CollectionTime'] = batchTime #This is the time that is mapped to become TimeGenerated eventId = items['metadata']['uid'] + "/" + items['count'].to_s newEventQueryState.push(eventId) @@ -86,7 +86,7 @@ def enumerate(eventList = nil) end record['ClusterName'] = KubernetesApiClient.getClusterName record['ClusterId'] = KubernetesApiClient.getClusterId - eventStream.add(emitTime, record) if record + eventStream.add(emitTime, record) if record end router.emit_stream(@tag, eventStream) if eventStream end @@ -121,7 +121,10 @@ def getEventQueryState eventQueryState = [] begin if File.file?(@@KubeEventsStateFile) - eventQueryState = YAML.load_file(@@KubeEventsStateFile, []) + # Do not read the entire file in one shot as it spikes memory (50+MB) for ~5k events + File.foreach(@@KubeEventsStateFile) do |line| + eventQueryState.push(line.chomp) #puts will append newline which needs to be removed + end end rescue => errorStr $log.warn $log.warn line.dump, error: errorStr.to_s @@ -132,7 +135,12 @@ def getEventQueryState def writeEventQueryState(eventQueryState) begin - File.write(@@KubeEventsStateFile, eventQueryState.to_yaml) + if(!eventQueryState.nil? && !eventQueryState.empty?) + # No need to close file handle (f) due to block scope + File.open(@@KubeEventsStateFile, "w") do |f| + f.puts(eventQueryState) + end + end rescue => errorStr $log.warn $log.warn line.dump, error: errorStr.to_s $log.debug_backtrace(errorStr.backtrace) diff --git a/source/code/plugin/in_kube_podinventory.rb b/source/code/plugin/in_kube_podinventory.rb index 2cd1e1bc3..ec76bac61 100644 --- a/source/code/plugin/in_kube_podinventory.rb +++ b/source/code/plugin/in_kube_podinventory.rb @@ -101,7 +101,7 @@ def parse_and_emit_records(podInventory, serviceList) #podStatus # the below is for accounting 'NodeLost' scenario, where-in the pod(s) in the lost node is still being reported as running podReadyCondition = true - if !items['status']['reason'].nil? && items['status']['reason'] == "NodeLost" + if !items['status']['reason'].nil? && items['status']['reason'] == "NodeLost" && !items['status']['conditions'].nil? items['status']['conditions'].each do |condition| if condition['type'] == "Ready" && condition['status'] == "False" podReadyCondition = false diff --git a/source/code/plugin/lib/application_insights.rb b/source/code/plugin/lib/application_insights.rb new file mode 100644 index 000000000..0a683d484 --- /dev/null +++ b/source/code/plugin/lib/application_insights.rb @@ -0,0 +1,9 @@ +require_relative 'application_insights/telemetry_client' +require_relative 'application_insights/unhandled_exception' +require_relative 'application_insights/version' + +module ApplicationInsights + module Rack + autoload :TrackRequest, "application_insights/rack/track_request" + end +end diff --git a/source/code/plugin/lib/application_insights/channel/asynchronous_queue.rb b/source/code/plugin/lib/application_insights/channel/asynchronous_queue.rb new file mode 100644 index 000000000..333f6968b --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/asynchronous_queue.rb @@ -0,0 +1,58 @@ +require_relative 'event' +require_relative 'queue_base' + +module ApplicationInsights + module Channel + # An asynchronous queue for use in conjunction with the {AsynchronousSender}. + # The queue will notify the sender that it needs to pick up items when it + # reaches {#max_queue_length}, or when the consumer calls {#flush} via the + # {#flush_notification} event. + # + # @example + # require 'application_insights' + # require 'thread' + # queue = ApplicationInsights::Channel::AsynchronousQueue.new nil + # Thread.new do + # sleep 1 + # queue.push 1 + # queue.flush + # end + # queue.flush_notification.wait + # queue.flush_notification.clear + # result = queue.pop + class AsynchronousQueue < QueueBase + # Initializes a new instance of the class. + # @param [SenderBase] sender the sender object that will be used in + # conjunction with this queue. In addition to the sender object must + # support a {AsynchronousSender#start} method which is invoked each time + # an item is pushed to the queue as well as use the {#flush_notification} + # event. + def initialize(sender) + @flush_notification = Event.new + super sender + end + + # The flush notification {ApplicationInsights::Channel::Event} that the {#sender} + # will use to get notified that a flush is needed. + # @return [Event] object that the {#sender} can wait on. + attr_reader :flush_notification + + # Adds the passed in item object to the queue and notifies the {#sender} + # to start an asynchronous send operation + # by calling {AsynchronousSender#start}. + # @param [Contracts::Envelope] item the telemetry envelope object to send + # to the service. + def push(item) + super item + @sender.start if @sender + end + + # Flushes the current queue by notifying the {#sender} via the + # {#flush_notification} event. + def flush + @flush_notification.set + @sender.start if @sender + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/asynchronous_sender.rb b/source/code/plugin/lib/application_insights/channel/asynchronous_sender.rb new file mode 100644 index 000000000..da573f08c --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/asynchronous_sender.rb @@ -0,0 +1,133 @@ +require_relative 'sender_base' +require 'thread' + +module ApplicationInsights + module Channel + # An asynchronous sender that works in conjunction with the {AsynchronousQueue}. + # The sender object will start a worker thread that will pull items from the + # {#queue}. The thread will be created when the client calls {#start} and + # will check for queue items every {#send_interval} seconds. The worker thread + # can also be forced to check the queue by setting the + # {AsynchronousQueue#flush_notification} event. + # + # - If no items are found, the thread will go back to sleep. + # - If items are found, the worker thread will send items to the specified + # service in batches of {#send_buffer_size}. + # + # If no queue items are found for {#send_time} seconds, the worker thread + # will shut down (and {#start} will need to be called again). + class AsynchronousSender < SenderBase + SERVICE_ENDPOINT_URI = 'https://dc.services.visualstudio.com/v2/track' + # Initializes a new instance of the class. + # @param [String] service_endpoint_uri the address of the service to send + # telemetry data to. + def initialize(service_endpoint_uri = SERVICE_ENDPOINT_URI) + @send_interval = 1.0 + @send_remaining_time = 0 + @send_time = 3.0 + @lock_work_thread = Mutex.new + @work_thread = nil + @start_notification_processed = true + super service_endpoint_uri + end + + # The time span in seconds at which the the worker thread will check the + # {#queue} for items (defaults to: 1.0). + # @return [Fixnum] the interval in seconds. + attr_accessor :send_interval + + # The time span in seconds for which the worker thread will stay alive if + # no items are found in the {#queue} (defaults to 3.0). + # @return [Fixnum] the interval in seconds. + attr_accessor :send_time + + # The worker thread which checks queue items and send data every + # (#send_interval) seconds or upon flush. + # @return [Thread] the work thread + attr_reader :work_thread + + # Calling this method will create a worker thread that checks the {#queue} + # every {#send_interval} seconds for a total duration of {#send_time} + # seconds for new items. If a worker thread has already been created, + # calling this method does nothing. + def start + @start_notification_processed = false + # Maintain one working thread at one time + unless @work_thread + @lock_work_thread.synchronize do + unless @work_thread + local_send_interval = [@send_interval, 0.1].max + @send_remaining_time = [@send_time, local_send_interval].max + @work_thread = Thread.new { run } + @work_thread.abort_on_exception = false + end + end + end + end + + private + + def run + # save the queue locally + local_queue = @queue + if local_queue.nil? + @work_thread = nil + return + end + + begin + # fix up the send interval (can't be lower than 100ms) + local_send_interval = [@send_interval, 0.1].max + + while true + @start_notification_processed = true + while true + # get at most @send_buffer_size items from the queue + data = [] + @send_buffer_size.downto(1) do + item = local_queue.pop + break if not item + data.push item + end + + # if we didn't get any items from the queue, we're done here + break if data.length == 0 + + # reset the send time + @send_remaining_time = @send_time + + # finally send the data + send data + end + + # wait at most @send_interval ms (or until we get signalled) + result = local_queue.flush_notification.wait local_send_interval + if result + local_queue.flush_notification.clear + next + end + + # decrement the remaining time + @send_remaining_time -= local_send_interval + # If remaining time <=0 and there is no start notification unprocessed, + # then stop the working thread + if @send_remaining_time <= 0 && @start_notification_processed + # Note: there is still a chance some start notification could be + # missed, e.g., the start method got triggered between the above and + # following line. However the data is not lost as it would be + # processed later when next start notification comes after the worker + # thread stops. The cost to ensure no notification miss is high where + # a lock is required each time the start method calls. + @work_thread = nil + break + end + end + rescue Exception => e + # Make sure work_thread sets to nil when it terminates abnormally + @work_thread = nil + @logger.error('application_insights') { "Asynchronous sender work thread terminated abnormally: #{e.to_s}" } + end + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/application.rb b/source/code/plugin/lib/application_insights/channel/contracts/application.rb new file mode 100644 index 000000000..071c37385 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/application.rb @@ -0,0 +1,13 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Application + include JsonSerializable + + attr_accessor :ver + + attribute_mapping( + ver: 'ai.application.ver' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/availability_data.rb b/source/code/plugin/lib/application_insights/channel/contracts/availability_data.rb new file mode 100644 index 000000000..d560dd15b --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/availability_data.rb @@ -0,0 +1,34 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class AvailabilityData + include JsonSerializable + + attr_accessor :ver, :id, :name, :duration, :success, :run_location, :message, + :properties, :measurements + + attribute_mapping( + ver: 'ver', + id: 'id', + name: 'name', + duration: 'duration', + success: 'success', + run_location: 'runLocation', + message: 'message', + properties: 'properties', + measurements: 'measurements' + ) + + def ver + @ver ||= 2 + end + + def properties + @properties ||= {} + end + + def measurements + @measurements ||= {} + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/base.rb b/source/code/plugin/lib/application_insights/channel/contracts/base.rb new file mode 100644 index 000000000..bb88a4625 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/base.rb @@ -0,0 +1,13 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Base + include JsonSerializable + + attr_accessor :base_type + + attribute_mapping( + base_type: 'baseType' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/cloud.rb b/source/code/plugin/lib/application_insights/channel/contracts/cloud.rb new file mode 100644 index 000000000..5aaeeee04 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/cloud.rb @@ -0,0 +1,14 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Cloud + include JsonSerializable + + attr_accessor :role, :role_instance + + attribute_mapping( + role: 'ai.cloud.role', + role_instance: 'ai.cloud.roleInstance' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/data.rb b/source/code/plugin/lib/application_insights/channel/contracts/data.rb new file mode 100644 index 000000000..c7184edfd --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/data.rb @@ -0,0 +1,14 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Data + include JsonSerializable + + attr_accessor :base_type, :base_data + + attribute_mapping( + base_type: 'baseType', + base_data: 'baseData' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/data_point.rb b/source/code/plugin/lib/application_insights/channel/contracts/data_point.rb new file mode 100644 index 000000000..6556b351b --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/data_point.rb @@ -0,0 +1,25 @@ +require_relative 'json_serializable' +require_relative 'data_point_type' + +module ApplicationInsights::Channel::Contracts + class DataPoint + include JsonSerializable + + attr_accessor :ns, :name, :kind, :value, :count, :min, :max, :std_dev + + attribute_mapping( + ns: 'ns', + name: 'name', + kind: 'kind', + value: 'value', + count: 'count', + min: 'min', + max: 'max', + std_dev: 'stdDev' + ) + + def kind + @kind ||= DataPointType::MEASUREMENT + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/data_point_type.rb b/source/code/plugin/lib/application_insights/channel/contracts/data_point_type.rb new file mode 100644 index 000000000..f9816e4a9 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/data_point_type.rb @@ -0,0 +1,7 @@ +module ApplicationInsights::Channel::Contracts + class DataPointType + MEASUREMENT = 0 + + AGGREGATION = 1 + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/dependency_kind.rb b/source/code/plugin/lib/application_insights/channel/contracts/dependency_kind.rb new file mode 100644 index 000000000..38a441499 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/dependency_kind.rb @@ -0,0 +1,9 @@ +module ApplicationInsights::Channel::Contracts + class DependencyKind + SQL = 0 + + HTTP = 1 + + OTHER = 2 + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/dependency_source_type.rb b/source/code/plugin/lib/application_insights/channel/contracts/dependency_source_type.rb new file mode 100644 index 000000000..a68dad72b --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/dependency_source_type.rb @@ -0,0 +1,9 @@ +module ApplicationInsights::Channel::Contracts + class DependencySourceType + UNDEFINED = 0 + + AIC = 1 + + APMC = 2 + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/device.rb b/source/code/plugin/lib/application_insights/channel/contracts/device.rb new file mode 100644 index 000000000..af6855102 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/device.rb @@ -0,0 +1,18 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Device + include JsonSerializable + + attr_accessor :id, :locale, :model, :oem_name, :os_version, :type + + attribute_mapping( + id: 'ai.device.id', + locale: 'ai.device.locale', + model: 'ai.device.model', + oem_name: 'ai.device.oemName', + os_version: 'ai.device.osVersion', + type: 'ai.device.type' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/domain.rb b/source/code/plugin/lib/application_insights/channel/contracts/domain.rb new file mode 100644 index 000000000..8a7ba880d --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/domain.rb @@ -0,0 +1,10 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Domain + include JsonSerializable + + attribute_mapping( + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/envelope.rb b/source/code/plugin/lib/application_insights/channel/contracts/envelope.rb new file mode 100644 index 000000000..b8608e388 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/envelope.rb @@ -0,0 +1,32 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Envelope + include JsonSerializable + + attr_accessor :ver, :name, :time, :sample_rate, :seq, :i_key, :tags, :data + + attribute_mapping( + ver: 'ver', + name: 'name', + time: 'time', + sample_rate: 'sampleRate', + seq: 'seq', + i_key: 'iKey', + tags: 'tags', + data: 'data' + ) + + def ver + @ver ||= 1 + end + + def sample_rate + @sample_rate ||= 100.0 + end + + def tags + @tags ||= {} + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/event_data.rb b/source/code/plugin/lib/application_insights/channel/contracts/event_data.rb new file mode 100644 index 000000000..4bfb16124 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/event_data.rb @@ -0,0 +1,28 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class EventData + include JsonSerializable + + attr_accessor :ver, :name, :properties, :measurements + + attribute_mapping( + ver: 'ver', + name: 'name', + properties: 'properties', + measurements: 'measurements' + ) + + def ver + @ver ||= 2 + end + + def properties + @properties ||= {} + end + + def measurements + @measurements ||= {} + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/exception_data.rb b/source/code/plugin/lib/application_insights/channel/contracts/exception_data.rb new file mode 100644 index 000000000..5cffd1253 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/exception_data.rb @@ -0,0 +1,35 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class ExceptionData + include JsonSerializable + + attr_accessor :ver, :exceptions, :severity_level, :problem_id, :properties, + :measurements + + attribute_mapping( + ver: 'ver', + exceptions: 'exceptions', + severity_level: 'severityLevel', + problem_id: 'problemId', + properties: 'properties', + measurements: 'measurements' + ) + + def ver + @ver ||= 2 + end + + def exceptions + @exceptions ||= [] + end + + def properties + @properties ||= {} + end + + def measurements + @measurements ||= {} + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/exception_details.rb b/source/code/plugin/lib/application_insights/channel/contracts/exception_details.rb new file mode 100644 index 000000000..85bfc6282 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/exception_details.rb @@ -0,0 +1,28 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class ExceptionDetails + include JsonSerializable + + attr_accessor :id, :outer_id, :type_name, :message, :has_full_stack, :stack, + :parsed_stack + + attribute_mapping( + id: 'id', + outer_id: 'outerId', + type_name: 'typeName', + message: 'message', + has_full_stack: 'hasFullStack', + stack: 'stack', + parsed_stack: 'parsedStack' + ) + + def has_full_stack + @has_full_stack.nil? ? true : @has_full_stack + end + + def parsed_stack + @parsed_stack ||= [] + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/internal.rb b/source/code/plugin/lib/application_insights/channel/contracts/internal.rb new file mode 100644 index 000000000..6e8f3d300 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/internal.rb @@ -0,0 +1,15 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Internal + include JsonSerializable + + attr_accessor :sdk_version, :agent_version, :node_name + + attribute_mapping( + sdk_version: 'ai.internal.sdkVersion', + agent_version: 'ai.internal.agentVersion', + node_name: 'ai.internal.nodeName' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb b/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb new file mode 100644 index 000000000..8f4677044 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/json_serializable.rb @@ -0,0 +1,59 @@ +require 'json' + +module ApplicationInsights + module Channel + module Contracts + module JsonSerializable + module ClassMethods + attr_reader :json_mappings + + def attribute_mapping(mappings = {}) + @json_mappings = mappings + end + end + + def self.included(klass) + klass.extend JsonSerializable::ClassMethods + end + + def initialize(attributes = {}) + attributes.each { |k, v| send(:"#{k}=", v) } + end + + def to_h + output = {} + klass = self.class + + klass.json_mappings.each do |attr, name| + value = visit self.send(attr) + is_empty = value.respond_to?(:empty?) && value.empty? + + output[name] = value unless value.nil? || is_empty + end + + output + end + + def to_json(args = {}) + JSON.generate self.to_h, args + end + + private + + def visit(object) + return if object.nil? + + if object.is_a? Array + object.map { |e| visit e } + elsif object.is_a? Hash + Hash[object.map { |k, v| [k, visit(v)] }] + elsif object.respond_to? :to_h + object.to_h + else + object + end + end + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/location.rb b/source/code/plugin/lib/application_insights/channel/contracts/location.rb new file mode 100644 index 000000000..4136c869b --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/location.rb @@ -0,0 +1,13 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Location + include JsonSerializable + + attr_accessor :ip + + attribute_mapping( + ip: 'ai.location.ip' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/message_data.rb b/source/code/plugin/lib/application_insights/channel/contracts/message_data.rb new file mode 100644 index 000000000..1340f5ba7 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/message_data.rb @@ -0,0 +1,24 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class MessageData + include JsonSerializable + + attr_accessor :ver, :message, :severity_level, :properties + + attribute_mapping( + ver: 'ver', + message: 'message', + severity_level: 'severityLevel', + properties: 'properties' + ) + + def ver + @ver ||= 2 + end + + def properties + @properties ||= {} + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/metric_data.rb b/source/code/plugin/lib/application_insights/channel/contracts/metric_data.rb new file mode 100644 index 000000000..bcb5739d6 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/metric_data.rb @@ -0,0 +1,27 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class MetricData + include JsonSerializable + + attr_accessor :ver, :metrics, :properties + + attribute_mapping( + ver: 'ver', + metrics: 'metrics', + properties: 'properties' + ) + + def ver + @ver ||= 2 + end + + def metrics + @metrics ||= [] + end + + def properties + @properties ||= {} + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/operation.rb b/source/code/plugin/lib/application_insights/channel/contracts/operation.rb new file mode 100644 index 000000000..c86dd111b --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/operation.rb @@ -0,0 +1,17 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Operation + include JsonSerializable + + attr_accessor :id, :name, :parent_id, :synthetic_source, :correlation_vector + + attribute_mapping( + id: 'ai.operation.id', + name: 'ai.operation.name', + parent_id: 'ai.operation.parentId', + synthetic_source: 'ai.operation.syntheticSource', + correlation_vector: 'ai.operation.correlationVector' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/page_view_data.rb b/source/code/plugin/lib/application_insights/channel/contracts/page_view_data.rb new file mode 100644 index 000000000..d17dd2f79 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/page_view_data.rb @@ -0,0 +1,33 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class PageViewData + include JsonSerializable + + attr_accessor :ver, :url, :name, :duration, :id, :referrer_uri, :properties, + :measurements + + attribute_mapping( + ver: 'ver', + url: 'url', + name: 'name', + duration: 'duration', + id: 'id', + referrer_uri: 'referrerUri', + properties: 'properties', + measurements: 'measurements' + ) + + def ver + @ver ||= 2 + end + + def properties + @properties ||= {} + end + + def measurements + @measurements ||= {} + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/page_view_perf_data.rb b/source/code/plugin/lib/application_insights/channel/contracts/page_view_perf_data.rb new file mode 100644 index 000000000..adde3f3ad --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/page_view_perf_data.rb @@ -0,0 +1,39 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class PageViewPerfData + include JsonSerializable + + attr_accessor :ver, :url, :perf_total, :name, :duration, :network_connect, + :sent_request, :received_response, :id, :dom_processing, :referrer_uri, + :properties, :measurements + + attribute_mapping( + ver: 'ver', + url: 'url', + perf_total: 'perfTotal', + name: 'name', + duration: 'duration', + network_connect: 'networkConnect', + sent_request: 'sentRequest', + received_response: 'receivedResponse', + id: 'id', + dom_processing: 'domProcessing', + referrer_uri: 'referrerUri', + properties: 'properties', + measurements: 'measurements' + ) + + def ver + @ver ||= 2 + end + + def properties + @properties ||= {} + end + + def measurements + @measurements ||= {} + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/remote_dependency_data.rb b/source/code/plugin/lib/application_insights/channel/contracts/remote_dependency_data.rb new file mode 100644 index 000000000..a238841f6 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/remote_dependency_data.rb @@ -0,0 +1,40 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class RemoteDependencyData + include JsonSerializable + + attr_accessor :ver, :name, :id, :result_code, :duration, :success, :data, + :target, :type, :properties, :measurements + + attribute_mapping( + ver: 'ver', + name: 'name', + id: 'id', + result_code: 'resultCode', + duration: 'duration', + success: 'success', + data: 'data', + target: 'target', + type: 'type', + properties: 'properties', + measurements: 'measurements' + ) + + def ver + @ver ||= 2 + end + + def success + @success.nil? ? true : @success + end + + def properties + @properties ||= {} + end + + def measurements + @measurements ||= {} + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/reopenings.rb b/source/code/plugin/lib/application_insights/channel/contracts/reopenings.rb new file mode 100644 index 000000000..394bf8afb --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/reopenings.rb @@ -0,0 +1,27 @@ +module ApplicationInsights::Channel::Contracts + class ExceptionData + def handled_at + @properties["handledAt"] if @properties + end + + def handled_at=(handled_at) + if handled_at + @properties ||= {} + @properties["handledAt"] = handled_at + end + end + end + + class RequestData + def http_method + @properties["httpMethod"] if @properties + end + + def http_method=(http_method) + if http_method + @properties ||= {} + @properties["httpMethod"] = http_method + end + end + end +end \ No newline at end of file diff --git a/source/code/plugin/lib/application_insights/channel/contracts/request_data.rb b/source/code/plugin/lib/application_insights/channel/contracts/request_data.rb new file mode 100644 index 000000000..af2581c2b --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/request_data.rb @@ -0,0 +1,35 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class RequestData + include JsonSerializable + + attr_accessor :ver, :id, :source, :name, :duration, :response_code, :success, + :url, :properties, :measurements + + attribute_mapping( + ver: 'ver', + id: 'id', + source: 'source', + name: 'name', + duration: 'duration', + response_code: 'responseCode', + success: 'success', + url: 'url', + properties: 'properties', + measurements: 'measurements' + ) + + def ver + @ver ||= 2 + end + + def properties + @properties ||= {} + end + + def measurements + @measurements ||= {} + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/session.rb b/source/code/plugin/lib/application_insights/channel/contracts/session.rb new file mode 100644 index 000000000..a761c51c5 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/session.rb @@ -0,0 +1,14 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class Session + include JsonSerializable + + attr_accessor :id, :is_first + + attribute_mapping( + id: 'ai.session.id', + is_first: 'ai.session.isFirst' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/severity_level.rb b/source/code/plugin/lib/application_insights/channel/contracts/severity_level.rb new file mode 100644 index 000000000..322a00ec3 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/severity_level.rb @@ -0,0 +1,13 @@ +module ApplicationInsights::Channel::Contracts + class SeverityLevel + VERBOSE = 0 + + INFORMATION = 1 + + WARNING = 2 + + ERROR = 3 + + CRITICAL = 4 + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/stack_frame.rb b/source/code/plugin/lib/application_insights/channel/contracts/stack_frame.rb new file mode 100644 index 000000000..b4f4b9844 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/stack_frame.rb @@ -0,0 +1,17 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class StackFrame + include JsonSerializable + + attr_accessor :level, :method, :assembly, :file_name, :line + + attribute_mapping( + level: 'level', + method: 'method', + assembly: 'assembly', + file_name: 'fileName', + line: 'line' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/contracts/user.rb b/source/code/plugin/lib/application_insights/channel/contracts/user.rb new file mode 100644 index 000000000..a7ff8a7cf --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/contracts/user.rb @@ -0,0 +1,15 @@ +require_relative 'json_serializable' + +module ApplicationInsights::Channel::Contracts + class User + include JsonSerializable + + attr_accessor :account_id, :id, :auth_user_id + + attribute_mapping( + account_id: 'ai.user.accountId', + id: 'ai.user.id', + auth_user_id: 'ai.user.authUserId' + ) + end +end diff --git a/source/code/plugin/lib/application_insights/channel/event.rb b/source/code/plugin/lib/application_insights/channel/event.rb new file mode 100644 index 000000000..ae61064f8 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/event.rb @@ -0,0 +1,68 @@ +require_relative 'queue_base' +require 'thread' + +module ApplicationInsights + module Channel + # An event class that allows simple cross-thread signalling. + # + # An object of this type managers an internal flag that can be set to true + # via the {#set} method and reset via the {#clear} method. Calling the + # {#wait} method will block until the flag is set to true. + # + # @example + # require 'application_insights' + # require 'thread' + # event = ApplicationInsights::Channel::Event.new + # Thread.new do + # sleep 1 + # event.set + # end + # puts 'Main screen turn on.' + # result = event.wait + # puts 'All your base are belong to us.' + class Event + # Initializes a new instance of the class. + def initialize + @mutex = Mutex.new + @condition_variable = ConditionVariable.new + @signal = false + end + + # The signal value for this object. Note that the value of this property is + # not synchronized with respect to {#set} and {#clear} meaning that it + # could return false positives or negatives. + # @return [Boolean] the signal value. + attr_reader :signal + + # Sets the internal flag to true. Calling this method will also cause all + # waiting threads to awaken. + def set + @mutex.synchronize do + @signal = true + @condition_variable.broadcast + end + end + + # Sets the internal flag to false. + def clear + @mutex.synchronize do + @signal = false + end + end + + # Calling this method will block until the internal flag is set to true. + # If the flag is set to true before calling this method, we will return + # immediately. If the timeout parameter is specified, the method will + # unblock after the specified number of seconds. + # @param [Fixnum] timeout the timeout for the operation in seconds. + # @return [Boolean] the value of the internal flag on exit. + def wait(timeout=nil) + @mutex.synchronize do + @condition_variable.wait(@mutex, timeout) unless @signal + end + + @signal + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/queue_base.rb b/source/code/plugin/lib/application_insights/channel/queue_base.rb new file mode 100644 index 000000000..91226b17f --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/queue_base.rb @@ -0,0 +1,73 @@ +require 'thread' + +module ApplicationInsights + module Channel + # The base class for all types of queues for use in conjunction with an + # implementation of {SenderBase}. The queue will notify the sender that it + # needs to pick up items when it reaches {#max_queue_length}, or when the + # consumer calls {#flush}. + class QueueBase + # Initializes a new instance of the class. + # @param [SenderBase] sender the sender object that will be used in + # conjunction with this queue. + def initialize(sender) + @queue = Queue.new + @max_queue_length = 500 + self.sender = sender + end + + # The maximum number of items that will be held by the queue before the + # queue will call the {#flush} method. + # @return [Fixnum] the maximum queue size. (defaults to: 500) + attr_accessor :max_queue_length + + # The sender that is associated with this queue that this queue will use to + # send data to the service. + # @return [SenderBase] the sender object. + attr_reader :sender + + # Change the sender that is associated with this queue. + # @param [SenderBase] sender the sender object. + # @return [SenderBase] the sender object. + def sender=(sender) + @sender = sender + @sender.queue = self if sender + @sender + end + + # Adds the passed in item object to the queue and calls {#flush} if the + # size of the queue is larger than {#max_queue_length}. This method does + # nothing if the passed in item is nil. + # @param [Contracts::Envelope] item the telemetry envelope object to send + # to the service. + def push(item) + return unless item + + @queue.push(item) + + flush if @queue.length >= @max_queue_length + end + + # Pops a single item from the queue and returns it. If the queue is empty, + # this method will return nil. + # @return [Contracts::Envelope] a telemetry envelope object or nil if the + # queue is empty. + def pop + return @queue.pop(true) + rescue ThreadError + return nil + end + + # Flushes the current queue by notifying the {#sender}. This method needs + # to be overridden by a concrete implementations of the queue class. + def flush + end + + # Indicates whether the queue is empty. + # @return [Boolean] true if the queue is empty + def empty? + @queue.empty? + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/sender_base.rb b/source/code/plugin/lib/application_insights/channel/sender_base.rb new file mode 100644 index 000000000..2431bf748 --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/sender_base.rb @@ -0,0 +1,88 @@ +require 'json' +require 'net/http' +require 'openssl' +require 'stringio' +require 'zlib' +require 'logger' + +module ApplicationInsights + module Channel + # The base class for all types of senders for use in conjunction with an + # implementation of {QueueBase}. The queue will notify the sender that it + # needs to pick up items. The concrete sender implementation will listen to + # these notifications and will pull items from the queue using + # {QueueBase#pop} getting at most {#send_buffer_size} items. + # It will then call {#send} using the list of items pulled from the queue. + class SenderBase + # Initializes a new instance of the class. + # @param [String] service_endpoint_uri the address of the service to send + # telemetry data to. + def initialize(service_endpoint_uri) + @service_endpoint_uri = service_endpoint_uri + @queue = nil + @send_buffer_size = 100 + @logger = Logger.new(STDOUT) + end + + # The service endpoint URI where this sender will send data to. + # @return [String] the service endpoint URI. + attr_accessor :service_endpoint_uri + + # The queue that this sender is draining. While {SenderBase} doesn't + # implement any means of doing so, derivations of this class do. + # @return [QueueBase] the queue instance that this sender is draining. + attr_accessor :queue + + # The buffer size for a single batch of telemetry. This is the maximum number + # of items in a single service request that this sender is going to send. + # @return [Fixnum] the maximum number of items in a telemetry batch. + attr_accessor :send_buffer_size + + # The logger for the sender. + attr_accessor :logger + + # Immediately sends the data passed in to {#service_endpoint_uri}. If the + # service request fails, the passed in items are pushed back to the {#queue}. + # @param [Array] data_to_send an array of + # {Contracts::Envelope} objects to send to the service. + def send(data_to_send) + uri = URI(@service_endpoint_uri) + headers = { + 'Accept' => 'application/json', + 'Content-Type' => 'application/json; charset=utf-8', + 'Content-Encoding' => 'gzip' + } + request = Net::HTTP::Post.new(uri.path, headers) + + # Use JSON.generate instead of to_json, otherwise it will + # default to ActiveSupport::JSON.encode for Rails app + json = JSON.generate(data_to_send) + compressed_data = compress(json) + request.body = compressed_data + + http = Net::HTTP.new uri.hostname, uri.port + if uri.scheme.downcase == 'https' + http.use_ssl = true + http.verify_mode = OpenSSL::SSL::VERIFY_NONE + end + + response = http.request(request) + http.finish if http.started? + + if !response.kind_of? Net::HTTPSuccess + @logger.warn('application_insights') { "Failed to send data: #{response.message}" } + end + end + + private + + def compress(string) + wio = StringIO.new("w") + w_gz = Zlib::GzipWriter.new wio, nil, nil + w_gz.write(string) + w_gz.close + wio.string + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/synchronous_queue.rb b/source/code/plugin/lib/application_insights/channel/synchronous_queue.rb new file mode 100644 index 000000000..13c2281ac --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/synchronous_queue.rb @@ -0,0 +1,45 @@ +require_relative 'queue_base' + +module ApplicationInsights + module Channel + # A synchronous queue for use in conjunction with the {SynchronousSender}. + # The queue will call {SenderBase#send} when it reaches {#max_queue_length}, + # or when the consumer calls {#flush}. + # + # @example + # require 'application_insights' + # require 'thread' + # queue = ApplicationInsights::Channel::SynchronousQueue.new nil + # queue.max_queue_length = 1 + # queue.push 1 + class SynchronousQueue < QueueBase + # Initializes a new instance of the class. + # @param [SenderBase] sender the sender object that will be used in + # conjunction with this queue. + def initialize(sender) + super sender + end + + # Flushes the current queue by by calling {#sender}'s + # {SenderBase#send} method. + def flush + local_sender = @sender + return unless local_sender + + while true + # get at most send_buffer_size items and send them + data = [] + while data.length < local_sender.send_buffer_size + item = pop() + break if not item + data.push item + end + + break if data.length == 0 + + local_sender.send(data) + end + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/synchronous_sender.rb b/source/code/plugin/lib/application_insights/channel/synchronous_sender.rb new file mode 100644 index 000000000..ade2f086c --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/synchronous_sender.rb @@ -0,0 +1,17 @@ +require_relative 'sender_base' + +module ApplicationInsights + module Channel + # A synchronous sender that works in conjunction with the {SynchronousQueue}. + # The queue will call {#send} on the current instance with the data to send. + class SynchronousSender < SenderBase + SERVICE_ENDPOINT_URI = 'https://dc.services.visualstudio.com/v2/track' + # Initializes a new instance of the class. + # @param [String] service_endpoint_uri the address of the service to send + # telemetry data to. + def initialize(service_endpoint_uri = SERVICE_ENDPOINT_URI) + super service_endpoint_uri + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/telemetry_channel.rb b/source/code/plugin/lib/application_insights/channel/telemetry_channel.rb new file mode 100644 index 000000000..e026ebf7d --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/telemetry_channel.rb @@ -0,0 +1,131 @@ +require 'time' +require_relative 'asynchronous_queue' +require_relative 'asynchronous_sender' +require_relative 'telemetry_context' +require_relative 'synchronous_queue' +require_relative 'synchronous_sender' +require_relative 'contracts/envelope' +require_relative 'contracts/data' +require_relative 'contracts/internal' +require_relative '../../application_insights/version' + +module ApplicationInsights + module Channel + # The telemetry channel is responsible for constructing a + # {Contracts::Envelope} object from the passed in data and specified + # telemetry context. + # + # @example + # require 'application_insights' + # channel = ApplicationInsights::Channel::TelemetryChannel.new + # event = ApplicationInsights::Channel::Contracts::EventData.new name: 'My event' + # channel.write event + class TelemetryChannel + # Initializes a new instance of the class. + # @param [TelemetryContext] context the telemetry context to use when + # sending telemetry data. + # @param [QueueBase] queue the queue to enqueue the resulting + # {Contracts::Envelope} to. + def initialize(context=nil, queue=nil) + @context = context || TelemetryContext.new + @queue = queue || SynchronousQueue.new(SynchronousSender.new) + end + + # The context associated with this channel. All {Contracts::Envelope} + # objects created by this channel will use this value if it's present or if + # none is specified as part of the {#write} call. + # @return [TelemetryContext] the context instance + # (defaults to: TelemetryContext.new) + attr_reader :context + + # The queue associated with this channel. All {Contracts::Envelope} objects + # created by this channel will be pushed to this queue. + # @return [QueueBase] the queue instance (defaults to: SynchronousQueue.new) + attr_reader :queue + + # The sender associated with this channel. This instance will be used to + # transmit telemetry to the service. + # @return [SenderBase] the sender instance (defaults to: SynchronousSender.new) + def sender + @queue.sender + end + + # Flushes the enqueued data by calling {QueueBase#flush}. + def flush + @queue.flush + end + + # Enqueues the passed in data to the {#queue}. If the caller specifies a + # context as well, it will take precedence over the instance in {#context}. + # @param [Object] data the telemetry data to send. This will be wrapped in + # an {Contracts::Envelope} before being enqueued to the {#queue}. + # @param [TelemetryContext] context the override context to use when + # constructing the {Contracts::Envelope}. + # @param [Time|String] time the timestamp of the telemetry used to construct the + # {Contracts::Envelope}. + def write(data, context=nil, time=nil) + local_context = context || @context + raise ArgumentError, 'Context was required but not provided' unless local_context + + if time && time.is_a?(String) + local_time = time + elsif time && time.is_a?(Time) + local_time = time.iso8601(7) + else + local_time = Time.now.iso8601(7) + end + + data_type = data.class.name.gsub(/^.*::/, '') + set_properties data, local_context + data_attributes = { + :base_type => data_type, + :base_data => data + } + envelope_attributes = { + :name => 'Microsoft.ApplicationInsights.' + data_type[0..-5], + :time => local_time, + :i_key => local_context.instrumentation_key, + :tags => get_tags(local_context), + :data => Contracts::Data.new(data_attributes) + } + envelope = Contracts::Envelope.new envelope_attributes + @queue.push(envelope) + end + + private + + def get_tags(context) + hash = {} + internal_context_attributes = { + :sdk_version => 'rb:' + ApplicationInsights::VERSION + } + internal_context = Contracts::Internal.new internal_context_attributes + + [internal_context, + context.application, + context.cloud, + context.device, + context.user, + context.session, + context.location, + context.operation].each { |c| hash.merge!(c.to_h) if c } + + hash.delete_if { |k, v| v.nil? } + + hash + end + + def set_properties(data, context) + if context.properties + properties = data.properties || {} + context.properties.each do |key, value| + unless properties.key?(key) + properties[key] = value + end + end + data.properties = properties + end + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/channel/telemetry_context.rb b/source/code/plugin/lib/application_insights/channel/telemetry_context.rb new file mode 100644 index 000000000..bb24af24e --- /dev/null +++ b/source/code/plugin/lib/application_insights/channel/telemetry_context.rb @@ -0,0 +1,85 @@ +require_relative 'contracts/application' +require_relative 'contracts/cloud' +require_relative 'contracts/device' +require_relative 'contracts/user' +require_relative 'contracts/session' +require_relative 'contracts/operation' +require_relative 'contracts/location' + +module ApplicationInsights + module Channel + # Represents the context for sending telemetry to the + # Application Insights service. + # + # @example + # require 'application_insights' + # context = ApplicationInsights::Channel::TelemetryContext.new + # context.instrumentation_key = '' + # context.application.id = 'My application' + # context.application.ver = '1.2.3' + # context.device.id = 'My current device' + # context.device.oem_name = 'Asus' + # context.device.model = 'X31A' + # context.device.type = "Other" + # context.user.id = 'santa@northpole.net' + class TelemetryContext + # Initializes a new instance of the class. + def initialize + @instrumentation_key = nil + @application = Contracts::Application.new + @cloud = Contracts::Cloud.new + @device = Contracts::Device.new + @user = Contracts::User.new + @session = Contracts::Session.new + @operation = Contracts::Operation.new + @location = Contracts::Location.new + @properties = {} + end + + # The instrumentation key that is used to identify which + # Application Insights application this data is for. + # @return [String] the instrumentation key. + attr_accessor :instrumentation_key + + # The application context. This contains properties of the + # application you are running. + # @return [Contracts::Application] the context object. + attr_accessor :application + + # The cloud context. This contains properties of the + # cloud role you are generating telemetry for. + # @return [Contracts::Cloud] the context object. + attr_accessor :cloud + + # The device context. This contains properties of the + # device you are running on. + # @return [Contracts::Device] the context object. + attr_accessor :device + + # The user context. This contains properties of the + # user you are generating telemetry for. + # @return [Contracts::User] the context object. + attr_accessor :user + + # The session context. This contains properties of the + # session you are generating telemetry for. + # @return [Contracts::Session] the context object. + attr_accessor :session + + # The operation context. This contains properties of the + # operation you are generating telemetry for. + # @return [Contracts::Operation] the context object. + attr_accessor :operation + + # The location context. This contains properties of the + # location you are generating telemetry from. + # @return [Contracts::Location] the context object. + attr_accessor :location + + # The property context. This contains free-form properties + # that you can add to your telemetry. + # @return [Hash] the context object. + attr_accessor :properties + end + end +end diff --git a/source/code/plugin/lib/application_insights/rack/track_request.rb b/source/code/plugin/lib/application_insights/rack/track_request.rb new file mode 100644 index 000000000..62c2b0844 --- /dev/null +++ b/source/code/plugin/lib/application_insights/rack/track_request.rb @@ -0,0 +1,154 @@ +require 'rack' +require 'securerandom' +require_relative '../channel/contracts/request_data' +require_relative '../telemetry_client' + +module ApplicationInsights + module Rack + # Track every request and sends the request data to Application Insights. + class TrackRequest + # Initializes a new instance of the class. + # @param [Object] app the inner rack application. + # @param [String] instrumentation_key to identify which Application Insights + # application this data is for. + # @param [Fixnum] buffer_size the buffer size and the buffered requests would + # send to Application Insights when buffer is full. + # @param [Fixnum] send_interval the frequency (in seconds) to check buffer + # and send buffered requests to Application Insights if any. + def initialize(app, instrumentation_key, buffer_size = 500, send_interval = 60) + @app = app + @instrumentation_key = instrumentation_key + @buffer_size = buffer_size + @send_interval = send_interval + + @sender = Channel::AsynchronousSender.new + @sender.send_interval = @send_interval + queue = Channel::AsynchronousQueue.new @sender + queue.max_queue_length = @buffer_size + @channel = Channel::TelemetryChannel.new nil, queue + + @client = TelemetryClient.new @instrumentation_key, @channel + end + + # Track requests and send data to Application Insights asynchronously. + # @param [Hash] env the rack environment. + def call(env) + # Build a request ID, incorporating one from our request if one exists. + request_id = request_id_header(env['HTTP_REQUEST_ID']) + env['ApplicationInsights.request.id'] = request_id + + start = Time.now + begin + status, headers, response = @app.call(env) + rescue Exception => ex + status = 500 + exception = ex + end + stop = Time.now + + start_time = start.iso8601(7) + duration = format_request_duration(stop - start) + success = status.to_i < 400 + + request = ::Rack::Request.new env + options = options_hash(request) + + data = request_data(request_id, start_time, duration, status, success, options) + context = telemetry_context(request_id, env['HTTP_REQUEST_ID']) + + @client.channel.write data, context, start_time + + if exception + @client.track_exception exception, handled_at: 'Unhandled' + raise exception + end + + [status, headers, response] + end + + private + + def sender=(sender) + if sender.is_a? Channel::AsynchronousSender + @sender = sender + @client.channel.queue.sender = @sender + end + end + + def client + @client + end + + def format_request_duration(duration_seconds) + if duration_seconds >= 86400 + # just return 1 day when it takes more than 1 day which should not happen for requests. + return "%02d.%02d:%02d:%02d.%07d" % [1, 0, 0, 0, 0] + end + + Time.at(duration_seconds).gmtime.strftime("00.%H:%M:%S.%7N") + end + + def request_id_header(request_id) + valid_request_id_header = valid_request_id(request_id) + + length = valid_request_id_header ? 5 : 10 + id = SecureRandom.base64(length) + + if valid_request_id_header + request_id_has_end = %w[. _].include?(request_id[-1]) + request_id << '.' unless request_id_has_end + + return "#{request_id}#{id}_" + end + + "|#{id}." + end + + def valid_request_id(request_id) + request_id && request_id[0] == '|' + end + + def operation_id(id) + # Returns the root ID from the '|' to the first '.' if any. + root_start = id[0] == '|' ? 1 : 0 + + root_end = id.index('.') + root_end = root_end ? root_end - 1 : id.length - root_start + + id[root_start..root_end] + end + + def options_hash(request) + { + name: "#{request.request_method} #{request.path}", + http_method: request.request_method, + url: request.url + } + end + + def request_data(request_id, start_time, duration, status, success, options) + Channel::Contracts::RequestData.new( + :id => request_id || 'Null', + :duration => duration || '0:00:00:00.0000000', + :response_code => status || 200, + :success => success == nil ? true : success, + :name => options[:name], + :url => options[:url], + :properties => options[:properties] || {}, + :measurements => options[:measurements] || {}, + # Must initialize http_method after properties because it's actually stored in properties + :http_method => options[:http_method] + ) + end + + def telemetry_context(request_id, request_id_header) + context = Channel::TelemetryContext.new + context.instrumentation_key = @instrumentation_key + context.operation.id = operation_id(request_id) + context.operation.parent_id = request_id_header + + context + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/telemetry_client.rb b/source/code/plugin/lib/application_insights/telemetry_client.rb new file mode 100644 index 000000000..bd066ae70 --- /dev/null +++ b/source/code/plugin/lib/application_insights/telemetry_client.rb @@ -0,0 +1,232 @@ +require_relative 'channel/telemetry_context' +require_relative 'channel/telemetry_channel' +require_relative 'channel/contracts/page_view_data' +require_relative 'channel/contracts/remote_dependency_data' +require_relative 'channel/contracts/exception_data' +require_relative 'channel/contracts/exception_details' +require_relative 'channel/contracts/event_data' +require_relative 'channel/contracts/data_point' +require_relative 'channel/contracts/data_point_type' +require_relative 'channel/contracts/metric_data' +require_relative 'channel/contracts/message_data' +require_relative 'channel/contracts/stack_frame' +require_relative 'channel/contracts/request_data' +require_relative 'channel/contracts/severity_level' +require_relative 'channel/contracts/reopenings' + +module ApplicationInsights + # The telemetry client used for sending all types of telemetry. It serves as + # the main entry point for interacting with the Application Insights service. + class TelemetryClient + # Initializes a new instance of the class. + # @param [String] instrumentation_key to identify which Application Insights + # application this data is for. + # @param [Channel::TelemetryChannel] telemetry_channel the optional telemetry + # channel to be used instead of constructing a default one. + def initialize(instrumentation_key = nil, telemetry_channel = nil) + @context = Channel::TelemetryContext.new + @context.instrumentation_key = instrumentation_key + @channel = telemetry_channel || Channel::TelemetryChannel.new + end + + # The context associated with this client. All data objects created by this + # client will be accompanied by this value. + # @return [Channel::TelemetryContext] the context instance. + attr_reader :context + + # The channel associated with this telemetry client. All data created by this + # client will be passed along with the {#context} object to + # {Channel::TelemetryChannel#write} + # @return [Channel::TelemetryChannel] the channel instance. + attr_reader :channel + + # Send information about the page viewed in the application (a web page for + # instance). + # @param [String] name the name of the page that was viewed. + # @param [String] url the URL of the page that was viewed. + # @param [Hash] options the options to create the + # {Channel::Contracts::PageViewData} object. + # @option options [Fixnum] :duration the duration of the page view in + # milliseconds. (defaults to: 0) + # @option options [Hash] :properties the set of custom properties the client + # wants attached to this data item. (defaults to: {}) + # @option options [Hash] :measurements the set of custom measurements the + # client wants to attach to this data item (defaults to: {}) + def track_page_view(name, url, options={}) + data_attributes = { + :name => name || 'Null', + :url => url, + :duration => options[:duration], + :properties => options[:properties] || {}, + :measurements => options[:measurements] || {} + } + data = Channel::Contracts::PageViewData.new data_attributes + self.channel.write(data, self.context) + end + + # Send information about a single exception that occurred in the application. + # @param [Exception] exception the exception that the client wants to send. + # @param [Hash] options the options to create the + # {Channel::Contracts::ExceptionData} object. + # @option options [String] :handled_at the type of exception + # (defaults to: 'UserCode') + # @option options [Hash] :properties the set of custom properties the client + # wants attached to this data item. (defaults to: {}) + # @option options [Hash] :measurements the set of custom measurements the + # client wants to attach to this data item (defaults to: {}) + def track_exception(exception, options={}) + return unless exception.is_a? Exception + + parsed_stack = [] + if exception.backtrace + frame_pattern = /^(?.*):(?\d+)(\.|:in `((?.*)'$))/ + + exception.backtrace.each_with_index do |frame, counter| + match = frame_pattern.match frame + stack_frame = Channel::Contracts::StackFrame.new( + :assembly => 'Unknown', + :file_name => match['file'], + :level => counter, + :line => match['line'], + :method => match['method'] + ) + + parsed_stack << stack_frame + end + end + + details = Channel::Contracts::ExceptionDetails.new( + :id => 1, + :outer_id => 0, + :type_name => exception.class.name, + :message => exception.message, + :has_full_stack => exception.backtrace != nil, + :stack => (exception.backtrace.join("\n") if exception.backtrace), + :parsed_stack => parsed_stack + ) + + data = Channel::Contracts::ExceptionData.new( + :exceptions => [details], + :properties => options[:properties] || {}, + :measurements => options[:measurements] || {}, + # Must initialize handled_at after properties because it's actually stored in properties + :handled_at => options.fetch(:handled_at, 'UserCode') + ) + + self.channel.write(data, self.context) + end + + # Send information about a single event that has occurred in the context of + # the application. + # @param [String] name the data to associate to this event. + # @param [Hash] options the options to create the + # {Channel::Contracts::EventData} object. + # @option options [Hash] :properties the set of custom properties the client + # wants attached to this data item. (defaults to: {}) + # @option options [Hash] :measurements the set of custom measurements the + # client wants to attach to this data item (defaults to: {}) + def track_event(name, options={}) + data = Channel::Contracts::EventData.new( + :name => name || 'Null', + :properties => options[:properties] || {}, + :measurements => options[:measurements] || {} + ) + + self.channel.write(data, self.context) + end + + # Send information about a single metric data point that was captured for + # the application. + # @param [String] name the name of the metric that was captured. + # @param [Fixnum] value the value of the metric that was captured. + # @param [Hash] options the options to create the + # {Channel::Contracts::MetricData} object. + # @option options [Channel::Contracts::DataPointType] :type the type of the + # metric (defaults to: {Channel::Contracts::DataPointType::AGGREGATION}) + # @option options [Fixnum] :count the number of metrics that were aggregated + # into this data point (defaults to: 0) + # @option options [Fixnum] :min the minimum of all metrics collected that + # were aggregated into this data point (defaults to: 0) + # @option options [Fixnum] :max the maximum of all metrics collected that + # were aggregated into this data point (defaults to: 0) + # @option options [Fixnum] :std_dev the standard deviation of all metrics + # collected that were aggregated into this data point (defaults to: 0) + # @option options [Hash] :properties the set of custom properties the client + # wants attached to this data item. (defaults to: {}) + # @option options [Hash] :measurements the set of custom measurements the + # client wants to attach to this data item (defaults to: {}) + def track_metric(name, value, options={}) + data_point = Channel::Contracts::DataPoint.new( + :name => name || 'Null', + :value => value || 0, + :kind => options[:type] || Channel::Contracts::DataPointType::AGGREGATION, + :count => options[:count], + :min => options[:min], + :max => options[:max], + :std_dev => options[:std_dev] + ) + + data = Channel::Contracts::MetricData.new( + :metrics => [data_point], + :properties => options[:properties] || {} + ) + + self.channel.write(data, self.context) + end + + # Sends a single trace statement. + # @param [String] name the trace statement. + # @param [Channel::Contracts::SeverityLevel] severity_level the severity level. + # @param [Hash] options the options to create the + # {Channel::Contracts::EventData} object. + # @option options [Hash] :properties the set of custom properties the client + # wants attached to this data item. (defaults to: {}) + def track_trace(name, severity_level = nil, options={}) + data = Channel::Contracts::MessageData.new( + :message => name || 'Null', + :severity_level => severity_level || Channel::Contracts::SeverityLevel::INFORMATION, + :properties => options[:properties] || {} + ) + + self.channel.write(data, self.context) + end + + # Sends a single request. + # @param [String] id the unique identifier of the request. + # @param (String) start_time the start time of the request. + # @param [String] duration the duration to process the request. + # @param [String] response_code the response code of the request. + # @param [Boolean] success indicates whether the request succeeds or not. + # @param [Hash] options the options to create the + # {Channel::Contracts::RequestData} object. + # @option options [String] :name the name of the request. + # @option options [String] :http_method the http method used for the request. + # @option options [String] :url the url of the request. + # @option options [Hash] :properties the set of custom properties the client + # wants attached to this data item. (defaults to: {}) + # @option options [Hash] :measurements the set of custom measurements the + # client wants to attach to this data item (defaults to: {}) + def track_request(id, start_time, duration, response_code, success, options={}) + data = Channel::Contracts::RequestData.new( + :id => id || 'Null', + :duration => duration || '0:00:00:00.0000000', + :response_code => response_code || 200, + :success => success = nil ? true : success, + :name => options[:name], + :url => options[:url], + :properties => options[:properties] || {}, + :measurements => options[:measurements] || {}, + # Must initialize http_method after properties because it's actually stored in properties + :http_method => options[:http_method] + ) + + self.channel.write(data, self.context, start_time) + end + + # Flushes data in the queue. Data in the queue will be sent either immediately + # irrespective of what sender is being used. + def flush + self.channel.flush + end + end +end diff --git a/source/code/plugin/lib/application_insights/unhandled_exception.rb b/source/code/plugin/lib/application_insights/unhandled_exception.rb new file mode 100644 index 000000000..aa87b6f85 --- /dev/null +++ b/source/code/plugin/lib/application_insights/unhandled_exception.rb @@ -0,0 +1,49 @@ +require_relative 'telemetry_client' +require_relative 'channel/telemetry_channel' +require_relative 'channel/synchronous_queue' +require_relative 'channel/synchronous_sender' + +include ApplicationInsights + +module ApplicationInsights + module UnhandledException + @sender = nil + + # Auto collects unhandled exception and send to the Application Insights service. + # @param (string) instrumentation_key used to identify which Application + # Insights application this data is for. + # @example + # require 'application_insights' + # ApplicationInsights::UnhandledException.collect('') + # raise Exception, 'Boom!' + def self.collect(instrumentation_key) + at_exit do + # Avoid sending exception more than once if this method got invoked multiple times + send(instrumentation_key) unless @sender + end + end + + # @api private + # Send the last raised exception to the Application Insights service if + # telemetry_sender is not customized. + # @param (string) instrumentation_key used to identify which Application + # Insights application this data is for. + # @param (SenderBase) telemetry_sender used to send the last raised exception. + def self.send(instrumentation_key, telemetry_sender = nil) + if $! && !$!.is_a?(SystemExit) && !$!.is_a?(SignalException) + if telemetry_sender + @sender = telemetry_sender + elsif !@sender + # Use a synchronized sender to guarantee the data would be sent out once flush + @sender = Channel::SynchronousSender.new + end + + queue = Channel::SynchronousQueue.new @sender + channel = Channel::TelemetryChannel.new nil, queue + client = TelemetryClient.new instrumentation_key, channel + client.track_exception($!, handled_at: 'Unhandled') + client.flush + end + end + end +end diff --git a/source/code/plugin/lib/application_insights/version.rb b/source/code/plugin/lib/application_insights/version.rb new file mode 100644 index 000000000..d2d56e833 --- /dev/null +++ b/source/code/plugin/lib/application_insights/version.rb @@ -0,0 +1,3 @@ +module ApplicationInsights + VERSION = '0.5.7'.freeze +end