Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from os import environ

from airflow import DAG
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.operators.step_function import (
StepFunctionGetExecutionOutputOperator,
StepFunctionStartExecutionOperator,
)
from airflow.providers.amazon.aws.sensors.step_function import StepFunctionExecutionSensor

STEP_FUNCTIONS_STATE_MACHINE_ARN = environ.get('STEP_FUNCTIONS_STATE_MACHINE_ARN', 'state_machine_arn')

with DAG(
dag_id='example_step_functions',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as dag:

# [START howto_operator_step_function_start_execution]
start_execution = StepFunctionStartExecutionOperator(
task_id='start_execution', state_machine_arn=STEP_FUNCTIONS_STATE_MACHINE_ARN
)
# [END howto_operator_step_function_start_execution]

# [START howto_operator_step_function_execution_sensor]
wait_for_execution = StepFunctionExecutionSensor(
task_id='wait_for_execution', execution_arn=start_execution.output
)
# [END howto_operator_step_function_execution_sensor]

# [START howto_operator_step_function_get_execution_output]
get_execution_output = StepFunctionGetExecutionOutputOperator(
task_id='get_execution_output', execution_arn=start_execution.output
)
# [END howto_operator_step_function_get_execution_output]

chain(start_execution, wait_for_execution, get_execution_output)
10 changes: 6 additions & 4 deletions airflow/providers/amazon/aws/operators/step_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@

class StepFunctionStartExecutionOperator(BaseOperator):
"""
An Operator that begins execution of an Step Function State Machine
An Operator that begins execution of an AWS Step Function State Machine.

Additional arguments may be specified and are passed down to the underlying BaseOperator.

.. seealso::
:class:`~airflow.models.BaseOperator`
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:StepFunctionStartExecutionOperator`

:param state_machine_arn: ARN of the Step Function State Machine
:param name: The name of the execution.
Expand Down Expand Up @@ -79,12 +80,13 @@ def execute(self, context: 'Context'):

class StepFunctionGetExecutionOutputOperator(BaseOperator):
"""
An Operator that begins execution of an Step Function State Machine
An Operator that returns the output of an AWS Step Function State Machine execution.

Additional arguments may be specified and are passed down to the underlying BaseOperator.

.. seealso::
:class:`~airflow.models.BaseOperator`
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:StepFunctionGetExecutionOutputOperator`

:param execution_arn: ARN of the Step Function State Machine Execution
:param aws_conn_id: aws connection to use, defaults to 'aws_default'
Expand Down
8 changes: 6 additions & 2 deletions airflow/providers/amazon/aws/sensors/step_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,17 @@

class StepFunctionExecutionSensor(BaseSensorOperator):
"""
Asks for the state of the Step Function State Machine Execution until it
Asks for the state of the AWS Step Function State Machine Execution until it
reaches a failure state or success state.
If it fails, failing the task.
If it fails, then fail the task.

On successful completion of the Execution the Sensor will do an XCom Push
of the State Machine's output to `output`

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/operator:StepFunctionExecutionSensor`

:param execution_arn: execution_arn to check the state of
:param aws_conn_id: aws connection to use, defaults to 'aws_default'
"""
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,8 @@ integrations:
- integration-name: AWS Step Functions
external-doc-url: https://aws.amazon.com/step-functions/
logo: /integration-logos/aws/AWS-Step-Functions_light-bg@4x.png
how-to-guide:
- /docs/apache-airflow-providers-amazon/operators/step_functions.rst
tags: [aws]
- integration-name: AWS Database Migration Service
external-doc-url: https://aws.amazon.com/dms/
Expand Down
78 changes: 78 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/step_functions.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.


AWS Step Functions Operators
============================

`AWS Step Functions <https://aws.amazon.com/step-functions/>`__ makes it easy to coordinate the components
of distributed applications as a series of steps in a visual workflow. You can quickly build and run state
machines to execute the steps of your application in a reliable and scalable fashion.

Prerequisite Tasks
------------------

.. include:: _partials/prerequisite_tasks.rst

.. _howto/operator:StepFunctionStartExecutionOperator:

AWS Step Functions Start Execution Operator
"""""""""""""""""""""""""""""""""""""""""""

To start a new AWS Step Functions State Machine execution
use :class:`~airflow.providers.amazon.aws.operators.step_function.StepFunctionStartExecutionOperator`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py
:language: python
:dedent: 4
:start-after: [START howto_operator_step_function_start_execution]
:end-before: [END howto_operator_step_function_start_execution]

.. _howto/operator:StepFunctionExecutionSensor:

AWS Step Functions Execution Sensor
"""""""""""""""""""""""""""""""""""

To wait on the state of an AWS Step Function State Machine execution until it reaches a terminal state you can
use :class:`~airflow.providers.amazon.aws.sensors.step_function.StepFunctionExecutionSensor`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py
:language: python
:dedent: 4
:start-after: [START howto_operator_step_function_execution_sensor]
:end-before: [END howto_operator_step_function_execution_sensor]

.. _howto/operator:StepFunctionGetExecutionOutputOperator:

AWS Step Functions Get Execution Output Operator
""""""""""""""""""""""""""""""""""""""""""""""""

To fetch the output from an AWS Step Function State Machine execution you can
use :class:`~airflow.providers.amazon.aws.operators.step_function.StepFunctionGetExecutionOutputOperator`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_step_functions.py
:language: python
:dedent: 4
:start-after: [START howto_operator_step_function_get_execution_output]
:end-before: [END howto_operator_step_function_get_execution_output]

References
----------

For further information, look at:

* `Boto3 Library Documentation for Step Functions <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/stepfunctions.html>`__