Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerConcentrateWatch(InvocationState state) throws Exception {
for (String path : state.paths) {
state.watchManager.triggerWatch(path, event);
state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}

Expand Down Expand Up @@ -225,7 +225,7 @@ public void tearDown() {

// clear all the watches
for (String path : paths) {
watchManager.triggerWatch(path, event);
watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}
}
Expand Down Expand Up @@ -294,7 +294,7 @@ public void prepare() {
@Measurement(iterations = 3, time = 10, timeUnit = TimeUnit.SECONDS)
public void testTriggerSparseWatch(TriggerSparseWatchState state) throws Exception {
for (String path : state.paths) {
state.watchManager.triggerWatch(path, event);
state.watchManager.triggerWatch(path, event, WatchedEvent.NO_ZXID);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ void readResponse(ByteBuffer incomingBuffer) throws IOException {
event.setPath(clientPath);
}

WatchedEvent we = new WatchedEvent(event);
WatchedEvent we = new WatchedEvent(event, replyHdr.getZxid());
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,38 @@
*/
@InterfaceAudience.Public
public class WatchedEvent {
public static final long NO_ZXID = -1L;

private final KeeperState keeperState;
private final EventType eventType;
private String path;
private final String path;
private final long zxid;

/**
* Create a WatchedEvent with specified type, state and path
* Create a WatchedEvent with specified type, state, path and zxid
*/
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
public WatchedEvent(EventType eventType, KeeperState keeperState, String path, long zxid) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is public API, we should keep the original constructors, maybe you can mark them "@deprecated"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is preserved below, it's just not marked as @deprecated because there are many uses of WatchedEvent that correctly do not have a zxid.

this.keeperState = keeperState;
this.eventType = eventType;
this.path = path;
this.zxid = zxid;
}

/**
* Convert a WatcherEvent sent over the wire into a full-fledged WatcherEvent
* Create a WatchedEvent with specified type, state and path
*/
public WatchedEvent(WatcherEvent eventMessage) {
public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
this(eventType, keeperState, path, NO_ZXID);
}

/**
* Convert a WatcherEvent sent over the wire into a full-fledged WatchedEvent
*/
public WatchedEvent(WatcherEvent eventMessage, long zxid) {
keeperState = KeeperState.fromInt(eventMessage.getState());
eventType = EventType.fromInt(eventMessage.getType());
path = eventMessage.getPath();
this.zxid = zxid;
}

public KeeperState getState() {
Expand All @@ -66,9 +77,24 @@ public String getPath() {
return path;
}

/**
* Returns the zxid of the transaction that triggered this watch if it is
* of one of the following types:<ul>
* <li>{@link EventType#NodeCreated}</li>
* <li>{@link EventType#NodeDeleted}</li>
* <li>{@link EventType#NodeDataChanged}</li>
* <li>{@link EventType#NodeChildrenChanged}</li>
* </ul>
* Otherwise, returns {@value #NO_ZXID}. Note that {@value #NO_ZXID} is also
* returned by old servers that do not support this feature.
*/
public long getZxid() {
return zxid;
}

@Override
public String toString() {
return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path;
return "WatchedEvent state:" + keeperState + " type:" + eventType + " path:" + path + " zxid: " + zxid;
}

/**
Expand All @@ -77,5 +103,4 @@ public String toString() {
public WatcherEvent getWrapper() {
return new WatcherEvent(eventType.getIntValue(), keeperState.getIntValue(), path);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
* server it connects to. An application using such a client handles these
* events by registering a callback object with the client. The callback object
* is expected to be an instance of a class that implements Watcher interface.
* When {@link #process} is triggered by a watch firing, such as
* {@link Event.EventType#NodeDataChanged}, {@link WatchedEvent#getZxid()} will
* return the zxid of the transaction that caused said watch to fire. If
* {@value WatchedEvent#NO_ZXID} is returned then the server must be updated to
* support this feature.
*
*/
@InterfaceAudience.Public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,8 +518,8 @@ public void createNode(final String path, byte[] data, List<ACL> acl, long ephem
updateQuotaStat(lastPrefix, bytes, 1);
}
updateWriteStat(path, bytes);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged);
dataWatches.triggerWatch(path, Event.EventType.NodeCreated, zxid);
childWatches.triggerWatch(parentName.equals("") ? "/" : parentName, Event.EventType.NodeChildrenChanged, zxid);
}

/**
Expand Down Expand Up @@ -615,9 +615,9 @@ public void deleteNode(String path, long zxid) throws NoNodeException {
"childWatches.triggerWatch " + parentName);
}

WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted);
childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged);
WatcherOrBitSet processed = dataWatches.triggerWatch(path, EventType.NodeDeleted, zxid);
childWatches.triggerWatch(path, EventType.NodeDeleted, zxid, processed);
childWatches.triggerWatch("".equals(parentName) ? "/" : parentName, EventType.NodeChildrenChanged, zxid);
}

public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
Expand Down Expand Up @@ -649,7 +649,7 @@ public Stat setData(String path, byte[] data, int version, long zxid, long time)
nodeDataSize.addAndGet(getNodeSize(path, data) - getNodeSize(path, lastData));

updateWriteStat(path, dataBytes);
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
dataWatches.triggerWatch(path, EventType.NodeDataChanged, zxid);
return s;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
public class DumbWatcher extends ServerCnxn {

private long sessionId;
private String mostRecentPath;
private Event.EventType mostRecentEventType;
private long mostRecentZxid = WatchedEvent.NO_ZXID;

public DumbWatcher() {
this(0);
Expand All @@ -49,8 +52,24 @@ void setSessionTimeout(int sessionTimeout) {

@Override
public void process(WatchedEvent event) {
mostRecentEventType = event.getType();
mostRecentZxid = event.getZxid();
mostRecentPath = event.getPath();
}

public String getMostRecentPath() {
return mostRecentPath;
}

public Event.EventType getMostRecentEventType() {
return mostRecentEventType;
}

public long getMostRecentZxid() {
return mostRecentZxid;
}


@Override
int getSessionTimeout() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -705,7 +705,7 @@ public int sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, St
*/
@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public int getSessionTimeout() {

@Override
public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, event.getZxid(), 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(
LOG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,24 @@ default boolean addWatch(String path, Watcher watcher, WatcherMode watcherMode)
*
* @param path znode path
* @param type the watch event type
* @param zxid the zxid for the corresponding change that triggered this event
*
* @return the watchers have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type);
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid);

/**
* Distribute the watch event for the given path, but ignore those
* suppressed ones.
*
* @param path znode path
* @param type the watch event type
* @param zxid the zxid for the corresponding change that triggered this event
* @param suppress the suppressed watcher set
*
* @return the watchers have been notified
*/
WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress);
WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress);

/**
* Get the size of watchers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ public synchronized void removeWatcher(Watcher watcher) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
return triggerWatch(path, type, zxid, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);
Set<Watcher> watchers = new HashSet<>();
PathParentIterator pathParentIterator = getPathParentIterator(path);
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,13 +202,13 @@ public void processDeadWatchers(Set<Integer> deadWatchers) {
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid) {
return triggerWatch(path, type, zxid, null);
}

@Override
public WatcherOrBitSet triggerWatch(String path, EventType type, WatcherOrBitSet suppress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
public WatcherOrBitSet triggerWatch(String path, EventType type, long zxid, WatcherOrBitSet suppress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path, zxid);

BitHashSet watchers = remove(path);
if (watchers == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.time.Instant;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.util.ServiceUtils;
import org.hamcrest.CustomMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.StringDescription;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -40,6 +45,7 @@ public class ZKTestCase {

protected static final File testBaseDir = new File(System.getProperty("build.test.dir", "build"));
private static final Logger LOG = LoggerFactory.getLogger(ZKTestCase.class);
public static final int DEFAULT_METRIC_TIMEOUT = 30;

static {
// Disable System.exit in tests.
Expand Down Expand Up @@ -103,7 +109,7 @@ public interface WaitForCondition {
* @param timeout timeout in seconds
* @throws InterruptedException
*/
public void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException {
public static void waitFor(String msg, WaitForCondition condition, int timeout) throws InterruptedException {
final Instant deadline = Instant.now().plusSeconds(timeout);
while (Instant.now().isBefore(deadline)) {
if (condition.evaluate()) {
Expand All @@ -114,4 +120,36 @@ public void waitFor(String msg, WaitForCondition condition, int timeout) throws
fail(msg);
}

}
public static <T> void waitForMetric(String metricKey, Matcher<T> matcher) throws InterruptedException {
waitForMetric(metricKey, matcher, DEFAULT_METRIC_TIMEOUT);
}

public static <T> void waitForMetric(String metricKey, Matcher<T> matcher, int timeoutInSeconds) throws InterruptedException {
String errorMessage = String.format("metric \"%s\" failed to match after %d seconds",
metricKey, timeoutInSeconds);
waitFor(errorMessage, () -> {
@SuppressWarnings("unchecked")
T actual = (T) MetricsUtils.currentServerMetrics().get(metricKey);
if (!matcher.matches(actual)) {
Description description = new StringDescription();
matcher.describeMismatch(actual, description);
LOG.info("match failed for metric {}: {}", metricKey, description);
return false;
}
return true;
}, timeoutInSeconds);
}

/**
* Functionally identical to {@link org.hamcrest.Matchers#closeTo} except that it accepts all numerical types
* instead of failing if the value is not a {@link Double}.
*/
public static Matcher<Number> closeTo(double operand, double error) {
return new CustomMatcher<Number>(String.format("A number within %s of %s", error, operand)) {
@Override
public boolean matches(Object actual) {
return Math.abs(operand - ((Number) actual).doubleValue()) <= error;
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.zookeeper.server;

import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.File;
Expand Down Expand Up @@ -75,10 +77,6 @@ public class RequestThrottlerTest extends ZKTestCase {
ZooKeeper zk = null;
int connectionLossCount = 0;

private long getCounterMetric(String name) {
return (long) MetricsUtils.currentServerMetrics().get(name);
}

@BeforeEach
public void setup() throws Exception {
// start a server and create a client
Expand Down Expand Up @@ -222,11 +220,8 @@ public void testRequestThrottler() throws Exception {
submitted.await(5, TimeUnit.SECONDS);

// but only two requests can get into the pipeline because of the throttler
WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") == 2;
waitFor("request not queued", requestQueued, 5);

WaitForCondition throttleWait = () -> getCounterMetric("request_throttle_wait_count") >= 1;
waitFor("no throttle wait", throttleWait, 5);
waitForMetric("prep_processor_request_queued", is(2L));
waitForMetric("request_throttle_wait_count", greaterThanOrEqualTo(1L));

// let the requests go through the pipeline and the throttler will be waken up to allow more requests
// to enter the pipeline
Expand Down Expand Up @@ -387,8 +382,7 @@ public void testGlobalOutstandingRequestThrottlingWithRequestThrottlerDisabled()
// be GLOBAL_OUTSTANDING_LIMIT + 2.
//
// But due to leak of consistent view of number of outstanding requests, the number could be larger.
WaitForCondition requestQueued = () -> getCounterMetric("prep_processor_request_queued") >= Integer.parseInt(GLOBAL_OUTSTANDING_LIMIT) + 2;
waitFor("no enough requests queued", requestQueued, 5);
waitForMetric("prep_processor_request_queued", greaterThanOrEqualTo(Long.parseLong(GLOBAL_OUTSTANDING_LIMIT) + 2));

resumeProcess.countDown();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,7 @@ public void testFileTxnSnapLogMetrics() throws Exception {
}

// It is possible that above writes will trigger more than one snapshot due to randomization.
WaitForCondition newSnapshot = () -> (long) MetricsUtils.currentServerMetrics().get("cnt_snapshottime") >= 2L;
waitFor("no snapshot in 10s", newSnapshot, 10);
waitForMetric("cnt_snapshottime", greaterThanOrEqualTo(2L), 10);

// Pauses snapshot and logs more txns.
cnxnFactory.getZooKeeperServer().getTxnLogFactory().snapLog.close();
Expand Down
Loading