diff --git a/docs/apache-airflow-providers-amazon/operators/eks.rst b/docs/apache-airflow-providers-amazon/operators/eks.rst index ee38f3d31f050..dadfd32a73cde 100644 --- a/docs/apache-airflow-providers-amazon/operators/eks.rst +++ b/docs/apache-airflow-providers-amazon/operators/eks.rst @@ -65,7 +65,7 @@ Note: An AWS IAM role with the following permissions is required: ``AmazonEKSClusterPolicy`` IAM Policy must be attached ``AmazonEKSWorkerNodePolicy`` IAM Policy must be attached -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py :language: python :dedent: 4 :start-after: [START howto_operator_eks_create_cluster_with_nodegroup] @@ -108,7 +108,7 @@ Note: If the cluster has any attached resources, such as an Amazon EKS Nodegroup Fargate profile, the cluster can not be deleted. Using the ``force`` parameter will attempt to delete any attached resources first. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py :language: python :dedent: 4 :start-after: [START howto_operator_eks_force_delete_cluster] diff --git a/airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py b/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py similarity index 53% rename from airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py rename to tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py index ce6032c37c3e0..6da3aee9d56b4 100644 --- a/airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py +++ b/tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py @@ -17,8 +17,8 @@ from __future__ import annotations from datetime import datetime -from os import environ +from airflow.models.baseoperator import chain from airflow.models.dag import DAG from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates from airflow.providers.amazon.aws.operators.eks import ( @@ -27,58 +27,63 @@ EksPodOperator, ) from airflow.providers.amazon.aws.sensors.eks import EksClusterStateSensor, EksNodegroupStateSensor +from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder -# Ignore missing args provided by default_args -# type: ignore[call-arg] +DAG_ID = 'example_eks_with_nodegroup_in_one_step' -CLUSTER_NAME = environ.get('EKS_CLUSTER_NAME', 'eks-demo') -NODEGROUP_NAME = f'{CLUSTER_NAME}-nodegroup' -ROLE_ARN = environ.get('EKS_DEMO_ROLE_ARN', 'arn:aws:iam::123456789012:role/role_name') -SUBNETS = environ.get('EKS_DEMO_SUBNETS', 'subnet-12345ab subnet-67890cd').split(' ') -VPC_CONFIG = { - 'subnetIds': SUBNETS, - 'endpointPublicAccess': True, - 'endpointPrivateAccess': False, -} +# Externally fetched variables: +ROLE_ARN_KEY = 'ROLE_ARN' +SUBNETS_KEY = 'SUBNETS' + +sys_test_context_task = ( + SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).add_variable(SUBNETS_KEY, split_string=True).build() +) with DAG( - dag_id='example_eks_with_nodegroup_in_one_step', + dag_id=DAG_ID, + schedule='@once', start_date=datetime(2021, 1, 1), tags=['example'], catchup=False, ) as dag: + test_context = sys_test_context_task() + env_id = test_context[ENV_ID_KEY] + + cluster_name = f'{env_id}-cluster' + nodegroup_name = f'{env_id}-nodegroup' # [START howto_operator_eks_create_cluster_with_nodegroup] # Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step. create_cluster_and_nodegroup = EksCreateClusterOperator( - task_id='create_eks_cluster_and_nodegroup', - cluster_name=CLUSTER_NAME, - nodegroup_name=NODEGROUP_NAME, - cluster_role_arn=ROLE_ARN, - nodegroup_role_arn=ROLE_ARN, + task_id='create_cluster_and_nodegroup', + cluster_name=cluster_name, + nodegroup_name=nodegroup_name, + cluster_role_arn=test_context[ROLE_ARN_KEY], # Opting to use the same ARN for the cluster and the nodegroup here, # but a different ARN could be configured and passed if desired. - resources_vpc_config=VPC_CONFIG, - # Compute defaults to 'nodegroup' but is called out here for the purposed of the example. + nodegroup_role_arn=test_context[ROLE_ARN_KEY], + resources_vpc_config={'subnetIds': test_context[SUBNETS_KEY]}, + # ``compute='nodegroup'`` is the default, explicitly set here for demo purposes. compute='nodegroup', ) # [END howto_operator_eks_create_cluster_with_nodegroup] await_create_nodegroup = EksNodegroupStateSensor( - task_id='wait_for_create_nodegroup', - cluster_name=CLUSTER_NAME, - nodegroup_name=NODEGROUP_NAME, + task_id='await_create_nodegroup', + cluster_name=cluster_name, + nodegroup_name=nodegroup_name, target_state=NodegroupStates.ACTIVE, ) start_pod = EksPodOperator( - task_id="run_pod", - cluster_name=CLUSTER_NAME, - pod_name="run_pod", - image="amazon/aws-cli:latest", - cmds=["sh", "-c", "echo Test Airflow; date"], - labels={"demo": "hello_world"}, + task_id='start_pod', + pod_name='test_pod', + cluster_name=cluster_name, + image='amazon/aws-cli:latest', + cmds=['sh', '-c', 'echo Test Airflow; date'], + labels={'demo': 'hello_world'}, get_logs=True, # Delete the pod when it reaches its final state, or the execution is interrupted. is_delete_operator_pod=True, @@ -87,23 +92,40 @@ # [START howto_operator_eks_force_delete_cluster] # An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles. # Setting the `force` to `True` will delete any attached resources before deleting the cluster. - delete_all = EksDeleteClusterOperator( + delete_nodegroup_and_cluster = EksDeleteClusterOperator( task_id='delete_nodegroup_and_cluster', - cluster_name=CLUSTER_NAME, + trigger_rule=TriggerRule.ALL_DONE, + cluster_name=cluster_name, force_delete_compute=True, ) # [END howto_operator_eks_force_delete_cluster] await_delete_cluster = EksClusterStateSensor( - task_id='wait_for_delete_cluster', - cluster_name=CLUSTER_NAME, + task_id='await_delete_cluster', + trigger_rule=TriggerRule.ALL_DONE, + cluster_name=cluster_name, target_state=ClusterStates.NONEXISTENT, ) - ( - create_cluster_and_nodegroup - >> await_create_nodegroup - >> start_pod - >> delete_all - >> await_delete_cluster + chain( + # TEST SETUP + test_context, + # TEST BODY + create_cluster_and_nodegroup, + await_create_nodegroup, + start_pod, + delete_nodegroup_and_cluster, + await_delete_cluster, ) + + from tests.system.utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "tearDown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from tests.system.utils import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)