From 17116bb042738c3c943b6cc1c3066ba6b0c55b13 Mon Sep 17 00:00:00 2001
From: Niko Oliveira
Date: Fri, 11 Mar 2022 17:01:30 -0800
Subject: [PATCH] Add docs and example dag for AWS Glue

---
 .../amazon/aws/example_dags/example_glue.py   | 123 ++++++++++++++++++
 .../providers/amazon/aws/operators/glue.py    |   4 +
 .../amazon/aws/operators/glue_crawler.py      |  12 +-
 airflow/providers/amazon/aws/sensors/glue.py  |   4 +
 .../amazon/aws/sensors/glue_crawler.py        |   4 +
 airflow/providers/amazon/provider.yaml        |   2 +
 .../operators/glue.rst                        | 101 ++++++++++++++
 docs/spelling_wordlist.txt                    |   1 +
 8 files changed, 249 insertions(+), 2 deletions(-)
 create mode 100644 airflow/providers/amazon/aws/example_dags/example_glue.py
 create mode 100644 docs/apache-airflow-providers-amazon/operators/glue.rst

diff --git a/airflow/providers/amazon/aws/example_dags/example_glue.py b/airflow/providers/amazon/aws/example_dags/example_glue.py
new file mode 100644
index 0000000000000..65417dc24f97c
--- /dev/null
+++ b/airflow/providers/amazon/aws/example_dags/example_glue.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from datetime import datetime
+from os import getenv
+
+from airflow import DAG
+from airflow.decorators import task
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.hooks.s3 import S3Hook
+from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
+from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
+from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
+from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
+
+GLUE_DATABASE_NAME = getenv('GLUE_DATABASE_NAME', 'glue_database_name')
+GLUE_EXAMPLE_S3_BUCKET = getenv('GLUE_EXAMPLE_S3_BUCKET', 'glue_example_s3_bucket')
+
+# The role needs S3 PutObject/GetObject access to the bucket above, as well as the AWS Glue
+# service role policy; see https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html
+GLUE_CRAWLER_ROLE = getenv('GLUE_CRAWLER_ROLE', 'glue_crawler_role')
+GLUE_CRAWLER_NAME = 'example_crawler'
+GLUE_CRAWLER_CONFIG = {
+    'Name': GLUE_CRAWLER_NAME,
+    'Role': GLUE_CRAWLER_ROLE,
+    'DatabaseName': GLUE_DATABASE_NAME,
+    'Targets': {
+        'S3Targets': [
+            {
+                'Path': f'{GLUE_EXAMPLE_S3_BUCKET}/input',
+            }
+        ]
+    },
+}
+
+# Example CSV data used as input to the example AWS Glue Job.
+EXAMPLE_CSV = '''
+apple,0.5
+milk,2.5
+bread,4.0
+'''
+
+# Example Spark script to operate on the above sample CSV data.
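+# The script below reads the crawled 'input' table from the Glue Data Catalog as a
+# DynamicFrame, prints its row count, and appends the rows to the bucket's 'output'
+# path in CSV format.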
+EXAMPLE_SCRIPT = f'''
+from pyspark.context import SparkContext
+from awsglue.context import GlueContext
+
+glueContext = GlueContext(SparkContext.getOrCreate())
+datasource = glueContext.create_dynamic_frame.from_catalog(
+    database='{GLUE_DATABASE_NAME}', table_name='input')
+print('There are %s items in the table' % datasource.count())
+
+datasource.toDF().write.format('csv').mode('append').save('s3://{GLUE_EXAMPLE_S3_BUCKET}/output')
+'''
+
+
+@task(task_id='setup__upload_artifacts_to_s3')
+def upload_artifacts_to_s3():
+    '''Upload example CSV input data and an example Spark script to be used by the Glue Job'''
+    s3_hook = S3Hook()
+    s3_load_kwargs = {'replace': True, 'bucket_name': GLUE_EXAMPLE_S3_BUCKET}
+    s3_hook.load_string(string_data=EXAMPLE_CSV, key='input/input.csv', **s3_load_kwargs)
+    s3_hook.load_string(string_data=EXAMPLE_SCRIPT, key='etl_script.py', **s3_load_kwargs)
+
+
+with DAG(
+    dag_id='example_glue',
+    schedule_interval=None,
+    start_date=datetime(2021, 1, 1),
+    tags=['example'],
+    catchup=False,
+) as glue_dag:
+
+    setup_upload_artifacts_to_s3 = upload_artifacts_to_s3()
+
+    # [START howto_operator_glue_crawler]
+    crawl_s3 = GlueCrawlerOperator(
+        task_id='crawl_s3',
+        config=GLUE_CRAWLER_CONFIG,
+        wait_for_completion=False,
+    )
+    # [END howto_operator_glue_crawler]
+
+    # [START howto_sensor_glue_crawler]
+    wait_for_crawl = GlueCrawlerSensor(task_id='wait_for_crawl', crawler_name=GLUE_CRAWLER_NAME)
+    # [END howto_sensor_glue_crawler]
+
+    # [START howto_operator_glue]
+    job_name = 'example_glue_job'
+    submit_glue_job = GlueJobOperator(
+        task_id='submit_glue_job',
+        job_name=job_name,
+        wait_for_completion=False,
+        script_location=f's3://{GLUE_EXAMPLE_S3_BUCKET}/etl_script.py',
+        s3_bucket=GLUE_EXAMPLE_S3_BUCKET,
+        iam_role_name=GLUE_CRAWLER_ROLE.split('/')[-1],
+        create_job_kwargs={'GlueVersion': '3.0', 'NumberOfWorkers': 2, 'WorkerType': 'G.1X'},
+    )
+    # [END howto_operator_glue]
+
+    # [START howto_sensor_glue]
+    wait_for_job = GlueJobSensor(
+        task_id='wait_for_job',
+        job_name=job_name,
+        # The job run ID is pulled from the GlueJobOperator task above via XCom
+        run_id=submit_glue_job.output,
+    )
+    # [END howto_sensor_glue]
+
+    chain(setup_upload_artifacts_to_s3, crawl_s3, wait_for_crawl, submit_glue_job, wait_for_job)
diff --git a/airflow/providers/amazon/aws/operators/glue.py b/airflow/providers/amazon/aws/operators/glue.py
index 10e0530df7754..05eafe72692f7 100644
--- a/airflow/providers/amazon/aws/operators/glue.py
+++ b/airflow/providers/amazon/aws/operators/glue.py
@@ -34,6 +34,10 @@ class GlueJobOperator(BaseOperator):
     ETL service for running Spark Jobs on the AWS cloud.
     Language support: Python and Scala
 
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GlueJobOperator`
+
     :param job_name: unique job name per AWS Account
     :param script_location: location of ETL script. Must be a local or S3 path
     :param job_desc: job description details
diff --git a/airflow/providers/amazon/aws/operators/glue_crawler.py b/airflow/providers/amazon/aws/operators/glue_crawler.py
index 0bd27aaeaf0b6..a584301f926d1 100644
--- a/airflow/providers/amazon/aws/operators/glue_crawler.py
+++ b/airflow/providers/amazon/aws/operators/glue_crawler.py
@@ -38,9 +38,14 @@ class GlueCrawlerOperator(BaseOperator):
     service that manages a catalog of metadata tables that contain the inferred
     schema, format and data types of data stores within the AWS cloud.
 
+    .. seealso::
+        For more information on how to use this operator, take a look at the guide:
+        :ref:`howto/operator:GlueCrawlerOperator`
+
     :param config: Configurations for the AWS Glue crawler
     :param aws_conn_id: aws connection to use
     :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
+    :param wait_for_completion: Whether to wait for crawl execution completion. (default: True)
     """
 
     ui_color = '#ededed'
@@ -50,11 +55,13 @@ def __init__(
         config,
         aws_conn_id='aws_default',
         poll_interval: int = 5,
+        wait_for_completion: bool = True,
         **kwargs,
     ):
         super().__init__(**kwargs)
         self.aws_conn_id = aws_conn_id
         self.poll_interval = poll_interval
+        self.wait_for_completion = wait_for_completion
         self.config = config
 
     @cached_property
@@ -76,8 +83,9 @@ def execute(self, context: 'Context'):
         self.log.info("Triggering AWS Glue Crawler")
         self.hook.start_crawler(crawler_name)
-        self.log.info("Waiting for AWS Glue Crawler")
-        self.hook.wait_for_crawler_completion(crawler_name=crawler_name, poll_interval=self.poll_interval)
+        if self.wait_for_completion:
+            self.log.info("Waiting for AWS Glue Crawler")
+            self.hook.wait_for_crawler_completion(crawler_name=crawler_name, poll_interval=self.poll_interval)
 
         return crawler_name
 
diff --git a/airflow/providers/amazon/aws/sensors/glue.py b/airflow/providers/amazon/aws/sensors/glue.py
index 149c77cf52a16..59bdaa4c4eff3 100644
--- a/airflow/providers/amazon/aws/sensors/glue.py
+++ b/airflow/providers/amazon/aws/sensors/glue.py
@@ -31,6 +31,10 @@ class GlueJobSensor(BaseSensorOperator):
     Waits for an AWS Glue Job to reach any of the statuses below
     'FAILED', 'STOPPED', 'SUCCEEDED'
 
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:GlueJobSensor`
+
     :param job_name: The AWS Glue Job unique name
     :param run_id: The AWS Glue current running job identifier
     """
diff --git a/airflow/providers/amazon/aws/sensors/glue_crawler.py b/airflow/providers/amazon/aws/sensors/glue_crawler.py
index 10ff0a074f40d..52944ce2eb30e 100644
--- a/airflow/providers/amazon/aws/sensors/glue_crawler.py
+++ b/airflow/providers/amazon/aws/sensors/glue_crawler.py
@@ -31,6 +31,10 @@ class GlueCrawlerSensor(BaseSensorOperator):
     Waits for an AWS Glue crawler to reach any of the statuses below
     'FAILED', 'CANCELLED', 'SUCCEEDED'
 
+    .. seealso::
+        For more information on how to use this sensor, take a look at the guide:
+        :ref:`howto/sensor:GlueCrawlerSensor`
+
     :param crawler_name: The AWS Glue crawler unique name
     :param aws_conn_id: aws connection to use, defaults to 'aws_default'
     """
diff --git a/airflow/providers/amazon/provider.yaml b/airflow/providers/amazon/provider.yaml
index abf9a249d6ca7..311c2c25481e0 100644
--- a/airflow/providers/amazon/provider.yaml
+++ b/airflow/providers/amazon/provider.yaml
@@ -174,6 +174,8 @@ integrations:
   - integration-name: AWS Glue
     external-doc-url: https://aws.amazon.com/glue/
     logo: /integration-logos/aws/AWS-Glue_light-bg@4x.png
+    how-to-guide:
+      - /docs/apache-airflow-providers-amazon/operators/glue.rst
     tags: [aws]
   - integration-name: AWS Lambda
     external-doc-url: https://aws.amazon.com/lambda/
diff --git a/docs/apache-airflow-providers-amazon/operators/glue.rst b/docs/apache-airflow-providers-amazon/operators/glue.rst
new file mode 100644
index 0000000000000..306354c500a1c
--- /dev/null
+++ b/docs/apache-airflow-providers-amazon/operators/glue.rst
@@ -0,0 +1,101 @@
+ .. Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+ ..   http://www.apache.org/licenses/LICENSE-2.0
+
+ .. Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+
+AWS Glue Operators
+==================
+
+`AWS Glue <https://aws.amazon.com/glue/>`__ is a serverless data integration service that makes it
+easy to discover, prepare, and combine data for analytics, machine learning, and application development.
+AWS Glue provides all the capabilities needed for data integration so that you can start analyzing
+your data and putting it to use in minutes instead of months.
+
+Prerequisite Tasks
+------------------
+
+.. include:: _partials/prerequisite_tasks.rst
+
+.. _howto/operator:GlueCrawlerOperator:
+
+AWS Glue Crawler Operator
+"""""""""""""""""""""""""
+
+AWS Glue Crawlers scan your data stores and populate the AWS Glue Data Catalog with
+metadata tables describing the inferred schema, format and data types.
+To create a new AWS Glue Crawler or run an existing one you can
+use :class:`~airflow.providers.amazon.aws.operators.glue_crawler.GlueCrawlerOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_glue_crawler]
+    :end-before: [END howto_operator_glue_crawler]
+
+Note that the AWS IAM role included in the ``config`` needs read access to the source data
+location (the Amazon S3 path being crawled in this example) as well as the ``AWSGlueServiceRole``
+policy. See the Reference section below for a link to more details.
+
+.. _howto/sensor:GlueCrawlerSensor:
+
+AWS Glue Crawler Sensor
+"""""""""""""""""""""""
+
+To wait for an AWS Glue Crawler execution to reach a terminal state you can
+use :class:`~airflow.providers.amazon.aws.sensors.glue_crawler.GlueCrawlerSensor`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_glue_crawler]
+    :end-before: [END howto_sensor_glue_crawler]
+
+.. _howto/operator:GlueJobOperator:
+
+AWS Glue Job Operator
+"""""""""""""""""""""
+
+To submit a new AWS Glue Job you can use :class:`~airflow.providers.amazon.aws.operators.glue.GlueJobOperator`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_glue]
+    :end-before: [END howto_operator_glue]
+
+Note that the same AWS IAM role used for the Crawler can be used here as well, but it will also
+need s3:PutObject access to the output location where the job writes its result data.
+
+.. _howto/sensor:GlueJobSensor:
+
+AWS Glue Job Sensor
+"""""""""""""""""""
+
+To wait for an AWS Glue Job run to reach a terminal state you can
+use :class:`~airflow.providers.amazon.aws.sensors.glue.GlueJobSensor`.
+
+.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_glue.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_sensor_glue]
+    :end-before: [END howto_sensor_glue]
+
+Reference
+---------
+
+For further information, look at:
+
+* `Boto3 Library Documentation for Glue <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/glue.html>`__
+* `Glue IAM Role creation <https://docs.aws.amazon.com/glue/latest/dg/create-an-iam-role.html>`__
diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt
index 81e0a7bd79e0a..cbc6b2abdd7b9 100644
--- a/docs/spelling_wordlist.txt
+++ b/docs/spelling_wordlist.txt
@@ -309,6 +309,7 @@ ProductSearchClient
 Protobuf
 PublisherClient
 Pubsub
+PutObject
 Py
 PyPI
 Pyarrow