diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg
index ae46feb5aa1d0..fe07a57c99af5 100644
--- a/airflow/config_templates/default_airflow.cfg
+++ b/airflow/config_templates/default_airflow.cfg
@@ -523,6 +523,10 @@ scheduler_zombie_task_threshold = 300
 # DAG definition (catchup)
 catchup_by_default = True
 
+# Whether DAGs should be scheduled to execute at the 'end' (True) or the
+# 'start' (False) of their schedule interval.
+schedule_at_interval_end = True
+
 # This changes the batch size of queries in the scheduling main loop.
 # If this is too high, SQL query performance may be impacted by one
 # or more of the following:
diff --git a/airflow/jobs/scheduler_job.py b/airflow/jobs/scheduler_job.py
index 94065bf91e199..eac70ba5bcdc9 100644
--- a/airflow/jobs/scheduler_job.py
+++ b/airflow/jobs/scheduler_job.py
@@ -426,8 +426,8 @@ def manage_slas(self, dag, session=None):
             if isinstance(task.sla, timedelta):
                 dttm = dag.following_schedule(dttm)
                 while dttm < timezone.utcnow():
-                    following_schedule = dag.following_schedule(dttm)
-                    if following_schedule + task.sla < timezone.utcnow():
+                    period_end = dag.period_end(dttm)
+                    if period_end + task.sla < timezone.utcnow():
                         session.merge(SlaMiss(
                             task_id=ti.task_id,
                             dag_id=ti.dag_id,
@@ -595,7 +595,7 @@ def create_dag_run(self, dag, session=None):
                 # one period before, so that timezone.utcnow() is AFTER
                 # the period end, and the job can be created...
                 now = timezone.utcnow()
-                next_start = dag.following_schedule(now)
+                next_start = dag.period_end(now)
                 last_start = dag.previous_schedule(now)
                 if next_start <= now:
                     new_start = last_start
@@ -648,7 +648,7 @@ def create_dag_run(self, dag, session=None):
             if dag.schedule_interval == '@once':
                 period_end = next_run_date
             elif next_run_date:
-                period_end = dag.following_schedule(next_run_date)
+                period_end = dag.period_end(next_run_date)
 
             # Don't schedule a dag beyond its end_date (as specified by the dag param)
             if next_run_date and dag.end_date and next_run_date > dag.end_date:
@@ -1214,7 +1214,7 @@ def _process_dags(self, dagbag, dags, tis_out):
             if not dag.is_subdag:
                 dag_run = self.create_dag_run(dag)
                 if dag_run:
-                    expected_start_date = dag.following_schedule(dag_run.execution_date)
+                    expected_start_date = dag.period_end(dag_run.execution_date)
                     if expected_start_date:
                         schedule_delay = dag_run.start_date - expected_start_date
                         Stats.timing(
diff --git a/airflow/models/dag.py b/airflow/models/dag.py
index a6939bfee402b..2e1393d32f032 100644
--- a/airflow/models/dag.py
+++ b/airflow/models/dag.py
@@ -96,6 +96,9 @@ class DAG(BaseDag, LoggingMixin):
     :type schedule_interval: datetime.timedelta or
         dateutil.relativedelta.relativedelta or str that acts as a cron
         expression
+    :param schedule_at_interval_end: Should a DAG be scheduled to run at
+        the end (True, default) or the start (False) of the schedule interval.
+    :type schedule_at_interval_end: bool
     :param start_date: The timestamp from which the scheduler will
         attempt to backfill
     :type start_date: datetime.datetime
@@ -222,6 +225,7 @@ def __init__(
         params: Optional[Dict] = None,
         access_control: Optional[Dict] = None,
         is_paused_upon_creation: Optional[bool] = None,
+        schedule_at_interval_end: bool = conf.getboolean('scheduler', 'schedule_at_interval_end'),
         jinja_environment_kwargs: Optional[Dict] = None
     ):
         self.user_defined_macros = user_defined_macros
@@ -287,6 +291,7 @@ def __init__(
             self._schedule_interval = None
         else:
             self._schedule_interval = schedule_interval
+        self.schedule_at_interval_end = schedule_at_interval_end
         if isinstance(template_searchpath, str):
             template_searchpath = [template_searchpath]
         self.template_searchpath = template_searchpath
@@ -389,6 +394,35 @@ def is_fixed_time_schedule(self):
 
         return False
 
+    def period_end(self, dttm):
+        """
+        Calculates the end of the schedule period in which dttm lies.
+
+        :param dttm: utc datetime
+        :return: utc datetime
+        """
+
+        if isinstance(self._schedule_interval, str):
+            # we don't want to rely on the transitions created by
+            # croniter as they are not always correct
+            dttm = pendulum.instance(dttm)
+            naive = timezone.make_naive(dttm, self.timezone)
+            cron = croniter(self._schedule_interval, naive)
+
+            period_end = cron.get_next(datetime)
+
+            if not self.schedule_at_interval_end:
+                next_period_end = cron.get_next(datetime)
+                interval = next_period_end - period_end
+                period_end -= interval
+
+            return timezone.convert_to_utc(period_end)
+        elif self._schedule_interval is not None:
+            if self.schedule_at_interval_end:
+                return dttm + self._schedule_interval
+            else:
+                return dttm
+
     def following_schedule(self, dttm):
         """
         Calculates the following schedule for this dag in UTC.
diff --git a/airflow/sensors/time_delta_sensor.py b/airflow/sensors/time_delta_sensor.py
index 4fee53e73b58a..adfda4bb891e8 100644
--- a/airflow/sensors/time_delta_sensor.py
+++ b/airflow/sensors/time_delta_sensor.py
@@ -24,7 +24,8 @@ class TimeDeltaSensor(BaseSensorOperator):
     """
-    Waits for a timedelta after the task's execution_date + schedule_interval.
+    Waits for a timedelta after the task's execution_date + schedule_interval
+    (if schedule_at_interval_end is True, otherwise just execution_date).
     In Airflow, the daily task stamped with ``execution_date`` 2016-01-01 can
     only start running on 2016-01-02. The timedelta here represents the time
     after the execution period has closed.
@@ -40,7 +41,7 @@ def __init__(self, delta, *args, **kwargs):
 
     def poke(self, context):
         dag = context['dag']
-        target_dttm = dag.following_schedule(context['execution_date'])
+        target_dttm = dag.period_end(context['execution_date'])
         target_dttm += self.delta
         self.log.info('Checking if the time (%s) has come', target_dttm)
         return timezone.utcnow() > target_dttm
diff --git a/docs/scheduler.rst b/docs/scheduler.rst
index c958ec77f514f..2355ce6aab927 100644
--- a/docs/scheduler.rst
+++ b/docs/scheduler.rst
@@ -32,7 +32,7 @@ Airflow production environment.
 To kick it off, all you need to do is execute ``airflow scheduler``. It
 will use the configuration specified in ``airflow.cfg``.
 
-Note that if you run a DAG on a ``schedule_interval`` of one day,
+Note that, by default, if you run a DAG on a ``schedule_interval`` of one day,
 the run stamped ``2016-01-01`` will be triggered soon after ``2016-01-01T23:59``.
 In other words, the job instance is started once the period it covers
 has ended.
@@ -96,6 +96,76 @@ should be triggered and come to a crawl.
 It might also create undesired processing when changing the shape of
 your DAG, by say adding in new tasks.
 
+Scheduled Time vs Execution Time
+''''''''''''''''''''''''''''''''
+
+A DAG with a ``schedule_interval`` will execute once per interval. By
+default, the execution of a DAG will occur at the **end** of the
+schedule interval.
+
+A few examples:
+
+- A DAG with ``schedule_interval='@hourly'``: The DAG run that processes
+  2019-08-16 17:00 will start running just after 2019-08-16 17:59:59,
+  i.e. once that hour is over.
+- A DAG with ``schedule_interval='@daily'``: The DAG run that processes
+  2019-08-16 will start running shortly after 2019-08-17 00:00.
+
+The reasoning behind this execution vs scheduling behaviour is that
+data for the interval to be processed won't be fully available until
+the interval has elapsed.
+
+In cases where you wish the DAG to be executed at the **start** of the
+interval, specify ``schedule_at_interval_end=False``, either in
+``airflow.cfg`` or on a per-DAG basis.
+
+The following example illustrates the difference:
+
+.. code:: python
+
+    """
+    Code that goes along with the Airflow tutorial located at:
+    https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
+    """
+
+    from airflow import DAG
+    from airflow.operators.bash_operator import BashOperator
+    from datetime import datetime, timedelta
+
+    default_args = {
+        'owner': 'Airflow',
+        'depends_on_past': False,
+        'start_date': datetime(2015, 12, 1),
+        'email': ['airflow@example.com'],
+        'email_on_failure': False,
+        'email_on_retry': False,
+        'retries': 1,
+        'retry_delay': timedelta(minutes=5)
+    }
+
+    # Processing: 2019-08-08
+    # Execution: shortly after 2019-08-09 00:00
+    # Runs at the end of the day being processed
+    eod = DAG(
+        'tutorial_end_of_interval',
+        default_args=default_args,
+        description='A simple tutorial DAG',
+        schedule_interval='@daily',
+        catchup=False)
+
+
+    # Processing: 2019-08-08
+    # Execution: shortly after 2019-08-08 00:00
+    # Runs at the start of the day being processed
+    start_of_day = DAG(
+        'tutorial_start_of_interval',
+        default_args=default_args,
+        description='A simple tutorial DAG',
+        schedule_interval='@daily',
+        catchup=False,
+        schedule_at_interval_end=False
+    )
+
 Backfill and Catchup
 ''''''''''''''''''''
 
diff --git a/tests/jobs/test_scheduler_job.py b/tests/jobs/test_scheduler_job.py
index eb46f42166655..1415d9db6e680 100644
--- a/tests/jobs/test_scheduler_job.py
+++ b/tests/jobs/test_scheduler_job.py
@@ -2734,3 +2734,38 @@ def test_find_dags_to_run_skip_paused_dags(self):
         dags = scheduler._find_dags_to_process(dagbag.dags.values(), paused_dag_ids=[dag.dag_id])
 
         self.assertNotIn(dag, dags)
+
+    def test_schedule_at_interval_end(self):
+        today = datetime.date.today()
+        start_date = timezone.datetime(today.year, today.month, today.day)
+        # Defaults to running at the end of the interval
+        end_dag_id = 'test_schedule_at_interval_end_true'
+        end_dag = DAG(dag_id=end_dag_id,
+                      start_date=start_date,
+                      schedule_interval='@daily',
+                      schedule_at_interval_end=True
+                      )
+        end_task_id = end_dag_id + '_task'
+        DummyOperator(task_id=end_task_id, dag=end_dag)
+
+        # Explicitly run at the start of the interval
+        start_dag_id = 'test_schedule_at_interval_end_false'
+        start_dag = DAG(dag_id=start_dag_id,
+                        start_date=start_date,
+                        schedule_interval='@daily',
+                        schedule_at_interval_end=False
+                        )
+        start_task_id = start_dag_id + '_task'
+        DummyOperator(task_id=start_task_id, dag=start_dag)
+
+        scheduler = SchedulerJob()
+
+        # With today as the start date, the first interval has not
+        # ended yet, so no run is due for the end-of-interval DAG
+        end_dr = scheduler.create_dag_run(end_dag)
+        self.assertIsNone(end_dr)
+
+        # But a run is due when the start of the interval is
+        # used for scheduling
+        start_dr = scheduler.create_dag_run(start_dag)
+        self.assertIsNotNone(start_dr)
diff --git a/tests/models/test_dag.py b/tests/models/test_dag.py
index 74f98128cb9a5..e9442f0bab689 100644
--- a/tests/models/test_dag.py
+++ b/tests/models/test_dag.py
@@ -890,3 +890,11 @@ def test_tree_view(self):
         self.assertIn('t1', stdout_lines[0])
         self.assertIn('t2', stdout_lines[1])
         self.assertIn('t3', stdout_lines[2])
+
+    def test_period_end(self):
+        """Verify period_end calculation."""
+        from datetime import timedelta
+        with DAG("test_dag", start_date=DEFAULT_DATE, schedule_at_interval_end=True) as dag:
+            self.assertEqual(dag.period_end(DEFAULT_DATE), DEFAULT_DATE + timedelta(days=1))
+        with DAG("test_dag", start_date=DEFAULT_DATE, schedule_at_interval_end=False) as dag:
+            self.assertEqual(dag.period_end(DEFAULT_DATE), DEFAULT_DATE)
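
A quick way to sanity-check the new trigger-time calculation outside of Airflow: the sketch below mirrors the cron branch of ``DAG.period_end`` using plain ``croniter`` and naive datetimes (the timezone handling via ``pendulum``/``timezone`` in the real method is omitted). The standalone helper name and the hard-coded dates are illustrative only.

    from datetime import datetime

    from croniter import croniter


    def period_end(cron_expr, dttm, schedule_at_interval_end=True):
        # The next cron boundary strictly after dttm is the end of the
        # period that starts at dttm.
        cron = croniter(cron_expr, dttm)
        end = cron.get_next(datetime)
        if not schedule_at_interval_end:
            # Step back one full interval so the run fires at the start
            # of the period instead.
            following = cron.get_next(datetime)
            end -= following - end
        return end


    print(period_end('0 0 * * *', datetime(2019, 8, 8)))         # 2019-08-09 00:00:00
    print(period_end('0 0 * * *', datetime(2019, 8, 8), False))  # 2019-08-08 00:00:00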
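
On reading the new ``airflow.cfg`` key: Airflow's ``conf`` wraps ``ConfigParser``, so a plain ``get`` returns a string, and the string ``'False'`` is still truthy; that is why the default in ``DAG.__init__`` goes through ``getboolean``. A minimal illustration with the standard library directly (not Airflow's wrapper):

    from configparser import ConfigParser

    cfg = ConfigParser()
    cfg.read_string("[scheduler]\nschedule_at_interval_end = False\n")

    raw = cfg.get('scheduler', 'schedule_at_interval_end')
    print(raw, bool(raw))  # False True -- the raw value is a non-empty string, hence truthy
    print(cfg.getboolean('scheduler', 'schedule_at_interval_end'))  # False -- parsed as a boolean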
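
For what it's worth, the arithmetic behind ``test_schedule_at_interval_end`` can be sketched with plain datetimes; a fixed ``now`` stands in for ``timezone.utcnow()``, and the scheduler only creates a DagRun once the computed trigger time has passed:

    from datetime import datetime, timedelta

    now = datetime(2019, 8, 8, 15, 0)    # stand-in for "utcnow" while the test runs
    start_date = datetime(2019, 8, 8)    # today at midnight, as in the test

    # schedule_at_interval_end=True: the run is only due once the whole day has elapsed
    end_trigger = start_date + timedelta(days=1)    # 2019-08-09 00:00
    print(end_trigger <= now)                       # False -> no DagRun yet

    # schedule_at_interval_end=False: the run is due as soon as the day starts
    start_trigger = start_date                      # 2019-08-08 00:00
    print(start_trigger <= now)                     # True -> DagRun created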