diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index df4677c3bc8f..2a3aede2a904 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -556,7 +556,7 @@ public void registerTimelineCallback(Executor exec, TimelineCallback callback) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { // do nothing } diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 41bd3db31ad6..d16895dc7712 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -352,7 +352,7 @@ public void registerSegmentCallback(Executor exec, SegmentCallback callback) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { } diff --git a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java index 58d4fb0927ee..f14666030ec8 100644 --- a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java @@ -65,7 +65,7 @@ public class BatchServerInventoryView implements ServerInventoryView, FilteredSe private final CuratorInventoryManager> inventoryManager; private final AtomicBoolean started = new AtomicBoolean(false); - private final ConcurrentMap serverRemovedCallbacks = new ConcurrentHashMap<>(); + private final ConcurrentMap serverCallbacks = new ConcurrentHashMap<>(); private final ConcurrentMap segmentCallbacks = new ConcurrentHashMap<>(); private final ConcurrentMap> zNodes = new ConcurrentHashMap<>(); @@ -127,13 +127,14 @@ public Set deserializeInventory(byte[] bytes) public void newContainer(DruidServer container) { log.info("New Server[%s]", container); + runServerCallbacks(callback -> callback.serverAdded(container)); } @Override public void deadContainer(DruidServer deadContainer) { log.info("Server Disappeared[%s]", deadContainer); - runServerRemovedCallbacks(deadContainer); + runServerCallbacks(callback -> callback.serverRemoved(deadContainer)); } @Override @@ -216,9 +217,9 @@ public Collection getInventory() } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { - serverRemovedCallbacks.put(callback, exec); + serverCallbacks.put(callback, exec); } @Override @@ -243,13 +244,13 @@ protected void runSegmentCallbacks( } } - private void runServerRemovedCallbacks(final DruidServer server) + private void runServerCallbacks(final Function fn) { - for (final Map.Entry entry : serverRemovedCallbacks.entrySet()) { + for (final Map.Entry entry : serverCallbacks.entrySet()) { entry.getValue().execute( () -> { - if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { - serverRemovedCallbacks.remove(entry.getKey()); + if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { + serverCallbacks.remove(entry.getKey()); } } ); diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 9e13625b9bb8..95dc86367250 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -157,11 +157,25 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) segmentFilter ); - baseView.registerServerRemovedCallback( + baseView.registerServerCallback( exec, - server -> { - removeServer(server); - return CallbackAction.CONTINUE; + new ServerCallback() { + @Override + public CallbackAction serverAdded(DruidServer server) + { + // We don't track brokers in this view. + if (!server.getType().equals(ServerType.BROKER)) { + addServer(server); + } + return CallbackAction.CONTINUE; + } + + @Override + public CallbackAction serverRemoved(DruidServer server) + { + removeServer(server); + return CallbackAction.CONTINUE; + } } ); } @@ -378,9 +392,9 @@ public QueryRunner getQueryRunner(DruidServer server) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { - baseView.registerServerRemovedCallback(exec, callback); + baseView.registerServerCallback(exec, callback); } @Override diff --git a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java index 1f83e5e81ce0..e8120ee2bb56 100644 --- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java +++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java @@ -125,10 +125,16 @@ public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentS } ); - baseView.registerServerRemovedCallback( + baseView.registerServerCallback( exec, - new ServerView.ServerRemovedCallback() + new ServerView.ServerCallback() { + @Override + public ServerView.CallbackAction serverAdded(DruidServer server) + { + return ServerView.CallbackAction.CONTINUE; + } + @Override public ServerView.CallbackAction serverRemoved(DruidServer server) { diff --git a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java index 0d0ea6347d11..db1a4b4f1b33 100644 --- a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java @@ -34,5 +34,5 @@ void registerSegmentCallback( Predicate> filter ); - void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback); + void registerServerCallback(Executor exec, ServerView.ServerCallback callback); } diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index d19df0c2d73e..caff2dee6054 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -93,7 +93,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer private final LifecycleLock lifecycleLock = new LifecycleLock(); - private final ConcurrentMap serverCallbacks = new ConcurrentHashMap<>(); + private final ConcurrentMap serverCallbacks = new ConcurrentHashMap<>(); private final ConcurrentMap segmentCallbacks = new ConcurrentHashMap<>(); private final ConcurrentMap>> segmentPredicates = @@ -286,7 +286,7 @@ public void registerSegmentCallback( } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { if (lifecycleLock.isStarted()) { throw new ISE("Lifecycle has already started."); @@ -347,18 +347,13 @@ public void run() } } - private void runServerRemovedCallbacks(final DruidServer server) + private void runServerCallbacks(final Function fn) { - for (final Map.Entry entry : serverCallbacks.entrySet()) { + for (final Map.Entry entry : serverCallbacks.entrySet()) { entry.getValue().execute( - new Runnable() - { - @Override - public void run() - { - if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { - serverCallbacks.remove(entry.getKey()); - } + () -> { + if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { + serverCallbacks.remove(entry.getKey()); } } ); @@ -421,12 +416,13 @@ private void updateFinalPredicate() void serverAdded(DruidServer server) { synchronized (servers) { - DruidServerHolder holder = servers.get(server.getName()); - if (holder == null) { + DruidServerHolder existing = servers.get(server.getName()); + if (existing == null) { log.info("Server[%s] appeared.", server.getName()); - holder = new DruidServerHolder(server); - servers.put(server.getName(), holder); - holder.start(); + final DruidServerHolder newHolder = new DruidServerHolder(server); + servers.put(server.getName(), newHolder); + runServerCallbacks(callback -> callback.serverAdded(newHolder.druidServer)); + newHolder.start(); } else { log.info("Server[%s] already exists.", server.getName()); } @@ -440,7 +436,7 @@ private void serverRemoved(DruidServer server) if (holder != null) { log.info("Server[%s] disappeared.", server.getName()); holder.stop(); - runServerRemovedCallbacks(holder.druidServer); + runServerCallbacks(callback -> callback.serverRemoved(holder.druidServer)); } else { log.info("Ignoring remove notification for unknown server[%s].", server.getName()); } diff --git a/server/src/main/java/org/apache/druid/client/ServerView.java b/server/src/main/java/org/apache/druid/client/ServerView.java index 83cb4856a88f..032cb07f4cf2 100644 --- a/server/src/main/java/org/apache/druid/client/ServerView.java +++ b/server/src/main/java/org/apache/druid/client/ServerView.java @@ -29,7 +29,7 @@ */ public interface ServerView { - void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback); + void registerServerCallback(Executor exec, ServerCallback callback); void registerSegmentCallback(Executor exec, SegmentCallback callback); enum CallbackAction @@ -38,8 +38,24 @@ enum CallbackAction UNREGISTER, } - interface ServerRemovedCallback + interface ServerCallback { + /** + * Called when a server is added. + * + * The return value indicates if this callback has completed its work. Note that even if this callback + * indicates that it should be unregistered, it is not possible to guarantee that this callback will not + * get called again. There is a race condition between when this callback runs and other events that can cause + * the callback to be queued for running. Thus, callbacks shouldn't assume that they will not get called + * again after they are done. The contract is that the callback will eventually be unregistered, enforcing + * a happens-before relationship is not part of the contract. + * + * @param server The server that was added. + * @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback + * should remain registered. + */ + CallbackAction serverAdded(DruidServer server); + /** * Called when a server is removed. * diff --git a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java index 4bceb2a8594d..cb3cc3ef6bc3 100644 --- a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java +++ b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java @@ -259,8 +259,8 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th containers.put(containerKey, new ContainerHolder(container, inventoryCache)); log.debug("Starting inventory cache for %s, inventoryPath %s", containerKey, inventoryPath); - inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); strategy.newContainer(container); + inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } } break; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java index 41d56109d5bd..daa60257a686 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.druid.client.DruidServer; import org.apache.druid.client.ServerInventoryView; import org.apache.druid.client.ServerView; import org.apache.druid.java.util.common.concurrent.Execs; @@ -106,13 +107,23 @@ public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentS } ); - serverInventoryView.registerServerRemovedCallback( + serverInventoryView.registerServerCallback( executor, - server -> { - if (server.isSegmentReplicationTarget()) { - clusterCostCacheBuilder.removeServer(server.getName()); + new ServerView.ServerCallback() { + @Override + public ServerView.CallbackAction serverAdded(DruidServer server) + { + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction serverRemoved(DruidServer server) + { + if (server.isSegmentReplicationTarget()) { + clusterCostCacheBuilder.removeServer(server.getName()); + } + return ServerView.CallbackAction.CONTINUE; } - return ServerView.CallbackAction.CONTINUE; } ); } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 707d61b140ba..1c65e2be2beb 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -76,6 +76,7 @@ public class BrokerServerViewTest extends CuratorTestBase private final ZkPathsConfig zkPathsConfig; private CountDownLatch segmentViewInitLatch; + private CountDownLatch serverAddedLatch; private CountDownLatch segmentAddedLatch; private CountDownLatch segmentRemovedLatch; @@ -158,7 +159,7 @@ public void testMultipleServerAddedRemovedSegment() throws Exception setupViews(); final List druidServers = Lists.transform( - ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), + ImmutableList.of("localhost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), hostname -> setupHistoricalServer("default_tier", hostname, 0) ); @@ -237,6 +238,7 @@ public void testMultipleServerAddedRemovedSegment() throws Exception public void testMultipleServerAndBroker() throws Exception { segmentViewInitLatch = new CountDownLatch(1); + serverAddedLatch = new CountDownLatch(6); segmentAddedLatch = new CountDownLatch(6); // temporarily set latch count to 1 @@ -254,13 +256,26 @@ public void testMultipleServerAndBroker() throws Exception 0 ); - final List druidServers = Lists.transform( - ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), - hostname -> setupHistoricalServer("default_tier", hostname, 0) - ); + // Materialize this list so all servers are set up + final List druidServers = + ImmutableList.copyOf( + Lists.transform( + ImmutableList.of("localhost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), + hostname -> setupHistoricalServer("default_tier", hostname, 0) + ) + ); setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(serverAddedLatch)); + + // check server metadatas + Assert.assertEquals( + druidServers.stream().map(DruidServer::getMetadata).collect(Collectors.toSet()), + ImmutableSet.copyOf(brokerServerView.getDruidServerMetadatas()) + ); + final List segments = Lists.transform( ImmutableList.of( Pair.of("2011-04-01/2011-04-03", "v1"), @@ -277,7 +292,6 @@ public void testMultipleServerAndBroker() throws Exception for (int i = 0; i < 5; ++i) { announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper); } - Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); TimelineLookup timeline = brokerServerView.getTimeline( @@ -297,12 +311,6 @@ public void testMultipleServerAndBroker() throws Exception ) ); - // check server metadatas - Assert.assertEquals( - druidServers.stream().map(DruidServer::getMetadata).collect(Collectors.toSet()), - ImmutableSet.copyOf(brokerServerView.getDruidServerMetadatas()) - ); - // unannounce the broker segment should do nothing to announcements unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); @@ -617,6 +625,29 @@ private void setupViews(Set watchedTiers, Set ignoredTiers, bool "test" ) { + @Override + public void registerServerCallback(Executor exec, ServerCallback callback) + { + super.registerServerCallback( + exec, + new ServerCallback() { + @Override + public CallbackAction serverAdded(DruidServer server) + { + final CallbackAction res = callback.serverAdded(server); + serverAddedLatch.countDown(); + return res; + } + + @Override + public CallbackAction serverRemoved(DruidServer server) + { + return callback.serverRemoved(server); + } + } + ); + } + @Override public void registerSegmentCallback(Executor exec, final SegmentCallback callback) { diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 20972a6127a9..e7a674557476 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -265,7 +265,7 @@ public QueryRunner getQueryRunner(DruidServer server) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index d42b44b2db6c..d0ae542614c3 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -2643,7 +2643,7 @@ public void registerTimelineCallback(final Executor exec, final TimelineCallback } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { } diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java index 69e0b6671df1..fd026e090cd2 100644 --- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java @@ -92,6 +92,7 @@ public class HttpServerInventoryViewTest private Map> segmentsAddedToView; private Map> segmentsRemovedFromView; + private Set addedServers; private Set removedServers; private AtomicBoolean inventoryInitialized; @@ -114,6 +115,7 @@ public void setup() segmentsAddedToView = new HashMap<>(); segmentsRemovedFromView = new HashMap<>(); + addedServers = new HashSet<>(); removedServers = new HashSet<>(); createInventoryView( @@ -170,6 +172,7 @@ public void testAddNodeStartsSync() Collection inventory = httpServerInventoryView.getInventory(); Assert.assertEquals(1, inventory.size()); Assert.assertTrue(inventory.contains(server)); + Assert.assertTrue(addedServers.contains(server.getMetadata())); execHelper.emitMetrics(); serviceEmitter.verifyValue(METRIC_SUCCESS, 1); @@ -211,6 +214,22 @@ public void testRemoveNodeStopsSync() httpServerInventoryView.stop(); } + @Test + public void testAddNodeTriggersServerAddedCallback() + { + httpServerInventoryView.start(); + druidNodeDiscovery.markNodeViewInitialized(); + execHelper.finishInventoryInitialization(); + + final DiscoveryDruidNode druidNode = druidNodeDiscovery + .addNodeAndNotifyListeners("localhost"); + final DruidServer server = druidNode.toDruidServer(); + + Assert.assertTrue(addedServers.contains(server.getMetadata())); + + httpServerInventoryView.stop(); + } + @Test(timeout = 60_000L) public void testSyncSegmentLoadAndDrop() { @@ -222,6 +241,8 @@ public void testSyncSegmentLoadAndDrop() .addNodeAndNotifyListeners("localhost"); final DruidServer server = druidNode.toDruidServer(); + Assert.assertTrue(addedServers.contains(server.getMetadata())); + final DataSegment[] segments = CreateDataSegments.ofDatasource("wiki") .forIntervals(4, Granularities.DAY) @@ -318,7 +339,8 @@ public void testSyncWhenRequestFailedToSend() druidNodeDiscovery.markNodeViewInitialized(); execHelper.finishInventoryInitialization(); - druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + final DiscoveryDruidNode druidNode = druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + Assert.assertTrue(addedServers.contains(druidNode.toDruidServer().getMetadata())); httpClient.failToSendNextRequestWith(new ISE("Could not send request to server")); execHelper.sendSyncRequest(); @@ -361,7 +383,8 @@ public void testUnstableServerAlertsAfterTimeout() druidNodeDiscovery.markNodeViewInitialized(); execHelper.finishInventoryInitialization(); - druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + final DiscoveryDruidNode druidNode = druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + Assert.assertTrue(addedServers.contains(druidNode.toDruidServer().getMetadata())); serviceEmitter.flush(); httpClient.completeNextRequestWith(InvalidInput.exception("failure on server")); @@ -384,7 +407,10 @@ public void testInitWaitsForServerToSync() { httpServerInventoryView.start(); druidNodeDiscovery.markNodeViewInitialized(); - druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + final DiscoveryDruidNode druidNode = druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + final DruidServer server = druidNode.toDruidServer(); + + Assert.assertTrue(addedServers.contains(server.getMetadata())); ExecutorService initExecutor = Execs.singleThreaded(EXEC_NAME_PREFIX + "-init"); @@ -417,6 +443,7 @@ public void testInitDoesNotWaitForRemovedServerToSync() httpServerInventoryView.start(); druidNodeDiscovery.markNodeViewInitialized(); DiscoveryDruidNode node = druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + Assert.assertTrue(addedServers.contains(node.toDruidServer().getMetadata())); ExecutorService initExecutor = Execs.singleThreaded(EXEC_NAME_PREFIX + "-init"); @@ -488,11 +515,22 @@ public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentS } ); - httpServerInventoryView.registerServerRemovedCallback( + httpServerInventoryView.registerServerCallback( Execs.directExecutor(), - server -> { - removedServers.add(server.getMetadata()); - return ServerView.CallbackAction.CONTINUE; + new ServerView.ServerCallback() { + @Override + public ServerView.CallbackAction serverAdded(DruidServer server) + { + addedServers.add(server.getMetadata()); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction serverRemoved(DruidServer server) + { + removedServers.add(server.getMetadata()); + return ServerView.CallbackAction.CONTINUE; + } } ); } diff --git a/server/src/test/java/org/apache/druid/client/SimpleServerView.java b/server/src/test/java/org/apache/druid/client/SimpleServerView.java index f7c93f2c9e27..0843fb427e10 100644 --- a/server/src/test/java/org/apache/druid/client/SimpleServerView.java +++ b/server/src/test/java/org/apache/druid/client/SimpleServerView.java @@ -149,7 +149,7 @@ public void registerTimelineCallback(Executor exec, TimelineCallback callback) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { // do nothing } diff --git a/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java b/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java index f79011ce3a61..b7bf33316fc4 100644 --- a/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java +++ b/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java @@ -28,8 +28,6 @@ import org.apache.curator.test.Timing; import org.apache.curator.utils.ZKPaths; import org.apache.druid.client.DruidServer; -import org.apache.druid.common.utils.UUIDUtils; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.timeline.DataSegment; import org.apache.zookeeper.CreateMode; @@ -45,8 +43,6 @@ public class CuratorTestBase protected Timing timing; protected CuratorFramework curator; - private int batchCtr = 0; - public void setupServerAndCurator() throws Exception { server = new TestingServer(); @@ -127,47 +123,6 @@ protected void announceSegmentForServer( } } - protected String announceBatchSegmentsForServer( - DruidServer druidServer, - ImmutableSet segments, - ZkPathsConfig zkPathsConfig, - ObjectMapper jsonMapper - ) - { - final String segmentAnnouncementPath = ZKPaths.makePath( - zkPathsConfig.getLiveSegmentsPath(), - druidServer.getHost(), - UUIDUtils.generateUuid( - druidServer.getHost(), - druidServer.getType().toString(), - druidServer.getTier(), - DateTimes.nowUtc().toString() - ) + (batchCtr++) - ); - - - try { - curator.create() - .compressed() - .withMode(CreateMode.EPHEMERAL) - .forPath(segmentAnnouncementPath, jsonMapper.writeValueAsBytes(segments)); - } - catch (KeeperException.NodeExistsException e) { - try { - curator.setData() - .forPath(segmentAnnouncementPath, jsonMapper.writeValueAsBytes(segments)); - } - catch (Exception e1) { - throw new RuntimeException(e1); - } - } - catch (Exception e) { - throw new RuntimeException(e); - } - - return segmentAnnouncementPath; - } - protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment, ZkPathsConfig zkPathsConfig) throws Exception { @@ -179,12 +134,6 @@ protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment s curator.delete().guaranteed().forPath(path); } - protected void unannounceSegmentFromBatchForServer(DruidServer druidServer, DataSegment segment, String batchPath, ZkPathsConfig zkPathsConfig) - throws Exception - { - curator.delete().guaranteed().forPath(batchPath); - } - public void tearDownServerAndCurator() { try { diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java index a738c671fe8a..89792d85b825 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java @@ -535,7 +535,7 @@ private static class TestServerInventoryView implements ServerInventoryView private final Map serverMap = new HashMap<>(); private final Map> segmentsMap = new HashMap<>(); private final List> segmentCallbacks = new ArrayList<>(); - private final List> serverRemovedCallbacks = new ArrayList<>(); + private final List> serverRemovedCallbacks = new ArrayList<>(); private void init() { @@ -573,7 +573,7 @@ public void registerSegmentCallback( } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { serverRemovedCallbacks.add(new NonnullPair<>(callback, exec)); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java index df17f8beaba7..0c7f68a3ff87 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java @@ -130,7 +130,7 @@ public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { serverChangeHandlers.add(new ServerChangeHandler(callback, exec)); } @@ -196,9 +196,9 @@ public void removeSegment( private static class ServerChangeHandler { private final Executor executor; - private final ServerRemovedCallback callback; + private final ServerCallback callback; - private ServerChangeHandler(ServerRemovedCallback callback, Executor executor) + private ServerChangeHandler(ServerCallback callback, Executor executor) { this.callback = callback; this.executor = executor; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java index f1a8dd0be1c1..e74c1c507761 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java @@ -454,7 +454,7 @@ private static class TestServerInventoryView implements FilteredServerInventoryV private final Map serverMap = new HashMap<>(); private final Map> segmentsMap = new HashMap<>(); private final List> segmentCallbacks = new ArrayList<>(); - private final List> serverRemovedCallbacks = new ArrayList<>(); + private final List> serverRemovedCallbacks = new ArrayList<>(); private void init() { @@ -493,7 +493,7 @@ public void registerSegmentCallback( } @Override - public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerView.ServerCallback callback) { serverRemovedCallbacks.add(new NonnullPair<>(callback, exec)); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index d7bf79721e66..3e732e24f9f3 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -639,9 +639,9 @@ public void registerSegmentCallback( } @Override - public void registerServerRemovedCallback( + public void registerServerCallback( Executor exec, - ServerView.ServerRemovedCallback callback + ServerView.ServerCallback callback ) { throw new UnsupportedOperationException(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java index a23cefacb4fa..8f0c7eec4b30 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java @@ -176,7 +176,7 @@ public QueryRunner getQueryRunner(DruidServer server) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { // Do nothing }