Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c5a8141
Deprecate implicit DAG schedule
uranusjr Aug 7, 2024
9375f61
Fix some test and example DAGs
uranusjr Aug 7, 2024
39ebe98
Fix more DAGs in tests
uranusjr Aug 7, 2024
d45bdab
Fix dag_maker fixture
uranusjr Aug 7, 2024
4846aa4
Missed some
uranusjr Aug 7, 2024
b8f5aa2
Fix SerializedDAG init warning
uranusjr Aug 8, 2024
47822d0
More test DAG warning fixes
uranusjr Aug 8, 2024
2de04c7
More test DAG warning fixes
uranusjr Aug 8, 2024
1aa30d3
dag_maker needs to have a default schedule
uranusjr Aug 8, 2024
ba37796
More warnings from DAGs in tests
uranusjr Aug 8, 2024
f9a64e3
More DAG warning fixes in tests
uranusjr Aug 8, 2024
f51d2cb
Fix DAG warning in API tests
uranusjr Aug 8, 2024
9cdec72
More warning fixes in tests
uranusjr Aug 8, 2024
42fca56
Use the correct schedule for test
uranusjr Aug 8, 2024
c4bb218
Match previous default in serialization tests
uranusjr Aug 8, 2024
77f2b37
More deprecation fixes in providers
uranusjr Aug 8, 2024
2890822
Fix missing schedule in tests
potiuk Aug 13, 2024
5bc131f
Merge pull request #1512 from potiuk/add-missing-schedule
uranusjr Aug 13, 2024
97e0483
Merge branch 'main' into deprecate-missing-schedule
uranusjr Aug 13, 2024
d12b93d
More schedule fixes in tests
uranusjr Aug 14, 2024
c7e92c0
More schedule deprecation warnings
uranusjr Aug 14, 2024
faedd7d
More schedule deprecation warnings
uranusjr Aug 14, 2024
0cf9c6e
I hope we're near the end
uranusjr Aug 14, 2024
4ea73fd
Are these the only ones left?
uranusjr Aug 14, 2024
a6a5956
Fix dag_maker schedule for compat arguments
uranusjr Aug 14, 2024
379e016
How many more?
uranusjr Aug 14, 2024
913a65e
This should fix all the providers
uranusjr Aug 14, 2024
210df42
Correctly reset ParseImportError in test
uranusjr Aug 14, 2024
cc0c2a4
Merge branch 'main' into deprecate-missing-schedule
uranusjr Aug 14, 2024
7225cbf
MySQL and SQLite fixes
uranusjr Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airflow/example_dags/example_dynamic_task_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from airflow.decorators import task
from airflow.models.dag import DAG

with DAG(dag_id="example_dynamic_task_mapping", start_date=datetime(2022, 3, 4)) as dag:
with DAG(dag_id="example_dynamic_task_mapping", schedule=None, start_date=datetime(2022, 3, 4)) as dag:

@task
def add_one(x: int):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def execute(self, context):

with DAG(
dag_id="example_dynamic_task_mapping_with_no_taskflow_operators",
schedule=None,
start_date=datetime(2022, 3, 4),
catchup=False,
):
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_setup_teardown.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

