From 04f496515bb788931d11835ea3f8105a5c33edce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Miroslav=20=C5=A0ediv=C3=BD?= <6774676+eumiro@users.noreply.github.com>
Date: Mon, 28 Aug 2023 21:24:40 +0200
Subject: [PATCH] Refactor unneeded 'continue' jumps in dag processing

---
 airflow/dag_processing/manager.py       | 27 +++++++++++----------------
 airflow/dag_processing/processor.py     |  4 ++--
 airflow/example_dags/plugins/workday.py | 17 ++++++++---------
 3 files changed, 21 insertions(+), 27 deletions(-)

diff --git a/airflow/dag_processing/manager.py b/airflow/dag_processing/manager.py
index 3b26da2c6822e..be5de2ccb75aa 100644
--- a/airflow/dag_processing/manager.py
+++ b/airflow/dag_processing/manager.py
@@ -610,16 +610,12 @@ def _run_parsing_loop(self):
                 continue
 
             for sentinel in ready:
-                if sentinel is self._direct_scheduler_conn:
-                    continue
-
-                processor = self.waitables.get(sentinel)
-                if not processor:
-                    continue
-
-                self._collect_results_from_processor(processor)
-                self.waitables.pop(sentinel)
-                self._processors.pop(processor.file_path)
+                if sentinel is not self._direct_scheduler_conn:
+                    processor = self.waitables.get(sentinel)
+                    if processor:
+                        self._collect_results_from_processor(processor)
+                        self.waitables.pop(sentinel)
+                        self._processors.pop(processor.file_path)
 
             if self.standalone_dag_processor:
                 self._fetch_callbacks(max_callbacks_per_loop)
@@ -1058,12 +1054,11 @@ def collect_results(self) -> None:
         )
 
         for sentinel in ready:
-            if sentinel is self._direct_scheduler_conn:
-                continue
-            processor = cast(DagFileProcessorProcess, self.waitables[sentinel])
-            self.waitables.pop(processor.waitable_handle)
-            self._processors.pop(processor.file_path)
-            self._collect_results_from_processor(processor)
+            if sentinel is not self._direct_scheduler_conn:
+                processor = cast(DagFileProcessorProcess, self.waitables[sentinel])
+                self.waitables.pop(processor.waitable_handle)
+                self._processors.pop(processor.file_path)
+                self._collect_results_from_processor(processor)
 
         self.log.debug("%s/%s DAG parsing processes running", len(self._processors), self._parallelism)
 
diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py
index 1cb9c74a27431..e452dc4259cac 100644
--- a/airflow/dag_processing/processor.py
+++ b/airflow/dag_processing/processor.py
@@ -561,8 +561,8 @@ def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) ->
                 cls.logger().warning(
                     "Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id
                 )
-                continue
-            tasks_missed_sla.append(task)
+            else:
+                tasks_missed_sla.append(task)
 
         emails: set[str] = set()
         for task in tasks_missed_sla:
diff --git a/airflow/example_dags/plugins/workday.py b/airflow/example_dags/plugins/workday.py
index 79473e06ddc6f..2c3299c960bbb 100644
--- a/airflow/example_dags/plugins/workday.py
+++ b/airflow/example_dags/plugins/workday.py
@@ -45,15 +45,14 @@ class AfterWorkdayTimetable(Timetable):
     def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
         next_start = d
         while True:
-            if next_start.weekday() in (5, 6):  # If next start is in the weekend go to next day
-                next_start = next_start + incr * timedelta(days=1)
-                continue
-            if holiday_calendar is not None:
-                holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
-                if next_start in holidays:  # If next start is a holiday go to next day
-                    next_start = next_start + incr * timedelta(days=1)
-                    continue
-            break
+            if next_start.weekday() not in (5, 6):  # not on weekend
+                if holiday_calendar is None:
+                    holidays = set()
+                else:
+                    holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
+                if next_start not in holidays:
+                    break
+            next_start = next_start.add(days=incr)
         return next_start
 
     # [START howto_timetable_infer_manual_data_interval]
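Editor's note (not part of the patch): a minimal standalone sketch of the refactored get_next_workday loop, using datetime.date and a plain set of holiday dates in place of the pendulum DateTime and pandas holiday calendar used in workday.py; the function signature, sample dates, and holiday set below are illustrative assumptions, not Airflow API.

# Sketch of the loop structure introduced by the workday.py hunk, under the
# assumptions stated above: advance one day at a time until the date is
# neither a weekend day nor a holiday.
from datetime import date, timedelta

def get_next_workday(d: date, holidays: set[date], incr: int = 1) -> date:
    next_start = d
    while True:
        if next_start.weekday() not in (5, 6):  # not Saturday/Sunday
            if next_start not in holidays:
                break  # a weekday that is not a holiday: done
        next_start = next_start + timedelta(days=incr)  # otherwise keep stepping
    return next_start

# Example with hypothetical data: starting on Saturday 2023-09-02 with Monday
# 2023-09-04 marked as a holiday skips to Tuesday 2023-09-05.
print(get_next_workday(date(2023, 9, 2), holidays={date(2023, 9, 4)}))  # 2023-09-05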