-
Notifications
You must be signed in to change notification settings - Fork 16.4k
[AIRFLOW-5172] Add choice of interval edge scheduling #5787
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
This needs some docs re-writing as a result -https://airflow.apache.org/scheduler.html (which lives under docs/ somewhere) at the least. |
|
See also https://github.com/apache/airflow/pull/4768/files for previous discussion about this. I think I'm okay with this generally, but we'll need to make sure the docs about it are clear - i.e. more than just a comment in a config file; likely a section in that scheduler page with some images/pictures of timelines. Page 14 of https://drive.google.com/open?id=1DVN4HXtOC-HXvv00sEkoB90mxLDnCIKc may be a starting point for that image. |
|
Yup, I'll make those updates. After more thought, I'm also going to change the variable from a str to a bool, and likely the name to something like schedule_at_interval_end. Any suggestions for better names? |
baca170 to
7ca8717
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if schedule_at_interval_end is a better/clearer name.
I'm probably splitting hairs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm all for hair splitting if it means clearer usage down the road, and is more consistent with established nomenclature.
Part of my confusion is that schedule seams to have multiple meanings: A job scheduled daily will be scheduled to run once the day has finished. So there's the declared schedule, and then the implied schedule (execution time).
I have no vested interest in the name, just let me know if you'd like to change it.
Either way, maybe it would be worth adding (in a separate PR) a dictionary of terms and precise definitions? I'd be happy submit that after this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All these terms are confusing and overlapping.
The reason I wanted to avoid execute is because the execution_date property of the task will remain constant non matter what this setting. So I think this does need to change, but I'm not at all convinced my wording is better.
Either way, maybe it would be worth adding (in a separate PR) a dictionary of terms and precise definitions? I'd be happy submit that after this.
Yes, that would be great!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've updated the name, I like it, and it makes sense. Also made a few more very small documentation changes, and squashed all the commits. Please let me know if you'd like to change anything else.
Do you think there's any way this could get cherry picked for the next stable release? Right now I'm using a hacked up local version to get this behaviour, and it would be great to get back to upstream stable.
cc90955 to
97b516a
Compare
|
I'm not sure why CI failed on this push ... is it possible to re-run them? edit: Nevermind. I see the same tests failing in my local repo. They also fail when I test against my current master branch. |
ashb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy with the code on this change, I'm opening a thread on the mailing list to discuss this.
(Committers: please don't merge without checking that thread first)
|
Really like the spirit of this change. My vote would be to change the flag to schedule_at_interval_start and default it to false for now as scheduling_at_interval_start is the more intuitive behavior. What are people's feelings on changing the default execution to schedule interval start and communicating this to existing users in the Updating notes so that they can preserve the old behavior? Could potentially cause headaches for users who don't read the notes but I think it might make sense to bite the bullet at some point for more intuitive behavior overall for new users. I haven't looked that carefully, but I think there are potentially a lot more places in the code that need to be changed that use dag.following_schedule(), e.g. latest_only_operator, cli.py. In fact it might make more sense to make the change in the following_schedule function itself. There are also functions previous_schedule that might need to be modified too. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 on the default to be false as this is the intuitive / common understanding on how Airflow scheduling works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PR is designed to have zero impact to current behaviours, to avoid surprises for devops everywhere and to increase the chance that the PR could be accepted for the next 1.x.x stable release.
Would it make more sense to leave this PR as is, and then submit a second PR to make the default False at a later date, for 2.0?
It might be more intuitive to new users but this would mean that every single dag in existence right now would need to be updated, so it's a very disrputive change.
Yes, that seems sensible. @iroddis Could you take a look at this and see if it makes sense or other places need updating too? |
airflow/jobs/scheduler_job.py
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does similar logic need to be applied to manage_slas?
|
Sorry for the random commits on this. Getting the unit tests running is still a challenge, especially on osx where I'm doing these changes currently. |
97b516a to
9bebb1f
Compare
Adds a boolean DAG attribute schedule_at_interval_end. If true (default), jobs will be scheduled to run shortly after the schedule_interval has elapsed (current airflow behaviour). If False, the DAG will be scheduled to run at the beginning of the schedule interval (new behaviour). This allows for scheduling that more closely mirrors a typical cron schedule. Adds a period_end method to DAG to return the datetime of the end of a period. Adds support in the SchedulerJob for this feature. Adds a default of True to the default config in the [scheduler] section. Add unit test for start/end interval scheduling. Updates docs/scheduler.rst to explain the new behaviour. Co-Authored-By: Ash Berlin-Taylor <ash_github@firemirror.com> Update docs/scheduler.rst Replacing straight following_schedule calls with period_end calls as appropriate to handle the new start/end of schedule interval scheduling.
9bebb1f to
e143fe8
Compare
Codecov Report
@@ Coverage Diff @@
## master #5787 +/- ##
==========================================
- Coverage 80.09% 80.04% -0.06%
==========================================
Files 612 612
Lines 35320 35336 +16
==========================================
- Hits 28289 28284 -5
- Misses 7031 7052 +21
Continue to review full report at Codecov.
|
|
The assumption that the run time for an interval is the dag.following_schedule(dttm) is baked in all over the place. I've found some more cases, and rebased off of current master. Making the change in following_schedule() was not the right route; that method is frequently used to iterate through run times. I instead added a dag.period_end(dttm) method to use that incorporates the new As of this last checkin, all tests seem to be passing except some that look like race conditions, as they'll pass and not pass from run to run. |
|
Pretty much interested in this functionality. Any upcoming plans to make this changes part of release before v2.0, if not we'd like to cherry pick this commit for v1.10.5 and see if it works. BTW do we have any guidance, if we can pick this up and merge with older version specifically v1.10.2? |
|
I haven't run the tests or tried to backport the functionality on older releases. All of the tests are currently passing, but I'm sure there are edge cases. I'd love to see this in a release, maybe tagged as an alpha feature, to get some wider testing. |
| period_end = next_run_date | ||
| elif next_run_date: | ||
| period_end = dag.following_schedule(next_run_date) | ||
| period_end = dag.period_end(next_run_date) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it seems there is an inconsistency in the language here.
The function is called period_end, yet sometimes it returns the start of the interval.
The name period_end for this variable reflects an assumption that dags are scheduled after the end of the interval.
That's why the code checks that period_end <= timezone.utcnow(): end of interval is in the past.
But language of this PR is in conflict with that.
The parameter schedule_at_interval_end implies that the interval doesn't change, but where we schedule does. So, we may schedule at start or end of "the interval". But as written, if schedule_at_interval_end=False, it will in general be the case that period_end==excecution_date, which implies that exec date is the end of "the interval" and not the start, and this is a contradiction.
It seems that what period_end represents in this code is more like run_after_dttm -- the datetime before which the dag may not be scheduled. When schedule_at_interval_end is True, we can run after exec date + 1 interval; otherwise, we can run after exec date.
So in this bit of code, we probably don't even need period_end() because we could do this:
run_after_dttm = None
if dag.schedule_interval == '@once':
run_after_dttm = next_run_date
elif next_run_date and not self.schedule_at_interval_end:
run_after_dttm = next_run_date
elif next_run_date and self.schedule_at_interval_end:
run_after_dttm = dag.following_schedule(next_run_date)
And this:
if next_run_date and run_after_dttm and run_after_dttm <= timezone.utcnow():
But elsewhere, it seems that period_end() function is used to mean min_run_date or target_run_date or run_after_date. Perhaps a name like this would be clearer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nomenclature has absolutely been a pain in this change. The entire vocabulary around scheduling seems to have exploded, and I can't find a clear definition for all the terms. This is well outside the scope of this change, but having well defined terms for things like:
period_start_ts - Timestamp of the start of the period to be processed
period_end_ts - Timestamp of the end of the period to be processed
scheduled_execution_ts - Timestamp of when the execution should start
execution_ts - Timestamp of actual execution start
Many of the current names, like start_date, are datetimes, not dates. It's also not immediately obvious what "start" means: is it period_start_ts, or execution_ts?
I'm happy to change the functions to be whatever is popular, but I think there'd be real value in defining, clearly, and in a single spot, all of the timestamps around a particular DAG's execution, giving them meaningful names, and consolidating on that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah i'm not saying you are responsible for all confusing naming, just suggesting not to add to it ;)
period_end is a new function.
And I think it is definitely in scope to change the local period_end variables since you are, in effect, changing their meaning in this PR.
@ashb To clarify, the idea here is to set a cluster-level default in the airflow.cfg to use the new semantics (execution date = start date), then anyone who owns an Airflow cluster with many customers can just make sure to change the default value back to the old semantics when we change the cluster-level default; no dags would need to be updated on such clusters. Anybody setting up Airflow for the first time would have sensical semantics, and maybe we can even deprecate the old semantics after some point in time since I don't think many new users will use it. |
|
@aoen That sounds sensible, so long as we clearly "sign-post" the change for users when upgrading somehow. The default could only change from 2.0 though. Are we still talking about having a per-dag setting to, or are you thinking that we just have a single global config setting? |
|
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. |
|
do we want to include this in 2.0? |
|
@mistercrunch @kaxil @potiuk @mik-laj @turbaszek @ashb @iroddis @ryw |
|
@mistercrunch @kaxil @potiuk @mik-laj @turbaszek @ashb @iroddis @ryw |
|
@iroddis I'm pushing to get this into 2.0. Do you have time to iterate on the PR over the next week? Addressing @dstandish concerns, etc, rebasing it on the HA scheduler code (which we will merge very soon). If you don't have the time, we can take the PR over to fix it up and get it merged. |
|
@ryw I’m sorry, I stopped working on this a long time ago, and I don’t know when I’ll have the time to pick it back up. I think some of the comments were correct about subtler edge cases that weren’t accounted for with scheduling, but got lost in the weeds before I could figure them out. |
|
Going to close out this PR (thansk for opening it!) as we're doing a more complete implementation in https://github.com/apache/airflow/projects/10 as part of AIP-39 |
Make sure you have checked all steps below.
Jira
Description
This change introduces the attribute
schedule_interval_edge, a string containing either 'start' or 'end', to DAGs. The scheduler uses the value to determining if a DAG should be scheduled at the start or the end of the schedule interval.A parameter with the same name was also added to the default_airflow.cfg in the [scheduler] section.
Tests
tests.jobs.test_scheduler_job:SchedulerJobTest.test_schedule_interval_edge
Commits
Documentation
Code Quality
flake8