diff --git a/ratis-docs/src/site/markdown/configurations.md b/ratis-docs/src/site/markdown/configurations.md index 7fa7ddf80d..0500a053a6 100644 --- a/ratis-docs/src/site/markdown/configurations.md +++ b/ratis-docs/src/site/markdown/configurations.md @@ -185,6 +185,14 @@ treat the peer as caught-up. Increase this number when write throughput is high. -------------------------------------------------------------------------------- +| **Property** | `raft.server.read.leader.lease.enabled` | +|:----------------|:-----------------------------------------------------------| +| **Description** | whether to enable lease in linearizable read-only requests | +| **Type** | boolean | +| **Default** | true | + +-------------------------------------------------------------------------------- + | **Property** | `raft.server.read.leader.lease.timeout.ratio` | |:----------------|:----------------------------------------------| | **Description** | maximum timeout ratio of leader lease | diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 5551f9cdde..cd38e5667d 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -192,6 +192,16 @@ static void setOption(RaftProperties properties, Option option) { set(properties::setEnum, OPTION_KEY, option); } + String LEADER_LEASE_ENABLED_KEY = PREFIX + ".leader.lease.enabled"; + boolean LEADER_LEASE_ENABLED_DEFAULT = false; + static boolean leaderLeaseEnabled(RaftProperties properties) { + return getBoolean(properties::getBoolean, LEADER_LEASE_ENABLED_KEY, + LEADER_LEASE_ENABLED_DEFAULT, getDefaultLog()); + } + static void setLeaderLeaseEnabled(RaftProperties properties, boolean enabled) { + setBoolean(properties::setBoolean, LEADER_LEASE_ENABLED_KEY, enabled); + } + String LEADER_LEASE_TIMEOUT_RATIO_KEY = PREFIX + ".leader.lease.timeout.ratio"; double LEADER_LEASE_TIMEOUT_RATIO_DEFAULT = 0.9; static double leaderLeaseTimeoutRatio(RaftProperties properties) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderLease.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderLease.java index 758cbb6c54..315cc9f143 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderLease.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderLease.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -34,11 +35,12 @@ class LeaderLease { + private final AtomicBoolean enabled; private final long leaseTimeoutMs; - // TODO invalidate leader lease when stepDown / transferLeader private final AtomicReference lease = new AtomicReference<>(Timestamp.currentTime()); LeaderLease(RaftProperties properties) { + this.enabled = new AtomicBoolean(RaftServerConfigKeys.Read.leaderLeaseEnabled(properties)); final double leaseRatio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(properties); Preconditions.assertTrue(leaseRatio > 0.0 && leaseRatio <= 1.0, "leader ratio should sit in (0,1], now is " + leaseRatio); @@ -47,8 +49,16 @@ class LeaderLease { .toIntExact(TimeUnit.MILLISECONDS); } + boolean getAndSetEnabled(boolean newValue) { + return enabled.getAndSet(newValue); + } + + boolean isEnabled() { + return enabled.get(); + } + boolean isValid() { - return lease.get().elapsedTimeMs() < leaseTimeoutMs; + return isEnabled() && lease.get().elapsedTimeMs() < leaseTimeoutMs; } /** diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 4181393780..4ebfc3d567 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -438,6 +438,7 @@ void stop() { messageStreamRequests.clear(); // TODO client should retry on NotLeaderException readIndexHeartbeats.failListeners(nle); + lease.getAndSetEnabled(false); server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId()); logAppenderMetrics.unregister(); raftServerMetrics.unregister(); @@ -675,6 +676,7 @@ void submitStepDownEvent(long term, StepDownReason reason) { private void stepDown(long term, StepDownReason reason) { try { + lease.getAndSetEnabled(false); server.changeToFollowerAndPersistMetadata(term, false, reason); pendingStepDown.complete(server::newSuccessReply); } catch(IOException e) { @@ -953,6 +955,7 @@ private void checkAndUpdateConfiguration() { pendingRequests.replySetConfiguration(server::newSuccessReply); // if the leader is not included in the current configuration, step down if (!conf.containsInConf(server.getId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER)) { + lease.getAndSetEnabled(false); LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf); try { // leave some time for all RPC senders to send out new conf entry @@ -1113,6 +1116,12 @@ CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) { new LeaderNotReadyException(server.getMemberId()))); } + // if lease is enabled, check lease first + if (hasLease()) { + return CompletableFuture.completedFuture(readIndex); + } + + // send heartbeats and wait for majority acknowledgments final AppendEntriesListener listener = readIndexHeartbeats.addAppendEntriesListener( readIndex, i -> new AppendEntriesListener(i, senders)); @@ -1129,7 +1138,15 @@ public void onAppendEntriesReply(LogAppender appender, RaftProtos.AppendEntriesR readIndexHeartbeats.onAppendEntriesReply(appender, reply, this::hasMajority); } + boolean getAndSetLeaseEnabled(boolean newValue) { + return lease.getAndSetEnabled(newValue); + } + boolean hasLease() { + if (!lease.isEnabled()) { + return false; + } + if (checkLeaderLease()) { return true; } @@ -1143,7 +1160,8 @@ boolean hasLease() { } private boolean checkLeaderLease() { - return isReady() && (server.getRaftConf().isSingleton() || lease.isValid()); + return isRunning() && isReady() + && (server.getRaftConf().isSingleton() || lease.isValid()); } void replyPendingRequest(long logIndex, RaftClientReply reply) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 40a17c4e95..3fb0cb2faa 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -999,8 +999,14 @@ private CompletableFuture getReadIndex(RaftClientRequest request, LeaderSt } private CompletableFuture readAsync(RaftClientRequest request) { - if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE - && !request.getType().getRead().getPreferNonLinearizable()) { + if (request.getType().getRead().getPreferNonLinearizable() + || readOption == RaftServerConfigKeys.Read.Option.DEFAULT) { + final CompletableFuture reply = checkLeaderState(request, null, false); + if (reply != null) { + return reply; + } + return queryStateMachine(request); + } else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE){ /* Linearizable read using ReadIndex. See Raft paper section 6.4. 1. First obtain readIndex from Leader. @@ -1027,13 +1033,6 @@ private CompletableFuture readAsync(RaftClientRequest request) .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex)) .thenCompose(readIndex -> queryStateMachine(request)) .exceptionally(e -> readException2Reply(request, e)); - } else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT - || request.getType().getRead().getPreferNonLinearizable()) { - CompletableFuture reply = checkLeaderState(request, null, false); - if (reply != null) { - return reply; - } - return queryStateMachine(request); } else { throw new IllegalStateException("Unexpected read option: " + readOption); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java index 74ada6541f..e54bee7484 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java @@ -295,6 +295,9 @@ private CompletableFuture start(Context context) { if (previous != null) { return createReplyFutureFromPreviousRequest(request, previous); } + // disable the lease before transferring leader + final boolean previousLeaseEnabled = server.getRole().getLeaderState() + .map(l -> l.getAndSetLeaseEnabled(false)).orElse(false); final PendingRequest pendingRequest = supplier.get(); final Result result = tryTransferLeadership(context); final Result.Type type = result.getType(); @@ -308,6 +311,12 @@ private CompletableFuture start(Context context) { timeout.toString(TimeUnit.SECONDS, 3))), LOG, () -> "Failed to handle timeout"); } + // reset back lease if the current transfer fails + pendingRequest.getReplyFuture().whenCompleteAsync((reply, ex) -> { + if (ex != null || !reply.isSuccess()) { + server.getRole().getLeaderState().ifPresent(l -> l.getAndSetLeaseEnabled(previousLeaseEnabled)); + } + }); return pendingRequest.getReplyFuture(); } diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java index a919a92926..eea75592ef 100644 --- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java @@ -43,7 +43,6 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -69,16 +68,21 @@ public void setup() { final RaftProperties p = getProperties(); p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, CounterStateMachine.class, StateMachine.class); - - p.setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE); } @Test public void testLinearizableRead() throws Exception { - runWithNewCluster(NUM_SERVERS, this::testLinearizableReadImpl); + getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE); + runWithNewCluster(NUM_SERVERS, this::testReadOnlyImpl); } - private void testLinearizableReadImpl(CLUSTER cluster) throws Exception { + @Test + public void testLeaseRead() throws Exception { + getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true); + runWithNewCluster(NUM_SERVERS, this::testReadOnlyImpl); + } + + private void testReadOnlyImpl(CLUSTER cluster) throws Exception { try { RaftTestUtil.waitForLeader(cluster); final RaftPeerId leaderId = cluster.getLeader().getId(); @@ -98,10 +102,17 @@ private void testLinearizableReadImpl(CLUSTER cluster) throws Exception { @Test public void testLinearizableReadTimeout() throws Exception { - runWithNewCluster(NUM_SERVERS, this::testLinearizableReadTimeoutImpl); + getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE); + runWithNewCluster(NUM_SERVERS, this::testReadOnlyTimeoutImpl); + } + + @Test + public void testLeaseReadTimeout() throws Exception { + getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true); + runWithNewCluster(NUM_SERVERS, this::testReadOnlyTimeoutImpl); } - private void testLinearizableReadTimeoutImpl(CLUSTER cluster) throws Exception { + private void testReadOnlyTimeoutImpl(CLUSTER cluster) throws Exception { try { RaftTestUtil.waitForLeader(cluster); final RaftPeerId leaderId = cluster.getLeader().getId(); @@ -126,10 +137,17 @@ private void testLinearizableReadTimeoutImpl(CLUSTER cluster) throws Exception { @Test public void testFollowerLinearizableRead() throws Exception { - runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadImpl); + getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE); + runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyImpl); + } + + @Test + public void testFollowerLeaseRead() throws Exception { + getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true); + runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyImpl); } - private void testFollowerLinearizableReadImpl(CLUSTER cluster) throws Exception { + private void testFollowerReadOnlyImpl(CLUSTER cluster) throws Exception { try { RaftTestUtil.waitForLeader(cluster); @@ -155,10 +173,17 @@ private void testFollowerLinearizableReadImpl(CLUSTER cluster) throws Exception @Test public void testFollowerLinearizableReadParallel() throws Exception { - runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadParallelImpl); + getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE); + runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyParallelImpl); } - private void testFollowerLinearizableReadParallelImpl(CLUSTER cluster) throws Exception { + @Test + public void testFollowerLeaseReadParallel() throws Exception { + getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true); + runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyParallelImpl); + } + + private void testFollowerReadOnlyParallelImpl(CLUSTER cluster) throws Exception { try { RaftTestUtil.waitForLeader(cluster); @@ -183,10 +208,17 @@ private void testFollowerLinearizableReadParallelImpl(CLUSTER cluster) throws Ex @Test public void testFollowerLinearizableReadFailWhenLeaderDown() throws Exception { - runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadFailWhenLeaderDownImpl); + getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE); + runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyFailWhenLeaderDownImpl); } - private void testFollowerLinearizableReadFailWhenLeaderDownImpl(CLUSTER cluster) throws Exception { + @Test + public void testFollowerLeaseReadWhenLeaderDown() throws Exception { + getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true); + runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyFailWhenLeaderDownImpl); + } + + private void testFollowerReadOnlyFailWhenLeaderDownImpl(CLUSTER cluster) throws Exception { try { RaftTestUtil.waitForLeader(cluster); @@ -215,11 +247,18 @@ private void testFollowerLinearizableReadFailWhenLeaderDownImpl(CLUSTER cluster) } @Test - public void testFollowerLinearizableReadRetryWhenLeaderDown() throws Exception { - runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadRetryWhenLeaderDown); + public void testFollowerReadOnlyRetryWhenLeaderDown() throws Exception { + getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE); + runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyRetryWhenLeaderDown); + } + + @Test + public void testFollowerLeaseReadRetryWhenLeaderDown() throws Exception { + getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true); + runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyRetryWhenLeaderDown); } - private void testFollowerLinearizableReadRetryWhenLeaderDown(CLUSTER cluster) throws Exception { + private void testFollowerReadOnlyRetryWhenLeaderDown(CLUSTER cluster) throws Exception { // only retry on readIndexException final RetryPolicy retryPolicy = ExceptionDependentRetry .newBuilder() diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java index c2e5cbd1c7..6453e8e944 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java @@ -652,6 +652,7 @@ private void runLeaseTest(CLUSTER cluster, CheckedBiConsumer runLeaseTest(c, this::runTestLeaderLease)); } @@ -679,6 +680,7 @@ void runTestLeaderLease(CLUSTER cluster, long leaseTimeoutMs) throws Exception { @Test public void testLeaderLeaseDuringReconfiguration() throws Exception { // use a strict lease + RaftServerConfigKeys.Read.setLeaderLeaseEnabled(getProperties(), true); RaftServerConfigKeys.Read.setLeaderLeaseTimeoutRatio(getProperties(), 0.5); runWithNewCluster(3, c -> runLeaseTest(c, this::runTestLeaderLeaseDuringReconfiguration)); }