Closed
Description
Apache Airflow version
2.3.2 (latest released)
What happened
The mini-scheduler run after a task finishes sometimes fails with an error "AttributeError: 'NoneType' object has no attribute 'keys'"; see full traceback below.
What you think should happen instead
No response
How to reproduce
The minimal reproducing example I could find is this:
import pendulum
from airflow.models import BaseOperator
from airflow.utils.task_group import TaskGroup
from airflow.decorators import task
from airflow import DAG

@task
def task0():
    pass

class Op0(BaseOperator):
    template_fields = ["some_input"]

    def __init__(self, some_input, **kwargs):
        super().__init__(**kwargs)
        self.some_input = some_input

if __name__ == "__main__":
    with DAG("dag0", start_date=pendulum.now()) as dag:
        with TaskGroup(group_id="tg1"):
            Op0(task_id="task1", some_input=task0())
    dag.partial_subset("tg1.task1")

Running this script with Airflow 2.3.2 produces this traceback:
Traceback (most recent call last):
  File "/app/airflow-bug-minimal.py", line 22, in <module>
    dag.partial_subset("tg1.task1")
  File "/venv/lib/python3.10/site-packages/airflow/models/dag.py", line 2013, in partial_subset
    dag.task_dict = {
  File "/venv/lib/python3.10/site-packages/airflow/models/dag.py", line 2014, in <dictcomp>
    t.task_id: _deepcopy_task(t)
  File "/venv/lib/python3.10/site-packages/airflow/models/dag.py", line 2011, in _deepcopy_task
    return copy.deepcopy(t, memo)
  File "/usr/local/lib/python3.10/copy.py", line 153, in deepcopy
    y = copier(memo)
  File "/venv/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1156, in __deepcopy__
    setattr(result, k, copy.deepcopy(v, memo))
  File "/venv/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1000, in __setattr__
    self.set_xcomargs_dependencies()
  File "/venv/lib/python3.10/site-packages/airflow/models/baseoperator.py", line 1107, in set_xcomargs_dependencies
    XComArg.apply_upstream_relationship(self, arg)
  File "/venv/lib/python3.10/site-packages/airflow/models/xcom_arg.py", line 186, in apply_upstream_relationship
    op.set_upstream(ref.operator)
  File "/venv/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 241, in set_upstream
    self._set_relatives(task_or_task_list, upstream=True, edge_modifier=edge_modifier)
  File "/venv/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 185, in _set_relatives
    dags: Set["DAG"] = {task.dag for task in [*self.roots, *task_list] if task.has_dag() and task.dag}
  File "/venv/lib/python3.10/site-packages/airflow/models/taskmixin.py", line 185, in <setcomp>
    dags: Set["DAG"] = {task.dag for task in [*self.roots, *task_list] if task.has_dag() and task.dag}
  File "/venv/lib/python3.10/site-packages/airflow/models/dag.py", line 508, in __hash__
    val = tuple(self.task_dict.keys())
AttributeError: 'NoneType' object has no attribute 'keys'
Note that the call to dag.partial_subset usually happens in the mini-scheduler: https://github.com/apache/airflow/blob/2.3.2/airflow/jobs/local_task_job.py#L253
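Reading the traceback, the last two frames suggest the general failure mode: a set comprehension hashes the DAG, and the DAG's __hash__ reads task_dict.keys() at a moment when task_dict is still None. The snippet below is only an illustrative sketch of that pattern in plain Python. FakeDag, collect_dags and try_collect are hypothetical names made up for this example, not Airflow classes or functions, and the sketch does not claim to reproduce Airflow's internals.

```python
class FakeDag:
    """Hypothetical stand-in for a DAG-like object; not Airflow's DAG class."""

    def __init__(self, name):
        self.name = name
        self.task_dict = {}

    def __hash__(self):
        # Like the failing frame in the traceback, the hash is derived
        # from the keys of task_dict, so task_dict must not be None here.
        return hash((self.name, tuple(self.task_dict.keys())))


def collect_dags(dags):
    """Build a set of DAG-like objects, which calls __hash__ on each one."""
    return {d for d in dags}


def try_collect(dag):
    """Return the AttributeError message raised while hashing, or None on success."""
    try:
        collect_dags([dag])
    except AttributeError as exc:
        return str(exc)
    return None


healthy = FakeDag("dag0")

# Assumption for illustration: a partially constructed copy whose
# task_dict has not been filled in yet.
half_built = FakeDag("dag0")
half_built.task_dict = None

print(try_collect(healthy))
print(try_collect(half_built))
```

The point of the sketch is that any code path which puts such an object into a set or dict while task_dict is unset will fail in __hash__, regardless of what the caller was actually trying to do.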
Operating System
Linux (Debian 9)
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct