From 2a797476b8d4341fd25accf256bfa89449f4316c Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Wed, 21 Jan 2026 16:09:45 +0200 Subject: [PATCH 1/4] Adjust cost-based autoscaler algorithm --- .../autoscaler/CostBasedAutoScaler.java | 75 +++++++- .../autoscaler/WeightedCostFunction.java | 42 ++++- .../autoscaler/CostBasedAutoScalerTest.java | 164 ++++++++++++++++-- .../autoscaler/WeightedCostFunctionTest.java | 134 ++++++++------ 4 files changed, 330 insertions(+), 85 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index 14e3ca2cf5e8..d3f0c16009bf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -57,6 +57,11 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2; private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2; + private static final int LAG_STEP = 100_000; + private static final int BASE_RAW_EXTRA = 5; + static final int LAG_ACTIVATION_THRESHOLD = 50_000; + static final int LAG_AGGRESSIVE_THRESHOLD = 100_000; + public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost"; public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost"; public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount"; @@ -100,7 +105,10 @@ public CostBasedAutoScaler( public void start() { autoscalerExecutor.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask(this::computeTaskCountForScaleAction, () -> {}, emitter), + supervisor.buildDynamicAllocationTask( + this::computeTaskCountForScaleAction, () -> { + }, emitter + ), config.getScaleActionPeriodMillis(), config.getScaleActionPeriodMillis(), TimeUnit.MILLISECONDS @@ -173,7 +181,13 @@ int computeOptimalTaskCount(CostMetrics metrics) return -1; } - final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(partitionCount, currentTaskCount); + final int actualTaskCountMax = Math.min(config.getTaskCountMax(), partitionCount); + final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts( + partitionCount, + currentTaskCount, + (long) metrics.getAggregateLag(), + actualTaskCountMax + ); if (validTaskCounts.length == 0) { log.warn("No valid task counts after applying constraints for supervisorId [%s]", supervisorId); @@ -223,22 +237,35 @@ int computeOptimalTaskCount(CostMetrics metrics) } /** - * Generates valid task counts based on partitions-per-task ratios. + * Generates valid task counts based on partitions-per-task ratios and lag-driven PPT relaxation. * This enables gradual scaling and avoids large jumps. * Limits the range of task counts considered to avoid excessive computation. * * @return sorted list of valid task counts within bounds */ - static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount) + static int[] computeValidTaskCounts( + int partitionCount, + int currentTaskCount, + double aggregateLag, + int taskCountMax + ) { - if (partitionCount <= 0) { + if (partitionCount <= 0 || currentTaskCount <= 0 || taskCountMax <= 0) { return new int[]{}; } Set result = new HashSet<>(); final int currentPartitionsPerTask = partitionCount / currentTaskCount; + final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease( + aggregateLag, + partitionCount, + currentTaskCount, + taskCountMax + ); + final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease; + // Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa. - final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK); + final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - effectiveMaxIncrease); final int maxPartitionsPerTask = Math.min( partitionCount, currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK @@ -246,9 +273,41 @@ static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount) for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) { final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask; - result.add(taskCount); + if (taskCount <= taskCountMax) { + result.add(taskCount); + } + } + return result.stream().mapToInt(Integer::intValue).sorted().toArray(); + } + + /** + * Computes extra allowed increase in partitions-per-task in scenarios when the average + * per-partition lag is relatively high. + * Generally, one of the autoscaler priorities is to keep the lag as close to zero as possible. + */ + static int computeExtraMaxPartitionsPerTaskIncrease( + double aggregateLag, + int partitionCount, + int currentTaskCount, + int taskCountMax + ) + { + if (partitionCount <= 0 || taskCountMax <= 0) { + return 0; } - return result.stream().mapToInt(Integer::intValue).toArray(); + + final double lagPerPartition = aggregateLag / partitionCount; + if (lagPerPartition < LAG_ACTIVATION_THRESHOLD) { + return 0; + } + + int rawExtra = BASE_RAW_EXTRA; + if (lagPerPartition > LAG_AGGRESSIVE_THRESHOLD) { + rawExtra += (int) ((lagPerPartition - LAG_AGGRESSIVE_THRESHOLD) / LAG_STEP); + } + + final double headroomRatio = Math.max(0.0, 1.0 - (double) currentTaskCount / taskCountMax); + return (int) (rawExtra * headroomRatio); } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java index 0da733ef9e71..4cc01a5e070a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java @@ -21,6 +21,8 @@ import org.apache.druid.java.util.common.logger.Logger; +import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.LAG_ACTIVATION_THRESHOLD; + /** * Weighted cost function using compute time as the core metric. * Costs represent actual time in seconds, making them intuitive and debuggable. @@ -29,7 +31,11 @@ public class WeightedCostFunction { private static final Logger log = new Logger(WeightedCostFunction.class); - + private static final double HIHG_LAG_SCALE_FACTOR = 100_000.0; + private static final double LAG_AMPLIFICATION_MAX_MULTIPLIER = 2.0; + private static final long LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION = 500_000L; + private static final double RAMP_DENOMINATOR = LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION + - (double) LAG_ACTIVATION_THRESHOLD; /** * Ideal idle ratio range boundaries. @@ -106,10 +112,14 @@ public CostResult computeCost(CostMetrics metrics, int proposedTaskCount, CostBa /** - * Estimates the idle ratio for a given task count using a capacity-based linear model. + * Estimates the idle ratio for a proposed task count. + * Includes lag-based adjustment to eliminate high lag and + * reduce predicted idle when work exists. *

