Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions BREEZE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,15 @@ For example this will only run provider tests for airbyte and http providers:

breeze testing tests --test-type "Providers[airbyte,http]"

You can also exclude tests for some providers from being run when the whole "Providers" test type is run.

For example this will run tests for all providers except amazon and google provider tests:

.. code-block:: bash

breeze testing tests --test-type "Providers[-amazon,google]"


You can also run parallel tests with ``--run-in-parallel`` flag - by default it will run all test types
in parallel, but you can specify the test type that you want to run with space separated list of test
types passed to ``--parallel-test-types`` flag.
Expand Down
13 changes: 13 additions & 0 deletions Dockerfile.ci
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,19 @@ else
${TEST_TYPE} == "Postgres" || ${TEST_TYPE} == "MySQL" || \
${TEST_TYPE} == "Long" ]]; then
SELECTED_TESTS=("${ALL_TESTS[@]}")
elif [[ ${TEST_TYPE} =~ Providers\[\-(.*)\] ]]; then
# When providers start with `-` it means that we should run all provider tests except those
SELECTED_TESTS=("${PROVIDERS_TESTS[@]}")
for provider in ${BASH_REMATCH[1]//,/ }
do
providers_dir="tests/providers/${provider//./\/}"
if [[ -d ${providers_dir} ]]; then
echo "${COLOR_BLUE}Ignoring ${providers_dir} as it has been deselected.${COLOR_RESET}"
EXTRA_PYTEST_ARGS+=("--ignore=tests/providers/${provider//./\/}")
else
echo "${COLOR_YELLOW}Skipping ${providers_dir} as the directory does not exist.${COLOR_RESET}"
fi
done
elif [[ ${TEST_TYPE} =~ Providers\[(.*)\] ]]; then
SELECTED_TESTS=()
for provider in ${BASH_REMATCH[1]//,/ }
Expand Down
8 changes: 7 additions & 1 deletion TESTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -297,12 +297,18 @@ In case of Providers tests, you can run tests for all providers

breeze testing tests --test-type Providers

You can also limit the set of providers you would like to run tests of
You can limit the set of providers you would like to run tests of

.. code-block:: bash

breeze testing tests --test-type "Providers[airbyte,http]"

You can also run all providers but exclude the providers you would like to skip

.. code-block:: bash

breeze testing tests --test-type "Providers[-amazon,google]"


Running full Airflow unit test suite in parallel
------------------------------------------------
Expand Down
59 changes: 5 additions & 54 deletions dev/breeze/src/airflow_breeze/commands/testing_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
from __future__ import annotations

import os
import re
import sys
from datetime import datetime

Expand Down Expand Up @@ -65,12 +64,11 @@
from airflow_breeze.utils.parallel import (
GenericRegexpProgressMatcher,
SummarizeAfter,
bytes2human,
check_async_run_results,
run_with_pool,
)
from airflow_breeze.utils.path_utils import FILES_DIR, cleanup_python_generated_files
from airflow_breeze.utils.run_tests import run_docker_compose_tests
from airflow_breeze.utils.run_tests import file_name_from_test_type, run_docker_compose_tests
from airflow_breeze.utils.run_utils import get_filesystem_type, run_command

LOW_MEMORY_CONDITION = 8 * 1024 * 1024 * 1024
Expand Down Expand Up @@ -143,7 +141,7 @@ def _run_test(
"[error]Only 'Providers' test type can specify actual tests with \\[\\][/]"
)
sys.exit(1)
project_name = _file_name_from_test_type(exec_shell_params.test_type)
project_name = file_name_from_test_type(exec_shell_params.test_type)
down_cmd = [
*DOCKER_COMPOSE_COMMAND,
"--project-name",
Expand Down Expand Up @@ -209,11 +207,6 @@ def _run_test(
return result.returncode, f"Test: {exec_shell_params.test_type}"


def _file_name_from_test_type(test_type: str) -> str:
    """
    Convert a test type into a short, sanitized name (used e.g. as docker-compose project name).

    The test type is lower-cased, ``[`` is replaced with ``_``, ``]`` is dropped and every
    ``,`` and ``.`` is replaced with ``_``. The result is truncated to 30 characters.

    :param test_type: test type such as ``Core`` or ``Providers[airbyte,http]``
    :return: the sanitized name, at most 30 characters long
    """
    test_type_no_brackets = test_type.lower().replace("[", "_").replace("]", "")
    # A dot is literal inside a character class, so it needs no backslash. Using a raw string
    # also avoids the invalid-escape-sequence warning that a backslash-dot in a plain string
    # literal triggers on newer Python versions.
    return re.sub(r"[,.]", "_", test_type_no_brackets)[:30]


def _run_tests_in_pool(
tests_to_run: list[str],
parallelism: int,
Expand Down Expand Up @@ -268,43 +261,12 @@ def run_tests_in_parallel(
parallel_test_types_list: list[str],
extra_pytest_args: tuple,
db_reset: bool,
full_tests_needed: bool,
test_timeout: int,
include_success_outputs: bool,
debug_resources: bool,
parallelism: int,
skip_cleanup: bool,
) -> None:
import psutil
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can now remove this whole part - each test is "smaller" so the memory issues we experienced in the past should be far less frequent.


memory_available = psutil.virtual_memory()
if memory_available.available < LOW_MEMORY_CONDITION and exec_shell_params.backend in ["mssql", "mysql"]:
# Run heavy tests sequentially
heavy_test_types_to_run = {"Core", "Providers"} & set(parallel_test_types_list)
if heavy_test_types_to_run:
# some of those are requested
get_console().print(
f"[warning]Running {heavy_test_types_to_run} tests sequentially"
f"for {exec_shell_params.backend}"
f" backend due to low memory available: {bytes2human(memory_available.available)}"
)
tests_to_run_sequentially = []
for heavy_test_type in heavy_test_types_to_run:
for test_type in parallel_test_types_list:
if test_type.startswith(heavy_test_type):
parallel_test_types_list.remove(test_type)
tests_to_run_sequentially.append(test_type)
_run_tests_in_pool(
tests_to_run=tests_to_run_sequentially,
parallelism=1,
exec_shell_params=exec_shell_params,
extra_pytest_args=extra_pytest_args,
test_timeout=test_timeout,
db_reset=db_reset,
include_success_outputs=include_success_outputs,
debug_resources=debug_resources,
skip_cleanup=skip_cleanup,
)
_run_tests_in_pool(
tests_to_run=parallel_test_types_list,
parallelism=parallelism,
Expand Down Expand Up @@ -336,8 +298,9 @@ def run_tests_in_parallel(
@option_mount_sources
@click.option(
"--test-type",
help="Type of test to run. Note that with Providers, you can also specify which provider "
'tests should be run - for example --test-type "Providers[airbyte,http]"',
help="Type of test to run. With Providers, you can specify tests of which providers "
"should be run: `Providers[airbyte,http]` or "
"excluded from the full test suite: `Providers[-amazon,google]`",
default="All",
type=NotVerifiedBetterChoice(ALLOWED_TEST_TYPE_CHOICES),
)
Expand All @@ -361,12 +324,6 @@ def run_tests_in_parallel(
show_default=True,
envvar="PARALLEL_TEST_TYPES",
)
@click.option(
"--full-tests-needed",
help="Whether full set of tests is run.",
is_flag=True,
envvar="FULL_TESTS_NEEDED",
)
@click.option(
"--upgrade-boto",
help="Remove aiobotocore and upgrade botocore and boto to the latest version.",
Expand Down Expand Up @@ -405,7 +362,6 @@ def command_for_tests(
debug_resources: bool,
include_success_outputs: bool,
parallel_test_types: str,
full_tests_needed: bool,
mount_sources: str,
extra_pytest_args: tuple,
upgrade_boto: bool,
Expand Down Expand Up @@ -434,16 +390,11 @@ def command_for_tests(
perform_environment_checks()
if run_in_parallel:
test_list = parallel_test_types.split(" ")
test_list.sort(key=lambda x: x in ["Providers", "WWW"], reverse=True)
run_tests_in_parallel(
exec_shell_params=exec_shell_params,
parallel_test_types_list=test_list,
extra_pytest_args=extra_pytest_args,
db_reset=db_reset,
# Allow to pass information on whether to use full tests in the parallel execution mode
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This parameter was unused.

# or not - this will allow to skip some heavy tests on more resource-heavy configurations
# in case full tests are not required, some of those will be skipped
full_tests_needed=full_tests_needed,
test_timeout=test_timeout,
include_success_outputs=include_success_outputs,
parallelism=parallelism,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
"--skip-cleanup",
"--debug-resources",
"--include-success-outputs",
"--full-tests-needed",
],
},
{
Expand Down
11 changes: 8 additions & 3 deletions dev/breeze/src/airflow_breeze/params/shell_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
MSSQL_TMP_DIR_NAME,
SCRIPTS_CI_DIR,
)
from airflow_breeze.utils.run_tests import file_name_from_test_type
from airflow_breeze.utils.run_utils import get_filesystem_type, run_command
from airflow_breeze.utils.shared_options import get_verbose

Expand Down Expand Up @@ -267,9 +268,13 @@ def command_passed(self):
@property
def mssql_data_volume(self) -> str:
docker_filesystem = get_filesystem_type("/var/lib/docker")
# in case of Providers[....], only leave Providers
base_test_type = self.test_type.split("[")[0] if self.test_type else None
volume_name = f"tmp-mssql-volume-{base_test_type}" if base_test_type else "tmp-mssql-volume"
# Make sure the test type is not too long to be used as a volume name in docker-compose
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We needed that to handle several Providers* tests running in parallel - each parallel docker-compose needs a unique volume name

# The tmp directory in our self-hosted runners can be quite long, so we should limit the volume name
volume_name = (
"tmp-mssql-volume-" + file_name_from_test_type(self.test_type)[:20]
if self.test_type
else "tmp-mssql-volume"
)
if docker_filesystem == "tmpfs":
return os.fspath(Path.home() / MSSQL_TMP_DIR_NAME / f"{volume_name}-{self.mssql_version}")
else:
Expand Down
6 changes: 6 additions & 0 deletions dev/breeze/src/airflow_breeze/utils/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import os
import re
import sys
from subprocess import DEVNULL

Expand Down Expand Up @@ -75,3 +76,8 @@ def run_docker_compose_tests(image_name: str, extra_pytest_args: tuple) -> tuple
check=False,
)
return command_result.returncode, f"Testing docker-compose python with {image_name}"


def file_name_from_test_type(test_type: str, max_length: int = 30) -> str:
    """
    Convert a test type into a short, sanitized name (used e.g. as docker-compose project name).

    The test type is lower-cased, ``[`` is replaced with ``_``, ``]`` is dropped and every
    ``,`` and ``.`` is replaced with ``_``. The result is then truncated to ``max_length``
    characters.

    :param test_type: test type such as ``Core`` or ``Providers[airbyte,http]``
    :param max_length: maximum length of the returned name; defaults to 30 so that existing
        callers keep getting the same result
    :return: the sanitized name, at most ``max_length`` characters long
    """
    test_type_no_brackets = test_type.lower().replace("[", "_").replace("]", "")
    return re.sub(r"[,.]", "_", test_type_no_brackets)[:max_length]
50 changes: 49 additions & 1 deletion dev/breeze/src/airflow_breeze/utils/selective_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -588,6 +588,36 @@ def _get_test_types_to_run(self) -> list[str]:
get_console().print(sorted_candidate_test_types)
return sorted_candidate_test_types

@staticmethod
def _extract_long_provider_tests(current_test_types: set[str]) -> None:
    """
    Split long-running provider tests out into separate test types.

    ``current_test_types`` is modified in place:

    * ``Providers`` is replaced with one ``Providers[<long_test>]`` entry per long test and
      a single ``Providers[-<list_of_long_tests>]`` entry that covers all other providers.
    * ``Providers[<list_of_tests>]`` that selects at least one long test is replaced with one
      ``Providers[<long_test>]`` entry per selected long test and - only when some providers
      are left - a ``Providers[<remaining_tests>]`` entry.
    * ``Providers[-<list_of_tests>]`` (the exclusion form) is left untouched.

    :param current_test_types: set of test types to run, updated in place
    """
    long_tests = ["amazon", "google"]
    # Iterate over a snapshot, because the set is modified inside the loop.
    for original_test_type in tuple(current_test_types):
        if original_test_type == "Providers":
            current_test_types.remove(original_test_type)
            current_test_types.update(f"Providers[{long_test}]" for long_test in long_tests)
            current_test_types.add(f"Providers[-{','.join(long_tests)}]")
        elif original_test_type.startswith("Providers[-"):
            # Exclusion form: the listed providers are the ones NOT to run, so parsing it as
            # a selection list would schedule tests that were explicitly excluded.
            continue
        elif original_test_type.startswith("Providers["):
            provider_tests_to_run = (
                original_test_type.replace("Providers[", "").replace("]", "").split(",")
            )
            selected_long_tests = [
                long_test for long_test in long_tests if long_test in provider_tests_to_run
            ]
            if not selected_long_tests:
                continue
            current_test_types.remove(original_test_type)
            current_test_types.update(
                f"Providers[{long_test}]" for long_test in selected_long_tests
            )
            remaining_tests = [
                provider for provider in provider_tests_to_run if provider not in long_tests
            ]
            # Do not add an empty "Providers[]" test type when only long tests were selected.
            if remaining_tests:
                current_test_types.add(f"Providers[{','.join(remaining_tests)}]")

@cached_property
def parallel_test_types(self) -> str:
if not self.run_tests:
Expand All @@ -606,7 +636,25 @@ def parallel_test_types(self) -> str:
)
test_types_to_remove.add(test_type)
current_test_types = current_test_types - test_types_to_remove
return " ".join(sorted(current_test_types))

self._extract_long_provider_tests(current_test_types)

# this should be hard-coded as we want to have very specific sequence of tests
sorting_order = ["Core", "Providers[-amazon,google]", "Other", "Providers[amazon]", "WWW"]

def sort_key(t: str) -> tuple[int, str]:
    """
    Return the sort key that puts test types in the order we want them to run.

    Test types listed in ``sorting_order`` come first, in the order of that list; all other
    test types follow, sorted alphabetically.

    :param t: test type to compute the key for
    :return: ``(position, name)`` tuple usable as ``key`` for ``sorted``
    """
    # Compare positions as integers: string-encoded indexes compare lexicographically
    # ("10" < "2"), which breaks the ordering once the list has more than 10 entries.
    if t in sorting_order:
        return sorting_order.index(t), ""
    return len(sorting_order), t

return " ".join(
sorted(
current_test_types,
key=sort_key,
)
)

@cached_property
def basic_checks_only(self) -> bool:
Expand Down
Loading