4 changes: 4 additions & 0 deletions airflow/config_templates/default_airflow.cfg
@@ -523,6 +523,10 @@ scheduler_zombie_task_threshold = 300
# DAG definition (catchup)
catchup_by_default = True

# Should DAGs be scheduled to execute at the 'start' or 'end'
# of their schedule interval.
schedule_at_interval_end = True
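(Aside: values in `airflow.cfg` are plain strings, so a flag like this has to be parsed as a boolean by the consumer. A minimal standalone sketch of that parsing with Python's stdlib `configparser` — the section and key names mirror the fragment above, everything else is illustrative:)

```python
import configparser

# A config fragment shaped like the one above.
cfg_text = """
[scheduler]
catchup_by_default = True
schedule_at_interval_end = True
"""

parser = configparser.ConfigParser()
parser.read_string(cfg_text)

# get() returns the raw string; getboolean() parses it into a bool.
raw = parser.get('scheduler', 'schedule_at_interval_end')
flag = parser.getboolean('scheduler', 'schedule_at_interval_end')
print(repr(raw), flag)  # -> 'True' True
```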
Member commented:
+1 on the default being False, as this is the intuitive / common understanding of how Airflow scheduling works.

Author @iroddis commented on Aug 27, 2019:

The PR is designed to have zero impact on current behaviour, to avoid surprises for devops everywhere and to increase the chance that it could be accepted for the next 1.x.x stable release.

Would it make more sense to leave this PR as is, and then submit a second PR to make the default False at a later date, for 2.0?


# This changes the batch size of queries in the scheduling main loop.
# If this is too high, SQL query performance may be impacted by one
# or more of the following:
10 changes: 5 additions & 5 deletions airflow/jobs/scheduler_job.py
@@ -426,8 +426,8 @@ def manage_slas(self, dag, session=None):
if isinstance(task.sla, timedelta):
dttm = dag.following_schedule(dttm)
while dttm < timezone.utcnow():
following_schedule = dag.following_schedule(dttm)
if following_schedule + task.sla < timezone.utcnow():
period_end = dag.period_end(dttm)
if period_end + task.sla < timezone.utcnow():
session.merge(SlaMiss(
task_id=ti.task_id,
dag_id=ti.dag_id,
@@ -595,7 +595,7 @@ def create_dag_run(self, dag, session=None):
# one period before, so that timezone.utcnow() is AFTER
# the period end, and the job can be created...
now = timezone.utcnow()
next_start = dag.following_schedule(now)
next_start = dag.period_end(now)
last_start = dag.previous_schedule(now)
if next_start <= now:
new_start = last_start
@@ -648,7 +648,7 @@ def create_dag_run(self, dag, session=None):
if dag.schedule_interval == '@once':
period_end = next_run_date
elif next_run_date:
period_end = dag.following_schedule(next_run_date)
period_end = dag.period_end(next_run_date)
Contributor commented:

it seems there is an inconsistency in the language here.

The function is called period_end, yet sometimes it returns the start of the interval.

The name period_end for this variable reflects an assumption that dags are scheduled after the end of the interval.

That's why the code checks that period_end <= timezone.utcnow(): end of interval is in the past.

But language of this PR is in conflict with that.

The parameter schedule_at_interval_end implies that the interval doesn't change, but where we schedule does. So we may schedule at the start or end of "the interval". But as written, if schedule_at_interval_end=False, it will in general be the case that period_end == execution_date, which implies that the exec date is the end of "the interval" and not the start, and this is a contradiction.

It seems that what period_end represents in this code is more like run_after_dttm -- the datetime before which the dag may not be scheduled. When schedule_at_interval_end is True, we can run after exec date + 1 interval; otherwise, we can run after exec date.

So in this bit of code, we probably don't even need period_end() because we could do this:

            run_after_dttm = None
            if dag.schedule_interval == '@once':
                run_after_dttm = next_run_date
            elif next_run_date and not self.schedule_at_interval_end:
                run_after_dttm = next_run_date
            elif next_run_date and self.schedule_at_interval_end:
                run_after_dttm = dag.following_schedule(next_run_date)

And this:

            if next_run_date and run_after_dttm and run_after_dttm <= timezone.utcnow():

But elsewhere, it seems that period_end() function is used to mean min_run_date or target_run_date or run_after_date. Perhaps a name like this would be clearer.
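For illustration, the three-branch proposal above can be extracted into a standalone function; this is a sketch with plain `datetime` stand-ins (a fixed `timedelta` in place of `dag.following_schedule`), not the PR's actual code:

```python
from datetime import datetime, timedelta

def run_after_dttm(next_run_date, schedule_interval, schedule_at_interval_end=True):
    """Earliest wall-clock time at which the run for next_run_date may be created."""
    if next_run_date is None:
        return None
    if schedule_interval == '@once' or not schedule_at_interval_end:
        return next_run_date
    # stand-in for dag.following_schedule() with a fixed interval
    return next_run_date + schedule_interval

exec_date = datetime(2019, 8, 8)
print(run_after_dttm(exec_date, timedelta(days=1), True))   # 2019-08-09 00:00:00
print(run_after_dttm(exec_date, timedelta(days=1), False))  # 2019-08-08 00:00:00
```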

Author commented:

Nomenclature has absolutely been a pain in this change. The entire vocabulary around scheduling seems to have exploded, and I can't find a clear definition for all the terms. This is well outside the scope of this change, but having well defined terms for things like:

period_start_ts - Timestamp of the start of the period to be processed
period_end_ts - Timestamp of the end of the period to be processed
scheduled_execution_ts - Timestamp of when the execution should start
execution_ts - Timestamp of actual execution start

Many of the current names, like start_date, are datetimes, not dates. It's also not immediately obvious what "start" means: is it period_start_ts, or execution_ts?

I'm happy to change the functions to be whatever is popular, but I think there'd be real value in defining, clearly, and in a single spot, all of the timestamps around a particular DAG's execution, giving them meaningful names, and consolidating on that.
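For illustration only, the vocabulary proposed above could live in a single structure; a hypothetical sketch (these names are the author's proposal, not an Airflow API):

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

@dataclass(frozen=True)
class RunTimestamps:
    """One place for the timestamps around a single DAG run."""
    period_start_ts: datetime         # start of the data period being processed
    period_end_ts: datetime           # end of the data period being processed
    scheduled_execution_ts: datetime  # when the execution should start
    execution_ts: datetime            # when the execution actually started

# A @daily run covering 2019-08-08, scheduled at the end of the interval.
period_start = datetime(2019, 8, 8)
ts = RunTimestamps(
    period_start_ts=period_start,
    period_end_ts=period_start + timedelta(days=1),
    scheduled_execution_ts=period_start + timedelta(days=1),
    execution_ts=datetime(2019, 8, 9, 0, 0, 5),
)
print(ts.period_end_ts)  # 2019-08-09 00:00:00
```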

Contributor @dstandish commented on Oct 30, 2019:

Yeah, I'm not saying you are responsible for all the confusing naming, just suggesting not to add to it ;)

period_end is a new function.

And I think it is definitely in scope to change the local period_end variables since you are, in effect, changing their meaning in this PR.


# Don't schedule a dag beyond its end_date (as specified by the dag param)
if next_run_date and dag.end_date and next_run_date > dag.end_date:
@@ -1214,7 +1214,7 @@ def _process_dags(self, dagbag, dags, tis_out):
if not dag.is_subdag:
dag_run = self.create_dag_run(dag)
if dag_run:
expected_start_date = dag.following_schedule(dag_run.execution_date)
expected_start_date = dag.period_end(dag_run.execution_date)
if expected_start_date:
schedule_delay = dag_run.start_date - expected_start_date
Stats.timing(
34 changes: 34 additions & 0 deletions airflow/models/dag.py
@@ -96,6 +96,9 @@ class DAG(BaseDag, LoggingMixin):
:type schedule_interval: datetime.timedelta or
dateutil.relativedelta.relativedelta or str that acts as a cron
expression
:param schedule_at_interval_end: Whether the DAG should be scheduled to run
at the end (True, default) or the start (False) of its schedule interval.
:type schedule_at_interval_end: bool
:param start_date: The timestamp from which the scheduler will
attempt to backfill
:type start_date: datetime.datetime
@@ -222,6 +225,7 @@ def __init__(
params: Optional[Dict] = None,
access_control: Optional[Dict] = None,
is_paused_upon_creation: Optional[bool] = None,
schedule_at_interval_end: bool = conf.getboolean('scheduler', 'schedule_at_interval_end'),
jinja_environment_kwargs: Optional[Dict] = None
):
self.user_defined_macros = user_defined_macros
@@ -287,6 +291,7 @@ def __init__(
self._schedule_interval = None
else:
self._schedule_interval = schedule_interval
self.schedule_at_interval_end = schedule_at_interval_end
if isinstance(template_searchpath, str):
template_searchpath = [template_searchpath]
self.template_searchpath = template_searchpath
@@ -389,6 +394,35 @@ def is_fixed_time_schedule(self):

return False

    def period_end(self, dttm):
        """
        Calculates the end time of the period in which dttm lies.

        :param dttm: utc datetime
        :return: utc datetime
        """
        if isinstance(self._schedule_interval, str):
            # we don't want to rely on the transitions created by
            # croniter as they are not always correct
            dttm = pendulum.instance(dttm)
            naive = timezone.make_naive(dttm, self.timezone)
            cron = croniter(self._schedule_interval, naive)

            period_end = cron.get_next(datetime)

            if not self.schedule_at_interval_end:
                # step back one interval to land on the period start
                next_period_end = cron.get_next(datetime)
                interval = next_period_end - period_end
                period_end -= interval

            return timezone.convert_to_utc(period_end)
        elif self._schedule_interval is not None:
            if self.schedule_at_interval_end:
                return dttm + self._schedule_interval
            else:
                return dttm
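For a fixed `timedelta` schedule, the two branches of `period_end` above reduce to a one-liner; a standalone sketch of just that semantics (plain `datetime`, no Airflow objects — names here are illustrative):

```python
from datetime import datetime, timedelta

def period_end(dttm, schedule_interval, schedule_at_interval_end=True):
    """Mirror of the timedelta branch: the run for the period starting at
    `dttm` may only be created once this datetime has passed."""
    if schedule_interval is None:
        return None
    if schedule_at_interval_end:
        return dttm + schedule_interval
    return dttm

exec_date = datetime(2019, 8, 8)
daily = timedelta(days=1)
print(period_end(exec_date, daily, True))   # 2019-08-09 00:00:00
print(period_end(exec_date, daily, False))  # 2019-08-08 00:00:00
```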

def following_schedule(self, dttm):
"""
Calculates the following schedule for this dag in UTC.
5 changes: 3 additions & 2 deletions airflow/sensors/time_delta_sensor.py
@@ -24,7 +24,8 @@

class TimeDeltaSensor(BaseSensorOperator):
"""
Waits for a timedelta after the task's execution_date + schedule_interval.
Waits for a timedelta after the task's execution_date + schedule_interval
(if schedule_at_interval_end is True, otherwise just execution_date).
In Airflow, the daily task stamped with ``execution_date``
2016-01-01 can only start running on 2016-01-02. The timedelta here
represents the time after the execution period has closed.
@@ -40,7 +41,7 @@ def __init__(self, delta, *args, **kwargs):

    def poke(self, context):
        dag = context['dag']
        target_dttm = dag.period_end(context['execution_date'])
        target_dttm += self.delta
        self.log.info('Checking if the time (%s) has come', target_dttm)
        return timezone.utcnow() > target_dttm
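A standalone sketch of the target computed by `poke` under the new behaviour, assuming a fixed daily `schedule_interval` (plain `datetime` stand-ins, not the sensor's real API):

```python
from datetime import datetime, timedelta

def sensor_target(execution_date, interval, delta, schedule_at_interval_end=True):
    """Datetime the sensor waits for: period_end(execution_date) + delta."""
    base = execution_date + interval if schedule_at_interval_end else execution_date
    return base + delta

exec_date = datetime(2016, 1, 1)
target = sensor_target(exec_date, timedelta(days=1), timedelta(hours=2))
print(target)  # 2016-01-02 02:00:00
```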
72 changes: 71 additions & 1 deletion docs/scheduler.rst
@@ -32,7 +32,7 @@ Airflow production environment. To kick it off, all you need to do is
execute ``airflow scheduler``. It will use the configuration specified in
``airflow.cfg``.

Note that if you run a DAG on a ``schedule_interval`` of one day,
Note that, by default, if you run a DAG on a ``schedule_interval`` of one day,
the run stamped ``2016-01-01`` will be triggered soon after ``2016-01-01T23:59``.
In other words, the job instance is started once the period it covers
has ended.
@@ -96,6 +96,76 @@ should be triggered and come to a crawl. It might also create undesired
processing when changing the shape of your DAG, by say adding in new
tasks.

Scheduled Time vs Execution Time
''''''''''''''''''''''''''''''''

A DAG with a ``schedule_interval`` will execute once per interval. By
default, the execution of a DAG will occur at the **end** of the
schedule interval.

A few examples:

- A DAG with ``schedule_interval='@hourly'``: The DAG run that processes
2019-08-16 17:00 will start running just after 2019-08-16 17:59:59,
i.e. once that hour is over.
- A DAG with ``schedule_interval='@daily'``: The DAG run that processes
2019-08-16 will start running shortly after 2019-08-17 00:00.

The reasoning behind this execution vs scheduling behaviour is that
data for the interval to be processed won't be fully available until
the interval has elapsed.

In cases where you wish the DAG to be executed at the **start** of the
interval, specify ``schedule_at_interval_end=False``, either in
``airflow.cfg``, or on a per-DAG basis.

The following example illustrates the difference:

.. code:: python

    """
    Code that goes along with the Airflow tutorial located at:
    https://github.com/apache/airflow/blob/master/airflow/example_dags/tutorial.py
    """

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta

    default_args = {
        'owner': 'Airflow',
        'depends_on_past': False,
        'start_date': datetime(2015, 12, 1),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }

    # Processing: 2019-08-08
    # Execution: shortly after 2019-08-09 00:00
    # Will execute at the end of the day being processed
    eod = DAG(
        'tutorial_interval_end',
        default_args=default_args,
        description='A simple tutorial DAG',
        schedule_interval='@daily',
        catchup=False)


    # Processing: 2019-08-08
    # Execution: shortly after 2019-08-08 00:00
    # Will execute at the start of the day being processed
    start_of_day = DAG(
        'tutorial_interval_start',
        default_args=default_args,
        description='A simple tutorial DAG',
        schedule_interval='@daily',
        catchup=False,
        schedule_at_interval_end=False
    )

Backfill and Catchup
''''''''''''''''''''

35 changes: 35 additions & 0 deletions tests/jobs/test_scheduler_job.py
@@ -2734,3 +2734,38 @@ def test_find_dags_to_run_skip_paused_dags(self):
dags = scheduler._find_dags_to_process(dagbag.dags.values(), paused_dag_ids=[dag.dag_id])

self.assertNotIn(dag, dags)

    def test_schedule_at_interval_end(self):
        today = datetime.date.today()
        start_date = timezone.datetime(today.year, today.month, today.day)

        # default: run at the end of the interval
        end_dag_id = 'test_schedule_at_interval_end_true'
        end_dag = DAG(dag_id=end_dag_id,
                      start_date=start_date,
                      schedule_interval='@daily',
                      schedule_at_interval_end=True)
        end_task_id = end_dag_id + '_task'
        DummyOperator(task_id=end_task_id, dag=end_dag)

        # override to run at the start of the interval
        start_dag_id = 'test_schedule_at_interval_end_false'
        start_dag = DAG(dag_id=start_dag_id,
                        start_date=start_date,
                        schedule_interval='@daily',
                        schedule_at_interval_end=False)
        start_task_id = start_dag_id + '_task'
        DummyOperator(task_id=start_task_id, dag=start_dag)

        scheduler = SchedulerJob()

        # With no catchup, and today as a start date, there
        # should be no pending run for the end edge
        end_dr = scheduler.create_dag_run(end_dag)
        self.assertIsNone(end_dr)

        # But there should be if the start edge is used
        # for scheduling
        start_dr = scheduler.create_dag_run(start_dag)
        self.assertIsNotNone(start_dr)
8 changes: 8 additions & 0 deletions tests/models/test_dag.py
@@ -890,3 +890,11 @@ def test_tree_view(self):
self.assertIn('t1', stdout_lines[0])
self.assertIn('t2', stdout_lines[1])
self.assertIn('t3', stdout_lines[2])

    def test_period_end(self):
        """Verify period_end calculation."""
        from datetime import timedelta
        with DAG("test_dag", start_date=DEFAULT_DATE, schedule_at_interval_end=True) as dag:
            self.assertEqual(dag.period_end(DEFAULT_DATE), DEFAULT_DATE + timedelta(days=1))
        with DAG("test_dag", start_date=DEFAULT_DATE, schedule_at_interval_end=False) as dag:
            self.assertEqual(dag.period_end(DEFAULT_DATE), DEFAULT_DATE)