diff --git a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py index a44b23d06dc6d..6bda6440e077c 100644 --- a/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py +++ b/task-sdk/src/airflow/sdk/execution_time/secrets/execution_api.py @@ -113,6 +113,26 @@ def get_variable(self, key: str, team_name: str | None = None) -> str | None: if isinstance(msg, VariableResult): return msg.value # Already a string | None return None + except RuntimeError as e: + # TriggerCommsDecoder.send() uses async_to_sync internally, which raises RuntimeError + # when called within an async event loop. In greenback portal contexts (triggerer), + # we catch this and use greenback to call the async version instead. + if str(e).startswith("You cannot use AsyncToSync in the same thread as an async event loop"): + import asyncio + + import greenback + + task = asyncio.current_task() + if greenback.has_portal(task): + import warnings + + warnings.warn( + "You should not use sync calls here -- use `await aget_variable` instead", + stacklevel=2, + ) + return greenback.await_(self.aget_variable(key)) + # Fall through to the general exception handler for other RuntimeErrors + return None except Exception: # If SUPERVISOR_COMMS fails for any reason, return None # to allow fallback to other backends diff --git a/task-sdk/tests/task_sdk/execution_time/test_secrets.py b/task-sdk/tests/task_sdk/execution_time/test_secrets.py index 8f9745b0ffeca..ccdabb3dd5807 100644 --- a/task-sdk/tests/task_sdk/execution_time/test_secrets.py +++ b/task-sdk/tests/task_sdk/execution_time/test_secrets.py @@ -178,6 +178,49 @@ async def mock_aget_connection(self, conn_id): # Verify send was attempted first (and raised RuntimeError) mock_supervisor_comms.send.assert_called_once() + def test_get_variable_runtime_error_triggers_greenback_fallback(self, mocker, mock_supervisor_comms): + """ + Test that RuntimeError from async_to_sync triggers greenback fallback for variables. + + Same as the connection test but for get_variable — verifies the fix for #61676: + triggers calling Variable.get() fail because SUPERVISOR_COMMS.send() raises + RuntimeError in the async event loop, but the greenback fallback was missing. + """ + expected_value = "10" + + # Simulate the RuntimeError that triggers greenback fallback + mock_supervisor_comms.send.side_effect = RuntimeError( + "You cannot use AsyncToSync in the same thread as an async event loop" + ) + + # Mock the greenback and asyncio modules + mocker.patch("greenback.has_portal", return_value=True) + mocker.patch("asyncio.current_task") + + import asyncio + + def greenback_await_side_effect(coro): + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(coro) + finally: + loop.close() + + mock_greenback_await = mocker.patch("greenback.await_", side_effect=greenback_await_side_effect) + + # Mock aget_variable to return the expected value + async def mock_aget_variable(self, key): + return expected_value + + mocker.patch.object(ExecutionAPISecretsBackend, "aget_variable", mock_aget_variable) + + backend = ExecutionAPISecretsBackend() + result = backend.get_variable("retries") + + assert result == expected_value + mock_greenback_await.assert_called_once() + mock_supervisor_comms.send.assert_called_once() + class TestContextDetection: """Test context detection in ensure_secrets_backend_loaded."""