SAMZA-1795 table: add common retry for IO functions #618
Add common retry functionality to table IO functions for data stores that do not have native retry support. We use failsafe as the retry library.
@prateekm please help review when you have a chance.
weisong44
left a comment
Thanks for the contribution, overall LGTM. A few minor comments.
import org.apache.samza.SamzaException;

import net.jodah.failsafe.RetryPolicy;
Ideally the retry policy is just a POJO that captures retry parameters. This class should be independent of the implementation (failsafe) we chose, and it should not take a dependency on failsafe.
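To illustrate the suggestion, here is a minimal sketch of a policy object that only holds retry parameters and imports nothing from failsafe. The class name `RetrySettingsSketch`, its fields, and its builder-style methods are all hypothetical and are not the API in this PR; a separate adapter would translate such an object into whatever retry library is in use.

```java
import java.io.Serializable;
import java.time.Duration;

// Hypothetical sketch: a plain, serializable holder of retry parameters.
// It has no dependency on any retry library.
class RetrySettingsSketch implements Serializable {
  private static final long serialVersionUID = 1L;

  private Duration sleepTime = Duration.ZERO; // delay between attempts
  private int maxAttempts = -1;               // -1 means "no limit" in this sketch
  private Duration maxDuration = null;        // null means "no limit" in this sketch

  RetrySettingsSketch withFixedBackoff(Duration sleepTime) {
    if (sleepTime == null || sleepTime.isNegative()) {
      throw new IllegalArgumentException("sleepTime must be non-negative");
    }
    this.sleepTime = sleepTime;
    return this;
  }

  RetrySettingsSketch withStopAfterAttempts(int maxAttempts) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be at least 1");
    }
    this.maxAttempts = maxAttempts;
    return this;
  }

  RetrySettingsSketch withStopAfterDelay(Duration maxDuration) {
    if (maxDuration == null || maxDuration.isNegative()) {
      throw new IllegalArgumentException("maxDuration must be non-negative");
    }
    this.maxDuration = maxDuration;
    return this;
  }

  Duration getSleepTime() { return sleepTime; }
  int getMaxAttempts() { return maxAttempts; }
  Duration getMaxDuration() { return maxDuration; }
}
```

Because the object is only data, swapping the retry library later would touch the adapter and leave the policy class unchanged.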
 * Currently, the policy object can be translated into {@link RetryPolicy} of failsafe library.
 */
public class TableRetryPolicy implements Serializable {
  enum BackoffType { NONE, FIXED, RANDOM, EXPONENTIAL }
Can we describe the behavior and related parameters of each?
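As one way to frame that description, the sketch below computes the delay before a retry for each backoff type. The semantics are an assumption based on the usual meaning of these terms and are not confirmed by the PR; the names `BackoffKindSketch`, `BackoffDelaySketch`, and the parameters `baseMs`, `maxMs`, and `factor` are hypothetical.

```java
import java.util.Random;

// Hypothetical mirror of the four backoff types named in the diff.
enum BackoffKindSketch { NONE, FIXED, RANDOM, EXPONENTIAL }

final class BackoffDelaySketch {
  private BackoffDelaySketch() { }

  /**
   * Returns the assumed delay in milliseconds before the given retry.
   *
   * @param attempt 1-based retry number
   * @param baseMs  fixed delay, or lower bound for RANDOM and EXPONENTIAL
   * @param maxMs   upper bound for RANDOM and EXPONENTIAL (must be >= baseMs)
   * @param factor  multiplier applied per retry for EXPONENTIAL
   */
  static long delayMs(BackoffKindSketch kind, int attempt, long baseMs, long maxMs,
      double factor, Random random) {
    switch (kind) {
      case NONE:
        return 0L; // retry immediately
      case FIXED:
        return baseMs; // same delay before every retry
      case RANDOM:
        // uniformly distributed in [baseMs, maxMs)
        return baseMs + (long) (random.nextDouble() * (maxMs - baseMs));
      case EXPONENTIAL:
        // baseMs * factor^(attempt - 1), capped at maxMs
        double raw = baseMs * Math.pow(factor, attempt - 1);
        return (long) Math.min(raw, (double) maxMs);
      default:
        throw new IllegalArgumentException("Unknown backoff type: " + kind);
    }
  }
}
```

Under these assumptions, NONE needs no parameters, FIXED needs only the sleep time, RANDOM needs a minimum and maximum, and EXPONENTIAL needs an initial delay, a maximum, and a factor.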
 * Wrapper of retry-related metrics common to both {@link RetriableReadFunction} and
 * {@link RetriableWriteFunction}.
 */
class RetryMetrics {
Is it possible to get counts for requests that still fail after retries?
private AsyncFailsafe<?> failsafe() {
  long startMs = System.currentTimeMillis();
  return Failsafe.with(retryPolicy).with(retryExecutor)
Is this part reusable or do we have to create a new instance for every request?
We need a new instance for every request: the start timestamp is unique per request, and each failsafe object contains its own context, because retry state is per-request and not shared across requests.
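The per-request context can be sketched without failsafe, using only `java.util.concurrent`. This is an illustration under assumptions, not the PR's implementation: the class `PerRequestRetrySketch` and its counters are hypothetical, the delay is fixed, and the operation is assumed to report failures through the future it returns. Each call captures its own start timestamp and attempt number, while the counters are shared; the permanent-failure counter shows one way to count requests that still fail after retries.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical sketch of async retry with per-request state and shared counters.
final class PerRequestRetrySketch {
  final AtomicLong retryCount = new AtomicLong();       // retries across all requests
  final AtomicLong permFailureCount = new AtomicLong(); // requests that failed after all retries
  final AtomicLong totalElapsedMs = new AtomicLong();   // summed request time, including retries

  private final ScheduledExecutorService scheduler;
  private final int maxAttempts;
  private final long delayMs;

  PerRequestRetrySketch(ScheduledExecutorService scheduler, int maxAttempts, long delayMs) {
    this.scheduler = scheduler;
    this.maxAttempts = maxAttempts;
    this.delayMs = delayMs;
  }

  <T> CompletableFuture<T> call(Supplier<CompletableFuture<T>> op) {
    // Per-request state: created fresh on every call and never shared.
    long startMs = System.currentTimeMillis();
    CompletableFuture<T> result = new CompletableFuture<>();
    attempt(op, 1, startMs, result);
    return result;
  }

  private <T> void attempt(Supplier<CompletableFuture<T>> op, int attemptNo, long startMs,
      CompletableFuture<T> result) {
    op.get().whenComplete((value, error) -> {
      if (error == null) {
        totalElapsedMs.addAndGet(System.currentTimeMillis() - startMs);
        result.complete(value);
      } else if (attemptNo < maxAttempts) {
        retryCount.incrementAndGet();
        // Re-run the operation later on the scheduler thread.
        scheduler.schedule(() -> attempt(op, attemptNo + 1, startMs, result),
            delayMs, TimeUnit.MILLISECONDS);
      } else {
        permFailureCount.incrementAndGet();
        totalElapsedMs.addAndGet(System.currentTimeMillis() - startMs);
        result.completeExceptionally(error);
      }
    });
  }
}
```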
TableRetryPolicy writeRetryPolicy = null;

if (readRetryPolicy != null || writeRetryPolicy != null) {
  retryExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
Wondering if it's better to share this across all tasks and tables?
Makes sense. I'll make this a singleton
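A shared executor along these lines could look like the sketch below. It is only an assumption about one possible shape: the class name `SharedRetryExecutorSketch`, the thread name, and the daemon setting are hypothetical and not taken from the PR.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

// Hypothetical sketch: one scheduled executor shared by all tasks and tables.
final class SharedRetryExecutorSketch {
  private SharedRetryExecutorSketch() { }

  // Initialization-on-demand holder: the JVM creates the executor lazily
  // and guarantees that it is created exactly once.
  private static final class Holder {
    static final ScheduledExecutorService INSTANCE =
        Executors.newSingleThreadScheduledExecutor(runnable -> {
          Thread thread = new Thread(runnable);
          thread.setName("table-retry-executor"); // hypothetical thread name
          thread.setDaemon(true); // do not keep the JVM alive for pending retries
          return thread;
        });
  }

  static ScheduledExecutorService get() {
    return Holder.INSTANCE;
  }
}
```

With a single shared thread, retry callbacks from every table run on the same thread, so they should stay short and non-blocking.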
- extract failsafe logic into an adapter class from TableRetryPolicy
- added a unit test class for TableRetryPolicy + FailsafeAdapter
- use retry executor service as a singleton
- added a permanent-failure metric
This lets the application decide which exception types can be retried. An exception is retried if either the table function or the custom predicate reports it as retriable.
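The "either one says so" rule is a logical OR of two checks. A minimal sketch, assuming both checks can be expressed as `java.util.function.Predicate<Throwable>` (the PR's own `RetryPredicate` type and the table function's check may have a different shape; `RetriableCheckSketch` is a hypothetical name):

```java
import java.util.function.Predicate;

// Hypothetical sketch: an exception is retried if either check accepts it.
final class RetriableCheckSketch {
  private RetriableCheckSketch() { }

  static boolean shouldRetry(Predicate<Throwable> tableFnCheck,
      Predicate<Throwable> customCheck, Throwable error) {
    // Predicate.or short-circuits: customCheck runs only if tableFnCheck rejects.
    return tableFnCheck.or(customCheck).test(error);
  }
}
```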
weisong44
left a comment
Overall LGTM
 */
public TableRetryPolicy withRetryOn(RetryPredicate isRetriable) {
  Preconditions.checkNotNull(isRetriable);
  this.isRetriable = isRetriable;
Shall we simply call it retryPredicate?
 * @param isRetriable predicate for retriable exception identification
 * @return this policy instance
 */
public TableRetryPolicy withRetryOn(RetryPredicate isRetriable) {
Shall we just call it withRetryPredicate?
xinyuiscool
left a comment
LGTM. Have one minor comment.
A question about the table provider: is it created per task? I want to make sure a new instance of readFn and writeFn is created for each task. Thanks.
samza-core/src/main/java/org/apache/samza/table/retry/RetriableReadFunction.java
@xinyuiscool yes, the table and the associated read/write fns are created per task instance.
Thanks for answering my question. Please address all of @weisong44's feedback and I will commit it once it's finalized.