From 789ea9ccdc9f85813a85ec971050087d19975334 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 13 Jul 2021 18:06:01 +0700 Subject: [PATCH 1/9] fix pendingTaskBased --- docs/configuration/index.md | 1 + ...dingTaskBasedWorkerProvisioningConfig.java | 12 + ...ngTaskBasedWorkerProvisioningStrategy.java | 32 ++- ...dingTaskBasedProvisioningStrategyTest.java | 245 +++++++++++++++++- 4 files changed, 280 insertions(+), 10 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index f01079254dcb..d5f56526cb08 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1015,6 +1015,7 @@ There are additional configs for autoscaling (if it is enabled): |`druid.indexer.autoscale.pendingTaskTimeout`|How long a task can be in "pending" state before the Overlord tries to scale up.|PT30S| |`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null| |`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080| +|`druid.indexer.autoscale.workerCapacityFallback`| Worker capcity for determining number of workers needed for auto scaling when there are currently no worker running. If unset or null, auto scaler will scale to `minNumWorkers` in autoScaler config instead. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|null| ##### Supervisors diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java index 3ef70e936788..086d0f609427 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java @@ -29,6 +29,8 @@ public class PendingTaskBasedWorkerProvisioningConfig extends SimpleWorkerProvis @JsonProperty private int maxScalingStep = 10; + @JsonProperty + private Integer workerCapacityFallback = null; public int getMaxScalingStep() { @@ -76,4 +78,14 @@ public PendingTaskBasedWorkerProvisioningConfig setPendingTaskTimeout(Period pen return this; } + public Integer getWorkerCapacityFallback() + { + return workerCapacityFallback; + } + + public PendingTaskBasedWorkerProvisioningConfig setWorkerCapacityFallback(Integer workerCapacityFallback) + { + this.workerCapacityFallback = workerCapacityFallback; + return this; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index cf562f1dfa3b..a4117943be21 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -246,13 +246,20 @@ private int getScaleUpNodeCount( log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount); final int currValidWorkers = getCurrValidWorkers(workers); - // If there are no worker, spin up minWorkerCount, we cannot determine the exact capacity here to fulfill the need - // since we are not aware of the expectedWorkerCapacity. - int moreWorkersNeeded = currValidWorkers == 0 ? minWorkerCount : getWorkersNeededToAssignTasks( + // If there are no worker, try to determine worker capacity from config. + Integer workerCapacityFromConfig = null; + if (currValidWorkers == 0) { + workerCapacityFromConfig = config.getWorkerCapacityFallback(); + } + + // If there are no worker and workerCapacityFallback is not set in config then spin up minWorkerCount, + // as we cannot determine the exact capacity here to fulfill the need + int moreWorkersNeeded = currValidWorkers == 0 && workerCapacityFromConfig == null ? minWorkerCount : getWorkersNeededToAssignTasks( remoteTaskRunnerConfig, workerConfig, pendingTasks, - workers + workers, + workerCapacityFromConfig ); log.debug("More workers needed: %d", moreWorkersNeeded); @@ -280,7 +287,8 @@ private int getWorkersNeededToAssignTasks( final WorkerTaskRunnerConfig workerTaskRunnerConfig, final DefaultWorkerBehaviorConfig workerConfig, final Collection pendingTasks, - final Collection workers + final Collection workers, + final Integer workerCapacityFallback ) { final Collection validWorkers = Collections2.filter( @@ -295,7 +303,7 @@ private int getWorkersNeededToAssignTasks( } WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy(); int need = 0; - int capacity = getExpectedWorkerCapacity(workers); + int capacity = getExpectedWorkerCapacity(workers, workerCapacityFallback); log.info("Expected worker capacity: %d", capacity); // Simulate assigning tasks to dummy workers using configured workerSelectStrategy @@ -441,12 +449,18 @@ private int getCurrValidWorkers(Collection workers) return currValidWorkers; } - private static int getExpectedWorkerCapacity(final Collection workers) + private static int getExpectedWorkerCapacity(final Collection workers, final Integer workerCapacityFallback) { int size = workers.size(); if (size == 0) { - // No existing workers assume capacity per worker as 1 - return 1; + // No existing workers + if (workerCapacityFallback != null) { + // Return workerCapacityFallback if it is set in config + return workerCapacityFallback; + } else { + // Assume capacity per worker as 1 + return 1; + } } else { // Assume all workers have same capacity return workers.iterator().next().getWorker().getCapacity(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java index 45207940668e..3c010f617ba9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java @@ -20,6 +20,7 @@ package org.apache.druid.indexing.overlord.autoscaling; import com.google.common.base.Supplier; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.common.guava.DSuppliers; import org.apache.druid.indexer.TaskLocation; @@ -137,6 +138,101 @@ public void testSuccessfulInitialMinWorkersProvision() } } + @Test + public void testProvisionNoCurrentlyRunningWorkerWithCapacityFallbackSetAndNoPendingTaskShouldProvisionMinimumAsCurrentIsBelowMinimum() + { + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2) + .setWorkerCapacityFallback(30); + strategy = new PendingTaskBasedWorkerProvisioningStrategy( + config, + DSuppliers.of(workerConfig), + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } + ); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) + .andReturn(new ArrayList()); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // No pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + new ArrayList<>() + ); + EasyMock.expect(runner.getWorkers()).andReturn( + Collections.emptyList() + ); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Collections.singletonList("aNode")) + ).times(3); + EasyMock.replay(runner, autoScaler); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); + Assert.assertTrue(provisionedSomething); + Assert.assertTrue(provisioner.getStats().toList().size() == 3); + for (ScalingStats.ScalingEvent event : provisioner.getStats().toList()) { + Assert.assertTrue( + event.getEvent() == ScalingStats.EVENT.PROVISION + ); + } + } + + @Test + public void testProvisionNoCurrentlyRunningWorkerWithCapacityFallbackSetAndNoPendingTaskShouldNotProvisionAsMinimumIsZero() + { + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2) + .setWorkerCapacityFallback(30); + strategy = new PendingTaskBasedWorkerProvisioningStrategy( + config, + DSuppliers.of(workerConfig), + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } + ); + // minWorkerCount is 0 + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) + .andReturn(new ArrayList()); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // No pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + new ArrayList<>() + ); + EasyMock.expect(runner.getWorkers()).andReturn( + Collections.emptyList() + ); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()); + EasyMock.replay(runner, autoScaler); + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); + Assert.assertFalse(provisionedSomething); + Assert.assertEquals(0, provisioner.getStats().toList().size()); + } + @Test public void testSuccessfulMinWorkersProvision() { @@ -207,7 +303,7 @@ public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning() } @Test - public void testSomethingProvisioning() + public void testProvisioning() { EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1); @@ -257,6 +353,153 @@ public void testSomethingProvisioning() EasyMock.verify(runner); } + @Test + public void testProvisionWithPendingTaskAndWorkerCapacityFallbackSetButNonEmptyCurrentlyRunningWorkerShouldUseCapcityFromRunningWorker() + { + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2) + .setWorkerCapacityFallback(30); + strategy = new PendingTaskBasedWorkerProvisioningStrategy( + config, + DSuppliers.of(workerConfig), + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } + ); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) + .andReturn(new ArrayList()).times(2); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Collections.singletonList("fake")) + ).times(2); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // two pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + ImmutableList.of( + NoopTask.create(), + NoopTask.create() + ) + ).times(2); + // Capacity for current worker is 1 + EasyMock.expect(runner.getWorkers()).andReturn( + Arrays.asList( + new TestZkWorker(testTask).toImmutable(), + new TestZkWorker(testTask, "http", "h1", "n1", INVALID_VERSION).toImmutable() // Invalid version node + ) + ).times(2); + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1); + EasyMock.replay(runner); + EasyMock.replay(autoScaler); + + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); + + // Expect to use capacity from current worker (which is 1) + // and since there are two pending tasks, we will need two more workers + Assert.assertTrue(provisionedSomething); + Assert.assertEquals(2, provisioner.getStats().toList().size()); + DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); + Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent()); + Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(1).getEvent()); + + provisionedSomething = provisioner.doProvision(); + + Assert.assertFalse(provisionedSomething); + Assert.assertTrue( + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + createdTime.equals(anotherCreatedTime) + ); + + EasyMock.verify(autoScaler); + EasyMock.verify(runner); + } + + @Test + public void testProvisionWithPendingTaskAndWorkerCapacityFallbackSetButEmptyCurrentlyRunningWorkerShouldUseCapcityFromFallbackConfig() + { + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2) + .setWorkerCapacityFallback(30); + strategy = new PendingTaskBasedWorkerProvisioningStrategy( + config, + DSuppliers.of(workerConfig), + new ProvisioningSchedulerConfig(), + new Supplier() + { + @Override + public ScheduledExecutorService get() + { + return executorService; + } + } + ); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1); + EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) + .andReturn(new ArrayList()).times(2); + EasyMock.expect(autoScaler.provision()).andReturn( + new AutoScalingData(Collections.singletonList("fake")) + ).times(1); + RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); + // two pending tasks + EasyMock.expect(runner.getPendingTaskPayloads()).andReturn( + ImmutableList.of( + NoopTask.create(), + NoopTask.create() + ) + ).times(2); + // No currently running worker node + EasyMock.expect(runner.getWorkers()).andReturn( + Collections.emptyList() + ).times(2); + + EasyMock.expect(runner.getConfig()).andReturn(new RemoteTaskRunnerConfig()).times(1); + EasyMock.replay(runner); + EasyMock.replay(autoScaler); + + Provisioner provisioner = strategy.makeProvisioner(runner); + boolean provisionedSomething = provisioner.doProvision(); + + // Expect to use capacity from workerCapacityFallback config (which is 30) + // and since there are two pending tasks, we will need one more worker + Assert.assertTrue(provisionedSomething); + Assert.assertEquals(1, provisioner.getStats().toList().size()); + DateTime createdTime = provisioner.getStats().toList().get(0).getTimestamp(); + Assert.assertEquals(ScalingStats.EVENT.PROVISION, provisioner.getStats().toList().get(0).getEvent()); + + provisionedSomething = provisioner.doProvision(); + + Assert.assertFalse(provisionedSomething); + Assert.assertTrue( + provisioner.getStats().toList().get(0).getEvent() == ScalingStats.EVENT.PROVISION + ); + DateTime anotherCreatedTime = provisioner.getStats().toList().get(0).getTimestamp(); + Assert.assertTrue( + createdTime.equals(anotherCreatedTime) + ); + + EasyMock.verify(autoScaler); + EasyMock.verify(runner); + } + @Test public void testProvisionAlert() throws Exception { From b1d132e1e3c30c2f0ed50116800c4b1086498c20 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Tue, 13 Jul 2021 23:56:07 +0700 Subject: [PATCH 2/9] fix doc --- docs/configuration/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index d5f56526cb08..a25d8694b946 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1015,7 +1015,7 @@ There are additional configs for autoscaling (if it is enabled): |`druid.indexer.autoscale.pendingTaskTimeout`|How long a task can be in "pending" state before the Overlord tries to scale up.|PT30S| |`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null| |`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080| -|`druid.indexer.autoscale.workerCapacityFallback`| Worker capcity for determining number of workers needed for auto scaling when there are currently no worker running. If unset or null, auto scaler will scale to `minNumWorkers` in autoScaler config instead. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|null| +|`druid.indexer.autoscale.workerCapacityFallback`| Worker capacity for determining the number of workers needed for auto scaling when there is currently no worker running. If unset or null, auto scaler will scale to `minNumWorkers` in autoScaler config instead. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|null| ##### Supervisors From 2459031aca90cb24f43efcc3eeca67619d4389a3 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Wed, 14 Jul 2021 02:28:27 +0700 Subject: [PATCH 3/9] address comments --- docs/configuration/index.md | 2 +- ...PendingTaskBasedWorkerProvisioningConfig.java | 6 +++--- ...ndingTaskBasedWorkerProvisioningStrategy.java | 16 ++++++---------- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index a25d8694b946..c42252d5609e 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1015,7 +1015,7 @@ There are additional configs for autoscaling (if it is enabled): |`druid.indexer.autoscale.pendingTaskTimeout`|How long a task can be in "pending" state before the Overlord tries to scale up.|PT30S| |`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null| |`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080| -|`druid.indexer.autoscale.workerCapacityFallback`| Worker capacity for determining the number of workers needed for auto scaling when there is currently no worker running. If unset or null, auto scaler will scale to `minNumWorkers` in autoScaler config instead. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|null| +|`druid.indexer.autoscale.workerCapacityFallback`| Worker capacity for determining the number of workers needed for auto scaling when there is currently no worker running. If unset or set to value of 0 or less, auto scaler will scale to `minNumWorkers` in autoScaler config instead. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|-1| ##### Supervisors diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java index 086d0f609427..4a5830d4dbe9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java @@ -30,7 +30,7 @@ public class PendingTaskBasedWorkerProvisioningConfig extends SimpleWorkerProvis private int maxScalingStep = 10; @JsonProperty - private Integer workerCapacityFallback = null; + private int workerCapacityFallback = -1; public int getMaxScalingStep() { @@ -78,12 +78,12 @@ public PendingTaskBasedWorkerProvisioningConfig setPendingTaskTimeout(Period pen return this; } - public Integer getWorkerCapacityFallback() + public int getWorkerCapacityFallback() { return workerCapacityFallback; } - public PendingTaskBasedWorkerProvisioningConfig setWorkerCapacityFallback(Integer workerCapacityFallback) + public PendingTaskBasedWorkerProvisioningConfig setWorkerCapacityFallback(int workerCapacityFallback) { this.workerCapacityFallback = workerCapacityFallback; return this; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index a4117943be21..48139d910c8f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -246,20 +246,16 @@ private int getScaleUpNodeCount( log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount); final int currValidWorkers = getCurrValidWorkers(workers); - // If there are no worker, try to determine worker capacity from config. - Integer workerCapacityFromConfig = null; - if (currValidWorkers == 0) { - workerCapacityFromConfig = config.getWorkerCapacityFallback(); - } - - // If there are no worker and workerCapacityFallback is not set in config then spin up minWorkerCount, - // as we cannot determine the exact capacity here to fulfill the need - int moreWorkersNeeded = currValidWorkers == 0 && workerCapacityFromConfig == null ? minWorkerCount : getWorkersNeededToAssignTasks( + // If there are no worker and workerCapacityFallback config is not set (-1) then spin up minWorkerCount + // as we cannot determine the exact capacity here to fulfill the need. + // However, if there are no worker but workerCapacityFallback config is set (>0), then we can + // determine the number of workers needed using workerCapacityFallback config as expected worker capacity + int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityFallback() > 0 ? minWorkerCount : getWorkersNeededToAssignTasks( remoteTaskRunnerConfig, workerConfig, pendingTasks, workers, - workerCapacityFromConfig + config.getWorkerCapacityFallback() ); log.debug("More workers needed: %d", moreWorkersNeeded); From f3955cefa5b3d833887f28ef1e93f6a883d304d5 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Wed, 14 Jul 2021 02:29:53 +0700 Subject: [PATCH 4/9] address comments --- .../indexing/overlord/autoscaling/gce/GceAutoScaler.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java index 3c8f52915017..e307b3217f7c 100644 --- a/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java +++ b/extensions-contrib/gce-extensions/src/main/java/org/apache/druid/indexing/overlord/autoscaling/gce/GceAutoScaler.java @@ -80,8 +80,8 @@ public GceAutoScaler( @JsonProperty("envConfig") GceEnvironmentConfig envConfig ) { - Preconditions.checkArgument(minNumWorkers > 0, - "minNumWorkers must be greater than 0"); + Preconditions.checkArgument(minNumWorkers >= 0, + "minNumWorkers must be greater than or equal to 0"); this.minNumWorkers = minNumWorkers; Preconditions.checkArgument(maxNumWorkers > 0, "maxNumWorkers must be greater than 0"); From d6bb9a850451df70947d2b77f620cb7300e69622 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Wed, 14 Jul 2021 03:13:52 +0700 Subject: [PATCH 5/9] address comments --- .../PendingTaskBasedWorkerProvisioningStrategy.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index 48139d910c8f..dc1468c71d86 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -246,11 +246,11 @@ private int getScaleUpNodeCount( log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount); final int currValidWorkers = getCurrValidWorkers(workers); - // If there are no worker and workerCapacityFallback config is not set (-1) then spin up minWorkerCount + // If there are no worker and workerCapacityFallback config is not set (-1) or invalid (<= 0), then spin up minWorkerCount // as we cannot determine the exact capacity here to fulfill the need. // However, if there are no worker but workerCapacityFallback config is set (>0), then we can // determine the number of workers needed using workerCapacityFallback config as expected worker capacity - int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityFallback() > 0 ? minWorkerCount : getWorkersNeededToAssignTasks( + int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityFallback() <= 0 ? minWorkerCount : getWorkersNeededToAssignTasks( remoteTaskRunnerConfig, workerConfig, pendingTasks, @@ -284,7 +284,7 @@ private int getWorkersNeededToAssignTasks( final DefaultWorkerBehaviorConfig workerConfig, final Collection pendingTasks, final Collection workers, - final Integer workerCapacityFallback + final int workerCapacityFallback ) { final Collection validWorkers = Collections2.filter( @@ -445,12 +445,12 @@ private int getCurrValidWorkers(Collection workers) return currValidWorkers; } - private static int getExpectedWorkerCapacity(final Collection workers, final Integer workerCapacityFallback) + private static int getExpectedWorkerCapacity(final Collection workers, final int workerCapacityFallback) { int size = workers.size(); if (size == 0) { // No existing workers - if (workerCapacityFallback != null) { + if (workerCapacityFallback > 0) { // Return workerCapacityFallback if it is set in config return workerCapacityFallback; } else { From 06441b952b7d44a9efb669d781e2c4d63eb41d8e Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Wed, 14 Jul 2021 14:52:06 +0700 Subject: [PATCH 6/9] address comments --- docs/configuration/index.md | 2 +- ...dingTaskBasedWorkerProvisioningConfig.java | 10 +-- ...ngTaskBasedWorkerProvisioningStrategy.java | 55 ++++++++----- .../SimpleWorkerProvisioningStrategy.java | 4 +- ...dingTaskBasedProvisioningStrategyTest.java | 79 +++++++++++++------ 5 files changed, 97 insertions(+), 53 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index c42252d5609e..dd0a4f2c0c5c 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -1015,7 +1015,7 @@ There are additional configs for autoscaling (if it is enabled): |`druid.indexer.autoscale.pendingTaskTimeout`|How long a task can be in "pending" state before the Overlord tries to scale up.|PT30S| |`druid.indexer.autoscale.workerVersion`|If set, will only create nodes of set version during autoscaling. Overrides dynamic configuration. |null| |`druid.indexer.autoscale.workerPort`|The port that MiddleManagers will run on.|8080| -|`druid.indexer.autoscale.workerCapacityFallback`| Worker capacity for determining the number of workers needed for auto scaling when there is currently no worker running. If unset or set to value of 0 or less, auto scaler will scale to `minNumWorkers` in autoScaler config instead. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|-1| +|`druid.indexer.autoscale.workerCapacityHint`| Worker capacity for determining the number of workers needed for auto scaling when there is currently no worker running. If unset or set to value of 0 or less, auto scaler will scale to `minNumWorkers` in autoScaler config instead. This value should typically be equal to `druid.worker.capacity` when you have a homogeneous cluster and the average of `druid.worker.capacity` across the workers when you have a heterogeneous cluster. Note: this config is only applicable to `pendingTaskBased` provisioning strategy|-1| ##### Supervisors diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java index 4a5830d4dbe9..4ab98edc6d8e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningConfig.java @@ -30,7 +30,7 @@ public class PendingTaskBasedWorkerProvisioningConfig extends SimpleWorkerProvis private int maxScalingStep = 10; @JsonProperty - private int workerCapacityFallback = -1; + private int workerCapacityHint = -1; public int getMaxScalingStep() { @@ -78,14 +78,14 @@ public PendingTaskBasedWorkerProvisioningConfig setPendingTaskTimeout(Period pen return this; } - public int getWorkerCapacityFallback() + public int getWorkerCapacityHint() { - return workerCapacityFallback; + return workerCapacityHint; } - public PendingTaskBasedWorkerProvisioningConfig setWorkerCapacityFallback(int workerCapacityFallback) + public PendingTaskBasedWorkerProvisioningConfig setWorkerCapacityHint(int workerCapacityHint) { - this.workerCapacityFallback = workerCapacityFallback; + this.workerCapacityHint = workerCapacityHint; return this; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index dc1468c71d86..e6e80bad0e29 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.overlord.autoscaling; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Predicate; @@ -60,11 +61,14 @@ public class PendingTaskBasedWorkerProvisioningStrategy extends AbstractWorkerPr { private static final EmittingLogger log = new EmittingLogger(PendingTaskBasedWorkerProvisioningStrategy.class); + public static final String ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET = "As minNumWorkers is set to 0, workerCapacityHint must be greater than 0. workerCapacityHint value set is %d"; private static final String SCHEME = "http"; + @VisibleForTesting @Nullable - static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig( + public static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig( Supplier workerConfigRef, + SimpleWorkerProvisioningConfig config, String action, EmittingLogger log ) @@ -87,6 +91,17 @@ static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig( log.error("No autoScaler available, cannot %s workers", action); return null; } + if (workerConfig.getAutoScaler().getMinNumWorkers() == 0) { + if (!(config instanceof PendingTaskBasedWorkerProvisioningConfig)) { + log.error("PendingTaskBasedWorkerProvisioningConfig must be provided"); + return null; + } + int workerCapacityHint = ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint(); + if (workerCapacityHint <= 0) { + log.error(ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, workerCapacityHint); + return null; + } + } return workerConfig; } @@ -157,7 +172,7 @@ public synchronized boolean doProvision() Collection workers = runner.getWorkers(); log.debug("Workers: %d %s", workers.size(), workers); boolean didProvision = false; - final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log); + final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config,"provision", log); if (workerConfig == null) { return false; } @@ -246,17 +261,19 @@ private int getScaleUpNodeCount( log.info("Min/max workers: %d/%d", minWorkerCount, maxWorkerCount); final int currValidWorkers = getCurrValidWorkers(workers); - // If there are no worker and workerCapacityFallback config is not set (-1) or invalid (<= 0), then spin up minWorkerCount + // If there are no worker and workerCapacityHint config is not set (-1) or invalid (<= 0), then spin up minWorkerCount // as we cannot determine the exact capacity here to fulfill the need. - // However, if there are no worker but workerCapacityFallback config is set (>0), then we can - // determine the number of workers needed using workerCapacityFallback config as expected worker capacity - int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityFallback() <= 0 ? minWorkerCount : getWorkersNeededToAssignTasks( - remoteTaskRunnerConfig, - workerConfig, - pendingTasks, - workers, - config.getWorkerCapacityFallback() - ); + // However, if there are no worker but workerCapacityHint config is set (>0), then we can + // determine the number of workers needed using workerCapacityHint config as expected worker capacity + int moreWorkersNeeded = currValidWorkers == 0 && config.getWorkerCapacityHint() <= 0 + ? minWorkerCount + : getWorkersNeededToAssignTasks( + remoteTaskRunnerConfig, + workerConfig, + pendingTasks, + workers, + config.getWorkerCapacityHint() + ); log.debug("More workers needed: %d", moreWorkersNeeded); int want = Math.max( @@ -284,7 +301,7 @@ private int getWorkersNeededToAssignTasks( final DefaultWorkerBehaviorConfig workerConfig, final Collection pendingTasks, final Collection workers, - final int workerCapacityFallback + final int workerCapacityHint ) { final Collection validWorkers = Collections2.filter( @@ -299,7 +316,7 @@ private int getWorkersNeededToAssignTasks( } WorkerSelectStrategy workerSelectStrategy = workerConfig.getSelectStrategy(); int need = 0; - int capacity = getExpectedWorkerCapacity(workers, workerCapacityFallback); + int capacity = getExpectedWorkerCapacity(workers, workerCapacityHint); log.info("Expected worker capacity: %d", capacity); // Simulate assigning tasks to dummy workers using configured workerSelectStrategy @@ -337,7 +354,7 @@ public synchronized boolean doTerminate() { Collection zkWorkers = runner.getWorkers(); log.debug("Workers: %d [%s]", zkWorkers.size(), zkWorkers); - final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log); + final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config,"terminate", log); if (workerConfig == null) { return false; } @@ -445,14 +462,14 @@ private int getCurrValidWorkers(Collection workers) return currValidWorkers; } - private static int getExpectedWorkerCapacity(final Collection workers, final int workerCapacityFallback) + private static int getExpectedWorkerCapacity(final Collection workers, final int workerCapacityHint) { int size = workers.size(); if (size == 0) { // No existing workers - if (workerCapacityFallback > 0) { - // Return workerCapacityFallback if it is set in config - return workerCapacityFallback; + if (workerCapacityHint > 0) { + // Return workerCapacityHint if it is set in config + return workerCapacityHint; } else { // Assume capacity per worker as 1 return 1; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java index a17014c29c56..4b1337a8336a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java @@ -121,7 +121,7 @@ public synchronized boolean doProvision() Collection workers = runner.getWorkers(); boolean didProvision = false; final DefaultWorkerBehaviorConfig workerConfig = - PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "provision", log); + PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config,"provision", log); if (workerConfig == null) { return false; } @@ -186,7 +186,7 @@ public synchronized boolean doTerminate() { Collection pendingTasks = runner.getPendingTasks(); final DefaultWorkerBehaviorConfig workerConfig = - PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, "terminate", log); + PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config,"terminate", log); if (workerConfig == null) { return false; } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java index 3c010f617ba9..a6fbbe233df9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java @@ -44,6 +44,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceEventBuilder; +import org.easymock.Capture; import org.easymock.EasyMock; import org.joda.time.DateTime; import org.joda.time.Period; @@ -107,10 +108,35 @@ public ScheduledExecutorService get() ); } + @Test + public void testFailIfMinWorkerIsZeroAndWorkerHintNotSet() + { + EmittingLogger mockLogger = EasyMock.createMock(EmittingLogger.class); + Capture capturedArgument = Capture.newInstance(); + mockLogger.error(EasyMock.capture(capturedArgument), EasyMock.anyInt()); + + PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() + .setMaxScalingDuration(new Period(1000)) + .setNumEventsToTrack(10) + .setPendingTaskTimeout(new Period(0)) + .setWorkerVersion(MIN_VERSION) + .setMaxScalingStep(2); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.replay(autoScaler, mockLogger); + DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig( + DSuppliers.of(workerConfig), + config, + "test", + mockLogger + ); + Assert.assertNull(defaultWorkerBehaviorConfig); + Assert.assertEquals(PendingTaskBasedWorkerProvisioningStrategy.ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, capturedArgument.getValue()); + } + @Test public void testSuccessfulInitialMinWorkersProvision() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()); @@ -139,7 +165,7 @@ public void testSuccessfulInitialMinWorkersProvision() } @Test - public void testProvisionNoCurrentlyRunningWorkerWithCapacityFallbackSetAndNoPendingTaskShouldProvisionMinimumAsCurrentIsBelowMinimum() + public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldProvisionMinimumAsCurrentIsBelowMinimum() { PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() .setMaxScalingDuration(new Period(1000)) @@ -147,7 +173,7 @@ public void testProvisionNoCurrentlyRunningWorkerWithCapacityFallbackSetAndNoPen .setPendingTaskTimeout(new Period(0)) .setWorkerVersion(MIN_VERSION) .setMaxScalingStep(2) - .setWorkerCapacityFallback(30); + .setWorkerCapacityHint(30); strategy = new PendingTaskBasedWorkerProvisioningStrategy( config, DSuppliers.of(workerConfig), @@ -161,7 +187,7 @@ public ScheduledExecutorService get() } } ); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()); @@ -190,7 +216,7 @@ public ScheduledExecutorService get() } @Test - public void testProvisionNoCurrentlyRunningWorkerWithCapacityFallbackSetAndNoPendingTaskShouldNotProvisionAsMinimumIsZero() + public void testProvisionNoCurrentlyRunningWorkerWithCapacityHintSetAndNoPendingTaskShouldNotProvisionAsMinimumIsZero() { PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() .setMaxScalingDuration(new Period(1000)) @@ -198,7 +224,7 @@ public void testProvisionNoCurrentlyRunningWorkerWithCapacityFallbackSetAndNoPen .setPendingTaskTimeout(new Period(0)) .setWorkerVersion(MIN_VERSION) .setMaxScalingStep(2) - .setWorkerCapacityFallback(30); + .setWorkerCapacityHint(30); strategy = new PendingTaskBasedWorkerProvisioningStrategy( config, DSuppliers.of(workerConfig), @@ -213,7 +239,7 @@ public ScheduledExecutorService get() } ); // minWorkerCount is 0 - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()); @@ -236,7 +262,7 @@ public ScheduledExecutorService get() @Test public void testSuccessfulMinWorkersProvision() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()); @@ -270,7 +296,7 @@ public void testSuccessfulMinWorkersProvision() @Test public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()); @@ -305,7 +331,7 @@ public void testSuccessfulMinWorkersProvisionWithOldVersionNodeRunning() @Test public void testProvisioning() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()).times(2); @@ -354,7 +380,7 @@ public void testProvisioning() } @Test - public void testProvisionWithPendingTaskAndWorkerCapacityFallbackSetButNonEmptyCurrentlyRunningWorkerShouldUseCapcityFromRunningWorker() + public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButNonEmptyCurrentlyRunningWorkerShouldUseCapcityFromRunningWorker() { PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() .setMaxScalingDuration(new Period(1000)) @@ -362,7 +388,7 @@ public void testProvisionWithPendingTaskAndWorkerCapacityFallbackSetButNonEmptyC .setPendingTaskTimeout(new Period(0)) .setWorkerVersion(MIN_VERSION) .setMaxScalingStep(2) - .setWorkerCapacityFallback(30); + .setWorkerCapacityHint(30); strategy = new PendingTaskBasedWorkerProvisioningStrategy( config, DSuppliers.of(workerConfig), @@ -376,7 +402,7 @@ public ScheduledExecutorService get() } } ); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()).times(2); @@ -429,7 +455,7 @@ public ScheduledExecutorService get() } @Test - public void testProvisionWithPendingTaskAndWorkerCapacityFallbackSetButEmptyCurrentlyRunningWorkerShouldUseCapcityFromFallbackConfig() + public void testProvisionWithPendingTaskAndWorkerCapacityHintSetButEmptyCurrentlyRunningWorkerShouldUseCapcityFromHintConfig() { PendingTaskBasedWorkerProvisioningConfig config = new PendingTaskBasedWorkerProvisioningConfig() .setMaxScalingDuration(new Period(1000)) @@ -437,7 +463,7 @@ public void testProvisionWithPendingTaskAndWorkerCapacityFallbackSetButEmptyCurr .setPendingTaskTimeout(new Period(0)) .setWorkerVersion(MIN_VERSION) .setMaxScalingStep(2) - .setWorkerCapacityFallback(30); + .setWorkerCapacityHint(30); strategy = new PendingTaskBasedWorkerProvisioningStrategy( config, DSuppliers.of(workerConfig), @@ -451,7 +477,7 @@ public ScheduledExecutorService get() } } ); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(3); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(3).times(1); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()).times(2); @@ -478,7 +504,7 @@ public ScheduledExecutorService get() Provisioner provisioner = strategy.makeProvisioner(runner); boolean provisionedSomething = provisioner.doProvision(); - // Expect to use capacity from workerCapacityFallback config (which is 30) + // Expect to use capacity from workerCapacityHint config (which is 30) // and since there are two pending tasks, we will need one more worker Assert.assertTrue(provisionedSomething); Assert.assertEquals(1, provisioner.getStats().toList().size()); @@ -509,7 +535,7 @@ public void testProvisionAlert() throws Exception EasyMock.expectLastCall(); EasyMock.replay(emitter); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2).times(1); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()).times(2); @@ -566,7 +592,7 @@ public void testProvisionAlert() throws Exception @Test public void testDoSuccessfulTerminate() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(new ArrayList()); EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn( @@ -586,7 +612,8 @@ public void testDoSuccessfulTerminate() ) ).times(2); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( + ImmutableList.of( + new TestZkWorker(testTask).toImmutable(), new TestZkWorker(testTask).toImmutable() ) ).times(2); @@ -610,7 +637,7 @@ public void testDoSuccessfulTerminate() @Test public void testSomethingTerminating() { - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0).times(1); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(3); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")).times(2); EasyMock.expect(autoScaler.terminate(EasyMock.anyObject())).andReturn( @@ -654,7 +681,7 @@ public void testSomethingTerminating() public void testNoActionNeeded() { EasyMock.reset(autoScaler); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")); EasyMock.replay(autoScaler); @@ -685,7 +712,7 @@ public void testNoActionNeeded() EasyMock.verify(autoScaler); EasyMock.reset(autoScaler); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")); @@ -703,7 +730,7 @@ public void testMinCountIncrease() { // Don't terminate anything EasyMock.reset(autoScaler); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")); EasyMock.replay(autoScaler); @@ -730,7 +757,7 @@ public void testMinCountIncrease() // Don't provision anything EasyMock.reset(autoScaler); - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(0); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(1).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(2); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")); @@ -741,7 +768,7 @@ public void testMinCountIncrease() EasyMock.reset(autoScaler); // Increase minNumWorkers - EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3); + EasyMock.expect(autoScaler.getMinNumWorkers()).andReturn(3).times(2); EasyMock.expect(autoScaler.getMaxNumWorkers()).andReturn(5); EasyMock.expect(autoScaler.ipToIdLookup(EasyMock.anyObject())) .andReturn(Collections.singletonList("ip")); From f5ed8df63abd00dfba9aaff41faf1c1f6aa59fa1 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Wed, 14 Jul 2021 14:55:23 +0700 Subject: [PATCH 7/9] address comments --- .../PendingTaskBasedWorkerProvisioningStrategy.java | 4 ++-- .../autoscaling/SimpleWorkerProvisioningStrategy.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index e6e80bad0e29..8e48abd95310 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -172,7 +172,7 @@ public synchronized boolean doProvision() Collection workers = runner.getWorkers(); log.debug("Workers: %d %s", workers.size(), workers); boolean didProvision = false; - final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config,"provision", log); + final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config, "provision", log); if (workerConfig == null) { return false; } @@ -354,7 +354,7 @@ public synchronized boolean doTerminate() { Collection zkWorkers = runner.getWorkers(); log.debug("Workers: %d [%s]", zkWorkers.size(), zkWorkers); - final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config,"terminate", log); + final DefaultWorkerBehaviorConfig workerConfig = getDefaultWorkerBehaviorConfig(workerConfigRef, config, "terminate", log); if (workerConfig == null) { return false; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java index 4b1337a8336a..afdaa57c565d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/SimpleWorkerProvisioningStrategy.java @@ -121,7 +121,7 @@ public synchronized boolean doProvision() Collection workers = runner.getWorkers(); boolean didProvision = false; final DefaultWorkerBehaviorConfig workerConfig = - PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config,"provision", log); + PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config, "provision", log); if (workerConfig == null) { return false; } @@ -186,7 +186,7 @@ public synchronized boolean doTerminate() { Collection pendingTasks = runner.getPendingTasks(); final DefaultWorkerBehaviorConfig workerConfig = - PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config,"terminate", log); + PendingTaskBasedWorkerProvisioningStrategy.getDefaultWorkerBehaviorConfig(workerConfigRef, config, "terminate", log); if (workerConfig == null) { return false; } From 298146cca493c6c59a97bd1bbe3030d398da2bb2 Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Wed, 14 Jul 2021 15:13:12 +0700 Subject: [PATCH 8/9] address comments --- ...ndingTaskBasedWorkerProvisioningStrategy.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java index 8e48abd95310..28d1bef33693 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedWorkerProvisioningStrategy.java @@ -91,16 +91,12 @@ public static DefaultWorkerBehaviorConfig getDefaultWorkerBehaviorConfig( log.error("No autoScaler available, cannot %s workers", action); return null; } - if (workerConfig.getAutoScaler().getMinNumWorkers() == 0) { - if (!(config instanceof PendingTaskBasedWorkerProvisioningConfig)) { - log.error("PendingTaskBasedWorkerProvisioningConfig must be provided"); - return null; - } - int workerCapacityHint = ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint(); - if (workerCapacityHint <= 0) { - log.error(ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, workerCapacityHint); - return null; - } + if (config instanceof PendingTaskBasedWorkerProvisioningConfig + && workerConfig.getAutoScaler().getMinNumWorkers() == 0 + && ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint() <= 0 + ) { + log.error(ERROR_MESSAGE_MIN_WORKER_ZERO_HINT_UNSET, ((PendingTaskBasedWorkerProvisioningConfig) config).getWorkerCapacityHint()); + return null; } return workerConfig; } From cf39ec2526edc823898ba0417de0d2897f842a5d Mon Sep 17 00:00:00 2001 From: Maytas Monsereenusorn Date: Wed, 14 Jul 2021 15:34:09 +0700 Subject: [PATCH 9/9] address comments --- .../autoscaling/PendingTaskBasedProvisioningStrategyTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java index a6fbbe233df9..34af0cadc56a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/autoscaling/PendingTaskBasedProvisioningStrategyTest.java @@ -647,7 +647,9 @@ public void testSomethingTerminating() RemoteTaskRunner runner = EasyMock.createMock(RemoteTaskRunner.class); EasyMock.expect(runner.getWorkers()).andReturn( - Collections.singletonList( + ImmutableList.of( + new TestZkWorker(testTask).toImmutable(), + new TestZkWorker(testTask).toImmutable(), new TestZkWorker(testTask).toImmutable() ) ).times(2);