Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions ratis-docs/src/site/markdown/configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,14 @@ treat the peer as caught-up. Increase this number when write throughput is high.

--------------------------------------------------------------------------------

| **Property** | `raft.server.read.leader.lease.enabled` |
|:----------------|:-----------------------------------------------------------|
| **Description** | whether to enable lease in linearizable read-only requests |
| **Type** | boolean |
| **Default** | true |

--------------------------------------------------------------------------------

| **Property** | `raft.server.read.leader.lease.timeout.ratio` |
|:----------------|:----------------------------------------------|
| **Description** | maximum timeout ratio of leader lease |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,16 @@ static void setOption(RaftProperties properties, Option option) {
set(properties::setEnum, OPTION_KEY, option);
}

String LEADER_LEASE_ENABLED_KEY = PREFIX + ".leader.lease.enabled";
boolean LEADER_LEASE_ENABLED_DEFAULT = false;
static boolean leaderLeaseEnabled(RaftProperties properties) {
return getBoolean(properties::getBoolean, LEADER_LEASE_ENABLED_KEY,
LEADER_LEASE_ENABLED_DEFAULT, getDefaultLog());
}
static void setLeaderLeaseEnabled(RaftProperties properties, boolean enabled) {
setBoolean(properties::setBoolean, LEADER_LEASE_ENABLED_KEY, enabled);
}

