-
Notifications
You must be signed in to change notification settings - Fork 16.4k
clear xcom when task starts #1180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Coverage increased (+0.009%) to 66.806% when pulling 7466b767f174b42d0d3c83a6369509130210e1d9 on jgao54:791 into 9d6b6d5 on airbnb:master. |
|
LGTM, @jlowin, as the XCom creator, do you approve? |
|
@jgao54 A common case with XComs is pushing data into the future -- for example, a task creates an XCom when it's run to mark the This is good functionality but my recommendation is to add this function to |
|
@jlowin good point, overlooked that use case. The next question is should --clear_xcom_data be defaulted to True or False in 'airflow clear'? I think it is more likely to surprise users if XCom data get deleted when all they want to do is clear a TI, thoughts? |
|
@jgao54 I think this is a great idea and something I think I overlooked originally -- as I think through the implementation, this is the wrinkle:
I think when an XCom gets pushed the execution date is part of the metadata. How about when you clear a task, it clears any XComs that it created which have the same execution date as the cleared task? Then I would support that being on by default. In the above example, the XCom data pushed on March 19 would disappear but the March 18 data would still be available. What do you think? |
|
Something like this, inside your new function: session.query(XCom).filter(
XCom.dag_id == self.dag_id,
XCom.task_id == self.task_id,
# this is the new bit, need to check the correct properties
XCom.execution_date == self.execution_date
).delete() |
My guess is we should probably add execution_date here as TI's execution_date if we want to use that approach. |
|
@jgao54 the execution date will be automatically inserted (see https://github.com/airbnb/airflow/blob/master/airflow/models.py#L1293). Also, after discussion with @r39132, I think my first reaction was too strong. Here's my current thinking:
|
|
@jlowin Cool, he just synced up with me as well. Will add that line to current change and we should be good to go. |
|
@jlowin done. |
|
@jgao54 @jlowin Just making my out-of-band comment in-band for posterity : In the issue, the reporter requested that |
|
Coverage increased (+0.09%) to 66.885% when pulling 44f64bbd464cbe9ed4926dfb2c4d61d467362401 on jgao54:791 into 9d6b6d5 on airbnb:master. |
|
@jgao54 could you please make one more change -- the use of @provide_session
def clear_xcom_data(self, session=None):
"""
Clears all XCom data from the database for the task instance
"""
session.query(XCom).filter(
XCom.dag_id == self.dag_id,
XCom.task_id == self.task_id,
XCom.execution_date == self.execution_date
).delete()
session.commit()Then you won't need to pass session to I'm going to make my own PR to clean up some of the other leftover uses of |
|
@jlowin now that this is merged, how is the recommended approach to achieve this scenario:
? My |
|
/cc @mistercrunch |
|
@rogaha an easy solution is to have two tasks, one dependent on the other, and have the dependent task push the XCom and the first task check for it. |
|
@jlowin I didn't follow. Can you show me an example? How can I pass the |
|
Just FYI, this caused the following bug: https://issues.apache.org/jira/browse/AIRFLOW-703 |
|
@rogaha for example, let's say your DAG contains task A and you want task A to check its own Add a task B to your DAG that fires immediately after task A. All task B does is take A's XCOM and republishes it as its own. That way, the next time A runs, it will look for B's XCOM (which won't have been deleted yet) instead of its own (which will have been deleted). This should workaround issues related to clearing a task's own XComs. |
…ache#1180) * Added BigQueryRelationTransformer and deleted BigQueryNodeVisitor as BigQuery classes aren't accessible from the application classloader at runtime Signed-off-by: Michael Collado <mike@datakin.com> * Changed LibraryTest and JDBCRelationVisitorTest to use SparkAgentTestExtension for adding MarquezAgent to runtime so classes are transformed before being loaded Signed-off-by: Michael Collado <mike@datakin.com> * Added test for reading from big query and writing to hdfs to validate input and output dataset construction Signed-off-by: Michael Collado <mike@datakin.com> * Fixed bug where schema was missing from InsertIntoHadoopFsRelation commands Signed-off-by: Michael Collado <mike@datakin.com> * Added fork for every test to avoid BQ class from already being loaded at test runtime Signed-off-by: Michael Collado <mike@datakin.com> Co-authored-by: Willy Lulciuc <willy@datakin.com>
When a task starts, XCom should be cleared for that task instance. (Referencing issue #791)