diff --git a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py b/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
deleted file mode 100644
index e1b25d177d4a1..0000000000000
--- a/airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from datetime import datetime
-from os import getenv
-
-from airflow import DAG
-from airflow.decorators import task
-from airflow.models.baseoperator import chain
-from airflow.providers.amazon.aws.hooks.s3 import S3Hook
-from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
-from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
-
-S3_BUCKET_NAME = getenv("S3_BUCKET_NAME", "s3_bucket_name")
-S3_KEY = getenv("S3_KEY", "s3_filename")
-REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "redshift_table")
-
-
-@task(task_id='setup__add_sample_data_to_s3')
-def task_add_sample_data_to_s3():
-    s3_hook = S3Hook()
-    s3_hook.load_string("0,Airflow", f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET_NAME, replace=True)
-
-
-@task(task_id='teardown__remove_sample_data_from_s3')
-def task_remove_sample_data_from_s3():
-    s3_hook = S3Hook()
-    if s3_hook.check_for_key(f'{S3_KEY}/{REDSHIFT_TABLE}', S3_BUCKET_NAME):
-        s3_hook.delete_objects(S3_BUCKET_NAME, f'{S3_KEY}/{REDSHIFT_TABLE}')
-
-
-with DAG(
-    dag_id="example_s3_to_redshift",
-    start_date=datetime(2021, 1, 1),
-    catchup=False,
-    tags=['example'],
-) as dag:
-    add_sample_data_to_s3 = task_add_sample_data_to_s3()
-
-    setup__task_create_table = RedshiftSQLOperator(
-        sql=f'CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE}(Id int, Name varchar)',
-        task_id='setup__create_table',
-    )
-    # [START howto_transfer_s3_to_redshift]
-    task_transfer_s3_to_redshift = S3ToRedshiftOperator(
-        s3_bucket=S3_BUCKET_NAME,
-        s3_key=S3_KEY,
-        schema='PUBLIC',
-        table=REDSHIFT_TABLE,
-        copy_options=['csv'],
-        task_id='transfer_s3_to_redshift',
-    )
-    # [END howto_transfer_s3_to_redshift]
-    teardown__task_drop_table = RedshiftSQLOperator(
-        sql=f'DROP TABLE IF EXISTS {REDSHIFT_TABLE}',
-        task_id='teardown__drop_table',
-    )
-
-    remove_sample_data_from_s3 = task_remove_sample_data_from_s3()
-
-    chain(
-        [add_sample_data_to_s3, setup__task_create_table],
-        task_transfer_s3_to_redshift,
-        [teardown__task_drop_table, remove_sample_data_from_s3],
-    )
diff --git a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
index 002eb64b7cb32..a7375cef5456e 100644
--- a/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
+++ b/airflow/providers/amazon/aws/transfers/redshift_to_s3.py
@@ -76,6 +76,7 @@ class RedshiftToS3Operator(BaseOperator):
         'table',
         'unload_options',
         'select_query',
+        'redshift_conn_id',
     )
     template_ext: Sequence[str] = ('.sql',)
     template_fields_renderers = {'select_query': 'sql'}
diff --git a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
index 8e46b25510510..620774865a749 100644
--- a/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
+++ b/airflow/providers/amazon/aws/transfers/s3_to_redshift.py
@@ -64,7 +64,15 @@ class S3ToRedshiftOperator(BaseOperator):
     :param upsert_keys: List of fields to use as key on upsert action
     """
 
-    template_fields: Sequence[str] = ('s3_bucket', 's3_key', 'schema', 'table', 'column_list', 'copy_options')
+    template_fields: Sequence[str] = (
+        's3_bucket',
+        's3_key',
+        'schema',
+        'table',
+        'column_list',
+        'copy_options',
+        'redshift_conn_id',
+    )
     template_ext: Sequence[str] = ()
     ui_color = '#99e699'
 
diff --git a/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst b/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst
index 3707cf2b4a1cd..980d44d48b756 100644
--- a/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst
+++ b/docs/apache-airflow-providers-amazon/operators/transfer/redshift_to_s3.rst
@@ -42,7 +42,7 @@ To get more information about this operator visit:
 
 Example usage:
 
-.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_to_s3.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
     :language: python
     :dedent: 4
    :start-after: [START howto_transfer_redshift_to_s3]
diff --git a/docs/apache-airflow-providers-amazon/operators/transfer/s3_to_redshift.rst b/docs/apache-airflow-providers-amazon/operators/transfer/s3_to_redshift.rst
index cfc9d404f5949..dbe8c9a2dc85b 100644
--- a/docs/apache-airflow-providers-amazon/operators/transfer/s3_to_redshift.rst
+++ b/docs/apache-airflow-providers-amazon/operators/transfer/s3_to_redshift.rst
@@ -42,7 +42,7 @@ To get more information about this operator visit:
 
 Example usage:
 
-.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py
+.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
     :language: python
     :dedent: 4
    :start-after: [START howto_transfer_s3_to_redshift]
diff --git a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
index ee1a34095fe7c..ac96213356773 100644
--- a/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
+++ b/tests/providers/amazon/aws/transfers/test_redshift_to_s3.py
@@ -297,4 +297,5 @@ def test_template_fields_overrides(self):
             'table',
             'unload_options',
             'select_query',
+            'redshift_conn_id',
         )
diff --git a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
index 2fbc27cdfa47f..5f34311aaa2ba 100644
--- a/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
+++ b/tests/providers/amazon/aws/transfers/test_s3_to_redshift.py
@@ -332,6 +332,7 @@ def test_template_fields_overrides(self):
             'table',
             'column_list',
             'copy_options',
+            'redshift_conn_id',
         )
 
     def test_execute_unavailable_method(self):
diff --git a/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
new file mode 100644
index 0000000000000..263b6e5b44f02
--- /dev/null
+++ b/tests/system/providers/amazon/aws/example_redshift_s3_transfers.py
@@ -0,0 +1,266 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from datetime import datetime
+
+import boto3
+
+from airflow import DAG, settings
+from airflow.decorators import task
+from airflow.models import Connection
+from airflow.models.baseoperator import chain
+from airflow.providers.amazon.aws.hooks.redshift_cluster import RedshiftHook
+from airflow.providers.amazon.aws.operators.redshift_cluster import (
+    RedshiftCreateClusterOperator,
+    RedshiftDeleteClusterOperator,
+)
+from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator
+from airflow.providers.amazon.aws.operators.s3 import (
+    S3CreateBucketOperator,
+    S3CreateObjectOperator,
+    S3DeleteBucketOperator,
+)
+from airflow.providers.amazon.aws.sensors.redshift_cluster import RedshiftClusterSensor
+from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
+from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
+from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator
+from airflow.utils.trigger_rule import TriggerRule
+from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder
+
+DAG_ID = 'example_redshift_to_s3'
+
+DB_LOGIN = 'adminuser'
+DB_PASS = 'MyAmazonPassword1'
+DB_NAME = 'dev'
+
+IP_PERMISSION = {
+    'FromPort': -1,
+    'IpProtocol': 'All',
+    'IpRanges': [{'CidrIp': '0.0.0.0/0', 'Description': 'Test description'}],
+}
+
+S3_KEY = 's3_key'
+S3_KEY_2 = 's3_key_2'
+REDSHIFT_TABLE = 'test_table'
+
+SQL_CREATE_TABLE = f"""
+    CREATE TABLE IF NOT EXISTS {REDSHIFT_TABLE} (
+        fruit_id INTEGER,
+        name VARCHAR NOT NULL,
+        color VARCHAR NOT NULL
+    );
+"""
+
+SQL_INSERT_DATA = f"INSERT INTO {REDSHIFT_TABLE} VALUES ( 1, 'Banana', 'Yellow');"
+
+SQL_DROP_TABLE = f"DROP TABLE IF EXISTS {REDSHIFT_TABLE};"
+
+DATA = "0, 'Airflow', 'testing'"
+
+
+sys_test_context_task = SystemTestContextBuilder().build()
+
+
+@task
+def create_connection(conn_id_name: str, cluster_id: str):
+    redshift_hook = RedshiftHook()
+    cluster_endpoint = redshift_hook.get_conn().describe_clusters(ClusterIdentifier=cluster_id)['Clusters'][0]
+    conn = Connection(
+        conn_id=conn_id_name,
+        conn_type='redshift',
+        host=cluster_endpoint['Endpoint']['Address'],
+        login=DB_LOGIN,
+        password=DB_PASS,
+        port=cluster_endpoint['Endpoint']['Port'],
+        schema=cluster_endpoint['DBName'],
+    )
+    session = settings.Session()
+    session.add(conn)
+    session.commit()
+
+
+@task
+def setup_security_group(sec_group_name: str, ip_permissions: list[dict]):
+    client = boto3.client('ec2')
+    vpc_id = client.describe_vpcs()['Vpcs'][0]['VpcId']
+    security_group = client.create_security_group(
+        Description='Redshift-system-test', GroupName=sec_group_name, VpcId=vpc_id
+    )
+    client.get_waiter('security_group_exists').wait(
+        GroupIds=[security_group['GroupId']],
+        GroupNames=[sec_group_name],
+        WaiterConfig={'Delay': 15, 'MaxAttempts': 4},
+    )
+    client.authorize_security_group_ingress(
+        GroupId=security_group['GroupId'], GroupName=sec_group_name, IpPermissions=ip_permissions
+    )
+    return security_group['GroupId']
+
+
+@task(trigger_rule=TriggerRule.ALL_DONE)
+def delete_security_group(sec_group_id: str, sec_group_name: str):
+    boto3.client('ec2').delete_security_group(GroupId=sec_group_id, GroupName=sec_group_name)
+
+
+with DAG(
+    dag_id=DAG_ID,
+    start_date=datetime(2021, 1, 1),
+    schedule='@once',
+    catchup=False,
+    tags=['example'],
+) as dag:
+    test_context = sys_test_context_task()
+    env_id = test_context[ENV_ID_KEY]
+    redshift_cluster_identifier = f'{env_id}-redshift-cluster'
+    conn_id_name = f'{env_id}-conn-id'
+    sg_name = f'{env_id}-sg'
+    bucket_name = f'{env_id}-bucket'
+
+    set_up_sg = setup_security_group(sec_group_name=sg_name, ip_permissions=[IP_PERMISSION])
+
+    create_bucket = S3CreateBucketOperator(
+        task_id='s3_create_bucket',
+        bucket_name=bucket_name,
+    )
+
+    create_cluster = RedshiftCreateClusterOperator(
+        task_id='create_cluster',
+        cluster_identifier=redshift_cluster_identifier,
+        vpc_security_group_ids=[set_up_sg],
+        publicly_accessible=True,
+        cluster_type='single-node',
+        node_type='dc2.large',
+        master_username=DB_LOGIN,
+        master_user_password=DB_PASS,
+    )
+
+    wait_cluster_available = RedshiftClusterSensor(
+        task_id='wait_cluster_available',
+        cluster_identifier=redshift_cluster_identifier,
+        target_status='available',
+        poke_interval=5,
+        timeout=60 * 15,
+    )
+
+    set_up_connection = create_connection(conn_id_name, cluster_id=redshift_cluster_identifier)
+
+    create_object = S3CreateObjectOperator(
+        task_id='create_object',
+        s3_bucket=bucket_name,
+        s3_key=S3_KEY_2,
+        data=DATA,
+        replace=True,
+    )
+
+    create_table_redshift_data = RedshiftSQLOperator(
+        task_id='create_table_redshift_data',
+        redshift_conn_id=conn_id_name,
+        sql=SQL_CREATE_TABLE,
+    )
+    insert_data = RedshiftSQLOperator(
+        task_id='insert_data',
+        redshift_conn_id=conn_id_name,
+        sql=SQL_INSERT_DATA,
+    )
+
+    # [START howto_transfer_redshift_to_s3]
+    transfer_redshift_to_s3 = RedshiftToS3Operator(
+        task_id='transfer_redshift_to_s3',
+        redshift_conn_id=conn_id_name,
+        s3_bucket=bucket_name,
+        s3_key=S3_KEY,
+        schema='PUBLIC',
+        table=REDSHIFT_TABLE,
+    )
+    # [END howto_transfer_redshift_to_s3]
+
+    check_if_key_exists = S3KeySensor(
+        task_id='check_if_key_exists',
+        bucket_name=bucket_name,
+        bucket_key=f'{S3_KEY}/{REDSHIFT_TABLE}_0000_part_00',
+    )
+
+    # [START howto_transfer_s3_to_redshift]
+    transfer_s3_to_redshift = S3ToRedshiftOperator(
+        task_id='transfer_s3_to_redshift',
+        redshift_conn_id=conn_id_name,
+        s3_bucket=bucket_name,
+        s3_key=S3_KEY_2,
+        schema='PUBLIC',
+        table=REDSHIFT_TABLE,
+        copy_options=['csv'],
+    )
+    # [END howto_transfer_s3_to_redshift]
+
+    drop_table = RedshiftSQLOperator(
+        task_id='drop_table',
+        redshift_conn_id=conn_id_name,
+        sql=SQL_DROP_TABLE,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+    delete_cluster = RedshiftDeleteClusterOperator(
+        task_id='delete_cluster',
+        cluster_identifier=redshift_cluster_identifier,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    # setup_security_group returns the plain GroupId string, so it is passed
+    # through directly here rather than indexed like a mapping.
+    delete_sg = delete_security_group(
+        sec_group_id=set_up_sg,
+        sec_group_name=sg_name,
+    )
+
+    delete_bucket = S3DeleteBucketOperator(
+        task_id='delete_bucket',
+        bucket_name=bucket_name,
+        force_delete=True,
+        trigger_rule=TriggerRule.ALL_DONE,
+    )
+
+    chain(
+        # TEST SETUP
+        test_context,
+        set_up_sg,
+        create_bucket,
+        create_cluster,
+        wait_cluster_available,
+        set_up_connection,
+        create_object,
+        create_table_redshift_data,
+        insert_data,
+        # TEST BODY
+        transfer_redshift_to_s3,
+        check_if_key_exists,
+        transfer_s3_to_redshift,
+        # TEST TEARDOWN
+        drop_table,
+        delete_cluster,
+        delete_sg,
+        delete_bucket,
+    )
+
+    from tests.system.utils.watcher import watcher
+
+    # This test needs watcher in order to properly mark success/failure
+    # when "tearDown" task with trigger rule is part of the DAG
+    list(dag.tasks) >> watcher()
+
+
+from tests.system.utils import get_test_run  # noqa: E402
+
+# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
+test_run = get_test_run(dag)
diff --git a/tests/system/providers/amazon/aws/example_redshift_to_s3.py b/tests/system/providers/amazon/aws/example_redshift_to_s3.py
deleted file mode 100644
index 58c5d61d2d79c..0000000000000
--- a/tests/system/providers/amazon/aws/example_redshift_to_s3.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-from __future__ import annotations
-
-from datetime import datetime
-from os import getenv
-
-from airflow import DAG
-from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
-from tests.system.providers.amazon.aws.utils import set_env_id
-
-ENV_ID = set_env_id()
-DAG_ID = 'example_redshift_to_s3'
-S3_BUCKET_NAME = getenv("S3_BUCKET_NAME", "s3_bucket_name")
-S3_KEY = getenv("S3_KEY", "s3_key")
-REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "redshift_table")
-
-with DAG(
-    dag_id=DAG_ID,
-    start_date=datetime(2021, 1, 1),
-    schedule=None,
-    catchup=False,
-    tags=['example'],
-) as dag:
-    # [START howto_transfer_redshift_to_s3]
-    task_transfer_redshift_to_s3 = RedshiftToS3Operator(
-        task_id='transfer_redshift_to_s3',
-        s3_bucket=S3_BUCKET_NAME,
-        s3_key=S3_KEY,
-        schema='PUBLIC',
-        table=REDSHIFT_TABLE,
-    )
-    # [END howto_transfer_redshift_to_s3]
-
-
-from tests.system.utils import get_test_run  # noqa: E402
-
-# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
-test_run = get_test_run(dag)
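
Usage note (illustrative, not part of the patch): with redshift_conn_id added to the template_fields of RedshiftToS3Operator and S3ToRedshiftOperator, the connection id is rendered by Jinja before execution, so one DAG definition can target different Redshift connections per run. Below is a minimal sketch of that pattern, assuming a hypothetical DAG whose connection id is supplied as a DAG param at trigger time; the DAG id, bucket, key, and table names are placeholders, not values taken from this patch.

from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

with DAG(
    dag_id='example_templated_redshift_conn_id',  # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule=None,
    catchup=False,
    params={'redshift_conn_id': 'redshift_default'},  # default, overridable at trigger time
) as dag:
    # Because redshift_conn_id is now a template field, the Jinja expression below is
    # rendered for each run instead of being fixed when the DAG file is parsed.
    transfer_s3_to_redshift = S3ToRedshiftOperator(
        task_id='transfer_s3_to_redshift',
        redshift_conn_id='{{ params.redshift_conn_id }}',
        s3_bucket='my-bucket',  # placeholder
        s3_key='my_key',  # placeholder
        schema='PUBLIC',
        table='my_table',  # placeholder
        copy_options=['csv'],
    )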