Add TaskMap and TaskInstance.map_id #20286
airflow/migrations/versions/e655c0453f75_add_taskmap_and_map_id_on_taskinstance.py
I think this is ready-ish. Probably needs more eyes on the migration code.
ashb
left a comment
Migration looks good.
Not that it matters, but for consistency should we have the same default here as we do on TI's map_index column?
The default on TI.map_index is mainly added so we don’t need to change too much existing code (otherwise we’d need to add a ton of map_index=-1), but this being a new class, I think I prefer being explicit instead.
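The trade-off described above can be sketched with plain dataclasses. This is only an illustration: `TaskInstanceSketch` and `TaskMapSketch` are hypothetical stand-ins, not the real Airflow models, and only the `-1` default comes from the discussion.

```python
from dataclasses import dataclass


@dataclass
class TaskInstanceSketch:
    """Hypothetical stand-in for an existing class with many call sites."""

    task_id: str
    # A default means existing callers do not have to pass map_index=-1.
    map_index: int = -1


@dataclass
class TaskMapSketch:
    """Hypothetical stand-in for a new class with no existing call sites."""

    task_id: str
    # No default: every caller has to state the map index explicitly.
    map_index: int


unmapped_ti = TaskInstanceSketch("extract")
explicit_map = TaskMapSketch("extract", map_index=-1)
```

Omitting `map_index` when building `TaskMapSketch` raises a `TypeError`, which is the explicitness the comment argues for.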
ashb
left a comment
We need more tests of _record_task_map_for_downstreams behaviour though please.
Alright, modified the code a bit and added a few tests. We can’t put a mapped task in a DAG yet (
Yeah, I guess those'll be fixed by #20743
TaskMap is populated by a TaskInstance when it pushes its return value to XCom.
On MySQL you must not drop an index "needed by a primary key", so we need to drop the primary key first. On both MySQL and MSSQL, a column used by a primary key cannot be nullable (which we already accounted for, but I forgot to add nullable=False in the migration file).
This call actually makes more sense in TI._execute_task(), which calls xcom_push() for the return value, since we get to remove a somewhat hackish if check.
This makes it easier to keep the logic in sync with BaseOperator.
An XCom value is arbitrary and can potentially be a giant blob of data, which would make the log unreadable. Instead, only log the *type* of the XCom value; since the offending TaskInstance's key is also logged, the user can always look up the actual value in the XCom storage if needed. The exception still contains a reference to the actual value so the catching frame can access it. Since that value is already in memory anyway, keeping it a bit longer shouldn't be too big a problem.
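A minimal sketch of the idea in this commit message, assuming a hypothetical exception class and helper names (`UnmappableValue`, `describe_for_log`, `push_checked`); none of these are taken from the PR. The assumption that only lists and dicts are acceptable is also made up for the illustration.

```python
import logging

logger = logging.getLogger("xcom_sketch")


class UnmappableValue(Exception):
    """Hypothetical exception that carries the value without printing it."""

    def __init__(self, value):
        # Only the type name goes into the message; the value stays on the instance.
        super().__init__(f"unmappable return type {type(value).__qualname__!r}")
        self.value = value


def describe_for_log(ti_key, value):
    """Build a log line naming the task instance key and the value's type only."""
    return f"Task instance {ti_key} pushed XCom of type {type(value).__qualname__}"


def push_checked(ti_key, value):
    """Return the length of a list or dict; otherwise log the type and raise."""
    if not isinstance(value, (list, dict)):
        logger.error(describe_for_log(ti_key, value))
        raise UnmappableValue(value)
    return len(value)
```

A caller that catches `UnmappableValue` can still read `exc.value`, while neither the log line nor `str(exc)` contains the value itself.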
We need to traverse the entire DAG to find mapped descendants because task groups are not currently recorded in the upstream-downstream lists, only operators. This should change in the future, but until then we need to make do.
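The traversal described above might look roughly like the following. The data model here is an assumption made for the sketch (plain dicts and sets rather than Airflow's DAG and operator objects), and `mapped_descendants` is a hypothetical name.

```python
from collections import deque


def mapped_descendants(downstream, mapped, parent_group, start):
    """Collect mapped tasks and mapped groups reachable downstream of `start`.

    downstream:   task id -> list of direct downstream task ids (operators only)
    mapped:       set of ids, tasks or groups, that are marked as mapped
    parent_group: task id -> id of the enclosing group, if any
    """
    found = set()
    seen = {start}
    queue = deque(downstream.get(start, ()))
    while queue:
        task_id = queue.popleft()
        if task_id in seen:
            continue
        seen.add(task_id)
        if task_id in mapped:
            found.add(task_id)
        # Groups are absent from the downstream lists, so check membership here.
        group = parent_group.get(task_id)
        if group in mapped:
            found.add(group)
        queue.extend(downstream.get(task_id, ()))
    return found
```

Because groups never appear in `downstream`, a mapped group is only discovered by visiting one of its member operators, which is why the walk has to cover every reachable operator instead of stopping at direct children.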
ashb
left a comment
Doc change but LGTM
The PR most likely needs to run full matrix of tests because it modifies parts of the core of Airflow. However, committers might decide to merge it quickly and take the risk. If they don't merge it quickly - please rebase it to the latest main at your convenience, or amend the last commit of the PR, and push it with --force-with-lease.
Co-authored-by: Ash Berlin-Taylor <ash_github@firemirror.com>
with op.batch_alter_table("task_instance") as batch_op:
    # I think we always use this name for TaskInstance after 7b2661a43ba3?
    batch_op.drop_constraint("task_instance_pkey", type_="primary")
    batch_op.add_column(Column("map_index", Integer, nullable=False, default=-1))
I’m investigating if it works as expected on all engines (if not I’ll just do an UPDATE SET map_index = -1 after this line)
Doing it with a server default is much preferred -- as otherwise an UPDATE requires us to re-write/touch every row, but a server default (at least on Postgres) doesn't.
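To illustrate the server-default point, here is a small sketch using the standard library's `sqlite3` instead of the engines discussed in this thread, so it says nothing about how Postgres, MySQL or MSSQL store the value internally. The table layout is a made-up minimum. In SQLAlchemy terms, a `DEFAULT` clause in the DDL corresponds to `server_default`, whereas `default` is applied by SQLAlchemy on INSERT and does not reach rows that already exist.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (task_id TEXT NOT NULL)")
conn.executemany(
    "INSERT INTO task_instance (task_id) VALUES (?)",
    [("a",), ("b",)],
)

# The DEFAULT clause lives in the schema, so pre-existing rows read back -1
# without a separate UPDATE statement.
conn.execute(
    "ALTER TABLE task_instance ADD COLUMN map_index INTEGER NOT NULL DEFAULT -1"
)

rows = conn.execute(
    "SELECT task_id, map_index FROM task_instance ORDER BY task_id"
).fetchall()
```

Here `rows` holds both existing tasks with `map_index` equal to `-1`, and the `NOT NULL` constraint is accepted because the default supplies a value for every row.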
TaskMap is populated by a TaskInstance when it pushes its return value to XCom.
Mostly for feedback for now. The pushing part needs some additional logic from #19965 to discover downstreams that need mapping.