Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/apache-airflow-providers-amazon/operators/eks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Note: An AWS IAM role with the following permissions is required:
``AmazonEKSClusterPolicy`` IAM Policy must be attached
``AmazonEKSWorkerNodePolicy`` IAM Policy must be attached

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eks_create_cluster_with_nodegroup]
Expand Down Expand Up @@ -108,7 +108,7 @@ Note: If the cluster has any attached resources, such as an Amazon EKS Nodegroup
Fargate profile, the cluster can not be deleted. Using the ``force`` parameter will
attempt to delete any attached resources first.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroup_in_one_step.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroup_in_one_step.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eks_force_delete_cluster]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from __future__ import annotations

from datetime import datetime
from os import environ

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates
from airflow.providers.amazon.aws.operators.eks import (
Expand All @@ -27,58 +27,63 @@
EksPodOperator,
)
from airflow.providers.amazon.aws.sensors.eks import EksClusterStateSensor, EksNodegroupStateSensor
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder

# Ignore missing args provided by default_args
# type: ignore[call-arg]
DAG_ID = 'example_eks_with_nodegroup_in_one_step'

CLUSTER_NAME = environ.get('EKS_CLUSTER_NAME', 'eks-demo')
NODEGROUP_NAME = f'{CLUSTER_NAME}-nodegroup'
ROLE_ARN = environ.get('EKS_DEMO_ROLE_ARN', 'arn:aws:iam::123456789012:role/role_name')
SUBNETS = environ.get('EKS_DEMO_SUBNETS', 'subnet-12345ab subnet-67890cd').split(' ')
VPC_CONFIG = {
'subnetIds': SUBNETS,
'endpointPublicAccess': True,
'endpointPrivateAccess': False,
}
# Externally fetched variables:
ROLE_ARN_KEY = 'ROLE_ARN'
SUBNETS_KEY = 'SUBNETS'

sys_test_context_task = (
SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).add_variable(SUBNETS_KEY, split_string=True).build()
)


with DAG(
dag_id='example_eks_with_nodegroup_in_one_step',
dag_id=DAG_ID,
schedule='@once',
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as dag:
test_context = sys_test_context_task()
env_id = test_context[ENV_ID_KEY]

cluster_name = f'{env_id}-cluster'
nodegroup_name = f'{env_id}-nodegroup'

# [START howto_operator_eks_create_cluster_with_nodegroup]
# Create an Amazon EKS cluster control plane and an EKS nodegroup compute platform in one step.
create_cluster_and_nodegroup = EksCreateClusterOperator(
task_id='create_eks_cluster_and_nodegroup',
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
cluster_role_arn=ROLE_ARN,
nodegroup_role_arn=ROLE_ARN,
task_id='create_cluster_and_nodegroup',
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
cluster_role_arn=test_context[ROLE_ARN_KEY],
# Opting to use the same ARN for the cluster and the nodegroup here,
# but a different ARN could be configured and passed if desired.
resources_vpc_config=VPC_CONFIG,
# Compute defaults to 'nodegroup' but is called out here for the purposed of the example.
nodegroup_role_arn=test_context[ROLE_ARN_KEY],
resources_vpc_config={'subnetIds': test_context[SUBNETS_KEY]},
# ``compute='nodegroup'`` is the default, explicitly set here for demo purposes.
compute='nodegroup',
)
# [END howto_operator_eks_create_cluster_with_nodegroup]

await_create_nodegroup = EksNodegroupStateSensor(
task_id='wait_for_create_nodegroup',
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
task_id='await_create_nodegroup',
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
target_state=NodegroupStates.ACTIVE,
)

start_pod = EksPodOperator(
task_id="run_pod",
cluster_name=CLUSTER_NAME,
pod_name="run_pod",
image="amazon/aws-cli:latest",
cmds=["sh", "-c", "echo Test Airflow; date"],
labels={"demo": "hello_world"},
task_id='start_pod',
pod_name='test_pod',
cluster_name=cluster_name,
image='amazon/aws-cli:latest',
cmds=['sh', '-c', 'echo Test Airflow; date'],
labels={'demo': 'hello_world'},
get_logs=True,
# Delete the pod when it reaches its final state, or the execution is interrupted.
is_delete_operator_pod=True,
Expand All @@ -87,23 +92,40 @@
# [START howto_operator_eks_force_delete_cluster]
# An Amazon EKS cluster can not be deleted with attached resources such as nodegroups or Fargate profiles.
# Setting the `force` to `True` will delete any attached resources before deleting the cluster.
delete_all = EksDeleteClusterOperator(
delete_nodegroup_and_cluster = EksDeleteClusterOperator(
task_id='delete_nodegroup_and_cluster',
cluster_name=CLUSTER_NAME,
trigger_rule=TriggerRule.ALL_DONE,
cluster_name=cluster_name,
force_delete_compute=True,
)
# [END howto_operator_eks_force_delete_cluster]

await_delete_cluster = EksClusterStateSensor(
task_id='wait_for_delete_cluster',
cluster_name=CLUSTER_NAME,
task_id='await_delete_cluster',
trigger_rule=TriggerRule.ALL_DONE,
cluster_name=cluster_name,
target_state=ClusterStates.NONEXISTENT,
)

(
create_cluster_and_nodegroup
>> await_create_nodegroup
>> start_pod
>> delete_all
>> await_delete_cluster
chain(
# TEST SETUP
test_context,
# TEST BODY
create_cluster_and_nodegroup,
await_create_nodegroup,
start_pod,
delete_nodegroup_and_cluster,
await_delete_cluster,
)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)