Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==7.1.0
Apache Airflow version
2.5.1
Operating System
Docker on ECS (apache/airflow:2.5.1-python3.9)
Deployment
Other Docker-based deployment
Deployment details
No response
What happened
The `EcsRunTaskOperator` has a reattach option. The idea is that when a task is launched on ECS, its ARN is saved in the XCom table so that if Airflow restarts (or the task is otherwise interrupted), the operator can reattach to the currently-running ECS task rather than launching a new one.
However, it always fails to retrieve the ARN of the running task from the XCom table.
What you think should happen instead
When Airflow restarts and retries an `EcsRunTaskOperator` task that was killed by the restart, it should find the ARN of the currently-running ECS task and continue waiting for that ECS task to finish instead of starting a new one.
How to reproduce
- Create a DAG containing an `EcsRunTaskOperator` task which, for testing purposes, takes at least a few minutes to complete (a minimal example is sketched below)
- Trigger a DAG run
- While the task is running, redeploy Airflow
- Check the task logs when the task restarts. In the logs you'll see "No active previously launched task found to reattach"
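
A minimal sketch of such a DAG, assuming an existing ECS cluster and task definition whose container runs for a few minutes; the cluster and task definition names below are placeholders:

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.ecs import EcsRunTaskOperator

with DAG(
    dag_id="ecs_reattach_repro",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
):
    EcsRunTaskOperator(
        task_id="long_running_ecs_task",
        cluster="test-cluster",        # placeholder cluster name
        task_definition="sleep-task",  # placeholder task definition that sleeps for a few minutes
        launch_type="EC2",
        overrides={},                  # run the task definition as-is
        reattach=True,                 # the option that should pick up the previous ARN
    )
```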
Anything else
There are two problems from what I can tell.
Problem 1
When it pushes the XCom data, it uses the task_id of the task.
When it tries to retrieve the data, it uses a made-up task_id, so it'll never find the one saved earlier.
It also uses the same made-up task_id when it later tries to delete the XCom data.
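
An illustrative sketch of the mismatch (not the provider's actual code; the constant values are assumptions, though the `{task_id}_task_arn` template matches the error shown further down):

```python
from airflow.models import XCom

REATTACH_XCOM_KEY = "ecs_task_arn"                     # assumed value
REATTACH_XCOM_TASK_ID_TEMPLATE = "{task_id}_task_arn"  # matches the task_id in the error below

def save_task_arn(ti, arn):
    # The push goes through the task instance, so the row is keyed on ti.task_id.
    ti.xcom_push(key=REATTACH_XCOM_KEY, value=arn)

def find_previous_task_arn(ti):
    # The lookup uses the made-up "<task_id>_task_arn" id, which never matches
    # the row written by save_task_arn(), so this always returns None.
    return XCom.get_one(
        key=REATTACH_XCOM_KEY,
        task_id=REATTACH_XCOM_TASK_ID_TEMPLATE.format(task_id=ti.task_id),
        dag_id=ti.dag_id,
        run_id=ti.run_id,
    )
```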
Problem 2
Switching from the made-up task_id to the normal task_id during retrieval doesn't help either, since all XCom rows for that task/dag/run id are deleted when the task restarts, so the ARN saved by the previous attempt is never available.
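
For context, Airflow wipes a task instance's XCom rows at the start of every try, before the operator's execute() runs again; roughly (a paraphrase, not the scheduler's exact code):

```python
from airflow.models import XCom

def clear_previous_attempt_xcom(ti):
    # Every XCom row for this (dag_id, task_id, run_id) is deleted when the task
    # instance begins a new try, so anything written by the previous attempt
    # (including the saved ECS task ARN) is already gone.
    XCom.clear(dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id)
```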
I tried changing the xcom_push to this:
```python
XCom.set(
    task_id=self.REATTACH_XCOM_TASK_ID_TEMPLATE.format(task_id=self.task_id),
    key=self.REATTACH_XCOM_KEY,
    value=self.arn,
    dag_id=self.dag_id,
    run_id=context["ti"].run_id,
)
```

This causes this error:

```
psycopg2.errors.ForeignKeyViolation: insert or update on table "xcom" violates foreign key constraint "xcom_task_instance_fkey"
DETAIL: Key (dag_id, task_id, run_id, map_index)=(meltano_distribution, distribution_to_snowflake_task_arn, manual__2023-02-17T19:08:34.015748+00:00, -1) is not present in table "task_instance".
```
You used to be able to make up a task_id as a hack to save things in the XCom table, but that foreign key constraint must have been added at some point.
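
For completeness, here is a sketch of a variant that stays within that constraint by writing the row under the task's real task_id and relying on the key alone to identify it; this is only an illustration, and it still runs into Problem 2 because the row is cleared when the task instance restarts:

```python
XCom.set(
    key=self.REATTACH_XCOM_KEY,
    value=self.arn,
    task_id=self.task_id,     # real task_id, so xcom_task_instance_fkey is satisfied
    dag_id=self.dag_id,
    run_id=context["ti"].run_id,
)
```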
Are you willing to submit PR?
- [x] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct