Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions docs/apache-airflow-providers-amazon/operators/eks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ Note: An AWS IAM role with the following permissions is required:
``eks.amazonaws.com`` must be added to the Trusted Relationships
``AmazonEKSClusterPolicy`` IAM Policy must be attached

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eks_create_cluster]
Expand Down Expand Up @@ -98,7 +98,7 @@ Delete an Amazon EKS Cluster
To delete an existing Amazon EKS Cluster you can use
:class:`~airflow.providers.amazon.aws.operators.eks.EksDeleteClusterOperator`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eks_delete_cluster]
Expand Down Expand Up @@ -127,7 +127,7 @@ Note: An AWS IAM role with the following permissions is required:
``AmazonEC2ContainerRegistryReadOnly`` IAM Policy must be attached
``AmazonEKSWorkerNodePolicy`` IAM Policy must be attached

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eks_create_nodegroup]
Expand All @@ -141,7 +141,7 @@ Delete an Amazon EKS managed node group
To delete an existing Amazon EKS managed node group you can use
:class:`~airflow.providers.amazon.aws.operators.eks.EksDeleteNodegroupOperator`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eks_delete_nodegroup]
Expand Down Expand Up @@ -190,7 +190,7 @@ To run a pod on an existing Amazon EKS Cluster, you can use

Note: An Amazon EKS Cluster with underlying compute infrastructure is required.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py
:language: python
:dedent: 4
:start-after: [START howto_operator_eks_pod_operator]
Expand All @@ -207,7 +207,7 @@ Wait on an Amazon EKS cluster state
To check the state of an Amazon EKS Cluster until it reaches the target state or another terminal
state you can use :class:`~airflow.providers.amazon.aws.sensors.eks.EksClusterStateSensor`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_eks_cluster]
Expand All @@ -221,7 +221,7 @@ Wait on an Amazon EKS managed node group state
To check the state of an Amazon EKS managed node group until it reaches the target state or another terminal
state you can use :class:`~airflow.providers.amazon.aws.sensors.eks.EksNodegroupStateSensor`.

.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_eks_with_nodegroups.py
.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_eks_with_nodegroups.py
:language: python
:dedent: 4
:start-after: [START howto_sensor_eks_nodegroup]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
from __future__ import annotations

from datetime import datetime
from os import environ

from airflow.models.baseoperator import chain
from airflow.models.dag import DAG
from airflow.providers.amazon.aws.hooks.eks import ClusterStates, NodegroupStates
from airflow.providers.amazon.aws.operators.eks import (
Expand All @@ -29,75 +29,81 @@
EksPodOperator,
)
from airflow.providers.amazon.aws.sensors.eks import EksClusterStateSensor, EksNodegroupStateSensor
from airflow.utils.trigger_rule import TriggerRule
from tests.system.providers.amazon.aws.utils import ENV_ID_KEY, SystemTestContextBuilder

# Ignore missing args provided by default_args
# type: ignore[call-arg]

CLUSTER_NAME = 'eks-demo'
NODEGROUP_SUFFIX = '-nodegroup'
NODEGROUP_NAME = CLUSTER_NAME + NODEGROUP_SUFFIX
ROLE_ARN = environ.get('EKS_DEMO_ROLE_ARN', 'arn:aws:iam::123456789012:role/role_name')
SUBNETS = environ.get('EKS_DEMO_SUBNETS', 'subnet-12345ab subnet-67890cd').split(' ')
VPC_CONFIG = {
'subnetIds': SUBNETS,
'endpointPublicAccess': True,
'endpointPrivateAccess': False,
}
DAG_ID = 'example_eks_with_nodegroups'

# Externally fetched variables:
ROLE_ARN_KEY = 'ROLE_ARN'
SUBNETS_KEY = 'SUBNETS'

sys_test_context_task = (
SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).add_variable(SUBNETS_KEY, split_string=True).build()
)

with DAG(
dag_id='example_eks_with_nodegroups',
dag_id=DAG_ID,
schedule='@once',
start_date=datetime(2021, 1, 1),
tags=['example'],
catchup=False,
) as dag:
test_context = sys_test_context_task()
env_id = test_context[ENV_ID_KEY]

cluster_name = f'{env_id}-cluster'
nodegroup_name = f'{env_id}-nodegroup'

