diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 629efe6c203c..297578b01d3b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -20,6 +20,13 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_BUFFER_LOW_WATERMARK; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_BUFFER_HIGH_WATERMARK; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_CONNECT_MAX_RETRIES; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_TCP_KEEPALIVE; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_TCP_NODELAY; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.DEFAULT_CLIENT_BUFFER_LOW_WATERMARK; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; @@ -128,6 +135,8 @@ public abstract class AbstractRpcClient implements RpcC protected final long failureSleep; // Time to sleep before retry on failure. protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives + protected final int bufferLowWatermark; + protected final int bufferHighWatermark; protected final Codec codec; protected final CompressionCodec compressor; protected final boolean fallbackAllowed; @@ -165,12 +174,15 @@ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress loc MetricsConnection metrics) { this.userProvider = UserProvider.instantiate(conf); this.localAddr = localAddr; - this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); - this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); + this.maxRetries = conf.getInt(CLIENT_CONNECT_MAX_RETRIES, 0); + this.tcpNoDelay = conf.getBoolean(CLIENT_TCP_NODELAY, true); + this.tcpKeepAlive = conf.getBoolean(CLIENT_TCP_KEEPALIVE, true); + this.bufferLowWatermark = conf.getInt(CLIENT_BUFFER_LOW_WATERMARK, DEFAULT_CLIENT_BUFFER_LOW_WATERMARK); + this.bufferHighWatermark = conf.getInt(CLIENT_BUFFER_HIGH_WATERMARK, DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK); + this.cellBlockBuilder = new CellBlockBuilder(conf); this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index 61dedbb5c124..588e6eef5402 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -19,6 +19,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; @@ -46,9 +47,21 @@ public class NettyRpcClient extends AbstractRpcClient { private final boolean shutdownGroupWhenClose; + protected final WriteBufferWaterMark writeBufferWaterMark; + + protected static final String CLIENT_CONNECT_MAX_RETRIES = "hbase.ipc.client.connect.max.retries"; + protected static final String CLIENT_TCP_NODELAY = "hbase.ipc.client.tcpnodelay"; + protected static final String CLIENT_TCP_KEEPALIVE = "hbase.ipc.client.tcpkeepalive"; + protected static final String CLIENT_BUFFER_LOW_WATERMARK = "hbase.ipc.client.bufferlowwatermark"; + protected static final String CLIENT_BUFFER_HIGH_WATERMARK = "hbase.ipc.client.bufferhighwatermark"; + + protected static final int DEFAULT_CLIENT_BUFFER_LOW_WATERMARK = 1024; + protected static final int DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK = 64 * 1024; + public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, MetricsConnection metrics) { super(configuration, clusterId, localAddress, metrics); + this.writeBufferWaterMark = new WriteBufferWaterMark(bufferLowWatermark, bufferHighWatermark); Pair> groupAndChannelClass = NettyRpcClientConfigHelper .getEventLoopConfig(conf); if (groupAndChannelClass == null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index b8620b1b743c..4b0429cbee3a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -36,6 +36,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler; import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; +import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil; @@ -257,6 +258,7 @@ private void connect() { .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, rpcClient.writeBufferWaterMark) .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr) .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index bba1bede118f..30e824c9f6b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -24,6 +24,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel; +import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; @@ -89,8 +90,11 @@ public NettyRpcServer(Server server, String name, List() { @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index f0409fd31f62..d2559af4e7c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -147,9 +147,14 @@ public abstract class RpcServer implements RpcServerInterface, */ protected final LongAdder callQueueSizeInBytes = new LongAdder(); + protected final boolean tcpReuseAddr; // if T then reuse address protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives + protected final int tcpBacklog; + protected final int bufferLowWatermark; + protected final int bufferHighWatermark; + /** * This flag is used to indicate to sub threads when they should go down. When we call * {@link #start()}, all threads started will consult this flag on whether they should @@ -174,6 +179,14 @@ public abstract class RpcServer implements RpcServerInterface, protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; + protected static final String SERVER_TCP_BACKLOG = "hbase.ipc.server.tcpbacklog"; + protected static final String SERVER_TCP_REUSEADDR = "hbase.ipc.server.tcpreuseaddr"; + protected static final String SERVER_TCP_NODELAY = "hbase.ipc.server.tcpnodelay"; + protected static final String SERVER_TCP_KEEPALIVE = "hbase.ipc.server.tcpkeepalive"; + + protected static final String SERVER_BUFFER_LOW_WATERMARK = "hbase.ipc.server.bufferlowwatermark"; + protected static final String SERVER_BUFFER_HIGH_WATERMARK = "hbase.ipc.server.bufferhighwatermark"; + /** * Minimum allowable timeout (in milliseconds) in rpc request's header. This * configuration exists to prevent the rpc service regarding this request as timeout immediately. @@ -186,6 +199,10 @@ public abstract class RpcServer implements RpcServerInterface, protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; + protected static final int DEFAULT_SERVER_TCP_BACKLOG = 1024; + protected static final int DEFAULT_SERVER_BUFFER_LOW_WATERMARK = 1024; + protected static final int DEFAULT_SERVER_BUFFER_HIGH_WATERMARK = 64 * 1024; + protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000; protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length"; protected static final String KEY_WORD_TRUNCATED = " "; @@ -273,8 +290,12 @@ public RpcServer(final Server server, final String name, this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE); this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this)); - this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true); - this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true); + this.tcpBacklog = conf.getInt(SERVER_TCP_BACKLOG, DEFAULT_SERVER_TCP_BACKLOG); + this.tcpReuseAddr = conf.getBoolean(SERVER_TCP_REUSEADDR, true); + this.tcpNoDelay = conf.getBoolean(SERVER_TCP_NODELAY, true); + this.tcpKeepAlive = conf.getBoolean(SERVER_TCP_KEEPALIVE, true); + this.bufferLowWatermark = conf.getInt(SERVER_BUFFER_LOW_WATERMARK, DEFAULT_SERVER_BUFFER_LOW_WATERMARK); + this.bufferHighWatermark = conf.getInt(SERVER_BUFFER_HIGH_WATERMARK, DEFAULT_SERVER_BUFFER_HIGH_WATERMARK); this.cellBlockBuilder = new CellBlockBuilder(conf);