From 8b14c1567794381778ddc500d17caeba7d9b8b46 Mon Sep 17 00:00:00 2001 From: Nicholas Jiang Date: Thu, 3 Oct 2019 00:11:01 +0800 Subject: [PATCH 1/4] ipc netty optimization --- .../hadoop/hbase/ipc/AbstractRpcClient.java | 11 +- .../hadoop/hbase/ipc/NettyRpcConnection.java | 2 + .../apache/hadoop/hbase/ipc/RpcClient.java | 9 + .../hbase/ipc/AbstractBatchDecoder.java | 442 ++++++++++++++++++ .../hbase/ipc/NettyRpcFrameDecoder.java | 3 +- .../hadoop/hbase/ipc/NettyRpcServer.java | 4 + .../apache/hadoop/hbase/ipc/RpcServer.java | 25 +- 7 files changed, 489 insertions(+), 7 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 629efe6c203c..827076a21c18 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -128,6 +128,8 @@ public abstract class AbstractRpcClient implements RpcC protected final long failureSleep; // Time to sleep before retry on failure. protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives + protected final int bufferLowWatermark; + protected final int bufferHighWatermark; protected final Codec codec; protected final CompressionCodec compressor; protected final boolean fallbackAllowed; @@ -165,12 +167,15 @@ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress loc MetricsConnection metrics) { this.userProvider = UserProvider.instantiate(conf); this.localAddr = localAddr; - this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true); this.clusterId = clusterId != null ? clusterId : HConstants.CLUSTER_ID_DEFAULT; this.failureSleep = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE); - this.maxRetries = conf.getInt("hbase.ipc.client.connect.max.retries", 0); - this.tcpNoDelay = conf.getBoolean("hbase.ipc.client.tcpnodelay", true); + this.maxRetries = conf.getInt(CLIENT_CONNECT_MAX_RETRIES, 0); + this.tcpNoDelay = conf.getBoolean(CLIENT_TCP_NODELAY, true); + this.tcpKeepAlive = conf.getBoolean(CLIENT_TCP_KEEPALIVE, true); + this.bufferLowWatermark = conf.getInt(CLIENT_BUFFER_LOW_WATERMARK, DEFAULT_CLIENT_BUFFER_LOW_WATERMARK); + this.bufferHighWatermark = conf.getInt(CLIENT_BUFFER_HIGH_WATERMARK, DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK); + this.cellBlockBuilder = new CellBlockBuilder(conf); this.minIdleTimeBeforeClose = conf.getInt(IDLE_TIME, 120000); // 2 minutes diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index b8620b1b743c..a95d08758b10 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -36,6 +36,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler; import org.apache.hbase.thirdparty.io.netty.channel.ChannelOption; import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; +import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler; import org.apache.hbase.thirdparty.io.netty.util.ReferenceCountUtil; @@ -257,6 +258,7 @@ private void connect() { .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(rpcClient.bufferLowWatermark, rpcClient.bufferHighWatermark)) .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr) .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 0e006956d249..8b0e21213743 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -34,6 +34,15 @@ public interface RpcClient extends Closeable { String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry"; int FAILED_SERVER_EXPIRY_DEFAULT = 2000; + + String CLIENT_CONNECT_MAX_RETRIES = "hbase.ipc.client.connect.max.retries"; + String CLIENT_TCP_NODELAY = "hbase.ipc.client.tcpnodelay"; + String CLIENT_TCP_KEEPALIVE = "hbase.ipc.client.tcpkeepalive"; + String CLIENT_BUFFER_LOW_WATERMARK = "hbase.ipc.client.bufferlowwatermark"; + String CLIENT_BUFFER_HIGH_WATERMARK = "hbase.ipc.client.bufferhighwatermark"; + int DEFAULT_CLIENT_BUFFER_LOW_WATERMARK = 1024; + int DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK = 64 * 1024; + String IDLE_TIME = "hbase.ipc.client.connection.minIdleTimeBeforeClose"; String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "hbase.ipc.client.fallback-to-simple-auth-allowed"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java new file mode 100644 index 000000000000..0bdc47150ec4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java @@ -0,0 +1,442 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; +import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; +import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf; +import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; +import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder; +import org.apache.hbase.thirdparty.io.netty.handler.codec.DecoderException; +import org.apache.hbase.thirdparty.io.netty.util.internal.RecyclableArrayList; +import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; + +/** + * This class mainly hack the {@link org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder} to provide batch submission capability. + * This can be used the same way as ByteToMessageDecoder except the case your following inbound handler may get a decoded msg, + * which actually is an array list, then you can submit the list of msgs to an executor to process. For example + *

