Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2708,6 +2708,31 @@ public void getEphemerals(AsyncCallback.EphemeralsCallback cb, Object ctx) {
getEphemerals("/", cb, ctx);
}

/**
* Synchronous sync. Flushes channel between process and leader.
*
* @param path the given path
* @throws KeeperException If the server signals an error with a non-zero error code
* @throws InterruptedException If the server transaction is interrupted.
* @throws IllegalArgumentException if an invalid path is specified
*/
public void sync(final String path) throws KeeperException, InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the path argument even needed? My understanding is that the path has no effect.

Copy link
Member Author

@kezhuw kezhuw Sep 27, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeh, it does no take effect since day one of sync. ZOOKEEPER-3414(#1187) proposed to error on no node. I think we should keep it until we have conclusion about that and semantics of sync path. Otherwise, if ZOOKEEPER-3414 is delivered in future, the sync with no path(default to "/") could fail due to chroot is not required to be existed in data tree. At the least, I think it is a separate issue and we should keep consistent between two versions of sync.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the argument for keeping it consistent, but I also know it's a pain to have API churn. So, it'd be nice for a decision to be reached and to add the new method in just once, rather than add it, and have to change it (or add a confusing overloaded version) later.

final String clientPath = path;
PathUtils.validatePath(clientPath);

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.sync);
SyncRequest request = new SyncRequest();
SyncResponse response = new SyncResponse();
request.setPath(serverPath);
Comment on lines +2720 to +2729
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be cleaner to refactor this to share code with the asynchronous version, since everything up to here is identical.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it may not worth the effort.

  • To cover only above code, we probably have to introduce wrapper struct to cover context for submitRequest and queuePacket.
  • To cover also following code, we probably have to introduce an overload to accept all parameters from asynchronous code and a flag to differentiate origin as zk.sync("/", null, null") is valid.

A little duplication probably be a good here.

ReplyHeader r = cnxn.submitRequest(h, request, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
Comment on lines +2730 to +2732
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When an asynchronous method and a synchronous version are available, I feel like the easiest implementation of a synchronous version is something like:

  1. Call the asynchronous version, and return a Future
  2. Make the current thread wait on the completion of the Future that was returned in step 1

This implementation might be doing something like that, but it's not obvious as written. If it's doing anything like that, the details might be obscured inside the submitRequest method. That might make this implementation less readable, and therefore harder to maintain. I think using Futures are more intuitive, if it's not too difficult to implement that way.

You actually have a version like this in the test code in this PR. I'm not sure why that couldn't be the implementation here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using Futures are more intuitive, if it's not too difficult to implement that way.

You actually have a version like this in the test code in this PR. I'm not sure why that couldn't be the implementation here.

I am positive to using future internally for synchronous api. This should also solve your above concern about "identical code". But I found ZOOKEEPER-4749 in investigation, may be we should wait a minute. I don't want to handle it specially for sole this api.

}
}

