From 83d3ea92cbe3722b3de6f47baf97396b1e7979ac Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Sat, 10 Jun 2023 18:03:37 +0800 Subject: [PATCH 1/3] ZOOKEEPER-4472: Remove persistent watches individually ZOOKEEPER-4466 supports different watch modes one same path, but there are no corresponding `WatcherType`s for persistent watches. Client has to resort to `WatcherType.Any` to remove them. This could accidently interrupt other watches. This PR adds `WatcherType.Persistent` and `WatcherType.PersistentRecursive` to remove persistent watches individually. --- .../java/org/apache/zookeeper/Watcher.java | 7 +- .../org/apache/zookeeper/ZKWatchManager.java | 40 +- .../zookeeper/cli/RemoveWatchesCommand.java | 6 + .../org/apache/zookeeper/server/DataTree.java | 20 +- .../apache/zookeeper/RemoveWatchesTest.java | 574 ++++++++++++------ 5 files changed, 426 insertions(+), 221 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java index ab4b654880e..6347fa4569e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java @@ -191,6 +191,8 @@ public static EventType fromInt(int intValue) { enum WatcherType { Children(1), Data(2), + Persistent(4), + PersistentRecursive(5), Any(3); // Integer representation of value @@ -212,7 +214,10 @@ public static WatcherType fromInt(int intValue) { return WatcherType.Data; case 3: return WatcherType.Any; - + case 4: + return Persistent; + case 5: + return PersistentRecursive; default: throw new RuntimeException("Invalid integer value for conversion to WatcherType"); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java index 9da4249447a..514da01cf24 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZKWatchManager.java @@ -153,6 +153,18 @@ public Map> removeWatcher( } break; } + case Persistent: { + synchronized (persistentWatches) { + removedWatcher = removeWatches(persistentWatches, watcher, clientPath, local, rc, persistentWatchersToRem); + } + break; + } + case PersistentRecursive: { + synchronized (persistentRecursiveWatches) { + removedWatcher = removeWatches(persistentRecursiveWatches, watcher, clientPath, local, rc, persistentWatchersToRem); + } + break; + } case Any: { synchronized (childWatches) { removedWatcher = removeWatches(childWatches, watcher, clientPath, local, rc, childWatchersToRem); @@ -225,18 +237,6 @@ void containsWatcher(String path, Watcher watcher, Watcher.WatcherType watcherTy synchronized (childWatches) { containsWatcher = contains(path, watcher, childWatches); } - - synchronized (persistentWatches) { - boolean contains_temp = contains(path, watcher, - persistentWatches); - containsWatcher |= contains_temp; - } - - synchronized (persistentRecursiveWatches) { - boolean contains_temp = contains(path, watcher, - persistentRecursiveWatches); - containsWatcher |= contains_temp; - } break; } case Data: { @@ -248,17 +248,17 @@ void containsWatcher(String path, Watcher watcher, Watcher.WatcherType watcherTy boolean contains_temp = contains(path, watcher, existWatches); containsWatcher |= contains_temp; } - + break; + } + case Persistent: { synchronized (persistentWatches) { - boolean contains_temp = contains(path, watcher, - persistentWatches); - containsWatcher |= contains_temp; + containsWatcher |= contains(path, watcher, persistentWatches); } - + break; + } + case PersistentRecursive: { synchronized (persistentRecursiveWatches) { - boolean contains_temp = contains(path, watcher, - persistentRecursiveWatches); - containsWatcher |= contains_temp; + containsWatcher |= contains(path, watcher, persistentRecursiveWatches); } break; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/RemoveWatchesCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/RemoveWatchesCommand.java index ddaf15fba5c..748f0c2f2b0 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/cli/RemoveWatchesCommand.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/cli/RemoveWatchesCommand.java @@ -37,6 +37,8 @@ public class RemoveWatchesCommand extends CliCommand { static { options.addOption("c", false, "child watcher type"); options.addOption("d", false, "data watcher type"); + options.addOption("p", false, "persistent watcher type"); + options.addOption("r", false, "persistent recursive watcher type"); options.addOption("a", false, "any watcher type"); options.addOption("l", false, "remove locally when there is no server connection"); } @@ -70,6 +72,10 @@ public boolean exec() throws CliWrapperException, MalformedPathException { wtype = WatcherType.Children; } else if (cl.hasOption("d")) { wtype = WatcherType.Data; + } else if (cl.hasOption("p")) { + wtype = WatcherType.Persistent; + } else if (cl.hasOption("r")) { + wtype = WatcherType.PersistentRecursive; } else if (cl.hasOption("a")) { wtype = WatcherType.Any; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index c6e2ff45693..1f0e7a74974 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -1563,11 +1563,16 @@ public boolean containsWatcher(String path, WatcherType type, Watcher watcher) { case Data: containsWatcher = this.dataWatches.containsWatcher(path, watcher, WatcherMode.STANDARD); break; + case Persistent: + containsWatcher = this.dataWatches.containsWatcher(path, watcher, WatcherMode.PERSISTENT); + break; + case PersistentRecursive: + containsWatcher = this.dataWatches.containsWatcher(path, watcher, WatcherMode.PERSISTENT_RECURSIVE); + break; case Any: if (this.childWatches.containsWatcher(path, watcher, null)) { containsWatcher = true; - } - if (this.dataWatches.containsWatcher(path, watcher, null)) { + } else if (this.dataWatches.containsWatcher(path, watcher, null)) { containsWatcher = true; } break; @@ -1584,6 +1589,17 @@ public boolean removeWatch(String path, WatcherType type, Watcher watcher) { case Data: removed = this.dataWatches.removeWatcher(path, watcher, WatcherMode.STANDARD); break; + case Persistent: + if (this.childWatches.removeWatcher(path, watcher, WatcherMode.PERSISTENT)) { + removed = true; + } + if (this.dataWatches.removeWatcher(path, watcher, WatcherMode.PERSISTENT)) { + removed = true; + } + break; + case PersistentRecursive: + removed = this.dataWatches.removeWatcher(path, watcher, WatcherMode.PERSISTENT_RECURSIVE); + break; case Any: if (this.childWatches.removeWatcher(path, watcher, null)) { removed = true; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java index a5de63596da..e529c533631 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java @@ -30,6 +30,7 @@ import static org.mockito.Mockito.spy; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -45,6 +46,7 @@ import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.Watcher.WatcherType; import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.test.ClientBase; import org.junit.jupiter.api.AfterEach; @@ -97,9 +99,16 @@ private void removeWatches( MyCallback c1 = new MyCallback(rc.intValue(), path); zk.removeWatches(path, watcher, watcherType, local, c1, null); assertTrue(c1.matches(), "Didn't succeeds removeWatch operation"); - if (KeeperException.Code.OK.intValue() != c1.rc) { - KeeperException ke = KeeperException.create(KeeperException.Code.get(c1.rc)); - throw ke; + if (rc.intValue() != c1.rc) { + throw KeeperException.create(KeeperException.Code.get(c1.rc)); + } + } else if (rc != Code.OK) { + try { + zk.removeWatches(path, watcher, watcherType, local); + fail("expect exception code " + rc); + } catch (KeeperException ex) { + assertEquals(rc, ex.code()); + assertEquals(path, ex.getPath()); } } else { zk.removeWatches(path, watcher, watcherType, local); @@ -118,15 +127,50 @@ private void removeAllWatches( MyCallback c1 = new MyCallback(rc.intValue(), path); zk.removeAllWatches(path, watcherType, local, c1, null); assertTrue(c1.matches(), "Didn't succeeds removeWatch operation"); - if (KeeperException.Code.OK.intValue() != c1.rc) { - KeeperException ke = KeeperException.create(KeeperException.Code.get(c1.rc)); - throw ke; + if (rc.intValue() != c1.rc) { + throw KeeperException.create(KeeperException.Code.get(c1.rc)); + } + } else if (rc != Code.OK) { + try { + zk.removeAllWatches(path, watcherType, local); + fail("expect exception code " + rc); + } catch (KeeperException ex) { + assertEquals(rc, ex.code()); + assertEquals(path, ex.getPath()); } } else { zk.removeAllWatches(path, watcherType, local); } } + private void assertWatchers(ZooKeeper zk, String path, WatcherType... watcherTypes) { + for (WatcherType watcherType : watcherTypes) { + String msg = String.format("expect watcher for path %s and type %s", path, watcherType); + assertTrue(isServerSessionWatcher(zk.getSessionId(), path, watcherType), msg); + } + } + + private void assertNoWatchers(ZooKeeper zk, String path, WatcherType... watcherTypes) { + for (WatcherType watcherType : watcherTypes) { + String msg = String.format("expect no watcher for path %s and type %s", path, watcherType); + assertFalse(isServerSessionWatcher(zk.getSessionId(), path, watcherType), msg); + } + } + + private void assertWatchersExcept(ZooKeeper zk, String path, WatcherType... watcherTypes) { + List excludes = Arrays.asList(watcherTypes); + for (WatcherType watcherType : WatcherType.values()) { + if (watcherType == WatcherType.Any) { + continue; + } + if (excludes.contains(watcherType)) { + assertNoWatchers(zk, path, watcherType); + } else { + assertWatchers(zk, path, watcherType); + } + } + } + /** * Test verifies removal of single watcher when there is server connection */ @@ -338,6 +382,96 @@ public void testRemoveAllChildWatchers(boolean useAsync) throws IOException, Int assertTrue(events.contains(EventType.NodeDataChanged), "Didn't get NodeDataChanged event"); } + /** + * Test verifies removing all watcher with WatcherType.Persistent. + * + *

All other watchers shouldn't be removed. + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @Timeout(value = 90) + public void testRemoveAllPersistentWatchers(boolean useAsync) throws InterruptedException, KeeperException { + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + BlockingDeque persistentEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque persistentEvents2 = new LinkedBlockingDeque<>(); + + Watcher persistentWatcher1 = persistentEvents1::add; + Watcher persistentWatcher2 = persistentEvents2::add; + zk2.addWatch("/node1", persistentWatcher1, AddWatchMode.PERSISTENT); + zk2.addWatch("/node1", persistentWatcher2, AddWatchMode.PERSISTENT); + + BlockingDeque dataEvents = new LinkedBlockingDeque<>(); + BlockingDeque childrenEvents = new LinkedBlockingDeque<>(); + BlockingDeque recursiveEvents = new LinkedBlockingDeque<>(); + zk2.getData("/node1", dataEvents::add, null); + zk2.getChildren("/node1", childrenEvents::add); + zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE); + + removeWatches(zk2, "/node1", persistentWatcher1, WatcherType.Persistent, false, Code.OK, useAsync); + removeWatches(zk2, "/node1", persistentWatcher2, WatcherType.Persistent, false, Code.OK, useAsync); + removeWatches(zk2, "/node1", persistentWatcher1, WatcherType.Data, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", persistentWatcher2, WatcherType.Data, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", persistentWatcher1, WatcherType.Children, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", persistentWatcher2, WatcherType.Children, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", persistentWatcher1, WatcherType.PersistentRecursive, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", persistentWatcher2, WatcherType.PersistentRecursive, false, Code.NOWATCHER, useAsync); + + zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk1.setData("/node1", null, -1); + + assertEvent(persistentEvents1, EventType.PersistentWatchRemoved, "/node1"); + assertEvent(persistentEvents2, EventType.PersistentWatchRemoved, "/node1"); + assertEvent(dataEvents, EventType.NodeDataChanged, "/node1"); + assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/node2"); + assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1"); + } + + /** + * Test verifies removing all watcher with WatcherType.PersistentRecursive. + * + *

All other watchers shouldn't be removed + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @Timeout(value = 90) + public void testRemoveAllPersistentRecursiveWatchers(boolean useAsync) throws InterruptedException, KeeperException { + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + BlockingDeque recursiveEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque recursiveEvents2 = new LinkedBlockingDeque<>(); + + Watcher recursiveWatcher1 = recursiveEvents1::add; + Watcher recursiveWatcher2 = recursiveEvents2::add; + zk2.addWatch("/node1", recursiveWatcher1, AddWatchMode.PERSISTENT_RECURSIVE); + zk2.addWatch("/node1", recursiveWatcher2, AddWatchMode.PERSISTENT_RECURSIVE); + + BlockingDeque dataEvents = new LinkedBlockingDeque<>(); + BlockingDeque childrenEvents = new LinkedBlockingDeque<>(); + BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); + zk2.getData("/node1", dataEvents::add, null); + zk2.getChildren("/node1", childrenEvents::add); + zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT); + + removeWatches(zk2, "/node1", recursiveWatcher1, WatcherType.PersistentRecursive, false, Code.OK, useAsync); + removeWatches(zk2, "/node1", recursiveWatcher2, WatcherType.PersistentRecursive, false, Code.OK, useAsync); + removeWatches(zk2, "/node1", recursiveWatcher1, WatcherType.Data, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", recursiveWatcher2, WatcherType.Data, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", recursiveWatcher1, WatcherType.Children, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", recursiveWatcher2, WatcherType.Children, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", recursiveWatcher1, WatcherType.Persistent, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", recursiveWatcher2, WatcherType.Persistent, false, Code.NOWATCHER, useAsync); + + assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved, "/node1"); + assertEvent(recursiveEvents2, EventType.PersistentWatchRemoved, "/node1"); + + zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk1.setData("/node1", "test".getBytes(), -1); + + assertEvent(dataEvents, EventType.NodeDataChanged, "/node1"); + assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1"); + } /** * Test verifies given watcher doesn't exists! */ @@ -360,30 +494,10 @@ public void testNoWatcherException(boolean useAsync) throws IOException, Interru // New Watcher which will be used for removal MyWatcher w3 = new MyWatcher("/node1", 2); - try { - removeWatches(zk2, "/node1", w3, WatcherType.Any, false, Code.NOWATCHER, useAsync); - fail("Should throw exception as given watcher doesn't exists"); - } catch (KeeperException.NoWatcherException nwe) { - // expected - } - try { - removeWatches(zk2, "/node1", w3, WatcherType.Children, false, Code.NOWATCHER, useAsync); - fail("Should throw exception as given watcher doesn't exists"); - } catch (KeeperException.NoWatcherException nwe) { - // expected - } - try { - removeWatches(zk2, "/node1", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync); - fail("Should throw exception as given watcher doesn't exists"); - } catch (KeeperException.NoWatcherException nwe) { - // expected - } - try { - removeWatches(zk2, "/nonexists", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync); - fail("Should throw exception as given watcher doesn't exists"); - } catch (KeeperException.NoWatcherException nwe) { - // expected - } + removeWatches(zk2, "/node1", w3, WatcherType.Any, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", w3, WatcherType.Children, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/node1", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync); + removeWatches(zk2, "/nonexists", w3, WatcherType.Data, false, Code.NOWATCHER, useAsync); } /** @@ -461,12 +575,7 @@ public void testRemoveWatcherWhenNoConnection(boolean useAsync) throws Exception removeWatches(zk2, "/node1", w2, WatcherType.Any, true, Code.OK, useAsync); assertTrue(w2.matches(), "Didn't remove child watcher"); assertFalse(w1.matches(), "Shouldn't remove data watcher"); - try { - removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.CONNECTIONLOSS, useAsync); - fail("Should throw exception as last watch removal requires server connection"); - } catch (KeeperException.ConnectionLossException nwe) { - // expected - } + removeWatches(zk2, "/node1", w1, WatcherType.Any, false, Code.CONNECTIONLOSS, useAsync); assertFalse(w1.matches(), "Shouldn't remove data watcher"); // when local=true, here if connection not available, simply removes @@ -682,25 +791,17 @@ public void testChRootRemoveWatcher(boolean useAsync) throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) @Timeout(value = 90) - public void testNoWatcherServerException(boolean useAsync) throws InterruptedException, IOException, TimeoutException { + public void testNoWatcherServerException(boolean useAsync) throws KeeperException, InterruptedException, IOException, TimeoutException { CountdownWatcher watcher = new CountdownWatcher(); ZooKeeper zk = spy(new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher)); MyWatchManager watchManager = new MyWatchManager(false, watcher); doReturn(watchManager).when(zk).getWatchManager(); - boolean nw = false; watcher.waitForConnected(CONNECTION_TIMEOUT); - try { - zk.removeWatches("/nowatchhere", watcher, WatcherType.Data, false); - } catch (KeeperException nwe) { - if (nwe.code().intValue() == Code.NOWATCHER.intValue()) { - nw = true; - } - } + removeWatches(zk, "/nowatchhere", watcher, WatcherType.Data, false, Code.NOWATCHER, useAsync); assertThat("Server didn't return NOWATCHER", watchManager.lastReturnCode, is(Code.NOWATCHER.intValue())); - assertThat("NoWatcherException didn't happen", nw, is(true)); } /** @@ -711,12 +812,7 @@ public void testNoWatcherServerException(boolean useAsync) throws InterruptedExc @Timeout(value = 90) public void testRemoveAllNoWatcherException(boolean useAsync) throws IOException, InterruptedException, KeeperException { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - try { - removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.NOWATCHER, useAsync); - fail("Should throw exception as given watcher doesn't exists"); - } catch (KeeperException.NoWatcherException nwe) { - // expected - } + removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.NOWATCHER, useAsync); } /** @@ -810,213 +906,287 @@ public void testRemoveWhenMultipleChildWatchesOnAPath(boolean useAsync) throws E } /** - * Test verifies WatcherType.Data - removes only the configured data watcher - * function + * Test verifies {@link WatcherType#Persistent} - removes only the configured watcher function + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @Timeout(value = 90) + public void testRemoveWhenMultiplePersistentWatchesOnAPath(boolean useAsync) throws Exception { + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + BlockingDeque persistentEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque persistentEvents2 = new LinkedBlockingDeque<>(); + Watcher w1 = persistentEvents1::add; + // Add multiple persistent watches + zk2.addWatch("/node1", w1, AddWatchMode.PERSISTENT); + zk2.addWatch("/node1", persistentEvents2::add, AddWatchMode.PERSISTENT); + + removeWatches(zk2, "/node1", w1, WatcherType.Persistent, false, Code.OK, useAsync); + assertEvent(persistentEvents1, EventType.PersistentWatchRemoved, "/node1"); + + zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + assertEvent(persistentEvents2, EventType.NodeChildrenChanged, "/node1"); + assertNoEvent(persistentEvents1); + } + + /** + * Test verifies {@link WatcherType#PersistentRecursive} - removes only the configured watcher function + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @Timeout(value = 90) + public void testRemoveWhenMultiplePersistentRecursiveWatchesOnAPath(boolean useAsync) throws Exception { + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + BlockingDeque recursiveEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque recursiveEvents2 = new LinkedBlockingDeque<>(); + Watcher w1 = recursiveEvents1::add; + // Add multiple persistent recursive watches + zk2.addWatch("/node1", w1, AddWatchMode.PERSISTENT_RECURSIVE); + zk2.addWatch("/node1", recursiveEvents2::add, AddWatchMode.PERSISTENT_RECURSIVE); + + removeWatches(zk2, "/node1", w1, WatcherType.PersistentRecursive, false, Code.OK, useAsync); + assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved, "/node1"); + + zk1.create("/node1/node2", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + assertEvent(recursiveEvents2, EventType.NodeCreated, "/node1/node2"); + assertNoEvent(recursiveEvents1); + } + + /** + * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Data}. + * + *

All other watcher types shouldn't be removed. */ @ParameterizedTest @ValueSource(booleans = {true, false}) @Timeout(value = 90) public void testRemoveAllDataWatchesOnAPath(boolean useAsync) throws Exception { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - final CountDownLatch dWatchCount = new CountDownLatch(2); - final CountDownLatch rmWatchCount = new CountDownLatch(2); - Watcher w1 = event -> { - switch (event.getType()) { - case DataWatchRemoved: - rmWatchCount.countDown(); - break; - case NodeDataChanged: - dWatchCount.countDown(); - break; - default: - break; - } - }; - Watcher w2 = event -> { - switch (event.getType()) { - case DataWatchRemoved: - rmWatchCount.countDown(); - break; - case NodeDataChanged: - dWatchCount.countDown(); - break; - default: - break; - } - }; + + BlockingDeque dataEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque dataEvents2 = new LinkedBlockingDeque<>(); // Add multiple data watches - LOG.info("Adding data watcher {} on path {}", w1, "/node1"); - assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); - LOG.info("Adding data watcher {} on path {}", w2, "/node1"); - assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); + zk2.getData("/node1", dataEvents1::add, null); + zk2.getData("/node1", dataEvents2::add, null); + BlockingDeque childrenEvents = new LinkedBlockingDeque<>(); BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); BlockingDeque recursiveEvents = new LinkedBlockingDeque<>(); + zk2.getChildren("/node1", childrenEvents::add); zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT); zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE); - assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher"); + assertWatchers(zk2, "/node1", WatcherType.values()); removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK, useAsync); - assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher"); - - assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal"); + assertEvent(dataEvents1, EventType.DataWatchRemoved, "/node1"); + assertEvent(dataEvents2, EventType.DataWatchRemoved, "/node1"); + assertWatchersExcept(zk2, "/node1", WatcherType.Data); zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk1.setData("/node1/child", new byte[0], -1); - zk1.delete("/node1/child", -1); - zk1.setData("/node1", new byte[0], -1); - zk1.delete("/node1", -1); + zk1.setData("/node1", null, -1); + + assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1"); - assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1"); - assertEvent(persistentEvents, EventType.NodeDeleted, "/node1"); assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child"); - assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1/child"); - assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1/child"); assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1"); - assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1"); - assertEquals(2, dWatchCount.getCount(), "Received watch notification after removal!"); + assertNoEvent(dataEvents1); + assertNoEvent(dataEvents2); } /** - * Test verifies WatcherType.Children - removes only the configured child - * watcher function + * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Children}. + * + *

All other watcher types shouldn't be removed. */ @ParameterizedTest @ValueSource(booleans = {true, false}) @Timeout(value = 90) public void testRemoveAllChildWatchesOnAPath(boolean useAsync) throws Exception { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - final CountDownLatch cWatchCount = new CountDownLatch(2); - final CountDownLatch rmWatchCount = new CountDownLatch(2); - Watcher w1 = event -> { - switch (event.getType()) { - case ChildWatchRemoved: - rmWatchCount.countDown(); - break; - case NodeChildrenChanged: - cWatchCount.countDown(); - break; - default: - break; - } - }; - Watcher w2 = event -> { - switch (event.getType()) { - case ChildWatchRemoved: - rmWatchCount.countDown(); - break; - case NodeChildrenChanged: - cWatchCount.countDown(); - break; - default: - break; - } - }; + + BlockingDeque childrenEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque childrenEvents2 = new LinkedBlockingDeque<>(); // Add multiple child watches - LOG.info("Adding child watcher {} on path {}", w1, "/node1"); - assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set child watches"); - LOG.info("Adding child watcher {} on path {}", w2, "/node1"); - assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches"); + zk2.getChildren("/node1", childrenEvents1::add); + zk2.getChildren("/node1", childrenEvents2::add); + BlockingDeque dataEvents = new LinkedBlockingDeque<>(); BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); + BlockingDeque recursiveEvents = new LinkedBlockingDeque<>(); + zk2.getData("/node1", dataEvents::add, null); zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT); + zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE); - assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher"); + assertWatchers(zk2, "/node1", WatcherType.values()); removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK, useAsync); - assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove child watcher"); - - assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal"); + assertEvent(childrenEvents1, EventType.ChildWatchRemoved, "/node1"); + assertEvent(childrenEvents2, EventType.ChildWatchRemoved, "/node1"); + assertWatchersExcept(zk2, "/node1", WatcherType.Children); zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk1.setData("/node1/child", new byte[0], -1); - zk1.delete("/node1/child", -1); - zk1.setData("/node1", new byte[0], -1); - zk1.delete("/node1", -1); + zk1.setData("/node1", null, -1); + assertEvent(dataEvents, EventType.NodeDataChanged, "/node1"); assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1"); + assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child"); + assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1"); + + assertNull(childrenEvents1.poll(10, TimeUnit.MILLISECONDS)); + assertNull(childrenEvents2.poll(10, TimeUnit.MILLISECONDS)); + } + + /** + * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Persistent}. + * + *

All other watcher types shouldn't be removed. + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @Timeout(value = 90) + public void testRemoveAllPersistentWatchesOnAPath(boolean useAsync) throws Exception { + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + BlockingDeque persistentEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque persistentEvents2 = new LinkedBlockingDeque<>(); + // Add multiple persistent watches + zk2.addWatch("/node1", persistentEvents1::add, AddWatchMode.PERSISTENT); + zk2.addWatch("/node1", persistentEvents2::add, AddWatchMode.PERSISTENT); + + BlockingDeque dataEvents = new LinkedBlockingDeque<>(); + BlockingDeque childrenEvents = new LinkedBlockingDeque<>(); + BlockingDeque recursiveEvents = new LinkedBlockingDeque<>(); + zk2.getData("/node1", dataEvents::add, null); + zk2.getChildren("/node1", childrenEvents::add, null); + zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE); + + assertWatchers(zk2, "/node1", WatcherType.values()); + removeAllWatches(zk2, "/node1", WatcherType.Persistent, false, Code.OK, useAsync); + assertEvent(persistentEvents1, EventType.PersistentWatchRemoved, "/node1"); + assertEvent(persistentEvents2, EventType.PersistentWatchRemoved, "/node1"); + assertWatchersExcept(zk2, "/node1", WatcherType.Persistent); + + zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk1.setData("/node1", null, -1); + + assertEvent(dataEvents, EventType.NodeDataChanged, "/node1"); + assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child1"); + assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1"); + + assertNull(persistentEvents1.poll(10, TimeUnit.MILLISECONDS)); + assertNull(persistentEvents2.poll(10, TimeUnit.MILLISECONDS)); + } + + /** + * Test verifies {@link OpCode#removeWatches} {@link WatcherType#PersistentRecursive}. + * + *

All other watcher types shouldn't be removed. + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @Timeout(value = 90) + public void testRemoveAllPersistentRecursiveWatchesOnAPath(boolean useAsync) throws Exception { + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + BlockingDeque recursiveEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque recursiveEvents2 = new LinkedBlockingDeque<>(); + // Add multiple persistent recursive watches + zk2.addWatch("/node1", recursiveEvents1::add, AddWatchMode.PERSISTENT_RECURSIVE); + zk2.addWatch("/node1", recursiveEvents2::add, AddWatchMode.PERSISTENT_RECURSIVE); + + BlockingDeque dataEvents = new LinkedBlockingDeque<>(); + BlockingDeque childrenEvents = new LinkedBlockingDeque<>(); + BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); + zk2.getData("/node1", dataEvents::add, null); + zk2.getChildren("/node1", childrenEvents::add, null); + zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT); + + assertWatchers(zk2, "/node1", WatcherType.values()); + removeAllWatches(zk2, "/node1", WatcherType.PersistentRecursive, false, Code.OK, useAsync); + assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved, "/node1"); + assertEvent(recursiveEvents2, EventType.PersistentWatchRemoved, "/node1"); + assertWatchersExcept(zk2, "/node1", WatcherType.PersistentRecursive); + + zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk1.setData("/node1", null, -1); + + assertEvent(dataEvents, EventType.NodeDataChanged, "/node1"); + assertEvent(childrenEvents, EventType.NodeChildrenChanged, "/node1"); assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1"); - assertEvent(persistentEvents, EventType.NodeDeleted, "/node1"); - assertEquals(2, cWatchCount.getCount(), "Received watch notification after removal!"); + assertNull(recursiveEvents1.poll(10, TimeUnit.MILLISECONDS)); + assertNull(recursiveEvents2.poll(10, TimeUnit.MILLISECONDS)); } /** - * Test verifies WatcherType.Any - removes all the configured child,data - * watcher functions + * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Any}. + * + *

All watcher types should be removed. */ @ParameterizedTest @ValueSource(booleans = {true, false}) @Timeout(value = 90) public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception { zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - final CountDownLatch watchCount = new CountDownLatch(2); - final CountDownLatch rmWatchCount = new CountDownLatch(4); - Watcher w1 = event -> { - switch (event.getType()) { - case ChildWatchRemoved: - case DataWatchRemoved: - rmWatchCount.countDown(); - break; - case NodeChildrenChanged: - case NodeDataChanged: - watchCount.countDown(); - break; - default: - break; - } - }; - Watcher w2 = event -> { - switch (event.getType()) { - case ChildWatchRemoved: - case DataWatchRemoved: - rmWatchCount.countDown(); - break; - case NodeChildrenChanged: - case NodeDataChanged: - watchCount.countDown(); - break; - default: - break; - } - }; + // Add multiple child watches - LOG.info("Adding child watcher {} on path {}", w1, "/node1"); - assertEquals(0, zk2.getChildren("/node1", w1).size(), "Didn't set child watches"); - LOG.info("Adding child watcher {} on path {}", w2, "/node1"); - assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches"); + BlockingDeque childEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque childEvents2 = new LinkedBlockingDeque<>(); + zk2.getChildren("/node1", childEvents1::add); + zk2.getChildren("/node1", childEvents2::add); // Add multiple data watches - LOG.info("Adding data watcher {} on path {}", w1, "/node1"); - assertNotNull(zk2.exists("/node1", w1), "Didn't set data watches"); - LOG.info("Adding data watcher {} on path {}", w2, "/node1"); - assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches"); + BlockingDeque dataEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque dataEvents2 = new LinkedBlockingDeque<>(); + zk2.getData("/node1", dataEvents1::add, null); + zk2.exists("/node1", dataEvents2::add); + + // Add multiple persistent watches + BlockingDeque persistentEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque persistentEvents2 = new LinkedBlockingDeque<>(); + zk2.addWatch("/node1", persistentEvents1::add, AddWatchMode.PERSISTENT); + zk2.addWatch("/node1", persistentEvents2::add, AddWatchMode.PERSISTENT); + + // Add multiple recursive watches + BlockingDeque recursiveEvents1 = new LinkedBlockingDeque<>(); + BlockingDeque recursiveEvents2 = new LinkedBlockingDeque<>(); + zk2.addWatch("/node1", recursiveEvents1::add, AddWatchMode.PERSISTENT_RECURSIVE); + zk2.addWatch("/node1", recursiveEvents2::add, AddWatchMode.PERSISTENT_RECURSIVE); + + assertWatchers(zk2, "/node1", WatcherType.values()); + removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK, useAsync); - BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); - BlockingDeque recursiveEvents = new LinkedBlockingDeque<>(); - zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT); - zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE); + assertEvent(childEvents1, EventType.ChildWatchRemoved, "/node1"); + assertEvent(childEvents2, EventType.ChildWatchRemoved, "/node1"); - assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is not a watcher"); - assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher"); - assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher"); - removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK, useAsync); - assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher"); - assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is still a watcher after removal"); - assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal"); - assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal"); + assertEvent(dataEvents1, EventType.DataWatchRemoved, "/node1"); + assertEvent(dataEvents2, EventType.DataWatchRemoved, "/node1"); - assertEvent(persistentEvents, EventType.PersistentWatchRemoved, "/node1"); - assertEvent(recursiveEvents, EventType.PersistentWatchRemoved, "/node1"); + assertEvent(persistentEvents1, EventType.PersistentWatchRemoved, "/node1"); + assertEvent(persistentEvents2, EventType.PersistentWatchRemoved, "/node1"); - zk1.delete("/node1", -1); - assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS)); - assertNull(recursiveEvents.poll(10, TimeUnit.MILLISECONDS)); - assertEquals(2, watchCount.getCount(), "Received watch notification after removal!"); + assertEvent(recursiveEvents1, EventType.PersistentWatchRemoved, "/node1"); + assertEvent(recursiveEvents2, EventType.PersistentWatchRemoved, "/node1"); + assertNoWatchers(zk2, "/node1", WatcherType.values()); + + zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk1.setData("/node1", null, -1); + + assertNoEvent(childEvents1); + assertNoEvent(childEvents2); + assertNoEvent(dataEvents1); + assertNoEvent(dataEvents2); + assertNoEvent(persistentEvents1); + assertNoEvent(persistentEvents2); + assertNoEvent(recursiveEvents1); + assertNoEvent(recursiveEvents2); } private static class MyWatchManager extends ZKWatchManager { @@ -1159,4 +1329,12 @@ private void assertEvent(BlockingQueue events, Watcher.Event.Event assertEquals(eventType, event.getType()); assertEquals(path, event.getPath()); } + + /** + * Asserts no event from queue in a short period. + */ + private void assertNoEvent(BlockingQueue events) throws InterruptedException { + // Short timeout so we don't hurt CI too much. It will fail finally given enough run if there are bugs. + assertNull(events.poll(10, TimeUnit.MILLISECONDS)); + } } From 148b212582140fbe1c21362738f50525cc968413 Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Wed, 14 Jun 2023 10:50:54 +0800 Subject: [PATCH 2/3] Assert removing WatcherType::Data from AddWatchMode::Persistent is impossible --- .../apache/zookeeper/RemoveWatchesTest.java | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java index e529c533631..ccc640467e2 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java @@ -953,6 +953,34 @@ public void testRemoveWhenMultiplePersistentRecursiveWatchesOnAPath(boolean useA assertNoEvent(recursiveEvents1); } + /** + * Test verifies {@link OpCode#checkWatches} {@link WatcherType#Persistent} using {@link WatcherType#Data}. + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @Timeout(value = 90) + public void testRemovePersistentWatchesOnAPathPartially(boolean useAsync) throws Exception { + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); + Watcher persistentWatcher = persistentEvents::add;; + zk2.addWatch("/node1", persistentWatcher, AddWatchMode.PERSISTENT); + + assertWatchers(zk2, "/node1", WatcherType.Persistent); + assertNoWatchers(zk2, "/node1", WatcherType.Data); + removeWatches(zk2, "/node1", persistentWatcher, WatcherType.Data, false, Code.NOWATCHER, useAsync); + assertWatchers(zk2, "/node1", WatcherType.Persistent); + assertNoWatchers(zk2, "/node1", WatcherType.Data); + + zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk1.setData("/node1", null, -1); + + assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1"); + + assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS)); + } + /** * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Data}. * @@ -1083,6 +1111,33 @@ public void testRemoveAllPersistentWatchesOnAPath(boolean useAsync) throws Excep assertNull(persistentEvents2.poll(10, TimeUnit.MILLISECONDS)); } + /** + * Test verifies {@link OpCode#removeWatches} {@link WatcherType#Persistent} using {@link WatcherType#Data}. + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + @Timeout(value = 90) + public void testRemoveAllPersistentWatchesOnAPathPartially(boolean useAsync) throws Exception { + zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + + BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); + zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT); + + assertWatchers(zk2, "/node1", WatcherType.Persistent); + assertNoWatchers(zk2, "/node1", WatcherType.Data); + removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.NOWATCHER, useAsync); + assertWatchers(zk2, "/node1", WatcherType.Persistent); + assertNoWatchers(zk2, "/node1", WatcherType.Data); + + zk1.create("/node1/child1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk1.setData("/node1", null, -1); + + assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1"); + assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1"); + + assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS)); + } + /** * Test verifies {@link OpCode#removeWatches} {@link WatcherType#PersistentRecursive}. * From 7498a854de6cb4b848fef2acda4aa4fa2143f375 Mon Sep 17 00:00:00 2001 From: Kezhu Wang Date: Wed, 14 Jun 2023 11:18:14 +0800 Subject: [PATCH 3/3] fixup! Assert removing WatcherType::Data from AddWatchMode::Persistent is impossible --- .../src/test/java/org/apache/zookeeper/RemoveWatchesTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java index ccc640467e2..5f3cf9b1ded 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/RemoveWatchesTest.java @@ -963,7 +963,7 @@ public void testRemovePersistentWatchesOnAPathPartially(boolean useAsync) throws zk1.create("/node1", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); BlockingDeque persistentEvents = new LinkedBlockingDeque<>(); - Watcher persistentWatcher = persistentEvents::add;; + Watcher persistentWatcher = persistentEvents::add; zk2.addWatch("/node1", persistentWatcher, AddWatchMode.PERSISTENT); assertWatchers(zk2, "/node1", WatcherType.Persistent);