Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,23 @@ static class Connection {
private final Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier;

/** The {@link ChannelFuture} is null when this connection is closed. */
private final AtomicReference<ChannelFuture> ref;
private final AtomicReference<Supplier<ChannelFuture>> ref;

Connection(InetSocketAddress address, WorkerGroupGetter workerGroup,
Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier) {
this.address = address;
this.workerGroup = workerGroup;
this.channelInitializerSupplier = channelInitializerSupplier;
this.ref = new AtomicReference<>(connect());
this.ref = new AtomicReference<>(MemoizedSupplier.valueOf(this::connect));
}

ChannelFuture getChannelFuture() {
final Supplier<ChannelFuture> referenced = ref.get();
return referenced != null? referenced.get(): null;
}

Channel getChannelUninterruptibly() {
final ChannelFuture future = ref.get();
final ChannelFuture future = getChannelFuture();
if (future == null) {
return null; //closed
}
Expand Down Expand Up @@ -176,27 +181,32 @@ void scheduleReconnect(String message, Throwable cause) {

private synchronized ChannelFuture reconnect() {
// concurrent reconnect double check
ChannelFuture channelFuture = ref.get();
final ChannelFuture channelFuture = getChannelFuture();
if (channelFuture != null) {
Channel channel = channelFuture.syncUninterruptibly().channel();
if (channel.isActive()) {
return channelFuture;
}
}

final MemoizedSupplier<ChannelFuture> supplier = MemoizedSupplier.valueOf(this::connect);
final ChannelFuture previous = ref.getAndUpdate(prev -> prev == null? null: supplier.get());
// Two levels of MemoizedSupplier as a side-effect-free function:
// AtomicReference.getAndUpdate may call the update function multiple times and discard the old objects.
// The outer supplier creates only an inner supplier, which can be discarded without any leakage.
// The inner supplier will be invoked (i.e. connect) ONLY IF it is successfully set to the reference.
final MemoizedSupplier<Supplier<ChannelFuture>> supplier = MemoizedSupplier.valueOf(
() -> MemoizedSupplier.valueOf(this::connect));
final Supplier<ChannelFuture> previous = ref.getAndUpdate(prev -> prev == null? null: supplier.get());
if (previous != null) {
previous.channel().close();
previous.get().channel().close();
}
return supplier.isInitialized() ? supplier.get() : null;
return getChannelFuture();
}

void close() {
final ChannelFuture previous = ref.getAndSet(null);
final Supplier<ChannelFuture> previous = ref.getAndSet(null);
if (previous != null) {
// wait channel closed, do shutdown workerGroup
previous.channel().close().addListener((future) -> workerGroup.shutdownGracefully());
previous.get().channel().close().addListener(future -> workerGroup.shutdownGracefully());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServer.Division;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -275,22 +276,24 @@ public Set<ClientInvocationId> remove(ChannelId channelId) {
this.nettyServerStreamRpcMetrics = metrics;
}

private CompletableFuture<DataStream> stream(RaftClientRequest request, StateMachine stateMachine) {
final RequestMetrics metrics = getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
final Timekeeper.Context context = metrics.start();
return stateMachine.data().stream(request)
.whenComplete((r, e) -> metrics.stop(context, e == null));
}

private CompletableFuture<DataStream> computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
final Division division = server.getDivision(request.getRaftGroupId());
final ClientInvocationId invocationId = ClientInvocationId.valueOf(request);
final MemoizedSupplier<CompletableFuture<DataStream>> supplier = JavaUtils.memoize(
() -> {
final RequestMetrics metrics = getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM);
final Timekeeper.Context context = metrics.start();
return division.getStateMachine().data().stream(request)
.whenComplete((r, e) -> metrics.stop(context, e == null));
});
final CompletableFuture<DataStream> f = division.getDataStreamMap()
.computeIfAbsent(invocationId, key -> supplier.get());
if (!supplier.isInitialized()) {
final CompletableFuture<DataStream> created = new CompletableFuture<>();
final CompletableFuture<DataStream> returned = division.getDataStreamMap()
.computeIfAbsent(invocationId, key -> created);
if (returned != created) {
throw new AlreadyExistsException("A DataStream already exists for " + invocationId);
}
return f;
stream(request, division.getStateMachine()).whenComplete(JavaUtils.asBiConsumer(created));
return created;
}

private StreamInfo newStreamInfo(ByteBuf buf,
Expand Down