diff --git a/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java b/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java index 68d536f88a..29370755cd 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/AsyncRpcApi.java @@ -26,10 +26,9 @@ /** An RPC interface which extends the user interface {@link AsyncApi}. */ public interface AsyncRpcApi extends AsyncApi { /** - * Send the given RaftClientRequest asynchronously to the raft service. - * The RaftClientRequest will wrapped as Message in a new RaftClientRequest - * and leader will be decode it from the Message - * @param request The RaftClientRequest. + * Send the given forward-request asynchronously to the raft service. + * + * @param request The request to be forwarded. * @return a future of the reply. */ CompletableFuture sendForward(RaftClientRequest request); diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java index b00f18219f..94b9252c81 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamClient.java @@ -42,7 +42,11 @@ public interface DataStreamClient extends DataStreamRpcApi, Closeable { DataStreamClientRpc getClientRpc(); static Builder newBuilder() { - return new Builder(); + return newBuilder(null); + } + + static Builder newBuilder(RaftClient client) { + return new Builder(client); } /** To build {@link DataStreamClient} objects */ @@ -51,10 +55,14 @@ class Builder { private DataStreamClientRpc dataStreamClientRpc; private RaftProperties properties; private Parameters parameters; - private RaftGroupId raftGroupId; + private RaftGroupId groupId; private ClientId clientId; - private Builder() {} + private final RaftClient client; + + private Builder(RaftClient client) { + this.client = client; + } public DataStreamClient build() { Objects.requireNonNull(dataStreamServer, "The 'dataStreamServer' field is not initialized."); @@ -65,9 +73,13 @@ public DataStreamClient build() { .newDataStreamClientRpc(dataStreamServer, properties); } } + if (client != null) { + return ClientImplUtils.newDataStreamClient( + client, dataStreamServer, dataStreamClientRpc, properties); + } return ClientImplUtils.newDataStreamClient( Optional.ofNullable(clientId).orElseGet(ClientId::randomId), - raftGroupId, dataStreamServer, dataStreamClientRpc, properties); + groupId, dataStreamServer, dataStreamClientRpc, properties); } public Builder setClientId(ClientId clientId) { @@ -75,8 +87,8 @@ public Builder setClientId(ClientId clientId) { return this; } - public Builder setRaftGroupId(RaftGroupId raftGroupId) { - this.raftGroupId = raftGroupId; + public Builder setGroupId(RaftGroupId groupId) { + this.groupId = groupId; return this; } diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java index d4c2f16e91..60877b33bc 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java @@ -47,6 +47,9 @@ public interface RaftClient extends Closeable { /** @return the id of this client. */ ClientId getId(); + /** @return the group id of this client. */ + RaftGroupId getGroupId(); + /** @return the cluster leaderId recorded by this client. */ RaftPeerId getLeaderId(); diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java index 392dcd7f14..a69e9ffd3c 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientImplUtils.java @@ -44,4 +44,9 @@ static DataStreamClient newDataStreamClient(ClientId clientId, RaftGroupId group DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) { return new DataStreamClientImpl(clientId, groupId, primaryDataStreamServer, dataStreamClientRpc, properties); } + + static DataStreamClient newDataStreamClient(RaftClient client, RaftPeer primaryDataStreamServer, + DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) { + return new DataStreamClientImpl(client, primaryDataStreamServer, dataStreamClientRpc, properties); + } } diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java index 7e04d2f2cb..8a2692a5b6 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java @@ -17,11 +17,14 @@ */ package org.apache.ratis.client.impl; +import org.apache.ratis.client.AsyncRpcApi; import org.apache.ratis.client.DataStreamClient; import org.apache.ratis.client.DataStreamClientRpc; import org.apache.ratis.client.DataStreamOutputRpc; +import org.apache.ratis.client.RaftClient; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer; +import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer; import org.apache.ratis.io.FilePositionCount; import org.apache.ratis.io.StandardWriteOption; import org.apache.ratis.io.WriteOption; @@ -54,6 +57,7 @@ * allows client to create streams and send asynchronously. */ public class DataStreamClientImpl implements DataStreamClient { + private final RaftClient client; private final ClientId clientId; private final RaftGroupId groupId; @@ -63,6 +67,7 @@ public class DataStreamClientImpl implements DataStreamClient { DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer dataStreamServer, DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) { + this.client = null; this.clientId = clientId; this.groupId = groupId; this.dataStreamServer = dataStreamServer; @@ -70,6 +75,16 @@ public class DataStreamClientImpl implements DataStreamClient { this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, properties); } + DataStreamClientImpl(RaftClient client, RaftPeer dataStreamServer, + DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) { + this.client = client; + this.clientId = client.getId(); + this.groupId = client.getGroupId(); + this.dataStreamServer = dataStreamServer; + this.dataStreamClientRpc = dataStreamClientRpc; + this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, properties); + } + public final class DataStreamOutputImpl implements DataStreamOutputRpc { private final RaftClientRequest header; private final CompletableFuture headerFuture; @@ -127,7 +142,7 @@ private CompletableFuture writeAsyncImpl(Object data, long leng } final CompletableFuture f = combineHeader(send(Type.STREAM_DATA, data, length, options)); if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) { - closeFuture = f; + closeFuture = client != null? f.thenCompose(this::sendForward): f; f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture)); } streamOffset += length; @@ -172,6 +187,23 @@ public CompletableFuture getRaftClientReplyFuture() { public WritableByteChannel getWritableByteChannel() { return writableByteChannelSupplier.get(); } + + private CompletableFuture sendForward(DataStreamReply writeReply) { + if (!writeReply.isSuccess()) { + return CompletableFuture.completedFuture(writeReply); + } + final AsyncRpcApi asyncRpc = (AsyncRpcApi) client.async(); + return asyncRpc.sendForward(header).thenApply(clientReply -> DataStreamReplyByteBuffer.newBuilder() + .setClientId(clientId) + .setType(writeReply.getType()) + .setStreamId(writeReply.getStreamId()) + .setStreamOffset(writeReply.getStreamOffset()) + .setBuffer(ClientProtoUtils.toRaftClientReplyProto(clientReply).toByteString().asReadOnlyByteBuffer()) + .setSuccess(clientReply.isSuccess()) + .setBytesWritten(writeReply.getBytesWritten()) + .setCommitInfos(clientReply.getCommitInfos()) + .build()); + } } @Override diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java index 75871bf906..e89c2464fe 100644 --- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java +++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java @@ -170,9 +170,7 @@ void set(Collection newPeers) { this.messageStreamApi = JavaUtils.memoize(() -> MessageStreamImpl.newInstance(this, properties)); this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this)); this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this)); - this.dataStreamApi = JavaUtils.memoize(() -> DataStreamClient.newBuilder() - .setClientId(clientId) - .setRaftGroupId(groupId) + this.dataStreamApi = JavaUtils.memoize(() -> DataStreamClient.newBuilder(this) .setDataStreamServer(primaryDataStreamServer) .setProperties(properties) .setParameters(parameters) @@ -180,11 +178,13 @@ void set(Collection newPeers) { this.adminApi = JavaUtils.memoize(() -> new AdminImpl(this)); } + @Override public RaftPeerId getLeaderId() { return leaderId; } - RaftGroupId getGroupId() { + @Override + public RaftGroupId getGroupId() { return groupId; } diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 81c3aa1b6c..ddef3ac95f 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -19,7 +19,6 @@ package org.apache.ratis.netty.server; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import org.apache.ratis.client.AsyncRpcApi; import org.apache.ratis.client.DataStreamOutputRpc; import org.apache.ratis.client.impl.ClientProtoUtils; import org.apache.ratis.conf.RaftProperties; @@ -324,14 +323,12 @@ static void close(DataStream stream) { } static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(DataStreamRequestByteBuf request, - RaftClientReply reply, long bytesWritten, Collection commitInfos) { + RaftClientReply reply) { final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer(); return DataStreamReplyByteBuffer.newBuilder() .setDataStreamPacket(request) .setBuffer(buffer) .setSuccess(reply.isSuccess()) - .setBytesWritten(bytesWritten) - .setCommitInfos(commitInfos) .build(); } @@ -349,28 +346,6 @@ static void sendReply(List> remoteWrites, ctx.writeAndFlush(builder.build()); } - private CompletableFuture startTransaction(StreamInfo info, DataStreamRequestByteBuf request, - long bytesWritten, ChannelHandlerContext ctx) { - final RequestMetrics metrics = getMetrics().newRequestMetrics(RequestType.START_TRANSACTION); - final RequestContext context = metrics.start(); - try { - AsyncRpcApi asyncRpcApi = (AsyncRpcApi) (server.getDivision(info.getRequest() - .getRaftGroupId()) - .getRaftClient() - .async()); - return asyncRpcApi.sendForward(info.request).whenCompleteAsync((reply, e) -> { - metrics.stop(context, e == null); - if (e != null) { - replyDataStreamException(server, e, info.getRequest(), request, ctx); - } else { - ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, bytesWritten, info.getCommitInfos())); - } - }, requestExecutor); - } catch (IOException e) { - throw new CompletionException(e); - } - } - static void replyDataStreamException(RaftServer server, Throwable cause, RaftClientRequest raftClientRequest, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) { final RaftClientReply reply = RaftClientReply.newBuilder() @@ -394,7 +369,7 @@ static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBu ChannelHandlerContext ctx) { LOG.warn("Failed to process {}", request, throwable); try { - ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 0, null)); + ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply)); } catch (Throwable t) { LOG.warn("Failed to sendDataStreamException {} for {}", throwable, request, t); } @@ -452,15 +427,9 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct composeAsync(info.getPrevious(), requestExecutor, n -> JavaUtils.allOf(remoteWrites) .thenCombineAsync(localWrite, (v, bytesWritten) -> { if (request.getType() == Type.STREAM_HEADER - || (request.getType() == Type.STREAM_DATA && !close)) { + || request.getType() == Type.STREAM_DATA + || close) { sendReply(remoteWrites, request, bytesWritten, info.getCommitInfos(), ctx); - } else if (close) { - if (info.isPrimary()) { - // after all server close stream, primary server start transaction - startTransaction(info, request, bytesWritten, ctx); - } else { - sendReply(remoteWrites, request, bytesWritten, info.getCommitInfos(), ctx); - } } else { throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request); }