Don't just default to scheduler heartbeat in jobs #33084
Now that Triggerer heartbeat is configurable, the heartbeat set on Job must use the correct config source, either Scheduler or Triggerer depending on the Job.
hussein-awala
left a comment
Looks good! LGTM
    @cached_property
    def heartrate(self):
        if self.job_type == "TriggererJob":
            return conf.getfloat("triggerer", "JOB_HEARTBEAT_SEC")
        else:
            # Heartrate used to be hardcoded to scheduler, so in all other
            # cases continue to use that value for back compat
            return conf.getfloat("scheduler", "JOB_HEARTBEAT_SEC")
This breaks line 110/108 unfortunately, where self.heartrate is assigned to.
Good catch! Interesting that no tests failed for that. I'll check self for a heartbeat first before fetching from the config
Now that I think about it, likely no tests failed because we never pass a value to the constructor that isn't itself from config, and after reading it back from the DB it can only be the config value.
I'll still add a test to cover it though.
Like I mentioned in the other comment thread, I'm trying to scope this down to a bug fix. I'll cut another Issue to investigate refactoring heartrate entirely (I think adding a column for it in Job on the DB would be best).
After looking deeper, I don't think this breaks those lines at all. If a value is assigned to heartrate directly in the constructor, then the cached_property is not used. self.executor already works this way, if you look above.
I added a unit test to confirm this and also checked manually with a dummy class in a python session:
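The session itself is not preserved in this thread. A minimal sketch of what such a check might look like, assuming a hypothetical `DummyJob` class and a hard-coded 5.0 standing in for the config lookup (neither is Airflow code):

```python
from functools import cached_property


class DummyJob:
    """Hypothetical stand-in for Job; not Airflow code."""

    def __init__(self, heartrate=None):
        self.config_reads = 0
        if heartrate is not None:
            # Stored directly in the instance __dict__, so the
            # cached_property below is never consulted for this instance.
            self.heartrate = heartrate

    @cached_property
    def heartrate(self):
        # Assumed placeholder for the conf.getfloat(...) lookup.
        self.config_reads += 1
        return 5.0


explicit = DummyJob(heartrate=10.0)
fallback = DummyJob()

print(explicit.heartrate, explicit.config_reads)  # 10.0 0
print(fallback.heartrate, fallback.config_reads)  # 5.0 1
```

The explicit value wins because `cached_property` defines no `__set__`, so an entry in the instance `__dict__` takes precedence over it on lookup.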
I tested it yesterday:
When we initiate the Job from the CLI, we provide the heartbeat and it will be used. But when we load the row from the Metadata, the heartbeat will be None; in that case we will use the cached property.
I don't see any issue with this approach.
Actually this is OK according to how cached_property works. I recently stumbled upon that.
@cached_property works differently from @property: it actually creates an attribute that is set the first time the property is used, and you can freely delete the attribute or assign it a different value. This is a nice one, because you can even invalidate the cache by simply deleting the attribute (which is, for example, very cool for testing, and I am already using it to invalidate the sensitive field list when loading provider configuration: https://github.com/apache/airflow/blob/main/airflow/configuration.py#L1854).
It has worked like that since 3.8, but it became an "official part of the API" in Python 3.9, similar to how dict ordering graduated from being an implementation detail to official behaviour.
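The cache invalidation by deletion described here can be sketched as follows. The `Settings` class, its field names, and the `loads` counter are hypothetical and only serve to make the recomputation visible; this is not the code at the linked line.

```python
from functools import cached_property


class Settings:
    """Hypothetical class; the counter only makes recomputation visible."""

    def __init__(self):
        self.loads = 0

    @cached_property
    def sensitive_fields(self):
        self.loads += 1
        return {"password", "secret_key"}


s = Settings()
s.sensitive_fields
s.sensitive_fields      # second access is served from the instance __dict__
assert s.loads == 1

del s.sensitive_fields  # drops the cached value from the instance
s.sensitive_fields      # recomputed on the next access
assert s.loads == 2
```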
The mechanics of cached_property() are somewhat different from property(). A regular property blocks attribute writes unless a setter is defined. In contrast, a cached_property allows writes.
https://docs.python.org/3/library/functools.html#functools.cached_property
So I guess this is good then.
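For reference, a small sketch of the difference quoted from the docs, using two hypothetical classes and a placeholder value of 5.0:

```python
from functools import cached_property


class WithProperty:
    @property
    def heartrate(self):
        return 5.0


class WithCachedProperty:
    @cached_property
    def heartrate(self):
        return 5.0


try:
    WithProperty().heartrate = 10.0
    property_write_ok = True
except AttributeError:
    # A property without a setter rejects attribute writes.
    property_write_ok = False

job = WithCachedProperty()
job.heartrate = 10.0  # allowed: the value is stored on the instance

print(property_write_ok)  # False
print(job.heartrate)      # 10.0
```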
I'd like to get this bug fix in for 2.7.0 if possible. I think we have some approvals and got to the bottom of the requested change (no change required, @uranusjr if you could clear that request for changes that'd be great). CI is being quite stubborn, lots of "this job failed" and networking issues in the containers. It was green previously with the exact same code, so it should go again, just have to babysit it.
Green now. @uranusjr ?
(cherry picked from commit c39359e)

Now that Triggerer heartbeat is configurable (via #32123), the heartbeat set on Job must use the correct config source, either Scheduler or Triggerer depending on the Job.
This is noticeable in the health check APIs, for example. If one increases the heartrate of the Triggerer to 10s and leaves the Scheduler at the default of 5s, one should expect heartbeats of up to heartrate*2.1 seconds (so 21s) to be acceptable for the Triggerer. Instead, the UI shows a warning dialog indicating that the last heartbeat was received 13s ago, which shows that it is using the default 5s from the Scheduler rather than the updated 10s on the Triggerer.
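The arithmetic in that example can be checked with a short sketch. The 2.1 factor is taken from the description above; the helper name and the exact comparison are assumptions for illustration, not Airflow's health check code.

```python
GRACE_MULTIPLIER = 2.1  # factor quoted in the description above


def is_heartbeat_acceptable(seconds_since_heartbeat, heartrate):
    """Hypothetical helper: accept heartbeats up to heartrate * 2.1 seconds old."""
    return seconds_since_heartbeat <= heartrate * GRACE_MULTIPLIER


# Triggerer heartrate of 10s: the threshold is 21s, so 13s is acceptable.
print(is_heartbeat_acceptable(13, heartrate=10.0))  # True

# Scheduler default of 5s: the threshold is 10.5s, so 13s triggers the warning.
print(is_heartbeat_acceptable(13, heartrate=5.0))   # False
```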
NOTE:
This change moves the heartbeat from a class attribute to a cached property. All tests pass and it's working correctly from what I can see. But I'm interested to hear from folks who have historical context on Job.py to double-check this and let me know if they are aware of any gotchas with this approach.