> getWatches(int rc) {
+ return recursive ? watchManager.persistentRecursiveWatches : watchManager.persistentWatches;
+ }
+
+ @Override
+ protected boolean shouldAddWatch(int rc) {
+ return rc == 0 || rc == KeeperException.Code.NONODE.intValue();
+ }
+ }
+
@InterfaceAudience.Public
public enum States {
CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
@@ -2678,6 +2787,77 @@ public void sync(final String path, VoidCallback cb, Object ctx){
clientPath, serverPath, ctx, null);
}
+ /**
+ *
+ * Set a watcher on the given path that: a) does not get removed when triggered (i.e. it stays active
+ * until it is removed); b) optionally applies not only to the registered path but all child paths recursively. This watcher
+ * is triggered for both data and child events. To remove the watcher, use
+ * removeWatches() with WatcherType.Any
+ *
+ *
+ *
+ * If recursive is false, the watcher behaves as if you placed an exists() watch and
+ * a getData() watch on the ZNode at the given path.
+ *
+ *
+ *
+ * If recursive is true, the watcher behaves as if you placed an exists() watch and
+ * a getData() watch on the ZNode at the given path and any ZNodes that are children
+ * of the given path including children added later.
+ *
+ *
+ *
+ * NOTE: when there are active recursive watches there is a small performance decrease as all segments
+ * of ZNode paths must be checked for watch triggering.
+ *
+ *
+ * @param basePath the top path that the watcher applies to
+ * @param watcher the watcher
+ * @param recursive if true applies not only to the registered path but all child paths recursively including
+ * any child nodes added in the future
+ * @throws InterruptedException If the server transaction is interrupted.
+ * @throws KeeperException If the server signals an error with a non-zero
+ * error code.
+ */
+ public void addPersistentWatch(String basePath, Watcher watcher, boolean recursive)
+ throws KeeperException, InterruptedException {
+ PathUtils.validatePath(basePath);
+ String serverPath = prependChroot(basePath);
+
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.addPersistentWatch);
+ AddPersistentWatcherRequest request = new AddPersistentWatcherRequest(serverPath, recursive);
+ ReplyHeader r = cnxn.submitRequest(h, request, new ErrorResponse(),
+ new PersistentWatchRegistration(watcher, basePath, recursive));
+ if (r.getErr() != 0) {
+ throw KeeperException.create(KeeperException.Code.get(r.getErr()),
+ basePath);
+ }
+ }
+
+ /**
+ * Async version of {@link #addPersistentWatch(String, Watcher, boolean)} (see it for details)
+ *
+ * @param basePath the top path that the watcher applies to
+ * @param watcher the watcher
+ * @param recursive if true applies not only to the registered path but all child paths recursively including
+ * any child nodes added in the future
+ * @param cb a handler for the callback
+ * @param ctx context to be provided to the callback
+ * @throws IllegalArgumentException if an invalid path is specified
+ */
+ public void addPersistentWatch(String basePath, Watcher watcher, boolean recursive,
+ VoidCallback cb, Object ctx) {
+ PathUtils.validatePath(basePath);
+ String serverPath = prependChroot(basePath);
+
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.addPersistentWatch);
+ AddPersistentWatcherRequest request = new AddPersistentWatcherRequest(serverPath, recursive);
+ cnxn.queuePacket(h, new ReplyHeader(), request, new ErrorResponse(), cb,
+ basePath, serverPath, ctx, new PersistentWatchRegistration(watcher, basePath, recursive));
+ }
+
/**
* For the given znode path, removes the specified watcher of given
* watcherType.
diff --git a/src/java/main/org/apache/zookeeper/server/DataTree.java b/src/java/main/org/apache/zookeeper/server/DataTree.java
index 55f15bde02d..148f9c9ba6f 100644
--- a/src/java/main/org/apache/zookeeper/server/DataTree.java
+++ b/src/java/main/org/apache/zookeeper/server/DataTree.java
@@ -633,6 +633,12 @@ public String getMaxPrefixWithQuota(String path) {
}
}
+ public void addPersistentWatch(String basePath, Watcher watcher, boolean recursive) {
+ WatchManager.Type type = recursive ? WatchManager.Type.PERSISTENT_RECURSIVE : WatchManager.Type.PERSISTENT;
+ dataWatches.addWatch(basePath, watcher, type);
+ childWatches.addWatch(basePath, watcher, type);
+ }
+
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
@@ -642,7 +648,7 @@ public byte[] getData(String path, Stat stat, Watcher watcher)
synchronized (n) {
n.copyStat(stat);
if (watcher != null) {
- dataWatches.addWatch(path, watcher);
+ dataWatches.addWatch(path, watcher, WatchManager.Type.STANDARD);
}
return n.data;
}
@@ -653,7 +659,7 @@ public Stat statNode(String path, Watcher watcher)
Stat stat = new Stat();
DataNode n = nodes.get(path);
if (watcher != null) {
- dataWatches.addWatch(path, watcher);
+ dataWatches.addWatch(path, watcher, WatchManager.Type.STANDARD);
}
if (n == null) {
throw new KeeperException.NoNodeException();
@@ -677,7 +683,7 @@ public List getChildren(String path, Stat stat, Watcher watcher)
List children=new ArrayList(n.getChildren());
if (watcher != null) {
- childWatches.addWatch(path, watcher);
+ childWatches.addWatch(path, watcher, WatchManager.Type.STANDARD);
}
return children;
}
@@ -1320,6 +1326,13 @@ public void removeCnxn(Watcher watcher) {
public void setWatches(long relativeZxid, List dataWatches,
List existWatches, List childWatches,
Watcher watcher) {
+ setWatches(relativeZxid, dataWatches, existWatches, childWatches,
+ Collections.emptyList(), Collections.emptyList(), watcher);
+ }
+
+ public void setWatches(long relativeZxid, List dataWatches,
+ List existWatches, List childWatches, List persistentWatches,
+ List persistentRecursiveWatches, Watcher watcher) {
for (String path : dataWatches) {
DataNode node = getNode(path);
WatchedEvent e = null;
@@ -1330,7 +1343,7 @@ public void setWatches(long relativeZxid, List dataWatches,
watcher.process(new WatchedEvent(EventType.NodeDataChanged,
KeeperState.SyncConnected, path));
} else {
- this.dataWatches.addWatch(path, watcher);
+ this.dataWatches.addWatch(path, watcher, WatchManager.Type.STANDARD);
}
}
for (String path : existWatches) {
@@ -1339,7 +1352,7 @@ public void setWatches(long relativeZxid, List dataWatches,
watcher.process(new WatchedEvent(EventType.NodeCreated,
KeeperState.SyncConnected, path));
} else {
- this.dataWatches.addWatch(path, watcher);
+ this.dataWatches.addWatch(path, watcher, WatchManager.Type.STANDARD);
}
}
for (String path : childWatches) {
@@ -1351,8 +1364,16 @@ public void setWatches(long relativeZxid, List dataWatches,
watcher.process(new WatchedEvent(EventType.NodeChildrenChanged,
KeeperState.SyncConnected, path));
} else {
- this.childWatches.addWatch(path, watcher);
- }
+ this.childWatches.addWatch(path, watcher, WatchManager.Type.STANDARD);
+ }
+ }
+ for (String path : persistentWatches) {
+ this.childWatches.addWatch(path, watcher, WatchManager.Type.PERSISTENT);
+ this.dataWatches.addWatch(path, watcher, WatchManager.Type.PERSISTENT);
+ }
+ for (String path : persistentRecursiveWatches) {
+ this.childWatches.addWatch(path, watcher, WatchManager.Type.PERSISTENT_RECURSIVE);
+ this.dataWatches.addWatch(path, watcher, WatchManager.Type.PERSISTENT_RECURSIVE);
}
}
diff --git a/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
index 4e1d7627bb3..70c9251fb54 100644
--- a/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -35,9 +35,11 @@
import org.apache.zookeeper.common.Time;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.AddPersistentWatcherRequest;
import org.apache.zookeeper.proto.CheckWatchesRequest;
import org.apache.zookeeper.proto.Create2Response;
import org.apache.zookeeper.proto.CreateResponse;
+import org.apache.zookeeper.proto.ErrorResponse;
import org.apache.zookeeper.proto.ExistsRequest;
import org.apache.zookeeper.proto.ExistsResponse;
import org.apache.zookeeper.proto.GetACLRequest;
@@ -53,6 +55,7 @@
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataResponse;
import org.apache.zookeeper.proto.SetWatches;
+import org.apache.zookeeper.proto.SetWatches2;
import org.apache.zookeeper.proto.SyncRequest;
import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
@@ -341,6 +344,32 @@ public void processRequest(Request request) {
setWatches.getChildWatches(), cnxn);
break;
}
+ case OpCode.setWatches2: {
+ lastOp = "STW2";
+ SetWatches2 setWatches = new SetWatches2();
+ // XXX We really should NOT need this!!!!
+ request.request.rewind();
+ ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
+ long relativeZxid = setWatches.getRelativeZxid();
+ zks.getZKDatabase().setWatches(relativeZxid,
+ setWatches.getDataWatches(),
+ setWatches.getExistWatches(),
+ setWatches.getChildWatches(),
+ setWatches.getPersistentWatches(),
+ setWatches.getPersistentRecursiveWatches(),
+ cnxn);
+ break;
+ }
+ case OpCode.addPersistentWatch: {
+ lastOp = "APEW";
+ AddPersistentWatcherRequest addPersistentWatcherRequest = new AddPersistentWatcherRequest();
+ ByteBufferInputStream.byteBuffer2Record(request.request,
+ addPersistentWatcherRequest);
+ zks.getZKDatabase().addPersistentWatch(addPersistentWatcherRequest.getPath(), cnxn,
+ addPersistentWatcherRequest.getRecursive());
+ rsp = new ErrorResponse(0);
+ break;
+ }
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
diff --git a/src/java/main/org/apache/zookeeper/server/PathParentIterator.java b/src/java/main/org/apache/zookeeper/server/PathParentIterator.java
new file mode 100644
index 00000000000..d1aaf8cfb6f
--- /dev/null
+++ b/src/java/main/org/apache/zookeeper/server/PathParentIterator.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterates over a ZooKeeper path. Each iteration goes up one parent path. Thus, the
+ * effect of the iterator is to iterate over the initial path and then all of its parents.
+ */
+public class PathParentIterator implements Iterator {
+ private String path;
+ private final int maxLevel;
+ private int level = -1;
+
+ /**
+ * Return a new PathParentIterator that iterates from the
+ * given path to all parents.
+ *
+ * @param path initial path
+ */
+ public static PathParentIterator forAll(String path) {
+ return new PathParentIterator(path, Integer.MAX_VALUE);
+ }
+
+ /**
+ * Return a new PathParentIterator that only returns the given path - i.e.
+ * does not iterate to parent paths.
+ *
+ * @param path initial path
+ */
+ public static PathParentIterator forPathOnly(String path) {
+ return new PathParentIterator(path, 0);
+ }
+
+ private PathParentIterator(String path, int maxLevel) {
+ // NOTE: asserts that the path has already been validated
+ this.path = path;
+ this.maxLevel = maxLevel;
+ }
+
+ /**
+ * Return an Iterable view so that this Iterator can be used in for each
+ * statements. IMPORTANT: the returned Iterable is single use only
+ * @return Iterable
+ */
+ public Iterable asIterable() {
+ return new Iterable() {
+ @Override
+ public Iterator iterator() {
+ return PathParentIterator.this;
+ }
+ };
+ }
+
+ @Override
+ public boolean hasNext() {
+ return !path.isEmpty() && (level < maxLevel);
+ }
+
+ /**
+ * Returns true if this iterator is currently at a parent path as opposed
+ * to the initial path given to the constructor
+ *
+ * @return true/false
+ */
+ public boolean atParentPath() {
+ return level > 0;
+ }
+
+ @Override
+ public String next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ String localPath = path;
+ ++level;
+ if (path.equals("/")) {
+ path = "";
+ } else {
+ path = path.substring(0, path.lastIndexOf('/'));
+ if (path.length() == 0) {
+ path = "/";
+ }
+ }
+ return localPath;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git a/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java b/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
index 537ef71a135..e33cee4fe15 100644
--- a/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -864,8 +864,10 @@ protected void pRequest(Request request) throws RequestProcessorException {
case OpCode.getChildren2:
case OpCode.ping:
case OpCode.setWatches:
+ case OpCode.setWatches2:
case OpCode.checkWatches:
case OpCode.removeWatches:
+ case OpCode.addPersistentWatch:
zks.sessionTracker.checkSession(request.sessionId,
request.getOwner());
break;
diff --git a/src/java/main/org/apache/zookeeper/server/Request.java b/src/java/main/org/apache/zookeeper/server/Request.java
index ede9280441a..03b775bcf4b 100644
--- a/src/java/main/org/apache/zookeeper/server/Request.java
+++ b/src/java/main/org/apache/zookeeper/server/Request.java
@@ -153,9 +153,11 @@ static boolean isValid(int type) {
case OpCode.setACL:
case OpCode.setData:
case OpCode.setWatches:
+ case OpCode.setWatches2:
case OpCode.sync:
case OpCode.checkWatches:
case OpCode.removeWatches:
+ case OpCode.addPersistentWatch:
return true;
default:
return false;
@@ -205,6 +207,8 @@ static String op2String(int op) {
return "createContainer";
case OpCode.setWatches:
return "setWatches";
+ case OpCode.setWatches2:
+ return "setWatches2";
case OpCode.delete:
return "delete";
case OpCode.deleteContainer:
@@ -243,6 +247,8 @@ static String op2String(int op) {
return "checkWatches";
case OpCode.removeWatches:
return "removeWatches";
+ case OpCode.addPersistentWatch:
+ return "addPersistentWatch";
default:
return "unknown " + op;
}
@@ -263,6 +269,7 @@ public String toString() {
String path = "n/a";
if (type != OpCode.createSession
&& type != OpCode.setWatches
+ && type != OpCode.setWatches2
&& type != OpCode.closeSession
&& request != null
&& request.remaining() >= 4)
diff --git a/src/java/main/org/apache/zookeeper/server/TraceFormatter.java b/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
index 63a3edd6ec2..8735c127970 100644
--- a/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
+++ b/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
@@ -71,6 +71,8 @@ public static String op2String(int op) {
return "error";
case OpCode.reconfig:
return "reconfig";
+ case OpCode.addPersistentWatch:
+ return "addPersistentWatch";
default:
return "unknown " + op;
}
diff --git a/src/java/main/org/apache/zookeeper/server/WatchManager.java b/src/java/main/org/apache/zookeeper/server/WatchManager.java
index 076f64501f8..ffdd0f8b119 100644
--- a/src/java/main/org/apache/zookeeper/server/WatchManager.java
+++ b/src/java/main/org/apache/zookeeper/server/WatchManager.java
@@ -21,7 +21,7 @@
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -40,50 +40,110 @@
class WatchManager {
private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);
- private final Map> watchTable =
- new HashMap>();
+ enum Type {
+ STANDARD() {
+ @Override
+ boolean isPersistent() {
+ return false;
+ }
+
+ @Override
+ boolean isRecursive() {
+ return false;
+ }
+ },
+ PERSISTENT() {
+ @Override
+ boolean isPersistent() {
+ return true;
+ }
+
+ @Override
+ boolean isRecursive() {
+ return false;
+ }
+ },
+ PERSISTENT_RECURSIVE() {
+ @Override
+ boolean isPersistent() {
+ return true;
+ }
+
+ @Override
+ boolean isRecursive() {
+ return true;
+ }
+ }
+ ;
+
+ abstract boolean isPersistent();
+ abstract boolean isRecursive();
+ }
+
+ private final Map> watchTable =
+ new HashMap<>();
private final Map> watch2Paths =
- new HashMap>();
+ new HashMap<>();
+
+ private int recursiveWatchQty = 0; // guarded by sync
+
+ // visible for testing
+ synchronized int getRecursiveWatchQty() {
+ return recursiveWatchQty;
+ }
synchronized int size(){
int result = 0;
- for(Set watches : watchTable.values()) {
+ for(Map watches : watchTable.values()) {
result += watches.size();
}
return result;
}
- synchronized void addWatch(String path, Watcher watcher) {
- Set list = watchTable.get(path);
+ synchronized void addWatch(String path, Watcher watcher, WatchManager.Type type) {
+ Map list = watchTable.get(path);
if (list == null) {
// don't waste memory if there are few watches on a node
// rehash when the 4th entry is added, doubling size thereafter
// seems like a good compromise
- list = new HashSet(4);
+ list = new HashMap<>(4);
watchTable.put(path, list);
}
- list.add(watcher);
+ Type previousType = list.put(watcher, type);
+ if (safeIsRecursive(previousType)) {
+ --recursiveWatchQty;
+ }
+ if (type.isRecursive()) {
+ ++recursiveWatchQty;
+ }
Set paths = watch2Paths.get(watcher);
if (paths == null) {
// cnxns typically have many watches, so use default cap here
- paths = new HashSet();
+ paths = new HashSet<>();
watch2Paths.put(watcher, paths);
}
paths.add(path);
}
+ private boolean safeIsRecursive(Type type) {
+ return (type != null) && type.isRecursive();
+ }
+
synchronized void removeWatcher(Watcher watcher) {
Set paths = watch2Paths.remove(watcher);
if (paths == null) {
return;
}
for (String p : paths) {
- Set list = watchTable.get(p);
+ Map list = watchTable.get(p);
if (list != null) {
- list.remove(watcher);
- if (list.size() == 0) {
+ Type removedType = list.remove(watcher);
+ if (safeIsRecursive(removedType)) {
+ --recursiveWatchQty;
+ }
+ if (list.isEmpty()) {
watchTable.remove(p);
}
}
@@ -97,24 +157,47 @@ Set triggerWatch(String path, EventType type) {
Set triggerWatch(String path, EventType type, Set supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
- Set watchers;
+ Set watchers = new HashSet<>();
synchronized (this) {
- watchers = watchTable.remove(path);
- if (watchers == null || watchers.isEmpty()) {
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.EVENT_DELIVERY_TRACE_MASK,
- "No watchers for " + path);
+ PathParentIterator pathParentIterator = getPathParentIterator(path);
+ for (String localPath : pathParentIterator.asIterable()) {
+ Map thisWatchers = watchTable.get(localPath);
+ if (thisWatchers == null || thisWatchers.isEmpty()) {
+ continue;
}
- return null;
- }
- for (Watcher w : watchers) {
- Set paths = watch2Paths.get(w);
- if (paths != null) {
- paths.remove(path);
+ Iterator> iterator = thisWatchers.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Entry entry = iterator.next();
+ Type entryType = entry.getValue();
+ Watcher watcher = entry.getKey();
+ if (entryType.isRecursive()) {
+ if ( type != EventType.NodeChildrenChanged ) {
+ watchers.add(watcher);
+ }
+ } else if (!pathParentIterator.atParentPath()) {
+ watchers.add(watcher);
+ if (!entryType.isPersistent()) {
+ iterator.remove();
+ Set paths = watch2Paths.get(watcher);
+ if (paths != null) {
+ paths.remove(localPath);
+ }
+ }
+ }
+ }
+ if (thisWatchers.isEmpty()) {
+ watchTable.remove(localPath);
}
}
}
+ if (watchers.isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ ZooTrace.logTraceMessage(LOG,
+ ZooTrace.EVENT_DELIVERY_TRACE_MASK,
+ "No watchers for " + path );
+ }
+ return null;
+ }
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
@@ -147,13 +230,12 @@ public synchronized String toString() {
* String representation of watches. Warning, may be large!
* @param byPath iff true output watches by paths, otw output
* watches by connection
- * @return string representation of watches
*/
synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
if (byPath) {
- for (Entry> e : watchTable.entrySet()) {
+ for (Entry> e : watchTable.entrySet()) {
pwriter.println(e.getKey());
- for (Watcher w : e.getValue()) {
+ for (Watcher w : e.getValue().keySet()) {
pwriter.print("\t0x");
pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
pwriter.print("\n");
@@ -181,11 +263,20 @@ synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
* @return true if the watcher exists, false otherwise
*/
synchronized boolean containsWatcher(String path, Watcher watcher) {
- Set paths = watch2Paths.get(watcher);
- if (paths == null || !paths.contains(path)) {
- return false;
+ PathParentIterator pathParentIterator = getPathParentIterator(path);
+ for (String localPath : pathParentIterator.asIterable()) {
+ Map watchers = watchTable.get(localPath);
+ Type watcherType = (watchers != null) ? watchers.get(watcher) : null;
+ if ( !pathParentIterator.atParentPath() ) {
+ if ( watcherType != null ) {
+ return true; // at the leaf node, all watcher types match
+ }
+ }
+ if (watcherType == Type.PERSISTENT_RECURSIVE) {
+ return true;
+ }
}
- return true;
+ return false;
}
/**
@@ -203,12 +294,16 @@ synchronized boolean removeWatcher(String path, Watcher watcher) {
return false;
}
- Set list = watchTable.get(path);
- if (list == null || !list.remove(watcher)) {
+ Map list = watchTable.get(path);
+ Type removedType = (list != null) ? list.remove(watcher) : null;
+ if (removedType == null) {
return false;
}
+ if (safeIsRecursive(removedType)) {
+ --recursiveWatchQty;
+ }
- if (list.size() == 0) {
+ if (list.isEmpty()) {
watchTable.remove(path);
}
@@ -222,10 +317,10 @@ synchronized boolean removeWatcher(String path, Watcher watcher) {
* @see WatchesReport
*/
synchronized WatchesReport getWatches() {
- Map> id2paths = new HashMap>();
+ Map> id2paths = new HashMap<>();
for (Entry> e: watch2Paths.entrySet()) {
Long id = ((ServerCnxn) e.getKey()).getSessionId();
- Set paths = new HashSet(e.getValue());
+ Set paths = new HashSet<>(e.getValue());
id2paths.put(id, paths);
}
return new WatchesReport(id2paths);
@@ -238,11 +333,11 @@ synchronized WatchesReport getWatches() {
* @see WatchesPathReport
*/
synchronized WatchesPathReport getWatchesByPath() {
- Map> path2ids = new HashMap>();
- for (Entry> e : watchTable.entrySet()) {
- Set ids = new HashSet(e.getValue().size());
+ Map> path2ids = new HashMap<>();
+ for (Entry> e : watchTable.entrySet()) {
+ Set ids = new HashSet<>(e.getValue().size());
path2ids.put(e.getKey(), ids);
- for (Watcher watcher : e.getValue()) {
+ for (Watcher watcher : e.getValue().keySet()) {
ids.add(((ServerCnxn) watcher).getSessionId());
}
}
@@ -263,4 +358,15 @@ synchronized WatchesSummary getWatchesSummary() {
return new WatchesSummary (watch2Paths.size(), watchTable.size(),
totalWatches);
}
+
+ private PathParentIterator getPathParentIterator(String path) {
+ if (recursiveWatchQty == 0) {
+ return PathParentIterator.forPathOnly(path);
+ }
+ return PathParentIterator.forAll(path);
+ }
+
+ private String getParent(String path) {
+ return path.substring(0, path.lastIndexOf('/'));
+ }
}
diff --git a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
index 16baf46f05f..f1fc02e5b0c 100644
--- a/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
+++ b/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
@@ -475,6 +475,23 @@ public void setWatches(long relativeZxid, List dataWatches,
dataTree.setWatches(relativeZxid, dataWatches, existWatches, childWatches, watcher);
}
+ /**
+ * set watches on the datatree
+ * @param relativeZxid the relative zxid that client has seen
+ * @param dataWatches the data watches the client wants to reset
+ * @param existWatches the exists watches the client wants to reset
+ * @param childWatches the child watches the client wants to reset
+ * @param persistentWatches the persistent watches the client wants to reset
+ * @param persistentRecursiveWatches the persistent recursive watches the client wants to reset
+ * @param watcher the watcher function
+ */
+ public void setWatches(long relativeZxid, List dataWatches,
+ List existWatches, List childWatches,
+ List persistentWatches, List persistentRecursiveWatches, Watcher watcher) {
+ dataTree.setWatches(relativeZxid, dataWatches, existWatches, childWatches, persistentWatches,
+ persistentRecursiveWatches, watcher);
+ }
+
/**
* get acl for a path
* @param path the path to query for acl
@@ -639,4 +656,18 @@ public boolean containsWatcher(String path, WatcherType type, Watcher watcher) {
public boolean removeWatch(String path, WatcherType type, Watcher watcher) {
return dataTree.removeWatch(path, type, watcher);
}
+
+ /**
+ * Add a persistent watch
+ *
+ * @param basePath
+ * watch base
+ * @param watcher
+ * the watcher
+ * @param recursive
+ * true if recursive
+ */
+ public void addPersistentWatch(String basePath, Watcher watcher, boolean recursive) {
+ dataTree.addPersistentWatch(basePath, watcher, recursive);
+ }
}
diff --git a/src/java/test/org/apache/zookeeper/server/RecursiveWatchQtyTest.java b/src/java/test/org/apache/zookeeper/server/RecursiveWatchQtyTest.java
new file mode 100644
index 00000000000..3b0f88efdc0
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/RecursiveWatchQtyTest.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RecursiveWatchQtyTest {
+ private WatchManager watchManager;
+
+ private static class DummyWatcher implements Watcher {
+ @Override
+ public void process(WatchedEvent event) {
+ // NOP
+ }
+ }
+
+ @Before
+ public void setup() {
+ watchManager = new WatchManager();
+ }
+
+ @Test
+ public void testAddRemove() {
+ Watcher watcher1 = new DummyWatcher();
+ Watcher watcher2 = new DummyWatcher();
+
+ watchManager.addWatch("/a", watcher1, WatchManager.Type.PERSISTENT_RECURSIVE);
+ watchManager.addWatch("/b", watcher2, WatchManager.Type.PERSISTENT_RECURSIVE);
+ Assert.assertEquals(2, watchManager.getRecursiveWatchQty());
+ Assert.assertTrue(watchManager.removeWatcher("/a", watcher1));
+ Assert.assertTrue(watchManager.removeWatcher("/b", watcher2));
+ Assert.assertEquals(0, watchManager.getRecursiveWatchQty());
+ }
+
+ @Test
+ public void testAddRemoveAlt() {
+ Watcher watcher1 = new DummyWatcher();
+ Watcher watcher2 = new DummyWatcher();
+
+ watchManager.addWatch("/a", watcher1, WatchManager.Type.PERSISTENT_RECURSIVE);
+ watchManager.addWatch("/b", watcher2, WatchManager.Type.PERSISTENT_RECURSIVE);
+ Assert.assertEquals(2, watchManager.getRecursiveWatchQty());
+ watchManager.removeWatcher(watcher1);
+ watchManager.removeWatcher(watcher2);
+ Assert.assertEquals(0, watchManager.getRecursiveWatchQty());
+ }
+
+ @Test
+ public void testDoubleAdd() {
+ Watcher watcher = new DummyWatcher();
+
+ watchManager.addWatch("/a", watcher, WatchManager.Type.PERSISTENT_RECURSIVE);
+ watchManager.addWatch("/a", watcher, WatchManager.Type.PERSISTENT_RECURSIVE);
+ Assert.assertEquals(1, watchManager.getRecursiveWatchQty());
+ watchManager.removeWatcher(watcher);
+ Assert.assertEquals(0, watchManager.getRecursiveWatchQty());
+ }
+
+ @Test
+ public void testSameWatcherMultiPath() {
+ Watcher watcher = new DummyWatcher();
+
+ watchManager.addWatch("/a", watcher, WatchManager.Type.PERSISTENT_RECURSIVE);
+ watchManager.addWatch("/a/b", watcher, WatchManager.Type.PERSISTENT_RECURSIVE);
+ watchManager.addWatch("/a/b/c", watcher, WatchManager.Type.PERSISTENT_RECURSIVE);
+ Assert.assertEquals(3, watchManager.getRecursiveWatchQty());
+ Assert.assertTrue(watchManager.removeWatcher("/a/b", watcher));
+ Assert.assertEquals(2, watchManager.getRecursiveWatchQty());
+ watchManager.removeWatcher(watcher);
+ Assert.assertEquals(0, watchManager.getRecursiveWatchQty());
+ }
+
+ @Test
+ public void testChangeType() {
+ Watcher watcher = new DummyWatcher();
+
+ watchManager.addWatch("/a", watcher, WatchManager.Type.PERSISTENT);
+ Assert.assertEquals(0, watchManager.getRecursiveWatchQty());
+ watchManager.addWatch("/a", watcher, WatchManager.Type.PERSISTENT_RECURSIVE);
+ Assert.assertEquals(1, watchManager.getRecursiveWatchQty());
+ watchManager.addWatch("/a", watcher, WatchManager.Type.STANDARD);
+ Assert.assertEquals(0, watchManager.getRecursiveWatchQty());
+ Assert.assertTrue(watchManager.removeWatcher("/a", watcher));
+ Assert.assertEquals(0, watchManager.getRecursiveWatchQty());
+ }
+}
diff --git a/src/java/test/org/apache/zookeeper/test/PathParentIteratorTest.java b/src/java/test/org/apache/zookeeper/test/PathParentIteratorTest.java
new file mode 100644
index 00000000000..6f551d171a6
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/PathParentIteratorTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.server.PathParentIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PathParentIteratorTest {
+ @Test
+ public void testRoot() {
+ PathParentIterator pathParentIterator = PathParentIterator.forAll("/");
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertFalse(pathParentIterator.atParentPath());
+ Assert.assertEquals(pathParentIterator.next(), "/");
+ Assert.assertFalse(pathParentIterator.hasNext());
+ }
+
+ @Test
+ public void test1Level() {
+ PathParentIterator pathParentIterator = PathParentIterator.forAll("/a");
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertFalse(pathParentIterator.atParentPath());
+ Assert.assertEquals(pathParentIterator.next(), "/a");
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/");
+ Assert.assertTrue(pathParentIterator.atParentPath());
+
+ Assert.assertFalse(pathParentIterator.hasNext());
+ }
+
+ @Test
+ public void testLong() {
+ PathParentIterator pathParentIterator = PathParentIterator.forAll("/a/b/c/d");
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/a/b/c/d");
+ Assert.assertFalse(pathParentIterator.atParentPath());
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/a/b/c");
+ Assert.assertTrue(pathParentIterator.atParentPath());
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/a/b");
+ Assert.assertTrue(pathParentIterator.atParentPath());
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/a");
+ Assert.assertTrue(pathParentIterator.atParentPath());
+
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/");
+ Assert.assertTrue(pathParentIterator.atParentPath());
+
+ Assert.assertFalse(pathParentIterator.hasNext());
+ }
+
+ @Test
+ public void testForPathOnly() {
+ PathParentIterator pathParentIterator = PathParentIterator.forPathOnly("/a/b/c/d");
+ Assert.assertTrue(pathParentIterator.hasNext());
+ Assert.assertEquals(pathParentIterator.next(), "/a/b/c/d");
+ Assert.assertFalse(pathParentIterator.atParentPath());
+
+ Assert.assertFalse(pathParentIterator.hasNext());
+ }
+}
diff --git a/src/java/test/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/src/java/test/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
new file mode 100644
index 00000000000..55b67337cdf
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class PersistentRecursiveWatcherTest extends ClientBase {
+ private static final Logger LOG = LoggerFactory.getLogger(PersistentRecursiveWatcherTest.class);
+ private BlockingQueue events;
+ private Watcher persistentWatcher;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ events = new LinkedBlockingQueue<>();
+ persistentWatcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ events.add(event);
+ }
+ };
+ }
+
+ @Test
+ public void testBasic()
+ throws IOException, InterruptedException, KeeperException {
+ try ( ZooKeeper zk = createClient(new CountdownWatcher(), hostPort) ) {
+ zk.addPersistentWatch("/a/b", persistentWatcher, true);
+ internalTestBasic(zk);
+ }
+ }
+
+ @Test
+ public void testBasicAsync()
+ throws IOException, InterruptedException, KeeperException {
+ try ( ZooKeeper zk = createClient(new CountdownWatcher(), hostPort) ) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ AsyncCallback.VoidCallback cb = new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (rc == 0) {
+ latch.countDown();
+ }
+ }
+ };
+ zk.addPersistentWatch("/a/b", persistentWatcher, true, cb, null);
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ internalTestBasic(zk);
+ }
+ }
+
+ private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException {
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.setData("/a/b/c/d/e", new byte[0], -1);
+ zk.delete("/a/b/c/d/e", -1);
+ zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c/d/e");
+ assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b/c/d/e");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e");
+ }
+
+ @Test
+ public void testRemoval()
+ throws IOException, InterruptedException, KeeperException {
+ try ( ZooKeeper zk = createClient(new CountdownWatcher(), hostPort) ) {
+ zk.addPersistentWatch("/a/b", persistentWatcher, true);
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c");
+
+ zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false);
+ zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b");
+ }
+ }
+
+ @Test
+ public void testDisconnect() throws Exception {
+ try ( ZooKeeper zk = createClient(new CountdownWatcher(), hostPort) ) {
+ zk.addPersistentWatch("/a/b", persistentWatcher, true);
+ stopServer();
+ assertEvent(events, Watcher.Event.EventType.None, null);
+ startServer();
+ assertEvent(events, Watcher.Event.EventType.None, null);
+ internalTestBasic(zk);
+ }
+ }
+
+ @Test
+ public void testMultiClient()
+ throws IOException, InterruptedException, KeeperException {
+ ZooKeeper zk1 = null;
+ ZooKeeper zk2 = null;
+ try {
+ zk1 = createClient(new CountdownWatcher(), hostPort);
+ zk2 = createClient(new CountdownWatcher(), hostPort);
+
+ zk1.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk1.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ zk1.addPersistentWatch("/a/b", persistentWatcher, true);
+ zk1.setData("/a/b/c", "one".getBytes(), -1);
+ Thread.sleep(1000); // give some time for the event to arrive
+
+ zk2.setData("/a/b/c", "two".getBytes(), -1);
+ zk2.setData("/a/b/c", "three".getBytes(), -1);
+ zk2.setData("/a/b/c", "four".getBytes(), -1);
+
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c");
+ } finally {
+ if (zk1 != null) {
+ zk1.close();
+ }
+ if (zk2 != null) {
+ zk2.close();
+ }
+ }
+ }
+
+ @Test
+ public void testRootWatcher()
+ throws IOException, InterruptedException, KeeperException {
+ try ( ZooKeeper zk = createClient(new CountdownWatcher(), hostPort) ) {
+ zk.addPersistentWatch("/", persistentWatcher, true);
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b/c");
+ }
+ }
+
+ private void assertEvent(BlockingQueue events, Watcher.Event.EventType eventType, String path)
+ throws InterruptedException {
+ WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(event);
+ Assert.assertEquals(eventType, event.getType());
+ Assert.assertEquals(path, event.getPath());
+ }
+}
diff --git a/src/java/test/org/apache/zookeeper/test/PersistentWatcherTest.java b/src/java/test/org/apache/zookeeper/test/PersistentWatcherTest.java
new file mode 100644
index 00000000000..6e4885cbd8f
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/test/PersistentWatcherTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public class PersistentWatcherTest extends ClientBase {
+ private static final Logger LOG = LoggerFactory.getLogger(PersistentWatcherTest.class);
+ private BlockingQueue events;
+ private Watcher persistentWatcher;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ events = new LinkedBlockingQueue<>();
+ persistentWatcher = new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ events.add(event);
+ }
+ };
+ }
+
+ @Test
+ public void testBasic()
+ throws IOException, InterruptedException, KeeperException {
+ try ( ZooKeeper zk = createClient(new CountdownWatcher(), hostPort) ) {
+ zk.addPersistentWatch("/a/b", persistentWatcher, false);
+ internalTestBasic(zk);
+ }
+ }
+
+ @Test
+ public void testBasicAsync()
+ throws IOException, InterruptedException, KeeperException {
+ try ( ZooKeeper zk = createClient(new CountdownWatcher(), hostPort) ) {
+ final CountDownLatch latch = new CountDownLatch(1);
+ AsyncCallback.VoidCallback cb = new AsyncCallback.VoidCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx) {
+ if (rc == 0) {
+ latch.countDown();
+ }
+ }
+ };
+ zk.addPersistentWatch("/a/b", persistentWatcher, false, cb, null);
+ Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
+ internalTestBasic(zk);
+ }
+ }
+
+ private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException {
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.setData("/a/b", new byte[0], -1);
+ zk.delete("/a/b/c", -1);
+ zk.delete("/a/b", -1);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ }
+
+ @Test
+ public void testRemoval()
+ throws IOException, InterruptedException, KeeperException {
+ try ( ZooKeeper zk = createClient(new CountdownWatcher(), hostPort) ) {
+ zk.addPersistentWatch("/a/b", persistentWatcher, false);
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/a/b");
+
+ zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false);
+ zk.delete("/a/b/c", -1);
+ zk.delete("/a/b", -1);
+ assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b");
+ }
+ }
+
+ @Test
+ public void testDisconnect() throws Exception {
+ try ( ZooKeeper zk = createClient(new CountdownWatcher(), hostPort) ) {
+ zk.addPersistentWatch("/a/b", persistentWatcher, false);
+ stopServer();
+ assertEvent(events, Watcher.Event.EventType.None, null);
+ startServer();
+ assertEvent(events, Watcher.Event.EventType.None, null);
+ internalTestBasic(zk);
+ }
+ }
+
+ @Test
+ public void testMultiClient()
+ throws IOException, InterruptedException, KeeperException {
+ ZooKeeper zk1 = null;
+ ZooKeeper zk2 = null;
+ try {
+ zk1 = createClient(new CountdownWatcher(), hostPort);
+ zk2 = createClient(new CountdownWatcher(), hostPort);
+
+ zk1.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk1.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ zk1.addPersistentWatch("/a/b", persistentWatcher, false);
+ zk1.setData("/a/b", "one".getBytes(), -1);
+ Thread.sleep(1000); // give some time for the event to arrive
+
+ zk2.setData("/a/b", "two".getBytes(), -1);
+ zk2.setData("/a/b", "three".getBytes(), -1);
+ zk2.setData("/a/b", "four".getBytes(), -1);
+
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+ assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b");
+ } finally {
+ if (zk1 != null) {
+ zk1.close();
+ }
+ if (zk2 != null) {
+ zk2.close();
+ }
+ }
+ }
+
+ @Test
+ public void testRootWatcher()
+ throws IOException, InterruptedException, KeeperException {
+ try ( ZooKeeper zk = createClient(new CountdownWatcher(), hostPort) ) {
+ zk.addPersistentWatch("/", persistentWatcher, false);
+ zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.setData("/a", new byte[0], -1);
+ zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/");
+ assertEvent(events, Watcher.Event.EventType.NodeChildrenChanged, "/");
+ }
+ }
+
+ private void assertEvent(BlockingQueue events, Watcher.Event.EventType eventType, String path)
+ throws InterruptedException {
+ WatchedEvent event = events.poll(5, TimeUnit.SECONDS);
+ Assert.assertNotNull(event);
+ Assert.assertEquals(eventType, event.getType());
+ Assert.assertEquals(path, event.getPath());
+ }
+}
diff --git a/src/zookeeper.jute b/src/zookeeper.jute
index a404e787325..de745db31ac 100644
--- a/src/zookeeper.jute
+++ b/src/zookeeper.jute
@@ -73,6 +73,14 @@ module org.apache.zookeeper.proto {
vectorexistWatches;
vectorchildWatches;
}
+ class SetWatches2 {
+ long relativeZxid;
+ vectordataWatches;
+ vectorexistWatches;
+ vectorchildWatches;
+ vectorpersistentWatches;
+ vectorpersistentRecursiveWatches;
+ }
class RequestHeader {
int xid;
int type;
@@ -177,6 +185,10 @@ module org.apache.zookeeper.proto {
class SetACLResponse {
org.apache.zookeeper.data.Stat stat;
}
+ class AddPersistentWatcherRequest {
+ ustring path;
+ boolean recursive;
+ }
class WatcherEvent {
int type; // event type
int state; // state of the Keeper client runtime