Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,8 @@ public void close() throws IOException {
}
}

private int xid = 1;
// @VisibleForTesting
protected int xid = 1;

// @VisibleForTesting
volatile States state = States.NOT_CONNECTED;
Expand All @@ -1393,6 +1394,12 @@ public void close() throws IOException {
* the server. Thus, getXid() must be public.
*/
synchronized public int getXid() {
// Avoid negative cxid values. In particular, cxid values of -4, -2, and -1 are special and
// must not be used for requests -- see SendThread.readResponse.
// Skip from MAX to 1.
if (xid == Integer.MAX_VALUE) {
xid = 1;
}
return xid++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,12 +448,20 @@ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
connectString);
HostProvider hostProvider = new StaticHostProvider(
connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
cnxn = createConnection(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}

// @VisibleForTesting
protected ClientCnxn createConnection(String chrootPath,
HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
return new ClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
watchManager, clientCnxnSocket, canBeReadOnly);
}
/**
* To create a ZooKeeper client object, the application needs to pass a
* connection string containing a comma separated list of host:port pairs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;

import org.apache.jute.Record;
import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;

Expand All @@ -34,6 +35,39 @@ public TestableZooKeeper(String host, int sessionTimeout,
Watcher watcher) throws IOException {
super(host, sessionTimeout, watcher);
}

class TestableClientCnxn extends ClientCnxn {
TestableClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
throws IOException {
super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
clientCnxnSocket, 0, new byte[16], canBeReadOnly);
}

void setXid(int newXid) {
xid = newXid;
}

int checkXid() {
return xid;
}
}

protected ClientCnxn createConnection(String chrootPath,
HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
boolean canBeReadOnly) throws IOException {
return new TestableClientCnxn(chrootPath, hostProvider, sessionTimeout, this,
watcher, clientCnxnSocket, canBeReadOnly);
}

public void setXid(int xid) {
((TestableClientCnxn)cnxn).setXid(xid);
}

public int checkXid() {
return ((TestableClientCnxn)cnxn).checkXid();
}

@Override
public List<String> getChildWatches() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
Expand Down Expand Up @@ -802,4 +804,37 @@ public synchronized void process(WatchedEvent event) {
Assert.assertTrue("failed to disconnect",
clientDisconnected.await(5000, TimeUnit.MILLISECONDS));
}

@Test
public void testCXidRollover() throws Exception {
TestableZooKeeper zk = null;
try {
zk = createClient();
zk.setXid(Integer.MAX_VALUE - 10);

zk.create("/testnode", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
for (int i = 0; i < 20; ++i) {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger rc = new AtomicInteger(0);
zk.setData("/testnode", "".getBytes(), -1,
new AsyncCallback.StatCallback() {
@Override
public void processResult(int retcode, String path, Object ctx, Stat stat) {
rc.set(retcode);
latch.countDown();
}
}, null);
Assert.assertTrue("setData should complete within 5s",
latch.await(zk.getSessionTimeout(), TimeUnit.MILLISECONDS));
Assert.assertEquals("setData should have succeeded", Code.OK.intValue(), rc.get());
}
zk.delete("/testnode", -1);
Assert.assertTrue("xid should be positive", zk.checkXid() > 0);
} finally {
if (zk != null) {
zk.close();
}
}
}
}