Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ CompletableFuture<WriteReplyProto> submitCommit(
uc = files.get(relative).asUnderConstruction();
} catch (FileNotFoundException e) {
return FileStoreCommon.completeExceptionally(
index, "Failed to write to " + relative, e);
index, "Failed to submitCommit to " + relative, e);
}

return uc.submitCommit(offset, size, converter, committer, getId(), index)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ static <T> CompletableFuture<T> completeExceptionally(

static <T> CompletableFuture<T> completeExceptionally(
String message, Throwable cause) {
return JavaUtils.completeExceptionally(
new IOException(message).initCause(cause));
return JavaUtils.completeExceptionally(new IOException(message, cause));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.ratis.proto.ExamplesProtos.StreamWriteRequestProto;
import org.apache.ratis.proto.ExamplesProtos.WriteRequestHeaderProto;
import org.apache.ratis.proto.ExamplesProtos.WriteRequestProto;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
Expand Down Expand Up @@ -97,52 +97,65 @@ public TransactionContext startTransaction(RaftClientRequest request) throws IOE
final TransactionContext.Builder b = TransactionContext.newBuilder()
.setStateMachine(this)
.setClientRequest(request);

if (proto.getRequestCase() == FileStoreRequestProto.RequestCase.WRITE) {
final WriteRequestProto write = proto.getWrite();
final FileStoreRequestProto newProto = FileStoreRequestProto.newBuilder()
.setWriteHeader(write.getHeader()).build();
b.setLogData(newProto.toByteString()).setStateMachineData(write.getData());
b.setLogData(newProto.toByteString()).setStateMachineData(write.getData())
.setStateMachineContext(newProto);
} else {
b.setLogData(content);
b.setLogData(content)
.setStateMachineContext(proto);
}
return b.build();
}

@Override
public CompletableFuture<Integer> write(LogEntryProto entry) {
final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
final ByteString data = smLog.getLogData();
final FileStoreRequestProto proto;
try {
proto = FileStoreRequestProto.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
return FileStoreCommon.completeExceptionally(
entry.getIndex(), "Failed to parse data, entry=" + entry, e);
}
public TransactionContext startTransaction(LogEntryProto entry, RaftProtos.RaftPeerRole role) {
  final TransactionContext.Builder builder = TransactionContext.newBuilder()
      .setStateMachine(this)
      .setLogEntry(entry)
      .setServerRole(role);
  // Parse the request once here and attach it as the state machine context,
  // so that getProto(context, entry) can return it without parsing again.
  final FileStoreRequestProto parsed = getProto(entry);
  return builder.setStateMachineContext(parsed).build();
}

@Override
public CompletableFuture<Integer> write(LogEntryProto entry, TransactionContext context) {
final FileStoreRequestProto proto = getProto(context, entry);
if (proto.getRequestCase() != FileStoreRequestProto.RequestCase.WRITEHEADER) {
return null;
}

final WriteRequestHeaderProto h = proto.getWriteHeader();
final CompletableFuture<Integer> f = files.write(entry.getIndex(),
h.getPath().toStringUtf8(), h.getClose(), h.getSync(), h.getOffset(),
smLog.getStateMachineEntry().getStateMachineData());
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData());
// sync only if closing the file
return h.getClose()? f: null;
}

@Override
public CompletableFuture<ByteString> read(LogEntryProto entry) {
final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
final ByteString data = smLog.getLogData();
final FileStoreRequestProto proto;
/**
 * Get the {@link FileStoreRequestProto} for the given entry.
 * The proto stored in the context is preferred;
 * when there is no context, or the context carries no proto,
 * the proto is parsed from the log data of the entry.
 *
 * @param context the transaction context, which may be null.
 * @param entry the log entry to parse when the context does not provide the proto.
 * @return the request proto.
 */
static FileStoreRequestProto getProto(TransactionContext context, LogEntryProto entry) {
  final FileStoreRequestProto cached = context == null ? null
      : (FileStoreRequestProto) context.getStateMachineContext();
  return cached != null ? cached : getProto(entry);
}

static FileStoreRequestProto getProto(LogEntryProto entry) {
try {
proto = FileStoreRequestProto.parseFrom(data);
return FileStoreRequestProto.parseFrom(entry.getStateMachineLogEntry().getLogData());
} catch (InvalidProtocolBufferException e) {
return FileStoreCommon.completeExceptionally(
entry.getIndex(), "Failed to parse data, entry=" + entry, e);
throw new IllegalArgumentException("Failed to parse data, entry=" + entry, e);
}
}

@Override
public CompletableFuture<ByteString> read(LogEntryProto entry, TransactionContext context) {
final FileStoreRequestProto proto = getProto(context, entry);
if (proto.getRequestCase() != FileStoreRequestProto.RequestCase.WRITEHEADER) {
return null;
}
Expand Down Expand Up @@ -206,20 +219,14 @@ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
final long index = entry.getIndex();
updateLastAppliedTermIndex(entry.getTerm(), index);

final StateMachineLogEntryProto smLog = entry.getStateMachineLogEntry();
final FileStoreRequestProto request;
try {
request = FileStoreRequestProto.parseFrom(smLog.getLogData());
} catch (InvalidProtocolBufferException e) {
return FileStoreCommon.completeExceptionally(index,
"Failed to parse logData in" + smLog, e);
}
final FileStoreRequestProto request = getProto(trx, entry);

switch(request.getRequestCase()) {
case DELETE:
return delete(index, request.getDelete());
case WRITEHEADER:
return writeCommit(index, request.getWriteHeader(), smLog.getStateMachineEntry().getStateMachineData().size());
return writeCommit(index, request.getWriteHeader(),
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData().size());
case STREAM:
return streamCommit(request.getStream());
case WRITE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,14 @@ <OUTPUT, THROWABLE extends Throwable> OUTPUT runSequentially(
*/
CompletableFuture<Long> appendEntry(LogEntryProto entry);

/**
 * Append asynchronously an entry together with its {@link TransactionContext}.
 * Used by the leader.
 *
 * The default implementation ignores the context
 * and delegates to {@link #appendEntry(LogEntryProto)}.
 *
 * @param entry the entry to be appended.
 * @param context the transaction context associated with the entry.
 * @return the future returned by {@link #appendEntry(LogEntryProto)}.
 */
default CompletableFuture<Long> appendEntry(LogEntryProto entry, TransactionContext context) {
  final CompletableFuture<Long> appended = appendEntry(entry);
  return appended;
}

/**
* The same as append(Arrays.asList(entries)).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ default CompletableFuture<ByteString> read(LogEntryProto entry) {
throw new UnsupportedOperationException("This method is NOT supported.");
}

/**
 * Read asynchronously the state machine data from this state machine,
 * with the {@link TransactionContext} of the entry also provided.
 *
 * The default implementation ignores the context
 * and delegates to {@link #read(LogEntryProto)}.
 *
 * @param entry the log entry whose state machine data is requested.
 * @param context the transaction context associated with the entry.
 * @return a future for the read task.
 */
default CompletableFuture<ByteString> read(LogEntryProto entry, TransactionContext context) {
  final CompletableFuture<ByteString> readFuture = read(entry);
  return readFuture;
}

/**
* Write asynchronously the state machine data in the given log entry to this state machine.
*
Expand All @@ -88,6 +97,15 @@ default CompletableFuture<?> write(LogEntryProto entry) {
return CompletableFuture.completedFuture(null);
}

/**
* Write asynchronously the state machine data in the given log entry to this state machine.
*
* @return a future for the write task
*/
default CompletableFuture<?> write(LogEntryProto entry, TransactionContext context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a @Nullable annotation here to notify users that the context will be null when the peer is a follower?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good point! A follower currently creates the context in RaftServerImpl.applyLogToStateMachine. We should create a context instead of passing null, so that the context can be reused. Let me think about it.

return write(entry);
}

/**
* Create asynchronously a {@link DataStream} to stream state machine data.
* The state machine may use the first message (i.e. request.getMessage()) as the header to create the stream.
Expand Down Expand Up @@ -483,14 +501,30 @@ default FollowerEventApi followerEvent() {
* and then build a {@link TransactionContext}.
* The implementation should also be light-weighted.
*
* @return null if the request should be rejected.
* Otherwise, return a transaction with the content to be written to the log.
* @return a transaction with the content to be written to the log.
* @throws IOException thrown by the state machine while validation
*
* @see TransactionContext.Builder
*/
TransactionContext startTransaction(RaftClientRequest request) throws IOException;

/**
 * Start a transaction for the given log entry for non-leaders.
 * This method can be invoked in parallel when there are multiple requests.
 * The implementation should also be light-weighted.
 *
 * The default implementation builds a {@link TransactionContext}
 * from this state machine, the given entry and the given role.
 *
 * @param entry the log entry of the transaction.
 * @param role the role of the server starting the transaction.
 * @return a transaction for the given log entry.
 */
default TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole role) {
  final TransactionContext.Builder builder = TransactionContext.newBuilder();
  builder.setStateMachine(this)
      .setLogEntry(entry)
      .setServerRole(role);
  return builder.build();
}

/**
* This is called before the transaction passed from the StateMachine is appended to the raft log.
* This method is called with the same strict serial order as the transaction order in the raft log.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.JmxRegister;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.apache.ratis.util.TimeDuration;
Expand Down Expand Up @@ -222,6 +223,7 @@ public long[] getFollowerNextIndices() {
private final DataStreamMap dataStreamMap;
private final RaftServerConfigKeys.Read.Option readOption;

private final TransactionManager transactionManager = new TransactionManager();
private final RetryCacheImpl retryCache;
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final WriteIndexCache writeIndexCache;
Expand Down Expand Up @@ -1784,6 +1786,7 @@ private CompletableFuture<Message> replyPendingRequest(
}

return stateMachineFuture.whenComplete((reply, exception) -> {
transactionManager.remove(logIndex);
final RaftClientReply.Builder b = newReplyBuilder(invocationId, logIndex);
final RaftClientReply r;
if (exception == null) {
Expand All @@ -1801,6 +1804,27 @@ private CompletableFuture<Message> replyPendingRequest(
});
}

/**
 * Get the {@link TransactionContext} of the given log entry.
 *
 * The lookup order is:
 * (1) the context kept by the leader state, if this server has a leader state
 *     and it has a context for the entry index;
 * (2) the context kept by the transaction manager, which is created
 *     by calling the state machine when createNew is true.
 *
 * @param entry the log entry.
 * @param createNew should a new context be created when there is no existing one?
 * @return null if the entry does not have a state machine log entry,
 *         or if no context is found and createNew is false;
 *         otherwise, return the context.
 */
TransactionContext getTransactionContext(LogEntryProto entry, Boolean createNew) {
  if (!entry.hasStateMachineLogEntry()) {
    return null;
  }
  final long index = entry.getIndex();

  // Prefer the context held by the leader state, if any.
  final TransactionContext leaderContext = getRole().getLeaderState()
      .map(leader -> leader.getTransactionContext(index))
      .orElse(null);
  if (leaderContext != null) {
    return leaderContext;
  }

  if (createNew) {
    return transactionManager.computeIfAbsent(index,
        // call startTransaction only once
        MemoizedSupplier.valueOf(() -> stateMachine.startTransaction(entry, getInfo().getCurrentRole())));
  }
  return transactionManager.get(index);
}

CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws RaftLogIOException {
if (!next.hasStateMachineLogEntry()) {
stateMachine.event().notifyTermIndexUpdated(next.getTerm(), next.getIndex());
Expand All @@ -1813,14 +1837,7 @@ CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) throws Raf
stateMachine.event().notifyConfigurationChanged(next.getTerm(), next.getIndex(), next.getConfigurationEntry());
role.getLeaderState().ifPresent(leader -> leader.checkReady(next));
} else if (next.hasStateMachineLogEntry()) {
// check whether there is a TransactionContext because we are the leader.
TransactionContext trx = role.getLeaderState()
.map(leader -> leader.getTransactionContext(next.getIndex()))
.orElseGet(() -> TransactionContext.newBuilder()
.setServerRole(role.getCurrentRole())
.setStateMachine(stateMachine)
.setLogEntry(next)
.build());
TransactionContext trx = getTransactionContext(next, true);
final ClientInvocationId invocationId = ClientInvocationId.valueOf(next.getStateMachineLogEntry());
writeIndexCache.add(invocationId.getClientId(), ((TransactionContextImpl) trx).getLogIndexFuture());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,16 @@ private static RaftLog initRaftLog(RaftGroupMemberId memberId, RaftServerImpl se
if (RaftServerConfigKeys.Log.useMemory(prop)) {
log = new MemoryRaftLog(memberId, getSnapshotIndexFromStateMachine, prop);
} else {
log = new SegmentedRaftLog(memberId, server,
server.getStateMachine(),
server::notifyTruncatedLogEntry,
server::submitUpdateCommitEvent,
storage, getSnapshotIndexFromStateMachine, prop);
log = SegmentedRaftLog.newBuilder()
.setMemberId(memberId)
.setServer(server)
.setNotifyTruncatedLogEntry(server::notifyTruncatedLogEntry)
.setGetTransactionContext(server::getTransactionContext)
.setSubmitUpdateCommitEvent(server::submitUpdateCommitEvent)
.setStorage(storage)
.setSnapshotIndexSupplier(getSnapshotIndexFromStateMachine)
.setProperties(prop)
.build();
}
log.open(log.getSnapshotIndex(), logConsumer);
return log;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.impl;

import org.apache.ratis.statemachine.TransactionContext;

import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

/**
 * Managing {@link TransactionContext} objects by log index.
 *
 * Each index is mapped to a {@link Supplier} of the context;
 * the supplier is invoked whenever the context is requested.
 */
class TransactionManager {
  /** Map: log index -> supplier of the context at that index. */
  private final ConcurrentMap<Long, Supplier<TransactionContext>> contexts = new ConcurrentHashMap<>();

  /** @return the context at the given index, or null if there is no such context. */
  TransactionContext get(long index) {
    final Optional<Supplier<TransactionContext>> supplier = Optional.ofNullable(contexts.get(index));
    return supplier.map(Supplier::get).orElse(null);
  }

  /**
   * Register the given constructor for the given index if the index has no supplier yet.
   *
   * @return the context obtained from the supplier mapped to the index,
   *         i.e. either the previously registered supplier or the given constructor.
   */
  TransactionContext computeIfAbsent(long index, Supplier<TransactionContext> constructor) {
    final Supplier<TransactionContext> registered = contexts.computeIfAbsent(index, key -> constructor);
    return registered.get();
  }

  /** Remove the supplier, if there is any, mapped to the given index. */
  void remove(long index) {
    contexts.remove(index);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ private long appendImpl(long term, TransactionContext operation) throws StateMac
throw new StateMachineException(memberId, new RaftLogIOException(
"Log entry size " + entrySize + " exceeds the max buffer limit of " + maxBufferSize));
}
appendEntry(e).whenComplete((returned, t) -> {
appendEntry(e, operation).whenComplete((returned, t) -> {
if (t != null) {
LOG.error(name + ": Failed to write log entry " + LogProtoUtils.toLogEntryString(e), t);
} else if (returned != nextIndex) {
Expand Down Expand Up @@ -343,10 +343,15 @@ public final CompletableFuture<Long> purge(long suggestedIndex) {

@Override
public final CompletableFuture<Long> appendEntry(LogEntryProto entry) {
return runner.runSequentially(() -> appendEntryImpl(entry));
return appendEntry(entry, null);
}

protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto entry);
@Override
public final CompletableFuture<Long> appendEntry(LogEntryProto entry, TransactionContext context) {
return runner.runSequentially(() -> appendEntryImpl(entry, context));
}

protected abstract CompletableFuture<Long> appendEntryImpl(LogEntryProto entry, TransactionContext context);

@Override
public final List<CompletableFuture<Long>> append(List<LogEntryProto> entries) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.AutoCloseableLock;
import org.apache.ratis.util.Preconditions;

Expand Down Expand Up @@ -165,7 +166,7 @@ public TermIndex getLastEntryTermIndex() {
}

@Override
protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry, TransactionContext context) {
checkLogState();
try(AutoCloseableLock writeLock = writeLock()) {
validateLogEntry(entry);
Expand Down
Loading