diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_json_formatter.py b/providers/opensearch/tests/unit/opensearch/log/test_os_json_formatter.py
index e85f02b56beea..14700b44737fb 100644
--- a/providers/opensearch/tests/unit/opensearch/log/test_os_json_formatter.py
+++ b/providers/opensearch/tests/unit/opensearch/log/test_os_json_formatter.py
@@ -29,7 +29,6 @@
 )
 
 opensearchpy = pytest.importorskip("opensearchpy")
-pytestmark = pytest.mark.db_test
 
 
 class TestOpensearchJSONFormatter:
diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_response.py b/providers/opensearch/tests/unit/opensearch/log/test_os_response.py
index 31af433754ff4..f7f36b6732fb4 100644
--- a/providers/opensearch/tests/unit/opensearch/log/test_os_response.py
+++ b/providers/opensearch/tests/unit/opensearch/log/test_os_response.py
@@ -31,7 +31,6 @@
 from airflow.providers.opensearch.log.os_task_handler import OpensearchTaskHandler
 
 opensearchpy = pytest.importorskip("opensearchpy")
-pytestmark = pytest.mark.db_test
 
 
 class TestHitAndHitMetaAndOpenSearchResponse:
diff --git a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
index ee90e3c622081..fb51c56e469ec 100644
--- a/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
+++ b/providers/opensearch/tests/unit/opensearch/log/test_os_task_handler.py
@@ -48,7 +48,6 @@
 from unit.opensearch.conftest import MockClient
 
 opensearchpy = pytest.importorskip("opensearchpy")
-pytestmark = pytest.mark.db_test
 
 ES_PROVIDER_YAML_FILE = AIRFLOW_PROVIDERS_ROOT_PATH / "elasticsearch" / "provider.yaml"
 
@@ -191,6 +190,7 @@ def test_client_with_patterns(self):
         )
         assert handler.index_patterns == patterns
 
+    @pytest.mark.db_test
     def test_read(self, ti):
         ts = pendulum.now()
         logs, metadatas = self.os_task_handler.read(
@@ -220,6 +220,7 @@ def test_read(self, ti):
         assert not metadata["end_of_log"]
         assert timezone.parse(metadata["last_log_timestamp"]) > ts
 
+    @pytest.mark.db_test
     def test_read_with_patterns(self, ti):
         ts = pendulum.now()
         with mock.patch.object(self.os_task_handler, "index_patterns", new="test_*,other_*"):
@@ -250,6 +251,7 @@ def test_read_with_patterns(self, ti):
         assert not metadata["end_of_log"]
         assert timezone.parse(metadata["last_log_timestamp"]) > ts
 
+    @pytest.mark.db_test
     def test_read_with_patterns_no_match(self, ti):
         ts = pendulum.now()
         with mock.patch.object(self.os_task_handler, "index_patterns", new="test_other_*,test_another_*"):
@@ -284,6 +286,7 @@ def test_read_with_patterns_no_match(self, ti):
         # last_log_timestamp won't change if no log lines read.
         assert timezone.parse(metadata["last_log_timestamp"]) == ts
 
+    @pytest.mark.db_test
     def test_read_with_missing_index(self, ti):
         ts = pendulum.now()
         with mock.patch.object(self.os_task_handler, "index_patterns", new="nonexistent,test_*"):
@@ -304,6 +307,7 @@ def test_read_with_missing_index(self, ti):
         )
 
     @pytest.mark.parametrize("seconds", [3, 6])
+    @pytest.mark.db_test
    def test_read_missing_logs(self, seconds, create_task_instance):
         """
         When the log actually isn't there to be found, we only want to wait for 5 seconds.
@@ -359,6 +363,7 @@ def test_read_missing_logs(self, seconds, create_task_instance):
         assert metadatas[0]["offset"] == "0"
         assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts
 
+    @pytest.mark.db_test
     def test_read_with_none_metadata(self, ti):
         logs, metadatas = self.os_task_handler.read(ti, 1)
 
@@ -386,10 +391,12 @@ def test_read_with_none_metadata(self, ti):
         assert not metadata["end_of_log"]
         assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now()
 
+    @pytest.mark.db_test
     def test_set_context(self, ti):
         self.os_task_handler.set_context(ti)
         assert self.os_task_handler.mark_end_on_close
 
+    @pytest.mark.db_test
     def test_set_context_w_json_format_and_write_stdout(self, ti):
         formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
         self.os_task_handler.formatter = formatter
@@ -397,6 +404,7 @@ def test_set_context_w_json_format_and_write_stdout(self, ti):
         self.os_task_handler.json_format = True
         self.os_task_handler.set_context(ti)
 
+    @pytest.mark.db_test
     def test_close(self, ti):
         formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
         self.os_task_handler.formatter = formatter
@@ -413,6 +421,7 @@ def test_close(self, ti):
         assert log_line.endswith(self.end_of_log_mark.strip())
         assert self.os_task_handler.closed
 
+    @pytest.mark.db_test
     def test_close_no_mark_end(self, ti):
         ti.raw = True
         self.os_task_handler.set_context(ti)
@@ -423,6 +432,7 @@ def test_close_no_mark_end(self, ti):
         assert self.end_of_log_mark not in log_file.read()
         assert self.os_task_handler.closed
 
+    @pytest.mark.db_test
     def test_close_closed(self, ti):
         self.os_task_handler.closed = True
         self.os_task_handler.set_context(ti)
@@ -432,6 +442,7 @@ def test_close_closed(self, ti):
         ) as log_file:
             assert len(log_file.read()) == 0
 
+    @pytest.mark.db_test
     def test_close_with_no_handler(self, ti):
         self.os_task_handler.set_context(ti)
         self.os_task_handler.handler = None
@@ -442,6 +453,7 @@ def test_close_with_no_handler(self, ti):
             assert len(log_file.read()) == 0
         assert self.os_task_handler.closed
 
+    @pytest.mark.db_test
     def test_close_with_no_stream(self, ti):
         self.os_task_handler.set_context(ti)
         self.os_task_handler.handler.stream = None
@@ -461,18 +473,19 @@ def test_close_with_no_stream(self, ti):
             assert self.end_of_log_mark in log_file.read()
         assert self.os_task_handler.closed
 
+    @pytest.mark.db_test
     def test_render_log_id(self, ti):
         assert self.os_task_handler._render_log_id(ti, 1) == self.LOG_ID
 
         self.os_task_handler.json_format = True
         assert self.os_task_handler._render_log_id(ti, 1) == self.JSON_LOG_ID
 
-    #
     def test_clean_date(self):
         clean_execution_date = self.os_task_handler._clean_date(datetime(2016, 7, 8, 9, 10, 11, 12))
         assert clean_execution_date == "2016_07_08T09_10_11_000012"
 
     @mock.patch("sys.__stdout__", new_callable=StringIO)
+    @pytest.mark.db_test
     def test_dynamic_offset(self, stdout_mock, ti, time_machine):
         # arrange
         handler = OpensearchTaskHandler(
diff --git a/providers/opensearch/tests/unit/opensearch/operators/test_opensearch.py b/providers/opensearch/tests/unit/opensearch/operators/test_opensearch.py
index fd42a91a71dd6..02eb9c2d68e71 100644
--- a/providers/opensearch/tests/unit/opensearch/operators/test_opensearch.py
+++ b/providers/opensearch/tests/unit/opensearch/operators/test_opensearch.py
@@ -28,7 +28,6 @@
 from airflow.utils.timezone import datetime
 
 opensearchpy = pytest.importorskip("opensearchpy")
-pytestmark = pytest.mark.db_test
 
 TEST_DAG_ID = "unit_tests"