diff --git a/airflow/providers/amazon/aws/example_dags/example_lambda.py b/airflow/providers/amazon/aws/example_dags/example_lambda.py new file mode 100644 index 0000000000000..17ba0dab48eb9 --- /dev/null +++ b/airflow/providers/amazon/aws/example_dags/example_lambda.py @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +from datetime import datetime, timedelta +from os import getenv + +from airflow import DAG +from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator + +# [START howto_operator_lambda_env_variables] +LAMBDA_FUNCTION_NAME = getenv("LAMBDA_FUNCTION_NAME", "test-function") +# [END howto_operator_lambda_env_variables] + +SAMPLE_EVENT = json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}}) + +with DAG( + dag_id='example_lambda', + schedule_interval=None, + start_date=datetime(2021, 1, 1), + dagrun_timeout=timedelta(minutes=60), + tags=['example'], + catchup=False, +) as dag: + # [START howto_lambda_operator] + invoke_lambda_function = AwsLambdaInvokeFunctionOperator( + task_id='setup__invoke_lambda_function', + function_name=LAMBDA_FUNCTION_NAME, + payload=SAMPLE_EVENT, + ) + # [END howto_lambda_operator] diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index 56ea7ba81a25f..0f04af7e3b57b 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -18,6 +18,7 @@ """This module contains AWS Lambda hook""" import warnings +from typing import Any, List, Optional from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook @@ -40,30 +41,86 @@ class LambdaHook(AwsBaseHook): def __init__( self, - function_name: str, - log_type: str = 'None', - qualifier: str = '$LATEST', - invocation_type: str = 'RequestResponse', *args, **kwargs, ) -> None: - self.function_name = function_name - self.log_type = log_type - self.invocation_type = invocation_type - self.qualifier = qualifier kwargs["client_type"] = "lambda" super().__init__(*args, **kwargs) - def invoke_lambda(self, payload: str) -> str: - """Invoke Lambda Function""" - response = self.conn.invoke( - FunctionName=self.function_name, - InvocationType=self.invocation_type, - LogType=self.log_type, - Payload=payload, - Qualifier=self.qualifier, - ) + def invoke_lambda( + self, + *, + function_name: str, + invocation_type: Optional[str] = None, + log_type: Optional[str] = None, + client_context: Optional[str] = None, + payload: Optional[str] = None, + qualifier: Optional[str] = None, + ): + """Invoke Lambda Function. 
Refer to the boto3 documentation for more info.""" + invoke_args = { + "FunctionName": function_name, + "InvocationType": invocation_type, + "LogType": log_type, + "ClientContext": client_context, + "Payload": payload, + "Qualifier": qualifier, + } + response = self.conn.invoke(**{k: v for k, v in invoke_args.items() if v is not None}) + return response + def create_lambda( + self, + *, + function_name: str, + runtime: str, + role: str, + handler: str, + code: dict, + description: Optional[str] = None, + timeout: Optional[int] = None, + memory_size: Optional[int] = None, + publish: Optional[bool] = None, + vpc_config: Optional[Any] = None, + package_type: Optional[str] = None, + dead_letter_config: Optional[Any] = None, + environment: Optional[Any] = None, + kms_key_arn: Optional[str] = None, + tracing_config: Optional[Any] = None, + tags: Optional[Any] = None, + layers: Optional[list] = None, + file_system_configs: Optional[List[Any]] = None, + image_config: Optional[Any] = None, + code_signing_config_arn: Optional[str] = None, + architectures: Optional[List[str]] = None, + ) -> dict: + """Create a Lambda Function""" + create_function_args = { + "FunctionName": function_name, + "Runtime": runtime, + "Role": role, + "Handler": handler, + "Code": code, + "Description": description, + "Timeout": timeout, + "MemorySize": memory_size, + "Publish": publish, + "VpcConfig": vpc_config, + "PackageType": package_type, + "DeadLetterConfig": dead_letter_config, + "Environment": environment, + "KMSKeyArn": kms_key_arn, + "TracingConfig": tracing_config, + "Tags": tags, + "Layers": layers, + "FileSystemConfigs": file_system_configs, + "ImageConfig": image_config, + "CodeSigningConfigArn": code_signing_config_arn, + "Architectures": architectures, + } + response = self.conn.create_function( + **{k: v for k, v in create_function_args.items() if v is not None}, + ) return response diff --git a/airflow/providers/amazon/aws/operators/aws_lambda.py b/airflow/providers/amazon/aws/operators/aws_lambda.py new file mode 100644 index 0000000000000..c2d9d022fbcbc --- /dev/null +++ b/airflow/providers/amazon/aws/operators/aws_lambda.py @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +from typing import TYPE_CHECKING, Optional, Sequence + +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AwsLambdaInvokeFunctionOperator(BaseOperator): + """ + Invokes an AWS Lambda function. + You can invoke a function synchronously (and wait for the response), + or asynchronously. + To invoke a function asynchronously, + set `invocation_type` to `Event`. For more details, + review the boto3 Lambda invoke docs. 
+ + :param function_name: The name of the AWS Lambda function, version, or alias. + :param payload: The JSON string that you want to provide to your Lambda function as input. + :param log_type: Set to Tail to include the execution log in the response. Otherwise, set to "None". + :param qualifier: Specify a version or alias to invoke a published version of the function. + :param aws_conn_id: The AWS connection ID to use + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AwsLambdaInvokeFunctionOperator` + + """ + + template_fields: Sequence[str] = ('function_name', 'payload', 'qualifier', 'invocation_type') + ui_color = '#ff7300' + + def __init__( + self, + *, + function_name: str, + log_type: Optional[str] = None, + qualifier: Optional[str] = None, + invocation_type: Optional[str] = None, + client_context: Optional[str] = None, + payload: Optional[str] = None, + aws_conn_id: str = 'aws_default', + **kwargs, + ): + super().__init__(**kwargs) + self.function_name = function_name + self.payload = payload + self.log_type = log_type + self.qualifier = qualifier + self.invocation_type = invocation_type + self.client_context = client_context + self.aws_conn_id = aws_conn_id + + def execute(self, context: 'Context'): + """ + Invokes the target AWS Lambda function from Airflow. + + :return: The response payload from the function, or an error object. + """ + hook = LambdaHook(aws_conn_id=self.aws_conn_id) + success_status_codes = [200, 202, 204] + self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.function_name, self.payload) + response = hook.invoke_lambda( + function_name=self.function_name, + invocation_type=self.invocation_type, + log_type=self.log_type, + client_context=self.client_context, + payload=self.payload, + qualifier=self.qualifier, + ) + self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata")) + if response.get("StatusCode") not in success_status_codes: + raise ValueError('Lambda function did not execute', json.dumps(response.get("ResponseMetadata"))) + payload_stream = response.get("Payload") + payload = payload_stream.read().decode() + if "FunctionError" in response: + raise ValueError( + 'Lambda function execution resulted in error', + {"ResponseMetadata": response.get("ResponseMetadata"), "Payload": payload}, + ) + self.log.info('Lambda function invocation succeeded: %r', response.get("ResponseMetadata")) + return payload diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 3b99c6457efd7..b61afcf95ca5a 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -167,6 +167,8 @@ integrations: - integration-name: AWS Lambda external-doc-url: https://aws.amazon.com/lambda/ logo: /integration-logos/aws/AWS-Lambda_light-bg@4x.png + how-to-guide: + - /docs/apache-airflow-providers-amazon/operators/lambda.rst tags: [aws] - integration-name: AWS Step Functions external-doc-url: https://aws.amazon.com/step-functions/ @@ -229,6 +231,9 @@ operators: python-modules: - airflow.providers.amazon.aws.operators.glue - airflow.providers.amazon.aws.operators.glue_crawler + - integration-name: AWS Lambda + python-modules: + - airflow.providers.amazon.aws.operators.aws_lambda - integration-name: Amazon Simple Storage Service (S3) python-modules: - airflow.providers.amazon.aws.operators.s3_bucket diff --git a/docs/apache-airflow-providers-amazon/operators/lambda.rst 
b/docs/apache-airflow-providers-amazon/operators/lambda.rst
new file mode 100644
index 0000000000000..79374eb506c34
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/lambda.rst
@@ -0,0 +1,58 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements. See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership. The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied. See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+AWS Lambda Operators
+==================================================
+
+`AWS Lambda <https://aws.amazon.com/lambda/>`__ is a
+serverless, event-driven compute service that lets you
+run code for virtually any type of application
+or backend service without provisioning or managing servers.
+You can trigger Lambda from over 200 AWS services and software as a service (SaaS) applications,
+and only pay for what you use.
+
+Airflow provides an operator to invoke an AWS Lambda function.
+
+Prerequisite Tasks
+^^^^^^^^^^^^^^^^^^
+
+.. include:: /operators/_partials/prerequisite_tasks.rst
+
+
+.. _howto/operator:AwsLambdaInvokeFunctionOperator:
+
+Invoke an existing AWS Lambda function with a payload
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+To invoke an existing AWS Lambda function with a payload you can use
+:class:`~airflow.providers.amazon.aws.operators.aws_lambda.AwsLambdaInvokeFunctionOperator`.
+
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_lambda.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_lambda_operator]
+    :end-before: [END howto_lambda_operator]
+
+
+Reference
+^^^^^^^^^
+
+For further information, look at:
+
+* `Boto3 Library Documentation for Lambda <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html>`__
diff --git a/tests/providers/amazon/aws/hooks/test_lambda_function.py b/tests/providers/amazon/aws/hooks/test_lambda_function.py
index 91c5fe2b2e110..3fdc7d07a9163 100644
--- a/tests/providers/amazon/aws/hooks/test_lambda_function.py
+++ b/tests/providers/amazon/aws/hooks/test_lambda_function.py
@@ -29,17 +29,17 @@ class TestLambdaHook:
     @mock_lambda
     def test_get_conn_returns_a_boto3_connection(self):
-        hook = LambdaHook(aws_conn_id='aws_default', function_name="test_function", region_name="us-east-1")
+        hook = LambdaHook(aws_conn_id='aws_default')
         assert hook.conn is not None
 
     @mock_lambda
     def test_invoke_lambda_function(self):
-        hook = LambdaHook(aws_conn_id='aws_default', function_name="test_function", region_name="us-east-1")
+        hook = LambdaHook(aws_conn_id='aws_default')
 
         with patch.object(hook.conn, 'invoke') as mock_invoke:
             payload = '{"hello": "airflow"}'
-            hook.invoke_lambda(payload=payload)
+            hook.invoke_lambda(function_name="test_function", payload=payload)
 
         mock_invoke.asset_called_once_with(
             FunctionName="test_function",
diff --git a/tests/providers/amazon/aws/operators/test_lambda.py b/tests/providers/amazon/aws/operators/test_lambda.py
new file mode 100644
index 0000000000000..23e9e86fedf8b
--- /dev/null
+++ b/tests/providers/amazon/aws/operators/test_lambda.py
@@ -0,0 +1,109 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+ +import io +import json +import unittest +import zipfile + +from moto import mock_iam, mock_lambda, mock_sts + +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook +from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook +from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator + + +@mock_lambda +@mock_sts +@mock_iam +class TestAwsLambdaInvokeFunctionOperator(unittest.TestCase): + def test_init(self): + lambda_operator = AwsLambdaInvokeFunctionOperator( + task_id="test", + function_name="test", + payload=json.dumps({"TestInput": "Testdata"}), + log_type="None", + aws_conn_id="aws_conn_test", + ) + assert lambda_operator.task_id == "test" + assert lambda_operator.function_name == "test" + assert lambda_operator.payload == json.dumps({"TestInput": "Testdata"}) + assert lambda_operator.log_type == "None" + assert lambda_operator.aws_conn_id == "aws_conn_test" + + @staticmethod + def create_zip(body): + code = body + zip_output = io.BytesIO() + with zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) as zip_file: + zip_file.writestr("lambda_function.py", code) + zip_output.seek(0) + return zip_output.read() + + @staticmethod + def create_iam_role(role_name: str): + iam = AwsBaseHook("aws_conn_test", client_type="iam") + resp = iam.conn.create_role( + RoleName=role_name, + AssumeRolePolicyDocument=json.dumps( + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "lambda.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + ), + Description="IAM role for Lambda execution.", + ) + return resp["Role"]["Arn"] + + @staticmethod + def create_lambda_function(function_name: str): + code = """def handler(event, context): + return event + """ + role_name = "LambdaRole" + role_arn = TestAwsLambdaInvokeFunctionOperator.create_iam_role(role_name) + zipped_code = TestAwsLambdaInvokeFunctionOperator.create_zip(code) + lambda_client = LambdaHook(aws_conn_id="aws_conn_test") + resp = lambda_client.create_lambda( + function_name=function_name, + runtime="python3.7", + role=role_arn, + code={ + "ZipFile": zipped_code, + }, + handler="lambda_function.handler", + ) + return resp + + def test_invoke_lambda(self): + self.create_lambda_function('test') + test_event_input = {"TestInput": "Testdata"} + lambda_invoke_function = AwsLambdaInvokeFunctionOperator( + task_id="task_test", + function_name="test", + log_type='None', + payload=json.dumps(test_event_input), + ) + value = lambda_invoke_function.execute(None) + assert json.dumps(json.loads(value)) == json.dumps(test_event_input)
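
A minimal usage sketch (not part of the patch above) for trying the new operator asynchronously: it assumes the provider changes in this diff are installed, and the DAG id, task id, function name, and payload literal below are illustrative placeholders only. Per the operator docstring, setting invocation_type to "Event" asks Lambda to queue the event and reply with status code 202, which execute() accepts as success.

# Illustrative only -- not part of the diff above.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator

with DAG(
    dag_id="example_lambda_async",  # hypothetical DAG id
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
) as dag:
    # invocation_type="Event" makes Lambda queue the event and return status 202
    # without waiting for the function result; 202 is in the operator's accepted
    # status codes, so execute() treats the queued invocation as success.
    invoke_lambda_async = AwsLambdaInvokeFunctionOperator(
        task_id="invoke_lambda_async",
        function_name="test-function",  # placeholder; use an existing Lambda function name
        invocation_type="Event",
        payload='{"SampleEvent": {"SampleData": {"Name": "XYZ"}}}',
    )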