Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,16 +610,12 @@ def _run_parsing_loop(self):
continue

for sentinel in ready:
if sentinel is self._direct_scheduler_conn:
continue

processor = self.waitables.get(sentinel)
if not processor:
continue

self._collect_results_from_processor(processor)
self.waitables.pop(sentinel)
self._processors.pop(processor.file_path)
if sentinel is not self._direct_scheduler_conn:
processor = self.waitables.get(sentinel)
if processor:
self._collect_results_from_processor(processor)
self.waitables.pop(sentinel)
self._processors.pop(processor.file_path)

if self.standalone_dag_processor:
self._fetch_callbacks(max_callbacks_per_loop)
Expand Down Expand Up @@ -1058,12 +1054,11 @@ def collect_results(self) -> None:
)

for sentinel in ready:
if sentinel is self._direct_scheduler_conn:
continue
processor = cast(DagFileProcessorProcess, self.waitables[sentinel])
self.waitables.pop(processor.waitable_handle)
self._processors.pop(processor.file_path)
self._collect_results_from_processor(processor)
if sentinel is not self._direct_scheduler_conn:
processor = cast(DagFileProcessorProcess, self.waitables[sentinel])
self.waitables.pop(processor.waitable_handle)
self._processors.pop(processor.file_path)
self._collect_results_from_processor(processor)

self.log.debug("%s/%s DAG parsing processes running", len(self._processors), self._parallelism)

Expand Down
4 changes: 2 additions & 2 deletions airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,8 @@ def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) ->
cls.logger().warning(
"Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id
)
continue
tasks_missed_sla.append(task)
else:
tasks_missed_sla.append(task)

emails: set[str] = set()
for task in tasks_missed_sla:
Expand Down
17 changes: 8 additions & 9 deletions airflow/example_dags/plugins/workday.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,14 @@ class AfterWorkdayTimetable(Timetable):
def get_next_workday(self, d: DateTime, incr=1) -> DateTime:
next_start = d
while True:
if next_start.weekday() in (5, 6): # If next start is in the weekend go to next day
next_start = next_start + incr * timedelta(days=1)
continue
if holiday_calendar is not None:
holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
if next_start in holidays: # If next start is a holiday go to next day
next_start = next_start + incr * timedelta(days=1)
continue
break
if next_start.weekday() not in (5, 6): # not on weekend
if holiday_calendar is None:
holidays = set()
else:
holidays = holiday_calendar.holidays(start=next_start, end=next_start).to_pydatetime()
if next_start not in holidays:
break
next_start = next_start.add(days=incr)
return next_start

# [START howto_timetable_infer_manual_data_interval]
Expand Down