- * Formula: {@code predictedIdle = 1 - busyFraction / taskRatio} - * where {@code busyFraction = 1 - currentIdleRatio} and {@code taskRatio = targetTaskCount / currentTaskCount}. + * Formulas: + * {@code linearPrediction = max(0, 1 - busyFraction / taskRatio)} + * {@code lagBusyFactor = 1 - exp(-lagPerTask / LAG_SCALE_FACTOR)} + * {@code adjustedPrediction = linearPrediction × (1 - lagBusyFactor)} * * @param metrics current system metrics containing idle ratio and task count * @param taskCount target task count to estimate an idle ratio for @@ -119,7 +129,6 @@ private double estimateIdleRatio(CostMetrics metrics, int taskCount) { final double currentPollIdleRatio = metrics.getPollIdleRatio(); - // Handle edge cases if (currentPollIdleRatio < 0) { // No idle data available, assume moderate idle return 0.5; @@ -130,13 +139,30 @@ private double estimateIdleRatio(CostMetrics metrics, int taskCount) return currentPollIdleRatio; } - // Capacity-based model: idle ratio reflects spare capacity per task + // Linear prediction (capacity-based) - existing logic final double busyFraction = 1.0 - currentPollIdleRatio; final double taskRatio = (double) taskCount / currentTaskCount; - final double predictedIdleRatio = 1.0 - busyFraction / taskRatio; + final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 - busyFraction / taskRatio)); + + // Lag-based adjustment: more work per task → less idle + final double lagPerTask = metrics.getAggregateLag() / taskCount; + double lagBusyFactor = 1.0 - Math.exp(-lagPerTask / HIHG_LAG_SCALE_FACTOR); + final int partitionCount = metrics.getPartitionCount(); + + if (partitionCount > 0) { + final double lagPerPartition = metrics.getAggregateLag() / partitionCount; + // Lag-amplified idle decay + if (lagPerPartition >= LAG_ACTIVATION_THRESHOLD) { + double ramp = Math.max(0.0, (lagPerPartition - LAG_ACTIVATION_THRESHOLD) / RAMP_DENOMINATOR); + ramp = Math.min(1.0, ramp); + + final double multiplier = 1.0 + ramp * (LAG_AMPLIFICATION_MAX_MULTIPLIER - 1.0); + lagBusyFactor = Math.min(1.0, lagBusyFactor * multiplier); + } + } // Clamp to valid range [0, 1] - return Math.max(0.0, Math.min(1.0, predictedIdleRatio)); + return Math.max(0.0, linearPrediction * (1.0 - lagBusyFactor)); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java index caf5453f5217..b458ceac22cd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java @@ -30,6 +30,7 @@ import org.junit.Test; import org.mockito.Mockito; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -37,11 +38,16 @@ import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME; import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME; import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME; +import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.LAG_ACTIVATION_THRESHOLD; +import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeExtraMaxPartitionsPerTaskIncrease; +import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeValidTaskCounts; import static org.mockito.Mockito.when; +@SuppressWarnings("SameParameterValue") public class CostBasedAutoScalerTest { private CostBasedAutoScaler autoScaler; + private CostBasedAutoScalerConfig config; @Before public void setUp() @@ -55,13 +61,13 @@ public void setUp() when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig); when(mockIoConfig.getStream()).thenReturn("test-stream"); - CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(0.6) - .idleWeight(0.4) - .build(); + config = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.6) + .idleWeight(0.4) + .build(); autoScaler = new CostBasedAutoScaler(mockSupervisor, config, mockSupervisorSpec, mockEmitter); } @@ -70,26 +76,122 @@ public void setUp() public void testComputeValidTaskCounts() { // For 100 partitions at 25 tasks (4 partitions/task), valid counts include 25 and 34 - int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(100, 25); + int[] validTaskCounts = computeValidTaskCounts(100, 25, 0L, 100); Assert.assertTrue("Should contain the current task count", contains(validTaskCounts, 25)); Assert.assertTrue("Should contain the next scale-up option", contains(validTaskCounts, 34)); // Edge cases - Assert.assertEquals("Zero partitions return empty array", 0, CostBasedAutoScaler.computeValidTaskCounts(0, 10).length); - Assert.assertEquals("Negative partitions return empty array", 0, CostBasedAutoScaler.computeValidTaskCounts(-5, 10).length); + Assert.assertEquals(0, computeValidTaskCounts(0, 10, 0L, 100).length); + Assert.assertEquals(0, computeValidTaskCounts(-5, 10, 0L, 100).length); // Single partition - int[] singlePartition = CostBasedAutoScaler.computeValidTaskCounts(1, 1); + int[] singlePartition = computeValidTaskCounts(1, 1, 0L, 100); Assert.assertTrue("Single partition should have at least one valid count", singlePartition.length > 0); Assert.assertTrue("Single partition should contain 1", contains(singlePartition, 1)); // Current exceeds partitions - should still yield valid, deduplicated options - int[] exceedsPartitions = CostBasedAutoScaler.computeValidTaskCounts(2, 5); + int[] exceedsPartitions = computeValidTaskCounts(2, 5, 0L, 100); Assert.assertEquals(2, exceedsPartitions.length); Assert.assertTrue(contains(exceedsPartitions, 1)); Assert.assertTrue(contains(exceedsPartitions, 2)); } + @Test + public void testComputeValidTaskCountsLagExpansion() + { + int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 30); + Assert.assertFalse("Low lag should not include max task count", contains(lowLagCounts, 30)); + Assert.assertTrue("Low lag should cap scale up around 4 tasks", contains(lowLagCounts, 4)); + + long highAggregateLag = 30L * 500_000L; + int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 30); + Assert.assertTrue("High lag should allow scaling to max tasks", contains(highLagCounts, 30)); + } + + @Test + public void testComputeValidTaskCountsRespectsTaskCountMax() + { + long highAggregateLag = 30L * 500_000L; + int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 3); + Assert.assertTrue("Should include taskCountMax when doable", contains(cappedCounts, 3)); + Assert.assertFalse("Should not exceed taskCountMax", contains(cappedCounts, 4)); + } + + @Test + public void testScalingExamplesTable() + { + int partitionCount = 30; + int taskCountMax = 30; + double pollIdleRatio = 0.1; + double avgProcessingRate = 10.0; + + class Example + { + final int currentTasks; + final long lagPerPartition; + final int expectedTasks; + + Example(int currentTasks, long lagPerPartition, int expectedTasks) + { + this.currentTasks = currentTasks; + this.lagPerPartition = lagPerPartition; + this.expectedTasks = expectedTasks; + } + } + + Example[] examples = new Example[]{ + new Example(3, 50_000L, 8), + new Example(3, 300_000L, 15), + new Example(3, 500_000L, 30), + new Example(10, 100_000L, 15), + new Example(10, 300_000L, 30), + new Example(10, 500_000L, 30), + new Example(20, 500_000L, 30), + new Example(25, 500_000L, 30) + }; + + for (Example example : examples) { + long aggregateLag = example.lagPerPartition * partitionCount; + int[] validCounts = computeValidTaskCounts(partitionCount, example.currentTasks, aggregateLag, taskCountMax); + Assert.assertTrue( + "Should include expected task count for current=" + example.currentTasks + ", lag=" + example.lagPerPartition, + contains(validCounts, example.expectedTasks) + ); + + CostMetrics metrics = createMetricsWithRate( + example.lagPerPartition, + example.currentTasks, + partitionCount, + pollIdleRatio, + avgProcessingRate + ); + int actualOptimal = autoScaler.computeOptimalTaskCount(metrics); + if (actualOptimal == -1) { + actualOptimal = example.currentTasks; + } + Assert.assertEquals( + "Optimal task count should match for current=" + example.currentTasks + + ", lag=" + example.lagPerPartition + + ", valid=" + Arrays.toString(validCounts), + example.expectedTasks, + actualOptimal + ); + } + } + + @Test + public void testComputeExtraPPTIncrease() + { + // No extra increase below the threshold + Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * 49_000L, 30, 3, 30)); + Assert.assertEquals(4, computeExtraMaxPartitionsPerTaskIncrease(30L * LAG_ACTIVATION_THRESHOLD, 30, 3, 30)); + + // More aggressive increase when the lag is high + Assert.assertEquals(6, computeExtraMaxPartitionsPerTaskIncrease(30L * 300_000L, 30, 3, 30)); + // Zero when on max task count + Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * 500_000L, 30, 30, 30)); + } + @Test public void testComputeOptimalTaskCountInvalidInputs() { @@ -110,7 +212,7 @@ public void testComputeOptimalTaskCountScaling() int highIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9)); Assert.assertTrue("Scale down scenario should return optimal <= current", highIdleResult <= 50); - // With low idle and balanced weights, algorithm should not scale up aggressively + // With low idle and balanced weights, the algorithm should not scale up aggressively int lowIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1)); Assert.assertTrue("With low idle and balanced weights, should not scale up aggressively", lowIdleResult <= 25); } @@ -194,10 +296,16 @@ public void testExtractMovingAverageIntervalFallback() { // 15-minute average is preferred Map> fifteenMin = new HashMap<>(); - fifteenMin.put("0", Collections.singletonMap("task-0", buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0))); + fifteenMin.put( + "0", + Collections.singletonMap( + "task-0", + buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0) + ) + ); Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001); - // 1-minute as final fallback + // 1-minute as a final fallback Map> oneMin = new HashMap<>(); oneMin.put("0", Collections.singletonMap("task-0", buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0))); Assert.assertEquals(500.0, CostBasedAutoScaler.extractMovingAverage(oneMin), 0.0001); @@ -328,7 +436,7 @@ public void testComputeTaskCountForRolloverReturnsMinusOneWhenLagStatsNull() @Test public void testComputeTaskCountForRolloverReturnsMinusOneWhenNoMetrics() { - // Tests the case where lastKnownMetrics is null (no computeTaskCountForScaleAction called) + // Tests the case where the lastKnownMetrics is null (no computeTaskCountForScaleAction called) SupervisorSpec spec = Mockito.mock(SupervisorSpec.class); SeekableStreamSupervisor supervisor = Mockito.mock(SeekableStreamSupervisor.class); ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class); @@ -369,6 +477,24 @@ private CostMetrics createMetrics( ); } + private CostMetrics createMetricsWithRate( + double avgPartitionLag, + int currentTaskCount, + int partitionCount, + double pollIdleRatio, + double avgProcessingRate + ) + { + return new CostMetrics( + avgPartitionLag, + currentTaskCount, + partitionCount, + pollIdleRatio, + 3600, + avgProcessingRate + ); + } + private boolean contains(int[] array, int value) { for (int i : array) { @@ -434,7 +560,11 @@ private Map buildTaskStatsWithMultipleMovingAverages( return taskStats; } - private Map buildTaskStatsWithNullInterval(String nullInterval, String validInterval, double processedRate) + private Map buildTaskStatsWithNullInterval( + String nullInterval, + String validInterval, + double processedRate + ) { Map buildSegments = new HashMap<>(); buildSegments.put(nullInterval, null); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java index 90b50477c924..416def7e3ab5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java @@ -51,20 +51,24 @@ public void testComputeCostInvalidInputs() Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 10, null).totalCost(), 0.0); Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 0, config).totalCost(), 0.0); Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, -5, config).totalCost(), 0.0); - Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, config).totalCost(), 0.0); + Assert.assertEquals( + Double.POSITIVE_INFINITY, + costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, config).totalCost(), + 0.0 + ); } @Test public void testScaleDownHasHigherLagCostThanCurrent() { CostBasedAutoScalerConfig lagOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(1.0) - .idleWeight(0.0) - .defaultProcessingRate(100.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .defaultProcessingRate(100.0) + .build(); CostMetrics metrics = createMetrics(200000.0, 10, 200, 0.3); @@ -85,13 +89,13 @@ public void testLagCostWithMarginalModel() // With lag-only config (no idle penalty), the marginal model is used for scale-up: // lagRecoveryTime = aggregateLag / (taskCountDiff * rate) CostBasedAutoScalerConfig lagOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(1.0) - .idleWeight(0.0) - .defaultProcessingRate(1000.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .defaultProcessingRate(1000.0) + .build(); // aggregateLag = 100000 * 100 = 10,000,000 CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3); @@ -113,18 +117,13 @@ public void testLagCostWithMarginalModel() } @Test - public void testBalancedWeightsFavorStabilityOverScaleUp() + public void testBalancedWeightsFavorStabilityOverScaleUpOnSmallLag() { - // With the marginal lag model and corrected idle ratio, balanced weights - // favor stability because idle cost increases significantly with more tasks - // This is intentional behavior: the algorithm is conservative about scale-up. - CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3); - + // Validate idle ratio estimation and ensure balanced weights still favor stability. + CostMetrics metrics = createMetrics(100.0, 10, 100, 0.3); double costCurrent = costFunction.computeCost(metrics, 10, config).totalCost(); double costScaleUp = costFunction.computeCost(metrics, 20, config).totalCost(); - // With balanced weights (0.3 lag, 0.7 idle), the idle cost increase from - // scaling up dominates the lag recovery benefit Assert.assertTrue( "With balanced weights, staying at current count is cheaper than scale-up", costCurrent < costScaleUp @@ -193,13 +192,13 @@ public void testNoProcessingRateDeviationPenaltyIsSymmetric() // Use lag-only config to isolate the lag recovery time component CostBasedAutoScalerConfig lagOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(1.0) - .idleWeight(0.0) - .defaultProcessingRate(1000.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .defaultProcessingRate(1000.0) + .build(); double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, lagOnlyConfig).totalCost(); double costDown5 = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, lagOnlyConfig).totalCost(); @@ -218,13 +217,13 @@ public void testIdleCostMonotonicWithTaskCount() // Test that idle cost increases monotonically with task count. // With fixed load, adding more tasks means each task has less work, so idle increases. CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(0.0) - .idleWeight(1.0) - .defaultProcessingRate(1000.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.0) + .idleWeight(1.0) + .defaultProcessingRate(1000.0) + .build(); // Current: 10 tasks with 40% idle (60% busy) CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4); @@ -244,13 +243,13 @@ public void testIdleCostMonotonicWithTaskCount() public void testIdleRatioClampingAtBoundaries() { CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(0.0) - .idleWeight(1.0) - .defaultProcessingRate(1000.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.0) + .idleWeight(1.0) + .defaultProcessingRate(1000.0) + .build(); // Extreme scale-down: 10 tasks → 2 tasks with 40% idle // busyFraction = 0.6, taskRatio = 0.2 @@ -275,13 +274,13 @@ public void testIdleRatioClampingAtBoundaries() public void testIdleRatioWithMissingData() { CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(0.0) - .idleWeight(1.0) - .defaultProcessingRate(1000.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.0) + .idleWeight(1.0) + .defaultProcessingRate(1000.0) + .build(); // Negative idle ratio indicates missing data → should default to 0.5 CostMetrics missingIdleData = createMetrics(0.0, 10, 100, -1.0); @@ -296,7 +295,38 @@ public void testIdleRatioWithMissingData() Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 3600 * 0.5, cost20, 0.0001); } - private CostMetrics createMetrics(double avgPartitionLag, int currentTaskCount, int partitionCount, double pollIdleRatio) + @Test + public void testLagAmplificationReducesIdleUnderHighLag() + { + CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .defaultProcessingRate(1000.0) + .build(); + + int currentTaskCount = 3; + int proposedTaskCount = 8; + int partitionCount = 30; + double pollIdleRatio = 0.1; + + CostMetrics lowLag = createMetrics(40_000.0, currentTaskCount, partitionCount, pollIdleRatio); + CostMetrics highLag = createMetrics(500_000.0, currentTaskCount, partitionCount, pollIdleRatio); + + double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount, idleOnlyConfig).totalCost(); + double highLagCost = costFunction.computeCost(highLag, proposedTaskCount, idleOnlyConfig).totalCost(); + Assert.assertTrue( + "Higher lag should reduce predicted idle more aggressively", + lowLagCost > highLagCost + ); + } + + private CostMetrics createMetrics( + double avgPartitionLag, + int currentTaskCount, + int partitionCount, + double pollIdleRatio + ) { return new CostMetrics( avgPartitionLag, From 632e2a72967230311d67478685fd53f3d3487b45 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Wed, 21 Jan 2026 16:55:46 +0200 Subject: [PATCH 2/4] Actually enable autoscaler task rollover --- .../supervisor/SeekableStreamSupervisor.java | 1 + .../autoscaler/AutoScalerConfig.java | 1 + .../autoscaler/CostBasedAutoScaler.java | 3 +-- .../autoscaler/CostBasedAutoScalerConfig.java | 8 ++++++- .../autoscaler/LagBasedAutoScalerConfig.java | 6 +++++ ...SupervisorScaleDuringTaskRolloverTest.java | 24 ++++++++----------- 6 files changed, 26 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index c5d3ffe873d5..cbc7f52f13ff 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3461,6 +3461,7 @@ void maybeScaleDuringTaskRollover() if (rolloverTaskCount > 0) { log.info("Autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount); changeTaskCountInIOConfig(rolloverTaskCount); + autoScalerConfig.setTaskCountStart(rolloverTaskCount); // Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call. // This seems the best way to inject task amount recalculation during the rollover. clearAllocationInfo(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java index 6d3b2a0daa02..35686bb2b41d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java @@ -41,6 +41,7 @@ public interface AutoScalerConfig int getTaskCountMax(); int getTaskCountMin(); Integer getTaskCountStart(); + void setTaskCountStart(int taskCountStart); Double getStopTaskCountRatio(); SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index d3f0c16009bf..17a4cafbc81f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -181,12 +181,11 @@ int computeOptimalTaskCount(CostMetrics metrics) return -1; } - final int actualTaskCountMax = Math.min(config.getTaskCountMax(), partitionCount); final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts( partitionCount, currentTaskCount, (long) metrics.getAggregateLag(), - actualTaskCountMax + config.getTaskCountMax() ); if (validTaskCounts.length == 0) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java index d1f682f4f56f..ed2c9fedeb0c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java @@ -52,7 +52,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig private final boolean enableTaskAutoScaler; private final int taskCountMax; private final int taskCountMin; - private final Integer taskCountStart; + private Integer taskCountStart; private final long minTriggerScaleActionFrequencyMillis; private final Double stopTaskCountRatio; private final long scaleActionPeriodMillis; @@ -157,6 +157,12 @@ public Integer getTaskCountStart() return taskCountStart; } + @Override + public void setTaskCountStart(int taskCountStart) + { + this.taskCountStart = taskCountStart; + } + @Override @JsonProperty public long getMinTriggerScaleActionFrequencyMillis() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java index ad036dd0e10a..4ceccb36c503 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java @@ -183,6 +183,12 @@ public Integer getTaskCountStart() return taskCountStart; } + @Override + public void setTaskCountStart(int taskCountStart) + { + this.taskCountStart = taskCountStart; + } + @Override public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java index 3b9ad0162441..45b168b76e26 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java @@ -61,11 +61,7 @@ public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale() supervisor.maybeScaleDuringTaskRollover(); // Then - Assert.assertEquals( - "Task count should not change when taskAutoScaler is null", - beforeTaskCount, - (int) supervisor.getIoConfig().getTaskCount() - ); + Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig()); } @Test @@ -88,10 +84,11 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotSc supervisor.maybeScaleDuringTaskRollover(); // Then + Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); Assert.assertEquals( "Task count should not change when rolloverTaskCount <= 0", beforeTaskCount, - (int) supervisor.getIoConfig().getTaskCount() + (int) supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart() ); } @@ -111,16 +108,15 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScal supervisor.start(); supervisor.createAutoscaler(spec); - Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); - // When supervisor.maybeScaleDuringTaskRollover(); // Then + Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); Assert.assertEquals( "Task count should be updated to " + targetTaskCount + " when rolloverTaskCount > 0", targetTaskCount, - (int) supervisor.getIoConfig().getTaskCount() + (int) supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart() ); } @@ -144,10 +140,11 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale() supervisor.maybeScaleDuringTaskRollover(); // Then + Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); Assert.assertEquals( "Task count should not change when rolloverTaskCount is 0", beforeTaskCount, - (int) supervisor.getIoConfig().getTaskCount() + (int) supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart() ); } @@ -201,12 +198,11 @@ public int computeTaskCountForRollover() private static CostBasedAutoScalerConfig getCostBasedAutoScalerConfig() { return CostBasedAutoScalerConfig.builder() + .enableTaskAutoScaler(true) .taskCountMax(100) .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(0.25) - .idleWeight(0.75) - .scaleActionPeriodMillis(100) + .taskCountStart(1) + .scaleActionPeriodMillis(60000) .build(); } From 41ff87aec0ad44c94d0c03e9460a5065f10cd249 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Wed, 21 Jan 2026 17:26:01 +0200 Subject: [PATCH 3/4] Checkstyle --- .../supervisor/autoscaler/CostBasedAutoScaler.java | 5 +---- .../supervisor/autoscaler/WeightedCostFunction.java | 10 ++++------ 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index 17a4cafbc81f..82fe80c0194d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -105,10 +105,7 @@ public CostBasedAutoScaler( public void start() { autoscalerExecutor.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask( - this::computeTaskCountForScaleAction, () -> { - }, emitter - ), + supervisor.buildDynamicAllocationTask(this::computeTaskCountForScaleAction, () -> {}, emitter), config.getScaleActionPeriodMillis(), config.getScaleActionPeriodMillis(), TimeUnit.MILLISECONDS diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java index 4cc01a5e070a..2f3aea64c666 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java @@ -21,8 +21,6 @@ import org.apache.druid.java.util.common.logger.Logger; -import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.LAG_ACTIVATION_THRESHOLD; - /** * Weighted cost function using compute time as the core metric. * Costs represent actual time in seconds, making them intuitive and debuggable. @@ -34,8 +32,8 @@ public class WeightedCostFunction private static final double HIHG_LAG_SCALE_FACTOR = 100_000.0; private static final double LAG_AMPLIFICATION_MAX_MULTIPLIER = 2.0; private static final long LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION = 500_000L; - private static final double RAMP_DENOMINATOR = LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION - - (double) LAG_ACTIVATION_THRESHOLD; + private static final double RAMP_DENOMINATOR = + LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION - (double) CostBasedAutoScaler.LAG_ACTIVATION_THRESHOLD; /** * Ideal idle ratio range boundaries. @@ -152,8 +150,8 @@ private double estimateIdleRatio(CostMetrics metrics, int taskCount) if (partitionCount > 0) { final double lagPerPartition = metrics.getAggregateLag() / partitionCount; // Lag-amplified idle decay - if (lagPerPartition >= LAG_ACTIVATION_THRESHOLD) { - double ramp = Math.max(0.0, (lagPerPartition - LAG_ACTIVATION_THRESHOLD) / RAMP_DENOMINATOR); + if (lagPerPartition >= CostBasedAutoScaler.LAG_ACTIVATION_THRESHOLD) { + double ramp = Math.max(0.0, (lagPerPartition - CostBasedAutoScaler.LAG_ACTIVATION_THRESHOLD) / RAMP_DENOMINATOR); ramp = Math.min(1.0, ramp); final double multiplier = 1.0 + ramp * (LAG_AMPLIFICATION_MAX_MULTIPLIER - 1.0); From caf6aa20375c5178c00d16edbf7e059f86568bed Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Sat, 24 Jan 2026 14:01:54 +0200 Subject: [PATCH 4/4] Apply review comments --- .../supervisor/SeekableStreamSupervisor.java | 15 +++++-- .../SeekableStreamSupervisorIOConfig.java | 2 +- .../autoscaler/AutoScalerConfig.java | 1 - .../autoscaler/CostBasedAutoScaler.java | 42 +++++++++++------- .../autoscaler/CostBasedAutoScalerConfig.java | 6 --- .../autoscaler/LagBasedAutoScalerConfig.java | 6 --- .../autoscaler/WeightedCostFunction.java | 43 +++++++++---------- .../SeekableStreamSupervisorIOConfigTest.java | 2 +- ...SupervisorScaleDuringTaskRolloverTest.java | 6 +-- .../SeekableStreamSupervisorSpecTest.java | 4 +- .../autoscaler/CostBasedAutoScalerTest.java | 4 +- 11 files changed, 68 insertions(+), 63 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index cbc7f52f13ff..40049967b2ba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3403,7 +3403,6 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout()); pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group); - boolean endOffsetsAreInvalid = false; for (Entry entry : endOffsets.entrySet()) { if (entry.getValue().equals(getEndOfPartitionMarker())) { @@ -3453,18 +3452,26 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException * This method is invoked to determine whether a task count adjustment is needed * during a task rollover based on the recommendations from the task auto-scaler. */ + @VisibleForTesting void maybeScaleDuringTaskRollover() { if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) { int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover(); - if (rolloverTaskCount > 0) { + if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) { log.info("Autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount); changeTaskCountInIOConfig(rolloverTaskCount); - autoScalerConfig.setTaskCountStart(rolloverTaskCount); // Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call. // This seems the best way to inject task amount recalculation during the rollover. clearAllocationInfo(); + + ServiceMetricEvent.Builder event = ServiceMetricEvent + .builder() + .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()); + + emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, rolloverTaskCount)); } } } @@ -4049,7 +4056,7 @@ private Map> generate builder.put(partitionId, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence())); } } else { - // if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then + // if we don't have a startingOffset (first run, or we had some previous failures and reset the sequences) then // get the sequence from metadata storage (if available) or Kafka/Kinesis (otherwise) OrderedSequenceNumber offsetFromStorage = getOffsetFromStorageForPartition( partitionId, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index 8a7fcad15113..0de06a63949f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -86,7 +86,7 @@ public SeekableStreamSupervisorIOConfig( // Could be null this.autoScalerConfig = autoScalerConfig; this.autoScalerEnabled = autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler(); - // if autoscaler is enabled then taskCount will be ignored here and initial taskCount will equal to taskCountStart/taskCountMin + // if autoscaler is enabled, then taskCount will be ignored here and initial taskCount will equal to taskCountStart/taskCountMin if (autoScalerEnabled) { final Integer startTaskCount = autoScalerConfig.getTaskCountStart(); this.taskCount = startTaskCount != null ? startTaskCount : autoScalerConfig.getTaskCountMin(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java index 35686bb2b41d..6d3b2a0daa02 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java @@ -41,7 +41,6 @@ public interface AutoScalerConfig int getTaskCountMax(); int getTaskCountMin(); Integer getTaskCountStart(); - void setTaskCountStart(int taskCountStart); Double getStopTaskCountRatio(); SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index ffe26f0b05fb..350e2284359a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -19,6 +19,8 @@ package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; +import it.unimi.dsi.fastutil.ints.IntArraySet; +import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; @@ -33,9 +35,7 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.RowIngestionMeters; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -57,10 +57,24 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2; private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2; + /** + * Defines the step size used for evaluating lag when computing scaling actions. + * This constant helps control the granularity of lag considerations in scaling decisions, + * ensuring smoother transitions between scaled states and avoiding abrupt changes in task counts. + */ private static final int LAG_STEP = 100_000; + /** + * This parameter fine-tunes autoscaling behavior by adding extra flexibility + * when calculating maximum allowable partitions per task in response to lag, + * which must be processed as fast, as possible. + * It acts as a foundational factor that balances the responsiveness and stability of autoscaling. + */ private static final int BASE_RAW_EXTRA = 5; - static final int LAG_ACTIVATION_THRESHOLD = 50_000; - static final int LAG_AGGRESSIVE_THRESHOLD = 100_000; + // Base PPT lag threshold allowing to activate a burst scaleup to eliminate high lag. + static final int EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD = 50_000; + // Extra PPT lag threshold allowing activation of even more aggressive scaleup to eliminate high lag, + // also enabling lag-amplified idle calculation decay in the cost function (to reduce idle weight). + static final int AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD = 100_000; public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost"; public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost"; @@ -165,9 +179,7 @@ public CostBasedAutoScalerConfig getConfig() * Returns -1 (no scaling needed) in the following cases: *

* * @return optimal task count for scale-up, or -1 if no scaling action needed @@ -253,11 +265,11 @@ static int[] computeValidTaskCounts( int taskCountMax ) { - if (partitionCount <= 0 || currentTaskCount <= 0 || taskCountMax <= 0) { + if (partitionCount <= 0 || currentTaskCount <= 0) { return new int[]{}; } - Set result = new HashSet<>(); + IntSet result = new IntArraySet(); final int currentPartitionsPerTask = partitionCount / currentTaskCount; final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease( aggregateLag, @@ -280,12 +292,12 @@ static int[] computeValidTaskCounts( result.add(taskCount); } } - return result.stream().mapToInt(Integer::intValue).sorted().toArray(); + return result.toIntArray(); } /** - * Computes extra allowed increase in partitions-per-task in scenarios when the average - * per-partition lag is relatively high. + * Computes extra allowed increase in partitions-per-task in scenarios when the average per-partition lag + * is above the configured threshold. By default, it is {@code EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD}. * Generally, one of the autoscaler priorities is to keep the lag as close to zero as possible. */ static int computeExtraMaxPartitionsPerTaskIncrease( @@ -300,13 +312,13 @@ static int computeExtraMaxPartitionsPerTaskIncrease( } final double lagPerPartition = aggregateLag / partitionCount; - if (lagPerPartition < LAG_ACTIVATION_THRESHOLD) { + if (lagPerPartition < EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) { return 0; } int rawExtra = BASE_RAW_EXTRA; - if (lagPerPartition > LAG_AGGRESSIVE_THRESHOLD) { - rawExtra += (int) ((lagPerPartition - LAG_AGGRESSIVE_THRESHOLD) / LAG_STEP); + if (lagPerPartition > AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) { + rawExtra += (int) ((lagPerPartition - AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) / LAG_STEP); } final double headroomRatio = Math.max(0.0, 1.0 - (double) currentTaskCount / taskCountMax); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java index ed2c9fedeb0c..aba26ba25b52 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java @@ -157,12 +157,6 @@ public Integer getTaskCountStart() return taskCountStart; } - @Override - public void setTaskCountStart(int taskCountStart) - { - this.taskCountStart = taskCountStart; - } - @Override @JsonProperty public long getMinTriggerScaleActionFrequencyMillis() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java index 4ceccb36c503..ad036dd0e10a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java @@ -183,12 +183,6 @@ public Integer getTaskCountStart() return taskCountStart; } - @Override - public void setTaskCountStart(int taskCountStart) - { - this.taskCountStart = taskCountStart; - } - @Override public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter) { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java index 2f3aea64c666..8a3759556910 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java @@ -29,28 +29,25 @@ public class WeightedCostFunction { private static final Logger log = new Logger(WeightedCostFunction.class); - private static final double HIHG_LAG_SCALE_FACTOR = 100_000.0; - private static final double LAG_AMPLIFICATION_MAX_MULTIPLIER = 2.0; - private static final long LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION = 500_000L; - private static final double RAMP_DENOMINATOR = - LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION - (double) CostBasedAutoScaler.LAG_ACTIVATION_THRESHOLD; - /** - * Ideal idle ratio range boundaries. - * Idle ratio below MIN indicates tasks are overloaded (scale up needed). - * Idle ratio above MAX indicates tasks are underutilized (scale down needed). + * Represents the maximum multiplier factor applied to amplify lag-based costs in the cost computation process. + * This value is used to cap the lag amplification effect to prevent excessively high cost inflation + * caused by significant partition lag. + * It ensures that lag-related adjustments remain bounded within a reasonable range for stability of + * cost-based auto-scaling decisions. */ - static final double IDEAL_IDLE_MIN = 0.2; - static final double IDEAL_IDLE_MAX = 0.6; - + private static final double LAG_AMPLIFICATION_MAX_MULTIPLIER = 2.0; + private static final long LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION = 500_000L; /** - * Checks if the given idle ratio is within the ideal range [{@value #IDEAL_IDLE_MIN}, {@value #IDEAL_IDLE_MAX}]. - * When idle is in this range, optimal utilization has been achieved and no scaling is needed. + * It is used to calculate the denominator for the ramp formula in the cost + * computation logic. This value represents the difference between the maximum lag per + * partition (LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION) and the extra scaling activation + * lag threshold (CostBasedAutoScaler.EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD). + *

+ * It is impacting how the cost model evaluates scaling decisions during high-lag sceario. */ - public static boolean isIdleInIdealRange(double idleRatio) - { - return idleRatio >= IDEAL_IDLE_MIN && idleRatio <= IDEAL_IDLE_MAX; - } + private static final double RAMP_DENOMINATOR = + LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION - (double) CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD; /** * Computes cost for a given task count using compute time metrics. @@ -108,7 +105,6 @@ public CostResult computeCost(CostMetrics metrics, int proposedTaskCount, CostBa return new CostResult(cost, lagCost, weightedIdleCost); } - /** * Estimates the idle ratio for a proposed task count. * Includes lag-based adjustment to eliminate high lag and @@ -144,14 +140,17 @@ private double estimateIdleRatio(CostMetrics metrics, int taskCount) // Lag-based adjustment: more work per task → less idle final double lagPerTask = metrics.getAggregateLag() / taskCount; - double lagBusyFactor = 1.0 - Math.exp(-lagPerTask / HIHG_LAG_SCALE_FACTOR); + double lagBusyFactor = 1.0 - Math.exp(-lagPerTask / CostBasedAutoScaler.AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD); final int partitionCount = metrics.getPartitionCount(); if (partitionCount > 0) { final double lagPerPartition = metrics.getAggregateLag() / partitionCount; // Lag-amplified idle decay - if (lagPerPartition >= CostBasedAutoScaler.LAG_ACTIVATION_THRESHOLD) { - double ramp = Math.max(0.0, (lagPerPartition - CostBasedAutoScaler.LAG_ACTIVATION_THRESHOLD) / RAMP_DENOMINATOR); + if (lagPerPartition >= CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) { + double ramp = Math.max(0.0, + (lagPerPartition - CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) + / RAMP_DENOMINATOR + ); ramp = Math.min(1.0, ramp); final double multiplier = 1.0 + ramp * (LAG_AMPLIFICATION_MAX_MULTIPLIER - 1.0); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java index a97e971028c6..56b8904cc18d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java @@ -80,7 +80,7 @@ public void testAllDefaults() } @Test - public void testAutoScalerEnabledTrueAndFalse() + public void testAutoScalerEnabledPreservesTaskCountWhenNonNull() { LagAggregator lagAggregator = mock(LagAggregator.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java index 7657ddc7b412..b39e6097903b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java @@ -88,7 +88,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotSc Assert.assertEquals( "Task count should not change when rolloverTaskCount <= 0", beforeTaskCount, - (int) supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart() + (int) supervisor.getIoConfig().getTaskCount() ); } @@ -116,7 +116,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScal Assert.assertEquals( "Task count should be updated to " + targetTaskCount + " when rolloverTaskCount > 0", targetTaskCount, - (int) supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart() + (int) supervisor.getIoConfig().getTaskCount() ); } @@ -144,7 +144,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale() Assert.assertEquals( "Task count should not change when rolloverTaskCount is 0", beforeTaskCount, - (int) supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart() + (int) supervisor.getIoConfig().getTaskCount() ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index 7df22c467a16..db805c21187a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -1457,7 +1457,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal "stream", new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, - taskCount, + null, // autoscaler uses taskCountStart/taskCountMin for the initial value new Period("PT1H"), new Period("P1D"), new Period("PT30S"), @@ -1478,7 +1478,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal "stream", new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, - taskCount, + null, // autoscaler uses taskCountStart/taskCountMin for the initial value new Period("PT1H"), new Period("P1D"), new Period("PT30S"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java index b458ceac22cd..54299d915f68 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java @@ -38,7 +38,7 @@ import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME; import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME; import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME; -import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.LAG_ACTIVATION_THRESHOLD; +import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD; import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeExtraMaxPartitionsPerTaskIncrease; import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeValidTaskCounts; import static org.mockito.Mockito.when; @@ -184,7 +184,7 @@ public void testComputeExtraPPTIncrease() { // No extra increase below the threshold Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * 49_000L, 30, 3, 30)); - Assert.assertEquals(4, computeExtraMaxPartitionsPerTaskIncrease(30L * LAG_ACTIVATION_THRESHOLD, 30, 3, 30)); + Assert.assertEquals(4, computeExtraMaxPartitionsPerTaskIncrease(30L * EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD, 30, 3, 30)); // More aggressive increase when the lag is high Assert.assertEquals(6, computeExtraMaxPartitionsPerTaskIncrease(30L * 300_000L, 30, 3, 30));