From f238f1d614061573fca48817cbf5314c772d12d2 Mon Sep 17 00:00:00 2001 From: Joy Gao Date: Sun, 20 Mar 2016 18:49:02 -0700 Subject: [PATCH] clear xcom data when task instance starts --- airflow/models.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/airflow/models.py b/airflow/models.py index 805cb85eabeb1..dd14f57399582 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -732,6 +732,18 @@ def refresh_from_db(self, main_session=None): session.commit() session.close() + @provide_session + def clear_xcom_data(self, session=None): + """ + Clears all XCom data from the database for the task instance + """ + session.query(XCom).filter( + XCom.dag_id == self.dag_id, + XCom.task_id == self.task_id, + XCom.execution_date == self.execution_date + ).delete() + session.commit() + @property def key(self): """ @@ -979,6 +991,7 @@ def run( session = settings.Session() self.refresh_from_db(session) session.commit() + self.clear_xcom_data() self.job_id = job_id iso = datetime.now().isoformat() self.hostname = socket.gethostname()