diff --git a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java index 5026130f175f..8548dfd6e237 100644 --- a/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java +++ b/server/src/main/java/io/druid/client/AbstractCuratorServerInventoryView.java @@ -156,14 +156,7 @@ public void inventoryInitialized() { log.info("Inventory Initialized"); runSegmentCallbacks( - new Function() - { - @Override - public CallbackAction apply(SegmentCallback input) - { - return input.segmentViewInitialized(); - } - } + (SegmentCallback input) -> input.segmentViewInitialized() ); } } @@ -232,15 +225,10 @@ protected void runSegmentCallbacks( { for (final Map.Entry entry : segmentCallbacks.entrySet()) { entry.getValue().execute( - new Runnable() - { - @Override - public void run() - { - if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { - segmentCallbackRemoved(entry.getKey()); - segmentCallbacks.remove(entry.getKey()); - } + () -> { + if (CallbackAction.UNREGISTER == fn.apply(entry.getKey())) { + segmentCallbackRemoved(entry.getKey()); + segmentCallbacks.remove(entry.getKey()); } } ); @@ -251,14 +239,9 @@ private void runServerRemovedCallbacks(final DruidServer server) { for (final Map.Entry entry : serverRemovedCallbacks.entrySet()) { entry.getValue().execute( - new Runnable() - { - @Override - public void run() - { - if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { - serverRemovedCallbacks.remove(entry.getKey()); - } + () -> { + if (CallbackAction.UNREGISTER == entry.getKey().serverRemoved(server)) { + serverRemovedCallbacks.remove(entry.getKey()); } } ); @@ -285,14 +268,7 @@ protected void addSingleInventory( container.addDataSegment(inventory.getIdentifier(), inventory); runSegmentCallbacks( - new Function() - { - @Override - public CallbackAction apply(SegmentCallback input) - { - return input.segmentAdded(container.getMetadata(), inventory); - } - } + (input) -> input.segmentAdded(container.getMetadata(), inventory) ); } @@ -314,14 +290,7 @@ protected void removeSingleInventory(final DruidServer container, String invento container.removeDataSegment(inventoryKey); runSegmentCallbacks( - new Function() - { - @Override - public CallbackAction apply(SegmentCallback input) - { - return input.segmentRemoved(container.getMetadata(), segment); - } - } + (input) -> input.segmentRemoved(container.getMetadata(), segment) ); } diff --git a/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java b/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java index 5b66d6917a12..c79e43369e8e 100644 --- a/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java +++ b/server/src/main/java/io/druid/curator/inventory/CuratorInventoryManager.java @@ -19,7 +19,6 @@ package io.druid.curator.inventory; -import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.MapMaker; import com.google.common.collect.Sets; @@ -39,7 +38,9 @@ import java.io.IOException; import java.util.Set; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** @@ -69,6 +70,8 @@ public class CuratorInventoryManager private volatile PathChildrenCache childrenCache; + private final CountDownLatch latch = new CountDownLatch(1); + public CuratorInventoryManager( CuratorFramework curatorFramework, InventoryManagerConfig config, @@ -106,7 +109,8 @@ public void start() throws Exception childrenCache = cacheFactory.make(curatorFramework, config.getContainerPath()); } - childrenCache.getListenable().addListener(new ContainerCacheListener()); + ContainerCacheListener containerCacheListener = new ContainerCacheListener(); + childrenCache.getListenable().addListener(containerCacheListener); try { childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); @@ -122,6 +126,13 @@ public void start() throws Exception } throw e; } + + // Block start() till PathChildrenCache is fully loaded + while (!latch.await(1, TimeUnit.MINUTES)) { + log.info("Waiting for PathChildrenCache to be completely loaded."); + } + + log.info("PathChildrenCache has been loaded."); } @LifecycleStop @@ -164,14 +175,7 @@ public Iterable getInventory() { return Iterables.transform( containers.values(), - new Function() - { - @Override - public ContainerClass apply(ContainerHolder input) - { - return input.getContainer(); - } - } + (ContainerHolder input) -> input.getContainer() ); } @@ -197,7 +201,7 @@ private class ContainerHolder PathChildrenCache cache ) { - this.container = new AtomicReference(container); + this.container = new AtomicReference<>(container); this.cache = cache; } @@ -351,6 +355,7 @@ private void maybeDoneInitializing() // only fire if we are done initializing the parent PathChildrenCache if (containersInitialized && uninitializedInventory.isEmpty()) { doneInitializing = true; + latch.countDown(); strategy.inventoryInitialized(); } } diff --git a/server/src/test/java/io/druid/curator/inventory/CuratorInventoryManagerTest.java b/server/src/test/java/io/druid/curator/inventory/CuratorInventoryManagerTest.java index 1e54b7a759ba..0e1030089284 100644 --- a/server/src/test/java/io/druid/curator/inventory/CuratorInventoryManagerTest.java +++ b/server/src/test/java/io/druid/curator/inventory/CuratorInventoryManagerTest.java @@ -27,7 +27,6 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.framework.api.CuratorEventType; -import org.apache.curator.framework.api.CuratorListener; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.Watcher; import org.junit.After; @@ -62,7 +61,7 @@ public void tearDown() throws Exception public void testSanity() throws Exception { final MapStrategy strategy = new MapStrategy(); - CuratorInventoryManager, Integer> manager = new CuratorInventoryManager, Integer>( + CuratorInventoryManager, Integer> manager = new CuratorInventoryManager<>( curator, new StringInventoryManagerConfig("/container", "/inventory"), exec, strategy ); @@ -114,14 +113,10 @@ curator, new StringInventoryManagerConfig("/container", "/inventory"), exec, str final CountDownLatch latch = new CountDownLatch(1); curator.getCuratorListenable().addListener( - new CuratorListener() { - @Override - public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception - { - if (event.getType() == CuratorEventType.WATCHED - && event.getWatchedEvent().getState() == Watcher.Event.KeeperState.Disconnected) { - latch.countDown(); - } + (CuratorFramework client, CuratorEvent event) -> { + if (event.getType() == CuratorEventType.WATCHED + && event.getWatchedEvent().getState() == Watcher.Event.KeeperState.Disconnected) { + latch.countDown(); } } ); @@ -247,11 +242,6 @@ private void setNewContainerLatch(CountDownLatch newContainerLatch) this.newContainerLatch = newContainerLatch; } - private void setDeadContainerLatch(CountDownLatch deadContainerLatch) - { - this.deadContainerLatch = deadContainerLatch; - } - private void setNewInventoryLatch(CountDownLatch newInventoryLatch) { this.newInventoryLatch = newInventoryLatch;