Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ default CompletableFuture<RaftClientReply> sendReadOnly(Message message) {
*/
CompletableFuture<RaftClientReply> sendReadOnly(Message message, RaftPeerId server);

/**
 * Send the given readonly message asynchronously to the raft service.
 * The result will be read-after-write consistent, i.e. it reflects the latest successful write
 * by the same client.
 *
 * @param message The request message.
 * @return a future of the reply.
 */
CompletableFuture<RaftClientReply> sendReadAfterWrite(Message message);


/**
* Send the given readonly message asynchronously to the raft service using non-linearizable read.
* This method is useful when linearizable read is enabled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ default RaftClientReply sendReadOnly(Message message) throws IOException {
*/
RaftClientReply sendReadOnlyNonLinearizable(Message message) throws IOException;

/**
 * Send the given readonly message to the raft service.
 * The result will be read-after-write consistent, i.e. it reflects the latest successful write
 * by the same client.
 *
 * @param message The request message.
 * @return the reply.
 * @throws IOException declared by this API; presumably raised when the request cannot be
 *                     completed -- confirm the exact conditions against the implementation.
 */
RaftClientReply sendReadAfterWrite(Message message) throws IOException;

/**
* Send the given stale-read message to the given server (not the raft service).
* If the server commit index is larger than or equal to the given min-index, the request will be processed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public CompletableFuture<RaftClientReply> sendReadOnly(Message message, RaftPeer
return send(RaftClientRequest.readRequestType(), message, server);
}

@Override
public CompletableFuture<RaftClientReply> sendReadAfterWrite(Message message) {
return send(RaftClientRequest.readAfterWriteConsistentRequestType(), message, null);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@SzyWilliam, should sendReadAfterWrite use UnorderedAsync.send(..)? Indeed, all the linearizable reads should use UnorderedAsync.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's true! Thanks for pointing that out; I'll change all the linearizable reads to use UnorderedAsync.

}

@Override
public CompletableFuture<RaftClientReply> sendReadOnlyNonLinearizable(Message message) {
return send(RaftClientRequest.readRequestType(true), message, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public RaftClientReply sendReadOnlyNonLinearizable(Message message) throws IOExc
return send(RaftClientRequest.readRequestType(true), message, null);
}

/**
 * Blocking read-after-write consistent read.
 * Delegates to {@code send} with the read-after-write-consistent request type
 * and no specific target server (the server argument is {@code null}).
 *
 * @param message The request message.
 * @return the reply.
 * @throws IOException propagated from {@code send}.
 */
@Override
public RaftClientReply sendReadAfterWrite(Message message) throws IOException {
  final RaftClientRequest.Type readAfterWrite = RaftClientRequest.readAfterWriteConsistentRequestType();
  return send(readAfterWrite, message, null);
}

@Override
public RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ public class RaftClientRequest extends RaftClientMessage {
private static final Type WATCH_DEFAULT = new Type(
WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());

private static final Type READ_AFTER_WRITE_CONSISTENT_DEFAULT
= new Type(ReadRequestTypeProto.newBuilder().setReadAfterWriteConsistent(true).build());
private static final Type READ_DEFAULT = new Type(ReadRequestTypeProto.getDefaultInstance());
private static final Type
READ_NONLINEARIZABLE_DEFAULT = new Type(ReadRequestTypeProto.newBuilder().setPreferNonLinearizable(true).build());
private static final Type READ_NONLINEARIZABLE_DEFAULT
= new Type(ReadRequestTypeProto.newBuilder().setPreferNonLinearizable(true).build());
private static final Type STALE_READ_DEFAULT = new Type(StaleReadRequestTypeProto.getDefaultInstance());

public static Type writeRequestType() {
Expand All @@ -60,6 +62,10 @@ public static Type messageStreamRequestType(long streamId, long messageId, boole
.build());
}

/**
 * @return the shared read request type whose {@code ReadRequestTypeProto}
 *         has {@code readAfterWriteConsistent} set to true.
 */
public static Type readAfterWriteConsistentRequestType() {
return READ_AFTER_WRITE_CONSISTENT_DEFAULT;
}

/**
 * @return the shared default read request type,
 *         which is built from the default {@code ReadRequestTypeProto} instance.
 */
public static Type readRequestType() {
return READ_DEFAULT;
}
Expand Down Expand Up @@ -95,7 +101,9 @@ public static Type valueOf(ForwardRequestTypeProto forward) {
}

/**
 * Map the given read request proto to one of the shared read type constants.
 * The flags are checked in order: {@code preferNonLinearizable} first,
 * then {@code readAfterWriteConsistent}; if neither is set, the default read type is returned.
 *
 * @param read the read request type proto.
 * @return the corresponding shared {@code Type} constant.
 */
public static Type valueOf(ReadRequestTypeProto read) {
  if (read.getPreferNonLinearizable()) {
    return READ_NONLINEARIZABLE_DEFAULT;
  }
  if (read.getReadAfterWriteConsistent()) {
    return READ_AFTER_WRITE_CONSISTENT_DEFAULT;
  }
  return READ_DEFAULT;
}

public static Type valueOf(StaleReadRequestTypeProto staleRead) {
Expand Down Expand Up @@ -219,7 +227,10 @@ public String toString() {
case MESSAGESTREAM:
return toString(getMessageStream());
case READ:
return "RO";
final ReadRequestTypeProto read = getRead();
return read.getReadAfterWriteConsistent()? "RaW"
: read.getPreferNonLinearizable()? "RO(pNL)"
: "RO";
case STALEREAD:
return "StaleRead(" + getStaleRead().getMinIndex() + ")";
case WATCH:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;

import org.apache.ratis.BaseTest;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.AsyncApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine;
import org.apache.ratis.examples.arithmetic.expression.DoubleValue;
import org.apache.ratis.examples.arithmetic.expression.Expression;
import org.apache.ratis.examples.arithmetic.expression.Variable;
import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.Slf4jUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.event.Level;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.ratis.examples.arithmetic.expression.BinaryExpression.Op.ADD;

public class TestReadAfterWrite
extends BaseTest
implements MiniRaftClusterWithGrpc.FactoryGet {

@Before
public void setup() {
Slf4jUtils.setLogLevel(ArithmeticStateMachine.LOG, Level.DEBUG);
Slf4jUtils.setLogLevel(CodeInjectionForTesting.LOG, Level.DEBUG);
Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG);

final RaftProperties p = getProperties();
p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
ArithmeticStateMachine.class, StateMachine.class);
RaftServerConfigKeys.Read.setOption(p, RaftServerConfigKeys.Read.Option.LINEARIZABLE);

}

static class BlockingCode implements CodeInjectionForTesting.Code {
private final CompletableFuture<Void> future = new CompletableFuture<>();

void complete() {
future.complete(null);
}

@Override
public boolean execute(Object localId, Object remoteId, Object... args) {
final boolean blocked = !future.isDone();
if (blocked) {
LOG.info("Server {} blocks client {}: {}", localId, remoteId, args[0]);
}
future.join();
if (blocked) {
LOG.info("Server {} unblocks client {}", localId, remoteId);
}
return true;
}
}

@Test
public void testReadAfterWriteSingleServer() throws Exception {
runWithNewCluster(1, cluster -> {
try (final RaftClient client = cluster.createClient()) {
runTestReadAfterWrite(client);
}
});
}

@Test
public void testReadAfterWrite() throws Exception {
runWithNewCluster(3, cluster -> {
try (final RaftClient client = cluster.createClient()) {
runTestReadAfterWrite(client);
}
});
}

void runTestReadAfterWrite(RaftClient client) throws Exception {
final Variable a = new Variable("a");
final Expression a_plus_2 = ADD.apply(a, new DoubleValue(2));

final AsyncApi async = client.async();
final int initialValue = 10;
final RaftClientReply assign = async.send(a.assign(new DoubleValue(initialValue))).join();
Assert.assertTrue(assign.isSuccess());

final Message query = Expression.Utils.toMessage(a);
assertReply(async.sendReadOnly(query), initialValue);

//block state machine
final BlockingCode blockingCode = new BlockingCode();
CodeInjectionForTesting.put(RaftServerImpl.APPEND_TRANSACTION, blockingCode);
final CompletableFuture<RaftClientReply> plus2 = async.send(a.assign(a_plus_2));

final CompletableFuture<RaftClientReply> readOnlyUnordered = async.sendReadOnlyUnordered(query);
final CompletableFuture<RaftClientReply> readAfterWrite = async.sendReadAfterWrite(query);

Thread.sleep(1000);
// readOnlyUnordered should get 10
assertReply(readOnlyUnordered, initialValue);

LOG.info("readAfterWrite.get");
try {
// readAfterWrite should time out
final RaftClientReply reply = readAfterWrite.get(100, TimeUnit.MILLISECONDS);
final DoubleValue result = (DoubleValue) Expression.Utils.bytes2Expression(
reply.getMessage().getContent().toByteArray(), 0);
Assert.fail("result=" + result + ", reply=" + reply);
} catch (TimeoutException e) {
LOG.info("Good", e);
}

// plus2 should still be blocked.
Assert.assertFalse(plus2.isDone());
// readAfterWrite should still be blocked.
Assert.assertFalse(readAfterWrite.isDone());

// unblock plus2
blockingCode.complete();

// readAfterWrite should get 12.
assertReply(readAfterWrite, initialValue + 2);
}

void assertReply(CompletableFuture<RaftClientReply> future, int expected) {
LOG.info("assertReply, expected {}", expected);
final RaftClientReply reply = future.join();
Assert.assertTrue(reply.isSuccess());
LOG.info("reply {}", reply);
final DoubleValue result = (DoubleValue) Expression.Utils.bytes2Expression(
reply.getMessage().getContent().toByteArray(), 0);
Assert.assertEquals(expected, (int) (double) result.evaluate(null));
}
}
2 changes: 2 additions & 0 deletions ratis-proto/src/main/proto/Raft.proto
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ message InstallSnapshotReplyProto {

// Read-index request message.
message ReadIndexRequestProto {
// The server-to-server RPC request header.
RaftRpcRequestProto serverRequest = 1;
// The client request; it is used to support read-after-write consistency.
RaftClientRequestProto clientRequest = 2; // clientRequest is used to support read-after-write consistency
}

message ReadIndexReplyProto {
Expand Down Expand Up @@ -295,6 +296,7 @@ message ForwardRequestTypeProto {

// Options of a client read request. When neither flag is set, it is the default read type.
message ReadRequestTypeProto {
// When true, the client prefers a non-linearizable read.
bool preferNonLinearizable = 1;
// When true, the result must reflect the latest successful write by the same client.
bool readAfterWriteConsistent = 2;
}

message StaleReadRequestTypeProto {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,23 @@ static Option option(RaftProperties properties) {
/**
 * Store the given read option in the properties under {@code OPTION_KEY}.
 *
 * @param properties the properties to update.
 * @param option the read option to set.
 */
static void setOption(RaftProperties properties, Option option) {
set(properties::setEnum, OPTION_KEY, option);
}

/** Configuration keys for read-after-write consistent reads. */
interface ReadAfterWriteConsistent {
  String PREFIX = RaftServerConfigKeys.PREFIX + ".read-after-write-consistent";

  // The leading "." separates the key suffix from PREFIX; without it the key would be
  // "...read-after-write-consistentwrite-index-cache.expiry-time".
  String WRITE_INDEX_CACHE_EXPIRY_TIME_KEY = PREFIX + ".write-index-cache.expiry-time";
  /** Must be larger than {@link Read#TIMEOUT_DEFAULT}. */
  TimeDuration WRITE_INDEX_CACHE_EXPIRY_TIME_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS);

  /**
   * Get the write-index cache expiry time.
   *
   * @param properties the properties to read from.
   * @return the configured expiry time, or {@link #WRITE_INDEX_CACHE_EXPIRY_TIME_DEFAULT} as the default.
   */
  static TimeDuration writeIndexCacheExpiryTime(RaftProperties properties) {
    return getTimeDuration(properties.getTimeDuration(WRITE_INDEX_CACHE_EXPIRY_TIME_DEFAULT.getUnit()),
        WRITE_INDEX_CACHE_EXPIRY_TIME_KEY, WRITE_INDEX_CACHE_EXPIRY_TIME_DEFAULT, getDefaultLog());
  }

  /**
   * Set the write-index cache expiry time.
   *
   * @param properties the properties to update.
   * @param expiryTime the new expiry time.
   */
  static void setWriteIndexCacheExpiryTime(RaftProperties properties, TimeDuration expiryTime) {
    setTimeDuration(properties::setTimeDuration, WRITE_INDEX_CACHE_EXPIRY_TIME_KEY, expiryTime);
  }
}
}

interface Write {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1089,16 +1089,23 @@ public boolean checkLeadership() {
* 4. If majority respond success, returns readIndex.
* @return current readIndex.
*/
CompletableFuture<Long> getReadIndex() {
final long readIndex = server.getRaftLog().getLastCommittedIndex();
CompletableFuture<Long> getReadIndex(Long readAfterWriteConsistentIndex) {
final long readIndex;
if (readAfterWriteConsistentIndex != null) {
readIndex = readAfterWriteConsistentIndex;
} else {
readIndex = server.getRaftLog().getLastCommittedIndex();
}
LOG.debug("readIndex={}, readAfterWriteConsistentIndex={}", readIndex, readAfterWriteConsistentIndex);

// if group contains only one member, fast path
if (server.getRaftConf().isSingleton()) {
return CompletableFuture.completedFuture(readIndex);
}

// leader has not committed any entries in this term, reject
if (server.getRaftLog().getTermIndex(readIndex).getTerm() != getCurrentTerm()) {
// TODO: wait for leader to become ready instead of failing the request.
if (!isReady()) {
return JavaUtils.completeExceptionally(new ReadIndexException(
"Failed to getReadIndex " + readIndex + " since the term is not yet committed.",
new LeaderNotReadyException(server.getMemberId())));
Expand Down
Loading