[AIRFLOW-3136] Add retry_number to TaskInstance Key property to avoid race condition #3994
Conversation
airflow/models.py (outdated diff)
Right now the PK columns of the TI table are (self.dag_id, self.task_id, self.execution_date), and try_number is adjusted in place.
This change might be a good idea but I think it is more complex than just this -- for instance it probably makes sense to store each individual try in the task instance table, but that is a much bigger change.
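For readers following along, here is a minimal sketch of the point being discussed. This is not the actual airflow/models.py code; the class name and the key_with_try_number property are made up for illustration, but it shows why a key of (dag_id, task_id, execution_date) cannot tell two tries of the same task apart, while adding try_number can.

```python
# Illustrative sketch only, not the real TaskInstance model.
class TaskInstanceSketch:
    def __init__(self, dag_id, task_id, execution_date, try_number):
        self.dag_id = dag_id
        self.task_id = task_id
        self.execution_date = execution_date
        self.try_number = try_number  # adjusted in place between retries

    @property
    def key(self):
        # the key discussed above: identical for every try of the same TI
        return self.dag_id, self.task_id, self.execution_date

    @property
    def key_with_try_number(self):
        # the shape of key this PR proposes: unique per try
        return self.dag_id, self.task_id, self.execution_date, self.try_number
```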
Thanks @ashb for your comment.
Storing individual try information in the task_instance table would actually connect more dots. Currently we get per-try information by joining with the task_fail table; it does not explicitly record which try failed, but by playing with the timestamps one can infer the retry number.
On the other hand, I guess it's okay to keep the schema as is, because it adds no extra load on the task_instance table and keeps delegating that load to task_fail, which has comparatively few or almost no reads. Adding retry_number to the task_fail table would, however, add value.
IMHO, having a key property only on the TaskInstance class seems to be special behaviour; it was probably meant only for the scheduler to convey eligible task information to the executor's queued_tasks. If that's not the case, we could also introduce executor_key as another property and point the existing references at it. Thoughts?
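Building on the sketch above, a hypothetical version of that executor_key alternative might look like the following. This is only the commenter's suggestion made concrete, not what the PR ultimately implements.

```python
# Hypothetical sketch of the executor_key idea: TaskInstance.key stays a
# 3-tuple and only the scheduler -> executor hand-off uses a try-aware key.
class TaskInstanceAltSketch(TaskInstanceSketch):  # TaskInstanceSketch from the sketch above
    @property
    def executor_key(self):
        # would be passed to the executor's queued_tasks instead of self.key
        return self.dag_id, self.task_id, self.execution_date, self.try_number
```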
@ashb Looking forward to your feedback!
This collision arises because of the following steps performed periodically by the scheduler:
- In the _execute_helper method, the heartbeat method is called, which in turn calls execute_async, putting the key (dag_id, task_id, execution_date) and the command onto the executor queue; from there, depending on the executor type, the task starts executing asynchronously.
- The sync method runs after execute_async and pushes keys from result_queue into event_buffer.
- The _process_executor_events method is called after the heartbeat method; it treats a task as externally killed if TI.state is queued while the key's status in event_buffer is success/failed.
Another possible change would be to take the execute_async call out of the heartbeat method and call it after the _process_executor_events call in the _execute_helper method in jobs.py.
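To make that ordering concrete, here is a heavily simplified, self-contained sketch of the loop described in the steps above. The names mirror jobs.py (heartbeat, execute_async, sync, event_buffer, _process_executor_events), but the code is illustrative pseudologic, not the actual Airflow scheduler.

```python
# Toy illustration of why a key without a try number lets a stale result
# from try N be matched against the re-queued try N+1. Not real jobs.py code.
from collections import deque

QUEUED, SUCCESS, FAILED = 'queued', 'success', 'failed'


class ToyExecutor:
    def __init__(self):
        self.queued_tasks = {}       # key -> command, filled by the scheduler
        self.result_queue = deque()  # (key, state) reported by workers
        self.event_buffer = {}       # key -> final state, drained by the scheduler

    def heartbeat(self):
        # execute_async: hand queued work to the workers
        for key, _command in list(self.queued_tasks.items()):
            del self.queued_tasks[key]
            # pretend the worker already finished and failed this try
            self.result_queue.append((key, FAILED))
        # sync: move finished results into event_buffer
        while self.result_queue:
            key, state = self.result_queue.popleft()
            self.event_buffer[key] = state


def process_executor_events(executor, ti_state):
    # mirrors the check described above: a TI that is still QUEUED while the
    # executor reports SUCCESS/FAILED looks "externally killed"
    for key, state in list(executor.event_buffer.items()):
        del executor.event_buffer[key]
        if state in (SUCCESS, FAILED) and ti_state.get(key) == QUEUED:
            print('executor reports %s finished (%s) although the TI is queued' % (key, state))


if __name__ == '__main__':
    key = ('example_dag', 'always_fails', '2018-01-01')  # no try number in the key
    executor = ToyExecutor()

    executor.queued_tasks[key] = 'airflow run ...'
    executor.heartbeat()                 # try 1 runs and fails; event_buffer[key] = FAILED
    ti_state = {key: QUEUED}             # scheduler has already queued try 2 under the SAME key
    process_executor_events(executor, ti_state)  # the stale try-1 event is matched against try 2
```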
…tting race condition Changes wrt key change Added fix in failed test case
…tting race condition Changes in Test case Made change to anticipate new member of key Renamed retry to try
Codecov Report
@@ Coverage Diff @@
## master #3994 +/- ##
==========================================
- Coverage 75.49% 75.48% -0.02%
==========================================
Files 199 199
Lines 15947 15947
==========================================
- Hits 12039 12037 -2
- Misses 3908 3910 +2
Continue to review full report at Codecov.
Is there an easy way I can reproduce this locally to see the bug for myself?
@ashb For us it is easy to reproduce: run a DAG that has a task bound to fail, with retries set > 0 (preferably 4 or 5) and retry_delay set to 1 second. Let me know if you're able to repro; otherwise I'll share a sample DAG file. Also, email_on_retry and email_on_failure are set to true for us, so once the race condition occurs we get an email like the one below.
EDIT-1: We also did a test on our side: we could not reproduce the issue when the dag_folder contained only a few files, but as soon as it reached 40+ DAGs we could reproduce it; presumably more DAGs in the dag_folder lengthen one cycle of DAG processing. EDIT-2: The DAG used to reproduce this issue can be found here; it is essentially the originally shipped example DAG with the few changes mentioned here.
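Based on that description, a reproduction DAG might look roughly like the following. The DAG id, task id, dates and owner are made up for illustration; the DAG file linked above is the authoritative one.

```python
# Hypothetical reproduction DAG assembled from the description above:
# a task that always fails, several fast retries, and retry/failure emails.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # Airflow 1.10-style import

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2018, 1, 1),
    'retries': 4,                          # retries > 0, as described above
    'retry_delay': timedelta(seconds=1),   # very short retry delay
    'email_on_retry': True,
    'email_on_failure': True,
}

dag = DAG(
    dag_id='race_condition_repro',
    default_args=default_args,
    schedule_interval='@once',
)

always_fails = BashOperator(
    task_id='always_fails',
    bash_command='exit 1',  # bound to fail so every run goes through the retry path
    dag=dag,
)
```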
Thanks @vardancse, that is useful. I will find time to look at this in detail (hopefully on Monday?)
@ashb let me know if by any chance you have had time to look at the issue?
Thanks, your reproduction steps were enough for me to reproduce this. The fix seemed to make the error go away, and things still get scheduled :D
…dition (#3994) We were seeing an intermittent issue where the executor reports a task instance as finished while the task itself says it is in the queued state. It was caused by a race condition between the scheduler, which clears event_buffer in the _process_executor_events method in jobs.py, and the executor, which was about to mark the next retry of a task (whose previous try had failed) as running. So we added retry_number as a member of the TaskInstance key property.
  labels['dag_id'], labels['task_id'],
- self._label_safe_datestring_to_datetime(labels['execution_date']))
+ self._label_safe_datestring_to_datetime(labels['execution_date']),
+ labels['try_number'])
The labels dict doesn't contain a try_number key, as it's not set in WorkerConfiguration make_pod. That's the reason why pods are not deleted.
https://github.com/apache/incubator-airflow/blob/v1-10-stable/airflow/contrib/kubernetes/worker_configuration.py#L197
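A minimal sketch of the kind of follow-up being discussed, assuming the worker pod labels are built as a plain dict; this is hypothetical (the function name below is made up) and not the actual WorkerConfiguration.make_pod code.

```python
# Hypothetical sketch: the labels a worker pod would need so the executor can
# rebuild the (dag_id, task_id, execution_date, try_number) key shown in the
# diff above. try_number is the label reported missing in the comment above.
def build_pod_labels(dag_id, task_id, execution_date_label, try_number):
    return {
        'dag_id': dag_id,
        'task_id': task_id,
        'execution_date': execution_date_label,  # assumed to be a label-safe date string
        'try_number': str(try_number),
    }
```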
D'oh.
Would someone (@vardancse? @aliaksandr-d?) be able to fix this, or at least add a test case that covers this?
@ashb I'm looking into it
@aliaksandr-d @ashb Is there a bug logged for this so that I can raise a PR for it?
@aliaksandr-d can you please apply the patch and confirm the resolution?
@aliaksandr-d #4163 has already been created for this by @wyndhblb
Make sure you have checked all steps below.
Jira
Description
We were seeing an intermittent issue where the executor reports a task instance as finished while the task itself says it is in the queued state. It was caused by a race condition between the scheduler, which clears event_buffer in the _process_executor_events method in jobs.py, and the executor, which was about to mark the next retry of a task (whose previous try had failed) as running. So we added retry_number as a member of the TaskInstance key property.
Tests
Commits
Documentation
Code Quality
git diff upstream/master -u -- "*.py" | flake8 --diff