diff --git a/airflow/providers/amazon/aws/example_dags/example_glue.py b/airflow/providers/amazon/aws/example_dags/example_glue.py
new file mode 100644
index 0000000000000..65417dc24f97c
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_glue.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from datetime import datetime
+from os import getenv
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
+from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
+from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
+from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
+
+GLUE_DATABASE_NAME = getenv('GLUE_DATABASE_NAME', 'glue_database_name')
+GLUE_EXAMPLE_S3_BUCKET = getenv('GLUE_EXAMPLE_S3_BUCKET', 'glue_example_s3_bucket')
+
+# The role needs S3 PutObject/GetObject access to the bucket above, as well as the AWS Glue
+# service role policy; see the docs here: https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
+GLUE_CRAWLER_ROLE = getenv('GLUE_CRAWLER_ROLE', 'glue_crawler_role')
+GLUE_CRAWLER_NAME = 'example_crawler'
+GLUE_CRAWLER_CONFIG = {
+ 'Name': GLUE_CRAWLER_NAME,
+ 'Role': GLUE_CRAWLER_ROLE,
+ 'DatabaseName': GLUE_DATABASE_NAME,
+ 'Targets': {
+ 'S3Targets': [
+ {
+ 'Path': f'{GLUE_EXAMPLE_S3_BUCKET}/input',
+ }
+ ]
+ },
+}
+
+# Example CSV data used as input to the example AWS Glue Job.
+EXAMPLE_CSV = '''
+apple,0.5
+milk,2.5
+bread,4.0
+'''
+
+# Example Spark script to operate on the sample CSV data above.
+EXAMPLE_SCRIPT = f'''
+from pyspark.context import SparkContext
+from awsglue.context import GlueContext
+
+glueContext = GlueContext(SparkContext.getOrCreate())
+datasource = glueContext.create_dynamic_frame.from_catalog(
+ database='{GLUE_DATABASE_NAME}', table_name='input')
+print('There are %s items in the table' % datasource.count())
+
+datasource.toDF().write.format('csv').mode("append").save('s3://{GLUE_EXAMPLE_S3_BUCKET}/output')
+'''
+
+
+@task(task_id='setup__upload_artifacts_to_s3')
+def upload_artifacts_to_s3():
+ '''Upload example CSV input data and an example Spark script to be used by the Glue Job'''
+ s3_hook = S3Hook()
+ s3_load_kwargs = {"replace": True, "bucket_name": GLUE_EXAMPLE_S3_BUCKET}
+ s3_hook.load_string(string_data=EXAMPLE_CSV, key='input/input.csv', **s3_load_kwargs)
+ s3_hook.load_string(string_data=EXAMPLE_SCRIPT, key='etl_script.py', **s3_load_kwargs)
+
+
+with DAG(
+ dag_id='example_glue',
+ schedule_interval=None,
+ start_date=datetime(2021, 1, 1),
+ tags=['example'],
+ catchup=False,
+) as glue_dag:
+
+ setup_upload_artifacts_to_s3 = upload_artifacts_to_s3()
+
+ # [START howto_operator_glue_crawler]
+ crawl_s3 = GlueCrawlerOperator(
+ task_id='crawl_s3',
+ config=GLUE_CRAWLER_CONFIG,
+ wait_for_completion=False,
+ )
+ # [END howto_operator_glue_crawler]
+
+ # [START howto_sensor_glue_crawler]
+ wait_for_crawl = GlueCrawlerSensor(task_id='wait_for_crawl', crawler_name=GLUE_CRAWLER_NAME)
+ # [END howto_sensor_glue_crawler]
+
+ # [START howto_operator_glue]
+ job_name = 'example_glue_job'
+ submit_glue_job = GlueJobOperator(
+ task_id='submit_glue_job',
+ job_name=job_name,
+ wait_for_completion=False,
+ script_location=f's3://{GLUE_EXAMPLE_S3_BUCKET}/etl_script.py',
+ s3_bucket=GLUE_EXAMPLE_S3_BUCKET,
+ iam_role_name=GLUE_CRAWLER_ROLE.split('/')[-1],
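+ # create_job_kwargs holds extra arguments forwarded to Glue job creation (the CreateJob API call)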
+ create_job_kwargs={'GlueVersion': '3.0', 'NumberOfWorkers': 2, 'WorkerType': 'G.1X'},
+ )
+ # [END howto_operator_glue]
+
+ # [START howto_sensor_glue]
+ wait_for_job = GlueJobSensor(
+ task_id='wait_for_job',
+ job_name=job_name,
+ # The job run ID returned by the GlueJobOperator task above
+ run_id=submit_glue_job.output,
+ )
+ # [END howto_sensor_glue]
+
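+ # Run the tasks sequentially: upload the artifacts, crawl the input data, wait for the crawl,
+ # submit the Glue job, and wait for it to finish.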
+ chain(setup_upload_artifacts_to_s3, crawl_s3, wait_for_crawl, submit_glue_job, wait_for_job)
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index 10e0530df7754..05eafe72692f7 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -34,6 +34,10 @@ class GlueJobOperator(BaseOperator):
ETL service for running Spark Jobs on the AWS cloud.
Language support: Python and Scala
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:GlueJobOperator`
+
:param job_name: unique job name per AWS Account
:param script_location: location of ETL script. Must be a local or S3 path
:param job_desc: job description details
diff --git a/airflow/providers/amazon/aws/operators/glue_crawler.py b/airflow/providers/amazon/aws/operators/glue_crawler.py
index 0bd27aaeaf0b6..a584301f926d1 100644
--- a/airflow/providers/amazon/aws/operators/glue_crawler.py
+++ b/airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -38,9 +38,14 @@ class GlueCrawlerOperator(BaseOperator):
service that manages a catalog of metadata tables that contain the inferred
schema, format and data types of data stores within the AWS cloud.
+ .. seealso::
+ For more information on how to use this operator, take a look at the guide:
+ :ref:`howto/operator:GlueCrawlerOperator`
+
:param config: Configurations for the AWS Glue crawler
:param aws_conn_id: aws connection to use
:param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
+:param wait_for_completion: Whether to wait for crawl execution completion. (default: True)
"""
ui_color = '#ededed'
@@ -50,11 +55,13 @@ def __init__(
config,
aws_conn_id='aws_default',
poll_interval: int = 5,
+ wait_for_completion: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self.aws_conn_id = aws_conn_id
self.poll_interval = poll_interval
+ self.wait_for_completion = wait_for_completion
self.config = config
@cached_property
@@ -76,8 +83,9 @@ def execute(self, context: 'Context'):
self.log.info("Triggering AWS Glue Crawler")
self.hook.start_crawler(crawler_name)
- self.log.info("Waiting for AWS Glue Crawler")
- self.hook.wait_for_crawler_completion(crawler_name=crawler_name, poll_interval=self.poll_interval)
+ if self.wait_for_completion:
+ self.log.info("Waiting for AWS Glue Crawler")
+ self.hook.wait_for_crawler_completion(crawler_name=crawler_name, poll_interval=self.poll_interval)
return crawler_name
diff --git a/airflow/providers/amazon/aws/sensors/glue.py b/airflow/providers/amazon/aws/sensors/glue.py
index 149c77cf52a16..59bdaa4c4eff3 100644
--- a/airflow/providers/amazon/aws/sensors/glue.py
+++ b/airflow/providers/amazon/aws/sensors/glue.py
@@ -31,6 +31,10 @@ class GlueJobSensor(BaseSensorOperator):
Waits for an AWS Glue Job to reach any of the status below
'FAILED', 'STOPPED', 'SUCCEEDED'
+ .. seealso::
+ For more information on how to use this sensor, take a look at the guide:
+ :ref:`howto/sensor:GlueJobSensor`
+
:param job_name: The AWS Glue Job unique name
:param run_id: The AWS Glue current running job identifier
"""
diff --git a/airflow/providers/amazon/aws/sensors/glue_crawler.py b/airflow/providers/amazon/aws/sensors/glue_crawler.py
index 10ff0a074f40d..52944ce2eb30e 100644
--- a/airflow/providers/amazon/aws/sensors/glue_crawler.py
+++ b/airflow/providers/amazon/aws/sensors/glue_crawler.py
@@ -31,6 +31,10 @@ class GlueCrawlerSensor(BaseSensorOperator):
Waits for an AWS Glue crawler to reach any of the statuses below
'FAILED', 'CANCELLED', 'SUCCEEDED'
+ .. seealso::
+ For more information on how to use this sensor, take a look at the guide:
+ :ref:`howto/sensor:GlueCrawlerSensor`
+
:param crawler_name: The AWS Glue crawler unique name
:param aws_conn_id: aws connection to use, defaults to 'aws_default'
"""
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index abf9a249d6ca7..311c2c25481e0 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -174,6 +174,8 @@ integrations:
- integration-name: AWS Glue
external-doc-url: https://aws.amazon.com/glue/
logo: /integration-logos/aws/AWS-Glue_light-bg@4x.png
+ how-to-guide:
+ - /docs/apache-airflow-providers-amazon/operators/glue.rst
tags: [aws]
- integration-name: AWS Lambda
external-doc-url: https://aws.amazon.com/lambda/
diff --git a/docs/apache-airflow-providers-amazon/operators/glue.rst b/docs/apache-airflow-providers-amazon/operators/glue.rst
new file mode 100644
index 0000000000000..306354c500a1c
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/glue.rst
@@ -0,0 +1,101 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ .. http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+
+AWS Glue Operators
+===================
+
+`AWS Glue <https://aws.amazon.com/glue/>`__ is a serverless data integration service that makes it
+easy to discover, prepare, and combine data for analytics, machine learning, and application development.
+AWS Glue provides all the capabilities needed for data integration so that you can start analyzing
+your data and putting it to use in minutes instead of months.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+.. _howto/operator:GlueCrawlerOperator:
+
+AWS Glue Crawler Operator
+"""""""""""""""""""""""""
+
+AWS Glue Crawlers scan your data stores and populate the AWS Glue Data Catalog with tables
+describing the inferred schema, format and data types of the data.
+To create a new AWS Glue Crawler or run an existing one you can
+use :class:`~airflow.providers.amazon.aws.operators.glue_crawler.GlueCrawlerOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_glue_crawler]
+ :end-before: [END howto_operator_glue_crawler]
+
+Note that the AWS IAM role included in the ``config`` needs access to the source data location
+(e.g. s3:PutObject access if data is stored in Amazon S3) as well as the ``AWSGlueServiceRole``
+policy. See the Reference section below for a link to more details.
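+
+For reference, the ``config`` passed to the operator in the example above is a plain dictionary that
+follows the shape of the Glue ``CreateCrawler`` API; the role, database and bucket names below are
+the placeholders used by the example DAG:
+
+.. code-block:: python
+
+    GLUE_CRAWLER_CONFIG = {
+        'Name': 'example_crawler',
+        'Role': GLUE_CRAWLER_ROLE,
+        'DatabaseName': GLUE_DATABASE_NAME,
+        'Targets': {
+            'S3Targets': [{'Path': f'{GLUE_EXAMPLE_S3_BUCKET}/input'}],
+        },
+    }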
+
+.. _howto/sensor:GlueCrawlerSensor:
+
+AWS Glue Crawler Sensor
+"""""""""""""""""""""""
+
+To wait on the state of an AWS Glue Crawler execution until it reaches a terminal state you can
+use :class:`~airflow.providers.amazon.aws.sensors.glue_crawler.GlueCrawlerSensor`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_glue_crawler]
+ :end-before: [END howto_sensor_glue_crawler]
+
+.. _howto/operator:GlueJobOperator:
+
+AWS Glue Job Operator
+"""""""""""""""""""""
+
+To submit a new AWS Glue Job you can use :class:`~airflow.providers.amazon.aws.operators.glue.GlueJobOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_operator_glue]
+ :end-before: [END howto_operator_glue]
+
+Note that the same AWS IAM role used for the Crawler can be used here as well, but it will need
+policies to provide access to the output location for result data.
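+
+As an illustration only (the role name, policy name and bucket below are placeholders, not values
+provided by the operator), the extra write access for the example's output location could be granted
+with an inline policy such as:
+
+.. code-block:: python
+
+    import json
+
+    import boto3
+
+    # Allow the Glue job role to write result data under the example's output prefix.
+    boto3.client('iam').put_role_policy(
+        RoleName='glue_crawler_role',
+        PolicyName='glue-example-output-access',
+        PolicyDocument=json.dumps(
+            {
+                'Version': '2012-10-17',
+                'Statement': [
+                    {
+                        'Effect': 'Allow',
+                        'Action': ['s3:PutObject'],
+                        'Resource': 'arn:aws:s3:::glue_example_s3_bucket/output/*',
+                    }
+                ],
+            }
+        ),
+    )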
+
+.. _howto/sensor:GlueJobSensor:
+
+AWS Glue Job Sensor
+"""""""""""""""""""
+
+To wait on the state of an AWS Glue Job until it reaches a terminal state you can
+use :class:`~airflow.providers.amazon.aws.sensors.glue.GlueJobSensor`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+ :language: python
+ :dedent: 4
+ :start-after: [START howto_sensor_glue]
+ :end-before: [END howto_sensor_glue]
+
+Reference
+---------
+
+For further information, look at:
+
+* `Boto3 Library Documentation for Glue <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html>`__
+* `Glue IAM Role creation <https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html>`__
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 81e0a7bd79e0a..cbc6b2abdd7b9 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -309,6 +309,7 @@ ProductSearchClient
Protobuf
PublisherClient
Pubsub
+PutObject
Py
PyPI
Pyarrow