From 7092dcb2548ce33181dce1df4ce9f6e320ed9758 Mon Sep 17 00:00:00 2001 From: Maksim Moiseenkov Date: Wed, 14 Aug 2024 14:47:25 +0000 Subject: [PATCH] Refactor DataprocCreateBatchOperator and Dataproc system tests --- .../google/cloud/operators/dataproc.py | 160 ++++++++---------- .../google/cloud/triggers/dataproc.py | 5 +- scripts/ci/pre_commit/check_system_tests.py | 3 +- .../google/cloud/operators/test_dataproc.py | 8 +- .../google/cloud/triggers/test_dataproc.py | 30 +++- .../cloud/dataproc/example_dataproc_batch.py | 26 ++- .../example_dataproc_batch_deferrable.py | 3 + .../example_dataproc_batch_persistent.py | 4 + ...cluster_create_existing_stopped_cluster.py | 5 + .../example_dataproc_cluster_deferrable.py | 4 + .../example_dataproc_cluster_diagnose.py | 3 + .../example_dataproc_cluster_generator.py | 3 + .../example_dataproc_cluster_start_stop.py | 3 + .../example_dataproc_cluster_update.py | 4 + .../cloud/dataproc/example_dataproc_gke.py | 3 + .../cloud/dataproc/example_dataproc_hadoop.py | 3 + .../cloud/dataproc/example_dataproc_hive.py | 3 + .../cloud/dataproc/example_dataproc_pig.py | 3 + .../cloud/dataproc/example_dataproc_presto.py | 3 + .../dataproc/example_dataproc_pyspark.py | 3 + .../cloud/dataproc/example_dataproc_spark.py | 3 + .../dataproc/example_dataproc_spark_async.py | 3 + .../example_dataproc_spark_deferrable.py | 3 + .../dataproc/example_dataproc_spark_sql.py | 3 + .../cloud/dataproc/example_dataproc_sparkr.py | 3 + .../cloud/dataproc/example_dataproc_trino.py | 3 + 26 files changed, 180 insertions(+), 117 deletions(-) diff --git a/airflow/providers/google/cloud/operators/dataproc.py b/airflow/providers/google/cloud/operators/dataproc.py index efaf0d6326d08..2384bfbd6251a 100644 --- a/airflow/providers/google/cloud/operators/dataproc.py +++ b/airflow/providers/google/cloud/operators/dataproc.py @@ -30,6 +30,7 @@ from dataclasses import dataclass from datetime import datetime, timedelta from enum import Enum +from functools import cached_property from typing import TYPE_CHECKING, Any, Sequence from deprecated import deprecated @@ -638,7 +639,7 @@ def __init__( request_id: str | None = None, delete_on_error: bool = True, use_if_exists: bool = True, - retry: AsyncRetry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault | Retry = DEFAULT, timeout: float = 1 * 60 * 60, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", @@ -1184,7 +1185,7 @@ def __init__( project_id: str = PROVIDE_PROJECT_ID, cluster_uuid: str | None = None, request_id: str | None = None, - retry: AsyncRetry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault | Retry = DEFAULT, timeout: float = 1 * 60 * 60, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", @@ -2712,7 +2713,7 @@ def __init__( region: str, request_id: str | None = None, project_id: str = PROVIDE_PROJECT_ID, - retry: AsyncRetry | _MethodDefault = DEFAULT, + retry: AsyncRetry | _MethodDefault | Retry = DEFAULT, timeout: float | None = None, metadata: Sequence[tuple[str, str]] = (), gcp_conn_id: str = "google_cloud_default", @@ -2985,10 +2986,10 @@ class DataprocCreateBatchOperator(GoogleCloudBaseOperator): def __init__( self, *, - region: str | None = None, + region: str, project_id: str = PROVIDE_PROJECT_ID, batch: dict | Batch, - batch_id: str, + batch_id: str | None = None, request_id: str | None = None, retry: Retry | _MethodDefault = DEFAULT, timeout: float | None = None, @@ -3021,20 +3022,20 @@ def __init__( self.polling_interval_seconds = polling_interval_seconds def execute(self, context: Context): - hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) - # batch_id might not be set and will be generated - if self.batch_id: - link = DATAPROC_BATCH_LINK.format( - region=self.region, project_id=self.project_id, batch_id=self.batch_id + if self.asynchronous and self.deferrable: + raise AirflowException( + "Both asynchronous and deferrable parameters were passed. Please, provide only one." ) - self.log.info("Creating batch %s", self.batch_id) - self.log.info("Once started, the batch job will be available at %s", link) + + batch_id: str = "" + if self.batch_id: + batch_id = self.batch_id + self.log.info("Starting batch %s", batch_id) else: - self.log.info("Starting batch job. The batch ID will be generated since it was not provided.") - if self.region is None: - raise AirflowException("Region should be set here") + self.log.info("Starting batch. The batch ID will be generated since it was not provided.") + try: - self.operation = hook.create_batch( + self.operation = self.hook.create_batch( region=self.region, project_id=self.project_id, batch=self.batch, @@ -3044,85 +3045,62 @@ def execute(self, context: Context): timeout=self.timeout, metadata=self.metadata, ) - if self.operation is None: - raise RuntimeError("The operation should be set here!") - - if not self.deferrable: - if not self.asynchronous: - result = hook.wait_for_operation( - timeout=self.timeout, result_retry=self.result_retry, operation=self.operation - ) - self.log.info("Batch %s created", self.batch_id) - - else: - DataprocBatchLink.persist( - context=context, - operator=self, - project_id=self.project_id, - region=self.region, - batch_id=self.batch_id, - ) - return self.operation.operation.name - - else: - # processing ends in execute_complete - self.defer( - trigger=DataprocBatchTrigger( - batch_id=self.batch_id, - project_id=self.project_id, - region=self.region, - gcp_conn_id=self.gcp_conn_id, - impersonation_chain=self.impersonation_chain, - polling_interval_seconds=self.polling_interval_seconds, - ), - method_name="execute_complete", - ) - except AlreadyExists: - self.log.info("Batch with given id already exists") - # This is only likely to happen if batch_id was provided - # Could be running if Airflow was restarted after task started - # poll until a final state is reached - - self.log.info("Attaching to the job %s if it is still running.", self.batch_id) + self.log.info("Batch with given id already exists.") + self.log.info("Attaching to the job %s if it is still running.", batch_id) + else: + batch_id = self.operation.metadata.batch.split("/")[-1] + self.log.info("The batch %s was created.", batch_id) - # deferrable handling of a batch_id that already exists - processing ends in execute_complete - if self.deferrable: - self.defer( - trigger=DataprocBatchTrigger( - batch_id=self.batch_id, - project_id=self.project_id, - region=self.region, - gcp_conn_id=self.gcp_conn_id, - impersonation_chain=self.impersonation_chain, - polling_interval_seconds=self.polling_interval_seconds, - ), - method_name="execute_complete", - ) + DataprocBatchLink.persist( + context=context, + operator=self, + project_id=self.project_id, + region=self.region, + batch_id=batch_id, + ) - # non-deferrable handling of a batch_id that already exists - result = hook.wait_for_batch( - batch_id=self.batch_id, + if self.asynchronous: + batch = self.hook.get_batch( + batch_id=batch_id, region=self.region, project_id=self.project_id, retry=self.retry, timeout=self.timeout, metadata=self.metadata, - wait_check_interval=self.polling_interval_seconds, ) - batch_id = self.batch_id or result.name.split("/")[-1] + self.log.info("The batch %s was created asynchronously. Exiting.", batch_id) + return Batch.to_dict(batch) - self.handle_batch_status(context, result.state, batch_id) - project_id = self.project_id or hook.project_id - if project_id: - DataprocBatchLink.persist( - context=context, - operator=self, - project_id=project_id, - region=self.region, - batch_id=batch_id, + if self.deferrable: + self.defer( + trigger=DataprocBatchTrigger( + batch_id=batch_id, + project_id=self.project_id, + region=self.region, + gcp_conn_id=self.gcp_conn_id, + impersonation_chain=self.impersonation_chain, + polling_interval_seconds=self.polling_interval_seconds, + ), + method_name="execute_complete", ) - return Batch.to_dict(result) + + self.log.info("Waiting for the completion of batch job %s", batch_id) + batch = self.hook.wait_for_batch( + batch_id=batch_id, + region=self.region, + project_id=self.project_id, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + + self.handle_batch_status(context, batch.state, batch_id, batch.state_message) + return Batch.to_dict(batch) + + @cached_property + def hook(self) -> DataprocHook: + return DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain) def execute_complete(self, context, event=None) -> None: """ @@ -3135,23 +3113,27 @@ def execute_complete(self, context, event=None) -> None: raise AirflowException("Batch failed.") state = event["batch_state"] batch_id = event["batch_id"] - self.handle_batch_status(context, state, batch_id) + self.handle_batch_status(context, state, batch_id, state_message=event["batch_state_message"]) def on_kill(self): if self.operation: self.operation.cancel() - def handle_batch_status(self, context: Context, state: Batch.State, batch_id: str) -> None: + def handle_batch_status( + self, context: Context, state: Batch.State, batch_id: str, state_message: str | None = None + ) -> None: # The existing batch may be a number of states other than 'SUCCEEDED'\ # wait_for_operation doesn't fail if the job is cancelled, so we will check for it here which also # finds a cancelling|canceled|unspecified job from wait_for_batch or the deferred trigger link = DATAPROC_BATCH_LINK.format(region=self.region, project_id=self.project_id, batch_id=batch_id) if state == Batch.State.FAILED: - raise AirflowException("Batch job %s failed. Driver Logs: %s", batch_id, link) + raise AirflowException( + f"Batch job {batch_id} failed with error: {state_message}\nDriver Logs: {link}" + ) if state in (Batch.State.CANCELLED, Batch.State.CANCELLING): - raise AirflowException("Batch job %s was cancelled. Driver logs: %s", batch_id, link) + raise AirflowException(f"Batch job {batch_id} was cancelled. Driver logs: {link}") if state == Batch.State.STATE_UNSPECIFIED: - raise AirflowException("Batch job %s unspecified. Driver logs: %s", batch_id, link) + raise AirflowException(f"Batch job {batch_id} unspecified. Driver logs: {link}") self.log.info("Batch job %s completed. Driver logs: %s", batch_id, link) diff --git a/airflow/providers/google/cloud/triggers/dataproc.py b/airflow/providers/google/cloud/triggers/dataproc.py index 99800d266a86a..508b0444c05aa 100644 --- a/airflow/providers/google/cloud/triggers/dataproc.py +++ b/airflow/providers/google/cloud/triggers/dataproc.py @@ -371,7 +371,10 @@ async def run(self): self.log.info("Current state is %s", state) self.log.info("Sleeping for %s seconds.", self.polling_interval_seconds) await asyncio.sleep(self.polling_interval_seconds) - yield TriggerEvent({"batch_id": self.batch_id, "batch_state": state}) + + yield TriggerEvent( + {"batch_id": self.batch_id, "batch_state": state, "batch_state_message": batch.state_message} + ) class DataprocDeleteClusterTrigger(DataprocBaseTrigger): diff --git a/scripts/ci/pre_commit/check_system_tests.py b/scripts/ci/pre_commit/check_system_tests.py index 89e2a9f24ae5c..4c82272ad7875 100755 --- a/scripts/ci/pre_commit/check_system_tests.py +++ b/scripts/ci/pre_commit/check_system_tests.py @@ -35,7 +35,6 @@ errors: list[str] = [] WATCHER_APPEND_INSTRUCTION = "list(dag.tasks) >> watcher()" -WATCHER_APPEND_INSTRUCTION_SHORT = " >> watcher()" PYTEST_FUNCTION = """ from tests.system.utils import get_test_run # noqa: E402 @@ -53,7 +52,7 @@ def _check_file(file: Path): content = file.read_text() if "from tests.system.utils.watcher import watcher" in content: - index = content.find(WATCHER_APPEND_INSTRUCTION_SHORT) + index = content.find(WATCHER_APPEND_INSTRUCTION) if index == -1: errors.append( f"[red]The example {file} imports tests.system.utils.watcher " diff --git a/tests/providers/google/cloud/operators/test_dataproc.py b/tests/providers/google/cloud/operators/test_dataproc.py index bcfe4eb818aa8..1d1f2a1ef818a 100644 --- a/tests/providers/google/cloud/operators/test_dataproc.py +++ b/tests/providers/google/cloud/operators/test_dataproc.py @@ -2708,7 +2708,7 @@ def test_execute_batch_failed(self, mock_hook, to_dict_mock): timeout=TIMEOUT, metadata=METADATA, ) - mock_hook.return_value.wait_for_operation.return_value = Batch(state=Batch.State.FAILED) + mock_hook.return_value.wait_for_batch.return_value = Batch(state=Batch.State.FAILED) with pytest.raises(AirflowException): op.execute(context=MagicMock()) @@ -2729,12 +2729,12 @@ def test_execute_batch_already_exists_succeeds(self, mock_hook): ) mock_hook.return_value.wait_for_operation.side_effect = AlreadyExists("") mock_hook.return_value.wait_for_batch.return_value = Batch(state=Batch.State.SUCCEEDED) + mock_hook.return_value.create_batch.return_value.metadata.batch = f"prefix/{BATCH_ID}" op.execute(context=MagicMock()) mock_hook.return_value.wait_for_batch.assert_called_once_with( batch_id=BATCH_ID, region=GCP_REGION, project_id=GCP_PROJECT, - wait_check_interval=5, retry=RETRY, timeout=TIMEOUT, metadata=METADATA, @@ -2757,13 +2757,13 @@ def test_execute_batch_already_exists_fails(self, mock_hook): ) mock_hook.return_value.wait_for_operation.side_effect = AlreadyExists("") mock_hook.return_value.wait_for_batch.return_value = Batch(state=Batch.State.FAILED) + mock_hook.return_value.create_batch.return_value.metadata.batch = f"prefix/{BATCH_ID}" with pytest.raises(AirflowException): op.execute(context=MagicMock()) mock_hook.return_value.wait_for_batch.assert_called_once_with( batch_id=BATCH_ID, region=GCP_REGION, project_id=GCP_PROJECT, - wait_check_interval=5, retry=RETRY, timeout=TIMEOUT, metadata=METADATA, @@ -2786,13 +2786,13 @@ def test_execute_batch_already_exists_cancelled(self, mock_hook): ) mock_hook.return_value.wait_for_operation.side_effect = AlreadyExists("") mock_hook.return_value.wait_for_batch.return_value = Batch(state=Batch.State.CANCELLED) + mock_hook.return_value.create_batch.return_value.metadata.batch = f"prefix/{BATCH_ID}" with pytest.raises(AirflowException): op.execute(context=MagicMock()) mock_hook.return_value.wait_for_batch.assert_called_once_with( batch_id=BATCH_ID, region=GCP_REGION, project_id=GCP_PROJECT, - wait_check_interval=5, retry=RETRY, timeout=TIMEOUT, metadata=METADATA, diff --git a/tests/providers/google/cloud/triggers/test_dataproc.py b/tests/providers/google/cloud/triggers/test_dataproc.py index 39ed949463c4b..d2d4b44467319 100644 --- a/tests/providers/google/cloud/triggers/test_dataproc.py +++ b/tests/providers/google/cloud/triggers/test_dataproc.py @@ -38,6 +38,7 @@ TEST_PROJECT_ID = "project-id" TEST_REGION = "region" TEST_BATCH_ID = "batch-id" +TEST_BATCH_STATE_MESSAGE = "Test batch state message" BATCH_CONFIG = { "spark_batch": { "jar_file_uris": ["file:///usr/lib/spark/examples/jars/spark-examples.jar"], @@ -391,12 +392,15 @@ async def test_async_create_batch_trigger_triggers_on_success_should_execute_suc Tests the DataprocBatchTrigger only fires once the batch execution reaches a successful state. """ - mock_hook.return_value = async_get_batch(state=Batch.State.SUCCEEDED, batch_id=TEST_BATCH_ID) + mock_hook.return_value = async_get_batch( + state=Batch.State.SUCCEEDED, batch_id=TEST_BATCH_ID, state_message=TEST_BATCH_STATE_MESSAGE + ) expected_event = TriggerEvent( { "batch_id": TEST_BATCH_ID, "batch_state": Batch.State.SUCCEEDED, + "batch_state_message": TEST_BATCH_STATE_MESSAGE, } ) @@ -409,9 +413,17 @@ async def test_async_create_batch_trigger_triggers_on_success_should_execute_suc async def test_async_create_batch_trigger_run_returns_failed_event( self, mock_hook, batch_trigger, async_get_batch ): - mock_hook.return_value = async_get_batch(state=Batch.State.FAILED, batch_id=TEST_BATCH_ID) + mock_hook.return_value = async_get_batch( + state=Batch.State.FAILED, batch_id=TEST_BATCH_ID, state_message=TEST_BATCH_STATE_MESSAGE + ) - expected_event = TriggerEvent({"batch_id": TEST_BATCH_ID, "batch_state": Batch.State.FAILED}) + expected_event = TriggerEvent( + { + "batch_id": TEST_BATCH_ID, + "batch_state": Batch.State.FAILED, + "batch_state_message": TEST_BATCH_STATE_MESSAGE, + } + ) actual_event = await batch_trigger.run().asend(None) await asyncio.sleep(0.5) @@ -420,9 +432,17 @@ async def test_async_create_batch_trigger_run_returns_failed_event( @pytest.mark.asyncio @mock.patch("airflow.providers.google.cloud.hooks.dataproc.DataprocAsyncHook.get_batch") async def test_create_batch_run_returns_cancelled_event(self, mock_hook, batch_trigger, async_get_batch): - mock_hook.return_value = async_get_batch(state=Batch.State.CANCELLED, batch_id=TEST_BATCH_ID) + mock_hook.return_value = async_get_batch( + state=Batch.State.CANCELLED, batch_id=TEST_BATCH_ID, state_message=TEST_BATCH_STATE_MESSAGE + ) - expected_event = TriggerEvent({"batch_id": TEST_BATCH_ID, "batch_state": Batch.State.CANCELLED}) + expected_event = TriggerEvent( + { + "batch_id": TEST_BATCH_ID, + "batch_state": Batch.State.CANCELLED, + "batch_state_message": TEST_BATCH_STATE_MESSAGE, + } + ) actual_event = await batch_trigger.run().asend(None) await asyncio.sleep(0.5) diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py index 852fa0914d089..2796b16d6be62 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch.py @@ -69,6 +69,7 @@ region=REGION, batch=BATCH_CONFIG, batch_id=BATCH_ID, + result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) create_batch_2 = DataprocCreateBatchOperator( @@ -87,6 +88,7 @@ batch=BATCH_CONFIG, batch_id=BATCH_ID_3, asynchronous=True, + result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [END how_to_cloud_dataproc_create_batch_operator] @@ -128,18 +130,10 @@ task_id="cancel_operation", project_id=PROJECT_ID, region=REGION, - operation_name="{{ task_instance.xcom_pull('create_batch_4') }}", + operation_name="{{ task_instance.xcom_pull('create_batch_4')['operation'] }}", ) # [END how_to_cloud_dataproc_cancel_operation_operator] - batch_cancelled_sensor = DataprocBatchSensor( - task_id="batch_cancelled_sensor", - region=REGION, - project_id=PROJECT_ID, - batch_id=BATCH_ID_4, - poke_interval=10, - ) - # [START how_to_cloud_dataproc_delete_batch_operator] delete_batch = DataprocDeleteBatchOperator( task_id="delete_batch", project_id=PROJECT_ID, region=REGION, batch_id=BATCH_ID @@ -161,7 +155,9 @@ ( # TEST SETUP - [create_batch, create_batch_2, create_batch_3] + create_batch + >> create_batch_2 + >> create_batch_3 # TEST BODY >> batch_async_sensor >> get_batch @@ -169,8 +165,9 @@ >> create_batch_4 >> cancel_operation # TEST TEARDOWN - >> [delete_batch, delete_batch_2, delete_batch_3] - >> batch_cancelled_sensor + >> delete_batch + >> delete_batch_2 + >> delete_batch_3 >> delete_batch_4 ) @@ -178,10 +175,7 @@ # This test needs watcher in order to properly mark success/failure # when "teardown" task with trigger rule is part of the DAG - - # Excluding sensor because we expect it to fail due to cancelled operation - [task for task in dag.tasks if task.task_id != "batch_cancelled_sensor"] >> watcher() - + list(dag.tasks) >> watcher() from tests.system.utils import get_test_run # noqa: E402 diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py index 1d2fec951e18f..afd509258d597 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_deferrable.py @@ -25,6 +25,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateBatchOperator, @@ -62,6 +64,7 @@ batch=BATCH_CONFIG, batch_id=BATCH_ID, deferrable=True, + result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [END how_to_cloud_dataproc_create_batch_operator_async] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py index cb4e731c6785b..fbaf197c73a6e 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_batch_persistent.py @@ -23,6 +23,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( ClusterGenerator, @@ -89,6 +91,7 @@ cluster_config=CLUSTER_GENERATOR_CONFIG_FOR_PHS, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [END how_to_cloud_dataproc_create_cluster_for_persistent_history_server] @@ -99,6 +102,7 @@ region=REGION, batch=BATCH_CONFIG_WITH_PHS, batch_id=BATCH_ID, + result_retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [END how_to_cloud_dataproc_create_batch_operator_with_persistent_history_server] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py index 10403cea3b065..fd75917ccee6e 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_create_existing_stopped_cluster.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -69,6 +71,7 @@ region=REGION, cluster_name=CLUSTER_NAME, use_if_exists=True, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) start_cluster = DataprocStartClusterOperator( @@ -76,6 +79,7 @@ project_id=PROJECT_ID, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) stop_cluster = DataprocStopClusterOperator( @@ -92,6 +96,7 @@ region=REGION, cluster_name=CLUSTER_NAME, use_if_exists=True, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) delete_cluster = DataprocDeleteClusterOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py index 35adca660d9a0..267cd043eb9c7 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_deferrable.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -95,6 +97,7 @@ region=REGION, cluster_name=CLUSTER_NAME, deferrable=True, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [END how_to_cloud_dataproc_create_cluster_operator_async] @@ -108,6 +111,7 @@ project_id=PROJECT_ID, region=REGION, deferrable=True, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [END how_to_cloud_dataproc_update_cluster_operator_async] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py index a610049bf99e3..37a5ec65f4e1c 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_diagnose.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -72,6 +74,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [START how_to_cloud_dataproc_diagnose_cluster] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py index d64fda25e4568..31370e4693864 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_generator.py @@ -25,6 +25,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( ClusterGenerator, @@ -103,6 +105,7 @@ project_id=PROJECT_ID, region=REGION, cluster_config=CLUSTER_GENERATOR_CONFIG, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [END how_to_cloud_dataproc_create_cluster_generate_cluster_config_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py index 2af1352fdc1de..a08f23cc74bc2 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_start_stop.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -68,6 +70,7 @@ region=REGION, cluster_name=CLUSTER_NAME, use_if_exists=True, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [START how_to_cloud_dataproc_start_cluster_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py index 610a19e17f84f..c192990753fc5 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_cluster_update.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -83,6 +85,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [START how_to_cloud_dataproc_update_cluster_operator] @@ -94,6 +97,7 @@ graceful_decommission_timeout=TIMEOUT, project_id=PROJECT_ID, region=REGION, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [END how_to_cloud_dataproc_update_cluster_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py index 550dec97c4187..7f0ff12f80255 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_gke.py @@ -31,6 +31,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -110,6 +112,7 @@ region=REGION, cluster_name=CLUSTER_NAME, virtual_cluster_config=VIRTUAL_CLUSTER_CONFIG, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [END how_to_cloud_dataproc_create_cluster_operator_in_gke] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py index f8f5cc3063736..45a668a90d45a 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hadoop.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -91,6 +93,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) hadoop_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py index 5e1189c7f9125..41409dd6497b1 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_hive.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -93,6 +95,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [END how_to_cloud_dataproc_create_cluster_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py index 9b76c10cbe346..f5d20827771e1 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pig.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -80,6 +82,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) pig_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py index 1c3cdf208252e..924547ddacc60 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_presto.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -87,6 +89,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) presto_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py index 4c6a64783a425..fe9d493f4ac40 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_pyspark.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -103,6 +105,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [START how_to_cloud_dataproc_submit_job_to_cluster_operator] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py index f939347baac4b..d8a3e2d2391ec 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -83,6 +85,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) spark_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py index cd1060b5fea57..d31f6f24faf5f 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_async.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -82,6 +84,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) # [START cloud_dataproc_async_submit_sensor] diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py index d20aa0aa0ed51..0efee5f60aa32 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_deferrable.py @@ -25,6 +25,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -84,6 +86,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) spark_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py index 0ca0d062d2989..cae42d25c4533 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_spark_sql.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -80,6 +82,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) spark_sql_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py index 50454904759a6..70df13ed2efc1 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_sparkr.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -101,6 +103,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) sparkr_task = DataprocSubmitJobOperator( diff --git a/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py b/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py index 936430ccf0f12..f69dbb5360915 100644 --- a/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py +++ b/tests/system/providers/google/cloud/dataproc/example_dataproc_trino.py @@ -24,6 +24,8 @@ import os from datetime import datetime +from google.api_core.retry import Retry + from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.dataproc import ( DataprocCreateClusterOperator, @@ -89,6 +91,7 @@ cluster_config=CLUSTER_CONFIG, region=REGION, cluster_name=CLUSTER_NAME, + retry=Retry(maximum=100.0, initial=10.0, multiplier=1.0), ) trino_task = DataprocSubmitJobOperator(