Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions airflow/providers/amazon/aws/example_dags/example_glue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
from os import getenv

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor

GLUE_DATABASE_NAME = getenv('GLUE_DATABASE_NAME', 'glue_database_name')
GLUE_EXAMPLE_S3_BUCKET = getenv('GLUE_EXAMPLE_S3_BUCKET', 'glue_example_s3_bucket')

# Role needs putobject/getobject access to the above bucket as well as the glue
# service role, see docs here: https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
GLUE_CRAWLER_ROLE = getenv('GLUE_CRAWLER_ROLE', 'glue_crawler_role')
GLUE_CRAWLER_NAME = 'example_crawler'
GLUE_CRAWLER_CONFIG = {
'Name': GLUE_CRAWLER_NAME,
'Role': GLUE_CRAWLER_ROLE,
'DatabaseName': GLUE_DATABASE_NAME,
'Targets': {
'S3Targets': [
{
'Path': f'{GLUE_EXAMPLE_S3_BUCKET}/input',
}
]
},
}

# Example csv data used as input to the example AWS Glue Job.
EXAMPLE_CSV = '''
apple,0.5
milk,2.5
bread,4.0
'''

# Example Spark script to operate on the above sample csv data.
EXAMPLE_SCRIPT = f'''
from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())
datasource = glueContext.create_dynamic_frame.from_catalog(
database='{GLUE_DATABASE_NAME}', table_name='input')
print('There are %s items in the table' % datasource.count())

datasource.toDF().write.format('csv').mode("append").save('s3://{GLUE_EXAMPLE_S3_BUCKET}/output')
'''


@task(task_id='setup__upload_artifacts_to_s3')
def upload_artifacts_to_s3():
    '''Upload example CSV input data and an example Spark script to be used by the Glue Job'''
    hook = S3Hook()
    # Both artifacts land in the example bucket; replace=True makes reruns idempotent.
    artifacts = (
        ('input/input.csv', EXAMPLE_CSV),
        ('etl_script.py', EXAMPLE_SCRIPT),
    )
    for s3_key, body in artifacts:
        hook.load_string(
            string_data=body,
            key=s3_key,
            replace=True,
            bucket_name=GLUE_EXAMPLE_S3_BUCKET,
        )


# End-to-end Glue example: upload artifacts, crawl the input data into the
# Glue catalog, then submit a Glue job over it and wait for the result.
with DAG(
    dag_id='example_glue',
    schedule_interval=None,  # run only when triggered manually
    start_date=datetime(2021, 1, 1),
    tags=['example'],
    catchup=False,  # no backfill for an on-demand example
) as glue_dag:

    setup_upload_artifacts_to_s3 = upload_artifacts_to_s3()

    # The crawler is started without waiting (wait_for_completion=False) so the
    # dedicated sensor task below can demonstrate polling for completion.
    # [START howto_operator_glue_crawler]
    crawl_s3 = GlueCrawlerOperator(
        task_id='crawl_s3',
        config=GLUE_CRAWLER_CONFIG,
        wait_for_completion=False,
    )
    # [END howto_operator_glue_crawler]

    # [START howto_sensor_glue_crawler]
    wait_for_crawl = GlueCrawlerSensor(task_id='wait_for_crawl', crawler_name=GLUE_CRAWLER_NAME)
    # [END howto_sensor_glue_crawler]

    # [START howto_operator_glue]
    job_name = 'example_glue_job'
    submit_glue_job = GlueJobOperator(
        task_id='submit_glue_job',
        job_name=job_name,
        wait_for_completion=False,
        script_location=f's3://{GLUE_EXAMPLE_S3_BUCKET}/etl_script.py',
        s3_bucket=GLUE_EXAMPLE_S3_BUCKET,
        # Pass only the role name: strip any ARN/path prefix before the last '/'.
        iam_role_name=GLUE_CRAWLER_ROLE.split('/')[-1],
        create_job_kwargs={'GlueVersion': '3.0', 'NumberOfWorkers': 2, 'WorkerType': 'G.1X'},
    )
    # [END howto_operator_glue]

    # [START howto_sensor_glue]
    wait_for_job = GlueJobSensor(
        task_id='wait_for_job',
        job_name=job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
    )
    # [END howto_sensor_glue]

    # Linear pipeline: upload -> crawl -> wait for crawl -> submit job -> wait for job.
    chain(setup_upload_artifacts_to_s3, crawl_s3, wait_for_crawl, submit_glue_job, wait_for_job)
