From e143fe8abdc30bfa5f2eb095a40b4b728252c518 Mon Sep 17 00:00:00 2001 From: Ian Roddis Date: Mon, 7 Oct 2019 12:33:48 -0300 Subject: [PATCH] [AIRFLOW-5172] Add core schedule_at_interval_end code Adds a boolean DAG attribute schedule_at_interval_end. If true (default), jobs will be scheduled to run shortly after the schedule_interval has elapsed (current airflow behaviour). If False, the DAG will be scheduled to run at the beginning of the schedule interval (new behaviour). This allows for scheduling that more closely mirrors a typical cron schedule. Adds a period_end method to DAG to return the datetime of the end of a period. Adds support in the SchedulerJob for this feature. Adds a default of True to the default config in the [scheduler] section. Add unit test for start/end interval scheduling. Updates docs/scheduler.rst to explain the new behaviour. Co-Authored-By: Ash Berlin-Taylor Update docs/scheduler.rst Replacing straight following_schedule calls with period_end calls as appropriate to handle the new start/end of schedule interval scheduling. --- airflow/config_templates/default_airflow.cfg | 4 ++ airflow/jobs/scheduler_job.py | 10 +-- airflow/models/dag.py | 34 +++++++++ airflow/sensors/time_delta_sensor.py | 5 +- docs/scheduler.rst | 72 +++++++++++++++++++- tests/jobs/test_scheduler_job.py | 35 ++++++++++ tests/models/test_dag.py | 8 +++ 7 files changed, 160 insertions(+), 8 deletions(-) diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index ae46feb5aa1d0..fe07a57c99af5 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -523,6 +523,10 @@ scheduler_zombie_task_threshold = 300 # DAG definition (catchup) catchup_by_default = True +# Should DAGs be scheduled to execute at the 'start' or 'end' +# of their schedule interval. +schedule_at_interval_end = True + # This changes the batch size of queries in the scheduling main loop. 
# If this is too high, SQL query performance may be impacted by one # or more of the following: diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py index 94065bf91e199..eac70ba5bcdc9 100644 --- a/airflow/jobs/scheduler_job.py +++ b/airflow/jobs/scheduler_job.py @@ -426,8 +426,8 @@ def manage_slas(self, dag, session=None): if isinstance(task.sla, timedelta): dttm = dag.following_schedule(dttm) while dttm < timezone.utcnow(): - following_schedule = dag.following_schedule(dttm) - if following_schedule + task.sla < timezone.utcnow(): + period_end = dag.period_end(dttm) + if period_end + task.sla < timezone.utcnow(): session.merge(SlaMiss( task_id=ti.task_id, dag_id=ti.dag_id, @@ -595,7 +595,7 @@ def create_dag_run(self, dag, session=None): # one period before, so that timezone.utcnow() is AFTER # the period end, and the job can be created... now = timezone.utcnow() - next_start = dag.following_schedule(now) + next_start = dag.period_end(now) last_start = dag.previous_schedule(now) if next_start <= now: new_start = last_start @@ -648,7 +648,7 @@ def create_dag_run(self, dag, session=None): if dag.schedule_interval == '@once': period_end = next_run_date elif next_run_date: - period_end = dag.following_schedule(next_run_date) + period_end = dag.period_end(next_run_date) # Don't schedule a dag beyond its end_date (as specified by the dag param) if next_run_date and dag.end_date and next_run_date > dag.end_date: @@ -1214,7 +1214,7 @@ def _process_dags(self, dagbag, dags, tis_out): if not dag.is_subdag: dag_run = self.create_dag_run(dag) if dag_run: - expected_start_date = dag.following_schedule(dag_run.execution_date) + expected_start_date = dag.period_end(dag_run.execution_date) if expected_start_date: schedule_delay = dag_run.start_date - expected_start_date Stats.timing( diff --git a/airflow/models/dag.py b/airflow/models/dag.py index a6939bfee402b..2e1393d32f032 100644 --- a/airflow/models/dag.py +++ b/airflow/models/dag.py @@ -96,6 +96,9 @@ class 
DAG(BaseDag, LoggingMixin): :type schedule_interval: datetime.timedelta or dateutil.relativedelta.relativedelta or str that acts as a cron expression + :param schedule_at_interval_end: Should a DAG be scheduled to run at + the end (True, default) or the start (False) of the scheduled interval. + :type schedule_at_interval_end: bool :param start_date: The timestamp from which the scheduler will attempt to backfill :type start_date: datetime.datetime @@ -222,6 +225,7 @@ def __init__( params: Optional[Dict] = None, access_control: Optional[Dict] = None, is_paused_upon_creation: Optional[bool] = None, + schedule_at_interval_end: bool = conf.getboolean('scheduler', 'schedule_at_interval_end'), jinja_environment_kwargs: Optional[Dict] = None ): self.user_defined_macros = user_defined_macros @@ -287,6 +291,7 @@ def __init__( self._schedule_interval = None else: self._schedule_interval = schedule_interval + self.schedule_at_interval_end = schedule_at_interval_end if isinstance(template_searchpath, str): template_searchpath = [template_searchpath] self.template_searchpath = template_searchpath @@ -389,6 +394,35 @@ def is_fixed_time_schedule(self): return False + def period_end(self, dttm): + """ + Calculates the end time of the period in which dttm lies. 
+ + :param dttm: utc datetime + :return: utc datetime + """ + + if isinstance(self._schedule_interval, str): + # we don't want to rely on the transitions created by + # croniter as they are not always correct + dttm = pendulum.instance(dttm) + naive = timezone.make_naive(dttm, self.timezone) + cron = croniter(self._schedule_interval, naive) + + period_end = cron.get_next(datetime) + + if not self.schedule_at_interval_end: + next_period_end = cron.get_next(datetime) + interval = next_period_end - period_end + period_end -= interval + + return timezone.convert_to_utc(period_end) + elif self._schedule_interval is not None: + if self.schedule_at_interval_end: + return dttm + self._schedule_interval + else: + return dttm + def following_schedule(self, dttm): """ Calculates the following schedule for this dag in UTC. diff --git a/airflow/sensors/time_delta_sensor.py b/airflow/sensors/time_delta_sensor.py index 4fee53e73b58a..adfda4bb891e8 100644 --- a/airflow/sensors/time_delta_sensor.py +++ b/airflow/sensors/time_delta_sensor.py @@ -24,7 +24,8 @@ class TimeDeltaSensor(BaseSensorOperator): """ - Waits for a timedelta after the task's execution_date + schedule_interval. + Waits for a timedelta after the task's execution_date + schedule_interval + (if schedule_at_interval_end is True, otherwise just execution_date). In Airflow, the daily task stamped with ``execution_date`` 2016-01-01 can only start running on 2016-01-02. The timedelta here represents the time after the execution period has closed. 
@@ -40,7 +41,7 @@ def __init__(self, delta, *args, **kwargs): def poke(self, context): dag = context['dag'] - target_dttm = dag.following_schedule(context['execution_date']) + target_dttm = dag.period_end(context['execution_date']) target_dttm += self.delta self.log.info('Checking if the time (%s) has come', target_dttm) return timezone.utcnow() > target_dttm diff --git a/docs/scheduler.rst b/docs/scheduler.rst index c958ec77f514f..2355ce6aab927 100644 --- a/docs/scheduler.rst +++ b/docs/scheduler.rst @@ -32,7 +32,7 @@ Airflow production environment. To kick it off, all you need to do is execute ``airflow scheduler``. It will use the configuration specified in ``airflow.cfg``. -Note that if you run a DAG on a ``schedule_interval`` of one day, +Note that, by default, if you run a DAG on a ``schedule_interval`` of one day, the run stamped ``2016-01-01`` will be triggered soon after ``2016-01-01T23:59``. In other words, the job instance is started once the period it covers has ended. @@ -96,6 +96,76 @@ should be triggered and come to a crawl. It might also create undesired processing when changing the shape of your DAG, by say adding in new tasks. +Scheduled Time vs Execution Time +'''''''''''''''''''''''''''''''' + +A DAG with a ``schedule_interval`` will execute once per interval. By +default, the execution of a DAG will occur at the **end** of the +schedule interval. + +A few examples: + +- A DAG with ``schedule_interval='@hourly'``: The DAG run that processes + 2019-08-16 17:00 will start running just after 2019-08-16 17:59:59, + i.e. once that hour is over. +- A DAG with ``schedule_interval='@daily'``: The DAG run that processes + 2019-08-16 will start running shortly after 2019-08-17 00:00. + +The reasoning behind this execution vs scheduling behaviour is that +data for the interval to be processed won't be fully available until +the interval has elapsed. 
+ +In cases where you wish the DAG to be executed at the **start** of the +interval, specify ``schedule_at_interval_end=False``, either in +``airflow.cfg``, or on a per-DAG basis. + +The following example illustrates the differences + +.. code:: python + + """ + Code that goes along with the Airflow tutorial located at: + https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py + """ + + from airflow import DAG + from airflow.operators.bash_operator import BashOperator + from datetime import datetime, timedelta + + default_args = { + 'owner': 'Airflow', + 'depends_on_past': False, + 'start_date': datetime(2015, 12, 1), + 'email': ['airflow@example.com'], + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5) + } + + # Processing: 2019-08-08 + # Execution: Shortly after 2019-08-09 00:00 + # Will execute at the end of the day to be processed + eod = DAG( + 'tutorial', + default_args=default_args, + description='A simple tutorial DAG', + schedule_interval='@daily', + catchup=False) + + + # Processing: 2019-08-08 + # Execution: Shortly after 2019-08-08 00:00 + # Will execute at the start of the day to be processed + start_of_day = DAG( + 'tutorial', + default_args=default_args, + description='A simple tutorial DAG', + schedule_interval='@daily', + catchup=False, + schedule_at_interval_end=False + ) + Backfill and Catchup '''''''''''''''''''' diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py index eb46f42166655..1415d9db6e680 100644 --- a/tests/jobs/test_scheduler_job.py +++ b/tests/jobs/test_scheduler_job.py @@ -2734,3 +2734,38 @@ def test_find_dags_to_run_skip_paused_dags(self): dags = scheduler._find_dags_to_process(dagbag.dags.values(), paused_dag_ids=[dag.dag_id]) self.assertNotIn(dag, dags) + + def test_schedule_at_interval_end(self): + today = datetime.date.today() + start_date = timezone.datetime(today.year, today.month, today.day) + # default to run at 
end of the interval + end_dag_id = 'test_schedule_at_interval_end_true' + end_dag = DAG(dag_id=end_dag_id, + start_date=start_date, + schedule_interval='@daily', + schedule_at_interval_end=True + ) + end_task_id = end_dag_id + '_task' + DummyOperator(task_id=end_task_id, dag=end_dag) + + # and one scheduled to run at the start of the interval + start_dag_id = 'test_schedule_at_interval_end_false' + start_dag = DAG(dag_id=start_dag_id, + start_date=start_date, + schedule_interval='@daily', + schedule_at_interval_end=False + ) + start_task_id = start_dag_id + '_task' + DummyOperator(task_id=start_task_id, dag=start_dag) + + scheduler = SchedulerJob() + + # With no catchup, and today as a start date, there + # should be no pending run for the end edge + end_dr = scheduler.create_dag_run(end_dag) + self.assertIsNone(end_dr) + + # But there should be if the start edge is used + # for scheduling + start_dr = scheduler.create_dag_run(start_dag) + self.assertIsNotNone(start_dr) diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py index 74f98128cb9a5..e9442f0bab689 100644 --- a/tests/models/test_dag.py +++ b/tests/models/test_dag.py @@ -890,3 +890,11 @@ def test_tree_view(self): self.assertIn('t1', stdout_lines[0]) self.assertIn('t2', stdout_lines[1]) self.assertIn('t3', stdout_lines[2]) + + def test_period_end(self): + """Verify period_end calculation.""" + from datetime import timedelta + with DAG("test_dag", start_date=DEFAULT_DATE, schedule_at_interval_end=True) as dag: + self.assertEqual(dag.period_end(DEFAULT_DATE), DEFAULT_DATE + timedelta(days=1)) + with DAG("test_dag", start_date=DEFAULT_DATE, schedule_at_interval_end=False) as dag: + self.assertEqual(dag.period_end(DEFAULT_DATE), DEFAULT_DATE)