Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import static org.apache.ratis.conf.ConfUtils.*;
Expand Down Expand Up @@ -181,6 +182,16 @@ static int workerGroupSize(RaftProperties properties) {
static void setWorkerGroupSize(RaftProperties properties, int num) {
setInt(properties::setInt, WORKER_GROUP_SIZE_KEY, num);
}

String CHANNEL_INACTIVE_GRACE_PERIOD_KEY = PREFIX + ".channel.inactive.grace-period";
TimeDuration CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT = TimeDuration.valueOf(10, TimeUnit.MINUTES);
static TimeDuration channelInactiveGracePeriod(RaftProperties properties) {
return getTimeDuration(properties.getTimeDuration(CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT.getUnit()),
CHANNEL_INACTIVE_GRACE_PERIOD_KEY, CHANNEL_INACTIVE_GRACE_PERIOD_DEFAULT, getDefaultLog());
}
static void setChannelInactiveGracePeriod(RaftProperties properties, TimeDuration timeoutDuration) {
setTimeDuration(properties::setTimeDuration, CHANNEL_INACTIVE_GRACE_PERIOD_KEY, timeoutDuration);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,10 @@ static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBu
}
}

void cleanUpOnChannelInactive(ClientInvocationId key) {
Optional.ofNullable(streams.remove(key)).ifPresent(removed -> removed.getLocal().cleanUp());
}

void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
LOG.debug("{}: read {}", this, request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.netty.metrics.NettyServerStreamRpcMetrics;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamPacket;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
Expand Down Expand Up @@ -59,6 +60,7 @@
import org.apache.ratis.util.PeerProxyMap;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -156,12 +158,18 @@ void close() {

private final NettyServerStreamRpcMetrics metrics;

private final TimeDuration channelInactiveGracePeriod;

public NettyServerStreamRpc(RaftServer server, Parameters parameters) {
this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
this.metrics = new NettyServerStreamRpcMetrics(this.name);
this.requests = new DataStreamManagement(server, metrics);

final RaftProperties properties = server.getProperties();

this.channelInactiveGracePeriod = NettyConfigKeys.DataStream.Server
.channelInactiveGracePeriod(properties);

this.proxies = new ProxiesPool(name, properties, parameters);

final boolean useEpoll = NettyConfigKeys.DataStream.Server.useEpoll(properties);
Expand Down Expand Up @@ -234,6 +242,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
}
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
// Delayed memory garbage cleanup
Optional.ofNullable(requestRef.getAndSetNull()).ifPresent(request -> {
ClientInvocationId clientInvocationId = ClientInvocationId
.valueOf(request.getClientId(), request.getStreamId());
TimeoutExecutor.getInstance().onTimeout(channelInactiveGracePeriod,
() -> requests.cleanUpOnChannelInactive(clientInvocationId),
LOG, () -> "Timeout check failed, clientInvocationId=" +
clientInvocationId);
});
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
Optional.ofNullable(requestRef.getAndSetNull())
Expand Down