From 244fd65d9e95f7693d717c2461c35b28574e2d84 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Mon, 30 Oct 2023 14:27:23 -0700 Subject: [PATCH 1/2] RATIS-1923. Netty: atomic operations require side-effect-free functions. --- .../netty/client/NettyClientStreamRpc.java | 30 ++++++++++++------- .../netty/server/DataStreamManagement.java | 24 ++++++++------- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index e4c154fd21..6efe326757 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -120,18 +120,23 @@ static class Connection { private final Supplier> channelInitializerSupplier; /** The {@link ChannelFuture} is null when this connection is closed. */ - private final AtomicReference ref; + private final AtomicReference> ref; Connection(InetSocketAddress address, WorkerGroupGetter workerGroup, Supplier> channelInitializerSupplier) { this.address = address; this.workerGroup = workerGroup; this.channelInitializerSupplier = channelInitializerSupplier; - this.ref = new AtomicReference<>(connect()); + this.ref = new AtomicReference<>(MemoizedSupplier.valueOf(this::connect)); + } + + ChannelFuture getChannelFuture() { + final Supplier referenced = ref.get(); + return referenced != null? referenced.get(): null; } Channel getChannelUninterruptibly() { - final ChannelFuture future = ref.get(); + final ChannelFuture future = getChannelFuture(); if (future == null) { return null; //closed } @@ -176,7 +181,7 @@ void scheduleReconnect(String message, Throwable cause) { private synchronized ChannelFuture reconnect() { // concurrent reconnect double check - ChannelFuture channelFuture = ref.get(); + final ChannelFuture channelFuture = getChannelFuture(); if (channelFuture != null) { Channel channel = channelFuture.syncUninterruptibly().channel(); if (channel.isActive()) { @@ -184,19 +189,24 @@ private synchronized ChannelFuture reconnect() { } } - final MemoizedSupplier supplier = MemoizedSupplier.valueOf(this::connect); - final ChannelFuture previous = ref.getAndUpdate(prev -> prev == null? null: supplier.get()); + // Two levels of MemoizedSupplier for as a side-effect-free function: + // AtomicReference.getAndUpdate may call the update function multiple times and discard the old objects. + // The outer supplier creates only an inner supplier, which can be discarded without any leakage. + // The inner supplier will be invoked (i.e. connect) ONLY IF it is successfully set to the reference. + final MemoizedSupplier> supplier = MemoizedSupplier.valueOf( + () -> MemoizedSupplier.valueOf(this::connect)); + final Supplier previous = ref.getAndUpdate(prev -> prev == null? null: supplier.get()); if (previous != null) { - previous.channel().close(); + previous.get().channel().close(); } - return supplier.isInitialized() ? supplier.get() : null; + return getChannelFuture(); } void close() { - final ChannelFuture previous = ref.getAndSet(null); + final Supplier previous = ref.getAndSet(null); if (previous != null) { // wait channel closed, do shutdown workerGroup - previous.channel().close().addListener((future) -> workerGroup.shutdownGracefully()); + previous.get().channel().close().addListener(future -> workerGroup.shutdownGracefully()); } } diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index a4cc537ddc..33a62d5212 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -46,6 +46,7 @@ import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServer.Division; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.StateMachine.DataStream; import org.apache.ratis.statemachine.StateMachine.DataChannel; import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf; @@ -275,22 +276,23 @@ public Set remove(ChannelId channelId) { this.nettyServerStreamRpcMetrics = metrics; } + private CompletableFuture stream(RaftClientRequest request, StateMachine stateMachine) { + final RequestMetrics metrics = getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM); + final Timekeeper.Context context = metrics.start(); + return stateMachine.data().stream(request) + .whenComplete((r, e) -> metrics.stop(context, e == null)); + } + private CompletableFuture computeDataStreamIfAbsent(RaftClientRequest request) throws IOException { final Division division = server.getDivision(request.getRaftGroupId()); final ClientInvocationId invocationId = ClientInvocationId.valueOf(request); - final MemoizedSupplier> supplier = JavaUtils.memoize( - () -> { - final RequestMetrics metrics = getMetrics().newRequestMetrics(RequestType.STATE_MACHINE_STREAM); - final Timekeeper.Context context = metrics.start(); - return division.getStateMachine().data().stream(request) - .whenComplete((r, e) -> metrics.stop(context, e == null)); - }); - final CompletableFuture f = division.getDataStreamMap() - .computeIfAbsent(invocationId, key -> supplier.get()); - if (!supplier.isInitialized()) { + final CompletableFuture created = new CompletableFuture<>(); + final CompletableFuture returned = division.getDataStreamMap().computeIfAbsent(invocationId, key -> created); + if (returned != created) { throw new AlreadyExistsException("A DataStream already exists for " + invocationId); } - return f; + stream(request, division.getStateMachine()).whenComplete(JavaUtils.asBiConsumer(created)); + return created; } private StreamInfo newStreamInfo(ByteBuf buf, From d210400ceaa322446e23a3662b4889adbe37e297 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Mon, 30 Oct 2023 20:53:39 -0700 Subject: [PATCH 2/2] Fix checkstyle --- .../org/apache/ratis/netty/client/NettyClientStreamRpc.java | 2 +- .../org/apache/ratis/netty/server/DataStreamManagement.java | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index 6efe326757..e6ce29a461 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -189,7 +189,7 @@ private synchronized ChannelFuture reconnect() { } } - // Two levels of MemoizedSupplier for as a side-effect-free function: + // Two levels of MemoizedSupplier as a side-effect-free function: // AtomicReference.getAndUpdate may call the update function multiple times and discard the old objects. // The outer supplier creates only an inner supplier, which can be discarded without any leakage. // The inner supplier will be invoked (i.e. connect) ONLY IF it is successfully set to the reference. diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 33a62d5212..958a1ad45d 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -287,7 +287,8 @@ private CompletableFuture computeDataStreamIfAbsent(RaftClientReques final Division division = server.getDivision(request.getRaftGroupId()); final ClientInvocationId invocationId = ClientInvocationId.valueOf(request); final CompletableFuture created = new CompletableFuture<>(); - final CompletableFuture returned = division.getDataStreamMap().computeIfAbsent(invocationId, key -> created); + final CompletableFuture returned = division.getDataStreamMap() + .computeIfAbsent(invocationId, key -> created); if (returned != created) { throw new AlreadyExistsException("A DataStream already exists for " + invocationId); }