diff --git a/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java b/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java index aee5b2f18ab..7de1be0370b 100644 --- a/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java +++ b/zookeeper-it/src/main/java/org/apache/zookeeper/server/watch/WatchBench.java @@ -191,7 +191,7 @@ void prepare() { @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) public void testTriggerConcentrateWatch(InvocationState state) throws Exception { for (String path : state.paths) { - state.watchManager.triggerWatch(path, event); + state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID); } } @@ -225,7 +225,7 @@ public void tearDown() { // clear all the watches for (String path : paths) { - watchManager.triggerWatch(path, event); + watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID); } } } @@ -294,7 +294,7 @@ public void prepare() { @Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS) public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception { for (String path : state.paths) { - state.watchManager.triggerWatch(path, event); + state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID); } } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java index c29a2141d58..727d97daa6a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java @@ -902,7 +902,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException { event.setPath(clientPath); } - WatchedEvent we = new WatchedEvent(event); + WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid()); LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId)); eventThread.queueEvent(we); return; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java index 1de3d3ddf68..1303629c914 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/WatchedEvent.java @@ -31,27 +31,38 @@ */ @InterfaceAudience.Public public class WatchedEvent { + public static final long NO_ZXID = -1L; private final KeeperState keeperState; private final EventType eventType; - private String path; + private final String path; + private final long zxid; /** - * Create a WatchedEvent with specified type, state and path + * Create a WatchedEvent with specified type, state, path and zxid */ - public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { + public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) { this.keeperState = keeperState; this.eventType = eventType; this.path = path; + this.zxid = zxid; } /** - * Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent + * Create a WatchedEvent with specified type, state and path */ - public WatchedEvent(WatcherEvent eventMessage) { + public WatchedEvent(EventType eventType, KeeperState keeperState, String path) { + this(eventType, keeperState, path, NO_ZXID); + } + + /** + * Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent + */ + public WatchedEvent(WatcherEvent eventMessage, long zxid) { keeperState = KeeperState.fromInt(eventMessage.getState()); eventType = EventType.fromInt(eventMessage.getType()); path = eventMessage.getPath(); + this.zxid = zxid; } public KeeperState getState() { @@ -66,9 +77,24 @@ public String getPath() { return path; } + /** + * Returns the zxid of the transaction that triggered this watch if it is + * of one of the following types: + * Otherwise, returns {@value #NO_ZXID}. Note that {@value #NO_ZXID} is also + * returned by old servers that do not support this feature. + */ + public long getZxid() { + return zxid; + } + @Override public String toString() { - return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path; + return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid; } /** @@ -77,5 +103,4 @@ public String toString() { public WatcherEvent getWrapper() { return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path); } - } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java index ab4b654880e..42e48b9efd2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/Watcher.java @@ -26,6 +26,11 @@ * server it connects to. An application using such a client handles these * events by registering a callback object with the client. The callback object * is expected to be an instance of a class that implements Watcher interface. + * When {@link #process} is triggered by a watch firing, such as + * {@link Event.EventType#NodeDataChanged}, {@link WatchedEvent#getZxid()} will + * return the zxid of the transaction that caused said watch to fire. If + * {@value WatchedEvent#NO_ZXID} is returned then the server must be updated to + * support this feature. * */ @InterfaceAudience.Public diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java index a6f60539089..01a6da9a447 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DataTree.java @@ -518,8 +518,8 @@ public void createNode(final String path, byte[] data, List acl, long ephem updateQuotaStat(lastPrefix, bytes, 1); } updateWriteStat(path, bytes); - dataWatches.triggerWatch(path, Event.EventType.NodeCreated); - childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged); + dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid); + childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid); } /** @@ -615,9 +615,9 @@ public void deleteNode(String path, long zxid) throws NoNodeException { "childWatches.triggerWatch " + parentName); } - WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted); - childWatches.triggerWatch(path, EventType.NodeDeleted, processed); - childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged); + WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid); + childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed); + childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid); } public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException { @@ -649,7 +649,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time) nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastData)); updateWriteStat(path, dataBytes); - dataWatches.triggerWatch(path, EventType.NodeDataChanged); + dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid); return s; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java index c7bf830b4ef..231a063fe12 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java @@ -33,6 +33,9 @@ public class DumbWatcher extends ServerCnxn { private long sessionId; + private String mostRecentPath; + private Event.EventType mostRecentEventType; + private long mostRecentZxid = WatchedEvent.NO_ZXID; public DumbWatcher() { this(0); @@ -49,8 +52,24 @@ void setSessionTimeout(int sessionTimeout) { @Override public void process(WatchedEvent event) { + mostRecentEventType = event.getType(); + mostRecentZxid = event.getZxid(); + mostRecentPath = event.getPath(); } + public String getMostRecentPath() { + return mostRecentPath; + } + + public Event.EventType getMostRecentEventType() { + return mostRecentEventType; + } + + public long getMostRecentZxid() { + return mostRecentZxid; + } + + @Override int getSessionTimeout() { return 0; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index 1a8575cd45e..61cbe71bad2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -705,7 +705,7 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St */ @Override public void process(WatchedEvent event) { - ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); + ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index f95200d560b..3ed77183434 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -162,7 +162,7 @@ public int getSessionTimeout() { @Override public void process(WatchedEvent event) { - ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0); + ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0); if (LOG.isTraceEnabled()) { ZooTrace.logTraceMessage( LOG, diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java index 1bc44c805a0..482bc7e87d4 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/IWatchManager.java @@ -82,10 +82,11 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) * * @param path znode path * @param type the watch event type + * @param zxid the zxid for the corresponding change that triggered this event * * @return the watchers have been notified */ - WatcherOrBitSet triggerWatch(String path, EventType type); + WatcherOrBitSet triggerWatch(String path, EventType type, long zxid); /** * Distribute the watch event for the given path, but ignore those @@ -93,11 +94,12 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode) * * @param path znode path * @param type the watch event type + * @param zxid the zxid for the corresponding change that triggered this event * @param suppress the suppressed watcher set * * @return the watchers have been notified */ - WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress); + WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress); /** * Get the size of watchers. diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java index c5b133059b2..2697808f6ca 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManager.java @@ -115,13 +115,13 @@ public synchronized void removeWatcher(Watcher watcher) { } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type) { - return triggerWatch(path, type, null); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) { + return triggerWatch(path, type, zxid, null); } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) { - WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) { + WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid); Set watchers = new HashSet<>(); PathParentIterator pathParentIterator = getPathParentIterator(path); synchronized (this) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java index 7f72175efdf..0a6b4279fdb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatchManagerOptimized.java @@ -202,13 +202,13 @@ public void processDeadWatchers(Set deadWatchers) { } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type) { - return triggerWatch(path, type, null); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) { + return triggerWatch(path, type, zxid, null); } @Override - public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) { - WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); + public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) { + WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid); BitHashSet watchers = remove(path); if (watchers == null) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java index 8d9430e1cf6..b29deedb0bc 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java @@ -22,7 +22,12 @@ import static org.junit.jupiter.api.Assertions.fail; import java.io.File; import java.time.Instant; +import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.util.ServiceUtils; +import org.hamcrest.CustomMatcher; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.StringDescription; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -40,6 +45,7 @@ public class ZKTestCase { protected static final File testBaseDir = new File(System.getProperty("build.test.dir", "build")); private static final Logger LOG = LoggerFactory.getLogger(ZKTestCase.class); + public static final int DEFAULT_METRIC_TIMEOUT = 30; static { // Disable System.exit in tests. @@ -103,7 +109,7 @@ public interface WaitForCondition { * @param timeout timeout in seconds * @throws InterruptedException */ - public void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException { + public static void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException { final Instant deadline = Instant.now().plusSeconds(timeout); while (Instant.now().isBefore(deadline)) { if (condition.evaluate()) { @@ -114,4 +120,36 @@ public void waitFor(String msg, WaitForCondition condition, int timeout) throws fail(msg); } -} + public static void waitForMetric(String metricKey, Matcher matcher) throws InterruptedException { + waitForMetric(metricKey, matcher, DEFAULT_METRIC_TIMEOUT); + } + + public static void waitForMetric(String metricKey, Matcher matcher, int timeoutInSeconds) throws InterruptedException { + String errorMessage = String.format("metric \"%s\" failed to match after %d seconds", + metricKey, timeoutInSeconds); + waitFor(errorMessage, () -> { + @SuppressWarnings("unchecked") + T actual = (T) MetricsUtils.currentServerMetrics().get(metricKey); + if (!matcher.matches(actual)) { + Description description = new StringDescription(); + matcher.describeMismatch(actual, description); + LOG.info("match failed for metric {}: {}", metricKey, description); + return false; + } + return true; + }, timeoutInSeconds); + } + + /** + * Functionally identical to {@link org.hamcrest.Matchers#closeTo} except that it accepts all numerical types + * instead of failing if the value is not a {@link Double}. + */ + public static Matcher closeTo(double operand, double error) { + return new CustomMatcher(String.format("A number within %s of %s", error, operand)) { + @Override + public boolean matches(Object actual) { + return Math.abs(operand - ((Number) actual).doubleValue()) <= error; + } + }; + } +} \ No newline at end of file diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java index 15259207599..088f80c4857 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/RequestThrottlerTest.java @@ -19,6 +19,8 @@ package org.apache.zookeeper.server; import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; @@ -75,10 +77,6 @@ public class RequestThrottlerTest extends ZKTestCase { ZooKeeper zk = null; int connectionLossCount = 0; - private long getCounterMetric(String name) { - return (long) MetricsUtils.currentServerMetrics().get(name); - } - @BeforeEach public void setup() throws Exception { // start a server and create a client @@ -222,11 +220,8 @@ public void testRequestThrottler() throws Exception { submitted.await(5, TimeUnit.SECONDS); // but only two requests can get into the pipeline because of the throttler - WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") == 2; - waitFor("request not queued", requestQueued, 5); - - WaitForCondition throttleWait = () -> getCounterMetric("request_throttle_wait_count") >= 1; - waitFor("no throttle wait", throttleWait, 5); + waitForMetric("prep_processor_request_queued", is(2L)); + waitForMetric("request_throttle_wait_count", greaterThanOrEqualTo(1L)); // let the requests go through the pipeline and the throttler will be waken up to allow more requests // to enter the pipeline @@ -387,8 +382,7 @@ public void testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled() // be GLOBAL_OUTSTANDING_LIMIT + 2. // // But due to leak of consistent view of number of outstanding requests, the number could be larger. - WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") >= Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2; - waitFor("no enough requests queued", requestQueued, 5); + waitForMetric("prep_processor_request_queued", greaterThanOrEqualTo(Long.parseLong(GLOBAL_OUTSTANDING_LIMIT) + 2)); resumeProcess.countDown(); } catch (Exception e) { diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogMetricsTest.java index 65648fefce4..1a569e4d47e 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/persistence/FileTxnSnapLogMetricsTest.java @@ -79,8 +79,7 @@ public void testFileTxnSnapLogMetrics() throws Exception { } // It is possible that above writes will trigger more than one snapshot due to randomization. - WaitForCondition newSnapshot = () -> (long) MetricsUtils.currentServerMetrics().get("cnt_snapshottime") >= 2L; - waitFor("no snapshot in 10s", newSnapshot, 10); + waitForMetric("cnt_snapshottime", greaterThanOrEqualTo(2L), 10); // Pauses snapshot and logs more txns. cnxnFactory.getZooKeeperServer().getTxnLogFactory().snapLog.close(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java index 5df14600a15..aa3ba3622ad 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java @@ -26,10 +26,8 @@ import org.apache.zookeeper.PortAssignment; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.metrics.MetricsUtils; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.test.ClientBase; -import org.hamcrest.Matcher; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -38,7 +36,6 @@ public class LearnerMetricsTest extends QuorumPeerTestBase { - private static final int TIMEOUT_SECONDS = 30; private static final int SERVER_COUNT = 4; // 1 observer, 3 participants private final QuorumPeerTestBase.MainThread[] mt = new QuorumPeerTestBase.MainThread[SERVER_COUNT]; private ZooKeeper zk_client; @@ -113,18 +110,6 @@ public void testLearnerMetricsTest(boolean asyncSending) throws Exception { waitForMetric("min_commit_propagation_latency", greaterThanOrEqualTo(0L)); } - private void waitForMetric(final String metricKey, final Matcher matcher) throws InterruptedException { - final String errorMessage = String.format("unable to match on metric: %s", metricKey); - waitFor(errorMessage, () -> { - long actual = (long) MetricsUtils.currentServerMetrics().get(metricKey); - if (!matcher.matches(actual)) { - LOG.info("match failed on {}, actual value: {}", metricKey, actual); - return false; - } - return true; - }, TIMEOUT_SECONDS); - } - @AfterEach public void tearDown() throws Exception { zk_client.close(); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java index f65391c497c..6474e0a06ae 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatchManagerTest.java @@ -131,7 +131,7 @@ public WatcherTriggerWorker( public void run() { while (!stopped) { String path = PATH_PREFIX + r.nextInt(paths); - WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted); + WatcherOrBitSet s = manager.triggerWatch(path, EventType.NodeDeleted, -1); if (s != null) { triggeredCount.addAndGet(s.size()); } @@ -416,6 +416,12 @@ private void checkMetrics(String metricName, long min, long max, double avg, lon assertEquals(sum, values.get("sum_" + metricName)); } + private void checkMostRecentWatchedEvent(DumbWatcher watcher, String path, EventType eventType, long zxid) { + assertEquals(path, watcher.getMostRecentPath()); + assertEquals(eventType, watcher.getMostRecentEventType()); + assertEquals(zxid, watcher.getMostRecentZxid()); + } + @ParameterizedTest @MethodSource("data") public void testWatcherMetrics(String className) throws IOException { @@ -430,41 +436,54 @@ public void testWatcherMetrics(String className) throws IOException { final String path3 = "/path3"; - //both wather1 and wather2 are watching path1 + //both watcher1 and watcher2 are watching path1 manager.addWatch(path1, watcher1); manager.addWatch(path1, watcher2); //path2 is watched by watcher1 manager.addWatch(path2, watcher1); - manager.triggerWatch(path3, EventType.NodeCreated); + manager.triggerWatch(path3, EventType.NodeCreated, 1); //path3 is not being watched so metric is 0 checkMetrics("node_created_watch_count", 0L, 0L, 0D, 0L, 0L); + // Watchers shouldn't have received any events yet so the zxid should be -1. + checkMostRecentWatchedEvent(watcher1, null, null, -1); + checkMostRecentWatchedEvent(watcher2, null, null, -1); //path1 is watched by two watchers so two fired - manager.triggerWatch(path1, EventType.NodeCreated); + manager.triggerWatch(path1, EventType.NodeCreated, 2); checkMetrics("node_created_watch_count", 2L, 2L, 2D, 1L, 2L); + checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeCreated, 2); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2); //path2 is watched by one watcher so one fired now total is 3 - manager.triggerWatch(path2, EventType.NodeCreated); + manager.triggerWatch(path2, EventType.NodeCreated, 3); checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L); + checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2); //watches on path1 are no longer there so zero fired - manager.triggerWatch(path1, EventType.NodeDataChanged); + manager.triggerWatch(path1, EventType.NodeDataChanged, 4); checkMetrics("node_changed_watch_count", 0L, 0L, 0D, 0L, 0L); + checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeCreated, 3); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeCreated, 2); - //both wather1 and wather2 are watching path1 + //both watcher and watcher are watching path1 manager.addWatch(path1, watcher1); manager.addWatch(path1, watcher2); //path2 is watched by watcher1 manager.addWatch(path2, watcher1); - manager.triggerWatch(path1, EventType.NodeDataChanged); + manager.triggerWatch(path1, EventType.NodeDataChanged, 5); checkMetrics("node_changed_watch_count", 2L, 2L, 2D, 1L, 2L); + checkMostRecentWatchedEvent(watcher1, path1, EventType.NodeDataChanged, 5); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5); - manager.triggerWatch(path2, EventType.NodeDeleted); + manager.triggerWatch(path2, EventType.NodeDeleted, 6); checkMetrics("node_deleted_watch_count", 1L, 1L, 1D, 1L, 1L); + checkMostRecentWatchedEvent(watcher1, path2, EventType.NodeDeleted, 6); + checkMostRecentWatchedEvent(watcher2, path1, EventType.NodeDataChanged, 5); //make sure that node created watch count is not impacted by the fire of other event types checkMetrics("node_created_watch_count", 1L, 2L, 1.5D, 2L, 3L); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java index 17e44eb9b01..3320f1a247c 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/watch/WatcherCleanerTest.java @@ -17,8 +17,8 @@ package org.apache.zookeeper.server.watch; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.number.OrderingComparison.greaterThan; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -140,7 +140,7 @@ public void testMaxInProcessingDeadWatchers() { } @Test - public void testDeadWatcherMetrics() { + public void testDeadWatcherMetrics() throws InterruptedException { ServerMetrics.getMetrics().resetAll(); MyDeadWatcherListener listener = new MyDeadWatcherListener(); WatcherCleaner cleaner = new WatcherCleaner(listener, 1, 1, 1, 1); @@ -156,19 +156,19 @@ public void testDeadWatcherMetrics() { assertTrue(listener.wait(5000)); Map values = MetricsUtils.currentServerMetrics(); - assertThat("Adding dead watcher should be stalled twice", (Long) values.get("add_dead_watcher_stall_time"), greaterThan(0L)); - assertEquals(3L, values.get("dead_watchers_queued"), "Total dead watchers added to the queue should be 3"); - assertEquals(3L, values.get("dead_watchers_cleared"), "Total dead watchers cleared should be 3"); - - assertEquals(3L, values.get("cnt_dead_watchers_cleaner_latency")); - - //Each latency should be a little over 20 ms, allow 20 ms deviation - assertEquals(20D, (Double) values.get("avg_dead_watchers_cleaner_latency"), 20); - assertEquals(20D, ((Long) values.get("min_dead_watchers_cleaner_latency")).doubleValue(), 20); - assertEquals(20D, ((Long) values.get("max_dead_watchers_cleaner_latency")).doubleValue(), 20); - assertEquals(20D, ((Long) values.get("p50_dead_watchers_cleaner_latency")).doubleValue(), 20); - assertEquals(20D, ((Long) values.get("p95_dead_watchers_cleaner_latency")).doubleValue(), 20); - assertEquals(20D, ((Long) values.get("p99_dead_watchers_cleaner_latency")).doubleValue(), 20); + // Adding dead watcher should be stalled twice + waitForMetric("add_dead_watcher_stall_time", greaterThan(0L)); + waitForMetric("dead_watchers_queued", is(3L)); + waitForMetric("dead_watchers_cleared", is(3L)); + waitForMetric("cnt_dead_watchers_cleaner_latency", is(3L)); + + //Each latency should be a little over 20 ms, allow 5 ms deviation + waitForMetric("avg_dead_watchers_cleaner_latency", closeTo(20, 5)); + waitForMetric("min_dead_watchers_cleaner_latency", closeTo(20, 5)); + waitForMetric("max_dead_watchers_cleaner_latency", closeTo(20, 5)); + waitForMetric("p50_dead_watchers_cleaner_latency", closeTo(20, 5)); + waitForMetric("p95_dead_watchers_cleaner_latency", closeTo(20, 5)); + waitForMetric("p99_dead_watchers_cleaner_latency", closeTo(20, 5)); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java index e74ee2fd683..24957296f6a 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/PersistentRecursiveWatcherTest.java @@ -19,7 +19,6 @@ package org.apache.zookeeper.test; import static org.apache.zookeeper.AddWatchMode.PERSISTENT_RECURSIVE; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; @@ -32,8 +31,11 @@ import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.Watcher.Event.EventType; +import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.slf4j.Logger; @@ -80,21 +82,28 @@ public void testBasicAsync() private void internalTestBasic(ZooKeeper zk) throws KeeperException, InterruptedException { zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.setData("/a/b/c/d/e", new byte[0], -1); + + Stat stat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat); + + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c", stat); + + zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d", stat); + + zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat); + + stat = zk.setData("/a/b/c/d/e", new byte[0], -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c/d/e", stat); + zk.delete("/a/b/c/d/e", -1); - zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeDeleted, "/a/b/c/d/e"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c/d/e"); + assertEvent(events, EventType.NodeDeleted, "/a/b/c/d/e", zk.exists("/a/b/c/d", false).getPzxid()); + + zk.create("/a/b/c/d/e", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c/d/e", stat); } @Test @@ -103,14 +112,15 @@ public void testRemoval() try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); + Stat stat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat); + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b/c", stat); zk.removeWatches("/a/b", persistentWatcher, Watcher.WatcherType.Any, false); zk.create("/a/b/c/d", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.PersistentWatchRemoved, "/a/b"); + assertEvent(events, EventType.PersistentWatchRemoved, "/a/b", WatchedEvent.NO_ZXID); } } @@ -124,13 +134,15 @@ public void testNoChildEvents() throws Exception { BlockingQueue childEvents = new LinkedBlockingQueue<>(); zk.getChildren("/a", childEvents::add); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + Stat createABStat = new Stat(); + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABStat); + Stat createABCStat = new Stat(); + zk.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, createABCStat); - assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a"); + assertEvent(childEvents, Watcher.Event.EventType.NodeChildrenChanged, "/a", createABStat.getPzxid()); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c"); + assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b", createABStat); + assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b/c", createABCStat); assertTrue(events.isEmpty()); } } @@ -140,9 +152,9 @@ public void testDisconnect() throws Exception { try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); stopServer(); - assertEvent(events, Watcher.Event.EventType.None, null); + assertEvent(events, EventType.None, KeeperState.Disconnected, null, WatchedEvent.NO_ZXID); startServer(); - assertEvent(events, Watcher.Event.EventType.None, null); + assertEvent(events, EventType.None, KeeperState.SyncConnected, null, WatchedEvent.NO_ZXID); internalTestBasic(zk); } } @@ -157,17 +169,15 @@ public void testMultiClient() zk1.create("/a/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); zk1.addWatch("/a/b", persistentWatcher, PERSISTENT_RECURSIVE); - zk1.setData("/a/b/c", "one".getBytes(), -1); - Thread.sleep(1000); // give some time for the event to arrive - - zk2.setData("/a/b/c", "two".getBytes(), -1); - zk2.setData("/a/b/c", "three".getBytes(), -1); - zk2.setData("/a/b/c", "four".getBytes(), -1); - - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); - assertEvent(events, Watcher.Event.EventType.NodeDataChanged, "/a/b/c"); + Stat stat = zk1.setData("/a/b/c", "one".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + + stat = zk2.setData("/a/b/c", "two".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + stat = zk2.setData("/a/b/c", "three".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); + stat = zk2.setData("/a/b/c", "four".getBytes(), -1); + assertEvent(events, EventType.NodeDataChanged, "/a/b/c", stat.getMzxid()); } } @@ -176,22 +186,42 @@ public void testRootWatcher() throws IOException, InterruptedException, KeeperException { try (ZooKeeper zk = createClient(new CountdownWatcher(), hostPort)) { zk.addWatch("/", persistentWatcher, PERSISTENT_RECURSIVE); - zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/a/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b"); - assertEvent(events, Watcher.Event.EventType.NodeCreated, "/b/c"); + Stat stat = new Stat(); + + zk.create("/a", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a", stat.getMzxid()); + + zk.create("/a/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/a/b", stat.getMzxid()); + + zk.create("/b", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/b", stat.getMzxid()); + + zk.create("/b/c", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + assertEvent(events, EventType.NodeCreated, "/b/c", stat.getMzxid()); } } - private void assertEvent(BlockingQueue events, Watcher.Event.EventType eventType, String path) - throws InterruptedException { - WatchedEvent event = events.poll(5, TimeUnit.SECONDS); - assertNotNull(event); - assertEquals(eventType, event.getType()); - assertEquals(path, event.getPath()); + private void assertEvent(BlockingQueue events, EventType eventType, String path, Stat stat) + throws InterruptedException { + assertEvent(events, eventType, path, stat.getMzxid()); + } + + private void assertEvent(BlockingQueue events, EventType eventType, String path, long zxid) + throws InterruptedException { + assertEvent(events, eventType, KeeperState.SyncConnected, path, zxid); + } + + private void assertEvent(BlockingQueue events, EventType eventType, KeeperState keeperState, + String path, long zxid) throws InterruptedException { + WatchedEvent actualEvent = events.poll(5, TimeUnit.SECONDS); + assertNotNull(actualEvent); + WatchedEvent expectedEvent = new WatchedEvent( + eventType, + keeperState, + path, + zxid + ); + TestUtils.assertWatchedEventEquals(expectedEvent, actualEvent); } } \ No newline at end of file diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java index 00c6c070a04..e3306c1fe76 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TestUtils.java @@ -18,8 +18,10 @@ package org.apache.zookeeper.test; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.fail; import java.io.File; +import org.apache.zookeeper.WatchedEvent; /** * This class contains test utility methods @@ -57,4 +59,16 @@ public static boolean deleteFileRecursively(File file) { return deleteFileRecursively(file, false); } + /** + * Asserts that the given {@link WatchedEvent} are semantically equal, i.e. they have the same EventType, path and + * zxid. + */ + public static void assertWatchedEventEquals(WatchedEvent expected, WatchedEvent actual) { + // TODO: .hashCode and .equals cannot be added to WatchedEvent without potentially breaking consumers. This + // can be changed to `assertEquals(expected, actual)` once WatchedEvent has those methods. Until then, + // compare the lists manually. + assertEquals(expected.getType(), actual.getType()); + assertEquals(expected.getPath(), actual.getPath()); + assertEquals(expected.getZxid(), actual.getZxid()); + } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java index a3d6eef7bdc..cce26276cdd 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/UnsupportedAddWatcherTest.java @@ -59,12 +59,12 @@ public void removeWatcher(Watcher watcher) { } @Override - public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type) { + public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid) { return new WatcherOrBitSet(Collections.emptySet()); } @Override - public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, WatcherOrBitSet suppress) { + public WatcherOrBitSet triggerWatch(String path, Watcher.Event.EventType type, long zxid, WatcherOrBitSet suppress) { return new WatcherOrBitSet(Collections.emptySet()); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java index f4a0298f233..a9bc11da2a6 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatchedEventTest.java @@ -61,7 +61,7 @@ public void testCreatingWatchedEventFromWrapper() { for (EventType et : allTypes) { for (KeeperState ks : allStates) { wep = new WatcherEvent(et.getIntValue(), ks.getIntValue(), "blah"); - we = new WatchedEvent(wep); + we = new WatchedEvent(wep, WatchedEvent.NO_ZXID); assertEquals(et, we.getType()); assertEquals(ks, we.getState()); assertEquals("blah", we.getPath()); @@ -75,7 +75,7 @@ public void testCreatingWatchedEventFromInvalidWrapper() { try { WatcherEvent wep = new WatcherEvent(-2342, -252352, "foo"); - new WatchedEvent(wep); + new WatchedEvent(wep, WatchedEvent.NO_ZXID); fail("Was able to create WatchedEvent from bad wrapper"); } catch (RuntimeException re) { // we're good diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java index 180cd08a611..44440a7c3bb 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/WatcherFuncTest.java @@ -37,6 +37,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -68,14 +69,17 @@ public void process(WatchedEvent event) { assertTrue(false, "interruption unexpected"); } } - public void verify(List expected) throws InterruptedException { + + public void verify(List expected) throws InterruptedException { + List actual = new ArrayList<>(); WatchedEvent event; - int count = 0; - while (count < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) { - assertEquals(expected.get(count), event.getType()); - count++; + while (actual.size() < expected.size() && (event = events.poll(30, TimeUnit.SECONDS)) != null) { + actual.add(event); + } + assertEquals(expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i++) { + TestUtils.assertWatchedEventEquals(expected.get(i), actual.get(i)); } - assertEquals(expected.size(), count); events.clear(); } @@ -88,7 +92,7 @@ public void verify(List expected) throws InterruptedException { private volatile CountDownLatch lsnr_latch; private ZooKeeper lsnr; - private List expected; + private List expected; @BeforeEach @Override @@ -127,15 +131,34 @@ private void verify() throws InterruptedException { expected.clear(); } + private void addEvent(List events, EventType eventType, String path, Stat stat) { + addEvent(events, eventType, path, stat.getMzxid()); + } + + private void addEvent(List events, EventType eventType, String path, long zxid) { + events.add(new WatchedEvent(eventType, KeeperState.SyncConnected, path, zxid)); + } + + private long delete(String path) throws InterruptedException, KeeperException { + client.delete(path, -1); + int lastSlash = path.lastIndexOf('/'); + String parent = (lastSlash == 0) + ? "/" + : path.substring(0, lastSlash); + // the deletion's zxid will be reflected in the parent's Pzxid + return client.exists(parent, false).getPzxid(); + } + @Test public void testExistsSync() throws IOException, InterruptedException, KeeperException { assertNull(lsnr.exists("/foo", true)); assertNull(lsnr.exists("/foo/bar", true)); - client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); + Stat stat = new Stat(); + client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo", stat); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo/bar", stat); verify(); @@ -160,20 +183,20 @@ public void testExistsSync() throws IOException, InterruptedException, KeeperExc assertEquals("/foo/car", e.getPath()); } - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - expected.add(EventType.NodeDataChanged); + stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat); verify(); assertNotNull(lsnr.exists("/foo", true)); assertNotNull(lsnr.exists("/foo/bar", true)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -200,20 +223,20 @@ public void testGetDataSync() throws IOException, InterruptedException, KeeperEx client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); assertNotNull(lsnr.getData("/foo/bar", true, null)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - expected.add(EventType.NodeDataChanged); + Stat stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo/bar", stat); verify(); assertNotNull(lsnr.getData("/foo", true, null)); assertNotNull(lsnr.getData("/foo/bar", true, null)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -238,8 +261,9 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); assertNotNull(lsnr.getChildren("/foo", true)); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeChildrenChanged); // /foo + Stat stat = new Stat(); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo assertNotNull(lsnr.getChildren("/foo/bar", true)); client.setData("/foo", "parent".getBytes(), -1); @@ -250,11 +274,11 @@ public void testGetChildrenSync() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getChildren("/foo", true)); assertNotNull(lsnr.getChildren("/foo/bar", true)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); // /foo/bar childwatch - expected.add(EventType.NodeChildrenChanged); // /foo - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(expected, EventType.NodeDeleted, "/foo/bar", deleteZxid); // /foo/bar childwatch + addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); verify(); } @@ -266,7 +290,7 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList<>(); + List e2 = new ArrayList<>(); assertNull(lsnr.exists("/foo", true)); assertNull(lsnr.exists("/foo", w1)); @@ -276,10 +300,11 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNull(lsnr.exists("/foo/bar", w3)); assertNull(lsnr.exists("/foo/bar", w4)); - client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeCreated); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - e2.add(EventType.NodeCreated); + Stat stat = new Stat(); + client.create("/foo", "parent".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeCreated, "/foo", stat); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(e2, EventType.NodeCreated, "/foo/bar", stat); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -297,10 +322,10 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNotNull(lsnr.exists("/foo/bar", w4)); assertNotNull(lsnr.exists("/foo/bar", w4)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - e2.add(EventType.NodeDataChanged); + stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat); lsnr_dwatch.verify(new ArrayList<>()); // not reg so should = 0 w1.verify(expected); @@ -319,10 +344,10 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe assertNotNull(lsnr.exists("/foo/bar", w3)); assertNotNull(lsnr.exists("/foo/bar", w4)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - e2.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -331,7 +356,6 @@ public void testExistsSyncWObj() throws IOException, InterruptedException, Keepe w4.verify(e2); expected.clear(); e2.clear(); - } @Test @@ -341,7 +365,7 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList<>(); + List e2 = new ArrayList<>(); try { lsnr.getData("/foo", w1, null); @@ -367,10 +391,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getData("/foo/bar", w4, null)); assertNotNull(lsnr.getData("/foo/bar", w4, null)); - client.setData("/foo", "parent".getBytes(), -1); - expected.add(EventType.NodeDataChanged); - client.setData("/foo/bar", "child".getBytes(), -1); - e2.add(EventType.NodeDataChanged); + Stat stat = client.setData("/foo", "parent".getBytes(), -1); + addEvent(expected, EventType.NodeDataChanged, "/foo", stat); + stat = client.setData("/foo/bar", "child".getBytes(), -1); + addEvent(e2, EventType.NodeDataChanged, "/foo/bar", stat); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -387,10 +411,10 @@ public void testGetDataSyncWObj() throws IOException, InterruptedException, Keep assertNotNull(lsnr.getData("/foo/bar", w3, null)); assertNotNull(lsnr.getData("/foo/bar", w4, null)); - client.delete("/foo/bar", -1); - expected.add(EventType.NodeDeleted); - client.delete("/foo", -1); - e2.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected); @@ -408,7 +432,7 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, SimpleWatcher w3 = new SimpleWatcher(null); SimpleWatcher w4 = new SimpleWatcher(null); - List e2 = new ArrayList<>(); + List e2 = new ArrayList<>(); try { lsnr.getChildren("/foo", true); @@ -429,8 +453,9 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, assertNotNull(lsnr.getChildren("/foo", true)); assertNotNull(lsnr.getChildren("/foo", w1)); - client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - expected.add(EventType.NodeChildrenChanged); // /foo + Stat stat = new Stat(); + client.create("/foo/bar", "child".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", stat); // /foo assertNotNull(lsnr.getChildren("/foo/bar", w2)); assertNotNull(lsnr.getChildren("/foo/bar", w2)); assertNotNull(lsnr.getChildren("/foo/bar", w3)); @@ -451,11 +476,11 @@ public void testGetChildrenSyncWObj() throws IOException, InterruptedException, assertNotNull(lsnr.getChildren("/foo/bar", w4)); assertNotNull(lsnr.getChildren("/foo/bar", w4)); - client.delete("/foo/bar", -1); - e2.add(EventType.NodeDeleted); // /foo/bar childwatch - expected.add(EventType.NodeChildrenChanged); // /foo - client.delete("/foo", -1); - expected.add(EventType.NodeDeleted); + long deleteZxid = delete("/foo/bar"); + addEvent(e2, EventType.NodeDeleted, "/foo/bar", deleteZxid); + addEvent(expected, EventType.NodeChildrenChanged, "/foo", deleteZxid); // /foo + deleteZxid = delete("/foo"); + addEvent(expected, EventType.NodeDeleted, "/foo", deleteZxid); lsnr_dwatch.verify(expected); w1.verify(expected);