# [START howto_operator_eks_create_cluster]
# Create an Amazon EKS Cluster control plane without attaching compute service.
create_cluster = EksCreateClusterOperator(
task_id='create_eks_cluster',
cluster_name=CLUSTER_NAME,
cluster_role_arn=ROLE_ARN,
resources_vpc_config=VPC_CONFIG,
task_id='create_cluster',
cluster_name=cluster_name,
cluster_role_arn=test_context[ROLE_ARN_KEY],
resources_vpc_config={'subnetIds': test_context[SUBNETS_KEY]},
compute=None,
)
# [END howto_operator_eks_create_cluster]

# [START howto_sensor_eks_cluster]
await_create_cluster = EksClusterStateSensor(
task_id='wait_for_create_cluster',
cluster_name=CLUSTER_NAME,
task_id='await_create_cluster',
cluster_name=cluster_name,
target_state=ClusterStates.ACTIVE,
)
# [END howto_sensor_eks_cluster]

# [START howto_operator_eks_create_nodegroup]
create_nodegroup = EksCreateNodegroupOperator(
task_id='create_eks_nodegroup',
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
nodegroup_subnets=SUBNETS,
nodegroup_role_arn=ROLE_ARN,
task_id='create_nodegroup',
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
nodegroup_subnets=test_context[SUBNETS_KEY],
nodegroup_role_arn=test_context[ROLE_ARN_KEY],
)
# [END howto_operator_eks_create_nodegroup]

# [START howto_sensor_eks_nodegroup]
await_create_nodegroup = EksNodegroupStateSensor(
task_id='wait_for_create_nodegroup',
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
task_id='await_create_nodegroup',
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
target_state=NodegroupStates.ACTIVE,
)
# [END howto_sensor_eks_nodegroup]

# [START howto_operator_eks_pod_operator]
start_pod = EksPodOperator(
task_id="run_pod",
cluster_name=CLUSTER_NAME,
pod_name="run_pod",
image="amazon/aws-cli:latest",
cmds=["sh", "-c", "ls"],
labels={"demo": "hello_world"},
task_id='start_pod',
pod_name='test_pod',
cluster_name=cluster_name,
image='amazon/aws-cli:latest',
cmds=['sh', '-c', 'echo Test Airflow; date'],
labels={'demo': 'hello_world'},
get_logs=True,
# Delete the pod when it reaches its final state, or the execution is interrupted.
is_delete_operator_pod=True,
Expand All @@ -106,40 +112,59 @@

# [START howto_operator_eks_delete_nodegroup]
delete_nodegroup = EksDeleteNodegroupOperator(
task_id='delete_eks_nodegroup',
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
task_id='delete_nodegroup',
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
)
# [END howto_operator_eks_delete_nodegroup]
delete_nodegroup.trigger_rule = TriggerRule.ALL_DONE

await_delete_nodegroup = EksNodegroupStateSensor(
task_id='wait_for_delete_nodegroup',
cluster_name=CLUSTER_NAME,
nodegroup_name=NODEGROUP_NAME,
task_id='await_delete_nodegroup',
trigger_rule=TriggerRule.ALL_DONE,
cluster_name=cluster_name,
nodegroup_name=nodegroup_name,
target_state=NodegroupStates.NONEXISTENT,
)

# [START howto_operator_eks_delete_cluster]
delete_cluster = EksDeleteClusterOperator(
task_id='delete_eks_cluster',
cluster_name=CLUSTER_NAME,
task_id='delete_cluster',
cluster_name=cluster_name,
)
# [END howto_operator_eks_delete_cluster]
delete_cluster.trigger_rule = TriggerRule.ALL_DONE

await_delete_cluster = EksClusterStateSensor(
task_id='wait_for_delete_cluster',
cluster_name=CLUSTER_NAME,
task_id='await_delete_cluster',
trigger_rule=TriggerRule.ALL_DONE,
cluster_name=cluster_name,
target_state=ClusterStates.NONEXISTENT,
)

(
create_cluster
>> await_create_cluster
>> create_nodegroup
>> await_create_nodegroup
>> start_pod
>> delete_nodegroup
>> await_delete_nodegroup
>> delete_cluster
>> await_delete_cluster
chain(
# TEST SETUP
test_context,
# TEST BODY
create_cluster,
await_create_cluster,
create_nodegroup,
await_create_nodegroup,
start_pod,
delete_nodegroup,
await_delete_nodegroup,
delete_cluster,
await_delete_cluster,
)

from tests.system.utils.watcher import watcher

# This test needs watcher in order to properly mark success/failure
# when "tearDown" task with trigger rule is part of the DAG
list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)