diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index c5d3ffe873d5..40049967b2ba 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -3403,7 +3403,6 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException group.completionTimeout = DateTimes.nowUtc().plus(ioConfig.getCompletionTimeout()); pendingCompletionTaskGroups.computeIfAbsent(groupId, k -> new CopyOnWriteArrayList<>()).add(group); - boolean endOffsetsAreInvalid = false; for (Entry entry : endOffsets.entrySet()) { if (entry.getValue().equals(getEndOfPartitionMarker())) { @@ -3453,17 +3452,26 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException * This method is invoked to determine whether a task count adjustment is needed * during a task rollover based on the recommendations from the task auto-scaler. */ + @VisibleForTesting void maybeScaleDuringTaskRollover() { if (taskAutoScaler != null && activelyReadingTaskGroups.isEmpty()) { int rolloverTaskCount = taskAutoScaler.computeTaskCountForRollover(); - if (rolloverTaskCount > 0) { + if (rolloverTaskCount > 0 && rolloverTaskCount != getIoConfig().getTaskCount()) { log.info("Autoscaler recommends scaling down to [%d] tasks during rollover", rolloverTaskCount); changeTaskCountInIOConfig(rolloverTaskCount); // Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call. // This seems the best way to inject task amount recalculation during the rollover. 
clearAllocationInfo(); + + ServiceMetricEvent.Builder event = ServiceMetricEvent + .builder() + .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()); + + emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, rolloverTaskCount)); } } } @@ -4048,7 +4056,7 @@ private Map> generate builder.put(partitionId, makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence())); } } else { - // if we don't have a startingOffset (first run or we had some previous failures and reset the sequences) then + // if we don't have a startingOffset (first run, or we had some previous failures and reset the sequences) then // get the sequence from metadata storage (if available) or Kafka/Kinesis (otherwise) OrderedSequenceNumber offsetFromStorage = getOffsetFromStorageForPartition( partitionId, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index 8a7fcad15113..0de06a63949f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -86,7 +86,7 @@ public SeekableStreamSupervisorIOConfig( // Could be null this.autoScalerConfig = autoScalerConfig; this.autoScalerEnabled = autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler(); - // if autoscaler is enabled then taskCount will be ignored here and initial taskCount will equal to taskCountStart/taskCountMin + // if autoscaler is enabled, then taskCount will be ignored here and initial taskCount will equal to taskCountStart/taskCountMin if (autoScalerEnabled) { final Integer 
startTaskCount = autoScalerConfig.getTaskCountStart(); this.taskCount = startTaskCount != null ? startTaskCount : autoScalerConfig.getTaskCountMin(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java index 598a56e41d14..350e2284359a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScaler.java @@ -19,6 +19,8 @@ package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; +import it.unimi.dsi.fastutil.ints.IntArraySet; +import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; @@ -33,9 +35,7 @@ import org.apache.druid.query.DruidMetrics; import org.apache.druid.segment.incremental.RowIngestionMeters; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -57,6 +57,25 @@ public class CostBasedAutoScaler implements SupervisorTaskAutoScaler private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2; private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2; + /** + * Defines the step size used for evaluating lag when computing scaling actions. + * This constant helps control the granularity of lag considerations in scaling decisions, + * ensuring smoother transitions between scaled states and avoiding abrupt changes in task counts. 
+ */ + private static final int LAG_STEP = 100_000; + /** + * This parameter fine-tunes autoscaling behavior by adding extra flexibility + * when calculating maximum allowable partitions per task in response to lag, + * which must be processed as fast, as possible. + * It acts as a foundational factor that balances the responsiveness and stability of autoscaling. + */ + private static final int BASE_RAW_EXTRA = 5; + // Base PPT lag threshold allowing to activate a burst scaleup to eliminate high lag. + static final int EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD = 50_000; + // Extra PPT lag threshold allowing activation of even more aggressive scaleup to eliminate high lag, + // also enabling lag-amplified idle calculation decay in the cost function (to reduce idle weight). + static final int AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD = 100_000; + public static final String LAG_COST_METRIC = "task/autoScaler/costBased/lagCost"; public static final String IDLE_COST_METRIC = "task/autoScaler/costBased/idleCost"; public static final String OPTIMAL_TASK_COUNT_METRIC = "task/autoScaler/costBased/optimalTaskCount"; @@ -160,9 +179,7 @@ public CostBasedAutoScalerConfig getConfig() * Returns -1 (no scaling needed) in the following cases: *
    *
  • Metrics are not available
  • - *
  • Task count already optimal
  • - *
  • The current idle ratio is in the ideal range and lag considered low
  • - *
  • Optimal task count equals current task count
  • + *
  • Current task count already optimal
  • *
* * @return optimal task count for scale-up, or -1 if no scaling action needed @@ -180,7 +197,12 @@ int computeOptimalTaskCount(CostMetrics metrics) return -1; } - final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(partitionCount, currentTaskCount); + final int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts( + partitionCount, + currentTaskCount, + (long) metrics.getAggregateLag(), + config.getTaskCountMax() + ); if (validTaskCounts.length == 0) { log.warn("No valid task counts after applying constraints for supervisorId [%s]", supervisorId); @@ -230,22 +252,35 @@ int computeOptimalTaskCount(CostMetrics metrics) } /** - * Generates valid task counts based on partitions-per-task ratios. + * Generates valid task counts based on partitions-per-task ratios and lag-driven PPT relaxation. * This enables gradual scaling and avoids large jumps. * Limits the range of task counts considered to avoid excessive computation. * * @return sorted list of valid task counts within bounds */ - static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount) + static int[] computeValidTaskCounts( + int partitionCount, + int currentTaskCount, + double aggregateLag, + int taskCountMax + ) { - if (partitionCount <= 0) { + if (partitionCount <= 0 || currentTaskCount <= 0) { return new int[]{}; } - Set result = new HashSet<>(); + IntSet result = new IntArraySet(); final int currentPartitionsPerTask = partitionCount / currentTaskCount; + final int extraIncrease = computeExtraMaxPartitionsPerTaskIncrease( + aggregateLag, + partitionCount, + currentTaskCount, + taskCountMax + ); + final int effectiveMaxIncrease = MAX_INCREASE_IN_PARTITIONS_PER_TASK + extraIncrease; + // Minimum partitions per task correspond to the maximum number of tasks (scale up) and vice versa. 
- final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - MAX_INCREASE_IN_PARTITIONS_PER_TASK); + final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - effectiveMaxIncrease); final int maxPartitionsPerTask = Math.min( partitionCount, currentPartitionsPerTask + MAX_DECREASE_IN_PARTITIONS_PER_TASK @@ -253,9 +288,41 @@ static int[] computeValidTaskCounts(int partitionCount, int currentTaskCount) for (int partitionsPerTask = maxPartitionsPerTask; partitionsPerTask >= minPartitionsPerTask; partitionsPerTask--) { final int taskCount = (partitionCount + partitionsPerTask - 1) / partitionsPerTask; - result.add(taskCount); + if (taskCount <= taskCountMax) { + result.add(taskCount); + } } - return result.stream().mapToInt(Integer::intValue).toArray(); + return result.toIntArray(); + } + + /** + * Computes extra allowed increase in partitions-per-task in scenarios when the average per-partition lag + * is above the configured threshold. By default, it is {@code EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD}. + * Generally, one of the autoscaler priorities is to keep the lag as close to zero as possible. 
+ */ + static int computeExtraMaxPartitionsPerTaskIncrease( + double aggregateLag, + int partitionCount, + int currentTaskCount, + int taskCountMax + ) + { + if (partitionCount <= 0 || taskCountMax <= 0) { + return 0; + } + + final double lagPerPartition = aggregateLag / partitionCount; + if (lagPerPartition < EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) { + return 0; + } + + int rawExtra = BASE_RAW_EXTRA; + if (lagPerPartition > AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) { + rawExtra += (int) ((lagPerPartition - AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD) / LAG_STEP); + } + + final double headroomRatio = Math.max(0.0, 1.0 - (double) currentTaskCount / taskCountMax); + return (int) (rawExtra * headroomRatio); } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java index d1f682f4f56f..aba26ba25b52 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerConfig.java @@ -52,7 +52,7 @@ public class CostBasedAutoScalerConfig implements AutoScalerConfig private final boolean enableTaskAutoScaler; private final int taskCountMax; private final int taskCountMin; - private final Integer taskCountStart; + private Integer taskCountStart; private final long minTriggerScaleActionFrequencyMillis; private final Double stopTaskCountRatio; private final long scaleActionPeriodMillis; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java index 0da733ef9e71..8a3759556910 
100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunction.java @@ -29,24 +29,25 @@ public class WeightedCostFunction { private static final Logger log = new Logger(WeightedCostFunction.class); - - /** - * Ideal idle ratio range boundaries. - * Idle ratio below MIN indicates tasks are overloaded (scale up needed). - * Idle ratio above MAX indicates tasks are underutilized (scale down needed). + * Represents the maximum multiplier factor applied to amplify lag-based costs in the cost computation process. + * This value is used to cap the lag amplification effect to prevent excessively high cost inflation + * caused by significant partition lag. + * It ensures that lag-related adjustments remain bounded within a reasonable range for stability of + * cost-based auto-scaling decisions. */ - static final double IDEAL_IDLE_MIN = 0.2; - static final double IDEAL_IDLE_MAX = 0.6; - + private static final double LAG_AMPLIFICATION_MAX_MULTIPLIER = 2.0; + private static final long LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION = 500_000L; /** - * Checks if the given idle ratio is within the ideal range [{@value #IDEAL_IDLE_MIN}, {@value #IDEAL_IDLE_MAX}]. - * When idle is in this range, optimal utilization has been achieved and no scaling is needed. + * It is used to calculate the denominator for the ramp formula in the cost + * computation logic. This value represents the difference between the maximum lag per + * partition (LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION) and the extra scaling activation + * lag threshold (CostBasedAutoScaler.EXTRA_SCALING_ACTIVATION_LAG_THRESHOLD). + *

+ It impacts how the cost model evaluates scaling decisions during high-lag scenario. */ - public static boolean isIdleInIdealRange(double idleRatio) - { - return idleRatio >= IDEAL_IDLE_MIN && idleRatio <= IDEAL_IDLE_MAX; - } + private static final double RAMP_DENOMINATOR = + LAG_AMPLIFICATION_MAX_LAG_PER_PARTITION - (double) CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD; /** * Computes cost for a given task count using compute time metrics. @@ -104,12 +105,15 @@ public CostResult computeCost(CostMetrics metrics, int proposedTaskCount, CostBa return new CostResult(cost, lagCost, weightedIdleCost); } - /** - * Estimates the idle ratio for a given task count using a capacity-based linear model. + * Estimates the idle ratio for a proposed task count. + * Includes lag-based adjustment to eliminate high lag and + * reduce predicted idle when work exists. *

- * Formula: {@code predictedIdle = 1 - busyFraction / taskRatio} - * where {@code busyFraction = 1 - currentIdleRatio} and {@code taskRatio = targetTaskCount / currentTaskCount}. + * Formulas: + * {@code linearPrediction = max(0, 1 - busyFraction / taskRatio)} + * {@code lagBusyFactor = 1 - exp(-lagPerTask / LAG_SCALE_FACTOR)} + * {@code adjustedPrediction = linearPrediction × (1 - lagBusyFactor)} * * @param metrics current system metrics containing idle ratio and task count * @param taskCount target task count to estimate an idle ratio for @@ -119,7 +123,6 @@ private double estimateIdleRatio(CostMetrics metrics, int taskCount) { final double currentPollIdleRatio = metrics.getPollIdleRatio(); - // Handle edge cases if (currentPollIdleRatio < 0) { // No idle data available, assume moderate idle return 0.5; @@ -130,13 +133,33 @@ private double estimateIdleRatio(CostMetrics metrics, int taskCount) return currentPollIdleRatio; } - // Capacity-based model: idle ratio reflects spare capacity per task + // Linear prediction (capacity-based) - existing logic final double busyFraction = 1.0 - currentPollIdleRatio; final double taskRatio = (double) taskCount / currentTaskCount; - final double predictedIdleRatio = 1.0 - busyFraction / taskRatio; + final double linearPrediction = Math.max(0.0, Math.min(1.0, 1.0 - busyFraction / taskRatio)); + + // Lag-based adjustment: more work per task → less idle + final double lagPerTask = metrics.getAggregateLag() / taskCount; + double lagBusyFactor = 1.0 - Math.exp(-lagPerTask / CostBasedAutoScaler.AGGRESSIVE_SCALING_LAG_PER_PARTITION_THRESHOLD); + final int partitionCount = metrics.getPartitionCount(); + + if (partitionCount > 0) { + final double lagPerPartition = metrics.getAggregateLag() / partitionCount; + // Lag-amplified idle decay + if (lagPerPartition >= CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) { + double ramp = Math.max(0.0, + (lagPerPartition - 
CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD) + / RAMP_DENOMINATOR + ); + ramp = Math.min(1.0, ramp); + + final double multiplier = 1.0 + ramp * (LAG_AMPLIFICATION_MAX_MULTIPLIER - 1.0); + lagBusyFactor = Math.min(1.0, lagBusyFactor * multiplier); + } + } // Clamp to valid range [0, 1] - return Math.max(0.0, Math.min(1.0, predictedIdleRatio)); + return Math.max(0.0, linearPrediction * (1.0 - lagBusyFactor)); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java index a97e971028c6..56b8904cc18d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java @@ -80,7 +80,7 @@ public void testAllDefaults() } @Test - public void testAutoScalerEnabledTrueAndFalse() + public void testAutoScalerEnabledPreservesTaskCountWhenNonNull() { LagAggregator lagAggregator = mock(LagAggregator.class); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java index 2c4275a99ec3..b39e6097903b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorScaleDuringTaskRolloverTest.java @@ -61,11 +61,7 @@ public void test_maybeScaleDuringTaskRollover_noAutoScaler_doesNotScale() supervisor.maybeScaleDuringTaskRollover(); // 
Then - Assert.assertEquals( - "Task count should not change when taskAutoScaler is null", - beforeTaskCount, - (int) supervisor.getIoConfig().getTaskCount() - ); + Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig()); } @Test @@ -88,6 +84,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountNonPositive_doesNotSc supervisor.maybeScaleDuringTaskRollover(); // Then + Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); Assert.assertEquals( "Task count should not change when rolloverTaskCount <= 0", beforeTaskCount, @@ -111,12 +108,11 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountPositive_performsScal supervisor.start(); supervisor.createAutoscaler(spec); - Assert.assertEquals(DEFAULT_TASK_COUNT, (int) supervisor.getIoConfig().getTaskCount()); - // When supervisor.maybeScaleDuringTaskRollover(); // Then + Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); Assert.assertEquals( "Task count should be updated to " + targetTaskCount + " when rolloverTaskCount > 0", targetTaskCount, @@ -144,6 +140,7 @@ public void test_maybeScaleDuringTaskRollover_rolloverCountZero_doesNotScale() supervisor.maybeScaleDuringTaskRollover(); // Then + Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); Assert.assertEquals( "Task count should not change when rolloverTaskCount is 0", beforeTaskCount, @@ -201,13 +198,11 @@ public int computeTaskCountForRollover() private static CostBasedAutoScalerConfig getCostBasedAutoScalerConfig() { return CostBasedAutoScalerConfig.builder() + .enableTaskAutoScaler(true) .taskCountMax(100) .taskCountMin(1) - .taskCountStart(DEFAULT_TASK_COUNT) - .enableTaskAutoScaler(true) - .lagWeight(0.25) - .idleWeight(0.75) - .scaleActionPeriodMillis(100) + .taskCountStart(1) + .scaleActionPeriodMillis(60000) .build(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index 7df22c467a16..db805c21187a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -1457,7 +1457,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal "stream", new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, - taskCount, + null, // autoscaler uses taskCountStart/taskCountMin for the initial value new Period("PT1H"), new Period("P1D"), new Period("PT30S"), @@ -1478,7 +1478,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal "stream", new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), 1, - taskCount, + null, // autoscaler uses taskCountStart/taskCountMin for the initial value new Period("PT1H"), new Period("P1D"), new Period("PT30S"), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java index caf5453f5217..54299d915f68 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/CostBasedAutoScalerTest.java @@ -30,6 +30,7 @@ import org.junit.Test; import org.mockito.Mockito; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -37,11 +38,16 @@ import static 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME; import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.FIVE_MINUTE_NAME; import static org.apache.druid.indexing.common.stats.DropwizardRowIngestionMeters.ONE_MINUTE_NAME; +import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD; +import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeExtraMaxPartitionsPerTaskIncrease; +import static org.apache.druid.indexing.seekablestream.supervisor.autoscaler.CostBasedAutoScaler.computeValidTaskCounts; import static org.mockito.Mockito.when; +@SuppressWarnings("SameParameterValue") public class CostBasedAutoScalerTest { private CostBasedAutoScaler autoScaler; + private CostBasedAutoScalerConfig config; @Before public void setUp() @@ -55,13 +61,13 @@ public void setUp() when(mockSupervisor.getIoConfig()).thenReturn(mockIoConfig); when(mockIoConfig.getStream()).thenReturn("test-stream"); - CostBasedAutoScalerConfig config = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(0.6) - .idleWeight(0.4) - .build(); + config = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.6) + .idleWeight(0.4) + .build(); autoScaler = new CostBasedAutoScaler(mockSupervisor, config, mockSupervisorSpec, mockEmitter); } @@ -70,26 +76,122 @@ public void setUp() public void testComputeValidTaskCounts() { // For 100 partitions at 25 tasks (4 partitions/task), valid counts include 25 and 34 - int[] validTaskCounts = CostBasedAutoScaler.computeValidTaskCounts(100, 25); + int[] validTaskCounts = computeValidTaskCounts(100, 25, 0L, 100); Assert.assertTrue("Should contain the current task count", contains(validTaskCounts, 25)); Assert.assertTrue("Should contain the next scale-up 
option", contains(validTaskCounts, 34)); // Edge cases - Assert.assertEquals("Zero partitions return empty array", 0, CostBasedAutoScaler.computeValidTaskCounts(0, 10).length); - Assert.assertEquals("Negative partitions return empty array", 0, CostBasedAutoScaler.computeValidTaskCounts(-5, 10).length); + Assert.assertEquals(0, computeValidTaskCounts(0, 10, 0L, 100).length); + Assert.assertEquals(0, computeValidTaskCounts(-5, 10, 0L, 100).length); // Single partition - int[] singlePartition = CostBasedAutoScaler.computeValidTaskCounts(1, 1); + int[] singlePartition = computeValidTaskCounts(1, 1, 0L, 100); Assert.assertTrue("Single partition should have at least one valid count", singlePartition.length > 0); Assert.assertTrue("Single partition should contain 1", contains(singlePartition, 1)); // Current exceeds partitions - should still yield valid, deduplicated options - int[] exceedsPartitions = CostBasedAutoScaler.computeValidTaskCounts(2, 5); + int[] exceedsPartitions = computeValidTaskCounts(2, 5, 0L, 100); Assert.assertEquals(2, exceedsPartitions.length); Assert.assertTrue(contains(exceedsPartitions, 1)); Assert.assertTrue(contains(exceedsPartitions, 2)); } + @Test + public void testComputeValidTaskCountsLagExpansion() + { + int[] lowLagCounts = computeValidTaskCounts(30, 3, 0L, 30); + Assert.assertFalse("Low lag should not include max task count", contains(lowLagCounts, 30)); + Assert.assertTrue("Low lag should cap scale up around 4 tasks", contains(lowLagCounts, 4)); + + long highAggregateLag = 30L * 500_000L; + int[] highLagCounts = computeValidTaskCounts(30, 3, highAggregateLag, 30); + Assert.assertTrue("High lag should allow scaling to max tasks", contains(highLagCounts, 30)); + } + + @Test + public void testComputeValidTaskCountsRespectsTaskCountMax() + { + long highAggregateLag = 30L * 500_000L; + int[] cappedCounts = computeValidTaskCounts(30, 4, highAggregateLag, 3); + Assert.assertTrue("Should include taskCountMax when doable", contains(cappedCounts, 
3)); + Assert.assertFalse("Should not exceed taskCountMax", contains(cappedCounts, 4)); + } + + @Test + public void testScalingExamplesTable() + { + int partitionCount = 30; + int taskCountMax = 30; + double pollIdleRatio = 0.1; + double avgProcessingRate = 10.0; + + class Example + { + final int currentTasks; + final long lagPerPartition; + final int expectedTasks; + + Example(int currentTasks, long lagPerPartition, int expectedTasks) + { + this.currentTasks = currentTasks; + this.lagPerPartition = lagPerPartition; + this.expectedTasks = expectedTasks; + } + } + + Example[] examples = new Example[]{ + new Example(3, 50_000L, 8), + new Example(3, 300_000L, 15), + new Example(3, 500_000L, 30), + new Example(10, 100_000L, 15), + new Example(10, 300_000L, 30), + new Example(10, 500_000L, 30), + new Example(20, 500_000L, 30), + new Example(25, 500_000L, 30) + }; + + for (Example example : examples) { + long aggregateLag = example.lagPerPartition * partitionCount; + int[] validCounts = computeValidTaskCounts(partitionCount, example.currentTasks, aggregateLag, taskCountMax); + Assert.assertTrue( + "Should include expected task count for current=" + example.currentTasks + ", lag=" + example.lagPerPartition, + contains(validCounts, example.expectedTasks) + ); + + CostMetrics metrics = createMetricsWithRate( + example.lagPerPartition, + example.currentTasks, + partitionCount, + pollIdleRatio, + avgProcessingRate + ); + int actualOptimal = autoScaler.computeOptimalTaskCount(metrics); + if (actualOptimal == -1) { + actualOptimal = example.currentTasks; + } + Assert.assertEquals( + "Optimal task count should match for current=" + example.currentTasks + + ", lag=" + example.lagPerPartition + + ", valid=" + Arrays.toString(validCounts), + example.expectedTasks, + actualOptimal + ); + } + } + + @Test + public void testComputeExtraPPTIncrease() + { + // No extra increase below the threshold + Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * 49_000L, 30, 3, 
30)); + Assert.assertEquals(4, computeExtraMaxPartitionsPerTaskIncrease(30L * EXTRA_SCALING_LAG_PER_PARTITION_THRESHOLD, 30, 3, 30)); + + // More aggressive increase when the lag is high + Assert.assertEquals(6, computeExtraMaxPartitionsPerTaskIncrease(30L * 300_000L, 30, 3, 30)); + // Zero when on max task count + Assert.assertEquals(0, computeExtraMaxPartitionsPerTaskIncrease(30L * 500_000L, 30, 30, 30)); + } + @Test public void testComputeOptimalTaskCountInvalidInputs() { @@ -110,7 +212,7 @@ public void testComputeOptimalTaskCountScaling() int highIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(10.0, 50, 100, 0.9)); Assert.assertTrue("Scale down scenario should return optimal <= current", highIdleResult <= 50); - // With low idle and balanced weights, algorithm should not scale up aggressively + // With low idle and balanced weights, the algorithm should not scale up aggressively int lowIdleResult = autoScaler.computeOptimalTaskCount(createMetrics(1000.0, 25, 100, 0.1)); Assert.assertTrue("With low idle and balanced weights, should not scale up aggressively", lowIdleResult <= 25); } @@ -194,10 +296,16 @@ public void testExtractMovingAverageIntervalFallback() { // 15-minute average is preferred Map> fifteenMin = new HashMap<>(); - fifteenMin.put("0", Collections.singletonMap("task-0", buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0))); + fifteenMin.put( + "0", + Collections.singletonMap( + "task-0", + buildTaskStatsWithMovingAverageForInterval(FIFTEEN_MINUTE_NAME, 1500.0) + ) + ); Assert.assertEquals(1500.0, CostBasedAutoScaler.extractMovingAverage(fifteenMin), 0.0001); - // 1-minute as final fallback + // 1-minute as a final fallback Map> oneMin = new HashMap<>(); oneMin.put("0", Collections.singletonMap("task-0", buildTaskStatsWithMovingAverageForInterval(ONE_MINUTE_NAME, 500.0))); Assert.assertEquals(500.0, CostBasedAutoScaler.extractMovingAverage(oneMin), 0.0001); @@ -328,7 +436,7 @@ public void 
testComputeTaskCountForRolloverReturnsMinusOneWhenLagStatsNull() @Test public void testComputeTaskCountForRolloverReturnsMinusOneWhenNoMetrics() { - // Tests the case where lastKnownMetrics is null (no computeTaskCountForScaleAction called) + // Tests the case where the lastKnownMetrics is null (no computeTaskCountForScaleAction called) SupervisorSpec spec = Mockito.mock(SupervisorSpec.class); SeekableStreamSupervisor supervisor = Mockito.mock(SeekableStreamSupervisor.class); ServiceEmitter emitter = Mockito.mock(ServiceEmitter.class); @@ -369,6 +477,24 @@ private CostMetrics createMetrics( ); } + private CostMetrics createMetricsWithRate( + double avgPartitionLag, + int currentTaskCount, + int partitionCount, + double pollIdleRatio, + double avgProcessingRate + ) + { + return new CostMetrics( + avgPartitionLag, + currentTaskCount, + partitionCount, + pollIdleRatio, + 3600, + avgProcessingRate + ); + } + private boolean contains(int[] array, int value) { for (int i : array) { @@ -434,7 +560,11 @@ private Map buildTaskStatsWithMultipleMovingAverages( return taskStats; } - private Map buildTaskStatsWithNullInterval(String nullInterval, String validInterval, double processedRate) + private Map buildTaskStatsWithNullInterval( + String nullInterval, + String validInterval, + double processedRate + ) { Map buildSegments = new HashMap<>(); buildSegments.put(nullInterval, null); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java index 90b50477c924..416def7e3ab5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/WeightedCostFunctionTest.java @@ -51,20 +51,24 @@ public void 
testComputeCostInvalidInputs() Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 10, null).totalCost(), 0.0); Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, 0, config).totalCost(), 0.0); Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(validMetrics, -5, config).totalCost(), 0.0); - Assert.assertEquals(Double.POSITIVE_INFINITY, costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, config).totalCost(), 0.0); + Assert.assertEquals( + Double.POSITIVE_INFINITY, + costFunction.computeCost(createMetrics(0.0, 10, 0, 0.3), 10, config).totalCost(), + 0.0 + ); } @Test public void testScaleDownHasHigherLagCostThanCurrent() { CostBasedAutoScalerConfig lagOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(1.0) - .idleWeight(0.0) - .defaultProcessingRate(100.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .defaultProcessingRate(100.0) + .build(); CostMetrics metrics = createMetrics(200000.0, 10, 200, 0.3); @@ -85,13 +89,13 @@ public void testLagCostWithMarginalModel() // With lag-only config (no idle penalty), the marginal model is used for scale-up: // lagRecoveryTime = aggregateLag / (taskCountDiff * rate) CostBasedAutoScalerConfig lagOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(1.0) - .idleWeight(0.0) - .defaultProcessingRate(1000.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .defaultProcessingRate(1000.0) + .build(); // aggregateLag = 100000 * 100 = 10,000,000 CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3); @@ -113,18 +117,13 @@ public void testLagCostWithMarginalModel() } @Test - public void testBalancedWeightsFavorStabilityOverScaleUp() + public 
void testBalancedWeightsFavorStabilityOverScaleUpOnSmallLag() { - // With the marginal lag model and corrected idle ratio, balanced weights - // favor stability because idle cost increases significantly with more tasks - // This is intentional behavior: the algorithm is conservative about scale-up. - CostMetrics metrics = createMetrics(100000.0, 10, 100, 0.3); - + // Validate idle ratio estimation and ensure balanced weights still favor stability. + CostMetrics metrics = createMetrics(100.0, 10, 100, 0.3); double costCurrent = costFunction.computeCost(metrics, 10, config).totalCost(); double costScaleUp = costFunction.computeCost(metrics, 20, config).totalCost(); - // With balanced weights (0.3 lag, 0.7 idle), the idle cost increase from - // scaling up dominates the lag recovery benefit Assert.assertTrue( "With balanced weights, staying at current count is cheaper than scale-up", costCurrent < costScaleUp @@ -193,13 +192,13 @@ public void testNoProcessingRateDeviationPenaltyIsSymmetric() // Use lag-only config to isolate the lag recovery time component CostBasedAutoScalerConfig lagOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(1.0) - .idleWeight(0.0) - .defaultProcessingRate(1000.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(1.0) + .idleWeight(0.0) + .defaultProcessingRate(1000.0) + .build(); double costUp5 = costFunction.computeCost(metricsNoRate, currentTaskCount + 5, lagOnlyConfig).totalCost(); double costDown5 = costFunction.computeCost(metricsNoRate, currentTaskCount - 5, lagOnlyConfig).totalCost(); @@ -218,13 +217,13 @@ public void testIdleCostMonotonicWithTaskCount() // Test that idle cost increases monotonically with task count. // With fixed load, adding more tasks means each task has less work, so idle increases. 
CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(0.0) - .idleWeight(1.0) - .defaultProcessingRate(1000.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.0) + .idleWeight(1.0) + .defaultProcessingRate(1000.0) + .build(); // Current: 10 tasks with 40% idle (60% busy) CostMetrics metrics = createMetrics(0.0, 10, 100, 0.4); @@ -244,13 +243,13 @@ public void testIdleCostMonotonicWithTaskCount() public void testIdleRatioClampingAtBoundaries() { CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(0.0) - .idleWeight(1.0) - .defaultProcessingRate(1000.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.0) + .idleWeight(1.0) + .defaultProcessingRate(1000.0) + .build(); // Extreme scale-down: 10 tasks → 2 tasks with 40% idle // busyFraction = 0.6, taskRatio = 0.2 @@ -275,13 +274,13 @@ public void testIdleRatioClampingAtBoundaries() public void testIdleRatioWithMissingData() { CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder() - .taskCountMax(100) - .taskCountMin(1) - .enableTaskAutoScaler(true) - .lagWeight(0.0) - .idleWeight(1.0) - .defaultProcessingRate(1000.0) - .build(); + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .lagWeight(0.0) + .idleWeight(1.0) + .defaultProcessingRate(1000.0) + .build(); // Negative idle ratio indicates missing data → should default to 0.5 CostMetrics missingIdleData = createMetrics(0.0, 10, 100, -1.0); @@ -296,7 +295,38 @@ public void testIdleRatioWithMissingData() Assert.assertEquals("Cost at 20 tasks with missing idle data", 20 * 3600 * 0.5, cost20, 0.0001); } - private CostMetrics createMetrics(double avgPartitionLag, int currentTaskCount, int partitionCount, double 
pollIdleRatio) + @Test + public void testLagAmplificationReducesIdleUnderHighLag() + { + CostBasedAutoScalerConfig idleOnlyConfig = CostBasedAutoScalerConfig.builder() + .taskCountMax(100) + .taskCountMin(1) + .enableTaskAutoScaler(true) + .defaultProcessingRate(1000.0) + .build(); + + int currentTaskCount = 3; + int proposedTaskCount = 8; + int partitionCount = 30; + double pollIdleRatio = 0.1; + + CostMetrics lowLag = createMetrics(40_000.0, currentTaskCount, partitionCount, pollIdleRatio); + CostMetrics highLag = createMetrics(500_000.0, currentTaskCount, partitionCount, pollIdleRatio); + + double lowLagCost = costFunction.computeCost(lowLag, proposedTaskCount, idleOnlyConfig).totalCost(); + double highLagCost = costFunction.computeCost(highLag, proposedTaskCount, idleOnlyConfig).totalCost(); + Assert.assertTrue( + "Higher lag should reduce predicted idle more aggressively", + lowLagCost > highLagCost + ); + } + + private CostMetrics createMetrics( + double avgPartitionLag, + int currentTaskCount, + int partitionCount, + double pollIdleRatio + ) { return new CostMetrics( avgPartitionLag,