diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 268f23f89..24bb397f2 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -34,15 +34,20 @@ endif::[] // Unreleased changes go here // When the next release happens, nest these changes under the "Python Agent version 6.x" heading -//[float] -//===== Features -// -// +[float] +===== Features + +* Add experimental support for AWS lambda instrumentation {pull}1193[#1193] +* Add support for span compression {pull}1321[#1321] +* Auto-infer destination resources for easier instrumentation of new resources {pull}1359[#1359] +* Add support for dropped span statistics {pull}1327[#1327] + [float] ===== Bug fixes * Ensure that Prometheus histograms are encoded correctly for APM Server {pull}1354[#1354] * Remove problematic (and duplicate) `event.dataset` from logging integrations {pull}1365[#1365] +* Fix for memcache instrumentation when configured with a unix socket {pull}1357[#1357] [[release-notes-6.x]] === Python Agent version 6.x diff --git a/elasticapm/__init__.py b/elasticapm/__init__.py index 42bbca423..d621a651b 100644 --- a/elasticapm/__init__.py +++ b/elasticapm/__init__.py @@ -31,6 +31,7 @@ from elasticapm.base import Client, get_client # noqa: F401 from elasticapm.conf import setup_logging # noqa: F401 +from elasticapm.contrib.serverless import capture_serverless # noqa: F401 from elasticapm.instrumentation.control import instrument, uninstrument # noqa: F401 from elasticapm.traces import ( # noqa: F401 capture_span, diff --git a/elasticapm/base.py b/elasticapm/base.py index d33ba6231..bd1978108 100644 --- a/elasticapm/base.py +++ b/elasticapm/base.py @@ -150,7 +150,7 @@ def __init__(self, config=None, **inline): constants.EVENTS_API_PATH, ) transport_class = import_string(self.config.transport_class) - self._transport = transport_class(self._api_endpoint_url, self, **transport_kwargs) + self._transport = transport_class(url=self._api_endpoint_url, client=self, **transport_kwargs) 
self.config.transport = self._transport self._thread_managers["transport"] = self._transport @@ -200,7 +200,8 @@ def __init__(self, config=None, **inline): self._metrics.register("elasticapm.metrics.sets.breakdown.BreakdownMetricSet") if self.config.prometheus_metrics: self._metrics.register("elasticapm.metrics.sets.prometheus.PrometheusMetrics") - self._thread_managers["metrics"] = self._metrics + if self.config.metrics_interval: + self._thread_managers["metrics"] = self._metrics compat.atexit_register(self.close) if self.config.central_config: self._thread_managers["config"] = self.config diff --git a/elasticapm/contrib/serverless/__init__.py b/elasticapm/contrib/serverless/__init__.py new file mode 100644 index 000000000..c64e5a5ee --- /dev/null +++ b/elasticapm/contrib/serverless/__init__.py @@ -0,0 +1,41 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import os + +# Future providers such as GCP and Azure will be added to this if/elif block +# This way you can use the same syntax for each of the providers from a user +# perspective +if os.environ.get("AWS_REGION"): + from elasticapm.contrib.serverless.aws import capture_serverless +else: + from elasticapm.contrib.serverless.aws import capture_serverless + +__all__ = ("capture_serverless",) diff --git a/elasticapm/contrib/serverless/aws.py b/elasticapm/contrib/serverless/aws.py new file mode 100644 index 000000000..2c6a822b8 --- /dev/null +++ b/elasticapm/contrib/serverless/aws.py @@ -0,0 +1,388 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. 
+# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +import base64 +import datetime +import functools +import json +import os +import platform +import time + +import elasticapm +from elasticapm.base import Client, get_client +from elasticapm.conf import constants +from elasticapm.utils import compat, encoding, get_name_from_func +from elasticapm.utils.disttracing import TraceParent +from elasticapm.utils.logging import get_logger + +logger = get_logger("elasticapm.serverless") + +COLD_START = True + + +class capture_serverless(object): + """ + Context manager and decorator designed for instrumenting serverless + functions. + + Begins and ends a single transaction, waiting for the transport to flush + before returning from the wrapped function. + + Example usage: + + from elasticapm import capture_serverless + + @capture_serverless() + def handler(event, context): + return {"statusCode": r.status_code, "body": "Success!"} + + Note: This is an experimental feature, and we may introduce breaking + changes in the future. 
+ """ + + def __init__(self, name=None, **kwargs): + self.name = name + self.event = {} + self.context = {} + self.response = None + + # Disable all background threads except for transport + kwargs["metrics_interval"] = "0ms" + kwargs["central_config"] = False + kwargs["cloud_provider"] = "none" + kwargs["framework_name"] = "AWS Lambda" + # TODO this can probably be removed once the extension proxies the serverinfo endpoint + kwargs["server_version"] = (8, 0, 0) + if "service_name" not in kwargs: + kwargs["service_name"] = os.environ["AWS_LAMBDA_FUNCTION_NAME"] + + self.client = get_client() + if not self.client: + self.client = Client(**kwargs) + if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled: + elasticapm.instrument() + + def __call__(self, func): + self.name = self.name or get_name_from_func(func) + + @functools.wraps(func) + def decorated(*args, **kwds): + if len(args) == 2: + # Saving these for request context later + self.event, self.context = args + else: + self.event, self.context = {}, {} + if not self.client.config.debug and self.client.config.instrument and self.client.config.enabled: + with self: + self.response = func(*args, **kwds) + return self.response + else: + return func(*args, **kwds) + + return decorated + + def __enter__(self): + """ + Transaction setup + """ + trace_parent = TraceParent.from_headers(self.event.get("headers", {})) + + global COLD_START + cold_start = COLD_START + COLD_START = False + + self.source = "other" + transaction_type = "request" + transaction_name = os.environ.get("AWS_LAMBDA_FUNCTION_NAME", self.name) + + if "httpMethod" in self.event: # API Gateway + self.source = "api" + if os.environ.get("AWS_LAMBDA_FUNCTION_NAME"): + transaction_name = "{} {}".format(self.event["httpMethod"], os.environ["AWS_LAMBDA_FUNCTION_NAME"]) + else: + transaction_name = self.name + elif "Records" in self.event and len(self.event["Records"]) == 1: + record = self.event["Records"][0] + if 
record.get("eventSource") == "aws:s3": # S3 + self.source = "s3" + transaction_name = "{} {}".format(record["eventName"], record["s3"]["bucket"]["name"]) + elif record.get("EventSource") == "aws:sns": # SNS + self.source = "sns" + transaction_type = "messaging" + transaction_name = "RECEIVE {}".format(record["Sns"]["TopicArn"].split(":")[5]) + elif record.get("eventSource") == "aws:sqs": # SQS + self.source = "sqs" + transaction_type = "messaging" + transaction_name = "RECEIVE {}".format(record["eventSourceARN"].split(":")[5]) + + self.transaction = self.client.begin_transaction(transaction_type, trace_parent=trace_parent) + elasticapm.set_transaction_name(transaction_name, override=False) + if self.source == "api": + elasticapm.set_context( + lambda: get_data_from_request( + self.event, + capture_body=self.client.config.capture_body in ("transactions", "all"), + capture_headers=self.client.config.capture_headers, + ), + "request", + ) + self.set_metadata_and_context(cold_start) + + def __exit__(self, exc_type, exc_val, exc_tb): + """ + Transaction teardown + """ + if exc_val: + self.client.capture_exception(exc_info=(exc_type, exc_val, exc_tb), handled=False) + + if self.response and isinstance(self.response, dict): + elasticapm.set_context( + lambda: get_data_from_response(self.response, capture_headers=self.client.config.capture_headers), + "response", + ) + status_code = None + try: + for k, v in self.response.items(): + if k.lower() == "statuscode": + status_code = v + break + except AttributeError: + pass + if status_code: + result = "HTTP {}xx".format(int(status_code) // 100) + elasticapm.set_transaction_result(result, override=False) + + self.client.end_transaction() + + try: + logger.debug("flushing elasticapm") + self.client._transport.flush() + logger.debug("done flushing elasticapm") + except ValueError: + logger.warning("flush timed out") + + def set_metadata_and_context(self, coldstart): + """ + Process the metadata and context fields for this request 
+ """ + metadata = {} + cloud_context = {"origin": {"provider": "aws"}} + service_context = {} + message_context = {} + + faas = {} + faas["coldstart"] = coldstart + faas["trigger"] = {"type": "other"} + faas["execution"] = self.context.aws_request_id + + if self.source == "api": + faas["trigger"]["type"] = "http" + faas["trigger"]["request_id"] = self.event["requestContext"]["requestId"] + service_context["origin"] = { + "name": "{} {}/{}".format( + self.event["requestContext"]["httpMethod"], + self.event["requestContext"]["resourcePath"], + self.event["requestContext"]["stage"], + ) + } + service_context["origin"]["id"] = self.event["requestContext"]["apiId"] + service_context["origin"]["version"] = "2.0" if self.event["headers"]["Via"].startswith("2.0") else "1.0" + cloud_context["origin"] = {} + cloud_context["origin"]["service"] = {"name": "api gateway"} + cloud_context["origin"]["account"] = {"id": self.event["requestContext"]["accountId"]} + cloud_context["origin"]["provider"] = "aws" + elif self.source == "sqs": + record = self.event["Records"][0] + faas["trigger"]["type"] = "pubsub" + faas["trigger"]["request_id"] = record["messageId"] + service_context["origin"] = {} + service_context["origin"]["name"] = record["eventSourceARN"].split(":")[5] + service_context["origin"]["id"] = record["eventSourceARN"] + cloud_context["origin"] = {} + cloud_context["origin"]["service"] = {"name": "sqs"} + cloud_context["origin"]["region"] = record["awsRegion"] + cloud_context["origin"]["account"] = {"id": record["eventSourceARN"].split(":")[4]} + cloud_context["origin"]["provider"] = "aws" + message_context["queue"] = record["eventSourceARN"] + if "SentTimestamp" in record["attributes"]: + message_context["age"] = int((time.time() * 1000) - int(record["attributes"]["SentTimestamp"])) + if self.client.config.capture_body in ("transactions", "all") and "body" in record: + message_context["body"] = record["body"] + if self.client.config.capture_headers and 
record["messageAttributes"]: + message_context["headers"] = record["messageAttributes"] + elif self.source == "sns": + record = self.event["Records"][0] + faas["trigger"]["type"] = "pubsub" + faas["trigger"]["request_id"] = record["Sns"]["TopicArn"] + service_context["origin"] = {} + service_context["origin"]["name"] = record["Sns"]["TopicArn"].split(":")[5] + service_context["origin"]["id"] = record["Sns"]["TopicArn"] + service_context["origin"]["version"] = record["EventVersion"] + service_context["origin"]["service"] = {"name": "sns"} + cloud_context["origin"] = {} + cloud_context["origin"]["region"] = record["Sns"]["TopicArn"].split(":")[3] + cloud_context["origin"]["account_id"] = record["Sns"]["TopicArn"].split(":")[4] + cloud_context["origin"]["provider"] = "aws" + message_context["queue"] = record["Sns"]["TopicArn"] + if "Timestamp" in record["Sns"]: + message_context["age"] = int( + ( + datetime.datetime.now() + - datetime.datetime.strptime(record["Sns"]["Timestamp"], r"%Y-%m-%dT%H:%M:%S.%fZ") + ).total_seconds() + * 1000 + ) + if self.client.config.capture_body in ("transactions", "all") and "Message" in record["Sns"]: + message_context["body"] = record["Sns"]["Message"] + if self.client.config.capture_headers and record["Sns"]["MessageAttributes"]: + message_context["headers"] = record["Sns"]["MessageAttributes"] + elif self.source == "s3": + record = self.event["Records"][0] + faas["trigger"]["type"] = "datasource" + faas["trigger"]["request_id"] = record["responseElements"]["x-amz-request-id"] + service_context["origin"] = {} + service_context["origin"]["name"] = record["s3"]["bucket"]["name"] + service_context["origin"]["id"] = record["s3"]["bucket"]["arn"] + service_context["origin"]["version"] = record["eventVersion"] + cloud_context["origin"] = {} + cloud_context["origin"]["service"] = {"name": "s3"} + cloud_context["origin"]["region"] = record["awsRegion"] + cloud_context["origin"]["provider"] = "aws" + + metadata["faas"] = faas + + 
metadata["service"] = {} + metadata["service"]["name"] = os.environ.get("AWS_LAMBDA_FUNCTION_NAME") + metadata["service"]["framework"] = {"name": "AWS Lambda"} + metadata["service"]["runtime"] = { + "name": os.environ.get("AWS_EXECUTION_ENV"), + "version": platform.python_version(), + } + arn = self.context.invoked_function_arn + if len(arn.split(":")) > 7: + arn = ":".join(arn.split(":")[:7]) + metadata["service"]["id"] = arn + metadata["service"]["version"] = os.environ.get("AWS_LAMBDA_FUNCTION_VERSION") + metadata["service"]["node"] = {"configured_name": os.environ.get("AWS_LAMBDA_LOG_STREAM_NAME")} + # This is the one piece of metadata that requires deep merging. We add it manually + # here to avoid having to deep merge in _transport.add_metadata() + if self.client._transport._metadata: + node_name = self.client._transport._metadata.get("service", {}).get("node", {}).get("name") + if node_name: + metadata["service"]["node"]["name"] = node_name + + metadata["cloud"] = {} + metadata["cloud"]["provider"] = "aws" + metadata["cloud"]["region"] = os.environ.get("AWS_REGION") + metadata["cloud"]["service"] = {"name": "lambda"} + metadata["cloud"]["account"] = {"id": arn.split(":")[4]} + + elasticapm.set_context(cloud_context, "cloud") + elasticapm.set_context(service_context, "service") + if message_context: + elasticapm.set_context(service_context, "message") + self.client._transport.add_metadata(metadata) + + +def get_data_from_request(event, capture_body=False, capture_headers=True): + """ + Capture context data from API gateway event + """ + result = {} + if capture_headers and "headers" in event: + result["headers"] = event["headers"] + if "httpMethod" not in event: + # Not API Gateway + return result + + result["method"] = event["httpMethod"] + if event["httpMethod"] in constants.HTTP_WITH_BODY and "body" in event: + body = event["body"] + if capture_body: + if event.get("isBase64Encoded"): + body = base64.b64decode(body) + else: + try: + jsonbody = 
json.loads(body) + body = jsonbody + except Exception: + pass + + if body is not None: + result["body"] = body if capture_body else "[REDACTED]" + + result["url"] = get_url_dict(event) + return result + + +def get_data_from_response(response, capture_headers=True): + """ + Capture response data from lambda return + """ + result = {} + + if "statusCode" in response: + result["status_code"] = response["statusCode"] + + if capture_headers and "headers" in response: + result["headers"] = response["headers"] + return result + + +def get_url_dict(event): + """ + Reconstruct URL from API Gateway + """ + headers = event.get("headers", {}) + proto = headers.get("X-Forwarded-Proto", "https") + host = headers.get("Host", "") + path = event.get("path", "") + port = headers.get("X-Forwarded-Port") + stage = "/" + event.get("requestContext", {}).get("stage", "") + query = "" + if event.get("queryStringParameters"): + query = "?" + for k, v in compat.iteritems(event["queryStringParameters"]): + query += "{}={}".format(k, v) + url = proto + "://" + host + stage + path + query + + url_dict = { + "full": encoding.keyword_field(url), + "protocol": proto, + "hostname": encoding.keyword_field(host), + "pathname": encoding.keyword_field(stage + path), + } + + if port: + url_dict["port"] = port + if query: + url_dict["search"] = encoding.keyword_field(query) + return url_dict diff --git a/elasticapm/transport/base.py b/elasticapm/transport/base.py index 6946a553a..8e96e46a3 100644 --- a/elasticapm/transport/base.py +++ b/elasticapm/transport/base.py @@ -142,6 +142,9 @@ def _process_queue(self): if data is not None: data = self._process_event(event_type, data) if data is not None: + if not buffer_written: + # Write metadata just in time to allow for late metadata changes (such as in lambda) + self._write_metadata(buffer) buffer.write((self._json_serializer({event_type: data}) + "\n").encode("utf-8")) buffer_written = True self._counts[event_type] += 1 @@ -201,9 +204,30 @@ def 
_process_event(self, event_type, data): def _init_buffer(self): buffer = gzip.GzipFile(fileobj=compat.BytesIO(), mode="w", compresslevel=self._compress_level) + return buffer + + def _write_metadata(self, buffer): data = (self._json_serializer({"metadata": self._metadata}) + "\n").encode("utf-8") buffer.write(data) - return buffer + + def add_metadata(self, data): + """ + Add additional metadata do the dictionary + + Only used in specific instances where metadata relies on data we only + have at request time, such as for lambda metadata + + Metadata is only merged one key deep. + """ + if self._metadata is not None: + # Merge one key deep + for key, val in data.items(): + if isinstance(val, dict) and key in self._metadata and isinstance(self._metadata[key], dict): + self._metadata[key].update(val) + else: + self._metadata[key] = val + else: + self._metadata = data def _init_event_queue(self, chill_until, max_chill_time): # some libraries like eventlet monkeypatch queue.Queue and switch out the implementation. diff --git a/tests/contrib/serverless/__init__.py b/tests/contrib/serverless/__init__.py new file mode 100644 index 000000000..7e2b340e6 --- /dev/null +++ b/tests/contrib/serverless/__init__.py @@ -0,0 +1,29 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. 
+# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/tests/contrib/serverless/aws_api_test_data.json b/tests/contrib/serverless/aws_api_test_data.json new file mode 100644 index 000000000..f66f82491 --- /dev/null +++ b/tests/contrib/serverless/aws_api_test_data.json @@ -0,0 +1,117 @@ +{ + "resource": "/fetch_all", + "path": "/fetch_all", + "httpMethod": "GET", + "headers": { + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8", + "Accept-Encoding": "gzip, deflate, br", + "Accept-Language": "en-US,en;q=0.5", + "CloudFront-Forwarded-Proto": "https", + "CloudFront-Is-Desktop-Viewer": "true", + "CloudFront-Is-Mobile-Viewer": "false", + "CloudFront-Is-SmartTV-Viewer": "false", + "CloudFront-Is-Tablet-Viewer": "false", + "CloudFront-Viewer-Country": "US", + "Host": "02plqthge2.execute-api.us-east-1.amazonaws.com", + "upgrade-insecure-requests": "1", + "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:72.0) Gecko/20100101 Firefox/72.0", + "Via": "2.0 
969f35f01b6eddd92239a3e818fc1e0d.cloudfront.net (CloudFront)", + "X-Amz-Cf-Id": "eDbpfDwO-CRYymEFLkW6CBCsU_H_PS8R93_us53QWvXWLS45v3NvQw==", + "X-Amzn-Trace-Id": "Root=1-5e502af4-fd0c1c6fdc164e1d6361183b", + "X-Forwarded-For": "76.76.241.57, 52.46.47.139", + "X-Forwarded-Port": "443", + "X-Forwarded-Proto": "https" + }, + "multiValueHeaders": { + "Accept": [ + "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8" + ], + "Accept-Encoding": [ + "gzip, deflate, br" + ], + "Accept-Language": [ + "en-US,en;q=0.5" + ], + "CloudFront-Forwarded-Proto": [ + "https" + ], + "CloudFront-Is-Desktop-Viewer": [ + "true" + ], + "CloudFront-Is-Mobile-Viewer": [ + "false" + ], + "CloudFront-Is-SmartTV-Viewer": [ + "false" + ], + "CloudFront-Is-Tablet-Viewer": [ + "false" + ], + "CloudFront-Viewer-Country": [ + "US" + ], + "Host": [ + "02plqthge2.execute-api.us-east-1.amazonaws.com" + ], + "upgrade-insecure-requests": [ + "1" + ], + "User-Agent": [ + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:72.0) Gecko/20100101 Firefox/72.0" + ], + "Via": [ + "2.0 969f35f01b6eddd92239a3e818fc1e0d.cloudfront.net (CloudFront)" + ], + "X-Amz-Cf-Id": [ + "eDbpfDwO-CRYymEFLkW6CBCsU_H_PS8R93_us53QWvXWLS45v3NvQw==" + ], + "X-Amzn-Trace-Id": [ + "Root=1-5e502af4-fd0c1c6fdc164e1d6361183b" + ], + "X-Forwarded-For": [ + "76.76.241.57, 52.46.47.139" + ], + "X-Forwarded-Port": [ + "443" + ], + "X-Forwarded-Proto": [ + "https" + ] + }, + "queryStringParameters": null, + "multiValueQueryStringParameters": null, + "pathParameters": null, + "stageVariables": null, + "requestContext": { + "resourceId": "y3tkf7", + "resourcePath": "/fetch_all", + "httpMethod": "GET", + "extendedRequestId": "IQumRELJIAMF6fQ=", + "requestTime": "21/Feb/2020:19:09:40 +0000", + "path": "/dev/fetch_all", + "accountId": "571481734049", + "protocol": "HTTP/1.1", + "stage": "dev", + "domainPrefix": "02plqthge2", + "requestTimeEpoch": 1582312180890, + "requestId": "6f3dffca-46f8-4c8b-800b-6bc1ea2554ec", + 
"identity": { + "cognitoIdentityPoolId": null, + "accountId": null, + "cognitoIdentityId": null, + "caller": null, + "sourceIp": "76.76.241.57", + "principalOrgId": null, + "accessKey": null, + "cognitoAuthenticationType": null, + "cognitoAuthenticationProvider": null, + "userArn": null, + "userAgent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:72.0) Gecko/20100101 Firefox/72.0", + "user": null + }, + "domainName": "02plqthge2.execute-api.us-east-1.amazonaws.com", + "apiId": "02plqthge2" + }, + "body": null, + "isBase64Encoded": false +} \ No newline at end of file diff --git a/tests/contrib/serverless/aws_s3_batch_test_data.json b/tests/contrib/serverless/aws_s3_batch_test_data.json new file mode 100644 index 000000000..35187388a --- /dev/null +++ b/tests/contrib/serverless/aws_s3_batch_test_data.json @@ -0,0 +1,72 @@ +{ + "Records": [ + { + "eventVersion": "2.1", + "eventSource": "aws:s3", + "awsRegion": "us-east-1", + "eventTime": "2021-07-22T17:00:56.160Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "requestParameters": { + "sourceIPAddress": "73.3.110.73" + }, + "responseElements": { + "x-amz-request-id": "0FM18R15SDX52CT2", + "x-amz-id-2": "6XuWW0exU7l4TbWDJmZL3oJgm6g8zgKDzkWM7dcuUKJcvd5mWrBSoPKpPGgINRDiQqwDNQIlVbJ3iWmN/S/e17DkKxT8fuQT" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "2ef153aa-f76e-4c29-9d28-24a5eff172ec", + "bucket": { + "name": "basepitestbucket", + "ownerIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "arn": "arn:aws:s3:::basepitestbucket" + }, + "object": { + "key": "requirements.txt", + "size": 106, + "eTag": "467344a84741a3a3e6d92a7acc24ba5e", + "sequencer": "0060F9A455E75DC318" + } + } + }, + { + "eventVersion": "2.1", + "eventSource": "aws:s3", + "awsRegion": "us-east-1", + "eventTime": "2021-07-22T17:00:56.160Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "requestParameters": { + "sourceIPAddress": 
"73.3.110.73" + }, + "responseElements": { + "x-amz-request-id": "0FM18R15SDX52CT2", + "x-amz-id-2": "6XuWW0exU7l4TbWDJmZL3oJgm6g8zgKDzkWM7dcuUKJcvd5mWrBSoPKpPGgINRDiQqwDNQIlVbJ3iWmN/S/e17DkKxT8fuQT" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "2ef153aa-f76e-4c29-9d28-24a5eff172ec", + "bucket": { + "name": "basepitestbucket", + "ownerIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "arn": "arn:aws:s3:::basepitestbucket" + }, + "object": { + "key": "requirements.txt", + "size": 106, + "eTag": "467344a84741a3a3e6d92a7acc24ba5e", + "sequencer": "0060F9A455E75DC318" + } + } + } + ] +} \ No newline at end of file diff --git a/tests/contrib/serverless/aws_s3_test_data.json b/tests/contrib/serverless/aws_s3_test_data.json new file mode 100644 index 000000000..35192cbd4 --- /dev/null +++ b/tests/contrib/serverless/aws_s3_test_data.json @@ -0,0 +1,38 @@ +{ + "Records": [ + { + "eventVersion": "2.1", + "eventSource": "aws:s3", + "awsRegion": "us-east-1", + "eventTime": "2021-07-22T17:00:56.160Z", + "eventName": "ObjectCreated:Put", + "userIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "requestParameters": { + "sourceIPAddress": "73.3.110.73" + }, + "responseElements": { + "x-amz-request-id": "0FM18R15SDX52CT2", + "x-amz-id-2": "6XuWW0exU7l4TbWDJmZL3oJgm6g8zgKDzkWM7dcuUKJcvd5mWrBSoPKpPGgINRDiQqwDNQIlVbJ3iWmN/S/e17DkKxT8fuQT" + }, + "s3": { + "s3SchemaVersion": "1.0", + "configurationId": "2ef153aa-f76e-4c29-9d28-24a5eff172ec", + "bucket": { + "name": "basepitestbucket", + "ownerIdentity": { + "principalId": "A2RA6A8EUPNS7Q" + }, + "arn": "arn:aws:s3:::basepitestbucket" + }, + "object": { + "key": "requirements.txt", + "size": 106, + "eTag": "467344a84741a3a3e6d92a7acc24ba5e", + "sequencer": "0060F9A455E75DC318" + } + } + } + ] +} \ No newline at end of file diff --git a/tests/contrib/serverless/aws_sns_test_data.json b/tests/contrib/serverless/aws_sns_test_data.json new file mode 100644 index 000000000..162cd6edf --- /dev/null +++ 
b/tests/contrib/serverless/aws_sns_test_data.json @@ -0,0 +1,22 @@ +{ + "Records": [ + { + "EventSource": "aws:sns", + "EventVersion": "1.0", + "EventSubscriptionArn": "arn:aws:sns:us-east-1:268121251715:basepiwstesttopic:4763d52a-56d1-4dc5-99eb-ffb4315587af", + "Sns": { + "Type": "Notification", + "MessageId": "07b92f2d-7087-5d2a-b807-765ed74eed4d", + "TopicArn": "arn:aws:sns:us-east-1:268121251715:basepiwstesttopic", + "Subject": null, + "Message": "hello world", + "Timestamp": "2021-07-22T17:06:17.354Z", + "SignatureVersion": "1", + "Signature": "YBGjMCe1m0QQ0DIWq4gZLy3/0bEyXhLPZJzeo4JYMa2P9ercshfn9s+x9nqd6HSYfO3RG0ebCmzxddgO8UCmaddXbhhMRWYjsIDv3+OvUitG8+bFqvpH/rQVHdCEWla5l+NDcye6d2cl9zuYFliTIFUsBmFcqbiroyZbIIHOczUpxNKK9oQcAXU6RgIl6y30DBgxYmzdMm4FMXPpden84v0LwVOyfqVm2gmeMnlccEOB0TRMe8sLsv7OfWLA3GBl3b14MOUZfvUz4Btb15ssCq++QVHoTQWZnbJ5dA7P3ljMauQCagub0Zefx7uUmWAlczxe/5kREJt8rEfl+pN7Mg==", + "SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-010a507c1833636cd94bdb98bd93083a.pem", + "UnsubscribeUrl": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:268121251715:basepiwstesttopic:4763d52a-56d1-4dc5-99eb-ffb4315587af", + "MessageAttributes": {} + } + } + ] +} \ No newline at end of file diff --git a/tests/contrib/serverless/aws_sqs_test_data.json b/tests/contrib/serverless/aws_sqs_test_data.json new file mode 100644 index 000000000..cd92f89fc --- /dev/null +++ b/tests/contrib/serverless/aws_sqs_test_data.json @@ -0,0 +1,20 @@ +{ + "Records": [ + { + "messageId": "e56e2c06-2a44-40ca-94e6-77a7ab9f76e6", + "receiptHandle": 
"AQEBhXF9S0Ezn+QPRzS76vRv7fTy/H5Tc6y4CnVkUizPZJeioFuEvfhbCFCFPKI0byYuzxuzpTfGsp8p7UrzQpZByrxjSpkzGSnQcomuxjL9kwqOLcsMYJWQfjFeaS6fMeHck8bnUUT7ed+3QqDDOxPtVnz4lFY06AGO+r+TcFpscvXGYvc5ONSJ+haQY9BLGcic44No1kEUK85ifJkIbrEbP2ARzWchxAkodNrlsFKJdjZo7hoQCkpXqp0QBt7aA4OZsFgFbkc2MQz7wJhx+dWRWYK1xLss/MP62Uuu1nXtHimIToBEmaBcjYIYtOypUyH4UABF1CoAsoGqg85831xBtn+L+20cTTZ/llvv234yPI5RM9ydR8AqMk5y0S28Xz6H", + "body": "hello world", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1626973700071", + "SenderId": "268121251715", + "ApproximateFirstReceiveTimestamp": "1626973700075" + }, + "messageAttributes": {}, + "md5OfBody": "5eb63bbbe01eeed093cb22bb8f5acdc3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:268121251715:testqueue", + "awsRegion": "us-east-1" + } + ] +} \ No newline at end of file diff --git a/tests/contrib/serverless/aws_tests.py b/tests/contrib/serverless/aws_tests.py new file mode 100644 index 000000000..c5a48d966 --- /dev/null +++ b/tests/contrib/serverless/aws_tests.py @@ -0,0 +1,220 @@ +# BSD 3-Clause License +# +# Copyright (c) 2019, Elasticsearch BV +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# * Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. 
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import pytest  # isort:skip

import json
import os
import time

from elasticapm import capture_span
from elasticapm.conf import constants
from elasticapm.contrib.serverless.aws import capture_serverless, get_data_from_request, get_data_from_response


def _load_event(filename):
    """Load an AWS Lambda event payload from a JSON file next to this module."""
    aws_data_file = os.path.join(os.path.dirname(__file__), filename)
    with open(aws_data_file) as f:
        return json.load(f)


@pytest.fixture
def event_api():
    """Sample API Gateway invocation event."""
    return _load_event("aws_api_test_data.json")


@pytest.fixture
def event_s3():
    """Sample S3 single-record invocation event."""
    return _load_event("aws_s3_test_data.json")


@pytest.fixture
def event_s3_batch():
    """Sample S3 multi-record (batch) invocation event."""
    return _load_event("aws_s3_batch_test_data.json")


@pytest.fixture
def event_sqs():
    """Sample SQS invocation event."""
    return _load_event("aws_sqs_test_data.json")


@pytest.fixture
def event_sns():
    """Sample SNS invocation event."""
    return _load_event("aws_sns_test_data.json")


@pytest.fixture
def context():
    """Fake Lambda context object passed to the handler under test."""
    return SampleContext()


class SampleContext:
    """
    Stand-in for AWS lambda context object
    """

    def __init__(self):
        self.invoked_function_arn = "arn:aws:lambda:us-west-2:123456789012:function:my-function:someAlias"
        self.aws_request_id = "12345"


def test_request_data(event_api):
    """Request metadata is extracted from an API Gateway event, honoring capture flags."""
    data = get_data_from_request(event_api, capture_body=True, capture_headers=True)

    assert data["method"] == "GET"
    assert data["url"]["full"] == "https://02plqthge2.execute-api.us-east-1.amazonaws.com/dev/fetch_all"
    assert data["headers"]["Host"] == "02plqthge2.execute-api.us-east-1.amazonaws.com"

    data = get_data_from_request(event_api, capture_body=False, capture_headers=False)

    assert data["method"] == "GET"
    assert data["url"]["full"] == "https://02plqthge2.execute-api.us-east-1.amazonaws.com/dev/fetch_all"
    assert "headers" not in data


def test_response_data():
    """Response metadata is extracted from a handler return value, honoring capture flags."""
    response = {"statusCode": 200, "headers": {"foo": "bar"}}

    data = get_data_from_response(response, capture_headers=True)

    assert data["status_code"] == 200
    assert data["headers"]["foo"] == "bar"

    data = get_data_from_response(response, capture_headers=False)

    assert data["status_code"] == 200
    assert "headers" not in data

    # An empty response (handler returned nothing useful) yields no data at all
    data = get_data_from_response({}, capture_headers=False)

    assert not data


def test_capture_serverless_api_gateway(event_api, context, elasticapm_client, monkeypatch):
    """An API Gateway invocation produces an HTTP-style transaction."""
    # capture_serverless reads the function name from the environment;
    # monkeypatch undoes the change after the test, avoiding cross-test leakage.
    monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "test_func")

    @capture_serverless()
    def test_func(event, context):
        with capture_span("test_span"):
            time.sleep(0.01)
        return {"statusCode": 200, "headers": {"foo": "bar"}}

    test_func(event_api, context)

    assert len(elasticapm_client.events[constants.TRANSACTION]) == 1
    transaction = elasticapm_client.events[constants.TRANSACTION][0]

    assert transaction["name"] == "GET test_func"
    assert transaction["result"] == "HTTP 2xx"
    assert transaction["span_count"]["started"] == 1
    assert transaction["context"]["request"]["method"] == "GET"
    assert transaction["context"]["request"]["headers"]
    assert transaction["context"]["response"]["status_code"] == 200


def test_capture_serverless_s3(event_s3, context, elasticapm_client, monkeypatch):
    """A single-record S3 invocation is named after the event and bucket."""
    monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "test_func")

    @capture_serverless()
    def test_func(event, context):
        with capture_span("test_span"):
            time.sleep(0.01)
        return

    test_func(event_s3, context)

    assert len(elasticapm_client.events[constants.TRANSACTION]) == 1
    transaction = elasticapm_client.events[constants.TRANSACTION][0]

    assert transaction["name"] == "ObjectCreated:Put basepitestbucket"
    assert transaction["span_count"]["started"] == 1


def test_capture_serverless_sns(event_sns, context, elasticapm_client, monkeypatch):
    """An SNS invocation is named RECEIVE plus the topic name."""
    monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "test_func")

    @capture_serverless()
    def test_func(event, context):
        with capture_span("test_span"):
            time.sleep(0.01)
        return

    test_func(event_sns, context)

    assert len(elasticapm_client.events[constants.TRANSACTION]) == 1
    transaction = elasticapm_client.events[constants.TRANSACTION][0]

    assert transaction["name"] == "RECEIVE basepiwstesttopic"
    assert transaction["span_count"]["started"] == 1


def test_capture_serverless_sqs(event_sqs, context, elasticapm_client, monkeypatch):
    """An SQS invocation is named RECEIVE plus the queue name."""
    monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "test_func")

    @capture_serverless()
    def test_func(event, context):
        with capture_span("test_span"):
            time.sleep(0.01)
        return

    test_func(event_sqs, context)

    assert len(elasticapm_client.events[constants.TRANSACTION]) == 1
    transaction = elasticapm_client.events[constants.TRANSACTION][0]

    assert transaction["name"] == "RECEIVE testqueue"
    assert transaction["span_count"]["started"] == 1


def test_capture_serverless_s3_batch(event_s3_batch, context, elasticapm_client, monkeypatch):
    """A multi-record S3 batch falls back to the plain function name."""
    monkeypatch.setenv("AWS_LAMBDA_FUNCTION_NAME", "test_func")

    @capture_serverless()
    def test_func(event, context):
        with capture_span("test_span"):
            time.sleep(0.01)
        return

    test_func(event_s3_batch, context)

    assert len(elasticapm_client.events[constants.TRANSACTION]) == 1
    transaction = elasticapm_client.events[constants.TRANSACTION][0]

    assert transaction["name"] == "test_func"
    assert transaction["span_count"]["started"] == 1