Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3403,7 +3403,6 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout());
pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group);


boolean endOffsetsAreInvalid = false;
for (Entry<PartitionIdType, SequenceOffsetType> entry : endOffsets.entrySet()) {
if (entry.getValue().equals(getEndOfPartitionMarker())) {
Expand Down Expand Up @@ -3453,17 +3452,26 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException
* This method is invoked to determine whether a task count adjustment is needed
* during a task rollover based on the recommendations from the task auto-scaler.
*/

@VisibleForTesting
void maybeScaleDuringTaskRollover()
{
if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) {
int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover();
if (rolloverTaskCount > 0) {
if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) {
log.info("Autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount);
changeTaskCountInIOConfig(rolloverTaskCount);
// Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call.
// This seems the best way to inject task amount recalculation during the rollover.
clearAllocationInfo();

ServiceMetricEvent.Builder event = ServiceMetricEvent
.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());

emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, rolloverTaskCount));
}
}
}
Expand Down Expand Up @@ -4048,7 +4056,7 @@ private Map<PartitionIdType, OrderedSequenceNumber<SequenceOffsetType>> generate
builder.put(partitionId, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()));
}
} else {
// if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then
// if we don't have a startingOffset (first run, or we had some previous failures and reset the sequences) then
// get the sequence from metadata storage (if available) or Kafka/Kinesis (otherwise)
OrderedSequenceNumber<SequenceOffsetType> offsetFromStorage = getOffsetFromStorageForPartition(
partitionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public SeekableStreamSupervisorIOConfig(
// Could be null
this.autoScalerConfig = autoScalerConfig;
this.autoScalerEnabled = autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler();
// if autoscaler is enabled then taskCount will be ignored here and initial taskCount will equal to taskCountStart/taskCountMin
// if autoscaler is enabled, then taskCount will be ignored here and initial taskCount will equal to taskCountStart/taskCountMin
if (autoScalerEnabled) {
final Integer startTaskCount = autoScalerConfig.getTaskCountStart();
this.taskCount = startTaskCount != null ? startTaskCount : autoScalerConfig.getTaskCountMin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;

import it.unimi.dsi.fastutil.ints.IntArraySet;
import it.unimi.dsi.fastutil.ints.IntSet;
import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
Expand All @@ -33,9 +35,7 @@
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.segment.incremental.RowIngestionMeters;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

Expand All @@ -57,6 +57,25 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler

private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
/**
* Defines the step size used for evaluating lag when computing scaling actions.
* This constant helps control the granularity of lag considerations in scaling decisions,
* ensuring smoother transitions between scaled states and avoiding abrupt changes in task counts.
*/
private static final int LAG_STEP = 100_000;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a lot of magic-looking constants here. Please add javadocs to these constants describing what they do, and consider whether any of them should be configurable to allow for experimentation without changing the code. (Of course, ideally, users do not need to configure anything.)

Copy link
Copy Markdown
Contributor Author

@Fly-Style Fly-Style Jan 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will add javadocs; I am following common sense here in terms of specific numbers.

Of course, ideally, users do not need to configure anything.

That's what I'm looking for.

Copy link
Copy Markdown
Contributor Author

@Fly-Style Fly-Style Jan 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed: caf6aa2

/**
* This parameter fine-tunes autoscaling behavior by adding extra flexibility
* when calculating maximum allowable partitions per task in response to lag,
which must be processed as fast as possible.
* It acts as a foundational factor that balances the responsiveness and stability of autoscaling.
*/
private static final int BASE_RAW_EXTRA = 5;
// Base PPT lag threshold allowing to activate a burst scaleup to eliminate high lag.
static final int EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD = 50_000;
// Extra PPT lag threshold allowing activation of even more aggressive scaleup to eliminate high lag,
// also enabling lag-amplified idle calculation decay in the cost function (to reduce idle weight).
static final int AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD = 100_000;

public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost";
public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost";
public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount";
Expand Down Expand Up @@ -160,9 +179,7 @@ public CostBasedAutoScalerConfig getConfig()
* Returns -1 (no scaling needed) in the following cases:
* <ul>
* <li>Metrics are not available</li>
* <li>Task count already optimal</li>
* <li>The current idle ratio is in the ideal range and lag considered low</li>
* <li>Optimal task count equals current task count</li>
* <li>Current task count already optimal</li>
* </ul>
*
* @return optimal task count for scale-up, or -1 if no scaling action needed
Expand All @@ -180,7 +197,12 @@ int computeOptimalTaskCount(CostMetrics metrics)
return -1;
}

final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(partitionCount, currentTaskCount);
final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(
partitionCount,
currentTaskCount,
(long) metrics.getAggregateLag(),
config.getTaskCountMax()
);

if (validTaskCounts.length == 0) {
log.warn("No valid task counts after applying constraints for supervisorId [%s]", supervisorId);
Expand Down Expand Up @@ -230,32 +252,77 @@ int computeOptimalTaskCount(CostMetrics metrics)
}

/**
* Generates valid task counts based on partitions-per-task ratios.
* Generates valid task counts based on partitions-per-task ratios and lag-driven PPT relaxation.
* This enables gradual scaling and avoids large jumps.
* Limits the range of task counts considered to avoid excessive computation.
*
* @return sorted list of valid task counts within bounds
*/
static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount)
static int[] computeValidTaskCounts(
int partitionCount,
int currentTaskCount,
double aggregateLag,
int taskCountMax
)
{
if (partitionCount <= 0) {
if (partitionCount <= 0 || currentTaskCount <= 0) {
return new int[]{};
}

Set<Integer> result = new HashSet<>();
IntSet result = new IntArraySet();
final int currentPartitionsPerTask = partitionCount / currentTaskCount;
final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease(
aggregateLag,
partitionCount,
currentTaskCount,
taskCountMax
);
final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease;

// Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa.
final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK);
final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - effectiveMaxIncrease);
final int maxPartitionsPerTask = Math.min(
partitionCount,
currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK
);

for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) {
final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask;
result.add(taskCount);
if (taskCount <= taskCountMax) {
result.add(taskCount);
}
}
return result.stream().mapToInt(Integer::intValue).toArray();
return result.toIntArray();
}

