From cd04e84bf0e5c94d01d1562518a442ac402f1f3d Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Tue, 13 Jul 2021 10:48:25 -0600 Subject: [PATCH 01/37] Add non-test code from last serverless effort --- elasticapm/base.py | 22 ++- elasticapm/contrib/serverless/__init__.py | 41 +++++ elasticapm/contrib/serverless/aws.py | 200 ++++++++++++++++++++++ elasticapm/transport/serverless.py | 65 +++++++ 4 files changed, 327 insertions(+), 1 deletion(-) create mode 100644 elasticapm/contrib/serverless/__init__.py create mode 100644 elasticapm/contrib/serverless/aws.py create mode 100644 elasticapm/transport/serverless.py diff --git a/elasticapm/base.py b/elasticapm/base.py index 43219e04a..60b930d52 100644 --- a/elasticapm/base.py +++ b/elasticapm/base.py @@ -147,7 +147,7 @@ def __init__(self, config=None, **inline): constants.EVENTS_API_PATH, ) transport_class = import_string(self.config.transport_class) - self._transport = transport_class(self._api_endpoint_url, self, **transport_kwargs) + self._transport = transport_class(url=self._api_endpoint_url, client=self, **transport_kwargs) self.config.transport = self._transport self._thread_managers["transport"] = self._transport @@ -633,3 +633,23 @@ def set_client(client): logger = get_logger("elasticapm") logger.warning("Client object is being set more than once", stack_info=True) CLIENT_SINGLETON = client + + +class ServerlessClient(Client): + """ + Custom client for serverless applications, where we don't want any + background threads and need to dump messages to logs rather than to the + APM server directly. + """ + + def __init__(self, config=None, **inline): + inline["transport_class"] = "elasticapm.transport.serverless.ServerlessTransport" + if isinstance(config, dict) and "transport_class" in config: + config.pop("transport_class") + super(ServerlessClient, self).__init__(config, **inline) + + def start_threads(self): + """ + No background threads for serverless + """ + pass diff --git a/elasticapm/contrib/serverless/__init__.py b/elasticapm/contrib/serverless/__init__.py new file mode 100644 index 000000000..c64e5a5ee --- /dev/null +++ b/elasticapm/contrib/serverless/__init__.py @@ -0,0 +1,41 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import os + +# Future providers such as GCP and Azure will be added to this if/elif block +# This way you can use the same syntax for each of the providers from a user +# perspective +if os.environ.get("AWS_REGION"): + from elasticapm.contrib.serverless.aws import capture_serverless +else: + from elasticapm.contrib.serverless.aws import capture_serverless + +__all__ = ("capture_serverless",) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py new file mode 100644 index 000000000..6cc0d1b86 --- /dev/null +++ b/elasticapm/contrib/serverless/aws.py @@ -0,0 +1,200 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import base64 +import functools +import json +import os + +import elasticapm +from elasticapm.base import ServerlessClient +from elasticapm.conf import constants +from elasticapm.utils import compat, encoding, get_name_from_func +from elasticapm.utils.disttracing import TraceParent + + +class capture_serverless(object): + """ + Context manager and decorator designed for instrumenting serverless + functions. + + Uses a logging-only version of the transport, and no background threads. + Begins and ends a single transaction. + """ + + def __init__(self, **kwargs): + self.name = kwargs.get("name") + self.event = {} + self.context = {} + self.response = None + + if "framework_name" not in kwargs: + kwargs["framework_name"] = os.environ.get("AWS_EXECUTION_ENV", "AWS_Lambda_python") + + self.client = ServerlessClient(**kwargs) + if not self.client.config.debug and self.client.config.instrument: + elasticapm.instrument() + + def __call__(self, func): + self.name = self.name or get_name_from_func(func) + + @functools.wraps(func) + def decorated(*args, **kwds): + if len(args) == 2: + # Saving these for request context later + self.event, self.context = args + else: + self.event, self.context = {}, {} + if not self.client.config.debug and self.client.config.instrument: + with self: + self.response = func(*args, **kwds) + return self.response + else: + return func(*args, **kwds) + + return decorated + + def __enter__(self): + """ + Transaction setup + """ + trace_parent = TraceParent.from_headers(self.event.get("headers", {})) + if "httpMethod" in self.event: + self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) + elasticapm.set_context( + lambda: get_data_from_request( + self.event, + capture_body=self.client.config.capture_body in ("transactions", "all"), + capture_headers=self.client.config.capture_headers, + ), + "request", + ) + if os.environ.get("AWS_LAMBDA_FUNCTION_NAME"): + elasticapm.set_transaction_name( + "{} {}".format(self.event["httpMethod"], os.environ["AWS_LAMBDA_FUNCTION_NAME"]) + ) + else: + elasticapm.set_transaction_name(self.name, override=False) + else: + self.transaction = self.client.begin_transaction("function", trace_parent=trace_parent) + elasticapm.set_transaction_name(os.environ.get("AWS_LAMBDA_FUNCTION_NAME", self.name), override=False) + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Transaction teardown + """ + if exc_val: + self.client.capture_exception(exc_info=(exc_type, exc_val, exc_tb), handled=False) + + if self.response and isinstance(self.response, dict): + elasticapm.set_context( + lambda: get_data_from_response(self.response, capture_headers=self.client.config.capture_headers), + "response", + ) + if "statusCode" in self.response: + result = "HTTP {}xx".format(int(self.response["statusCode"]) // 100) + elasticapm.set_transaction_result(result, override=False) + self.client.end_transaction() + + +def get_data_from_request(event, capture_body=False, capture_headers=True): + """ + Capture context data from API gateway event + """ + result = {} + if capture_headers and "headers" in event: + result["headers"] = event["headers"] + if "httpMethod" not in event: + # Not API Gateway + return result + + result["method"] = event["httpMethod"] + if event["httpMethod"] in constants.HTTP_WITH_BODY and "body" in event: + body = event["body"] + if capture_body: + if event.get("isBase64Encoded"): + body = base64.b64decode(body) + else: + try: + jsonbody = json.loads(body) + body = jsonbody + except Exception: + pass + + if body is not None: + result["body"] = body if capture_body else "[REDACTED]" + + result["url"] = get_url_dict(event) + return result + + +def get_data_from_response(response, capture_headers=True): + """ + Capture response data from lambda return + """ + result = {} + + if "statusCode" in response: + result["status_code"] = response["statusCode"] + + if capture_headers and "headers" in response: + result["headers"] = response["headers"] + return result + + +def get_url_dict(event): + """ + Reconstruct URL from API Gateway + """ + headers = event.get("headers", {}) + proto = headers.get("X-Forwarded-Proto", "https") + host = headers.get("Host", "") + path = event.get("path", "") + port = headers.get("X-Forwarded-Port") + stage = "/" + event.get("requestContext", {}).get("stage", "") + query = "" + if event.get("queryStringParameters"): + query = "?" + for k, v in compat.iteritems(event["queryStringParameters"]): + query += "{}={}".format(k, v) + url = proto + "://" + host + stage + path + query + + url_dict = { + "full": encoding.keyword_field(url), + "protocol": proto, + "hostname": encoding.keyword_field(host), + "pathname": encoding.keyword_field(stage + path), + } + + if port: + url_dict["port"] = port + if query: + url_dict["search"] = encoding.keyword_field(query) + return url_dict diff --git a/elasticapm/transport/serverless.py b/elasticapm/transport/serverless.py new file mode 100644 index 000000000..10bb9ec54 --- /dev/null +++ b/elasticapm/transport/serverless.py @@ -0,0 +1,65 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAG + +from __future__ import print_function + +import json + +from elasticapm.transport.base import Transport + + +class ServerlessTransport(Transport): + """ + Transport class for use in serverless environments. + No background threads are used, and the queue() function is overridden + such that it will log the JSON object for the event rather than sending + it anywhere. + """ + + def __init__(self, *args, **kwargs): + super(ServerlessTransport, self).__init__(*args, **kwargs) + self.printed_metadata = False + + def start_thread(self): + """ + No background threads are needed, as we have no queueing needs + """ + pass + + def queue(self, event_type, data, flush=False): + """ + Rather than queueing the data, just dump it to a log immediately + """ + # This does use processors right now, and we're not in a background + # thread. We could cause blocking. + if not self.printed_metadata: + self.printed_metadata = True + print("ELASTICAPM_METADATA " + json.dumps(self._metadata)) + print("ELASTICAPM " + json.dumps(self._process_event(event_type, data))) From b59194e63261ea480530e71d55830044ce2d0618 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Tue, 13 Jul 2021 11:40:59 -0600 Subject: [PATCH 02/37] Fix lambda support to send to APM Server directly * Add disable_metrics_thread conf * Only use the transport background thread * Singleton client for the whole lambda file * Flush after transaction --- elasticapm/base.py | 23 +--------- elasticapm/conf/__init__.py | 1 + elasticapm/contrib/serverless/aws.py | 29 +++++++++---- elasticapm/transport/serverless.py | 65 ---------------------------- 4 files changed, 24 insertions(+), 94 deletions(-) delete mode 100644 elasticapm/transport/serverless.py diff --git a/elasticapm/base.py b/elasticapm/base.py index 60b930d52..8b45118a7 100644 --- a/elasticapm/base.py +++ b/elasticapm/base.py @@ -197,7 +197,8 @@ def __init__(self, config=None, **inline): self._metrics.register("elasticapm.metrics.sets.breakdown.BreakdownMetricSet") if self.config.prometheus_metrics: self._metrics.register("elasticapm.metrics.sets.prometheus.PrometheusMetrics") - self._thread_managers["metrics"] = self._metrics + if self.config.disable_metrics_thread: + self._thread_managers["metrics"] = self._metrics compat.atexit_register(self.close) if self.config.central_config: self._thread_managers["config"] = self.config @@ -633,23 +634,3 @@ def set_client(client): logger = get_logger("elasticapm") logger.warning("Client object is being set more than once", stack_info=True) CLIENT_SINGLETON = client - - -class ServerlessClient(Client): - """ - Custom client for serverless applications, where we don't want any - background threads and need to dump messages to logs rather than to the - APM server directly. - """ - - def __init__(self, config=None, **inline): - inline["transport_class"] = "elasticapm.transport.serverless.ServerlessTransport" - if isinstance(config, dict) and "transport_class" in config: - config.pop("transport_class") - super(ServerlessClient, self).__init__(config, **inline) - - def start_threads(self): - """ - No background threads for serverless - """ - pass diff --git a/elasticapm/conf/__init__.py b/elasticapm/conf/__init__.py index 433368ccb..d00f8d60c 100644 --- a/elasticapm/conf/__init__.py +++ b/elasticapm/conf/__init__.py @@ -563,6 +563,7 @@ class Config(_ConfigBase): prometheus_metrics = _BoolConfigValue("PROMETHEUS_METRICS", default=False) prometheus_metrics_prefix = _ConfigValue("PROMETHEUS_METRICS_PREFIX", default="prometheus.metrics.") disable_metrics = _ListConfigValue("DISABLE_METRICS", type=starmatch_to_regex, default=[]) + disable_metrics_thread = _BoolConfigValue("DISABLE_METRICS_THREAD", default=False) central_config = _BoolConfigValue("CENTRAL_CONFIG", default=True) api_request_size = _ConfigValue("API_REQUEST_SIZE", type=int, validators=[size_validator], default=768 * 1024) api_request_time = _ConfigValue("API_REQUEST_TIME", type=int, validators=[duration_validator], default=10 * 1000) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 6cc0d1b86..75c675f30 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -34,10 +34,13 @@ import os import elasticapm -from elasticapm.base import ServerlessClient +from elasticapm.base import Client, get_client from elasticapm.conf import constants from elasticapm.utils import compat, encoding, get_name_from_func from elasticapm.utils.disttracing import TraceParent +from elasticapm.utils.logging import get_logger + +logger = get_logger("elasticapm.serverless") class capture_serverless(object): @@ -45,21 +48,27 @@ class capture_serverless(object): Context manager and decorator designed for instrumenting serverless functions. - Uses a logging-only version of the transport, and no background threads. - Begins and ends a single transaction. + Begins and ends a single transaction, waiting for the transport to flush + before returning from the wrapped function """ - def __init__(self, **kwargs): - self.name = kwargs.get("name") + def __init__(self, name=None, **kwargs): + self.name = name self.event = {} self.context = {} self.response = None + # Disable all background threads except for transport + kwargs["disable_metrics_thread"] = True + kwargs["central_config"] = False + if "framework_name" not in kwargs: kwargs["framework_name"] = os.environ.get("AWS_EXECUTION_ENV", "AWS_Lambda_python") - self.client = ServerlessClient(**kwargs) - if not self.client.config.debug and self.client.config.instrument: + self.client = get_client() + if not self.client: + self.client = Client(**kwargs) + if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled: elasticapm.instrument() def __call__(self, func): @@ -72,7 +81,7 @@ def decorated(*args, **kwds): self.event, self.context = args else: self.event, self.context = {}, {} - if not self.client.config.debug and self.client.config.instrument: + if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled: with self: self.response = func(*args, **kwds) return self.response @@ -122,6 +131,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): result = "HTTP {}xx".format(int(self.response["statusCode"]) // 100) elasticapm.set_transaction_result(result, override=False) self.client.end_transaction() + try: + self.client._transport.flush() + except ValueError: + logger.warning("flush timed out") def get_data_from_request(event, capture_body=False, capture_headers=True): diff --git a/elasticapm/transport/serverless.py b/elasticapm/transport/serverless.py deleted file mode 100644 index 10bb9ec54..000000000 --- a/elasticapm/transport/serverless.py +++ /dev/null @@ -1,65 +0,0 @@ -# BSD 3-Clause License -# -# Copyright (c) 2019, Elasticsearch BV -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# * Neither the name of the copyright holder nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAG - -from __future__ import print_function - -import json - -from elasticapm.transport.base import Transport - - -class ServerlessTransport(Transport): - """ - Transport class for use in serverless environments. - No background threads are used, and the queue() function is overridden - such that it will log the JSON object for the event rather than sending - it anywhere. - """ - - def __init__(self, *args, **kwargs): - super(ServerlessTransport, self).__init__(*args, **kwargs) - self.printed_metadata = False - - def start_thread(self): - """ - No background threads are needed, as we have no queueing needs - """ - pass - - def queue(self, event_type, data, flush=False): - """ - Rather than queueing the data, just dump it to a log immediately - """ - # This does use processors right now, and we're not in a background - # thread. We could cause blocking. - if not self.printed_metadata: - self.printed_metadata = True - print("ELASTICAPM_METADATA " + json.dumps(self._metadata)) - print("ELASTICAPM " + json.dumps(self._process_event(event_type, data))) From ed5b6e473b88953e1b7e73455714fe67c1da1206 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Wed, 14 Jul 2021 09:09:54 -0600 Subject: [PATCH 03/37] Bad boolean --- elasticapm/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticapm/base.py b/elasticapm/base.py index 8b45118a7..06702006f 100644 --- a/elasticapm/base.py +++ b/elasticapm/base.py @@ -197,7 +197,7 @@ def __init__(self, config=None, **inline): self._metrics.register("elasticapm.metrics.sets.breakdown.BreakdownMetricSet") if self.config.prometheus_metrics: self._metrics.register("elasticapm.metrics.sets.prometheus.PrometheusMetrics") - if self.config.disable_metrics_thread: + if not self.config.disable_metrics_thread: self._thread_managers["metrics"] = self._metrics compat.atexit_register(self.close) if self.config.central_config: From 2c013f540fdb170fad1102c4a4c175922cc59ceb Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Thu, 15 Jul 2021 10:52:13 -0600 Subject: [PATCH 04/37] Don't collect cloud metadata --- elasticapm/contrib/serverless/aws.py | 1 + 1 file changed, 1 insertion(+) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 75c675f30..7ee48f581 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -61,6 +61,7 @@ def __init__(self, name=None, **kwargs): # Disable all background threads except for transport kwargs["disable_metrics_thread"] = True kwargs["central_config"] = False + kwargs["cloud_provider"] = "none" if "framework_name" not in kwargs: kwargs["framework_name"] = os.environ.get("AWS_EXECUTION_ENV", "AWS_Lambda_python") From 5f54f41e176b0b9fddc2ecbbb44d3a5492c813b1 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Thu, 15 Jul 2021 11:34:36 -0600 Subject: [PATCH 05/37] Start transaction from api gateway timestamp --- elasticapm/contrib/serverless/aws.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 7ee48f581..89d04e42b 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -42,6 +42,8 @@ logger = get_logger("elasticapm.serverless") +COLD_START = True + class capture_serverless(object): """ @@ -84,7 +86,8 @@ def decorated(*args, **kwds): self.event, self.context = {}, {} if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled: with self: - self.response = func(*args, **kwds) + with elasticapm.capture_span(self.name): + self.response = func(*args, **kwds) return self.response else: return func(*args, **kwds) @@ -97,7 +100,15 @@ def __enter__(self): """ trace_parent = TraceParent.from_headers(self.event.get("headers", {})) if "httpMethod" in self.event: - self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) + global COLD_START + if COLD_START: + # TODO + COLD_START = False + start_time = self.event.get("requestContext", {}).get("requestTimeEpoch") + if start_time: + self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent, start=start_time) + else: + self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) elasticapm.set_context( lambda: get_data_from_request( self.event, From 7f26df59d189f244d9aafe8f38995a5dc9758ccb Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Thu, 15 Jul 2021 11:44:15 -0600 Subject: [PATCH 06/37] start= needs duration= at the end --- elasticapm/contrib/serverless/aws.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 89d04e42b..88ed815eb 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -32,6 +32,7 @@ import functools import json import os +import time import elasticapm from elasticapm.base import Client, get_client @@ -104,9 +105,11 @@ def __enter__(self): if COLD_START: # TODO COLD_START = False - start_time = self.event.get("requestContext", {}).get("requestTimeEpoch") - if start_time: - self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent, start=start_time) + self.start_time = self.event.get("requestContext", {}).get("requestTimeEpoch") + if self.start_time: + self.transaction = self.client.begin_transaction( + "request", trace_parent=trace_parent, start=self.start_time + ) else: self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) elasticapm.set_context( @@ -142,7 +145,12 @@ def __exit__(self, exc_type, exc_val, exc_tb): if "statusCode" in self.response: result = "HTTP {}xx".format(int(self.response["statusCode"]) // 100) elasticapm.set_transaction_result(result, override=False) - self.client.end_transaction() + + if self.start_time: + self.client.end_transaction(duration=time.time() - self.start_time) + else: + self.client.end_transaction() + try: self.client._transport.flush() except ValueError: From f97d2f2308ba64d9957c0f91e4cd362617bb3dd3 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Fri, 16 Jul 2021 10:37:38 -0600 Subject: [PATCH 07/37] start_time is in milliseconds --- elasticapm/contrib/serverless/aws.py | 1 + 1 file changed, 1 insertion(+) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 88ed815eb..e3d3d0b1a 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -107,6 +107,7 @@ def __enter__(self): COLD_START = False self.start_time = self.event.get("requestContext", {}).get("requestTimeEpoch") if self.start_time: + self.start_time = float(self.start_time) * 0.001 self.transaction = self.client.begin_transaction( "request", trace_parent=trace_parent, start=self.start_time ) From 84ead98b7ee88a5699db57ba06fa002e303c6231 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Fri, 16 Jul 2021 10:55:56 -0600 Subject: [PATCH 08/37] Allow for start_time without duration for transactions --- elasticapm/contrib/serverless/aws.py | 6 +----- elasticapm/traces.py | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index e3d3d0b1a..1f91a4d6a 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -32,7 +32,6 @@ import functools import json import os -import time import elasticapm from elasticapm.base import Client, get_client @@ -147,10 +146,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): result = "HTTP {}xx".format(int(self.response["statusCode"]) // 100) elasticapm.set_transaction_result(result, override=False) - if self.start_time: - self.client.end_transaction(duration=time.time() - self.start_time) - else: - self.client.end_transaction() + self.client.end_transaction() try: self.client._transport.flush() diff --git a/elasticapm/traces.py b/elasticapm/traces.py index 8cce0eee8..fcab09000 100644 --- a/elasticapm/traces.py +++ b/elasticapm/traces.py @@ -132,7 +132,7 @@ def __init__( self.id = self.get_dist_tracing_id() self.trace_parent = trace_parent if start: - self.timestamp = self.start_time = start + self.timestamp, self.start_time = start, _time_func() else: self.timestamp, self.start_time = time.time(), _time_func() self.name = None From b5ea23bae0fb9c9a48eec2328e3dfdbe33c8fe4f Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Fri, 16 Jul 2021 10:59:27 -0600 Subject: [PATCH 09/37] Revert "Allow for start_time without duration for transactions" This reverts commit 84ead98b7ee88a5699db57ba06fa002e303c6231. --- elasticapm/contrib/serverless/aws.py | 6 +++++- elasticapm/traces.py | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 1f91a4d6a..e3d3d0b1a 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -32,6 +32,7 @@ import functools import json import os +import time import elasticapm from elasticapm.base import Client, get_client @@ -146,7 +147,10 @@ def __exit__(self, exc_type, exc_val, exc_tb): result = "HTTP {}xx".format(int(self.response["statusCode"]) // 100) elasticapm.set_transaction_result(result, override=False) - self.client.end_transaction() + if self.start_time: + self.client.end_transaction(duration=time.time() - self.start_time) + else: + self.client.end_transaction() try: self.client._transport.flush() diff --git a/elasticapm/traces.py b/elasticapm/traces.py index fcab09000..8cce0eee8 100644 --- a/elasticapm/traces.py +++ b/elasticapm/traces.py @@ -132,7 +132,7 @@ def __init__( self.id = self.get_dist_tracing_id() self.trace_parent = trace_parent if start: - self.timestamp, self.start_time = start, _time_func() + self.timestamp = self.start_time = start else: self.timestamp, self.start_time = time.time(), _time_func() self.name = None From ba663e11c450bc607d7237e274f7ac494175144f Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Mon, 19 Jul 2021 11:34:02 -0600 Subject: [PATCH 10/37] Add elasticapm.utils.time.time_to_perf_counter() --- elasticapm/contrib/serverless/aws.py | 6 +-- elasticapm/traces.py | 21 +++++++- elasticapm/utils/time.py | 81 ++++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 6 deletions(-) create mode 100644 elasticapm/utils/time.py diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index e3d3d0b1a..1f91a4d6a 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -32,7 +32,6 @@ import functools import json import os -import time import elasticapm from elasticapm.base import Client, get_client @@ -147,10 +146,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): result = "HTTP {}xx".format(int(self.response["statusCode"]) // 100) elasticapm.set_transaction_result(result, override=False) - if self.start_time: - self.client.end_transaction(duration=time.time() - self.start_time) - else: - self.client.end_transaction() + self.client.end_transaction() try: self.client._transport.flush() diff --git a/elasticapm/traces.py b/elasticapm/traces.py index 8cce0eee8..10c5e0a1a 100644 --- a/elasticapm/traces.py +++ b/elasticapm/traces.py @@ -43,6 +43,7 @@ from elasticapm.utils import compat, encoding, get_name_from_func from elasticapm.utils.disttracing import TraceParent, TracingOptions from elasticapm.utils.logging import get_logger +from elasticapm.utils.time import time_to_perf_counter __all__ = ("capture_span", "label", "set_transaction_name", "set_custom_context", "set_user_context") @@ -129,10 +130,28 @@ class Transaction(BaseSpan): def __init__( self, tracer, transaction_type="custom", trace_parent=None, is_sampled=True, start=None, sample_rate=None ): + """ + tracer + Tracer object + transaction_type + Transaction type + trace_parent + TraceParent object representing the parent trace and trace state + is_sampled + Whether or not this transaction is sampled + start + Optional start timestamp. This is expected to be an epoch timestamp + in seconds (such as from `time.time()`). If it is not, it's recommended + that a `duration` is passed into the `end()` method. + sample_rate + Sample rate which was used to decide whether to sample this transaction. + This is reported to the APM server so that unsampled transactions can + be extrapolated. + """ self.id = self.get_dist_tracing_id() self.trace_parent = trace_parent if start: - self.timestamp = self.start_time = start + self.timestamp, self.start_time = start, time_to_perf_counter(start) else: self.timestamp, self.start_time = time.time(), _time_func() self.name = None diff --git a/elasticapm/utils/time.py b/elasticapm/utils/time.py new file mode 100644 index 000000000..fdfdfeffb --- /dev/null +++ b/elasticapm/utils/time.py @@ -0,0 +1,81 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import sys +import time + +CLOCK_DIFF = None + + +def time_to_perf_counter(timestamp): + """ + This function converts a given epoch timestamp in seconds (typically from + `time.time()`) to the "equivalent" result from `time.perf_counter()`. + + Note that because these functions vary in their resolution and tick rate, + this is only a close approximation. + + Note also that because `time.time()` is *usually* monotonic (but not + guaranteed), if a system clock is changed, this function could become + very inaccurate. + """ + global CLOCK_DIFF + if CLOCK_DIFF is None: + _calculate_clock_diff() + + return timestamp + CLOCK_DIFF + + +def _calculate_clock_diff(): + """ + Calculate the difference between `time.perf_counter()` and `time.time()` + + Uses multiple measurements to try to minimize the tolerance in the + measurements. + + The resulting CLOCK_DIFF can be added to any `time.time()` result to get the + approximate equivalent `time.perf_counter()` + """ + global CLOCK_DIFF + best_tolerance = sys.float_info.max + for _ in range(10): + time1 = time.time() + perftime = time.perf_counter() + time2 = time.time() + + tolerance = (time2 - time1) / 2 + timetime = time1 + tolerance + + if tolerance < best_tolerance: + best_tolerance = tolerance + CLOCK_DIFF = perftime - timetime + + if tolerance < 0.00001: # try to get the two time.time() calls within 20 microseconds + break From c72160a4713a300fca307c75e0c5cf3956835140 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Wed, 21 Jul 2021 11:25:57 -0600 Subject: [PATCH 11/37] Add default context fields plus faas context for API gateway --- elasticapm/contrib/serverless/aws.py | 59 +++++++++++++++++++++++----- 1 file changed, 50 insertions(+), 9 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 1f91a4d6a..225b47677 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -64,9 +64,7 @@ def __init__(self, name=None, **kwargs): kwargs["disable_metrics_thread"] = True kwargs["central_config"] = False kwargs["cloud_provider"] = "none" - - if "framework_name" not in kwargs: - kwargs["framework_name"] = os.environ.get("AWS_EXECUTION_ENV", "AWS_Lambda_python") + kwargs["framework_name"] = "AWS Lambda" self.client = get_client() if not self.client: @@ -99,12 +97,13 @@ def __enter__(self): Transaction setup """ trace_parent = TraceParent.from_headers(self.event.get("headers", {})) + + global COLD_START + cold_start = COLD_START + COLD_START = False + if "httpMethod" in self.event: - global COLD_START - if COLD_START: - # TODO - COLD_START = False - self.start_time = self.event.get("requestContext", {}).get("requestTimeEpoch") + self.start_time = self.event["requestContext"].get("requestTimeEpoch") if self.start_time: self.start_time = float(self.start_time) * 0.001 self.transaction = self.client.begin_transaction( @@ -127,9 +126,22 @@ def __enter__(self): else: elasticapm.set_transaction_name(self.name, override=False) else: - self.transaction = self.client.begin_transaction("function", trace_parent=trace_parent) + self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) elasticapm.set_transaction_name(os.environ.get("AWS_LAMBDA_FUNCTION_NAME", self.name), override=False) + elasticapm.set_context( + lambda: get_faas_data( + self.event, + self.context, + cold_start, + ), + "faas", + ) + elasticapm.set_context({"runtime": {"name": os.environ.get("AWS_EXECUTION_ENV")}}, "service") + elasticapm.set_context( + {"provider": "aws", "region": os.environ.get("AWS_REGION"), "service": {"name": "lambda"}}, "cloud" + ) + def __exit__(self, exc_type, exc_val, exc_tb): """ Transaction teardown @@ -228,3 +240,32 @@ def get_url_dict(event): if query: url_dict["search"] = encoding.keyword_field(query) return url_dict + + +def get_faas_data(event, context, coldstart): + """ + Compile the faas context using the event and context + """ + faas = {} + faas["coldstart"] = coldstart + faas["id"] = context.invokedFunctionArn # TODO remove alias suffix + faas["execution"] = context.awsRequestId + faas["name"] = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") + faas["version"] = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION") + faas["instance"] = {"id": os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")} # TODO double check in final spec + + faas["trigger"] = {} + faas["trigger"]["type"] = "other" + + # Trigger type + if "httpMethod" in event: + faas["trigger"]["type"] = "http" + faas["trigger"]["id"] = event["requestContext"]["apiId"] + faas["trigger"]["name"] = "{} {}/{}".format( + event["httpMethod"], event["requestContext"]["resourcePath"], event["requestContext"]["stage"] + ) + faas["trigger"]["account"] = {"id": event["requestContext"]["accountId"]} + faas["trigger"]["version"] = "2.0" if event["requestContext"].get("requestTimeEpoch") else "1.0" + faas["trigger"]["request_id"] = event["requestContext"]["requestId"] + + return faas From b4a088599a6e54640e10a806ee5121a8ca9df415 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Wed, 21 Jul 2021 11:37:23 -0600 Subject: [PATCH 12/37] Fix context references --- elasticapm/contrib/serverless/aws.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 225b47677..25a25b49a 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -248,8 +248,8 @@ def get_faas_data(event, context, coldstart): """ faas = {} faas["coldstart"] = coldstart - faas["id"] = context.invokedFunctionArn # TODO remove alias suffix - faas["execution"] = context.awsRequestId + faas["id"] = context.invoked_function_arn # TODO remove alias suffix + faas["execution"] = context.aws_request_id faas["name"] = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") faas["version"] = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION") faas["instance"] = {"id": os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")} # TODO double check in final spec @@ -267,5 +267,6 @@ def get_faas_data(event, context, coldstart): faas["trigger"]["account"] = {"id": event["requestContext"]["accountId"]} faas["trigger"]["version"] = "2.0" if event["requestContext"].get("requestTimeEpoch") else "1.0" faas["trigger"]["request_id"] = event["requestContext"]["requestId"] + # TODO sns/sqs/s3 return faas From 84c53fd0a35e5085cd8b1829dec8dcfcbf3ed72e Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Thu, 22 Jul 2021 10:26:05 -0600 Subject: [PATCH 13/37] Use metrics_interval=0 for disabling metrics thread --- .pre-commit-config.yaml | 2 +- elasticapm/__init__.py | 1 + elasticapm/base.py | 2 +- elasticapm/conf/__init__.py | 1 - elasticapm/contrib/serverless/aws.py | 2 +- 5 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8fac11f5b..3978efede 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,6 +19,6 @@ repos: name: License header check description: Checks the existance of license headers in all Python files entry: ./tests/scripts/license_headers_check.sh - exclude: "(elasticapm/utils/wrapt/.*|tests/utils/stacks/linenos.py|tests/utils/stacks/linenos2.py)" + exclude: "(elasticapm/utils/wrapt/.*|tests/utils/stacks/linenos.py|tests/utils/stacks/linenos2.py|tests/contrib/serverless/.*json)" language: script types: [python] diff --git a/elasticapm/__init__.py b/elasticapm/__init__.py index 42bbca423..d621a651b 100644 --- a/elasticapm/__init__.py +++ b/elasticapm/__init__.py @@ -31,6 +31,7 @@ from elasticapm.base import Client, get_client # noqa: F401 from elasticapm.conf import setup_logging # noqa: F401 +from elasticapm.contrib.serverless import capture_serverless # noqa: F401 from elasticapm.instrumentation.control import instrument, uninstrument # noqa: F401 from elasticapm.traces import ( # noqa: F401 capture_span, diff --git a/elasticapm/base.py b/elasticapm/base.py index 06702006f..ec1221345 100644 --- a/elasticapm/base.py +++ b/elasticapm/base.py @@ -197,7 +197,7 @@ def __init__(self, config=None, **inline): self._metrics.register("elasticapm.metrics.sets.breakdown.BreakdownMetricSet") if self.config.prometheus_metrics: self._metrics.register("elasticapm.metrics.sets.prometheus.PrometheusMetrics") - if not self.config.disable_metrics_thread: + if self.config.metrics_interval: self._thread_managers["metrics"] = self._metrics compat.atexit_register(self.close) if self.config.central_config: diff --git a/elasticapm/conf/__init__.py b/elasticapm/conf/__init__.py index d00f8d60c..433368ccb 100644 --- a/elasticapm/conf/__init__.py +++ b/elasticapm/conf/__init__.py @@ -563,7 +563,6 @@ class Config(_ConfigBase): prometheus_metrics = _BoolConfigValue("PROMETHEUS_METRICS", default=False) prometheus_metrics_prefix = _ConfigValue("PROMETHEUS_METRICS_PREFIX", default="prometheus.metrics.") disable_metrics = _ListConfigValue("DISABLE_METRICS", type=starmatch_to_regex, default=[]) - disable_metrics_thread = _BoolConfigValue("DISABLE_METRICS_THREAD", default=False) central_config = _BoolConfigValue("CENTRAL_CONFIG", default=True) api_request_size = _ConfigValue("API_REQUEST_SIZE", type=int, validators=[size_validator], default=768 * 1024) api_request_time = _ConfigValue("API_REQUEST_TIME", type=int, validators=[duration_validator], default=10 * 1000) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 25a25b49a..20db1e135 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -61,7 +61,7 @@ def __init__(self, name=None, **kwargs): self.response = None # Disable all background threads except for transport - kwargs["disable_metrics_thread"] = True + kwargs["metrics_interval"] = 0 kwargs["central_config"] = False kwargs["cloud_provider"] = "none" kwargs["framework_name"] = "AWS Lambda" From 6111989e67578d9fdc92a956705c000aef1fa2b9 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Thu, 22 Jul 2021 10:41:53 -0600 Subject: [PATCH 14/37] Add unit test for aws serverless from API gateway --- elasticapm/contrib/serverless/aws.py | 2 + tests/contrib/serverless/__init__.py | 29 +++++ tests/contrib/serverless/aws_test_data.json | 117 ++++++++++++++++++++ tests/contrib/serverless/aws_tests.py | 116 +++++++++++++++++++ 4 files changed, 264 insertions(+) create mode 100644 tests/contrib/serverless/__init__.py create mode 100644 tests/contrib/serverless/aws_test_data.json create mode 100644 tests/contrib/serverless/aws_tests.py diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 20db1e135..ef14fbd30 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -65,6 +65,8 @@ def __init__(self, name=None, **kwargs): kwargs["central_config"] = False kwargs["cloud_provider"] = "none" kwargs["framework_name"] = "AWS Lambda" + if "service_name" not in kwargs: + kwargs["service_name"] = os.environ["AWS_LAMBDA_FUNCTION_NAME"] self.client = get_client() if not self.client: diff --git a/tests/contrib/serverless/__init__.py b/tests/contrib/serverless/__init__.py new file mode 100644 index 000000000..7e2b340e6 --- /dev/null +++ b/tests/contrib/serverless/__init__.py @@ -0,0 +1,29 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/tests/contrib/serverless/aws_test_data.json b/tests/contrib/serverless/aws_test_data.json new file mode 100644 index 000000000..f66f82491 --- /dev/null +++ b/tests/contrib/serverless/aws_test_data.json @@ -0,0 +1,117 @@ +{ + "resource": "/fetch_all", + "path": "/fetch_all", + "httpMethod": "GET", + "headers": { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", + "Accept-Encoding": "gzip, deflate, br", + "Accept-Language": "en-US,en;q=0.5", + "CloudFront-Forwarded-Proto": "https", + "CloudFront-Is-Desktop-Viewer": "true", + "CloudFront-Is-Mobile-Viewer": "false", + "CloudFront-Is-SmartTV-Viewer": "false", + "CloudFront-Is-Tablet-Viewer": "false", + "CloudFront-Viewer-Country": "US", + "Host": "02plqthge2.execute-api.us-east-1.amazonaws.com", + "upgrade-insecure-requests": "1", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:72.0) Gecko/20100101 Firefox/72.0", + "Via": "2.0 969f35f01b6eddd92239a3e818fc1e0d.cloudfront.net (CloudFront)", + "X-Amz-Cf-Id": "eDbpfDwO-CRYymEFLkW6CBCsU_H_PS8R93_us53QWvXWLS45v3NvQw==", + "X-Amzn-Trace-Id": "Root=1-5e502af4-fd0c1c6fdc164e1d6361183b", + "X-Forwarded-For": "76.76.241.57, 52.46.47.139", + "X-Forwarded-Port": "443", + "X-Forwarded-Proto": "https" + }, + "multiValueHeaders": { + "Accept": [ + "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8" + ], + "Accept-Encoding": [ + "gzip, deflate, br" + ], + "Accept-Language": [ + "en-US,en;q=0.5" + ], + "CloudFront-Forwarded-Proto": [ + "https" + ], + "CloudFront-Is-Desktop-Viewer": [ + "true" + ], + "CloudFront-Is-Mobile-Viewer": [ + "false" + ], + "CloudFront-Is-SmartTV-Viewer": [ + "false" + ], + "CloudFront-Is-Tablet-Viewer": [ + "false" + ], + "CloudFront-Viewer-Country": [ + "US" + ], + "Host": [ + "02plqthge2.execute-api.us-east-1.amazonaws.com" + ], + "upgrade-insecure-requests": [ + "1" + ], + "User-Agent": [ + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:72.0) Gecko/20100101 Firefox/72.0" + ], + "Via": [ + "2.0 969f35f01b6eddd92239a3e818fc1e0d.cloudfront.net (CloudFront)" + ], + "X-Amz-Cf-Id": [ + "eDbpfDwO-CRYymEFLkW6CBCsU_H_PS8R93_us53QWvXWLS45v3NvQw==" + ], + "X-Amzn-Trace-Id": [ + "Root=1-5e502af4-fd0c1c6fdc164e1d6361183b" + ], + "X-Forwarded-For": [ + "76.76.241.57, 52.46.47.139" + ], + "X-Forwarded-Port": [ + "443" + ], + "X-Forwarded-Proto": [ + "https" + ] + }, + "queryStringParameters": null, + "multiValueQueryStringParameters": null, + "pathParameters": null, + "stageVariables": null, + "requestContext": { + "resourceId": "y3tkf7", + "resourcePath": "/fetch_all", + "httpMethod": "GET", + "extendedRequestId": "IQumRELJIAMF6fQ=", + "requestTime": "21/Feb/2020:19:09:40 +0000", + "path": "/dev/fetch_all", + "accountId": "571481734049", + "protocol": "HTTP/1.1", + "stage": "dev", + "domainPrefix": "02plqthge2", + "requestTimeEpoch": 1582312180890, + "requestId": "6f3dffca-46f8-4c8b-800b-6bc1ea2554ec", + "identity": { + "cognitoIdentityPoolId": null, + "accountId": null, + "cognitoIdentityId": null, + "caller": null, + "sourceIp": "76.76.241.57", + "principalOrgId": null, + "accessKey": null, + "cognitoAuthenticationType": null, + "cognitoAuthenticationProvider": null, + "userArn": null, + "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:72.0) Gecko/20100101 Firefox/72.0", + "user": null + }, + "domainName": "02plqthge2.execute-api.us-east-1.amazonaws.com", + "apiId": "02plqthge2" + }, + "body": null, + "isBase64Encoded": false +} \ No newline at end of file diff --git a/tests/contrib/serverless/aws_tests.py b/tests/contrib/serverless/aws_tests.py new file mode 100644 index 000000000..04635a868 --- /dev/null +++ b/tests/contrib/serverless/aws_tests.py @@ -0,0 +1,116 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import pytest # isort:skip + +import json +import os + +from elasticapm.conf import constants +from elasticapm.contrib.serverless.aws import capture_serverless, get_data_from_request, get_data_from_response + + +@pytest.fixture +def event(): + aws_data_file = os.path.join(os.path.dirname(__file__), "aws_test_data.json") + with open(aws_data_file) as f: + return json.load(f) + + +@pytest.fixture +def context(): + return SampleContext() + + +class SampleContext: + """ + Stand-in for AWS lambda context object + """ + + def __init__(self): + self.invoked_function_arn = "arrrrrn" + self.aws_request_id = "12345" + + +def test_request_data(event): + data = get_data_from_request(event, capture_body=True, capture_headers=True) + + assert data["method"] == "GET" + assert data["url"]["full"] == "https://02plqthge2.execute-api.us-east-1.amazonaws.com/dev/fetch_all" + assert data["headers"]["Host"] == "02plqthge2.execute-api.us-east-1.amazonaws.com" + + data = get_data_from_request(event, capture_body=False, capture_headers=False) + + assert data["method"] == "GET" + assert data["url"]["full"] == "https://02plqthge2.execute-api.us-east-1.amazonaws.com/dev/fetch_all" + assert "headers" not in data + + +def test_response_data(): + response = {"statusCode": 200, "headers": {"foo": "bar"}} + + data = get_data_from_response(response, capture_headers=True) + + assert data["status_code"] == 200 + assert data["headers"]["foo"] == "bar" + + data = get_data_from_response(response, capture_headers=False) + + assert data["status_code"] == 200 + assert "headers" not in data + + data = get_data_from_response({}, capture_headers=False) + + assert not data + + +def test_capture_serverless(event, context, elasticapm_client): + + os.environ["AWS_LAMBDA_FUNCTION_NAME"] = "test_func" + + capture_object = capture_serverless() + capture_object.event = event + capture_object.name = "GET" + + @capture_serverless() + def test_func(event, context): + return {"statusCode": 200, "headers": {"foo": "bar"}} + + test_func(event, context) + + assert len(elasticapm_client.events[constants.TRANSACTION]) == 1 + transaction = elasticapm_client.events[constants.TRANSACTION][0] + + assert transaction["name"] == "GET test_func" + assert transaction["result"] == "HTTP 2xx" + assert transaction["span_count"]["started"] == 1 + assert transaction["context"]["request"]["method"] == "GET" + assert transaction["context"]["request"]["headers"] + assert transaction["context"]["response"]["status_code"] == 200 From 00a8fb2cef1359cc526bc516281ff5235c863ce1 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Thu, 22 Jul 2021 11:10:36 -0600 Subject: [PATCH 15/37] Add test data for s3/sqs/sns --- ..._test_data.json => aws_api_test_data.json} | 0 .../contrib/serverless/aws_s3_test_data.json | 38 +++++++++++++++++++ .../contrib/serverless/aws_sns_test_data.json | 22 +++++++++++ .../contrib/serverless/aws_sqs_test_data.json | 20 ++++++++++ tests/contrib/serverless/aws_tests.py | 37 ++++++++++++++---- 5 files changed, 109 insertions(+), 8 deletions(-) rename tests/contrib/serverless/{aws_test_data.json => aws_api_test_data.json} (100%) create mode 100644 tests/contrib/serverless/aws_s3_test_data.json create mode 100644 tests/contrib/serverless/aws_sns_test_data.json create mode 100644 tests/contrib/serverless/aws_sqs_test_data.json diff --git a/tests/contrib/serverless/aws_test_data.json b/tests/contrib/serverless/aws_api_test_data.json similarity index 100% rename from tests/contrib/serverless/aws_test_data.json rename to tests/contrib/serverless/aws_api_test_data.json diff --git a/tests/contrib/serverless/aws_s3_test_data.json b/tests/contrib/serverless/aws_s3_test_data.json new file mode 100644 index 000000000..35192cbd4 --- /dev/null +++ b/tests/contrib/serverless/aws_s3_test_data.json @@ -0,0 +1,38 @@ +{ + "Records": [ + { + "eventVersion": "2.1", + "eventSource": "aws:s3", + "awsRegion": "us-east-1", + "eventTime": "2021-07-22T17:00:56.160Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "requestParameters": { + "sourceIPAddress": "73.3.110.73" + }, + "responseElements": { + "x-amz-request-id": "0FM18R15SDX52CT2", + "x-amz-id-2": "6XuWW0exU7l4TbWDJmZL3oJgm6g8zgKDzkWM7dcuUKJcvd5mWrBSoPKpPGgINRDiQqwDNQIlVbJ3iWmN/S/e17DkKxT8fuQT" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "2ef153aa-f76e-4c29-9d28-24a5eff172ec", + "bucket": { + "name": "basepitestbucket", + "ownerIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "arn": "arn:aws:s3:::basepitestbucket" + }, + "object": { + "key": "requirements.txt", + "size": 106, + "eTag": "467344a84741a3a3e6d92a7acc24ba5e", + "sequencer": "0060F9A455E75DC318" + } + } + } + ] +} \ No newline at end of file diff --git a/tests/contrib/serverless/aws_sns_test_data.json b/tests/contrib/serverless/aws_sns_test_data.json new file mode 100644 index 000000000..162cd6edf --- /dev/null +++ b/tests/contrib/serverless/aws_sns_test_data.json @@ -0,0 +1,22 @@ +{ + "Records": [ + { + "EventSource": "aws:sns", + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:us-east-1:268121251715:basepiwstesttopic:4763d52a-56d1-4dc5-99eb-ffb4315587af", + "Sns": { + "Type": "Notification", + "MessageId": "07b92f2d-7087-5d2a-b807-765ed74eed4d", + "TopicArn": "arn:aws:sns:us-east-1:268121251715:basepiwstesttopic", + "Subject": null, + "Message": "hello world", + "Timestamp": "2021-07-22T17:06:17.354Z", + "SignatureVersion": "1", + "Signature": "YBGjMCe1m0QQ0DIWq4gZLy3/0bEyXhLPZJzeo4JYMa2P9ercshfn9s+x9nqd6HSYfO3RG0ebCmzxddgO8UCmaddXbhhMRWYjsIDv3+OvUitG8+bFqvpH/rQVHdCEWla5l+NDcye6d2cl9zuYFliTIFUsBmFcqbiroyZbIIHOczUpxNKK9oQcAXU6RgIl6y30DBgxYmzdMm4FMXPpden84v0LwVOyfqVm2gmeMnlccEOB0TRMe8sLsv7OfWLA3GBl3b14MOUZfvUz4Btb15ssCq++QVHoTQWZnbJ5dA7P3ljMauQCagub0Zefx7uUmWAlczxe/5kREJt8rEfl+pN7Mg==", + "SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem", + "UnsubscribeUrl": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:268121251715:basepiwstesttopic:4763d52a-56d1-4dc5-99eb-ffb4315587af", + "MessageAttributes": {} + } + } + ] +} \ No newline at end of file diff --git a/tests/contrib/serverless/aws_sqs_test_data.json b/tests/contrib/serverless/aws_sqs_test_data.json new file mode 100644 index 000000000..cd92f89fc --- /dev/null +++ b/tests/contrib/serverless/aws_sqs_test_data.json @@ -0,0 +1,20 @@ +{ + "Records": [ + { + "messageId": "e56e2c06-2a44-40ca-94e6-77a7ab9f76e6", + "receiptHandle": "AQEBhXF9S0Ezn+QPRzS76vRv7fTy/H5Tc6y4CnVkUizPZJeioFuEvfhbCFCFPKI0byYuzxuzpTfGsp8p7UrzQpZByrxjSpkzGSnQcomuxjL9kwqOLcsMYJWQfjFeaS6fMeHck8bnUUT7ed+3QqDDOxPtVnz4lFY06AGO+r+TcFpscvXGYvc5ONSJ+haQY9BLGcic44No1kEUK85ifJkIbrEbP2ARzWchxAkodNrlsFKJdjZo7hoQCkpXqp0QBt7aA4OZsFgFbkc2MQz7wJhx+dWRWYK1xLss/MP62Uuu1nXtHimIToBEmaBcjYIYtOypUyH4UABF1CoAsoGqg85831xBtn+L+20cTTZ/llvv234yPI5RM9ydR8AqMk5y0S28Xz6H", + "body": "hello world", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1626973700071", + "SenderId": "268121251715", + "ApproximateFirstReceiveTimestamp": "1626973700075" + }, + "messageAttributes": {}, + "md5OfBody": "5eb63bbbe01eeed093cb22bb8f5acdc3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:268121251715:testqueue", + "awsRegion": "us-east-1" + } + ] +} \ No newline at end of file diff --git a/tests/contrib/serverless/aws_tests.py b/tests/contrib/serverless/aws_tests.py index 04635a868..4885af2f4 100644 --- a/tests/contrib/serverless/aws_tests.py +++ b/tests/contrib/serverless/aws_tests.py @@ -38,8 +38,29 @@ @pytest.fixture -def event(): - aws_data_file = os.path.join(os.path.dirname(__file__), "aws_test_data.json") +def event_api(): + aws_data_file = os.path.join(os.path.dirname(__file__), "aws_api_test_data.json") + with open(aws_data_file) as f: + return json.load(f) + + +@pytest.fixture +def event_s3(): + aws_data_file = os.path.join(os.path.dirname(__file__), "aws_s3_test_data.json") + with open(aws_data_file) as f: + return json.load(f) + + +@pytest.fixture +def event_sqs(): + aws_data_file = os.path.join(os.path.dirname(__file__), "aws_sqs_test_data.json") + with open(aws_data_file) as f: + return json.load(f) + + +@pytest.fixture +def event_sns(): + aws_data_file = os.path.join(os.path.dirname(__file__), "aws_sns_test_data.json") with open(aws_data_file) as f: return json.load(f) @@ -59,14 +80,14 @@ def __init__(self): self.aws_request_id = "12345" -def test_request_data(event): - data = get_data_from_request(event, capture_body=True, capture_headers=True) +def test_request_data(event_api): + data = get_data_from_request(event_api, capture_body=True, capture_headers=True) assert data["method"] == "GET" assert data["url"]["full"] == "https://02plqthge2.execute-api.us-east-1.amazonaws.com/dev/fetch_all" assert data["headers"]["Host"] == "02plqthge2.execute-api.us-east-1.amazonaws.com" - data = get_data_from_request(event, capture_body=False, capture_headers=False) + data = get_data_from_request(event_api, capture_body=False, capture_headers=False) assert data["method"] == "GET" assert data["url"]["full"] == "https://02plqthge2.execute-api.us-east-1.amazonaws.com/dev/fetch_all" @@ -91,19 +112,19 @@ def test_response_data(): assert not data -def test_capture_serverless(event, context, elasticapm_client): +def test_capture_serverless(event_api, context, elasticapm_client): os.environ["AWS_LAMBDA_FUNCTION_NAME"] = "test_func" capture_object = capture_serverless() - capture_object.event = event + capture_object.event = event_api capture_object.name = "GET" @capture_serverless() def test_func(event, context): return {"statusCode": 200, "headers": {"foo": "bar"}} - test_func(event, context) + test_func(event_api, context) assert len(elasticapm_client.events[constants.TRANSACTION]) == 1 transaction = elasticapm_client.events[constants.TRANSACTION][0] From c39c623774ed1d0c84c1867280f190d8f3b75e53 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Thu, 22 Jul 2021 11:11:46 -0600 Subject: [PATCH 16/37] Add some test prints --- elasticapm/contrib/serverless/aws.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index ef14fbd30..12b3fcb65 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -163,7 +163,9 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.client.end_transaction() try: + print("flushing elasticapm") self.client._transport.flush() + print("done flushing elasticapm") except ValueError: logger.warning("flush timed out") From 0d9b92966d43c19b9715b7c936ed8224ba38a52b Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Mon, 2 Aug 2021 11:40:27 -0600 Subject: [PATCH 17/37] Review suggestions for time.py --- elasticapm/utils/time.py | 25 +++++++++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/elasticapm/utils/time.py b/elasticapm/utils/time.py index fdfdfeffb..415bba718 100644 --- a/elasticapm/utils/time.py +++ b/elasticapm/utils/time.py @@ -28,10 +28,13 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +import logging import sys import time CLOCK_DIFF = None +CLOCK_DIFF_UPDATED = 0 +logger = logging.getLogger("elasticapm.utils.time") def time_to_perf_counter(timestamp): @@ -46,8 +49,7 @@ def time_to_perf_counter(timestamp): guaranteed), if a system clock is changed, this function could become very inaccurate. """ - global CLOCK_DIFF - if CLOCK_DIFF is None: + if _clock_diff_stale(): _calculate_clock_diff() return timestamp + CLOCK_DIFF @@ -64,6 +66,7 @@ def _calculate_clock_diff(): approximate equivalent `time.perf_counter()` """ global CLOCK_DIFF + global CLOCK_DIFF_UPDATED best_tolerance = sys.float_info.max for _ in range(10): time1 = time.time() @@ -76,6 +79,24 @@ def _calculate_clock_diff(): if tolerance < best_tolerance: best_tolerance = tolerance CLOCK_DIFF = perftime - timetime + CLOCK_DIFF_UPDATED = time.time() if tolerance < 0.00001: # try to get the two time.time() calls within 20 microseconds break + + if best_tolerance >= 0.00001: + logger.warning( + "Clock diff calculator only reached a tolerance of {}. Some " + "timestamps may be inaccurate as a result.".format(best_tolerance) + ) + + +def _clock_diff_stale(): + """ + Checks if the last CLOCK_DIFF we calculated is older than five minutes old. + If so, we should recalculate. + """ + # Should we make the stale time configurable? + if time.time() - CLOCK_DIFF_UPDATED > 300: + return True + return False From 13d152d238c1cf6ffcdc6cef34d7ed9d6cfcdebe Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Tue, 3 Aug 2021 13:35:25 -0600 Subject: [PATCH 18/37] Remove transaction backdating to match spec --- elasticapm/contrib/serverless/aws.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 12b3fcb65..e96ee21fc 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -86,8 +86,7 @@ def decorated(*args, **kwds): self.event, self.context = {}, {} if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled: with self: - with elasticapm.capture_span(self.name): - self.response = func(*args, **kwds) + self.response = func(*args, **kwds) return self.response else: return func(*args, **kwds) @@ -107,12 +106,8 @@ def __enter__(self): if "httpMethod" in self.event: self.start_time = self.event["requestContext"].get("requestTimeEpoch") if self.start_time: - self.start_time = float(self.start_time) * 0.001 - self.transaction = self.client.begin_transaction( - "request", trace_parent=trace_parent, start=self.start_time - ) - else: - self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) + self.start_time = float(self.start_time) * 0.001 # Epoch in seconds + self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) elasticapm.set_context( lambda: get_data_from_request( self.event, From 44d2bd8a841898312bb1f7801a16102560dece1b Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Tue, 3 Aug 2021 13:55:18 -0600 Subject: [PATCH 19/37] Fix metrics_interval suffix --- elasticapm/contrib/serverless/aws.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index e96ee21fc..67da6cd99 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -61,7 +61,7 @@ def __init__(self, name=None, **kwargs): self.response = None # Disable all background threads except for transport - kwargs["metrics_interval"] = 0 + kwargs["metrics_interval"] = "0ms" kwargs["central_config"] = False kwargs["cloud_provider"] = "none" kwargs["framework_name"] = "AWS Lambda" From 135ebf9e22e86036df5ad2dbd811c7aaa8cb3a0c Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Thu, 5 Aug 2021 11:14:32 -0600 Subject: [PATCH 20/37] Add tests for s3/sns/sqs/s3_batch --- elasticapm/contrib/serverless/aws.py | 24 +++-- .../serverless/aws_s3_batch_test_data.json | 72 ++++++++++++++ tests/contrib/serverless/aws_tests.py | 93 ++++++++++++++++++- 3 files changed, 177 insertions(+), 12 deletions(-) create mode 100644 tests/contrib/serverless/aws_s3_batch_test_data.json diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 67da6cd99..d012961e8 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -103,11 +103,11 @@ def __enter__(self): cold_start = COLD_START COLD_START = False - if "httpMethod" in self.event: - self.start_time = self.event["requestContext"].get("requestTimeEpoch") - if self.start_time: - self.start_time = float(self.start_time) * 0.001 # Epoch in seconds - self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) + self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) + self.source = "other" + + if "httpMethod" in self.event: # API Gateway + self.source = "api" elasticapm.set_context( lambda: get_data_from_request( self.event, @@ -122,8 +122,18 @@ def __enter__(self): ) else: elasticapm.set_transaction_name(self.name, override=False) - else: - self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) + elif "Records" in self.event and len(self.event["Records"]) == 1: + record = self.event["Records"][0] + if record.get("eventSource") == "aws:s3": # S3 + self.source = "s3" + elasticapm.set_transaction_name("{} {}".format(record["eventName"], record["s3"]["bucket"]["name"])) + elif record.get("EventSource") == "aws:sns": # SNS + self.source = "sns" + elasticapm.set_transaction_name("RECEIVE {}".format(record["Sns"]["TopicArn"].split(":")[5])) + elif record.get("eventSource") == "aws:sqs": # SQS + self.source = "sqs" + elasticapm.set_transaction_name("RECEIVE {}".format(record["eventSourceARN"].split(":")[5])) + else: # Other elasticapm.set_transaction_name(os.environ.get("AWS_LAMBDA_FUNCTION_NAME", self.name), override=False) elasticapm.set_context( diff --git a/tests/contrib/serverless/aws_s3_batch_test_data.json b/tests/contrib/serverless/aws_s3_batch_test_data.json new file mode 100644 index 000000000..35187388a --- /dev/null +++ b/tests/contrib/serverless/aws_s3_batch_test_data.json @@ -0,0 +1,72 @@ +{ + "Records": [ + { + "eventVersion": "2.1", + "eventSource": "aws:s3", + "awsRegion": "us-east-1", + "eventTime": "2021-07-22T17:00:56.160Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "requestParameters": { + "sourceIPAddress": "73.3.110.73" + }, + "responseElements": { + "x-amz-request-id": "0FM18R15SDX52CT2", + "x-amz-id-2": "6XuWW0exU7l4TbWDJmZL3oJgm6g8zgKDzkWM7dcuUKJcvd5mWrBSoPKpPGgINRDiQqwDNQIlVbJ3iWmN/S/e17DkKxT8fuQT" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "2ef153aa-f76e-4c29-9d28-24a5eff172ec", + "bucket": { + "name": "basepitestbucket", + "ownerIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "arn": "arn:aws:s3:::basepitestbucket" + }, + "object": { + "key": "requirements.txt", + "size": 106, + "eTag": "467344a84741a3a3e6d92a7acc24ba5e", + "sequencer": "0060F9A455E75DC318" + } + } + }, + { + "eventVersion": "2.1", + "eventSource": "aws:s3", + "awsRegion": "us-east-1", + "eventTime": "2021-07-22T17:00:56.160Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "requestParameters": { + "sourceIPAddress": "73.3.110.73" + }, + "responseElements": { + "x-amz-request-id": "0FM18R15SDX52CT2", + "x-amz-id-2": "6XuWW0exU7l4TbWDJmZL3oJgm6g8zgKDzkWM7dcuUKJcvd5mWrBSoPKpPGgINRDiQqwDNQIlVbJ3iWmN/S/e17DkKxT8fuQT" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "2ef153aa-f76e-4c29-9d28-24a5eff172ec", + "bucket": { + "name": "basepitestbucket", + "ownerIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "arn": "arn:aws:s3:::basepitestbucket" + }, + "object": { + "key": "requirements.txt", + "size": 106, + "eTag": "467344a84741a3a3e6d92a7acc24ba5e", + "sequencer": "0060F9A455E75DC318" + } + } + } + ] +} \ No newline at end of file diff --git a/tests/contrib/serverless/aws_tests.py b/tests/contrib/serverless/aws_tests.py index 4885af2f4..4a6a23489 100644 --- a/tests/contrib/serverless/aws_tests.py +++ b/tests/contrib/serverless/aws_tests.py @@ -32,7 +32,9 @@ import json import os +import time +from elasticapm import capture_span from elasticapm.conf import constants from elasticapm.contrib.serverless.aws import capture_serverless, get_data_from_request, get_data_from_response @@ -51,6 +53,13 @@ def event_s3(): return json.load(f) +@pytest.fixture +def event_s3_batch(): + aws_data_file = os.path.join(os.path.dirname(__file__), "aws_s3_batch_test_data.json") + with open(aws_data_file) as f: + return json.load(f) + + @pytest.fixture def event_sqs(): aws_data_file = os.path.join(os.path.dirname(__file__), "aws_sqs_test_data.json") @@ -112,16 +121,14 @@ def test_response_data(): assert not data -def test_capture_serverless(event_api, context, elasticapm_client): +def test_capture_serverless_api_gateway(event_api, context, elasticapm_client): os.environ["AWS_LAMBDA_FUNCTION_NAME"] = "test_func" - capture_object = capture_serverless() - capture_object.event = event_api - capture_object.name = "GET" - @capture_serverless() def test_func(event, context): + with capture_span("test_span"): + time.sleep(0.01) return {"statusCode": 200, "headers": {"foo": "bar"}} test_func(event_api, context) @@ -135,3 +142,79 @@ def test_func(event, context): assert transaction["context"]["request"]["method"] == "GET" assert transaction["context"]["request"]["headers"] assert transaction["context"]["response"]["status_code"] == 200 + + +def test_capture_serverless_s3(event_s3, context, elasticapm_client): + + os.environ["AWS_LAMBDA_FUNCTION_NAME"] = "test_func" + + @capture_serverless() + def test_func(event, context): + with capture_span("test_span"): + time.sleep(0.01) + return + + test_func(event_s3, context) + + assert len(elasticapm_client.events[constants.TRANSACTION]) == 1 + transaction = elasticapm_client.events[constants.TRANSACTION][0] + + assert transaction["name"] == "ObjectCreated:Put basepitestbucket" + assert transaction["span_count"]["started"] == 1 + + +def test_capture_serverless_sns(event_sns, context, elasticapm_client): + + os.environ["AWS_LAMBDA_FUNCTION_NAME"] = "test_func" + + @capture_serverless() + def test_func(event, context): + with capture_span("test_span"): + time.sleep(0.01) + return + + test_func(event_sns, context) + + assert len(elasticapm_client.events[constants.TRANSACTION]) == 1 + transaction = elasticapm_client.events[constants.TRANSACTION][0] + + assert transaction["name"] == "RECEIVE basepiwstesttopic" + assert transaction["span_count"]["started"] == 1 + + +def test_capture_serverless_sqs(event_sqs, context, elasticapm_client): + + os.environ["AWS_LAMBDA_FUNCTION_NAME"] = "test_func" + + @capture_serverless() + def test_func(event, context): + with capture_span("test_span"): + time.sleep(0.01) + return + + test_func(event_sqs, context) + + assert len(elasticapm_client.events[constants.TRANSACTION]) == 1 + transaction = elasticapm_client.events[constants.TRANSACTION][0] + + assert transaction["name"] == "RECEIVE testqueue" + assert transaction["span_count"]["started"] == 1 + + +def test_capture_serverless_s3_batch(event_s3_batch, context, elasticapm_client): + + os.environ["AWS_LAMBDA_FUNCTION_NAME"] = "test_func" + + @capture_serverless() + def test_func(event, context): + with capture_span("test_span"): + time.sleep(0.01) + return + + test_func(event_s3_batch, context) + + assert len(elasticapm_client.events[constants.TRANSACTION]) == 1 + transaction = elasticapm_client.events[constants.TRANSACTION][0] + + assert transaction["name"] == "test_func" + assert transaction["span_count"]["started"] == 1 From e97b2bee1feecf9df427b0468c9685fa90ae4074 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Fri, 1 Oct 2021 11:54:24 -0600 Subject: [PATCH 21/37] Disable server version checks --- elasticapm/contrib/serverless/aws.py | 1 + 1 file changed, 1 insertion(+) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index d012961e8..7fa1134c0 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -69,6 +69,7 @@ def __init__(self, name=None, **kwargs): kwargs["service_name"] = os.environ["AWS_LAMBDA_FUNCTION_NAME"] self.client = get_client() + self.client.server_version = (8, 0, 0) if not self.client: self.client = Client(**kwargs) if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled: From 0bf0f1cf2776a041ae8ca05593f1e728add4cc2b Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Fri, 1 Oct 2021 12:10:24 -0600 Subject: [PATCH 22/37] Make sure the client exists first --- elasticapm/contrib/serverless/aws.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 7fa1134c0..8a9eca0d1 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -69,9 +69,9 @@ def __init__(self, name=None, **kwargs): kwargs["service_name"] = os.environ["AWS_LAMBDA_FUNCTION_NAME"] self.client = get_client() - self.client.server_version = (8, 0, 0) if not self.client: self.client = Client(**kwargs) + self.client.server_version = (8, 0, 0) if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled: elasticapm.instrument() From 30b96c3062cbc679d3ac9112765c25c46c5a47af Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Mon, 4 Oct 2021 10:46:59 -0600 Subject: [PATCH 23/37] Add `server_version_override` config for skipping server version checks --- elasticapm/conf/__init__.py | 3 +++ elasticapm/contrib/serverless/aws.py | 3 ++- elasticapm/transport/http.py | 5 ++++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/elasticapm/conf/__init__.py b/elasticapm/conf/__init__.py index 2a35d4d60..acc641571 100644 --- a/elasticapm/conf/__init__.py +++ b/elasticapm/conf/__init__.py @@ -516,6 +516,9 @@ class Config(_ConfigBase): debug = _BoolConfigValue("DEBUG", default=False) server_url = _ConfigValue("SERVER_URL", default="http://localhost:8200", required=True) server_cert = _ConfigValue("SERVER_CERT", validators=[FileIsReadableValidator()]) + server_version_override = _ConfigValue( + "SERVER_VERSION_OVERRIDE", validators=[RegexValidator(r"^[0-9]+\.[0-9]+\.[0-9]+$")] + ) verify_server_cert = _BoolConfigValue("VERIFY_SERVER_CERT", default=True) use_certifi = _BoolConfigValue("USE_CERTIFI", default=True) include_paths = _ListConfigValue("INCLUDE_PATHS") diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 8a9eca0d1..e7dc1acd7 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -65,13 +65,14 @@ def __init__(self, name=None, **kwargs): kwargs["central_config"] = False kwargs["cloud_provider"] = "none" kwargs["framework_name"] = "AWS Lambda" + # TODO this can probably be removed once the extension proxies the serverinfo endpoint + kwargs["server_version_override"] = "8.0.0" if "service_name" not in kwargs: kwargs["service_name"] = os.environ["AWS_LAMBDA_FUNCTION_NAME"] self.client = get_client() if not self.client: self.client = Client(**kwargs) - self.client.server_version = (8, 0, 0) if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled: elasticapm.instrument() diff --git a/elasticapm/transport/http.py b/elasticapm/transport/http.py index e274130d1..d21e4875f 100644 --- a/elasticapm/transport/http.py +++ b/elasticapm/transport/http.py @@ -170,7 +170,10 @@ def get_config(self, current_version=None, keys=None): def _process_queue(self): if not self.client.server_version: - self.fetch_server_info() + if self.client.config.server_version_override: + self.client.server_version = version_string_to_tuple(self.client.config.server_version_override) + else: + self.fetch_server_info() super()._process_queue() def fetch_server_info(self): From 349eab52b861a0f5952c0c41338ae932cbd4c611 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Thu, 7 Oct 2021 11:52:05 -0600 Subject: [PATCH 24/37] Switch to a JIT model for writing metadata to transport buffer --- elasticapm/transport/base.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/elasticapm/transport/base.py b/elasticapm/transport/base.py index 6946a553a..dcc416954 100644 --- a/elasticapm/transport/base.py +++ b/elasticapm/transport/base.py @@ -142,6 +142,9 @@ def _process_queue(self): if data is not None: data = self._process_event(event_type, data) if data is not None: + if not buffer_written: + # Write metadata just in time to allow for late metadata changes (such as in lambda) + self._write_metadata(buffer) buffer.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8")) buffer_written = True self._counts[event_type] += 1 @@ -201,9 +204,20 @@ def _process_event(self, event_type, data): def _init_buffer(self): buffer = gzip.GzipFile(fileobj=compat.BytesIO(), mode="w", compresslevel=self._compress_level) + return buffer + + def _write_metadata(self, buffer): data = (self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8") buffer.write(data) - return buffer + + def add_metadata(self, data): + """ + Add additional metadata do the dictionary + + Only used in specific instances where metadata relies on data we only + have at request time, such as for lambda metadata + """ + self._metadata.update(data) def _init_event_queue(self, chill_until, max_chill_time): # some libraries like eventlet monkeypatch queue.Queue and switch out the implementation. From 3f5253725be1f84d07c0e69f45a5f946b800b2c2 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Tue, 12 Oct 2021 13:37:06 -0600 Subject: [PATCH 25/37] Fix for uninitialized metadata (probably test-only) --- elasticapm/transport/base.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/elasticapm/transport/base.py b/elasticapm/transport/base.py index dcc416954..d67631fd6 100644 --- a/elasticapm/transport/base.py +++ b/elasticapm/transport/base.py @@ -216,8 +216,16 @@ def add_metadata(self, data): Only used in specific instances where metadata relies on data we only have at request time, such as for lambda metadata + + Note that metadata is not merged. Any key that is present in the + added metadata will overwrite that key in the original metadata. + + TODO: should we be merging? """ - self._metadata.update(data) + if self._metadata is not None: + self._metadata.update(data) + else: + self._metadata = data def _init_event_queue(self, chill_until, max_chill_time): # some libraries like eventlet monkeypatch queue.Queue and switch out the implementation. From 81ac8f9d72a06d0378013efbfeb08340d5de6ba7 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Tue, 12 Oct 2021 13:37:29 -0600 Subject: [PATCH 26/37] Add metadata and context for everything except s3 --- elasticapm/contrib/serverless/aws.py | 189 +++++++++++++++++--------- tests/contrib/serverless/aws_tests.py | 2 +- 2 files changed, 129 insertions(+), 62 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index e7dc1acd7..e870df311 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -29,9 +29,11 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import base64 +import datetime import functools import json import os +import time import elasticapm from elasticapm.base import Client, get_client @@ -105,51 +107,42 @@ def __enter__(self): cold_start = COLD_START COLD_START = False - self.transaction = self.client.begin_transaction("request", trace_parent=trace_parent) self.source = "other" + transaction_type = "request" + transaction_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", self.name) if "httpMethod" in self.event: # API Gateway self.source = "api" - elasticapm.set_context( - lambda: get_data_from_request( - self.event, - capture_body=self.client.config.capture_body in ("transactions", "all"), - capture_headers=self.client.config.capture_headers, - ), - "request", - ) if os.environ.get("AWS_LAMBDA_FUNCTION_NAME"): - elasticapm.set_transaction_name( - "{} {}".format(self.event["httpMethod"], os.environ["AWS_LAMBDA_FUNCTION_NAME"]) - ) + transaction_name = "{} {}".format(self.event["httpMethod"], os.environ["AWS_LAMBDA_FUNCTION_NAME"]) else: - elasticapm.set_transaction_name(self.name, override=False) + transaction_name = self.name elif "Records" in self.event and len(self.event["Records"]) == 1: record = self.event["Records"][0] if record.get("eventSource") == "aws:s3": # S3 self.source = "s3" - elasticapm.set_transaction_name("{} {}".format(record["eventName"], record["s3"]["bucket"]["name"])) + transaction_name = "{} {}".format(record["eventName"], record["s3"]["bucket"]["name"]) elif record.get("EventSource") == "aws:sns": # SNS self.source = "sns" - elasticapm.set_transaction_name("RECEIVE {}".format(record["Sns"]["TopicArn"].split(":")[5])) + transaction_type = "messaging" + transaction_name = "RECEIVE {}".format(record["Sns"]["TopicArn"].split(":")[5]) elif record.get("eventSource") == "aws:sqs": # SQS self.source = "sqs" - elasticapm.set_transaction_name("RECEIVE {}".format(record["eventSourceARN"].split(":")[5])) - else: # Other - elasticapm.set_transaction_name(os.environ.get("AWS_LAMBDA_FUNCTION_NAME", self.name), override=False) - - elasticapm.set_context( - lambda: get_faas_data( - self.event, - self.context, - cold_start, - ), - "faas", - ) - elasticapm.set_context({"runtime": {"name": os.environ.get("AWS_EXECUTION_ENV")}}, "service") - elasticapm.set_context( - {"provider": "aws", "region": os.environ.get("AWS_REGION"), "service": {"name": "lambda"}}, "cloud" - ) + transaction_type = "messaging" + transaction_name = "RECEIVE {}".format(record["eventSourceARN"].split(":")[5]) + + self.transaction = self.client.begin_transaction(transaction_type, trace_parent=trace_parent) + elasticapm.set_transaction_name(transaction_name, override=False) + if self.source == "api": + elasticapm.set_context( + lambda: get_data_from_request( + self.event, + capture_body=self.client.config.capture_body in ("transactions", "all"), + capture_headers=self.client.config.capture_headers, + ), + "request", + ) + self.set_metadata_and_context(cold_start) def __exit__(self, exc_type, exc_val, exc_tb): """ @@ -176,6 +169,110 @@ def __exit__(self, exc_type, exc_val, exc_tb): except ValueError: logger.warning("flush timed out") + def set_metadata_and_context(self, coldstart): + """ + Process the metadata and context fields for this request + """ + metadata = {} + cloud_context = {"origin": {"provider": "aws"}} + service_context = {} + message_context = {} + + faas = {} + faas["coldstart"] = coldstart + faas["trigger"] = {"type": "other"} + faas["execution"] = self.context.aws_request_id + + if self.source == "api": + faas["trigger"]["type"] = "http" + faas["trigger"]["request_id"] = self.event["requestContext"]["requestId"] + service_context["origin"] = { + "name": "{} {}/{}".format( + self.event["requestContext"]["httpMethod"], + self.event["requestContext"]["resourcePath"], + self.event["requestContext"]["stage"], + ) + } + service_context["origin"]["id"] = self.event["requestContext"]["apiId"] + service_context["origin"]["version"] = "2.0" if self.event["headers"]["Via"].startswith("2.0") else "1.0" + cloud_context["origin"] = {} + cloud_context["origin"]["service"] = {"name": "api gateway"} + cloud_context["origin"]["account"] = {"id": self.event["requestContext"]["accountId"]} + cloud_context["origin"]["provider"] = "aws" + elif self.source == "sqs": + record = self.event["Records"][0] + faas["trigger"]["type"] = "pubsub" + faas["trigger"]["request_id"] = record["messageId"] + service_context["origin"] = {} + service_context["origin"]["name"] = record["eventSourceARN"].split(":")[5] + service_context["origin"]["id"] = record["eventSourceARN"] + cloud_context["origin"] = {} + cloud_context["origin"]["service"] = {"name": "sqs"} + cloud_context["origin"]["region"] = record["awsRegion"] + cloud_context["origin"]["account"] = {"id": record["eventSourceARN"].split(":")[4]} + cloud_context["origin"]["provider"] = "aws" + message_context["queue"] = record["eventSourceARN"] + if "SentTimestamp" in record["attributes"]: + message_context["age"] = int((time.time() * 1000) - int(record["attributes"]["SentTimestamp"])) + if self.client.config.capture_body in ("transactions", "all") and "body" in record: + message_context["body"] = record["body"] + if self.client.config.capture_headers and record["messageAttributes"]: + message_context["headers"] = record["messageAttributes"] + elif self.source == "sns": + record = self.event["Records"][0] + faas["trigger"]["type"] = "pubsub" + faas["trigger"]["request_id"] = record["Sns"]["TopicArn"] + service_context["origin"] = {} + service_context["origin"]["name"] = record["Sns"]["TopicArn"].split(":")[5] + service_context["origin"]["id"] = record["Sns"]["TopicArn"] + service_context["origin"]["version"] = record["EventVersion"] + service_context["origin"]["service"] = {"name": "sns"} + cloud_context["origin"] = {} + cloud_context["origin"]["region"] = record["Sns"]["TopicArn"].split(":")[3] + cloud_context["origin"]["account_id"] = record["Sns"]["TopicArn"].split(":")[4] + cloud_context["origin"]["provider"] = "aws" + message_context["queue"] = record["Sns"]["TopicArn"] + if "Timestamp" in record["Sns"]: + message_context["age"] = int( + ( + datetime.datetime.now() + - datetime.datetime.strptime(record["Sns"]["Timestamp"], r"%Y-%m-%dT%H:%M:%S.%fZ") + ).total_seconds() + * 1000 + ) + if self.client.config.capture_body in ("transactions", "all") and "Message" in record["Sns"]: + message_context["body"] = record["Sns"]["Message"] + if self.client.config.capture_headers and record["Sns"]["MessageAttributes"]: + message_context["headers"] = record["Sns"]["MessageAttributes"] + elif self.source == "s3": + # TODO s3 + faas["trigger"]["type"] = "other" + + metadata["faas"] = faas + + metadata["service"] = {} + metadata["service"]["name"] = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") + metadata["service"]["framework"] = {"name": "AWS Lambda"} + metadata["service"]["runtime"] = {"name": os.environ.get("AWS_EXECUTION_ENV")} + arn = self.context.invoked_function_arn + if len(arn.split(":")) > 7: + arn = ":".join(arn.split(":")[:7]) + metadata["service"]["id"] = arn + metadata["service"]["version"] = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION") + metadata["service"]["node"] = {"configured_name": os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")} + + metadata["cloud"] = {} + metadata["cloud"]["provider"] = "aws" + metadata["cloud"]["region"] = os.environ.get("AWS_REGION") + metadata["cloud"]["service"] = {"name": "lambda"} + metadata["cloud"]["account"] = {"id": arn.split(":")[4]} + + elasticapm.set_context(cloud_context, "cloud") + elasticapm.set_context(service_context, "service") + if message_context: + elasticapm.set_context(service_context, "message") + self.client._transport.add_metadata(metadata) + def get_data_from_request(event, capture_body=False, capture_headers=True): """ @@ -251,33 +348,3 @@ def get_url_dict(event): if query: url_dict["search"] = encoding.keyword_field(query) return url_dict - - -def get_faas_data(event, context, coldstart): - """ - Compile the faas context using the event and context - """ - faas = {} - faas["coldstart"] = coldstart - faas["id"] = context.invoked_function_arn # TODO remove alias suffix - faas["execution"] = context.aws_request_id - faas["name"] = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") - faas["version"] = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION") - faas["instance"] = {"id": os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")} # TODO double check in final spec - - faas["trigger"] = {} - faas["trigger"]["type"] = "other" - - # Trigger type - if "httpMethod" in event: - faas["trigger"]["type"] = "http" - faas["trigger"]["id"] = event["requestContext"]["apiId"] - faas["trigger"]["name"] = "{} {}/{}".format( - event["httpMethod"], event["requestContext"]["resourcePath"], event["requestContext"]["stage"] - ) - faas["trigger"]["account"] = {"id": event["requestContext"]["accountId"]} - faas["trigger"]["version"] = "2.0" if event["requestContext"].get("requestTimeEpoch") else "1.0" - faas["trigger"]["request_id"] = event["requestContext"]["requestId"] - # TODO sns/sqs/s3 - - return faas diff --git a/tests/contrib/serverless/aws_tests.py b/tests/contrib/serverless/aws_tests.py index 4a6a23489..c5a48d966 100644 --- a/tests/contrib/serverless/aws_tests.py +++ b/tests/contrib/serverless/aws_tests.py @@ -85,7 +85,7 @@ class SampleContext: """ def __init__(self): - self.invoked_function_arn = "arrrrrn" + self.invoked_function_arn = "arn:aws:lambda:us-west-2:123456789012:function:my-function:someAlias" self.aws_request_id = "12345" From 679695ccdbd243af4b37798540b52caa9378620d Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Wed, 13 Oct 2021 10:18:31 -0600 Subject: [PATCH 27/37] Merge metadata one key deep --- CHANGELOG.asciidoc | 13 +++++++------ elasticapm/transport/base.py | 12 +++++++----- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index f42eaa0a1..669407cc9 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -34,15 +34,16 @@ endif::[] // Unreleased changes go here // When the next release happens, nest these changes under the "Python Agent version 6.x" heading -//[float] -//===== Features -// -// +[float] +===== Features + +* Add experimental support for lambda instrumentation {pull}1193[#1193] + [float] ===== Bug fixes - * Ensure that Prometheus histograms are encoded correctly for APM Server {pull}1354[#1354] - * Fix structlog processor to correctly populate ECS-compatible `event.dataset` {pull}1352[#1352] +* Ensure that Prometheus histograms are encoded correctly for APM Server {pull}1354[#1354] +* Fix structlog processor to correctly populate ECS-compatible `event.dataset` {pull}1352[#1352] [[release-notes-6.x]] === Python Agent version 6.x diff --git a/elasticapm/transport/base.py b/elasticapm/transport/base.py index d67631fd6..8e96e46a3 100644 --- a/elasticapm/transport/base.py +++ b/elasticapm/transport/base.py @@ -217,13 +217,15 @@ def add_metadata(self, data): Only used in specific instances where metadata relies on data we only have at request time, such as for lambda metadata - Note that metadata is not merged. Any key that is present in the - added metadata will overwrite that key in the original metadata. - - TODO: should we be merging? + Metadata is only merged one key deep. """ if self._metadata is not None: - self._metadata.update(data) + # Merge one key deep + for key, val in data.items(): + if isinstance(val, dict) and key in self._metadata and isinstance(self._metadata[key], dict): + self._metadata[key].update(val) + else: + self._metadata[key] = val else: self._metadata = data From 996e7a5b059d39548f2404245e5f6dd3a7853ed8 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Wed, 13 Oct 2021 10:25:41 -0600 Subject: [PATCH 28/37] Add s3 context --- elasticapm/contrib/serverless/aws.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index e870df311..ae9c455a5 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -245,8 +245,17 @@ def set_metadata_and_context(self, coldstart): if self.client.config.capture_headers and record["Sns"]["MessageAttributes"]: message_context["headers"] = record["Sns"]["MessageAttributes"] elif self.source == "s3": - # TODO s3 - faas["trigger"]["type"] = "other" + record = self.event["Records"][0] + faas["trigger"]["type"] = "datasource" + faas["trigger"]["request_id"] = record["responseElements"]["x-amz-request-id"] + service_context["origin"] = {} + service_context["origin"]["name"] = record["s3"]["bucket"]["name"] + service_context["origin"]["id"] = record["s3"]["bucket"]["arn"] + service_context["origin"]["version"] = record["eventVersion"] + cloud_context["origin"] = {} + cloud_context["origin"]["service"] = {"name": "s3"} + cloud_context["origin"]["region"] = record["awsRegion"] + cloud_context["origin"]["provider"] = "aws" metadata["faas"] = faas From fea623a2de5734a0052b02957ed1c5a7aedcfa3a Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Wed, 13 Oct 2021 11:05:26 -0600 Subject: [PATCH 29/37] Manage metadata overrides --- elasticapm/contrib/serverless/aws.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index ae9c455a5..1ff48c812 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -33,6 +33,7 @@ import functools import json import os +import platform import time import elasticapm @@ -262,13 +263,22 @@ def set_metadata_and_context(self, coldstart): metadata["service"] = {} metadata["service"]["name"] = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") metadata["service"]["framework"] = {"name": "AWS Lambda"} - metadata["service"]["runtime"] = {"name": os.environ.get("AWS_EXECUTION_ENV")} + metadata["service"]["runtime"] = { + "name": os.environ.get("AWS_EXECUTION_ENV"), + "version": platform.python_version(), + } arn = self.context.invoked_function_arn if len(arn.split(":")) > 7: arn = ":".join(arn.split(":")[:7]) metadata["service"]["id"] = arn metadata["service"]["version"] = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION") metadata["service"]["node"] = {"configured_name": os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")} + # This is the one piece of metadata that requires deep merging. We add it manually + # here to avoid having to deep merge in _transport.add_metadata() + if self.client._transport._metadata: + node_name = self.client._transport._metadata.get("service", {}).get("node", {}).get("name") + if node_name: + metadata["service"]["node"]["name"] = node_name metadata["cloud"] = {} metadata["cloud"]["provider"] = "aws" From 8fcc05fec0583f4263dbbde18bb068e19e1286ec Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Wed, 13 Oct 2021 11:16:55 -0600 Subject: [PATCH 30/37] Better logging --- elasticapm/contrib/serverless/aws.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 1ff48c812..b141f0a4f 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -164,9 +164,9 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.client.end_transaction() try: - print("flushing elasticapm") + logger.debug("flushing elasticapm") self.client._transport.flush() - print("done flushing elasticapm") + logger.debug("done flushing elasticapm") except ValueError: logger.warning("flush timed out") From ac8c828669d5f584858c58e03d538db21e5f81aa Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Thu, 14 Oct 2021 11:01:01 -0600 Subject: [PATCH 31/37] Remove time_to_perf_counter stuff (will submit in different PR) --- elasticapm/traces.py | 21 +------- elasticapm/utils/time.py | 102 --------------------------------------- 2 files changed, 1 insertion(+), 122 deletions(-) delete mode 100644 elasticapm/utils/time.py diff --git a/elasticapm/traces.py b/elasticapm/traces.py index 10c5e0a1a..8cce0eee8 100644 --- a/elasticapm/traces.py +++ b/elasticapm/traces.py @@ -43,7 +43,6 @@ from elasticapm.utils import compat, encoding, get_name_from_func from elasticapm.utils.disttracing import TraceParent, TracingOptions from elasticapm.utils.logging import get_logger -from elasticapm.utils.time import time_to_perf_counter __all__ = ("capture_span", "label", "set_transaction_name", "set_custom_context", "set_user_context") @@ -130,28 +129,10 @@ class Transaction(BaseSpan): def __init__( self, tracer, transaction_type="custom", trace_parent=None, is_sampled=True, start=None, sample_rate=None ): - """ - tracer - Tracer object - transaction_type - Transaction type - trace_parent - TraceParent object representing the parent trace and trace state - is_sampled - Whether or not this transaction is sampled - start - Optional start timestamp. This is expected to be an epoch timestamp - in seconds (such as from `time.time()`). If it is not, it's recommended - that a `duration` is passed into the `end()` method. - sample_rate - Sample rate which was used to decide whether to sample this transaction. - This is reported to the APM server so that unsampled transactions can - be extrapolated. - """ self.id = self.get_dist_tracing_id() self.trace_parent = trace_parent if start: - self.timestamp, self.start_time = start, time_to_perf_counter(start) + self.timestamp = self.start_time = start else: self.timestamp, self.start_time = time.time(), _time_func() self.name = None diff --git a/elasticapm/utils/time.py b/elasticapm/utils/time.py deleted file mode 100644 index 415bba718..000000000 --- a/elasticapm/utils/time.py +++ /dev/null @@ -1,102 +0,0 @@ -# BSD 3-Clause License -# -# Copyright (c) 2019, Elasticsearch BV -# All rights reserved. -# -# Redistribution and use in source and binary forms, with or without -# modification, are permitted provided that the following conditions are met: -# -# * Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# * Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation -# and/or other materials provided with the distribution. -# -# * Neither the name of the copyright holder nor the names of its -# contributors may be used to endorse or promote products derived from -# this software without specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -import logging -import sys -import time - -CLOCK_DIFF = None -CLOCK_DIFF_UPDATED = 0 -logger = logging.getLogger("elasticapm.utils.time") - - -def time_to_perf_counter(timestamp): - """ - This function converts a given epoch timestamp in seconds (typically from - `time.time()`) to the "equivalent" result from `time.perf_counter()`. - - Note that because these functions vary in their resolution and tick rate, - this is only a close approximation. - - Note also that because `time.time()` is *usually* monotonic (but not - guaranteed), if a system clock is changed, this function could become - very inaccurate. - """ - if _clock_diff_stale(): - _calculate_clock_diff() - - return timestamp + CLOCK_DIFF - - -def _calculate_clock_diff(): - """ - Calculate the difference between `time.perf_counter()` and `time.time()` - - Uses multiple measurements to try to minimize the tolerance in the - measurements. - - The resulting CLOCK_DIFF can be added to any `time.time()` result to get the - approximate equivalent `time.perf_counter()` - """ - global CLOCK_DIFF - global CLOCK_DIFF_UPDATED - best_tolerance = sys.float_info.max - for _ in range(10): - time1 = time.time() - perftime = time.perf_counter() - time2 = time.time() - - tolerance = (time2 - time1) / 2 - timetime = time1 + tolerance - - if tolerance < best_tolerance: - best_tolerance = tolerance - CLOCK_DIFF = perftime - timetime - CLOCK_DIFF_UPDATED = time.time() - - if tolerance < 0.00001: # try to get the two time.time() calls within 20 microseconds - break - - if best_tolerance >= 0.00001: - logger.warning( - "Clock diff calculator only reached a tolerance of {}. Some " - "timestamps may be inaccurate as a result.".format(best_tolerance) - ) - - -def _clock_diff_stale(): - """ - Checks if the last CLOCK_DIFF we calculated is older than five minutes old. - If so, we should recalculate. - """ - # Should we make the stale time configurable? - if time.time() - CLOCK_DIFF_UPDATED > 300: - return True - return False From 4746f93fca1000931312846b8fcb5b948d561bfe Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Fri, 15 Oct 2021 09:22:45 -0600 Subject: [PATCH 32/37] Clarify CHANGELOG and add a usage example --- CHANGELOG.asciidoc | 2 +- elasticapm/contrib/serverless/aws.py | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 4c29b6de3..976ec658c 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -37,7 +37,7 @@ endif::[] [float] ===== Features -* Add experimental support for lambda instrumentation {pull}1193[#1193] +* Add experimental support for AWS lambda instrumentation {pull}1193[#1193] [float] ===== Bug fixes diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index b141f0a4f..979b6a7ec 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -54,7 +54,15 @@ class capture_serverless(object): functions. Begins and ends a single transaction, waiting for the transport to flush - before returning from the wrapped function + before returning from the wrapped function. + + Example usage: + + from elasticapm import capture_serverless + + @capture_serverless() + def handler(event, context): + return {"statusCode": r.status_code, "body": "Success!"} """ def __init__(self, name=None, **kwargs): From 41e8af51709a92831a7670b4ccae77cad080d23f Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Fri, 15 Oct 2021 09:24:01 -0600 Subject: [PATCH 33/37] Remove unnecessary exclude for license check --- .pre-commit-config.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 3978efede..8fac11f5b 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -19,6 +19,6 @@ repos: name: License header check description: Checks the existance of license headers in all Python files entry: ./tests/scripts/license_headers_check.sh - exclude: "(elasticapm/utils/wrapt/.*|tests/utils/stacks/linenos.py|tests/utils/stacks/linenos2.py|tests/contrib/serverless/.*json)" + exclude: "(elasticapm/utils/wrapt/.*|tests/utils/stacks/linenos.py|tests/utils/stacks/linenos2.py)" language: script types: [python] From ff2860cc4289ac886f051e8d1fb8778a4ad3ee04 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Fri, 15 Oct 2021 09:28:12 -0600 Subject: [PATCH 34/37] Use `server_version` instead of adding `server_version_override` --- elasticapm/conf/__init__.py | 3 --- elasticapm/contrib/serverless/aws.py | 2 +- elasticapm/transport/http.py | 5 +---- 3 files changed, 2 insertions(+), 8 deletions(-) diff --git a/elasticapm/conf/__init__.py b/elasticapm/conf/__init__.py index acc641571..2a35d4d60 100644 --- a/elasticapm/conf/__init__.py +++ b/elasticapm/conf/__init__.py @@ -516,9 +516,6 @@ class Config(_ConfigBase): debug = _BoolConfigValue("DEBUG", default=False) server_url = _ConfigValue("SERVER_URL", default="http://localhost:8200", required=True) server_cert = _ConfigValue("SERVER_CERT", validators=[FileIsReadableValidator()]) - server_version_override = _ConfigValue( - "SERVER_VERSION_OVERRIDE", validators=[RegexValidator(r"^[0-9]+\.[0-9]+\.[0-9]+$")] - ) verify_server_cert = _BoolConfigValue("VERIFY_SERVER_CERT", default=True) use_certifi = _BoolConfigValue("USE_CERTIFI", default=True) include_paths = _ListConfigValue("INCLUDE_PATHS") diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 979b6a7ec..0b3972ef3 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -77,7 +77,7 @@ def __init__(self, name=None, **kwargs): kwargs["cloud_provider"] = "none" kwargs["framework_name"] = "AWS Lambda" # TODO this can probably be removed once the extension proxies the serverinfo endpoint - kwargs["server_version_override"] = "8.0.0" + kwargs["server_version"] = (8, 0, 0) if "service_name" not in kwargs: kwargs["service_name"] = os.environ["AWS_LAMBDA_FUNCTION_NAME"] diff --git a/elasticapm/transport/http.py b/elasticapm/transport/http.py index e010b92fb..f38436b40 100644 --- a/elasticapm/transport/http.py +++ b/elasticapm/transport/http.py @@ -170,10 +170,7 @@ def get_config(self, current_version=None, keys=None): def _process_queue(self): if not self.client.server_version: - if self.client.config.server_version_override: - self.client.server_version = version_string_to_tuple(self.client.config.server_version_override) - else: - self.fetch_server_info() + self.fetch_server_info() super()._process_queue() def fetch_server_info(self): From ee242afb68cf878852d83dbc13bfb2e6c398614a Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Fri, 15 Oct 2021 16:26:31 -0600 Subject: [PATCH 35/37] Ignore case when looking for "statusCode" --- elasticapm/contrib/serverless/aws.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 0b3972ef3..2963d8172 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -165,8 +165,16 @@ def __exit__(self, exc_type, exc_val, exc_tb): lambda: get_data_from_response(self.response, capture_headers=self.client.config.capture_headers), "response", ) - if "statusCode" in self.response: - result = "HTTP {}xx".format(int(self.response["statusCode"]) // 100) + status_code = None + try: + for k, v in self.response.items(): + if k.lower() == "statuscode": + status_code = v + break + except AttributeError: + pass + if status_code: + result = "HTTP {}xx".format(int(status_code) // 100) elasticapm.set_transaction_result(result, override=False) self.client.end_transaction() From 570ea9d9a40546d762746cfdf9a09c87e662be36 Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Mon, 18 Oct 2021 08:52:58 -0600 Subject: [PATCH 36/37] Add note that lambda is experimental --- elasticapm/contrib/serverless/aws.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py index 2963d8172..2c6a822b8 100644 --- a/elasticapm/contrib/serverless/aws.py +++ b/elasticapm/contrib/serverless/aws.py @@ -63,6 +63,9 @@ class capture_serverless(object): @capture_serverless() def handler(event, context): return {"statusCode": r.status_code, "body": "Success!"} + + Note: This is an experimental feature, and we may introduce breaking + changes in the future. """ def __init__(self, name=None, **kwargs): From 744a39ebfaa49634b65d1dea5fee4a374b8e62ae Mon Sep 17 00:00:00 2001 From: Colton Myers Date: Mon, 18 Oct 2021 08:56:58 -0600 Subject: [PATCH 37/37] Add a bunch of missing CHANGELOGs for upcoming release --- CHANGELOG.asciidoc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 976ec658c..24bb397f2 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -38,12 +38,16 @@ endif::[] ===== Features * Add experimental support for AWS lambda instrumentation {pull}1193[#1193] +* Add support for span compression {pull}1321[#1321] +* Auto-infer destination resources for easier instrumentation of new resources {pull}1359[#1359] +* Add support for dropped span statistics {pull}1327[#1327] [float] ===== Bug fixes * Ensure that Prometheus histograms are encoded correctly for APM Server {pull}1354[#1354] * Remove problematic (and duplicate) `event.dataset` from logging integrations {pull}1365[#1365] +* Fix for memcache instrumentation when configured with a unix socket {pull}1357[#1357] [[release-notes-6.x]] === Python Agent version 6.x