From be609055411f91d82facdb9a74db6e40e21874f7 Mon Sep 17 00:00:00 2001 From: George Wu Date: Wed, 18 Sep 2024 13:15:41 -0400 Subject: [PATCH 1/3] Add maximumCapacity to taskRunner --- .../KubernetesAndWorkerTaskRunner.java | 6 ++ .../KubernetesAndWorkerTaskRunnerTest.java | 10 +++ .../indexing/overlord/RemoteTaskRunner.java | 25 ++++++ .../indexing/overlord/TaskQueryTool.java | 34 +-------- .../druid/indexing/overlord/TaskRunner.java | 9 +++ .../overlord/hrtr/HttpRemoteTaskRunner.java | 25 ++++++ .../overlord/RemoteTaskRunnerTest.java | 43 +++++++++++ .../overlord/RemoteTaskRunnerTestUtils.java | 17 ++++- .../overlord/TestProvisioningStrategy.java | 57 ++++++++++++++ .../hrtr/HttpRemoteTaskRunnerTest.java | 76 +++++++++++++++++++ 10 files changed, 268 insertions(+), 34 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java index 2c45a0ec7b89..5e50b98570f7 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java @@ -268,6 +268,12 @@ public int getTotalCapacity() return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity); } + @Override + public int getMaximumCapacity() + { + return workerTaskRunner.getMaximumCapacity() + kubernetesTaskRunner.getMaximumCapacity(); + } + @Override public int getUsedCapacity() { diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java index 3ab515cc6e55..295b09bac4c2 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java @@ -371,4 +371,14 @@ public void test_updateLocation() runner.updateLocation(task, TaskLocation.unknown()); verifyAll(); } + + @Test + public void test_getMaximumCapacity() + { + EasyMock.expect(kubernetesTaskRunner.getMaximumCapacity()).andReturn(1); + EasyMock.expect(workerTaskRunner.getMaximumCapacity()).andReturn(1); + replayAll(); + Assert.assertEquals(2, runner.getMaximumCapacity()); + verifyAll(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index a79c263ec40c..7a7017e357d8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -58,6 +58,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy; import org.apache.druid.indexing.worker.TaskAnnouncement; @@ -1648,6 +1649,30 @@ public int getTotalCapacity() return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); } + @Override + public int getMaximumCapacity() + { + int maximumCapacity = -1; + WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get(); + if (workerBehaviorConfig == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); + maximumCapacity = -1; + } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { + DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; + if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); + maximumCapacity = -1; + } else { + int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); + int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers()); + maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; + } + } + return maximumCapacity; + } + @Override public int getUsedCapacity() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index 4c37af7ef169..6e5183bedc4a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -21,7 +21,6 @@ import com.google.common.base.Optional; import com.google.common.base.Supplier; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.inject.Inject; @@ -33,7 +32,6 @@ import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.http.TaskStateLookup; import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse; -import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; @@ -49,7 +47,6 @@ import javax.annotation.Nullable; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -377,38 +374,9 @@ public TotalWorkerCapacityResponse getTotalWorkerCapacity() } TaskRunner taskRunner = taskRunnerOptional.get(); - Collection workers = taskRunner instanceof WorkerTaskRunner ? - ((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of(); - int currentCapacity = taskRunner.getTotalCapacity(); int usedCapacity = taskRunner.getUsedCapacity(); - // Calculate maximum capacity with auto scale - int maximumCapacity; - WorkerBehaviorConfig workerBehaviorConfig = getLatestWorkerConfig(); - if (workerBehaviorConfig == null) { - // Auto scale not setup - log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); - maximumCapacity = -1; - } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { - DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; - if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { - // Auto scale not setup - log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); - maximumCapacity = -1; - } else { - int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); - int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers); - maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; - } - } else { - // Auto-scale is not using DefaultWorkerBehaviorConfig - log.debug( - "Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity", - workerBehaviorConfig, - workerBehaviorConfig.getClass().getSimpleName() - ); - maximumCapacity = -1; - } + int maximumCapacity = taskRunner.getMaximumCapacity(); return new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index ac1fd124ef55..70e8cad9a8bb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -155,6 +155,15 @@ default int getTotalCapacity() return -1; } + /** + * The maximum number of tasks this TaskRunner can run concurrently with autoscaling hints. + * Can return -1 if this method is not implemented or capacity can't be found. + */ + default int getMaximumCapacity() + { + return -1; + } + /** * The current number of tasks this TaskRunner is running. * Can return -1 if this method is not implemented or the # of tasks can't be found. diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index a2edb1eb6d10..3d2cc61ecaf1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -64,6 +64,7 @@ import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig; +import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy; import org.apache.druid.indexing.worker.TaskAnnouncement; @@ -1791,6 +1792,30 @@ public int getTotalCapacity() return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); } + @Override + public int getMaximumCapacity() + { + int maximumCapacity = -1; + WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get(); + if (workerBehaviorConfig == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured"); + maximumCapacity = -1; + } else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) { + DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig; + if (defaultWorkerBehaviorConfig.getAutoScaler() == null) { + // Auto scale not setup + log.debug("Cannot calculate maximum worker capacity as auto scaler not configured"); + maximumCapacity = -1; + } else { + int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers(); + int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers()); + maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity; + } + } + return maximumCapacity; + } + @Override public int getUsedCapacity() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index dec98e052910..4d8839962401 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -47,6 +47,8 @@ import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig; +import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; +import org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; @@ -153,6 +155,7 @@ public void testRun() throws Exception Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity()); + Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacity()); Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity()); @@ -608,6 +611,46 @@ public void testRunPendingTaskTimeoutToAssign() throws Exception ); } + @Test + public void testGetMaximumCapacity_noWorkerConfig() + { + httpClient = EasyMock.createMock(HttpClient.class); + remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner( + new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD), + new TestProvisioningStrategy<>(), + httpClient, + null + ); + Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacity()); + } + + @Test + public void testGetMaximumCapacity_noAutoScaler() + { + httpClient = EasyMock.createMock(HttpClient.class); + remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner( + new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD), + new TestProvisioningStrategy<>(), + httpClient, + new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null) + ); + Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacity()); + } + + @Test + public void testGetMaximumCapacity_withAutoScaler() + { + httpClient = EasyMock.createMock(HttpClient.class); + remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner( + new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD), + new TestProvisioningStrategy<>(), + httpClient, + DefaultWorkerBehaviorConfig.defaultConfig() + ); + // Default autoscaler has max workers of 0 + Assert.assertEquals(0, remoteTaskRunner.getMaximumCapacity()); + } + private void doSetup() throws Exception { makeWorker(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java index bdf886aa41bd..7c579f196763 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTestUtils.java @@ -117,6 +117,21 @@ public RemoteTaskRunner makeRemoteTaskRunner( ProvisioningStrategy provisioningStrategy, HttpClient httpClient ) + { + return makeRemoteTaskRunner( + config, + provisioningStrategy, + httpClient, + DefaultWorkerBehaviorConfig.defaultConfig() + ); + } + + public RemoteTaskRunner makeRemoteTaskRunner( + RemoteTaskRunnerConfig config, + ProvisioningStrategy provisioningStrategy, + HttpClient httpClient, + WorkerBehaviorConfig workerBehaviorConfig + ) { RemoteTaskRunner remoteTaskRunner = new TestableRemoteTaskRunner( jsonMapper, @@ -134,7 +149,7 @@ public String getBase() cf, new PathChildrenCacheFactory.Builder(), httpClient, - DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), + DSuppliers.of(new AtomicReference<>(workerBehaviorConfig)), provisioningStrategy ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java new file mode 100644 index 000000000000..18cade99b9d7 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TestProvisioningStrategy.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.apache.druid.indexing.overlord; + +import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService; +import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; +import org.apache.druid.indexing.overlord.autoscaling.ScalingStats; + +import javax.annotation.Nonnull; +import java.util.Collection; + + +public class TestProvisioningStrategy implements ProvisioningStrategy +{ + @Override + public ProvisioningService makeProvisioningService(T runner) + { + return new ProvisioningService() + { + @Override + public void close() + { + // nothing to close + } + + @Override + public ScalingStats getStats() + { + return null; + } + }; + } + + @Override + public int getExpectedWorkerCapacity(@Nonnull Collection workers) + { + return 1; + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 1bfac9f42a38..f11d80d4e3e9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -42,11 +42,13 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.TestProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.NoopProvisioningStrategy; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig; import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; +import org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy; import org.apache.druid.indexing.worker.TaskAnnouncement; import org.apache.druid.indexing.worker.Worker; import org.apache.druid.indexing.worker.config.WorkerConfig; @@ -147,6 +149,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", Wo Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size()); Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size()); Assert.assertEquals(4, taskRunner.getTotalCapacity()); + Assert.assertEquals(-1, taskRunner.getMaximumCapacity()); Assert.assertEquals(0, taskRunner.getUsedCapacity()); } @@ -1778,6 +1781,79 @@ protected WorkerHolder createWorkerHolder( Assert.assertEquals(3, taskRunner.getWorkerSyncerDebugInfo().size()); } + @Test + public void testGetMaximumCapacity_noWorkerConfig() + { + TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + EasyMock.replay(druidNodeDiscoveryProvider); + + HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( + TestHelper.makeJsonMapper(), + new HttpRemoteTaskRunnerConfig(), + EasyMock.createNiceMock(HttpClient.class), + DSuppliers.of(new AtomicReference<>(null)), + new TestProvisioningStrategy<>(), + druidNodeDiscoveryProvider, + EasyMock.createMock(TaskStorage.class), + EasyMock.createNiceMock(CuratorFramework.class), + new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), + new NoopServiceEmitter() + ); + Assert.assertEquals(-1, taskRunner.getMaximumCapacity()); + } + + @Test + public void testGetMaximumCapacity_noAutoScaler() + { + TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + EasyMock.replay(druidNodeDiscoveryProvider); + + HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( + TestHelper.makeJsonMapper(), + new HttpRemoteTaskRunnerConfig(), + EasyMock.createNiceMock(HttpClient.class), + DSuppliers.of(new AtomicReference<>(new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null))), + new TestProvisioningStrategy<>(), + druidNodeDiscoveryProvider, + EasyMock.createMock(TaskStorage.class), + EasyMock.createNiceMock(CuratorFramework.class), + new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), + new NoopServiceEmitter() + ); + Assert.assertEquals(-1, taskRunner.getMaximumCapacity()); + } + + @Test + public void testGetMaximumCapacity_withAutoScaler() + { + TestDruidNodeDiscovery druidNodeDiscovery = new TestDruidNodeDiscovery(); + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForService(WorkerNodeService.DISCOVERY_SERVICE_KEY)) + .andReturn(druidNodeDiscovery); + EasyMock.replay(druidNodeDiscoveryProvider); + + HttpRemoteTaskRunner taskRunner = new HttpRemoteTaskRunner( + TestHelper.makeJsonMapper(), + new HttpRemoteTaskRunnerConfig(), + EasyMock.createNiceMock(HttpClient.class), + DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())), + new TestProvisioningStrategy<>(), + druidNodeDiscoveryProvider, + EasyMock.createMock(TaskStorage.class), + EasyMock.createNiceMock(CuratorFramework.class), + new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), + new NoopServiceEmitter() + ); + // Default autoscaler has max workers of 0 + Assert.assertEquals(0, taskRunner.getMaximumCapacity()); + } + public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated( TaskStorage taskStorage, List listenerNotificationsAccumulator From a9242afbdc2b9f33bf209b61f19552b01813a79b Mon Sep 17 00:00:00 2001 From: George Wu Date: Thu, 19 Sep 2024 14:10:45 -0400 Subject: [PATCH 2/3] fix tests --- .../overlord/http/OverlordResourceTest.java | 137 ++---------------- 1 file changed, 14 insertions(+), 123 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index c13a15717656..b5563b498e68 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -43,7 +43,6 @@ import org.apache.druid.indexing.common.task.NoopTask; import org.apache.druid.indexing.common.task.Task; import org.apache.druid.indexing.overlord.DruidOverlord; -import org.apache.druid.indexing.overlord.ImmutableWorkerInfo; import org.apache.druid.indexing.overlord.IndexerMetadataStorageAdapter; import org.apache.druid.indexing.overlord.TaskLockbox; import org.apache.druid.indexing.overlord.TaskMaster; @@ -52,14 +51,9 @@ import org.apache.druid.indexing.overlord.TaskRunner; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.indexing.overlord.WorkerTaskRunner; import org.apache.druid.indexing.overlord.WorkerTaskRunnerQueryAdapter; -import org.apache.druid.indexing.overlord.autoscaling.AutoScaler; import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy; -import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig; import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig; -import org.apache.druid.indexing.worker.Worker; -import org.apache.druid.indexing.worker.config.WorkerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.java.util.common.RE; @@ -1321,6 +1315,7 @@ public void testGetTotalWorkerCapacityWithUnknown() .andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); + EasyMock.expect(taskRunner.getMaximumCapacity()).andReturn(-1); EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); @@ -1332,130 +1327,26 @@ public void testGetTotalWorkerCapacityWithUnknown() } @Test - public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButWorkerBehaviorConfigNotConfigured() - { - AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(null); - EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); - EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); - EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); - EasyMock.expect(overlord.isLeader()).andReturn(true); - replayAll(); - - final Response response = overlordResource.getTotalWorkerCapacity(); - Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); - } - - @Test - public void testGetTotalWorkerCapacityWithWorkerTaskRunnerButAutoScaleNotConfigured() - { - DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, null); - AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); - EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); - EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); - EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); - EasyMock.expect(overlord.isLeader()).andReturn(true); - replayAll(); - - final Response response = overlordResource.getTotalWorkerCapacity(); - Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); - Assert.assertEquals(-1, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); - } - - @Test - public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategySupportExpectedWorkerCapacity() + public void testGetTotalWorkerCapacityWithMaximumCapacity() { int expectedWorkerCapacity = 3; - int maxNumWorkers = 2; - WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class); - Collection workerInfos = ImmutableList.of( - new ImmutableWorkerInfo( - new Worker( - "http", "testWorker", "192.0.0.1", expectedWorkerCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY - ), - 2, - ImmutableSet.of("grp1", "grp2"), - ImmutableSet.of("task1", "task2"), - DateTimes.of("2015-01-01T01:01:01Z") - ) - ); - EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos); - EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity); - EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(0); - - EasyMock.reset(taskMaster); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn( - Optional.of(workerTaskRunner) - ).anyTimes(); - EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(expectedWorkerCapacity).anyTimes(); - AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); - EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers); - DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler); - AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); - EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); - EasyMock.replay( - workerTaskRunner, - autoScaler - ); + int expectedWorkerCapacityWithAutoscale = 10; + WorkerBehaviorConfig workerBehaviorConfig = EasyMock.createMock(WorkerBehaviorConfig.class); + AtomicReference workerBehaviorConfigAtomicReference + = new AtomicReference<>(workerBehaviorConfig); + EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)) + .andReturn(workerBehaviorConfigAtomicReference); + EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity); + EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(expectedWorkerCapacity); + EasyMock.expect(taskRunner.getMaximumCapacity()).andReturn(expectedWorkerCapacityWithAutoscale); EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); - final Response response = overlordResource.getTotalWorkerCapacity(); - Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); - Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); - Assert.assertEquals(0, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); - Assert.assertEquals(expectedWorkerCapacity * maxNumWorkers, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); - } - @Test - public void testGetTotalWorkerCapacityWithAutoScaleConfiguredAndProvisioningStrategyNotSupportExpectedWorkerCapacity() - { - int invalidExpectedCapacity = -1; - int currentTotalCapacity = 3; - int currentCapacityUsed = 2; - int maxNumWorkers = 2; - WorkerTaskRunner workerTaskRunner = EasyMock.createMock(WorkerTaskRunner.class); - Collection workerInfos = ImmutableList.of( - new ImmutableWorkerInfo( - new Worker( - "http", "testWorker", "192.0.0.1", currentTotalCapacity, "v1", WorkerConfig.DEFAULT_CATEGORY - ), - currentCapacityUsed, - ImmutableSet.of("grp1", "grp2"), - ImmutableSet.of("task1", "task2"), - DateTimes.of("2015-01-01T01:01:01Z") - ) - ); - EasyMock.expect(workerTaskRunner.getWorkers()).andReturn(workerInfos); - EasyMock.expect(workerTaskRunner.getTotalCapacity()).andReturn(currentTotalCapacity); - EasyMock.expect(workerTaskRunner.getUsedCapacity()).andReturn(currentCapacityUsed); - EasyMock.reset(taskMaster); - EasyMock.expect(taskMaster.getTaskRunner()).andReturn( - Optional.of(workerTaskRunner) - ).anyTimes(); - EasyMock.expect(provisioningStrategy.getExpectedWorkerCapacity(workerInfos)).andReturn(invalidExpectedCapacity).anyTimes(); - AutoScaler autoScaler = EasyMock.createMock(AutoScaler.class); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); - EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(maxNumWorkers); - DefaultWorkerBehaviorConfig workerBehaviorConfig = new DefaultWorkerBehaviorConfig(null, autoScaler); - AtomicReference workerBehaviorConfigAtomicReference = new AtomicReference<>(workerBehaviorConfig); - EasyMock.expect(configManager.watch(WorkerBehaviorConfig.CONFIG_KEY, WorkerBehaviorConfig.class)).andReturn(workerBehaviorConfigAtomicReference); - EasyMock.replay( - workerTaskRunner, - autoScaler - ); - EasyMock.expect(overlord.isLeader()).andReturn(true); - replayAll(); final Response response = overlordResource.getTotalWorkerCapacity(); Assert.assertEquals(HttpResponseStatus.OK.getCode(), response.getStatus()); - Assert.assertEquals(workerInfos.stream().findFirst().get().getWorker().getCapacity(), ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); - Assert.assertEquals(invalidExpectedCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); - Assert.assertEquals(currentTotalCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); - Assert.assertEquals(currentCapacityUsed, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); + Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getCurrentClusterCapacity()); + Assert.assertEquals(expectedWorkerCapacity, ((TotalWorkerCapacityResponse) response.getEntity()).getUsedClusterCapacity()); + Assert.assertEquals(expectedWorkerCapacityWithAutoscale, ((TotalWorkerCapacityResponse) response.getEntity()).getMaximumCapacityWithAutoScale()); } @Test From f38df0543a3e26aaaa7c7e560dfcdba49afdaf56 Mon Sep 17 00:00:00 2001 From: George Wu Date: Tue, 1 Oct 2024 10:48:09 -0400 Subject: [PATCH 3/3] pr comments --- .../k8s/overlord/KubernetesAndWorkerTaskRunner.java | 4 ++-- .../overlord/KubernetesAndWorkerTaskRunnerTest.java | 6 +++--- .../druid/indexing/overlord/RemoteTaskRunner.java | 7 ++++++- .../apache/druid/indexing/overlord/TaskQueryTool.java | 10 +++++----- .../org/apache/druid/indexing/overlord/TaskRunner.java | 4 ++-- .../indexing/overlord/hrtr/HttpRemoteTaskRunner.java | 8 +++++++- .../druid/indexing/overlord/RemoteTaskRunnerTest.java | 8 ++++---- .../overlord/hrtr/HttpRemoteTaskRunnerTest.java | 8 ++++---- .../indexing/overlord/http/OverlordResourceTest.java | 4 ++-- 9 files changed, 35 insertions(+), 24 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java index 5e50b98570f7..767afaa2fd3e 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunner.java @@ -269,9 +269,9 @@ public int getTotalCapacity() } @Override - public int getMaximumCapacity() + public int getMaximumCapacityWithAutoscale() { - return workerTaskRunner.getMaximumCapacity() + kubernetesTaskRunner.getMaximumCapacity(); + return workerTaskRunner.getMaximumCapacityWithAutoscale() + kubernetesTaskRunner.getMaximumCapacityWithAutoscale(); } @Override diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java index 295b09bac4c2..1cc3be34e383 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/KubernetesAndWorkerTaskRunnerTest.java @@ -375,10 +375,10 @@ public void test_updateLocation() @Test public void test_getMaximumCapacity() { - EasyMock.expect(kubernetesTaskRunner.getMaximumCapacity()).andReturn(1); - EasyMock.expect(workerTaskRunner.getMaximumCapacity()).andReturn(1); + EasyMock.expect(kubernetesTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(1); + EasyMock.expect(workerTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(1); replayAll(); - Assert.assertEquals(2, runner.getMaximumCapacity()); + Assert.assertEquals(2, runner.getMaximumCapacityWithAutoscale()); verifyAll(); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java index 7a7017e357d8..1f6f8e64e793 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/RemoteTaskRunner.java @@ -1649,8 +1649,13 @@ public int getTotalCapacity() return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); } + /** + * Retrieves the maximum capacity of the task runner when autoscaling is enabled.* + * @return The maximum capacity as an integer value. Returns -1 if the maximum + * capacity cannot be determined or if autoscaling is not enabled. + */ @Override - public int getMaximumCapacity() + public int getMaximumCapacityWithAutoscale() { int maximumCapacity = -1; WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java index 6e5183bedc4a..3613b6fa08d2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskQueryTool.java @@ -374,11 +374,11 @@ public TotalWorkerCapacityResponse getTotalWorkerCapacity() } TaskRunner taskRunner = taskRunnerOptional.get(); - int currentCapacity = taskRunner.getTotalCapacity(); - int usedCapacity = taskRunner.getUsedCapacity(); - int maximumCapacity = taskRunner.getMaximumCapacity(); - - return new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity); + return new TotalWorkerCapacityResponse( + taskRunner.getTotalCapacity(), + taskRunner.getMaximumCapacityWithAutoscale(), + taskRunner.getUsedCapacity() + ); } public WorkerBehaviorConfig getLatestWorkerConfig() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java index 70e8cad9a8bb..2178bc433dfd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunner.java @@ -157,9 +157,9 @@ default int getTotalCapacity() /** * The maximum number of tasks this TaskRunner can run concurrently with autoscaling hints. - * Can return -1 if this method is not implemented or capacity can't be found. + * @return -1 if this method is not implemented or capacity can't be found. */ - default int getMaximumCapacity() + default int getMaximumCapacityWithAutoscale() { return -1; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 3d2cc61ecaf1..72bd9cff174c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -1792,8 +1792,14 @@ public int getTotalCapacity() return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum(); } + + /** + * Retrieves the maximum capacity of the task runner when autoscaling is enabled.* + * @return The maximum capacity as an integer value. Returns -1 if the maximum + * capacity cannot be determined or if autoscaling is not enabled. + */ @Override - public int getMaximumCapacity() + public int getMaximumCapacityWithAutoscale() { int maximumCapacity = -1; WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java index 4d8839962401..c2edd5450931 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/RemoteTaskRunnerTest.java @@ -155,7 +155,7 @@ public void testRun() throws Exception Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue()); Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity()); - Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacity()); + Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale()); Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity()); @@ -621,7 +621,7 @@ public void testGetMaximumCapacity_noWorkerConfig() httpClient, null ); - Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacity()); + Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale()); } @Test @@ -634,7 +634,7 @@ public void testGetMaximumCapacity_noAutoScaler() httpClient, new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null) ); - Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacity()); + Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale()); } @Test @@ -648,7 +648,7 @@ public void testGetMaximumCapacity_withAutoScaler() DefaultWorkerBehaviorConfig.defaultConfig() ); // Default autoscaler has max workers of 0 - Assert.assertEquals(0, remoteTaskRunner.getMaximumCapacity()); + Assert.assertEquals(0, remoteTaskRunner.getMaximumCapacityWithAutoscale()); } private void doSetup() throws Exception diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index f11d80d4e3e9..91b0778c9505 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -149,7 +149,7 @@ WorkerNodeService.DISCOVERY_SERVICE_KEY, new WorkerNodeService("ip2", 2, "0", Wo Assert.assertEquals(numTasks, taskRunner.getKnownTasks().size()); Assert.assertEquals(numTasks, taskRunner.getCompletedTasks().size()); Assert.assertEquals(4, taskRunner.getTotalCapacity()); - Assert.assertEquals(-1, taskRunner.getMaximumCapacity()); + Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale()); Assert.assertEquals(0, taskRunner.getUsedCapacity()); } @@ -1802,7 +1802,7 @@ public void testGetMaximumCapacity_noWorkerConfig() new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ); - Assert.assertEquals(-1, taskRunner.getMaximumCapacity()); + Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale()); } @Test @@ -1826,7 +1826,7 @@ public void testGetMaximumCapacity_noAutoScaler() new IndexerZkConfig(new ZkPathsConfig(), null, null, null, null), new NoopServiceEmitter() ); - Assert.assertEquals(-1, taskRunner.getMaximumCapacity()); + Assert.assertEquals(-1, taskRunner.getMaximumCapacityWithAutoscale()); } @Test @@ -1851,7 +1851,7 @@ public void testGetMaximumCapacity_withAutoScaler() new NoopServiceEmitter() ); // Default autoscaler has max workers of 0 - Assert.assertEquals(0, taskRunner.getMaximumCapacity()); + Assert.assertEquals(0, taskRunner.getMaximumCapacityWithAutoscale()); } public static HttpRemoteTaskRunner createTaskRunnerForTestTaskAddedOrUpdated( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java index b5563b498e68..732d586002f8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/OverlordResourceTest.java @@ -1315,7 +1315,7 @@ public void testGetTotalWorkerCapacityWithUnknown() .andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(-1); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(-1); - EasyMock.expect(taskRunner.getMaximumCapacity()).andReturn(-1); + EasyMock.expect(taskRunner.getMaximumCapacityWithAutoscale()).andReturn(-1); EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll(); @@ -1338,7 +1338,7 @@ public void testGetTotalWorkerCapacityWithMaximumCapacity() .andReturn(workerBehaviorConfigAtomicReference); EasyMock.expect(taskRunner.getTotalCapacity()).andReturn(expectedWorkerCapacity); EasyMock.expect(taskRunner.getUsedCapacity()).andReturn(expectedWorkerCapacity); - EasyMock.expect(taskRunner.getMaximumCapacity()).andReturn(expectedWorkerCapacityWithAutoscale); + EasyMock.expect(taskRunner.getMaximumCapacityWithAutoscale()).andReturn(expectedWorkerCapacityWithAutoscale); EasyMock.expect(overlord.isLeader()).andReturn(true); replayAll();