diff --git a/curator-framework/pom.xml b/curator-framework/pom.xml
index ebc55a842e..62d12d837f 100644
--- a/curator-framework/pom.xml
+++ b/curator-framework/pom.xml
@@ -56,6 +56,12 @@
test
+
+ org.assertj
+ assertj-core
+ test
+
+
com.fasterxml.jackson.core
jackson-core
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReadOnly.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReadOnly.java
index 21a9130a24..1d1d2d04c3 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReadOnly.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestReadOnly.java
@@ -19,6 +19,7 @@
package org.apache.curator.framework.imps;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.Queues;
@@ -69,12 +70,9 @@ public void testConnectionStateNewClient() throws Exception
client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
client.start();
client.checkExists().forPath("/");
- client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState) {
- if (newState == ConnectionState.LOST) {
- lostLatch.countDown();
- }
+ client.getConnectionStateListenable().addListener((client1, newState) -> {
+ if (newState == ConnectionState.LOST) {
+ lostLatch.countDown();
}
});
@@ -95,23 +93,13 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) {
.build();
final BlockingQueue states = Queues.newLinkedBlockingQueue();
- client.getConnectionStateListenable().addListener
- (
- new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- states.add(newState);
- }
- }
- );
+ client.getConnectionStateListenable().addListener((client12, newState) -> states.add(newState));
client.start();
client.checkExists().forPath("/");
ConnectionState state = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
- assertEquals(state, ConnectionState.READ_ONLY);
+ assertThat(state).isEqualTo(ConnectionState.READ_ONLY);
}
finally
{
@@ -136,19 +124,14 @@ public void testReadOnly() throws Exception
final CountDownLatch readOnlyLatch = new CountDownLatch(1);
final CountDownLatch reconnectedLatch = new CountDownLatch(1);
- ConnectionStateListener listener = new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
+ ConnectionStateListener listener = (client1, newState) -> {
+ if ( newState == ConnectionState.READ_ONLY )
+ {
+ readOnlyLatch.countDown();
+ }
+ else if ( newState == ConnectionState.RECONNECTED )
{
- if ( newState == ConnectionState.READ_ONLY )
- {
- readOnlyLatch.countDown();
- }
- else if ( newState == ConnectionState.RECONNECTED )
- {
- reconnectedLatch.countDown();
- }
+ reconnectedLatch.countDown();
}
};
client.getConnectionStateListenable().addListener(listener);
@@ -177,7 +160,7 @@ else if ( newState == ConnectionState.RECONNECTED )
}
@Override
- protected void createServer() throws Exception
+ protected void createServer()
{
// NOP
}
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
index 07e9a17862..aaaf43c1e5 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/ChaosMonkeyCnxnFactory.java
@@ -27,6 +27,7 @@
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerShutdownHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
@@ -76,10 +77,6 @@ public void startup()
{
((TestingZooKeeperMain.TestZooKeeperServer)zks).noteStartup();
}
- else
- {
- throw new RuntimeException("Unknown ZooKeeperServer: " + zks.getClass());
- }
}
@Override
@@ -123,5 +120,10 @@ public void submitRequest(Request si)
}
}
}
+
+ @Override
+ public ZooKeeperServerShutdownHandler getZkShutdownHandler() {
+ return zks.getZkShutdownHandler();
+ }
}
}
diff --git a/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java b/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java
index 2300efc9d7..42f7b27bd5 100644
--- a/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java
+++ b/curator-test/src/main/java/org/apache/curator/test/QuorumConfigBuilder.java
@@ -96,6 +96,22 @@ public void close()
}
public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
+ {
+ Properties properties = buildConfigProperties(instanceIndex);
+ QuorumPeerConfig config = new QuorumPeerConfig()
+ {
+ {
+ if ( fakeConfigFile != null )
+ {
+ configFileStr = fakeConfigFile.getPath();
+ }
+ }
+ };
+ config.parseProperties(properties);
+ return config;
+ }
+
+ public Properties buildConfigProperties(int instanceIndex) throws Exception
{
boolean isCluster = (instanceSpecs.size() > 1);
InstanceSpec spec = instanceSpecs.get(instanceIndex);
@@ -131,21 +147,9 @@ public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
}
Map customProperties = spec.getCustomProperties();
if (customProperties != null) {
- for (Map.Entry property : customProperties.entrySet()) {
- properties.put(property.getKey(), property.getValue());
- }
+ properties.putAll(customProperties);
}
- QuorumPeerConfig config = new QuorumPeerConfig()
- {
- {
- if ( fakeConfigFile != null )
- {
- configFileStr = fakeConfigFile.getPath();
- }
- }
- };
- config.parseProperties(properties);
- return config;
+ return properties;
}
}
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
index 49d47c5408..a54e8cb335 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingQuorumPeerMain.java
@@ -18,17 +18,23 @@
*/
package org.apache.curator.test;
+import java.lang.reflect.Field;
+import java.nio.channels.ServerSocketChannel;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.nio.channels.ServerSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace
{
+ private static final Logger log = LoggerFactory.getLogger(TestingQuorumPeerMain.class);
private volatile boolean isClosed = false;
+ private volatile QuorumConfigBuilder configBuilder;
+ private volatile int instanceIndex;
+
@Override
public void kill()
{
@@ -54,11 +60,6 @@ public void kill()
}
}
- public QuorumPeer getTestingQuorumPeer()
- {
- return quorumPeer;
- }
-
@Override
public void close()
{
@@ -69,8 +70,7 @@ public void close()
}
}
- @Override
- public void blockUntilStarted()
+ private void blockUntilStarted()
{
long startTime = System.currentTimeMillis();
while ( (quorumPeer == null) && ((System.currentTimeMillis() - startTime) <= TestingZooKeeperMain.MAX_WAIT_MS) )
@@ -90,4 +90,32 @@ public void blockUntilStarted()
throw new FailedServerStartException("quorumPeer never got set");
}
}
+
+ @Override
+ public void configure(QuorumConfigBuilder configBuilder, int instanceIndex) {
+ this.configBuilder = configBuilder;
+ this.instanceIndex = instanceIndex;
+ }
+
+ @Override
+ public QuorumPeerConfig getConfig() throws Exception {
+ if (configBuilder != null) {
+ return configBuilder.buildConfig(instanceIndex);
+ }
+
+ return null;
+ }
+
+ @Override
+ public void start() {
+ new Thread(() -> {
+ try {
+ runFromConfig(getConfig());
+ } catch (Exception e) {
+ log.error("From testing server (random state: {}) for instance: {}", configBuilder.isFromRandom(), configBuilder.getInstanceSpec(instanceIndex), e);
+ }
+ }).start();
+
+ blockUntilStarted();
+ }
}
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
index b1a873a16b..646cdc4f04 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperMain.java
@@ -19,6 +19,15 @@
package org.apache.curator.test;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.server.ContainerManager;
@@ -31,27 +40,19 @@
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.management.JMException;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.nio.channels.ServerSocketChannel;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
public class TestingZooKeeperMain implements ZooKeeperMainFace
{
private static final Logger log = LoggerFactory.getLogger(TestingZooKeeperMain.class);
private final CountDownLatch latch = new CountDownLatch(1);
- private final AtomicReference startingException = new AtomicReference(null);
+ private final AtomicReference startingException = new AtomicReference<>(null);
private volatile ServerCnxnFactory cnxnFactory;
private volatile TestZooKeeperServer zkServer;
private volatile ContainerManager containerManager;
+ private volatile QuorumConfigBuilder configBuilder;
+ private volatile int instanceIndex;
private static final Timing timing = new Timing();
@@ -97,12 +98,16 @@ public void kill()
}
}
- TestZooKeeperServer getZkServer() {
- return zkServer;
+ @Override
+ public QuorumPeerConfig getConfig() throws Exception {
+ if (configBuilder != null) {
+ return configBuilder.buildConfig(instanceIndex);
+ }
+
+ return null;
}
- @Override
- public void runFromConfig(QuorumPeerConfig config) throws Exception
+ private void runFromConfig(QuorumPeerConfig config) throws Exception
{
try
{
@@ -111,7 +116,7 @@ public void runFromConfig(QuorumPeerConfig config) throws Exception
MBeanRegistry nopMBeanRegistry = new MBeanRegistry()
{
@Override
- public void register(ZKMBeanInfo bean, ZKMBeanInfo parent) throws JMException
+ public void register(ZKMBeanInfo bean, ZKMBeanInfo parent)
{
// NOP
}
@@ -142,9 +147,7 @@ public void unregister(ZKMBeanInfo bean)
}
}
- @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
- @Override
- public void blockUntilStarted()
+ private void blockUntilStarted()
{
if (!timing.awaitLatch(latch))
{
@@ -267,6 +270,28 @@ private void internalRunFromConfig(ServerConfig config) throws IOException
}
}
+ @Override
+ public void configure(QuorumConfigBuilder configBuilder, int instanceIndex) {
+ this.configBuilder = configBuilder;
+ this.instanceIndex = instanceIndex;
+ }
+
+ @Override
+ public void start() {
+ new Thread(() -> {
+ try
+ {
+ runFromConfig(getConfig());
+ }
+ catch ( Exception e )
+ {
+ log.error(String.format("From testing server (random state: %s) for instance: %s", String.valueOf(configBuilder.isFromRandom()), configBuilder.getInstanceSpec(instanceIndex)), e);
+ }
+ }, "zk-main-thread").start();
+
+ blockUntilStarted();
+ }
+
public static class TestZooKeeperServer extends ZooKeeperServer
{
private final FileTxnSnapLog txnLog;
diff --git a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java
index e80566ad63..42d94fd4a0 100644
--- a/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java
+++ b/curator-test/src/main/java/org/apache/curator/test/TestingZooKeeperServer.java
@@ -19,26 +19,37 @@
package org.apache.curator.test;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Thanks to Jeremie BORDIER (ahfeel) for this code
*/
public class TestingZooKeeperServer implements Closeable
{
- private static final Logger logger = LoggerFactory.getLogger(TestingZooKeeperServer.class);
+ private static final Logger log = LoggerFactory.getLogger(TestingZooKeeperServer.class);
+ private static final boolean hasZooKeeperServerEmbedded;
+ private final AtomicReference state = new AtomicReference<>(State.LATENT);
private final QuorumConfigBuilder configBuilder;
private final int thisInstanceIndex;
private volatile ZooKeeperMainFace main;
- private final AtomicReference state = new AtomicReference(State.LATENT);
+
+ static {
+ boolean localHasZooKeeperServerEmbedded;
+ try {
+ Class.forName("org.apache.zookeeper.server.embedded.ZooKeeperServerEmbedded");
+ localHasZooKeeperServerEmbedded = true;
+ } catch (Throwable t) {
+ localHasZooKeeperServerEmbedded = false;
+ log.info("ZooKeeperServerEmbedded is not available in the version of the ZooKeeper library being used");
+ }
+ hasZooKeeperServerEmbedded = localHasZooKeeperServerEmbedded;
+ }
ZooKeeperMainFace getMain() {
return main;
@@ -60,20 +71,22 @@ public TestingZooKeeperServer(QuorumConfigBuilder configBuilder, int thisInstanc
this.configBuilder = configBuilder;
this.thisInstanceIndex = thisInstanceIndex;
- main = isCluster() ? new TestingQuorumPeerMain() : new TestingZooKeeperMain();
+ main = createServerMain();
+ }
+
+ private ZooKeeperMainFace createServerMain() {
+ if (hasZooKeeperServerEmbedded) {
+ return new ZooKeeperServerEmbeddedAdapter();
+ } else if (isCluster()) {
+ return new TestingQuorumPeerMain();
+ } else {
+ return new TestingZooKeeperMain();
+ }
}
private boolean isCluster() {
return configBuilder.size() > 1;
}
-
- public QuorumPeer getQuorumPeer()
- {
- if (isCluster()) {
- return ((TestingQuorumPeerMain) main).getTestingQuorumPeer();
- }
- throw new UnsupportedOperationException();
- }
public Collection getInstanceSpecs()
{
@@ -91,8 +104,6 @@ public void kill()
* started again. If it is not running (in a LATENT or STOPPED state) then
* it will be restarted. If it is in a CLOSED state then an exception will
* be thrown.
- *
- * @throws Exception
*/
public void restart() throws Exception
{
@@ -111,7 +122,7 @@ public void restart() throws Exception
// Set to a LATENT state so we can restart
state.set(State.LATENT);
- main = isCluster() ? new TestingQuorumPeerMain() : new TestingZooKeeperMain();
+ main = createServerMain();
start();
}
@@ -152,22 +163,7 @@ public void start() throws Exception
return;
}
- new Thread(new Runnable()
- {
- public void run()
- {
- try
- {
- QuorumPeerConfig config = configBuilder.buildConfig(thisInstanceIndex);
- main.runFromConfig(config);
- }
- catch ( Exception e )
- {
- logger.error(String.format("From testing server (random state: %s) for instance: %s", String.valueOf(configBuilder.isFromRandom()), getInstanceSpec()), e);
- }
- }
- }).start();
-
- main.blockUntilStarted();
+ main.configure(configBuilder, thisInstanceIndex);
+ main.start();
}
}
\ No newline at end of file
diff --git a/curator-test/src/main/java/org/apache/curator/test/ZooKeeperMainFace.java b/curator-test/src/main/java/org/apache/curator/test/ZooKeeperMainFace.java
index 904e601d73..5b8521fe68 100644
--- a/curator-test/src/main/java/org/apache/curator/test/ZooKeeperMainFace.java
+++ b/curator-test/src/main/java/org/apache/curator/test/ZooKeeperMainFace.java
@@ -18,14 +18,16 @@
*/
package org.apache.curator.test;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import java.io.Closeable;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
public interface ZooKeeperMainFace extends Closeable
{
- void runFromConfig(QuorumPeerConfig config) throws Exception;
+ void configure(QuorumConfigBuilder config, int instanceIndex) throws Exception;
- void blockUntilStarted();
+ void start();
void kill();
+
+ QuorumPeerConfig getConfig() throws Exception;
}
diff --git a/curator-test/src/main/java/org/apache/curator/test/ZooKeeperServerEmbeddedAdapter.java b/curator-test/src/main/java/org/apache/curator/test/ZooKeeperServerEmbeddedAdapter.java
new file mode 100644
index 0000000000..0e8aeb0763
--- /dev/null
+++ b/curator-test/src/main/java/org/apache/curator/test/ZooKeeperServerEmbeddedAdapter.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.curator.test;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Properties;
+import org.apache.zookeeper.server.embedded.ZooKeeperServerEmbedded;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ZooKeeperServerEmbeddedAdapter implements ZooKeeperMainFace {
+ private static final Logger log = LoggerFactory.getLogger(ZooKeeperServerEmbeddedAdapter.class);
+ private static final Duration DEFAULT_STARTUP_TIMEOUT = Duration.ofMinutes(1);
+
+ private volatile ZooKeeperServerEmbedded zooKeeperEmbedded;
+ private volatile QuorumConfigBuilder configBuilder;
+ private volatile int instanceIndex;
+
+ @Override
+ public void configure(QuorumConfigBuilder config, int instanceIndex) throws Exception {
+ this.configBuilder = config;
+ this.instanceIndex = instanceIndex;
+
+ final Properties properties = config.buildConfigProperties(instanceIndex);
+ properties.put("admin.enableServer", "false");
+
+ final Path dataDir = Paths.get(properties.getProperty("dataDir"));
+ zooKeeperEmbedded = ZooKeeperServerEmbedded.builder()
+ .configuration(properties)
+ .baseDir(dataDir.getParent())
+ .build();
+ log.info("Configure ZooKeeperServerEmbeddedAdapter with properties: {}", properties);
+ }
+
+ @Override
+ public QuorumPeerConfig getConfig() throws Exception {
+ if (configBuilder != null) {
+ return configBuilder.buildConfig(instanceIndex);
+ }
+
+ return null;
+ }
+
+ @Override
+ public void start() {
+ if (zooKeeperEmbedded == null) {
+ throw new FailedServerStartException(new NullPointerException("zooKeeperEmbedded"));
+ }
+
+ try {
+ zooKeeperEmbedded.start(DEFAULT_STARTUP_TIMEOUT.toMillis());
+ } catch (Exception e) {
+ throw new FailedServerStartException(e);
+ }
+ }
+
+ @Override
+ public void kill() {
+ close();
+ }
+
+ @Override
+ public void close() {
+ if (zooKeeperEmbedded != null) {
+ zooKeeperEmbedded.close();
+ }
+ }
+}
diff --git a/curator-test/src/test/java/org/apache/curator/test/TestTestingServer.java b/curator-test/src/test/java/org/apache/curator/test/TestTestingServer.java
index 1ddf960e49..f16b2ac748 100644
--- a/curator-test/src/test/java/org/apache/curator/test/TestTestingServer.java
+++ b/curator-test/src/test/java/org/apache/curator/test/TestTestingServer.java
@@ -45,8 +45,7 @@ public void setCustomTickTimeTest() throws Exception {
final InstanceSpec spec = new InstanceSpec(zkTmpDir, -1, -1, -1, true, -1, customTickMs, -1);
final int zkTickTime;
try (TestingServer testingServer = new TestingServer(spec, true)) {
- TestingZooKeeperMain main = (TestingZooKeeperMain) testingServer.getTestingZooKeeperServer().getMain();
- zkTickTime = main.getZkServer().getTickTime();
+ zkTickTime = testingServer.getTestingZooKeeperServer().getMain().getConfig().getTickTime();
}
assertEquals(customTickMs, zkTickTime);
}