Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions curator-framework/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.curator.framework.imps;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.google.common.collect.Queues;
Expand Down Expand Up @@ -69,12 +70,9 @@ public void testConnectionStateNewClient() throws Exception
client = CuratorFrameworkFactory.newClient(cluster.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
client.start();
client.checkExists().forPath("/");
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (newState == ConnectionState.LOST) {
lostLatch.countDown();
}
client.getConnectionStateListenable().addListener((client1, newState) -> {
if (newState == ConnectionState.LOST) {
lostLatch.countDown();
}
});

Expand All @@ -95,23 +93,13 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) {
.build();

final BlockingQueue<ConnectionState> states = Queues.newLinkedBlockingQueue();
client.getConnectionStateListenable().addListener
(
new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
states.add(newState);
}
}
);
client.getConnectionStateListenable().addListener((client12, newState) -> states.add(newState));
client.start();

client.checkExists().forPath("/");

ConnectionState state = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
assertEquals(state, ConnectionState.READ_ONLY);
assertThat(state).isEqualTo(ConnectionState.READ_ONLY);
}
finally
{
Expand All @@ -136,19 +124,14 @@ public void testReadOnly() throws Exception

final CountDownLatch readOnlyLatch = new CountDownLatch(1);
final CountDownLatch reconnectedLatch = new CountDownLatch(1);
ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
ConnectionStateListener listener = (client1, newState) -> {
if ( newState == ConnectionState.READ_ONLY )
{
readOnlyLatch.countDown();
}
else if ( newState == ConnectionState.RECONNECTED )
{
if ( newState == ConnectionState.READ_ONLY )
{
readOnlyLatch.countDown();
}
else if ( newState == ConnectionState.RECONNECTED )
{
reconnectedLatch.countDown();
}
reconnectedLatch.countDown();
}
};
client.getConnectionStateListenable().addListener(listener);
Expand Down Expand Up @@ -177,7 +160,7 @@ else if ( newState == ConnectionState.RECONNECTED )
}

@Override
protected void createServer() throws Exception
protected void createServer()
{
// NOP
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerShutdownHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
Expand Down Expand Up @@ -76,10 +77,6 @@ public void startup()
{
((TestingZooKeeperMain.TestZooKeeperServer)zks).noteStartup();
}
else
{
throw new RuntimeException("Unknown ZooKeeperServer: " + zks.getClass());
}
}

@Override
Expand Down Expand Up @@ -123,5 +120,10 @@ public void submitRequest(Request si)
}
}
}

@Override
public ZooKeeperServerShutdownHandler getZkShutdownHandler() {
return zks.getZkShutdownHandler();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ public void close()
}

public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
{
Properties properties = buildConfigProperties(instanceIndex);
QuorumPeerConfig config = new QuorumPeerConfig()
{
{
if ( fakeConfigFile != null )
{
configFileStr = fakeConfigFile.getPath();
}
}
};
config.parseProperties(properties);
return config;
}

public Properties buildConfigProperties(int instanceIndex) throws Exception
{
boolean isCluster = (instanceSpecs.size() > 1);
InstanceSpec spec = instanceSpecs.get(instanceIndex);
Expand Down Expand Up @@ -131,21 +147,9 @@ public QuorumPeerConfig buildConfig(int instanceIndex) throws Exception
}
Map<String,Object> customProperties = spec.getCustomProperties();
if (customProperties != null) {
for (Map.Entry<String,Object> property : customProperties.entrySet()) {
properties.put(property.getKey(), property.getValue());
}
properties.putAll(customProperties);
}

QuorumPeerConfig config = new QuorumPeerConfig()
{
{
if ( fakeConfigFile != null )
{
configFileStr = fakeConfigFile.getPath();
}
}
};
config.parseProperties(properties);
return config;
return properties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,23 @@
*/
package org.apache.curator.test;

import java.lang.reflect.Field;
import java.nio.channels.ServerSocketChannel;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.apache.zookeeper.server.quorum.QuorumPeerMain;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.channels.ServerSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class TestingQuorumPeerMain extends QuorumPeerMain implements ZooKeeperMainFace
{
private static final Logger log = LoggerFactory.getLogger(TestingQuorumPeerMain.class);
private volatile boolean isClosed = false;

private volatile QuorumConfigBuilder configBuilder;
private volatile int instanceIndex;

@Override
public void kill()
{
Expand All @@ -54,11 +60,6 @@ public void kill()
}
}

