Description
Apache Airflow version
2.6.2
What happened
After upgrading to version 2.6.2, we started getting a ZeroDivisionError the first time some ExternalTaskSensor tasks were poked.
What you think should happen instead
The sensor should exit with return code 0, as it did when cleared after the first failure:
[2023-06-19, 10:03:57 UTC] {external_task.py:240} INFO - Poking for task_group 'lote2db_etl' in dag 'malha_batch' on 2023-06-16T08:30:00+00:00 ...
[2023-06-19, 10:03:57 UTC] {base.py:255} INFO - Success criteria met. Exiting.
[2023-06-19, 10:03:57 UTC] {taskinstance.py:1345} INFO - Marking task as SUCCESS. dag_id=rotinas_risco, task_id=ets_malha_lote2db_etl, execution_date=20230616T080000, start_date=20230619T100357, end_date=20230619T100357
[2023-06-19, 10:03:57 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
How to reproduce
We have a DAG called malha_batch, whose schedule parameter is "30 8 * * 1-5". We have another DAG called rotinas_risco, whose schedule parameter is "0 8 * * 1-5", with four ExternalTaskSensor tasks pointing to malha_batch. Their definitions are below:
Excerpt from rotinas_risco.py
ets_malha_bbg_post_processing = ExternalTaskSensor(
    task_id="ets_malha_bbg_post_processing",
    external_dag_id="malha_batch",
    external_task_group_id="bloomberg.post_processing",
    allowed_states=[State.SUCCESS, State.SKIPPED],
    failed_states=[State.FAILED],
    execution_delta=timedelta(minutes=-30),
    poke_interval=300,
    mode="reschedule",
)
ets_malha_bbg_refreshes = ExternalTaskSensor(
    task_id="ets_malha_bbg_refreshes",
    external_dag_id="malha_batch",
    external_task_group_id="bloomberg.refreshes",
    allowed_states=[State.SUCCESS, State.SKIPPED],
    failed_states=[State.FAILED],
    execution_delta=timedelta(minutes=-30),
    poke_interval=300,
    mode="reschedule",
)
ets_malha_bbg_conversion_factor_to_base = ExternalTaskSensor(
    task_id="ets_malha_bbg_conversion_factor_to_base",
    external_dag_id="malha_batch",
    external_task_id="prices.conversion_factor_to_base",
    allowed_states=[State.SUCCESS, State.SKIPPED],
    failed_states=[State.FAILED],
    execution_delta=timedelta(minutes=-30),
    poke_interval=300,
    mode="reschedule",
)
ets_malha_lote2db_etl = ExternalTaskSensor(
    task_id="ets_malha_lote2db_etl",
    external_dag_id="malha_batch",
    external_task_group_id="lote2db_etl",
    allowed_states=[State.SUCCESS, State.SKIPPED],
    failed_states=[State.FAILED],
    execution_delta=timedelta(minutes=-30),
    poke_interval=300,
    mode="reschedule",
)
Of those four ExternalTaskSensor tasks, only one behaved as expected; the other three failed on the first poke attempt with the following traceback:
[2023-06-19, 08:00:02 UTC] {external_task.py:240} INFO - Poking for task_group 'lote2db_etl' in dag 'malha_batch' on 2023-06-16T08:30:00+00:00 ...
[2023-06-19, 08:00:02 UTC] {taskinstance.py:1824} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/conda/envs/airflow/lib/python3.9/site-packages/airflow/sensors/base.py", line 225, in execute
    raise e
  File "/opt/conda/envs/airflow/lib/python3.9/site-packages/airflow/sensors/base.py", line 212, in execute
    poke_return = self.poke(context)
  File "/opt/conda/envs/airflow/lib/python3.9/site-packages/airflow/utils/session.py", line 76, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/conda/envs/airflow/lib/python3.9/site-packages/airflow/sensors/external_task.py", line 260, in poke
    count_failed = self.get_count(dttm_filter, session, self.failed_states)
  File "/opt/conda/envs/airflow/lib/python3.9/site-packages/airflow/sensors/external_task.py", line 369, in get_count
    count = (
ZeroDivisionError: division by zero
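The traceback ends at the `count = (` expression in `get_count`, which suggests that the count is divided by a value that was zero on the first poke. Below is a minimal sketch of that failure mode. It assumes (the logs above do not confirm this) that the count for a task group is normalized by the number of task instances found in the group. `normalized_count` and `safe_normalized_count` are hypothetical helpers for illustration, not Airflow functions.

```python
def normalized_count(matching: int, group_task_ids: list) -> float:
    """Hypothetical: matching task instances divided by the group size."""
    # Raises ZeroDivisionError when no task instances exist for the group yet.
    return matching / len(group_task_ids)


def safe_normalized_count(matching: int, group_task_ids: list) -> float:
    """Same idea, but an empty group is treated as 'nothing matched yet'."""
    if not group_task_ids:
        return 0
    return matching / len(group_task_ids)
```

If the external DAG run did not exist yet at the time of the first poke, the group would have had no task instances. That would be consistent with the sensor failing at 08:00 and succeeding once cleared later.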
The one ExternalTaskSensor that did not fail logged the following:
[2023-06-19, 08:00:02 UTC] {external_task.py:232} INFO - Poking for tasks ['prices.conversion_factor_to_base'] in dag malha_batch on 2023-06-16T08:30:00+00:00 ...
[2023-06-19, 08:00:02 UTC] {taskinstance.py:1784} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2023-06-19, 08:00:02 UTC] {local_task_job_runner.py:225} INFO - Task exited with return code 0
[2023-06-19, 08:00:02 UTC] {taskinstance.py:2653} INFO - 0 downstream tasks scheduled from follow-on schedule check
I was not able to reproduce the error with a smaller example. However, out of four similarly defined sensors, three failed and one did not, which suggests a bug. The three that failed all set external_task_group_id, while the one that worked sets external_task_id. The problem did not occur with version 2.6.1.
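For reference, the target date shown in the logs can be reproduced with plain datetime arithmetic. The sketch below assumes the sensor computes its target as the logical date minus `execution_delta`, which is consistent with the `execution_date=20230616T080000` and `on 2023-06-16T08:30:00+00:00` values logged above. The variable names are illustrative.

```python
from datetime import datetime, timedelta, timezone

# Logical date of the rotinas_risco run, taken from the task log above.
logical_date = datetime(2023, 6, 16, 8, 0, tzinfo=timezone.utc)

# Same value as passed to the sensors; a negative delta points 30 minutes ahead.
execution_delta = timedelta(minutes=-30)

# Assumed target computation: logical date minus execution_delta.
target = logical_date - execution_delta
print(target.isoformat())  # 2023-06-16T08:30:00+00:00
```

The first poke was logged at 08:00:02 on 2023-06-19, half an hour before malha_batch's own "30 8 * * 1-5" schedule fires that day.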
Operating System
Ubuntu 20.04.6 LTS (Focal Fossa)
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==8.1.0
apache-airflow-providers-celery==3.2.0
apache-airflow-providers-common-sql==1.5.1
apache-airflow-providers-ftp==3.4.1
apache-airflow-providers-http==4.4.1
apache-airflow-providers-imap==3.2.1
apache-airflow-providers-postgres==5.5.0
apache-airflow-providers-redis==3.2.0
apache-airflow-providers-sqlite==3.4.1
Deployment
Virtualenv installation
Deployment details
Just a vanilla setup following https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html.
Anything else
Please let me know whether additional log files from the scheduler or executor (Celery) should be provided.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct