-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Added retry to ECS Operator #14263
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added retry to ECS Operator #14263
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about any anything please check our Contribution Guide (https://github.com/apache/airflow/blob/master/CONTRIBUTING.rst)
|
|
@turbaszek Thanks for the feedback. I also saw some checks fail (i.e. |
|
@turbaszek just a friendly bump to see what the next steps are here |
| from airflow.providers.amazon.aws.models.exceptions import ECSOperatorError | ||
| from airflow.utils.log.logging_mixin import LoggingMixin | ||
|
|
||
| ECS_QUOTA_ERROR_REASONS = [ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since these are ECS specific they shouldn't really be in the base_aws module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure what to do here. It feels weird to move this to ecs.py if I have code in base_aws.py that catches ECSOperatorError. For now, I've moved this variable into the is_permissible_error() scope to clarify intent. Any suggestions are welcomed.
| """ | ||
|
|
||
| def decorator_f(self): | ||
| quota_retry = getattr(self, 'quota_retry', None) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about this parameter name -- it seems very specific to EcsOperator only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point. Hm I've changed it to retry_args to be more generic, but I'm open to any suggestions.
|
@ashb Friendly bump here |
|
Thanks, I'll take a look on Monday |
| return self.get_client_type("iam").get_role(RoleName=role)["Role"]["Arn"] | ||
|
|
||
| @staticmethod | ||
| def retry(fun: Callable): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we change this to
def retry(should_retry: Callable[[Exception], bool], fun: Callable)Then the retry_if_permissible_error can move out of BaseAWS in to ECSHook, used like this:
def should_retry(exception: Exception):
"""Check if exception is related to ECS resource quota (CPU, MEM)."""
return isinstance(exception, ECSOperatorError) and any(
quota_reason in failure['reason']
for quota_reason in ['RESOURCE:MEMORY', 'RESOURCE:CPU']
for failure in exception.failures
)
...
@AwsBaseHook.retry(should_retry)
def _start_task(self):There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wish I thought of this. I've made this change.
| limit. | ||
| """ | ||
|
|
||
| def decorator_f(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| def decorator_f(self): | |
| @functools.wraps | |
| def decorator_f(self, *args, **kwargs): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
|
@markhopson See the review I just left -- I think that gives us a way to have the base retry functionality in the base hook, without having to put any service specific login in there. WDYT? |
@ashb Good tip. I like it. I've made the change. |
|
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*. |
|
The Workflow run is cancelling this PR. It has some failed jobs matching ^Pylint$,^Static checks,^Build docs$,^Spell check docs$,^Backport packages$,^Provider packages,^Checks: Helm tests$,^Test OpenAPI*. |
|
@ashb Friendly bump. |
ashb
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good @markhopson
Could you fix up the static checks please?
|
The PR is likely OK to be merged with just subset of tests for default Python and Database versions without running the full matrix of tests, because it does not modify the core of Airflow. If the committers decide that the full tests matrix is needed, they will add the label 'full tests needed'. Then you should rebase to the latest master or amend the last commit of the PR, and push it with --force-with-lease. |
|
TestECSOperator.test_execute_with_failures is now failing becuase the type of the exception it throws has changed -- I think that's fine, and the tests just need updating. I haven't look in detail at the code/test though. |
|
Awesome work, congrats on your first merged pull request! |
This PR allows users to enable retry-behaviour for
ECSOperator._start_task.This feature was discussed briefly: here #13725
The design was lifted from
hooks/base_google.py. There's 2 main differences with my AWS implementation ...the user can configure the retry with a param
quota_retry(Dict) that is passed in to theECSOperatorthe user can configure the retry without tenacity (e.g. passing in
max=300andmultiplier=1instead oftenacity.wait_exponential(multiplier=1, max=300)