From b836c087c4aa462c1b44b36ba84c7c70488b2d74 Mon Sep 17 00:00:00 2001
From: Syed Hussain
Date: Mon, 22 Aug 2022 23:27:59 -0400
Subject: [PATCH 1/2] System test for Redshift to S3 using SystemTestContextBuilder pattern.

---
 .../amazon/aws/transfers/redshift_to_s3.py    |   1 +
 .../amazon/aws/example_redshift_to_s3.py      | 195 +++++++++++++++++-
 2 files changed, 186 insertions(+), 10 deletions(-)

diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index 002eb64b7cb32..a7375cef5456e 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -76,6 +76,7 @@ class RedshiftToS3Operator(BaseOperator):
         'table',
         'unload_options',
         'select_query',
+        'redshift_conn_id',
     )
     template_ext: Sequence[str] = ('.sql',)
     template_fields_renderers = {'select_query': 'sql'}
diff --git a/tests/system/providers/amazon/aws/example_redshift_to_s3.py b/tests/system/providers/amazon/aws/example_redshift_to_s3.py
index 58c5d61d2d79c..bd60d7748fcb2 100644
--- a/tests/system/providers/amazon/aws/example_redshift_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_redshift_to_s3.py
@@ -16,36 +16,211 @@
 # under the License.
 from __future__ import annotations
+
 from datetime import datetime
-from os import getenv
+from typing import List
+
+import boto3
 
-from airflow import DAG
+from airflow import DAG, settings
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.models.baseoperator import chain
+from airflow.operators.python import get_current_context
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
+from airflow.providers.amazon.aws.operators.redshift_cluster import (
+    RedshiftCreateClusterOperator,
+    RedshiftDeleteClusterOperator,
+)
+from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
+from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
 from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
-from tests.system.providers.amazon.aws.utils import set_env_id
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 
-ENV_ID = set_env_id()
 DAG_ID = 'example_redshift_to_s3'
-S3_BUCKET_NAME = getenv("S3_BUCKET_NAME", "s3_bucket_name")
-S3_KEY = getenv("S3_KEY", "s3_key")
-REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "redshift_table")
+
+DB_LOGIN = 'adminuser'
+DB_PASS = 'MyAmazonPassword1'
+DB_NAME = 'dev'
+
+IP_PERMISSION = {
+    'FromPort': -1,
+    'IpProtocol': 'All',
+    'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'Test description'}],
+}
+
+S3_KEY = 's3_key'
+REDSHIFT_TABLE = 'test_table'
+
+SQL_CREATE_TABLE = f"""
+    CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE} (
+    fruit_id INTEGER,
+    name VARCHAR NOT NULL,
+    color VARCHAR NOT NULL
+    );
+"""
+
+SQL_INSERT_DATA = f"INSERT INTO {REDSHIFT_TABLE} VALUES ( 1, 'Banana', 'Yellow');"
+
+SQL_DROP_TABLE = f"DROP TABLE IF EXISTS {REDSHIFT_TABLE};"
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+
+@task
+def create_connection(conn_id_name: str, cluster_id: str):
+    redshift_hook = RedshiftHook()
+    cluster_endpoint = redshift_hook.get_conn().describe_clusters(ClusterIdentifier=cluster_id)['Clusters'][0]
+    conn = Connection(
+        conn_id=conn_id_name,
+        conn_type='redshift',
+        host=cluster_endpoint['Endpoint']['Address'],
+        login=DB_LOGIN,
+        password=DB_PASS,
+        port=cluster_endpoint['Endpoint']['Port'],
+        schema=cluster_endpoint['DBName'],
+    )
+    session = settings.Session()
+    session.add(conn)
+    session.commit()
+
+
+@task
+def setup_security_group(sg_name: str, ip_permissions: List[dict]):
+    client = boto3.client('ec2')
+    vpc_id = client.describe_vpcs()['Vpcs'][0]['VpcId']
+    security_group = client.create_security_group(
+        Description='Redshift-system-test', GroupName=sg_name, VpcId=vpc_id
+    )
+    client.get_waiter('security_group_exists').wait(
+        GroupIds=[security_group['GroupId']],
+        GroupNames=[sg_name],
+        WaiterConfig={'Delay': 15, 'MaxAttempts': 4},
+    )
+    client.authorize_security_group_ingress(
+        GroupId=security_group['GroupId'], GroupName=sg_name, IpPermissions=ip_permissions
+    )
+    ti = get_current_context()['ti']
+    ti.xcom_push(key='security_group_id', value=security_group['GroupId'])
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_security_group(sg_id: str, sg_name: str):
+    boto3.client('ec2').delete_security_group(GroupId=sg_id, GroupName=sg_name)
+
 
 with DAG(
     dag_id=DAG_ID,
     start_date=datetime(2021, 1, 1),
-    schedule=None,
+    schedule='@once',
     catchup=False,
     tags=['example'],
 ) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context[ENV_ID_KEY]
+    redshift_cluster_identifier = f'{env_id}-redshift-cluster'
+    conn_id_name = f'{env_id}-conn-id'
+    sg_name = f'{env_id}-sg'
+    bucket_name = f'{env_id}-bucket'
+
+    set_up_sg = setup_security_group(sg_name=sg_name, ip_permissions=[IP_PERMISSION])
+
+    create_bucket = S3CreateBucketOperator(
+        task_id='s3_create_bucket',
+        bucket_name=bucket_name,
+    )
+
+    create_cluster = RedshiftCreateClusterOperator(
+        task_id="create_cluster",
+        cluster_identifier=redshift_cluster_identifier,
+        vpc_security_group_ids=[set_up_sg['security_group_id']],
+        publicly_accessible=True,
+        cluster_type="single-node",
+        node_type="dc2.large",
+        master_username=DB_LOGIN,
+        master_user_password=DB_PASS,
+    )
+
+    wait_cluster_available = RedshiftClusterSensor(
+        task_id='wait_cluster_available',
+        cluster_identifier=redshift_cluster_identifier,
+        target_status='available',
+        poke_interval=5,
+        timeout=60 * 15,
+    )
+
+    set_up_connection = create_connection(conn_id_name, cluster_id=redshift_cluster_identifier)
+
+    create_table_redshift_data = RedshiftSQLOperator(
+        task_id='create_table_redshift_data',
+        redshift_conn_id=conn_id_name,
+        sql=SQL_CREATE_TABLE,
+    )
+    insert_data = RedshiftSQLOperator(
+        task_id='insert_data',
+        redshift_conn_id=conn_id_name,
+        sql=SQL_INSERT_DATA,
+    )
+
     # [START howto_transfer_redshift_to_s3]
-    task_transfer_redshift_to_s3 = RedshiftToS3Operator(
+    transfer_redshift_to_s3 = RedshiftToS3Operator(
         task_id='transfer_redshift_to_s3',
-        s3_bucket=S3_BUCKET_NAME,
+        redshift_conn_id=conn_id_name,
+        s3_bucket=bucket_name,
         s3_key=S3_KEY,
         schema='PUBLIC',
         table=REDSHIFT_TABLE,
     )
     # [END howto_transfer_redshift_to_s3]
 
+    drop_table = RedshiftSQLOperator(
+        task_id='drop_table',
+        redshift_conn_id=conn_id_name,
+        sql=SQL_DROP_TABLE,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_cluster = RedshiftDeleteClusterOperator(
+        task_id='delete_cluster',
+        cluster_identifier=redshift_cluster_identifier,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    delete_sg = delete_security_group(
+        sg_id=set_up_sg['security_group_id'],
+        sg_name=sg_name,
+    )
+
+    delete_bucket = S3DeleteBucketOperator(
+        task_id='delete_bucket', bucket_name=bucket_name, force_delete=True
+    )
+
+    chain(
+        # TEST SETUP
+        test_context,
+        set_up_sg,
+        create_bucket,
+        create_cluster,
+        wait_cluster_available,
+        set_up_connection,
+        create_table_redshift_data,
+        insert_data,
+        # TEST BODY
+        transfer_redshift_to_s3,
+        # TEST TEARDOWN
+        drop_table,
+        delete_cluster,
+        delete_sg,
+        delete_bucket,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)

From 8fcd0fc53c2fbc5b104cc6049705c054469b1aac Mon Sep 17 00:00:00 2001
From: Syed Hussain
Date: Tue, 20 Sep 2022 16:23:53 -0700
Subject: [PATCH 2/2] Combine the S3-to-Redshift and Redshift-to-S3 system tests

---
 .../example_dags/example_s3_to_redshift.py    | 80 ------------------
 .../amazon/aws/transfers/s3_to_redshift.py    | 10 ++-
 .../operators/transfer/redshift_to_s3.rst     |  2 +-
 .../operators/transfer/s3_to_redshift.rst     |  2 +-
 .../aws/transfers/test_redshift_to_s3.py      |  1 +
 .../aws/transfers/test_s3_to_redshift.py      |  1 +
 ...s3.py => example_redshift_s3_transfers.py} | 78 +++++++++++++-----
 7 files changed, 71 insertions(+), 103 deletions(-)
 delete mode 100644 airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
 rename tests/system/providers/amazon/aws/{example_redshift_to_s3.py => example_redshift_s3_transfers.py} (74%)

diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
deleted file mode 100644
index e1b25d177d4a1..0000000000000
--- a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from datetime import datetime
-from os import getenv
-
-from airflow import DAG
-from airflow.decorators import task
-from airflow.models.baseoperator import chain
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
-from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
-
-S3_BUCKET_NAME = getenv("S3_BUCKET_NAME", "s3_bucket_name")
-S3_KEY = getenv("S3_KEY", "s3_filename")
-REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "redshift_table")
-
-
-@task(task_id='setup__add_sample_data_to_s3')
-def task_add_sample_data_to_s3():
-    s3_hook = S3Hook()
-    s3_hook.load_string("0,Airflow", f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET_NAME, replace=True)
-
-
-@task(task_id='teardown__remove_sample_data_from_s3')
-def task_remove_sample_data_from_s3():
-    s3_hook = S3Hook()
-    if s3_hook.check_for_key(f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET_NAME):
-        s3_hook.delete_objects(S3_BUCKET_NAME, f'{S3_KEY}/{REDSHIFT_TABLE}')
-
-
-with DAG(
-    dag_id="example_s3_to_redshift",
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example'],
-) as dag:
-    add_sample_data_to_s3 = task_add_sample_data_to_s3()
-
-    setup__task_create_table = RedshiftSQLOperator(
-        sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)',
-        task_id='setup__create_table',
-    )
-    # [START howto_transfer_s3_to_redshift]
-    task_transfer_s3_to_redshift = S3ToRedshiftOperator(
-        s3_bucket=S3_BUCKET_NAME,
-        s3_key=S3_KEY,
-        schema='PUBLIC',
-        table=REDSHIFT_TABLE,
-        copy_options=['csv'],
-        task_id='transfer_s3_to_redshift',
-    )
-    # [END howto_transfer_s3_to_redshift]
-    teardown__task_drop_table = RedshiftSQLOperator(
-        sql=f'DROP TABLE IF EXISTS {REDSHIFT_TABLE}',
-        task_id='teardown__drop_table',
-    )
-
-    remove_sample_data_from_s3 = task_remove_sample_data_from_s3()
-
-    chain(
-        [add_sample_data_to_s3, setup__task_create_table],
-        task_transfer_s3_to_redshift,
-        [teardown__task_drop_table, remove_sample_data_from_s3],
-    )
diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index 8e46b25510510..620774865a749 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -64,7 +64,15 @@ class S3ToRedshiftOperator(BaseOperator):
     :param upsert_keys: List of fields to use as key on upsert action
     """
 
-    template_fields: Sequence[str] = ('s3_bucket', 's3_key', 'schema', 'table', 'column_list', 'copy_options')
+    template_fields: Sequence[str] = (
+        's3_bucket',
+        's3_key',
+        'schema',
+        'table',
+        'column_list',
+        'copy_options',
+        'redshift_conn_id',
+    )
     template_ext: Sequence[str] = ()
     ui_color = '#99e699'
 
diff --git a/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst b/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst
index 3707cf2b4a1cd..980d44d48b756 100644
--- a/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst
@@ -42,7 +42,7 @@ To get more information about this operator visit:
 
 Example usage:
 
-.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_to_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
     :language: python
    :dedent: 4
    :start-after: [START howto_transfer_redshift_to_s3]
diff --git a/docs/apache-airflow-providers-amazon/operators/transfer/s3_to_redshift.rst b/docs/apache-airflow-providers-amazon/operators/transfer/s3_to_redshift.rst
index cfc9d404f5949..dbe8c9a2dc85b 100644
--- a/docs/apache-airflow-providers-amazon/operators/transfer/s3_to_redshift.rst
+++ b/docs/apache-airflow-providers-amazon/operators/transfer/s3_to_redshift.rst
@@ -42,7 +42,7 @@ To get more information about this operator visit:
 
 Example usage:
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
    :language: python
    :dedent: 4
    :start-after: [START howto_transfer_s3_to_redshift]
diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
index ee1a34095fe7c..ac96213356773 100644
--- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
@@ -297,4 +297,5 @@ def test_template_fields_overrides(self):
             'table',
             'unload_options',
             'select_query',
+            'redshift_conn_id',
         )
diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
index 2fbc27cdfa47f..5f34311aaa2ba 100644
--- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
+++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
@@ -332,6 +332,7 @@ def test_template_fields_overrides(self):
             'table',
             'column_list',
             'copy_options',
+            'redshift_conn_id',
         )
 
     def test_execute_unavailable_method(self):
diff --git a/tests/system/providers/amazon/aws/example_redshift_to_s3.py b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
similarity index 74%
rename from tests/system/providers/amazon/aws/example_redshift_to_s3.py
rename to tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
index bd60d7748fcb2..263b6e5b44f02 100644
--- a/tests/system/providers/amazon/aws/example_redshift_to_s3.py
+++ b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
@@ -16,9 +16,7 @@
 # under the License.
 from __future__ import annotations
-
 from datetime import datetime
-from typing import List
 
 import boto3
 
@@ -26,16 +24,21 @@
 from airflow.decorators import task
 from airflow.models import Connection
 from airflow.models.baseoperator import chain
-from airflow.operators.python import get_current_context
 from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
 from airflow.providers.amazon.aws.operators.redshift_cluster import (
     RedshiftCreateClusterOperator,
     RedshiftDeleteClusterOperator,
 )
 from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
-from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator, S3DeleteBucketOperator
+from airflow.providers.amazon.aws.operators.s3 import (
+    S3CreateBucketOperator,
+    S3CreateObjectOperator,
+    S3DeleteBucketOperator,
+)
 from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
+from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
 from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
+from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
 from airflow.utils.trigger_rule import TriggerRule
 from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
 
@@ -52,6 +55,7 @@
 }
 
 S3_KEY = 's3_key'
+S3_KEY_2 = 's3_key_2'
 REDSHIFT_TABLE = 'test_table'
 
 SQL_CREATE_TABLE = f"""
@@ -66,6 +70,9 @@
 
 SQL_DROP_TABLE = f"DROP TABLE IF EXISTS {REDSHIFT_TABLE};"
 
+DATA = "0, 'Airflow', 'testing'"
+
+
 sys_test_context_task = SystemTestContextBuilder().build()
 
 
@@ -88,27 +95,26 @@ def create_connection(conn_id_name: str, cluster_id: str):
 
 
 @task
-def setup_security_group(sg_name: str, ip_permissions: List[dict]):
+def setup_security_group(sec_group_name: str, ip_permissions: list[dict]):
     client = boto3.client('ec2')
     vpc_id = client.describe_vpcs()['Vpcs'][0]['VpcId']
     security_group = client.create_security_group(
-        Description='Redshift-system-test', GroupName=sg_name, VpcId=vpc_id
+        Description='Redshift-system-test', GroupName=sec_group_name, VpcId=vpc_id
     )
     client.get_waiter('security_group_exists').wait(
         GroupIds=[security_group['GroupId']],
-        GroupNames=[sg_name],
+        GroupNames=[sec_group_name],
         WaiterConfig={'Delay': 15, 'MaxAttempts': 4},
     )
     client.authorize_security_group_ingress(
-        GroupId=security_group['GroupId'], GroupName=sg_name, IpPermissions=ip_permissions
+        GroupId=security_group['GroupId'], GroupName=sec_group_name, IpPermissions=ip_permissions
     )
-    ti = get_current_context()['ti']
-    ti.xcom_push(key='security_group_id', value=security_group['GroupId'])
+    return security_group['GroupId']
 
 
 @task(trigger_rule=TriggerRule.ALL_DONE)
-def delete_security_group(sg_id: str, sg_name: str):
-    boto3.client('ec2').delete_security_group(GroupId=sg_id, GroupName=sg_name)
+def delete_security_group(sec_group_id: str, sec_group_name: str):
+    boto3.client('ec2').delete_security_group(GroupId=sec_group_id, GroupName=sec_group_name)
 
 
 with DAG(
@@ -125,7 +131,7 @@ def delete_security_group(sg_id: str, sg_name: str):
     sg_name = f'{env_id}-sg'
     bucket_name = f'{env_id}-bucket'
 
-    set_up_sg = setup_security_group(sg_name=sg_name, ip_permissions=[IP_PERMISSION])
+    set_up_sg = setup_security_group(sec_group_name=sg_name, ip_permissions=[IP_PERMISSION])
 
     create_bucket = S3CreateBucketOperator(
         task_id='s3_create_bucket',
         bucket_name=bucket_name,
     )
 
     create_cluster = RedshiftCreateClusterOperator(
-        task_id="create_cluster",
+        task_id='create_cluster',
         cluster_identifier=redshift_cluster_identifier,
-        vpc_security_group_ids=[set_up_sg['security_group_id']],
+        vpc_security_group_ids=[set_up_sg],
         publicly_accessible=True,
-        cluster_type="single-node",
-        node_type="dc2.large",
+        cluster_type='single-node',
+        node_type='dc2.large',
         master_username=DB_LOGIN,
         master_user_password=DB_PASS,
     )
@@ -153,6 +159,14 @@ def delete_security_group(sg_id: str, sg_name: str):
 
     set_up_connection = create_connection(conn_id_name, cluster_id=redshift_cluster_identifier)
 
+    create_object = S3CreateObjectOperator(
+        task_id='create_object',
+        s3_bucket=bucket_name,
+        s3_key=S3_KEY_2,
+        data=DATA,
+        replace=True,
+    )
+
     create_table_redshift_data = RedshiftSQLOperator(
         task_id='create_table_redshift_data',
         redshift_conn_id=conn_id_name,
@@ -175,6 +189,24 @@ def delete_security_group(sg_id: str, sg_name: str):
     )
     # [END howto_transfer_redshift_to_s3]
 
+    check_if_key_exists = S3KeySensor(
+        task_id='check_if_key_exists',
+        bucket_name=bucket_name,
+        bucket_key=f'{S3_KEY}/{REDSHIFT_TABLE}_0000_part_00',
+    )
+
+    # [START howto_transfer_s3_to_redshift]
+    transfer_s3_to_redshift = S3ToRedshiftOperator(
+        task_id='transfer_s3_to_redshift',
+        redshift_conn_id=conn_id_name,
+        s3_bucket=bucket_name,
+        s3_key=S3_KEY_2,
+        schema='PUBLIC',
+        table=REDSHIFT_TABLE,
+        copy_options=['csv'],
+    )
+    # [END howto_transfer_s3_to_redshift]
+
     drop_table = RedshiftSQLOperator(
         task_id='drop_table',
         redshift_conn_id=conn_id_name,
@@ -188,12 +220,15 @@ def delete_security_group(sg_id: str, sg_name: str):
     )
     delete_cluster = RedshiftDeleteClusterOperator(
         task_id='delete_cluster',
         cluster_identifier=redshift_cluster_identifier,
         trigger_rule=TriggerRule.ALL_DONE,
     )
 
     delete_sg = delete_security_group(
-        sg_id=set_up_sg['security_group_id'],
-        sg_name=sg_name,
+        sec_group_id=set_up_sg,
+        sec_group_name=sg_name,
     )
 
     delete_bucket = S3DeleteBucketOperator(
-        task_id='delete_bucket', bucket_name=bucket_name, force_delete=True
+        task_id='delete_bucket',
+        bucket_name=bucket_name,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
     )
 
     chain(
         # TEST SETUP
@@ -204,10 +239,13 @@ def delete_security_group(sg_id: str, sg_name: str):
         create_cluster,
         wait_cluster_available,
         set_up_connection,
+        create_object,
         create_table_redshift_data,
         insert_data,
         # TEST BODY
         transfer_redshift_to_s3,
+        check_if_key_exists,
+        transfer_s3_to_redshift,
         # TEST TEARDOWN
         drop_table,
         delete_cluster,
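# ----------------------------------------------------------------------------
# Illustrative sketch, not part of the patches above: both commits add
# 'redshift_conn_id' to the operators' template_fields, which means the
# connection id is rendered through Jinja before execute() runs. A minimal,
# hypothetical example of what that enables is below; the DAG id and the
# Airflow Variable name 'redshift_default_conn' are invented for illustration.
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator

with DAG(
    dag_id='example_templated_redshift_conn_id',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
):
    RedshiftToS3Operator(
        task_id='templated_unload',
        # Rendered at run time because 'redshift_conn_id' is now a template field.
        redshift_conn_id='{{ var.value.redshift_default_conn }}',
        s3_bucket='my-example-bucket',
        s3_key='unload/',
        schema='PUBLIC',
        table='test_table',
    )
# ----------------------------------------------------------------------------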