Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,16 @@ def _get_try_number_success(val):

def _executor_initializer():
"""
Initialize worker processes for the executor used for DagRun listener.
Initialize processes for the executor used with DAGRun listener's methods (on scheduler).

This function must be picklable, so it cannot be defined as an inner method or local function.

Reconfigures the ORM engine to prevent issues that arise when multiple processes interact with
the Airflow database.
"""
if not AIRFLOW_V_3_0_PLUS:
settings.configure_orm()
# This initializer is used only on the scheduler
# We can configure_orm regardless of the Airflow version, as DB access is always allowed from scheduler.
settings.configure_orm()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ashb Wasn't this causing workers to fail too?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the following error in @ashb 's PR:

/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py:244 in on_task_instance_success
/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py:326 in _on_task_instance_success
/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py:452 in _execute
/usr/local/lib/python3.12/site-packages/airflow/providers/openlineage/plugins/listener.py:484 in _fork_execute
/usr/local/lib/python3.12/site-packages/airflow/settings.py:363 in configure_orm
<string>:2 in create_engine
/usr/local/lib/python3.12/site-packages/sqlalchemy/util/deprecations.py:375 in warned
/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/create.py:514 in create_engine
/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/url.py:738 in make_url
/usr/local/lib/python3.12/site-packages/sqlalchemy/engine/url.py:799 in _parse_url

Copy link
Member

@kaxil kaxil Mar 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aah there were two instances of settings.configure_orm(), ok 👍

in _fork_execute & _executor_initializer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly, the DAG level events follow different path - now even more with task sdk

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kacpermuda maybe splitting those logically into two listeners now would be a good idea? Those won't share as much logic now. Maybe after Airflow 2 goes out of provider scope?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kacpermuda maybe splitting those logically into two listeners now would be a good idea? Those won't share as much logic now. Maybe after Airflow 2 goes out of provider scope?

Yeah, I like that idea



class OpenLineageListener:
Expand Down