45 changes: 45 additions & 0 deletions airflow/providers/amazon/aws/example_dags/example_lambda.py
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json
from datetime import datetime, timedelta
from os import getenv

from airflow import DAG
from airflow.providers.amazon.aws.operators.aws_lambda import AwsLambdaInvokeFunctionOperator

# [START howto_operator_lambda_env_variables]
LAMBDA_FUNCTION_NAME = getenv("LAMBDA_FUNCTION_NAME", "test-function")
# [END howto_operator_lambda_env_variables]

SAMPLE_EVENT = json.dumps({"SampleEvent": {"SampleData": {"Name": "XYZ", "DoB": "1993-01-01"}}})

with DAG(
    dag_id='example_lambda',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=60),
    tags=['example'],
    catchup=False,
) as dag:
    # [START howto_lambda_operator]
    invoke_lambda_function = AwsLambdaInvokeFunctionOperator(
        task_id='setup__invoke_lambda_function',
        function_name=LAMBDA_FUNCTION_NAME,
        payload=SAMPLE_EVENT,
    )
    # [END howto_lambda_operator]
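    # Illustrative sketch (not part of the original example DAG): per the operator
    # docstring, setting ``invocation_type`` to ``Event`` invokes the Lambda function
    # asynchronously, i.e. without waiting for the function's response payload.
    # The task_id below is a hypothetical name used only for this illustration.
    invoke_lambda_function_async = AwsLambdaInvokeFunctionOperator(
        task_id='invoke_lambda_function_async',
        function_name=LAMBDA_FUNCTION_NAME,
        invocation_type='Event',
        payload=SAMPLE_EVENT,
    )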
91 changes: 74 additions & 17 deletions airflow/providers/amazon/aws/hooks/lambda_function.py
@@ -18,6 +18,7 @@

 """This module contains AWS Lambda hook"""
 import warnings
+from typing import Any, List, Optional

 from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

@@ -40,30 +41,86 @@ class LambdaHook(AwsBaseHook):

     def __init__(
         self,
-        function_name: str,
-        log_type: str = 'None',
-        qualifier: str = '$LATEST',
-        invocation_type: str = 'RequestResponse',
         *args,
         **kwargs,
     ) -> None:
-        self.function_name = function_name
-        self.log_type = log_type
-        self.invocation_type = invocation_type
-        self.qualifier = qualifier
         kwargs["client_type"] = "lambda"
         super().__init__(*args, **kwargs)

-    def invoke_lambda(self, payload: str) -> str:
-        """Invoke Lambda Function"""
-        response = self.conn.invoke(
-            FunctionName=self.function_name,
-            InvocationType=self.invocation_type,
-            LogType=self.log_type,
-            Payload=payload,
-            Qualifier=self.qualifier,
-        )
+    def invoke_lambda(
+        self,
+        *,
+        function_name: str,
+        invocation_type: Optional[str] = None,
+        log_type: Optional[str] = None,
+        client_context: Optional[str] = None,
+        payload: Optional[str] = None,
+        qualifier: Optional[str] = None,
+    ):
+        """Invoke Lambda Function. Refer to the boto3 documentation for more info."""
+        invoke_args = {
+            "FunctionName": function_name,
+            "InvocationType": invocation_type,
+            "LogType": log_type,
+            "ClientContext": client_context,
+            "Payload": payload,
+            "Qualifier": qualifier,
+        }
+        response = self.conn.invoke(**{k: v for k, v in invoke_args.items() if v is not None})
         return response

+    def create_lambda(
+        self,
+        *,
+        function_name: str,
+        runtime: str,
+        role: str,
+        handler: str,
+        code: dict,
+        description: Optional[str] = None,
+        timeout: Optional[int] = None,
+        memory_size: Optional[int] = None,
+        publish: Optional[bool] = None,
+        vpc_config: Optional[Any] = None,
+        package_type: Optional[str] = None,
+        dead_letter_config: Optional[Any] = None,
+        environment: Optional[Any] = None,
+        kms_key_arn: Optional[str] = None,
+        tracing_config: Optional[Any] = None,
+        tags: Optional[Any] = None,
+        layers: Optional[list] = None,
+        file_system_configs: Optional[List[Any]] = None,
+        image_config: Optional[Any] = None,
+        code_signing_config_arn: Optional[str] = None,
+        architectures: Optional[List[str]] = None,
+    ) -> dict:
+        """Create a Lambda Function"""
+        create_function_args = {
+            "FunctionName": function_name,
+            "Runtime": runtime,
+            "Role": role,
+            "Handler": handler,
+            "Code": code,
+            "Description": description,
+            "Timeout": timeout,
+            "MemorySize": memory_size,
+            "Publish": publish,
+            "VpcConfig": vpc_config,
+            "PackageType": package_type,
+            "DeadLetterConfig": dead_letter_config,
+            "Environment": environment,
+            "KMSKeyArn": kms_key_arn,
+            "TracingConfig": tracing_config,
+            "Tags": tags,
+            "Layers": layers,
+            "FileSystemConfigs": file_system_configs,
+            "ImageConfig": image_config,
+            "CodeSigningConfigArn": code_signing_config_arn,
+            "Architectures": architectures,
+        }
+        response = self.conn.create_function(
+            **{k: v for k, v in create_function_args.items() if v is not None},
+        )
+        return response


102 changes: 102 additions & 0 deletions airflow/providers/amazon/aws/operators/aws_lambda.py
@@ -0,0 +1,102 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import json
from typing import TYPE_CHECKING, Optional, Sequence

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.lambda_function import LambdaHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class AwsLambdaInvokeFunctionOperator(BaseOperator):
    """
    Invokes an AWS Lambda function.
    You can invoke a function synchronously (and wait for the response), or asynchronously.
    To invoke a function asynchronously, set `invocation_type` to `Event`.
    For more details, review the boto3 Lambda invoke docs.

    :param function_name: The name of the AWS Lambda function, version, or alias.
    :param payload: The JSON string that you want to provide to your Lambda function as input.
    :param invocation_type: The AWS Lambda invocation type (RequestResponse, Event, or DryRun).
    :param client_context: Base64-encoded data about the invoking client to pass to the
        function in the context object.
    :param log_type: Set to Tail to include the execution log in the response. Otherwise, set to "None".
    :param qualifier: Specify a version or alias to invoke a published version of the function.
    :param aws_conn_id: The AWS connection ID to use.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:AwsLambdaInvokeFunctionOperator`

    """

    template_fields: Sequence[str] = ('function_name', 'payload', 'qualifier', 'invocation_type')
    ui_color = '#ff7300'

    def __init__(
        self,
        *,
        function_name: str,
        log_type: Optional[str] = None,
        qualifier: Optional[str] = None,
        invocation_type: Optional[str] = None,
        client_context: Optional[str] = None,
        payload: Optional[str] = None,
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.function_name = function_name
        self.payload = payload
        self.log_type = log_type
        self.qualifier = qualifier
        self.invocation_type = invocation_type
        self.client_context = client_context
        self.aws_conn_id = aws_conn_id

    def execute(self, context: 'Context'):
        """
        Invokes the target AWS Lambda function from Airflow.

        :return: The response payload from the function, or an error object.
        """
        hook = LambdaHook(aws_conn_id=self.aws_conn_id)
        success_status_codes = [200, 202, 204]
        self.log.info("Invoking AWS Lambda function: %s with payload: %s", self.function_name, self.payload)
        response = hook.invoke_lambda(
            function_name=self.function_name,
            invocation_type=self.invocation_type,
            log_type=self.log_type,
            client_context=self.client_context,
            payload=self.payload,
            qualifier=self.qualifier,
        )
        self.log.info("Lambda response metadata: %r", response.get("ResponseMetadata"))
        if response.get("StatusCode") not in success_status_codes:
            raise ValueError('Lambda function did not execute', json.dumps(response.get("ResponseMetadata")))
        payload_stream = response.get("Payload")
        payload = payload_stream.read().decode()
        if "FunctionError" in response:
            raise ValueError(
                'Lambda function execution resulted in error',
                {"ResponseMetadata": response.get("ResponseMetadata"), "Payload": payload},
            )
        self.log.info('Lambda function invocation succeeded: %r', response.get("ResponseMetadata"))
        return payload
5 changes: 5 additions & 0 deletions airflow/providers/amazon/provider.yaml
@@ -167,6 +167,8 @@ integrations:
   - integration-name: AWS Lambda
     external-doc-url: https://aws.amazon.com/lambda/
     logo: /integration-logos/aws/AWS-Lambda_light-bg@4x.png
+    how-to-guide:
+      - /docs/apache-airflow-providers-amazon/operators/lambda.rst
     tags: [aws]
   - integration-name: AWS Step Functions
     external-doc-url: https://aws.amazon.com/step-functions/
@@ -229,6 +231,9 @@ operators:
     python-modules:
       - airflow.providers.amazon.aws.operators.glue
       - airflow.providers.amazon.aws.operators.glue_crawler
+  - integration-name: AWS Lambda
+    python-modules:
+      - airflow.providers.amazon.aws.operators.aws_lambda
   - integration-name: Amazon Simple Storage Service (S3)
     python-modules:
       - airflow.providers.amazon.aws.operators.s3_bucket
58 changes: 58 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/lambda.rst
@@ -0,0 +1,58 @@
.. Licensed to the Apache Software Foundation (ASF) under one
    or more contributor license agreements. See the NOTICE file
    distributed with this work for additional information
    regarding copyright ownership. The ASF licenses this file
    to you under the Apache License, Version 2.0 (the
    "License"); you may not use this file except in compliance
    with the License. You may obtain a copy of the License at

..  http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
    software distributed under the License is distributed on an
    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    KIND, either express or implied. See the License for the
    specific language governing permissions and limitations
    under the License.


AWS Lambda Operators
==================================================

`AWS Lambda <https://aws.amazon.com/lambda/>`__ is a
serverless, event-driven compute service that lets you
run code for virtually any type of application
or backend service without provisioning or managing servers.
You can trigger Lambda from over 200 AWS services and software as a service (SaaS) applications,
and only pay for what you use.

Airflow provides an operator to invoke an AWS Lambda function.

Prerequisite Tasks
^^^^^^^^^^^^^^^^^^

.. include::/operators/_partials/prerequisite_tasks.rst


.. _howto/operator:AwsLambdaInvokeFunctionOperator:

Invoke an existing AWS Lambda function with a payload
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To invoke an existing AWS Lambda function with a payload you can use
:class:`~airflow.providers.amazon.aws.operators.aws_lambda.AwsLambdaInvokeFunctionOperator`.


.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_lambda.py
    :language: python
    :dedent: 4
    :start-after: [START howto_lambda_operator]
    :end-before: [END howto_lambda_operator]
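
The operator returns the decoded response payload from the function, and because Airflow
pushes an operator's return value to XCom by default, a downstream task can pick the result
up. The snippet below is a minimal illustrative sketch, not part of the example DAG above;
the downstream task id and callable name are hypothetical, while
``setup__invoke_lambda_function`` matches the task id used in the example:

.. code-block:: python

    from airflow.operators.python import PythonOperator


    def print_lambda_response(**context):
        # The decoded payload returned by AwsLambdaInvokeFunctionOperator.execute()
        # is available via XCom under the invoking task's id.
        response_payload = context["ti"].xcom_pull(task_ids="setup__invoke_lambda_function")
        print("Lambda returned:", response_payload)


    log_lambda_response = PythonOperator(
        task_id="log_lambda_response",
        python_callable=print_lambda_response,
    )
    invoke_lambda_function >> log_lambda_response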


Reference
^^^^^^^^^

For further information, look at:

* `Boto3 Library Documentation for Lambda <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/lambda.html>`__
6 changes: 3 additions & 3 deletions tests/providers/amazon/aws/hooks/test_lambda_function.py
@@ -29,17 +29,17 @@
 class TestLambdaHook:
     @mock_lambda
     def test_get_conn_returns_a_boto3_connection(self):
-        hook = LambdaHook(aws_conn_id='aws_default', function_name="test_function", region_name="us-east-1")
+        hook = LambdaHook(aws_conn_id='aws_default')
         assert hook.conn is not None

     @mock_lambda
     def test_invoke_lambda_function(self):

-        hook = LambdaHook(aws_conn_id='aws_default', function_name="test_function", region_name="us-east-1")
+        hook = LambdaHook(aws_conn_id='aws_default')

         with patch.object(hook.conn, 'invoke') as mock_invoke:
             payload = '{"hello": "airflow"}'
-            hook.invoke_lambda(payload=payload)
+            hook.invoke_lambda(function_name="test_function", payload=payload)

             mock_invoke.assert_called_once_with(
                 FunctionName="test_function",