public QuorumPeer getTestingQuorumPeer()
{
return quorumPeer;
}

@Override
public void close()
{
Expand All @@ -69,8 +70,7 @@ public void close()
}
}

@Override
public void blockUntilStarted()
private void blockUntilStarted()
{
long startTime = System.currentTimeMillis();
while ( (quorumPeer == null) && ((System.currentTimeMillis() - startTime) <= TestingZooKeeperMain.MAX_WAIT_MS) )
Expand All @@ -90,4 +90,32 @@ public void blockUntilStarted()
throw new FailedServerStartException("quorumPeer never got set");
}
}

@Override
public void configure(QuorumConfigBuilder configBuilder, int instanceIndex) {
this.configBuilder = configBuilder;
this.instanceIndex = instanceIndex;
}

@Override
public QuorumPeerConfig getConfig() throws Exception {
if (configBuilder != null) {
return configBuilder.buildConfig(instanceIndex);
}

return null;
}

@Override
public void start() {
new Thread(() -> {
try {
runFromConfig(getConfig());
} catch (Exception e) {
log.error("From testing server (random state: {}) for instance: {}", configBuilder.isFromRandom(), configBuilder.getInstanceSpec(instanceIndex), e);
}
}).start();

blockUntilStarted();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,15 @@

package org.apache.curator.test;

import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
import org.apache.zookeeper.server.ContainerManager;
Expand All @@ -31,27 +40,19 @@
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.JMException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.channels.ServerSocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class TestingZooKeeperMain implements ZooKeeperMainFace
{
private static final Logger log = LoggerFactory.getLogger(TestingZooKeeperMain.class);

private final CountDownLatch latch = new CountDownLatch(1);
private final AtomicReference<Exception> startingException = new AtomicReference<Exception>(null);
private final AtomicReference<Exception> startingException = new AtomicReference<>(null);

private volatile ServerCnxnFactory cnxnFactory;
private volatile TestZooKeeperServer zkServer;
private volatile ContainerManager containerManager;
private volatile QuorumConfigBuilder configBuilder;
private volatile int instanceIndex;

private static final Timing timing = new Timing();

Expand Down Expand Up @@ -97,12 +98,16 @@ public void kill()
}
}

TestZooKeeperServer getZkServer() {
return zkServer;
@Override
public QuorumPeerConfig getConfig() throws Exception {
if (configBuilder != null) {
return configBuilder.buildConfig(instanceIndex);
}

return null;
}

@Override
public void runFromConfig(QuorumPeerConfig config) throws Exception
private void runFromConfig(QuorumPeerConfig config) throws Exception
{
try
{
Expand All @@ -111,7 +116,7 @@ public void runFromConfig(QuorumPeerConfig config) throws Exception
MBeanRegistry nopMBeanRegistry = new MBeanRegistry()
{
@Override
public void register(ZKMBeanInfo bean, ZKMBeanInfo parent) throws JMException
public void register(ZKMBeanInfo bean, ZKMBeanInfo parent)
{
// NOP
}
Expand Down Expand Up @@ -142,9 +147,7 @@ public void unregister(ZKMBeanInfo bean)
}
}

@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public void blockUntilStarted()
private void blockUntilStarted()
{
if (!timing.awaitLatch(latch))
{
Expand Down Expand Up @@ -267,6 +270,28 @@ private void internalRunFromConfig(ServerConfig config) throws IOException
}
}

@Override
public void configure(QuorumConfigBuilder configBuilder, int instanceIndex) {
this.configBuilder = configBuilder;
this.instanceIndex = instanceIndex;
}

@Override
public void start() {
new Thread(() -> {
try
{
runFromConfig(getConfig());
}
catch ( Exception e )
{
log.error(String.format("From testing server (random state: %s) for instance: %s", String.valueOf(configBuilder.isFromRandom()), configBuilder.getInstanceSpec(instanceIndex)), e);
}
}, "zk-main-thread").start();

blockUntilStarted();
}

public static class TestZooKeeperServer extends ZooKeeperServer
{
private final FileTxnSnapLog txnLog;
Expand Down
Loading