
Conversation

@ivandika3 (Contributor)

What changes were proposed in this pull request?

Problem

When benchmarking the Ozone streaming pipeline using ozone freon ockg, the benchmark would not exit even though all the keys had been written. hdds.ratis.raft.netty.dataStream.client.worker-group.share was set to true to use the shared worker group optimization.

Using Arthas, we found that non-daemon threads like "NettyClientStreamRpc-workerGroup-thread1" were still running even after the benchmark had finished. The root cause is that the shared worker group is never closed, so its non-daemon worker threads keep the JVM alive and the JVM shutdown hook is never triggered. The benchmark was able to shut down normally when the share configuration was disabled.

Background

It seems that the shared worker group is a lazily instantiated singleton. It is created when the first WorkerGroupGetter is instantiated and passed to a new Connection during construction. Due to the nature of a singleton, this worker group is shared across all the Raft clients under the same Ozone client (please correct me if I'm wrong).

In the Ozone streaming write pipeline, every BlockDataStreamOutput (whose scope is a single block) creates a new RaftClient, which corresponds to a single NettyClientStreamRpc instance. All of these RaftClients share the shared worker group.

Solution

The current solution uses a "modified" reference-counted EventLoopGroup based on the ReferenceCountedObject interface. Previously, I tried to use the implementation from ReferenceCountedObject#wrap. However, when the reference count is 1 and release() is invoked, it completely releases the object and throws exceptions for any further operations, so that implementation does not suit this use case.

The "modified" reference count instantiates the shared worker group whenever it is retained and the previous reference count is 0. It also gracefully shuts down the worker group when it is released and the reference count reaches 0. It is retained whenever a new WorkerGroupGetter is instantiated (i.e. when a connection is created), and released when the connection is closed.

Technically, the worker group is no longer a singleton: it is shared by all live connections, but it is removed and shut down once all connections are closed, and subsequent connections get a new shared worker group. This guarantees that at any time there is one worker group shared among the connections, though not necessarily the same instance throughout.
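The retain/release life cycle described above can be sketched with a small stand-alone class. This is a simplified illustration, not the actual patch: the EventLoopGroup is replaced by a generic factory/shutdown pair so the sketch stays self-contained, and all names are hypothetical.

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

public class RefCountedDemo {
  // Simplified stand-in for the "modified" reference-counted worker group:
  // the resource is (re)created on a 0 -> 1 retain and shut down on a
  // 1 -> 0 release, so later retains get a fresh instance.
  static class RefCounted<T> {
    private final Supplier<T> factory;
    private final Consumer<T> shutdown;
    private int count;   // guarded by 'this'
    private T value;

    RefCounted(Supplier<T> factory, Consumer<T> shutdown) {
      this.factory = factory;
      this.shutdown = shutdown;
    }

    synchronized T retain() {
      if (count++ == 0) {
        value = factory.get();      // first connection: create the group
      }
      return value;
    }

    synchronized void release() {
      if (--count == 0) {
        shutdown.accept(value);     // last connection gone: shut it down
        value = null;               // a later retain() creates a new one
      }
    }
  }

  /** Runs the life cycle once; returns creations * 10 + shutdowns. */
  public static int demo() {
    final int[] created = {0};
    final int[] closed = {0};
    RefCounted<String> group =
        new RefCounted<>(() -> "group-" + ++created[0], g -> closed[0]++);
    group.retain();        // connection 1: creates instance #1
    group.retain();        // connection 2: shares instance #1
    group.release();
    group.release();       // last release: instance #1 shut down
    group.retain();        // new connection: creates instance #2
    group.release();       // instance #2 shut down
    return created[0] * 10 + closed[0];
  }

  public static void main(String[] args) {
    System.out.println(demo());   // 22: two creations, two shutdowns
  }
}
```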

The current solution is still a rough sketch of what the fix might look like; any advice is greatly appreciated.

Also enabled hdds.ratis.raft.netty.dataStream.client.worker-group.share by default, following https://github.com/szetszwo/ozone-benchmark/blob/master/benchmark-conf.xml.

What is the link to the Apache JIRA

https://issues.apache.org/jira/browse/RATIS-1921

How was this patch tested?

Existing unit tests (with worker group sharing enabled by default). Also tested using ozone freon ockg again.

@szetszwo (Contributor) left a comment:

@ivandika3 , thanks a lot for working on this! Please see the comments inlined and also https://issues.apache.org/jira/secure/attachment/13063958/955_review.patch

private static class WorkerGroupGetter implements Supplier<EventLoopGroup> {
private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP = new AtomicReference<>();

private static class RefCountedWorkerGroup implements ReferenceCountedObject<EventLoopGroup> {
Contributor:

We should just use ReferenceCountedObject.wrap(..).


 String WORKER_GROUP_SHARE_KEY = PREFIX + ".worker-group.share";
-boolean WORKER_GROUP_SHARE_DEFAULT = false;
+boolean WORKER_GROUP_SHARE_DEFAULT = true;
Contributor:

It is good to change the default to true.

}
}

private static final RefCountedWorkerGroup SHARED_WORKER_GROUP = new RefCountedWorkerGroup();
Contributor:

We need an AtomicReference so that a new SHARED_WORKER_GROUP can be created after an old one has been completely released.

@szetszwo (Contributor) left a comment:

@ivandika3 , thanks for the update! Just found that there were two problems in my review patch. Please see the comments inlined.

final Supplier<ReferenceCountedObject<EventLoopGroup>> supplier = MemoizedSupplier.valueOf(
() -> ReferenceCountedObject.wrap(newWorkerGroup((properties))));
final ReferenceCountedObject<EventLoopGroup> sharedWorkerGroup = SHARED_WORKER_GROUP.updateAndGet(
g -> g != null ? g : supplier.get());
Contributor:

We should call retain().

Contributor Author:

Thank you for the review. Updated.

Comment on lines 84 to 85
private static final AtomicReference<ReferenceCountedObject<EventLoopGroup>> SHARED_WORKER_GROUP =
new AtomicReference<>();
Contributor:

We need

    private static final AtomicReference<ReferenceCountedObject<Supplier<EventLoopGroup>>> SHARED_WORKER_GROUP
        = new AtomicReference<>();

since AtomicReference.updateAndGet(updateFunction) may call the updateFunction without using the result. The current code would then call newWorkerGroup(..) without shutting it down.

Use a MemoizedSupplier inside:

        final Supplier<ReferenceCountedObject<Supplier<EventLoopGroup>>> supplier = MemoizedSupplier.valueOf(
            () -> ReferenceCountedObject.wrap(MemoizedSupplier.valueOf(() -> newWorkerGroup(properties))));
        final ReferenceCountedObject<Supplier<EventLoopGroup>> sharedWorkerGroup = SHARED_WORKER_GROUP.updateAndGet(
            g -> g != null ? g : supplier.get());
        return new WorkerGroupGetter(sharedWorkerGroup.retain().get()) {
          ...
        };

Contributor Author:

since AtomicReference.updateAndGet(updateFunction) may call the updateFunction without using the result. The current code would then call newWorkerGroup(..) without shutting it down.

Do you mean it's because AtomicReference.updateAndGet(updateFunction) might get re-applied, as mentioned in the AtomicReference documentation?

Since it may be re-applied when attempted updates fail due to contention among threads.

Contributor:

Yes. When two or more threads change an AtomicReference at the same time, the updateAndGet(..) method runs in a loop and may call the update function multiple times. In our previous code, that could create multiple EventLoopGroup objects, and the old objects would never be shut down.
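A tiny self-contained demo of the idea being discussed: because the update function may be re-applied, the object construction is pushed behind a memoized supplier, so repeated applications can never construct more than one instance. The memoize helper below is a simplified stand-in for Ratis's MemoizedSupplier; all names are illustrative.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

public class MemoizedUpdateDemo {
  // Simplified stand-in for Ratis's MemoizedSupplier: delegate runs at most once.
  static <T> Supplier<T> memoize(Supplier<T> delegate) {
    return new Supplier<T>() {
      private T value;
      @Override public synchronized T get() {
        if (value == null) {
          value = delegate.get();
        }
        return value;
      }
    };
  }

  /** Returns how many objects were constructed; should always be 1. */
  public static int demo() {
    AtomicInteger constructed = new AtomicInteger();
    AtomicReference<String> ref = new AtomicReference<>();
    Supplier<String> supplier =
        memoize(() -> "worker-group-" + constructed.incrementAndGet());
    UnaryOperator<String> update = g -> g != null ? g : supplier.get();

    // Simulate updateAndGet re-applying the function under contention:
    // even when it runs twice against a stale null, the memoized supplier
    // hands back the same object instead of constructing another one.
    update.apply(null);
    update.apply(null);
    ref.updateAndGet(update);
    return constructed.get();
  }

  public static void main(String[] args) {
    System.out.println(demo());   // 1: a single construction despite re-application
  }
}
```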

Contributor Author:

Thank you for the explanation.

@szetszwo (Contributor):

Another way is to use CompletableFuture as below. It may be easier to understand.

    private static final AtomicReference<CompletableFuture<ReferenceCountedObject<EventLoopGroup>>> SHARED_WORKER_GROUP
        = new AtomicReference<>();

    static WorkerGroupGetter newInstance(RaftProperties properties) {
      final boolean shared = NettyConfigKeys.DataStream.Client.workerGroupShare(properties);
      if (shared) {
        final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> created = new CompletableFuture<>();
        final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> current
            = SHARED_WORKER_GROUP.updateAndGet(g -> g != null ? g : created);
        if (current == created) {
          created.complete(ReferenceCountedObject.wrap(newWorkerGroup(properties)));
        }
        return new WorkerGroupGetter(current.join().retain()) {
          @Override
          void shutdownGracefully() {
            final CompletableFuture<ReferenceCountedObject<EventLoopGroup>> returned
                = SHARED_WORKER_GROUP.updateAndGet(previous -> {
              Preconditions.assertSame(current, previous, "SHARED_WORKER_GROUP");
              return previous.join().release() ? null : previous;
            });
            if (returned == null) {
              workerGroup.shutdownGracefully();
            }
          }
        };
      } else {
        return new WorkerGroupGetter(newWorkerGroup(properties));
      }
    }
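The key idea in the snippet above is that every thread installs its own CompletableFuture with updateAndGet, but only the thread whose future actually won the race completes it, so the expensive resource is constructed exactly once while every caller joins the same future. Here is a minimal stand-alone sketch of that pattern, with a String standing in for the EventLoopGroup and the release path omitted; the names are illustrative, not the Ratis code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class FutureRaceDemo {
  public static final AtomicReference<CompletableFuture<String>> SHARED =
      new AtomicReference<>();
  public static final AtomicInteger CREATED = new AtomicInteger();

  public static String getShared() {
    final CompletableFuture<String> created = new CompletableFuture<>();
    final CompletableFuture<String> current =
        SHARED.updateAndGet(g -> g != null ? g : created);
    if (current == created) {
      // Our future won the race: we alone construct the resource.
      created.complete("resource-" + CREATED.incrementAndGet());
    }
    return current.join();   // every caller waits on the same future
  }

  public static void main(String[] args) {
    String a = getShared();
    String b = getShared();
    System.out.println(a.equals(b) + " " + CREATED.get());  // true 1
  }
}
```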

@ivandika3 (Contributor Author) commented Oct 30, 2023:

Thank you for the review and guidance @szetszwo.

I have updated it to use the CompletableFuture version since it is easier to understand, but found a DataStream test regression that threw this error:

java.lang.IllegalStateException: Failed to get StreamInfo for DataStreamRequestByteBuf:clientId=client-FF2407A774AC,type=STREAM_DATA,id=0,offset=75998,length=607
	at org.apache.ratis.netty.server.DataStreamManagement.lambda$readImpl$11(DataStreamManagement.java:467)
	at java.base/java.util.Optional.orElseThrow(Optional.java:408)
	at org.apache.ratis.netty.server.DataStreamManagement.readImpl(DataStreamManagement.java:466)
	at org.apache.ratis.netty.server.DataStreamManagement.read(DataStreamManagement.java:433)
	at org.apache.ratis.netty.server.NettyServerStreamRpc$1.channelRead(NettyServerStreamRpc.java:236)

From my understanding, it might be because the STREAM_HEADER was not sent before the STREAM_DATA was received. I will try to resolve this.

@szetszwo (Contributor) left a comment:

+1 the change looks good.

@szetszwo (Contributor):

... it might be because STREAM_HEADER was not sent, before the STREAM_DATA was received. ...

@ivandika3 , when reviewing this, I found that we have also used AtomicReference/ConcurrentHashMap incorrectly in some other cases; filed RATIS-1923.

If you could find some other bugs, please also file a JIRA. Thanks!

final int previous = count.get();
if (previous < 0) {
throw new IllegalStateException("Failed to get: object has already been completely released.");
} else if (previous == 0) {
@szetszwo (Contributor) commented Oct 30, 2023:

@ivandika3 , Sorry that this change is causing test failures for DataStream tests. Could you include the following fix?

+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -325,10 +325,13 @@ public class DataStreamManagement {
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
       final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
+      wrapped.retain();
       try {
         byteWritten += channel.write(wrapped);
       } catch (Throwable t) {
         throw new CompletionException(t);
+      } finally {
+        wrapped.release();
       }
     }

@ivandika3 (Contributor Author) commented Oct 30, 2023:

Thank you, yes, I think this should resolve the issue; I ran the unit tests locally.

Also tested without regression using ozone freon ockg.

@ivandika3 marked this pull request as ready for review October 30, 2023 23:15
 final ReferenceCountedObject<ByteBuffer> wrapped = ReferenceCountedObject.wrap(buffer, buf::retain, buf::release);
 try {
-  byteWritten += channel.write(wrapped);
+  byteWritten += channel.write(wrapped.retain());
Contributor:

@ivandika3 , we need to pass ReferenceCountedObject instead of ByteBuffer so that the implementation can also retain the buffer for later use.
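A hypothetical sketch of the point being made: if the write API receives the reference-counted wrapper rather than the raw buffer, an asynchronous implementation can retain() it and release() it only when the I/O actually completes, keeping the buffer alive after the caller has moved on. The Counted and AsyncChannel classes below are illustrative stand-ins, not the real Ratis types.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RetainForLaterDemo {
  // Minimal reference-counted wrapper; starts with one reference (the caller's).
  static class Counted {
    final AtomicInteger refs = new AtomicInteger(1);
    Counted retain() { refs.incrementAndGet(); return this; }
    /** Returns true when the count reaches zero (fully released). */
    boolean release() { return refs.decrementAndGet() == 0; }
  }

  // A channel that completes writes asynchronously: it must retain the
  // wrapper at write time and release it only when the I/O finishes.
  static class AsyncChannel {
    private Counted pending;
    void write(Counted wrapped) {
      pending = wrapped.retain();   // keep the buffer alive for later use
    }
    void completeWrite() {
      pending.release();            // the channel is done with the buffer
      pending = null;
    }
  }

  /** Returns 0 when the buffer survives until the async write completes. */
  public static int demo() {
    Counted wrapped = new Counted();
    AsyncChannel channel = new AsyncChannel();
    channel.write(wrapped);                     // refs: 1 -> 2
    boolean freedTooEarly = wrapped.release();  // caller drops its ref: 2 -> 1
    channel.completeWrite();                    // refs: 1 -> 0, safe to free now
    return (freedTooEarly ? 10 : 0) + wrapped.refs.get();
  }

  public static void main(String[] args) {
    System.out.println(demo());   // 0: not freed early, final count is zero
  }
}
```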

Contributor Author:

My mistake, I have updated it accordingly.

@szetszwo (Contributor) commented Nov 1, 2023:

TestNettyDataStreamStarTopologyWithGrpcCluster and TestNettyDataStreamChainTopologyWithGrpcCluster have failed with timeouts in quite a few attempts. It may be related to changing the default of WORKER_GROUP_SHARE_KEY, since these tests create quite a lot of streams. We may need to either disable it in the tests or increase the workerGroupSize.

@ivandika3 , could you take a look?

@ivandika3 (Contributor Author) commented Nov 4, 2023:

Hi @szetszwo , thank you for the suggestions. I have increased the worker group size to 100 (taken from https://github.com/szetszwo/ozone-benchmark/blob/master/benchmark-conf.xml).

TestNettyDataStreamChainTopologyWithGrpcCluster 400x: https://github.com/ivandika3/ratis/actions/runs/6752427405

Edit: We might want to use a fixed value for the worker group size instead of depending on the number of processors.

@szetszwo merged commit d673154 into apache:master Nov 4, 2023
@ivandika3 deleted the RATIS-1921 branch May 29, 2024 01:54
RexXiong pushed a commit to apache/celeborn that referenced this pull request May 30, 2024
### What changes were proposed in this pull request?

Bump Ratis version from 2.5.1 to 3.0.1. Address incompatible changes:

- RATIS-589. Eliminate buffer copying in SegmentedRaftLogOutputStream.(apache/ratis#964)
- RATIS-1677. Do not auto format RaftStorage in RECOVER.(apache/ratis#718)
- RATIS-1710. Refactor metrics api and implementation to separated modules. (apache/ratis#749)

### Why are the changes needed?

Bump Ratis version from 2.5.1 to 3.0.1. Ratis has released v3.0.0 and v3.0.1; see the release notes for [3.0.0](https://ratis.apache.org/post/3.0.0.html) and [3.0.1](https://ratis.apache.org/post/3.0.1.html). The 3.0.x versions include new features like pluggable metrics and lease read, as well as improvements and bugfixes, including:

- 3.0.0: In total, there are roughly 100 commits since 2.5.1, including:
   - Incompatible Changes
      - RaftStorage Auto-Format
      - RATIS-1677. Do not auto format RaftStorage in RECOVER. (apache/ratis#718)
      - RATIS-1694. Fix the compatibility issue of RATIS-1677. (apache/ratis#731)
      - RATIS-1871. Auto format RaftStorage when there is only one directory configured. (apache/ratis#903)
      - Pluggable Ratis-Metrics (RATIS-1688)
      - RATIS-1689. Remove the use of the thirdparty Gauge. (apache/ratis#728)
      - RATIS-1692. Remove the use of the thirdparty Counter. (apache/ratis#732)
      - RATIS-1693. Remove the use of the thirdparty Timer. (apache/ratis#734)
      - RATIS-1703. Move MetricsReporting and JvmMetrics to impl. (apache/ratis#741)
      - RATIS-1704. Fix SuppressWarnings(“VisibilityModifier”) in RatisMetrics. (apache/ratis#742)
      - RATIS-1710. Refactor metrics api and implementation to separated modules. (apache/ratis#749)
      - RATIS-1712. Add a dropwizard 3 implementation of ratis-metrics-api. (apache/ratis#751)
      - RATIS-1391. Update library dropwizard.metrics version to 4.x (apache/ratis#632)
      - RATIS-1601. Use the shaded dropwizard metrics and remove the dependency (apache/ratis#671)
      - Streaming Protocol Change
      - RATIS-1569. Move the asyncRpcApi.sendForward(..) call to the client side. (apache/ratis#635)
   - New Features
      - Leader Lease (RATIS-1864)
      - RATIS-1865. Add leader lease bound ratio configuration (apache/ratis#897)
      - RATIS-1866. Maintain leader lease after AppendEntries (apache/ratis#898)
      - RATIS-1894. Implement ReadOnly based on leader lease (apache/ratis#925)
      - RATIS-1882. Support read-after-write consistency (apache/ratis#913)
      - StateMachine API
      - RATIS-1874. Add notifyLeaderReady function in IStateMachine (apache/ratis#906)
      - RATIS-1897. Make TransactionContext available in DataApi.write(..). (apache/ratis#930)
      - New Configuration Properties
      - RATIS-1862. Add the parameter whether to take Snapshot when stopping to adapt to different services (apache/ratis#896)
      - RATIS-1930. Add a conf for enable/disable majority-add. (apache/ratis#961)
      - RATIS-1918. Introduces parameters that separately control the shutdown of RaftServerProxy by JVMPauseMonitor. (apache/ratis#950)
      - RATIS-1636. Support re-config ratis properties (apache/ratis#800)
      - RATIS-1860. Add ratis-shell cmd to generate a new raft-meta.conf. (apache/ratis#901)
   - Improvements & Bug Fixes
      - Netty
         - RATIS-1898. Netty should use EpollEventLoopGroup by default (apache/ratis#931)
         - RATIS-1899. Use EpollEventLoopGroup for Netty Proxies (apache/ratis#932)
         - RATIS-1921. Shared worker group in WorkerGroupGetter should be closed. (apache/ratis#955)
         - RATIS-1923. Netty: atomic operations require side-effect-free functions. (apache/ratis#956)
      - RaftServer
         - RATIS-1924. Increase the default of raft.server.log.segment.size.max. (apache/ratis#957)
         - RATIS-1892. Unify the lifetime of the RaftServerProxy thread pool (apache/ratis#923)
         - RATIS-1889. NoSuchMethodError: RaftServerMetricsImpl.addNumPendingRequestsGauge apache/ratis#922 (apache/ratis#922)
         - RATIS-761. Handle writeStateMachineData failure in leader. (apache/ratis#927)
         - RATIS-1902. The snapshot index is set incorrectly in InstallSnapshotReplyProto. (apache/ratis#933)
         - RATIS-1912. Fix infinity election when perform membership change. (apache/ratis#954)
         - RATIS-1858. Follower keeps logging first election timeout. (apache/ratis#894)

- 3.0.1: This is a bugfix release. See the [changes between 3.0.0 and 3.0.1](apache/ratis@ratis-3.0.0...ratis-3.0.1) releases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Cluster manual test.

Closes #2480 from SteNicholas/CELEBORN-1400.

Authored-by: SteNicholas <programgeek@163.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>