Fix CLI dag parsing issues during reserialization #59928
Hello Maintainers - This is my first PR modifying a core component of dag processing, as far as I can tell. Please feel free to suggest if I am overlooking some other point of possible failure. Thanks!
Nataneljpwd
left a comment
Added a few comments, maybe it is worth trying to change the loader
new_module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = new_module
loader.exec_module(new_module)
module_dir = os.fspath(Path(filepath).parent)
It looks like this patch only works if the module is 1 directory above or below. What happens when a task is nested 2 modules deep?
# Insert module directory into sys.path to allow relative imports. Imp to note
# that this is just for the duration of `loader.exec_module(new_module)`, ensuring
# that helper modules located alongside the DAG are importable.
path_inserted = False
Do we need this intermediate variable? The path will be at position 0 only when we add it there.
@Nataneljpwd You're right, this is overkill. I will remove it.
@@ -439,7 +439,20 @@ def parse(mod_name, filepath):
spec = importlib.util.spec_from_loader(mod_name, loader)
Maybe we should stop using a source file loader and instead use spec_from_file_location, then use the loader within the spec?
I think it can solve this issue
Thanks, I will take a look and make the changes over the next few days :)
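The suggestion in this thread, building the spec with spec_from_file_location and executing the module through the loader attached to that spec, might look roughly like the sketch below. It is not the code from this PR: the helper name load_module_from_file and the module name in the usage note are hypothetical, and only standard-library importlib calls are used.

```python
import importlib.util
import sys


def load_module_from_file(mod_name, filepath):
    """Hypothetical helper: import `filepath` as `mod_name` via the spec's own loader."""
    spec = importlib.util.spec_from_file_location(mod_name, filepath)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot build an import spec for {filepath!r}")

    module = importlib.util.module_from_spec(spec)
    # Register the module before executing it, mirroring the snippet under review.
    sys.modules[spec.name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialized module behind if execution fails.
        sys.modules.pop(spec.name, None)
        raise
    return module
```

A call such as load_module_from_file("synthetic_my_dag", "/files/dags/my_dag.py") would execute the file under that synthetic name. Note that spec_from_file_location does not modify sys.path on its own, so whether this change by itself makes sibling helper packages importable would still need to be verified.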
Fixes #59003.
Issue 1:
airflow dags reserialize fails to import related modules in the CLI while working fine in the UI: ModuleNotFoundError: No module named 'module.task_b'
Fix: This was happening because of how Airflow loads the new module in:
airflow/airflow-core/src/airflow/dag_processing/dagbag.py
Line 442 in 1585e45
The Airflow CLI command airflow dags reserialize loads each DAG file under a synthetic, top-level module name via SourceFileLoader and immediately executes it, but it never adds the DAG's directory, i.e. /files/dags in this case, to sys.path. Once we add that directory to sys.path, the helper package becomes importable and the command works as expected.
Issue 2:
airflow tasks clear my_dag complains: AttributeError: 'DAG' object has no attribute 'clear'
Fix: This was happening because of a discrepancy in the type of the dag object used by the task_clear() function in:
airflow/airflow-core/src/airflow/cli/commands/task_command.py
Lines 472 to 477 in cf80ae1
Specifically, in the if block, the call to get_dag_by_file_location() was returning a DAG object, which has no clear() attribute, whereas in the else block, the call to get_dags() returns a SerializedDAG object, which has the expected clear() attribute. Since get_dag_by_file_location() does not seem to be called elsewhere, I simply altered the way we retrieve dags here from the DagModel to the SerializedDagModel.
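For Issue 1, the idea of making the DAG file's directory importable only while the file is being executed could be sketched as follows. This is an illustration under stated assumptions, not the patch itself: the names dag_dir_on_sys_path and parse_with_dag_dir are hypothetical, and the loading steps only mirror the SourceFileLoader and spec_from_loader calls quoted in the review.

```python
import contextlib
import importlib.machinery
import importlib.util
import os
import sys
from pathlib import Path


@contextlib.contextmanager
def dag_dir_on_sys_path(filepath):
    """Hypothetical helper: put the DAG file's directory first on sys.path temporarily."""
    module_dir = os.fspath(Path(filepath).parent)
    sys.path.insert(0, module_dir)
    try:
        yield module_dir
    finally:
        # Remove the first matching entry; tolerate the executed file having
        # already removed it from sys.path itself.
        with contextlib.suppress(ValueError):
            sys.path.remove(module_dir)


def parse_with_dag_dir(mod_name, filepath):
    """Execute a DAG file under a synthetic module name with its directory importable."""
    loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
    spec = importlib.util.spec_from_loader(mod_name, loader)
    if spec is None:
        raise ImportError(f"Cannot build an import spec for {filepath!r}")
    new_module = importlib.util.module_from_spec(spec)
    sys.modules[spec.name] = new_module
    # sys.path is only extended for the duration of exec_module().
    with dag_dir_on_sys_path(filepath):
        loader.exec_module(new_module)
    return new_module
```

Because the whole directory goes onto sys.path, a package that sits next to the DAG file can be imported by its dotted name at any nesting depth below that directory. No separate flag is needed to track the insertion, since the cleanup in the finally block simply tolerates a missing entry.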