diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java index 78dc8fadccc..0d17ce2bc6a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java @@ -101,6 +101,7 @@ public enum DisconnectReason { CLOSE_CONNECTION_COMMAND("close_connection_command"), CLEAN_UP("clean_up"), CONNECTION_MODE_CHANGED("connection_mode_changed"), + RENEW_GLOBAL_SESSION_IN_RO_MODE("renew a global session in readonly mode"), // Below reasons are NettyServerCnxnFactory only CHANNEL_DISCONNECTED("channel disconnected"), CHANNEL_CLOSED_EXCEPTION("channel_closed_exception"), @@ -298,7 +299,7 @@ void disableRecv() { protected ZooKeeperSaslServer zooKeeperSaslServer = null; - protected static class CloseRequestException extends IOException { + public static class CloseRequestException extends IOException { private static final long serialVersionUID = -7854505709816442681L; private DisconnectReason reason; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index c391425a309..8a0321c445a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1413,13 +1413,13 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) connReq.getTimeOut(), cnxn.getRemoteSocketAddress()); } else { - long clientSessionId = connReq.getSessionId(); - LOG.debug( - "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", - Long.toHexString(clientSessionId), - Long.toHexString(connReq.getLastZxidSeen()), - connReq.getTimeOut(), - cnxn.getRemoteSocketAddress()); + validateSession(cnxn, sessionId); + LOG.debug( + "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", + Long.toHexString(sessionId), + Long.toHexString(connReq.getLastZxidSeen()), + connReq.getTimeOut(), + cnxn.getRemoteSocketAddress()); if (serverCnxnFactory != null) { serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); } @@ -1433,6 +1433,17 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) } } + /** + * Validate if a particular session can be reestablished. + * + * @param cnxn + * @param sessionId + */ + protected void validateSession(ServerCnxn cnxn, long sessionId) + throws IOException { + // do nothing + } + public boolean shouldThrottle(long outStandingCount) { int globalOutstandingLimit = getGlobalOutstandingLimit(); if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java index c50dd539f4e..3aec97f72e2 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java @@ -86,16 +86,14 @@ public void run() { case OpCode.setACL: case OpCode.multi: case OpCode.check: - ReplyHeader hdr = new ReplyHeader( - request.cxid, - zks.getZKDatabase().getDataTreeLastProcessedZxid(), - Code.NOTREADONLY.intValue()); - try { - request.cnxn.sendResponse(hdr, null, null); - } catch (IOException e) { - LOG.error("IO exception while sending response", e); - } + sendErrorResponse(request); continue; + case OpCode.closeSession: + case OpCode.createSession: + if (!request.isLocalSession()) { + sendErrorResponse(request); + continue; + } } // proceed to the next processor @@ -109,6 +107,18 @@ public void run() { LOG.info("ReadOnlyRequestProcessor exited loop!"); } + private void sendErrorResponse(Request request) { + ReplyHeader hdr = new ReplyHeader( + request.cxid, + zks.getZKDatabase().getDataTreeLastProcessedZxid(), + Code.NOTREADONLY.intValue()); + try { + request.cnxn.sendResponse(hdr, null, null); + } catch (IOException e) { + LOG.error("IO exception while sending response", e); + } + } + @Override public void processRequest(Request request) { if (!finished) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java index f8517eb9b02..d2f6b39b69a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java @@ -18,14 +18,18 @@ package org.apache.zookeeper.server.quorum; +import java.io.IOException; import java.io.PrintWriter; import java.util.Objects; import java.util.stream.Collectors; +import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.jmx.MBeanRegistry; import org.apache.zookeeper.server.DataTreeBean; import org.apache.zookeeper.server.FinalRequestProcessor; import org.apache.zookeeper.server.PrepRequestProcessor; +import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestProcessor; +import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ZKDatabase; import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.server.ZooKeeperServerBean; @@ -80,6 +84,49 @@ public synchronized void startup() { LOG.info("Read-only server started"); } + @Override + public void createSessionTracker() { + sessionTracker = new LearnerSessionTracker( + this, getZKDatabase().getSessionWithTimeOuts(), + this.tickTime, self.getId(), self.areLocalSessionsEnabled(), + getZooKeeperServerListener()); + } + + @Override + protected void startSessionTracker() { + ((LearnerSessionTracker) sessionTracker).start(); + } + + @Override + protected void setLocalSessionFlag(Request si) { + switch (si.type) { + case OpCode.createSession: + if (self.areLocalSessionsEnabled()) { + si.setLocalSession(true); + } + break; + case OpCode.closeSession: + if (((UpgradeableSessionTracker) sessionTracker).isLocalSession(si.sessionId)) { + si.setLocalSession(true); + } else { + LOG.warn("Submitting global closeSession request for session 0x{} in ReadOnly mode", + Long.toHexString(si.sessionId)); + } + break; + default: + break; + } + } + + @Override + protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException { + if (((LearnerSessionTracker) sessionTracker).isGlobalSession(sessionId)) { + String msg = "Refusing global session reconnection in RO mode " + cnxn.getRemoteSocketAddress(); + LOG.info(msg); + throw new ServerCnxn.CloseRequestException(msg, ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE); + } + } + @Override protected void registerJMX() { // register with JMX diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java index 0bef4c14aad..966f3c1546d 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java @@ -27,6 +27,7 @@ import java.io.ByteArrayOutputStream; import java.io.LineNumberReader; import java.io.StringReader; +import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import org.apache.log4j.Layout; import org.apache.log4j.Level; @@ -56,7 +57,6 @@ public class ReadOnlyModeTest extends ZKTestCase { @Before public void setUp() throws Exception { System.setProperty("readonlymode.enabled", "true"); - qu.startQuorum(); } @After @@ -70,6 +70,9 @@ public void tearDown() throws Exception { */ @Test(timeout = 90000) public void testMultiTransaction() throws Exception { + qu.enableLocalSession(true); + qu.startQuorum(); + CountdownWatcher watcher = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true); watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected @@ -78,9 +81,12 @@ public void testMultiTransaction() throws Exception { final String node1 = "/tnode1"; final String node2 = "/tnode2"; zk.create(node1, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + zk.close(); + watcher.waitForDisconnected(CONNECTION_TIMEOUT); watcher.reset(); qu.shutdown(2); + zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true); watcher.waitForConnected(CONNECTION_TIMEOUT); assertEquals("Should be in r-o mode", States.CONNECTEDREADONLY, zk.getState()); @@ -107,6 +113,9 @@ public void testMultiTransaction() throws Exception { */ @Test(timeout = 90000) public void testReadOnlyClient() throws Exception { + qu.enableLocalSession(true); + qu.startQuorum(); + CountdownWatcher watcher = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true); watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected @@ -158,6 +167,9 @@ public void testReadOnlyClient() throws Exception { */ @Test(timeout = 90000) public void testConnectionEvents() throws Exception { + qu.enableLocalSession(true); + qu.startQuorum(); + CountdownWatcher watcher = new CountdownWatcher(); ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true); boolean success = false; @@ -198,6 +210,9 @@ public void testConnectionEvents() throws Exception { */ @Test(timeout = 90000) public void testSessionEstablishment() throws Exception { + qu.enableLocalSession(true); + qu.startQuorum(); + qu.shutdown(2); CountdownWatcher watcher = new CountdownWatcher(); @@ -227,6 +242,43 @@ public void testSessionEstablishment() throws Exception { zk.close(); } + @Test(timeout = 90000) + public void testGlobalSessionInRO() throws Exception { + qu.startQuorum(); + + CountdownWatcher watcher = new CountdownWatcher(); + ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true); + watcher.waitForConnected(CONNECTION_TIMEOUT); + LOG.info("global session created 0x{}", Long.toHexString(zk.getSessionId())); + + watcher.reset(); + qu.shutdown(2); + try { + watcher.waitForConnected(CONNECTION_TIMEOUT); + fail("Should not be able to renew a global session"); + } catch (TimeoutException e) { + } + zk.close(); + + watcher.reset(); + zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true); + try { + watcher.waitForConnected(CONNECTION_TIMEOUT); + fail("Should not be able to create a global session"); + } catch (TimeoutException e) { + } + zk.close(); + + qu.getPeer(1).peer.enableLocalSessions(true); + zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true); + try { + watcher.waitForConnected(CONNECTION_TIMEOUT); + } catch (TimeoutException e) { + fail("Should be able to create a local session"); + } + zk.close(); + } + /** * Ensures that client seeks for r/w servers while it's connected to r/o * server. @@ -234,6 +286,9 @@ public void testSessionEstablishment() throws Exception { @SuppressWarnings("deprecation") @Test(timeout = 90000) public void testSeekForRwServer() throws Exception { + qu.enableLocalSession(true); + qu.startQuorum(); + // setup the logger to capture all logs Layout layout = Logger.getRootLogger().getAppender("CONSOLE").getLayout(); ByteArrayOutputStream os = new ByteArrayOutputStream();