From 1dd6513c8d6dbc067beb5dcecec5fc8ba4cb45ec Mon Sep 17 00:00:00 2001 From: ukumawat Date: Fri, 28 Nov 2025 18:20:20 +0530 Subject: [PATCH 1/5] HBASE-29141 Resolution for high default maxQueueLength for call queues --- .../hbase/ipc/BalancedQueueRpcExecutor.java | 8 ++-- .../ipc/FastPathBalancedQueueRpcExecutor.java | 8 ++-- .../hbase/ipc/FastPathRWQueueRpcExecutor.java | 4 +- .../hbase/ipc/MetaRWQueueRpcExecutor.java | 7 ++-- .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 7 ++-- .../apache/hadoop/hbase/ipc/RpcExecutor.java | 14 +++++-- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 38 ++++++++----------- .../hbase/ipc/TestRWQueueRpcExecutor.java | 6 ++- .../hbase/ipc/TestSimpleRpcScheduler.java | 4 +- 9 files changed, 49 insertions(+), 47 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 24bda5a6e123..4939150ecc48 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -37,16 +37,16 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { private final QueueBalancer balancer; public BalancedQueueRpcExecutor(final String name, final int handlerCount, - final int maxQueueLength, final PriorityFunction priority, final Configuration conf, + final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT), - maxQueueLength, priority, conf, abortable); + maxQueueLengthConfKey, priority, conf, abortable); } public BalancedQueueRpcExecutor(final String name, final int handlerCount, - final String callQueueType, final int maxQueueLength, final PriorityFunction priority, + final String callQueueType, final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { - super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable); + super(name, handlerCount, callQueueType, maxQueueLengthConfKey, priority, conf, abortable); initializeQueues(this.numCallQueues); this.balancer = getBalancer(name, conf, getQueues()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java index 440f4d3a8197..c6eef5f145ca 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java @@ -42,15 +42,15 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { private final Deque fastPathHandlerStack = new ConcurrentLinkedDeque<>(); public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, - final int maxQueueLength, final PriorityFunction priority, final Configuration conf, + final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { - super(name, handlerCount, maxQueueLength, priority, conf, abortable); + super(name, handlerCount, maxQueueLengthConfKey, priority, conf, abortable); } public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, - final String callQueueType, final int maxQueueLength, final PriorityFunction priority, + final String callQueueType, final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { - super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable); + super(name, handlerCount, callQueueType, maxQueueLengthConfKey, priority, conf, abortable); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java index 63436e1dd4ab..366904b4a94c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java @@ -39,9 +39,9 @@ public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor { private final Deque writeHandlerStack = new ConcurrentLinkedDeque<>(); private final Deque scanHandlerStack = new ConcurrentLinkedDeque<>(); - public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, + public FastPathRWQueueRpcExecutor(String name, int handlerCount, String maxQueueLengthConfKey, PriorityFunction priority, Configuration conf, Abortable abortable) { - super(name, handlerCount, maxQueueLength, priority, conf, abortable); + super(name, handlerCount, maxQueueLengthConfKey, priority, conf, abortable); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java index 97c3a8765256..fa9364d5d7b3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java @@ -38,9 +38,10 @@ public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor { public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.8f; private static final float DEFAULT_META_CALL_QUEUE_SCAN_SHARE = 0.2f; - public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, - final PriorityFunction priority, final Configuration conf, final Abortable abortable) { - super(name, handlerCount, maxQueueLength, priority, conf, abortable); + public MetaRWQueueRpcExecutor(final String name, final int handlerCount, + final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, + final Abortable abortable) { + super(name, handlerCount, maxQueueLengthConfKey, priority, conf, abortable); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 70a7b74b8e2a..54b86f878f18 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -66,9 +66,10 @@ public class RWQueueRpcExecutor extends RpcExecutor { private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0); private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0); - public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, - final PriorityFunction priority, final Configuration conf, final Abortable abortable) { - super(name, handlerCount, maxQueueLength, priority, conf, abortable); + public RWQueueRpcExecutor(final String name, final int handlerCount, + final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, + final Abortable abortable) { + super(name, handlerCount, maxQueueLengthConfKey, priority, conf, abortable); float callqReadShare = getReadShare(conf); float callqScanShare = getScanShare(conf); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 15c9afe030c2..a95ace5e0a07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -117,14 +117,14 @@ public abstract class RpcExecutor { private final Configuration conf; private final Abortable abortable; - public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, + public RpcExecutor(final String name, final int handlerCount, final String maxQueueLengthLConfKey, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT), - maxQueueLength, priority, conf, abortable); + maxQueueLengthLConfKey, priority, conf, abortable); } public RpcExecutor(final String name, final int handlerCount, final String callQueueType, - final int maxQueueLength, final PriorityFunction priority, final Configuration conf, + final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { this.name = Strings.nullToEmpty(name); this.conf = conf; @@ -153,6 +153,10 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ this.handlerCount = Math.max(handlerCount, this.numCallQueues); this.handlers = new ArrayList<>(this.handlerCount); + int handlerCountPerQueue = this.handlerCount / this.numCallQueues; + int maxQueueLength = conf.getInt(maxQueueLengthConfKey, + handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + if (isDeadlineQueueType(callQueueType)) { this.name += ".Deadline"; this.queueInitArgs = @@ -225,7 +229,9 @@ public Map getCallQueueSizeSummary() { protected void initializeQueues(final int numQueues) { if (queueInitArgs.length > 0) { currentQueueLimit = (int) queueInitArgs[0]; - queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); + queueInitArgs[0] = Math.min((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); + // queue should neven be initialised with 0 or less length + queueInitArgs[0] = Math.max((int) queueInitArgs[0], 1); } for (int i = 0; i < numQueues; ++i) { queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs)); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 92b387570317..8f0339c5772a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -67,15 +67,6 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand Abortable server, int highPriorityLevel) { int bulkLoadHandlerCount = conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT); - int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, - handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); - int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, - priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); - int maxReplicationQueueLength = - conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, - replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); - int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, - bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); this.priority = priority; this.highPriorityLevel = highPriorityLevel; @@ -88,17 +79,17 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand if (callqReadShare > 0) { // at least 1 read handler and 1 write handler callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), - maxQueueLength, priority, conf, server); + RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, priority, conf, server); } else { if ( RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType) || RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf) ) { callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount, - maxQueueLength, priority, conf, server); + RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, priority, conf, server); } else { - callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength, - priority, conf, server); + callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, + RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, priority, conf, server); } } @@ -107,30 +98,31 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE); if (metaCallqReadShare > 0) { // different read/write handler for meta, at least 1 read handler and 1 write handler - this.priorityExecutor = new MetaRWQueueRpcExecutor("priority.RWQ", - Math.max(2, priorityHandlerCount), maxPriorityQueueLength, priority, conf, server); + this.priorityExecutor = + new MetaRWQueueRpcExecutor("priority.RWQ", Math.max(2, priorityHandlerCount), + RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, priority, conf, server); } else { // Create 2 queues to help priorityExecutor be more scalable. this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount, - RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, - abortable) + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, + RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, priority, conf, abortable) : null; } this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount, - RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority, conf, - abortable) + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, + RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, priority, conf, abortable) : null; this.metaTransitionExecutor = metaTransitionHandler > 0 ? new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler, - RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, - abortable) + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, + RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, priority, conf, abortable) : null; this.bulkloadExecutor = bulkLoadHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", bulkLoadHandlerCount, - RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, priority, conf, - abortable) + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, + RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, priority, conf, abortable) : null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java index 7a7f0e30f5fe..2b8ebe02bd70 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY; import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY; import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY; +import static org.apache.hadoop.hbase.ipc.RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.*; @@ -54,6 +55,7 @@ public class TestRWQueueRpcExecutor { public void setUp() { conf = HBaseConfiguration.create(); conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); + conf.setInt(IPC_SERVER_MAX_CALLQUEUE_LENGTH, 100); conf.setFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); conf.setFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); } @@ -61,8 +63,8 @@ public void setUp() { @Test public void itProvidesCorrectQueuesToBalancers() throws InterruptedException { PriorityFunction qosFunction = mock(PriorityFunction.class); - RWQueueRpcExecutor executor = - new RWQueueRpcExecutor(testName.getMethodName(), 100, 100, qosFunction, conf, null); + RWQueueRpcExecutor executor = new RWQueueRpcExecutor(testName.getMethodName(), 100, + IPC_SERVER_MAX_CALLQUEUE_LENGTH, qosFunction, conf, null); QueueBalancer readBalancer = executor.getReadBalancer(); QueueBalancer writeBalancer = executor.getWriteBalancer(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index eed7d98d7358..d632aa0c6fb3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -693,13 +693,13 @@ public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Except String name = testName.getMethodName(); int handlerCount = 1; String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE; - int maxQueueLength = 0; PriorityFunction priority = mock(PriorityFunction.class); Configuration conf = HBaseConfiguration.create(); + conf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 0); Abortable abortable = mock(Abortable.class); FastPathBalancedQueueRpcExecutor executor = Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType, - maxQueueLength, priority, conf, abortable)); + RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, priority, conf, abortable)); CallRunner task = mock(CallRunner.class); assertFalse(executor.dispatch(task)); // make sure we never internally get a handler, which would skip the queue validation From 938ecba09ff796f3a39e548beea98911af50aaa1 Mon Sep 17 00:00:00 2001 From: ukumawat Date: Fri, 9 Jan 2026 01:52:38 +0530 Subject: [PATCH 2/5] HBASE-29141 Resolution for high default maxQueueLength for call queues --- .../hbase/ipc/BalancedQueueRpcExecutor.java | 8 +- .../ipc/FastPathBalancedQueueRpcExecutor.java | 8 +- .../hbase/ipc/FastPathRWQueueRpcExecutor.java | 4 +- .../hbase/ipc/MetaRWQueueRpcExecutor.java | 11 +- .../hadoop/hbase/ipc/RWQueueRpcExecutor.java | 11 +- .../apache/hadoop/hbase/ipc/RpcExecutor.java | 29 +++-- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 36 +++--- .../hbase/ipc/TestRWQueueRpcExecutor.java | 13 ++- .../hadoop/hbase/ipc/TestRpcExecutor.java | 110 ++++++++++++++++++ .../hbase/ipc/TestSimpleRpcScheduler.java | 3 +- 10 files changed, 180 insertions(+), 53 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java index 4939150ecc48..24bda5a6e123 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java @@ -37,16 +37,16 @@ public class BalancedQueueRpcExecutor extends RpcExecutor { private final QueueBalancer balancer; public BalancedQueueRpcExecutor(final String name, final int handlerCount, - final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, + final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT), - maxQueueLengthConfKey, priority, conf, abortable); + maxQueueLength, priority, conf, abortable); } public BalancedQueueRpcExecutor(final String name, final int handlerCount, - final String callQueueType, final String maxQueueLengthConfKey, final PriorityFunction priority, + final String callQueueType, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { - super(name, handlerCount, callQueueType, maxQueueLengthConfKey, priority, conf, abortable); + super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable); initializeQueues(this.numCallQueues); this.balancer = getBalancer(name, conf, getQueues()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java index c6eef5f145ca..440f4d3a8197 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java @@ -42,15 +42,15 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { private final Deque fastPathHandlerStack = new ConcurrentLinkedDeque<>(); public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, - final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, + final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { - super(name, handlerCount, maxQueueLengthConfKey, priority, conf, abortable); + super(name, handlerCount, maxQueueLength, priority, conf, abortable); } public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, - final String callQueueType, final String maxQueueLengthConfKey, final PriorityFunction priority, + final String callQueueType, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { - super(name, handlerCount, callQueueType, maxQueueLengthConfKey, priority, conf, abortable); + super(name, handlerCount, callQueueType, maxQueueLength, priority, conf, abortable); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java index 366904b4a94c..63436e1dd4ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathRWQueueRpcExecutor.java @@ -39,9 +39,9 @@ public class FastPathRWQueueRpcExecutor extends RWQueueRpcExecutor { private final Deque writeHandlerStack = new ConcurrentLinkedDeque<>(); private final Deque scanHandlerStack = new ConcurrentLinkedDeque<>(); - public FastPathRWQueueRpcExecutor(String name, int handlerCount, String maxQueueLengthConfKey, + public FastPathRWQueueRpcExecutor(String name, int handlerCount, int maxQueueLength, PriorityFunction priority, Configuration conf, Abortable abortable) { - super(name, handlerCount, maxQueueLengthConfKey, priority, conf, abortable); + super(name, handlerCount, maxQueueLength, priority, conf, abortable); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java index fa9364d5d7b3..158f4a25ad97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetaRWQueueRpcExecutor.java @@ -35,13 +35,13 @@ public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor { "hbase.ipc.server.metacallqueue.scan.ratio"; public static final String META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.metacallqueue.handler.factor"; + public static final float DEFAULT_META_CALL_QUEUE_HANDLER_FACTOR = 0.5f; public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.8f; private static final float DEFAULT_META_CALL_QUEUE_SCAN_SHARE = 0.2f; - public MetaRWQueueRpcExecutor(final String name, final int handlerCount, - final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, - final Abortable abortable) { - super(name, handlerCount, maxQueueLengthConfKey, priority, conf, abortable); + public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, + final PriorityFunction priority, final Configuration conf, final Abortable abortable) { + super(name, handlerCount, maxQueueLength, priority, conf, abortable); } @Override @@ -68,6 +68,7 @@ public boolean dispatch(CallRunner callTask) { @Override protected float getCallQueueHandlerFactor(Configuration conf) { - return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.5f); + return conf.getFloat(META_CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, + DEFAULT_META_CALL_QUEUE_HANDLER_FACTOR); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java index 54b86f878f18..d1bb7a3f8c23 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java @@ -66,10 +66,9 @@ public class RWQueueRpcExecutor extends RpcExecutor { private final AtomicInteger activeReadHandlerCount = new AtomicInteger(0); private final AtomicInteger activeScanHandlerCount = new AtomicInteger(0); - public RWQueueRpcExecutor(final String name, final int handlerCount, - final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, - final Abortable abortable) { - super(name, handlerCount, maxQueueLengthConfKey, priority, conf, abortable); + public RWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength, + final PriorityFunction priority, final Configuration conf, final Abortable abortable) { + super(name, handlerCount, maxQueueLength, priority, conf, abortable); float callqReadShare = getReadShare(conf); float callqScanShare = getScanShare(conf); @@ -98,9 +97,7 @@ public RWQueueRpcExecutor(final String name, final int handlerCount, numScanQueues = scanQueues; scanHandlersCount = scanHandlers; - initializeQueues(numWriteQueues); - initializeQueues(numReadQueues); - initializeQueues(numScanQueues); + initializeQueues(numWriteQueues + numReadQueues + numScanQueues); this.writeBalancer = getBalancer(name, conf, queues.subList(0, numWriteQueues)); this.readBalancer = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index a95ace5e0a07..10b925856b3a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -57,6 +57,7 @@ public abstract class RpcExecutor { private static final Logger LOG = LoggerFactory.getLogger(RpcExecutor.class); protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; + protected static final float DEFAULT_CALL_QUEUE_HANDLER_FACTOR = 0.1f; public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor"; @@ -117,14 +118,14 @@ public abstract class RpcExecutor { private final Configuration conf; private final Abortable abortable; - public RpcExecutor(final String name, final int handlerCount, final String maxQueueLengthLConfKey, + public RpcExecutor(final String name, final int handlerCount, final int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { this(name, handlerCount, conf.get(CALL_QUEUE_TYPE_CONF_KEY, CALL_QUEUE_TYPE_CONF_DEFAULT), - maxQueueLengthLConfKey, priority, conf, abortable); + maxQueueLength, priority, conf, abortable); } public RpcExecutor(final String name, final int handlerCount, final String callQueueType, - final String maxQueueLengthConfKey, final PriorityFunction priority, final Configuration conf, + int maxQueueLength, final PriorityFunction priority, final Configuration conf, final Abortable abortable) { this.name = Strings.nullToEmpty(name); this.conf = conf; @@ -153,9 +154,12 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ this.handlerCount = Math.max(handlerCount, this.numCallQueues); this.handlers = new ArrayList<>(this.handlerCount); - int handlerCountPerQueue = this.handlerCount / this.numCallQueues; - int maxQueueLength = conf.getInt(maxQueueLengthConfKey, - handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + // If soft limit of queue is not provided, then calculate using + // DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER + if (maxQueueLength == -1) { + int handlerCountPerQueue = this.handlerCount / this.numCallQueues; + maxQueueLength = handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER; + } if (isDeadlineQueueType(callQueueType)) { this.name += ".Deadline"; @@ -226,12 +230,15 @@ public Map getCallQueueSizeSummary() { .collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond))); } + // IMPORTANT: Call this method only ONCE per executor instance. + // Before calling: queueInitArgs[0] contains the soft limit (desired queue capacity) + // After calling: queueInitArgs[0] is set to hard limit (max(soft limit, + // DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT)) and currentQueueLimit stores the original soft limit. + // Multiple calls would incorrectly use the hard limit as the soft limit. protected void initializeQueues(final int numQueues) { if (queueInitArgs.length > 0) { currentQueueLimit = (int) queueInitArgs[0]; - queueInitArgs[0] = Math.min((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); - // queue should neven be initialised with 0 or less length - queueInitArgs[0] = Math.max((int) queueInitArgs[0], 1); + queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); } for (int i = 0; i < numQueues; ++i) { queues.add(ReflectionUtils.newInstance(queueClass, queueInitArgs)); @@ -302,7 +309,7 @@ protected void startHandlers(final String nameSuffix, final int numHandlers, */ private static final QueueBalancer ONE_QUEUE = val -> 0; - public static QueueBalancer getBalancer(final String executorName, final Configuration conf, + protected static QueueBalancer getBalancer(final String executorName, final Configuration conf, final List> queues) { Preconditions.checkArgument(queues.size() > 0, "Queue size is <= 0, must be at least 1"); if (queues.size() == 1) { @@ -476,6 +483,6 @@ public void onConfigurationChange(Configuration conf) { } protected float getCallQueueHandlerFactor(Configuration conf) { - return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 0.1f); + return conf.getFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, DEFAULT_CALL_QUEUE_HANDLER_FACTOR); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 8f0339c5772a..e0c482e0e983 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -67,6 +67,13 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand Abortable server, int highPriorityLevel) { int bulkLoadHandlerCount = conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT); + int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, -1); + int maxPriorityQueueLength = + conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, -1); + int maxReplicationQueueLength = + conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, -1); + int maxBulkLoadQueueLength = + conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, -1); this.priority = priority; this.highPriorityLevel = highPriorityLevel; @@ -79,17 +86,17 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand if (callqReadShare > 0) { // at least 1 read handler and 1 write handler callExecutor = new FastPathRWQueueRpcExecutor("default.FPRWQ", Math.max(2, handlerCount), - RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, priority, conf, server); + maxQueueLength, priority, conf, server); } else { if ( RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType) || RpcExecutor.isPluggableQueueWithFastPath(callQueueType, conf) ) { callExecutor = new FastPathBalancedQueueRpcExecutor("default.FPBQ", handlerCount, - RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, priority, conf, server); + maxQueueLength, priority, conf, server); } else { - callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, - RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, priority, conf, server); + callExecutor = new BalancedQueueRpcExecutor("default.BQ", handlerCount, maxQueueLength, + priority, conf, server); } } @@ -98,31 +105,30 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE); if (metaCallqReadShare > 0) { // different read/write handler for meta, at least 1 read handler and 1 write handler - this.priorityExecutor = - new MetaRWQueueRpcExecutor("priority.RWQ", Math.max(2, priorityHandlerCount), - RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, priority, conf, server); + this.priorityExecutor = new MetaRWQueueRpcExecutor("priority.RWQ", + Math.max(2, priorityHandlerCount), maxPriorityQueueLength, priority, conf, server); } else { // Create 2 queues to help priorityExecutor be more scalable. this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount, - RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, - RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, priority, conf, abortable) + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, + abortable) : null; } this.replicationExecutor = replicationHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount, - RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, - RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, priority, conf, abortable) + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxReplicationQueueLength, priority, conf, + abortable) : null; this.metaTransitionExecutor = metaTransitionHandler > 0 ? new FastPathBalancedQueueRpcExecutor("metaPriority.FPBQ", metaTransitionHandler, - RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, - RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, priority, conf, abortable) + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, + abortable) : null; this.bulkloadExecutor = bulkLoadHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", bulkLoadHandlerCount, - RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, - RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, priority, conf, abortable) + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, priority, conf, + abortable) : null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java index 2b8ebe02bd70..0008ea5f44d6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRWQueueRpcExecutor.java @@ -20,10 +20,10 @@ import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY; import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY; import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY; -import static org.apache.hadoop.hbase.ipc.RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH; +import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -55,7 +55,6 @@ public class TestRWQueueRpcExecutor { public void setUp() { conf = HBaseConfiguration.create(); conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f); - conf.setInt(IPC_SERVER_MAX_CALLQUEUE_LENGTH, 100); conf.setFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f); conf.setFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0.5f); } @@ -63,8 +62,9 @@ public void setUp() { @Test public void itProvidesCorrectQueuesToBalancers() throws InterruptedException { PriorityFunction qosFunction = mock(PriorityFunction.class); + int softQueueLimit = 100; RWQueueRpcExecutor executor = new RWQueueRpcExecutor(testName.getMethodName(), 100, - IPC_SERVER_MAX_CALLQUEUE_LENGTH, qosFunction, conf, null); + softQueueLimit, qosFunction, conf, null); QueueBalancer readBalancer = executor.getReadBalancer(); QueueBalancer writeBalancer = executor.getWriteBalancer(); @@ -81,6 +81,11 @@ public void itProvidesCorrectQueuesToBalancers() throws InterruptedException { assertEquals(25, readQueues.size()); assertEquals(50, writeQueues.size()); assertEquals(25, scanQueues.size()); + assertEquals("Soft limit is not applied properly", softQueueLimit, executor.currentQueueLimit); + // Hard Limit is applied as the max capacity of the queue + int hardQueueLimit = readQueues.get(0).remainingCapacity() + readQueues.get(0).size(); + assertEquals("Default hard limit should be applied ", DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT, + hardQueueLimit); verifyDistinct(readQueues, writeQueues, scanQueues); verifyDistinct(writeQueues, readQueues, scanQueues); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java new file mode 100644 index 000000000000..0147ba1dbc7c --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY; +import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_HANDLER_FACTOR; +import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT; +import static org.apache.hadoop.hbase.ipc.RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER; +import static org.mockito.Mockito.mock; + +import java.util.List; +import java.util.concurrent.BlockingQueue; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; + +@Tag(RPCTests.TAG) +@Tag(MediumTests.TAG) +public class TestRpcExecutor { + + private static Configuration conf; + + @BeforeAll + public static void setUp() { + conf = HBaseConfiguration.create(); + } + + /** + * Test that validates default soft and hard limits when maxQueueLength is not explicitly + * configured (-1). + */ + @Test + public void testDefaultQueueLimits(TestInfo testInfo) { + PriorityFunction qosFunction = mock(PriorityFunction.class); + int handlerCount = 100; + // Pass -1 to use default maxQueueLength calculation + int defaultMaxQueueLength = -1; + + BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor(testInfo.getDisplayName(), + handlerCount, defaultMaxQueueLength, qosFunction, conf, null); + + List> queues = executor.getQueues(); + int expectedQueueSize = Math.round(handlerCount * DEFAULT_CALL_QUEUE_HANDLER_FACTOR); + Assertions.assertEquals(expectedQueueSize, queues.size(), + "Number of queues should be according to default callQueueHandlerFactor"); + + // By default, the soft limit depends on number of handler the queue will serve + int expectedSoftLimit = + (handlerCount / expectedQueueSize) * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER; + Assertions.assertEquals(expectedSoftLimit, executor.currentQueueLimit, + "Soft limit of queues is wrongly calculated"); + + // Hard limit should be maximum of softLimit and DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT + int hardQueueLimit = queues.get(0).remainingCapacity() + queues.get(0).size(); + int expectedHardLimit = Math.max(expectedSoftLimit, DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); + Assertions.assertEquals(expectedHardLimit, hardQueueLimit, + "Default hard limit of queues is wrongly calculated "); + } + + /** + * Test that validates configured soft and hard limits when maxQueueLength is explicitly set. + */ + @Test + public void testConfiguredQueueLimits(TestInfo testInfo) { + float callQueueHandlerFactor = 0.2f; + conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, callQueueHandlerFactor); + PriorityFunction qosFunction = mock(PriorityFunction.class); + int handlerCount = 100; + + // Test Case 1: Configured soft limit < DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT + int maxQueueLength = 150; + BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor( + testInfo.getDisplayName() + "1", handlerCount, maxQueueLength, qosFunction, conf, null); + + Assertions.assertEquals(maxQueueLength, executor.currentQueueLimit, + "Configured soft limit is not applied."); + + List> queues1 = executor.getQueues(); + + int expectedQueueSize = Math.round(handlerCount * callQueueHandlerFactor); + Assertions.assertEquals(expectedQueueSize, queues1.size(), + "Number of queues should be according to callQueueHandlerFactor"); + + int hardQueueLimit1 = queues1.get(0).remainingCapacity() + queues1.get(0).size(); + Assertions.assertEquals(DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT, hardQueueLimit1, + "Default Hard limit is not applied"); + + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index d632aa0c6fb3..d7ff04357e9f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -693,13 +693,14 @@ public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Except String name = testName.getMethodName(); int handlerCount = 1; String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE; + int maxQueueLength = 0; PriorityFunction priority = mock(PriorityFunction.class); Configuration conf = HBaseConfiguration.create(); conf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 0); Abortable abortable = mock(Abortable.class); FastPathBalancedQueueRpcExecutor executor = Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType, - RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, priority, conf, abortable)); + maxQueueLength, priority, conf, abortable)); CallRunner task = mock(CallRunner.class); assertFalse(executor.dispatch(task)); // make sure we never internally get a handler, which would skip the queue validation From 41366ce87ec25dbfedd3d1bda18b8bca2d3856f5 Mon Sep 17 00:00:00 2001 From: ukumawat Date: Fri, 9 Jan 2026 12:33:50 +0530 Subject: [PATCH 3/5] HBASE-29141 remove unwanted change --- .../java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index d7ff04357e9f..eed7d98d7358 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -696,7 +696,6 @@ public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Except int maxQueueLength = 0; PriorityFunction priority = mock(PriorityFunction.class); Configuration conf = HBaseConfiguration.create(); - conf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 0); Abortable abortable = mock(Abortable.class); FastPathBalancedQueueRpcExecutor executor = Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, handlerCount, callQueueType, From 054a1097d770dcffb97310f80317d97f25d71de0 Mon Sep 17 00:00:00 2001 From: ukumawat Date: Sun, 11 Jan 2026 22:55:54 +0530 Subject: [PATCH 4/5] HBASE-29141 use getTestMethod instead of getDisplayName in JUnits --- .../apache/hadoop/hbase/ipc/TestRpcExecutor.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java index 0147ba1dbc7c..533aa7bd6638 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcExecutor.java @@ -57,8 +57,9 @@ public void testDefaultQueueLimits(TestInfo testInfo) { // Pass -1 to use default maxQueueLength calculation int defaultMaxQueueLength = -1; - BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor(testInfo.getDisplayName(), - handlerCount, defaultMaxQueueLength, qosFunction, conf, null); + BalancedQueueRpcExecutor executor = + new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName(), handlerCount, + defaultMaxQueueLength, qosFunction, conf, null); List> queues = executor.getQueues(); int expectedQueueSize = Math.round(handlerCount * DEFAULT_CALL_QUEUE_HANDLER_FACTOR); @@ -87,11 +88,11 @@ public void testConfiguredQueueLimits(TestInfo testInfo) { conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, callQueueHandlerFactor); PriorityFunction qosFunction = mock(PriorityFunction.class); int handlerCount = 100; - - // Test Case 1: Configured soft limit < DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT int maxQueueLength = 150; - BalancedQueueRpcExecutor executor = new BalancedQueueRpcExecutor( - testInfo.getDisplayName() + "1", handlerCount, maxQueueLength, qosFunction, conf, null); + + BalancedQueueRpcExecutor executor = + new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName() + "1", handlerCount, + maxQueueLength, qosFunction, conf, null); Assertions.assertEquals(maxQueueLength, executor.currentQueueLimit, "Configured soft limit is not applied."); From 5d18bb6598dbef2ef0601a94551027f132c8f75c Mon Sep 17 00:00:00 2001 From: ukumawat Date: Wed, 14 Jan 2026 01:12:11 +0530 Subject: [PATCH 5/5] HBASE-29141 enforce max one call to initializeQueues --- .../org/apache/hadoop/hbase/ipc/RpcExecutor.java | 13 +++++++++---- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 14 ++++++++------ 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java index 10b925856b3a..eb1eda1f9f49 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java @@ -58,6 +58,7 @@ public abstract class RpcExecutor { protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250; protected static final float DEFAULT_CALL_QUEUE_HANDLER_FACTOR = 0.1f; + protected static final int UNDEFINED_MAX_CALLQUEUE_LENGTH = -1; public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor"; @@ -156,7 +157,7 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ // If soft limit of queue is not provided, then calculate using // DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER - if (maxQueueLength == -1) { + if (maxQueueLength == UNDEFINED_MAX_CALLQUEUE_LENGTH) { int handlerCountPerQueue = this.handlerCount / this.numCallQueues; maxQueueLength = handlerCountPerQueue * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER; } @@ -230,12 +231,16 @@ public Map getCallQueueSizeSummary() { .collect(Collectors.groupingBy(Pair::getFirst, Collectors.summingLong(Pair::getSecond))); } - // IMPORTANT: Call this method only ONCE per executor instance. + // This method can only be called ONCE per executor instance. // Before calling: queueInitArgs[0] contains the soft limit (desired queue capacity) - // After calling: queueInitArgs[0] is set to hard limit (max(soft limit, - // DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT)) and currentQueueLimit stores the original soft limit. + // After calling: queueInitArgs[0] is set to hard limit and currentQueueLimit stores the original + // soft limit. // Multiple calls would incorrectly use the hard limit as the soft limit. + // As all the queues has same initArgs and queueClass, there should be no need to call this again. protected void initializeQueues(final int numQueues) { + if (!queues.isEmpty()) { + throw new RuntimeException("Queues are already initialized"); + } if (queueInitArgs.length > 0) { currentQueueLimit = (int) queueInitArgs[0]; queueInitArgs[0] = Math.max((int) queueInitArgs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index e0c482e0e983..969bc5d60ddd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -67,13 +67,15 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand Abortable server, int highPriorityLevel) { int bulkLoadHandlerCount = conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT, HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT); - int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, -1); - int maxPriorityQueueLength = - conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, -1); + int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, + RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH); + int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, + RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH); int maxReplicationQueueLength = - conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, -1); - int maxBulkLoadQueueLength = - conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, -1); + conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, + RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH); + int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, + RpcExecutor.UNDEFINED_MAX_CALLQUEUE_LENGTH); this.priority = priority; this.highPriorityLevel = highPriorityLevel;