/**
 * Computes the extra allowed increase in partitions-per-task (PPT) for scenarios where the
 * average per-partition lag is at or above {@code EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD}.
 * One of the autoscaler's priorities is to keep the lag as close to zero as possible, so
 * sufficiently high lag relaxes the normal PPT cap and permits a more aggressive scale-up.
 *
 * @param aggregateLag     total lag summed across all partitions
 * @param partitionCount   total number of stream partitions
 * @param currentTaskCount current number of reading tasks
 * @param taskCountMax     configured upper bound on the task count
 * @return extra PPT increase; 0 when inputs are invalid or lag is below the activation threshold
 */
static int computeExtraMaxPartitionsPerTaskIncrease(
    double aggregateLag,
    int partitionCount,
    int currentTaskCount,
    int taskCountMax
)
{
  // Guard against division by zero below and a meaningless headroom ratio.
  if (partitionCount <= 0 || taskCountMax <= 0) {
    return 0;
  }

  final double lagPerPartition = aggregateLag / partitionCount;
  // Below the activation threshold the normal PPT limits apply unchanged.
  if (lagPerPartition < EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) {
    return 0;
  }

  int rawExtra = BASE_RAW_EXTRA;
  // Past the aggressive threshold, grow the extra allowance by one per LAG_STEP of excess lag.
  if (lagPerPartition > AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) {
    rawExtra += (int) ((lagPerPartition - AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) / LAG_STEP);
  }

  // Scale the allowance by remaining headroom toward taskCountMax: at or above the max,
  // the ratio is 0 and no extra increase is granted.
  final double headroomRatio = Math.max(0.0, 1.0 - (double) currentTaskCount / taskCountMax);
  return (int) (rawExtra * headroomRatio);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig
private final boolean enableTaskAutoScaler;
private final int taskCountMax;
private final int taskCountMin;
private final Integer taskCountStart;
private Integer taskCountStart;
private final long minTriggerScaleActionFrequencyMillis;
private final Double stopTaskCountRatio;
private final long scaleActionPeriodMillis;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,25 @@
public class WeightedCostFunction
{
private static final Logger log = new Logger(WeightedCostFunction.class);


/**
* Ideal idle ratio range boundaries.
* Idle ratio below MIN indicates tasks are overloaded (scale up needed).
* Idle ratio above MAX indicates tasks are underutilized (scale down needed).
* Represents the maximum multiplier factor applied to amplify lag-based costs in the cost computation process.
* This value is used to cap the lag amplification effect to prevent excessively high cost inflation
* caused by significant partition lag.
* It ensures that lag-related adjustments remain bounded within a reasonable range for stability of
* cost-based auto-scaling decisions.
*/
static final double IDEAL_IDLE_MIN = 0.2;
static final double IDEAL_IDLE_MAX = 0.6;

private static final double LAG_AMPLIFICATION_MAX_MULTIPLIER = 2.0;
private static final long LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION = 500_000L;
/**
* Checks if the given idle ratio is within the ideal range [{@value #IDEAL_IDLE_MIN}, {@value #IDEAL_IDLE_MAX}].
* When idle is in this range, optimal utilization has been achieved and no scaling is needed.
* It is used to calculate the denominator for the ramp formula in the cost
* computation logic. This value represents the difference between the maximum lag per
partition (LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION) and the extra scaling
lag-per-partition threshold (CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD).
* <p>
It impacts how the cost model evaluates scaling decisions during high-lag scenarios.
*/
public static boolean isIdleInIdealRange(double idleRatio)
{
return idleRatio >= IDEAL_IDLE_MIN && idleRatio <= IDEAL_IDLE_MAX;
}
private static final double RAMP_DENOMINATOR =
LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION - (double) CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD;

/**
* Computes cost for a given task count using compute time metrics.
Expand Down Expand Up @@ -104,12 +105,15 @@ public CostResult computeCost(CostMetrics metrics, int proposedTaskCount, CostBa
return new CostResult(cost, lagCost, weightedIdleCost);
}


/**
* Estimates the idle ratio for a given task count using a capacity-based linear model.
* Estimates the idle ratio for a proposed task count.
* Includes lag-based adjustment to eliminate high lag and
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think here again I am wondering about how generally applicable a constant for per partition lag in the real world. Same/similar questions as from the PPT scale limits computed in CostBasedAutoScaler.

Also, in addition to the above, I think adding in this lag consideration does add some complexity here. Mainly it generally starts us down the path of making the cost function harder to easily and quickly understand for a newcomer, IMO. If this added complexity is considered a negative or "cost", the positive of improved behavior should outweigh it. So I guess that begs the question, how did we or are we going to measure the improvement that this additional logic/computation provides?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, in addition to the above, I think adding in this lag consideration does add some complexity here. Mainly it generally starts us down the path of making the cost function harder to easily and quickly understand for a newcomer, IMO.

Sometimes you want to have complex things in the project, because they make some things work slightly better. A good example is query planner / query optimizer, which we have from the Calcite side. It's not easy to enter, hard to master, but with complexity it brings a good framework to start using SQL for your database.
Same here: in order to make supervisor autoscaling work well, we need to introduce a level of complexity backed by math (the formulas are described here: #18819). During testing, I realized it is too conservative in high-lag scenarios, and this is an attempt to tweak it a bit.

I hook up your question from general comment:

I also wonder if a more in depth technical writeup once this feature is stabilized is in order. Something that explains the method to the madness and a bit of the math. Perhaps in a brief blog post or docs page within the apache Druid website?

We must do it, but the feature is not finally stabilized; anyway, it already has a decent base.

So I guess that begs the question, how did we or are we going to measure the improvement that this additional logic/computation provides?

That's a very good question, and I would answer in the following manner: the less time we spend scaling supervisors manually / fine-tuning an autoscaler, the better the result we will receive.

* reduce predicted idle when work exists.
* <p>
* Formula: {@code predictedIdle = 1 - busyFraction / taskRatio}
* where {@code busyFraction = 1 - currentIdleRatio} and {@code taskRatio = targetTaskCount / currentTaskCount}.
* Formulas:
* {@code linearPrediction = max(0, 1 - busyFraction / taskRatio)}
* {@code lagBusyFactor = 1 - exp(-lagPerTask / LAG_SCALE_FACTOR)}
* {@code adjustedPrediction = linearPrediction × (1 - lagBusyFactor)}
*
* @param metrics current system metrics containing idle ratio and task count
* @param taskCount target task count to estimate an idle ratio for
Expand All @@ -119,7 +123,6 @@ private double estimateIdleRatio(CostMetrics metrics, int taskCount)
{
final double currentPollIdleRatio = metrics.getPollIdleRatio();

// Handle edge cases
if (currentPollIdleRatio < 0) {
// No idle data available, assume moderate idle
return 0.5;
Expand All @@ -130,13 +133,33 @@ private double estimateIdleRatio(CostMetrics metrics, int taskCount)
return currentPollIdleRatio;
}

// Capacity-based model: idle ratio reflects spare capacity per task
// Linear prediction (capacity-based) - existing logic
final double busyFraction = 1.0 - currentPollIdleRatio;
final double taskRatio = (double) taskCount / currentTaskCount;
final double predictedIdleRatio = 1.0 - busyFraction / taskRatio;
final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 - busyFraction / taskRatio));

// Lag-based adjustment: more work per task → less idle
final double lagPerTask = metrics.getAggregateLag() / taskCount;
double lagBusyFactor = 1.0 - Math.exp(-lagPerTask / CostBasedAutoScaler.AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD);
final int partitionCount = metrics.getPartitionCount();

if (partitionCount > 0) {
final double lagPerPartition = metrics.getAggregateLag() / partitionCount;
// Lag-amplified idle decay
if (lagPerPartition >= CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) {
double ramp = Math.max(0.0,
(lagPerPartition - CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD)
/ RAMP_DENOMINATOR
);
ramp = Math.min(1.0, ramp);

final double multiplier = 1.0 + ramp * (LAG_AMPLIFICATION_MAX_MULTIPLIER - 1.0);
lagBusyFactor = Math.min(1.0, lagBusyFactor * multiplier);
}
}

// Clamp to valid range [0, 1]
return Math.max(0.0, Math.min(1.0, predictedIdleRatio));
return Math.max(0.0, linearPrediction * (1.0 - lagBusyFactor));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testAllDefaults()
}

@Test
public void testAutoScalerEnabledTrueAndFalse()
public void testAutoScalerEnabledPreservesTaskCountWhenNonNull()
{
LagAggregator lagAggregator = mock(LagAggregator.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,7 @@ public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale()
supervisor.maybeScaleDuringTaskRollover();

// Then
Assert.assertEquals(
"Task count should not change when taskAutoScaler is null",
beforeTaskCount,
(int) supervisor.getIoConfig().getTaskCount()
);
Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig());
}

@Test
Expand All @@ -88,6 +84,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotSc
supervisor.maybeScaleDuringTaskRollover();

// Then
Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig());
Assert.assertEquals(
"Task count should not change when rolloverTaskCount <= 0",
beforeTaskCount,
Expand All @@ -111,12 +108,11 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScal
supervisor.start();
supervisor.createAutoscaler(spec);

Assert.assertEquals(DEFAULT_TASK_COUNT, (int) supervisor.getIoConfig().getTaskCount());

// When
supervisor.maybeScaleDuringTaskRollover();

// Then
Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig());
Assert.assertEquals(
"Task count should be updated to " + targetTaskCount + " when rolloverTaskCount > 0",
targetTaskCount,
Expand Down Expand Up @@ -144,6 +140,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale()
supervisor.maybeScaleDuringTaskRollover();

// Then
Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig());
Assert.assertEquals(
"Task count should not change when rolloverTaskCount is 0",
beforeTaskCount,
Expand Down Expand Up @@ -201,13 +198,11 @@ public int computeTaskCountForRollover()
/**
 * Builds the cost-based autoscaler config used by these tests: wide task-count bounds,
 * a single starting task, and a long scale-action period so no periodic scaling fires
 * during a test run.
 */
private static CostBasedAutoScalerConfig getCostBasedAutoScalerConfig()
{
  return CostBasedAutoScalerConfig.builder()
                                  .taskCountMax(100)
                                  .taskCountMin(1)
                                  .enableTaskAutoScaler(true)
                                  .lagWeight(0.25)
                                  .idleWeight(0.75)
                                  .taskCountStart(1)
                                  .scaleActionPeriodMillis(60000)
                                  .build();
}

Expand Down
Loading
Loading