From ac4f3e76a95042dece0561a066ef265e96057d62 Mon Sep 17 00:00:00 2001 From: Chirag S Date: Sat, 19 Feb 2022 17:18:26 +0000 Subject: [PATCH 1/7] add invoke lambda function operator --- .../amazon/aws/hooks/lambda_function.py | 29 +++-- .../amazon/aws/operators/aws_lambda.py | 121 ++++++++++++++++++ airflow/providers/amazon/provider.yaml | 3 + .../amazon/aws/hooks/test_lambda_function.py | 6 +- .../amazon/aws/operators/test_lambda.py | 107 ++++++++++++++++ 5 files changed, 249 insertions(+), 17 deletions(-) create mode 100644 airflow/providers/amazon/aws/operators/aws_lambda.py create mode 100644 tests/providers/amazon/aws/operators/test_lambda.py diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index 56ea7ba81a25f..521d71db475dc 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -40,30 +40,31 @@ class LambdaHook(AwsBaseHook): def __init__( self, - function_name: str, - log_type: str = 'None', - qualifier: str = '$LATEST', - invocation_type: str = 'RequestResponse', *args, **kwargs, ) -> None: - self.function_name = function_name - self.log_type = log_type - self.invocation_type = invocation_type - self.qualifier = qualifier kwargs["client_type"] = "lambda" super().__init__(*args, **kwargs) - def invoke_lambda(self, payload: str) -> str: + def invoke_lambda(self, function_name: str, **kwargs): """Invoke Lambda Function""" response = self.conn.invoke( - FunctionName=self.function_name, - InvocationType=self.invocation_type, - LogType=self.log_type, - Payload=payload, - Qualifier=self.qualifier, + FunctionName=function_name, **{k: v for k, v in kwargs.items() if v is not None} ) + return response + def create_lambda( + self, function_name: str, runtime: str, role: str, handler: str, code: dict, **kwargs + ) -> dict: + """Create a Lambda Function""" + response = self.conn.create_function( + FunctionName=function_name, + Runtime=runtime, + Role=role, + Handler=handler, + Code=code, + **{k: v for k, v in kwargs.items() if v is not None}, + ) return response diff --git a/airflow/providers/amazon/aws/operators/aws_lambda.py b/airflow/providers/amazon/aws/operators/aws_lambda.py new file mode 100644 index 0000000000000..1ba404d7a41d7 --- /dev/null +++ b/airflow/providers/amazon/aws/operators/aws_lambda.py @@ -0,0 +1,121 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +from typing import TYPE_CHECKING, Optional, Sequence + +from airflow.models import BaseOperator +from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook + +if TYPE_CHECKING: + from airflow.utils.context import Context + + +class AwsLambdaInvokeFunctionOperator(BaseOperator): + """ + Invokes an AWS Lambda function. + You can invoke a function synchronously (and wait for the response), + or asynchronously. + To invoke a function asynchronously, + set InvocationType to `Event`. + + :param function_name: The name of the AWS Lambda function, version, or alias. + :param payload: The JSON string that you want to provide to your Lambda function as input. + :param log_type: Set to Tail to include the execution log in the response. Otherwise, set to "None". + :param qualifier: Specify a version or alias to invoke a published version of the function. + :param aws_conn_id: The AWS connection ID to use + """ + + template_fields: Sequence[str] = ('function_name', 'payload', 'qualifier', 'invocation_type') + ui_color = '#ff7300' + + def __init__( + self, + *, + function_name: str, + log_type: Optional[str] = None, + qualifier: Optional[str] = None, + invocation_type: Optional[str] = None, + client_context: Optional[str] = None, + payload: Optional[str] = None, + aws_conn_id='aws_default', + **kwargs, + ): + super().__init__(**kwargs) + self.FunctionName = function_name + self.Payload = payload + self.LogType = log_type + self.Qualifier = qualifier + self.InvocationType = invocation_type + self.ClientContext = client_context + self.aws_conn_id = aws_conn_id + + def execute(self, context: 'Context'): + """ + Invokes the target AWS Lambda function from Airflow. + + :return: The response payload from the function, or an error object. + """ + hook = LambdaHook(aws_conn_id=self.aws_conn_id) + success_status_codes = [200, 202, 204] + self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.FunctionName, self.Payload) + try: + response = hook.invoke_lambda( + function_name=self.FunctionName, + **{ + key: self.__dict__[key] + for key in self.__dict__ + if ( + key + in [ + "Payload", + "LogType", + "Qualifier", + "InvocationType", + "ClientContext", + ] + and self.__dict__[key] is not None + ) + }, + ) + self.log.info("Lambda response metadata: %s", json.dumps(response.get("ResponseMetadata"))) + except Exception as e: + self.log.error(e) + raise e + if response.get("StatusCode") not in success_status_codes: + self.log.error( + 'Lambda function invocation failed: %s', json.dumps(response.get("ResponseMetadata")) + ) + raise ValueError('Lambda function did not execute', json.dumps(response.get("ResponseMetadata"))) + if "FunctionError" in response: + self.log.error( + 'Lambda function execution resulted in error: %s', + json.dumps(response.get("ResponseMetadata")), + ) + error_payload_stream = response.get("Payload") + error_payload = error_payload_stream.read().decode() + raise ValueError( + 'Lambda function execution resulted in error', + {"ResponseMetadata": response.get("ResponseMetadata"), "Payload": error_payload}, + ) + self.log.info( + 'Lambda function invocation succeeded: %s', json.dumps(response.get("ResponseMetadata")) + ) + payload_stream = response.get("Payload") + payload = payload_stream.read().decode() + return payload diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 3b99c6457efd7..3bcd9c2133bc8 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -229,6 +229,9 @@ operators: python-modules: - airflow.providers.amazon.aws.operators.glue - airflow.providers.amazon.aws.operators.glue_crawler + - integration-name: AWS Lambda + python-modules: + - airflow.providers.amazon.aws.operators.aws_lambda - integration-name: Amazon Simple Storage Service (S3) python-modules: - airflow.providers.amazon.aws.operators.s3_bucket diff --git a/tests/providers/amazon/aws/hooks/test_lambda_function.py b/tests/providers/amazon/aws/hooks/test_lambda_function.py index 91c5fe2b2e110..3fdc7d07a9163 100644 --- a/tests/providers/amazon/aws/hooks/test_lambda_function.py +++ b/tests/providers/amazon/aws/hooks/test_lambda_function.py @@ -29,17 +29,17 @@ class TestLambdaHook: @mock_lambda def test_get_conn_returns_a_boto3_connection(self): - hook = LambdaHook(aws_conn_id='aws_default', function_name="test_function", region_name="us-east-1") + hook = LambdaHook(aws_conn_id='aws_default') assert hook.conn is not None @mock_lambda def test_invoke_lambda_function(self): - hook = LambdaHook(aws_conn_id='aws_default', function_name="test_function", region_name="us-east-1") + hook = LambdaHook(aws_conn_id='aws_default') with patch.object(hook.conn, 'invoke') as mock_invoke: payload = '{"hello": "airflow"}' - hook.invoke_lambda(payload=payload) + hook.invoke_lambda(function_name="test_function", payload=payload) mock_invoke.asset_called_once_with( FunctionName="test_function", diff --git a/tests/providers/amazon/aws/operators/test_lambda.py b/tests/providers/amazon/aws/operators/test_lambda.py new file mode 100644 index 0000000000000..3dfb3287d331e --- /dev/null +++ b/tests/providers/amazon/aws/operators/test_lambda.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import io +import json +import unittest +import zipfile + +from moto import mock_iam, mock_lambda, mock_sts + +from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook +from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook +from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator + + +@mock_lambda +@mock_sts +@mock_iam +class TestAwsLambdaInvokeFunctionOperator(unittest.TestCase): + def test_init(self): + lambda_operator = AwsLambdaInvokeFunctionOperator( + task_id="test", + function_name="test", + payload=json.dumps({"TestInput": "Testdata"}), + log_type="None", + aws_conn_id="aws_conn_test", + ) + assert lambda_operator.task_id == "test" + assert lambda_operator.FunctionName == "test" + assert lambda_operator.Payload == json.dumps({"TestInput": "Testdata"}) + assert lambda_operator.LogType == "None" + assert lambda_operator.aws_conn_id == "aws_conn_test" + + def create_zip(self, body): + code = body + zip_output = io.BytesIO() + zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) + zip_file.writestr("lambda_function.py", code) + zip_file.close() + zip_output.seek(0) + return zip_output.read() + + def create_iam_role(self, role_name: str): + iam = AwsBaseHook("aws_conn_test", client_type="iam") + resp = iam.conn.create_role( + RoleName=role_name, + AssumeRolePolicyDocument=json.dumps( + { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "lambda.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + ), + Description="IAM role for Lambda execution.", + ) + return resp["Role"]["Arn"] + + def create_lambda_function(self, function_name: str): + code = """def handler(event, context): + return event + """ + role_name = "LambdaRole" + role_arn = self.create_iam_role(role_name) + zipped_code = self.create_zip(code) + lambda_client = LambdaHook(aws_conn_id="aws_conn_test") + resp = lambda_client.create_lambda( + function_name=function_name, + runtime="python3.7", + role=role_arn, + code={ + "ZipFile": zipped_code, + }, + handler="lambda_function.handler", + ) + return resp + + def test_invoke_lambda(self): + self.create_lambda_function('test') + lambda_invoke_function = AwsLambdaInvokeFunctionOperator( + task_id="task_test", + function_name="test", + log_type='None', + payload=json.dumps({"TestInput": "Testdata"}), + ) + value = lambda_invoke_function.execute(None) + print(f"value: {value}") + assert json.dumps(json.loads(value)) == json.dumps({"TestInput": "Testdata"}) From 343e925eff33313c9aa2d8197c5f202b3d705e1b Mon Sep 17 00:00:00 2001 From: Chirag S Date: Sun, 20 Feb 2022 14:54:50 +0000 Subject: [PATCH 2/7] AWS Lambda Invoke function operator update operator and underlying hook code to explicitly specify invoke and create function arguments --- .../amazon/aws/hooks/lambda_function.py | 78 ++++++++++++++++--- .../amazon/aws/operators/aws_lambda.py | 52 ++++--------- .../amazon/aws/operators/test_lambda.py | 12 +-- 3 files changed, 89 insertions(+), 53 deletions(-) diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index 521d71db475dc..3c60f1cbf33a4 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -18,6 +18,7 @@ """This module contains AWS Lambda hook""" import warnings +from typing import Any, List, Optional from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook @@ -46,24 +47,77 @@ def __init__( kwargs["client_type"] = "lambda" super().__init__(*args, **kwargs) - def invoke_lambda(self, function_name: str, **kwargs): - """Invoke Lambda Function""" - response = self.conn.invoke( - FunctionName=function_name, **{k: v for k, v in kwargs.items() if v is not None} - ) + def invoke_lambda( + self, + function_name: str, + invocation_type: Optional[str] = None, + log_type: Optional[str] = None, + client_context: Optional[str] = None, + payload: Optional[str] = None, + qualifier: Optional[str] = None, + ): + """Invoke Lambda Function. Refer to the boto3 documentation for more info.""" + invoke_args = { + "FunctionName": function_name, + "InvocationType": invocation_type, + "LogType": log_type, + "ClientContext": client_context, + "Payload": payload, + "Qualifier": qualifier, + } + response = self.conn.invoke(**{k: v for k, v in invoke_args.items() if v is not None}) return response def create_lambda( - self, function_name: str, runtime: str, role: str, handler: str, code: dict, **kwargs + self, + function_name: str, + runtime: str, + role: str, + handler: str, + code: dict, + description: Optional[str] = None, + timeout: Optional[int] = None, + memory_size: Optional[int] = None, + publish: Optional[bool] = None, + vpc_config: Optional[Any] = None, + package_type: Optional[str] = None, + dead_letter_config: Optional[Any] = None, + environment: Optional[Any] = None, + kms_key_arn: Optional[str] = None, + tracing_config: Optional[Any] = None, + tags: Optional[Any] = None, + layers: Optional[list] = None, + file_system_configs: Optional[List[Any]] = None, + image_config: Optional[Any] = None, + code_signing_config_arn: Optional[str] = None, + architectures: Optional[List[str]] = None, ) -> dict: """Create a Lambda Function""" + create_function_args = { + "FunctionName": function_name, + "Runtime": runtime, + "Role": role, + "Handler": handler, + "Code": code, + "Description": description, + "Timeout": timeout, + "MemorySize": memory_size, + "Publish": publish, + "VpcConfig": vpc_config, + "PackageType": package_type, + "DeadLetterConfig": dead_letter_config, + "Environment": environment, + "KMSKeyArn": kms_key_arn, + "TracingConfig": tracing_config, + "Tags": tags, + "Layers": layers, + "FileSystemConfigs": file_system_configs, + "ImageConfig": image_config, + "CodeSigningConfigArn": code_signing_config_arn, + "Architectures": architectures, + } response = self.conn.create_function( - FunctionName=function_name, - Runtime=runtime, - Role=role, - Handler=handler, - Code=code, - **{k: v for k, v in kwargs.items() if v is not None}, + **{k: v for k, v in create_function_args.items() if v is not None}, ) return response diff --git a/airflow/providers/amazon/aws/operators/aws_lambda.py b/airflow/providers/amazon/aws/operators/aws_lambda.py index 1ba404d7a41d7..b175e98e454dd 100644 --- a/airflow/providers/amazon/aws/operators/aws_lambda.py +++ b/airflow/providers/amazon/aws/operators/aws_lambda.py @@ -32,7 +32,8 @@ class AwsLambdaInvokeFunctionOperator(BaseOperator): You can invoke a function synchronously (and wait for the response), or asynchronously. To invoke a function asynchronously, - set InvocationType to `Event`. + set `invocation_type` to `Event`. For more details, + review the boto3 Lambda invoke docs. :param function_name: The name of the AWS Lambda function, version, or alias. :param payload: The JSON string that you want to provide to your Lambda function as input. @@ -57,12 +58,12 @@ def __init__( **kwargs, ): super().__init__(**kwargs) - self.FunctionName = function_name - self.Payload = payload - self.LogType = log_type - self.Qualifier = qualifier - self.InvocationType = invocation_type - self.ClientContext = client_context + self.function_name = function_name + self.payload = payload + self.log_type = log_type + self.qualifier = qualifier + self.invocation_type = invocation_type + self.client_context = client_context self.aws_conn_id = aws_conn_id def execute(self, context: 'Context'): @@ -73,49 +74,30 @@ def execute(self, context: 'Context'): """ hook = LambdaHook(aws_conn_id=self.aws_conn_id) success_status_codes = [200, 202, 204] - self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.FunctionName, self.Payload) + self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.function_name, self.payload) try: response = hook.invoke_lambda( - function_name=self.FunctionName, - **{ - key: self.__dict__[key] - for key in self.__dict__ - if ( - key - in [ - "Payload", - "LogType", - "Qualifier", - "InvocationType", - "ClientContext", - ] - and self.__dict__[key] is not None - ) - }, + function_name=self.function_name, + invocation_type=self.invocation_type, + log_type=self.log_type, + client_context=self.client_context, + payload=self.payload, + qualifier=self.qualifier, ) - self.log.info("Lambda response metadata: %s", json.dumps(response.get("ResponseMetadata"))) + self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata")) except Exception as e: self.log.error(e) raise e if response.get("StatusCode") not in success_status_codes: - self.log.error( - 'Lambda function invocation failed: %s', json.dumps(response.get("ResponseMetadata")) - ) raise ValueError('Lambda function did not execute', json.dumps(response.get("ResponseMetadata"))) if "FunctionError" in response: - self.log.error( - 'Lambda function execution resulted in error: %s', - json.dumps(response.get("ResponseMetadata")), - ) error_payload_stream = response.get("Payload") error_payload = error_payload_stream.read().decode() raise ValueError( 'Lambda function execution resulted in error', {"ResponseMetadata": response.get("ResponseMetadata"), "Payload": error_payload}, ) - self.log.info( - 'Lambda function invocation succeeded: %s', json.dumps(response.get("ResponseMetadata")) - ) + self.log.info('Lambda function invocation succeeded: %r', response.get("ResponseMetadata")) payload_stream = response.get("Payload") payload = payload_stream.read().decode() return payload diff --git a/tests/providers/amazon/aws/operators/test_lambda.py b/tests/providers/amazon/aws/operators/test_lambda.py index 3dfb3287d331e..ed5350672acb9 100644 --- a/tests/providers/amazon/aws/operators/test_lambda.py +++ b/tests/providers/amazon/aws/operators/test_lambda.py @@ -41,9 +41,9 @@ def test_init(self): aws_conn_id="aws_conn_test", ) assert lambda_operator.task_id == "test" - assert lambda_operator.FunctionName == "test" - assert lambda_operator.Payload == json.dumps({"TestInput": "Testdata"}) - assert lambda_operator.LogType == "None" + assert lambda_operator.function_name == "test" + assert lambda_operator.payload == json.dumps({"TestInput": "Testdata"}) + assert lambda_operator.log_type == "None" assert lambda_operator.aws_conn_id == "aws_conn_test" def create_zip(self, body): @@ -96,12 +96,12 @@ def create_lambda_function(self, function_name: str): def test_invoke_lambda(self): self.create_lambda_function('test') + test_event_input = {"TestInput": "Testdata"} lambda_invoke_function = AwsLambdaInvokeFunctionOperator( task_id="task_test", function_name="test", log_type='None', - payload=json.dumps({"TestInput": "Testdata"}), + payload=json.dumps(test_event_input), ) value = lambda_invoke_function.execute(None) - print(f"value: {value}") - assert json.dumps(json.loads(value)) == json.dumps({"TestInput": "Testdata"}) + assert json.dumps(json.loads(value)) == json.dumps(test_event_input) From 518a1bb5b5ab798f5c6cbdfa87b402abc0457cd6 Mon Sep 17 00:00:00 2001 From: Chirag S Date: Mon, 21 Feb 2022 04:08:36 +0000 Subject: [PATCH 3/7] AWS Lambda Invoke Function Operator modify test cases to use static method for lambda invoke operator remove redundant try-except block --- .../amazon/aws/operators/aws_lambda.py | 22 ++++++++----------- .../amazon/aws/operators/test_lambda.py | 18 ++++++++------- 2 files changed, 19 insertions(+), 21 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/aws_lambda.py b/airflow/providers/amazon/aws/operators/aws_lambda.py index b175e98e454dd..6aea9d5f8d10c 100644 --- a/airflow/providers/amazon/aws/operators/aws_lambda.py +++ b/airflow/providers/amazon/aws/operators/aws_lambda.py @@ -75,19 +75,15 @@ def execute(self, context: 'Context'): hook = LambdaHook(aws_conn_id=self.aws_conn_id) success_status_codes = [200, 202, 204] self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.function_name, self.payload) - try: - response = hook.invoke_lambda( - function_name=self.function_name, - invocation_type=self.invocation_type, - log_type=self.log_type, - client_context=self.client_context, - payload=self.payload, - qualifier=self.qualifier, - ) - self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata")) - except Exception as e: - self.log.error(e) - raise e + response = hook.invoke_lambda( + function_name=self.function_name, + invocation_type=self.invocation_type, + log_type=self.log_type, + client_context=self.client_context, + payload=self.payload, + qualifier=self.qualifier, + ) + self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata")) if response.get("StatusCode") not in success_status_codes: raise ValueError('Lambda function did not execute', json.dumps(response.get("ResponseMetadata"))) if "FunctionError" in response: diff --git a/tests/providers/amazon/aws/operators/test_lambda.py b/tests/providers/amazon/aws/operators/test_lambda.py index ed5350672acb9..23e9e86fedf8b 100644 --- a/tests/providers/amazon/aws/operators/test_lambda.py +++ b/tests/providers/amazon/aws/operators/test_lambda.py @@ -46,16 +46,17 @@ def test_init(self): assert lambda_operator.log_type == "None" assert lambda_operator.aws_conn_id == "aws_conn_test" - def create_zip(self, body): + @staticmethod + def create_zip(body): code = body zip_output = io.BytesIO() - zip_file = zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) - zip_file.writestr("lambda_function.py", code) - zip_file.close() + with zipfile.ZipFile(zip_output, "w", zipfile.ZIP_DEFLATED) as zip_file: + zip_file.writestr("lambda_function.py", code) zip_output.seek(0) return zip_output.read() - def create_iam_role(self, role_name: str): + @staticmethod + def create_iam_role(role_name: str): iam = AwsBaseHook("aws_conn_test", client_type="iam") resp = iam.conn.create_role( RoleName=role_name, @@ -75,13 +76,14 @@ def create_iam_role(self, role_name: str): ) return resp["Role"]["Arn"] - def create_lambda_function(self, function_name: str): + @staticmethod + def create_lambda_function(function_name: str): code = """def handler(event, context): return event """ role_name = "LambdaRole" - role_arn = self.create_iam_role(role_name) - zipped_code = self.create_zip(code) + role_arn = TestAwsLambdaInvokeFunctionOperator.create_iam_role(role_name) + zipped_code = TestAwsLambdaInvokeFunctionOperator.create_zip(code) lambda_client = LambdaHook(aws_conn_id="aws_conn_test") resp = lambda_client.create_lambda( function_name=function_name, From 63a21ef2d63e6d598d635ea3ae4ea650e3ca3e5a Mon Sep 17 00:00:00 2001 From: Chirag S Date: Mon, 21 Feb 2022 14:43:46 +0000 Subject: [PATCH 4/7] Update AWS Lambda Hook update AWS lambda hook functions to enforce keyword only args --- airflow/providers/amazon/aws/hooks/lambda_function.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/airflow/providers/amazon/aws/hooks/lambda_function.py b/airflow/providers/amazon/aws/hooks/lambda_function.py index 3c60f1cbf33a4..0f04af7e3b57b 100644 --- a/airflow/providers/amazon/aws/hooks/lambda_function.py +++ b/airflow/providers/amazon/aws/hooks/lambda_function.py @@ -49,6 +49,7 @@ def __init__( def invoke_lambda( self, + *, function_name: str, invocation_type: Optional[str] = None, log_type: Optional[str] = None, @@ -70,6 +71,7 @@ def invoke_lambda( def create_lambda( self, + *, function_name: str, runtime: str, role: str, From 2c46f17c0fb99ccf8d7bb82adc1f8808fff16912 Mon Sep 17 00:00:00 2001 From: Chirag S Date: Wed, 23 Feb 2022 03:46:26 +0000 Subject: [PATCH 5/7] add lambda function invoke operator --- .../amazon/aws/example_dags/example_lambda.py | 48 +++++++++++++++++ airflow/providers/amazon/provider.yaml | 2 + .../operators/lambda.rst | 54 +++++++++++++++++++ 3 files changed, 104 insertions(+) create mode 100644 airflow/providers/amazon/aws/example_dags/example_lambda.py create mode 100644 docs/apache-airflow-providers-amazon/operators/lambda.rst diff --git a/airflow/providers/amazon/aws/example_dags/example_lambda.py b/airflow/providers/amazon/aws/example_dags/example_lambda.py new file mode 100644 index 0000000000000..bf8888fedc133 --- /dev/null +++ b/airflow/providers/amazon/aws/example_dags/example_lambda.py @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +import json +from datetime import datetime, timedelta +from os import getenv + +from airflow import DAG +from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator + +# [START howto_operator_lambda_env_variables] +LAMBDA_FUNCTION_NAME = getenv("LAMBDA_FUNCTION_NAME", "test-function") +# [END howto_operator_lambda_env_variables] + +SAMPLE_EVENT = json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}}) + +with DAG( + dag_id='example_lambda', + schedule_interval=None, + start_date=datetime(2021, 1, 1), + dagrun_timeout=timedelta(minutes=60), + tags=['example'], + catchup=False, +) as dag: + # [START howto_lambda_operator] + invoke_lambda_function = AwsLambdaInvokeFunctionOperator( + task_id='setup__invoke_lambda_function', + function_name=LAMBDA_FUNCTION_NAME, + log_type="None", + qualifier="$LATEST", + payload=SAMPLE_EVENT, + max_tries=None, + ) + # [END howto_lambda_operator] diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml index 3bcd9c2133bc8..b61afcf95ca5a 100644 --- a/airflow/providers/amazon/provider.yaml +++ b/airflow/providers/amazon/provider.yaml @@ -167,6 +167,8 @@ integrations: - integration-name: AWS Lambda external-doc-url: https://aws.amazon.com/lambda/ logo: /integration-logos/aws/AWS-Lambda_light-bg@4x.png + how-to-guide: + - /docs/apache-airflow-providers-amazon/operators/lambda.rst tags: [aws] - integration-name: AWS Step Functions external-doc-url: https://aws.amazon.com/step-functions/ diff --git a/docs/apache-airflow-providers-amazon/operators/lambda.rst b/docs/apache-airflow-providers-amazon/operators/lambda.rst new file mode 100644 index 0000000000000..10197de8b9b50 --- /dev/null +++ b/docs/apache-airflow-providers-amazon/operators/lambda.rst @@ -0,0 +1,54 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + + +.. _howto/operator:AwsLambdaInvokeFunctionOperator: + +AWS Lambda Invoke Function Operator +=================================== + +.. contents:: + :depth: 1 + :local: + +Prerequisite Tasks +------------------ + +.. include:: _partials/prerequisite_tasks.rst + +Using Operator +-------------- +Use the +:class:`~airflow.providers.amazon.aws.operators.aws_lambda.AwsLambdaInvokeFunctionOperator` +to invoke an AWS Lambda function. To get started with AWS Lambda please visit +`aws.amazon.com/lambda `_ + + +In the following example, we invoke an AWS Lambda function. + +.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_lambda.py + :language: python + :start-after: [START howto_lambda_operator] + :end-before: [END howto_lambda_operator] + +More information +---------------- + +For further information, look at the documentation of :meth:`~Lambda.Client.invoke` method +in `boto3`_. + +.. _boto3: https://pypi.org/project/boto3/ From c04a16ca0b662611f9fd0fbaabf46be91eaa4657 Mon Sep 17 00:00:00 2001 From: Chirag S Date: Sat, 26 Feb 2022 13:59:12 +0000 Subject: [PATCH 6/7] update documentaiton code based on feedback --- airflow/providers/amazon/aws/operators/aws_lambda.py | 10 ++++------ .../operators/lambda.rst | 3 ++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/airflow/providers/amazon/aws/operators/aws_lambda.py b/airflow/providers/amazon/aws/operators/aws_lambda.py index 6aea9d5f8d10c..d68a59df8df26 100644 --- a/airflow/providers/amazon/aws/operators/aws_lambda.py +++ b/airflow/providers/amazon/aws/operators/aws_lambda.py @@ -54,7 +54,7 @@ def __init__( invocation_type: Optional[str] = None, client_context: Optional[str] = None, payload: Optional[str] = None, - aws_conn_id='aws_default', + aws_conn_id: str = 'aws_default', **kwargs, ): super().__init__(**kwargs) @@ -86,14 +86,12 @@ def execute(self, context: 'Context'): self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata")) if response.get("StatusCode") not in success_status_codes: raise ValueError('Lambda function did not execute', json.dumps(response.get("ResponseMetadata"))) + payload_stream = response.get("Payload") + payload = payload_stream.read().decode() if "FunctionError" in response: - error_payload_stream = response.get("Payload") - error_payload = error_payload_stream.read().decode() raise ValueError( 'Lambda function execution resulted in error', - {"ResponseMetadata": response.get("ResponseMetadata"), "Payload": error_payload}, + {"ResponseMetadata": response.get("ResponseMetadata"), "Payload": payload}, ) self.log.info('Lambda function invocation succeeded: %r', response.get("ResponseMetadata")) - payload_stream = response.get("Payload") - payload = payload_stream.read().decode() return payload diff --git a/docs/apache-airflow-providers-amazon/operators/lambda.rst b/docs/apache-airflow-providers-amazon/operators/lambda.rst index 10197de8b9b50..4261f09e7963f 100644 --- a/docs/apache-airflow-providers-amazon/operators/lambda.rst +++ b/docs/apache-airflow-providers-amazon/operators/lambda.rst @@ -18,7 +18,7 @@ .. _howto/operator:AwsLambdaInvokeFunctionOperator: -AWS Lambda Invoke Function Operator +AWS Lambda Operators =================================== .. contents:: @@ -42,6 +42,7 @@ In the following example, we invoke an AWS Lambda function. .. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_lambda.py :language: python + :dedent: 4 :start-after: [START howto_lambda_operator] :end-before: [END howto_lambda_operator] From fe32a06dababcf7dde68898af6cc69238265a212 Mon Sep 17 00:00:00 2001 From: Chirag S Date: Sun, 27 Feb 2022 08:04:50 +0000 Subject: [PATCH 7/7] add AWSLambdaInvokeFunction operator example DAG --- .../amazon/aws/example_dags/example_lambda.py | 3 -- .../amazon/aws/operators/aws_lambda.py | 5 +++ .../operators/lambda.rst | 43 ++++++++++--------- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/airflow/providers/amazon/aws/example_dags/example_lambda.py b/airflow/providers/amazon/aws/example_dags/example_lambda.py index bf8888fedc133..17ba0dab48eb9 100644 --- a/airflow/providers/amazon/aws/example_dags/example_lambda.py +++ b/airflow/providers/amazon/aws/example_dags/example_lambda.py @@ -40,9 +40,6 @@ invoke_lambda_function = AwsLambdaInvokeFunctionOperator( task_id='setup__invoke_lambda_function', function_name=LAMBDA_FUNCTION_NAME, - log_type="None", - qualifier="$LATEST", payload=SAMPLE_EVENT, - max_tries=None, ) # [END howto_lambda_operator] diff --git a/airflow/providers/amazon/aws/operators/aws_lambda.py b/airflow/providers/amazon/aws/operators/aws_lambda.py index d68a59df8df26..c2d9d022fbcbc 100644 --- a/airflow/providers/amazon/aws/operators/aws_lambda.py +++ b/airflow/providers/amazon/aws/operators/aws_lambda.py @@ -40,6 +40,11 @@ class AwsLambdaInvokeFunctionOperator(BaseOperator): :param log_type: Set to Tail to include the execution log in the response. Otherwise, set to "None". :param qualifier: Specify a version or alias to invoke a published version of the function. :param aws_conn_id: The AWS connection ID to use + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AwsLambdaInvokeFunctionOperator` + """ template_fields: Sequence[str] = ('function_name', 'payload', 'qualifier', 'invocation_type') diff --git a/docs/apache-airflow-providers-amazon/operators/lambda.rst b/docs/apache-airflow-providers-amazon/operators/lambda.rst index 4261f09e7963f..79374eb506c34 100644 --- a/docs/apache-airflow-providers-amazon/operators/lambda.rst +++ b/docs/apache-airflow-providers-amazon/operators/lambda.rst @@ -16,29 +16,32 @@ under the License. -.. _howto/operator:AwsLambdaInvokeFunctionOperator: - AWS Lambda Operators -=================================== +================================================== -.. contents:: - :depth: 1 - :local: +`AWS Lambda `__ is a +serverless, event-driven compute service that lets you +run code for virtually any type of application +or backend service without provisioning or managing servers. +You can trigger Lambda from over 200 AWS services and software as a service (SaaS) applications, +and only pay for what you use. + +Airflow provides an operator to invoke an AWS Lambda function. Prerequisite Tasks ------------------- +^^^^^^^^^^^^^^^^^^ -.. include:: _partials/prerequisite_tasks.rst +.. include::/operators/_partials/prerequisite_tasks.rst -Using Operator --------------- -Use the -:class:`~airflow.providers.amazon.aws.operators.aws_lambda.AwsLambdaInvokeFunctionOperator` -to invoke an AWS Lambda function. To get started with AWS Lambda please visit -`aws.amazon.com/lambda `_ +.. _howto/operator:AwsLambdaInvokeFunctionOperator: + +Invoke an existing AWS Lambda function with a payload +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +To publish a message to an Amazon SNS Topic you can use +:class:`~airflow.providers.amazon.aws.operators.aws_lambda.AwsLambdaInvokeFunctionOperator`. -In the following example, we invoke an AWS Lambda function. .. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_lambda.py :language: python @@ -46,10 +49,10 @@ In the following example, we invoke an AWS Lambda function. :start-after: [START howto_lambda_operator] :end-before: [END howto_lambda_operator] -More information ----------------- -For further information, look at the documentation of :meth:`~Lambda.Client.invoke` method -in `boto3`_. +Reference +^^^^^^^^^ + +For further information, look at: -.. _boto3: https://pypi.org/project/boto3/ +* `Boto3 Library Documentation for Lambda `__