/**
* Asynchronous sync. Flushes channel between process and leader.
* @param path
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import java.io.File;
import java.time.Instant;
import java.util.concurrent.CompletableFuture;
import org.apache.zookeeper.metrics.MetricsUtils;
import org.apache.zookeeper.util.ServiceUtils;
import org.hamcrest.CustomMatcher;
Expand Down Expand Up @@ -58,6 +59,25 @@ protected String getTestName() {
return testName;
}

public void syncClient(ZooKeeper zk, boolean synchronous) throws KeeperException {
if (synchronous) {
try {
zk.sync("/");
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
return;
}
final CompletableFuture<KeeperException.Code> synced = new CompletableFuture<>();
zk.sync("/", (rc, path, ctx) -> {
synced.complete(KeeperException.Code.get(rc));
}, null);
KeeperException.Code code = synced.join();
if (code != KeeperException.Code.OK) {
throw KeeperException.create(code);
}
}

@BeforeAll
public static void before() {
if (!testBaseDir.exists()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.test.QuorumBase;
Expand Down Expand Up @@ -95,18 +93,6 @@ public void setUp(ServerState serverState, boolean checkEnabled) throws Exceptio
clientWatchB.waitForConnected(CONNECTION_TIMEOUT);
}

void syncClient(ZooKeeper zk) {
CompletableFuture<Void> synced = new CompletableFuture<>();
zk.sync("/", (rc, path, ctx) -> {
if (rc == 0) {
synced.complete(null);
} else {
synced.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
}, null);
synced.join();
}

@AfterEach
public void tearDown() throws Exception {
if (zkClient != null) {
Expand All @@ -124,7 +110,7 @@ private void ensureCheck(boolean enabled) {
ZooKeeperServer.setEnableEagerACLCheck(enabled);
}

private void assertTransactionState(String operation, QuorumPeer peer, long lastxid) {
private void assertTransactionState(String operation, QuorumPeer peer, long lastxid) throws Exception {
if (peer == zkLeader && peer != zkConnected) {
// The operation is performed on no leader, but we are asserting on leader.
// There is no happen-before between `zkLeader.getLastLoggedZxid()` and
Expand All @@ -133,7 +119,7 @@ private void assertTransactionState(String operation, QuorumPeer peer, long last
// to sync leader client to go through commit and response path in leader to
// build happen-before between `zkLeader.getLastLoggedZxid()` and side effect
// of previous operation.
syncClient(zkLeaderClient);
syncClient(zkLeaderClient, false);
}
assertTrue(peer == zkLeader || peer == zkConnected);
boolean eagerACL = ZooKeeperServer.isEnableEagerACLCheck();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
Expand Down Expand Up @@ -179,4 +181,15 @@ public void processResult(int rc, String path, Object ctx) {
assertTrue(complete, String.format("%s Sync completed", serverState));
}

@ParameterizedTest
@MethodSource("data")
public void testSynchronousSync(ServerState serverState) throws Exception {
setUp(serverState);
create2EmptyNode(zkClient, PARENT_PATH);
ForkJoinTask<Void> task = ForkJoinPool.commonPool().submit(() -> {
zkClient.sync(PARENT_PATH);
return null;
});
task.get(30, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.DummyWatcher;
import org.apache.zookeeper.KeeperException;
Expand Down Expand Up @@ -182,15 +183,24 @@ public void testMultipleWatcherObjs() throws IOException, InterruptedException,
}

/**
* Make sure that we can change sessions
* from follower to leader.
*
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
* Make sure that we can change sessions among servers and maintain consistent view
* using {@link ZooKeeper#sync(String)}.
*/
@Test
public void testSessionMoved() throws Exception {
public void testSessionMovedWithSynchronousSync() throws Exception {
testSessionMoved(true);
}

/**
* Make sure that we can change sessions among servers and maintain consistent view
* using {@link ZooKeeper#sync(String, AsyncCallback.VoidCallback, Object)}.
*/
@Test
public void testSessionMovedWithAsynchronousSync() throws Exception {
testSessionMoved(false);
}

public void testSessionMoved(boolean synchronous_sync) throws Exception {
String[] hostPorts = qb.hostPort.split(",");
DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
hostPorts[0],
Expand All @@ -208,21 +218,8 @@ public void testSessionMoved() throws Exception {
zk.getSessionId(),
zk.getSessionPasswd());
zknew.setData("/", new byte[1], -1);
final int[] result = new int[1];
result[0] = Integer.MAX_VALUE;
zknew.sync("/", (rc, path, ctx) -> {
synchronized (result) {
result[0] = rc;
result.notify();
}
}, null);
synchronized (result) {
if (result[0] == Integer.MAX_VALUE) {
result.wait(5000);
}
}
LOG.info("{} Sync returned {}", hostPorts[(i + 1) % hostPorts.length], result[0]);
assertTrue(result[0] == KeeperException.Code.OK.intValue());
syncClient(zknew, synchronous_sync);
LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]);
try {
zk.setData("/", new byte[1], -1);
fail("Should have lost the connection");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,21 +243,8 @@ public void testSessionMove() throws Exception {
% hostPorts.length], CONNECTION_TIMEOUT, new MyWatcher(Integer.toString(
i
+ 1)), zk.getSessionId(), zk.getSessionPasswd());
final int[] result = new int[1];
result[0] = Integer.MAX_VALUE;
zknew.sync("/", (rc, path, ctx) -> {
synchronized (result) {
result[0] = rc;
result.notify();
}
}, null);
synchronized (result) {
if (result[0] == Integer.MAX_VALUE) {
result.wait(5000);
}
}
LOG.info("{} Sync returned {}", hostPorts[(i + 1) % hostPorts.length], result[0]);
assertTrue(result[0] == KeeperException.Code.OK.intValue());
zknew.sync("/");
LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]);
zknew.setData("/", new byte[1], -1);
try {
zk.setData("/", new byte[1], -1);
Expand All @@ -270,6 +257,7 @@ public void testSessionMove() throws Exception {
}
zk.close();
}

/**
* This test makes sure that duplicate state changes are not communicated
* to the client watcher. For example we should not notify state as
Expand Down