4 changes: 4 additions & 0 deletions airflow/providers/amazon/aws/operators/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ class GlueJobOperator(BaseOperator):
ETL service for running Spark Jobs on the AWS cloud.
Language support: Python and Scala

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GlueJobOperator`

:param job_name: unique job name per AWS Account
:param script_location: location of ETL script. Must be a local or S3 path
:param job_desc: job description details
Expand Down
12 changes: 10 additions & 2 deletions airflow/providers/amazon/aws/operators/glue_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,14 @@ class GlueCrawlerOperator(BaseOperator):
service that manages a catalog of metadata tables that contain the inferred
schema, format and data types of data stores within the AWS cloud.

.. seealso::
For more information on how to use this operator, take a look at the guide:
:ref:`howto/operator:GlueCrawlerOperator`

:param config: Configurations for the AWS Glue crawler
:param aws_conn_id: aws connection to use
:param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
:param wait_for_completion: Whether or not wait for crawl execution completion. (default: True)
"""

ui_color = '#ededed'
Expand All @@ -50,11 +55,13 @@ def __init__(
config,
aws_conn_id='aws_default',
poll_interval: int = 5,
wait_for_completion: bool = True,
**kwargs,
):
super().__init__(**kwargs)
self.aws_conn_id = aws_conn_id
self.poll_interval = poll_interval
self.wait_for_completion = wait_for_completion
self.config = config

@cached_property
Expand All @@ -76,8 +83,9 @@ def execute(self, context: 'Context'):

self.log.info("Triggering AWS Glue Crawler")
self.hook.start_crawler(crawler_name)
self.log.info("Waiting for AWS Glue Crawler")
self.hook.wait_for_crawler_completion(crawler_name=crawler_name, poll_interval=self.poll_interval)
if self.wait_for_completion:
self.log.info("Waiting for AWS Glue Crawler")
self.hook.wait_for_crawler_completion(crawler_name=crawler_name, poll_interval=self.poll_interval)

return crawler_name

Expand Down
4 changes: 4 additions & 0 deletions airflow/providers/amazon/aws/sensors/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class GlueJobSensor(BaseSensorOperator):
Waits for an AWS Glue Job to reach any of the status below
'FAILED', 'STOPPED', 'SUCCEEDED'

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/sensor:GlueJobSensor`

:param job_name: The AWS Glue Job unique name
:param run_id: The AWS Glue current running job identifier
"""
Expand Down
4 changes: 4 additions & 0 deletions airflow/providers/amazon/aws/sensors/glue_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ class GlueCrawlerSensor(BaseSensorOperator):
Waits for an AWS Glue crawler to reach any of the statuses below
'FAILED', 'CANCELLED', 'SUCCEEDED'

.. seealso::
For more information on how to use this sensor, take a look at the guide:
:ref:`howto/sensor:GlueCrawlerSensor`

:param crawler_name: The AWS Glue crawler unique name
:param aws_conn_id: aws connection to use, defaults to 'aws_default'
"""
Expand Down
2 changes: 2 additions & 0 deletions airflow/providers/amazon/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ integrations:
- integration-name: AWS Glue
external-doc-url: https://aws.amazon.com/glue/
logo: /integration-logos/aws/AWS-Glue_light-bg@4x.png
how-to-guide:
- /docs/apache-airflow-providers-amazon/operators/glue.rst
tags: [aws]
- integration-name: AWS Lambda
external-doc-url: https://aws.amazon.com/lambda/
Expand Down
101 changes: 101 additions & 0 deletions docs/apache-airflow-providers-amazon/operators/glue.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.


AWS Glue Operators
===================

`AWS Glue <https://aws.amazon.com/glue/>`__ is a serverless data integration service that makes it
easy to discover, prepare, and combine data for analytics, machine learning, and application development.
AWS Glue provides all the capabilities needed for data integration so that you can start analyzing
your data and putting it to use in minutes instead of months.

Prerequisite Tasks
------------------

.. include:: _partials/prerequisite_tasks.rst

.. _howto/operator:GlueCrawlerOperator:

AWS Glue Crawler Operator
"""""""""""""""""""""""""

AWS Glue Crawlers allow you to easily extract data from various data sources.
To create a new AWS Glue Crawler or run an existing one you can
use :class:`~airflow.providers.amazon.aws.operators.glue_crawler.GlueCrawlerOperator`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
:language: python
:dedent: 4
:start-after: [START howto_operator_glue_crawler]
:end-before: [END howto_operator_glue_crawler]

Note that the AWS IAM role included in the ``config`` needs access to the source data location
(e.g. s3:PutObject access if data is stored in Amazon S3) as well as the ``AWSGlueServiceRole``
policy. See the References section below for a link to more details.

.. _howto/sensor:GlueCrawlerSensor:

AWS Glue Crawler Sensor
"""""""""""""""""""""""

To wait on the state of an AWS Glue Crawler execution until it reaches a terminal state you can
use :class:`~airflow.providers.amazon.aws.sensors.glue_crawler.GlueCrawlerSensor`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_glue_crawler]
:end-before: [END howto_sensor_glue_crawler]

.. _howto/operator:GlueJobOperator:

AWS Glue Job Operator
"""""""""""""""""""""

To submit a new AWS Glue Job you can use :class:`~airflow.providers.amazon.aws.operators.glue.GlueJobOperator`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
:language: python
:dedent: 4
:start-after: [START howto_operator_glue]
:end-before: [END howto_operator_glue]

Note that the same AWS IAM role used for the Crawler can be used here as well, but it will need
policies to provide access to the output location for result data.

.. _howto/sensor:GlueJobSensor:

AWS Glue Job Sensor
"""""""""""""""""""

To wait on the state of an AWS Glue Job until it reaches a terminal state you can
use :class:`~airflow.providers.amazon.aws.sensors.glue.GlueJobSensor`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_glue]
:end-before: [END howto_sensor_glue]

Reference
---------

For further information, look at:

* `Boto3 Library Documentation for Glue <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html>`__
* `Glue IAM Role creation <https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html>`__
1 change: 1 addition & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ ProductSearchClient
Protobuf
PublisherClient
Pubsub
PutObject
Py
PyPI
Pyarrow
Expand Down