Description
Apache Airflow version
2.9.2
If "Other Airflow 2 version" selected, which one?
No response
What happened?
When an invalid value is passed to an argument in default_args, for example the string "2" for max_active_tis_per_dag, which expects an integer, the scheduler crashes with the following error:
[2024-06-12T05:04:39.041+0000] {scheduler_command.py:54} ERROR - Exception when running scheduler job
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/cli/commands/scheduler_command.py", line 52, in _run_scheduler_job
    run_job(job=job_runner.job, execute_callable=job_runner._execute)
  File "/usr/local/lib/python3.11/site-packages/airflow/utils/session.py", line 79, in wrapper
    return func(*args, session=session, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/job.py", line 395, in run_job
    return execute_job(job, execute_callable=execute_callable)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/job.py", line 424, in execute_job
    ret = execute_callable()
          ^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/astronomer/airflow/version_check/plugin.py", line 30, in run_before
    fn(*args, **kwargs)
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 854, in _execute
    self._run_scheduler_loop()
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 986, in _run_scheduler_loop
    num_queued_tis = self._do_scheduling(session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1099, in _do_scheduling
    num_queued_tis = self._critical_section_enqueue_task_instances(session=session)
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 677, in _critical_section_enqueue_task_instances
    queued_tis = self._executable_task_instances_to_queued(max_tis, session=session)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 533, in _executable_task_instances_to_queued
    if current_task_concurrency >= task_concurrency_limit:
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: '>=' not supported between instances of 'int' and 'str'
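The failing comparison can be reproduced outside Airflow. The snippet below is a minimal sketch, not Airflow source code: it borrows only the two variable names from the last traceback frame and assumes the string from default_args reaches the comparison unchanged.

```python
# Minimal sketch, not Airflow source: only the two variable names come from
# the traceback frame in scheduler_job_runner.py.
current_task_concurrency = 0   # number of running task instances (an int)
task_concurrency_limit = "2"   # string from default_args, assumed to arrive unconverted

try:
    limit_reached = current_task_concurrency >= task_concurrency_limit
except TypeError as exc:
    # Python refuses to order an int against a str
    error_message = str(exc)
    print(error_message)
```

The printed message matches the last line of the traceback, which suggests the value is never coerced or validated between DAG parsing and scheduling.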
What you think should happen instead?
Instead of crashing the scheduler, Airflow should report a DAG import error in the Airflow UI for the problematic DAG, which would make that DAG easy to identify. With the current behaviour it is difficult to tell which DAG caused the crash, especially when managing hundreds of DAGs, and a small mistake in a single DAG can cause an outage in a production environment.
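One way to picture the requested behaviour is a type check that runs while the DAG file is parsed, since an exception raised at that point is normally reported as an import error for that file. The sketch below is an assumption for illustration only: `validate_default_args` and `INT_ARGS` are hypothetical names, not existing Airflow APIs, and the list of checked arguments is deliberately incomplete.

```python
# Hypothetical helper, not an existing Airflow API: it sketches the kind of
# parse-time check requested above.
INT_ARGS = ("max_active_tis_per_dag",)  # illustrative subset of integer-valued arguments


def validate_default_args(default_args: dict) -> None:
    """Raise TypeError if a known integer argument holds a non-integer value."""
    for name in INT_ARGS:
        value = default_args.get(name)
        if value is None:
            continue  # argument not set (or explicitly None); nothing to check
        # bool is a subclass of int, so reject it explicitly
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError(
                f"default_args[{name!r}] must be an int, "
                f"got {type(value).__name__}: {value!r}"
            )


validate_default_args({"max_active_tis_per_dag": 2})  # valid, passes silently

try:
    validate_default_args({"max_active_tis_per_dag": "2"})
except TypeError as exc:
    print(exc)
```

Failing early like this ties the error to the file that contains the bad value, instead of letting it surface later inside the scheduler loop.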
How to reproduce
- Define a DAG with default_args that includes an invalid value for max_active_tis_per_dag, such as:
  default_args = {
      'max_active_tis_per_dag': "2",  # invalid value, should be an integer
  }
- Deploy and start the DAG.
- Observe the scheduler crash with the TypeError.
Operating System
Linux ad25902d8cef 6.7.12-orbstack-00201-g2ddb8f197a46 #1 SMP Tue May 21 04:38:26 UTC 2024 aarch64 GNU/Linux
Versions of Apache Airflow Providers
No response
Deployment
Astronomer
Deployment details
No response
Anything else?
This problem occurs every time an invalid value is passed to an argument in default_args.
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct