Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,12 @@ public int getTotalCapacity()
return Math.max(0, k8sCapacity) + Math.max(0, workerCapacity);
}

@Override
public int getMaximumCapacityWithAutoscale()
{
return workerTaskRunner.getMaximumCapacityWithAutoscale() + kubernetesTaskRunner.getMaximumCapacityWithAutoscale();
}

@Override
public int getUsedCapacity()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,4 +371,14 @@ public void test_updateLocation()
runner.updateLocation(task, TaskLocation.unknown());
verifyAll();
}

@Test
public void test_getMaximumCapacity()
{
EasyMock.expect(kubernetesTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(1);
EasyMock.expect(workerTaskRunner.getMaximumCapacityWithAutoscale()).andReturn(1);
replayAll();
Assert.assertEquals(2, runner.getMaximumCapacityWithAutoscale());
verifyAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
Expand Down Expand Up @@ -1648,6 +1649,35 @@ public int getTotalCapacity()
return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
}

/**
* Retrieves the maximum capacity of the task runner when autoscaling is enabled.*
* @return The maximum capacity as an integer value. Returns -1 if the maximum
* capacity cannot be determined or if autoscaling is not enabled.
*/
@Override
public int getMaximumCapacityWithAutoscale()
{
int maximumCapacity = -1;
WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
if (workerBehaviorConfig == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
maximumCapacity = -1;
} else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
maximumCapacity = -1;
} else {
int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers());
maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
}
}
return maximumCapacity;
}

@Override
public int getUsedCapacity()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
Expand All @@ -33,7 +32,6 @@
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.http.TaskStateLookup;
import org.apache.druid.indexing.overlord.http.TotalWorkerCapacityResponse;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
Expand All @@ -49,7 +47,6 @@

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -377,40 +374,11 @@ public TotalWorkerCapacityResponse getTotalWorkerCapacity()
}
TaskRunner taskRunner = taskRunnerOptional.get();

Collection<ImmutableWorkerInfo> workers = taskRunner instanceof WorkerTaskRunner ?
((WorkerTaskRunner) taskRunner).getWorkers() : ImmutableList.of();

int currentCapacity = taskRunner.getTotalCapacity();
int usedCapacity = taskRunner.getUsedCapacity();
// Calculate maximum capacity with auto scale
int maximumCapacity;
WorkerBehaviorConfig workerBehaviorConfig = getLatestWorkerConfig();
if (workerBehaviorConfig == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
maximumCapacity = -1;
} else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
maximumCapacity = -1;
} else {
int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(workers);
maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
}
} else {
// Auto-scale is not using DefaultWorkerBehaviorConfig
log.debug(
"Cannot calculate maximum worker capacity as WorkerBehaviorConfig [%s] of type [%s] does not support getting max capacity",
workerBehaviorConfig,
workerBehaviorConfig.getClass().getSimpleName()
);
maximumCapacity = -1;
}

return new TotalWorkerCapacityResponse(currentCapacity, maximumCapacity, usedCapacity);
return new TotalWorkerCapacityResponse(
taskRunner.getTotalCapacity(),
taskRunner.getMaximumCapacityWithAutoscale(),
taskRunner.getUsedCapacity()
);
}

public WorkerBehaviorConfig getLatestWorkerConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,15 @@ default int getTotalCapacity()
return -1;
}

/**
* The maximum number of tasks this TaskRunner can run concurrently with autoscaling hints.
* @return -1 if this method is not implemented or capacity can't be found.
*/
default int getMaximumCapacityWithAutoscale()
{
return -1;
}

/**
* The current number of tasks this TaskRunner is running.
* Can return -1 if this method is not implemented or the # of tasks can't be found.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;
import org.apache.druid.indexing.overlord.config.HttpRemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.config.WorkerTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.WorkerSelectStrategy;
import org.apache.druid.indexing.worker.TaskAnnouncement;
Expand Down Expand Up @@ -1791,6 +1792,36 @@ public int getTotalCapacity()
return getWorkers().stream().mapToInt(workerInfo -> workerInfo.getWorker().getCapacity()).sum();
}


/**
* Retrieves the maximum capacity of the task runner when autoscaling is enabled.*
* @return The maximum capacity as an integer value. Returns -1 if the maximum
* capacity cannot be determined or if autoscaling is not enabled.
*/
@Override
public int getMaximumCapacityWithAutoscale()
{
int maximumCapacity = -1;
WorkerBehaviorConfig workerBehaviorConfig = workerConfigRef.get();
if (workerBehaviorConfig == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as worker behavior config is not configured");
maximumCapacity = -1;
} else if (workerBehaviorConfig instanceof DefaultWorkerBehaviorConfig) {
DefaultWorkerBehaviorConfig defaultWorkerBehaviorConfig = (DefaultWorkerBehaviorConfig) workerBehaviorConfig;
if (defaultWorkerBehaviorConfig.getAutoScaler() == null) {
// Auto scale not setup
log.debug("Cannot calculate maximum worker capacity as auto scaler not configured");
maximumCapacity = -1;
} else {
int maxWorker = defaultWorkerBehaviorConfig.getAutoScaler().getMaxNumWorkers();
int expectedWorkerCapacity = provisioningStrategy.getExpectedWorkerCapacity(getWorkers());
maximumCapacity = expectedWorkerCapacity == -1 ? -1 : maxWorker * expectedWorkerCapacity;
}
}
return maximumCapacity;
}

@Override
public int getUsedCapacity()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.overlord.config.RemoteTaskRunnerConfig;
import org.apache.druid.indexing.overlord.setup.DefaultWorkerBehaviorConfig;
import org.apache.druid.indexing.overlord.setup.EqualDistributionWorkerSelectStrategy;
import org.apache.druid.indexing.worker.Worker;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.common.DateTimes;
Expand Down Expand Up @@ -153,6 +155,7 @@ public void testRun() throws Exception
Assert.assertEquals(3, remoteTaskRunner.getIdleTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(0, remoteTaskRunner.getUsedTaskSlotCount().get(WorkerConfig.DEFAULT_CATEGORY).longValue());
Assert.assertEquals(3, remoteTaskRunner.getTotalCapacity());
Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale());
Assert.assertEquals(0, remoteTaskRunner.getUsedCapacity());


Expand Down Expand Up @@ -608,6 +611,46 @@ public void testRunPendingTaskTimeoutToAssign() throws Exception
);
}

@Test
public void testGetMaximumCapacity_noWorkerConfig()
{
httpClient = EasyMock.createMock(HttpClient.class);
remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
new TestProvisioningStrategy<>(),
httpClient,
null
);
Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale());
}

@Test
public void testGetMaximumCapacity_noAutoScaler()
{
httpClient = EasyMock.createMock(HttpClient.class);
remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
new TestProvisioningStrategy<>(),
httpClient,
new DefaultWorkerBehaviorConfig(new EqualDistributionWorkerSelectStrategy(null), null)
);
Assert.assertEquals(-1, remoteTaskRunner.getMaximumCapacityWithAutoscale());
}

@Test
public void testGetMaximumCapacity_withAutoScaler()
{
httpClient = EasyMock.createMock(HttpClient.class);
remoteTaskRunner = rtrTestUtils.makeRemoteTaskRunner(
new TestRemoteTaskRunnerConfig(TIMEOUT_PERIOD),
new TestProvisioningStrategy<>(),
httpClient,
DefaultWorkerBehaviorConfig.defaultConfig()
);
// Default autoscaler has max workers of 0
Assert.assertEquals(0, remoteTaskRunner.getMaximumCapacityWithAutoscale());
}

private void doSetup() throws Exception
{
makeWorker();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,21 @@ public RemoteTaskRunner makeRemoteTaskRunner(
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
HttpClient httpClient
)
{
return makeRemoteTaskRunner(
config,
provisioningStrategy,
httpClient,
DefaultWorkerBehaviorConfig.defaultConfig()
);
}

public RemoteTaskRunner makeRemoteTaskRunner(
RemoteTaskRunnerConfig config,
ProvisioningStrategy<WorkerTaskRunner> provisioningStrategy,
HttpClient httpClient,
WorkerBehaviorConfig workerBehaviorConfig
)
{
RemoteTaskRunner remoteTaskRunner = new TestableRemoteTaskRunner(
jsonMapper,
Expand All @@ -134,7 +149,7 @@ public String getBase()
cf,
new PathChildrenCacheFactory.Builder(),
httpClient,
DSuppliers.of(new AtomicReference<>(DefaultWorkerBehaviorConfig.defaultConfig())),
DSuppliers.of(new AtomicReference<>(workerBehaviorConfig)),
provisioningStrategy
);

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.apache.druid.indexing.overlord;

import org.apache.druid.indexing.overlord.autoscaling.ProvisioningService;
import org.apache.druid.indexing.overlord.autoscaling.ProvisioningStrategy;
import org.apache.druid.indexing.overlord.autoscaling.ScalingStats;

import javax.annotation.Nonnull;
import java.util.Collection;


public class TestProvisioningStrategy<T extends TaskRunner> implements ProvisioningStrategy<T>
{
@Override
public ProvisioningService makeProvisioningService(T runner)
{
return new ProvisioningService()
{
@Override
public void close()
{
// nothing to close
}

@Override
public ScalingStats getStats()
{
return null;
}
};
}

@Override
public int getExpectedWorkerCapacity(@Nonnull Collection<ImmutableWorkerInfo> workers)
{
return 1;
}
}
Loading