Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ public void registerTimelineCallback(Executor exec, TimelineCallback callback)
}

@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
public void registerServerCallback(Executor exec, ServerCallback callback)
{
// do nothing
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ public void registerSegmentCallback(Executor exec, SegmentCallback callback)
}

@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
public void registerServerCallback(Executor exec, ServerCallback callback)
{

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class BatchServerInventoryView implements ServerInventoryView, FilteredSe
private final CuratorInventoryManager<DruidServer, Set<DataSegment>> inventoryManager;
private final AtomicBoolean started = new AtomicBoolean(false);

private final ConcurrentMap<ServerRemovedCallback, Executor> serverRemovedCallbacks = new ConcurrentHashMap<>();
private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new ConcurrentHashMap<>();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();

private final ConcurrentMap<String, Set<DataSegment>> zNodes = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -127,13 +127,14 @@ public Set<DataSegment> deserializeInventory(byte[] bytes)
public void newContainer(DruidServer container)
{
log.info("New Server[%s]", container);
runServerCallbacks(callback -> callback.serverAdded(container));
}

@Override
public void deadContainer(DruidServer deadContainer)
{
log.info("Server Disappeared[%s]", deadContainer);
runServerRemovedCallbacks(deadContainer);
runServerCallbacks(callback -> callback.serverRemoved(deadContainer));
}

@Override
Expand Down Expand Up @@ -216,9 +217,9 @@ public Collection<DruidServer> getInventory()
}

@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
public void registerServerCallback(Executor exec, ServerCallback callback)
{
serverRemovedCallbacks.put(callback, exec);
serverCallbacks.put(callback, exec);
}

@Override
Expand All @@ -243,13 +244,13 @@ protected void runSegmentCallbacks(
}
}

private void runServerRemovedCallbacks(final DruidServer server)
private void runServerCallbacks(final Function<ServerCallback, CallbackAction> fn)
{
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverRemovedCallbacks.entrySet()) {
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Probably cleaner to use Map.forEach.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally, I prefer the regular for in situations where the forEach can't be a one-liner (& even sometimes then) so I kept it as-is.

entry.getValue().execute(
() -> {
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverRemovedCallbacks.remove(entry.getKey());
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
serverCallbacks.remove(entry.getKey());
}
}
);
Expand Down
26 changes: 20 additions & 6 deletions server/src/main/java/org/apache/druid/client/BrokerServerView.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,11 +157,25 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas)
segmentFilter
);

baseView.registerServerRemovedCallback(
baseView.registerServerCallback(
exec,
server -> {
removeServer(server);
return CallbackAction.CONTINUE;
new ServerCallback() {
@Override
public CallbackAction serverAdded(DruidServer server)
{
// We don't track brokers in this view.
if (!server.getType().equals(ServerType.BROKER)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a 1-liner comment like "Do not keep inventory of other Brokers" or something.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a comment We don't track brokers in this view.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

addServer(server);
}
return CallbackAction.CONTINUE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we ever use the other callback action UNREGISTER?
I think it is not used anymore and can probably be removed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not used, & seems like it hasn't been used since #6742. I suppose we could remove it in this PR or in a new one.

}

@Override
public CallbackAction serverRemoved(DruidServer server)
{
removeServer(server);
return CallbackAction.CONTINUE;
}
}
);
}
Expand Down Expand Up @@ -378,9 +392,9 @@ public <T> QueryRunner<T> getQueryRunner(DruidServer server)
}

@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
public void registerServerCallback(Executor exec, ServerCallback callback)
{
baseView.registerServerRemovedCallback(exec, callback);
baseView.registerServerCallback(exec, callback);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,16 @@ public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentS
}
);

