Merged
dev/breeze/README.md (1 addition, 1 deletion)

@@ -135,6 +135,6 @@ PLEASE DO NOT MODIFY THE HASH BELOW! IT IS AUTOMATICALLY UPDATED BY PRE-COMMIT.
 
 ---------------------------------------------------------------------------------------------------------
 
-Package config hash: 14ae9299422a78b0af9db9ec4e81c9b6a9412f93a5499a367da252cf58d13e1c8bae31d3642c693e356136fb07a94c25569d088acee5521099a08274dcae3278
+Package config hash: 19b7a69c4b7ef23d1c665286fd7ca1a1d8c28fa9ba8523da6c3e215d8cd7c4bc0406186898a90c92d8e9f527bc8fa8d5c6407f914d7674e59e4981bb3c795e8c
 
 ---------------------------------------------------------------------------------------------------------
dev/breeze/pyproject.toml (1 addition, 3 deletions)

@@ -65,9 +65,7 @@ dependencies = [
     "psutil>=5.9.6",
     "pygithub>=2.1.1",
     "pytest-xdist>=3.3.1",
-    # 8.4.0 introduces changes related to fixture that cause CI failure.
-    # TODO: we'll need to check how to get around with it.
-    "pytest>=8.2,<8.4.0",
+    "pytest>=8.3.3",
     "pyyaml>=6.0.2",
     "requests>=2.31.0",
     "restructuredtext-lint>=1.4.0",
devel-common/pyproject.toml (1 addition, 3 deletions)

@@ -137,9 +137,7 @@ dependencies = [
     "pytest-timeouts>=1.2.1",
     "pytest-unordered>=0.6.1",
     "pytest-xdist>=3.5.0",
-    # 8.4.0 introduces changes related to fixture that cause CI failure.
-    # TODO: we'll need to check how to get around with it.
-    "pytest>=8.3.3,<8.4.0",
+    "pytest>=8.3.3",
 ]
 "sentry" = [
     "blinker>=1.7.0",
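
The two pyproject.toml edits above drop the temporary `<8.4.0` pin along with its TODO comments. The fixture problem those comments mention is the one the test changes below work around: in pytest 8.4, `@pytest.fixture` reportedly no longer returns a plain function, so stacking another decorator on top of it (as with `@conf_vars` and `@mock_aws` below) breaks. A minimal sketch of the before/after shape, where `my_override` is a hypothetical stand-in for such helpers:

import contextlib

import pytest


@contextlib.contextmanager
def my_override():
    # Hypothetical stand-in for conf_vars/mock_aws: apply a side effect, then undo it.
    yield


# Before (the shape this PR removes): a second decorator wraps the object
# returned by @pytest.fixture, which pytest 8.4 no longer tolerates.
#
#     @my_override()
#     @pytest.fixture
#     def subject(): ...
#
# After: @pytest.fixture stays outermost and the override runs inside the body.
@pytest.fixture
def subject():
    with my_override():
        yield object()  # build the object under test while the override is active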

@@ -45,7 +45,6 @@ def aws_region():
     return AWS_REGION
 
 
-@mock_aws
 @pytest.fixture
 def patch_hook(monkeypatch, aws_region):
     """Patch hook object by dummy boto3 Batch client."""
@@ -59,6 +58,7 @@ def test_batch_waiters(aws_region):
     assert isinstance(batch_waiters, BatchWaitersHook)
 
 
+@mock_aws
 class TestBatchWaiters:
     @pytest.fixture(autouse=True)
     def setup_tests(self, patch_hook):
@@ -215,6 +215,7 @@ def test_wait_for_job_raises_for_waiter_error(self):
         assert mock_waiter.wait.call_count == 1
 
 
+@mock_aws
 class TestBatchJobWaiters:
     """Test default waiters."""
 
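
Here `@mock_aws` moves off the `patch_hook` fixture for the pytest 8.4 reason above, and onto the two test classes, where moto wraps each test method instead. A minimal sketch of the class-level pattern, assuming moto's `mock_aws` and boto3; the S3 calls and dummy credentials are illustrative, not taken from this PR:

import boto3
import pytest
from moto import mock_aws


@pytest.fixture
def s3_client(monkeypatch):
    # Dummy credentials so botocore can sign the requests moto will intercept.
    monkeypatch.setenv("AWS_ACCESS_KEY_ID", "testing")
    monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "testing")
    # No moto decorator here: pytest 8.4 fixtures should not be wrapped.
    return boto3.client("s3", region_name="us-east-1")


@mock_aws  # applied to the class, moto wraps every test method
class TestWithMockedAws:
    def test_no_buckets_exist(self, s3_client):
        # The call runs inside the mock window opened by @mock_aws, so it
        # hits moto's in-memory backend rather than real AWS.
        assert s3_client.list_buckets()["Buckets"] == []

moto intercepts at request time rather than at client-creation time, which is presumably why the client built in the undecorated `patch_hook` fixture still ends up mocked once a test method runs.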

@@ -184,43 +184,43 @@ def test_event_to_str(self):
 
 @pytest.mark.db_test
 class TestCloudwatchTaskHandler:
-    @conf_vars({("logging", "remote_log_conn_id"): "aws_default"})
     @pytest.fixture(autouse=True)
-    def setup_tests(self, create_log_template, tmp_path_factory, session):
-        self.remote_log_group = "log_group_name"
-        self.region_name = "us-west-2"
-        self.local_log_location = str(tmp_path_factory.mktemp("local-cloudwatch-log-location"))
-        if AIRFLOW_V_3_0_PLUS:
-            create_log_template("{dag_id}/{task_id}/{logical_date}/{try_number}.log")
-        else:
-            create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
-        self.cloudwatch_task_handler = CloudwatchTaskHandler(
-            self.local_log_location,
-            f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
-        )
-
-        date = datetime(2020, 1, 1)
-        dag_id = "dag_for_testing_cloudwatch_task_handler"
-        task_id = "task_for_testing_cloudwatch_log_handler"
-        self.dag = DAG(dag_id=dag_id, schedule=None, start_date=date)
-        task = EmptyOperator(task_id=task_id, dag=self.dag)
-        if AIRFLOW_V_3_0_PLUS:
-            dag_run = DagRun(
-                dag_id=self.dag.dag_id,
-                logical_date=date,
-                run_id="test",
-                run_type="scheduled",
-            )
-        else:
-            dag_run = DagRun(
-                dag_id=self.dag.dag_id,
-                execution_date=date,
-                run_id="test",
-                run_type="scheduled",
-            )
-        session.add(dag_run)
-        session.commit()
-        session.refresh(dag_run)
+    def setup(self, create_log_template, tmp_path_factory, session):
+        with conf_vars({("logging", "remote_log_conn_id"): "aws_default"}):
+            self.remote_log_group = "log_group_name"
+            self.region_name = "us-west-2"
+            self.local_log_location = str(tmp_path_factory.mktemp("local-cloudwatch-log-location"))
+            if AIRFLOW_V_3_0_PLUS:
+                create_log_template("{dag_id}/{task_id}/{logical_date}/{try_number}.log")
+            else:
+                create_log_template("{dag_id}/{task_id}/{execution_date}/{try_number}.log")
+            self.cloudwatch_task_handler = CloudwatchTaskHandler(
+                self.local_log_location,
+                f"arn:aws:logs:{self.region_name}:11111111:log-group:{self.remote_log_group}",
+            )
+
+            date = datetime(2020, 1, 1)
+            dag_id = "dag_for_testing_cloudwatch_task_handler"
+            task_id = "task_for_testing_cloudwatch_log_handler"
+            self.dag = DAG(dag_id=dag_id, schedule=None, start_date=date)
+            task = EmptyOperator(task_id=task_id, dag=self.dag)
+            if AIRFLOW_V_3_0_PLUS:
+                dag_run = DagRun(
+                    dag_id=self.dag.dag_id,
+                    logical_date=date,
+                    run_id="test",
+                    run_type="scheduled",
+                )
+            else:
+                dag_run = DagRun(
+                    dag_id=self.dag.dag_id,
+                    execution_date=date,
+                    run_id="test",
+                    run_type="scheduled",
+                )
+            session.add(dag_run)
+            session.commit()
+            session.refresh(dag_run)
 
         self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
         self.ti.dag_run = dag_run
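
Besides renaming the fixture from `setup_tests` to `setup`, this change swaps the `@conf_vars` decorator for a `with conf_vars(...)` block around the setup body, so the config override is active exactly while the handler is constructed. For readers outside the Airflow tree, a rough sketch of what a `conf_vars`-style helper does; this illustrates the idea and is not Airflow's implementation:

import contextlib


@contextlib.contextmanager
def conf_vars_sketch(overrides, config):
    # Temporarily override (section, key) pairs in a config mapping, then restore.
    saved = {key: config.get(key) for key in overrides}
    config.update(overrides)
    try:
        yield
    finally:
        for key, old in saved.items():
            if old is None:
                config.pop(key, None)
            else:
                config[key] = old


config = {}
with conf_vars_sketch({("logging", "remote_log_conn_id"): "aws_default"}, config):
    assert config[("logging", "remote_log_conn_id")] == "aws_default"
assert ("logging", "remote_log_conn_id") not in config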

@@ -46,39 +46,39 @@ def s3mock():
 
 @pytest.mark.db_test
 class TestS3RemoteLogIO:
-    @conf_vars({("logging", "remote_log_conn_id"): "aws_default"})
     @pytest.fixture(autouse=True)
     def setup_tests(self, create_log_template, tmp_path_factory, session):
-        self.remote_log_base = "s3://bucket/remote/log/location"
-        self.remote_log_location = "s3://bucket/remote/log/location/1.log"
-        self.remote_log_key = "remote/log/location/1.log"
-        self.local_log_location = str(tmp_path_factory.mktemp("local-s3-log-location"))
-        create_log_template("{try_number}.log")
-        self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
-        # Verify the hook now with the config override
-        self.subject = self.s3_task_handler.io
-        assert self.subject.hook is not None
-
-        date = datetime(2016, 1, 1)
-        self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date)
-        task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag)
-        if AIRFLOW_V_3_0_PLUS:
-            dag_run = DagRun(
-                dag_id=self.dag.dag_id,
-                logical_date=date,
-                run_id="test",
-                run_type="manual",
-            )
-        else:
-            dag_run = DagRun(
-                dag_id=self.dag.dag_id,
-                execution_date=date,
-                run_id="test",
-                run_type="manual",
-            )
-        session.add(dag_run)
-        session.commit()
-        session.refresh(dag_run)
+        with conf_vars({("logging", "remote_log_conn_id"): "aws_default"}):
+            self.remote_log_base = "s3://bucket/remote/log/location"
+            self.remote_log_location = "s3://bucket/remote/log/location/1.log"
+            self.remote_log_key = "remote/log/location/1.log"
+            self.local_log_location = str(tmp_path_factory.mktemp("local-s3-log-location"))
+            create_log_template("{try_number}.log")
+            self.s3_task_handler = S3TaskHandler(self.local_log_location, self.remote_log_base)
+            # Verify the hook now with the config override
+            self.subject = self.s3_task_handler.io
+            assert self.subject.hook is not None
+
+            date = datetime(2016, 1, 1)
+            self.dag = DAG("dag_for_testing_s3_task_handler", schedule=None, start_date=date)
+            task = EmptyOperator(task_id="task_for_testing_s3_log_handler", dag=self.dag)
+            if AIRFLOW_V_3_0_PLUS:
+                dag_run = DagRun(
+                    dag_id=self.dag.dag_id,
+                    logical_date=date,
+                    run_id="test",
+                    run_type="manual",
+                )
+            else:
+                dag_run = DagRun(
+                    dag_id=self.dag.dag_id,
+                    execution_date=date,
+                    run_id="test",
+                    run_type="manual",
+                )
+            session.add(dag_run)
+            session.commit()
+            session.refresh(dag_run)
 
         self.ti = TaskInstance(task=task, run_id=dag_run.run_id)
         self.ti.dag_run = dag_run
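
The S3 handler test gets the identical treatment, and the retained comment ("Verify the hook now with the config override") shows why the assertion must sit inside the `with` block: the override only holds until the context exits. As a side note, both setup fixtures branch on `AIRFLOW_V_3_0_PLUS` purely because the DagRun timestamp keyword was renamed in Airflow 3 (`execution_date` became `logical_date`). A hedged sketch of how such a branch can collapse into a kwargs builder; `make_dag_run_kwargs` is hypothetical and not part of this PR:

from datetime import datetime

AIRFLOW_V_3_0_PLUS = True  # assumed here; the real flag comes from the test utilities


def make_dag_run_kwargs(dag_id: str, date: datetime) -> dict:
    # Vary only the timestamp field that Airflow 3 renamed.
    date_field = "logical_date" if AIRFLOW_V_3_0_PLUS else "execution_date"
    return {"dag_id": dag_id, "run_id": "test", "run_type": "manual", date_field: date}


print(make_dag_run_kwargs("dag_for_testing_s3_task_handler", datetime(2016, 1, 1)))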