Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/run-unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ on: # yamllint disable-line rule:truthy
required: false
default: "true"
type: string
database-isolation:
description: "Whether to enable database isolation or not (true/false)"
required: false
default: "false"
type: string
force-lowest-dependencies:
description: "Whether to force lowest dependencies for the tests or not (true/false)"
required: false
Expand Down Expand Up @@ -152,6 +157,7 @@ jobs:
PYTHON_MAJOR_MINOR_VERSION: "${{ matrix.python-version }}"
UPGRADE_BOTO: "${{ inputs.upgrade-boto }}"
AIRFLOW_MONITOR_DELAY_TIME_IN_SECONDS: "${{inputs.monitor-delay-time-in-seconds}}"
DATABASE_ISOLATION: "${{ inputs.database-isolation }}"
VERBOSE: "true"
steps:
- name: "Cleanup repo"
Expand Down
23 changes: 23 additions & 0 deletions .github/workflows/special-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,29 @@ jobs:
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}

tests-database-isolation:
name: "Database isolation test"
uses: ./.github/workflows/run-unit-tests.yml
permissions:
contents: read
packages: read
secrets: inherit
with:
runs-on-as-json-default: ${{ inputs.runs-on-as-json-default }}
enable-aip-44: "true"
database-isolation: "true"
test-name: "DatabaseIsolation-Postgres"
test-scope: "DB"
backend: "postgres"
image-tag: ${{ inputs.image-tag }}
python-versions: "['${{ inputs.default-python-version }}']"
backend-versions: "['${{ inputs.default-postgres-version }}']"
excludes: "[]"
parallel-test-types-list-as-string: ${{ inputs.parallel-test-types-list-as-string }}
include-success-outputs: ${{ needs.build-info.outputs.include-success-outputs }}
run-coverage: ${{ inputs.run-coverage }}
debug-resources: ${{ inputs.debug-resources }}

tests-quarantined:
name: "Quarantined test"
uses: ./.github/workflows/run-unit-tests.yml
Expand Down
6 changes: 3 additions & 3 deletions airflow/api_internal/endpoints/rpc_api_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,19 +228,19 @@ def internal_airflow_api(body: dict[str, Any]) -> APIResponse:
except Exception:
return log_and_build_error_response(message="Error deserializing parameters.", status=400)

log.info("Calling method %s\nparams: %s", method_name, params)
log.debug("Calling method %s\nparams: %s", method_name, params)
try:
# Session must be created there as it may be needed by serializer for lazy-loaded fields.
with create_session() as session:
output = handler(**params, session=session)
output_json = BaseSerialization.serialize(output, use_pydantic_models=True)
response = json.dumps(output_json) if output_json is not None else None
log.info("Sending response: %s", response)
log.debug("Sending response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
except AirflowException as e: # In case of AirflowException transport the exception class back to caller
exception_json = BaseSerialization.serialize(e, use_pydantic_models=True)
response = json.dumps(exception_json)
log.info("Sending exception response: %s", response)
log.debug("Sending exception response: %s", response)
return Response(response=response, headers={"Content-Type": "application/json"})
except Exception:
return log_and_build_error_response(message=f"Error executing method '{method_name}'.", status=500)
3 changes: 2 additions & 1 deletion airflow/models/baseoperator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1516,10 +1516,11 @@ def run(
data_interval=info.data_interval,
)
ti = TaskInstance(self, run_id=dr.run_id)
session.add(ti)
ti.dag_run = dr
session.add(dr)
session.flush()

session.commit()
ti.run(
mark_success=mark_success,
ignore_depends_on_past=ignore_depends_on_past,
Expand Down
51 changes: 36 additions & 15 deletions airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,8 @@ def remove(*args, **kwargs):
AIRFLOW_SETTINGS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "settings.py")
AIRFLOW_UTILS_SESSION_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "session.py")
AIRFLOW_MODELS_BASEOPERATOR_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "baseoperator.py")
AIRFLOW_MODELS_DAG_PATH = os.path.join(AIRFLOW_PATH, "airflow", "models", "dag.py")
AIRFLOW_DB_UTILS_PATH = os.path.join(AIRFLOW_PATH, "airflow", "utils", "db.py")


