-
Notifications
You must be signed in to change notification settings - Fork 3.4k
HBASE-29141 Calculate default maxQueueLength call queues correctly #7490
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
630e3a9
a870cec
c1c3730
27bed5e
394ad47
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,9 +20,10 @@ | |
| import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_READ_SHARE_CONF_KEY; | ||
| import static org.apache.hadoop.hbase.ipc.RWQueueRpcExecutor.CALL_QUEUE_SCAN_SHARE_CONF_KEY; | ||
| import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY; | ||
| import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT; | ||
| import static org.junit.Assert.assertEquals; | ||
| import static org.junit.Assert.assertTrue; | ||
| import static org.mockito.Mockito.*; | ||
| import static org.mockito.Mockito.mock; | ||
|
|
||
| import java.util.List; | ||
| import java.util.concurrent.BlockingQueue; | ||
|
|
@@ -61,8 +62,9 @@ public void setUp() { | |
| @Test | ||
| public void itProvidesCorrectQueuesToBalancers() throws InterruptedException { | ||
| PriorityFunction qosFunction = mock(PriorityFunction.class); | ||
| RWQueueRpcExecutor executor = | ||
| new RWQueueRpcExecutor(testName.getMethodName(), 100, 100, qosFunction, conf, null); | ||
| int softQueueLimit = 100; | ||
| RWQueueRpcExecutor executor = new RWQueueRpcExecutor(testName.getMethodName(), 100, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we have a test that validates the queue length limits? Could we validate
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added a test |
||
| softQueueLimit, qosFunction, conf, null); | ||
|
|
||
| QueueBalancer readBalancer = executor.getReadBalancer(); | ||
| QueueBalancer writeBalancer = executor.getWriteBalancer(); | ||
|
|
@@ -79,6 +81,11 @@ public void itProvidesCorrectQueuesToBalancers() throws InterruptedException { | |
| assertEquals(25, readQueues.size()); | ||
| assertEquals(50, writeQueues.size()); | ||
| assertEquals(25, scanQueues.size()); | ||
| assertEquals("Soft limit is not applied properly", softQueueLimit, executor.currentQueueLimit); | ||
| // Hard Limit is applied as the max capacity of the queue | ||
| int hardQueueLimit = readQueues.get(0).remainingCapacity() + readQueues.get(0).size(); | ||
| assertEquals("Default hard limit should be applied ", DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT, | ||
| hardQueueLimit); | ||
|
|
||
| verifyDistinct(readQueues, writeQueues, scanQueues); | ||
| verifyDistinct(writeQueues, readQueues, scanQueues); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.hadoop.hbase.ipc; | ||
|
|
||
| import static org.apache.hadoop.hbase.ipc.RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY; | ||
| import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_HANDLER_FACTOR; | ||
| import static org.apache.hadoop.hbase.ipc.RpcExecutor.DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT; | ||
| import static org.apache.hadoop.hbase.ipc.RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER; | ||
| import static org.mockito.Mockito.mock; | ||
|
|
||
| import java.util.List; | ||
| import java.util.concurrent.BlockingQueue; | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.hbase.HBaseConfiguration; | ||
| import org.apache.hadoop.hbase.testclassification.MediumTests; | ||
| import org.apache.hadoop.hbase.testclassification.RPCTests; | ||
| import org.junit.jupiter.api.Assertions; | ||
| import org.junit.jupiter.api.BeforeAll; | ||
| import org.junit.jupiter.api.Tag; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.TestInfo; | ||
|
|
||
| @Tag(RPCTests.TAG) | ||
| @Tag(MediumTests.TAG) | ||
| public class TestRpcExecutor { | ||
|
|
||
| private static Configuration conf; | ||
|
|
||
| @BeforeAll | ||
| public static void setUp() { | ||
| conf = HBaseConfiguration.create(); | ||
| } | ||
|
|
||
| /** | ||
| * Test that validates default soft and hard limits when maxQueueLength is not explicitly | ||
| * configured (-1). | ||
| */ | ||
| @Test | ||
| public void testDefaultQueueLimits(TestInfo testInfo) { | ||
| PriorityFunction qosFunction = mock(PriorityFunction.class); | ||
| int handlerCount = 100; | ||
| // Pass -1 to use default maxQueueLength calculation | ||
| int defaultMaxQueueLength = -1; | ||
|
|
||
| BalancedQueueRpcExecutor executor = | ||
| new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName(), handlerCount, | ||
| defaultMaxQueueLength, qosFunction, conf, null); | ||
|
|
||
| List<BlockingQueue<CallRunner>> queues = executor.getQueues(); | ||
| int expectedQueueSize = Math.round(handlerCount * DEFAULT_CALL_QUEUE_HANDLER_FACTOR); | ||
| Assertions.assertEquals(expectedQueueSize, queues.size(), | ||
| "Number of queues should be according to default callQueueHandlerFactor"); | ||
|
|
||
| // By default, the soft limit depends on number of handler the queue will serve | ||
| int expectedSoftLimit = | ||
| (handlerCount / expectedQueueSize) * DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER; | ||
| Assertions.assertEquals(expectedSoftLimit, executor.currentQueueLimit, | ||
| "Soft limit of queues is wrongly calculated"); | ||
|
|
||
| // Hard limit should be maximum of softLimit and DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT | ||
| int hardQueueLimit = queues.get(0).remainingCapacity() + queues.get(0).size(); | ||
| int expectedHardLimit = Math.max(expectedSoftLimit, DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT); | ||
| Assertions.assertEquals(expectedHardLimit, hardQueueLimit, | ||
| "Default hard limit of queues is wrongly calculated "); | ||
| } | ||
|
|
||
| /** | ||
| * Test that validates configured soft and hard limits when maxQueueLength is explicitly set. | ||
| */ | ||
| @Test | ||
| public void testConfiguredQueueLimits(TestInfo testInfo) { | ||
| float callQueueHandlerFactor = 0.2f; | ||
| conf.setFloat(CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, callQueueHandlerFactor); | ||
| PriorityFunction qosFunction = mock(PriorityFunction.class); | ||
| int handlerCount = 100; | ||
| int maxQueueLength = 150; | ||
|
|
||
| BalancedQueueRpcExecutor executor = | ||
| new BalancedQueueRpcExecutor(testInfo.getTestMethod().get().getName() + "1", handlerCount, | ||
| maxQueueLength, qosFunction, conf, null); | ||
|
|
||
| Assertions.assertEquals(maxQueueLength, executor.currentQueueLimit, | ||
| "Configured soft limit is not applied."); | ||
|
|
||
| List<BlockingQueue<CallRunner>> queues1 = executor.getQueues(); | ||
|
|
||
| int expectedQueueSize = Math.round(handlerCount * callQueueHandlerFactor); | ||
| Assertions.assertEquals(expectedQueueSize, queues1.size(), | ||
| "Number of queues should be according to callQueueHandlerFactor"); | ||
|
|
||
| int hardQueueLimit1 = queues1.get(0).remainingCapacity() + queues1.get(0).size(); | ||
| Assertions.assertEquals(DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT, hardQueueLimit1, | ||
| "Default Hard limit is not applied"); | ||
|
|
||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this does seem like a useful visibility change, but must it be included here? Changing a public method of a "public" class (LimitedPrivate for COPROC, PHOENIX) is a breaking change. That being said, it's hard to imagine how anyone is using it outside of hbase... and the return class is not LimitedPrivate for COPROC... and I don't see it used in Phoenix... so 🤷♂️
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method creates the balancer for RPC queues, so I don't think it should be visible outside of the protected visibility.