with DAG(
dag_id="example_setup_teardown",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_setup_teardown_taskflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

with DAG(
dag_id="example_setup_teardown_taskflow",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
2 changes: 1 addition & 1 deletion airflow/example_dags/example_short_circuit_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.utils.trigger_rule import TriggerRule


@dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"])
@dag(schedule=None, start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"])
def example_short_circuit_decorator():
# [START howto_operator_short_circuit]
@task.short_circuit()
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_short_circuit_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

with DAG(
dag_id="example_short_circuit_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
2 changes: 2 additions & 0 deletions airflow/example_dags/example_skip_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from __future__ import annotations

import datetime
from typing import TYPE_CHECKING

import pendulum
Expand Down Expand Up @@ -63,6 +64,7 @@ def create_test_pipeline(suffix, trigger_rule):

with DAG(
dag_id="example_skip_dag",
schedule=datetime.timedelta(days=1),
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_task_group.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
# [START howto_task_group]
with DAG(
dag_id="example_task_group",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
1 change: 1 addition & 0 deletions airflow/example_dags/example_task_group_decorator.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ def task_group_function(value: int) -> None:
# Executing Tasks and TaskGroups
with DAG(
dag_id="example_task_group_decorator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
Expand Down
8 changes: 7 additions & 1 deletion airflow/models/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,12 @@ def __init__(
self.timetable = DatasetTriggeredTimetable(DatasetAll(*schedule))
self.schedule_interval = self.timetable.summary
elif isinstance(schedule, ArgNotSet):
warnings.warn(
"Creating a DAG with an implicit schedule is deprecated, and will stop working "
"in a future release. Set `schedule=datetime.timedelta(days=1)` explicitly.",
RemovedInAirflow3Warning,
stacklevel=2,
)
self.timetable = create_timetable(schedule, self.timezone)
self.schedule_interval = DEFAULT_SCHEDULE_INTERVAL
else:
Expand Down Expand Up @@ -3282,7 +3288,7 @@ def get_serialized_fields(cls):
"auto_register",
"fail_stop",
}
cls.__serialized_fields = frozenset(vars(DAG(dag_id="test"))) - exclusion_list
cls.__serialized_fields = frozenset(vars(DAG(dag_id="test", schedule=None))) - exclusion_list
return cls.__serialized_fields

def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> EdgeInfoType:
Expand Down
2 changes: 1 addition & 1 deletion airflow/serialization/serialized_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1670,7 +1670,7 @@ def serialize_dag(cls, dag: DAG) -> dict:
@classmethod
def deserialize_dag(cls, encoded_dag: dict[str, Any]) -> SerializedDAG:
"""Deserializes a DAG from a JSON object."""
dag = SerializedDAG(dag_id=encoded_dag["_dag_id"])
dag = SerializedDAG(dag_id=encoded_dag["_dag_id"], schedule=None)

for k, v in encoded_dag.items():
if k == "_downstream_task_ids":
Expand Down
2 changes: 1 addition & 1 deletion kubernetes_tests/test_kubernetes_pod_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@


def create_context(task) -> Context:
dag = DAG(dag_id="dag")
dag = DAG(dag_id="dag", schedule=None)
execution_date = timezone.datetime(
2016, 1, 1, 1, 0, 0, tzinfo=timezone.parse_timezone("Europe/Amsterdam")
)
Expand Down
37 changes: 19 additions & 18 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,18 @@ def configured_app(minimal_app_for_api):

with DAG(
DAG_ID,
schedule=None,
start_date=datetime(2020, 6, 15),
doc_md="details",
params={"foo": 1},
tags=["example"],
) as dag:
EmptyOperator(task_id=TASK_ID)

with DAG(DAG2_ID, start_date=datetime(2020, 6, 15)) as dag2: # no doc_md
with DAG(DAG2_ID, schedule=None, start_date=datetime(2020, 6, 15)) as dag2: # no doc_md
EmptyOperator(task_id=TASK_ID)

with DAG(DAG3_ID) as dag3: # DAG start_date set to None
with DAG(DAG3_ID, schedule=None) as dag3: # DAG start_date set to None
EmptyOperator(task_id=TASK_ID, start_date=datetime(2019, 6, 12))

dag_bag = DagBag(os.devnull, include_examples=False)
Expand Down Expand Up @@ -962,10 +963,10 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer):
)
def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
# test filter by tags
dag1 = DAG(dag_id="TEST_DAG_1", tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None, tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None, tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", schedule=None, tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand All @@ -990,10 +991,10 @@ def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
)
def test_filter_dags_by_dag_id_works(self, url, expected_dag_ids):
# test filter by tags
dag1 = DAG(dag_id="TEST_DAG_1")
dag2 = DAG(dag_id="TEST_DAG_2")
dag3 = DAG(dag_id="SAMPLE_DAG_1")
dag4 = DAG(dag_id="SAMPLE_DAG_2")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None)
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None)
dag3 = DAG(dag_id="SAMPLE_DAG_1", schedule=None)
dag4 = DAG(dag_id="SAMPLE_DAG_2", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand Down Expand Up @@ -1886,10 +1887,10 @@ def test_only_active_false_returns_all_dags(self, url_safe_serializer, session):
)
def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
# test filter by tags
dag1 = DAG(dag_id="TEST_DAG_1", tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None, tags=["t1"])
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None, tags=["t2"])
dag3 = DAG(dag_id="TEST_DAG_3", schedule=None, tags=["t1", "t2"])
dag4 = DAG(dag_id="TEST_DAG_4", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand Down Expand Up @@ -1919,10 +1920,10 @@ def test_filter_dags_by_tags_works(self, url, expected_dag_ids):
)
def test_filter_dags_by_dag_id_works(self, url, expected_dag_ids):
# test filter by tags
dag1 = DAG(dag_id="TEST_DAG_1")
dag2 = DAG(dag_id="TEST_DAG_2")
dag3 = DAG(dag_id="SAMPLE_DAG_1")
dag4 = DAG(dag_id="SAMPLE_DAG_2")
dag1 = DAG(dag_id="TEST_DAG_1", schedule=None)
dag2 = DAG(dag_id="TEST_DAG_2", schedule=None)
dag3 = DAG(dag_id="SAMPLE_DAG_1", schedule=None)
dag4 = DAG(dag_id="SAMPLE_DAG_2", schedule=None)
dag1.sync_to_db()
dag2.sync_to_db()
dag3.sync_to_db()
Expand Down
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_extra_link_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def teardown_method(self) -> None:
clear_db_xcom()

def _create_dag(self):
with DAG(dag_id="TEST_DAG_ID", default_args={"start_date": self.default_time}) as dag:
with DAG(dag_id="TEST_DAG_ID", schedule=None, default_args={"start_date": self.default_time}) as dag:
CustomOperator(task_id="TEST_SINGLE_LINK", bash_command="TEST_LINK_VALUE")
CustomOperator(
task_id="TEST_MULTIPLE_LINK", bash_command=["TEST_LINK_VALUE_1", "TEST_LINK_VALUE_2"]
Expand Down
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu

# Recreate DAG without tasks
dagbag = self.app.dag_bag
dag = DAG(self.DAG_ID, start_date=timezone.parse(self.default_time))
dag = DAG(self.DAG_ID, schedule=None, start_date=timezone.parse(self.default_time))
del dagbag.dags[self.DAG_ID]
dagbag.bag_dag(dag=dag)

Expand Down
4 changes: 2 additions & 2 deletions tests/api_connexion/endpoints/test_task_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ class TestTaskEndpoint:

@pytest.fixture(scope="class")
def setup_dag(self, configured_app):
with DAG(self.dag_id, start_date=self.task1_start_date, doc_md="details") as dag:
with DAG(self.dag_id, schedule=None, start_date=self.task1_start_date, doc_md="details") as dag:
task1 = EmptyOperator(task_id=self.task_id, params={"foo": "bar"})
task2 = EmptyOperator(task_id=self.task_id2, start_date=self.task2_start_date)

with DAG(self.mapped_dag_id, start_date=self.task1_start_date) as mapped_dag:
with DAG(self.mapped_dag_id, schedule=None, start_date=self.task1_start_date) as mapped_dag:
EmptyOperator(task_id=self.task_id3)
# Use the private _expand() method to avoid the empty kwargs check.
# We don't care about how the operator runs here, only its presence.
Expand Down
3 changes: 2 additions & 1 deletion tests/api_connexion/schemas/test_dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.
from __future__ import annotations

from datetime import datetime
from datetime import datetime, timedelta

import pendulum
import pytest
Expand Down Expand Up @@ -150,6 +150,7 @@ def test_serialize_test_dag_collection_schema(url_safe_serializer):
def test_serialize_test_dag_detail_schema(url_safe_serializer):
dag = DAG(
dag_id="test_dag",
schedule=timedelta(days=1),
start_date=datetime(2020, 6, 19),
doc_md="docs",
orientation="LR",
Expand Down
6 changes: 5 additions & 1 deletion tests/api_experimental/common/test_delete_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ class TestDeleteDAGSuccessfulDelete:
def setup_dag_models(self):
task = EmptyOperator(
task_id="dummy",
dag=DAG(dag_id=self.key, default_args={"start_date": timezone.datetime(2022, 1, 1)}),
dag=DAG(
dag_id=self.key,
schedule=None,
default_args={"start_date": timezone.datetime(2022, 1, 1)},
),
owner="airflow",
)

Expand Down
16 changes: 12 additions & 4 deletions tests/api_experimental/common/test_trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def test_trigger_dag_dag_not_found(self, dag_bag_mock):
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_dag_run_exist(self, dag_bag_mock, dag_run_mock):
dag_id = "dag_run_exist"
dag = DAG(dag_id)
dag = DAG(dag_id, schedule=None)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag
dag_run_mock.find_duplicate.return_value = DagRun()
Expand All @@ -58,7 +58,11 @@ def test_trigger_dag_dag_run_exist(self, dag_bag_mock, dag_run_mock):
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_with_too_early_start_date(self, dag_bag_mock):
dag_id = "trigger_dag_with_too_early_start_date"
dag = DAG(dag_id, default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)})
dag = DAG(
dag_id=dag_id,
schedule=None,
default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)},
)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag

