From 0e9b7e70ee070ce0366f9177a7348d72a91f8f74 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Thu, 14 Feb 2019 00:54:04 +0800 Subject: [PATCH] fix huge number of watches in zk issue --- .../worker/WorkerCuratorCoordinator.java | 7 +- .../curator/announcement/NodeAnnouncer.java | 349 ++++++++++++++++++ .../discovery/CuratorDruidNodeAnnouncer.java | 6 +- .../apache/druid/guice/AnnouncerModule.java | 8 + .../druid/query/lookup/LookupModule.java | 4 +- .../org/apache/druid/server/ZKPathsUtils.java | 30 ++ .../CuratorDataSegmentServerAnnouncer.java | 6 +- .../announcer/ListenerResourceAnnouncer.java | 8 +- .../client/BatchServerInventoryViewTest.java | 7 +- .../announcement/NodeAnnouncerTest.java | 309 ++++++++++++++++ ...torDruidNodeAnnouncerAndDiscoveryTest.java | 8 +- .../ListenerResourceAnnouncerTest.java | 23 +- 12 files changed, 722 insertions(+), 43 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java create mode 100644 server/src/main/java/org/apache/druid/server/ZKPathsUtils.java create mode 100644 server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java index 7e7c09893b9d..7ed8aea24f0b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/worker/WorkerCuratorCoordinator.java @@ -25,11 +25,10 @@ import com.google.inject.Inject; import org.apache.curator.framework.CuratorFramework; import org.apache.druid.curator.CuratorUtils; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; @@ -54,7 +53,7 @@ public class WorkerCuratorCoordinator private final ObjectMapper jsonMapper; private final RemoteTaskRunnerConfig config; private final CuratorFramework curatorFramework; - private final Announcer announcer; + private final NodeAnnouncer announcer; private final String baseAnnouncementsPath; private final String baseTaskPath; @@ -77,7 +76,7 @@ public WorkerCuratorCoordinator( this.curatorFramework = curatorFramework; this.worker = worker; - this.announcer = new Announcer(curatorFramework, Execs.directExecutor()); + this.announcer = new NodeAnnouncer(curatorFramework); this.baseAnnouncementsPath = getPath(Arrays.asList(indexerZkConfig.getAnnouncementsPath(), worker.getHost())); this.baseTaskPath = getPath(Arrays.asList(indexerZkConfig.getTasksPath(), worker.getHost())); diff --git a/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java new file mode 100644 index 000000000000..97ce642c4ab2 --- /dev/null +++ b/server/src/main/java/org/apache/druid/curator/announcement/NodeAnnouncer.java @@ -0,0 +1,349 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.curator.announcement; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.transaction.CuratorTransaction; +import org.apache.curator.framework.api.transaction.CuratorTransactionFinal; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.NodeCache; +import org.apache.curator.framework.recipes.cache.NodeCacheListener; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.guava.CloseQuietly; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.server.ZKPathsUtils; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.data.Stat; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * NodeAnnouncer announces single node on Zookeeper and only watches this node, + * while {@link Announcer} watches all child paths, not only this node + */ +public class NodeAnnouncer +{ + private static final Logger log = new Logger(NodeAnnouncer.class); + + private final CuratorFramework curator; + + // incase a path is added to `toAnnounce` in announce() before zk is connected, + // should remember the path and do announce in start() later + private final List toAnnounce = new ArrayList<>(); + // incase a path is added to `toUpdate` in update() before zk is connected, + // should remember the path and do update in start() later + private final List toUpdate = new ArrayList<>(); + private final ConcurrentMap listeners = new ConcurrentHashMap<>(); + private final ConcurrentMap announcedPaths = new ConcurrentHashMap<>(); + // only who creates the parent path can drop the parent path, so should remmeber the created parents + private final List parentsIBuilt = new CopyOnWriteArrayList(); + + private boolean started = false; + + public NodeAnnouncer(CuratorFramework curator) + { + this.curator = curator; + } + + @VisibleForTesting + Set getAddedPaths() + { + return announcedPaths.keySet(); + } + + @LifecycleStart + public void start() + { + log.info("Starting announcer"); + synchronized (toAnnounce) { + if (started) { + return; + } + + started = true; + + for (Announceable announceable : toAnnounce) { + announce(announceable.path, announceable.bytes, announceable.removeParentsIfCreated); + } + toAnnounce.clear(); + + for (Announceable announceable : toUpdate) { + update(announceable.path, announceable.bytes); + } + toUpdate.clear(); + } + } + + @LifecycleStop + public void stop() + { + log.info("Stopping announcer"); + synchronized (toAnnounce) { + if (!started) { + return; + } + + started = false; + + Closer closer = Closer.create(); + for (NodeCache cache : listeners.values()) { + closer.register(cache); + } + CloseQuietly.close(closer); + + for (String announcementPath : announcedPaths.keySet()) { + unannounce(announcementPath); + } + + if (!parentsIBuilt.isEmpty()) { + CuratorTransaction transaction = curator.inTransaction(); + for (String parent : parentsIBuilt) { + try { + transaction = transaction.delete().forPath(parent).and(); + } + catch (Exception e) { + log.error(e, "Unable to delete parent[%s].", parent); + } + } + try { + ((CuratorTransactionFinal) transaction).commit(); + } + catch (Exception e) { + log.error(e, "Unable to commit transaction."); + } + } + } + } + + /** + * Like announce(path, bytes, true). + */ + public void announce(String path, byte[] bytes) + { + announce(path, bytes, true); + } + + /** + * Announces the provided bytes at the given path. Announcement means that it will create an ephemeral node + * and monitor it to make sure that it always exists until it is unannounced or this object is closed. + * + * @param path The path to announce at + * @param bytes The payload to announce + * @param removeParentIfCreated remove parent of "path" if we had created that parent + */ + public void announce(String path, byte[] bytes, boolean removeParentIfCreated) + { + synchronized (toAnnounce) { + if (!started) { + toAnnounce.add(new Announceable(path, bytes, removeParentIfCreated)); + return; + } + } + + final String parentPath = ZKPathsUtils.getParentPath(path); + boolean buildParentPath = false; + + byte[] value = announcedPaths.get(path); + + if (value == null) { + try { + if (curator.checkExists().forPath(parentPath) == null) { + buildParentPath = true; + } + } + catch (Exception e) { + log.debug(e, "Problem checking if the parent existed, ignoring."); + } + + // Synchronize to make sure that I only create a listener once. + synchronized (toAnnounce) { + if (!listeners.containsKey(path)) { + final NodeCache cache = new NodeCache(curator, path, true); + cache.getListenable().addListener( + new NodeCacheListener() + { + @Override + public void nodeChanged() throws Exception + { + ChildData currentData = cache.getCurrentData(); + if (currentData == null) { + final byte[] value = announcedPaths.get(path); + if (value != null) { + log.info("Node[%s] dropped, reinstating.", path); + createAnnouncement(path, value); + } + } + } + } + ); + + if (started) { + if (buildParentPath) { + createPath(parentPath, removeParentIfCreated); + } + startCache(cache); + listeners.put(path, cache); + } + } + } + } + + boolean created = false; + synchronized (toAnnounce) { + if (started) { + byte[] oldBytes = announcedPaths.putIfAbsent(path, bytes); + + if (oldBytes == null) { + created = true; + } else if (!Arrays.equals(oldBytes, bytes)) { + throw new IAE("Cannot reannounce different values under the same path"); + } + } + } + + if (created) { + try { + createAnnouncement(path, bytes); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + public void update(final String path, final byte[] bytes) + { + synchronized (toAnnounce) { + if (!started) { + // removeParentsIfCreated is not relevant for updates; use dummy value "false". + toUpdate.add(new Announceable(path, bytes, false)); + return; + } + } + + byte[] oldBytes = announcedPaths.get(path); + + if (oldBytes == null) { + throw new ISE("Cannot update a path[%s] that hasn't been announced!", path); + } + + synchronized (toAnnounce) { + try { + if (!Arrays.equals(oldBytes, bytes)) { + announcedPaths.put(path, bytes); + updateAnnouncement(path, bytes); + } + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private String createAnnouncement(final String path, byte[] value) throws Exception + { + return curator.create().compressed().withMode(CreateMode.EPHEMERAL).inBackground().forPath(path, value); + } + + private Stat updateAnnouncement(final String path, final byte[] value) throws Exception + { + return curator.setData().compressed().inBackground().forPath(path, value); + } + + /** + * Unannounces an announcement created at path. Note that if all announcements get removed, the Announcer + * will continue to have ZK watches on paths because clearing them out is a source of ugly race conditions. + *

+ * If you need to completely clear all the state of what is being watched and announced, stop() the Announcer. + * + * @param path the path to unannounce + */ + public void unannounce(String path) + { + log.info("unannouncing [%s]", path); + final byte[] value = announcedPaths.remove(path); + + if (value == null) { + log.error("Path[%s] not announced, cannot unannounce.", path); + return; + } + + try { + curator.inTransaction().delete().forPath(path).and().commit(); + } + catch (KeeperException.NoNodeException e) { + log.info("node[%s] didn't exist anyway...", path); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void startCache(NodeCache cache) + { + try { + cache.start(); + } + catch (Exception e) { + CloseQuietly.close(cache); + throw new RuntimeException(e); + } + } + + private void createPath(String parentPath, boolean removeParentsIfCreated) + { + try { + curator.create().creatingParentsIfNeeded().forPath(parentPath); + if (removeParentsIfCreated) { + parentsIBuilt.add(parentPath); + } + log.debug("Created parentPath[%s], %s remove on stop() called.", parentPath, removeParentsIfCreated ? "will" : "will not"); + } + catch (Exception e) { + log.error(e, "Problem creating parentPath[%s], someone else created it first?", parentPath); + } + } + + private static class Announceable + { + final String path; + final byte[] bytes; + final boolean removeParentsIfCreated; + + public Announceable(String path, byte[] bytes, boolean removeParentsIfCreated) + { + this.path = path; + this.bytes = bytes; + this.removeParentsIfCreated = removeParentsIfCreated; + } + } +} diff --git a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java index eaa5a36e0fa6..54062347c171 100644 --- a/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java +++ b/server/src/main/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncer.java @@ -24,7 +24,7 @@ import com.google.common.base.Throwables; import com.google.inject.Inject; import org.apache.curator.utils.ZKPaths; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeAnnouncer; import org.apache.druid.guice.annotations.Json; @@ -37,12 +37,12 @@ public class CuratorDruidNodeAnnouncer implements DruidNodeAnnouncer { private static final Logger log = new Logger(CuratorDruidNodeAnnouncer.class); - private final Announcer announcer; + private final NodeAnnouncer announcer; private final ZkPathsConfig config; private final ObjectMapper jsonMapper; @Inject - public CuratorDruidNodeAnnouncer(Announcer announcer, ZkPathsConfig config, @Json ObjectMapper jsonMapper) + public CuratorDruidNodeAnnouncer(NodeAnnouncer announcer, ZkPathsConfig config, @Json ObjectMapper jsonMapper) { this.announcer = announcer; this.config = config; diff --git a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java index bb58e694d31a..7885f5aa7041 100644 --- a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java +++ b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java @@ -24,6 +24,7 @@ import com.google.inject.Provides; import org.apache.curator.framework.CuratorFramework; import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.coordination.BatchDataSegmentAnnouncer; import org.apache.druid.server.coordination.CuratorDataSegmentServerAnnouncer; @@ -52,4 +53,11 @@ public Announcer getAnnouncer(CuratorFramework curator) { return new Announcer(curator, Execs.singleThreaded("Announcer-%s")); } + + @Provides + @ManageLifecycle + public NodeAnnouncer getNodeAnnouncer(CuratorFramework curator) + { + return new NodeAnnouncer(curator); + } } diff --git a/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java b/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java index 9f14b0d42240..c9f22d0eb4eb 100644 --- a/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java +++ b/server/src/main/java/org/apache/druid/query/lookup/LookupModule.java @@ -36,7 +36,7 @@ import com.sun.jersey.spi.container.ResourceFilters; import org.apache.curator.utils.ZKPaths; import org.apache.druid.common.utils.ServletResourceUtils; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.discovery.LookupNodeService; import org.apache.druid.guice.ExpressionModule; import org.apache.druid.guice.Jerseys; @@ -216,7 +216,7 @@ class LookupResourceListenerAnnouncer extends ListenerResourceAnnouncer { @Inject public LookupResourceListenerAnnouncer( - Announcer announcer, + NodeAnnouncer announcer, LookupListeningAnnouncerConfig lookupListeningAnnouncerConfig, @Self DruidNode node ) diff --git a/server/src/main/java/org/apache/druid/server/ZKPathsUtils.java b/server/src/main/java/org/apache/druid/server/ZKPathsUtils.java new file mode 100644 index 000000000000..9832abde27bf --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/ZKPathsUtils.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server; + +import org.apache.curator.utils.ZKPaths; + +public class ZKPathsUtils +{ + public static String getParentPath(String path) + { + return ZKPaths.getPathAndNode(path).getPath(); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java index 909f0c330e11..00ffef7cfce0 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/CuratorDataSegmentServerAnnouncer.java @@ -24,7 +24,7 @@ import com.google.common.base.Throwables; import com.google.inject.Inject; import org.apache.curator.utils.ZKPaths; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.initialization.ZkPathsConfig; @@ -38,7 +38,7 @@ public class CuratorDataSegmentServerAnnouncer implements DataSegmentServerAnnou private final DruidServerMetadata server; private final ZkPathsConfig config; - private final Announcer announcer; + private final NodeAnnouncer announcer; private final ObjectMapper jsonMapper; private final Object lock = new Object(); @@ -49,7 +49,7 @@ public class CuratorDataSegmentServerAnnouncer implements DataSegmentServerAnnou public CuratorDataSegmentServerAnnouncer( DruidServerMetadata server, ZkPathsConfig config, - Announcer announcer, + NodeAnnouncer announcer, ObjectMapper jsonMapper ) { diff --git a/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java b/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java index b083f97dc707..573a6852e330 100644 --- a/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncer.java @@ -21,7 +21,7 @@ import com.google.common.base.Throwables; import org.apache.curator.utils.ZKPaths; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; @@ -42,11 +42,11 @@ public abstract class ListenerResourceAnnouncer private static final Logger LOG = new Logger(ListenerResourceAnnouncer.class); private final Object startStopSync = new Object(); private volatile boolean started = false; - private final Announcer announcer; + private final NodeAnnouncer announcer; private final String announcePath; public ListenerResourceAnnouncer( - Announcer announcer, + NodeAnnouncer announcer, ListeningAnnouncerConfig listeningAnnouncerConfig, String listener_key, HostAndPortWithScheme node @@ -60,7 +60,7 @@ public ListenerResourceAnnouncer( } ListenerResourceAnnouncer( - Announcer announcer, + NodeAnnouncer announcer, String announceBasePath, HostAndPortWithScheme node ) diff --git a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java index d07322c4a51c..54468a5a2d73 100644 --- a/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java +++ b/server/src/test/java/org/apache/druid/client/client/BatchServerInventoryViewTest.java @@ -40,6 +40,7 @@ import org.apache.druid.client.ServerView; import org.apache.druid.curator.PotentiallyGzippedCompressionProvider; import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; @@ -91,6 +92,7 @@ public class BatchServerInventoryViewTest private CuratorFramework cf; private ObjectMapper jsonMapper; private Announcer announcer; + private NodeAnnouncer nodeAnnouncer; private BatchDataSegmentAnnouncer segmentAnnouncer; private DataSegmentServerAnnouncer serverAnnouncer; private Set testSegments; @@ -124,6 +126,9 @@ public void setUp() throws Exception ); announcer.start(); + nodeAnnouncer = new NodeAnnouncer(cf); + nodeAnnouncer.start(); + DruidServerMetadata serverMetadata = new DruidServerMetadata( "id", "host", @@ -146,7 +151,7 @@ public String getBase() serverAnnouncer = new CuratorDataSegmentServerAnnouncer( serverMetadata, zkPathsConfig, - announcer, + nodeAnnouncer, jsonMapper ); serverAnnouncer.announce(); diff --git a/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java new file mode 100644 index 000000000000..c0881f3a0db2 --- /dev/null +++ b/server/src/test/java/org/apache/druid/curator/announcement/NodeAnnouncerTest.java @@ -0,0 +1,309 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.curator.announcement; + +import com.google.common.collect.Sets; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.api.CuratorListener; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.api.transaction.CuratorTransactionResult; +import org.apache.curator.test.KillSession; +import org.apache.curator.utils.ZKPaths; +import org.apache.druid.curator.CuratorTestBase; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.data.Stat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +/** + */ +public class NodeAnnouncerTest extends CuratorTestBase +{ + private static final Logger log = new Logger(NodeAnnouncerTest.class); + private ExecutorService exec; + + @Before + public void setUp() throws Exception + { + setupServerAndCurator(); + exec = Execs.singleThreaded("test-announcer-sanity-%s"); + } + + @After + public void tearDown() + { + tearDownServerAndCurator(); + } + + @Test(timeout = 60_000L) + public void testSanity() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath1 = "/test1"; + final String testPath2 = "/somewhere/test2"; + announcer.announce(testPath1, billy); + + Assert.assertNull("/test1 does not exists", curator.checkExists().forPath(testPath1)); + Assert.assertNull("/somewhere/test2 does not exists", curator.checkExists().forPath(testPath2)); + + announcer.start(); + while (!announcer.getAddedPaths().contains("/test1")) { + Thread.sleep(100); + } + + try { + Assert.assertArrayEquals("/test1 has data", billy, curator.getData().decompressed().forPath(testPath1)); + Assert.assertNull("/somewhere/test2 still does not exist", curator.checkExists().forPath(testPath2)); + + announcer.announce(testPath2, billy); + + Assert.assertArrayEquals("/test1 still has data", billy, curator.getData().decompressed().forPath(testPath1)); + Assert.assertArrayEquals( + "/somewhere/test2 has data", + billy, + curator.getData().decompressed().forPath(testPath2) + ); + + final CountDownLatch latch = new CountDownLatch(1); + curator.getCuratorListenable().addListener( + new CuratorListener() + { + @Override + public void eventReceived(CuratorFramework client, CuratorEvent event) + { + if (event.getType() == CuratorEventType.CREATE && event.getPath().equals(testPath1)) { + latch.countDown(); + } + } + } + ); + final CuratorOp deleteOp = curator.transactionOp().delete().forPath(testPath1); + final Collection results = curator.transaction().forOperations(deleteOp); + Assert.assertEquals(1, results.size()); + final CuratorTransactionResult result = results.iterator().next(); + Assert.assertEquals(Code.OK.intValue(), result.getError()); // assert delete + + Assert.assertTrue("Wait for /test1 to be created", timing.forWaiting().awaitLatch(latch)); + + Assert.assertArrayEquals( + "expect /test1 data is restored", + billy, + curator.getData().decompressed().forPath(testPath1) + ); + Assert.assertArrayEquals( + "expect /somewhere/test2 is still there", + billy, + curator.getData().decompressed().forPath(testPath2) + ); + + announcer.unannounce(testPath1); + Assert.assertNull("expect /test1 unannounced", curator.checkExists().forPath(testPath1)); + Assert.assertArrayEquals( + "expect /somewhere/test2 is still still there", + billy, + curator.getData().decompressed().forPath(testPath2) + ); + } + finally { + announcer.stop(); + } + + Assert.assertNull("expect /test1 remains unannounced", curator.checkExists().forPath(testPath1)); + Assert.assertNull("expect /somewhere/test2 unannounced", curator.checkExists().forPath(testPath2)); + } + + @Test(timeout = 60_000L) + public void testSessionKilled() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + try { + curator.inTransaction().create().forPath("/somewhere").and().commit(); + announcer.start(); + + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath1 = "/test1"; + final String testPath2 = "/somewhere/test2"; + final Set paths = Sets.newHashSet(testPath1, testPath2); + announcer.announce(testPath1, billy); + announcer.announce(testPath2, billy); + + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1)); + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2)); + + final CountDownLatch latch = new CountDownLatch(1); + curator.getCuratorListenable().addListener( + new CuratorListener() + { + @Override + public void eventReceived(CuratorFramework client, CuratorEvent event) + { + if (event.getType() == CuratorEventType.CREATE) { + paths.remove(event.getPath()); + if (paths.isEmpty()) { + latch.countDown(); + } + } + } + } + ); + KillSession.kill(curator.getZookeeperClient().getZooKeeper(), server.getConnectString()); + + Assert.assertTrue(timing.forWaiting().awaitLatch(latch)); + + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath1)); + Assert.assertArrayEquals(billy, curator.getData().decompressed().forPath(testPath2)); + + announcer.stop(); + + while ((curator.checkExists().forPath(testPath1) != null) || (curator.checkExists().forPath(testPath2) != null)) { + Thread.sleep(100); + } + + Assert.assertNull(curator.checkExists().forPath(testPath1)); + Assert.assertNull(curator.checkExists().forPath(testPath2)); + } + finally { + announcer.stop(); + } + } + + @Test + public void testCleansUpItsLittleTurdlings() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + announcer.start(); + try { + Assert.assertNull(curator.checkExists().forPath(parent)); + + awaitAnnounce(announcer, testPath, billy, true); + + Assert.assertNotNull(curator.checkExists().forPath(parent)); + } + finally { + announcer.stop(); + } + + Assert.assertNull(curator.checkExists().forPath(parent)); + } + + @Test + public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + curator.create().forPath(parent); + final Stat initialStat = curator.checkExists().forPath(parent); + + announcer.start(); + try { + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + + awaitAnnounce(announcer, testPath, billy, true); + + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + } + finally { + announcer.stop(); + } + + Assert.assertEquals(initialStat.getMzxid(), curator.checkExists().forPath(parent).getMzxid()); + } + + @Test + public void testLeavesBehindTurdlingsWhenToldTo() throws Exception + { + curator.start(); + curator.blockUntilConnected(); + NodeAnnouncer announcer = new NodeAnnouncer(curator); + + final byte[] billy = StringUtils.toUtf8("billy"); + final String testPath = "/somewhere/test2"; + final String parent = ZKPaths.getPathAndNode(testPath).getPath(); + + announcer.start(); + try { + Assert.assertNull(curator.checkExists().forPath(parent)); + + awaitAnnounce(announcer, testPath, billy, false); + + Assert.assertNotNull(curator.checkExists().forPath(parent)); + } + finally { + announcer.stop(); + } + + Assert.assertNotNull(curator.checkExists().forPath(parent)); + } + + private void awaitAnnounce( + final NodeAnnouncer announcer, + final String path, + final byte[] bytes, + boolean removeParentsIfCreated + ) throws InterruptedException + { + final CountDownLatch latch = new CountDownLatch(1); + curator.getCuratorListenable().addListener( + new CuratorListener() + { + @Override + public void eventReceived(CuratorFramework client, CuratorEvent event) + { + if (event.getType() == CuratorEventType.CREATE && event.getPath().equals(path)) { + latch.countDown(); + } + } + } + ); + announcer.announce(path, bytes, removeParentsIfCreated); + latch.await(); + } +} diff --git a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java index c2121a74888d..747925875751 100644 --- a/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java +++ b/server/src/test/java/org/apache/druid/curator/discovery/CuratorDruidNodeAnnouncerAndDiscoveryTest.java @@ -24,12 +24,11 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.curator.CuratorTestBase; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.discovery.DiscoveryDruidNode; import org.apache.druid.discovery.DruidNodeDiscovery; import org.apache.druid.discovery.NodeType; import org.apache.druid.jackson.DefaultObjectMapper; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.server.DruidNode; import org.apache.druid.server.initialization.ServerConfig; import org.apache.druid.server.initialization.ZkPathsConfig; @@ -67,10 +66,7 @@ public void testAnnouncementAndDiscovery() throws Exception curator.start(); curator.blockUntilConnected(); - Announcer announcer = new Announcer( - curator, - Execs.directExecutor() - ); + NodeAnnouncer announcer = new NodeAnnouncer(curator); announcer.start(); CuratorDruidNodeAnnouncer druidNodeAnnouncer = new CuratorDruidNodeAnnouncer( diff --git a/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java b/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java index 96e2a18bde7b..5d86e2e7e1c9 100644 --- a/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java +++ b/server/src/test/java/org/apache/druid/server/listener/announcer/ListenerResourceAnnouncerTest.java @@ -21,21 +21,17 @@ import org.apache.curator.utils.ZKPaths; import org.apache.druid.curator.CuratorTestBase; -import org.apache.druid.curator.announcement.Announcer; +import org.apache.druid.curator.announcement.NodeAnnouncer; import org.apache.druid.java.util.common.StringUtils; -import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.segment.CloserRule; import org.apache.druid.server.http.HostAndPortWithScheme; import org.apache.druid.server.initialization.ZkPathsConfig; import org.easymock.EasyMock; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; import org.junit.Rule; import org.junit.Test; import java.io.Closeable; -import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; public class ListenerResourceAnnouncerTest extends CuratorTestBase @@ -45,19 +41,6 @@ public class ListenerResourceAnnouncerTest extends CuratorTestBase private final String announcePath = listeningAnnouncerConfig.getAnnouncementPath(listenerKey); @Rule public CloserRule closerRule = new CloserRule(true); - private ExecutorService executorService; - - @Before - public void setUp() - { - executorService = Execs.singleThreaded("listener-resource--%d"); - } - - @After - public void tearDown() - { - executorService.shutdownNow(); - } @Test public void testAnnouncerBehaves() throws Exception @@ -68,7 +51,7 @@ public void testAnnouncerBehaves() throws Exception closerRule.closeLater(curator); Assert.assertNotNull(curator.create().forPath("/druid")); Assert.assertTrue(curator.blockUntilConnected(10, TimeUnit.SECONDS)); - final Announcer announcer = new Announcer(curator, executorService); + final NodeAnnouncer announcer = new NodeAnnouncer(curator); final HostAndPortWithScheme node = HostAndPortWithScheme.fromString("localhost"); final ListenerResourceAnnouncer listenerResourceAnnouncer = new ListenerResourceAnnouncer( announcer, @@ -109,7 +92,7 @@ public void close() @Test public void testStartCorrect() { - final Announcer announcer = EasyMock.createStrictMock(Announcer.class); + final NodeAnnouncer announcer = EasyMock.createStrictMock(NodeAnnouncer.class); final HostAndPortWithScheme node = HostAndPortWithScheme.fromString("some_host"); final ListenerResourceAnnouncer resourceAnnouncer = new ListenerResourceAnnouncer(