diff --git a/airflow/utils/dag_edges.py b/airflow/utils/dag_edges.py index f7d0f7e7d8327..bd1ad268aefed 100644 --- a/airflow/utils/dag_edges.py +++ b/airflow/utils/dag_edges.py @@ -115,10 +115,9 @@ def collect_edges(task_group): edge = (task.task_id, child.task_id) if task.is_setup and child.is_teardown: setup_teardown_edges.add(edge) - if edge in edges: - continue - edges.add(edge) - tasks_to_trace_next.append(child) + if edge not in edges: + edges.add(edge) + tasks_to_trace_next.append(child) tasks_to_trace = tasks_to_trace_next result = [] diff --git a/airflow/utils/db.py b/airflow/utils/db.py index db4631f14867a..16b9d6595c1f5 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -1446,9 +1446,7 @@ class BadReferenceConfig: dangling_table_name = _format_airflow_moved_table_name(source_table.name, change_version, "dangling") if dangling_table_name in existing_table_names: invalid_row_count = bad_rows_query.count() - if invalid_row_count <= 0: - continue - else: + if invalid_row_count: yield _format_dangling_error( source_table=source_table.name, target_table=dangling_table_name, diff --git a/airflow/utils/db_cleanup.py b/airflow/utils/db_cleanup.py index 0f7eb8064ce07..b246eb8c4091a 100644 --- a/airflow/utils/db_cleanup.py +++ b/airflow/utils/db_cleanup.py @@ -435,19 +435,19 @@ def run_cleanup( _confirm_delete(date=clean_before_timestamp, tables=sorted(effective_table_names)) existing_tables = reflect_tables(tables=None, session=session).tables for table_name, table_config in effective_config_dict.items(): - if table_name not in existing_tables: + if table_name in existing_tables: + with _suppress_with_logging(table_name, session): + _cleanup_table( + clean_before_timestamp=clean_before_timestamp, + dry_run=dry_run, + verbose=verbose, + **table_config.__dict__, + skip_archive=skip_archive, + session=session, + ) + session.commit() + else: logger.warning("Table %s not found. Skipping.", table_name) - continue - with _suppress_with_logging(table_name, session): - _cleanup_table( - clean_before_timestamp=clean_before_timestamp, - dry_run=dry_run, - verbose=verbose, - **table_config.__dict__, - skip_archive=skip_archive, - session=session, - ) - session.commit() @provide_session diff --git a/airflow/utils/email.py b/airflow/utils/email.py index 2957e5e1d3f64..659c3bdb0f192 100644 --- a/airflow/utils/email.py +++ b/airflow/utils/email.py @@ -271,18 +271,17 @@ def send_mime_email( try: smtp_conn = _get_smtp_connection(smtp_host, smtp_port, smtp_timeout, smtp_ssl) except smtplib.SMTPServerDisconnected: - if attempt < smtp_retry_limit: - continue - raise - - if smtp_starttls: - smtp_conn.starttls() - if smtp_user and smtp_password: - smtp_conn.login(smtp_user, smtp_password) - log.info("Sent an alert email to %s", e_to) - smtp_conn.sendmail(e_from, e_to, mime_msg.as_string()) - smtp_conn.quit() - break + if attempt == smtp_retry_limit: + raise + else: + if smtp_starttls: + smtp_conn.starttls() + if smtp_user and smtp_password: + smtp_conn.login(smtp_user, smtp_password) + log.info("Sent an alert email to %s", e_to) + smtp_conn.sendmail(e_from, e_to, mime_msg.as_string()) + smtp_conn.quit() + break def get_email_address_list(addresses: str | Iterable[str]) -> list[str]: diff --git a/airflow/utils/file.py b/airflow/utils/file.py index 9178fa4af5541..f885e57b37f88 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -244,12 +244,10 @@ def _find_path_from_directory( patterns_by_dir.update({dirpath: patterns.copy()}) for file in files: - if file == ignore_file_name: - continue - abs_file_path = Path(root) / file - if ignore_rule_type.match(abs_file_path, patterns): - continue - yield str(abs_file_path) + if file != ignore_file_name: + abs_file_path = Path(root) / file + if not ignore_rule_type.match(abs_file_path, patterns): + yield str(abs_file_path) def find_path_from_directory( @@ -310,16 +308,11 @@ def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> l file_paths = [] for file_path in find_path_from_directory(directory, ".airflowignore"): + path = Path(file_path) try: - if not os.path.isfile(file_path): - continue - _, file_ext = os.path.splitext(os.path.split(file_path)[-1]) - if file_ext != ".py" and not zipfile.is_zipfile(file_path): - continue - if not might_contain_dag(file_path, safe_mode): - continue - - file_paths.append(file_path) + if path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path)): + if might_contain_dag(file_path, safe_mode): + file_paths.append(file_path) except Exception: log.exception("Error while examining %s", file_path) diff --git a/airflow/utils/log/file_task_handler.py b/airflow/utils/log/file_task_handler.py index 2530b8fdb5313..25b664074efc9 100644 --- a/airflow/utils/log/file_task_handler.py +++ b/airflow/utils/log/file_task_handler.py @@ -109,14 +109,13 @@ def _parse_timestamps_in_log_file(lines: Iterable[str]): timestamp = None next_timestamp = None for idx, line in enumerate(lines): - if not line: - continue - with suppress(Exception): - # next_timestamp unchanged if line can't be parsed - next_timestamp = _parse_timestamp(line) - if next_timestamp: - timestamp = next_timestamp - yield timestamp, idx, line + if line: + with suppress(Exception): + # next_timestamp unchanged if line can't be parsed + next_timestamp = _parse_timestamp(line) + if next_timestamp: + timestamp = next_timestamp + yield timestamp, idx, line def _interleave_logs(*logs): diff --git a/airflow/utils/log/secrets_masker.py b/airflow/utils/log/secrets_masker.py index 735d6e03a7a96..246377c169c0b 100644 --- a/airflow/utils/log/secrets_masker.py +++ b/airflow/utils/log/secrets_masker.py @@ -206,9 +206,8 @@ def filter(self, record) -> bool: if self.replacer: for k, v in record.__dict__.items(): - if k in self._record_attrs_to_ignore: - continue - record.__dict__[k] = self.redact(v) + if k not in self._record_attrs_to_ignore: + record.__dict__[k] = self.redact(v) if record.exc_info and record.exc_info[1] is not None: exc = record.exc_info[1] self._redact_exception_with_context(exc)