From 5479e23d603b4e9e0f95e86e99c983b4df9f09d9 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Wed, 14 Sep 2022 16:25:22 -0700 Subject: [PATCH 1/2] Convert EKS with Nodegroups sample DAG to a system test (AIP-47) --- .../aws}/example_eks_with_nodegroups.py | 129 +++++++++++------- 1 file changed, 77 insertions(+), 52 deletions(-) rename {airflow/providers/amazon/aws/example_dags => tests/system/providers/amazon/aws}/example_eks_with_nodegroups.py (52%) diff --git a/airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py b/tests/system/providers/amazon/aws/example_eks_with_nodegroups.py similarity index 52% rename from airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py rename to tests/system/providers/amazon/aws/example_eks_with_nodegroups.py index f0d839cb39b06..1d750e9f44f26 100644 --- a/airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py +++ b/tests/system/providers/amazon/aws/example_eks_with_nodegroups.py @@ -17,8 +17,8 @@ from __future__ import annotations from datetime import datetime -from os import environ +from airflow.models.baseoperator import chain from airflow.models.dag import DAG from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates from airflow.providers.amazon.aws.operators.eks import ( @@ -29,75 +29,81 @@ EksPodOperator, ) from airflow.providers.amazon.aws.sensors.eks import EksClusterStateSensor, EksNodegroupStateSensor +from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder # Ignore missing args provided by default_args # type: ignore[call-arg] -CLUSTER_NAME = 'eks-demo' -NODEGROUP_SUFFIX = '-nodegroup' -NODEGROUP_NAME = CLUSTER_NAME + NODEGROUP_SUFFIX -ROLE_ARN = environ.get('EKS_DEMO_ROLE_ARN', 'arn:aws:iam::123456789012:role/role_name') -SUBNETS = environ.get('EKS_DEMO_SUBNETS', 'subnet-12345ab subnet-67890cd').split(' ') -VPC_CONFIG = { - 'subnetIds': SUBNETS, - 'endpointPublicAccess': True, - 'endpointPrivateAccess': False, -} +DAG_ID = 'example_eks_with_nodegroups' +# Externally fetched variables: +ROLE_ARN_KEY = 'ROLE_ARN' +SUBNETS_KEY = 'SUBNETS' + +sys_test_context_task = ( + SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).add_variable(SUBNETS_KEY, split_string=True).build() +) with DAG( - dag_id='example_eks_with_nodegroups', + dag_id=DAG_ID, + schedule='@once', start_date=datetime(2021, 1, 1), tags=['example'], catchup=False, ) as dag: + test_context = sys_test_context_task() + env_id = test_context[ENV_ID_KEY] + + cluster_name = f'{env_id}-cluster' + nodegroup_name = f'{env_id}-nodegroup' # [START howto_operator_eks_create_cluster] # Create an Amazon EKS Cluster control plane without attaching compute service. create_cluster = EksCreateClusterOperator( - task_id='create_eks_cluster', - cluster_name=CLUSTER_NAME, - cluster_role_arn=ROLE_ARN, - resources_vpc_config=VPC_CONFIG, + task_id='create_cluster', + cluster_name=cluster_name, + cluster_role_arn=test_context[ROLE_ARN_KEY], + resources_vpc_config={'subnetIds': test_context[SUBNETS_KEY]}, compute=None, ) # [END howto_operator_eks_create_cluster] # [START howto_sensor_eks_cluster] await_create_cluster = EksClusterStateSensor( - task_id='wait_for_create_cluster', - cluster_name=CLUSTER_NAME, + task_id='await_create_cluster', + cluster_name=cluster_name, target_state=ClusterStates.ACTIVE, ) # [END howto_sensor_eks_cluster] # [START howto_operator_eks_create_nodegroup] create_nodegroup = EksCreateNodegroupOperator( - task_id='create_eks_nodegroup', - cluster_name=CLUSTER_NAME, - nodegroup_name=NODEGROUP_NAME, - nodegroup_subnets=SUBNETS, - nodegroup_role_arn=ROLE_ARN, + task_id='create_nodegroup', + cluster_name=cluster_name, + nodegroup_name=nodegroup_name, + nodegroup_subnets=test_context[SUBNETS_KEY], + nodegroup_role_arn=test_context[ROLE_ARN_KEY], ) # [END howto_operator_eks_create_nodegroup] # [START howto_sensor_eks_nodegroup] await_create_nodegroup = EksNodegroupStateSensor( - task_id='wait_for_create_nodegroup', - cluster_name=CLUSTER_NAME, - nodegroup_name=NODEGROUP_NAME, + task_id='await_create_nodegroup', + cluster_name=cluster_name, + nodegroup_name=nodegroup_name, target_state=NodegroupStates.ACTIVE, ) # [END howto_sensor_eks_nodegroup] # [START howto_operator_eks_pod_operator] start_pod = EksPodOperator( - task_id="run_pod", - cluster_name=CLUSTER_NAME, - pod_name="run_pod", - image="amazon/aws-cli:latest", - cmds=["sh", "-c", "ls"], - labels={"demo": "hello_world"}, + task_id='start_pod', + pod_name='test_pod', + cluster_name=cluster_name, + image='amazon/aws-cli:latest', + cmds=['sh', '-c', 'echo Test Airflow; date'], + labels={'demo': 'hello_world'}, get_logs=True, # Delete the pod when it reaches its final state, or the execution is interrupted. is_delete_operator_pod=True, @@ -106,40 +112,59 @@ # [START howto_operator_eks_delete_nodegroup] delete_nodegroup = EksDeleteNodegroupOperator( - task_id='delete_eks_nodegroup', - cluster_name=CLUSTER_NAME, - nodegroup_name=NODEGROUP_NAME, + task_id='delete_nodegroup', + cluster_name=cluster_name, + nodegroup_name=nodegroup_name, ) # [END howto_operator_eks_delete_nodegroup] + delete_nodegroup.trigger_rule = TriggerRule.ALL_DONE await_delete_nodegroup = EksNodegroupStateSensor( - task_id='wait_for_delete_nodegroup', - cluster_name=CLUSTER_NAME, - nodegroup_name=NODEGROUP_NAME, + task_id='await_delete_nodegroup', + trigger_rule=TriggerRule.ALL_DONE, + cluster_name=cluster_name, + nodegroup_name=nodegroup_name, target_state=NodegroupStates.NONEXISTENT, ) # [START howto_operator_eks_delete_cluster] delete_cluster = EksDeleteClusterOperator( - task_id='delete_eks_cluster', - cluster_name=CLUSTER_NAME, + task_id='delete_cluster', + cluster_name=cluster_name, ) # [END howto_operator_eks_delete_cluster] + delete_cluster.trigger_rule = TriggerRule.ALL_DONE await_delete_cluster = EksClusterStateSensor( - task_id='wait_for_delete_cluster', - cluster_name=CLUSTER_NAME, + task_id='await_delete_cluster', + trigger_rule=TriggerRule.ALL_DONE, + cluster_name=cluster_name, target_state=ClusterStates.NONEXISTENT, ) - ( - create_cluster - >> await_create_cluster - >> create_nodegroup - >> await_create_nodegroup - >> start_pod - >> delete_nodegroup - >> await_delete_nodegroup - >> delete_cluster - >> await_delete_cluster + chain( + # TEST SETUP + test_context, + # TEST BODY + create_cluster, + await_create_cluster, + create_nodegroup, + await_create_nodegroup, + start_pod, + delete_nodegroup, + await_delete_nodegroup, + delete_cluster, + await_delete_cluster, ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag) From 1663c3e653ccb5ff46a510b52b5aa66df32f8b15 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Tue, 20 Sep 2022 16:23:05 -0700 Subject: [PATCH 2/2] Fix docs links --- .../operators/eks.rst | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/apache-airflow-providers-amazon/operators/eks.rst b/docs/apache-airflow-providers-amazon/operators/eks.rst index dadfd32a73cde..925a1d238d993 100644 --- a/docs/apache-airflow-providers-amazon/operators/eks.rst +++ b/docs/apache-airflow-providers-amazon/operators/eks.rst @@ -46,7 +46,7 @@ Note: An AWS IAM role with the following permissions is required: ``eks.amazonaws.com`` must be added to the Trusted Relationships ``AmazonEKSClusterPolicy`` IAM Policy must be attached -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py :language: python :dedent: 4 :start-after: [START howto_operator_eks_create_cluster] @@ -98,7 +98,7 @@ Delete an Amazon EKS Cluster To delete an existing Amazon EKS Cluster you can use :class:`~airflow.providers.amazon.aws.operators.eks.EksDeleteClusterOperator`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py :language: python :dedent: 4 :start-after: [START howto_operator_eks_delete_cluster] @@ -127,7 +127,7 @@ Note: An AWS IAM role with the following permissions is required: ``AmazonEC2ContainerRegistryReadOnly`` IAM Policy must be attached ``AmazonEKSWorkerNodePolicy`` IAM Policy must be attached -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py :language: python :dedent: 4 :start-after: [START howto_operator_eks_create_nodegroup] @@ -141,7 +141,7 @@ Delete an Amazon EKS managed node group To delete an existing Amazon EKS managed node group you can use :class:`~airflow.providers.amazon.aws.operators.eks.EksDeleteNodegroupOperator`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py :language: python :dedent: 4 :start-after: [START howto_operator_eks_delete_nodegroup] @@ -190,7 +190,7 @@ To run a pod on an existing Amazon EKS Cluster, you can use Note: An Amazon EKS Cluster with underlying compute infrastructure is required. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py :language: python :dedent: 4 :start-after: [START howto_operator_eks_pod_operator] @@ -207,7 +207,7 @@ Wait on an Amazon EKS cluster state To check the state of an Amazon EKS Cluster until it reaches the target state or another terminal state you can use :class:`~airflow.providers.amazon.aws.sensors.eks.EksClusterStateSensor`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py :language: python :dedent: 4 :start-after: [START howto_sensor_eks_cluster] @@ -221,7 +221,7 @@ Wait on an Amazon EKS managed node group state To check the state of an Amazon EKS managed node group until it reaches the target state or another terminal state you can use :class:`~airflow.providers.amazon.aws.sensors.eks.EksNodegroupStateSensor`. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py :language: python :dedent: 4 :start-after: [START howto_sensor_eks_nodegroup]