baseView.registerServerRemovedCallback(
baseView.registerServerCallback(
exec,
new ServerView.ServerRemovedCallback()
new ServerView.ServerCallback()
{
@Override
public ServerView.CallbackAction serverAdded(DruidServer server)
{
return ServerView.CallbackAction.CONTINUE;
}

@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ void registerSegmentCallback(
Predicate<Pair<DruidServerMetadata, DataSegment>> filter
);

void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback);
void registerServerCallback(Executor exec, ServerView.ServerCallback callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer

private final LifecycleLock lifecycleLock = new LifecycleLock();

private final ConcurrentMap<ServerRemovedCallback, Executor> serverCallbacks = new ConcurrentHashMap<>();
private final ConcurrentMap<ServerCallback, Executor> serverCallbacks = new ConcurrentHashMap<>();
private final ConcurrentMap<SegmentCallback, Executor> segmentCallbacks = new ConcurrentHashMap<>();

private final ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates =
Expand Down Expand Up @@ -286,7 +286,7 @@ public void registerSegmentCallback(
}

@Override
public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback)
public void registerServerCallback(Executor exec, ServerCallback callback)
{
if (lifecycleLock.isStarted()) {
throw new ISE("Lifecycle has already started.");
Expand Down Expand Up @@ -347,18 +347,13 @@ public void run()
}
}

private void runServerRemovedCallbacks(final DruidServer server)
private void runServerCallbacks(final Function<ServerCallback, CallbackAction> fn)
{
for (final Map.Entry<ServerRemovedCallback, Executor> entry : serverCallbacks.entrySet()) {
for (final Map.Entry<ServerCallback, Executor> entry : serverCallbacks.entrySet()) {
entry.getValue().execute(
new Runnable()
{
@Override
public void run()
{
if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) {
serverCallbacks.remove(entry.getKey());
}
() -> {
if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) {
serverCallbacks.remove(entry.getKey());
}
}
);
Expand Down Expand Up @@ -421,12 +416,13 @@ private void updateFinalPredicate()
void serverAdded(DruidServer server)
{
synchronized (servers) {
DruidServerHolder holder = servers.get(server.getName());
if (holder == null) {
DruidServerHolder existing = servers.get(server.getName());
if (existing == null) {
log.info("Server[%s] appeared.", server.getName());
holder = new DruidServerHolder(server);
servers.put(server.getName(), holder);
holder.start();
final DruidServerHolder newHolder = new DruidServerHolder(server);
servers.put(server.getName(), newHolder);
runServerCallbacks(callback -> callback.serverAdded(newHolder.druidServer));
newHolder.start();
} else {
log.info("Server[%s] already exists.", server.getName());
}
Expand All @@ -440,7 +436,7 @@ private void serverRemoved(DruidServer server)
if (holder != null) {
log.info("Server[%s] disappeared.", server.getName());
holder.stop();
runServerRemovedCallbacks(holder.druidServer);
runServerCallbacks(callback -> callback.serverRemoved(holder.druidServer));
} else {
log.info("Ignoring remove notification for unknown server[%s].", server.getName());
}
Expand Down
20 changes: 18 additions & 2 deletions server/src/main/java/org/apache/druid/client/ServerView.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
*/
public interface ServerView
{
void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback);
void registerServerCallback(Executor exec, ServerCallback callback);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super nit:

To allow for more concise syntax (using lambdas) in the calling code, I wonder if we should have an overloaded method with the signature:

  void registerServerCallback(Executor exec, Consumer<DruidServer> onAdded, Consumer<DruidServer> onRemoved);

assuming we get rid of the CallbackAction altogether.

ServerCallback can then be a concrete class which takes two Consumer<DruidServer> in its constructor, default values for which would be no-op.

It might be useful even when we add new methods to ServerCallback as callers would need to override only the required methods.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A builder might be nice too. It would allow keeping together all the ServerCallback stuff in the interface. Something like:

ServerCallback.builder()
              .onServerAdded(server -> ...)
              .onServerRemoved(server -> ...)
              .build()

or for just doing one:

ServerCallback.onServerRemoved(server -> ...)

etc.

Would like to have this be a different PR, so that PR can focus on the thinking about the best way to specify callbacks that only want to listen to one thing.

void registerSegmentCallback(Executor exec, SegmentCallback callback);

enum CallbackAction
Expand All @@ -38,8 +38,24 @@ enum CallbackAction
UNREGISTER,
}

interface ServerRemovedCallback
interface ServerCallback
{
/**
* Called when a server is added.
*
* The return value indicates if this callback has completed its work. Note that even if this callback
* indicates that it should be unregistered, it is not possible to guarantee that this callback will not
* get called again. There is a race condition between when this callback runs and other events that can cause
* the callback to be queued for running. Thus, callbacks shouldn't assume that they will not get called
* again after they are done. The contract is that the callback will eventually be unregistered, enforcing
* a happens-before relationship is not part of the contract.
*
* @param server The server that was added.
* @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback
* should remain registered.
*/
CallbackAction serverAdded(DruidServer server);

/**
* Called when a server is removed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th
containers.put(containerKey, new ContainerHolder(container, inventoryCache));

log.debug("Starting inventory cache for %s, inventoryPath %s", containerKey, inventoryPath);
inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
strategy.newContainer(container);
inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
}
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.client.DruidServer;
import org.apache.druid.client.ServerInventoryView;
import org.apache.druid.client.ServerView;
import org.apache.druid.java.util.common.concurrent.Execs;
Expand Down Expand Up @@ -106,13 +107,23 @@ public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentS
}
);

serverInventoryView.registerServerRemovedCallback(
serverInventoryView.registerServerCallback(
executor,
server -> {
if (server.isSegmentReplicationTarget()) {
clusterCostCacheBuilder.removeServer(server.getName());
new ServerView.ServerCallback() {
@Override
public ServerView.CallbackAction serverAdded(DruidServer server)
{
return ServerView.CallbackAction.CONTINUE;
}

@Override
public ServerView.CallbackAction serverRemoved(DruidServer server)
{
if (server.isSegmentReplicationTarget()) {
clusterCostCacheBuilder.removeServer(server.getName());
}
return ServerView.CallbackAction.CONTINUE;
}
return ServerView.CallbackAction.CONTINUE;
}
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public class BrokerServerViewTest extends CuratorTestBase
private final ZkPathsConfig zkPathsConfig;

private CountDownLatch segmentViewInitLatch;
private CountDownLatch serverAddedLatch;
private CountDownLatch segmentAddedLatch;
private CountDownLatch segmentRemovedLatch;

Expand Down Expand Up @@ -158,7 +159,7 @@ public void testMultipleServerAddedRemovedSegment() throws Exception
setupViews();

final List<DruidServer> druidServers = Lists.transform(
ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
ImmutableList.of("localhost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
hostname -> setupHistoricalServer("default_tier", hostname, 0)
);

Expand Down Expand Up @@ -237,6 +238,7 @@ public void testMultipleServerAddedRemovedSegment() throws Exception
public void testMultipleServerAndBroker() throws Exception
{
segmentViewInitLatch = new CountDownLatch(1);
serverAddedLatch = new CountDownLatch(6);
segmentAddedLatch = new CountDownLatch(6);

// temporarily set latch count to 1
Expand All @@ -254,13 +256,26 @@ public void testMultipleServerAndBroker() throws Exception
0
);

final List<DruidServer> druidServers = Lists.transform(
ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
hostname -> setupHistoricalServer("default_tier", hostname, 0)
);
// Materialize this list so all servers are set up
final List<DruidServer> druidServers =
ImmutableList.copyOf(
Lists.transform(
ImmutableList.of("localhost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"),
hostname -> setupHistoricalServer("default_tier", hostname, 0)
)
);

setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper);

Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(serverAddedLatch));

// check server metadatas
Assert.assertEquals(
druidServers.stream().map(DruidServer::getMetadata).collect(Collectors.toSet()),
ImmutableSet.copyOf(brokerServerView.getDruidServerMetadatas())
);

final List<DataSegment> segments = Lists.transform(
ImmutableList.of(
Pair.of("2011-04-01/2011-04-03", "v1"),
Expand All @@ -277,7 +292,6 @@ public void testMultipleServerAndBroker() throws Exception
for (int i = 0; i < 5; ++i) {
announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper);
}
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch));
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch));

TimelineLookup timeline = brokerServerView.getTimeline(
Expand All @@ -297,12 +311,6 @@ public void testMultipleServerAndBroker() throws Exception
)
);

// check server metadatas
Assert.assertEquals(
druidServers.stream().map(DruidServer::getMetadata).collect(Collectors.toSet()),
ImmutableSet.copyOf(brokerServerView.getDruidServerMetadatas())
);

// unannounce the broker segment should do nothing to announcements
unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig);
Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch));
Expand Down Expand Up @@ -617,6 +625,29 @@ private void setupViews(Set<String> watchedTiers, Set<String> ignoredTiers, bool
"test"
)
{
@Override
public void registerServerCallback(Executor exec, ServerCallback callback)
{
super.registerServerCallback(
exec,
new ServerCallback() {
@Override
public CallbackAction serverAdded(DruidServer server)
{
final CallbackAction res = callback.serverAdded(server);
serverAddedLatch.countDown();
return res;
}

@Override
public CallbackAction serverRemoved(DruidServer server)
{
return callback.serverRemoved(server);
}
}
);
}

@Override
public void registerSegmentCallback(Executor exec, final SegmentCallback callback)
{
Expand Down
Loading