From 0029d34ea4324084a15987a560157eac5d3bc2f7 Mon Sep 17 00:00:00 2001 From: RosterIn <48057736+RosterIn@users.noreply.github.com> Date: Mon, 22 Aug 2022 15:41:07 +0300 Subject: [PATCH 1/2] Fix task policy example There is no timeout parameter there is execution_timeout --- tests/cluster_policies/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py index b54667647c5cf..768b15754ee34 100644 --- a/tests/cluster_policies/__init__.py +++ b/tests/cluster_policies/__init__.py @@ -89,8 +89,8 @@ class TimedOperator(BaseOperator, ABC): def task_policy(task: TimedOperator): if task.task_type == 'HivePartitionSensor': task.queue = "sensor_queue" - if task.timeout > timedelta(hours=48): - task.timeout = timedelta(hours=48) + if task.execution_timeout > timedelta(hours=48): + task.execution_timeout = timedelta(hours=48) # [END example_task_cluster_policy] From 33e98fb82f19dc6d61fee80e8385983c62ca29f2 Mon Sep 17 00:00:00 2001 From: RosterIn <48057736+RosterIn@users.noreply.github.com> Date: Tue, 23 Aug 2022 15:06:30 +0300 Subject: [PATCH 2/2] update example --- tests/cluster_policies/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/cluster_policies/__init__.py b/tests/cluster_policies/__init__.py index 768b15754ee34..dbb5d0163dc8a 100644 --- a/tests/cluster_policies/__init__.py +++ b/tests/cluster_policies/__init__.py @@ -82,13 +82,13 @@ def dag_policy(dag: DAG): # [START example_task_cluster_policy] -class TimedOperator(BaseOperator, ABC): - timeout: timedelta +class MyCustomOperator(BaseOperator, ABC): + pass -def task_policy(task: TimedOperator): - if task.task_type == 'HivePartitionSensor': - task.queue = "sensor_queue" +def task_policy(task: MyCustomOperator): + if task.task_type == 'BaseOperator': + task.queue = "priority_queue" if task.execution_timeout > timedelta(hours=48): task.execution_timeout = timedelta(hours=48)