Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
/** An RPC interface which extends the user interface {@link AsyncApi}. */
public interface AsyncRpcApi extends AsyncApi {
/**
* Send the given RaftClientRequest asynchronously to the raft service.
* The RaftClientRequest will wrapped as Message in a new RaftClientRequest
* and leader will be decode it from the Message
* @param request The RaftClientRequest.
* Send the given forward-request asynchronously to the raft service.
*
* @param request The request to be forwarded.
* @return a future of the reply.
*/
CompletableFuture<RaftClientReply> sendForward(RaftClientRequest request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@ public interface DataStreamClient extends DataStreamRpcApi, Closeable {
DataStreamClientRpc getClientRpc();

static Builder newBuilder() {
return new Builder();
return newBuilder(null);
}

static Builder newBuilder(RaftClient client) {
return new Builder(client);
}

/** To build {@link DataStreamClient} objects */
Expand All @@ -51,10 +55,14 @@ class Builder {
private DataStreamClientRpc dataStreamClientRpc;
private RaftProperties properties;
private Parameters parameters;
private RaftGroupId raftGroupId;
private RaftGroupId groupId;
private ClientId clientId;

private Builder() {}
private final RaftClient client;

private Builder(RaftClient client) {
this.client = client;
}

public DataStreamClient build() {
Objects.requireNonNull(dataStreamServer, "The 'dataStreamServer' field is not initialized.");
Expand All @@ -65,18 +73,22 @@ public DataStreamClient build() {
.newDataStreamClientRpc(dataStreamServer, properties);
}
}
if (client != null) {
return ClientImplUtils.newDataStreamClient(
client, dataStreamServer, dataStreamClientRpc, properties);
}
return ClientImplUtils.newDataStreamClient(
Optional.ofNullable(clientId).orElseGet(ClientId::randomId),
raftGroupId, dataStreamServer, dataStreamClientRpc, properties);
groupId, dataStreamServer, dataStreamClientRpc, properties);
}

public Builder setClientId(ClientId clientId) {
this.clientId = clientId;
return this;
}

public Builder setRaftGroupId(RaftGroupId raftGroupId) {
this.raftGroupId = raftGroupId;
public Builder setGroupId(RaftGroupId groupId) {
this.groupId = groupId;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ public interface RaftClient extends Closeable {
/** @return the id of this client. */
ClientId getId();

/** @return the group id of this client. */
RaftGroupId getGroupId();

/** @return the cluster leaderId recorded by this client. */
RaftPeerId getLeaderId();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ static DataStreamClient newDataStreamClient(ClientId clientId, RaftGroupId group
DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
return new DataStreamClientImpl(clientId, groupId, primaryDataStreamServer, dataStreamClientRpc, properties);
}

static DataStreamClient newDataStreamClient(RaftClient client, RaftPeer primaryDataStreamServer,
DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
return new DataStreamClientImpl(client, primaryDataStreamServer, dataStreamClientRpc, properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
*/
package org.apache.ratis.client.impl;

import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamClient;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.StandardWriteOption;
import org.apache.ratis.io.WriteOption;
Expand Down Expand Up @@ -54,6 +57,7 @@
* allows client to create streams and send asynchronously.
*/
public class DataStreamClientImpl implements DataStreamClient {
private final RaftClient client;
private final ClientId clientId;
private final RaftGroupId groupId;

Expand All @@ -63,13 +67,24 @@ public class DataStreamClientImpl implements DataStreamClient {

DataStreamClientImpl(ClientId clientId, RaftGroupId groupId, RaftPeer dataStreamServer,
DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
this.client = null;
this.clientId = clientId;
this.groupId = groupId;
this.dataStreamServer = dataStreamServer;
this.dataStreamClientRpc = dataStreamClientRpc;
this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, properties);
}

DataStreamClientImpl(RaftClient client, RaftPeer dataStreamServer,
DataStreamClientRpc dataStreamClientRpc, RaftProperties properties) {
this.client = client;
this.clientId = client.getId();
this.groupId = client.getGroupId();
this.dataStreamServer = dataStreamServer;
this.dataStreamClientRpc = dataStreamClientRpc;
this.orderedStreamAsync = new OrderedStreamAsync(dataStreamClientRpc, properties);
}

public final class DataStreamOutputImpl implements DataStreamOutputRpc {
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
Expand Down Expand Up @@ -127,7 +142,7 @@ private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long leng
}
final CompletableFuture<DataStreamReply> f = combineHeader(send(Type.STREAM_DATA, data, length, options));
if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
closeFuture = f;
closeFuture = client != null? f.thenCompose(this::sendForward): f;
f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
}
streamOffset += length;
Expand Down Expand Up @@ -172,6 +187,23 @@ public CompletableFuture<RaftClientReply> getRaftClientReplyFuture() {
public WritableByteChannel getWritableByteChannel() {
return writableByteChannelSupplier.get();
}

private CompletableFuture<DataStreamReply> sendForward(DataStreamReply writeReply) {
if (!writeReply.isSuccess()) {
return CompletableFuture.completedFuture(writeReply);
}
final AsyncRpcApi asyncRpc = (AsyncRpcApi) client.async();
return asyncRpc.sendForward(header).thenApply(clientReply -> DataStreamReplyByteBuffer.newBuilder()
.setClientId(clientId)
.setType(writeReply.getType())
.setStreamId(writeReply.getStreamId())
.setStreamOffset(writeReply.getStreamOffset())
.setBuffer(ClientProtoUtils.toRaftClientReplyProto(clientReply).toByteString().asReadOnlyByteBuffer())
.setSuccess(clientReply.isSuccess())
.setBytesWritten(writeReply.getBytesWritten())
.setCommitInfos(clientReply.getCommitInfos())
.build());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,21 +170,21 @@ void set(Collection<RaftPeer> newPeers) {
this.messageStreamApi = JavaUtils.memoize(() -> MessageStreamImpl.newInstance(this, properties));
this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this));
this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this));
this.dataStreamApi = JavaUtils.memoize(() -> DataStreamClient.newBuilder()
.setClientId(clientId)
.setRaftGroupId(groupId)
this.dataStreamApi = JavaUtils.memoize(() -> DataStreamClient.newBuilder(this)
.setDataStreamServer(primaryDataStreamServer)
.setProperties(properties)
.setParameters(parameters)
.build());
this.adminApi = JavaUtils.memoize(() -> new AdminImpl(this));
}

@Override
public RaftPeerId getLeaderId() {
return leaderId;
}

RaftGroupId getGroupId() {
@Override
public RaftGroupId getGroupId() {
return groupId;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.ratis.netty.server;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.conf.RaftProperties;
Expand Down Expand Up @@ -324,14 +323,12 @@ static void close(DataStream stream) {
}

static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(DataStreamRequestByteBuf request,
RaftClientReply reply, long bytesWritten, Collection<CommitInfoProto> commitInfos) {
RaftClientReply reply) {
final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
return DataStreamReplyByteBuffer.newBuilder()
.setDataStreamPacket(request)
.setBuffer(buffer)
.setSuccess(reply.isSuccess())
.setBytesWritten(bytesWritten)
.setCommitInfos(commitInfos)
.build();
}

Expand All @@ -349,28 +346,6 @@ static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
ctx.writeAndFlush(builder.build());
}

private CompletableFuture<RaftClientReply> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
long bytesWritten, ChannelHandlerContext ctx) {
final RequestMetrics metrics = getMetrics().newRequestMetrics(RequestType.START_TRANSACTION);
final RequestContext context = metrics.start();
try {
AsyncRpcApi asyncRpcApi = (AsyncRpcApi) (server.getDivision(info.getRequest()
.getRaftGroupId())
.getRaftClient()
.async());
return asyncRpcApi.sendForward(info.request).whenCompleteAsync((reply, e) -> {
metrics.stop(context, e == null);
if (e != null) {
replyDataStreamException(server, e, info.getRequest(), request, ctx);
} else {
ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, bytesWritten, info.getCommitInfos()));
}
}, requestExecutor);
} catch (IOException e) {
throw new CompletionException(e);
}
}

static void replyDataStreamException(RaftServer server, Throwable cause, RaftClientRequest raftClientRequest,
DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
final RaftClientReply reply = RaftClientReply.newBuilder()
Expand All @@ -394,7 +369,7 @@ static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBu
ChannelHandlerContext ctx) {
LOG.warn("Failed to process {}", request, throwable);
try {
ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 0, null));
ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
} catch (Throwable t) {
LOG.warn("Failed to sendDataStreamException {} for {}", throwable, request, t);
}
Expand Down Expand Up @@ -452,15 +427,9 @@ private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ct
composeAsync(info.getPrevious(), requestExecutor, n -> JavaUtils.allOf(remoteWrites)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
if (request.getType() == Type.STREAM_HEADER
|| (request.getType() == Type.STREAM_DATA && !close)) {
|| request.getType() == Type.STREAM_DATA
|| close) {
sendReply(remoteWrites, request, bytesWritten, info.getCommitInfos(), ctx);
} else if (close) {
if (info.isPrimary()) {
// after all server close stream, primary server start transaction
startTransaction(info, request, bytesWritten, ctx);
} else {
sendReply(remoteWrites, request, bytesWritten, info.getCommitInfos(), ctx);
}
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
Expand Down