Description
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.10.2
What happened?
Since 2.10, when running a sensor task in mode=reschedule, the try_number is incremented with each poke. This has several task execution behavior implications, such as:
- early task failures, since `try_number` is incremented with each poke and will exceed the maximum number of retries more quickly
- each poke is broken out into a new log file instead of aggregating many pokes into a single file
From my testing, this only occurs with a MySQL metadata database. I have been unable to reproduce it with SQLite in multiple environments.
What you think should happen instead?
Sensors with mode=reschedule should only increment try_number after the timeout has been reached or there is an error during execution.
How to reproduce
Execute the following DAG in an Airflow environment using MySQL as the metadata database. It will repeatedly run a sensor with mode=reschedule. On the Logs tab for the sensor task, you can verify that each poke is broken out into its own log file and that try_number is incremented.
```python
from pendulum import datetime

from airflow.decorators import dag, task
from airflow.sensors.python import PythonSensor


def check_dog_availability_func(**context):
    return False


@dag(
    start_date=datetime(2022, 12, 1),
    schedule="@once",
    catchup=False,
    tags=["sensor"],
)
def pythonsensor_example():
    # turn any Python function into a sensor
    check_dog_availability = PythonSensor(
        task_id="check_dog_availability",
        poke_interval=10,
        timeout=3600,
        mode="reschedule",
        python_callable=check_dog_availability_func,
    )

    # click the link in the logs for a cute picture :)
    @task
    def print_dog_picture_url(url):
        print(url)

    print_dog_picture_url(check_dog_availability.output)


pythonsensor_example()
```
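To watch the counter directly instead of through the UI, a minimal sketch like the one below can be run in the same Airflow environment while the sensor is poking. It assumes the DAG and task IDs from the example above and uses Airflow's session helper to query the task_instance table; on an affected MySQL deployment try_number should climb with every poke, while on SQLite it stays at 1 between reschedules.

```python
# Minimal sketch: inspect try_number for the example sensor straight from the
# metadata database. Run it wherever the Airflow config (and thus the metadata
# DB connection) is available, while the sensor is poking.
from airflow.models import TaskInstance
from airflow.utils.session import create_session

with create_session() as session:
    tis = (
        session.query(TaskInstance)
        .filter(
            TaskInstance.dag_id == "pythonsensor_example",
            TaskInstance.task_id == "check_dog_availability",
        )
        .all()
    )
    for ti in tis:
        # Expected: try_number stays at 1 between reschedules.
        # Observed on MySQL: it grows with each poke.
        print(ti.run_id, ti.state, ti.try_number)
```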
Operating System
Debian Slim-Buster
Versions of Apache Airflow Providers
```toml
apache-airflow = {version = "2.10.2"}
apache-airflow-providers-cncf-kubernetes = "8.4.1"
apache-airflow-providers-celery = "3.8.1"
apache-airflow-providers-redis = "3.8.0"
apache-airflow-providers-mysql = "5.7.0"
apache-airflow-providers-amazon = "8.28.0"
apache-airflow-providers-apache-pinot = "4.5.0"
apache-airflow-providers-apache-hive = "8.1.1" # conflicts with sqlpen pandas <2.0 requirement
apache-airflow-providers-arangodb = "2.6.0"
apache-airflow-providers-google = "10.19.0"
apache-airflow-providers-imap = "3.7.0"
apache-airflow-providers-snowflake = "5.5.1"
apache-airflow-providers-presto = "5.5.1"
apache-airflow-providers-tableau = "4.6.0"
mysqlclient = "2.2.4"
```
Deployment
Other Docker-based deployment
Deployment details
No response
Anything else?
My best guess at this time is that the following SQLAlchemy logic, which checks whether the task's state is UP_FOR_RESCHEDULE before incrementing try_number, is not working as expected on MySQL databases. We hope to find time in the coming week to verify that this is where the bug occurs.
https://github.com/apache/airflow/blob/2.10.2/airflow/models/dagrun.py#L1612-L1623
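One possible explanation for why MySQL specifically would misbehave here (an assumption about the root cause, not a verified diagnosis): if state and try_number are set in a single UPDATE, with the try_number CASE expression referencing state, the databases disagree on what that reference means. The SQL standard (followed by SQLite and Postgres) evaluates every SET expression against the row's old values, while MySQL evaluates SET assignments left to right and lets later expressions see already-updated columns. The sketch below demonstrates the difference using an in-memory SQLite database as the standard-compliant baseline:

```python
# Minimal sketch of the UPDATE-semantics difference that could explain the bug.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (state TEXT, try_number INTEGER)")
conn.execute("INSERT INTO ti VALUES ('up_for_reschedule', 1)")

# Standard semantics: every SET expression sees the row's OLD values, so the
# CASE compares against 'up_for_reschedule' and leaves try_number at 1.
conn.execute(
    """
    UPDATE ti
    SET state = 'scheduled',
        try_number = CASE
            WHEN state != 'up_for_reschedule' THEN try_number + 1
            ELSE try_number
        END
    """
)
print(conn.execute("SELECT state, try_number FROM ti").fetchone())
# SQLite prints ('scheduled', 1). MySQL evaluates SET assignments left to
# right, so the CASE would see the NEW state ('scheduled') and bump
# try_number to 2 on every poke.
```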
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct