From 02d8481c9953259a45921f01c964f62d90ca3245 Mon Sep 17 00:00:00 2001 From: guohao1 Date: Tue, 20 Jun 2023 14:51:27 +0800 Subject: [PATCH] RATIS-1850. When the stream server side receives a disconnection, memory is cleared --- .../apache/ratis/netty/NettyConfigKeys.java | 11 ++++++++++ .../netty/server/DataStreamManagement.java | 4 ++++ .../netty/server/NettyServerStreamRpc.java | 21 +++++++++++++++++++ 3 files changed, 36 insertions(+) diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java index 07e6621622..81a381c9c0 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyConfigKeys.java @@ -26,6 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import static org.apache.ratis.conf.ConfUtils.*; @@ -181,6 +182,16 @@ static int workerGroupSize(RaftProperties properties) { static void setWorkerGroupSize(RaftProperties properties, int num) { setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, num); } + + String CHANNEL_INACTIVE_GRACE_PERIOD_KEY = PREFIX + ".channel.inactive.grace-period"; + TimeDuration CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MINUTES); + static TimeDuration channelInactiveGracePeriod(RaftProperties properties) { + return getTimeDuration(properties.getTimeDuration(CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT.getUnit()), + CHANNEL_INACTIVE_GRACE_PERIOD_KEY, CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT, getDefaultLog()); + } + static void setChannelInactiveGracePeriod(RaftProperties properties, TimeDuration timeoutDuration) { + setTimeDuration(properties::setTimeDuration, CHANNEL_INACTIVE_GRACE_PERIOD_KEY, timeoutDuration); + } } } diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java index b3f42bf32a..fdd6c502e1 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java @@ -383,6 +383,10 @@ static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBu } } + void cleanUpOnChannelInactive(ClientInvocationId key) { + Optional.ofNullable(streams.remove(key)).ifPresent(removed -> removed.getLocal().cleanUp()); + } + void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, CheckedBiFunction, Set, IOException> getStreams) { LOG.debug("{}: read {}", this, request); diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java index 70cb470263..6e1c88c723 100644 --- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java +++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java @@ -28,6 +28,7 @@ import org.apache.ratis.netty.NettyUtils; import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics; import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.ClientInvocationId; import org.apache.ratis.protocol.DataStreamPacket; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.protocol.RaftPeer; @@ -59,6 +60,7 @@ import org.apache.ratis.util.PeerProxyMap; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.TimeDuration; +import org.apache.ratis.util.TimeoutExecutor; import org.apache.ratis.util.UncheckedAutoCloseable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -156,12 +158,18 @@ void close() { private final NettyServerStreamRpcMetrics metrics; + private final TimeDuration channelInactiveGracePeriod; + public NettyServerStreamRpc(RaftServer server, Parameters parameters) { this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass()); this.metrics = new NettyServerStreamRpcMetrics(this.name); this.requests = new DataStreamManagement(server, metrics); final RaftProperties properties = server.getProperties(); + + this.channelInactiveGracePeriod = NettyConfigKeys.DataStream.Server + .channelInactiveGracePeriod(properties); + this.proxies = new ProxiesPool(name, properties, parameters); final boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties); @@ -234,6 +242,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { } } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + // Delayed memory garbage cleanup + Optional.ofNullable(requestRef.getAndSetNull()).ifPresent(request -> { + ClientInvocationId clientInvocationId = ClientInvocationId + .valueOf(request.getClientId(), request.getStreamId()); + TimeoutExecutor.getInstance().onTimeout(channelInactiveGracePeriod, + () -> requests.cleanUpOnChannelInactive(clientInvocationId), + LOG, () -> "Timeout check failed, clientInvocationId=" + + clientInvocationId); + }); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) { Optional.ofNullable(requestRef.getAndSetNull())