Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions airflow/utils/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,26 @@
from __future__ import annotations

import contextlib
import logging
import sys
from functools import wraps
from inspect import signature
from typing import Callable, Generator, TypeVar, cast

from airflow import settings
from airflow.typing_compat import ParamSpec

logger = logging.getLogger("airflow.task")


@contextlib.contextmanager
def create_session() -> Generator[settings.SASession, None, None]:
"""Contextmanager that will create and teardown a session."""
method = ""
for i in range(1, 4):
method += sys._getframe(i).f_back.f_code.co_name + ":" # type: ignore

logger.info("CREATE SESSION %s", method)
Session = getattr(settings, "Session", None)
if Session is None:
raise RuntimeError("Session must be set before!")
Expand All @@ -39,6 +48,7 @@ def create_session() -> Generator[settings.SASession, None, None]:
session.rollback()
raise
finally:
logger.info("DESTROY_SESSION %s", method)
session.close()


Expand Down
1 change: 1 addition & 0 deletions tests/core/test_sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def test_add_tagging(self, sentry, task_instance):
for key, value in scope._tags.items():
assert TEST_SCOPE[key] == value

@pytest.mark.skip("Chase leaking sessions")
@time_machine.travel(CRUMB_DATE)
def test_add_breadcrumbs(self, sentry, task_instance):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1099,6 +1099,7 @@ def test_driver_logging_failure(
name="flink-stream-example",
)

@pytest.mark.skip("Chase leaking sessions")
@patch(
"kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object",
return_value=TEST_READY_CLUSTER,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,6 +792,7 @@ def test_driver_logging_failure(
)
error_log_call.assert_called_once_with(TEST_POD_LOG_RESULT)

@pytest.mark.skip("Chase leaking sessions")
@patch(
"kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object",
return_value=TEST_COMPLETED_APPLICATION,
Expand Down Expand Up @@ -839,6 +840,7 @@ def test_driver_logging_error(
sensor.poke(None)
warn_log_call.assert_called_once()

@pytest.mark.skip("Chase leaking sessions")
@patch(
"kubernetes.client.api.custom_objects_api.CustomObjectsApi.get_namespaced_custom_object",
return_value=TEST_DRIVER_WITH_SIDECAR_APPLICATION,
Expand Down
1 change: 1 addition & 0 deletions tests/providers/google/cloud/log/test_gcs_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def test_write_to_remote_on_close(self, mock_blob, mock_client, mock_creds):
mock_blob.from_string.return_value.upload_from_string(data="CONTENT\nMESSAGE\n")
assert self.gcs_task_handler.closed is True

@pytest.mark.skip("Chase leaking sessions")
@mock.patch(
"airflow.providers.google.cloud.log.gcs_task_handler.get_credentials_and_project_id",
return_value=("TEST_CREDENTIALS", "TEST_PROJECT_ID"),
Expand Down