From aa6ed8c84eef8f95a92511db75cc7d5963155565 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 3 Jul 2025 05:50:51 -0700 Subject: [PATCH 1/6] Include empty servers in BrokerServerView. Fixes #18199, because this makes empty Historicals visible through getDruidServerMetadatas. It also makes them visible through getDruidServers, which causes them to show up in the sys.servers table. --- .../CachingClusteredClientBenchmark.java | 2 +- .../movingaverage/MovingAverageQueryTest.java | 2 +- .../client/BatchServerInventoryView.java | 17 +++--- .../apache/druid/client/BrokerServerView.java | 25 ++++++--- .../druid/client/CoordinatorServerView.java | 10 +++- .../client/FilteredServerInventoryView.java | 2 +- .../druid/client/HttpServerInventoryView.java | 32 +++++------- .../org/apache/druid/client/ServerView.java | 20 ++++++- .../inventory/CuratorInventoryManager.java | 2 +- .../CachingCostBalancerStrategyFactory.java | 21 ++++++-- .../druid/client/BrokerServerViewTest.java | 27 ++++++---- ...chingClusteredClientFunctionalityTest.java | 2 +- .../client/CachingClusteredClientTest.java | 2 +- .../client/HttpServerInventoryViewTest.java | 52 ++++++++++++++++--- .../apache/druid/client/SimpleServerView.java | 2 +- .../apache/druid/curator/CuratorTestBase.java | 51 ------------------ ...inatorSegmentDataCacheConcurrencyTest.java | 4 +- .../simulate/TestServerInventoryView.java | 6 +-- ...erSegmentMetadataCacheConcurrencyTest.java | 4 +- .../druid/sql/calcite/util/CalciteTests.java | 4 +- .../calcite/util/TestTimelineServerView.java | 2 +- 21 files changed, 162 insertions(+), 127 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java index df4677c3bc8f..2a3aede2a904 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java +++ 
b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java @@ -556,7 +556,7 @@ public void registerTimelineCallback(Executor exec, TimelineCallback callback) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { // do nothing } diff --git a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java index 41bd3db31ad6..d16895dc7712 100644 --- a/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java +++ b/extensions-contrib/moving-average-query/src/test/java/org/apache/druid/query/movingaverage/MovingAverageQueryTest.java @@ -352,7 +352,7 @@ public void registerSegmentCallback(Executor exec, SegmentCallback callback) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { } diff --git a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java index 58d4fb0927ee..f14666030ec8 100644 --- a/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/BatchServerInventoryView.java @@ -65,7 +65,7 @@ public class BatchServerInventoryView implements ServerInventoryView, FilteredSe private final CuratorInventoryManager> inventoryManager; private final AtomicBoolean started = new AtomicBoolean(false); - private final ConcurrentMap serverRemovedCallbacks = new ConcurrentHashMap<>(); + private final ConcurrentMap serverCallbacks = new ConcurrentHashMap<>(); private final ConcurrentMap 
segmentCallbacks = new ConcurrentHashMap<>(); private final ConcurrentMap> zNodes = new ConcurrentHashMap<>(); @@ -127,13 +127,14 @@ public Set deserializeInventory(byte[] bytes) public void newContainer(DruidServer container) { log.info("New Server[%s]", container); + runServerCallbacks(callback -> callback.serverAdded(container)); } @Override public void deadContainer(DruidServer deadContainer) { log.info("Server Disappeared[%s]", deadContainer); - runServerRemovedCallbacks(deadContainer); + runServerCallbacks(callback -> callback.serverRemoved(deadContainer)); } @Override @@ -216,9 +217,9 @@ public Collection getInventory() } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { - serverRemovedCallbacks.put(callback, exec); + serverCallbacks.put(callback, exec); } @Override @@ -243,13 +244,13 @@ protected void runSegmentCallbacks( } } - private void runServerRemovedCallbacks(final DruidServer server) + private void runServerCallbacks(final Function fn) { - for (final Map.Entry entry : serverRemovedCallbacks.entrySet()) { + for (final Map.Entry entry : serverCallbacks.entrySet()) { entry.getValue().execute( () -> { - if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { - serverRemovedCallbacks.remove(entry.getKey()); + if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { + serverCallbacks.remove(entry.getKey()); } } ); diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 9e13625b9bb8..6b0acf7f7de3 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -157,11 +157,24 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) segmentFilter ); - baseView.registerServerRemovedCallback( + 
baseView.registerServerCallback( exec, - server -> { - removeServer(server); - return CallbackAction.CONTINUE; + new ServerCallback() { + @Override + public CallbackAction serverAdded(DruidServer server) + { + if (!server.getType().equals(ServerType.BROKER)) { + addServer(server); + } + return CallbackAction.CONTINUE; + } + + @Override + public CallbackAction serverRemoved(DruidServer server) + { + removeServer(server); + return CallbackAction.CONTINUE; + } } ); } @@ -378,9 +391,9 @@ public QueryRunner getQueryRunner(DruidServer server) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { - baseView.registerServerRemovedCallback(exec, callback); + baseView.registerServerCallback(exec, callback); } @Override diff --git a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java index 1f83e5e81ce0..e8120ee2bb56 100644 --- a/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java +++ b/server/src/main/java/org/apache/druid/client/CoordinatorServerView.java @@ -125,10 +125,16 @@ public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentS } ); - baseView.registerServerRemovedCallback( + baseView.registerServerCallback( exec, - new ServerView.ServerRemovedCallback() + new ServerView.ServerCallback() { + @Override + public ServerView.CallbackAction serverAdded(DruidServer server) + { + return ServerView.CallbackAction.CONTINUE; + } + @Override public ServerView.CallbackAction serverRemoved(DruidServer server) { diff --git a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java index 0d0ea6347d11..db1a4b4f1b33 100644 --- a/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java +++ 
b/server/src/main/java/org/apache/druid/client/FilteredServerInventoryView.java @@ -34,5 +34,5 @@ void registerSegmentCallback( Predicate> filter ); - void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback); + void registerServerCallback(Executor exec, ServerView.ServerCallback callback); } diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index d19df0c2d73e..caff2dee6054 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -93,7 +93,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer private final LifecycleLock lifecycleLock = new LifecycleLock(); - private final ConcurrentMap serverCallbacks = new ConcurrentHashMap<>(); + private final ConcurrentMap serverCallbacks = new ConcurrentHashMap<>(); private final ConcurrentMap segmentCallbacks = new ConcurrentHashMap<>(); private final ConcurrentMap>> segmentPredicates = @@ -286,7 +286,7 @@ public void registerSegmentCallback( } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { if (lifecycleLock.isStarted()) { throw new ISE("Lifecycle has already started."); @@ -347,18 +347,13 @@ public void run() } } - private void runServerRemovedCallbacks(final DruidServer server) + private void runServerCallbacks(final Function fn) { - for (final Map.Entry entry : serverCallbacks.entrySet()) { + for (final Map.Entry entry : serverCallbacks.entrySet()) { entry.getValue().execute( - new Runnable() - { - @Override - public void run() - { - if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { - serverCallbacks.remove(entry.getKey()); - } + () -> { + if (CallbackAction.UNREGISTER == 
fn.apply(entry.getKey())) { + serverCallbacks.remove(entry.getKey()); } } ); @@ -421,12 +416,13 @@ private void updateFinalPredicate() void serverAdded(DruidServer server) { synchronized (servers) { - DruidServerHolder holder = servers.get(server.getName()); - if (holder == null) { + DruidServerHolder existing = servers.get(server.getName()); + if (existing == null) { log.info("Server[%s] appeared.", server.getName()); - holder = new DruidServerHolder(server); - servers.put(server.getName(), holder); - holder.start(); + final DruidServerHolder newHolder = new DruidServerHolder(server); + servers.put(server.getName(), newHolder); + runServerCallbacks(callback -> callback.serverAdded(newHolder.druidServer)); + newHolder.start(); } else { log.info("Server[%s] already exists.", server.getName()); } @@ -440,7 +436,7 @@ private void serverRemoved(DruidServer server) if (holder != null) { log.info("Server[%s] disappeared.", server.getName()); holder.stop(); - runServerRemovedCallbacks(holder.druidServer); + runServerCallbacks(callback -> callback.serverRemoved(holder.druidServer)); } else { log.info("Ignoring remove notification for unknown server[%s].", server.getName()); } diff --git a/server/src/main/java/org/apache/druid/client/ServerView.java b/server/src/main/java/org/apache/druid/client/ServerView.java index 83cb4856a88f..8763c2552e2d 100644 --- a/server/src/main/java/org/apache/druid/client/ServerView.java +++ b/server/src/main/java/org/apache/druid/client/ServerView.java @@ -29,7 +29,7 @@ */ public interface ServerView { - void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback); + void registerServerCallback(Executor exec, ServerCallback callback); void registerSegmentCallback(Executor exec, SegmentCallback callback); enum CallbackAction @@ -38,8 +38,24 @@ enum CallbackAction UNREGISTER, } - interface ServerRemovedCallback + interface ServerCallback { + /** + * Called when a server is added. 
+ * + * The return value indicates if this callback has completed its work. Note that even if this callback + * indicates that it should be unregistered, it is not possible to guarantee that this callback will not + * get called again. There is a race condition between when this callback runs and other events that can cause + * the callback to be queued for running. Thus, callbacks shouldn't assume that they will not get called + * again after they are done. The contract is that the callback will eventually be unregistered, enforcing + * a happens-before relationship is not part of the contract. + * + * @param server The server that was removed. + * @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback + * should remain registered. + */ + CallbackAction serverAdded(DruidServer server); + /** * Called when a server is removed. * diff --git a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java index 4bceb2a8594d..cb3cc3ef6bc3 100644 --- a/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java +++ b/server/src/main/java/org/apache/druid/curator/inventory/CuratorInventoryManager.java @@ -259,8 +259,8 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th containers.put(containerKey, new ContainerHolder(container, inventoryCache)); log.debug("Starting inventory cache for %s, inventoryPath %s", containerKey, inventoryPath); - inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); strategy.newContainer(container); + inventoryCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); } } break; diff --git a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java 
index 41d56109d5bd..daa60257a686 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/balancer/CachingCostBalancerStrategyFactory.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; import com.google.common.util.concurrent.ListeningExecutorService; +import org.apache.druid.client.DruidServer; import org.apache.druid.client.ServerInventoryView; import org.apache.druid.client.ServerView; import org.apache.druid.java.util.common.concurrent.Execs; @@ -106,13 +107,23 @@ public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentS } ); - serverInventoryView.registerServerRemovedCallback( + serverInventoryView.registerServerCallback( executor, - server -> { - if (server.isSegmentReplicationTarget()) { - clusterCostCacheBuilder.removeServer(server.getName()); + new ServerView.ServerCallback() { + @Override + public ServerView.CallbackAction serverAdded(DruidServer server) + { + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction serverRemoved(DruidServer server) + { + if (server.isSegmentReplicationTarget()) { + clusterCostCacheBuilder.removeServer(server.getName()); + } + return ServerView.CallbackAction.CONTINUE; } - return ServerView.CallbackAction.CONTINUE; } ); } diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 707d61b140ba..bf203ccdc1f8 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -254,13 +254,25 @@ public void testMultipleServerAndBroker() throws Exception 0 ); - final List druidServers = Lists.transform( - ImmutableList.of("locahost:0", "localhost:1", "localhost:2", 
"localhost:3", "localhost:4"), - hostname -> setupHistoricalServer("default_tier", hostname, 0) - ); + // Materialize this list so all servers are set up + final List druidServers = + ImmutableList.copyOf( + Lists.transform( + ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), + hostname -> setupHistoricalServer("default_tier", hostname, 0) + ) + ); setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper); + Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + + // check server metadatas + Assert.assertEquals( + druidServers.stream().map(DruidServer::getMetadata).collect(Collectors.toSet()), + ImmutableSet.copyOf(brokerServerView.getDruidServerMetadatas()) + ); + final List segments = Lists.transform( ImmutableList.of( Pair.of("2011-04-01/2011-04-03", "v1"), @@ -277,7 +289,6 @@ public void testMultipleServerAndBroker() throws Exception for (int i = 0; i < 5; ++i) { announceSegmentForServer(druidServers.get(i), segments.get(i), zkPathsConfig, jsonMapper); } - Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentAddedLatch)); TimelineLookup timeline = brokerServerView.getTimeline( @@ -297,12 +308,6 @@ public void testMultipleServerAndBroker() throws Exception ) ); - // check server metadatas - Assert.assertEquals( - druidServers.stream().map(DruidServer::getMetadata).collect(Collectors.toSet()), - ImmutableSet.copyOf(brokerServerView.getDruidServerMetadatas()) - ); - // unannounce the broker segment should do nothing to announcements unannounceSegmentForServer(druidBroker, brokerSegment, zkPathsConfig); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentRemovedLatch)); diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java index 20972a6127a9..e7a674557476 100644 --- 
a/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientFunctionalityTest.java @@ -265,7 +265,7 @@ public QueryRunner getQueryRunner(DruidServer server) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { } diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index d42b44b2db6c..d0ae542614c3 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -2643,7 +2643,7 @@ public void registerTimelineCallback(final Executor exec, final TimelineCallback } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { } diff --git a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java index 69e0b6671df1..fd026e090cd2 100644 --- a/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/HttpServerInventoryViewTest.java @@ -92,6 +92,7 @@ public class HttpServerInventoryViewTest private Map> segmentsAddedToView; private Map> segmentsRemovedFromView; + private Set addedServers; private Set removedServers; private AtomicBoolean inventoryInitialized; @@ -114,6 +115,7 @@ public void setup() segmentsAddedToView = new HashMap<>(); segmentsRemovedFromView = new HashMap<>(); + addedServers = new HashSet<>(); removedServers = new HashSet<>(); createInventoryView( @@ -170,6 +172,7 @@ public void testAddNodeStartsSync() Collection 
inventory = httpServerInventoryView.getInventory(); Assert.assertEquals(1, inventory.size()); Assert.assertTrue(inventory.contains(server)); + Assert.assertTrue(addedServers.contains(server.getMetadata())); execHelper.emitMetrics(); serviceEmitter.verifyValue(METRIC_SUCCESS, 1); @@ -211,6 +214,22 @@ public void testRemoveNodeStopsSync() httpServerInventoryView.stop(); } + @Test + public void testAddNodeTriggersServerAddedCallback() + { + httpServerInventoryView.start(); + druidNodeDiscovery.markNodeViewInitialized(); + execHelper.finishInventoryInitialization(); + + final DiscoveryDruidNode druidNode = druidNodeDiscovery + .addNodeAndNotifyListeners("localhost"); + final DruidServer server = druidNode.toDruidServer(); + + Assert.assertTrue(addedServers.contains(server.getMetadata())); + + httpServerInventoryView.stop(); + } + @Test(timeout = 60_000L) public void testSyncSegmentLoadAndDrop() { @@ -222,6 +241,8 @@ public void testSyncSegmentLoadAndDrop() .addNodeAndNotifyListeners("localhost"); final DruidServer server = druidNode.toDruidServer(); + Assert.assertTrue(addedServers.contains(server.getMetadata())); + final DataSegment[] segments = CreateDataSegments.ofDatasource("wiki") .forIntervals(4, Granularities.DAY) @@ -318,7 +339,8 @@ public void testSyncWhenRequestFailedToSend() druidNodeDiscovery.markNodeViewInitialized(); execHelper.finishInventoryInitialization(); - druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + final DiscoveryDruidNode druidNode = druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + Assert.assertTrue(addedServers.contains(druidNode.toDruidServer().getMetadata())); httpClient.failToSendNextRequestWith(new ISE("Could not send request to server")); execHelper.sendSyncRequest(); @@ -361,7 +383,8 @@ public void testUnstableServerAlertsAfterTimeout() druidNodeDiscovery.markNodeViewInitialized(); execHelper.finishInventoryInitialization(); - druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + final DiscoveryDruidNode 
druidNode = druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + Assert.assertTrue(addedServers.contains(druidNode.toDruidServer().getMetadata())); serviceEmitter.flush(); httpClient.completeNextRequestWith(InvalidInput.exception("failure on server")); @@ -384,7 +407,10 @@ public void testInitWaitsForServerToSync() { httpServerInventoryView.start(); druidNodeDiscovery.markNodeViewInitialized(); - druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + final DiscoveryDruidNode druidNode = druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + final DruidServer server = druidNode.toDruidServer(); + + Assert.assertTrue(addedServers.contains(server.getMetadata())); ExecutorService initExecutor = Execs.singleThreaded(EXEC_NAME_PREFIX + "-init"); @@ -417,6 +443,7 @@ public void testInitDoesNotWaitForRemovedServerToSync() httpServerInventoryView.start(); druidNodeDiscovery.markNodeViewInitialized(); DiscoveryDruidNode node = druidNodeDiscovery.addNodeAndNotifyListeners("localhost"); + Assert.assertTrue(addedServers.contains(node.toDruidServer().getMetadata())); ExecutorService initExecutor = Execs.singleThreaded(EXEC_NAME_PREFIX + "-init"); @@ -488,11 +515,22 @@ public ServerView.CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentS } ); - httpServerInventoryView.registerServerRemovedCallback( + httpServerInventoryView.registerServerCallback( Execs.directExecutor(), - server -> { - removedServers.add(server.getMetadata()); - return ServerView.CallbackAction.CONTINUE; + new ServerView.ServerCallback() { + @Override + public ServerView.CallbackAction serverAdded(DruidServer server) + { + addedServers.add(server.getMetadata()); + return ServerView.CallbackAction.CONTINUE; + } + + @Override + public ServerView.CallbackAction serverRemoved(DruidServer server) + { + removedServers.add(server.getMetadata()); + return ServerView.CallbackAction.CONTINUE; + } } ); } diff --git a/server/src/test/java/org/apache/druid/client/SimpleServerView.java 
b/server/src/test/java/org/apache/druid/client/SimpleServerView.java index f7c93f2c9e27..0843fb427e10 100644 --- a/server/src/test/java/org/apache/druid/client/SimpleServerView.java +++ b/server/src/test/java/org/apache/druid/client/SimpleServerView.java @@ -149,7 +149,7 @@ public void registerTimelineCallback(Executor exec, TimelineCallback callback) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { // do nothing } diff --git a/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java b/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java index f79011ce3a61..b7bf33316fc4 100644 --- a/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java +++ b/server/src/test/java/org/apache/druid/curator/CuratorTestBase.java @@ -28,8 +28,6 @@ import org.apache.curator.test.Timing; import org.apache.curator.utils.ZKPaths; import org.apache.druid.client.DruidServer; -import org.apache.druid.common.utils.UUIDUtils; -import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.server.initialization.ZkPathsConfig; import org.apache.druid.timeline.DataSegment; import org.apache.zookeeper.CreateMode; @@ -45,8 +43,6 @@ public class CuratorTestBase protected Timing timing; protected CuratorFramework curator; - private int batchCtr = 0; - public void setupServerAndCurator() throws Exception { server = new TestingServer(); @@ -127,47 +123,6 @@ protected void announceSegmentForServer( } } - protected String announceBatchSegmentsForServer( - DruidServer druidServer, - ImmutableSet segments, - ZkPathsConfig zkPathsConfig, - ObjectMapper jsonMapper - ) - { - final String segmentAnnouncementPath = ZKPaths.makePath( - zkPathsConfig.getLiveSegmentsPath(), - druidServer.getHost(), - UUIDUtils.generateUuid( - druidServer.getHost(), - druidServer.getType().toString(), - druidServer.getTier(), - 
DateTimes.nowUtc().toString() - ) + (batchCtr++) - ); - - - try { - curator.create() - .compressed() - .withMode(CreateMode.EPHEMERAL) - .forPath(segmentAnnouncementPath, jsonMapper.writeValueAsBytes(segments)); - } - catch (KeeperException.NodeExistsException e) { - try { - curator.setData() - .forPath(segmentAnnouncementPath, jsonMapper.writeValueAsBytes(segments)); - } - catch (Exception e1) { - throw new RuntimeException(e1); - } - } - catch (Exception e) { - throw new RuntimeException(e); - } - - return segmentAnnouncementPath; - } - protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment segment, ZkPathsConfig zkPathsConfig) throws Exception { @@ -179,12 +134,6 @@ protected void unannounceSegmentForServer(DruidServer druidServer, DataSegment s curator.delete().guaranteed().forPath(path); } - protected void unannounceSegmentFromBatchForServer(DruidServer druidServer, DataSegment segment, String batchPath, ZkPathsConfig zkPathsConfig) - throws Exception - { - curator.delete().guaranteed().forPath(batchPath); - } - public void tearDownServerAndCurator() { try { diff --git a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java index a738c671fe8a..89792d85b825 100644 --- a/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java +++ b/server/src/test/java/org/apache/druid/segment/metadata/CoordinatorSegmentDataCacheConcurrencyTest.java @@ -535,7 +535,7 @@ private static class TestServerInventoryView implements ServerInventoryView private final Map serverMap = new HashMap<>(); private final Map> segmentsMap = new HashMap<>(); private final List> segmentCallbacks = new ArrayList<>(); - private final List> serverRemovedCallbacks = new ArrayList<>(); + private final List> serverRemovedCallbacks = new ArrayList<>(); private void init() { @@ -573,7 
+573,7 @@ public void registerSegmentCallback( } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { serverRemovedCallbacks.add(new NonnullPair<>(callback, exec)); } diff --git a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java index df17f8beaba7..0c7f68a3ff87 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/simulate/TestServerInventoryView.java @@ -130,7 +130,7 @@ public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback callback) { serverChangeHandlers.add(new ServerChangeHandler(callback, exec)); } @@ -196,9 +196,9 @@ public void removeSegment( private static class ServerChangeHandler { private final Executor executor; - private final ServerRemovedCallback callback; + private final ServerCallback callback; - private ServerChangeHandler(ServerRemovedCallback callback, Executor executor) + private ServerChangeHandler(ServerCallback callback, Executor executor) { this.callback = callback; this.executor = executor; diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java index f1a8dd0be1c1..e74c1c507761 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/BrokerSegmentMetadataCacheConcurrencyTest.java @@ -454,7 +454,7 
@@ private static class TestServerInventoryView implements FilteredServerInventoryV private final Map serverMap = new HashMap<>(); private final Map> segmentsMap = new HashMap<>(); private final List> segmentCallbacks = new ArrayList<>(); - private final List> serverRemovedCallbacks = new ArrayList<>(); + private final List> serverRemovedCallbacks = new ArrayList<>(); private void init() { @@ -493,7 +493,7 @@ public void registerSegmentCallback( } @Override - public void registerServerRemovedCallback(Executor exec, ServerView.ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerView.ServerCallback callback) { serverRemovedCallbacks.add(new NonnullPair<>(callback, exec)); } diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index d7bf79721e66..3e732e24f9f3 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -639,9 +639,9 @@ public void registerSegmentCallback( } @Override - public void registerServerRemovedCallback( + public void registerServerCallback( Executor exec, - ServerView.ServerRemovedCallback callback + ServerView.ServerCallback callback ) { throw new UnsupportedOperationException(); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java index a23cefacb4fa..8f0c7eec4b30 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/TestTimelineServerView.java @@ -176,7 +176,7 @@ public QueryRunner getQueryRunner(DruidServer server) } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerServerCallback(Executor exec, ServerCallback 
callback) { // Do nothing } From 12531355fae03366b553d99914a6972b38e3573a Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 3 Jul 2025 06:25:27 -0700 Subject: [PATCH 2/6] Fix comment. --- server/src/main/java/org/apache/druid/client/ServerView.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/apache/druid/client/ServerView.java b/server/src/main/java/org/apache/druid/client/ServerView.java index 8763c2552e2d..032cb07f4cf2 100644 --- a/server/src/main/java/org/apache/druid/client/ServerView.java +++ b/server/src/main/java/org/apache/druid/client/ServerView.java @@ -50,7 +50,7 @@ interface ServerCallback * again after they are done. The contract is that the callback will eventually be unregistered, enforcing * a happens-before relationship is not part of the contract. * - * @param server The server that was removed. + * @param server The server that was added. * @return UNREGISTER if the callback has completed its work and should be unregistered. CONTINUE if the callback * should remain registered. */ From ae760847ca75fb8b2845403719131599fe18dd41 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 3 Jul 2025 07:06:42 -0700 Subject: [PATCH 3/6] Less flaky. 
--- .../druid/client/BrokerServerViewTest.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index bf203ccdc1f8..2fcb10a9ea87 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -76,6 +76,7 @@ public class BrokerServerViewTest extends CuratorTestBase private final ZkPathsConfig zkPathsConfig; private CountDownLatch segmentViewInitLatch; + private CountDownLatch serverAddedLatch; private CountDownLatch segmentAddedLatch; private CountDownLatch segmentRemovedLatch; @@ -237,6 +238,7 @@ public void testMultipleServerAddedRemovedSegment() throws Exception public void testMultipleServerAndBroker() throws Exception { segmentViewInitLatch = new CountDownLatch(1); + serverAddedLatch = new CountDownLatch(5); segmentAddedLatch = new CountDownLatch(6); // temporarily set latch count to 1 @@ -266,6 +268,7 @@ public void testMultipleServerAndBroker() throws Exception setupZNodeForServer(druidBroker, zkPathsConfig, jsonMapper); Assert.assertTrue(timing.forWaiting().awaitLatch(segmentViewInitLatch)); + Assert.assertTrue(timing.forWaiting().awaitLatch(serverAddedLatch)); // check server metadatas Assert.assertEquals( @@ -622,6 +625,29 @@ private void setupViews(Set watchedTiers, Set ignoredTiers, bool "test" ) { + @Override + public void registerServerCallback(Executor exec, ServerCallback callback) + { + super.registerServerCallback( + exec, + new ServerCallback() { + @Override + public CallbackAction serverAdded(DruidServer server) + { + final CallbackAction res = callback.serverAdded(server); + serverAddedLatch.countDown(); + return res; + } + + @Override + public CallbackAction serverRemoved(DruidServer server) + { + return callback.serverRemoved(server); + } + } + ); + } + @Override public void 
registerSegmentCallback(Executor exec, final SegmentCallback callback) { From 6539526209a4c6280069d1bf5ded61089e90050b Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 3 Jul 2025 07:25:23 -0700 Subject: [PATCH 4/6] Fix latch count. --- .../java/org/apache/druid/client/BrokerServerViewTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index 2fcb10a9ea87..fc87a1faf2a9 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -159,7 +159,7 @@ public void testMultipleServerAddedRemovedSegment() throws Exception setupViews(); final List druidServers = Lists.transform( - ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), + ImmutableList.of("localhost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), hostname -> setupHistoricalServer("default_tier", hostname, 0) ); @@ -238,7 +238,7 @@ public void testMultipleServerAddedRemovedSegment() throws Exception public void testMultipleServerAndBroker() throws Exception { segmentViewInitLatch = new CountDownLatch(1); - serverAddedLatch = new CountDownLatch(5); + serverAddedLatch = new CountDownLatch(6); segmentAddedLatch = new CountDownLatch(6); // temporarily set latch count to 1 @@ -260,7 +260,7 @@ public void testMultipleServerAndBroker() throws Exception final List druidServers = ImmutableList.copyOf( Lists.transform( - ImmutableList.of("locahost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), + ImmutableList.of("localhost:0", "localhost:1", "localhost:2", "localhost:3", "localhost:4"), hostname -> setupHistoricalServer("default_tier", hostname, 0) ) ); From 1928bf784057de3c1d30e22fcd8b11005324bfa4 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 3 Jul 2025 09:02:44 
-0700 Subject: [PATCH 5/6] Fix style. --- .../test/java/org/apache/druid/client/BrokerServerViewTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java index fc87a1faf2a9..1c65e2be2beb 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerServerViewTest.java @@ -238,7 +238,7 @@ public void testMultipleServerAddedRemovedSegment() throws Exception public void testMultipleServerAndBroker() throws Exception { segmentViewInitLatch = new CountDownLatch(1); - serverAddedLatch = new CountDownLatch(6); + serverAddedLatch = new CountDownLatch(6); segmentAddedLatch = new CountDownLatch(6); // temporarily set latch count to 1 From 0741a229b1bfb68ef9f84cb11ccbb3dc2db73086 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sun, 6 Jul 2025 14:32:09 -0700 Subject: [PATCH 6/6] Add comment. --- .../src/main/java/org/apache/druid/client/BrokerServerView.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main/java/org/apache/druid/client/BrokerServerView.java b/server/src/main/java/org/apache/druid/client/BrokerServerView.java index 6b0acf7f7de3..95dc86367250 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerServerView.java +++ b/server/src/main/java/org/apache/druid/client/BrokerServerView.java @@ -163,6 +163,7 @@ public CallbackAction segmentSchemasAnnounced(SegmentSchemas segmentSchemas) @Override public CallbackAction serverAdded(DruidServer server) { + // We don't track brokers in this view. if (!server.getType().equals(ServerType.BROKER)) { addServer(server); }