String LEADER_LEASE_TIMEOUT_RATIO_KEY = PREFIX + ".leader.lease.timeout.ratio";
double LEADER_LEASE_TIMEOUT_RATIO_DEFAULT = 0.9;
static double leaderLeaseTimeoutRatio(RaftProperties properties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,20 @@
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LeaderLease {

private final AtomicBoolean enabled;
private final long leaseTimeoutMs;
// TODO invalidate leader lease when stepDown / transferLeader
private final AtomicReference<Timestamp> lease = new AtomicReference<>(Timestamp.currentTime());

LeaderLease(RaftProperties properties) {
this.enabled = new AtomicBoolean(RaftServerConfigKeys.Read.leaderLeaseEnabled(properties));
final double leaseRatio = RaftServerConfigKeys.Read.leaderLeaseTimeoutRatio(properties);
Preconditions.assertTrue(leaseRatio > 0.0 && leaseRatio <= 1.0,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to check this when lease read is disabled?

"leader ratio should sit in (0,1], now is " + leaseRatio);
Expand All @@ -47,8 +49,16 @@ class LeaderLease {
.toIntExact(TimeUnit.MILLISECONDS);
}

boolean getAndSetEnabled(boolean newValue) {
return enabled.getAndSet(newValue);
}

boolean isEnabled() {
return enabled.get();
}

boolean isValid() {
return lease.get().elapsedTimeMs() < leaseTimeoutMs;
return isEnabled() && lease.get().elapsedTimeMs() < leaseTimeoutMs;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ void stop() {
messageStreamRequests.clear();
// TODO client should retry on NotLeaderException
readIndexHeartbeats.failListeners(nle);
lease.getAndSetEnabled(false);
server.getServerRpc().notifyNotLeader(server.getMemberId().getGroupId());
logAppenderMetrics.unregister();
raftServerMetrics.unregister();
Expand Down Expand Up @@ -675,6 +676,7 @@ void submitStepDownEvent(long term, StepDownReason reason) {

private void stepDown(long term, StepDownReason reason) {
try {
lease.getAndSetEnabled(false);
server.changeToFollowerAndPersistMetadata(term, false, reason);
pendingStepDown.complete(server::newSuccessReply);
} catch(IOException e) {
Expand Down Expand Up @@ -953,6 +955,7 @@ private void checkAndUpdateConfiguration() {
pendingRequests.replySetConfiguration(server::newSuccessReply);
// if the leader is not included in the current configuration, step down
if (!conf.containsInConf(server.getId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER)) {
lease.getAndSetEnabled(false);
LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf);
try {
// leave some time for all RPC senders to send out new conf entry
Expand Down Expand Up @@ -1113,6 +1116,12 @@ CompletableFuture<Long> getReadIndex(Long readAfterWriteConsistentIndex) {
new LeaderNotReadyException(server.getMemberId())));
}

// if lease is enabled, check lease first
if (hasLease()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that even though lease read is not enabled, we are still calling hasLease frequently and attempting to extend lease many times. Maybe we could use some optimization here, since it's a critical step in the query.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right! Fixed, PTAL.

return CompletableFuture.completedFuture(readIndex);
}

// send heartbeats and wait for majority acknowledgments
final AppendEntriesListener listener = readIndexHeartbeats.addAppendEntriesListener(
readIndex, i -> new AppendEntriesListener(i, senders));

Expand All @@ -1129,7 +1138,15 @@ public void onAppendEntriesReply(LogAppender appender, RaftProtos.AppendEntriesR
readIndexHeartbeats.onAppendEntriesReply(appender, reply, this::hasMajority);
}

boolean getAndSetLeaseEnabled(boolean newValue) {
return lease.getAndSetEnabled(newValue);
}

boolean hasLease() {
if (!lease.isEnabled()) {
return false;
}

if (checkLeaderLease()) {
return true;
}
Expand All @@ -1143,7 +1160,8 @@ boolean hasLease() {
}

private boolean checkLeaderLease() {
return isReady() && (server.getRaftConf().isSingleton() || lease.isValid());
return isRunning() && isReady()
&& (server.getRaftConf().isSingleton() || lease.isValid());
}

void replyPendingRequest(long logIndex, RaftClientReply reply) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -999,8 +999,14 @@ private CompletableFuture<Long> getReadIndex(RaftClientRequest request, LeaderSt
}

private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE
&& !request.getType().getRead().getPreferNonLinearizable()) {
if (request.getType().getRead().getPreferNonLinearizable()
|| readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
if (reply != null) {
return reply;
}
return queryStateMachine(request);
} else if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE){
/*
Linearizable read using ReadIndex. See Raft paper section 6.4.
1. First obtain readIndex from Leader.
Expand All @@ -1027,13 +1033,6 @@ private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request)
.thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
.thenCompose(readIndex -> queryStateMachine(request))
.exceptionally(e -> readException2Reply(request, e));
} else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT
|| request.getType().getRead().getPreferNonLinearizable()) {
CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
if (reply != null) {
return reply;
}
return queryStateMachine(request);
} else {
throw new IllegalStateException("Unexpected read option: " + readOption);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,9 @@ private CompletableFuture<RaftClientReply> start(Context context) {
if (previous != null) {
return createReplyFutureFromPreviousRequest(request, previous);
}
// disable the lease before transferring leader
final boolean previousLeaseEnabled = server.getRole().getLeaderState()
.map(l -> l.getAndSetLeaseEnabled(false)).orElse(false);
final PendingRequest pendingRequest = supplier.get();
final Result result = tryTransferLeadership(context);
final Result.Type type = result.getType();
Expand All @@ -308,6 +311,12 @@ private CompletableFuture<RaftClientReply> start(Context context) {
timeout.toString(TimeUnit.SECONDS, 3))),
LOG, () -> "Failed to handle timeout");
}
// reset back lease if the current transfer fails
pendingRequest.getReplyFuture().whenCompleteAsync((reply, ex) -> {
if (ex != null || !reply.isSuccess()) {
server.getRole().getLeaderState().ifPresent(l -> l.getAndSetLeaseEnabled(previousLeaseEnabled));
}
});
return pendingRequest.getReplyFuture();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

Expand All @@ -69,16 +68,21 @@ public void setup() {
final RaftProperties p = getProperties();
p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
CounterStateMachine.class, StateMachine.class);

p.setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
}

@Test
public void testLinearizableRead() throws Exception {
runWithNewCluster(NUM_SERVERS, this::testLinearizableReadImpl);
getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
runWithNewCluster(NUM_SERVERS, this::testReadOnlyImpl);
}

private void testLinearizableReadImpl(CLUSTER cluster) throws Exception {
@Test
public void testLeaseRead() throws Exception {
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
runWithNewCluster(NUM_SERVERS, this::testReadOnlyImpl);
}

private void testReadOnlyImpl(CLUSTER cluster) throws Exception {
try {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
Expand All @@ -98,10 +102,17 @@ private void testLinearizableReadImpl(CLUSTER cluster) throws Exception {

@Test
public void testLinearizableReadTimeout() throws Exception {
runWithNewCluster(NUM_SERVERS, this::testLinearizableReadTimeoutImpl);
getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
runWithNewCluster(NUM_SERVERS, this::testReadOnlyTimeoutImpl);
}

@Test
public void testLeaseReadTimeout() throws Exception {
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
runWithNewCluster(NUM_SERVERS, this::testReadOnlyTimeoutImpl);
}

private void testLinearizableReadTimeoutImpl(CLUSTER cluster) throws Exception {
private void testReadOnlyTimeoutImpl(CLUSTER cluster) throws Exception {
try {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
Expand All @@ -126,10 +137,17 @@ private void testLinearizableReadTimeoutImpl(CLUSTER cluster) throws Exception {

@Test
public void testFollowerLinearizableRead() throws Exception {
runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadImpl);
getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyImpl);
}

@Test
public void testFollowerLeaseRead() throws Exception {
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyImpl);
}

private void testFollowerLinearizableReadImpl(CLUSTER cluster) throws Exception {
private void testFollowerReadOnlyImpl(CLUSTER cluster) throws Exception {
try {
RaftTestUtil.waitForLeader(cluster);

Expand All @@ -155,10 +173,17 @@ private void testFollowerLinearizableReadImpl(CLUSTER cluster) throws Exception

@Test
public void testFollowerLinearizableReadParallel() throws Exception {
runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadParallelImpl);
getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyParallelImpl);
}

private void testFollowerLinearizableReadParallelImpl(CLUSTER cluster) throws Exception {
@Test
public void testFollowerLeaseReadParallel() throws Exception {
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyParallelImpl);
}

private void testFollowerReadOnlyParallelImpl(CLUSTER cluster) throws Exception {
try {
RaftTestUtil.waitForLeader(cluster);

Expand All @@ -183,10 +208,17 @@ private void testFollowerLinearizableReadParallelImpl(CLUSTER cluster) throws Ex

@Test
public void testFollowerLinearizableReadFailWhenLeaderDown() throws Exception {
runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadFailWhenLeaderDownImpl);
getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyFailWhenLeaderDownImpl);
}

private void testFollowerLinearizableReadFailWhenLeaderDownImpl(CLUSTER cluster) throws Exception {
@Test
public void testFollowerLeaseReadWhenLeaderDown() throws Exception {
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyFailWhenLeaderDownImpl);
}

private void testFollowerReadOnlyFailWhenLeaderDownImpl(CLUSTER cluster) throws Exception {
try {
RaftTestUtil.waitForLeader(cluster);

Expand Down Expand Up @@ -215,11 +247,18 @@ private void testFollowerLinearizableReadFailWhenLeaderDownImpl(CLUSTER cluster)
}

@Test
public void testFollowerLinearizableReadRetryWhenLeaderDown() throws Exception {
runWithNewCluster(NUM_SERVERS, this::testFollowerLinearizableReadRetryWhenLeaderDown);
public void testFollowerReadOnlyRetryWhenLeaderDown() throws Exception {
getProperties().setEnum(RaftServerConfigKeys.Read.OPTION_KEY, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyRetryWhenLeaderDown);
}

@Test
public void testFollowerLeaseReadRetryWhenLeaderDown() throws Exception {
getProperties().setBoolean(RaftServerConfigKeys.Read.LEADER_LEASE_ENABLED_KEY, true);
runWithNewCluster(NUM_SERVERS, this::testFollowerReadOnlyRetryWhenLeaderDown);
}

private void testFollowerLinearizableReadRetryWhenLeaderDown(CLUSTER cluster) throws Exception {
private void testFollowerReadOnlyRetryWhenLeaderDown(CLUSTER cluster) throws Exception {
// only retry on readIndexException
final RetryPolicy retryPolicy = ExceptionDependentRetry
.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,7 @@ private void runLeaseTest(CLUSTER cluster, CheckedBiConsumer<CLUSTER, Long, Exce
@Test
public void testLeaderLease() throws Exception {
// use a strict lease
RaftServerConfigKeys.Read.setLeaderLeaseEnabled(getProperties(), true);
RaftServerConfigKeys.Read.setLeaderLeaseTimeoutRatio(getProperties(), 0.5);
runWithNewCluster(3, c -> runLeaseTest(c, this::runTestLeaderLease));
}
Expand Down Expand Up @@ -679,6 +680,7 @@ void runTestLeaderLease(CLUSTER cluster, long leaseTimeoutMs) throws Exception {
@Test
public void testLeaderLeaseDuringReconfiguration() throws Exception {
// use a strict lease
RaftServerConfigKeys.Read.setLeaderLeaseEnabled(getProperties(), true);
RaftServerConfigKeys.Read.setLeaderLeaseTimeoutRatio(getProperties(), 0.5);
runWithNewCluster(3, c -> runLeaseTest(c, this::runTestLeaderLeaseDuringReconfiguration));
}
Expand Down