From 5c9570d13a9383a59f2418125f3d5aa0d9b4bce8 Mon Sep 17 00:00:00 2001
From: Jarek Potiuk
Date: Tue, 6 Dec 2022 19:35:27 +0100
Subject: [PATCH] Move Integration tests to separate package under tests

This is the first stage of improving the way integration tests are run
in our CI. First we separate them into a dedicated package, and then we
will run them separately in CI, one integration at a time.
---
 .pre-commit-config.yaml                        |   1 +
 Dockerfile.ci                                  |   7 +-
 .../commands/testing_commands.py               |   2 +-
 scripts/docker/entrypoint_ci.sh                |   7 +-
 tests/cli/commands/test_celery_command.py      |  31 --
 tests/executors/test_celery_executor.py        | 259 +------------
 .../hooks => integration}/__init__.py          |   0
 tests/integration/api/__init__.py              |  16 +
 tests/integration/api/auth/__init__.py         |  16 +
 .../integration/api/auth/backend/__init__.py   |  16 +
 .../api/auth/backend/test_kerberos_auth.py     |   0
 tests/integration/cli/__init__.py              |  16 +
 tests/integration/cli/commands/__init__.py     |  16 +
 .../cli/commands/test_celery_command.py        |  57 +++
 tests/integration/executors/__init__.py        |  16 +
 .../executors/test_celery_executor.py          | 339 ++++++++++++++++++
 tests/integration/providers/__init__.py        |  16 +
 .../integration/providers/apache/__init__.py   |  16 +
 .../providers/apache/cassandra/__init__.py     |  16 +
 .../apache/cassandra/hooks/__init__.py         |  16 +
 .../apache/cassandra/hooks/test_cassandra.py   |   0
 .../providers/apache/pinot/__init__.py         |  16 +
 .../providers/apache/pinot/hooks/__init__.py   |  16 +
 .../apache/pinot/hooks/test_pinot.py           |  34 ++
 .../integration/providers/google/__init__.py   |  16 +
 .../providers/google/cloud/__init__.py         |  16 +
 .../google/cloud/transfers/__init__.py         |  16 +
 .../cloud/transfers/test_presto_to_gcs.py      |   0
 .../transfers/test_presto_to_gcs_system.py     |   0
 .../cloud/transfers/test_trino_to_gcs.py       |   0
 tests/integration/providers/mongo/__init__.py  |  16 +
 .../providers/mongo/sensors/__init__.py        |  16 +
 .../providers/mongo/sensors/test_mongo.py      |   0
 tests/integration/providers/redis/__init__.py  |  16 +
 .../providers/redis/hooks/__init__.py          |  16 +
 .../providers/redis/hooks/test_redis.py        |  39 ++
 .../providers/redis/operators/__init__.py      |  16 +
 .../redis/operators/test_redis_publish.py      |   0
 .../providers/redis/sensors/__init__.py        |  16 +
 .../providers/redis/sensors/test_redis_key.py  |   0
 .../redis/sensors/test_redis_pub_sub.py        |  80 +++++
 tests/integration/providers/trino/__init__.py  |  16 +
 .../providers/trino/hooks/__init__.py          |  16 +
 .../providers/trino/hooks/test_trino.py        |  48 +++
 tests/integration/security/__init__.py         |  16 +
 tests/integration/security/test_kerberos.py    |  75 ++++
 51 files changed, 1069 insertions(+), 440 deletions(-)
 rename tests/{providers/apache/cassandra/hooks => integration}/__init__.py (100%)
 create mode 100644 tests/integration/api/__init__.py
 create mode 100644 tests/integration/api/auth/__init__.py
 create mode 100644 tests/integration/api/auth/backend/__init__.py
 rename tests/{ => integration}/api/auth/backend/test_kerberos_auth.py (100%)
 create mode 100644 tests/integration/cli/__init__.py
 create mode 100644 tests/integration/cli/commands/__init__.py
 create mode 100644 tests/integration/cli/commands/test_celery_command.py
 create mode 100644 tests/integration/executors/__init__.py
 create mode 100644 tests/integration/executors/test_celery_executor.py
 create mode 100644 tests/integration/providers/__init__.py
 create mode 100644 tests/integration/providers/apache/__init__.py
 create mode 100644 tests/integration/providers/apache/cassandra/__init__.py
 create mode 100644 tests/integration/providers/apache/cassandra/hooks/__init__.py
 rename tests/{ => integration}/providers/apache/cassandra/hooks/test_cassandra.py (100%)
 create mode 100644 tests/integration/providers/apache/pinot/__init__.py
 create mode 100644 tests/integration/providers/apache/pinot/hooks/__init__.py
 create mode 100644 tests/integration/providers/apache/pinot/hooks/test_pinot.py
 create mode 100644 tests/integration/providers/google/__init__.py
 create mode 100644 tests/integration/providers/google/cloud/__init__.py
 create mode 100644 tests/integration/providers/google/cloud/transfers/__init__.py
 rename tests/{ => integration}/providers/google/cloud/transfers/test_presto_to_gcs.py (100%)
 rename tests/{ => integration}/providers/google/cloud/transfers/test_presto_to_gcs_system.py (100%)
 rename tests/{ => integration}/providers/google/cloud/transfers/test_trino_to_gcs.py (100%)
 create mode 100644 tests/integration/providers/mongo/__init__.py
 create mode 100644 tests/integration/providers/mongo/sensors/__init__.py
 rename tests/{ => integration}/providers/mongo/sensors/test_mongo.py (100%)
 create mode 100644 tests/integration/providers/redis/__init__.py
 create mode 100644 tests/integration/providers/redis/hooks/__init__.py
 create mode 100644 tests/integration/providers/redis/hooks/test_redis.py
 create mode 100644 tests/integration/providers/redis/operators/__init__.py
 rename tests/{ => integration}/providers/redis/operators/test_redis_publish.py (100%)
 create mode 100644 tests/integration/providers/redis/sensors/__init__.py
 rename tests/{ => integration}/providers/redis/sensors/test_redis_key.py (100%)
 create mode 100644 tests/integration/providers/redis/sensors/test_redis_pub_sub.py
 create mode 100644 tests/integration/providers/trino/__init__.py
 create mode 100644 tests/integration/providers/trino/hooks/__init__.py
 create mode 100644 tests/integration/providers/trino/hooks/test_trino.py
 create mode 100644 tests/integration/security/__init__.py
 create mode 100644 tests/integration/security/test_kerberos.py

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 6af5e51781b44..bc56fd61fbb43 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -467,6 +467,7 @@ repos:
           ^airflow/www/static/|
           ^airflow/providers/|
           ^tests/providers/apache/cassandra/hooks/test_cassandra.py$|
+          ^tests/integration/providers/apache/cassandra/hooks/test_cassandra.py$|
           ^tests/system/providers/apache/spark/example_spark_dag.py$|
           ^docs/apache-airflow-providers-apache-cassandra/connections/cassandra.rst$|
           ^docs/apache-airflow-providers-apache-hive/commits.rst$|
diff --git a/Dockerfile.ci b/Dockerfile.ci
index f2e64830fbbd6..0a30eb611031e 100644
--- a/Dockerfile.ci
+++ b/Dockerfile.ci
@@ -890,11 +890,13 @@ else
     )
     WWW_TESTS=("tests/www")
     HELM_CHART_TESTS=("tests/charts")
+    INTEGRATION_TESTS=("tests/integration")
    ALL_TESTS=("tests")
    ALL_PRESELECTED_TESTS=(
        "${CLI_TESTS[@]}"
        "${API_TESTS[@]}"
        "${HELM_CHART_TESTS[@]}"
+        "${INTEGRATION_TESTS[@]}"
        "${PROVIDERS_TESTS[@]}"
        "${CORE_TESTS[@]}"
        "${ALWAYS_TESTS[@]}"
@@ -915,14 +917,15 @@ else
        SELECTED_TESTS=("${WWW_TESTS[@]}")
    elif [[ ${TEST_TYPE:=""} == "Helm" ]]; then
        SELECTED_TESTS=("${HELM_CHART_TESTS[@]}")
+    elif [[ ${TEST_TYPE:=""} == "Integration" ]]; then
+        SELECTED_TESTS=("${INTEGRATION_TESTS[@]}")
    elif [[ ${TEST_TYPE:=""} == "Other" ]]; then
        find_all_other_tests
        SELECTED_TESTS=("${ALL_OTHER_TESTS[@]}")
    elif [[ ${TEST_TYPE:=""} == "All" || ${TEST_TYPE} == "Quarantined" || \
            ${TEST_TYPE} == "Always" || \
            ${TEST_TYPE} == "Postgres" || ${TEST_TYPE} == "MySQL" || \
-            ${TEST_TYPE} == "Long" || \
-            ${TEST_TYPE} == "Integration" ]]; then
+            ${TEST_TYPE} == "Long" ]]; then
        SELECTED_TESTS=("${ALL_TESTS[@]}")
    elif [[ ${TEST_TYPE} =~ Providers\[(.*)\] ]]; then
        SELECTED_TESTS=()
diff --git a/dev/breeze/src/airflow_breeze/commands/testing_commands.py b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
index 33781e33735bf..5b3dd953acd4b 100644
--- a/dev/breeze/src/airflow_breeze/commands/testing_commands.py
+++ b/dev/breeze/src/airflow_breeze/commands/testing_commands.py
@@ -226,7 +226,7 @@ def _run_tests_in_pool(
         progress_matcher=GenericRegexpProgressMatcher(
             regexp=TEST_PROGRESS_REGEXP,
             regexp_for_joined_line=PERCENT_TEST_PROGRESS_REGEXP,
-            lines_to_search=40,
+            lines_to_search=200,
         ),
     ) as (pool, outputs):
         results = [
diff --git a/scripts/docker/entrypoint_ci.sh b/scripts/docker/entrypoint_ci.sh
index ca147c61c031d..29c8a52b446e2 100755
--- a/scripts/docker/entrypoint_ci.sh
+++ b/scripts/docker/entrypoint_ci.sh
@@ -335,11 +335,13 @@ else
     )
     WWW_TESTS=("tests/www")
     HELM_CHART_TESTS=("tests/charts")
+    INTEGRATION_TESTS=("tests/integration")
    ALL_TESTS=("tests")
    ALL_PRESELECTED_TESTS=(
        "${CLI_TESTS[@]}"
        "${API_TESTS[@]}"
        "${HELM_CHART_TESTS[@]}"
+        "${INTEGRATION_TESTS[@]}"
        "${PROVIDERS_TESTS[@]}"
        "${CORE_TESTS[@]}"
        "${ALWAYS_TESTS[@]}"
@@ -360,14 +362,15 @@ else
        SELECTED_TESTS=("${WWW_TESTS[@]}")
    elif [[ ${TEST_TYPE:=""} == "Helm" ]]; then
        SELECTED_TESTS=("${HELM_CHART_TESTS[@]}")
+    elif [[ ${TEST_TYPE:=""} == "Integration" ]]; then
+        SELECTED_TESTS=("${INTEGRATION_TESTS[@]}")
    elif [[ ${TEST_TYPE:=""} == "Other" ]]; then
        find_all_other_tests
        SELECTED_TESTS=("${ALL_OTHER_TESTS[@]}")
    elif [[ ${TEST_TYPE:=""} == "All" || ${TEST_TYPE} == "Quarantined" || \
            ${TEST_TYPE} == "Always" || \
            ${TEST_TYPE} == "Postgres" || ${TEST_TYPE} == "MySQL" || \
-            ${TEST_TYPE} == "Long" || \
-            ${TEST_TYPE} == "Integration" ]]; then
+            ${TEST_TYPE} == "Long" ]]; then
        SELECTED_TESTS=("${ALL_TESTS[@]}")
    elif [[ ${TEST_TYPE} =~ Providers\[(.*)\] ]]; then
        SELECTED_TESTS=()
diff --git a/tests/cli/commands/test_celery_command.py b/tests/cli/commands/test_celery_command.py
index 9271e1b5812e4..9acea81c60193 100644
--- a/tests/cli/commands/test_celery_command.py
+++ b/tests/cli/commands/test_celery_command.py
@@ -61,37 +61,6 @@ def test_validate_session_dbapi_exception(self, mock_session):
         assert airflow.settings.validate_session() is False
 
 
-@pytest.mark.integration("redis")
-@pytest.mark.integration("rabbitmq")
-@pytest.mark.backend("mysql", "postgres")
-class TestWorkerServeLogs:
-    @classmethod
-    def setup_class(cls):
-        cls.parser = cli_parser.get_parser()
-
-    @mock.patch("airflow.cli.commands.celery_command.celery_app")
-    @conf_vars({("core", "executor"): "CeleryExecutor"})
-    def test_serve_logs_on_worker_start(self, mock_celery_app):
-        with mock.patch("airflow.cli.commands.celery_command.Process") as mock_process:
-            args = self.parser.parse_args(["celery", "worker", "--concurrency", "1"])
-
-            with mock.patch("celery.platforms.check_privileges") as mock_privil:
-                mock_privil.return_value = 0
-                celery_command.worker(args)
-                mock_process.assert_called()
-
-    @mock.patch("airflow.cli.commands.celery_command.celery_app")
-    @conf_vars({("core", "executor"): "CeleryExecutor"})
-    def test_skip_serve_logs_on_worker_start(self, mock_celery_app):
-        with mock.patch("airflow.cli.commands.celery_command.Process") as mock_popen:
-            args = self.parser.parse_args(["celery", "worker", "--concurrency", "1", "--skip-serve-logs"])
-
-            with mock.patch("celery.platforms.check_privileges") as mock_privil:
-                mock_privil.return_value = 0
-                celery_command.worker(args)
-                mock_popen.assert_not_called()
-
-
 @pytest.mark.backend("mysql", "postgres")
 class TestCeleryStopCommand:
     @classmethod
diff --git a/tests/executors/test_celery_executor.py b/tests/executors/test_celery_executor.py
index 6ab99d0706bf9..cbbe64c5649c6 100644
--- a/tests/executors/test_celery_executor.py
+++ b/tests/executors/test_celery_executor.py
@@ -18,12 +18,9 @@
 from __future__ import annotations
 
 import contextlib
-import json
-import logging
 import os
 import signal
 import sys
-import unittest
 from datetime import datetime, timedelta
 from unittest import mock
 
@@ -31,22 +28,16 @@
 import celery.contrib.testing.tasks  # noqa: F401
 import pytest
 from celery import Celery
-from celery.backends.base import BaseBackend, BaseKeyValueStoreBackend
-from celery.backends.database import DatabaseBackend
-from celery.contrib.testing.worker import start_worker
 from celery.result import AsyncResult
 from freezegun import freeze_time
 from kombu.asynchronous import set_event_loop
 from parameterized import parameterized
 
 from airflow.configuration import conf
-from airflow.exceptions import AirflowException, AirflowTaskTimeout
 from airflow.executors import celery_executor
-from airflow.executors.celery_executor import BulkStateFetcher
 from airflow.models.baseoperator import BaseOperator
 from airflow.models.dag import DAG
-from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance, TaskInstanceKey
-from airflow.operators.bash import BashOperator
+from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
 from airflow.utils import timezone
 from airflow.utils.state import State
 from tests.test_utils import db
@@ -107,149 +98,6 @@ def teardown_method(self) -> None:
         db.clear_db_runs()
         db.clear_db_jobs()
 
-    @parameterized.expand(_prepare_test_bodies())
-    @pytest.mark.integration("redis")
-    @pytest.mark.integration("rabbitmq")
-    @pytest.mark.backend("mysql", "postgres")
-    def test_celery_integration(self, broker_url):
-        success_command = ["airflow", "tasks", "run", "true", "some_parameter"]
-        fail_command = ["airflow", "version"]
-
-        def fake_execute_command(command):
-            if command != success_command:
-                raise AirflowException("fail")
-
-        with _prepare_app(broker_url, execute=fake_execute_command) as app:
-            executor = celery_executor.CeleryExecutor()
-            assert executor.tasks == {}
-            executor.start()
-
-            with start_worker(app=app, logfile=sys.stdout, loglevel="info"):
-                execute_date = datetime.now()
-
-                task_tuples_to_send = [
-                    (
-                        ("success", "fake_simple_ti", execute_date, 0),
-                        success_command,
-                        celery_executor.celery_configuration["task_default_queue"],
-                        celery_executor.execute_command,
-                    ),
-                    (
-                        ("fail", "fake_simple_ti", execute_date, 0),
-                        fail_command,
-                        celery_executor.celery_configuration["task_default_queue"],
-                        celery_executor.execute_command,
-                    ),
-                ]
-
-                # "Enqueue" them. We don't have a real SimpleTaskInstance, so directly edit the dict
-                for (key, command, queue, task) in task_tuples_to_send:
-                    executor.queued_tasks[key] = (command, 1, queue, None)
-                    executor.task_publish_retries[key] = 1
-
-                executor._process_tasks(task_tuples_to_send)
-
-                assert list(executor.tasks.keys()) == [
-                    ("success", "fake_simple_ti", execute_date, 0),
-                    ("fail", "fake_simple_ti", execute_date, 0),
-                ]
-                assert (
-                    executor.event_buffer[("success", "fake_simple_ti", execute_date, 0)][0] == State.QUEUED
-                )
-                assert executor.event_buffer[("fail", "fake_simple_ti", execute_date, 0)][0] == State.QUEUED
-
-                executor.end(synchronous=True)
-
-        assert executor.event_buffer[("success", "fake_simple_ti", execute_date, 0)][0] == State.SUCCESS
-        assert executor.event_buffer[("fail", "fake_simple_ti", execute_date, 0)][0] == State.FAILED
-
-        assert "success" not in executor.tasks
-        assert "fail" not in executor.tasks
-
-        assert executor.queued_tasks == {}
-        assert timedelta(0, 600) == executor.task_adoption_timeout
-
-    @pytest.mark.integration("redis")
-    @pytest.mark.integration("rabbitmq")
-    @pytest.mark.backend("mysql", "postgres")
-    def test_error_sending_task(self):
-        def fake_execute_command():
-            pass
-
-        with _prepare_app(execute=fake_execute_command):
-            # fake_execute_command takes no arguments while execute_command takes 1,
-            # which will cause TypeError when calling task.apply_async()
-            executor = celery_executor.CeleryExecutor()
-            task = BashOperator(
-                task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()
-            )
-            when = datetime.now()
-            value_tuple = (
-                "command",
-                1,
-                None,
-                SimpleTaskInstance.from_ti(ti=TaskInstance(task=task, run_id=None)),
-            )
-            key = ("fail", "fake_simple_ti", when, 0)
-            executor.queued_tasks[key] = value_tuple
-            executor.task_publish_retries[key] = 1
-            executor.heartbeat()
-        assert 0 == len(executor.queued_tasks), "Task should no longer be queued"
-        assert executor.event_buffer[("fail", "fake_simple_ti", when, 0)][0] == State.FAILED
-
-    @pytest.mark.integration("redis")
-    @pytest.mark.integration("rabbitmq")
-    @pytest.mark.backend("mysql", "postgres")
-    def test_retry_on_error_sending_task(self, caplog):
-        """Test that Airflow retries publishing tasks to Celery Broker at least 3 times"""
-
-        with _prepare_app(), caplog.at_level(logging.INFO), mock.patch.object(
-            # Mock `with timeout()` to _instantly_ fail.
-            celery_executor.timeout,
-            "__enter__",
-            side_effect=AirflowTaskTimeout,
-        ):
-            executor = celery_executor.CeleryExecutor()
-            assert executor.task_publish_retries == {}
-            assert executor.task_publish_max_retries == 3, "Assert Default Max Retries is 3"
-
-            task = BashOperator(
-                task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()
-            )
-            when = datetime.now()
-            value_tuple = (
-                "command",
-                1,
-                None,
-                SimpleTaskInstance.from_ti(ti=TaskInstance(task=task, run_id=None)),
-            )
-            key = ("fail", "fake_simple_ti", when, 0)
-            executor.queued_tasks[key] = value_tuple
-
-            # Test that when heartbeat is called again, task is published again to Celery Queue
-            executor.heartbeat()
-            assert dict(executor.task_publish_retries) == {key: 1}
-            assert 1 == len(executor.queued_tasks), "Task should remain in queue"
-            assert executor.event_buffer == {}
-            assert f"[Try 1 of 3] Task Timeout Error for Task: ({key})." in caplog.text
-
-            executor.heartbeat()
-            assert dict(executor.task_publish_retries) == {key: 2}
-            assert 1 == len(executor.queued_tasks), "Task should remain in queue"
-            assert executor.event_buffer == {}
-            assert f"[Try 2 of 3] Task Timeout Error for Task: ({key})." in caplog.text
-
-            executor.heartbeat()
-            assert dict(executor.task_publish_retries) == {key: 3}
-            assert 1 == len(executor.queued_tasks), "Task should remain in queue"
-            assert executor.event_buffer == {}
-            assert f"[Try 3 of 3] Task Timeout Error for Task: ({key})." in caplog.text
-
-            executor.heartbeat()
-            assert dict(executor.task_publish_retries) == {}
-            assert 0 == len(executor.queued_tasks), "Task should no longer be in queue"
-            assert executor.event_buffer[("fail", "fake_simple_ti", when, 0)][0] == State.FAILED
-
     @pytest.mark.quarantined
     @pytest.mark.backend("mysql", "postgres")
     def test_exception_propagation(self):
@@ -481,111 +329,6 @@ def test_operation_timeout_config():
     assert celery_executor.OPERATION_TIMEOUT == 1
 
 
-class ClassWithCustomAttributes:
-    """Class for testing purpose: allows to create objects with custom attributes in one single statement."""
-
-    def __init__(self, **kwargs):
-        for key, value in kwargs.items():
-            setattr(self, key, value)
-
-    def __str__(self):
-        return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})"
-
-    def __repr__(self):
-        return self.__str__()
-
-    def __eq__(self, other):
-        return self.__dict__ == other.__dict__
-
-    def __ne__(self, other):
-        return not self.__eq__(other)
-
-
-class TestBulkStateFetcher(unittest.TestCase):
-    @mock.patch(
-        "celery.backends.base.BaseKeyValueStoreBackend.mget",
-        return_value=[json.dumps({"status": "SUCCESS", "task_id": "123"})],
-    )
-    @pytest.mark.integration("redis")
-    @pytest.mark.integration("rabbitmq")
-    @pytest.mark.backend("mysql", "postgres")
-    def test_should_support_kv_backend(self, mock_mget):
-        with _prepare_app():
-            mock_backend = BaseKeyValueStoreBackend(app=celery_executor.app)
-            with mock.patch(
-                "airflow.executors.celery_executor.Celery.backend", mock_backend
-            ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
-                fetcher = BulkStateFetcher()
-                result = fetcher.get_many(
-                    [
-                        mock.MagicMock(task_id="123"),
-                        mock.MagicMock(task_id="456"),
-                    ]
-                )
-
-        # Assert called - ignore order
-        mget_args, _ = mock_mget.call_args
-        assert set(mget_args[0]) == {b"celery-task-meta-456", b"celery-task-meta-123"}
-        mock_mget.assert_called_once_with(mock.ANY)
-
-        assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
-        assert [
-            "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
-        ] == cm.output
-
-    @mock.patch("celery.backends.database.DatabaseBackend.ResultSession")
-    @pytest.mark.integration("redis")
-    @pytest.mark.integration("rabbitmq")
-    @pytest.mark.backend("mysql", "postgres")
-    def test_should_support_db_backend(self, mock_session):
-        with _prepare_app():
-            mock_backend = DatabaseBackend(app=celery_executor.app, url="sqlite3://")
-
-            with mock.patch(
-                "airflow.executors.celery_executor.Celery.backend", mock_backend
-            ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
-                mock_session = mock_backend.ResultSession.return_value
-                mock_session.query.return_value.filter.return_value.all.return_value = [
-                    mock.MagicMock(**{"to_dict.return_value": {"status": "SUCCESS", "task_id": "123"}})
-                ]
-
-                fetcher = BulkStateFetcher()
-                result = fetcher.get_many(
-                    [
-                        mock.MagicMock(task_id="123"),
-                        mock.MagicMock(task_id="456"),
-                    ]
-                )
-
-        assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
-        assert [
-            "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
-        ] == cm.output
-
-    @pytest.mark.integration("redis")
-    @pytest.mark.integration("rabbitmq")
-    @pytest.mark.backend("mysql", "postgres")
-    def test_should_support_base_backend(self):
-        with _prepare_app():
-            mock_backend = mock.MagicMock(autospec=BaseBackend)
-
-            with mock.patch(
-                "airflow.executors.celery_executor.Celery.backend", mock_backend
-            ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
-                fetcher = BulkStateFetcher(1)
-                result = fetcher.get_many(
-                    [
-                        ClassWithCustomAttributes(task_id="123", state="SUCCESS"),
-                        ClassWithCustomAttributes(task_id="456", state="PENDING"),
-                    ]
-                )
-
-        assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
-        assert [
-            "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
-        ] == cm.output
-
-
 class MockTask:
     """
     A picklable object used to mock tasks sent to Celery. Can't use the mock library
diff --git a/tests/providers/apache/cassandra/hooks/__init__.py b/tests/integration/__init__.py
similarity index 100%
rename from tests/providers/apache/cassandra/hooks/__init__.py
rename to tests/integration/__init__.py
diff --git a/tests/integration/api/__init__.py b/tests/integration/api/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/api/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/api/auth/__init__.py b/tests/integration/api/auth/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/api/auth/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/api/auth/backend/__init__.py b/tests/integration/api/auth/backend/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/api/auth/backend/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/api/auth/backend/test_kerberos_auth.py b/tests/integration/api/auth/backend/test_kerberos_auth.py
similarity index 100%
rename from tests/api/auth/backend/test_kerberos_auth.py
rename to tests/integration/api/auth/backend/test_kerberos_auth.py
diff --git a/tests/integration/cli/__init__.py b/tests/integration/cli/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/cli/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/cli/commands/__init__.py b/tests/integration/cli/commands/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/cli/commands/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/cli/commands/test_celery_command.py b/tests/integration/cli/commands/test_celery_command.py
new file mode 100644
index 0000000000000..3cc9da0a13473
--- /dev/null
+++ b/tests/integration/cli/commands/test_celery_command.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.cli import cli_parser
+from airflow.cli.commands import celery_command
+from tests.test_utils.config import conf_vars
+
+
+@pytest.mark.integration("redis")
+@pytest.mark.integration("rabbitmq")
+@pytest.mark.backend("mysql", "postgres")
+class TestWorkerServeLogs:
+    @classmethod
+    def setup_class(cls):
+        cls.parser = cli_parser.get_parser()
+
+    @mock.patch("airflow.cli.commands.celery_command.celery_app")
+    @conf_vars({("core", "executor"): "CeleryExecutor"})
+    def test_serve_logs_on_worker_start(self, mock_celery_app):
+        with mock.patch("airflow.cli.commands.celery_command.Process") as mock_process:
+            args = self.parser.parse_args(["celery", "worker", "--concurrency", "1"])
+
+            with mock.patch("celery.platforms.check_privileges") as mock_privil:
+                mock_privil.return_value = 0
+                celery_command.worker(args)
+                mock_process.assert_called()
+
+    @mock.patch("airflow.cli.commands.celery_command.celery_app")
+    @conf_vars({("core", "executor"): "CeleryExecutor"})
+    def test_skip_serve_logs_on_worker_start(self, mock_celery_app):
+        with mock.patch("airflow.cli.commands.celery_command.Process") as mock_popen:
+            args = self.parser.parse_args(["celery", "worker", "--concurrency", "1", "--skip-serve-logs"])
+
+            with mock.patch("celery.platforms.check_privileges") as mock_privil:
+                mock_privil.return_value = 0
+                celery_command.worker(args)
+                mock_popen.assert_not_called()
diff --git a/tests/integration/executors/__init__.py b/tests/integration/executors/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/executors/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/executors/test_celery_executor.py b/tests/integration/executors/test_celery_executor.py
new file mode 100644
index 0000000000000..f7f69fcfce789
--- /dev/null
+++ b/tests/integration/executors/test_celery_executor.py
@@ -0,0 +1,339 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+import contextlib
+import json
+import logging
+import os
+import sys
+import unittest
+from datetime import datetime, timedelta
+from unittest import mock
+
+# leave this; it is used by the test worker
+import celery.contrib.testing.tasks  # noqa: F401
+import pytest
+from celery import Celery
+from celery.backends.base import BaseBackend, BaseKeyValueStoreBackend
+from celery.backends.database import DatabaseBackend
+from celery.contrib.testing.worker import start_worker
+from kombu.asynchronous import set_event_loop
+from parameterized import parameterized
+
+from airflow.configuration import conf
+from airflow.exceptions import AirflowException, AirflowTaskTimeout
+from airflow.executors import celery_executor
+from airflow.executors.celery_executor import BulkStateFetcher
+from airflow.models.dag import DAG
+from airflow.models.taskinstance import SimpleTaskInstance, TaskInstance
+from airflow.operators.bash import BashOperator
+from airflow.utils.state import State
+from tests.test_utils import db
+
+
+def _prepare_test_bodies():
+    if "CELERY_BROKER_URLS" in os.environ:
+        return [(url,) for url in os.environ["CELERY_BROKER_URLS"].split(",")]
+    return [(conf.get("celery", "BROKER_URL"))]
+
+
+class FakeCeleryResult:
+    @property
+    def state(self):
+        raise Exception()
+
+    def task_id(self):
+        return "task_id"
+
+
+@contextlib.contextmanager
+def _prepare_app(broker_url=None, execute=None):
+    broker_url = broker_url or conf.get("celery", "BROKER_URL")
+    execute = execute or celery_executor.execute_command.__wrapped__
+
+    test_config = dict(celery_executor.celery_configuration)
+    test_config.update({"broker_url": broker_url})
+    test_app = Celery(broker_url, config_source=test_config)
+    test_execute = test_app.task(execute)
+    patch_app = mock.patch("airflow.executors.celery_executor.app", test_app)
+    patch_execute = mock.patch("airflow.executors.celery_executor.execute_command", test_execute)
+
+    backend = test_app.backend
+
+    if hasattr(backend, "ResultSession"):
+        # Pre-create the database tables now, otherwise SQLA via Celery has a
+        # race condition where one of the subprocesses can die with "Table
+        # already exists" error, because SQLA checks for which tables exist,
+        # then issues a CREATE TABLE, rather than doing CREATE TABLE IF NOT
+        # EXISTS
+        session = backend.ResultSession()
+        session.close()
+
+    with patch_app, patch_execute:
+        try:
+            yield test_app
+        finally:
+            # Clear event loop to tear down each celery instance
+            set_event_loop(None)
+
+
+@pytest.mark.integration("redis")
+@pytest.mark.integration("rabbitmq")
+@pytest.mark.backend("mysql", "postgres")
+class TestCeleryExecutor:
+    def setup_method(self) -> None:
+        db.clear_db_runs()
+        db.clear_db_jobs()
+
+    def teardown_method(self) -> None:
+        db.clear_db_runs()
+        db.clear_db_jobs()
+
+    @parameterized.expand(_prepare_test_bodies())
+    def test_celery_integration(self, broker_url):
+        success_command = ["airflow", "tasks", "run", "true", "some_parameter"]
+        fail_command = ["airflow", "version"]
+
+        def fake_execute_command(command):
+            if command != success_command:
+                raise AirflowException("fail")
+
+        with _prepare_app(broker_url, execute=fake_execute_command) as app:
+            executor = celery_executor.CeleryExecutor()
+            assert executor.tasks == {}
+            executor.start()
+
+            with start_worker(app=app, logfile=sys.stdout, loglevel="info"):
+                execute_date = datetime.now()
+
+                task_tuples_to_send = [
+                    (
+                        ("success", "fake_simple_ti", execute_date, 0),
+                        success_command,
+                        celery_executor.celery_configuration["task_default_queue"],
+                        celery_executor.execute_command,
+                    ),
+                    (
+                        ("fail", "fake_simple_ti", execute_date, 0),
+                        fail_command,
+                        celery_executor.celery_configuration["task_default_queue"],
+                        celery_executor.execute_command,
+                    ),
+                ]
+
+                # "Enqueue" them. We don't have a real SimpleTaskInstance, so directly edit the dict
+                for (key, command, queue, task) in task_tuples_to_send:
+                    executor.queued_tasks[key] = (command, 1, queue, None)
+                    executor.task_publish_retries[key] = 1
+
+                executor._process_tasks(task_tuples_to_send)
+
+                assert list(executor.tasks.keys()) == [
+                    ("success", "fake_simple_ti", execute_date, 0),
+                    ("fail", "fake_simple_ti", execute_date, 0),
+                ]
+                assert (
+                    executor.event_buffer[("success", "fake_simple_ti", execute_date, 0)][0] == State.QUEUED
+                )
+                assert executor.event_buffer[("fail", "fake_simple_ti", execute_date, 0)][0] == State.QUEUED
+
+                executor.end(synchronous=True)
+
+        assert executor.event_buffer[("success", "fake_simple_ti", execute_date, 0)][0] == State.SUCCESS
+        assert executor.event_buffer[("fail", "fake_simple_ti", execute_date, 0)][0] == State.FAILED
+
+        assert "success" not in executor.tasks
+        assert "fail" not in executor.tasks
+
+        assert executor.queued_tasks == {}
+        assert timedelta(0, 600) == executor.task_adoption_timeout
+
+    def test_error_sending_task(self):
+        def fake_execute_command():
+            pass
+
+        with _prepare_app(execute=fake_execute_command):
+            # fake_execute_command takes no arguments while execute_command takes 1,
+            # which will cause TypeError when calling task.apply_async()
+            executor = celery_executor.CeleryExecutor()
+            task = BashOperator(
+                task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()
+            )
+            when = datetime.now()
+            value_tuple = (
+                "command",
+                1,
+                None,
+                SimpleTaskInstance.from_ti(ti=TaskInstance(task=task, run_id=None)),
+            )
+            key = ("fail", "fake_simple_ti", when, 0)
+            executor.queued_tasks[key] = value_tuple
+            executor.task_publish_retries[key] = 1
+            executor.heartbeat()
+        assert 0 == len(executor.queued_tasks), "Task should no longer be queued"
+        assert executor.event_buffer[("fail", "fake_simple_ti", when, 0)][0] == State.FAILED
+
+    def test_retry_on_error_sending_task(self, caplog):
+        """Test that Airflow retries publishing tasks to Celery Broker at least 3 times"""
+
+        with _prepare_app(), caplog.at_level(logging.INFO), mock.patch.object(
+            # Mock `with timeout()` to _instantly_ fail.
+            celery_executor.timeout,
+            "__enter__",
+            side_effect=AirflowTaskTimeout,
+        ):
+            executor = celery_executor.CeleryExecutor()
+            assert executor.task_publish_retries == {}
+            assert executor.task_publish_max_retries == 3, "Assert Default Max Retries is 3"
+
+            task = BashOperator(
+                task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()
+            )
+            when = datetime.now()
+            value_tuple = (
+                "command",
+                1,
+                None,
+                SimpleTaskInstance.from_ti(ti=TaskInstance(task=task, run_id=None)),
+            )
+            key = ("fail", "fake_simple_ti", when, 0)
+            executor.queued_tasks[key] = value_tuple
+
+            # Test that when heartbeat is called again, task is published again to Celery Queue
+            executor.heartbeat()
+            assert dict(executor.task_publish_retries) == {key: 1}
+            assert 1 == len(executor.queued_tasks), "Task should remain in queue"
+            assert executor.event_buffer == {}
+            assert f"[Try 1 of 3] Task Timeout Error for Task: ({key})." in caplog.text
+
+            executor.heartbeat()
+            assert dict(executor.task_publish_retries) == {key: 2}
+            assert 1 == len(executor.queued_tasks), "Task should remain in queue"
+            assert executor.event_buffer == {}
+            assert f"[Try 2 of 3] Task Timeout Error for Task: ({key})." in caplog.text
+
+            executor.heartbeat()
+            assert dict(executor.task_publish_retries) == {key: 3}
+            assert 1 == len(executor.queued_tasks), "Task should remain in queue"
+            assert executor.event_buffer == {}
+            assert f"[Try 3 of 3] Task Timeout Error for Task: ({key})." in caplog.text
+
+            executor.heartbeat()
+            assert dict(executor.task_publish_retries) == {}
+            assert 0 == len(executor.queued_tasks), "Task should no longer be in queue"
+            assert executor.event_buffer[("fail", "fake_simple_ti", when, 0)][0] == State.FAILED
+
+
+class ClassWithCustomAttributes:
+    """Class for testing purpose: allows to create objects with custom attributes in one single statement."""
+
+    def __init__(self, **kwargs):
+        for key, value in kwargs.items():
+            setattr(self, key, value)
+
+    def __str__(self):
+        return f"{ClassWithCustomAttributes.__name__}({str(self.__dict__)})"
+
+    def __repr__(self):
+        return self.__str__()
+
+    def __eq__(self, other):
+        return self.__dict__ == other.__dict__
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+
+@pytest.mark.integration("redis")
+@pytest.mark.integration("rabbitmq")
+@pytest.mark.backend("mysql", "postgres")
+class TestBulkStateFetcher(unittest.TestCase):
+    @mock.patch(
+        "celery.backends.base.BaseKeyValueStoreBackend.mget",
+        return_value=[json.dumps({"status": "SUCCESS", "task_id": "123"})],
+    )
+    def test_should_support_kv_backend(self, mock_mget):
+        with _prepare_app():
+            mock_backend = BaseKeyValueStoreBackend(app=celery_executor.app)
+            with mock.patch(
+                "airflow.executors.celery_executor.Celery.backend", mock_backend
+            ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
+                fetcher = BulkStateFetcher()
+                result = fetcher.get_many(
+                    [
+                        mock.MagicMock(task_id="123"),
+                        mock.MagicMock(task_id="456"),
+                    ]
+                )
+
+        # Assert called - ignore order
+        mget_args, _ = mock_mget.call_args
+        assert set(mget_args[0]) == {b"celery-task-meta-456", b"celery-task-meta-123"}
+        mock_mget.assert_called_once_with(mock.ANY)
+
+        assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
+        assert [
+            "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
+        ] == cm.output
+
+    @mock.patch("celery.backends.database.DatabaseBackend.ResultSession")
+    def test_should_support_db_backend(self, mock_session):
+        with _prepare_app():
+            mock_backend = DatabaseBackend(app=celery_executor.app, url="sqlite3://")
+
+            with mock.patch(
+                "airflow.executors.celery_executor.Celery.backend", mock_backend
+            ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
+                mock_session = mock_backend.ResultSession.return_value
+                mock_session.query.return_value.filter.return_value.all.return_value = [
+                    mock.MagicMock(**{"to_dict.return_value": {"status": "SUCCESS", "task_id": "123"}})
+                ]
+
+                fetcher = BulkStateFetcher()
+                result = fetcher.get_many(
+                    [
+                        mock.MagicMock(task_id="123"),
+                        mock.MagicMock(task_id="456"),
+                    ]
+                )
+
+        assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
+        assert [
+            "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
+        ] == cm.output
+
+    def test_should_support_base_backend(self):
+        with _prepare_app():
+            mock_backend = mock.MagicMock(autospec=BaseBackend)
+
+            with mock.patch(
+                "airflow.executors.celery_executor.Celery.backend", mock_backend
+            ), self.assertLogs("airflow.executors.celery_executor.BulkStateFetcher", level="DEBUG") as cm:
+                fetcher = BulkStateFetcher(1)
+                result = fetcher.get_many(
+                    [
+                        ClassWithCustomAttributes(task_id="123", state="SUCCESS"),
+                        ClassWithCustomAttributes(task_id="456", state="PENDING"),
+                    ]
+                )
+
+        assert result == {"123": ("SUCCESS", None), "456": ("PENDING", None)}
+        assert [
+            "DEBUG:airflow.executors.celery_executor.BulkStateFetcher:Fetched 2 state(s) for 2 task(s)"
+        ] == cm.output
diff --git a/tests/integration/providers/__init__.py b/tests/integration/providers/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/providers/apache/__init__.py b/tests/integration/providers/apache/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/apache/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/providers/apache/cassandra/__init__.py b/tests/integration/providers/apache/cassandra/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/apache/cassandra/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/providers/apache/cassandra/hooks/__init__.py b/tests/integration/providers/apache/cassandra/hooks/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/apache/cassandra/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/apache/cassandra/hooks/test_cassandra.py b/tests/integration/providers/apache/cassandra/hooks/test_cassandra.py
similarity index 100%
rename from tests/providers/apache/cassandra/hooks/test_cassandra.py
rename to tests/integration/providers/apache/cassandra/hooks/test_cassandra.py
diff --git a/tests/integration/providers/apache/pinot/__init__.py b/tests/integration/providers/apache/pinot/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/apache/pinot/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/providers/apache/pinot/hooks/__init__.py b/tests/integration/providers/apache/pinot/hooks/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/apache/pinot/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/providers/apache/pinot/hooks/test_pinot.py b/tests/integration/providers/apache/pinot/hooks/test_pinot.py
new file mode 100644
index 0000000000000..d99e8efdf4808
--- /dev/null
+++ b/tests/integration/providers/apache/pinot/hooks/test_pinot.py
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook
+
+
+@pytest.mark.integration("pinot")
+class TestPinotDbApiHookIntegration:
+    @mock.patch.dict("os.environ", AIRFLOW_CONN_PINOT_BROKER_DEFAULT="pinot://pinot:8000/")
+    def test_should_return_records(self):
+        hook = PinotDbApiHook()
+        sql = "select playerName from baseballStats ORDER BY playerName limit 5"
+        records = hook.get_records(sql)
+        assert [["A. Harry"], ["A. Harry"], ["Aaron"], ["Aaron Albert"], ["Aaron Albert"]] == records
diff --git a/tests/integration/providers/google/__init__.py b/tests/integration/providers/google/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/google/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/providers/google/cloud/__init__.py b/tests/integration/providers/google/cloud/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/google/cloud/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/providers/google/cloud/transfers/__init__.py b/tests/integration/providers/google/cloud/transfers/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/google/cloud/transfers/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/google/cloud/transfers/test_presto_to_gcs.py b/tests/integration/providers/google/cloud/transfers/test_presto_to_gcs.py
similarity index 100%
rename from tests/providers/google/cloud/transfers/test_presto_to_gcs.py
rename to tests/integration/providers/google/cloud/transfers/test_presto_to_gcs.py
diff --git a/tests/providers/google/cloud/transfers/test_presto_to_gcs_system.py b/tests/integration/providers/google/cloud/transfers/test_presto_to_gcs_system.py
similarity index 100%
rename from tests/providers/google/cloud/transfers/test_presto_to_gcs_system.py
rename to tests/integration/providers/google/cloud/transfers/test_presto_to_gcs_system.py
diff --git a/tests/providers/google/cloud/transfers/test_trino_to_gcs.py b/tests/integration/providers/google/cloud/transfers/test_trino_to_gcs.py
similarity index 100%
rename from tests/providers/google/cloud/transfers/test_trino_to_gcs.py
rename to tests/integration/providers/google/cloud/transfers/test_trino_to_gcs.py
diff --git a/tests/integration/providers/mongo/__init__.py b/tests/integration/providers/mongo/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/mongo/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/providers/mongo/sensors/__init__.py b/tests/integration/providers/mongo/sensors/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/mongo/sensors/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/providers/mongo/sensors/test_mongo.py b/tests/integration/providers/mongo/sensors/test_mongo.py similarity index 100% rename from tests/providers/mongo/sensors/test_mongo.py rename to tests/integration/providers/mongo/sensors/test_mongo.py diff --git a/tests/integration/providers/redis/__init__.py b/tests/integration/providers/redis/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/integration/providers/redis/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/integration/providers/redis/hooks/__init__.py b/tests/integration/providers/redis/hooks/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/integration/providers/redis/hooks/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/integration/providers/redis/hooks/test_redis.py b/tests/integration/providers/redis/hooks/test_redis.py new file mode 100644 index 0000000000000..eac17ee676edc --- /dev/null +++ b/tests/integration/providers/redis/hooks/test_redis.py @@ -0,0 +1,39 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from __future__ import annotations + +import pytest + +from airflow.providers.redis.hooks.redis import RedisHook + + +@pytest.mark.integration("redis") +class TestRedisHook: + def test_real_ping(self): + hook = RedisHook(redis_conn_id="redis_default") + redis = hook.get_conn() + + assert redis.ping(), "Connection to Redis with PING works." + + def test_real_get_and_set(self): + hook = RedisHook(redis_conn_id="redis_default") + redis = hook.get_conn() + + assert redis.set("test_key", "test_value"), "Connection to Redis with SET works." + assert redis.get("test_key") == b"test_value", "Connection to Redis with GET works." + assert redis.delete("test_key") == 1, "Connection to Redis with DELETE works." diff --git a/tests/integration/providers/redis/operators/__init__.py b/tests/integration/providers/redis/operators/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/integration/providers/redis/operators/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/redis/operators/test_redis_publish.py b/tests/integration/providers/redis/operators/test_redis_publish.py similarity index 100% rename from tests/providers/redis/operators/test_redis_publish.py rename to tests/integration/providers/redis/operators/test_redis_publish.py diff --git a/tests/integration/providers/redis/sensors/__init__.py b/tests/integration/providers/redis/sensors/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/integration/providers/redis/sensors/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
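A note on the `integration` marker that the tests above (and the rest of this patch) rely on: pytest only runs tests marked with `@pytest.mark.integration(<name>)` when the matching integration is enabled on the command line; otherwise they are skipped at collection time. The sketch below illustrates the general mechanism, assuming a `--integration` command-line option wired up in a conftest.py; the option name and the exact wiring here are illustrative assumptions, not the literal Airflow conftest.

# Illustrative sketch only -- option name and wiring are assumptions,
# not the actual Airflow conftest.py.
from __future__ import annotations

import pytest


def pytest_addoption(parser):
    # Allows e.g. ``pytest --integration redis --integration kerberos``.
    parser.addoption(
        "--integration",
        action="append",
        default=[],
        help="run tests marked @pytest.mark.integration(<name>) for the given integration",
    )


def pytest_collection_modifyitems(config, items):
    enabled = set(config.getoption("--integration"))
    for item in items:
        marker = item.get_closest_marker("integration")
        if marker and marker.args[0] not in enabled:
            # Skip integration tests whose backing service was not requested.
            item.add_marker(pytest.mark.skip(reason=f"integration {marker.args[0]!r} not enabled"))

With wiring like this, `pytest tests/integration/providers/redis --integration redis` would exercise the Redis tests against a live Redis service, while a plain `pytest` run would skip them.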
diff --git a/tests/providers/redis/sensors/test_redis_key.py b/tests/integration/providers/redis/sensors/test_redis_key.py
similarity index 100%
rename from tests/providers/redis/sensors/test_redis_key.py
rename to tests/integration/providers/redis/sensors/test_redis_key.py
diff --git a/tests/integration/providers/redis/sensors/test_redis_pub_sub.py b/tests/integration/providers/redis/sensors/test_redis_pub_sub.py
new file mode 100644
index 0000000000000..e99f2a38d9914
--- /dev/null
+++ b/tests/integration/providers/redis/sensors/test_redis_pub_sub.py
@@ -0,0 +1,80 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from time import sleep
+from unittest.mock import MagicMock, call
+
+import pytest
+
+from airflow.models.dag import DAG
+from airflow.providers.redis.hooks.redis import RedisHook
+from airflow.providers.redis.sensors.redis_pub_sub import RedisPubSubSensor
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
+
+
+@pytest.mark.integration("redis")
+class TestRedisPubSubSensor:
+    def setup_method(self):
+        args = {"owner": "airflow", "start_date": DEFAULT_DATE}
+
+        self.dag = DAG("test_dag_id", default_args=args)
+
+        self.mock_context = MagicMock()
+
+    def test_poke_true(self):
+        sensor = RedisPubSubSensor(
+            task_id="test_task", dag=self.dag, channels="test", redis_conn_id="redis_default"
+        )
+
+        hook = RedisHook(redis_conn_id="redis_default")
+        redis = hook.get_conn()
+        redis.publish("test", "message")
+
+        result = sensor.poke(self.mock_context)
+        assert not result
+
+        for _ in range(1, 10):
+            result = sensor.poke(self.mock_context)
+            if result:
+                break
+            sleep(0.1)
+        assert result
+        context_calls = [
+            call.xcom_push(
+                key="message",
+                value={"type": "message", "pattern": None, "channel": b"test", "data": b"message"},
+            )
+        ]
+        assert self.mock_context["ti"].method_calls == context_calls, "context calls should be the same"
+        result = sensor.poke(self.mock_context)
+        assert not result
+
+    def test_poke_false(self):
+        sensor = RedisPubSubSensor(
+            task_id="test_task", dag=self.dag, channels="test", redis_conn_id="redis_default"
+        )
+
+        result = sensor.poke(self.mock_context)
+        assert not result
+        assert self.mock_context["ti"].method_calls == [], "context calls should be the same"
+        result = sensor.poke(self.mock_context)
+        assert not result
+        assert self.mock_context["ti"].method_calls == [], "context calls should be the same"
diff --git a/tests/integration/providers/trino/__init__.py b/tests/integration/providers/trino/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/trino/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/providers/trino/hooks/__init__.py b/tests/integration/providers/trino/hooks/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/providers/trino/hooks/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/providers/trino/hooks/test_trino.py b/tests/integration/providers/trino/hooks/test_trino.py
new file mode 100644
index 0000000000000..bb06d53887e66
--- /dev/null
+++ b/tests/integration/providers/trino/hooks/test_trino.py
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from __future__ import annotations
+
+from unittest import mock
+
+import pytest
+
+from airflow.providers.trino.hooks.trino import TrinoHook
+
+
+@pytest.mark.integration("trino")
+class TestTrinoHookIntegration:
+    @mock.patch.dict("os.environ", AIRFLOW_CONN_TRINO_DEFAULT="trino://airflow@trino:8080/")
+    def test_should_record_records(self):
+        hook = TrinoHook()
+        sql = "SELECT name FROM tpch.sf1.customer ORDER BY custkey ASC LIMIT 3"
+        records = hook.get_records(sql)
+        assert [["Customer#000000001"], ["Customer#000000002"], ["Customer#000000003"]] == records
+
+    @pytest.mark.integration("kerberos")
+    def test_should_record_records_with_kerberos_auth(self):
+        conn_url = (
+            "trino://airflow@trino.example.com:7778/?"
+            "auth=kerberos&kerberos__service_name=HTTP&"
+            "verify=False&"
+            "protocol=https"
+        )
+        with mock.patch.dict("os.environ", AIRFLOW_CONN_TRINO_DEFAULT=conn_url):
+            hook = TrinoHook()
+            sql = "SELECT name FROM tpch.sf1.customer ORDER BY custkey ASC LIMIT 3"
+            records = hook.get_records(sql)
+            assert [["Customer#000000001"], ["Customer#000000002"], ["Customer#000000003"]] == records
diff --git a/tests/integration/security/__init__.py b/tests/integration/security/__init__.py
new file mode 100644
index 0000000000000..13a83393a9124
--- /dev/null
+++ b/tests/integration/security/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
diff --git a/tests/integration/security/test_kerberos.py b/tests/integration/security/test_kerberos.py
new file mode 100644
index 0000000000000..033b455b56ea7
--- /dev/null
+++ b/tests/integration/security/test_kerberos.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+import logging
+import os
+from contextlib import nullcontext
+from unittest import mock
+
+import pytest
+
+from airflow.security import kerberos
+from airflow.security.kerberos import renew_from_kt
+from tests.test_utils.config import conf_vars
+
+
+@pytest.mark.integration("kerberos")
+class TestKerberosIntegration:
+    @classmethod
+    def setup_class(cls):
+        assert "KRB5_KTNAME" in os.environ, "Missing KRB5_KTNAME environment variable"
+        cls.keytab = os.environ["KRB5_KTNAME"]
+
+    @pytest.mark.parametrize(
+        "kerberos_config",
+        [
+            pytest.param({}, id="default-config"),
+            pytest.param({("kerberos", "include_ip"): "True"}, id="explicit-include-ip"),
+            pytest.param({("kerberos", "include_ip"): "False"}, id="explicit-not-include-ip"),
+            pytest.param({("kerberos", "forwardable"): "True"}, id="explicit-forwardable"),
+            pytest.param({("kerberos", "forwardable"): "False"}, id="explicit-not-forwardable"),
+        ],
+    )
+    def test_renew_from_kt(self, kerberos_config):
+        """We expect a successful run that returns exit code 0."""
+        with conf_vars(kerberos_config):
+            assert renew_from_kt(principal=None, keytab=self.keytab) == 0
+
+    @pytest.mark.parametrize(
+        "exit_on_fail, expected_context",
+        [
+            pytest.param(True, pytest.raises(SystemExit), id="exit-on-fail"),
+            pytest.param(False, nullcontext(), id="return-code-of-fail"),
+        ],
+    )
+    def test_args_from_cli(self, exit_on_fail, expected_context, caplog):
+        """Test the exit code when the keytab does not exist."""
+        keytab = "/not/exists/keytab"
+        result = None
+
+        with mock.patch.dict(os.environ, KRB5_KTNAME=keytab), conf_vars({("kerberos", "keytab"): keytab}):
+            with expected_context as ctx:
+                with caplog.at_level(logging.ERROR, logger=kerberos.log.name):
+                    caplog.clear()
+                    result = renew_from_kt(principal=None, keytab=keytab, exit_on_fail=exit_on_fail)
+
+        # If `exit_on_fail` is True, the exit code is carried by the exception; otherwise it is returned.
+        exit_code = ctx.value.code if exit_on_fail else result
+        assert exit_code == 1
+        assert caplog.record_tuples
diff --git a/tests/providers/apache/pinot/hooks/test_pinot.py b/tests/providers/apache/pinot/hooks/test_pinot.py
index 0ca2dfb1c581a..89dde59666b2b 100644
--- a/tests/providers/apache/pinot/hooks/test_pinot.py
+++ b/tests/providers/apache/pinot/hooks/test_pinot.py
@@ -271,13 +271,3 @@ def test_get_pandas_df(self):
         assert column == df.columns[0]
         for i, item in enumerate(result_sets):
             assert item[0] == df.values.tolist()[i][0]
-
-
-@pytest.mark.integration("pinot")
-class TestPinotDbApiHookIntegration:
-    @mock.patch.dict("os.environ", AIRFLOW_CONN_PINOT_BROKER_DEFAULT="pinot://pinot:8000/")
-    def test_should_return_records(self):
-        hook = PinotDbApiHook()
-        sql = "select playerName from baseballStats ORDER BY playerName limit 5"
-        records = hook.get_records(sql)
-        assert [["A. Harry"], ["A. Harry"], ["Aaron"], ["Aaron Albert"], ["Aaron Albert"]] == records
diff --git a/tests/providers/redis/hooks/test_redis.py b/tests/providers/redis/hooks/test_redis.py
index 0eb4edd76fc0a..119964fd3b22c 100644
--- a/tests/providers/redis/hooks/test_redis.py
+++ b/tests/providers/redis/hooks/test_redis.py
@@ -19,8 +19,6 @@
 
 from unittest import mock
 
-import pytest
-
 from airflow.models import Connection
 from airflow.providers.redis.hooks.redis import RedisHook
 
@@ -76,19 +74,3 @@ def test_get_conn_password_stays_none(self):
         hook = RedisHook(redis_conn_id="redis_default")
         hook.get_conn()
         assert hook.password is None
-
-    @pytest.mark.integration("redis")
-    def test_real_ping(self):
-        hook = RedisHook(redis_conn_id="redis_default")
-        redis = hook.get_conn()
-
-        assert redis.ping(), "Connection to Redis with PING works."
-
-    @pytest.mark.integration("redis")
-    def test_real_get_and_set(self):
-        hook = RedisHook(redis_conn_id="redis_default")
-        redis = hook.get_conn()
-
-        assert redis.set("test_key", "test_value"), "Connection to Redis with SET works."
-        assert redis.get("test_key") == b"test_value", "Connection to Redis with GET works."
-        assert redis.delete("test_key") == 1, "Connection to Redis with DELETE works."
diff --git a/tests/providers/redis/sensors/test_redis_pub_sub.py b/tests/providers/redis/sensors/test_redis_pub_sub.py
index 5ed0c40db71bd..dae08797ba69d 100644
--- a/tests/providers/redis/sensors/test_redis_pub_sub.py
+++ b/tests/providers/redis/sensors/test_redis_pub_sub.py
@@ -17,13 +17,9 @@
 # under the License.
 from __future__ import annotations
 
-from time import sleep
 from unittest.mock import MagicMock, call, patch
 
-import pytest
-
 from airflow.models.dag import DAG
-from airflow.providers.redis.hooks.redis import RedisHook
 from airflow.providers.redis.sensors.redis_pub_sub import RedisPubSubSensor
 from airflow.utils import timezone
 
@@ -76,45 +72,3 @@ def test_poke_mock_false(self, mock_redis_conn):
         context_calls = []
 
         assert self.mock_context["ti"].method_calls == context_calls, "context calls should be same"
-
-    @pytest.mark.integration("redis")
-    def test_poke_true(self):
-        sensor = RedisPubSubSensor(
-            task_id="test_task", dag=self.dag, channels="test", redis_conn_id="redis_default"
-        )
-
-        hook = RedisHook(redis_conn_id="redis_default")
-        redis = hook.get_conn()
-        redis.publish("test", "message")
-
-        result = sensor.poke(self.mock_context)
-        assert not result
-
-        for _ in range(1, 10):
-            result = sensor.poke(self.mock_context)
-            if result:
-                break
-            sleep(0.1)
-        assert result
-        context_calls = [
-            call.xcom_push(
-                key="message",
-                value={"type": "message", "pattern": None, "channel": b"test", "data": b"message"},
-            )
-        ]
-        assert self.mock_context["ti"].method_calls == context_calls, "context calls should be same"
-        result = sensor.poke(self.mock_context)
-        assert not result
-
-    @pytest.mark.integration("redis")
-    def test_poke_false(self):
-        sensor = RedisPubSubSensor(
-            task_id="test_task", dag=self.dag, channels="test", redis_conn_id="redis_default"
-        )
-
-        result = sensor.poke(self.mock_context)
-        assert not result
-        assert self.mock_context["ti"].method_calls == [], "context calls should be same"
-        result = sensor.poke(self.mock_context)
-        assert not result
-        assert self.mock_context["ti"].method_calls == [], "context calls should be same"
diff --git a/tests/providers/trino/hooks/test_trino.py b/tests/providers/trino/hooks/test_trino.py
index d4560a50ace12..4a0f2e6d2f680 100644
--- a/tests/providers/trino/hooks/test_trino.py
+++ b/tests/providers/trino/hooks/test_trino.py
@@ -311,27 +311,3 @@ def test_connection_failure(self, mock_conn):
     def test_serialize_cell(self):
         assert "foo" == self.db_hook._serialize_cell("foo", None)
         assert 1 == self.db_hook._serialize_cell(1, None)
-
-
-@pytest.mark.integration("trino")
-class TestTrinoHookIntegration:
-    @mock.patch.dict("os.environ", AIRFLOW_CONN_TRINO_DEFAULT="trino://airflow@trino:8080/")
-    def test_should_record_records(self):
-        hook = TrinoHook()
-        sql = "SELECT name FROM tpch.sf1.customer ORDER BY custkey ASC LIMIT 3"
-        records = hook.get_records(sql)
-        assert [["Customer#000000001"], ["Customer#000000002"], ["Customer#000000003"]] == records
-
-    @pytest.mark.integration("kerberos")
-    def test_should_record_records_with_kerberos_auth(self):
-        conn_url = (
-            "trino://airflow@trino.example.com:7778/?"
-            "auth=kerberos&kerberos__service_name=HTTP&"
-            "verify=False&"
-            "protocol=https"
-        )
-        with mock.patch.dict("os.environ", AIRFLOW_CONN_TRINO_DEFAULT=conn_url):
-            hook = TrinoHook()
-            sql = "SELECT name FROM tpch.sf1.customer ORDER BY custkey ASC LIMIT 3"
-            records = hook.get_records(sql)
-            assert [["Customer#000000001"], ["Customer#000000002"], ["Customer#000000003"]] == records
diff --git a/tests/security/test_kerberos.py b/tests/security/test_kerberos.py
index 81f139c78206a..1a582220e0f74 100644
--- a/tests/security/test_kerberos.py
+++ b/tests/security/test_kerberos.py
@@ -18,9 +18,7 @@
 from __future__ import annotations
 
 import logging
-import os
 import shlex
-from contextlib import nullcontext
 from unittest import mock
 
 import pytest
@@ -30,52 +28,6 @@
 from tests.test_utils.config import conf_vars
 
 
-@pytest.mark.integration("kerberos")
-class TestKerberosIntegration:
-    @classmethod
-    def setup_class(cls):
-        assert "KRB5_KTNAME" in os.environ, "Missing KRB5_KTNAME environment variable"
-        cls.keytab = os.environ["KRB5_KTNAME"]
-
-    @pytest.mark.parametrize(
-        "kerberos_config",
-        [
-            pytest.param({}, id="default-config"),
-            pytest.param({("kerberos", "include_ip"): "True"}, id="explicit-include-ip"),
-            pytest.param({("kerberos", "include_ip"): "False"}, id="explicit-not-include-ip"),
-            pytest.param({("kerberos", "forwardable"): "True"}, id="explicit-forwardable"),
-            pytest.param({("kerberos", "forwardable"): "False"}, id="explicit-not-forwardable"),
-        ],
-    )
-    def test_renew_from_kt(self, kerberos_config):
-        """We expect return 0 (exit code) and successful run."""
-        with conf_vars(kerberos_config):
-            assert renew_from_kt(principal=None, keytab=self.keytab) == 0
-
-    @pytest.mark.parametrize(
-        "exit_on_fail, expected_context",
-        [
-            pytest.param(True, pytest.raises(SystemExit), id="exit-on-fail"),
-            pytest.param(False, nullcontext(), id="return-code-of-fail"),
-        ],
-    )
-    def test_args_from_cli(self, exit_on_fail, expected_context, caplog):
-        """Test exit code if keytab not exist."""
-        keytab = "/not/exists/keytab"
-        result = None
-
-        with mock.patch.dict(os.environ, KRB5_KTNAME=keytab), conf_vars({("kerberos", "keytab"): keytab}):
-            with expected_context as ctx:
-                with caplog.at_level(logging.ERROR, logger=kerberos.log.name):
-                    caplog.clear()
-                    result = renew_from_kt(principal=None, keytab=keytab, exit_on_fail=exit_on_fail)
-
-        # If `exit_on_fail` set to True than exit code in exception, otherwise in function return
-        exit_code = ctx.value.code if exit_on_fail else result
-        assert exit_code == 1
-        assert caplog.record_tuples
-
-
 class TestKerberos:
     @pytest.mark.parametrize(
         "kerberos_config, expected_cmd",