Merged
@@ -187,7 +187,7 @@ def patch_dag_run(

data = patch_body.model_dump(include=fields_to_update, by_alias=True)

for attr_name, attr_value in data.items():
for attr_name, attr_value_raw in data.items():
if attr_name == "state":
attr_value = getattr(patch_body, "state")
if attr_value == DAGRunPatchStates.SUCCESS:
@@ -208,9 +208,9 @@ def patch_dag_run(
elif attr_name == "note":
updated_dag_run = session.get(DagRun, dag_run.id)
if updated_dag_run and updated_dag_run.dag_run_note is None:
updated_dag_run.note = (attr_value, user.get_id())
updated_dag_run.note = (attr_value_raw, user.get_id())
elif updated_dag_run:
updated_dag_run.dag_run_note.content = attr_value
updated_dag_run.dag_run_note.content = attr_value_raw
updated_dag_run.dag_run_note.user_id = user.get_id()

final_dag_run = session.get(DagRun, dag_run.id)
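
Nearly every hunk in this PR is the same mechanical fix: Ruff's PLW2901 (redefined-loop-name) flags a for-loop target that is reassigned inside the loop body, since the rebinding is silently discarded on the next iteration and can mask bugs. A minimal sketch of the warning and the rename applied throughout (names here are illustrative, not from the diff):

# Before: PLW2901 fires because `item` is rebound inside the loop.
for item in [" a ", " b "]:
    item = item.strip()  # the loop target no longer refers to the original element
    print(item)

# After: bind the transformed value to a fresh name and leave the loop target alone.
for raw_item in [" a ", " b "]:
    item = raw_item.strip()
    print(item)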
@@ -178,9 +178,9 @@ def handle_bulk_update(self, action: BulkUpdateAction[PoolBody], results: BulkAc
if pool.pool not in update_pool_names:
continue

pool = update_orm_from_pydantic(pool.pool, pool, action.update_mask, self.session)
updated_pool = update_orm_from_pydantic(pool.pool, pool, action.update_mask, self.session)

results.success.append(str(pool.pool)) # use request field, always consistent
results.success.append(str(updated_pool.pool)) # use request field, always consistent

except HTTPException as e:
results.errors.append({"error": f"{e.detail}", "status_code": e.status_code})
@@ -148,9 +148,11 @@ def handle_bulk_update(self, action: BulkUpdateAction, results: BulkActionRespon
for variable in action.entities:
if variable.key not in update_keys:
continue
variable = update_orm_from_pydantic(variable.key, variable, action.update_mask, self.session)
updated_variable = update_orm_from_pydantic(
variable.key, variable, action.update_mask, self.session
)

results.success.append(variable.key)
results.success.append(updated_variable.key)

except HTTPException as e:
results.errors.append({"error": f"{e.detail}", "status_code": e.status_code})
4 changes: 2 additions & 2 deletions airflow-core/src/airflow/dag_processing/collection.py
@@ -637,8 +637,8 @@ def _get_dag_assets(
inlets: bool = True,
outlets: bool = True,
) -> Iterable[tuple[str, AssetT]]:
for task in dag.data["dag"]["tasks"]:
task = task[Encoding.VAR]
for raw_task in dag.data["dag"]["tasks"]:
task = raw_task[Encoding.VAR]
ports = _get_task_ports(task["partial_kwargs"] if task.get("_is_mapped") else task, inlets, outlets)
for port in ports:
if isinstance(obj := BaseSerialization.deserialize(port), of):
10 changes: 6 additions & 4 deletions airflow-core/src/airflow/executors/executor_loader.py
@@ -74,11 +74,11 @@ def _get_executor_names(cls) -> list[ExecutorName]:

all_executor_names: list[tuple[str | None, list[str]]] = cls._get_team_executor_configs()

executor_names = []
executor_names: list[ExecutorName] = []
for team_name, executor_names_config in all_executor_names:
executor_names_per_team = []
for name in executor_names_config:
if len(split_name := name.split(":")) == 1:
for executor_name_str in executor_names_config:
if len(split_name := executor_name_str.split(":")) == 1:
name = split_name[0]
# Check if this is an alias for a core airflow executor, module
# paths won't be provided by the user in that case.
@@ -109,7 +109,9 @@ def _get_executor_names(cls) -> list[ExecutorName]:
ExecutorName(alias=split_name[0], module_path=split_name[1], team_name=team_name)
)
else:
raise AirflowConfigException(f"Incorrectly formatted executor configuration: {name}")
raise AirflowConfigException(
f"Incorrectly formatted executor configuration: {executor_name_str}"
)

# As of now, we do not allow duplicate executors (within teams).
# Add all module paths to a set, since the actual code is what is unique
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
@@ -376,7 +376,7 @@ def _executable_task_instances_to_queued(self, max_tis: int, session: Session) -
self.log.debug("All pools are full!")
return []

max_tis = min(max_tis, pool_slots_free)
max_tis = int(min(max_tis, pool_slots_free))

starved_pools = {pool_name for pool_name, stats in pools.items() if stats["open"] <= 0}
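
For context on the int() cast above: as the pool.py change below notes, open pool slots are reported as float("inf") for unlimited pools, so min() is typed int | float even though the finite operand wins at runtime. A small sketch of the narrowing (assumed values, illustrative only):

max_tis: int = 32
pool_slots_free: float = float("inf")  # sentinel for an unlimited pool
capped = min(max_tis, pool_slots_free)  # static type: int | float; runtime value: 32
max_tis = int(capped)  # safe: min() returns the finite operand when the other is inf
assert max_tis == 32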

13 changes: 6 additions & 7 deletions airflow-core/src/airflow/models/pool.py
@@ -42,11 +42,11 @@
class PoolStats(TypedDict):
"""Dictionary containing Pool Stats."""

total: int
total: int | float # Note: float("inf") is used to mark infinite slots
running: int
deferred: int
queued: int
open: int
open: int | float # Note: float("inf") is used to mark infinite slots
scheduled: int


@@ -182,9 +182,8 @@ def slots_stats(
query = with_row_locks(query, session=session, nowait=True)

pool_rows = session.execute(query)
for pool_name, total_slots, include_deferred in pool_rows:
if total_slots == -1:
total_slots = float("inf")
for pool_name, total_slots_in, include_deferred in pool_rows:
total_slots = float("inf") if total_slots_in == -1 else total_slots_in
pools[pool_name] = PoolStats(
total=total_slots, running=0, queued=0, open=0, deferred=0, scheduled=0
)
@@ -201,9 +200,9 @@ def slots_stats(
)

# calculate queued and running metrics
for pool_name, state, count in state_count_by_pool:
for pool_name, state, decimal_count in state_count_by_pool:
# Some databases return decimal.Decimal here.
count = int(count)
count = int(decimal_count)

stats_dict: PoolStats | None = pools.get(pool_name)
if not stats_dict:
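
Two normalization details in the hunks above, sketched under assumed values: the database stores -1 as the sentinel for unlimited pool slots, and some drivers return aggregate counts as decimal.Decimal rather than int.

from decimal import Decimal

total_slots_in = -1  # DB sentinel for an unlimited pool
total_slots = float("inf") if total_slots_in == -1 else total_slots_in

decimal_count = Decimal("3")  # e.g. what a COUNT(*) may come back as
count = int(decimal_count)

assert total_slots == float("inf") and count == 3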
51 changes: 27 additions & 24 deletions airflow-core/src/airflow/serialization/serde.py
@@ -365,30 +365,33 @@ def _register():
_stringifiers.clear()

with Stats.timer("serde.load_serializers") as timer:
for _, name, _ in iter_namespace(airflow.serialization.serializers):
name = import_module(name)
for s in getattr(name, "serializers", ()):
if not isinstance(s, str):
s = qualname(s)
if s in _serializers and _serializers[s] != name:
raise AttributeError(f"duplicate {s} for serialization in {name} and {_serializers[s]}")
log.debug("registering %s for serialization", s)
_serializers[s] = name
for d in getattr(name, "deserializers", ()):
if not isinstance(d, str):
d = qualname(d)
if d in _deserializers and _deserializers[d] != name:
raise AttributeError(f"duplicate {d} for deserialization in {name} and {_serializers[d]}")
log.debug("registering %s for deserialization", d)
_deserializers[d] = name
_extra_allowed.add(d)
for c in getattr(name, "stringifiers", ()):
if not isinstance(c, str):
c = qualname(c)
if c in _deserializers and _deserializers[c] != name:
raise AttributeError(f"duplicate {c} for stringifiers in {name} and {_stringifiers[c]}")
log.debug("registering %s for stringifying", c)
_stringifiers[c] = name
for _, module_name, _ in iter_namespace(airflow.serialization.serializers):
module = import_module(module_name)
for serializers in getattr(module, "serializers", ()):
s_qualname = serializers if isinstance(serializers, str) else qualname(serializers)
if s_qualname in _serializers and _serializers[s_qualname] != module:
raise AttributeError(
f"duplicate {s_qualname} for serialization in {module} and {_serializers[s_qualname]}"
)
log.debug("registering %s for serialization", s_qualname)
_serializers[s_qualname] = module
for deserializers in getattr(module, "deserializers", ()):
d_qualname = deserializers if isinstance(deserializers, str) else qualname(deserializers)
if d_qualname in _deserializers and _deserializers[d_qualname] != module:
raise AttributeError(
f"duplicate {d_qualname} for deserialization in {module} and {_deserializers[d_qualname]}"
)
log.debug("registering %s for deserialization", d_qualname)
_deserializers[d_qualname] = module
_extra_allowed.add(d_qualname)
for stringifiers in getattr(module, "stringifiers", ()):
c_qualname = stringifiers if isinstance(stringifiers, str) else qualname(stringifiers)
if c_qualname in _deserializers and _deserializers[c_qualname] != module:
raise AttributeError(
f"duplicate {c_qualname} for stringifiers in {module} and {_stringifiers[c_qualname]}"
)
log.debug("registering %s for stringifying", c_qualname)
_stringifiers[c_qualname] = module

log.debug("loading serializers took %.3f seconds", timer.duration)
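
The rewritten registration walk above follows a common plugin-discovery pattern: iterate the modules of a namespace package, import each one, and index its declared hooks by qualified name, refusing duplicates. A reduced sketch under assumed helpers (this qualname is a stand-in for the one serde.py imports from airflow.utils.module_loading):

from importlib import import_module
from pkgutil import iter_modules

_serializers: dict[str, object] = {}

def qualname(obj) -> str:
    # pass strings through; otherwise build the dotted path of the object
    return obj if isinstance(obj, str) else f"{obj.__module__}.{obj.__qualname__}"

def register(namespace_pkg) -> None:
    for _, module_name, _ in iter_modules(namespace_pkg.__path__, namespace_pkg.__name__ + "."):
        module = import_module(module_name)
        for target in getattr(module, "serializers", ()):
            key = qualname(target)
            if key in _serializers and _serializers[key] is not module:
                raise AttributeError(f"duplicate {key} in {module} and {_serializers[key]}")
            _serializers[key] = module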

15 changes: 9 additions & 6 deletions airflow-core/src/airflow/serialization/serialized_objects.py
@@ -1069,10 +1069,9 @@ def is_serialized(val):
def _serialize_params_dict(cls, params: ParamsDict | dict) -> list[tuple[str, dict]]:
"""Serialize Params dict for a DAG or task as a list of tuples to ensure ordering."""
serialized_params = []
for k, v in params.items():
if isinstance(params, ParamsDict):
# Use native param object, not resolved value if possible
v = params.get_param(k)
for k, raw_v in params.items():
# Use native param object, not resolved value if possible
v = params.get_param(k) if isinstance(params, ParamsDict) else raw_v
try:
class_identity = f"{v.__module__}.{v.__class__.__name__}"
except AttributeError:
@@ -1589,7 +1588,9 @@ def populate_operator(

deserialized_partial_kwarg_defaults = {}

for k, v in encoded_op.items():
for k_in, v_in in encoded_op.items():
k = k_in # suppress PLW2901
v = v_in # suppress PLW2901
# Use centralized field deserialization logic
if k in encoded_op.get("template_fields", []):
pass # Template fields are handled separately
@@ -2566,7 +2567,9 @@ def _deserialize_dag_internal(

# Note: Context is passed explicitly through method parameters, no class attributes needed

for k, v in encoded_dag.items():
for k_in, v_in in encoded_dag.items():
k = k_in # suppress PLW2901
v = v_in # suppress PLW2901
if k == "_downstream_task_ids":
v = set(v)
elif k == "tasks":
@@ -86,10 +86,10 @@ def setup_method(self):

def test_cli_connections_list_as_json(self):
args = self.parser.parse_args(["connections", "list", "--output", "json"])
with redirect_stdout(StringIO()) as stdout:
with redirect_stdout(StringIO()) as stdout_io:
connection_command.connections_list(args)
print(stdout.getvalue())
stdout = stdout.getvalue()
print(stdout_io.getvalue())
stdout = stdout_io.getvalue()
for conn_id, conn_type in self.EXPECTED_CONS:
assert conn_type in stdout
assert conn_id in stdout
22 changes: 12 additions & 10 deletions airflow-core/tests/unit/cli/test_cli_parser.py
@@ -288,19 +288,19 @@ def test_commands_and_command_group_sections(self):
with contextlib.redirect_stdout(StringIO()) as stdout:
with pytest.raises(SystemExit):
parser.parse_args(["--help"])
stdout = stdout.getvalue()
assert "Commands" in stdout
assert "Groups" in stdout
stdout_val = stdout.getvalue()
assert "Commands" in stdout_val
assert "Groups" in stdout_val

def test_dag_parser_commands_and_comamnd_group_sections(self):
parser = cli_parser.get_parser(dag_parser=True)

with contextlib.redirect_stdout(StringIO()) as stdout:
with pytest.raises(SystemExit):
parser.parse_args(["--help"])
stdout = stdout.getvalue()
assert "Commands" in stdout
assert "Groups" in stdout
stdout_val = stdout.getvalue()
assert "Commands" in stdout_val
assert "Groups" in stdout_val

def test_should_display_help(self):
parser = cli_parser.get_parser()
@@ -384,8 +384,10 @@ def test_executor_specific_commands_not_accessible(self, command):
parser = cli_parser.get_parser()
with pytest.raises(SystemExit):
parser.parse_args([command])
stderr = stderr.getvalue()
assert (f"airflow command error: argument GROUP_OR_COMMAND: invalid choice: '{command}'") in stderr
stderr_val = stderr.getvalue()
assert (
f"airflow command error: argument GROUP_OR_COMMAND: invalid choice: '{command}'"
) in stderr_val

@pytest.mark.parametrize(
("executor", "expected_args"),
@@ -412,8 +414,8 @@ def test_cli_parser_executors(self, executor, expected_args):
with pytest.raises(SystemExit) as e: # running the help command exits, so we prevent that
parser.parse_args([expected_arg, "--help"])
assert e.value.code == 0, stderr.getvalue() # return code 0 == no problem
stderr = stderr.getvalue()
assert "airflow command error" not in stderr
stderr_val = stderr.getvalue()
assert "airflow command error" not in stderr_val

def test_non_existing_directory_raises_when_metavar_is_dir_for_db_export_cleaned(self):
"""Test that the error message is correct when the directory does not exist."""
5 changes: 2 additions & 3 deletions airflow-core/tests/unit/core/test_impersonation_tests.py
@@ -58,9 +58,8 @@ def set_permissions(settings: dict[Path | str, int]):
orig_permissions = []
try:
print(" Change file/directory permissions ".center(72, "+"))
for path, mode in settings.items():
if isinstance(path, str):
path = Path(path)
for raw_path, mode in settings.items():
path = Path(raw_path) if isinstance(raw_path, str) else raw_path
if len(path.parts) <= 1:
raise SystemError(f"Unable to change permission for the root directory: {path}.")

4 changes: 2 additions & 2 deletions airflow-core/tests/unit/models/test_cleartasks.py
@@ -659,8 +659,8 @@ def _get_ti(old_ti):

SerializedDAG.clear_dags(dags, only_failed=True)

for ti in tis:
ti = _get_ti(ti)
for ti_in in tis:
ti = _get_ti(ti_in)
if ti.dag_id == ti_fail.dag_id:
assert ti.state == State.NONE
assert ti.try_number == 2
6 changes: 3 additions & 3 deletions airflow-core/tests/unit/models/test_dagrun.py
@@ -2175,9 +2175,9 @@ def do_something_else(i):
ti.map_index = 0
task = ti.task
for map_index in range(1, 5):
ti = TI(task, run_id=dr.run_id, map_index=map_index, dag_version_id=ti.dag_version_id)
session.add(ti)
ti.dag_run = dr
ti_new = TI(task, run_id=dr.run_id, map_index=map_index, dag_version_id=ti.dag_version_id)
session.add(ti_new)
ti_new.dag_run = dr
else:
# run tasks "do_something" to get XCOMs for correct downstream length
ti.run()
8 changes: 4 additions & 4 deletions airflow-core/tests/unit/plugins/test_plugin_ignore.py
@@ -77,8 +77,8 @@ def test_find_not_should_ignore_path_regexp(self, tmp_path):
"test_load_sub1.py",
}
ignore_list_file = ".airflowignore"
for file_path in find_path_from_directory(plugin_folder_path, ignore_list_file, "regexp"):
file_path = Path(file_path)
for raw_file_path in find_path_from_directory(plugin_folder_path, ignore_list_file, "regexp"):
file_path = Path(raw_file_path)
if file_path.is_file() and file_path.suffix == ".py":
detected_files.add(file_path.name)
assert detected_files == should_not_ignore_files
@@ -104,8 +104,8 @@ def test_find_not_should_ignore_path_glob(self, tmp_path):
}
ignore_list_file = ".airflowignore_glob"
print("-" * 20)
for file_path in find_path_from_directory(plugin_folder_path, ignore_list_file, "glob"):
file_path = Path(file_path)
for raw_file_path in find_path_from_directory(plugin_folder_path, ignore_list_file, "glob"):
file_path = Path(raw_file_path)
if file_path.is_file() and file_path.suffix == ".py":
detected_files.add(file_path.name)
print(file_path)
5 changes: 2 additions & 3 deletions chart/docs/conf.py
@@ -204,9 +204,8 @@ def _format_examples(param_name: str, schema: dict) -> str | None:

# Nicer to have the parameter name shown as well
out = ""
for ex in schema["examples"]:
if schema["type"] == "array":
ex = [ex]
for ex_data in schema["examples"]:
ex = [ex_data] if schema["type"] == "array" else ex_data
out += yaml.dump({param_name: ex})
return out

4 changes: 2 additions & 2 deletions dev/airflow-license
@@ -74,8 +74,8 @@ if __name__ == "__main__":

notices = get_notices()

for notice in notices:
notice = notice[0]
for notice_arr in notices:
notice = notice_arr[0]
license = parse_license_file(notice[1])
print(f"{notice[1]:<30}|{notice[2][:50]:<50}||{notice[0]:<20}||{license:<10}")

@@ -2315,11 +2315,11 @@ def is_package_in_dist(dist_files: list[str], package: str) -> bool:

def get_suffix_from_package_in_dist(dist_files: list[str], package: str) -> str | None:
"""Get suffix from package prepared in dist folder."""
for file in dist_files:
if file.startswith(f"apache_airflow_providers_{package.replace('.', '_')}") and file.endswith(
for filename in dist_files:
if filename.startswith(f"apache_airflow_providers_{package.replace('.', '_')}") and filename.endswith(
".tar.gz"
):
file = file[: -len(".tar.gz")]
file = filename[: -len(".tar.gz")]
version = file.split("-")[-1]
match = VERSION_MATCH.match(version)
if match:
6 changes: 3 additions & 3 deletions dev/breeze/src/airflow_breeze/global_constants.py
@@ -642,9 +642,9 @@ def get_task_sdk_version():
def get_airflow_extras():
airflow_dockerfile = AIRFLOW_ROOT_PATH / "Dockerfile"
with open(airflow_dockerfile) as dockerfile:
for line in dockerfile.readlines():
if "ARG AIRFLOW_EXTRAS=" in line:
line = line.split("=")[1].strip()
for line_raw in dockerfile.readlines():
if "ARG AIRFLOW_EXTRAS=" in line_raw:
line = line_raw.split("=")[1].strip()
return line.replace('"', "")

