From 2a68f23b06cd7828a909cc48bbaac6768019c36e Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 1 Nov 2025 20:49:23 +0100 Subject: [PATCH 01/10] Enable ruff PLW2901 rule --- .../core_api/routes/public/dag_run.py | 6 +- .../core_api/services/public/pools.py | 4 +- .../core_api/services/public/variables.py | 6 +- .../src/airflow/dag_processing/collection.py | 4 +- .../src/airflow/executors/executor_loader.py | 6 +- airflow-core/src/airflow/models/pool.py | 9 +- .../src/airflow/serialization/serde.py | 19 ++- .../serialization/serialized_objects.py | 115 ++++++++++-------- .../cli/commands/test_connection_command.py | 6 +- .../tests/unit/cli/test_cli_parser.py | 22 ++-- .../unit/core/test_impersonation_tests.py | 5 +- .../tests/unit/models/test_cleartasks.py | 4 +- airflow-core/tests/unit/models/test_dagrun.py | 6 +- .../tests/unit/plugins/test_plugin_ignore.py | 8 +- chart/docs/conf.py | 5 +- dev/airflow-license | 4 +- .../commands/release_management_commands.py | 6 +- .../src/airflow_breeze/global_constants.py | 6 +- .../utils/add_back_references.py | 4 +- .../utils/constraints_version_check.py | 4 +- .../airflow_breeze/utils/gh_workflow_utils.py | 8 +- .../utils/projects_google_spreadsheet.py | 4 +- dev/chart/build_changelog_annotations.py | 4 +- dev/check_files.py | 6 +- .../sphinx_exts/operators_and_hooks_ref.py | 4 +- .../sphinx_exts/substitution_extensions.py | 4 +- .../_internals/forbidden_warnings.py | 4 +- .../providers/amazon/aws/hooks/athena.py | 7 +- .../amazon/aws/secrets/secrets_manager.py | 7 +- .../amazon/aws/transfers/dynamodb_to_s3.py | 4 +- .../amazon/aws/transfers/sql_to_s3.py | 5 +- .../aws/executors/ecs/test_ecs_executor.py | 9 +- .../amazon/aws/hooks/test_hooks_signature.py | 8 +- .../operators/test_sagemaker_processing.py | 6 +- .../aws/operators/test_sagemaker_transform.py | 6 +- .../providers/apache/beam/hooks/beam.py | 4 +- .../providers/apache/hive/hooks/hive.py | 29 +++-- .../apache/spark/hooks/spark_submit.py | 8 +- 
.../databricks/hooks/databricks_sql.py | 2 +- .../sensors/databricks_partition.py | 7 +- .../providers/docker/operators/docker.py | 4 +- .../elasticsearch/log/elasticmock/__init__.py | 5 +- .../api_endpoints/user_endpoint.py | 4 +- .../auth_manager/security_manager/override.py | 11 +- .../cli_commands/test_role_command.py | 4 +- .../cli_commands/test_user_command.py | 4 +- .../providers/google/cloud/hooks/bigquery.py | 37 +++--- .../google/cloud/operators/translate.py | 4 +- .../google/cloud/transfers/sql_to_gcs.py | 16 +-- .../google/suite/transfers/sql_to_sheets.py | 16 +-- .../mysql/transfers/vertica_to_mysql.py | 6 +- .../providers/openlineage/utils/sql.py | 8 +- pyproject.toml | 3 + .../ci/prek/check_provider_version_compat.py | 7 +- scripts/ci/prek/sort_in_the_wild.py | 8 +- .../run_capture_airflowctl_help.py | 4 +- task-sdk/src/airflow/sdk/bases/operator.py | 6 +- .../src/airflow/sdk/execution_time/context.py | 18 +-- .../execution_time/test_task_runner.py | 8 +- 59 files changed, 294 insertions(+), 264 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py index 9066deabe6fcd..a39d6fd216910 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -187,7 +187,7 @@ def patch_dag_run( data = patch_body.model_dump(include=fields_to_update, by_alias=True) - for attr_name, attr_value in data.items(): + for attr_name, attr_value_raw in data.items(): if attr_name == "state": attr_value = getattr(patch_body, "state") if attr_value == DAGRunPatchStates.SUCCESS: @@ -208,9 +208,9 @@ def patch_dag_run( elif attr_name == "note": updated_dag_run = session.get(DagRun, dag_run.id) if updated_dag_run and updated_dag_run.dag_run_note is None: - updated_dag_run.note = (attr_value, user.get_id()) + updated_dag_run.note = (attr_value_raw, user.get_id()) elif 
updated_dag_run: - updated_dag_run.dag_run_note.content = attr_value + updated_dag_run.dag_run_note.content = attr_value_raw updated_dag_run.dag_run_note.user_id = user.get_id() final_dag_run = session.get(DagRun, dag_run.id) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py index 1edadb97cc699..d23b0acc02beb 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/pools.py @@ -178,9 +178,9 @@ def handle_bulk_update(self, action: BulkUpdateAction[PoolBody], results: BulkAc if pool.pool not in update_pool_names: continue - pool = update_orm_from_pydantic(pool.pool, pool, action.update_mask, self.session) + updated_pool = update_orm_from_pydantic(pool.pool, pool, action.update_mask, self.session) - results.success.append(str(pool.pool)) # use request field, always consistent + results.success.append(str(updated_pool.pool)) # use request field, always consistent except HTTPException as e: results.errors.append({"error": f"{e.detail}", "status_code": e.status_code}) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py index 6296f22828f60..5193863520e31 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py @@ -148,9 +148,11 @@ def handle_bulk_update(self, action: BulkUpdateAction, results: BulkActionRespon for variable in action.entities: if variable.key not in update_keys: continue - variable = update_orm_from_pydantic(variable.key, variable, action.update_mask, self.session) + variable_py = update_orm_from_pydantic( + variable.key, variable, action.update_mask, self.session + ) - results.success.append(variable.key) + 
results.success.append(variable_py.key) except HTTPException as e: results.errors.append({"error": f"{e.detail}", "status_code": e.status_code}) diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index c91e8724e8875..371420f44d2ff 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -637,8 +637,8 @@ def _get_dag_assets( inlets: bool = True, outlets: bool = True, ) -> Iterable[tuple[str, AssetT]]: - for task in dag.data["dag"]["tasks"]: - task = task[Encoding.VAR] + for task_raw in dag.data["dag"]["tasks"]: + task = task_raw[Encoding.VAR] ports = _get_task_ports(task["partial_kwargs"] if task.get("_is_mapped") else task, inlets, outlets) for port in ports: if isinstance(obj := BaseSerialization.deserialize(port), of): diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py index af0385034698c..bfa585ffb7c42 100644 --- a/airflow-core/src/airflow/executors/executor_loader.py +++ b/airflow-core/src/airflow/executors/executor_loader.py @@ -77,8 +77,8 @@ def _get_executor_names(cls) -> list[ExecutorName]: executor_names = [] for team_name, executor_names_config in all_executor_names: executor_names_per_team = [] - for name in executor_names_config: - if len(split_name := name.split(":")) == 1: + for name_raw in executor_names_config: + if len(split_name := name_raw.split(":")) == 1: name = split_name[0] # Check if this is an alias for a core airflow executor, module # paths won't be provided by the user in that case. 
@@ -109,7 +109,7 @@ def _get_executor_names(cls) -> list[ExecutorName]: ExecutorName(alias=split_name[0], module_path=split_name[1], team_name=team_name) ) else: - raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}") + raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name_raw}") # As of now, we do not allow duplicate executors (within teams). # Add all module paths to a set, since the actual code is what is unique diff --git a/airflow-core/src/airflow/models/pool.py b/airflow-core/src/airflow/models/pool.py index 477c9c9b3e24b..5ed15f309ff28 100644 --- a/airflow-core/src/airflow/models/pool.py +++ b/airflow-core/src/airflow/models/pool.py @@ -182,9 +182,8 @@ def slots_stats( query = with_row_locks(query, session=session, nowait=True) pool_rows = session.execute(query) - for pool_name, total_slots, include_deferred in pool_rows: - if total_slots == -1: - total_slots = float("inf") + for pool_name, total_slots_in, include_deferred in pool_rows: + total_slots = float("inf") if total_slots_in == -1 else total_slots_in pools[pool_name] = PoolStats( total=total_slots, running=0, queued=0, open=0, deferred=0, scheduled=0 ) @@ -201,9 +200,9 @@ def slots_stats( ) # calculate queued and running metrics - for pool_name, state, count in state_count_by_pool: + for pool_name, state, count_raw in state_count_by_pool: # Some databases return decimal.Decimal here. 
- count = int(count) + count = int(count_raw) stats_dict: PoolStats | None = pools.get(pool_name) if not stats_dict: diff --git a/airflow-core/src/airflow/serialization/serde.py b/airflow-core/src/airflow/serialization/serde.py index 6faedd88417a7..9d05ebe45830e 100644 --- a/airflow-core/src/airflow/serialization/serde.py +++ b/airflow-core/src/airflow/serialization/serde.py @@ -365,26 +365,23 @@ def _register(): _stringifiers.clear() with Stats.timer("serde.load_serializers") as timer: - for _, name, _ in iter_namespace(airflow.serialization.serializers): - name = import_module(name) - for s in getattr(name, "serializers", ()): - if not isinstance(s, str): - s = qualname(s) + for _, module_name, _ in iter_namespace(airflow.serialization.serializers): + name = import_module(module_name) + for attr_s in getattr(name, "serializers", ()): + s = attr_s if isinstance(attr_s, str) else qualname(attr_s) if s in _serializers and _serializers[s] != name: raise AttributeError(f"duplicate {s} for serialization in {name} and {_serializers[s]}") log.debug("registering %s for serialization", s) _serializers[s] = name - for d in getattr(name, "deserializers", ()): - if not isinstance(d, str): - d = qualname(d) + for attr_d in getattr(name, "deserializers", ()): + d = attr_d if isinstance(attr_d, str) else qualname(attr_d) if d in _deserializers and _deserializers[d] != name: raise AttributeError(f"duplicate {d} for deserialization in {name} and {_serializers[d]}") log.debug("registering %s for deserialization", d) _deserializers[d] = name _extra_allowed.add(d) - for c in getattr(name, "stringifiers", ()): - if not isinstance(c, str): - c = qualname(c) + for attr_c in getattr(name, "stringifiers", ()): + c = attr_c if isinstance(attr_c, str) else qualname(attr_c) if c in _deserializers and _deserializers[c] != name: raise AttributeError(f"duplicate {c} for stringifiers in {name} and {_stringifiers[c]}") log.debug("registering %s for stringifying", c) diff --git 
a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 387859d224f41..1e5dd14429538 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -1069,10 +1069,9 @@ def is_serialized(val): def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, dict]]: """Serialize Params dict for a DAG or task as a list of tuples to ensure ordering.""" serialized_params = [] - for k, v in params.items(): - if isinstance(params, ParamsDict): - # Use native param object, not resolved value if possible - v = params.get_param(k) + for k, raw_v in params.items(): + # Use native param object, not resolved value if possible + v = params.get_param(k) if isinstance(params, ParamsDict) else raw_v try: class_identity = f"{v.__module__}.{v.__class__.__name__}" except AttributeError: @@ -1589,52 +1588,60 @@ def populate_operator( deserialized_partial_kwarg_defaults = {} - for k, v in encoded_op.items(): + for k_in, v_in in encoded_op.items(): # Use centralized field deserialization logic - if k in encoded_op.get("template_fields", []): - pass # Template fields are handled separately - elif k == "_operator_extra_links": + if k_in in encoded_op.get("template_fields", []): + # Template fields are handled separately + k = k_in + v = v_in + elif k_in == "_operator_extra_links": if cls._load_operator_extra_links: - op_predefined_extra_links = cls._deserialize_operator_extra_links(v) + op_predefined_extra_links = cls._deserialize_operator_extra_links(v_in) # If OperatorLinks with the same name exists, Links via Plugin have higher precedence op_predefined_extra_links.update(op_extra_links_from_plugin) else: op_predefined_extra_links = {} - v = list(op_predefined_extra_links.values()) k = "operator_extra_links" - - elif k == "params": - v = cls._deserialize_params_dict(v) - elif k == "partial_kwargs": + v = 
list(op_predefined_extra_links.values()) + elif k_in == "params": + k = k_in + v = cls._deserialize_params_dict(v_in) + elif k_in == "partial_kwargs": # Use unified deserializer that supports both encoded and non-encoded values - v = cls._deserialize_partial_kwargs(v, client_defaults) - elif k in {"expand_input", "op_kwargs_expand_input"}: - v = _ExpandInputRef(v["type"], cls.deserialize(v["value"])) - elif k == "operator_class": - v = {k_: cls.deserialize(v_) for k_, v_ in v.items()} - elif k == "_is_sensor": + k = k_in + v = cls._deserialize_partial_kwargs(v_in, client_defaults) + elif k_in in {"expand_input", "op_kwargs_expand_input"}: + k = k_in + v = _ExpandInputRef(v_in["type"], cls.deserialize(v_in["value"])) + elif k_in == "operator_class": + k = k_in + v = {k_: cls.deserialize(v_) for k_, v_ in v_in.items()} + elif k_in == "_is_sensor": from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep - if v is False: + if v_in is False: raise RuntimeError("_is_sensor=False should never have been serialized!") object.__setattr__(op, "deps", op.deps | {ReadyToRescheduleDep()}) continue elif ( - k in cls._decorated_fields - or k not in op.get_serialized_fields() - or k in ("outlets", "inlets") + k_in in cls._decorated_fields + or k_in not in op.get_serialized_fields() + or k_in in ("outlets", "inlets") ): - v = cls.deserialize(v) - elif k == "_on_failure_fail_dagrun": + k = k_in + v = cls.deserialize(v_in) + elif k_in == "_on_failure_fail_dagrun": k = "on_failure_fail_dagrun" - elif k == "weight_rule": + v = v_in + elif k_in == "weight_rule": k = "_weight_rule" - v = decode_priority_weight_strategy(v) + v = decode_priority_weight_strategy(v_in) else: # Apply centralized deserialization for all other fields - v = cls._deserialize_field_value(k, v) + k = k_in + v = cls._deserialize_field_value(k, v_in) # Handle field differences between SerializedBaseOperator and MappedOperator # Fields that exist in SerializedBaseOperator but not in MappedOperator 
need to go to partial_kwargs @@ -2566,13 +2573,14 @@ def _deserialize_dag_internal( # Note: Context is passed explicitly through method parameters, no class attributes needed - for k, v in encoded_dag.items(): - if k == "_downstream_task_ids": - v = set(v) - elif k == "tasks": + for k_in, v_in in encoded_dag.items(): + k = k_in + if k_in == "_downstream_task_ids": + v = set(v_in) + elif k_in == "tasks": SerializedBaseOperator._load_operator_extra_links = cls._load_operator_extra_links tasks = {} - for obj in v: + for obj in v_in: if obj.get(Encoding.TYPE) == DAT.OP: deser = SerializedBaseOperator.deserialize_operator( obj[Encoding.VAR], client_defaults @@ -2580,26 +2588,27 @@ def _deserialize_dag_internal( tasks[deser.task_id] = deser k = "task_dict" v = tasks - elif k == "timezone": - v = cls._deserialize_timezone(v) - elif k == "dagrun_timeout": - v = cls._deserialize_timedelta(v) - elif k.endswith("_date"): - v = cls._deserialize_datetime(v) - elif k == "edge_info": + elif k_in == "timezone": + v = cls._deserialize_timezone(v_in) + elif k_in == "dagrun_timeout": + v = cls._deserialize_timedelta(v_in) + elif k_in.endswith("_date"): + v = cls._deserialize_datetime(v_in) + elif k_in == "edge_info": # Value structure matches exactly - pass - elif k == "timetable": - v = decode_timetable(v) - elif k == "weight_rule": - v = decode_priority_weight_strategy(v) - elif k in cls._decorated_fields: - v = cls.deserialize(v) - elif k == "params": - v = cls._deserialize_params_dict(v) - elif k == "tags": - v = set(v) - # else use v as it is + v = v_in + elif k_in == "timetable": + v = decode_timetable(v_in) + elif k_in == "weight_rule": + v = decode_priority_weight_strategy(v_in) + elif k_in in cls._decorated_fields: + v = cls.deserialize(v_in) + elif k_in == "params": + v = cls._deserialize_params_dict(v_in) + elif k_in == "tags": + v = set(v_in) + else: + v = v_in object.__setattr__(dag, k, v) diff --git a/airflow-core/tests/unit/cli/commands/test_connection_command.py 
b/airflow-core/tests/unit/cli/commands/test_connection_command.py index 861c46aaa5c17..920bbfa1e2d9d 100644 --- a/airflow-core/tests/unit/cli/commands/test_connection_command.py +++ b/airflow-core/tests/unit/cli/commands/test_connection_command.py @@ -86,10 +86,10 @@ def setup_method(self): def test_cli_connections_list_as_json(self): args = self.parser.parse_args(["connections", "list", "--output", "json"]) - with redirect_stdout(StringIO()) as stdout: + with redirect_stdout(StringIO()) as stdout_io: connection_command.connections_list(args) - print(stdout.getvalue()) - stdout = stdout.getvalue() + print(stdout_io.getvalue()) + stdout = stdout_io.getvalue() for conn_id, conn_type in self.EXPECTED_CONS: assert conn_type in stdout assert conn_id in stdout diff --git a/airflow-core/tests/unit/cli/test_cli_parser.py b/airflow-core/tests/unit/cli/test_cli_parser.py index 70c0555bc1666..94dd8b89639db 100644 --- a/airflow-core/tests/unit/cli/test_cli_parser.py +++ b/airflow-core/tests/unit/cli/test_cli_parser.py @@ -288,9 +288,9 @@ def test_commands_and_command_group_sections(self): with contextlib.redirect_stdout(StringIO()) as stdout: with pytest.raises(SystemExit): parser.parse_args(["--help"]) - stdout = stdout.getvalue() - assert "Commands" in stdout - assert "Groups" in stdout + stdout_val = stdout.getvalue() + assert "Commands" in stdout_val + assert "Groups" in stdout_val def test_dag_parser_commands_and_comamnd_group_sections(self): parser = cli_parser.get_parser(dag_parser=True) @@ -298,9 +298,9 @@ def test_dag_parser_commands_and_comamnd_group_sections(self): with contextlib.redirect_stdout(StringIO()) as stdout: with pytest.raises(SystemExit): parser.parse_args(["--help"]) - stdout = stdout.getvalue() - assert "Commands" in stdout - assert "Groups" in stdout + stdout_val = stdout.getvalue() + assert "Commands" in stdout_val + assert "Groups" in stdout_val def test_should_display_help(self): parser = cli_parser.get_parser() @@ -384,8 +384,10 @@ def 
test_executor_specific_commands_not_accessible(self, command): parser = cli_parser.get_parser() with pytest.raises(SystemExit): parser.parse_args([command]) - stderr = stderr.getvalue() - assert (f"airflow command error: argument GROUP_OR_COMMAND: invalid choice: '{command}'") in stderr + stderr_val = stderr.getvalue() + assert ( + f"airflow command error: argument GROUP_OR_COMMAND: invalid choice: '{command}'" + ) in stderr_val @pytest.mark.parametrize( ("executor", "expected_args"), @@ -412,8 +414,8 @@ def test_cli_parser_executors(self, executor, expected_args): with pytest.raises(SystemExit) as e: # running the help command exits, so we prevent that parser.parse_args([expected_arg, "--help"]) assert e.value.code == 0, stderr.getvalue() # return code 0 == no problem - stderr = stderr.getvalue() - assert "airflow command error" not in stderr + stderr_val = stderr.getvalue() + assert "airflow command error" not in stderr_val def test_non_existing_directory_raises_when_metavar_is_dir_for_db_export_cleaned(self): """Test that the error message is correct when the directory does not exist.""" diff --git a/airflow-core/tests/unit/core/test_impersonation_tests.py b/airflow-core/tests/unit/core/test_impersonation_tests.py index c4ca10d71dfc4..fd10074059021 100644 --- a/airflow-core/tests/unit/core/test_impersonation_tests.py +++ b/airflow-core/tests/unit/core/test_impersonation_tests.py @@ -58,9 +58,8 @@ def set_permissions(settings: dict[Path | str, int]): orig_permissions = [] try: print(" Change file/directory permissions ".center(72, "+")) - for path, mode in settings.items(): - if isinstance(path, str): - path = Path(path) + for path_raw, mode in settings.items(): + path = Path(path_raw) if isinstance(path_raw, str) else path_raw if len(path.parts) <= 1: raise SystemError(f"Unable to change permission for the root directory: {path}.") diff --git a/airflow-core/tests/unit/models/test_cleartasks.py b/airflow-core/tests/unit/models/test_cleartasks.py index 
ca2fa19a03e49..38f36864aa357 100644 --- a/airflow-core/tests/unit/models/test_cleartasks.py +++ b/airflow-core/tests/unit/models/test_cleartasks.py @@ -659,8 +659,8 @@ def _get_ti(old_ti): SerializedDAG.clear_dags(dags, only_failed=True) - for ti in tis: - ti = _get_ti(ti) + for ti_in in tis: + ti = _get_ti(ti_in) if ti.dag_id == ti_fail.dag_id: assert ti.state == State.NONE assert ti.try_number == 2 diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 0e2978124ea35..d262f8a4b075d 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -2175,9 +2175,9 @@ def do_something_else(i): ti.map_index = 0 task = ti.task for map_index in range(1, 5): - ti = TI(task, run_id=dr.run_id, map_index=map_index, dag_version_id=ti.dag_version_id) - session.add(ti) - ti.dag_run = dr + ti_new = TI(task, run_id=dr.run_id, map_index=map_index, dag_version_id=ti.dag_version_id) + session.add(ti_new) + ti_new.dag_run = dr else: # run tasks "do_something" to get XCOMs for correct downstream length ti.run() diff --git a/airflow-core/tests/unit/plugins/test_plugin_ignore.py b/airflow-core/tests/unit/plugins/test_plugin_ignore.py index 1eeb9a559bbcf..1f93a96abb6b3 100644 --- a/airflow-core/tests/unit/plugins/test_plugin_ignore.py +++ b/airflow-core/tests/unit/plugins/test_plugin_ignore.py @@ -77,8 +77,8 @@ def test_find_not_should_ignore_path_regexp(self, tmp_path): "test_load_sub1.py", } ignore_list_file = ".airflowignore" - for file_path in find_path_from_directory(plugin_folder_path, ignore_list_file, "regexp"): - file_path = Path(file_path) + for file_path_raw in find_path_from_directory(plugin_folder_path, ignore_list_file, "regexp"): + file_path = Path(file_path_raw) if file_path.is_file() and file_path.suffix == ".py": detected_files.add(file_path.name) assert detected_files == should_not_ignore_files @@ -104,8 +104,8 @@ def test_find_not_should_ignore_path_glob(self, 
tmp_path): } ignore_list_file = ".airflowignore_glob" print("-" * 20) - for file_path in find_path_from_directory(plugin_folder_path, ignore_list_file, "glob"): - file_path = Path(file_path) + for file_path_raw in find_path_from_directory(plugin_folder_path, ignore_list_file, "glob"): + file_path = Path(file_path_raw) if file_path.is_file() and file_path.suffix == ".py": detected_files.add(file_path.name) print(file_path) diff --git a/chart/docs/conf.py b/chart/docs/conf.py index c0d022eab980a..1250c27bfa09e 100644 --- a/chart/docs/conf.py +++ b/chart/docs/conf.py @@ -204,9 +204,8 @@ def _format_examples(param_name: str, schema: dict) -> str | None: # Nicer to have the parameter name shown as well out = "" - for ex in schema["examples"]: - if schema["type"] == "array": - ex = [ex] + for ex_data in schema["examples"]: + ex = [ex_data] if schema["type"] == "array" else ex_data out += yaml.dump({param_name: ex}) return out diff --git a/dev/airflow-license b/dev/airflow-license index 2144f54b87714..9958f4b7d5288 100755 --- a/dev/airflow-license +++ b/dev/airflow-license @@ -74,8 +74,8 @@ if __name__ == "__main__": notices = get_notices() - for notice in notices: - notice = notice[0] + for notice_arr in notices: + notice = notice_arr[0] license = parse_license_file(notice[1]) print(f"{notice[1]:<30}|{notice[2][:50]:<50}||{notice[0]:<20}||{license:<10}") diff --git a/dev/breeze/src/airflow_breeze/commands/release_management_commands.py b/dev/breeze/src/airflow_breeze/commands/release_management_commands.py index 9ab28c7173b8d..b4e7f06f486f9 100644 --- a/dev/breeze/src/airflow_breeze/commands/release_management_commands.py +++ b/dev/breeze/src/airflow_breeze/commands/release_management_commands.py @@ -2315,11 +2315,11 @@ def is_package_in_dist(dist_files: list[str], package: str) -> bool: def get_suffix_from_package_in_dist(dist_files: list[str], package: str) -> str | None: """Get suffix from package prepared in dist folder.""" - for file in dist_files: - if 
file.startswith(f"apache_airflow_providers_{package.replace('.', '_')}") and file.endswith( + for filename in dist_files: + if filename.startswith(f"apache_airflow_providers_{package.replace('.', '_')}") and filename.endswith( ".tar.gz" ): - file = file[: -len(".tar.gz")] + file = filename[: -len(".tar.gz")] version = file.split("-")[-1] match = VERSION_MATCH.match(version) if match: diff --git a/dev/breeze/src/airflow_breeze/global_constants.py b/dev/breeze/src/airflow_breeze/global_constants.py index 8bd01a6135b5d..5a8b4af7a158c 100644 --- a/dev/breeze/src/airflow_breeze/global_constants.py +++ b/dev/breeze/src/airflow_breeze/global_constants.py @@ -642,9 +642,9 @@ def get_task_sdk_version(): def get_airflow_extras(): airflow_dockerfile = AIRFLOW_ROOT_PATH / "Dockerfile" with open(airflow_dockerfile) as dockerfile: - for line in dockerfile.readlines(): - if "ARG AIRFLOW_EXTRAS=" in line: - line = line.split("=")[1].strip() + for line_raw in dockerfile.readlines(): + if "ARG AIRFLOW_EXTRAS=" in line_raw: + line = line_raw.split("=")[1].strip() return line.replace('"', "") diff --git a/dev/breeze/src/airflow_breeze/utils/add_back_references.py b/dev/breeze/src/airflow_breeze/utils/add_back_references.py index a87aa1ebb9b9b..3b6a4c25b4a84 100644 --- a/dev/breeze/src/airflow_breeze/utils/add_back_references.py +++ b/dev/breeze/src/airflow_breeze/utils/add_back_references.py @@ -51,8 +51,8 @@ def download_file(url): def construct_old_to_new_tuple_mapping(file_name: Path) -> list[tuple[str, str]]: old_to_new_tuples: list[tuple[str, str]] = [] with file_name.open() as f: - for line in f: - line = line.strip() + for line_raw in f: + line = line_raw.strip() if line and not line.startswith("#"): old_path, new_path = line.split(" ") old_path = old_path.replace(".rst", ".html") diff --git a/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py b/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py index 88f74595815c9..b951bda51e68b 100755 --- 
a/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py +++ b/dev/breeze/src/airflow_breeze/utils/constraints_version_check.py @@ -244,8 +244,8 @@ def constraints_version_check( def parse_packages_from_lines(lines: list[str], selected_packages: set[str] | None) -> list[tuple[str, str]]: remaining_packages: set[str] = selected_packages.copy() if selected_packages else set() packages = [] - for line in lines: - line = line.strip() + for line_raw in lines: + line = line_raw.strip() if line and not line.startswith("#") and "@" not in line: match = re.match(r"^([a-zA-Z0-9_.\-]+)==([\w.\-]+)$", line) if match: diff --git a/dev/breeze/src/airflow_breeze/utils/gh_workflow_utils.py b/dev/breeze/src/airflow_breeze/utils/gh_workflow_utils.py index e4f45f976b137..7881eef9eaed0 100644 --- a/dev/breeze/src/airflow_breeze/utils/gh_workflow_utils.py +++ b/dev/breeze/src/airflow_breeze/utils/gh_workflow_utils.py @@ -40,10 +40,12 @@ def tigger_workflow(workflow_name: str, repo: str, branch: str = "main", **kwarg command = ["gh", "workflow", "run", workflow_name, "--ref", branch, "--repo", repo] # These are the input parameters to workflow - for key, value in kwargs.items(): + for key, value_raw in kwargs.items(): # GH cli requires bool inputs to be converted to string format - if isinstance(value, bool): - value = "true" if value else "false" + if isinstance(value_raw, bool): + value = "true" if value_raw else "false" + else: + value = value_raw command.extend(["-f", f"{key}={value}"]) diff --git a/dev/breeze/src/airflow_breeze/utils/projects_google_spreadsheet.py b/dev/breeze/src/airflow_breeze/utils/projects_google_spreadsheet.py index 1a7992e893de8..364b50be0c07f 100644 --- a/dev/breeze/src/airflow_breeze/utils/projects_google_spreadsheet.py +++ b/dev/breeze/src/airflow_breeze/utils/projects_google_spreadsheet.py @@ -77,8 +77,8 @@ def read_metadata_from_google_spreadsheet(sheets: Resource): metadata_types.append(MetadataFromSpreadsheet[metadata_field]) 
metadata_from_spreadsheet[MetadataFromSpreadsheet[metadata_field]] = [] for row in range["values"][1:]: - for index, value in enumerate(row): - value = value.strip() + for index, value_raw in enumerate(row): + value = value_raw.strip() if value: metadata_from_spreadsheet[metadata_types[index]].append(value) get_console().print("[success]Metadata read from Google Spreadsheet.") diff --git a/dev/chart/build_changelog_annotations.py b/dev/chart/build_changelog_annotations.py index a72d8dcfde802..7dbe95d71feba 100755 --- a/dev/chart/build_changelog_annotations.py +++ b/dev/chart/build_changelog_annotations.py @@ -89,8 +89,8 @@ def get_entry(section: str, description: str, pr_number: int | None) -> dict[str section = "" entries = [] with open("chart/RELEASE_NOTES.rst") as f: - for line in f: - line = line.strip() + for line_raw in f: + line = line_raw.strip() if not line or line.startswith(('"""', "----", "^^^^")): pass elif line.startswith("Airflow Helm Chart"): diff --git a/dev/check_files.py b/dev/check_files.py index 222d59e8e3934..921c0d4b6398a 100644 --- a/dev/check_files.py +++ b/dev/check_files.py @@ -112,9 +112,9 @@ def check_providers(files: list[str], release_date: str): ] ) missing_list.extend(check_all_files(expected_files=expected_files, actual_files=files)) - for name, version in get_packages(): - print(f"Checking {name} {version}") - version = strip_rc_suffix(version) + for name, version_raw in get_packages(): + print(f"Checking {name} {version_raw}") + version = strip_rc_suffix(version_raw) expected_files = expand_name_variations( [ f"{name.replace('-', '_')}-{version}.tar.gz", diff --git a/devel-common/src/sphinx_exts/operators_and_hooks_ref.py b/devel-common/src/sphinx_exts/operators_and_hooks_ref.py index bfd0d155e3737..489aebf70fdf4 100644 --- a/devel-common/src/sphinx_exts/operators_and_hooks_ref.py +++ b/devel-common/src/sphinx_exts/operators_and_hooks_ref.py @@ -300,8 +300,8 @@ def _render_deprecations_content(*, header_separator: str): for 
provider_yaml_path in get_all_provider_yaml_paths(): provider_parent_path = Path(provider_yaml_path).parent provider_info: dict[str, Any] = {"name": "", "deprecations": []} - for root, _, file_names in os.walk(provider_parent_path): - file_names = [f for f in file_names if f.endswith(".py") and f != "__init__.py"] + for root, _, file_names_raw in os.walk(provider_parent_path): + file_names = [f for f in file_names_raw if f.endswith(".py") and f != "__init__.py"] for file_name in file_names: file_path = f"{os.path.relpath(root)}/{file_name}" with open(file_path) as file: diff --git a/devel-common/src/sphinx_exts/substitution_extensions.py b/devel-common/src/sphinx_exts/substitution_extensions.py index 16ae5d128e51d..97545ac5fdc47 100644 --- a/devel-common/src/sphinx_exts/substitution_extensions.py +++ b/devel-common/src/sphinx_exts/substitution_extensions.py @@ -83,8 +83,8 @@ def condition(node): new_text = new_text.replace(f"|{name}|", replacement) # Only replace if the text actually changed if new_text != str(child): - child = nodes.Text(new_text) - node.replace(old_child, child) + child_new = nodes.Text(new_text) + node.replace(old_child, child_new) # For non-Text nodes, do not replace # The highlighter checks this -- without this, it will refuse to apply highlighting diff --git a/devel-common/src/tests_common/_internals/forbidden_warnings.py b/devel-common/src/tests_common/_internals/forbidden_warnings.py index 6e231160e70b9..53409b644bd5c 100644 --- a/devel-common/src/tests_common/_internals/forbidden_warnings.py +++ b/devel-common/src/tests_common/_internals/forbidden_warnings.py @@ -46,8 +46,8 @@ def __init__(self, config: pytest.Config, forbidden_warnings: tuple[str, ...]): "tests/dags_corrupted/", "tests/dags_with_system_exit/", } - for path in self.deprecations_ignore: - path = Path(path).resolve() + for path_raw in self.deprecations_ignore: + path = Path(path_raw).resolve() with path.open() as fp: excluded_cases.update(yaml.safe_load(fp)) diff --git 
a/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py index 952d471eb2307..1d681dbf701e0 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py @@ -40,11 +40,14 @@ def query_params_to_string(params: dict[str, str | Collection[str]]) -> str: result = "" - for key, value in params.items(): + for key, value_org in params.items(): if key == "QueryString": value = ( - MULTI_LINE_QUERY_LOG_PREFIX + str(value).replace("\n", MULTI_LINE_QUERY_LOG_PREFIX).rstrip() + MULTI_LINE_QUERY_LOG_PREFIX + + str(value_org).replace("\n", MULTI_LINE_QUERY_LOG_PREFIX).rstrip() ) + else: + value = value_org result += f"\t{key}: {value}\n" return result.rstrip() diff --git a/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py b/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py index 056c073a87813..47f99afd78767 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/secrets/secrets_manager.py @@ -187,10 +187,9 @@ def _standardize_secret_keys(self, secret: dict[str, Any]) -> dict[str, Any]: } for conn_field, extra_words in self.extra_conn_words.items(): - if conn_field == "user": - # Support `user` for backwards compatibility. - conn_field = "login" - possible_words_for_conn_fields[conn_field].extend(extra_words) + # Support `user` for backwards compatibility. 
+ conn_field_backcompat = "login" if conn_field == "user" else conn_field + possible_words_for_conn_fields[conn_field_backcompat].extend(extra_words) conn_d: dict[str, Any] = {} for conn_field, possible_words in possible_words_for_conn_fields.items(): diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py index 266648f1dc151..d50fcfd00aef6 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/dynamodb_to_s3.py @@ -216,9 +216,9 @@ def _export_entire_data(self): scan_kwargs = copy(self.dynamodb_scan_kwargs) if self.dynamodb_scan_kwargs else {} err = None f: IO[Any] - with NamedTemporaryFile() as f: + with NamedTemporaryFile() as f_tmp: try: - f = self._scan_dynamodb_and_upload_to_s3(f, scan_kwargs, table) + f = self._scan_dynamodb_and_upload_to_s3(f_tmp, scan_kwargs, table) except Exception as e: err = e raise e diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py index 31a53b0fd25cf..a2e0fc5e60336 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py @@ -304,9 +304,8 @@ def _partition_dataframe( group_df.reset_index(drop=True), ) elif isinstance(df, pl.DataFrame): - for group_label, group_df in df.group_by(**self.groupby_kwargs): # type: ignore[assignment] - if random_column_name: - group_df = group_df.drop(random_column_name) + for group_label, group_df_in in df.group_by(**self.groupby_kwargs): # type: ignore[assignment] + group_df = group_df_in.drop(random_column_name) if random_column_name else group_df_in yield ( cast("str", group_label[0] if isinstance(group_label, tuple) else group_label), group_df, diff --git 
a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py index ec04234c687a5..f01365fa37023 100644 --- a/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py +++ b/providers/amazon/tests/unit/amazon/aws/executors/ecs/test_ecs_executor.py @@ -1420,7 +1420,7 @@ def test_config_defaults_are_applied(self, assign_subnets): task_kwargs = _recursive_flatten_dict(ecs_executor_config.build_task_kwargs(conf)) found_keys = {convert_camel_to_snake(key): key for key in task_kwargs.keys()} - for expected_key, expected_value in CONFIG_DEFAULTS.items(): + for expected_key, expected_value_raw in CONFIG_DEFAULTS.items(): # conn_id, max_run_task_attempts, and check_health_on_startup are used by the executor, # but are not expected to appear in the task_kwargs. if expected_key in [ @@ -1432,8 +1432,11 @@ def test_config_defaults_are_applied(self, assign_subnets): else: assert expected_key in found_keys.keys() # Make sure to convert "assign_public_ip" from True/False to ENABLE/DISABLE. 
- if expected_key is AllEcsConfigKeys.ASSIGN_PUBLIC_IP: - expected_value = parse_assign_public_ip(expected_value) + expected_value = ( + parse_assign_public_ip(expected_value_raw) + if expected_key is AllEcsConfigKeys.ASSIGN_PUBLIC_IP + else expected_value_raw + ) assert expected_value == task_kwargs[found_keys[expected_key]] def test_provided_values_override_defaults(self, assign_subnets, assign_container_name, monkeypatch): diff --git a/providers/amazon/tests/unit/amazon/aws/hooks/test_hooks_signature.py b/providers/amazon/tests/unit/amazon/aws/hooks/test_hooks_signature.py index 949ff487aea03..8ec01fb2dcab9 100644 --- a/providers/amazon/tests/unit/amazon/aws/hooks/test_hooks_signature.py +++ b/providers/amazon/tests/unit/amazon/aws/hooks/test_hooks_signature.py @@ -105,11 +105,11 @@ def validate_hook(hook: type[AwsGenericHook], hook_name: str, hook_module: str) hook_extra_parameters = set() for k, v in inspect.signature(hook.__init__).parameters.items(): if v.kind == inspect.Parameter.VAR_POSITIONAL: - k = "*args" + hook_extra_parameters.add("*args") elif v.kind == inspect.Parameter.VAR_KEYWORD: - k = "**kwargs" - - hook_extra_parameters.add(k) + hook_extra_parameters.add("**kwargs") + else: + hook_extra_parameters.add(k) hook_extra_parameters.difference_update({"self", "*args", "**kwargs"}) allowed_parameters = ALLOWED_THICK_HOOKS_PARAMETERS.get(hook_name, set()) diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py b/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py index 15de17d70a4b6..cf90dc76ed83e 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_processing.py @@ -144,9 +144,9 @@ def test_integer_fields_with_stopping_condition(self, _, __, ___, mock_desc): assert ( sagemaker.integer_fields == EXPECTED_INTEGER_FIELDS + EXPECTED_STOPPING_CONDITION_INTEGER_FIELDS ) - for key1, key2, 
*key3 in EXPECTED_INTEGER_FIELDS: - if key3: - (key3,) = key3 + for key1, key2, *key3_raw in EXPECTED_INTEGER_FIELDS: + if key3_raw: + (key3,) = key3_raw assert sagemaker.config[key1][key2][key3] == int(sagemaker.config[key1][key2][key3]) else: sagemaker.config[key1][key2] == int(sagemaker.config[key1][key2]) diff --git a/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_transform.py b/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_transform.py index 6da19585ba546..a6640afba5943 100644 --- a/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_transform.py +++ b/providers/amazon/tests/unit/amazon/aws/operators/test_sagemaker_transform.py @@ -98,9 +98,9 @@ def test_integer_fields(self, _, mock_create_transform, __, ___, mock_desc): } self.sagemaker.execute(None) assert self.sagemaker.integer_fields == EXPECTED_INTEGER_FIELDS - for key1, key2, *key3 in EXPECTED_INTEGER_FIELDS: - if key3: - (key3,) = key3 + for key1, key2, *key3_org in EXPECTED_INTEGER_FIELDS: + if key3_org: + (key3,) = key3_org assert self.sagemaker.config[key1][key2][key3] == int(self.sagemaker.config[key1][key2][key3]) else: self.sagemaker.config[key1][key2] == int(self.sagemaker.config[key1][key2]) diff --git a/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py b/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py index f7bf7fc3b8c3b..eadf007f0df35 100644 --- a/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py +++ b/providers/apache/beam/src/airflow/providers/apache/beam/hooks/beam.py @@ -141,8 +141,8 @@ def process_fd( fd_to_log = {proc.stderr: log.warning, proc.stdout: log.info} func_log = fd_to_log[fd] - for line in iter(fd.readline, b""): - line = line.decode() + for line_raw in iter(fd.readline, b""): + line = line_raw.decode() if process_line_callback: process_line_callback(line) func_log(line.rstrip("\n")) diff --git a/providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py 
b/providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py index 9b43bfb0e6743..5ba56ff2ec277 100644 --- a/providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py +++ b/providers/apache/hive/src/airflow/providers/apache/hive/hooks/hive.py @@ -320,8 +320,8 @@ def run_cli( ) self.sub_process = sub_process stdout = "" - for line in iter(sub_process.stdout.readline, b""): - line = line.decode() + for line_raw in iter(sub_process.stdout.readline, b""): + line = line_raw.decode() stdout += line if verbose: self.log.info(line.strip()) @@ -336,24 +336,23 @@ def test_hql(self, hql: str) -> None: """Test an hql statement using the hive cli and EXPLAIN.""" create, insert, other = [], [], [] for query in hql.split(";"): # naive - query_original = query - query = query.lower().strip() - - if query.startswith("create table"): - create.append(query_original) - elif query.startswith(("set ", "add jar ", "create temporary function")): - other.append(query_original) - elif query.startswith("insert"): - insert.append(query_original) + query_lower = query.lower().strip() + + if query_lower.startswith("create table"): + create.append(query) + elif query_lower.startswith(("set ", "add jar ", "create temporary function")): + other.append(query) + elif query_lower.startswith("insert"): + insert.append(query) other_ = ";".join(other) for query_set in [create, insert]: - for query in query_set: - query_preview = " ".join(query.split())[:50] + for query_item in query_set: + query_preview = " ".join(query_item.split())[:50] self.log.info("Testing HQL [%s (...)]", query_preview) if query_set == insert: - query = other_ + "; explain " + query + query = other_ + "; explain " + query_item else: - query = "explain " + query + query = "explain " + query_item try: self.run_cli(query, verbose=False) except AirflowException as e: diff --git a/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py 
b/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py index 4a32e93d0cdb0..539c6579bf8fc 100644 --- a/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py +++ b/providers/apache/spark/src/airflow/providers/apache/spark/hooks/spark_submit.py @@ -600,8 +600,8 @@ def _process_spark_submit_log(self, itr: Iterator[Any]) -> None: :param itr: An iterator which iterates over the input of the subprocess """ # Consume the iterator - for line in itr: - line = line.strip() + for line_raw in itr: + line = line_raw.strip() # If we run yarn cluster mode, we want to extract the application id from # the logs so we can kill the application when we stop it unexpectedly if self._is_yarn and self._connection["deploy_mode"] == "cluster": @@ -652,8 +652,8 @@ def _process_spark_status_log(self, itr: Iterator[Any]) -> None: driver_found = False valid_response = False # Consume the iterator - for line in itr: - line = line.strip() + for line_raw in itr: + line = line_raw.strip() # A valid Spark status response should contain a submissionId if "submissionId" in line: diff --git a/providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py b/providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py index f7619bfbb2ef8..89bec905cdbab 100644 --- a/providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py +++ b/providers/databricks/src/airflow/providers/databricks/hooks/databricks_sql.py @@ -278,7 +278,7 @@ def run( self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters) # when using AAD tokens, it could expire if previous query run longer than token lifetime conn = self.get_conn() - with closing(conn.cursor()) as cur: + with closing(conn.cursor()): self.set_autocommit(conn, autocommit) with closing(conn.cursor()) as cur: diff --git a/providers/databricks/src/airflow/providers/databricks/sensors/databricks_partition.py 
b/providers/databricks/src/airflow/providers/databricks/sensors/databricks_partition.py index 579cc500bcd7d..c63cedb0bbf38 100644 --- a/providers/databricks/src/airflow/providers/databricks/sensors/databricks_partition.py +++ b/providers/databricks/src/airflow/providers/databricks/sensors/databricks_partition.py @@ -189,9 +189,10 @@ def _generate_partition_query( formatted_opts = "" if opts: output_list = [] - for partition_col, partition_value in opts.items(): - if escape_key: - partition_col = self.escaper.escape_item(partition_col) + for partition_col_raw, partition_value in opts.items(): + partition_col = ( + self.escaper.escape_item(partition_col_raw) if escape_key else partition_col_raw + ) if partition_col in partition_columns: if isinstance(partition_value, list): output_list.append(f"""{partition_col} in {tuple(partition_value)}""") diff --git a/providers/docker/src/airflow/providers/docker/operators/docker.py b/providers/docker/src/airflow/providers/docker/operators/docker.py index 045c82d807a8f..2efbdd2690cdd 100644 --- a/providers/docker/src/airflow/providers/docker/operators/docker.py +++ b/providers/docker/src/airflow/providers/docker/operators/docker.py @@ -59,8 +59,8 @@ def stringify(line: str | bytes): def fetch_logs(log_stream, log: Logger): log_lines = [] - for log_chunk in log_stream: - log_chunk = stringify(log_chunk).rstrip() + for log_chunk_raw in log_stream: + log_chunk = stringify(log_chunk_raw).rstrip() log_lines.append(log_chunk) for log_chunk_line in log_chunk.split("\n"): log.info("%s", log_chunk_line) diff --git a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/__init__.py b/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/__init__.py index a612e16630ff8..40072c6372130 100644 --- a/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/__init__.py +++ b/providers/elasticsearch/tests/unit/elasticsearch/log/elasticmock/__init__.py @@ -62,9 +62,8 @@ def _normalize_hosts(hosts): out = [] - for 
host in hosts: - if "://" not in host: - host = f"//{host}" + for host_raw in hosts: + host = f"//{host_raw}" if "://" not in host_raw else host_raw parsed_url = urlparse(host) h = {"host": parsed_url.hostname} diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/user_endpoint.py b/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/user_endpoint.py index aecbbb192a0dc..8bee2c4431cb6 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/user_endpoint.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/api_endpoints/user_endpoint.py @@ -162,8 +162,8 @@ def patch_user(*, username: str, update_mask: UpdateMask = None) -> APIResponse: if update_mask is not None: masked_data = {} missing_mask_names = [] - for field in update_mask: - field = field.strip() + for field_raw in update_mask: + field = field_raw.strip() try: masked_data[field] = data[field] except KeyError: diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py index 4b4c2576b49aa..bf32cbf531624 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/security_manager/override.py @@ -1049,7 +1049,7 @@ def _get_or_create_dag_permission(action_name: str, dag_resource_name: str) -> P self.remove_permission_from_role(role, perm) # Adding the access control permissions - for rolename, resource_actions in access_control.items(): + for rolename, resource_actions_raw in access_control.items(): role = self.find_role(rolename) if not role: raise AirflowException( @@ -1057,9 +1057,12 @@ def _get_or_create_dag_permission(action_name: str, dag_resource_name: str) -> P f"'{rolename}', but that role does not exist" ) - if not isinstance(resource_actions, dict): - # Support for old-style access_control where only the actions are 
specified - resource_actions = {permissions.RESOURCE_DAG: set(resource_actions)} + # Support for old-style access_control where only the actions are specified + resource_actions = ( + resource_actions_raw + if isinstance(resource_actions_raw, dict) + else {permissions.RESOURCE_DAG: set(resource_actions_raw)} + ) for resource_name, actions in resource_actions.items(): if resource_name not in self.RESOURCE_DETAILS_MAP: diff --git a/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_role_command.py b/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_role_command.py index 95d8d6e0cc05f..49281b6c9d2ae 100644 --- a/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_role_command.py +++ b/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_role_command.py @@ -115,9 +115,9 @@ def test_cli_list_roles(self): self.appbuilder.sm.add_role("FakeTeamA") self.appbuilder.sm.add_role("FakeTeamB") - with redirect_stdout(StringIO()) as stdout: + with redirect_stdout(StringIO()) as stdout_io: role_command.roles_list(self.parser.parse_args(["roles", "list"])) - stdout = stdout.getvalue() + stdout = stdout_io.getvalue() assert "FakeTeamA" in stdout assert "FakeTeamB" in stdout diff --git a/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_user_command.py b/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_user_command.py index 987a2042d4376..05ca349a310ce 100644 --- a/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_user_command.py +++ b/providers/fab/tests/unit/fab/auth_manager/cli_commands/test_user_command.py @@ -248,9 +248,9 @@ def test_cli_list_users(self): ] ) user_command.users_create(args) - with redirect_stdout(StringIO()) as stdout: + with redirect_stdout(StringIO()) as stdout_io: user_command.users_list(self.parser.parse_args(["users", "list"])) - stdout = stdout.getvalue() + stdout = stdout_io.getvalue() for i in range(3): assert f"user{i}" in stdout diff --git 
a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py index b9764ba1c4d46..86d02ee721982 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py @@ -1800,26 +1800,29 @@ def _prepare_query_configuration( if cluster_fields: cluster_fields = {"fields": cluster_fields} # type: ignore - query_param_list: list[tuple[Any, str, str | bool | None | dict, type | tuple[type]]] = [ - (sql, "query", None, (str,)), - (priority, "priority", priority, (str,)), - (use_legacy_sql, "useLegacySql", self.use_legacy_sql, bool), - (query_params, "queryParameters", None, list), - (udf_config, "userDefinedFunctionResources", None, list), - (maximum_billing_tier, "maximumBillingTier", None, int), - (maximum_bytes_billed, "maximumBytesBilled", None, float), - (time_partitioning, "timePartitioning", {}, dict), - (range_partitioning, "rangePartitioning", {}, dict), - (schema_update_options, "schemaUpdateOptions", None, list), - (destination_dataset_table, "destinationTable", None, dict), - (cluster_fields, "clustering", None, dict), + query_param_list: list[tuple[Any, str, type | tuple[type]]] = [ + (sql, "query", (str,)), + (priority, "priority", (str,)), + (use_legacy_sql, "useLegacySql", bool), + (query_params, "queryParameters", list), + (udf_config, "userDefinedFunctionResources", list), + (maximum_billing_tier, "maximumBillingTier", int), + (maximum_bytes_billed, "maximumBytesBilled", float), + (time_partitioning, "timePartitioning", dict), + (range_partitioning, "rangePartitioning", dict), + (schema_update_options, "schemaUpdateOptions", list), + (destination_dataset_table, "destinationTable", dict), + (cluster_fields, "clustering", dict), ] - for param, param_name, param_default, param_type in query_param_list: - if param_name not in configuration["query"] and param in [None, {}, ()]: + for 
param_raw, param_name, param_type in query_param_list: + if param_name not in configuration["query"] and param_raw in [None, {}, ()]: if param_name == "timePartitioning": - param_default = _cleanse_time_partitioning(destination_dataset_table, time_partitioning) - param = param_default + param = _cleanse_time_partitioning(destination_dataset_table, time_partitioning) + else: + param = param_raw + else: + param = param_raw if param in [None, {}, ()]: continue diff --git a/providers/google/src/airflow/providers/google/cloud/operators/translate.py b/providers/google/src/airflow/providers/google/cloud/operators/translate.py index e107bcd820269..309bda4545562 100644 --- a/providers/google/src/airflow/providers/google/cloud/operators/translate.py +++ b/providers/google/src/airflow/providers/google/cloud/operators/translate.py @@ -1654,8 +1654,8 @@ def execute(self, context: Context) -> Sequence[str]: raise AirflowException(e) result_ids = [] - for glossary_item in results_pager: - glossary_item = type(glossary_item).to_dict(glossary_item) + for glossary_item_raw in results_pager: + glossary_item = type(glossary_item_raw).to_dict(glossary_item_raw) glossary_id = hook.extract_object_id(glossary_item) result_ids.append(glossary_id) self.log.info("Fetching the glossaries list complete. 
Glossary id-s: %s", result_ids) diff --git a/providers/google/src/airflow/providers/google/cloud/transfers/sql_to_gcs.py b/providers/google/src/airflow/providers/google/cloud/transfers/sql_to_gcs.py index 5fe3242d83839..03131a6bf2da7 100644 --- a/providers/google/src/airflow/providers/google/cloud/transfers/sql_to_gcs.py +++ b/providers/google/src/airflow/providers/google/cloud/transfers/sql_to_gcs.py @@ -295,21 +295,21 @@ def _write_local_data_files(self, cursor): # Proceed to write the row to the localfile if self.export_format == "csv": - row = self.convert_types(schema, col_type_dict, row) + row2 = self.convert_types(schema, col_type_dict, row) if self.null_marker is not None: - row = [value or self.null_marker for value in row] - csv_writer.writerow(row) + row2 = [value or self.null_marker for value in row2] + csv_writer.writerow(row2) elif self.export_format == "parquet": - row = self.convert_types(schema, col_type_dict, row) + row2 = self.convert_types(schema, col_type_dict, row) if self.null_marker is not None: - row = [value or self.null_marker for value in row] - rows_buffer.append(row) + row2 = [value or self.null_marker for value in row2] + rows_buffer.append(row2) if len(rows_buffer) >= self.parquet_row_group_size: self._write_rows_to_parquet(parquet_writer, rows_buffer) rows_buffer = [] else: - row = self.convert_types(schema, col_type_dict, row) - row_dict = dict(zip(schema, row)) + row2 = self.convert_types(schema, col_type_dict, row) + row_dict = dict(zip(schema, row2)) json.dump(row_dict, tmp_file_handle, sort_keys=True, ensure_ascii=False) diff --git a/providers/google/src/airflow/providers/google/suite/transfers/sql_to_sheets.py b/providers/google/src/airflow/providers/google/suite/transfers/sql_to_sheets.py index ed9243a0c900c..c4d408151deed 100644 --- a/providers/google/src/airflow/providers/google/suite/transfers/sql_to_sheets.py +++ b/providers/google/src/airflow/providers/google/suite/transfers/sql_to_sheets.py @@ -87,13 +87,15 @@ def 
__init__( def _data_prep(self, data): for row in data: item_list = [] - for item in row: - if isinstance(item, (datetime.date, datetime.datetime)): - item = item.isoformat() - elif isinstance(item, int): # To exclude int from the number check. - pass - elif isinstance(item, numbers.Number): - item = float(item) + for item_raw in row: + if isinstance(item_raw, (datetime.date, datetime.datetime)): + item = item_raw.isoformat() + elif isinstance(item_raw, int): # To exclude int from the number check. + item = item_raw + elif isinstance(item_raw, numbers.Number): + item = float(item_raw) + else: + item = item_raw item_list.append(item) yield item_list diff --git a/providers/mysql/src/airflow/providers/mysql/transfers/vertica_to_mysql.py b/providers/mysql/src/airflow/providers/mysql/transfers/vertica_to_mysql.py index 4250a3344271a..9afa4cc535d83 100644 --- a/providers/mysql/src/airflow/providers/mysql/transfers/vertica_to_mysql.py +++ b/providers/mysql/src/airflow/providers/mysql/transfers/vertica_to_mysql.py @@ -145,13 +145,13 @@ def _bulk_load_transfer(self, mysql, vertica): self._run_preoperator(mysql) try: self.log.info("Bulk inserting rows into MySQL...") - with closing(mysql.get_conn()) as conn, closing(conn.cursor()) as cursor: - cursor.execute( + with closing(mysql.get_conn()) as conn2, closing(conn2.cursor()) as cursor2: + cursor2.execute( f"LOAD DATA LOCAL INFILE '{tmpfile.name}' " f"INTO TABLE {self.mysql_table} " f"LINES TERMINATED BY '\r\n' ({', '.join(selected_columns)})" ) - conn.commit() + conn2.commit() self.log.info("Inserted rows into MySQL %s", count) except (MySQLdb.Error, MySQLdb.Warning): self.log.info("Inserted rows into MySQL 0") diff --git a/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py b/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py index 0219088dea733..c7ebf05c3b62b 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py +++ 
b/providers/openlineage/src/airflow/providers/openlineage/utils/sql.py @@ -232,15 +232,15 @@ def create_filter_clauses( [name.upper() if uppercase_names else name for name in tables] ) if schema: - schema = schema.upper() if uppercase_names else schema + schema_upper = schema.upper() if uppercase_names else schema filter_clause = and_( - information_schema_table.c[table_schema_column_name] == schema, filter_clause + information_schema_table.c[table_schema_column_name] == schema_upper, filter_clause ) schema_level_clauses.append(filter_clause) if db and table_database_column_name: - db = db.upper() if uppercase_names else db + db_upper = db.upper() if uppercase_names else db filter_clause = and_( - information_schema_table.c[table_database_column_name] == db, or_(*schema_level_clauses) + information_schema_table.c[table_database_column_name] == db_upper, or_(*schema_level_clauses) ) filter_clauses.append(filter_clause) else: diff --git a/pyproject.toml b/pyproject.toml index b14a51388d18b..36609be83dd90 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -613,6 +613,9 @@ extend-select = [ "PLW1509", # preexec_fn argument is unsafe when using threads "PLW1510", # subprocess.run without explicit check argument "PLW1641", # Object does not implement __hash__ method + "PLW2101", # Threading lock directly created in with statement has no effect + "PLW2901", # Outer {outer_kind} variable {name} overwritten by inner {inner_kind} target + "PLW3301", # Nested {func} calls can be flattened # Per rule enables "RUF006", # Checks for asyncio dangling task "RUF015", # Checks for unnecessary iterable allocation for first element diff --git a/scripts/ci/prek/check_provider_version_compat.py b/scripts/ci/prek/check_provider_version_compat.py index a443ac7a1e98f..ad111a1cd4714 100755 --- a/scripts/ci/prek/check_provider_version_compat.py +++ b/scripts/ci/prek/check_provider_version_compat.py @@ -49,9 +49,12 @@ def check_and_fix_file(path: Path): f"prek-hook: {path}: Import from wrong 
provider: {imported_provider} (should be {provider})" ) # auto fix: rewrite the import correctly - line = f"from airflow.providers.{provider}.version_compat import {rest}" + new_lines.append(f"from airflow.providers.{provider}.version_compat import {rest}") changed = True - new_lines.append(line) + else: + new_lines.append(line) + else: + new_lines.append(line) if changed: path.write_text("\n".join(new_lines) + "\n") return not changed diff --git a/scripts/ci/prek/sort_in_the_wild.py b/scripts/ci/prek/sort_in_the_wild.py index 967e7800ffa1a..4c0ff002e49e1 100755 --- a/scripts/ci/prek/sort_in_the_wild.py +++ b/scripts/ci/prek/sort_in_the_wild.py @@ -73,8 +73,10 @@ def stable_sort(x): f"but it starts with {match.group(1)} Replacing the number with 1.\033[0m\n" ) old_line = line - line = "1." + line.split(".", maxsplit=1)[1] - print(f"{old_line.strip()} => {line.strip()}") - companies.append(line) + line_out = "1." + line.split(".", maxsplit=1)[1] + print(f"{old_line.strip()} => {line_out.strip()}") + else: + line_out = line + companies.append(line_out) companies.sort(key=stable_sort) inthewild_path.write_text("".join(header) + "\n" + "".join(companies)) diff --git a/scripts/in_container/run_capture_airflowctl_help.py b/scripts/in_container/run_capture_airflowctl_help.py index 665325e3e413a..2989a615fa2b2 100644 --- a/scripts/in_container/run_capture_airflowctl_help.py +++ b/scripts/in_container/run_capture_airflowctl_help.py @@ -99,8 +99,8 @@ def regenerate_help_images_for_all_airflowctl_commands(commands: list[str], skip # Check for changes changed_commands = [] - for command in commands: - command = command or "main" + for command_raw in commands: + command = command_raw or "main" console.print(f"[bright_blue]Checking command: {command}[/]", end="") if skip_hash_check: diff --git a/task-sdk/src/airflow/sdk/bases/operator.py b/task-sdk/src/airflow/sdk/bases/operator.py index a11b5894b0d60..2cbfbda065a95 100644 --- a/task-sdk/src/airflow/sdk/bases/operator.py +++ 
b/task-sdk/src/airflow/sdk/bases/operator.py @@ -1257,11 +1257,11 @@ def __deepcopy__(self, memo: dict[int, Any]): shallow_copy = tuple(cls.shallow_copy_attrs) + cls._base_operator_shallow_copy_attrs - for k, v in self.__dict__.items(): + for k, v_org in self.__dict__.items(): if k not in shallow_copy: - v = copy.deepcopy(v, memo) + v = copy.deepcopy(v_org, memo) else: - v = copy.copy(v) + v = copy.copy(v_org) # Bypass any setters, and set it on the object directly. This works since we are cloning ourself so # we know the type is already fine diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index 563957c98029a..dd609e51b559b 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -869,18 +869,18 @@ def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: bool ] context_params = settings.get_airflow_context_vars(context) - for key, value in context_params.items(): - if not isinstance(key, str): - raise TypeError(f"key <{key}> must be string") + for key_raw, value in context_params.items(): + if not isinstance(key_raw, str): + raise TypeError(f"key <{key_raw}> must be string") if not isinstance(value, str): - raise TypeError(f"value of key <{key}> must be string, not {type(value)}") + raise TypeError(f"value of key <{key_raw}> must be string, not {type(value)}") - if in_env_var_format: - if not key.startswith(ENV_VAR_FORMAT_PREFIX): - key = ENV_VAR_FORMAT_PREFIX + key.upper() + if in_env_var_format and not key_raw.startswith(ENV_VAR_FORMAT_PREFIX): + key = ENV_VAR_FORMAT_PREFIX + key_raw.upper() + elif not key.startswith(DEFAULT_FORMAT_PREFIX): + key = DEFAULT_FORMAT_PREFIX + key_raw else: - if not key.startswith(DEFAULT_FORMAT_PREFIX): - key = DEFAULT_FORMAT_PREFIX + key + key = key_raw params[key] = value for subject, attr, mapping_key in ops: diff --git a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py 
b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py index 0a7261ae586af..1615efae917ae 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_task_runner.py +++ b/task-sdk/tests/task_sdk/execution_time/test_task_runner.py @@ -1567,10 +1567,12 @@ def mock_send_side_effect(*args, **kwargs): if not isinstance(map_indexes, Iterable): map_indexes = [map_indexes] - for task_id in task_ids: + for task_id_raw in task_ids: # Without task_ids (or None) expected behavior is to pull with calling task_id - if task_id is None or isinstance(task_id, ArgNotSet): - task_id = test_task_id + task_id = ( + test_task_id if task_id_raw is None or isinstance(task_id_raw, ArgNotSet) else task_id_raw + ) + for map_index in map_indexes: if map_index == NOTSET: mock_supervisor_comms.send.assert_any_call( From 1d8c4d98350f4bc0feb8325639c5ec8ec5b1ac3b Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 1 Nov 2025 22:09:18 +0100 Subject: [PATCH 02/10] Fix follow-up mypy complaints after PLW2901 rule --- airflow-core/src/airflow/models/pool.py | 4 ++-- .../src/airflow/serialization/serialized_objects.py | 1 + .../src/airflow/providers/amazon/aws/hooks/athena.py | 1 + .../airflow/providers/amazon/aws/transfers/sql_to_s3.py | 4 ++-- .../src/airflow/providers/postgres/dialects/postgres.py | 4 ++-- scripts/ci/prek/check_common_compat_lazy_imports.py | 2 +- scripts/in_container/install_airflow_and_providers.py | 8 +++++--- 7 files changed, 14 insertions(+), 10 deletions(-) diff --git a/airflow-core/src/airflow/models/pool.py b/airflow-core/src/airflow/models/pool.py index 5ed15f309ff28..176ccd678642e 100644 --- a/airflow-core/src/airflow/models/pool.py +++ b/airflow-core/src/airflow/models/pool.py @@ -42,7 +42,7 @@ class PoolStats(TypedDict): """Dictionary containing Pool Stats.""" - total: int + total: int | float running: int deferred: int queued: int @@ -221,7 +221,7 @@ def slots_stats( # calculate open metric for pool_name, stats_dict in pools.items(): - stats_dict["open"] = 
stats_dict["total"] - stats_dict["running"] - stats_dict["queued"] + stats_dict["open"] = int(stats_dict["total"]) - stats_dict["running"] - stats_dict["queued"] if pool_includes_deferred[pool_name]: stats_dict["open"] -= stats_dict["deferred"] diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 1e5dd14429538..fe5499a5bc8f1 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -2574,6 +2574,7 @@ def _deserialize_dag_internal( # Note: Context is passed explicitly through method parameters, no class attributes needed for k_in, v_in in encoded_dag.items(): + v: Any k = k_in if k_in == "_downstream_task_ids": v = set(v_in) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py index 1d681dbf701e0..30d72296a7aae 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py @@ -41,6 +41,7 @@ def query_params_to_string(params: dict[str, str | Collection[str]]) -> str: result = "" for key, value_org in params.items(): + value: str | Collection[str] if key == "QueryString": value = ( MULTI_LINE_QUERY_LOG_PREFIX diff --git a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py index a2e0fc5e60336..67ae02aaaefeb 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/transfers/sql_to_s3.py @@ -305,10 +305,10 @@ def _partition_dataframe( ) elif isinstance(df, pl.DataFrame): for group_label, group_df_in in df.group_by(**self.groupby_kwargs): # type: ignore[assignment] - group_df = group_df_in.drop(random_column_name) if random_column_name else 
group_df_in + group_df2 = group_df_in.drop(random_column_name) if random_column_name else group_df_in yield ( cast("str", group_label[0] if isinstance(group_label, tuple) else group_label), - group_df, + group_df2, ) def _get_hook(self) -> DbApiHook: diff --git a/providers/postgres/src/airflow/providers/postgres/dialects/postgres.py b/providers/postgres/src/airflow/providers/postgres/dialects/postgres.py index 7aac7ff9d8f9b..446ea199fa154 100644 --- a/providers/postgres/src/airflow/providers/postgres/dialects/postgres.py +++ b/providers/postgres/src/airflow/providers/postgres/dialects/postgres.py @@ -41,7 +41,7 @@ def get_primary_keys(self, table: str, schema: str | None = None) -> list[str] | """ if schema is None: table, schema = self.extract_schema_from_table(table) - table = self.unescape_word(table) + table = self.unescape_word(table) or table schema = self.unescape_word(schema) if schema else None query = """ select kcu.column_name @@ -75,7 +75,7 @@ def get_column_names( ) -> list[str] | None: if schema is None: table, schema = self.extract_schema_from_table(table) - table = self.unescape_word(table) + table = self.unescape_word(table) or table schema = self.unescape_word(schema) if schema else None query = """ select column_name, diff --git a/scripts/ci/prek/check_common_compat_lazy_imports.py b/scripts/ci/prek/check_common_compat_lazy_imports.py index 6d40eafb1c7d6..72d186fbcaae4 100755 --- a/scripts/ci/prek/check_common_compat_lazy_imports.py +++ b/scripts/ci/prek/check_common_compat_lazy_imports.py @@ -50,7 +50,7 @@ def extract_runtime_maps(py_file: Path) -> tuple[set[str], set[str], set[str]]: for node in tree.body: # Handle both annotated assignments and regular assignments - targets = [] + targets: list[ast.Name | ast.expr] = [] value = None if isinstance(node, ast.AnnAssign) and isinstance(node.target, ast.Name): diff --git a/scripts/in_container/install_airflow_and_providers.py b/scripts/in_container/install_airflow_and_providers.py index 
68ca7254f157a..68966eab064ee 100755 --- a/scripts/in_container/install_airflow_and_providers.py +++ b/scripts/in_container/install_airflow_and_providers.py @@ -425,19 +425,21 @@ def find_installation_spec( airflow_ctl_distribution = None airflow_ctl_constraints_location = None compile_ui_assets = False - elif re.match(PR_NUMBER_PATTERN, use_airflow_version) or ( - repo_match := re.match(GITHUB_REPO_BRANCH_PATTERN, use_airflow_version) + elif re.match(PR_NUMBER_PATTERN, use_airflow_version) or re.match( + GITHUB_REPO_BRANCH_PATTERN, use_airflow_version ): # Handle PR number format - resolve to owner/repo:branch first if re.match(PR_NUMBER_PATTERN, use_airflow_version): owner, repo, branch = resolve_pr_number_to_repo_branch(use_airflow_version, github_repository) resolved_version = f"{owner}/{repo}:{branch}" console.print(f"\nInstalling airflow from GitHub PR #{use_airflow_version}: {resolved_version}\n") - else: + elif repo_match := re.match(GITHUB_REPO_BRANCH_PATTERN, use_airflow_version): # Handle owner/repo:branch format owner, repo, branch = repo_match.groups() resolved_version = use_airflow_version console.print(f"\nInstalling airflow from GitHub: {use_airflow_version}\n") + else: + raise ValueError("Invalid format for USE_AIRFLOW_VERSION") # Common logic for both PR number and repo:branch formats vcs_url = f"git+https://github.com/{owner}/{repo}.git@{branch}" From 909d16fe5ac56b4d25d30b01a99ff51b5a03d475 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 2 Nov 2025 14:02:07 +0100 Subject: [PATCH 03/10] Fix tests for pools and infinite size --- airflow-core/src/airflow/models/pool.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/models/pool.py b/airflow-core/src/airflow/models/pool.py index 176ccd678642e..c2325cd75b4d1 100644 --- a/airflow-core/src/airflow/models/pool.py +++ b/airflow-core/src/airflow/models/pool.py @@ -42,11 +42,11 @@ class PoolStats(TypedDict): """Dictionary containing Pool Stats.""" - 
total: int | float + total: int | float # Note: float("inf") is used to mark infinite slots running: int deferred: int queued: int - open: int + open: int | float # Note: float("inf") is used to mark infinite slots scheduled: int @@ -221,7 +221,7 @@ def slots_stats( # calculate open metric for pool_name, stats_dict in pools.items(): - stats_dict["open"] = int(stats_dict["total"]) - stats_dict["running"] - stats_dict["queued"] + stats_dict["open"] = stats_dict["total"] - stats_dict["running"] - stats_dict["queued"] if pool_includes_deferred[pool_name]: stats_dict["open"] -= stats_dict["deferred"] From 66ba1674acd068dbd6a968d3291d09316cfcdb4f Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 2 Nov 2025 15:01:24 +0100 Subject: [PATCH 04/10] Fix logic in bigquery --- .../providers/google/cloud/hooks/bigquery.py | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py index 86d02ee721982..ebdbbd4714669 100644 --- a/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py +++ b/providers/google/src/airflow/providers/google/cloud/hooks/bigquery.py @@ -1800,27 +1800,28 @@ def _prepare_query_configuration( if cluster_fields: cluster_fields = {"fields": cluster_fields} # type: ignore - query_param_list: list[tuple[Any, str, type | tuple[type]]] = [ - (sql, "query", (str,)), - (priority, "priority", (str,)), - (use_legacy_sql, "useLegacySql", bool), - (query_params, "queryParameters", list), - (udf_config, "userDefinedFunctionResources", list), - (maximum_billing_tier, "maximumBillingTier", int), - (maximum_bytes_billed, "maximumBytesBilled", float), - (time_partitioning, "timePartitioning", dict), - (range_partitioning, "rangePartitioning", dict), - (schema_update_options, "schemaUpdateOptions", list), - (destination_dataset_table, "destinationTable", dict), - (cluster_fields, 
"clustering", dict), + query_param_list: list[tuple[Any, str, str | bool | None | dict, type | tuple[type]]] = [ + (sql, "query", None, (str,)), + (priority, "priority", priority, (str,)), + (use_legacy_sql, "useLegacySql", self.use_legacy_sql, bool), + (query_params, "queryParameters", None, list), + (udf_config, "userDefinedFunctionResources", None, list), + (maximum_billing_tier, "maximumBillingTier", None, int), + (maximum_bytes_billed, "maximumBytesBilled", None, float), + (time_partitioning, "timePartitioning", {}, dict), + (range_partitioning, "rangePartitioning", {}, dict), + (schema_update_options, "schemaUpdateOptions", None, list), + (destination_dataset_table, "destinationTable", None, dict), + (cluster_fields, "clustering", None, dict), ] - for param_raw, param_name, param_type in query_param_list: + for param_raw, param_name, param_default, param_type in query_param_list: + param: Any if param_name not in configuration["query"] and param_raw in [None, {}, ()]: if param_name == "timePartitioning": param = _cleanse_time_partitioning(destination_dataset_table, time_partitioning) else: - param = param_raw + param = param_default else: param = param_raw From 0c881919086de312316a98101513a5bc613baa92 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 2 Nov 2025 15:03:14 +0100 Subject: [PATCH 05/10] Fix variable renaming --- task-sdk/src/airflow/sdk/execution_time/context.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/task-sdk/src/airflow/sdk/execution_time/context.py b/task-sdk/src/airflow/sdk/execution_time/context.py index dd609e51b559b..e83cc990e4e7d 100644 --- a/task-sdk/src/airflow/sdk/execution_time/context.py +++ b/task-sdk/src/airflow/sdk/execution_time/context.py @@ -877,7 +877,7 @@ def context_to_airflow_vars(context: Mapping[str, Any], in_env_var_format: bool if in_env_var_format and not key_raw.startswith(ENV_VAR_FORMAT_PREFIX): key = ENV_VAR_FORMAT_PREFIX + key_raw.upper() - elif not 
key.startswith(DEFAULT_FORMAT_PREFIX): +        elif not in_env_var_format and not key_raw.startswith(DEFAULT_FORMAT_PREFIX):              key = DEFAULT_FORMAT_PREFIX + key_raw          else:              key = key_raw  From 1fc825e40f30f677e82b87979aecf7c3163bddab Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sun, 2 Nov 2025 15:41:56 +0100 Subject: [PATCH 06/10] Fix mypy alert --- airflow-core/src/airflow/jobs/scheduler_job_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index a21d323f1c989..5602663e4f896 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -376,7 +376,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -         self.log.debug("All pools are full!")         return [] -        max_tis = min(max_tis, pool_slots_free) +        max_tis = int(min(max_tis, pool_slots_free))          starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}  From 2b7ff08c858306e8e0804c757a6c39b15352fcec Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Thu, 6 Nov 2025 21:13:57 +0100 Subject: [PATCH 07/10] Review feedback by Wei --- .../core_api/services/public/variables.py | 4 +- .../src/airflow/dag_processing/collection.py | 4 +- .../src/airflow/executors/executor_loader.py | 8 +- airflow-core/src/airflow/models/pool.py | 4 +- .../src/airflow/serialization/serde.py | 34 +++--- .../serialization/serialized_objects.py | 110 ++++++++---------- .../unit/core/test_impersonation_tests.py | 4 +- .../tests/unit/plugins/test_plugin_ignore.py | 4 +- .../providers/amazon/aws/hooks/athena.py | 6 +- .../google/suite/transfers/sql_to_sheets.py | 17 ++- 10 files changed, 95 insertions(+), 100 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py index 5193863520e31..a5011768cb0aa 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/services/public/variables.py @@ -148,11 +148,11 @@ def handle_bulk_update(self, action: BulkUpdateAction, results: BulkActionRespon for variable in action.entities: if variable.key not in update_keys: continue - variable_py = update_orm_from_pydantic( + updated_variable = update_orm_from_pydantic( variable.key, variable, action.update_mask, self.session ) - results.success.append(variable_py.key) + results.success.append(updated_variable.key) except HTTPException as e: results.errors.append({"error": f"{e.detail}", "status_code": e.status_code}) diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index 371420f44d2ff..91bff694a7295 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -637,8 +637,8 @@ def _get_dag_assets( inlets: bool = True, outlets: bool = True, ) -> Iterable[tuple[str, AssetT]]: - for task_raw in dag.data["dag"]["tasks"]: - task = task_raw[Encoding.VAR] + for raw_task in dag.data["dag"]["tasks"]: + task = raw_task[Encoding.VAR] ports = _get_task_ports(task["partial_kwargs"] if task.get("_is_mapped") else task, inlets, outlets) for port in ports: if isinstance(obj := BaseSerialization.deserialize(port), of): diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py index bfa585ffb7c42..834ea58d15292 100644 --- a/airflow-core/src/airflow/executors/executor_loader.py +++ b/airflow-core/src/airflow/executors/executor_loader.py @@ -77,8 +77,8 @@ def _get_executor_names(cls) -> list[ExecutorName]: executor_names = [] for team_name, executor_names_config in all_executor_names: executor_names_per_team = [] - for name_raw in executor_names_config: - if len(split_name := name_raw.split(":")) == 1: + for executor_name 
in executor_names_config: + if len(split_name := executor_name.split(":")) == 1: name = split_name[0] # Check if this is an alias for a core airflow executor, module # paths won't be provided by the user in that case. @@ -109,7 +109,9 @@ def _get_executor_names(cls) -> list[ExecutorName]: ExecutorName(alias=split_name[0], module_path=split_name[1], team_name=team_name) ) else: - raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name_raw}") + raise AirflowConfigException( + f"Incorrectly formatted executor configuration: {executor_name}" + ) # As of now, we do not allow duplicate executors (within teams). # Add all module paths to a set, since the actual code is what is unique diff --git a/airflow-core/src/airflow/models/pool.py b/airflow-core/src/airflow/models/pool.py index c2325cd75b4d1..b2f546a91daa9 100644 --- a/airflow-core/src/airflow/models/pool.py +++ b/airflow-core/src/airflow/models/pool.py @@ -200,9 +200,9 @@ def slots_stats( ) # calculate queued and running metrics - for pool_name, state, count_raw in state_count_by_pool: + for pool_name, state, decimal_count in state_count_by_pool: # Some databases return decimal.Decimal here. 
- count = int(count_raw) + count = int(decimal_count) stats_dict: PoolStats | None = pools.get(pool_name) if not stats_dict: diff --git a/airflow-core/src/airflow/serialization/serde.py b/airflow-core/src/airflow/serialization/serde.py index 9d05ebe45830e..1820ba932c7eb 100644 --- a/airflow-core/src/airflow/serialization/serde.py +++ b/airflow-core/src/airflow/serialization/serde.py @@ -366,26 +366,28 @@ def _register(): with Stats.timer("serde.load_serializers") as timer: for _, module_name, _ in iter_namespace(airflow.serialization.serializers): - name = import_module(module_name) - for attr_s in getattr(name, "serializers", ()): - s = attr_s if isinstance(attr_s, str) else qualname(attr_s) - if s in _serializers and _serializers[s] != name: - raise AttributeError(f"duplicate {s} for serialization in {name} and {_serializers[s]}") + module = import_module(module_name) + for serializers in getattr(module, "serializers", ()): + s = serializers if isinstance(serializers, str) else qualname(serializers) + if s in _serializers and _serializers[s] != module: + raise AttributeError(f"duplicate {s} for serialization in {module} and {_serializers[s]}") log.debug("registering %s for serialization", s) - _serializers[s] = name - for attr_d in getattr(name, "deserializers", ()): - d = attr_d if isinstance(attr_d, str) else qualname(attr_d) - if d in _deserializers and _deserializers[d] != name: - raise AttributeError(f"duplicate {d} for deserialization in {name} and {_serializers[d]}") + _serializers[s] = module + for deserializers in getattr(module, "deserializers", ()): + d = deserializers if isinstance(deserializers, str) else qualname(deserializers) + if d in _deserializers and _deserializers[d] != module: + raise AttributeError( + f"duplicate {d} for deserialization in {module} and {_deserializers[d]}" + ) log.debug("registering %s for deserialization", d) - _deserializers[d] = name + _deserializers[d] = module _extra_allowed.add(d) - for attr_c in getattr(name, 
"stringifiers", ()): - c = attr_c if isinstance(attr_c, str) else qualname(attr_c) - if c in _deserializers and _deserializers[c] != name: - raise AttributeError(f"duplicate {c} for stringifiers in {name} and {_stringifiers[c]}") + for stringifiers in getattr(module, "stringifiers", ()): + c = stringifiers if isinstance(stringifiers, str) else qualname(stringifiers) + if c in _deserializers and _deserializers[c] != module: + raise AttributeError(f"duplicate {c} for stringifiers in {module} and {_stringifiers[c]}") log.debug("registering %s for stringifying", c) - _stringifiers[c] = name + _stringifiers[c] = module log.debug("loading serializers took %.3f seconds", timer.duration) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index fe5499a5bc8f1..5b87f8311fa01 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -1589,59 +1589,53 @@ def populate_operator( deserialized_partial_kwarg_defaults = {} for k_in, v_in in encoded_op.items(): + k = k_in # surpass PLW2901 + v = v_in # surpass PLW2901 # Use centralized field deserialization logic - if k_in in encoded_op.get("template_fields", []): - # Template fields are handled separately - k = k_in - v = v_in - elif k_in == "_operator_extra_links": + if k in encoded_op.get("template_fields", []): + pass # Template fields are handled separately + elif k == "_operator_extra_links": if cls._load_operator_extra_links: - op_predefined_extra_links = cls._deserialize_operator_extra_links(v_in) + op_predefined_extra_links = cls._deserialize_operator_extra_links(v) # If OperatorLinks with the same name exists, Links via Plugin have higher precedence op_predefined_extra_links.update(op_extra_links_from_plugin) else: op_predefined_extra_links = {} - k = "operator_extra_links" v = list(op_predefined_extra_links.values()) - elif k_in == "params": - k = k_in - 
v = cls._deserialize_params_dict(v_in) - elif k_in == "partial_kwargs": + k = "operator_extra_links" + + elif k == "params": + v = cls._deserialize_params_dict(v) + elif k == "partial_kwargs": # Use unified deserializer that supports both encoded and non-encoded values - k = k_in - v = cls._deserialize_partial_kwargs(v_in, client_defaults) - elif k_in in {"expand_input", "op_kwargs_expand_input"}: - k = k_in - v = _ExpandInputRef(v_in["type"], cls.deserialize(v_in["value"])) - elif k_in == "operator_class": - k = k_in - v = {k_: cls.deserialize(v_) for k_, v_ in v_in.items()} - elif k_in == "_is_sensor": + v = cls._deserialize_partial_kwargs(v, client_defaults) + elif k in {"expand_input", "op_kwargs_expand_input"}: + v = _ExpandInputRef(v["type"], cls.deserialize(v["value"])) + elif k == "operator_class": + v = {k_: cls.deserialize(v_) for k_, v_ in v.items()} + elif k == "_is_sensor": from airflow.ti_deps.deps.ready_to_reschedule import ReadyToRescheduleDep - if v_in is False: + if v is False: raise RuntimeError("_is_sensor=False should never have been serialized!") object.__setattr__(op, "deps", op.deps | {ReadyToRescheduleDep()}) continue elif ( - k_in in cls._decorated_fields - or k_in not in op.get_serialized_fields() - or k_in in ("outlets", "inlets") + k in cls._decorated_fields + or k not in op.get_serialized_fields() + or k in ("outlets", "inlets") ): - k = k_in - v = cls.deserialize(v_in) - elif k_in == "_on_failure_fail_dagrun": + v = cls.deserialize(v) + elif k == "_on_failure_fail_dagrun": k = "on_failure_fail_dagrun" - v = v_in - elif k_in == "weight_rule": + elif k == "weight_rule": k = "_weight_rule" - v = decode_priority_weight_strategy(v_in) + v = decode_priority_weight_strategy(v) else: # Apply centralized deserialization for all other fields - k = k_in - v = cls._deserialize_field_value(k, v_in) + v = cls._deserialize_field_value(k, v) # Handle field differences between SerializedBaseOperator and MappedOperator # Fields that exist in 
SerializedBaseOperator but not in MappedOperator need to go to partial_kwargs @@ -2574,14 +2568,14 @@         # Note: Context is passed explicitly through method parameters, no class attributes needed          for k_in, v_in in encoded_dag.items(): -            v: Any -            k = k_in -            if k_in == "_downstream_task_ids": -                v = set(v_in) -            elif k_in == "tasks": +            k = k_in  # suppress PLW2901 +            v = v_in  # suppress PLW2901 +            if k == "_downstream_task_ids": +                v = set(v) +            elif k == "tasks":                  SerializedBaseOperator._load_operator_extra_links = cls._load_operator_extra_links                 tasks = {} -                for obj in v_in: +                for obj in v:                      if obj.get(Encoding.TYPE) == DAT.OP:                         deser = SerializedBaseOperator.deserialize_operator(                             obj[Encoding.VAR], client_defaults @@ -2589,27 +2583,26 @@                         tasks[deser.task_id] = deser                 k = "task_dict"                 v = tasks -            elif k_in == "timezone": -                v = cls._deserialize_timezone(v_in) -            elif k_in == "dagrun_timeout": -                v = cls._deserialize_timedelta(v_in) -            elif k_in.endswith("_date"): -                v = cls._deserialize_datetime(v_in) -            elif k_in == "edge_info": +            elif k == "timezone": +                v = cls._deserialize_timezone(v) +            elif k == "dagrun_timeout": +                v = cls._deserialize_timedelta(v) +            elif k.endswith("_date"): +                v = cls._deserialize_datetime(v) +            elif k == "edge_info":                  # Value structure matches exactly -                v = v_in -            elif k_in == "timetable": -                v = decode_timetable(v_in) -            elif k_in == "weight_rule": -                v = decode_priority_weight_strategy(v_in) -            elif k_in in cls._decorated_fields: -                v = cls.deserialize(v_in) -            elif k_in == "params": -                v = cls._deserialize_params_dict(v_in) -            elif k_in == "tags": -                v = set(v_in) -            else: -                v = v_in +                pass +            elif k == "timetable": +                v = decode_timetable(v) +            elif k == "weight_rule": +                v = decode_priority_weight_strategy(v) +            elif k in cls._decorated_fields: +                v = cls.deserialize(v) +            elif k == "params": +                v = cls._deserialize_params_dict(v) +            elif k == "tags": +                v = set(v) +            # else use v as it is              object.__setattr__(dag, k, v) @@
-3326,7 +3319,6 @@ def create_dagrun( deadline_time=deadline_time, callback=deadline.callback, dagrun_id=orm_dagrun.id, - dag_id=orm_dagrun.dag_id, ) ) Stats.incr("deadline_alerts.deadline_created", tags={"dag_id": self.dag_id}) diff --git a/airflow-core/tests/unit/core/test_impersonation_tests.py b/airflow-core/tests/unit/core/test_impersonation_tests.py index fd10074059021..8165d1f6d73f6 100644 --- a/airflow-core/tests/unit/core/test_impersonation_tests.py +++ b/airflow-core/tests/unit/core/test_impersonation_tests.py @@ -58,8 +58,8 @@ def set_permissions(settings: dict[Path | str, int]): orig_permissions = [] try: print(" Change file/directory permissions ".center(72, "+")) - for path_raw, mode in settings.items(): - path = Path(path_raw) if isinstance(path_raw, str) else path_raw + for raw_path, mode in settings.items(): + path = Path(raw_path) if isinstance(raw_path, str) else raw_path if len(path.parts) <= 1: raise SystemError(f"Unable to change permission for the root directory: {path}.") diff --git a/airflow-core/tests/unit/plugins/test_plugin_ignore.py b/airflow-core/tests/unit/plugins/test_plugin_ignore.py index 1f93a96abb6b3..02e9beaa69230 100644 --- a/airflow-core/tests/unit/plugins/test_plugin_ignore.py +++ b/airflow-core/tests/unit/plugins/test_plugin_ignore.py @@ -77,8 +77,8 @@ def test_find_not_should_ignore_path_regexp(self, tmp_path): "test_load_sub1.py", } ignore_list_file = ".airflowignore" - for file_path_raw in find_path_from_directory(plugin_folder_path, ignore_list_file, "regexp"): - file_path = Path(file_path_raw) + for raw_file_path in find_path_from_directory(plugin_folder_path, ignore_list_file, "regexp"): + file_path = Path(raw_file_path) if file_path.is_file() and file_path.suffix == ".py": detected_files.add(file_path.name) assert detected_files == should_not_ignore_files diff --git a/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py b/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py index 
30d72296a7aae..7c82f59d1c1d8 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/hooks/athena.py @@ -40,15 +40,15 @@ def query_params_to_string(params: dict[str, str | Collection[str]]) -> str: result = "" - for key, value_org in params.items(): + for key, original_value in params.items(): value: str | Collection[str] if key == "QueryString": value = ( MULTI_LINE_QUERY_LOG_PREFIX - + str(value_org).replace("\n", MULTI_LINE_QUERY_LOG_PREFIX).rstrip() + + str(original_value).replace("\n", MULTI_LINE_QUERY_LOG_PREFIX).rstrip() ) else: - value = value_org + value = original_value result += f"\t{key}: {value}\n" return result.rstrip() diff --git a/providers/google/src/airflow/providers/google/suite/transfers/sql_to_sheets.py b/providers/google/src/airflow/providers/google/suite/transfers/sql_to_sheets.py index c4d408151deed..806b71aaed910 100644 --- a/providers/google/src/airflow/providers/google/suite/transfers/sql_to_sheets.py +++ b/providers/google/src/airflow/providers/google/suite/transfers/sql_to_sheets.py @@ -87,16 +87,15 @@ def __init__( def _data_prep(self, data): for row in data: item_list = [] - for item_raw in row: - if isinstance(item_raw, (datetime.date, datetime.datetime)): - item = item_raw.isoformat() - elif isinstance(item_raw, int): # To exclude int from the number check. - item = item_raw - elif isinstance(item_raw, numbers.Number): - item = float(item_raw) + for item in row: + if isinstance(item, (datetime.date, datetime.datetime)): + item_list.append(item.isoformat()) + elif isinstance(item, int): # To exclude int from the number check. 
+ item_list.append(item) + elif isinstance(item, numbers.Number): + item_list.append(float(item)) else: - item = item_raw - item_list.append(item) + item_list.append(item) yield item_list def _get_data(self): From fdcef6b3b3dc30aa0aeccdc6ebc545abeb50b572 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Thu, 6 Nov 2025 21:21:48 +0100 Subject: [PATCH 08/10] Fix mypy --- airflow-core/src/airflow/executors/executor_loader.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/executors/executor_loader.py b/airflow-core/src/airflow/executors/executor_loader.py index 834ea58d15292..0ed307080b1e9 100644 --- a/airflow-core/src/airflow/executors/executor_loader.py +++ b/airflow-core/src/airflow/executors/executor_loader.py @@ -74,11 +74,11 @@ def _get_executor_names(cls) -> list[ExecutorName]: all_executor_names: list[tuple[str | None, list[str]]] = cls._get_team_executor_configs() - executor_names = [] + executor_names: list[ExecutorName] = [] for team_name, executor_names_config in all_executor_names: executor_names_per_team = [] - for executor_name in executor_names_config: - if len(split_name := executor_name.split(":")) == 1: + for executor_name_str in executor_names_config: + if len(split_name := executor_name_str.split(":")) == 1: name = split_name[0] # Check if this is an alias for a core airflow executor, module # paths won't be provided by the user in that case. @@ -110,7 +110,7 @@ def _get_executor_names(cls) -> list[ExecutorName]: ) else: raise AirflowConfigException( - f"Incorrectly formatted executor configuration: {executor_name}" + f"Incorrectly formatted executor configuration: {executor_name_str}" ) # As of now, we do not allow duplicate executors (within teams). 
From 5c0e7ef6faf6dd669f99be768309b5a88539cc24 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Thu, 6 Nov 2025 21:35:51 +0100 Subject: [PATCH 09/10] Revert accidentally deleted line --- airflow-core/src/airflow/serialization/serialized_objects.py | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow-core/src/airflow/serialization/serialized_objects.py b/airflow-core/src/airflow/serialization/serialized_objects.py index 5b87f8311fa01..acb05228beffc 100644 --- a/airflow-core/src/airflow/serialization/serialized_objects.py +++ b/airflow-core/src/airflow/serialization/serialized_objects.py @@ -3319,6 +3319,7 @@ create_dagrun(                         deadline_time=deadline_time,                         callback=deadline.callback,                         dagrun_id=orm_dagrun.id, +                        dag_id=orm_dagrun.dag_id,                     )                 )                 Stats.incr("deadline_alerts.deadline_created", tags={"dag_id": self.dag_id})  From 0eeea4212b8bb6284d0c7175261ee0c80879e09d Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 8 Nov 2025 12:23:16 +0100 Subject: [PATCH 10/10] Second pass review suggestions by Wei --- .../src/airflow/serialization/serde.py | 36 ++++++++++--------- .../tests/unit/plugins/test_plugin_ignore.py | 4 +-- 2 files changed, 22 insertions(+), 18 deletions(-) diff --git a/airflow-core/src/airflow/serialization/serde.py b/airflow-core/src/airflow/serialization/serde.py index 1820ba932c7eb..d4e86c25f2566 100644 --- a/airflow-core/src/airflow/serialization/serde.py +++ b/airflow-core/src/airflow/serialization/serde.py @@ -368,26 +368,30 @@ def _register():         for _, module_name, _ in iter_namespace(airflow.serialization.serializers):             module = import_module(module_name)             for serializers in getattr(module, "serializers", ()): -                s = serializers if isinstance(serializers, str) else qualname(serializers) -                if s in _serializers and _serializers[s] != module: -                    raise AttributeError(f"duplicate {s} for serialization in {module} and {_serializers[s]}") -                log.debug("registering %s for serialization", s) -                _serializers[s] = module +                s_qualname = serializers if
isinstance(serializers, str) else qualname(serializers) +                if s_qualname in _serializers and _serializers[s_qualname] != module: +                    raise AttributeError( +                        f"duplicate {s_qualname} for serialization in {module} and {_serializers[s_qualname]}" +                    ) +                log.debug("registering %s for serialization", s_qualname) +                _serializers[s_qualname] = module              for deserializers in getattr(module, "deserializers", ()): -                d = deserializers if isinstance(deserializers, str) else qualname(deserializers) -                if d in _deserializers and _deserializers[d] != module: +                d_qualname = deserializers if isinstance(deserializers, str) else qualname(deserializers) +                if d_qualname in _deserializers and _deserializers[d_qualname] != module:                      raise AttributeError( -                        f"duplicate {d} for deserialization in {module} and {_deserializers[d]}" +                        f"duplicate {d_qualname} for deserialization in {module} and {_deserializers[d_qualname]}"                     ) -                log.debug("registering %s for deserialization", d) -                _deserializers[d] = module -                _extra_allowed.add(d) +                log.debug("registering %s for deserialization", d_qualname) +                _deserializers[d_qualname] = module +                _extra_allowed.add(d_qualname)              for stringifiers in getattr(module, "stringifiers", ()): -                c = stringifiers if isinstance(stringifiers, str) else qualname(stringifiers) -                if c in _deserializers and _deserializers[c] != module: -                    raise AttributeError(f"duplicate {c} for stringifiers in {module} and {_stringifiers[c]}") -                log.debug("registering %s for stringifying", c) -                _stringifiers[c] = module +                c_qualname = stringifiers if isinstance(stringifiers, str) else qualname(stringifiers) +                if c_qualname in _stringifiers and _stringifiers[c_qualname] != module: +                    raise AttributeError( +                        f"duplicate {c_qualname} for stringifiers in {module} and {_stringifiers[c_qualname]}" +                    ) +                log.debug("registering %s for stringifying", c_qualname) +                _stringifiers[c_qualname] = module      log.debug("loading serializers took %.3f seconds", timer.duration) diff --git
a/airflow-core/tests/unit/plugins/test_plugin_ignore.py b/airflow-core/tests/unit/plugins/test_plugin_ignore.py index 02e9beaa69230..8d8bd397b77cb 100644 --- a/airflow-core/tests/unit/plugins/test_plugin_ignore.py +++ b/airflow-core/tests/unit/plugins/test_plugin_ignore.py @@ -104,8 +104,8 @@ def test_find_not_should_ignore_path_glob(self, tmp_path): } ignore_list_file = ".airflowignore_glob" print("-" * 20) - for file_path_raw in find_path_from_directory(plugin_folder_path, ignore_list_file, "glob"): - file_path = Path(file_path_raw) + for raw_file_path in find_path_from_directory(plugin_folder_path, ignore_list_file, "glob"): + file_path = Path(raw_file_path) if file_path.is_file() and file_path.suffix == ".py": detected_files.add(file_path.name) print(file_path)