diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index 603cb0b3803..c6e2ff45693 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -1558,16 +1558,16 @@ public boolean containsWatcher(String path, WatcherType type, Watcher watcher) { boolean containsWatcher = false; switch (type) { case Children: - containsWatcher = this.childWatches.containsWatcher(path, watcher); + containsWatcher = this.childWatches.containsWatcher(path, watcher, WatcherMode.STANDARD); break; case Data: - containsWatcher = this.dataWatches.containsWatcher(path, watcher); + containsWatcher = this.dataWatches.containsWatcher(path, watcher, WatcherMode.STANDARD); break; case Any: - if (this.childWatches.containsWatcher(path, watcher)) { + if (this.childWatches.containsWatcher(path, watcher, null)) { containsWatcher = true; } - if (this.dataWatches.containsWatcher(path, watcher)) { + if (this.dataWatches.containsWatcher(path, watcher, null)) { containsWatcher = true; } break; @@ -1579,16 +1579,16 @@ public boolean removeWatch(String path, WatcherType type, Watcher watcher) { boolean removed = false; switch (type) { case Children: - removed = this.childWatches.removeWatcher(path, watcher); + removed = this.childWatches.removeWatcher(path, watcher, WatcherMode.STANDARD); break; case Data: - removed = this.dataWatches.removeWatcher(path, watcher); + removed = this.dataWatches.removeWatcher(path, watcher, WatcherMode.STANDARD); break; case Any: - if (this.childWatches.removeWatcher(path, watcher)) { + if (this.childWatches.removeWatcher(path, watcher, null)) { removed = true; } - if (this.dataWatches.removeWatcher(path, watcher)) { + if (this.dataWatches.removeWatcher(path, watcher, null)) { removed = true; } break; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java index 4eea5eca03a..c3900eb64e9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java @@ -19,6 +19,7 @@ package org.apache.zookeeper.server.watch; import java.io.PrintWriter; +import javax.annotation.Nullable; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; @@ -60,6 +61,21 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) */ boolean containsWatcher(String path, Watcher watcher); + /** + * Checks the specified watcher exists for the given path and mode. + * + * @param path znode path + * @param watcher watcher object reference + * @param watcherMode watcher mode, null for any mode + * @return true if the watcher exists, false otherwise + */ + default boolean containsWatcher(String path, Watcher watcher, @Nullable WatcherMode watcherMode) { + if (watcherMode == null || watcherMode == WatcherMode.DEFAULT_WATCHER_MODE) { + return containsWatcher(path, watcher); + } + throw new UnsupportedOperationException("persistent watch"); + } + /** * Removes the specified watcher for the given path. * @@ -70,6 +86,21 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) */ boolean removeWatcher(String path, Watcher watcher); + /** + * Removes the specified watcher for the given path and mode. + * + * @param path znode path + * @param watcher watcher object reference + * @param watcherMode watcher mode, null to remove all modes + * @return true if the watcher successfully removed, false otherwise + */ + default boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherMode) { + if (watcherMode == null || watcherMode == WatcherMode.DEFAULT_WATCHER_MODE) { + return removeWatcher(path, watcher); + } + throw new UnsupportedOperationException("persistent watch"); + } + /** * The entry to remove the watcher when the cnxn is closed. * diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java index c85c3d84639..6e9a3a52bb0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java @@ -249,36 +249,69 @@ public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) { @Override public synchronized boolean containsWatcher(String path, Watcher watcher) { - Set list = watchTable.get(path); - return list != null && list.contains(watcher); + return containsWatcher(path, watcher, null); } @Override - public synchronized boolean removeWatcher(String path, Watcher watcher) { + public synchronized boolean containsWatcher(String path, Watcher watcher, WatcherMode watcherMode) { Map paths = watch2Paths.get(watcher); if (paths == null) { return false; } + WatchStats stats = paths.get(path); + return stats != null && (watcherMode == null || stats.hasMode(watcherMode)); + } + private WatchStats unwatch(String path, Watcher watcher, Map paths, Set watchers) { WatchStats stats = paths.remove(path); if (stats == null) { - return false; + return WatchStats.NONE; } - if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { - --recursiveWatchQty; + if (paths.isEmpty()) { + watch2Paths.remove(watcher); + } + watchers.remove(watcher); + if (watchers.isEmpty()) { + watchTable.remove(path); } + return stats; + } - Set list = watchTable.get(path); - if (list == null || !list.remove(watcher)) { - LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher); + @Override + public synchronized boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherMode) { + Map paths = watch2Paths.get(watcher); + Set watchers = watchTable.get(path); + if (paths == null || watchers == null) { return false; } - if (list.isEmpty()) { - watchTable.remove(path); + WatchStats oldStats; + WatchStats newStats; + if (watcherMode != null) { + oldStats = paths.getOrDefault(path, WatchStats.NONE); + newStats = oldStats.removeMode(watcherMode); + if (newStats != WatchStats.NONE) { + if (newStats != oldStats) { + paths.put(path, newStats); + } + } else if (oldStats != WatchStats.NONE) { + unwatch(path, watcher, paths, watchers); + } + } else { + oldStats = unwatch(path, watcher, paths, watchers); + newStats = WatchStats.NONE; } - return true; + if (oldStats.hasMode(WatcherMode.PERSISTENT_RECURSIVE) && !newStats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) { + --recursiveWatchQty; + } + + return oldStats != newStats; + } + + @Override + public synchronized boolean removeWatcher(String path, Watcher watcher) { + return removeWatcher(path, watcher, null); } // VisibleForTesting diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java index 945b87cb0ae..a5de63596da 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java @@ -34,7 +34,10 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.apache.commons.collections4.CollectionUtils; @@ -847,11 +850,35 @@ public void testRemoveAllDataWatchesOnAPath(boolean useAsync) throws Exception { LOG.info("Adding data watcher {} on path {}", w2, "/node1"); assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); + BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); + BlockingDeque recursiveEvents = new LinkedBlockingDeque<>(); + zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT); + zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE); + assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher"); removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK, useAsync); assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher"); assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal"); + + zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk1.setData("/node1/child", new byte[0], -1); + zk1.delete("/node1/child", -1); + zk1.setData("/node1", new byte[0], -1); + zk1.delete("/node1", -1); + + assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeDeleted, "/node1"); + + assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child"); + assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1/child"); + assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1/child"); + assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1"); + assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1"); + + assertEquals(2, dWatchCount.getCount(), "Received watch notification after removal!"); } /** @@ -895,11 +922,27 @@ public void testRemoveAllChildWatchesOnAPath(boolean useAsync) throws Exception LOG.info("Adding child watcher {} on path {}", w2, "/node1"); assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches"); + BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); + zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT); + assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher"); removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK, useAsync); assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove child watcher"); assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal"); + + zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk1.setData("/node1/child", new byte[0], -1); + zk1.delete("/node1/child", -1); + zk1.setData("/node1", new byte[0], -1); + zk1.delete("/node1", -1); + + assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeDeleted, "/node1"); + + assertEquals(2, cWatchCount.getCount(), "Received watch notification after removal!"); } /** @@ -953,10 +996,26 @@ public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception { LOG.info("Adding data watcher {} on path {}", w2, "/node1"); assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); + BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); + BlockingDeque recursiveEvents = new LinkedBlockingDeque<>(); + zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT); + zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE); + + assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is not a watcher"); assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher"); + assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher"); removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK, useAsync); assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher"); + assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is still a watcher after removal"); assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal"); + assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal"); + + assertEvent(persistentEvents, EventType.PersistentWatchRemoved, "/node1"); + assertEvent(recursiveEvents, EventType.PersistentWatchRemoved, "/node1"); + + zk1.delete("/node1", -1); + assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS)); + assertNull(recursiveEvents.poll(10, TimeUnit.MILLISECONDS)); assertEquals(2, watchCount.getCount(), "Received watch notification after removal!"); } @@ -1090,4 +1149,14 @@ private boolean isServerSessionWatcher(long sessionId, String path, WatcherType return false; } + /** + * Asserts next event from queue has given event type and path. + */ + private void assertEvent(BlockingQueue events, Watcher.Event.EventType eventType, String path) + throws InterruptedException { + WatchedEvent event = events.poll(5, TimeUnit.SECONDS); + assertNotNull(event); + assertEquals(eventType, event.getType()); + assertEquals(path, event.getPath()); + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java index f65391c497c..5f40ef3559d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java @@ -19,6 +19,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.util.ArrayList; @@ -38,6 +39,7 @@ import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ServerMetrics; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -342,6 +344,317 @@ public void testRemoveWatcherOnPath(String className) throws IOException { assertEquals(watchesAdded.get(), watchesRemoved.get() + manager.size()); } + /** + * Test add, contains and remove on generic watch manager. + */ + @ParameterizedTest + @MethodSource("data") + public void testAddRemoveWatcher(String className) throws IOException { + IWatchManager manager = getWatchManager(className); + Watcher watcher1 = new DumbWatcher(); + Watcher watcher2 = new DumbWatcher(); + + // given: add watcher1 to "/node1" + manager.addWatch("/node1", watcher1); + + // then: contains or remove should fail on mismatch path and watcher pair + assertFalse(manager.containsWatcher("/node1", watcher2)); + assertFalse(manager.containsWatcher("/node2", watcher1)); + assertFalse(manager.removeWatcher("/node1", watcher2)); + assertFalse(manager.removeWatcher("/node2", watcher1)); + + // then: contains or remove should succeed on matching path and watcher pair + assertTrue(manager.containsWatcher("/node1", watcher1)); + assertTrue(manager.removeWatcher("/node1", watcher1)); + + // then: contains or remove should fail on removed path and watcher pair + assertFalse(manager.containsWatcher("/node1", watcher1)); + assertFalse(manager.removeWatcher("/node1", watcher1)); + } + + /** + * Test containsWatcher on all pairs, and removeWatcher on mismatch pairs. + */ + @Test + public void testContainsMode() { + IWatchManager manager = new WatchManager(); + Watcher watcher1 = new DumbWatcher(); + Watcher watcher2 = new DumbWatcher(); + + // given: add watcher1 to "/node1" in persistent mode + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT)); + assertNotEquals(WatcherMode.PERSISTENT, WatcherMode.DEFAULT_WATCHER_MODE); + + // then: contains should succeed on watcher1 to "/node1" in persistent and any mode + assertTrue(manager.containsWatcher("/node1", watcher1)); + assertTrue(manager.containsWatcher("/node1", watcher1, null)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + + // then: contains and remove should fail on mismatch watcher + assertFalse(manager.containsWatcher("/node1", watcher2)); + assertFalse(manager.containsWatcher("/node1", watcher2, null)); + assertFalse(manager.containsWatcher("/node1", watcher2, WatcherMode.STANDARD)); + assertFalse(manager.containsWatcher("/node1", watcher2, WatcherMode.PERSISTENT)); + assertFalse(manager.containsWatcher("/node1", watcher2, WatcherMode.PERSISTENT_RECURSIVE)); + assertFalse(manager.removeWatcher("/node1", watcher2)); + assertFalse(manager.removeWatcher("/node1", watcher2, null)); + assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.STANDARD)); + assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.PERSISTENT)); + assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.PERSISTENT_RECURSIVE)); + + // then: contains and remove should fail on mismatch path + assertFalse(manager.containsWatcher("/node2", watcher1)); + assertFalse(manager.containsWatcher("/node2", watcher1, null)); + assertFalse(manager.containsWatcher("/node2", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.containsWatcher("/node2", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.containsWatcher("/node2", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertFalse(manager.removeWatcher("/node2", watcher1)); + assertFalse(manager.removeWatcher("/node2", watcher1, null)); + assertFalse(manager.removeWatcher("/node2", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.removeWatcher("/node2", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.removeWatcher("/node2", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + // then: contains and remove should fail on mismatch modes + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + // when: add watcher1 to "/node1" in remaining modes + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + // then: contains should succeed on watcher to "/node1" in all modes + assertTrue(manager.containsWatcher("/node1", watcher1)); + assertTrue(manager.containsWatcher("/node1", watcher1, null)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + } + + /** + * Test repeatedly {@link WatchManager#addWatch(String, Watcher, WatcherMode)}. + */ + @Test + public void testAddModeRepeatedly() { + IWatchManager manager = new WatchManager(); + Watcher watcher1 = new DumbWatcher(); + + // given: add watcher1 to "/node1" in all modes + manager.addWatch("/node1", watcher1, WatcherMode.STANDARD); + manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT); + manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE); + + // when: add watcher1 to "/node1" in these modes repeatedly + assertFalse(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + // then: contains and remove should work normally on watcher1 to "/node1" + assertTrue(manager.containsWatcher("/node1", watcher1)); + assertTrue(manager.containsWatcher("/node1", watcher1, null)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD)); + + assertTrue(manager.containsWatcher("/node1", watcher1)); + assertTrue(manager.containsWatcher("/node1", watcher1, null)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + + assertTrue(manager.containsWatcher("/node1", watcher1)); + assertTrue(manager.containsWatcher("/node1", watcher1, null)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + assertFalse(manager.containsWatcher("/node1", watcher1)); + assertFalse(manager.containsWatcher("/node1", watcher1, null)); + assertFalse(manager.removeWatcher("/node1", watcher1)); + assertFalse(manager.removeWatcher("/node1", watcher1, null)); + } + + /** + * Test {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} on one pair should not break others. + */ + @Test + public void testRemoveModeOne() { + IWatchManager manager = new WatchManager(); + Watcher watcher1 = new DumbWatcher(); + Watcher watcher2 = new DumbWatcher(); + + // given: add watcher1 to "/node1" and watcher2 to "/node2" in all modes + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.STANDARD)); + assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.PERSISTENT)); + assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.PERSISTENT_RECURSIVE)); + + // when: remove one pair + assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD)); + + // then: contains and remove should succeed on other pairs + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.STANDARD)); + assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.PERSISTENT)); + assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.PERSISTENT_RECURSIVE)); + assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertTrue(manager.removeWatcher("/node2", watcher2, WatcherMode.STANDARD)); + assertTrue(manager.removeWatcher("/node2", watcher2, WatcherMode.PERSISTENT)); + assertTrue(manager.removeWatcher("/node2", watcher2, WatcherMode.PERSISTENT_RECURSIVE)); + } + + /** + * Test {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} with {@code null} watcher mode. + */ + @Test + public void testRemoveModeAll() { + IWatchManager manager = new WatchManager(); + Watcher watcher1 = new DumbWatcher(); + + // given: add watcher1 to "/node1" in all modes + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + // when: remove watcher1 using null watcher mode + assertTrue(manager.removeWatcher("/node1", watcher1, null)); + + // then: contains and remove should fail on watcher1 to "/node1" in all modes + assertFalse(manager.containsWatcher("/node1", watcher1)); + assertFalse(manager.containsWatcher("/node1", watcher1, null)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertFalse(manager.removeWatcher("/node1", watcher1)); + assertFalse(manager.removeWatcher("/node1", watcher1, null)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + // given: add watcher1 to "/node1" in all modes + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + // then: remove watcher1 without a mode should behave same to removing all modes + assertTrue(manager.removeWatcher("/node1", watcher1)); + + assertFalse(manager.containsWatcher("/node1", watcher1)); + assertFalse(manager.containsWatcher("/node1", watcher1, null)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertFalse(manager.removeWatcher("/node1", watcher1)); + assertFalse(manager.removeWatcher("/node1", watcher1, null)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + } + + /** + * Test {@link WatchManager#removeWatcher(String, Watcher)}. + */ + @Test + public void testRemoveModeAllDefault() { + IWatchManager manager = new WatchManager(); + Watcher watcher1 = new DumbWatcher(); + + // given: add watcher1 to "/node1" in all modes + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + // then: remove watcher1 without a mode should behave same to removing all modes + assertTrue(manager.removeWatcher("/node1", watcher1)); + + assertFalse(manager.containsWatcher("/node1", watcher1)); + assertFalse(manager.containsWatcher("/node1", watcher1, null)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertFalse(manager.removeWatcher("/node1", watcher1)); + assertFalse(manager.removeWatcher("/node1", watcher1, null)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + } + + /** + * Test {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} all modes individually. + */ + @Test + public void testRemoveModeAllIndividually() { + IWatchManager manager = new WatchManager(); + Watcher watcher1 = new DumbWatcher(); + + // given: add watcher1 to "/node1" in all modes + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + // when: remove all modes individually + assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + + // then: contains and remove should fail on watcher1 to "/node1" in all modes + assertFalse(manager.containsWatcher("/node1", watcher1)); + assertFalse(manager.containsWatcher("/node1", watcher1, null)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertFalse(manager.removeWatcher("/node1", watcher1)); + assertFalse(manager.removeWatcher("/node1", watcher1, null)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertFalse(manager.removeWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + } + + /** + * Test {@link WatchManager#removeWatcher(String, Watcher, WatcherMode)} on mismatch pair should break nothing. + */ + @Test + public void testRemoveModeMismatch() { + IWatchManager manager = new WatchManager(); + Watcher watcher1 = new DumbWatcher(); + Watcher watcher2 = new DumbWatcher(); + + // given: add watcher1 to "/node1" and watcher2 to "/node2" in all modes + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.addWatch("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.STANDARD)); + assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.PERSISTENT)); + assertTrue(manager.addWatch("/node2", watcher2, WatcherMode.PERSISTENT_RECURSIVE)); + + // when: remove mismatch path and watcher pairs + assertFalse(manager.removeWatcher("/node1", watcher2)); + assertFalse(manager.removeWatcher("/node1", watcher2, null)); + assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.STANDARD)); + assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.PERSISTENT)); + assertFalse(manager.removeWatcher("/node1", watcher2, WatcherMode.PERSISTENT_RECURSIVE)); + + // then: no existing watching pairs should break + assertTrue(manager.containsWatcher("/node1", watcher1)); + assertTrue(manager.containsWatcher("/node1", watcher1, null)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.STANDARD)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT)); + assertTrue(manager.containsWatcher("/node1", watcher1, WatcherMode.PERSISTENT_RECURSIVE)); + assertTrue(manager.containsWatcher("/node2", watcher2)); + assertTrue(manager.containsWatcher("/node2", watcher2, null)); + assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.STANDARD)); + assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.PERSISTENT)); + assertTrue(manager.containsWatcher("/node2", watcher2, WatcherMode.PERSISTENT_RECURSIVE)); + } + /** * Concurrently add watch while close the watcher to simulate the * client connections closed on prod.