class TracebackSessionForTests:
Expand Down Expand Up @@ -370,6 +372,9 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None]
:return: True if the object was created from test code, False otherwise.
"""
self.traceback = traceback.extract_stack()
if any(filename.endswith("_pytest/fixtures.py") for filename, _, _, _ in self.traceback):
# This is a fixture call
return True, None
airflow_frames = [
tb
for tb in self.traceback
Expand All @@ -378,24 +383,30 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None]
and not tb.filename == AIRFLOW_UTILS_SESSION_PATH
]
if any(
filename.endswith("conftest.py") or filename.endswith("tests/test_utils/db.py")
for filename, _, _, _ in airflow_frames
filename.endswith("conftest.py")
or filename.endswith("tests/test_utils/db.py")
or (filename.startswith(AIRFLOW_TESTS_PATH) and name in ("setup_method", "teardown_method"))
for filename, _, name, _ in airflow_frames
):
# This is a fixture call or testing utilities
return True, None
if (
len(airflow_frames) >= 2
and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH)
and airflow_frames[-1].filename == AIRFLOW_MODELS_BASEOPERATOR_PATH
and airflow_frames[-1].name == "run"
):
# This is the baseoperator run method called directly from the test code. This is the
# usual pattern where we create a session in the test code to create dag_runs for tests.
# If `run` were executed inside real "airflow" code, the stack trace would be longer
# and it would not be called directly from the test code. Also, if any of the
# run_task() methods called later from the task code attempts to execute any DB
# method, the stack trace will be longer and we will catch it as an "illegal" call.
return True, None
if len(airflow_frames) >= 2 and airflow_frames[-2].filename.startswith(AIRFLOW_TESTS_PATH):
# Let's look at what we are calling directly from the test code
current_filename, current_method_name = airflow_frames[-1].filename, airflow_frames[-1].name
if (current_filename, current_method_name) in (
(AIRFLOW_MODELS_BASEOPERATOR_PATH, "run"),
(AIRFLOW_MODELS_DAG_PATH, "create_dagrun"),
):
# This is the baseoperator `run` method (or the dag `create_dagrun` method) called directly
# from the test code. This is the usual pattern where we create a session in the test code
# to create dag_runs for tests. If this code were executed inside real "airflow" code, the
# stack trace would be longer and it would not be called directly from the test code. Also,
# if any of the run_task() methods called later from the task code attempts to execute any
# DB method, the stack trace will be longer and we will catch it as an "illegal" call.
return True, None
if current_filename == AIRFLOW_DB_UTILS_PATH:
# This is a util method called directly from the test code
return True, None
for tb in airflow_frames[::-1]:
if tb.filename.startswith(AIRFLOW_PATH):
if tb.filename.startswith(AIRFLOW_TESTS_PATH):
Expand All @@ -407,6 +418,16 @@ def is_called_from_test_code(self) -> tuple[bool, traceback.FrameSummary | None]
# The traceback line will always be the 3rd one (the two bottom ones are Airflow)
return False, self.traceback[-2]

# No-op method: every argument is accepted and ignored, and the body is a bare
# ``pass``, so the call implicitly returns None.
# NOTE(review): the parameter list looks like it mirrors SQLAlchemy's
# ``Session.get_bind`` signature so this class can stand in for a session — confirm.
def get_bind(
self,
mapper=None,
clause=None,
bind=None,
_sa_skip_events=None,
_sa_skip_for_implicit_returning=False,
):
pass


def _is_sqlite_db_path_relative(sqla_conn_str: str) -> bool:
"""Determine whether the database connection URI specifies a relative path."""
Expand Down
2 changes: 2 additions & 0 deletions dev/breeze/src/airflow_breeze/commands/testing_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def _run_test(
helm_test_package=None,
keep_env_variables=shell_params.keep_env_variables,
no_db_cleanup=shell_params.no_db_cleanup,
database_isolation=shell_params.database_isolation,
)
)
run_cmd.extend(list(extra_pytest_args))
Expand Down Expand Up @@ -968,6 +969,7 @@ def helm_tests(
helm_test_package=helm_test_package,
keep_env_variables=False,
no_db_cleanup=False,
database_isolation=False,
)
cmd = ["docker", "compose", "run", "--service-ports", "--rm", "airflow", *pytest_args, *extra_pytest_args]
result = run_command(cmd, check=False, env=env, output_outside_the_group=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Features
{%- endif %}


{%- if classified_changes.fixes %}
{%- if classified_changes and classified_changes.fixes %}

Bug Fixes
~~~~~~~~~
Expand All @@ -50,7 +50,7 @@ Bug Fixes
{%- endif %}


{%- if classified_changes.misc %}
{%- if classified_changes and classified_changes.misc %}

Misc
~~~~
Expand All @@ -62,7 +62,7 @@ Misc

.. Below changes are excluded from the changelog. Move them to
appropriate section above if needed. Do not delete the lines(!):
{%- if classified_changes.other %}
{%- if classified_changes and classified_changes.other %}
{%- for other in classified_changes.other %}
* ``{{ other.message_without_backticks | safe }}``
{%- endfor %}
Expand Down
4 changes: 3 additions & 1 deletion dev/breeze/src/airflow_breeze/utils/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ def generate_args_for_pytest(
helm_test_package: str | None,
keep_env_variables: bool,
no_db_cleanup: bool,
database_isolation: bool,
):
result_log_file, warnings_file, coverage_file = test_paths(test_type, backend, helm_test_package)
if skip_db_tests:
Expand All @@ -327,12 +328,13 @@ def generate_args_for_pytest(
helm_test_package=helm_test_package,
python_version=python_version,
)
max_fail = 50
args.extend(
[
"--verbosity=0",
"--strict-markers",
"--durations=100",
"--maxfail=50",
f"--maxfail={max_fail}",
"--color=yes",
f"--junitxml={result_log_file}",
# timeouts in seconds for individual tests
Expand Down
3 changes: 3 additions & 0 deletions tests/cli/commands/test_internal_api_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ def test_ready_prefix_on_cmdline_dead_process(self):
assert self.monitor._get_num_ready_workers_running() == 0


# These tests are skipped in database isolation mode because they interfere with the
# internal API server that is already running in the background in that mode.
@pytest.mark.skip_if_database_isolation_mode
@pytest.mark.db_test
@pytest.mark.skipif(not _ENABLE_AIP_44, reason="AIP-44 is disabled")
class TestCliInternalAPI(_ComonCLIGunicornTestClass):
Expand Down
3 changes: 3 additions & 0 deletions tests/cli/commands/test_webserver_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ def test_ready_prefix_on_cmdline_dead_process(self):
assert self.monitor._get_num_ready_workers_running() == 0


# These tests are skipped in database isolation mode because they interfere with the
# internal API server that is already running in the background in that mode.
@pytest.mark.skip_if_database_isolation_mode
@pytest.mark.db_test
class TestCliWebServer(_ComonCLIGunicornTestClass):
main_process_regexp = r"airflow webserver"
Expand Down
3 changes: 3 additions & 0 deletions tests/decorators/test_bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@

DEFAULT_DATE = timezone.datetime(2023, 1, 1)

# TODO(potiuk) see why this test hangs in DB isolation mode
pytestmark = pytest.mark.skip_if_database_isolation_mode


@pytest.mark.db_test
class TestBashDecorator:
Expand Down
3 changes: 2 additions & 1 deletion tests/decorators/test_branch_external_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
from airflow.decorators import task
from airflow.utils.state import State

pytestmark = pytest.mark.db_test
# TODO: (potiuk) - AIP-44 - check why this test hangs
pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]


class Test_BranchExternalPythonDecoratedOperator:
Expand Down
3 changes: 2 additions & 1 deletion tests/decorators/test_branch_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from airflow.decorators import task
from airflow.utils.state import State

pytestmark = pytest.mark.db_test
# TODO: (potiuk) - AIP-44 - check why this test hangs
pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]


class Test_BranchPythonDecoratedOperator:
Expand Down
3 changes: 2 additions & 1 deletion tests/decorators/test_branch_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
from airflow.decorators import task
from airflow.utils.state import State

pytestmark = pytest.mark.db_test
# TODO: (potiuk) - AIP-44 - check why this test hangs
pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]


class TestBranchPythonVirtualenvDecoratedOperator:
Expand Down
3 changes: 2 additions & 1 deletion tests/decorators/test_condition.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@
from airflow.models.taskinstance import TaskInstance
from airflow.utils.context import Context

pytestmark = pytest.mark.db_test
# TODO(potiuk) see why this test hangs in DB isolation mode
pytestmark = [pytest.mark.db_test, pytest.mark.skip_if_database_isolation_mode]


@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
Expand Down
2 changes: 1 addition & 1 deletion tests/decorators/test_external_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
from airflow.decorators import setup, task, teardown
from airflow.utils import timezone

pytestmark = pytest.mark.db_test
pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]


DEFAULT_DATE = timezone.datetime(2016, 1, 1)
Expand Down
11 changes: 10 additions & 1 deletion tests/decorators/test_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
from airflow.utils.xcom import XCOM_RETURN_KEY
from tests.operators.test_python import BasePythonTest

pytestmark = pytest.mark.db_test
pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]


if TYPE_CHECKING:
Expand Down Expand Up @@ -281,6 +281,8 @@ class Test:
def add_number(self, num: int) -> int:
return self.num + num

# TODO(potiuk) see why this test hangs in DB isolation mode
@pytest.mark.skip_if_database_isolation_mode
def test_fail_multiple_outputs_key_type(self):
@task_decorator(multiple_outputs=True)
def add_number(num: int):
Expand All @@ -293,6 +295,8 @@ def add_number(num: int):
with pytest.raises(AirflowException):
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

# TODO(potiuk) see why this test hangs in DB isolation mode
@pytest.mark.skip_if_database_isolation_mode
def test_fail_multiple_outputs_no_dict(self):
@task_decorator(multiple_outputs=True)
def add_number(num: int):
Expand Down Expand Up @@ -541,6 +545,8 @@ def add_2(number: int):

assert "add_2" in self.dag_non_serialized.task_ids

# TODO(potiuk) see why this test hangs in DB isolation mode
@pytest.mark.skip_if_database_isolation_mode
def test_dag_task_multiple_outputs(self):
"""Tests dag.task property to generate task with multiple outputs"""

Expand Down Expand Up @@ -863,6 +869,7 @@ def org_test_func():
assert decorated_test_func.__wrapped__ is org_test_func, "__wrapped__ attr is not the original function"


@pytest.mark.need_serialized_dag(False)
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_upstream_exception_produces_none_xcom(dag_maker, session):
from airflow.exceptions import AirflowSkipException
Expand Down Expand Up @@ -900,6 +907,7 @@ def down(a, b):
assert result == "'example' None"


@pytest.mark.need_serialized_dag(False)
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.parametrize("multiple_outputs", [True, False])
def test_multiple_outputs_produces_none_xcom_when_task_is_skipped(dag_maker, session, multiple_outputs):
Expand Down Expand Up @@ -958,6 +966,7 @@ def other(x): ...
assert caplog.messages == []


@pytest.mark.need_serialized_dag(False)
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_task_decorator_dataset(dag_maker, session):
from airflow.datasets import Dataset
Expand Down
4 changes: 3 additions & 1 deletion tests/decorators/test_python_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from airflow.utils import timezone
from airflow.utils.state import TaskInstanceState

pytestmark = pytest.mark.db_test
pytestmark = [pytest.mark.db_test, pytest.mark.need_serialized_dag]

DEFAULT_DATE = timezone.datetime(2016, 1, 1)
PYTHON_VERSION = f"{sys.version_info.major}{sys.version_info.minor}"
Expand Down Expand Up @@ -373,6 +373,8 @@ def f():
assert teardown_task.on_failure_fail_dagrun is on_failure_fail_dagrun
ret.operator.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)

# TODO(potiuk) see why this test hangs in DB isolation mode
@pytest.mark.skip_if_database_isolation_mode
def test_invalid_annotation(self, dag_maker):
import uuid

Expand Down
Loading