+ *   if (msg instanceof List) {
+ *       processorManager.getDefaultExecutor().execute(new Runnable() {
+ *           public void run() {
+ *               // batch submit to an executor
+ *               for (Object m : (List) msg) {
+ *                   RpcCommandHandler.this.process(ctx, m);
+ *               }
+ *           }
+ *       });
+ *   } else {
+ *       process(ctx, msg);
+ *   }
+ * 
+ * You can check the method {@link AbstractBatchDecoder#channelRead(ChannelHandlerContext, Object)} ()} + * to know the detail modification. + */ +public abstract class AbstractBatchDecoder extends ChannelInboundHandlerAdapter { + /** + * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies. + */ + public static final Cumulator MERGE_CUMULATOR = new Cumulator() { + @Override + public ByteBuf cumulate(ByteBufAllocator alloc, + ByteBuf cumulation, + ByteBuf in) { + ByteBuf buffer; + if (cumulation.writerIndex() > cumulation + .maxCapacity() + - in.readableBytes() + || cumulation.refCnt() > 1) { + // Expand cumulation (by replace it) when either there is not more room in the buffer + // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or + // duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + buffer = expandCumulation(alloc, + cumulation, + in.readableBytes()); + } else { + buffer = cumulation; + } + buffer.writeBytes(in); + in.release(); + return buffer; + } + }; + + /** + * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible. + * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case + * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}. + */ + public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { + @Override + public ByteBuf cumulate(ByteBufAllocator alloc, + ByteBuf cumulation, + ByteBuf in) { + ByteBuf buffer; + if (cumulation.refCnt() > 1) { + // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user + // use slice().retain() or duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + buffer = expandCumulation(alloc, + cumulation, + in.readableBytes()); + buffer.writeBytes(in); + in.release(); + } else { + CompositeByteBuf composite; + if (cumulation instanceof CompositeByteBuf) { + composite = (CompositeByteBuf) cumulation; + } else { + int readable = cumulation + .readableBytes(); + composite = alloc + .compositeBuffer(); + composite.addComponent( + cumulation).writerIndex( + readable); + } + composite + .addComponent(in) + .writerIndex( + composite.writerIndex() + + in.readableBytes()); + buffer = composite; + } + return buffer; + } + }; + + ByteBuf cumulation; + private Cumulator cumulator = MERGE_CUMULATOR; + private boolean singleDecode; + private boolean decodeWasNull; + private boolean first; + private int discardAfterReads = 16; + private int numReads; + + /** + * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)} + * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up. + * + * Default is {@code false} as this has performance impacts. + */ + public void setSingleDecode(boolean singleDecode) { + this.singleDecode = singleDecode; + } + + /** + * If {@code true} then only one message is decoded on each + * {@link #channelRead(ChannelHandlerContext, Object)} call. + * + * Default is {@code false} as this has performance impacts. + */ + public boolean isSingleDecode() { + return singleDecode; + } + + /** + * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s. + */ + public void setCumulator(Cumulator cumulator) { + if (cumulator == null) { + throw new NullPointerException("cumulator"); + } + this.cumulator = cumulator; + } + + /** + * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory. + * The default is {@code 16}. + */ + public void setDiscardAfterReads(int discardAfterReads) { + if (discardAfterReads <= 0) { + throw new IllegalArgumentException("discardAfterReads must be > 0"); + } + this.discardAfterReads = discardAfterReads; + } + + /** + * Returns the actual number of readable bytes in the internal cumulative + * buffer of this decoder. You usually do not need to rely on this value + * to write a decoder. Use it only when you must use it at your own risk. + * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. + */ + protected int actualReadableBytes() { + return internalBuffer().readableBytes(); + } + + /** + * Returns the internal cumulative buffer of this decoder. You usually + * do not need to access the internal buffer directly to write a decoder. + * Use it only when you must use it at your own risk. + */ + protected ByteBuf internalBuffer() { + if (cumulation != null) { + return cumulation; + } else { + return Unpooled.EMPTY_BUFFER; + } + } + + @Override + public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + ByteBuf buf = internalBuffer(); + int readable = buf.readableBytes(); + if (readable > 0) { + ByteBuf bytes = buf.readBytes(readable); + buf.release(); + ctx.fireChannelRead(bytes); + } else { + buf.release(); + } + cumulation = null; + numReads = 0; + ctx.fireChannelReadComplete(); + handlerRemoved0(ctx); + } + + /** + * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle + * events anymore. + */ + protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { + } + + /** + * This method has been modified to check the size of decoded msgs, which is represented by the + * local variable {@code RecyclableArrayList out}. If has decoded more than one msg, + * then construct an array list to submit all decoded msgs to the pipeline. + * + * @param ctx channel handler context + * @param msg data + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + RecyclableArrayList out = RecyclableArrayList.newInstance(); + try { + ByteBuf data = (ByteBuf) msg; + first = cumulation == null; + if (first) { + cumulation = data; + } else { + cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); + } + callDecode(ctx, cumulation, out); + } catch (DecoderException e) { + throw e; + } catch (Throwable t) { + throw new DecoderException(t); + } finally { + if (cumulation != null && !cumulation.isReadable()) { + numReads = 0; + cumulation.release(); + cumulation = null; + } else if (++numReads >= discardAfterReads) { + // We did enough reads already try to discard some bytes so we not risk to see a OOME. + // See https://github.com/netty/netty/issues/4275 + numReads = 0; + discardSomeReadBytes(); + } + + int size = out.size(); + if (size == 0) { + decodeWasNull = true; + } else if (size == 1) { + ctx.fireChannelRead(out.get(0)); + } else { + ArrayList ret = new ArrayList(size); + for (int i = 0; i < size; i++) { + ret.add(out.get(i)); + } + ctx.fireChannelRead(ret); + } + + out.recycle(); + } + } else { + ctx.fireChannelRead(msg); + } + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + numReads = 0; + discardSomeReadBytes(); + if (decodeWasNull) { + decodeWasNull = false; + if (!ctx.channel().config().isAutoRead()) { + ctx.read(); + } + } + ctx.fireChannelReadComplete(); + } + + protected final void discardSomeReadBytes() { + if (cumulation != null && !first && cumulation.refCnt() == 1) { + // discard some bytes if possible to make more room in the + // buffer but only if the refCnt == 1 as otherwise the user may have + // used slice().retain() or duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + cumulation.discardSomeReadBytes(); + } + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + RecyclableArrayList out = RecyclableArrayList.newInstance(); + try { + if (cumulation != null) { + callDecode(ctx, cumulation, out); + decodeLast(ctx, cumulation, out); + } else { + decodeLast(ctx, Unpooled.EMPTY_BUFFER, out); + } + } catch (DecoderException e) { + throw e; + } catch (Exception e) { + throw new DecoderException(e); + } finally { + try { + if (cumulation != null) { + cumulation.release(); + cumulation = null; + } + int size = out.size(); + for (int i = 0; i < size; i++) { + ctx.fireChannelRead(out.get(i)); + } + if (size > 0) { + // Something was read, call fireChannelReadComplete() + ctx.fireChannelReadComplete(); + } + ctx.fireChannelInactive(); + } finally { + // recycle in all cases + out.recycle(); + } + } + } + + /** + * Called once data should be decoded from the given {@link ByteBuf}. This method will call + * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to + * @param in the {@link ByteBuf} from which to read data + * @param out the {@link List} to which decoded messages should be added + */ + protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) { + try { + while (in.isReadable()) { + int outSize = out.size(); + int oldInputLength = in.readableBytes(); + decode(ctx, in, out); + + // Check if this handler was removed before continuing the loop. + // If it was removed, it is not safe to continue to operate on the buffer. + // + // See https://github.com/netty/netty/issues/1664 + if (ctx.isRemoved()) { + break; + } + + if (outSize == out.size()) { + if (oldInputLength == in.readableBytes()) { + break; + } else { + continue; + } + } + + if (oldInputLength == in.readableBytes()) { + throw new DecoderException( + StringUtil.simpleClassName(getClass()) + + ".decode() did not read anything but decoded a message."); + } + + if (isSingleDecode()) { + break; + } + } + } catch (DecoderException e) { + throw e; + } catch (Throwable cause) { + throw new DecoderException(cause); + } + } + + /** + * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the + * {@link #channelInactive(ChannelHandlerContext)} was triggered. + * + * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may + * override this for some special cleanup operation. + */ + protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List out) + throws Exception { + decode(ctx, in, out); + } + + static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { + ByteBuf oldCumulation = cumulation; + cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); + cumulation.writeBytes(oldCumulation); + oldCumulation.release(); + return cumulation; + } + + /** + * Cumulate {@link ByteBuf}s. + */ + public interface Cumulator { + /** + * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes. + * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so + * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed. + */ + ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in); + } + + /** + * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input + * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input + * {@link ByteBuf}. + * + * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to + * @param in the {@link ByteBuf} from which to read data + * @param out the {@link List} to which decoded messages should be added + * @throws Exception is thrown if an error accour + */ + protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List out) + throws Exception; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java index 5ed3d2ef43f3..d7210e8f1a82 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java @@ -28,7 +28,6 @@ import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; -import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder; import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; @@ -40,7 +39,7 @@ * @since 2.0.0 */ @InterfaceAudience.Private -public class NettyRpcFrameDecoder extends ByteToMessageDecoder { +public class NettyRpcFrameDecoder extends AbstractBatchDecoder { private static int FRAME_LENGTH_FIELD_LENGTH = 4; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index bba1bede118f..30e824c9f6b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -24,6 +24,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.ServerChannel; +import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; @@ -89,8 +90,11 @@ public NettyRpcServer(Server server, String name, List() { @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java index f0409fd31f62..d2559af4e7c8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java @@ -147,9 +147,14 @@ public abstract class RpcServer implements RpcServerInterface, */ protected final LongAdder callQueueSizeInBytes = new LongAdder(); + protected final boolean tcpReuseAddr; // if T then reuse address protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpKeepAlive; // if T then use keepalives + protected final int tcpBacklog; + protected final int bufferLowWatermark; + protected final int bufferHighWatermark; + /** * This flag is used to indicate to sub threads when they should go down. When we call * {@link #start()}, all threads started will consult this flag on whether they should @@ -174,6 +179,14 @@ public abstract class RpcServer implements RpcServerInterface, protected static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time"; protected static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size"; + protected static final String SERVER_TCP_BACKLOG = "hbase.ipc.server.tcpbacklog"; + protected static final String SERVER_TCP_REUSEADDR = "hbase.ipc.server.tcpreuseaddr"; + protected static final String SERVER_TCP_NODELAY = "hbase.ipc.server.tcpnodelay"; + protected static final String SERVER_TCP_KEEPALIVE = "hbase.ipc.server.tcpkeepalive"; + + protected static final String SERVER_BUFFER_LOW_WATERMARK = "hbase.ipc.server.bufferlowwatermark"; + protected static final String SERVER_BUFFER_HIGH_WATERMARK = "hbase.ipc.server.bufferhighwatermark"; + /** * Minimum allowable timeout (in milliseconds) in rpc request's header. This * configuration exists to prevent the rpc service regarding this request as timeout immediately. @@ -186,6 +199,10 @@ public abstract class RpcServer implements RpcServerInterface, protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024; + protected static final int DEFAULT_SERVER_TCP_BACKLOG = 1024; + protected static final int DEFAULT_SERVER_BUFFER_LOW_WATERMARK = 1024; + protected static final int DEFAULT_SERVER_BUFFER_HIGH_WATERMARK = 64 * 1024; + protected static final int DEFAULT_TRACE_LOG_MAX_LENGTH = 1000; protected static final String TRACE_LOG_MAX_LENGTH = "hbase.ipc.trace.log.max.length"; protected static final String KEY_WORD_TRUNCATED = " "; @@ -273,8 +290,12 @@ public RpcServer(final Server server, final String name, this.maxRequestSize = conf.getInt(MAX_REQUEST_SIZE, DEFAULT_MAX_REQUEST_SIZE); this.metrics = new MetricsHBaseServer(name, new MetricsHBaseServerWrapperImpl(this)); - this.tcpNoDelay = conf.getBoolean("hbase.ipc.server.tcpnodelay", true); - this.tcpKeepAlive = conf.getBoolean("hbase.ipc.server.tcpkeepalive", true); + this.tcpBacklog = conf.getInt(SERVER_TCP_BACKLOG, DEFAULT_SERVER_TCP_BACKLOG); + this.tcpReuseAddr = conf.getBoolean(SERVER_TCP_REUSEADDR, true); + this.tcpNoDelay = conf.getBoolean(SERVER_TCP_NODELAY, true); + this.tcpKeepAlive = conf.getBoolean(SERVER_TCP_KEEPALIVE, true); + this.bufferLowWatermark = conf.getInt(SERVER_BUFFER_LOW_WATERMARK, DEFAULT_SERVER_BUFFER_LOW_WATERMARK); + this.bufferHighWatermark = conf.getInt(SERVER_BUFFER_HIGH_WATERMARK, DEFAULT_SERVER_BUFFER_HIGH_WATERMARK); this.cellBlockBuilder = new CellBlockBuilder(conf); From fb9bb62183f7a609c3c06d32c94404ee0ab6709f Mon Sep 17 00:00:00 2001 From: Nicholas Jiang Date: Fri, 4 Oct 2019 15:48:13 +0800 Subject: [PATCH 2/4] ipc netty optimization --- .../hbase/ipc/AbstractBatchDecoder.java | 180 +++++++++--------- 1 file changed, 90 insertions(+), 90 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java index 0bdc47150ec4..e6ff993956f5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java @@ -50,40 +50,40 @@ * } * * You can check the method {@link AbstractBatchDecoder#channelRead(ChannelHandlerContext, Object)} ()} - * to know the detail modification. + * to know the detail modification. */ public abstract class AbstractBatchDecoder extends ChannelInboundHandlerAdapter { /** * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies. */ - public static final Cumulator MERGE_CUMULATOR = new Cumulator() { - @Override - public ByteBuf cumulate(ByteBufAllocator alloc, - ByteBuf cumulation, - ByteBuf in) { - ByteBuf buffer; - if (cumulation.writerIndex() > cumulation - .maxCapacity() - - in.readableBytes() - || cumulation.refCnt() > 1) { - // Expand cumulation (by replace it) when either there is not more room in the buffer - // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or - // duplicate().retain(). - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, - cumulation, - in.readableBytes()); - } else { - buffer = cumulation; - } - buffer.writeBytes(in); - in.release(); - return buffer; - } - }; + public static final Cumulator MERGE_CUMULATOR = new Cumulator() { + @Override + public ByteBuf cumulate(ByteBufAllocator alloc, + ByteBuf cumulation, + ByteBuf in) { + ByteBuf buffer; + if (cumulation.writerIndex() > cumulation + .maxCapacity() + - in.readableBytes() + || cumulation.refCnt() > 1) { + // Expand cumulation (by replace it) when either there is not more room in the buffer + // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or + // duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + buffer = expandCumulation(alloc, + cumulation, + in.readableBytes()); + } else { + buffer = cumulation; + } + buffer.writeBytes(in); + in.release(); + return buffer; + } + }; /** * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible. @@ -91,59 +91,59 @@ public ByteBuf cumulate(ByteBufAllocator alloc, * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}. */ public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { - @Override - public ByteBuf cumulate(ByteBufAllocator alloc, - ByteBuf cumulation, - ByteBuf in) { - ByteBuf buffer; - if (cumulation.refCnt() > 1) { - // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user - // use slice().retain() or duplicate().retain(). - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, - cumulation, - in.readableBytes()); - buffer.writeBytes(in); - in.release(); - } else { - CompositeByteBuf composite; - if (cumulation instanceof CompositeByteBuf) { - composite = (CompositeByteBuf) cumulation; - } else { - int readable = cumulation - .readableBytes(); - composite = alloc - .compositeBuffer(); - composite.addComponent( - cumulation).writerIndex( - readable); - } - composite - .addComponent(in) - .writerIndex( - composite.writerIndex() - + in.readableBytes()); - buffer = composite; - } - return buffer; - } - }; + @Override + public ByteBuf cumulate(ByteBufAllocator alloc, + ByteBuf cumulation, + ByteBuf in) { + ByteBuf buffer; + if (cumulation.refCnt() > 1) { + // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user + // use slice().retain() or duplicate().retain(). + // + // See: + // - https://github.com/netty/netty/issues/2327 + // - https://github.com/netty/netty/issues/1764 + buffer = expandCumulation(alloc, + cumulation, + in.readableBytes()); + buffer.writeBytes(in); + in.release(); + } else { + CompositeByteBuf composite; + if (cumulation instanceof CompositeByteBuf) { + composite = (CompositeByteBuf) cumulation; + } else { + int readable = cumulation + .readableBytes(); + composite = alloc + .compositeBuffer(); + composite.addComponent( + cumulation).writerIndex( + readable); + } + composite + .addComponent(in) + .writerIndex( + composite.writerIndex() + + in.readableBytes()); + buffer = composite; + } + return buffer; + } + }; - ByteBuf cumulation; - private Cumulator cumulator = MERGE_CUMULATOR; - private boolean singleDecode; - private boolean decodeWasNull; - private boolean first; - private int discardAfterReads = 16; - private int numReads; + ByteBuf cumulation; + private Cumulator cumulator = MERGE_CUMULATOR; + private boolean singleDecode; + private boolean decodeWasNull; + private boolean first; + private int discardAfterReads = 16; + private int numReads; /** * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)} * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up. - * + *

* Default is {@code false} as this has performance impacts. */ public void setSingleDecode(boolean singleDecode) { @@ -153,7 +153,7 @@ public void setSingleDecode(boolean singleDecode) { /** * If {@code true} then only one message is decoded on each * {@link #channelRead(ChannelHandlerContext, Object)} call. - * + *

* Default is {@code false} as this has performance impacts. */ public boolean isSingleDecode() { @@ -351,9 +351,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { * Called once data should be decoded from the given {@link ByteBuf}. This method will call * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place. * - * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to - * @param in the {@link ByteBuf} from which to read data - * @param out the {@link List} to which decoded messages should be added + * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to + * @param in the {@link ByteBuf} from which to read data + * @param out the {@link List} to which decoded messages should be added */ protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) { try { @@ -380,8 +380,8 @@ protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List ou if (oldInputLength == in.readableBytes()) { throw new DecoderException( - StringUtil.simpleClassName(getClass()) - + ".decode() did not read anything but decoded a message."); + StringUtil.simpleClassName(getClass()) + + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { @@ -398,12 +398,12 @@ protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List ou /** * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the * {@link #channelInactive(ChannelHandlerContext)} was triggered. - * + *

* By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may * override this for some special cleanup operation. */ protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List out) - throws Exception { + throws Exception { decode(ctx, in, out); } @@ -432,11 +432,11 @@ public interface Cumulator { * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input * {@link ByteBuf}. * - * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to - * @param in the {@link ByteBuf} from which to read data - * @param out the {@link List} to which decoded messages should be added - * @throws Exception is thrown if an error accour + * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to + * @param in the {@link ByteBuf} from which to read data + * @param out the {@link List} to which decoded messages should be added + * @throws Exception is thrown if an error accour */ protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List out) - throws Exception; + throws Exception; } From 440289c4ed5540e3fdabfef269fcd998b9fd0c17 Mon Sep 17 00:00:00 2001 From: Nicholas Jiang Date: Sat, 5 Oct 2019 20:34:20 +0800 Subject: [PATCH 3/4] ipc netty optimization --- .../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 7 +++++++ .../org/apache/hadoop/hbase/ipc/NettyRpcClient.java | 13 +++++++++++++ .../apache/hadoop/hbase/ipc/NettyRpcConnection.java | 2 +- .../java/org/apache/hadoop/hbase/ipc/RpcClient.java | 9 --------- 4 files changed, 21 insertions(+), 10 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 827076a21c18..297578b01d3b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -20,6 +20,13 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE; import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_BUFFER_LOW_WATERMARK; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_BUFFER_HIGH_WATERMARK; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_CONNECT_MAX_RETRIES; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_TCP_KEEPALIVE; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.CLIENT_TCP_NODELAY; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.DEFAULT_CLIENT_BUFFER_LOW_WATERMARK; +import static org.apache.hadoop.hbase.ipc.NettyRpcClient.DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index 61dedbb5c124..588e6eef5402 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -19,6 +19,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.Channel; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; +import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel; import org.apache.hbase.thirdparty.io.netty.util.concurrent.DefaultThreadFactory; @@ -46,9 +47,21 @@ public class NettyRpcClient extends AbstractRpcClient { private final boolean shutdownGroupWhenClose; + protected final WriteBufferWaterMark writeBufferWaterMark; + + protected static final String CLIENT_CONNECT_MAX_RETRIES = "hbase.ipc.client.connect.max.retries"; + protected static final String CLIENT_TCP_NODELAY = "hbase.ipc.client.tcpnodelay"; + protected static final String CLIENT_TCP_KEEPALIVE = "hbase.ipc.client.tcpkeepalive"; + protected static final String CLIENT_BUFFER_LOW_WATERMARK = "hbase.ipc.client.bufferlowwatermark"; + protected static final String CLIENT_BUFFER_HIGH_WATERMARK = "hbase.ipc.client.bufferhighwatermark"; + + protected static final int DEFAULT_CLIENT_BUFFER_LOW_WATERMARK = 1024; + protected static final int DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK = 64 * 1024; + public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress, MetricsConnection metrics) { super(configuration, clusterId, localAddress, metrics); + this.writeBufferWaterMark = new WriteBufferWaterMark(bufferLowWatermark, bufferHighWatermark); Pair> groupAndChannelClass = NettyRpcClientConfigHelper .getEventLoopConfig(conf); if (groupAndChannelClass == null) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index a95d08758b10..4b0429cbee3a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -258,7 +258,7 @@ private void connect() { .option(ChannelOption.TCP_NODELAY, rpcClient.isTcpNoDelay()) .option(ChannelOption.SO_KEEPALIVE, rpcClient.tcpKeepAlive) .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, rpcClient.connectTO) - .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(rpcClient.bufferLowWatermark, rpcClient.bufferHighWatermark)) + .option(ChannelOption.WRITE_BUFFER_WATER_MARK, rpcClient.writeBufferWaterMark) .handler(new BufferCallBeforeInitHandler()).localAddress(rpcClient.localAddr) .remoteAddress(remoteId.address).connect().addListener(new ChannelFutureListener() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 8b0e21213743..0e006956d249 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -34,15 +34,6 @@ public interface RpcClient extends Closeable { String FAILED_SERVER_EXPIRY_KEY = "hbase.ipc.client.failed.servers.expiry"; int FAILED_SERVER_EXPIRY_DEFAULT = 2000; - - String CLIENT_CONNECT_MAX_RETRIES = "hbase.ipc.client.connect.max.retries"; - String CLIENT_TCP_NODELAY = "hbase.ipc.client.tcpnodelay"; - String CLIENT_TCP_KEEPALIVE = "hbase.ipc.client.tcpkeepalive"; - String CLIENT_BUFFER_LOW_WATERMARK = "hbase.ipc.client.bufferlowwatermark"; - String CLIENT_BUFFER_HIGH_WATERMARK = "hbase.ipc.client.bufferhighwatermark"; - int DEFAULT_CLIENT_BUFFER_LOW_WATERMARK = 1024; - int DEFAULT_CLIENT_BUFFER_HIGH_WATERMARK = 64 * 1024; - String IDLE_TIME = "hbase.ipc.client.connection.minIdleTimeBeforeClose"; String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "hbase.ipc.client.fallback-to-simple-auth-allowed"; From d7338490128c070529908f68fb7de8c967280ec6 Mon Sep 17 00:00:00 2001 From: Nicholas Jiang Date: Sat, 5 Oct 2019 20:55:40 +0800 Subject: [PATCH 4/4] revert netty decoder --- .../hbase/ipc/AbstractBatchDecoder.java | 442 ------------------ .../hbase/ipc/NettyRpcFrameDecoder.java | 3 +- 2 files changed, 2 insertions(+), 443 deletions(-) delete mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java deleted file mode 100644 index e6ff993956f5..000000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AbstractBatchDecoder.java +++ /dev/null @@ -1,442 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; -import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator; -import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf; -import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled; -import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; -import org.apache.hbase.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter; -import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder; -import org.apache.hbase.thirdparty.io.netty.handler.codec.DecoderException; -import org.apache.hbase.thirdparty.io.netty.util.internal.RecyclableArrayList; -import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil; - -/** - * This class mainly hack the {@link org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder} to provide batch submission capability. - * This can be used the same way as ByteToMessageDecoder except the case your following inbound handler may get a decoded msg, - * which actually is an array list, then you can submit the list of msgs to an executor to process. For example - *

- *   if (msg instanceof List) {
- *       processorManager.getDefaultExecutor().execute(new Runnable() {
- *           public void run() {
- *               // batch submit to an executor
- *               for (Object m : (List) msg) {
- *                   RpcCommandHandler.this.process(ctx, m);
- *               }
- *           }
- *       });
- *   } else {
- *       process(ctx, msg);
- *   }
- * 
- * You can check the method {@link AbstractBatchDecoder#channelRead(ChannelHandlerContext, Object)} ()} - * to know the detail modification. - */ -public abstract class AbstractBatchDecoder extends ChannelInboundHandlerAdapter { - /** - * Cumulate {@link ByteBuf}s by merge them into one {@link ByteBuf}'s, using memory copies. - */ - public static final Cumulator MERGE_CUMULATOR = new Cumulator() { - @Override - public ByteBuf cumulate(ByteBufAllocator alloc, - ByteBuf cumulation, - ByteBuf in) { - ByteBuf buffer; - if (cumulation.writerIndex() > cumulation - .maxCapacity() - - in.readableBytes() - || cumulation.refCnt() > 1) { - // Expand cumulation (by replace it) when either there is not more room in the buffer - // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or - // duplicate().retain(). - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, - cumulation, - in.readableBytes()); - } else { - buffer = cumulation; - } - buffer.writeBytes(in); - in.release(); - return buffer; - } - }; - - /** - * Cumulate {@link ByteBuf}s by add them to a {@link CompositeByteBuf} and so do no memory copy whenever possible. - * Be aware that {@link CompositeByteBuf} use a more complex indexing implementation so depending on your use-case - * and the decoder implementation this may be slower then just use the {@link #MERGE_CUMULATOR}. - */ - public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { - @Override - public ByteBuf cumulate(ByteBufAllocator alloc, - ByteBuf cumulation, - ByteBuf in) { - ByteBuf buffer; - if (cumulation.refCnt() > 1) { - // Expand cumulation (by replace it) when the refCnt is greater then 1 which may happen when the user - // use slice().retain() or duplicate().retain(). - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - buffer = expandCumulation(alloc, - cumulation, - in.readableBytes()); - buffer.writeBytes(in); - in.release(); - } else { - CompositeByteBuf composite; - if (cumulation instanceof CompositeByteBuf) { - composite = (CompositeByteBuf) cumulation; - } else { - int readable = cumulation - .readableBytes(); - composite = alloc - .compositeBuffer(); - composite.addComponent( - cumulation).writerIndex( - readable); - } - composite - .addComponent(in) - .writerIndex( - composite.writerIndex() - + in.readableBytes()); - buffer = composite; - } - return buffer; - } - }; - - ByteBuf cumulation; - private Cumulator cumulator = MERGE_CUMULATOR; - private boolean singleDecode; - private boolean decodeWasNull; - private boolean first; - private int discardAfterReads = 16; - private int numReads; - - /** - * If set then only one message is decoded on each {@link #channelRead(ChannelHandlerContext, Object)} - * call. This may be useful if you need to do some protocol upgrade and want to make sure nothing is mixed up. - *

- * Default is {@code false} as this has performance impacts. - */ - public void setSingleDecode(boolean singleDecode) { - this.singleDecode = singleDecode; - } - - /** - * If {@code true} then only one message is decoded on each - * {@link #channelRead(ChannelHandlerContext, Object)} call. - *

- * Default is {@code false} as this has performance impacts. - */ - public boolean isSingleDecode() { - return singleDecode; - } - - /** - * Set the {@link Cumulator} to use for cumulate the received {@link ByteBuf}s. - */ - public void setCumulator(Cumulator cumulator) { - if (cumulator == null) { - throw new NullPointerException("cumulator"); - } - this.cumulator = cumulator; - } - - /** - * Set the number of reads after which {@link ByteBuf#discardSomeReadBytes()} are called and so free up memory. - * The default is {@code 16}. - */ - public void setDiscardAfterReads(int discardAfterReads) { - if (discardAfterReads <= 0) { - throw new IllegalArgumentException("discardAfterReads must be > 0"); - } - this.discardAfterReads = discardAfterReads; - } - - /** - * Returns the actual number of readable bytes in the internal cumulative - * buffer of this decoder. You usually do not need to rely on this value - * to write a decoder. Use it only when you must use it at your own risk. - * This method is a shortcut to {@link #internalBuffer() internalBuffer().readableBytes()}. - */ - protected int actualReadableBytes() { - return internalBuffer().readableBytes(); - } - - /** - * Returns the internal cumulative buffer of this decoder. You usually - * do not need to access the internal buffer directly to write a decoder. - * Use it only when you must use it at your own risk. - */ - protected ByteBuf internalBuffer() { - if (cumulation != null) { - return cumulation; - } else { - return Unpooled.EMPTY_BUFFER; - } - } - - @Override - public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - ByteBuf buf = internalBuffer(); - int readable = buf.readableBytes(); - if (readable > 0) { - ByteBuf bytes = buf.readBytes(readable); - buf.release(); - ctx.fireChannelRead(bytes); - } else { - buf.release(); - } - cumulation = null; - numReads = 0; - ctx.fireChannelReadComplete(); - handlerRemoved0(ctx); - } - - /** - * Gets called after the {@link ByteToMessageDecoder} was removed from the actual context and it doesn't handle - * events anymore. - */ - protected void handlerRemoved0(ChannelHandlerContext ctx) throws Exception { - } - - /** - * This method has been modified to check the size of decoded msgs, which is represented by the - * local variable {@code RecyclableArrayList out}. If has decoded more than one msg, - * then construct an array list to submit all decoded msgs to the pipeline. - * - * @param ctx channel handler context - * @param msg data - */ - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - if (msg instanceof ByteBuf) { - RecyclableArrayList out = RecyclableArrayList.newInstance(); - try { - ByteBuf data = (ByteBuf) msg; - first = cumulation == null; - if (first) { - cumulation = data; - } else { - cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); - } - callDecode(ctx, cumulation, out); - } catch (DecoderException e) { - throw e; - } catch (Throwable t) { - throw new DecoderException(t); - } finally { - if (cumulation != null && !cumulation.isReadable()) { - numReads = 0; - cumulation.release(); - cumulation = null; - } else if (++numReads >= discardAfterReads) { - // We did enough reads already try to discard some bytes so we not risk to see a OOME. - // See https://github.com/netty/netty/issues/4275 - numReads = 0; - discardSomeReadBytes(); - } - - int size = out.size(); - if (size == 0) { - decodeWasNull = true; - } else if (size == 1) { - ctx.fireChannelRead(out.get(0)); - } else { - ArrayList ret = new ArrayList(size); - for (int i = 0; i < size; i++) { - ret.add(out.get(i)); - } - ctx.fireChannelRead(ret); - } - - out.recycle(); - } - } else { - ctx.fireChannelRead(msg); - } - } - - @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - numReads = 0; - discardSomeReadBytes(); - if (decodeWasNull) { - decodeWasNull = false; - if (!ctx.channel().config().isAutoRead()) { - ctx.read(); - } - } - ctx.fireChannelReadComplete(); - } - - protected final void discardSomeReadBytes() { - if (cumulation != null && !first && cumulation.refCnt() == 1) { - // discard some bytes if possible to make more room in the - // buffer but only if the refCnt == 1 as otherwise the user may have - // used slice().retain() or duplicate().retain(). - // - // See: - // - https://github.com/netty/netty/issues/2327 - // - https://github.com/netty/netty/issues/1764 - cumulation.discardSomeReadBytes(); - } - } - - @Override - public void channelInactive(ChannelHandlerContext ctx) throws Exception { - RecyclableArrayList out = RecyclableArrayList.newInstance(); - try { - if (cumulation != null) { - callDecode(ctx, cumulation, out); - decodeLast(ctx, cumulation, out); - } else { - decodeLast(ctx, Unpooled.EMPTY_BUFFER, out); - } - } catch (DecoderException e) { - throw e; - } catch (Exception e) { - throw new DecoderException(e); - } finally { - try { - if (cumulation != null) { - cumulation.release(); - cumulation = null; - } - int size = out.size(); - for (int i = 0; i < size; i++) { - ctx.fireChannelRead(out.get(i)); - } - if (size > 0) { - // Something was read, call fireChannelReadComplete() - ctx.fireChannelReadComplete(); - } - ctx.fireChannelInactive(); - } finally { - // recycle in all cases - out.recycle(); - } - } - } - - /** - * Called once data should be decoded from the given {@link ByteBuf}. This method will call - * {@link #decode(ChannelHandlerContext, ByteBuf, List)} as long as decoding should take place. - * - * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to - * @param in the {@link ByteBuf} from which to read data - * @param out the {@link List} to which decoded messages should be added - */ - protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List out) { - try { - while (in.isReadable()) { - int outSize = out.size(); - int oldInputLength = in.readableBytes(); - decode(ctx, in, out); - - // Check if this handler was removed before continuing the loop. - // If it was removed, it is not safe to continue to operate on the buffer. - // - // See https://github.com/netty/netty/issues/1664 - if (ctx.isRemoved()) { - break; - } - - if (outSize == out.size()) { - if (oldInputLength == in.readableBytes()) { - break; - } else { - continue; - } - } - - if (oldInputLength == in.readableBytes()) { - throw new DecoderException( - StringUtil.simpleClassName(getClass()) - + ".decode() did not read anything but decoded a message."); - } - - if (isSingleDecode()) { - break; - } - } - } catch (DecoderException e) { - throw e; - } catch (Throwable cause) { - throw new DecoderException(cause); - } - } - - /** - * Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the - * {@link #channelInactive(ChannelHandlerContext)} was triggered. - *

- * By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may - * override this for some special cleanup operation. - */ - protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List out) - throws Exception { - decode(ctx, in, out); - } - - static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) { - ByteBuf oldCumulation = cumulation; - cumulation = alloc.buffer(oldCumulation.readableBytes() + readable); - cumulation.writeBytes(oldCumulation); - oldCumulation.release(); - return cumulation; - } - - /** - * Cumulate {@link ByteBuf}s. - */ - public interface Cumulator { - /** - * Cumulate the given {@link ByteBuf}s and return the {@link ByteBuf} that holds the cumulated bytes. - * The implementation is responsible to correctly handle the life-cycle of the given {@link ByteBuf}s and so - * call {@link ByteBuf#release()} if a {@link ByteBuf} is fully consumed. - */ - ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in); - } - - /** - * Decode the from one {@link ByteBuf} to an other. This method will be called till either the input - * {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input - * {@link ByteBuf}. - * - * @param ctx the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to - * @param in the {@link ByteBuf} from which to read data - * @param out the {@link List} to which decoded messages should be added - * @throws Exception is thrown if an error accour - */ - protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List out) - throws Exception; -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java index d7210e8f1a82..5ed3d2ef43f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcFrameDecoder.java @@ -28,6 +28,7 @@ import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener; import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; +import org.apache.hbase.thirdparty.io.netty.handler.codec.ByteToMessageDecoder; import org.apache.hbase.thirdparty.io.netty.handler.codec.CorruptedFrameException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; @@ -39,7 +40,7 @@ * @since 2.0.0 */ @InterfaceAudience.Private -public class NettyRpcFrameDecoder extends AbstractBatchDecoder { +public class NettyRpcFrameDecoder extends ByteToMessageDecoder { private static int FRAME_LENGTH_FIELD_LENGTH = 4;