From c22222b6ed43f57af5b936c09f84a6cda5871438 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Fri, 29 Sep 2023 10:21:22 -0700 Subject: [PATCH] RATIS-1897. Make TransactionContext available in DataApi.write(..). --- .../ratis/examples/filestore/FileStore.java | 2 +- .../examples/filestore/FileStoreCommon.java | 3 +- .../filestore/FileStoreStateMachine.java | 71 +++++----- .../server/raftlog/RaftLogSequentialOps.java | 8 ++ .../ratis/statemachine/StateMachine.java | 38 +++++- .../ratis/server/impl/RaftServerImpl.java | 33 +++-- .../apache/ratis/server/impl/ServerState.java | 15 ++- .../ratis/server/impl/TransactionManager.java | 44 +++++++ .../ratis/server/raftlog/RaftLogBase.java | 11 +- .../server/raftlog/memory/MemoryRaftLog.java | 3 +- .../raftlog/segmented/SegmentedRaftLog.java | 124 ++++++++++++++---- .../segmented/SegmentedRaftLogWorker.java | 9 +- .../ratis/server/impl/RaftServerTestUtil.java | 13 +- .../ratis/server/impl/RetryCacheTestUtil.java | 12 +- .../segmented/TestSegmentedRaftLog.java | 29 +++- 15 files changed, 322 insertions(+), 93 deletions(-) create mode 100644 ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java index 04ff64bd84..a930170ecd 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java @@ -239,7 +239,7 @@ CompletableFuture submitCommit( uc = files.get(relative).asUnderConstruction(); } catch (FileNotFoundException e) { return FileStoreCommon.completeExceptionally( - index, "Failed to write to " + relative, e); + index, "Failed to submitCommit to " + relative, e); } return uc.submitCommit(offset, size, converter, committer, getId(), index) diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java index 152bc5a23d..c96662b2ae 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java @@ -59,7 +59,6 @@ static CompletableFuture completeExceptionally( static CompletableFuture completeExceptionally( String message, Throwable cause) { - return JavaUtils.completeExceptionally( - new IOException(message).initCause(cause)); + return JavaUtils.completeExceptionally(new IOException(message, cause)); } } diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java index 209baaf7d8..5f258ee3b7 100644 --- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java +++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java @@ -26,8 +26,8 @@ import org.apache.ratis.proto.ExamplesProtos.StreamWriteRequestProto; import org.apache.ratis.proto.ExamplesProtos.WriteRequestHeaderProto; import org.apache.ratis.proto.ExamplesProtos.WriteRequestProto; +import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftGroupId; @@ -97,29 +97,32 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE final TransactionContext.Builder b = TransactionContext.newBuilder() .setStateMachine(this) .setClientRequest(request); - if (proto.getRequestCase() == FileStoreRequestProto.RequestCase.WRITE) { final WriteRequestProto write = proto.getWrite(); final FileStoreRequestProto newProto = FileStoreRequestProto.newBuilder() .setWriteHeader(write.getHeader()).build(); - b.setLogData(newProto.toByteString()).setStateMachineData(write.getData()); + b.setLogData(newProto.toByteString()).setStateMachineData(write.getData()) + .setStateMachineContext(newProto); } else { - b.setLogData(content); + b.setLogData(content) + .setStateMachineContext(proto); } return b.build(); } @Override - public CompletableFuture write(LogEntryProto entry) { - final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); - final ByteString data = smLog.getLogData(); - final FileStoreRequestProto proto; - try { - proto = FileStoreRequestProto.parseFrom(data); - } catch (InvalidProtocolBufferException e) { - return FileStoreCommon.completeExceptionally( - entry.getIndex(), "Failed to parse data, entry=" + entry, e); - } + public TransactionContext startTransaction(LogEntryProto entry, RaftProtos.RaftPeerRole role) { + return TransactionContext.newBuilder() + .setStateMachine(this) + .setLogEntry(entry) + .setServerRole(role) + .setStateMachineContext(getProto(entry)) + .build(); + } + + @Override + public CompletableFuture write(LogEntryProto entry, TransactionContext context) { + final FileStoreRequestProto proto = getProto(context, entry); if (proto.getRequestCase() != FileStoreRequestProto.RequestCase.WRITEHEADER) { return null; } @@ -127,22 +130,32 @@ public CompletableFuture write(LogEntryProto entry) { final WriteRequestHeaderProto h = proto.getWriteHeader(); final CompletableFuture f = files.write(entry.getIndex(), h.getPath().toStringUtf8(), h.getClose(), h.getSync(), h.getOffset(), - smLog.getStateMachineEntry().getStateMachineData()); + entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData()); // sync only if closing the file return h.getClose()? f: null; } - @Override - public CompletableFuture read(LogEntryProto entry) { - final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); - final ByteString data = smLog.getLogData(); - final FileStoreRequestProto proto; + static FileStoreRequestProto getProto(TransactionContext context, LogEntryProto entry) { + if (context != null) { + final FileStoreRequestProto proto = (FileStoreRequestProto) context.getStateMachineContext(); + if (proto != null) { + return proto; + } + } + return getProto(entry); + } + + static FileStoreRequestProto getProto(LogEntryProto entry) { try { - proto = FileStoreRequestProto.parseFrom(data); + return FileStoreRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData()); } catch (InvalidProtocolBufferException e) { - return FileStoreCommon.completeExceptionally( - entry.getIndex(), "Failed to parse data, entry=" + entry, e); + throw new IllegalArgumentException("Failed to parse data, entry=" + entry, e); } + } + + @Override + public CompletableFuture read(LogEntryProto entry, TransactionContext context) { + final FileStoreRequestProto proto = getProto(context, entry); if (proto.getRequestCase() != FileStoreRequestProto.RequestCase.WRITEHEADER) { return null; } @@ -206,20 +219,14 @@ public CompletableFuture applyTransaction(TransactionContext trx) { final long index = entry.getIndex(); updateLastAppliedTermIndex(entry.getTerm(), index); - final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry(); - final FileStoreRequestProto request; - try { - request = FileStoreRequestProto.parseFrom(smLog.getLogData()); - } catch (InvalidProtocolBufferException e) { - return FileStoreCommon.completeExceptionally(index, - "Failed to parse logData in" + smLog, e); - } + final FileStoreRequestProto request = getProto(trx, entry); switch(request.getRequestCase()) { case DELETE: return delete(index, request.getDelete()); case WRITEHEADER: - return writeCommit(index, request.getWriteHeader(), smLog.getStateMachineEntry().getStateMachineData().size()); + return writeCommit(index, request.getWriteHeader(), + entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData().size()); case STREAM: return streamCommit(request.getStream()); case WRITE: diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java index 32bd564e0a..7b9f42b6bd 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/server/raftlog/RaftLogSequentialOps.java @@ -121,6 +121,14 @@ OUTPUT runSequentially( */ CompletableFuture appendEntry(LogEntryProto entry); + /** + * Append asynchronously an entry. + * Used by the leader. + */ + default CompletableFuture appendEntry(LogEntryProto entry, TransactionContext context) { + return appendEntry(entry); + } + /** * The same as append(Arrays.asList(entries)). * diff --git a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java index 90813f325b..b1fc5addae 100644 --- a/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java +++ b/ratis-server-api/src/main/java/org/apache/ratis/statemachine/StateMachine.java @@ -79,6 +79,15 @@ default CompletableFuture read(LogEntryProto entry) { throw new UnsupportedOperationException("This method is NOT supported."); } + /** + * Read asynchronously the state machine data from this state machine. + * + * @return a future for the read task. + */ + default CompletableFuture read(LogEntryProto entry, TransactionContext context) { + return read(entry); + } + /** * Write asynchronously the state machine data in the given log entry to this state machine. * @@ -88,6 +97,15 @@ default CompletableFuture write(LogEntryProto entry) { return CompletableFuture.completedFuture(null); } + /** + * Write asynchronously the state machine data in the given log entry to this state machine. + * + * @return a future for the write task + */ + default CompletableFuture write(LogEntryProto entry, TransactionContext context) { + return write(entry); + } + /** * Create asynchronously a {@link DataStream} to stream state machine data. * The state machine may use the first message (i.e. request.getMessage()) as the header to create the stream. @@ -483,14 +501,30 @@ default FollowerEventApi followerEvent() { * and then build a {@link TransactionContext}. * The implementation should also be light-weighted. * - * @return null if the request should be rejected. - * Otherwise, return a transaction with the content to be written to the log. + * @return a transaction with the content to be written to the log. * @throws IOException thrown by the state machine while validation * * @see TransactionContext.Builder */ TransactionContext startTransaction(RaftClientRequest request) throws IOException; + /** + * Start a transaction for the given log entry for non-leaders. + * This method can be invoked in parallel when there are multiple requests. + * The implementation should prepare a {@link StateMachineLogEntryProto}, + * and then build a {@link TransactionContext}. + * The implementation should also be light-weighted. + * + * @return a transaction with the content to be written to the log. + */ + default TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole role) { + return TransactionContext.newBuilder() + .setStateMachine(this) + .setLogEntry(entry) + .setServerRole(role) + .build(); + } + /** * This is called before the transaction passed from the StateMachine is appended to the raft log. * This method is called with the same strict serial order as the transaction order in the raft log. diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java index 3fb0cb2faa..6127972cb6 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java @@ -135,6 +135,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.JmxRegister; import org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ProtoUtils; import org.apache.ratis.util.TimeDuration; @@ -222,6 +223,7 @@ public long[] getFollowerNextIndices() { private final DataStreamMap dataStreamMap; private final RaftServerConfigKeys.Read.Option readOption; + private final TransactionManager transactionManager = new TransactionManager(); private final RetryCacheImpl retryCache; private final CommitInfoCache commitInfoCache = new CommitInfoCache(); private final WriteIndexCache writeIndexCache; @@ -1784,6 +1786,7 @@ private CompletableFuture replyPendingRequest( } return stateMachineFuture.whenComplete((reply, exception) -> { + transactionManager.remove(logIndex); final RaftClientReply.Builder b = newReplyBuilder(invocationId, logIndex); final RaftClientReply r; if (exception == null) { @@ -1801,6 +1804,27 @@ private CompletableFuture replyPendingRequest( }); } + TransactionContext getTransactionContext(LogEntryProto entry, Boolean createNew) { + if (!entry.hasStateMachineLogEntry()) { + return null; + } + + final Optional leader = getRole().getLeaderState(); + if (leader.isPresent()) { + final TransactionContext context = leader.get().getTransactionContext(entry.getIndex()); + if (context != null) { + return context; + } + } + + if (!createNew) { + return transactionManager.get(entry.getIndex()); + } + return transactionManager.computeIfAbsent(entry.getIndex(), + // call startTransaction only once + MemoizedSupplier.valueOf(() -> stateMachine.startTransaction(entry, getInfo().getCurrentRole()))); + } + CompletableFuture applyLogToStateMachine(LogEntryProto next) throws RaftLogIOException { if (!next.hasStateMachineLogEntry()) { stateMachine.event().notifyTermIndexUpdated(next.getTerm(), next.getIndex()); @@ -1813,14 +1837,7 @@ CompletableFuture applyLogToStateMachine(LogEntryProto next) throws Raf stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), next.getConfigurationEntry()); role.getLeaderState().ifPresent(leader -> leader.checkReady(next)); } else if (next.hasStateMachineLogEntry()) { - // check whether there is a TransactionContext because we are the leader. - TransactionContext trx = role.getLeaderState() - .map(leader -> leader.getTransactionContext(next.getIndex())) - .orElseGet(() -> TransactionContext.newBuilder() - .setServerRole(role.getCurrentRole()) - .setStateMachine(stateMachine) - .setLogEntry(next) - .build()); + TransactionContext trx = getTransactionContext(next, true); final ClientInvocationId invocationId = ClientInvocationId.valueOf(next.getStateMachineLogEntry()); writeIndexCache.add(invocationId.getClientId(), ((TransactionContextImpl) trx).getLogIndexFuture()); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java index 2f2f36d79a..315d610551 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java @@ -177,11 +177,16 @@ private static RaftLog initRaftLog(RaftGroupMemberId memberId, RaftServerImpl se if (RaftServerConfigKeys.Log.useMemory(prop)) { log = new MemoryRaftLog(memberId, getSnapshotIndexFromStateMachine, prop); } else { - log = new SegmentedRaftLog(memberId, server, - server.getStateMachine(), - server::notifyTruncatedLogEntry, - server::submitUpdateCommitEvent, - storage, getSnapshotIndexFromStateMachine, prop); + log = SegmentedRaftLog.newBuilder() + .setMemberId(memberId) + .setServer(server) + .setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry) + .setGetTransactionContext(server::getTransactionContext) + .setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent) + .setStorage(storage) + .setSnapshotIndexSupplier(getSnapshotIndexFromStateMachine) + .setProperties(prop) + .build(); } log.open(log.getSnapshotIndex(), logConsumer); return log; diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java new file mode 100644 index 0000000000..aa989cf989 --- /dev/null +++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransactionManager.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.server.impl; + +import org.apache.ratis.statemachine.TransactionContext; + +import java.util.Optional; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; + +/** + * Managing {@link TransactionContext}. + */ +class TransactionManager { + private final ConcurrentMap> contexts = new ConcurrentHashMap<>(); + + TransactionContext get(long index) { + return Optional.ofNullable(contexts.get(index)).map(Supplier::get).orElse(null); + } + + TransactionContext computeIfAbsent(long index, Supplier constructor) { + return contexts.computeIfAbsent(index, i -> constructor).get(); + } + + void remove(long index) { + contexts.remove(index); + } +} \ No newline at end of file diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java index 31865038e3..708b499cd3 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLogBase.java @@ -185,7 +185,7 @@ private long appendImpl(long term, TransactionContext operation) throws StateMac throw new StateMachineException(memberId, new RaftLogIOException( "Log entry size " + entrySize + " exceeds the max buffer limit of " + maxBufferSize)); } - appendEntry(e).whenComplete((returned, t) -> { + appendEntry(e, operation).whenComplete((returned, t) -> { if (t != null) { LOG.error(name + ": Failed to write log entry " + LogProtoUtils.toLogEntryString(e), t); } else if (returned != nextIndex) { @@ -343,10 +343,15 @@ public final CompletableFuture purge(long suggestedIndex) { @Override public final CompletableFuture appendEntry(LogEntryProto entry) { - return runner.runSequentially(() -> appendEntryImpl(entry)); + return appendEntry(entry, null); } - protected abstract CompletableFuture appendEntryImpl(LogEntryProto entry); + @Override + public final CompletableFuture appendEntry(LogEntryProto entry, TransactionContext context) { + return runner.runSequentially(() -> appendEntryImpl(entry, context)); + } + + protected abstract CompletableFuture appendEntryImpl(LogEntryProto entry, TransactionContext context); @Override public final List> append(List entries) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java index 0eb7fb1595..ebb1e27d77 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java @@ -25,6 +25,7 @@ import org.apache.ratis.server.raftlog.RaftLogBase; import org.apache.ratis.server.raftlog.LogEntryHeader; import org.apache.ratis.server.storage.RaftStorageMetadata; +import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.Preconditions; @@ -165,7 +166,7 @@ public TermIndex getLastEntryTermIndex() { } @Override - protected CompletableFuture appendEntryImpl(LogEntryProto entry) { + protected CompletableFuture appendEntryImpl(LogEntryProto entry, TransactionContext context) { checkLogState(); try(AutoCloseableLock writeLock = writeLock()) { validateLogEntry(entry); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 255bec2911..a729f8e2e8 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -26,6 +26,7 @@ import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.server.raftlog.LogEntryHeader; import org.apache.ratis.server.raftlog.LogProtoUtils; +import org.apache.ratis.server.raftlog.RaftLog; import org.apache.ratis.server.raftlog.RaftLogBase; import org.apache.ratis.server.raftlog.RaftLogIOException; import org.apache.ratis.server.storage.RaftStorageMetadata; @@ -34,6 +35,7 @@ import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncateIndices; import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.AutoCloseableLock; import org.apache.ratis.util.AwaitToRun; @@ -49,6 +51,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.LongSupplier; @@ -80,7 +83,7 @@ * in segments should be no smaller than the last index of snapshot, otherwise * we may have hole when append further log. */ -public class SegmentedRaftLog extends RaftLogBase { +public final class SegmentedRaftLog extends RaftLogBase { /** * I/O task definitions. */ @@ -145,6 +148,10 @@ default long getLastAppliedIndex() { /** Notify the server that a log entry is being truncated. */ default void notifyTruncatedLogEntry(TermIndex ti) { } + + default TransactionContext getTransactionContext(LogEntryProto entry, boolean createNew) { + return null; + } } /** @@ -152,7 +159,8 @@ default void notifyTruncatedLogEntry(TermIndex ti) { * Otherwise, the server is non-null, return the implementation using the given server. */ private ServerLogMethods newServerLogMethods(RaftServer.Division impl, - Consumer notifyTruncatedLogEntry) { + Consumer notifyTruncatedLogEntry, + BiFunction getTransactionContext) { if (impl == null) { return ServerLogMethods.DUMMY; } @@ -177,6 +185,11 @@ public void notifyTruncatedLogEntry(TermIndex ti) { LOG.error("{}: Failed to read log {}", getName(), ti, e); } } + + @Override + public TransactionContext getTransactionContext(LogEntryProto entry, boolean createNew) { + return getTransactionContext.apply(entry, createNew); + } }; } @@ -190,22 +203,19 @@ public void notifyTruncatedLogEntry(TermIndex ti) { private final boolean stateMachineCachingEnabled; private final SegmentedRaftLogMetrics metrics; - @SuppressWarnings("parameternumber") - public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division server, - StateMachine stateMachine, Consumer notifyTruncatedLogEntry, Runnable submitUpdateCommitEvent, - RaftStorage storage, LongSupplier snapshotIndexSupplier, RaftProperties properties) { - super(memberId, snapshotIndexSupplier, properties); - this.metrics = new SegmentedRaftLogMetrics(memberId); - - this.server = newServerLogMethods(server, notifyTruncatedLogEntry); - this.storage = storage; - this.stateMachine = stateMachine; - segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); - this.cache = new SegmentedRaftLogCache(memberId, storage, properties, getRaftLogMetrics()); - this.cacheEviction = new AwaitToRun(memberId + "-cacheEviction", this::checkAndEvictCache).start(); - this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine, - submitUpdateCommitEvent, server, storage, properties, getRaftLogMetrics()); - stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties); + private SegmentedRaftLog(Builder b) { + super(b.memberId, b.snapshotIndexSupplier, b.properties); + this.metrics = new SegmentedRaftLogMetrics(b.memberId); + + this.server = newServerLogMethods(b.server, b.notifyTruncatedLogEntry, b.getTransactionContext); + this.storage = b.storage; + this.stateMachine = b.stateMachine; + this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(b.properties).getSize(); + this.cache = new SegmentedRaftLogCache(b.memberId, storage, b.properties, getRaftLogMetrics()); + this.cacheEviction = new AwaitToRun(b.memberId + "-cacheEviction", this::checkAndEvictCache).start(); + this.fileLogWorker = new SegmentedRaftLogWorker(b.memberId, stateMachine, + b.submitUpdateCommitEvent, b.server, storage, b.properties, getRaftLogMetrics()); + stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(b.properties); } @Override @@ -298,7 +308,7 @@ public EntryWithData getEntryWithData(long index) throws RaftLogIOException { try { CompletableFuture future = null; if (stateMachine != null) { - future = stateMachine.data().read(entry).exceptionally(ex -> { + future = stateMachine.data().read(entry, server.getTransactionContext(entry, false)).exceptionally(ex -> { stateMachine.event().notifyLogFailed(ex, entry); throw new CompletionException("Failed to read state machine data for log entry " + entry, ex); }); @@ -376,7 +386,7 @@ protected CompletableFuture purgeImpl(long index) { } @Override - protected CompletableFuture appendEntryImpl(LogEntryProto entry) { + protected CompletableFuture appendEntryImpl(LogEntryProto entry, TransactionContext context) { checkLogState(); if (LOG.isTraceEnabled()) { LOG.trace("{}: appendEntry {}", getName(), LogProtoUtils.toLogEntryString(entry)); @@ -412,7 +422,7 @@ protected CompletableFuture appendEntryImpl(LogEntryProto entry) { // to statemachine first and then to the cache. Not following the order // will leave a spurious entry in the cache. CompletableFuture writeFuture = - fileLogWorker.writeLogEntry(entry).getFuture(); + fileLogWorker.writeLogEntry(entry, context).getFuture(); if (stateMachineCachingEnabled) { // The stateMachineData will be cached inside the StateMachine itself. cache.appendEntry(LogProtoUtils.removeStateMachineData(entry), @@ -460,7 +470,8 @@ public List> appendImpl(List entries) { futures = new ArrayList<>(entries.size() - index); } for (int i = index; i < entries.size(); i++) { - futures.add(appendEntry(entries.get(i))); + final LogEntryProto entry = entries.get(i); + futures.add(appendEntry(entry, server.getTransactionContext(entry, true))); } return futures; } @@ -528,4 +539,73 @@ SegmentedRaftLogCache getRaftLogCache() { public String toLogEntryString(LogEntryProto logEntry) { return LogProtoUtils.toLogEntryString(logEntry, stateMachine::toStateMachineLogEntryString); } + + public static Builder newBuilder() { + return new Builder(); + } + + public static final class Builder { + private RaftGroupMemberId memberId; + private RaftServer.Division server; + private StateMachine stateMachine; + private Consumer notifyTruncatedLogEntry; + private BiFunction getTransactionContext; + private Runnable submitUpdateCommitEvent; + private RaftStorage storage; + private LongSupplier snapshotIndexSupplier = () -> RaftLog.INVALID_LOG_INDEX; + private RaftProperties properties; + + private Builder() {} + + public Builder setMemberId(RaftGroupMemberId memberId) { + this.memberId = memberId; + return this; + } + + public Builder setServer(RaftServer.Division server) { + this.server = server; + this.stateMachine = server.getStateMachine(); + return this; + } + + public Builder setStateMachine(StateMachine stateMachine) { + this.stateMachine = stateMachine; + return this; + } + + public Builder setNotifyTruncatedLogEntry(Consumer notifyTruncatedLogEntry) { + this.notifyTruncatedLogEntry = notifyTruncatedLogEntry; + return this; + } + + public Builder setGetTransactionContext( + BiFunction getTransactionContext) { + this.getTransactionContext = getTransactionContext; + return this; + } + + public Builder setSubmitUpdateCommitEvent(Runnable submitUpdateCommitEvent) { + this.submitUpdateCommitEvent = submitUpdateCommitEvent; + return this; + } + + public Builder setStorage(RaftStorage storage) { + this.storage = storage; + return this; + } + + public Builder setSnapshotIndexSupplier(LongSupplier snapshotIndexSupplier) { + this.snapshotIndexSupplier = snapshotIndexSupplier; + return this; + } + + public Builder setProperties(RaftProperties properties) { + this.properties = properties; + return this; + } + + public SegmentedRaftLog build() { + return new SegmentedRaftLog(this); + } + } } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java index 9f34f29170..18fd680129 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java @@ -37,6 +37,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.StateMachine.DataStream; +import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -435,8 +436,8 @@ void rollLogSegment(LogSegment segmentToClose) { addIOTask(new StartLogSegment(segmentToClose.getEndIndex() + 1)); } - Task writeLogEntry(LogEntryProto entry) { - return addIOTask(new WriteLog(entry)); + Task writeLogEntry(LogEntryProto entry, TransactionContext context) { + return addIOTask(new WriteLog(entry, context)); } Task truncate(TruncationSegments ts, long index) { @@ -483,7 +484,7 @@ private class WriteLog extends Task { private final CompletableFuture stateMachineFuture; private final CompletableFuture combined; - WriteLog(LogEntryProto entry) { + WriteLog(LogEntryProto entry, TransactionContext context) { this.entry = LogProtoUtils.removeStateMachineData(entry); if (this.entry == entry) { final StateMachineLogEntryProto proto = entry.hasStateMachineLogEntry()? entry.getStateMachineLogEntry(): null; @@ -498,7 +499,7 @@ private class WriteLog extends Task { } else { try { // this.entry != entry iff the entry has state machine data - this.stateMachineFuture = stateMachine.data().write(entry); + this.stateMachineFuture = stateMachine.data().write(entry, context); } catch (Exception e) { LOG.error(name + ": writeStateMachineData failed for index " + entry.getIndex() + ", entry=" + LogProtoUtils.toLogEntryString(entry, stateMachine::toStateMachineLogEntryString), e); diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java index 958c19442f..73482dcf8d 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java @@ -182,10 +182,15 @@ public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, D final RaftServerImpl server = Mockito.mock(RaftServerImpl.class); Mockito.when(server.getInfo()).thenReturn(info); - return new SegmentedRaftLog(memberId, server, null, - server::notifyTruncatedLogEntry, - server::submitUpdateCommitEvent, - storage, () -> -1, properties); + return SegmentedRaftLog.newBuilder() + .setMemberId(memberId) + .setServer(server) + .setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry) + .setGetTransactionContext(server::getTransactionContext) + .setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent) + .setStorage(storage) + .setProperties(properties) + .build(); } public static boolean isHighestPriority(RaftConfiguration config, RaftPeerId peerId) { diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java index 9ab814cc66..f59958a943 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java @@ -81,8 +81,14 @@ public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, R when(server.getRetryCache()).thenReturn((RetryCacheImpl) retryCache); when(server.getMemberId()).thenReturn(memberId); doCallRealMethod().when(server).notifyTruncatedLogEntry(any(LogEntryProto.class)); - return new SegmentedRaftLog(memberId, server, null, - server::notifyTruncatedLogEntry, server::submitUpdateCommitEvent, - storage, () -> -1, properties); + return SegmentedRaftLog.newBuilder() + .setMemberId(memberId) + .setServer(server) + .setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry) + .setGetTransactionContext(server::getTransactionContext) + .setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent) + .setStorage(storage) + .setProperties(properties) + .build(); } } diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java index 6dc75a3d3a..abc36e4eed 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java @@ -136,14 +136,21 @@ SegmentedRaftLog newSegmentedRaftLog(LongSupplier getSnapshotIndexFromStateMachi } static SegmentedRaftLog newSegmentedRaftLog(RaftStorage storage, RaftProperties properties) { - return new SegmentedRaftLog(memberId, null, null, null, null, storage, - () -> -1, properties); + return SegmentedRaftLog.newBuilder() + .setMemberId(memberId) + .setStorage(storage) + .setProperties(properties) + .build(); } private SegmentedRaftLog newSegmentedRaftLogWithSnapshotIndex(RaftStorage storage, RaftProperties properties, LongSupplier getSnapshotIndexFromStateMachine) { - return new SegmentedRaftLog(memberId, null, null, null, null, storage, - getSnapshotIndexFromStateMachine, properties); + return SegmentedRaftLog.newBuilder() + .setMemberId(memberId) + .setStorage(storage) + .setSnapshotIndexSupplier(getSnapshotIndexFromStateMachine) + .setProperties(properties) + .build(); } @Before @@ -576,7 +583,12 @@ public void testSegmentedRaftLogStateMachineData() throws Exception { final List entries = prepareLogEntries(range, null, true, new ArrayList<>()); final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing(); - try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, null, storage, () -> -1, properties)) { + try (SegmentedRaftLog raftLog = SegmentedRaftLog.newBuilder() + .setMemberId(memberId) + .setStateMachine(sm) + .setStorage(storage) + .setProperties(properties) + .build()) { raftLog.open(RaftLog.INVALID_LOG_INDEX, null); int next = 0; @@ -641,7 +653,12 @@ public void notifyLogFailed(Throwable cause, LogEntryProto entry) { }; Throwable ex = null; // TimeoutIOException - try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, null, storage, () -> -1, properties)) { + try (SegmentedRaftLog raftLog = SegmentedRaftLog.newBuilder() + .setMemberId(memberId) + .setStateMachine(sm) + .setStorage(storage) + .setProperties(properties) + .build()) { raftLog.open(RaftLog.INVALID_LOG_INDEX, null); // SegmentedRaftLogWorker should catch TimeoutIOException CompletableFuture f = raftLog.appendEntry(entry);