diff --git a/airflow/models.py b/airflow/models.py index 805cb85eabeb1..dd14f57399582 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -732,6 +732,18 @@ def refresh_from_db(self, main_session=None): session.commit() session.close() + @provide_session + def clear_xcom_data(self, session=None): + """ + Clears all XCom data from the database for the task instance + """ + session.query(XCom).filter( + XCom.dag_id == self.dag_id, + XCom.task_id == self.task_id, + XCom.execution_date == self.execution_date + ).delete() + session.commit() + @property def key(self): """ @@ -979,6 +991,7 @@ def run( session = settings.Session() self.refresh_from_db(session) session.commit() + self.clear_xcom_data() self.job_id = job_id iso = datetime.now().isoformat() self.hostname = socket.gethostname()