From d30d22b74cda8ab5ed3e02fe94bbef509b48d669 Mon Sep 17 00:00:00 2001 From: Kacper Muda Date: Fri, 21 Mar 2025 11:48:11 +0100 Subject: [PATCH] fix: Re-add configuring orm for OpenLineage's listener on scheduler --- .../src/airflow/providers/openlineage/plugins/listener.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py index b46c65de64af2..1860da211b264 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py @@ -69,15 +69,16 @@ def _get_try_number_success(val): def _executor_initializer(): """ - Initialize worker processes for the executor used for DagRun listener. + Initialize processes for the executor used with DAGRun listener's methods (on scheduler). This function must be picklable, so it cannot be defined as an inner method or local function. Reconfigures the ORM engine to prevent issues that arise when multiple processes interact with the Airflow database. """ - if not AIRFLOW_V_3_0_PLUS: - settings.configure_orm() + # This initializer is used only on the scheduler + # We can configure_orm regardless of the Airflow version, as DB access is always allowed from scheduler. + settings.configure_orm() class OpenLineageListener: