From 51b6c2305eef43ce99f34069ae270b5732a5f151 Mon Sep 17 00:00:00 2001 From: Maksim Yermakou Date: Mon, 26 May 2025 15:03:59 +0000 Subject: [PATCH] Create DAG for showing how to enable Ray on GKE cluster --- .../tests/unit/always/test_example_dags.py | 5 - .../operators/cloud/kubernetes_engine.rst | 12 +++ .../example_kubernetes_engine_ray.py | 96 +++++++++++++++++++ 3 files changed, 108 insertions(+), 5 deletions(-) create mode 100644 providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_ray.py diff --git a/airflow-core/tests/unit/always/test_example_dags.py b/airflow-core/tests/unit/always/test_example_dags.py index fe951b8700434..937e703294afd 100644 --- a/airflow-core/tests/unit/always/test_example_dags.py +++ b/airflow-core/tests/unit/always/test_example_dags.py @@ -51,17 +51,12 @@ # Generally, these should be resolved as soon as a parameter or operator is deprecated. # If the deprecation is postponed, the item should be added to this tuple, # and a corresponding Issue should be created on GitHub. 
- "providers/google/tests/system/google/cloud/bigquery/example_bigquery_operations.py", - "providers/google/tests/system/google/cloud/dataflow/example_dataflow_sql.py", "providers/google/tests/system/google/cloud/dataproc/example_dataproc_gke.py", - "providers/google/tests/system/google/cloud/datapipelines/example_datapipeline.py", - "providers/google/tests/system/google/cloud/gcs/example_gcs_sensor.py", "providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine.py", "providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_async.py", "providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_job.py", "providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_kueue.py", "providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_resource.py", - "providers/google/tests/system/google/cloud/life_sciences/example_life_sciences.py", # Deprecated Operators/Hooks, which replaced by common.sql Operators/Hooks ) diff --git a/providers/google/docs/operators/cloud/kubernetes_engine.rst b/providers/google/docs/operators/cloud/kubernetes_engine.rst index 07ecec664240e..8a3176c0ce1b5 100644 --- a/providers/google/docs/operators/cloud/kubernetes_engine.rst +++ b/providers/google/docs/operators/cloud/kubernetes_engine.rst @@ -70,6 +70,18 @@ lot less resources wasted on idle Operators or Sensors: :start-after: [START howto_operator_gke_create_cluster_async] :end-before: [END howto_operator_gke_create_cluster_async] +Create GKE cluster with Ray enabled +''''''''''''''''''''''''''''''''''' + +`Ray <https://docs.ray.io/>`__ is an open source framework to build and scale ML and Python applications. + +Here is an example of a cluster definition with Ray enabled: + +.. 
exampleinclude:: /../../google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_ray.py + :language: python + :start-after: [START howto_operator_gcp_gke_create_cluster_definition_with_ray] + :end-before: [END howto_operator_gcp_gke_create_cluster_definition_with_ray] + .. _howto/operator:GKEStartKueueInsideClusterOperator: diff --git a/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_ray.py b/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_ray.py new file mode 100644 index 0000000000000..193b8c316cf3c --- /dev/null +++ b/providers/google/tests/system/google/cloud/kubernetes_engine/example_kubernetes_engine_ray.py @@ -0,0 +1,96 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +""" +Example Airflow DAG for creating GKE cluster with Ray enabled. 
+""" + +from __future__ import annotations + +import os +from datetime import datetime + +from airflow.models.dag import DAG +from airflow.providers.google.cloud.operators.kubernetes_engine import ( + GKECreateClusterOperator, + GKEDeleteClusterOperator, +) +from airflow.utils.trigger_rule import TriggerRule + +from system.google import DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID + +ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID", "default") +DAG_ID = "kubernetes_engine_ray" +GCP_PROJECT_ID = os.environ.get("SYSTEM_TESTS_GCP_PROJECT") or DEFAULT_GCP_SYSTEM_TEST_PROJECT_ID + +GCP_LOCATION = "europe-west1" +CLUSTER_NAME_BASE = f"cluster-{DAG_ID}".replace("_", "-") +CLUSTER_NAME_FULL = CLUSTER_NAME_BASE + f"-{ENV_ID}".replace("_", "-") +CLUSTER_NAME = CLUSTER_NAME_BASE if len(CLUSTER_NAME_FULL) >= 33 else CLUSTER_NAME_FULL + +# [START howto_operator_gcp_gke_create_cluster_definition_with_ray] +CLUSTER = { + "name": CLUSTER_NAME, + "node_pools": [ + { + "name": f"{CLUSTER_NAME}-node", + "initial_node_count": 1, + }, + ], + "autopilot": {"enabled": True}, + "addons_config": { + "ray_operator_config": {"enabled": True}, + }, +} +# [END howto_operator_gcp_gke_create_cluster_definition_with_ray] + + +with DAG( + DAG_ID, + schedule="@once", # Override to match your needs + start_date=datetime(2021, 1, 1), + catchup=False, + tags=["example"], +) as dag: + create_cluster = GKECreateClusterOperator( + task_id="create_cluster", + project_id=GCP_PROJECT_ID, + location=GCP_LOCATION, + body=CLUSTER, + ) + + delete_cluster = GKEDeleteClusterOperator( + task_id="delete_cluster", + cluster_name=CLUSTER_NAME, + project_id=GCP_PROJECT_ID, + location=GCP_LOCATION, + ) + delete_cluster.trigger_rule = TriggerRule.ALL_DONE + + create_cluster >> delete_cluster + + from tests_common.test_utils.watcher import watcher + + # This test needs watcher in order to properly mark success/failure + # when "teardown" task with trigger rule is part of the DAG + list(dag.tasks) >> watcher() + + +from 
tests_common.test_utils.system_tests import get_test_run # noqa: E402 + +# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest) +test_run = get_test_run(dag)