Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public interface ReferenceCountedObject<T> {
*/
boolean release();

/** The same as wrap(value, EMPTY, EMPTY), where EMPTY is an empty method. */
static <V> ReferenceCountedObject<V> wrap(V value) {
return wrap(value, () -> {}, () -> {});
}

/**
* Wrap the given value as a {@link ReferenceCountedObject}.
*
Expand All @@ -81,8 +86,11 @@ static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Runnab

@Override
public V get() {
if (count.get() < 0) {
final int previous = count.get();
if (previous < 0) {
throw new IllegalStateException("Failed to get: object has already been completely released.");
} else if (previous == 0) {
Copy link
Contributor

@szetszwo szetszwo Oct 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivandika3 , Sorry that this change is causing test failures for DataStream tests. Could you include the following fix?

+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -325,10 +325,13 @@ public class DataStreamManagement {
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
       final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
+      wrapped.retain();
       try {
         byteWritten += channel.write(wrapped);
       } catch (Throwable t) {
         throw new CompletionException(t);
+      } finally {
+        wrapped.release();
       }
     }

Copy link
Contributor Author

@ivandika3 ivandika3 Oct 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, yes I think this should resolve the issue, ran the unit tests in local.

Also tested without regression using ozone freon ockg.

throw new IllegalStateException("Failed to get: object has not yet been retained.");
}
return value;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ static void setWorkerGroupSize(RaftProperties properties, int clientWorkerGroupS
}

String WORKER_GROUP_SHARE_KEY = PREFIX + ".worker-group.share";
boolean WORKER_GROUP_SHARE_DEFAULT = false;
boolean WORKER_GROUP_SHARE_DEFAULT = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is good to change the default to true.

static boolean workerGroupShare(RaftProperties properties) {
return getBoolean(properties::getBoolean, WORKER_GROUP_SHARE_KEY,
WORKER_GROUP_SHARE_DEFAULT, getDefaultLog());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.slf4j.Logger;
Expand All @@ -78,7 +80,36 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);

private static class WorkerGroupGetter implements Supplier<EventLoopGroup> {
private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP = new AtomicReference<>();

private static final AtomicReference<CompletableFuture<ReferenceCountedObject<EventLoopGroup>>> SHARED_WORKER_GROUP
= new AtomicReference<>();

static WorkerGroupGetter newInstance(RaftProperties properties) {
final boolean shared = NettyConfigKeys.DataStream.Client.workerGroupShare(properties);
if (shared) {
final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> created = new CompletableFuture<>();
final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> current
= SHARED_WORKER_GROUP.updateAndGet(g -> g != null ? g : created);
if (current == created) {
created.complete(ReferenceCountedObject.wrap(newWorkerGroup(properties)));
}
return new WorkerGroupGetter(current.join().retain()) {
@Override
void shutdownGracefully() {
final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> returned
= SHARED_WORKER_GROUP.updateAndGet(previous -> {
Preconditions.assertSame(current, previous, "SHARED_WORKER_GROUP");
return previous.join().release() ? null : previous;
});
if (returned == null) {
get().shutdownGracefully();
}
}
};
} else {
return new WorkerGroupGetter(newWorkerGroup(properties));
}
}

static EventLoopGroup newWorkerGroup(RaftProperties properties) {
return NettyUtils.newEventLoopGroup(
Expand All @@ -88,27 +119,18 @@ static EventLoopGroup newWorkerGroup(RaftProperties properties) {
}

private final EventLoopGroup workerGroup;
private final boolean ignoreShutdown;

WorkerGroupGetter(RaftProperties properties) {
if (NettyConfigKeys.DataStream.Client.workerGroupShare(properties)) {
workerGroup = SHARED_WORKER_GROUP.updateAndGet(g -> g != null? g: newWorkerGroup(properties));
ignoreShutdown = true;
} else {
workerGroup = newWorkerGroup(properties);
ignoreShutdown = false;
}
private WorkerGroupGetter(EventLoopGroup workerGroup) {
this.workerGroup = workerGroup;
}

@Override
public EventLoopGroup get() {
public final EventLoopGroup get() {
return workerGroup;
}

void shutdownGracefully() {
if (!ignoreShutdown) {
workerGroup.shutdownGracefully();
}
workerGroup.shutdownGracefully();
}
}

Expand Down Expand Up @@ -257,8 +279,7 @@ public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties pro

final InetSocketAddress address = NetUtils.createSocketAddr(server.getDataStreamAddress());
final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
this.connection = new Connection(address,
new WorkerGroupGetter(properties),
this.connection = new Connection(address, WorkerGroupGetter.newInstance(properties),
() -> newChannelInitializer(address, sslContext, getClientHandler()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,13 @@ static long writeTo(ByteBuf buf, Iterable<WriteOption> options,
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
wrapped.retain();
try {
byteWritten += channel.write(wrapped);
} catch (Throwable t) {
throw new CompletionException(t);
} finally {
wrapped.release();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,25 @@
*/
package org.apache.ratis.datastream;

import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.junit.Before;

public class TestNettyDataStreamChainTopologyWithGrpcCluster
extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {

@Before
public void setup() {
final RaftProperties p = getProperties();
RaftClientConfigKeys.DataStream.setRequestTimeout(p, TimeDuration.ONE_MINUTE);
RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4);
RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p, SizeInBytes.valueOf("10MB"));
RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16);

NettyConfigKeys.DataStream.Client.setWorkerGroupSize(p,100);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RoutingTable;
Expand All @@ -41,6 +42,8 @@ public void setup() {
RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4);
RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p, SizeInBytes.valueOf("10MB"));
RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16);

NettyConfigKeys.DataStream.Client.setWorkerGroupSize(p,100);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,12 @@ public void testWrap() {
value, retained::getAndIncrement, released::getAndIncrement);

assertValues(retained, 0, released, 0);
Assert.assertEquals(value, ref.get());
try {
ref.get();
Assert.fail();
} catch (IllegalStateException e) {
e.printStackTrace(System.out);
}
assertValues(retained, 0, released, 0);

Assert.assertEquals(value, ref.retain());
Expand Down