Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1558,16 +1558,16 @@ public boolean containsWatcher(String path, WatcherType type, Watcher watcher) {
boolean containsWatcher = false;
switch (type) {
case Children:
containsWatcher = this.childWatches.containsWatcher(path, watcher);
containsWatcher = this.childWatches.containsWatcher(path, watcher, WatcherMode.STANDARD);
break;
case Data:
containsWatcher = this.dataWatches.containsWatcher(path, watcher);
containsWatcher = this.dataWatches.containsWatcher(path, watcher, WatcherMode.STANDARD);
break;
case Any:
if (this.childWatches.containsWatcher(path, watcher)) {
if (this.childWatches.containsWatcher(path, watcher, null)) {
containsWatcher = true;
}
if (this.dataWatches.containsWatcher(path, watcher)) {
if (this.dataWatches.containsWatcher(path, watcher, null)) {
containsWatcher = true;
}
break;
Expand All @@ -1579,16 +1579,16 @@ public boolean removeWatch(String path, WatcherType type, Watcher watcher) {
boolean removed = false;
switch (type) {
case Children:
removed = this.childWatches.removeWatcher(path, watcher);
removed = this.childWatches.removeWatcher(path, watcher, WatcherMode.STANDARD);
break;
case Data:
removed = this.dataWatches.removeWatcher(path, watcher);
removed = this.dataWatches.removeWatcher(path, watcher, WatcherMode.STANDARD);
break;
case Any:
if (this.childWatches.removeWatcher(path, watcher)) {
if (this.childWatches.removeWatcher(path, watcher, null)) {
removed = true;
}
if (this.dataWatches.removeWatcher(path, watcher)) {
if (this.dataWatches.removeWatcher(path, watcher, null)) {
removed = true;
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.zookeeper.server.watch;

import java.io.PrintWriter;
import javax.annotation.Nullable;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;

Expand Down Expand Up @@ -60,6 +61,21 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*/
boolean containsWatcher(String path, Watcher watcher);

/**
* Checks the specified watcher exists for the given path and mode.
*
* @param path znode path
* @param watcher watcher object reference
* @param watcherMode watcher mode, null for any mode
* @return true if the watcher exists, false otherwise
*/
default boolean containsWatcher(String path, Watcher watcher, @Nullable WatcherMode watcherMode) {
if (watcherMode == null || watcherMode == WatcherMode.DEFAULT_WATCHER_MODE) {
return containsWatcher(path, watcher);
}
throw new UnsupportedOperationException("persistent watch");
}

/**
* Removes the specified watcher for the given path.
*
Expand All @@ -70,6 +86,21 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*/
boolean removeWatcher(String path, Watcher watcher);

/**
* Removes the specified watcher for the given path and mode.
*
* @param path znode path
* @param watcher watcher object reference
* @param watcherMode watcher mode, null to remove all modes
* @return true if the watcher successfully removed, false otherwise
*/
default boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherMode) {
if (watcherMode == null || watcherMode == WatcherMode.DEFAULT_WATCHER_MODE) {
return removeWatcher(path, watcher);
}
throw new UnsupportedOperationException("persistent watch");
}

/**
* The entry to remove the watcher when the cnxn is closed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,36 +249,69 @@ public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {

@Override
public synchronized boolean containsWatcher(String path, Watcher watcher) {
Set<Watcher> list = watchTable.get(path);
return list != null && list.contains(watcher);
return containsWatcher(path, watcher, null);
}

@Override
public synchronized boolean removeWatcher(String path, Watcher watcher) {
public synchronized boolean containsWatcher(String path, Watcher watcher, WatcherMode watcherMode) {
Map<String, WatchStats> paths = watch2Paths.get(watcher);
if (paths == null) {
return false;
}
WatchStats stats = paths.get(path);
return stats != null && (watcherMode == null || stats.hasMode(watcherMode));
}

private WatchStats unwatch(String path, Watcher watcher, Map<String, WatchStats> paths, Set<Watcher> watchers) {
WatchStats stats = paths.remove(path);
if (stats == null) {
return false;
return WatchStats.NONE;
}
if (stats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
--recursiveWatchQty;
if (paths.isEmpty()) {
watch2Paths.remove(watcher);
}
watchers.remove(watcher);
if (watchers.isEmpty()) {
watchTable.remove(path);
}
return stats;
}

Set<Watcher> list = watchTable.get(path);
if (list == null || !list.remove(watcher)) {
LOG.warn("inconsistent watch table for path {}, {} not in watcher list", path, watcher);
@Override
public synchronized boolean removeWatcher(String path, Watcher watcher, WatcherMode watcherMode) {
Map<String, WatchStats> paths = watch2Paths.get(watcher);
Set<Watcher> watchers = watchTable.get(path);
if (paths == null || watchers == null) {
return false;
}

if (list.isEmpty()) {
watchTable.remove(path);
WatchStats oldStats;
WatchStats newStats;
if (watcherMode != null) {
oldStats = paths.getOrDefault(path, WatchStats.NONE);
newStats = oldStats.removeMode(watcherMode);
if (newStats != WatchStats.NONE) {
if (newStats != oldStats) {
paths.put(path, newStats);
}
} else if (oldStats != WatchStats.NONE) {
unwatch(path, watcher, paths, watchers);
}
} else {
oldStats = unwatch(path, watcher, paths, watchers);
newStats = WatchStats.NONE;
}

return true;
if (oldStats.hasMode(WatcherMode.PERSISTENT_RECURSIVE) && !newStats.hasMode(WatcherMode.PERSISTENT_RECURSIVE)) {
--recursiveWatchQty;
}

return oldStats != newStats;
}

@Override
public synchronized boolean removeWatcher(String path, Watcher watcher) {
return removeWatcher(path, watcher, null);
}

// VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -847,11 +850,35 @@ public void testRemoveAllDataWatchesOnAPath(boolean useAsync) throws Exception {
LOG.info("Adding data watcher {} on path {}", w2, "/node1");
assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");

BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);

assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher");
removeAllWatches(zk2, "/node1", WatcherType.Data, false, Code.OK, useAsync);
assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");

assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal");

zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.setData("/node1/child", new byte[0], -1);
zk1.delete("/node1/child", -1);
zk1.setData("/node1", new byte[0], -1);
zk1.delete("/node1", -1);

assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeDeleted, "/node1");

assertEvent(recursiveEvents, EventType.NodeCreated, "/node1/child");
assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1/child");
assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1/child");
assertEvent(recursiveEvents, EventType.NodeDataChanged, "/node1");
assertEvent(recursiveEvents, EventType.NodeDeleted, "/node1");

assertEquals(2, dWatchCount.getCount(), "Received watch notification after removal!");
}

/**
Expand Down Expand Up @@ -895,11 +922,27 @@ public void testRemoveAllChildWatchesOnAPath(boolean useAsync) throws Exception
LOG.info("Adding child watcher {} on path {}", w2, "/node1");
assertEquals(0, zk2.getChildren("/node1", w2).size(), "Didn't set child watches");

BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);

assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher");
removeAllWatches(zk2, "/node1", WatcherType.Children, false, Code.OK, useAsync);
assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove child watcher");

assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal");

zk1.create("/node1/child", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
zk1.setData("/node1/child", new byte[0], -1);
zk1.delete("/node1/child", -1);
zk1.setData("/node1", new byte[0], -1);
zk1.delete("/node1", -1);

assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeChildrenChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeDataChanged, "/node1");
assertEvent(persistentEvents, EventType.NodeDeleted, "/node1");

assertEquals(2, cWatchCount.getCount(), "Received watch notification after removal!");
}

/**
Expand Down Expand Up @@ -953,10 +996,26 @@ public void testRemoveAllWatchesOnAPath(boolean useAsync) throws Exception {
LOG.info("Adding data watcher {} on path {}", w2, "/node1");
assertNotNull(zk2.exists("/node1", w2), "Didn't set data watches");

BlockingDeque<WatchedEvent> persistentEvents = new LinkedBlockingDeque<>();
BlockingDeque<WatchedEvent> recursiveEvents = new LinkedBlockingDeque<>();
zk2.addWatch("/node1", persistentEvents::add, AddWatchMode.PERSISTENT);
zk2.addWatch("/node1", recursiveEvents::add, AddWatchMode.PERSISTENT_RECURSIVE);

assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is not a watcher");
assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is not a watcher");
assertTrue(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is not a watcher");
removeAllWatches(zk2, "/node1", WatcherType.Any, false, Code.OK, useAsync);
assertTrue(rmWatchCount.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS), "Didn't remove data watcher");
assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Any), "Server session is still a watcher after removal");
assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Data), "Server session is still a watcher after removal");
assertFalse(isServerSessionWatcher(zk2.getSessionId(), "/node1", WatcherType.Children), "Server session is still a watcher after removal");

assertEvent(persistentEvents, EventType.PersistentWatchRemoved, "/node1");
assertEvent(recursiveEvents, EventType.PersistentWatchRemoved, "/node1");

zk1.delete("/node1", -1);
assertNull(persistentEvents.poll(10, TimeUnit.MILLISECONDS));
assertNull(recursiveEvents.poll(10, TimeUnit.MILLISECONDS));
assertEquals(2, watchCount.getCount(), "Received watch notification after removal!");
}

Expand Down Expand Up @@ -1090,4 +1149,14 @@ private boolean isServerSessionWatcher(long sessionId, String path, WatcherType
return false;
}

/**
* Asserts next event from queue has given event type and path.
*/
private void assertEvent(BlockingQueue<WatchedEvent> events, Watcher.Event.EventType eventType, String path)
throws InterruptedException {
WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
assertNotNull(event);
assertEquals(eventType, event.getType());
assertEquals(path, event.getPath());
}
}
Loading