Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
)

opensearchpy = pytest.importorskip("opensearchpy")
pytestmark = pytest.mark.db_test


class TestOpensearchJSONFormatter:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from airflow.providers.opensearch.log.os_task_handler import OpensearchTaskHandler

opensearchpy = pytest.importorskip("opensearchpy")
pytestmark = pytest.mark.db_test


class TestHitAndHitMetaAndOpenSearchResponse:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
from unit.opensearch.conftest import MockClient

opensearchpy = pytest.importorskip("opensearchpy")
pytestmark = pytest.mark.db_test

ES_PROVIDER_YAML_FILE = AIRFLOW_PROVIDERS_ROOT_PATH / "elasticsearch" / "provider.yaml"

Expand Down Expand Up @@ -191,6 +190,7 @@ def test_client_with_patterns(self):
)
assert handler.index_patterns == patterns

@pytest.mark.db_test
def test_read(self, ti):
ts = pendulum.now()
logs, metadatas = self.os_task_handler.read(
Expand Down Expand Up @@ -220,6 +220,7 @@ def test_read(self, ti):
assert not metadata["end_of_log"]
assert timezone.parse(metadata["last_log_timestamp"]) > ts

@pytest.mark.db_test
def test_read_with_patterns(self, ti):
ts = pendulum.now()
with mock.patch.object(self.os_task_handler, "index_patterns", new="test_*,other_*"):
Expand Down Expand Up @@ -250,6 +251,7 @@ def test_read_with_patterns(self, ti):
assert not metadata["end_of_log"]
assert timezone.parse(metadata["last_log_timestamp"]) > ts

@pytest.mark.db_test
def test_read_with_patterns_no_match(self, ti):
ts = pendulum.now()
with mock.patch.object(self.os_task_handler, "index_patterns", new="test_other_*,test_another_*"):
Expand Down Expand Up @@ -284,6 +286,7 @@ def test_read_with_patterns_no_match(self, ti):
# last_log_timestamp won't change if no log lines read.
assert timezone.parse(metadata["last_log_timestamp"]) == ts

@pytest.mark.db_test
def test_read_with_missing_index(self, ti):
ts = pendulum.now()
with mock.patch.object(self.os_task_handler, "index_patterns", new="nonexistent,test_*"):
Expand All @@ -304,6 +307,7 @@ def test_read_with_missing_index(self, ti):
)

@pytest.mark.parametrize("seconds", [3, 6])
@pytest.mark.db_test
def test_read_missing_logs(self, seconds, create_task_instance):
"""
When the log actually isn't there to be found, we only want to wait for 5 seconds.
Expand Down Expand Up @@ -359,6 +363,7 @@ def test_read_missing_logs(self, seconds, create_task_instance):
assert metadatas[0]["offset"] == "0"
assert timezone.parse(metadatas[0]["last_log_timestamp"]) == ts

@pytest.mark.db_test
def test_read_with_none_metadata(self, ti):
logs, metadatas = self.os_task_handler.read(ti, 1)

Expand Down Expand Up @@ -386,17 +391,20 @@ def test_read_with_none_metadata(self, ti):
assert not metadata["end_of_log"]
assert timezone.parse(metadata["last_log_timestamp"]) < pendulum.now()

@pytest.mark.db_test
def test_set_context(self, ti):
self.os_task_handler.set_context(ti)
assert self.os_task_handler.mark_end_on_close

@pytest.mark.db_test
def test_set_context_w_json_format_and_write_stdout(self, ti):
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
self.os_task_handler.formatter = formatter
self.os_task_handler.write_stdout = True
self.os_task_handler.json_format = True
self.os_task_handler.set_context(ti)

@pytest.mark.db_test
def test_close(self, ti):
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
self.os_task_handler.formatter = formatter
Expand All @@ -413,6 +421,7 @@ def test_close(self, ti):
assert log_line.endswith(self.end_of_log_mark.strip())
assert self.os_task_handler.closed

@pytest.mark.db_test
def test_close_no_mark_end(self, ti):
ti.raw = True
self.os_task_handler.set_context(ti)
Expand All @@ -423,6 +432,7 @@ def test_close_no_mark_end(self, ti):
assert self.end_of_log_mark not in log_file.read()
assert self.os_task_handler.closed

@pytest.mark.db_test
def test_close_closed(self, ti):
self.os_task_handler.closed = True
self.os_task_handler.set_context(ti)
Expand All @@ -432,6 +442,7 @@ def test_close_closed(self, ti):
) as log_file:
assert len(log_file.read()) == 0

@pytest.mark.db_test
def test_close_with_no_handler(self, ti):
self.os_task_handler.set_context(ti)
self.os_task_handler.handler = None
Expand All @@ -442,6 +453,7 @@ def test_close_with_no_handler(self, ti):
assert len(log_file.read()) == 0
assert self.os_task_handler.closed

@pytest.mark.db_test
def test_close_with_no_stream(self, ti):
self.os_task_handler.set_context(ti)
self.os_task_handler.handler.stream = None
Expand All @@ -461,18 +473,19 @@ def test_close_with_no_stream(self, ti):
assert self.end_of_log_mark in log_file.read()
assert self.os_task_handler.closed

@pytest.mark.db_test
def test_render_log_id(self, ti):
assert self.os_task_handler._render_log_id(ti, 1) == self.LOG_ID

self.os_task_handler.json_format = True
assert self.os_task_handler._render_log_id(ti, 1) == self.JSON_LOG_ID

#
def test_clean_date(self):
clean_execution_date = self.os_task_handler._clean_date(datetime(2016, 7, 8, 9, 10, 11, 12))
assert clean_execution_date == "2016_07_08T09_10_11_000012"

@mock.patch("sys.__stdout__", new_callable=StringIO)
@pytest.mark.db_test
def test_dynamic_offset(self, stdout_mock, ti, time_machine):
# arrange
handler = OpensearchTaskHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
from airflow.utils.timezone import datetime

opensearchpy = pytest.importorskip("opensearchpy")
pytestmark = pytest.mark.db_test


TEST_DAG_ID = "unit_tests"
Expand Down