Expand All @@ -68,7 +72,11 @@ def test_trigger_dag_with_too_early_start_date(self, dag_bag_mock):
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_with_valid_start_date(self, dag_bag_mock):
dag_id = "trigger_dag_with_valid_start_date"
dag = DAG(dag_id, default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)})
dag = DAG(
dag_id=dag_id,
schedule=None,
default_args={"start_date": timezone.datetime(2016, 9, 5, 10, 10, 0)},
)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag
dag_bag_mock.dags_hash = {}
Expand All @@ -88,7 +96,7 @@ def test_trigger_dag_with_valid_start_date(self, dag_bag_mock):
@mock.patch("airflow.models.DagBag")
def test_trigger_dag_with_conf(self, dag_bag_mock, conf, expected_conf):
dag_id = "trigger_dag_with_conf"
dag = DAG(dag_id)
dag = DAG(dag_id, schedule=None)
dag_bag_mock.dags = [dag_id]
dag_bag_mock.get_dag.return_value = dag

Expand Down
5 changes: 4 additions & 1 deletion tests/callbacks/test_callback_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ def test_from_json(self, input, request_class):
if input is None:
ti = TaskInstance(
task=BashOperator(
task_id="test", bash_command="true", dag=DAG(dag_id="id"), start_date=datetime.now()
task_id="test",
bash_command="true",
start_date=datetime.now(),
dag=DAG(dag_id="id", schedule=None),
),
run_id="fake_run",
state=State.RUNNING,
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -815,6 +815,7 @@ def dag_maker(request):
if serialized_marker:
(want_serialized,) = serialized_marker.args or (True,)

from airflow.utils.helpers import NOTSET
from airflow.utils.log.logging_mixin import LoggingMixin

class DagFactory(LoggingMixin):
Expand Down Expand Up @@ -927,6 +928,7 @@ def create_dagrun_after(self, dagrun, **kwargs):
def __call__(
self,
dag_id="test_dag",
schedule=NOTSET,
serialized=want_serialized,
fileloc=None,
processor_subdir=None,
Expand Down Expand Up @@ -955,6 +957,12 @@ def __call__(
DEFAULT_DATE = timezone.datetime(2016, 1, 1)
self.start_date = DEFAULT_DATE
self.kwargs["start_date"] = self.start_date
# Set schedule argument to explicitly set value, or a default if no
# other scheduling arguments are set.
if schedule is not NOTSET:
self.kwargs["schedule"] = schedule
elif "timetable" not in self.kwargs and "schedule_interval" not in self.kwargs:
self.kwargs["schedule"] = timedelta(days=1)
self.dag = DAG(dag_id, **self.kwargs)
self.dag.fileloc = fileloc or request.module.__file__
self.want_serialized = serialized
Expand Down
13 changes: 12 additions & 1 deletion tests/dag_processing/test_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,13 @@
from tests.models import TEST_DAGS_FOLDER
from tests.test_utils.compat import ParseImportError
from tests.test_utils.config import conf_vars
from tests.test_utils.db import clear_db_callbacks, clear_db_dags, clear_db_runs, clear_db_serialized_dags
from tests.test_utils.db import (
clear_db_callbacks,
clear_db_dags,
clear_db_import_errors,
clear_db_runs,
clear_db_serialized_dags,
)

pytestmark = pytest.mark.db_test

Expand Down Expand Up @@ -148,7 +154,12 @@ def run_processor_manager_one_loop(self, manager, parent_pipe):
return results
raise RuntimeError("Shouldn't get here - nothing to read, but manager not finished!")

@pytest.fixture
def clear_parse_import_errors(self):
clear_db_import_errors()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.usefixtures("clear_parse_import_errors")
@conf_vars({("core", "load_examples"): "False"})
def test_remove_file_clears_import_error(self, tmp_path):
path_to_parse = tmp_path / "temp_dag.py"
Expand Down
4 changes: 3 additions & 1 deletion tests/dags/test_cli_triggered_dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ def success(ti=None, *args, **kwargs):
# DAG tests that tasks ignore all dependencies

dag1 = DAG(
dag_id="test_run_ignores_all_dependencies", default_args=dict(depends_on_past=True, **default_args)
dag_id="test_run_ignores_all_dependencies",
schedule=None,
default_args={"depends_on_past": True, **default_args},
)
dag1_task1 = PythonOperator(task_id="test_run_dependency_task", python_callable=fail, dag=dag1)
dag1_task2 = PythonOperator(task_id="test_run_dependent_task", python_callable=success, dag=dag1)
Expand Down
2 changes: 1 addition & 1 deletion tests/dags/test_dagrun_fast_follow.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@


dag_id = "test_dagrun_fast_follow"
dag = DAG(dag_id=dag_id, default_args=args)
dag = DAG(dag_id=dag_id, schedule=None, default_args=args)

# A -> B -> C
task_a = PythonOperator(task_id="A", dag=dag, python_callable=lambda: True)
Expand Down
Loading