-
Notifications
You must be signed in to change notification settings - Fork 962
Refactored OrderedSafeExecutor and OrderedScheduler #1309
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| * method. | ||
| */ | ||
| @Slf4j | ||
| public class OrderedSafeExecutor implements ExecutorService { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we are in a new package, can you rename it to "OrderedExecutor" to be consistent with "OrderedScheduler"? This also make a distinguish from the old OrderedSafeExecutor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, makes sense.
Also, regarding SafeRunnable, I think we should be doing that internally, just taking a Runnable, though that's another story from this PR.
eolivelli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great idea!
I left some questions but overall is very good
|
|
||
| @Override | ||
| public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { | ||
| checkQueue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't you pass something like the size of the list? checkQueue assumes you are going to add 1 item.
I mean something like
checkQueue(tasks.size())
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
| /** | ||
| * Abstract builder class to build {@link OrderedScheduler}. | ||
| */ | ||
| public abstract static class AbstractBuilder<T extends OrderedExecutor> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why public?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it could be package private
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, it needs to be public because the builder methods are returning AbstractBuilder<T> so that needs to be visible to callers.
| } | ||
|
|
||
| public <T> ListenableFuture<T> submitOrdered(long orderingKey, Callable<T> task) { | ||
| SettableFuture<T> future = SettableFuture.create(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more lightweight than CompletableFuture or is there any other reason to use this instead of CF?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The existing code for submitting a Callable was already expecting ListenableFuture. We can (and should) change to CompletableFuture though I didn't want to cram too many changes in a single PR.
|
@dlg99 Please review. |
|
@merlimat As this is perf focused change, any data / micro benchmarks to validate our assumptions? |
| /** | ||
| * A builder class for an OrderedSafeExecutor. | ||
| */ | ||
| public static class Builder extends AbstractBuilder<OrderedExecutor> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use lombok's @builder to replace this? sounds like one can override build() method: projectlombok/lombok#1144
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I didn't change it here to limit the scope of this already big PR. I think we should do in separate change.
|
@eolivelli can you review this again? @merlimat already addressed your comments. |
eolivelli
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1 great work @merlimat !
|
@jvrao I have added a simple microbenchmark (22eba18) Results: |
|
@merlimat @jvrao "337.974 ± 325.934" is a smell. it's too much of a range. looking at microbenchmark, it always uses 1 threaded executor and 16 threads that submit/wait for tasks. I think it hits bottleneck in that one thread's rather than in the queue passing tasks between threads. I'd create executor with 8 (half or producer's) threads an give it a shot. or run the test with 1/2/4/8/16 threads in executor, use num threads as another Param for the test. |
As outlined in https://lists.apache.org/thread.html/102383ea42f473f36720637e41af0ee83fc38d9f992736e0d1a7f985@%3Cdev.bookkeeper.apache.org%3E
right now
OrderedSafeExecutoris implemented on top ofOrderedScheduler. There are few problems with this approach that are causing impact on performance:OrderedScheduleris aScheduledExecutorServicewhich uses a priority queue for tasks. The priority queue has a single mutex for both publishers/consumers on the queueSince in all cases in critical write/read path we don't need delay task execution or futures, we should try to have a light weight execution for that.
Modifications
OrderedSafeExecutorandOrderedScheduler. Now the base class isOrderedSafeExecutorand the other extends from it, since it provides additional methods.OrderedSafeExecutorinbookkeeper-commonsinceOrderedSchedulerwas already there.OrderedSafeGenericCallbackin its own file, since it needs to be inbookkeeper-servermodule at this point.submitOrdered()intoexecuteOrdered()to be consistent with JDK name (submit()returns a future whileexecute()returns void).BookKeeperinstance ofschedulerintoOrderedSchedulerso that the few cases which were using themainWorkerPoolcould be easily converted to use the scheduler instead.