Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions airflow/providers/amazon/aws/example_dags/example_batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from json import loads
from os import environ

from airflow import DAG
from airflow.providers.amazon.aws.operators.batch import BatchOperator
from airflow.providers.amazon.aws.sensors.batch import BatchSensor

# The inputs below are required for the submit batch example DAG.
JOB_NAME = environ.get('BATCH_JOB_NAME', 'example_job_name')
JOB_DEFINITION = environ.get('BATCH_JOB_DEFINITION', 'example_job_definition')
JOB_QUEUE = environ.get('BATCH_JOB_QUEUE', 'example_job_queue')
JOB_OVERRIDES = loads(environ.get('BATCH_JOB_OVERRIDES', '{}'))

# An existing (externally triggered) job id is required for the sensor example DAG.
JOB_ID = environ.get('BATCH_JOB_ID', '00000000-0000-0000-0000-000000000000')


with DAG(
dag_id='example_batch_submit_job',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as submit_dag:

# [START howto_operator_batch]
submit_batch_job = BatchOperator(
task_id='submit_batch_job',
job_name=JOB_NAME,
job_queue=JOB_QUEUE,
job_definition=JOB_DEFINITION,
overrides=JOB_OVERRIDES,
)
# [END howto_operator_batch]

with DAG(
dag_id='example_batch_wait_for_job_sensor',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as sensor_dag:

# [START howto_sensor_batch]
wait_for_batch_job = BatchSensor(
task_id='wait_for_batch_job',
job_id=JOB_ID,
)
# [END howto_sensor_batch]
5 changes: 4 additions & 1 deletion airflow/providers/amazon/aws/operators/batch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
Expand Down Expand Up @@ -41,6 +40,10 @@ class BatchOperator(BaseOperator):
"""
Execute a job on AWS Batch

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:BatchOperator`

:param job_name: the name for the job that will run on AWS Batch (templated)

:param job_definition: the job definition name on AWS Batch
Expand Down
4 changes: 4 additions & 0 deletions airflow/providers/amazon/aws/sensors/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ class BatchSensor(BaseSensorOperator):
Asks for the state of the Batch Job execution until it reaches a failure state or success state.
If the job fails, the task will fail.

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/sensor:BatchSensor`

:param job_id: Batch job_id to check the state for
:param aws_conn_id: aws connection to use, defaults to 'aws_default'
"""
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ integrations:
- integration-name: AWS Batch
external-doc-url: https://aws.amazon.com/batch/
logo: /integration-logos/aws/AWS-Batch_light-bg@4x.png
how-to-guide:
- /docs/apache-airflow-providers-amazon/operators/batch.rst
tags: [aws]
- integration-name: AWS DataSync
external-doc-url: https://aws.amazon.com/datasync/
Expand Down
65 changes: 65 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/batch.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.


AWS Batch Operators
===================

`AWS Batch <https://aws.amazon.com/batch/>`__ enables you to run batch computing workloads on the AWS Cloud.
Batch computing is a common way for developers, scientists, and engineers to access large amounts of compute
resources. AWS Batch removes the undifferentiated heavy lifting of configuring and managing the required
infrastructure.

Prerequisite Tasks
------------------

.. include:: _partials/prerequisite_tasks.rst

.. _howto/sensor:BatchSensor:

AWS Batch Sensor
""""""""""""""""

To wait on the state of an AWS Batch Job until it reaches a terminal state you can
use :class:`~airflow.providers.amazon.aws.sensors.batch.BatchSensor`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_batch.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_batch]
:end-before: [END howto_sensor_batch]

.. _howto/operator:BatchOperator:

AWS Batch Operator
""""""""""""""""""

To submit a new AWS Batch Job and monitor it until it reaches a terminal state you can
use :class:`~airflow.providers.amazon.aws.operators.batch.BatchOperator`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_batch.py
:language: python
:dedent: 4
:start-after: [START howto_operator_batch]
:end-before: [END howto_operator_batch]

Reference
---------

For further information, look at:

* `Boto3 Library Documentation for Batch <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/batch.html>`__