Guava-free Same Thread Executor #5413

Closed
drcrallen wants to merge 16 commits into apache:master from drcrallen:guava/noSameThreadExecutor

@drcrallen
Contributor

`MoreExecutors.sameThreadExecutor()` is deprecated as per
https://github.com/google/guava/blob/v18.0/guava/src/com/google/common/util/concurrent/MoreExecutors.java#L266-L267
and the function signature moves around a lot in future Guava releases.

This creates a new `SameThreadExecutorService` using more modern Java
features.

Charles Allen added 4 commits February 21, 2018 20:48
}).writeValueAsString(taskGroups.get(groupId).sequenceOffsets);
final Map<String, Object> context = spec.getContext() == null
? ImmutableMap.of("checkpoints", checkpoints, IS_INCREMENTAL_HANDOFF_SUPPORTED, true)
? ImmutableMap.of(
Member

Please replace ternary with if-else

Contributor Author

fixed

@@ -361,11 +362,17 @@ public void testBasics() throws Exception

for (DataSegment publishedSegment : publishedSegments) {
Optional<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> optional = handOffCallbacks.entrySet().stream()
Member

Please break the line before .entrySet() to reduce indentation below. Same below in this class.

Contributor Author

Fixed

Contributor Author

or thought I did, checking

Contributor Author

haha, pushed to wrong repo, fixed now

SupervisorManager.class)));
tac = new LocalTaskActionClientFactory(
taskStorage,
new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock(
Member

Not formatted properly

Contributor Author

fixed

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

public class SameThreadExecutorService extends AbstractExecutorService
Member

Is this code just copied from Guava? Could you refer to a file on Github (under google/guava) that contains it, in javadoc comment?

Could you add "TODO: replace with ... when updated to Guava X", and a corresponding issue?

Contributor Author

It is not copied from guava

@drcrallen
Contributor Author

@leventov addressed comments

Contributor

@b-slim left a comment

Overall LGTM, can you also ban those functions as well to avoid future invocation?

notices.add(new RunNotice());
}
}, MoreExecutors.sameThreadExecutor()
}, Execs.sameThreadExecutor()
Member

Execs.sameThreadExecutor() should be on the next line.

Contributor Author

fixed

public Runnable commit()
{
return () -> {};
return () -> {
Member

It's Runnables.getNoopRunnable().

Contributor Author

fixed

public Runnable commit()
{
return () -> {};
return () -> {
Member

Same

Contributor Author

fixed

return DummyExecutorService.INSTANCE;
}

public static ListeningExecutorService sameThreadExecutor()
Member

It was a mistake in Guava's API to name this method this way, because it gives the full impression that some static constant is returned from it (I thought that until now), while in reality this method (and the removed method in Guava) creates a new object each time. (Guava developers fixed this mistake by calling the new method newDirectExecutorService().)

However, I think that Druid actually never tries to terminate this service, so it would be better to keep the method name as is, actually cache the executor service, and simplify its implementation by either throwing an UnsupportedOperationException in shutdown() and shutdownNow() or making them no-ops, always returning false from isTerminated(), and removing all state and synchronization from the class.
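
A minimal sketch of the stateless, cached executor proposed in this comment, in the variant that throws from shutdown() and shutdownNow(). The class name StatelessSameThreadExecutor and its accessor are hypothetical and are not the code from this PR. The comment does not say what awaitTermination() should do here, so returning false immediately is an assumption.

```java
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not the SameThreadExecutorService from this PR.
final class StatelessSameThreadExecutor extends AbstractExecutorService {
    // Single cached instance; the class holds no mutable state.
    private static final StatelessSameThreadExecutor INSTANCE = new StatelessSameThreadExecutor();

    private StatelessSameThreadExecutor() {}

    static StatelessSameThreadExecutor sameThreadExecutor() {
        return INSTANCE;
    }

    @Override
    public void execute(Runnable command) {
        // Runs the task on the calling thread before returning.
        command.run();
    }

    @Override
    public void shutdown() {
        throw new UnsupportedOperationException("shared executor cannot be shut down");
    }

    @Override
    public List<Runnable> shutdownNow() {
        throw new UnsupportedOperationException("shared executor cannot be shut down");
    }

    @Override
    public boolean isShutdown() {
        return false;
    }

    @Override
    public boolean isTerminated() {
        return false;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        // Assumption: never terminated, so report "not terminated" without waiting.
        return false;
    }
}
```

Because submit() in AbstractExecutorService delegates to execute(), submitted callables also run on the caller's thread in this sketch.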

Contributor Author

That's reasonable

Contributor Author

fixed

@drcrallen
Contributor Author

I'm waiting on #5413 (review) because doing it will collide with #5414

publishedSegment.getShardSpec().getPartitionNum()
)))
.findFirst();
Optional<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> optional = handOffCallbacks
Member

This construction looks like handOffCallbacks.get()...
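
As a rough illustration of the reviewer's point, the sketch below compares a stream scan over a map's entries with a direct get(). It assumes the filter in the test amounts to key equality, which the truncated hunk does not confirm. The class CallbackLookup and its methods are hypothetical names, and values are assumed to be non-null.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; not code from this PR.
final class CallbackLookup {
    private CallbackLookup() {}

    // Linear scan over the entries, keeping the first one whose key matches.
    static <K, V> Optional<V> byScan(Map<K, V> map, K key) {
        return map.entrySet()
                  .stream()
                  .filter(entry -> entry.getKey().equals(key))
                  .map(Map.Entry::getValue)
                  .findFirst();
    }

    // Direct lookup, wrapped so that a missing key yields an empty Optional.
    static <K, V> Optional<V> byGet(Map<K, V> map, K key) {
        return Optional.ofNullable(map.get(key));
    }
}
```

If the filter compares only some fields of the key rather than the whole key, the two forms are not interchangeable, which is one reason to leave the existing logic alone in a refactoring PR.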

Contributor Author

I'd rather not change any logic. This just retains the prior code

Member

publishedSegment.getShardSpec().getPartitionNum()
)))
.findFirst();
Optional<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> optional = handOffCallbacks
Member

Same

Contributor Author

I'm not going to change logic in this PR

publishedSegment.getShardSpec().getPartitionNum()
)))
.findFirst();
Optional<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> optional = handOffCallbacks
Member

Same

Contributor Author

logic is retained as the prior on purpose

/**
* A simple class that implements the ExecutorService interface, but runs the code on a call to submit
*/
public class SameThreadExecutorService extends AbstractExecutorService
Member

Please make this class a singleton

Contributor Author

I made it singleton in Execs

Member

Execs doesn't prohibit instantiation of new SameThreadExecutorService objects.

Contributor Author

Yes that is true. Why force singleton?

Member

To prohibit accidental instantiation.

Referencing Execs in the javadoc comment would be useful too.

Contributor Author

Made the class constructor package private and added some more comments.

final long nanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
final long millis = TimeUnit.MILLISECONDS.convert(timeout, unit);
final int sleepNanos = (int) (nanos - millis * 1_000_000L);
Thread.sleep(millis, sleepNanos);
Member

I don't think sleeping makes much sense here, either return immediately or throw an UnsupportedOperationException.

Contributor Author

The contract says it will wait, so I wait
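
For reference, the sleep arithmetic in the hunk above splits the timeout into whole milliseconds plus leftover nanoseconds, which is the form Thread.sleep(long, int) takes. The sketch below reproduces that split and shows the JDK's TimeUnit.sleep convenience method, which performs the same kind of conversion for positive timeouts. The class SleepSplit and its method names are hypothetical and not part of the PR.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper; illustrates the timeout split only.
final class SleepSplit {
    private SleepSplit() {}

    // Whole milliseconds contained in the timeout (truncating).
    static long wholeMillis(long timeout, TimeUnit unit) {
        return TimeUnit.MILLISECONDS.convert(timeout, unit);
    }

    // Nanoseconds left over after removing the whole milliseconds.
    static int leftoverNanos(long timeout, TimeUnit unit) {
        final long nanos = TimeUnit.NANOSECONDS.convert(timeout, unit);
        return (int) (nanos - wholeMillis(timeout, unit) * 1_000_000L);
    }

    // The same wait expressed through the JDK convenience method.
    static void sleepFor(long timeout, TimeUnit unit) throws InterruptedException {
        unit.sleep(timeout);
    }
}
```

For example, a timeout of 1500 microseconds splits into 1 millisecond and 500,000 nanoseconds.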

Member

awaitTermination() should be called only after shutdown() or shutdownNow(), which throw UnsupportedOperationException, so it's even more reasonable to throw it from awaitTermination()

Contributor Author

The interface is unambiguous: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ExecutorService.html#awaitTermination-long-java.util.concurrent.TimeUnit-

Blocks until all tasks have completed execution after a shutdown request, or the timeout occurs, or the current thread is interrupted, whichever happens first.

Which is what this implementation does.

Member

The Javadocs of shutdown() and shutdownNow() also don't give an option to throw an UnsupportedOperationException. Yet if awaitTermination() is called on SameThreadExecutorService, it must be a programming bug, so it is better to report it than to silence it.

Contributor Author

UnsupportedOperationException is an unchecked exception; it does not need to be declared

Member

Yes, and that is why there is no problem with throwing it from the method, although the interface doesn't specify it. I mean, if we throw UOE from shutdown() and shutdownNow(), rather than make those methods no-ops, it makes just as much sense to throw UOE from awaitTermination() as well, to keep them consistent.

Member

@drcrallen the PR looks good to me, except this thing, but it's not blocking for merge. However I still think it should be changed.

* SameThreadExecutorService constructor now package private
*/
public class SameThreadExecutorService extends AbstractExecutorService
{
// Use io.druid.java.util.common.concurrent.Execs#sameThreadExecutor()
Member

Could you please make this a Javadoc comment

Member

Or just move to class-level comment

Contributor Author

Added class level comment

@drcrallen
Contributor Author

@leventov any other comments here?


/**
* A simple class that implements the ExecutorService interface, but runs the code on a call to submit
* Use io.druid.java.util.common.concurrent.Execs#sameThreadExecutor() to get the instance
Member

Please make it a javadoc link

Contributor Author

Fixed!

@drcrallen
Contributor Author

@leventov anything else here?

@drcrallen
Contributor Author

Ick, looks like io.druid.curator.announcement.Announcer#Announcer shuts down the executor it was passed, which causes tests to fail. I'm going to remove the failure on shutdown.

@drcrallen
Contributor Author

had to revert "only one instance" for the same thread executor service :(

@leventov
Member

Why?

@drcrallen
Contributor Author

@leventov some of the tests close out the executor service. Announcer comes to mind. Fixing that is a bit much for this pr.

@drcrallen
Contributor Author

@leventov see #5527

@leventov
Member

In this case, why wouldn't we temporarily implement all shutdown(), shutdownNow() and isShutdown() as no-ops? And switch to throwing UnsupportedOperationException once #5527 is fixed?

@drcrallen
Contributor Author

@leventov because that is only one use case, I'm not sure if there are others since I didn't try to get as many tests to pass as possible. I don't want to make assumptions about how the code is using executor services if it is already shown to do odd things with who-controls-this-executor.

As such keeping behavior close to the executor service interface is desired. This PR keeps behavior closer to the executor service interface

@leventov
Member

Having those methods implemented as no-ops (hence awaitTermination() returns immediately) is guaranteed not to cause errors at runtime. It may not catch extra bugs, yes, but we know that we have bugs already. Once those bugs are fixed, it will be desirable to switch back to the singleton model, so why change the code back and forth? If it's made a singleton now, all that is going to be needed is to replace the no-op bodies with bodies throwing exceptions.
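
A minimal sketch of the no-op variant argued for here, under the assumption that a shared instance must keep working even after some caller shuts down the executor it was handed. The class name NoOpShutdownExecutor is hypothetical, and the return values chosen for shutdownNow() and awaitTermination() are assumptions, not something the thread specifies.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not the code from this PR.
final class NoOpShutdownExecutor extends AbstractExecutorService {
    static final NoOpShutdownExecutor INSTANCE = new NoOpShutdownExecutor();

    private NoOpShutdownExecutor() {}

    @Override
    public void execute(Runnable command) {
        // Runs the task on the calling thread before returning.
        command.run();
    }

    @Override
    public void shutdown() {
        // Intentionally a no-op: the shared instance stays usable.
    }

    @Override
    public List<Runnable> shutdownNow() {
        // Nothing is ever queued, so there is nothing to hand back.
        return Collections.emptyList();
    }

    @Override
    public boolean isShutdown() {
        return false;
    }

    @Override
    public boolean isTerminated() {
        return false;
    }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        // Returns immediately; false means "not terminated" (assumed choice).
        return false;
    }
}
```

Switching to the stricter model later would then only mean replacing the bodies of shutdown() and shutdownNow() with ones that throw.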

@drcrallen
Contributor Author

The constructor is package private, so all instances must come through Execs.sameThreadExecutor(). Moving to a singleton later will be straightforward.

@leventov
Member

I'm concerned about the potential performance impact of this solution. Now the executor that is supposed to be a cost-free abstraction (just execute the Runnable's run or Callable's call, passed in) does two operations with a Phaser. At the very minimum, that's a flush of the read/write CPU core pipelines. And if some instance of sameThreadExecutor() is used widely across threads, on the assumption that it has no state, it could suddenly become a concurrency bottleneck, for no real business logic / correctness purpose.

It seems to me that the no-op solution doesn't have any risks and couldn't theoretically have any performance impact.

@drcrallen
Contributor Author

@leventov com.google.common.util.concurrent.MoreExecutors.SameThreadExecutorService uses a state lock whenever tasks are submitted. Are you suggesting the performance of a Phaser would be worse?

@b-slim
Contributor

b-slim commented Apr 12, 2018

@drcrallen can we take this to the finish line?

@leventov
Member

@drcrallen the current SameThreadExecutorService implementation with Phaser has logical issues:

  • It should check the values returned from register(), correctly processing the termination case.
  • The initial register() and the final arriveAndDeregister() are potentially done from different threads, which could lead to errors (this bug has the same root cause).
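
To make the first point concrete, here is a rough sketch of a Phaser-tracked runner that checks the value returned by register() and pairs each register() with an arriveAndDeregister() in the same call. It is not the implementation from this PR: the class PhaserTrackedRunner is hypothetical, and keeping one base party registered so the idle phaser does not terminate is a design choice of this sketch.

```java
import java.util.concurrent.Phaser;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical illustration only; not the code from this PR.
final class PhaserTrackedRunner {
    // One base party is registered up front so the phaser stays alive while idle.
    private final Phaser phaser = new Phaser(1);
    private final AtomicBoolean shutdown = new AtomicBoolean(false);

    void execute(Runnable task) {
        // register() returns a negative value once the phaser has terminated.
        if (shutdown.get() || phaser.register() < 0) {
            throw new RejectedExecutionException("executor has been shut down");
        }
        try {
            task.run();
        } finally {
            // Paired with the register() above, on the same thread.
            phaser.arriveAndDeregister();
        }
    }

    void shutdown() {
        // Release the base party exactly once; the phaser terminates when the
        // last in-flight task deregisters.
        if (shutdown.compareAndSet(false, true)) {
            phaser.arriveAndDeregister();
        }
    }

    boolean isTerminated() {
        return phaser.isTerminated();
    }
}
```

Even in this small form the class carries two pieces of shared state on every task, which is the cost the stateless alternative avoids.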

I didn't mention that before because I wanted to persuade you not to use Phaser using different arguments. But now considering that the current implementation of SameThreadExecutorService is relatively complex and prone to some bugs or inconsistencies, while the proposed no-op approach is absolutely trivial, would you agree to switch to stateless singleton with no-op methods?

@drcrallen
Contributor Author

This is too old at this point. Closing.

3 participants