From d419a1b9d3a21df2aa2f5001e009ff3f59ce7f9e Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Fri, 27 Oct 2023 11:55:56 +0800 Subject: [PATCH 01/10] RATIS-1921. Shared worker group in WorkerGroupGetter should be closed --- .../apache/ratis/netty/NettyConfigKeys.java | 2 +- .../netty/client/NettyClientStreamRpc.java | 67 +++++++++++++++++-- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java index 98b1a6d747..be3ad8ee67 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java @@ -158,7 +158,7 @@ static void setWorkerGroupSize(RaftProperties properties, int clientWorkerGroupS } String WORKER_GROUP_SHARE_KEY = PREFIX + ".worker-group.share"; - boolean WORKER_GROUP_SHARE_DEFAULT = false; + boolean WORKER_GROUP_SHARE_DEFAULT = true; static boolean workerGroupShare(RaftProperties properties) { return getBoolean(properties::getBoolean, WORKER_GROUP_SHARE_KEY, WORKER_GROUP_SHARE_DEFAULT, getDefaultLog()); diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index e4c154fd21..a0e2877fef 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -56,6 +56,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.NetUtils; +import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -68,6 +69,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -78,7 +80,53 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class); private static class WorkerGroupGetter implements Supplier { - private static final AtomicReference SHARED_WORKER_GROUP = new AtomicReference<>(); + + private static class RefCountedWorkerGroup implements ReferenceCountedObject { + private final AtomicInteger count = new AtomicInteger(); + private final AtomicReference value = new AtomicReference<>(); + + @Override + public synchronized EventLoopGroup get() { + if (count.get() < 0) { + throw new IllegalStateException("Failed to get: object has already been completely released."); + } + return value.get(); + } + + @Override + public synchronized EventLoopGroup retain() { + // n < 0: exception + // n >= 0: n++ + final int previous = count.getAndUpdate(n -> n < 0? n : n + 1); + if (previous < 0) { + throw new IllegalStateException("Failed to retain: object has already been completely released."); + } else if (previous == 0) { + // TODO: Find a way to pass RaftProperties + // New shared worker group will be created when previously there is + // no active connections + return value.updateAndGet(g -> g != null ? g: newWorkerGroup(new RaftProperties())); + } + return value.get(); + } + + @Override + public synchronized boolean release() { + // n <= 0: exception + // n >= 1: n-- + final int previous = count.getAndUpdate(n -> n <= 0? -1: n - 1); + if (previous <= 0) { + throw new IllegalStateException("Failed to release: object has already been completely released."); + } else if (previous == 1) { + // Shutdown the event loop group when there are no active connection, + // subsequent retain will create a new shared worker group. + EventLoopGroup previousEventLoopGroup = value.getAndSet(null); + previousEventLoopGroup.shutdownGracefully(); + } + return previous == 1; + } + } + + private static final RefCountedWorkerGroup SHARED_WORKER_GROUP = new RefCountedWorkerGroup(); static EventLoopGroup newWorkerGroup(RaftProperties properties) { return NettyUtils.newEventLoopGroup( @@ -88,25 +136,30 @@ static EventLoopGroup newWorkerGroup(RaftProperties properties) { } private final EventLoopGroup workerGroup; - private final boolean ignoreShutdown; + private final boolean isSharedWorkerGroup; WorkerGroupGetter(RaftProperties properties) { - if (NettyConfigKeys.DataStream.Client.workerGroupShare(properties)) { - workerGroup = SHARED_WORKER_GROUP.updateAndGet(g -> g != null? g: newWorkerGroup(properties)); - ignoreShutdown = true; + isSharedWorkerGroup = NettyConfigKeys.DataStream.Client.workerGroupShare(properties); + if (isSharedWorkerGroup) { + SHARED_WORKER_GROUP.retain(); + workerGroup = null; } else { workerGroup = newWorkerGroup(properties); - ignoreShutdown = false; } } @Override public EventLoopGroup get() { + if (isSharedWorkerGroup) { + return SHARED_WORKER_GROUP.get(); + } return workerGroup; } void shutdownGracefully() { - if (!ignoreShutdown) { + if (isSharedWorkerGroup) { + SHARED_WORKER_GROUP.release(); + } else { workerGroup.shutdownGracefully(); } } From 2a63a7290f510a7dcc28d46302512f9596aed57b Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sun, 29 Oct 2023 08:34:15 +0800 Subject: [PATCH 02/10] Apply patch suggestion --- .../ratis/util/ReferenceCountedObject.java | 10 +- .../netty/client/NettyClientStreamRpc.java | 97 ++++++------------- 2 files changed, 40 insertions(+), 67 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java index 8b0c859037..fec82f0999 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java @@ -62,6 +62,11 @@ public interface ReferenceCountedObject { */ boolean release(); + /** The same as wrap(value, EMPTY, EMPTY), where EMPTY is an empty method. */ + static ReferenceCountedObject wrap(V value) { + return wrap(value, () -> {}, () -> {}); + } + /** * Wrap the given value as a {@link ReferenceCountedObject}. * @@ -81,8 +86,11 @@ static ReferenceCountedObject wrap(V value, Runnable retainMethod, Runnab @Override public V get() { - if (count.get() < 0) { + final int previous = count.get(); + if (previous < 0) { throw new IllegalStateException("Failed to get: object has already been completely released."); + } else if (previous == 0) { + throw new IllegalStateException("Failed to get: object has not yet been retained."); } return value; } diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index a0e2877fef..d76345c1a3 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -56,6 +56,7 @@ import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.MemoizedSupplier; import org.apache.ratis.util.NetUtils; +import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; @@ -69,7 +70,6 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; @@ -81,53 +81,33 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { private static class WorkerGroupGetter implements Supplier { - private static class RefCountedWorkerGroup implements ReferenceCountedObject { - private final AtomicInteger count = new AtomicInteger(); - private final AtomicReference value = new AtomicReference<>(); - - @Override - public synchronized EventLoopGroup get() { - if (count.get() < 0) { - throw new IllegalStateException("Failed to get: object has already been completely released."); - } - return value.get(); - } - - @Override - public synchronized EventLoopGroup retain() { - // n < 0: exception - // n >= 0: n++ - final int previous = count.getAndUpdate(n -> n < 0? n : n + 1); - if (previous < 0) { - throw new IllegalStateException("Failed to retain: object has already been completely released."); - } else if (previous == 0) { - // TODO: Find a way to pass RaftProperties - // New shared worker group will be created when previously there is - // no active connections - return value.updateAndGet(g -> g != null ? g: newWorkerGroup(new RaftProperties())); - } - return value.get(); - } - - @Override - public synchronized boolean release() { - // n <= 0: exception - // n >= 1: n-- - final int previous = count.getAndUpdate(n -> n <= 0? -1: n - 1); - if (previous <= 0) { - throw new IllegalStateException("Failed to release: object has already been completely released."); - } else if (previous == 1) { - // Shutdown the event loop group when there are no active connection, - // subsequent retain will create a new shared worker group. - EventLoopGroup previousEventLoopGroup = value.getAndSet(null); - previousEventLoopGroup.shutdownGracefully(); - } - return previous == 1; + private static final AtomicReference> SHARED_WORKER_GROUP = + new AtomicReference<>(); + + static WorkerGroupGetter newInstance(RaftProperties properties) { + final boolean shared = NettyConfigKeys.DataStream.Client.workerGroupShare(properties); + if (shared) { + final Supplier> supplier = MemoizedSupplier.valueOf( + () -> ReferenceCountedObject.wrap(newWorkerGroup((properties)))); + final ReferenceCountedObject sharedWorkerGroup = SHARED_WORKER_GROUP.updateAndGet( + g -> g != null ? g : supplier.get()); + return new WorkerGroupGetter(sharedWorkerGroup.get()) { + @Override + void shutdownGracefully() { + final ReferenceCountedObject returned = SHARED_WORKER_GROUP.updateAndGet(ref -> { + Preconditions.assertSame(sharedWorkerGroup, ref, "SHARED_WORKER_GROUP"); + return ref.release() ? null : ref; + }); + if (returned == null) { + workerGroup.shutdownGracefully(); + } + } + }; + } else { + return new WorkerGroupGetter(newWorkerGroup(properties)); } } - private static final RefCountedWorkerGroup SHARED_WORKER_GROUP = new RefCountedWorkerGroup(); - static EventLoopGroup newWorkerGroup(RaftProperties properties) { return NettyUtils.newEventLoopGroup( JavaUtils.getClassSimpleName(NettyClientStreamRpc.class) + "-workerGroup", @@ -135,33 +115,19 @@ static EventLoopGroup newWorkerGroup(RaftProperties properties) { NettyConfigKeys.DataStream.Client.useEpoll(properties)); } - private final EventLoopGroup workerGroup; - private final boolean isSharedWorkerGroup; + final EventLoopGroup workerGroup; - WorkerGroupGetter(RaftProperties properties) { - isSharedWorkerGroup = NettyConfigKeys.DataStream.Client.workerGroupShare(properties); - if (isSharedWorkerGroup) { - SHARED_WORKER_GROUP.retain(); - workerGroup = null; - } else { - workerGroup = newWorkerGroup(properties); - } + private WorkerGroupGetter(EventLoopGroup workerGroup) { + this.workerGroup = workerGroup; } @Override - public EventLoopGroup get() { - if (isSharedWorkerGroup) { - return SHARED_WORKER_GROUP.get(); - } + public final EventLoopGroup get() { return workerGroup; } void shutdownGracefully() { - if (isSharedWorkerGroup) { - SHARED_WORKER_GROUP.release(); - } else { - workerGroup.shutdownGracefully(); - } + workerGroup.shutdownGracefully(); } } @@ -310,8 +276,7 @@ public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties pro final InetSocketAddress address = NetUtils.createSocketAddr(server.getDataStreamAddress()); final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf); - this.connection = new Connection(address, - new WorkerGroupGetter(properties), + this.connection = new Connection(address, WorkerGroupGetter.newInstance(properties), () -> newChannelInitializer(address, sslContext, getClientHandler())); } From ac9cafd21b6e3f03c2908ec576b4d25abafd67e6 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 30 Oct 2023 20:54:09 +0800 Subject: [PATCH 03/10] Using retain --- .../org/apache/ratis/netty/client/NettyClientStreamRpc.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index d76345c1a3..09445f59d3 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -91,7 +91,7 @@ static WorkerGroupGetter newInstance(RaftProperties properties) { () -> ReferenceCountedObject.wrap(newWorkerGroup((properties)))); final ReferenceCountedObject sharedWorkerGroup = SHARED_WORKER_GROUP.updateAndGet( g -> g != null ? g : supplier.get()); - return new WorkerGroupGetter(sharedWorkerGroup.get()) { + return new WorkerGroupGetter(sharedWorkerGroup.retain()) { @Override void shutdownGracefully() { final ReferenceCountedObject returned = SHARED_WORKER_GROUP.updateAndGet(ref -> { From 9627b30554d70b33e4a2422274e6a937dc19e3a6 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 30 Oct 2023 20:59:44 +0800 Subject: [PATCH 04/10] Use Reference counted supplier --- .../ratis/netty/client/NettyClientStreamRpc.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index 09445f59d3..85d54f8d69 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -81,20 +81,20 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { private static class WorkerGroupGetter implements Supplier { - private static final AtomicReference> SHARED_WORKER_GROUP = - new AtomicReference<>(); + private static final AtomicReference>> SHARED_WORKER_GROUP + = new AtomicReference<>(); static WorkerGroupGetter newInstance(RaftProperties properties) { final boolean shared = NettyConfigKeys.DataStream.Client.workerGroupShare(properties); if (shared) { - final Supplier> supplier = MemoizedSupplier.valueOf( - () -> ReferenceCountedObject.wrap(newWorkerGroup((properties)))); - final ReferenceCountedObject sharedWorkerGroup = SHARED_WORKER_GROUP.updateAndGet( + final Supplier>> supplier = MemoizedSupplier.valueOf( + () -> ReferenceCountedObject.wrap(MemoizedSupplier.valueOf(() -> newWorkerGroup(properties)))); + final ReferenceCountedObject> sharedWorkerGroup = SHARED_WORKER_GROUP.updateAndGet( g -> g != null ? g : supplier.get()); - return new WorkerGroupGetter(sharedWorkerGroup.retain()) { + return new WorkerGroupGetter(sharedWorkerGroup.retain().get()) { @Override void shutdownGracefully() { - final ReferenceCountedObject returned = SHARED_WORKER_GROUP.updateAndGet(ref -> { + final ReferenceCountedObject> returned = SHARED_WORKER_GROUP.updateAndGet(ref -> { Preconditions.assertSame(sharedWorkerGroup, ref, "SHARED_WORKER_GROUP"); return ref.release() ? null : ref; }); From c6b75f43087a4123ed0bd92d0c3d69d54dd7a020 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 30 Oct 2023 21:33:08 +0800 Subject: [PATCH 05/10] Use CompletableFuture --- .../netty/client/NettyClientStreamRpc.java | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index 85d54f8d69..21f1fd38cc 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -81,22 +81,25 @@ public class NettyClientStreamRpc implements DataStreamClientRpc { private static class WorkerGroupGetter implements Supplier { - private static final AtomicReference>> SHARED_WORKER_GROUP + private static final AtomicReference>> SHARED_WORKER_GROUP = new AtomicReference<>(); static WorkerGroupGetter newInstance(RaftProperties properties) { final boolean shared = NettyConfigKeys.DataStream.Client.workerGroupShare(properties); if (shared) { - final Supplier>> supplier = MemoizedSupplier.valueOf( - () -> ReferenceCountedObject.wrap(MemoizedSupplier.valueOf(() -> newWorkerGroup(properties)))); - final ReferenceCountedObject> sharedWorkerGroup = SHARED_WORKER_GROUP.updateAndGet( - g -> g != null ? g : supplier.get()); - return new WorkerGroupGetter(sharedWorkerGroup.retain().get()) { + final CompletableFuture> created = new CompletableFuture<>(); + final CompletableFuture> current + = SHARED_WORKER_GROUP.updateAndGet(g -> g != null ? g : created); + if (current == created) { + created.complete(ReferenceCountedObject.wrap(newWorkerGroup(properties))); + } + return new WorkerGroupGetter(current.join().retain()) { @Override void shutdownGracefully() { - final ReferenceCountedObject> returned = SHARED_WORKER_GROUP.updateAndGet(ref -> { - Preconditions.assertSame(sharedWorkerGroup, ref, "SHARED_WORKER_GROUP"); - return ref.release() ? null : ref; + final CompletableFuture> returned + = SHARED_WORKER_GROUP.updateAndGet(previous -> { + Preconditions.assertSame(current, previous, "SHARED_WORKER_GROUP"); + return previous.join().release() ? null : previous; }); if (returned == null) { workerGroup.shutdownGracefully(); From f39e74667c36fb710766343dfbd3c78da8a1901d Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 30 Oct 2023 22:40:48 +0800 Subject: [PATCH 06/10] Use private field --- .../org/apache/ratis/netty/client/NettyClientStreamRpc.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java index 21f1fd38cc..41d862a302 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java @@ -102,7 +102,7 @@ void shutdownGracefully() { return previous.join().release() ? null : previous; }); if (returned == null) { - workerGroup.shutdownGracefully(); + get().shutdownGracefully(); } } }; @@ -118,7 +118,7 @@ static EventLoopGroup newWorkerGroup(RaftProperties properties) { NettyConfigKeys.DataStream.Client.useEpoll(properties)); } - final EventLoopGroup workerGroup; + private final EventLoopGroup workerGroup; private WorkerGroupGetter(EventLoopGroup workerGroup) { this.workerGroup = workerGroup; From 974a3b1f8a6e0f6392499a20d8f29cf680da03ab Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Mon, 30 Oct 2023 22:44:46 +0800 Subject: [PATCH 07/10] Update TestReferenceCountedObject --- .../org/apache/ratis/util/TestReferenceCountedObject.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java index 448212154c..5a855857a7 100644 --- a/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java +++ b/ratis-test/src/test/java/org/apache/ratis/util/TestReferenceCountedObject.java @@ -47,7 +47,12 @@ public void testWrap() { value, retained::getAndIncrement, released::getAndIncrement); assertValues(retained, 0, released, 0); - Assert.assertEquals(value, ref.get()); + try { + ref.get(); + Assert.fail(); + } catch (IllegalStateException e) { + e.printStackTrace(System.out); + } assertValues(retained, 0, released, 0); Assert.assertEquals(value, ref.retain()); From bd80f35f11bcd7a0521cb079f605812dd45f8970 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Tue, 31 Oct 2023 07:15:09 +0800 Subject: [PATCH 08/10] Retain and release buffer in DataStreamManagement --- .../org/apache/ratis/netty/server/DataStreamManagement.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index a4cc537ddc..7e4a2f1895 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -326,9 +326,11 @@ static long writeTo(ByteBuf buf, Iterable options, for (ByteBuffer buffer : buf.nioBuffers()) { final ReferenceCountedObject wrapped = ReferenceCountedObject.wrap(buffer, buf::retain, buf::release); try { - byteWritten += channel.write(wrapped); + byteWritten += channel.write(wrapped.retain()); } catch (Throwable t) { throw new CompletionException(t); + } finally { + wrapped.release(); } } From e9e1c0fd9e8b66870d8f6406b50df24e27d8be96 Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Tue, 31 Oct 2023 11:59:39 +0800 Subject: [PATCH 09/10] Pass ReferenceCountedObject instead of ByteBuffer to retain buffer --- .../org/apache/ratis/netty/server/DataStreamManagement.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index 7e4a2f1895..ad6bc4e02e 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -325,8 +325,9 @@ static long writeTo(ByteBuf buf, Iterable options, long byteWritten = 0; for (ByteBuffer buffer : buf.nioBuffers()) { final ReferenceCountedObject wrapped = ReferenceCountedObject.wrap(buffer, buf::retain, buf::release); + wrapped.retain(); try { - byteWritten += channel.write(wrapped.retain()); + byteWritten += channel.write(wrapped); } catch (Throwable t) { throw new CompletionException(t); } finally { From 42863c748852f2a5fadeed0f300cf044c1f5733e Mon Sep 17 00:00:00 2001 From: Ivan Andika Date: Sat, 4 Nov 2023 11:06:59 +0800 Subject: [PATCH 10/10] Set worker group size --- ...DataStreamChainTopologyWithGrpcCluster.java | 18 ++++++++++++++++++ ...yDataStreamStarTopologyWithGrpcCluster.java | 3 +++ 2 files changed, 21 insertions(+) diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java index e4e9fef575..31b28b4c2d 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamChainTopologyWithGrpcCluster.java @@ -17,7 +17,25 @@ */ package org.apache.ratis.datastream; +import org.apache.ratis.client.RaftClientConfigKeys; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; +import org.junit.Before; + public class TestNettyDataStreamChainTopologyWithGrpcCluster extends DataStreamAsyncClusterTests implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet { + + @Before + public void setup() { + final RaftProperties p = getProperties(); + RaftClientConfigKeys.DataStream.setRequestTimeout(p, TimeDuration.ONE_MINUTE); + RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4); + RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p, SizeInBytes.valueOf("10MB")); + RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16); + + NettyConfigKeys.DataStream.Client.setWorkerGroupSize(p,100); + } } diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java index 14c62b74f6..45247d489a 100644 --- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java +++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java @@ -19,6 +19,7 @@ import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.netty.NettyConfigKeys; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.RoutingTable; @@ -41,6 +42,8 @@ public void setup() { RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4); RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p, SizeInBytes.valueOf("10MB")); RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16); + + NettyConfigKeys.DataStream.Client.setWorkerGroupSize(p,100); } @Override