From a95b85840ad261c920ad0ed8fa6801ea60ab214a Mon Sep 17 00:00:00 2001 From: szywilliam Date: Fri, 1 Sep 2023 12:02:14 +0800 Subject: [PATCH 1/5] support read-after-write consistency --- .../org/apache/ratis/client/api/AsyncApi.java | 10 +++ .../apache/ratis/client/api/BlockingApi.java | 8 +++ .../apache/ratis/client/impl/AsyncImpl.java | 6 ++ .../ratis/client/impl/BlockingImpl.java | 5 ++ .../ratis/protocol/RaftClientRequest.java | 14 +++- ratis-proto/src/main/proto/Raft.proto | 2 + .../ratis/server/RaftServerConfigKeys.java | 17 +++++ .../ratis/server/impl/LeaderStateImpl.java | 16 +++-- .../ratis/server/impl/RaftServerImpl.java | 40 +++++++---- .../ratis/server/impl/ServerProtoUtils.java | 5 +- .../ratis/server/impl/WriteIndexCache.java | 69 +++++++++++++++++++ .../impl/TransactionContextImpl.java | 10 +++ .../apache/ratis/ReadOnlyRequestTests.java | 32 +++++++++ 13 files changed, 211 insertions(+), 23 deletions(-) create mode 100644 ratis-server/src/main/java/org/apache/ratis/server/impl/WriteIndexCache.java diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java index c6f5e4181b..47aa2dddd8 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.client.api; +import java.io.IOException; import java.util.concurrent.CompletableFuture; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.protocol.Message; @@ -55,6 +56,15 @@ default CompletableFuture sendReadOnly(Message message) { */ CompletableFuture sendReadOnly(Message message, RaftPeerId server); + /** + * Send the given readonly message asynchronously to the raft service. + * The result will be read-after-write consistent, i.e. reflecting the latest successful write by the same client. + * @param message The request message. + * @return the reply. + */ + CompletableFuture sendReadAfterWrite(Message message) throws IOException; + + /** * Send the given readonly message asynchronously to the raft service using non-linearizable read. * This method is useful when linearizable read is enabled diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java index dc03e1b8de..64d63ff293 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/BlockingApi.java @@ -65,6 +65,14 @@ default RaftClientReply sendReadOnly(Message message) throws IOException { */ RaftClientReply sendReadOnlyNonLinearizable(Message message) throws IOException; + /** + * Send the given readonly message to the raft service. + * The result will be read-after-write consistent, i.e. reflecting the latest successful write by the same client. + * @param message The request message. + * @return the reply. + */ + RaftClientReply sendReadAfterWrite(Message message) throws IOException; + /** * Send the given stale-read message to the given server (not the raft service). * If the server commit index is larger than or equal to the given min-index, the request will be processed. diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java index 9bdc9d50a0..54ba10ce6a 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java @@ -17,6 +17,7 @@ */ package org.apache.ratis.client.impl; +import java.io.IOException; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -51,6 +52,11 @@ public CompletableFuture sendReadOnly(Message message, RaftPeer return send(RaftClientRequest.readRequestType(), message, server); } + @Override + public CompletableFuture sendReadAfterWrite(Message message) throws IOException { + return send(RaftClientRequest.readAfterWriteConsistentRequestType(), message, null); + } + @Override public CompletableFuture sendReadOnlyNonLinearizable(Message message) { return send(RaftClientRequest.readRequestType(true), message, null); diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java index 7e81baf8d5..742ee29010 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java @@ -69,6 +69,11 @@ public RaftClientReply sendReadOnlyNonLinearizable(Message message) throws IOExc return send(RaftClientRequest.readRequestType(true), message, null); } + @Override + public RaftClientReply sendReadAfterWrite(Message message) throws IOException { + return send(RaftClientRequest.readAfterWriteConsistentRequestType(), message, null); + } + @Override public RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server) throws IOException { diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index ae76607ff3..4b0c5166bd 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -35,9 +35,11 @@ public class RaftClientRequest extends RaftClientMessage { private static final Type WATCH_DEFAULT = new Type( WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build()); + private static final Type READ_AFTER_WRITE_CONSISTENT_DEFAULT + = new Type(ReadRequestTypeProto.newBuilder().setReadAfterWriteConsistent(true).build()); private static final Type READ_DEFAULT = new Type(ReadRequestTypeProto.getDefaultInstance()); - private static final Type - READ_NONLINEARIZABLE_DEFAULT = new Type(ReadRequestTypeProto.newBuilder().setPreferNonLinearizable(true).build()); + private static final Type READ_NONLINEARIZABLE_DEFAULT + = new Type(ReadRequestTypeProto.newBuilder().setPreferNonLinearizable(true).build()); private static final Type STALE_READ_DEFAULT = new Type(StaleReadRequestTypeProto.getDefaultInstance()); public static Type writeRequestType() { @@ -60,6 +62,10 @@ public static Type messageStreamRequestType(long streamId, long messageId, boole .build()); } + public static Type readAfterWriteConsistentRequestType() { + return READ_AFTER_WRITE_CONSISTENT_DEFAULT; + } + public static Type readRequestType() { return READ_DEFAULT; } @@ -95,7 +101,9 @@ public static Type valueOf(ForwardRequestTypeProto forward) { } public static Type valueOf(ReadRequestTypeProto read) { - return read.getPreferNonLinearizable()? READ_NONLINEARIZABLE_DEFAULT: READ_DEFAULT; + return read.getPreferNonLinearizable()? READ_NONLINEARIZABLE_DEFAULT + : read.getReadAfterWriteConsistent()? READ_AFTER_WRITE_CONSISTENT_DEFAULT + : READ_DEFAULT; } public static Type valueOf(StaleReadRequestTypeProto staleRead) { diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto index b8680051f8..49a107c455 100644 --- a/ratis-proto/src/main/proto/Raft.proto +++ b/ratis-proto/src/main/proto/Raft.proto @@ -241,6 +241,7 @@ message InstallSnapshotReplyProto { message ReadIndexRequestProto { RaftRpcRequestProto serverRequest = 1; + RaftClientRequestProto clientRequest = 2; // clientRequest is used to support read-after-write consistency } message ReadIndexReplyProto { @@ -295,6 +296,7 @@ message ForwardRequestTypeProto { message ReadRequestTypeProto { bool preferNonLinearizable = 1; + bool readAfterWriteConsistent = 2; } message StaleReadRequestTypeProto { diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java index 211edd7960..e561a3cd92 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java @@ -191,6 +191,23 @@ static Option option(RaftProperties properties) { static void setOption(RaftProperties properties, Option option) { set(properties::setEnum, OPTION_KEY, option); } + + interface ReadAfterWriteConsistent { + String PREFIX = RaftServerConfigKeys.PREFIX + ".read-after-write-consistent"; + + String WRITE_INDEX_CACHE_EXPIRY_TIME_KEY = PREFIX + "write-index-cache.expiry-time"; + /** Must be larger than {@link Read#TIMEOUT_DEFAULT}. */ + TimeDuration WRITE_INDEX_CACHE_EXPIRY_TIME_DEFAULT = TimeDuration.valueOf(60, TimeUnit.SECONDS); + + static TimeDuration writeIndexCacheExpiryTime(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(WRITE_INDEX_CACHE_EXPIRY_TIME_DEFAULT.getUnit()), + WRITE_INDEX_CACHE_EXPIRY_TIME_KEY, WRITE_INDEX_CACHE_EXPIRY_TIME_DEFAULT, getDefaultLog()); + } + + static void setWriteIndexCacheExpiryTime(RaftProperties properties, TimeDuration expiryTime) { + setTimeDuration(properties::setTimeDuration, WRITE_INDEX_CACHE_EXPIRY_TIME_KEY, expiryTime); + } + } } interface Write { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index ac8c3599ff..c8f1aa4277 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -26,6 +26,7 @@ import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; @@ -50,6 +51,7 @@ import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.thirdparty.org.checkerframework.checker.units.qual.A; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.Daemon; @@ -1089,16 +1091,22 @@ public boolean checkLeadership() { * 4. If majority respond success, returns readIndex. * @return current readIndex. */ - CompletableFuture getReadIndex() { - final long readIndex = server.getRaftLog().getLastCommittedIndex(); + CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) { + final long readIndex; + if (readAfterWriteConsistentIndex != null) { + readIndex = readAfterWriteConsistentIndex; + } else { + readIndex = server.getRaftLog().getLastCommittedIndex(); + } // if group contains only one member, fast path if (server.getRaftConf().isSingleton()) { - return CompletableFuture.completedFuture(readIndex); + return CompletableFuture.completedFuture(readIndex); } // leader has not committed any entries in this term, reject - if (server.getRaftLog().getTermIndex(readIndex).getTerm() != getCurrentTerm()) { + // TODO: wait for leader to become ready instead of failing the request. + if (!isReady()) { return JavaUtils.completeExceptionally(new ReadIndexException( "Failed to getReadIndex " + readIndex + " since the term is not yet committed.", new LeaderNotReadyException(server.getMemberId()))); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 3b225dc3c8..09a53a1050 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -68,6 +68,7 @@ import org.apache.ratis.proto.RaftProtos.ServerRpcProto; import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto; import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto; +import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.GroupInfoReply; import org.apache.ratis.protocol.GroupInfoRequest; @@ -125,6 +126,7 @@ import org.apache.ratis.statemachine.SnapshotInfo; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.TransactionContextImpl; import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.CollectionUtils; @@ -222,6 +224,7 @@ public long[] getFollowerNextIndices() { private final RetryCacheImpl retryCache; private final CommitInfoCache commitInfoCache = new CommitInfoCache(); + private final WriteIndexCache writeIndexCache; private final RaftServerJmxAdapter jmxAdapter; private final LeaderElectionMetrics leaderElectionMetrics; @@ -262,6 +265,7 @@ public long[] getFollowerNextIndices() { this.retryCache = new RetryCacheImpl(properties); this.dataStreamMap = new DataStreamMapImpl(id); this.readOption = RaftServerConfigKeys.Read.option(properties); + this.writeIndexCache = new WriteIndexCache(properties); this.jmxAdapter = new RaftServerJmxAdapter(); this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics( @@ -803,7 +807,7 @@ void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupM * Handle a normal update request from client. */ private CompletableFuture appendTransaction( - RaftClientRequest request, TransactionContext context, CacheEntry cacheEntry) throws IOException { + RaftClientRequest request, TransactionContextImpl context, CacheEntry cacheEntry) throws IOException { assertLifeCycleState(LifeCycle.States.RUNNING); CompletableFuture reply; @@ -816,6 +820,8 @@ private CompletableFuture appendTransaction( // append the message to its local log final LeaderStateImpl leaderState = role.getLeaderStateNonNull(); + writeIndexCache.add(request.getClientId(), context.getLogIndexFuture()); + final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage()); if (permit == null) { cacheEntry.failWithException(new ResourceUnavailableException( @@ -923,7 +929,8 @@ public CompletableFuture submitClientRequestAsync( // TODO: this client request will not be added to pending requests until // later which means that any failure in between will leave partial state in // the state machine. We should call cancelTransaction() for failed requests - TransactionContext context = stateMachine.startTransaction(filterDataStreamRaftClientRequest(request)); + final TransactionContextImpl context = (TransactionContextImpl) stateMachine.startTransaction( + filterDataStreamRaftClientRequest(request)); if (context.getException() != null) { final StateMachineException e = new StateMachineException(getMemberId(), context.getException()); final RaftClientReply exceptionReply = newExceptionReply(request, e); @@ -970,12 +977,13 @@ ReadRequests getReadRequests() { return getState().getReadRequests(); } - private CompletableFuture sendReadIndexAsync() { + private CompletableFuture sendReadIndexAsync(RaftClientRequest clientRequest) { final RaftPeerId leaderId = getInfo().getLeaderId(); if (leaderId == null) { return JavaUtils.completeExceptionally(new ReadIndexException(getMemberId() + ": Leader is unknown.")); } - final ReadIndexRequestProto request = ServerProtoUtils.toReadIndexRequestProto(getMemberId(), leaderId); + final ReadIndexRequestProto request = + ServerProtoUtils.toReadIndexRequestProto(clientRequest, getMemberId(), leaderId); try { return getServerRpc().async().readIndexAsync(request); } catch (IOException e) { @@ -983,6 +991,10 @@ private CompletableFuture sendReadIndexAsync() { } } + private CompletableFuture getReadIndex(RaftClientRequest request, LeaderStateImpl leader) { + return writeIndexCache.getWriteIndexFuture(request).thenCompose(leader::getReadIndex); + } + private CompletableFuture readAsync(RaftClientRequest request) { if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE && !request.getType().getRead().getPreferNonLinearizable()) { @@ -996,9 +1008,9 @@ private CompletableFuture readAsync(RaftClientRequest request) final CompletableFuture replyFuture; if (leader != null) { - replyFuture = leader.getReadIndex(); + replyFuture = getReadIndex(request, leader); } else { - replyFuture = sendReadIndexAsync().thenApply(reply -> { + replyFuture = sendReadIndexAsync(request).thenApply(reply -> { if (reply.getServerReply().getSuccess()) { return reply.getReadIndex(); } else { @@ -1454,7 +1466,7 @@ public CompletableFuture readIndexAsync(ReadIndexRequestPro ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), false, INVALID_LOG_INDEX)); } - return leader.getReadIndex() + return getReadIndex(ClientProtoUtils.toRaftClientRequest(request.getClientRequest()), leader) .thenApply(index -> ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), true, index)) .exceptionally(throwable -> ServerProtoUtils.toReadIndexReplyProto(peerId, getMemberId(), false, INVALID_LOG_INDEX)); @@ -1754,14 +1766,11 @@ void submitUpdateCommitEvent() { /** * The log has been submitted to the state machine. Use the future to update * the pending requests and retry cache. - * @param logEntry the log entry that has been submitted to the state machine * @param stateMachineFuture the future returned by the state machine * from which we will get transaction result later */ private CompletableFuture replyPendingRequest( - LogEntryProto logEntry, CompletableFuture stateMachineFuture) { - Preconditions.assertTrue(logEntry.hasStateMachineLogEntry()); - final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry()); + ClientInvocationId invocationId, long logIndex, CompletableFuture stateMachineFuture) { // update the retry cache final CacheEntry cacheEntry = retryCache.getOrCreateEntry(invocationId); Preconditions.assertTrue(cacheEntry != null); @@ -1772,7 +1781,6 @@ private CompletableFuture replyPendingRequest( retryCache.refreshEntry(new CacheEntry(cacheEntry.getKey())); } - final long logIndex = logEntry.getIndex(); return stateMachineFuture.whenComplete((reply, exception) -> { final RaftClientReply.Builder b = newReplyBuilder(invocationId, logIndex); final RaftClientReply r; @@ -1805,19 +1813,21 @@ CompletableFuture applyLogToStateMachine(LogEntryProto next) throws Raf } else if (next.hasStateMachineLogEntry()) { // check whether there is a TransactionContext because we are the leader. TransactionContext trx = role.getLeaderState() - .map(leader -> leader.getTransactionContext(next.getIndex())).orElseGet( - () -> TransactionContext.newBuilder() + .map(leader -> leader.getTransactionContext(next.getIndex())) + .orElseGet(() -> TransactionContext.newBuilder() .setServerRole(role.getCurrentRole()) .setStateMachine(stateMachine) .setLogEntry(next) .build()); + final ClientInvocationId invocationId = ClientInvocationId.valueOf(next.getStateMachineLogEntry()); + writeIndexCache.add(invocationId.getClientId(), ((TransactionContextImpl) trx).getLogIndexFuture()); try { // Let the StateMachine inject logic for committed transactions in sequential order. trx = stateMachine.applyTransactionSerial(trx); final CompletableFuture stateMachineFuture = stateMachine.applyTransaction(trx); - return replyPendingRequest(next, stateMachineFuture); + return replyPendingRequest(invocationId, next.getIndex(), stateMachineFuture); } catch (Exception e) { throw new RaftLogIOException(e); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 108b6c939d..9e99178c69 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -20,6 +20,8 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; @@ -110,9 +112,10 @@ static InstallSnapshotReplyProto toInstallSnapshotReplyProto( } static ReadIndexRequestProto toReadIndexRequestProto( - RaftGroupMemberId requestorId, RaftPeerId replyId) { + RaftClientRequest clientRequest, RaftGroupMemberId requestorId, RaftPeerId replyId) { return ReadIndexRequestProto.newBuilder() .setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId)) + .setClientRequest(ClientProtoUtils.toRaftClientRequestProto(clientRequest)) .build(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/WriteIndexCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/WriteIndexCache.java new file mode 100644 index 0000000000..8d2c410921 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/WriteIndexCache.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.thirdparty.com.google.common.cache.Cache; +import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.ratis.util.TimeDuration; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +/** Caching the per client write index in order to support read-after-write consistency. */ +class WriteIndexCache { + private final Cache>> cache; + + WriteIndexCache(RaftProperties properties) { + this(RaftServerConfigKeys.Read.ReadAfterWriteConsistent.writeIndexCacheExpiryTime(properties)); + } + + /** + * @param cacheExpiryTime time for a cache entry to expire. + */ + WriteIndexCache(TimeDuration cacheExpiryTime) { + this.cache = CacheBuilder.newBuilder() + .expireAfterAccess(cacheExpiryTime.getDuration(), cacheExpiryTime.getUnit()) + .build(); + } + + void add(ClientId key, CompletableFuture future) { + final AtomicReference> ref; + try { + ref = cache.get(key, AtomicReference::new); + } catch (ExecutionException e) { + throw new IllegalStateException(e); + } + ref.set(future); + } + + CompletableFuture getWriteIndexFuture(RaftClientRequest request) { + if (request != null && request.getType().getRead().getReadAfterWriteConsistent()) { + final AtomicReference> ref = cache.getIfPresent(request.getClientId()); + final CompletableFuture future = ref != null? ref.get(): null; + if (future != null) { + return future; + } + } + return CompletableFuture.completedFuture(null); + } +} diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java index 8cedb4a7c8..a1a878e7db 100644 --- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/TransactionContextImpl.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.Objects; +import java.util.concurrent.CompletableFuture; /** * Implementation of {@link TransactionContext} @@ -68,6 +69,8 @@ public class TransactionContextImpl implements TransactionContext { /** Committed LogEntry. */ private LogEntryProto logEntry; + private final CompletableFuture logIndexFuture = new CompletableFuture<>(); + private TransactionContextImpl(RaftPeerRole serverRole, RaftClientRequest clientRequest, StateMachine stateMachine, StateMachineLogEntryProto stateMachineLogEntry) { this.serverRole = serverRole; @@ -107,6 +110,7 @@ private static StateMachineLogEntryProto get(StateMachineLogEntryProto stateMach TransactionContextImpl(RaftPeerRole serverRole, StateMachine stateMachine, LogEntryProto logEntry) { this(serverRole, null, stateMachine, logEntry.getStateMachineLogEntry()); this.logEntry = logEntry; + this.logIndexFuture.complete(logEntry.getIndex()); } @Override @@ -145,9 +149,15 @@ public LogEntryProto initLogEntry(long term, long index) { Preconditions.assertTrue(serverRole == RaftPeerRole.LEADER); Preconditions.assertNull(logEntry, "logEntry"); Objects.requireNonNull(stateMachineLogEntry, "stateMachineLogEntry == null"); + + logIndexFuture.complete(index); return logEntry = LogProtoUtils.toLogEntryProto(stateMachineLogEntry, term, index); } + public CompletableFuture getLogIndexFuture() { + return logIndexFuture; + } + @Override public LogEntryProto getLogEntry() { return logEntry; diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java index c4c31cd22f..a919a92926 100644 --- a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java +++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java @@ -43,6 +43,7 @@ import java.nio.charset.StandardCharsets; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -244,6 +245,37 @@ private void testFollowerLinearizableReadRetryWhenLeaderDown(CLUSTER cluster) th } } + @Test + public void testReadAfterWrite() throws Exception { + runWithNewCluster(NUM_SERVERS, this::testReadAfterWriteImpl); + } + + private void testReadAfterWriteImpl(CLUSTER cluster) throws Exception { + RaftTestUtil.waitForLeader(cluster); + try (RaftClient client = cluster.createClient()) { + // test blocking read-after-write + client.io().send(incrementMessage); + final RaftClientReply blockReply = client.io().sendReadAfterWrite(queryMessage); + Assert.assertEquals(1, retrieve(blockReply)); + + // test asynchronous read-after-write + client.async().send(incrementMessage); + client.async().sendReadAfterWrite(queryMessage).thenAccept(reply -> { + Assert.assertEquals(2, retrieve(reply)); + }); + + for (int i = 0; i < 20; i++) { + client.async().send(incrementMessage); + } + final CompletableFuture linearizable = client.async().sendReadOnly(queryMessage); + final CompletableFuture readAfterWrite = client.async().sendReadAfterWrite(queryMessage); + + CompletableFuture.allOf(linearizable, readAfterWrite).get(); + // read-after-write is more consistent than linearizable read + Assert.assertTrue(retrieve(readAfterWrite.get()) >= retrieve(linearizable.get())); + } + } + static int retrieve(RaftClientReply reply) { return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8)); } From ef16989a05a1ed3726ed5f2f2e14bca2f866d48f Mon Sep 17 00:00:00 2001 From: szywilliam Date: Fri, 1 Sep 2023 12:58:49 +0800 Subject: [PATCH 2/5] support read-after-write consistency --- .../src/main/java/org/apache/ratis/client/api/AsyncApi.java | 3 +-- .../src/main/java/org/apache/ratis/client/impl/AsyncImpl.java | 3 +-- .../java/org/apache/ratis/server/impl/LeaderStateImpl.java | 3 +-- .../main/java/org/apache/ratis/server/impl/RaftServerImpl.java | 1 - .../java/org/apache/ratis/server/impl/ServerProtoUtils.java | 1 - 5 files changed, 3 insertions(+), 8 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java index 47aa2dddd8..483a22205e 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/api/AsyncApi.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.client.api; -import java.io.IOException; import java.util.concurrent.CompletableFuture; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.protocol.Message; @@ -62,7 +61,7 @@ default CompletableFuture sendReadOnly(Message message) { * @param message The request message. * @return the reply. */ - CompletableFuture sendReadAfterWrite(Message message) throws IOException; + CompletableFuture sendReadAfterWrite(Message message); /** diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java index 54ba10ce6a..2f7069f397 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java @@ -17,7 +17,6 @@ */ package org.apache.ratis.client.impl; -import java.io.IOException; import java.util.Objects; import java.util.concurrent.CompletableFuture; @@ -53,7 +52,7 @@ public CompletableFuture sendReadOnly(Message message, RaftPeer } @Override - public CompletableFuture sendReadAfterWrite(Message message) throws IOException { + public CompletableFuture sendReadAfterWrite(Message message) { return send(RaftClientRequest.readAfterWriteConsistentRequestType(), message, null); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index c8f1aa4277..783e8fc3c8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -26,7 +26,6 @@ import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; -import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientReply; import org.apache.ratis.protocol.RaftClientRequest; @@ -1101,7 +1100,7 @@ CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) { // if group contains only one member, fast path if (server.getRaftConf().isSingleton()) { - return CompletableFuture.completedFuture(readIndex); + return CompletableFuture.completedFuture(readIndex); } // leader has not committed any entries in this term, reject diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 09a53a1050..6ef2f74b3a 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -68,7 +68,6 @@ import org.apache.ratis.proto.RaftProtos.ServerRpcProto; import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto; import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto; -import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.GroupInfoReply; import org.apache.ratis.protocol.GroupInfoRequest; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java index 9e99178c69..c2ec88a32d 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java @@ -20,7 +20,6 @@ import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.proto.RaftProtos.*; import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto.AppendResult; -import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.protocol.RaftPeer; From e0590d058b97b57744a7c913d033f9dd4c5b29f0 Mon Sep 17 00:00:00 2001 From: szywilliam Date: Fri, 1 Sep 2023 16:28:31 +0800 Subject: [PATCH 3/5] support read-after-write consistency --- .../main/java/org/apache/ratis/server/impl/LeaderStateImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 783e8fc3c8..6751f1fc10 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -50,7 +50,6 @@ import org.apache.ratis.server.raftlog.LogProtoUtils; import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.statemachine.TransactionContext; -import org.apache.ratis.thirdparty.org.checkerframework.checker.units.qual.A; import org.apache.ratis.util.CodeInjectionForTesting; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.Daemon; From 71525253083cdbcd1a392cb8bd68fde870ecfcbb Mon Sep 17 00:00:00 2001 From: szywilliam Date: Mon, 4 Sep 2023 09:23:49 +0800 Subject: [PATCH 4/5] apply review patch --- .../apache/ratis/client/impl/AsyncImpl.java | 2 +- .../ratis/server/impl/TestReadAfterWrite.java | 158 ++++++++++++++++++ .../ratis/server/impl/LeaderStateImpl.java | 1 + .../ratis/server/impl/RaftServerImpl.java | 4 + .../server/impl/ReadIndexHeartbeats.java | 1 + .../ratis/server/impl/WriteIndexCache.java | 5 +- 6 files changed, 167 insertions(+), 4 deletions(-) create mode 100644 ratis-examples/src/test/java/org/apache/ratis/server/impl/TestReadAfterWrite.java diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java index 2f7069f397..bf5d74bcba 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java @@ -53,7 +53,7 @@ public CompletableFuture sendReadOnly(Message message, RaftPeer @Override public CompletableFuture sendReadAfterWrite(Message message) { - return send(RaftClientRequest.readAfterWriteConsistentRequestType(), message, null); + return UnorderedAsync.send(RaftClientRequest.readAfterWriteConsistentRequestType(), message, null, client); } @Override diff --git a/ratis-examples/src/test/java/org/apache/ratis/server/impl/TestReadAfterWrite.java b/ratis-examples/src/test/java/org/apache/ratis/server/impl/TestReadAfterWrite.java new file mode 100644 index 0000000000..f8f2405cb1 --- /dev/null +++ b/ratis-examples/src/test/java/org/apache/ratis/server/impl/TestReadAfterWrite.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.BaseTest; +import org.apache.ratis.client.RaftClient; +import org.apache.ratis.client.api.AsyncApi; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine; +import org.apache.ratis.examples.arithmetic.expression.DoubleValue; +import org.apache.ratis.examples.arithmetic.expression.Expression; +import org.apache.ratis.examples.arithmetic.expression.Variable; +import org.apache.ratis.grpc.MiniRaftClusterWithGrpc; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientReply; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.Slf4jUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.event.Level; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.ratis.examples.arithmetic.expression.BinaryExpression.Op.ADD; + +public class TestReadAfterWrite + extends BaseTest + implements MiniRaftClusterWithGrpc.FactoryGet { + + @Before + public void setup() { + Slf4jUtils.setLogLevel(ArithmeticStateMachine.LOG, Level.DEBUG); + Slf4jUtils.setLogLevel(CodeInjectionForTesting.LOG, Level.DEBUG); + Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG); + RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG); + + final RaftProperties p = getProperties(); + p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, + ArithmeticStateMachine.class, StateMachine.class); + RaftServerConfigKeys.Read.setOption(p, RaftServerConfigKeys.Read.Option.LINEARIZABLE); + + } + + static class BlockingCode implements CodeInjectionForTesting.Code { + private final CompletableFuture future = new CompletableFuture<>(); + + void complete() { + future.complete(null); + } + + @Override + public boolean execute(Object localId, Object remoteId, Object... args) { + final boolean blocked = !future.isDone(); + if (blocked) { + LOG.info("Server {} blocks client {}: {}", localId, remoteId, args[0]); + } + future.join(); + if (blocked) { + LOG.info("Server {} unblocks client {}", localId, remoteId); + } + return true; + } + } + + @Test + public void testReadAfterWriteSingleServer() throws Exception { + runWithNewCluster(1, cluster -> { + try (final RaftClient client = cluster.createClient()) { + runTestReadAfterWrite(client); + } + }); + } + + @Test + public void testReadAfterWrite() throws Exception { + runWithNewCluster(3, cluster -> { + try (final RaftClient client = cluster.createClient()) { + runTestReadAfterWrite(client); + } + }); + } + + void runTestReadAfterWrite(RaftClient client) throws Exception { + final Variable a = new Variable("a"); + final Expression a_plus_2 = ADD.apply(a, new DoubleValue(2)); + + final AsyncApi async = client.async(); + final int initialValue = 10; + final RaftClientReply assign = async.send(a.assign(new DoubleValue(initialValue))).join(); + Assert.assertTrue(assign.isSuccess()); + + final Message query = Expression.Utils.toMessage(a); + assertReply(async.sendReadOnly(query), initialValue); + + //block state machine + final BlockingCode blockingCode = new BlockingCode(); + CodeInjectionForTesting.put(RaftServerImpl.APPEND_TRANSACTION, blockingCode); + final CompletableFuture plus2 = async.send(a.assign(a_plus_2)); + + final CompletableFuture readOnlyUnordered = async.sendReadOnlyUnordered(query); + final CompletableFuture readAfterWrite = async.sendReadAfterWrite(query); + + Thread.sleep(1000); + // readOnlyUnordered should get 10 + assertReply(readOnlyUnordered, initialValue); + + LOG.info("readAfterWrite.get"); + try { + // readAfterWrite should time out + readAfterWrite.get(100, TimeUnit.MILLISECONDS); + Assert.fail(); + } catch (TimeoutException e) { + LOG.info("Good", e); + } + + // plus2 should still be blocked. + Assert.assertFalse(plus2.isDone()); + // readAfterWrite should still be blocked. + Assert.assertFalse(readAfterWrite.isDone()); + + // unblock plus2 + blockingCode.complete(); + + // readAfterWrite should get 12. + assertReply(readAfterWrite, initialValue + 2); + } + + void assertReply(CompletableFuture future, int expected) { + LOG.info("assertReply, expected {}", expected); + final RaftClientReply reply = future.join(); + Assert.assertTrue(reply.isSuccess()); + LOG.info("reply {}", reply); + final DoubleValue result = (DoubleValue) Expression.Utils.bytes2Expression( + reply.getMessage().getContent().toByteArray(), 0); + Assert.assertEquals(expected, (int) (double) result.evaluate(null)); + } +} diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java index 6751f1fc10..5156585f81 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java @@ -1096,6 +1096,7 @@ CompletableFuture getReadIndex(Long readAfterWriteConsistentIndex) { } else { readIndex = server.getRaftLog().getLastCommittedIndex(); } + LOG.debug("readIndex={}, readAfterWriteConsistentIndex={}", readIndex, readAfterWriteConsistentIndex); // if group contains only one member, fast path if (server.getRaftConf().isSingleton()) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 6ef2f74b3a..c11a1a4c42 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -158,6 +158,7 @@ class RaftServerImpl implements RaftServer.Division, static final String REQUEST_VOTE = CLASS_NAME + ".requestVote"; static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries"; static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot"; + static final String APPEND_TRANSACTION = CLASS_NAME + ".appendTransaction"; static final String LOG_SYNC = APPEND_ENTRIES + ".logComplete"; static final String START_LEADER_ELECTION = CLASS_NAME + ".startLeaderElection"; @@ -807,6 +808,9 @@ void assertGroup(Object requestorId, RaftGroupId requestorGroupId) throws GroupM */ private CompletableFuture appendTransaction( RaftClientRequest request, TransactionContextImpl context, CacheEntry cacheEntry) throws IOException { + CodeInjectionForTesting.execute(APPEND_TRANSACTION, getId(), + request.getClientId(), request, context, cacheEntry); + assertLifeCycleState(LifeCycle.States.RUNNING); CompletableFuture reply; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java index 7e252f7ad8..3f31a25308 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadIndexHeartbeats.java @@ -167,6 +167,7 @@ AppendEntriesListener addAppendEntriesListener(long commitIndex, Function future) { CompletableFuture getWriteIndexFuture(RaftClientRequest request) { if (request != null && request.getType().getRead().getReadAfterWriteConsistent()) { final AtomicReference> ref = cache.getIfPresent(request.getClientId()); - final CompletableFuture future = ref != null? ref.get(): null; - if (future != null) { - return future; + if (ref != null) { + return ref.get(); } } return CompletableFuture.completedFuture(null); From 00f718286f3cfe527a54c196a763e77b821971cc Mon Sep 17 00:00:00 2001 From: szywilliam Date: Tue, 5 Sep 2023 09:49:18 +0800 Subject: [PATCH 5/5] apply review patch --- .../main/java/org/apache/ratis/client/impl/AsyncImpl.java | 2 +- .../java/org/apache/ratis/protocol/RaftClientRequest.java | 5 ++++- .../org/apache/ratis/server/impl/TestReadAfterWrite.java | 6 ++++-- 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java index bf5d74bcba..2f7069f397 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java @@ -53,7 +53,7 @@ public CompletableFuture sendReadOnly(Message message, RaftPeer @Override public CompletableFuture sendReadAfterWrite(Message message) { - return UnorderedAsync.send(RaftClientRequest.readAfterWriteConsistentRequestType(), message, null, client); + return send(RaftClientRequest.readAfterWriteConsistentRequestType(), message, null); } @Override diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java index 4b0c5166bd..220694ce0a 100644 --- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java +++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java @@ -227,7 +227,10 @@ public String toString() { case MESSAGESTREAM: return toString(getMessageStream()); case READ: - return "RO"; + final ReadRequestTypeProto read = getRead(); + return read.getReadAfterWriteConsistent()? "RaW" + : read.getPreferNonLinearizable()? "RO(pNL)" + : "RO"; case STALEREAD: return "StaleRead(" + getStaleRead().getMinIndex() + ")"; case WATCH: diff --git a/ratis-examples/src/test/java/org/apache/ratis/server/impl/TestReadAfterWrite.java b/ratis-examples/src/test/java/org/apache/ratis/server/impl/TestReadAfterWrite.java index f8f2405cb1..f515628c97 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/server/impl/TestReadAfterWrite.java +++ b/ratis-examples/src/test/java/org/apache/ratis/server/impl/TestReadAfterWrite.java @@ -128,8 +128,10 @@ void runTestReadAfterWrite(RaftClient client) throws Exception { LOG.info("readAfterWrite.get"); try { // readAfterWrite should time out - readAfterWrite.get(100, TimeUnit.MILLISECONDS); - Assert.fail(); + final RaftClientReply reply = readAfterWrite.get(100, TimeUnit.MILLISECONDS); + final DoubleValue result = (DoubleValue) Expression.Utils.bytes2Expression( + reply.getMessage().getContent().toByteArray(), 0); + Assert.fail("result=" + result + ", reply=" + reply); } catch (TimeoutException e) { LOG.info("Good", e); }