Separate the case where DagFileInfo should have bundle path from the one where it doesn't have to #46528
Conversation
airflow/dag_processing/parse_info.py
Outdated
    In-parsing time context about bundle being processed.

    :param name: Bundle name.
    :param root_path: Root path of the bundle version.
it might not exactly be the "root" of the bundle, because a bundle's path attr isn't exactly "where the bundle is located on disk" but "where the dags are located on disk".
this path you refer to here is, i think, the path attr on the bundle, right? maybe we should rename it to dags_path in the bundle interface. paging @jedcunningham. but here it's probably better to call it path, which aligns with the bundle's path attr, than to call it root_path when that might be misleading.
I didn't think root_path could be interpreted like that, since the bundle itself doesn't have to be part of the FS at all; the name came from "root of the repository". Anyway, bundle.path is straightforward and doesn't leave room for misinterpretation, proceeding with path. Renamed.
And thanks for pointing out offline to be careful with the hash of these structs - there was a bug (hashes differed before and after JSON serialization); I've added some tests along with the fix.
As for the "DagEntrypoint" name, I've specifically avoided the file term for a couple of reasons.
- First, to avoid confusion with an actual "file" in the FS, and to have different names for the case where a file has a backing bundle path vs. the case where that is not relevant and we identify by bundle name + in-bundle DAG path.
- Second, I found it strange and a bit restrictive that a DAG has a strict association with a file.
"DAG path" could also work, as it describes it exactly, but "path" is already used in many places. "Entrypoint" is not perfect, but my reasoning was:
- it is not overused in Airflow yet, and
- that's what a DAG path effectively is - there is no guarantee that the DAG structure itself is defined in that file; for example, it can be defined in some other module that ends up being imported, with the DAGs set at top level in this module. So it is actually an entrypoint for the code that populates the DAG at the end of the import. Let me know what you think about this one.
Resolving for the original comment. Please reopen if the "DagEntrypoint" name (raised offline) is of concern.
Not really a fan of the DagEntrypoint name - it really is file based, e.g. processors are processing a specific file, and stats are also for a specific file.
I'll try and come up with a suggestion.
How about just DagFile?
This keeps it pretty simple. I agree entrypoint is unnecessarily confusing.
I really don't know about DagFile. The problem is that this structure is not a file. DagFile gives false hope that it can be opened, but you can't really do that from this structure without some extra information (bundle and version), because it's an identifier - a virtual path for the module that needs to be imported to get a DAG, which only Airflow can process.
The stats are also not for a file but for the identifier of a module. On every refresh a bundle might generate a unique temporary path with files (even without changing the version), but the stat would still use the same key.
I will proceed with DagFile to unblock the work stream, since there seems to be a green light for it from 2 reviewers, but I feel like it will be a problem in the future :D
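The "identifier, not a file" point above can be sketched as follows (all names here are illustrative, not the PR's actual API): the key alone cannot be opened; resolving it to a readable path requires the bundle's current on-disk root, which may change on every refresh while the key stays stable.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DagFileKey:
    """Hypothetical identifier: bundle name + in-bundle relative path.

    A stable key for stats/import errors, not an openable file:
    turning it into a real path requires extra context.
    """

    bundle_name: str
    rel_path: str


def absolute_path(key: DagFileKey, bundle_roots: dict[str, str]) -> Path:
    # Only with the bundle's resolved on-disk root can the
    # identifier be turned into a readable path.
    return Path(bundle_roots[key.bundle_name]) / key.rel_path


key = DagFileKey(bundle_name="dags-repo", rel_path="etl/my_dag.py")
# The root may be a fresh temporary path on every refresh,
# yet stats keyed by `key` remain comparable across refreshes.
roots = {"dags-repo": "/tmp/refresh-42/dags-repo"}
print(absolute_path(key, roots))
```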
dstandish
left a comment
looks ok to me but i think you need to rename root_path to simply path
    def __hash__(self):
        return hash(self._normalized())

    def __eq__(self, other: Any):
should we not raise NotImplementedError when comparing against something else?
Added return NotImplemented (since the rest of the Airflow codebase prefers this pattern)
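A minimal sketch of that pattern (class and field names assumed for illustration): returning `NotImplemented` from `__eq__`, rather than raising, lets Python fall back to the other operand's reflected comparison, so comparing against an unrelated type quietly evaluates to `False`.

```python
from typing import Any


class ParseFileInfo:
    """Illustrative sketch of the equality pattern discussed above."""

    def __init__(self, rel_path: str, bundle_name: str) -> None:
        self.rel_path = rel_path
        self.bundle_name = bundle_name

    def _normalized(self) -> tuple:
        return (self.rel_path, self.bundle_name)

    def __hash__(self) -> int:
        return hash(self._normalized())

    def __eq__(self, other: Any) -> bool:
        if not isinstance(other, ParseFileInfo):
            # NotImplemented (not raised) tells the interpreter to try the
            # other operand's __eq__; if that also declines, == is False.
            return NotImplemented
        return self._normalized() == other._normalized()
```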
airflow/dag_processing/parse_info.py
Outdated
    """
    DAG file entrypoint identifier.

    Fully identifies an entrypoint for potential DAG files within an Airflow deployment and other Airflow entities related to it (import errors, warnings, etc.).
"entrypoint for a dag file" does not really make sense - a dag file is just a dag file; it does not have an entrypoint.
Updated
airflow/dag_processing/parse_info.py
Outdated
    @dataclass(frozen=True)
    class ParseFileInfo(_ParseFileInfo):
        """
        Information about a DAG file at parse time with fixed version.
when you say "with fixed version"... it doesn't seem like you really guarantee that, since version is optional in ParseBundleInfo
I was referring to "version" as "whatever is presented as a version of the bundle data", not as a version uid. Changed to "resolved bundle metadata".
        path=bundle.path,
        version=bundle.version,
    )
    return (ParseFileInfo(rel_path=p, bundle=bundle_info) for p in rel_paths)
If we are going to have version baked in here (via the bundle info), we also need to go out and update the entries in _file_queue also.
One thing I noticed while addressing this comment is that we currently don't differentiate between entries in the queue that are there "because it's time to update a DAG" vs. "because we need to execute a callback with a fixed DAG version". Essentially, the dag processor always executes a callback against the last refreshed version if it was already present in the queue. Should we add a TODO to fix that and allow callback executions at a fixed version?
It already does, check out this PR.
Er, looking again, it does not. Going to reopen that issue.
read it too quick, it does :)
Thanks, added a TODO to the callbacks code to fix it up (it is probably easier to have a separate queue that is processed first, without persisting the parsing result). Will probably follow up with a test for the sequence "parse new DAG version" -> "request callback on an older one", checking that the callback and the latest DAG version in the DB work as expected.
Didn't see the edit. Yeah, I think it works and avoids duplicates by:
- (1) "If this is a refresh-latest-version request - don't persist the version, just store the absolute path"
- (2) "If this is a callback and the bundle is versioned - store both the version and the absolute path for that version" - guarantees no collision with (1) due to the version difference
- (3) "If this is a callback and the bundle is not versioned - store just the absolute path" and permit collision with (1). A very minor bug: this might cause a regular "refreshing" parse to be skipped once, as its result will be discarded if any of the callbacks got called. Technically, if you time its execution right, you can even achieve a non-updateable DAG :)

That looks like a pretty fragile invariant; I wonder if having two queues is more transparent...
Yes, looking back into it, I believe in the longer run splitting the queue first and not mixing the processing of callbacks with "getting the freshest version" is probably the way to go, but it would have had to come before this refactoring. (That might also be in line with the idea of moving callbacks away from the executor, which was mentioned at the very beginning of AIP-72 but then scoped out.)
But taking the size of such a refactoring into account, the right thing to do right now (in the middle of February) is to stick with what we have: a universal DagFileInfo key that works correctly for most of the cases :)
I'll mark this PR as a draft and will proceed with cleaning up abs. locations from DagCode, ImportErrors & SerializedDag with the current DagFileInfo key; let's hope it will continue working. Sorry for consuming your time with this review.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.
related: #45623 (AIP-66 cleanup)
Before migrating the rest of the entities in the DB that rely on an absolute path of the DAG to use a relative one + bundle name, this PR clearly separates the cases where DAG metadata has a determined absolute path from the ones where it is not needed. Effectively, it splits DagFileInfo into 2 entities:
- DagEntrypoint - where a DAG(s) file is referenced without a tie to any version (and a path where it is actually present as a file)
- ParseFileInfo - where a DAG(s) file is parsed within a fixed parse context - here we guarantee that the file has some absolute path attached to it and can be read.

This separation of types delegates the pain of figuring out "do I have an absolute path set here?" to static type checks, and it revealed a couple of bugs in tests.
With this implemented, the migration of DAG code and import errors becomes straightforward and removes the need to pass multiple additional parameters across multiple methods in dag_processing/collection.py. Otherwise we would have to drag the bundle's name, path and version as arguments across multiple functions (or do an even more disruptive refactoring, which wouldn't align with feature-freeze goals at all).