From 314b20d810f6258bfbc74e428c3751e9dc87d103 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Tue, 11 Nov 2025 12:16:39 +0200 Subject: [PATCH 1/8] Improve lag-based autoscaler config behaviour --- .../supervisor/SupervisorManager.java | 29 +- .../supervisor/SeekableStreamSupervisor.java | 88 +++-- .../SeekableStreamSupervisorSpec.java | 48 ++- .../autoscaler/AutoScalerConfig.java | 1 + .../autoscaler/LagBasedAutoScaler.java | 26 +- .../autoscaler/LagBasedAutoScalerConfig.java | 6 + .../SeekableStreamSupervisorSpecTest.java | 172 ++++++++-- .../SupervisorResourceConfigMergeTest.java | 323 ++++++++++++++++++ .../overlord/supervisor/SupervisorSpec.java | 12 + 9 files changed, 610 insertions(+), 95 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 168b956afd2d..7921a966f4e5 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -174,7 +174,11 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) synchronized (lock) { Preconditions.checkState(started, "SupervisorManager not started"); final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec); - possiblyStopAndRemoveSupervisorInternal(spec.getId(), false); + SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false); + System.out.println(existingSpec == null ? "null" : existingSpec.getId()); + if (existingSpec != null) { + spec.mergeSpecConfigs(existingSpec); + } createAndStartSupervisorInternal(spec, shouldUpdateSpec); return shouldUpdateSpec; } @@ -183,6 +187,7 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) /** * Checks whether the submitted SupervisorSpec differs from the current spec in SupervisorManager's supervisor list. * This is used in SupervisorResource specPost to determine whether the Supervisor needs to be restarted + * * @param spec The spec submitted * @return boolean - true only if the spec has been modified, false otherwise */ @@ -221,7 +226,7 @@ public boolean stopAndRemoveSupervisor(String id) synchronized (lock) { Preconditions.checkState(started, "SupervisorManager not started"); - return possiblyStopAndRemoveSupervisorInternal(id, true); + return possiblyStopAndRemoveSupervisorInternal(id, true) != null; } } @@ -299,7 +304,8 @@ public void stop() log.info("SupervisorManager stopped."); } - public List getSupervisorHistoryForId(String id, @Nullable Integer limit) throws IllegalArgumentException + public List getSupervisorHistoryForId(String id, @Nullable Integer limit) + throws IllegalArgumentException { return metadataSupervisorManager.getAllForId(id, limit); } @@ -429,13 +435,14 @@ public boolean registerUpgradedPendingSegmentOnSupervisor( * Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be * starting, stopping, suspending and resuming supervisors. * - * @return true if a supervisor was stopped, false if there was no supervisor with this id + * @return reference to existing supervisor, if exists and was stopped, null if there was no supervisor with this id */ - private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean writeTombstone) + @Nullable + private SupervisorSpec possiblyStopAndRemoveSupervisorInternal(String id, boolean writeTombstone) { Pair pair = supervisors.get(id); - if (pair == null) { - return false; + if (pair == null || pair.rhs == null || pair.lhs == null) { + return null; } if (writeTombstone) { @@ -447,13 +454,13 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write pair.lhs.stop(true); supervisors.remove(id); - SupervisorTaskAutoScaler autoscler = autoscalers.get(id); - if (autoscler != null) { - autoscler.stop(); + SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); + if (autoscaler != null) { + autoscaler.stop(); autoscalers.remove(id); } - return true; + return pair.rhs; } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ef65d6d22caa..7d4ac46e5815 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -135,8 +135,8 @@ import java.util.stream.Stream; /** - * this class is the parent class of both the Kafka and Kinesis supervisor. All the main run loop - * logic are similar enough so they're grouped together into this class. + * This class is the parent class of both the Kafka and Kinesis supervisor. All the main run loop + * logic is similar enough, so they're grouped together into this class. *

* Supervisor responsible for managing the SeekableStreamIndexTasks (Kafka/Kinesis) for a single dataSource. At a high level, the class accepts a * {@link SeekableStreamSupervisorSpec} which includes the stream name (topic / stream) and configuration as well as an ingestion spec which will @@ -541,10 +541,20 @@ public String getType() /** * This method determines how to do scale actions based on collected lag points. - * If scale action is triggered : - * First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing. - * Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in the next 'RunNotice'. - * Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage. + * If scale action is triggered: + *

    + *
  • First, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing. + *
  • Secondly, clear all the stateful data structures: + *
      + *
    • activelyReadingTaskGroups, + *
    • partitionGroups, + *
    • partitionOffsets, + *
    • pendingCompletionTaskGroups, + *
    • partitionIds. + *
    + * These structures will be rebuiled in the next 'RunNotice'. + *
  • Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage. + *
* After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor. * * @param desiredActiveTaskCount desired taskCount computed from AutoScaler @@ -556,44 +566,52 @@ public String getType() private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException { + if (autoScalerConfig == null) { + log.warn("autoScalerConfig is 'null' but dynamic allocation notice is submitted, how can it be ?"); + return false; + } int currentActiveTaskCount; Collection activeTaskGroups = activelyReadingTaskGroups.values(); currentActiveTaskCount = activeTaskGroups.size(); if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) { return false; - } else { - log.info( - "Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].", - currentActiveTaskCount, - desiredActiveTaskCount, - supervisorId, - dataSource - ); - final Stopwatch scaleActionStopwatch = Stopwatch.createStarted(); - gracefulShutdownInternal(); - changeTaskCountInIOConfig(desiredActiveTaskCount); - clearAllocationInfo(); - emitter.emit(ServiceMetricEvent.builder() - .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()) - .setDimensionIfNotNull( - DruidMetrics.TAGS, - spec.getContextValue(DruidMetrics.TAGS) - ) - .setMetric( - AUTOSCALER_SCALING_TIME_METRIC, - scaleActionStopwatch.millisElapsed() - )); - log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource); - return true; } + log.info( + "Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].", + currentActiveTaskCount, + desiredActiveTaskCount, + supervisorId, + dataSource + ); + final Stopwatch scaleActionStopwatch = Stopwatch.createStarted(); + gracefulShutdownInternal(); + changeTaskCountInAutoScalerConfig(desiredActiveTaskCount); + clearAllocationInfo(); + emitter.emit(ServiceMetricEvent.builder() + .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()) + .setDimensionIfNotNull( + DruidMetrics.TAGS, + spec.getContextValue(DruidMetrics.TAGS) + ) + .setMetric( + AUTOSCALER_SCALING_TIME_METRIC, + scaleActionStopwatch.millisElapsed() + )); + log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource); + return true; } - private void changeTaskCountInIOConfig(int desiredActiveTaskCount) + private void changeTaskCountInAutoScalerConfig(int desiredActiveTaskCount) { - ioConfig.setTaskCount(desiredActiveTaskCount); + // Sanity check. + if (autoScalerConfig == null) { + log.warn("autoScalerConfig is null but scale action is submitted, how can it be ?"); + return; + } + autoScalerConfig.setTaskCountStart(desiredActiveTaskCount); try { Optional supervisorManager = taskMaster.getSupervisorManager(); if (supervisorManager.isPresent()) { @@ -916,7 +934,7 @@ public String getType() private volatile boolean lifecycleStarted = false; private final ServiceEmitter emitter; - // snapshots latest sequences from stream to be verified in next run cycle of inactive stream check + // snapshots latest sequences from the stream to be verified in the next run cycle of inactive stream check private final Map previousSequencesFromStream = new HashMap<>(); private long lastActiveTimeMillis; private final IdleConfig idleConfig; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 967652673f7c..bf217b73a179 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -43,16 +43,18 @@ import org.apache.druid.segment.indexing.DataSchema; import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.util.List; import java.util.Map; public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec { - protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = "Update of the input source stream from [%s] to [%s] is not supported for a running supervisor." - + "%nTo perform the update safely, follow these steps:" - + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " - + "%n(2) Create a new supervisor with the new input source stream." - + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."; + protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = + "Update of the input source stream from [%s] to [%s] is not supported for a running supervisor." + + "%nTo perform the update safely, follow these steps:" + + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "%n(2) Create a new supervisor with the new input source stream." + + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."; private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema( SeekableStreamSupervisorIngestionSpec ingestionSchema @@ -183,6 +185,7 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig() /** * An autoScaler instance will be returned depending on the autoScalerConfig. In case autoScalerConfig is null or autoScaler is disabled then NoopTaskAutoScaler will be returned. + * * @param supervisor * @return autoScaler */ @@ -190,7 +193,9 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig() public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor) { AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig(); - if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) { + if (autoScalerConfig != null + && autoScalerConfig.getEnableTaskAutoScaler() + && supervisor instanceof SeekableStreamSupervisor) { return autoScalerConfig.createAutoScaler(supervisor, this, emitter); } return new NoopTaskAutoScaler(); @@ -232,6 +237,7 @@ public boolean isSuspended() *
  • You cannot migrate between types of supervisors.
  • *
  • You cannot change the input source stream of a running supervisor.
  • * + * * @param proposedSpec the proposed supervisor spec * @throws DruidException if the proposed spec update is not allowed */ @@ -240,7 +246,9 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept { if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) { throw InvalidInput.exception( - "Cannot update supervisor spec from type[%s] to type[%s]", getClass().getSimpleName(), proposedSpec.getClass().getSimpleName() + "Cannot update supervisor spec from type[%s] to type[%s]", + getClass().getSimpleName(), + proposedSpec.getClass().getSimpleName() ); } SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) proposedSpec; @@ -255,6 +263,32 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept } } + /** + * Writes the taskCountStart value from old config, if not specificed in new config. + * + * @param existingSpec the existing supervisor specification to merge configuration values from + */ + @Override + public void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec) + { + AutoScalerConfig thisAutoScalerConfig = this.getIoConfig().getAutoScalerConfig(); + // Either if autoscaler is absent or taskCountStart is specified - just return. + if (thisAutoScalerConfig == null || thisAutoScalerConfig.getTaskCountStart() != null) { + return; + } + + // TODO[sasha]: use switch expression with pattern matching when we move to Java 21 as minimum requirement. + if (existingSpec instanceof SeekableStreamSupervisorSpec) { + // Note: for some reason, sources are available only for bytecode version 11. + //noinspection PatternVariableCanBeUsed + var spec = (SeekableStreamSupervisorSpec) existingSpec; + AutoScalerConfig autoScalerConfig = spec.getIoConfig().getAutoScalerConfig(); + if (autoScalerConfig != null && autoScalerConfig.getTaskCountStart() != null) { + thisAutoScalerConfig.setTaskCountStart(autoScalerConfig.getTaskCountStart()); + } + } + } + protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java index b9eb741bad1d..2f90883862b3 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java @@ -40,6 +40,7 @@ public interface AutoScalerConfig int getTaskCountMax(); int getTaskCountMin(); Integer getTaskCountStart(); + void setTaskCountStart(int taskCountStart); Double getStopTaskCountRatio(); SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index b1446c3d4c14..3faaca5b53ca 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -190,16 +190,22 @@ private Runnable computeAndCollectLag() }; } - /** - * This method determines whether to do scale actions based on collected lag points. - * Current algorithm of scale is simple: - * First of all, compute the proportion of lag points higher/lower than scaleOutThreshold/scaleInThreshold, getting scaleOutThreshold/scaleInThreshold. - * Secondly, compare scaleOutThreshold/scaleInThreshold with triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. P.S. Scale out action has higher priority than scale in action. - * Finaly, if scaleOutThreshold/scaleInThreshold is higher than triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, scale out/in action would be triggered. - * - * @param lags the lag metrics of Stream(Kafka/Kinesis) - * @return Integer. target number of tasksCount, -1 means skip scale action. - */ + /** + * This method determines whether to do scale actions based on collected lag points. + * The current algorithm of scale is straightforward: + *
      + *
    • First, compute the proportion of lag points higher/lower than scaleOutThreshold/scaleInThreshold, + * getting scaleOutThreshold/scaleInThreshold. + *
    • Secondly, compare scaleOutThreshold/scaleInThreshold with + * triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. + *
      • P.S. Scale out action has a higher priority than scale in action.
      + *
    • Finally, if scaleOutThreshold/scaleInThreshold is higher than + * triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, scale out/in action would be triggered. + *
    + * + * @param lags the lag metrics of Stream (Kafka/Kinesis) + * @return Integer, target number of tasksCount. -1 means skip scale action. + */ private int computeDesiredTaskCount(List lags) { // if supervisor is not suspended, ensure required tasks are running diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java index b4a9b0e8891c..ac942e178b2d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java @@ -183,6 +183,12 @@ public Integer getTaskCountStart() return taskCountStart; } + @Override + public void setTaskCountStart(int taskCountStart) + { + this.taskCountStart = taskCountStart; + } + @Override public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index e96e38873189..d71194494bb7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -566,14 +566,16 @@ public void testAutoScalerConfig() Exception e = null; try { - AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of( - "enableTaskAutoScaler", - "true", - "taskCountMax", - "1", - "taskCountMin", - "4" - ), AutoScalerConfig.class); + AutoScalerConfig autoScalerError = mapper.convertValue( + ImmutableMap.of( + "enableTaskAutoScaler", + "true", + "taskCountMax", + "1", + "taskCountMin", + "4" + ), AutoScalerConfig.class + ); } catch (RuntimeException ex) { e = ex; @@ -685,16 +687,18 @@ public void testDefaultAutoScalerConfigCreatedWithDefault() EasyMock.replay(ingestionSchema); EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig()) - .andReturn(mapper.convertValue(ImmutableMap.of( - "lagCollectionIntervalMillis", - "1", - "enableTaskAutoScaler", - true, - "taskCountMax", - "4", - "taskCountMin", - "1" - ), AutoScalerConfig.class)) + .andReturn(mapper.convertValue( + ImmutableMap.of( + "lagCollectionIntervalMillis", + "1", + "enableTaskAutoScaler", + true, + "taskCountMax", + "4", + "taskCountMin", + "1" + ), AutoScalerConfig.class + )) .anyTimes(); EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes(); EasyMock.replay(seekableStreamSupervisorIOConfig); @@ -775,10 +779,12 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc supervisor.start(); autoScaler.start(); supervisor.runInternal(); - int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); + + int taskCountBeforeScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountMin(); Assert.assertEquals(1, taskCountBeforeScaleOut); Thread.sleep(1000); - int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); + int taskCountAfterScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); Assert.assertEquals(2, taskCountAfterScaleOut); Assert.assertTrue( dynamicActionEmitter @@ -1003,9 +1009,11 @@ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() t supervisor.start(); autoScaler.start(); supervisor.runInternal(); - int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); - Assert.assertEquals(1, taskCountBeforeScaleOut); + + Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); + Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); Thread.sleep(1000); + int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScaleOut); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); @@ -1049,14 +1057,16 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin. Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); - supervisor.getIoConfig().setTaskCount(2); + Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); + supervisor.start(); autoScaler.start(); supervisor.runInternal(); - int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); - Assert.assertEquals(2, taskCountBeforeScaleOut); + + Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + Thread.sleep(1000); - int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); + int taskCountAfterScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); Assert.assertEquals(1, taskCountAfterScaleOut); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); @@ -1102,16 +1112,21 @@ public void testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanParti emitter ); - // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin. - Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); - supervisor.getIoConfig().setTaskCount(2); + // enable autoscaler so that taskcount config will be ignored and the init value of taskCount will use taskCountMin. + Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); + Assert.assertEquals( + supervisor.getIoConfig().getAutoScalerConfig().getTaskCountMin(), + (int) supervisor.getIoConfig().getTaskCount() + ); + + // When supervisor.start(); autoScaler.start(); supervisor.runInternal(); - Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount()); Thread.sleep(2000); - Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount()); + // Then + Assert.assertEquals(10, (int) supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); @@ -1481,7 +1496,11 @@ public void test_validateSpecUpdateTo_ShortCircuits() DruidException.Category.INVALID_INPUT, "invalidInput" ).expectMessageIs( - StringUtils.format("Cannot update supervisor spec from type[%s] to type[%s]", proposedSpec.getClass().getSimpleName(), otherSpec.getClass().getSimpleName()) + StringUtils.format( + "Cannot update supervisor spec from type[%s] to type[%s]", + proposedSpec.getClass().getSimpleName(), + otherSpec.getClass().getSimpleName() + ) ) ); } @@ -1580,6 +1599,95 @@ public String getSource() originalSpec.validateSpecUpdateTo(proposedSpecSameSource); } + @Test + public void testMergeSpecConfigs() + { + mockIngestionSchema(); + + // Given + // Create existing spec with autoscaler config that has taskCountStart set to 5 + HashMap existingAutoScalerConfig = new HashMap<>(); + existingAutoScalerConfig.put("enableTaskAutoScaler", true); + existingAutoScalerConfig.put("taskCountMax", 8); + existingAutoScalerConfig.put("taskCountMin", 1); + existingAutoScalerConfig.put("taskCountStart", 5); + + SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(existingIoConfig.getAutoScalerConfig()) + .andReturn(mapper.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.replay(existingIoConfig); + + SeekableStreamSupervisorIngestionSpec existingIngestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); + EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(existingIngestionSchema.getTuningConfig()) + .andReturn(seekableStreamSupervisorTuningConfig) + .anyTimes(); + EasyMock.replay(existingIngestionSchema); + + TestSeekableStreamSupervisorSpec existingSpec = buildDefaultSupervisorSpecWithIngestionSchema( + "id123", + existingIngestionSchema + ); + + // Create new spec with autoscaler config that has taskCountStart not set (null) + HashMap newAutoScalerConfig = new HashMap<>(); + newAutoScalerConfig.put("enableTaskAutoScaler", true); + newAutoScalerConfig.put("taskCountMax", 8); + newAutoScalerConfig.put("taskCountMin", 1); + + SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(newIoConfig.getAutoScalerConfig()) + .andReturn(mapper.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.replay(newIoConfig); + + SeekableStreamSupervisorIngestionSpec newIngestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); + EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(newIngestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(newIngestionSchema); + + TestSeekableStreamSupervisorSpec newSpec = buildDefaultSupervisorSpecWithIngestionSchema( + "id124", + newIngestionSchema + ); + + + // Before merge, taskCountStart should be null + Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + + // When + newSpec.mergeSpecConfigs(existingSpec); + + // Then - taskCountStart should be copied from existing spec + Assert.assertEquals(Integer.valueOf(5), newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + } + + private TestSeekableStreamSupervisorSpec buildDefaultSupervisorSpecWithIngestionSchema( + String id, + SeekableStreamSupervisorIngestionSpec ingestionSchema + ) + { + return new TestSeekableStreamSupervisorSpec( + ingestionSchema, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + id + ); + } + private void mockIngestionSchema() { EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java new file mode 100644 index 000000000000..285645ddff81 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java @@ -0,0 +1,323 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import org.apache.druid.audit.AuditManager; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; +import org.apache.druid.indexing.overlord.supervisor.SupervisorResource; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.incremental.RowIngestionMetersFactory; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.server.security.Access; +import org.apache.druid.server.security.Action; +import org.apache.druid.server.security.AuthConfig; +import org.apache.druid.server.security.AuthenticationResult; +import org.apache.druid.server.security.Authorizer; +import org.apache.druid.server.security.AuthorizerMapper; +import org.apache.druid.server.security.Resource; +import org.apache.druid.server.security.ResourceAction; +import org.apache.druid.server.security.ResourceType; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.easymock.EasyMockRunner; +import org.easymock.EasyMockSupport; +import org.easymock.Mock; +import org.jetbrains.annotations.Nullable; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +import javax.annotation.Nonnull; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.Response; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Set; + +@RunWith(EasyMockRunner.class) +public class SupervisorResourceConfigMergeTest extends EasyMockSupport +{ + private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper(); + @Mock + private TaskMaster taskMaster; + + @Mock + private SupervisorManager supervisorManager; + + @Mock + private HttpServletRequest request; + + @Mock + private AuthConfig authConfig; + + @Mock + private AuditManager auditManager; + + private SupervisorResource supervisorResource; + + @Before + public void setUp() + { + supervisorResource = new SupervisorResource( + taskMaster, + new AuthorizerMapper(null) + { + @Override + public Authorizer getAuthorizer(String name) + { + return (authenticationResult, resource, action) -> { + if (authenticationResult.getIdentity().equals("druid")) { + return Access.OK; + } else { + if (resource.getType().equals(ResourceType.DATASOURCE)) { + if (resource.getName().equals("datasource2")) { + return Access.deny("not authorized."); + } else { + return Access.OK; + } + } else if (resource.getType().equals(ResourceType.EXTERNAL)) { + if (resource.getName().equals("test")) { + return Access.deny("not authorized."); + } else { + return Access.OK; + } + } + return Access.OK; + } + }; + } + }, + OBJECT_MAPPER, + authConfig, + auditManager + ); + } + + @Test + public void testSpecPostWithTaskCountStartMerge() + { + // Create an existing spec with taskCountStart=5 in autoScalerConfig + HashMap existingAutoScalerConfig = new HashMap<>(); + existingAutoScalerConfig.put("enableTaskAutoScaler", true); + existingAutoScalerConfig.put("taskCountMax", 7); + existingAutoScalerConfig.put("taskCountMin", 1); + existingAutoScalerConfig.put("taskCountStart", 5); + + SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(existingIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.replay(existingIoConfig); + + DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(existingDataSchema); + + SeekableStreamSupervisorIngestionSpec existingIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); + EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); + EasyMock.replay(existingIngestionSchema); + + TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + existingIngestionSchema + ); + + // Create a new spec WITHOUT taskCountStart in autoScalerConfig + HashMap newAutoScalerConfig = new HashMap<>(); + newAutoScalerConfig.put("enableTaskAutoScaler", true); + newAutoScalerConfig.put("taskCountMax", 8); + newAutoScalerConfig.put("taskCountMin", 1); + // Note: taskCountStart is NOT set + + SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(newIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.replay(newIoConfig); + + DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(newDataSchema); + + SeekableStreamSupervisorIngestionSpec newIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); + EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); + EasyMock.replay(newIngestionSchema); + + TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + newIngestionSchema + ) + { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + + // Set up mocks for SupervisorManager behavior + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + + // Mock shouldUpdateSupervisor to return true (spec is different) + Capture capturedExistingSpec = EasyMock.newCapture(); + EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedExistingSpec))) + .andAnswer(() -> { + SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; + arg.mergeSpecConfigs(existingSpec); + return true; + }); + + // Mock getSupervisorSpec to return the existing spec (simulating an update scenario) + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) + .andReturn(Optional.of(existingSpec)) + .anyTimes(); + + setupMockRequest(); + setupMockRequestForAudit(); + + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); + auditManager.doAudit(EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + + replayAll(); + + // Before merge, taskCountStart should be null in new spec + Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + + // When + Response response = supervisorResource.specPost(newSpec, false, request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + + // Then + TestSeekableStreamSupervisorSpec capturedSpec = (TestSeekableStreamSupervisorSpec) capturedExistingSpec.getValue(); + + Assert.assertNotNull(capturedSpec.getIoConfig().getAutoScalerConfig()); + Assert.assertEquals( + Integer.valueOf(5), + newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart() + ); + } + + private void setupMockRequest() + { + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); + EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)) + .andReturn(new AuthenticationResult("druid", "druid", null, null)) + .atLeastOnce(); + request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); + EasyMock.expectLastCall().anyTimes(); + } + + private void setupMockRequestForAudit() + { + EasyMock.expect(request.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn("author").once(); + EasyMock.expect(request.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn("comment").once(); + + EasyMock.expect(request.getRemoteAddr()).andReturn("127.0.0.1").once(); + EasyMock.expect(request.getMethod()).andReturn("POST").once(); + EasyMock.expect(request.getRequestURI()).andReturn("supes").once(); + EasyMock.expect(request.getQueryString()).andReturn("a=b").once(); + } + + static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec + { + public TestSeekableStreamSupervisorSpec( + @Nullable String id, + SeekableStreamSupervisorIngestionSpec ingestionSchema + ) + { + super( + id, + ingestionSchema, + null, + false, + EasyMock.createMock(TaskStorage.class), + EasyMock.createMock(TaskMaster.class), + EasyMock.createMock(IndexerMetadataStorageCoordinator.class), + EasyMock.createMock(SeekableStreamIndexTaskClientFactory.class), + OBJECT_MAPPER, + EasyMock.createMock(ServiceEmitter.class), + EasyMock.createMock(DruidMonitorSchedulerConfig.class), + EasyMock.createMock(RowIngestionMetersFactory.class), + EasyMock.createMock(SupervisorStateManagerConfig.class) + ); + } + + @Override + public Supervisor createSupervisor() + { + return null; + } + + @Override + public String getType() + { + return "test"; + } + + @Override + public String getSource() + { + return "test-stream"; + } + + @Override + protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) + { + return null; + } + + @JsonIgnore + @Nonnull + @Override + public Set getInputSourceResources() throws UnsupportedOperationException + { + return Collections.singleton(new ResourceAction(new Resource("test", ResourceType.EXTERNAL), Action.READ)); + } + } + + +} diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index 9ff217d5404a..a3fedcac7046 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -27,6 +27,7 @@ import org.apache.druid.server.security.ResourceAction; import javax.annotation.Nonnull; +import javax.validation.constraints.NotNull; import java.util.List; import java.util.Set; @@ -113,4 +114,15 @@ default void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcep { // The default implementation does not do any validation checks. } + + /** + * Perform any merging of spec configurations needed after deserialization. + * + * @param existingSpec used spec to merge values from + * @throws DruidException if the spec update is not allowed + */ + default void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec) + { + // No-op by default + } } From 8d39651189efa1feb68cb085ce8d4ff9c3532948 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Thu, 13 Nov 2025 13:04:28 +0200 Subject: [PATCH 2/8] Remove sout from production code --- .../druid/indexing/overlord/supervisor/SupervisorManager.java | 1 - .../seekablestream/SupervisorResourceConfigMergeTest.java | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 7921a966f4e5..48fe5615f7fb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -175,7 +175,6 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) Preconditions.checkState(started, "SupervisorManager not started"); final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec); SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false); - System.out.println(existingSpec == null ? "null" : existingSpec.getId()); if (existingSpec != null) { spec.mergeSpecConfigs(existingSpec); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java index 285645ddff81..ba9ae67a5204 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java @@ -54,13 +54,13 @@ import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; import org.easymock.Mock; -import org.jetbrains.annotations.Nullable; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.util.Collections; From b4f3c45edf8909dbe6df68662d29f8682fea0a5a Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Thu, 13 Nov 2025 16:57:17 +0200 Subject: [PATCH 3/8] Fix more falling tests --- .../kafka/supervisor/KafkaSupervisorTest.java | 4 ++-- .../kinesis/supervisor/KinesisSupervisorTest.java | 12 +++++------- .../SeekableStreamSupervisorSpecTest.java | 2 +- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index f6a026e6aa79..7f2f0ea67d60 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -389,13 +389,13 @@ public SeekableStreamIndexTaskClient build( supervisor.start(); int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount(); - Assert.assertEquals(1, taskCountBeforeScale); + Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); autoscaler.start(); supervisor.runInternal(); Thread.sleep(1000); verifyAll(); - int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); + int taskCountAfterScale = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); Assert.assertEquals(2, taskCountAfterScale); KafkaIndexTask task = captured.getValue(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 1fd50c135458..f217e0747052 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -355,14 +355,13 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception replayAll(); supervisor.start(); - int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount(); - Assert.assertEquals(1, taskCountBeforeScale); + Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); autoscaler.start(); supervisor.runInternal(); verifyAll(); - Thread.sleep(1 * 1000); - int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); + Thread.sleep(1000); + int taskCountAfterScale = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); Assert.assertEquals(2, taskCountAfterScale); autoscaler.stop(); } @@ -433,14 +432,13 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception supervisor.getIoConfig().setTaskCount(2); supervisor.start(); - int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount(); - Assert.assertEquals(2, taskCountBeforeScale); + Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); autoscaler.start(); supervisor.runInternal(); verifyAll(); Thread.sleep(1 * 1000); - int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); + int taskCountAfterScale = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); Assert.assertEquals(1, taskCountAfterScale); autoscaler.stop(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index d71194494bb7..c8bd99526e55 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -1014,7 +1014,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() t Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); Thread.sleep(1000); - int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); + int taskCountAfterScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); Assert.assertEquals(2, taskCountAfterScaleOut); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); From aaf84c9c268fcee1ed7c4683203da9077c6e59c4 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Fri, 14 Nov 2025 12:06:01 +0200 Subject: [PATCH 4/8] Checkstyle --- .../supervisor/SeekableStreamSupervisor.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 7d4ac46e5815..c441e7e8408a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -578,11 +578,11 @@ private boolean changeTaskCount(int desiredActiveTaskCount) return false; } log.info( - "Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].", - currentActiveTaskCount, - desiredActiveTaskCount, - supervisorId, - dataSource + "Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].", + currentActiveTaskCount, + desiredActiveTaskCount, + supervisorId, + dataSource ); final Stopwatch scaleActionStopwatch = Stopwatch.createStarted(); gracefulShutdownInternal(); @@ -1179,7 +1179,7 @@ public void stop(boolean stopGracefully) catch (Exception e) { stateManager.recordThrowableEvent(e); log.makeAlert(e, "Exception stopping [%s]", supervisorId) - .emit(); + .emit(); } } } @@ -1357,7 +1357,7 @@ public void tryInit() } initRetryCounter++; log.makeAlert(e, "Exception starting SeekableStreamSupervisor[%s]", supervisorId) - .emit(); + .emit(); throw new RuntimeException(e); } From fa86de53f3f10985b2eda877dd3db9c6cbb7a9cc Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Wed, 10 Dec 2025 11:14:51 +0200 Subject: [PATCH 5/8] Change a logic according to a new decision --- .../supervisor/SupervisorManager.java | 2 +- .../supervisor/SeekableStreamSupervisor.java | 6 +- .../SeekableStreamSupervisorSpec.java | 33 ++- .../autoscaler/AutoScalerConfig.java | 1 - .../autoscaler/LagBasedAutoScaler.java | 42 +-- .../autoscaler/LagBasedAutoScalerConfig.java | 10 +- .../SeekableStreamSupervisorSpecTest.java | 26 +- .../SupervisorResourceConfigMergeTest.java | 240 ++++++++++++++++-- .../overlord/supervisor/SupervisorSpec.java | 6 +- 9 files changed, 286 insertions(+), 80 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 48fe5615f7fb..21d2a6265011 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -176,7 +176,7 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec); SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false); if (existingSpec != null) { - spec.mergeSpecConfigs(existingSpec); + spec.merge(existingSpec); } createAndStartSupervisorInternal(spec, shouldUpdateSpec); return shouldUpdateSpec; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index f7569ecdab66..60dc829fb196 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -611,7 +611,7 @@ private void changeTaskCountInAutoScalerConfig(int desiredActiveTaskCount) log.warn("autoScalerConfig is null but scale action is submitted, how can it be ?"); return; } - autoScalerConfig.setTaskCountStart(desiredActiveTaskCount); + ioConfig.setTaskCount(desiredActiveTaskCount); try { Optional supervisorManager = taskMaster.getSupervisorManager(); if (supervisorManager.isPresent()) { @@ -1179,7 +1179,7 @@ public void stop(boolean stopGracefully) catch (Exception e) { stateManager.recordThrowableEvent(e); log.makeAlert(e, "Exception stopping [%s]", supervisorId) - .emit(); + .emit(); } } } @@ -1357,7 +1357,7 @@ public void tryInit() } initRetryCounter++; log.makeAlert(e, "Exception starting SeekableStreamSupervisor[%s]", supervisorId) - .emit(); + .emit(); throw new RuntimeException(e); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index bf217b73a179..f5e0f4c12a77 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -192,11 +192,9 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig() @Override public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor) { - AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig(); - if (autoScalerConfig != null - && autoScalerConfig.getEnableTaskAutoScaler() - && supervisor instanceof SeekableStreamSupervisor) { - return autoScalerConfig.createAutoScaler(supervisor, this, emitter); + AutoScalerConfig autoScalerCfg = ingestionSchema.getIOConfig().getAutoScalerConfig(); + if (autoScalerCfg != null && autoScalerCfg.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) { + return autoScalerCfg.createAutoScaler(supervisor, this, emitter); } return new NoopTaskAutoScaler(); } @@ -263,13 +261,8 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept } } - /** - * Writes the taskCountStart value from old config, if not specificed in new config. - * - * @param existingSpec the existing supervisor specification to merge configuration values from - */ @Override - public void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec) + public void merge(@NotNull SupervisorSpec existingSpec) { AutoScalerConfig thisAutoScalerConfig = this.getIoConfig().getAutoScalerConfig(); // Either if autoscaler is absent or taskCountStart is specified - just return. @@ -277,15 +270,21 @@ public void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec) return; } - // TODO[sasha]: use switch expression with pattern matching when we move to Java 21 as minimum requirement. + // Use a switch expression with pattern matching when we move to Java 21 as a minimum requirement. if (existingSpec instanceof SeekableStreamSupervisorSpec) { - // Note: for some reason, sources are available only for bytecode version 11. - //noinspection PatternVariableCanBeUsed - var spec = (SeekableStreamSupervisorSpec) existingSpec; + SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec) existingSpec; AutoScalerConfig autoScalerConfig = spec.getIoConfig().getAutoScalerConfig(); - if (autoScalerConfig != null && autoScalerConfig.getTaskCountStart() != null) { - thisAutoScalerConfig.setTaskCountStart(autoScalerConfig.getTaskCountStart()); + if (autoScalerConfig == null) { + return; + } + // provided `taskCountStart` > provided `taskCount` > existing `taskCount` > provided `taskCountMin`. + int taskCount = thisAutoScalerConfig.getTaskCountMin(); + if (this.getIoConfig().getTaskCount() != null) { + taskCount = this.getIoConfig().getTaskCount(); + } else if (spec.getIoConfig().getTaskCount() != null) { + taskCount = spec.getIoConfig().getTaskCount(); } + this.getIoConfig().setTaskCount(taskCount); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java index 2f90883862b3..b9eb741bad1d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java @@ -40,7 +40,6 @@ public interface AutoScalerConfig int getTaskCountMax(); int getTaskCountMin(); Integer getTaskCountStart(); - void setTaskCountStart(int taskCountStart); Double getStopTaskCountRatio(); SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 3faaca5b53ca..142193ae63f6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -124,8 +124,10 @@ public void start() ); log.info( "LagBasedAutoScaler will collect lag every [%d] millis and will keep up to [%d] data points for the last [%d] millis for dataSource [%s]", - lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.maxSize(), - lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource + lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), + lagMetricsQueue.maxSize(), + lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), + dataSource ); } @@ -190,27 +192,27 @@ private Runnable computeAndCollectLag() }; } - /** - * This method determines whether to do scale actions based on collected lag points. - * The current algorithm of scale is straightforward: - *
      - *
    • First, compute the proportion of lag points higher/lower than scaleOutThreshold/scaleInThreshold, - * getting scaleOutThreshold/scaleInThreshold. - *
    • Secondly, compare scaleOutThreshold/scaleInThreshold with - * triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. - *
      • P.S. Scale out action has a higher priority than scale in action.
      - *
    • Finally, if scaleOutThreshold/scaleInThreshold is higher than - * triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, scale out/in action would be triggered. - *
    - * - * @param lags the lag metrics of Stream (Kafka/Kinesis) - * @return Integer, target number of tasksCount. -1 means skip scale action. - */ + /** + * This method determines whether to do scale actions based on collected lag points. + * The current algorithm of scale is straightforward: + *
      + *
    • First, compute the proportion of lag points higher/lower than {@code scaleOutThreshold/scaleInThreshold}, + * getting {@code scaleInThreshold/scaleOutThreshold},. + *
    • Secondly, compare {@code scaleInThreshold/scaleOutThreshold} with + * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}. + *
      • P.S. Scale out action has a higher priority than scale in action.
      + *
    • Finally, if {@code scaleOutThreshold/scaleInThreshold}, is higher than + * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}, scale out/in action would be triggered. + *
    + * + * @param lags the lag metrics of Stream (Kafka/Kinesis) + * @return Integer, target number of tasksCount. -1 means skip scale action. + */ private int computeDesiredTaskCount(List lags) { - // if supervisor is not suspended, ensure required tasks are running + // if the supervisor is not suspended, ensure required tasks are running // if suspended, ensure tasks have been requested to gracefully stop - log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags); + log.debug("Computing the desired task count for [%s], based on following lags : [%s]", dataSource, lags); int beyond = 0; int within = 0; int metricsCount = lags.size(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java index ac942e178b2d..ad036dd0e10a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java @@ -103,8 +103,8 @@ public LagBasedAutoScalerConfig( this.scaleInStep = scaleInStep != null ? scaleInStep : 1; this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2; - this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis - != null ? minTriggerScaleActionFrequencyMillis : 600000; + this.minTriggerScaleActionFrequencyMillis = + minTriggerScaleActionFrequencyMillis != null ? minTriggerScaleActionFrequencyMillis : 600000; Preconditions.checkArgument( stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0), @@ -183,12 +183,6 @@ public Integer getTaskCountStart() return taskCountStart; } - @Override - public void setTaskCountStart(int taskCountStart) - { - this.taskCountStart = taskCountStart; - } - @Override public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index c8bd99526e55..65e28b1dfb07 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -784,7 +784,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc int taskCountBeforeScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountMin(); Assert.assertEquals(1, taskCountBeforeScaleOut); Thread.sleep(1000); - int taskCountAfterScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); + int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScaleOut); Assert.assertTrue( dynamicActionEmitter @@ -1014,7 +1014,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() t Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); Thread.sleep(1000); - int taskCountAfterScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); + int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScaleOut); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); @@ -1066,7 +1066,7 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); Thread.sleep(1000); - int taskCountAfterScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); + int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountAfterScaleOut); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); @@ -1126,7 +1126,7 @@ public void testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanParti Thread.sleep(2000); // Then - Assert.assertEquals(10, (int) supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount()); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); @@ -1605,17 +1605,17 @@ public void testMergeSpecConfigs() mockIngestionSchema(); // Given - // Create existing spec with autoscaler config that has taskCountStart set to 5 + // Create existing spec with autoscaler config and taskCount set to 5 HashMap existingAutoScalerConfig = new HashMap<>(); existingAutoScalerConfig.put("enableTaskAutoScaler", true); existingAutoScalerConfig.put("taskCountMax", 8); existingAutoScalerConfig.put("taskCountMin", 1); - existingAutoScalerConfig.put("taskCountStart", 5); SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class); EasyMock.expect(existingIoConfig.getAutoScalerConfig()) .andReturn(mapper.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) .anyTimes(); + EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes(); EasyMock.replay(existingIoConfig); SeekableStreamSupervisorIngestionSpec existingIngestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class); @@ -1631,7 +1631,7 @@ public void testMergeSpecConfigs() existingIngestionSchema ); - // Create new spec with autoscaler config that has taskCountStart not set (null) + // Create new spec with autoscaler config that has taskCountStart not set (null) and no taskCount set HashMap newAutoScalerConfig = new HashMap<>(); newAutoScalerConfig.put("enableTaskAutoScaler", true); newAutoScalerConfig.put("taskCountMax", 8); @@ -1641,6 +1641,9 @@ public void testMergeSpecConfigs() EasyMock.expect(newIoConfig.getAutoScalerConfig()) .andReturn(mapper.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) .anyTimes(); + EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes(); + newIoConfig.setTaskCount(5); + EasyMock.expectLastCall().once(); EasyMock.replay(newIoConfig); SeekableStreamSupervisorIngestionSpec newIngestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class); @@ -1654,15 +1657,14 @@ public void testMergeSpecConfigs() newIngestionSchema ); - // Before merge, taskCountStart should be null Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart()); - // When - newSpec.mergeSpecConfigs(existingSpec); + // When - merge should copy taskCount from existing spec since new spec has no taskCount + newSpec.merge(existingSpec); - // Then - taskCountStart should be copied from existing spec - Assert.assertEquals(Integer.valueOf(5), newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + // Then - verify setTaskCount was called (EasyMock will verify the mock expectations) + EasyMock.verify(newIoConfig); } private TestSeekableStreamSupervisorSpec buildDefaultSupervisorSpecWithIngestionSchema( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java index ba9ae67a5204..09ac91a36da2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java @@ -127,21 +127,28 @@ public Authorizer getAuthorizer(String name) ); } + /** + * Tests that when a new spec is submitted without taskCountStart, the merge function + * follows the priority: provided taskCountStart > provided taskCount > existing taskCount > provided taskCountMin. + * In this test, existing spec has taskCount=5, new spec has no taskCount set, + * so merge should use existing taskCount (5). + */ @Test - public void testSpecPostWithTaskCountStartMerge() + public void testSpecPostMergeUsesExistingTaskCount() { - // Create an existing spec with taskCountStart=5 in autoScalerConfig + // Create an existing spec with taskCount=5 (simulating a scaled state) HashMap existingAutoScalerConfig = new HashMap<>(); existingAutoScalerConfig.put("enableTaskAutoScaler", true); existingAutoScalerConfig.put("taskCountMax", 7); existingAutoScalerConfig.put("taskCountMin", 1); - existingAutoScalerConfig.put("taskCountStart", 5); + // Note: taskCountStart is NOT set in existing spec SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); EasyMock.expect(existingIoConfig.getAutoScalerConfig()) .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) .anyTimes(); EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes(); // existing taskCount is 5 EasyMock.replay(existingIoConfig); DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); @@ -159,11 +166,11 @@ public void testSpecPostWithTaskCountStartMerge() existingIngestionSchema ); - // Create a new spec WITHOUT taskCountStart in autoScalerConfig + // Create a new spec WITHOUT taskCountStart and WITHOUT taskCount in autoScalerConfig HashMap newAutoScalerConfig = new HashMap<>(); newAutoScalerConfig.put("enableTaskAutoScaler", true); newAutoScalerConfig.put("taskCountMax", 8); - newAutoScalerConfig.put("taskCountMin", 1); + newAutoScalerConfig.put("taskCountMin", 2); // Note: taskCountStart is NOT set SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); @@ -171,6 +178,9 @@ public void testSpecPostWithTaskCountStartMerge() .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) .anyTimes(); EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes(); // new spec has no taskCount + newIoConfig.setTaskCount(5); // Expect merge to set taskCount to existing value (5) + EasyMock.expectLastCall().once(); EasyMock.replay(newIoConfig); DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); @@ -198,12 +208,13 @@ public List getDataSources() // Set up mocks for SupervisorManager behavior EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - // Mock shouldUpdateSupervisor to return true (spec is different) - Capture capturedExistingSpec = EasyMock.newCapture(); - EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedExistingSpec))) + // Mock createOrUpdateAndStartSupervisor to call merge + final SupervisorSpec existingSpecForMerge = existingSpec; + Capture capturedNewSpec = EasyMock.newCapture(); + EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) .andAnswer(() -> { SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; - arg.mergeSpecConfigs(existingSpec); + arg.merge(existingSpecForMerge); return true; }); @@ -229,15 +240,214 @@ public List getDataSources() verifyAll(); Assert.assertEquals(200, response.getStatus()); + } + + /** + * Tests that when a new spec is submitted with taskCount set, merge uses provided taskCount + * over existing taskCount. Priority: provided taskCountStart > provided taskCount > existing taskCount > provided taskCountMin. + */ + @Test + public void testSpecPostMergeUsesProvidedTaskCount() + { + // Create an existing spec with taskCount=5 + HashMap existingAutoScalerConfig = new HashMap<>(); + existingAutoScalerConfig.put("enableTaskAutoScaler", true); + existingAutoScalerConfig.put("taskCountMax", 7); + existingAutoScalerConfig.put("taskCountMin", 1); + + SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(existingIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes(); + EasyMock.replay(existingIoConfig); + + DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(existingDataSchema); + + SeekableStreamSupervisorIngestionSpec existingIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); + EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); + EasyMock.replay(existingIngestionSchema); + + TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + existingIngestionSchema + ); + + // Create a new spec with taskCount=3 (provided taskCount should take precedence) + HashMap newAutoScalerConfig = new HashMap<>(); + newAutoScalerConfig.put("enableTaskAutoScaler", true); + newAutoScalerConfig.put("taskCountMax", 8); + newAutoScalerConfig.put("taskCountMin", 2); + + SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(newIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(newIoConfig.getTaskCount()).andReturn(3).anyTimes(); // provided taskCount=3 + newIoConfig.setTaskCount(3); // Expect merge to use provided taskCount (3) + EasyMock.expectLastCall().once(); + EasyMock.replay(newIoConfig); + + DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(newDataSchema); + + SeekableStreamSupervisorIngestionSpec newIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); + EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); + EasyMock.replay(newIngestionSchema); + + TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + newIngestionSchema + ) + { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + + final SupervisorSpec existingSpecForMerge = existingSpec; + Capture capturedNewSpec = EasyMock.newCapture(); + EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) + .andAnswer(() -> { + SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; + arg.merge(existingSpecForMerge); + return true; + }); + + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) + .andReturn(Optional.of(existingSpec)) + .anyTimes(); + + setupMockRequest(); + setupMockRequestForAudit(); + + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); + auditManager.doAudit(EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + + replayAll(); + + Response response = supervisorResource.specPost(newSpec, false, request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + } + + /** + * Tests that when neither provided taskCount nor existing taskCount is set, + * merge falls back to provided taskCountMin. + */ + @Test + public void testSpecPostMergeFallsBackToProvidedTaskCountMin() + { + // Create an existing spec with no taskCount + HashMap existingAutoScalerConfig = new HashMap<>(); + existingAutoScalerConfig.put("enableTaskAutoScaler", true); + existingAutoScalerConfig.put("taskCountMax", 7); + existingAutoScalerConfig.put("taskCountMin", 1); + + SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(existingIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(null).anyTimes(); // existing has no taskCount + EasyMock.replay(existingIoConfig); - // Then - TestSeekableStreamSupervisorSpec capturedSpec = (TestSeekableStreamSupervisorSpec) capturedExistingSpec.getValue(); + DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(existingDataSchema); + + SeekableStreamSupervisorIngestionSpec existingIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); + EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); + EasyMock.replay(existingIngestionSchema); - Assert.assertNotNull(capturedSpec.getIoConfig().getAutoScalerConfig()); - Assert.assertEquals( - Integer.valueOf(5), - newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart() + TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + existingIngestionSchema ); + + // Create a new spec with taskCountMin=4, no taskCount + HashMap newAutoScalerConfig = new HashMap<>(); + newAutoScalerConfig.put("enableTaskAutoScaler", true); + newAutoScalerConfig.put("taskCountMax", 8); + newAutoScalerConfig.put("taskCountMin", 4); // provided taskCountMin + + SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(newIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes(); // no provided taskCount + newIoConfig.setTaskCount(4); // Expect merge to fall back to provided taskCountMin (4) + EasyMock.expectLastCall().once(); + EasyMock.replay(newIoConfig); + + DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(newDataSchema); + + SeekableStreamSupervisorIngestionSpec newIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); + EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); + EasyMock.replay(newIngestionSchema); + + TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + newIngestionSchema + ) + { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + + final SupervisorSpec existingSpecForMerge = existingSpec; + Capture capturedNewSpec = EasyMock.newCapture(); + EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) + .andAnswer(() -> { + SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; + arg.merge(existingSpecForMerge); + return true; + }); + + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) + .andReturn(Optional.of(existingSpec)) + .anyTimes(); + + setupMockRequest(); + setupMockRequestForAudit(); + + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); + auditManager.doAudit(EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + + replayAll(); + + Response response = supervisorResource.specPost(newSpec, false, request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); } private void setupMockRequest() diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index a3fedcac7046..377223308b0f 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -116,12 +116,12 @@ default void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcep } /** - * Perform any merging of spec configurations needed after deserialization. + * Updates this supervisor spec by merging values from the given {@code existingSpec}. + * This method may be used to carry forward existing spec values when a supervisor is being resubmitted. * * @param existingSpec used spec to merge values from - * @throws DruidException if the spec update is not allowed */ - default void mergeSpecConfigs(@NotNull SupervisorSpec existingSpec) + default void merge(@NotNull SupervisorSpec existingSpec) { // No-op by default } From 967952709bb2acf1f69c146539f8fb3b8a41ca48 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Wed, 10 Dec 2025 15:08:03 +0200 Subject: [PATCH 6/8] Fix another issues --- .../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 2 +- .../indexing/kinesis/supervisor/KinesisSupervisorTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 7f2f0ea67d60..941fc5a7d79c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -395,7 +395,7 @@ public SeekableStreamIndexTaskClient build( Thread.sleep(1000); verifyAll(); - int taskCountAfterScale = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); + int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScale); KafkaIndexTask task = captured.getValue(); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index f217e0747052..80e538bdce76 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -361,7 +361,7 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception supervisor.runInternal(); verifyAll(); Thread.sleep(1000); - int taskCountAfterScale = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); + int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScale); autoscaler.stop(); } @@ -438,7 +438,7 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception supervisor.runInternal(); verifyAll(); Thread.sleep(1 * 1000); - int taskCountAfterScale = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart(); + int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountAfterScale); autoscaler.stop(); } From c7a1468aeada7abcbd167d1a932024f3860054ba Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Thu, 11 Dec 2025 18:52:32 +0200 Subject: [PATCH 7/8] Tests fixes and cleanups --- .../kafka/supervisor/KafkaSupervisorTest.java | 2 +- .../supervisor/KinesisSupervisorTest.java | 8 +- .../supervisor/SeekableStreamSupervisor.java | 62 +- .../SeekableStreamSupervisorSpec.java | 6 +- .../supervisor/SupervisorResourceTest.java | 411 +++++++++++++- .../SeekableStreamSupervisorSpecTest.java | 26 +- .../SupervisorResourceConfigMergeTest.java | 533 ------------------ 7 files changed, 449 insertions(+), 599 deletions(-) delete mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 941fc5a7d79c..f6a026e6aa79 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -389,7 +389,7 @@ public SeekableStreamIndexTaskClient build( supervisor.start(); int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount(); - Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + Assert.assertEquals(1, taskCountBeforeScale); autoscaler.start(); supervisor.runInternal(); Thread.sleep(1000); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 80e538bdce76..1fd50c135458 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -355,12 +355,13 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception replayAll(); supervisor.start(); - Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, taskCountBeforeScale); autoscaler.start(); supervisor.runInternal(); verifyAll(); - Thread.sleep(1000); + Thread.sleep(1 * 1000); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScale); autoscaler.stop(); @@ -432,7 +433,8 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception supervisor.getIoConfig().setTaskCount(2); supervisor.start(); - Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(2, taskCountBeforeScale); autoscaler.start(); supervisor.runInternal(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 60dc829fb196..f69dc52159b9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -566,51 +566,43 @@ public String getType() private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException { - if (autoScalerConfig == null) { - log.warn("autoScalerConfig is 'null' but dynamic allocation notice is submitted, how can it be ?"); - return false; - } int currentActiveTaskCount; Collection activeTaskGroups = activelyReadingTaskGroups.values(); currentActiveTaskCount = activeTaskGroups.size(); if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) { return false; + } else { + log.info( + "Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].", + currentActiveTaskCount, + desiredActiveTaskCount, + supervisorId, + dataSource + ); + final Stopwatch scaleActionStopwatch = Stopwatch.createStarted(); + gracefulShutdownInternal(); + changeTaskCountInIOConfig(desiredActiveTaskCount); + clearAllocationInfo(); + emitter.emit(ServiceMetricEvent.builder() + .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) + .setDimension(DruidMetrics.DATASOURCE, dataSource) + .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()) + .setDimensionIfNotNull( + DruidMetrics.TAGS, + spec.getContextValue(DruidMetrics.TAGS) + ) + .setMetric( + AUTOSCALER_SCALING_TIME_METRIC, + scaleActionStopwatch.millisElapsed() + )); + log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource); + return true; } - log.info( - "Starting scale action, current active task count is [%d] and desired task count is [%d] for supervisor[%s] for dataSource[%s].", - currentActiveTaskCount, - desiredActiveTaskCount, - supervisorId, - dataSource - ); - final Stopwatch scaleActionStopwatch = Stopwatch.createStarted(); - gracefulShutdownInternal(); - changeTaskCountInAutoScalerConfig(desiredActiveTaskCount); - clearAllocationInfo(); - emitter.emit(ServiceMetricEvent.builder() - .setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId) - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .setDimension(DruidMetrics.STREAM, getIoConfig().getStream()) - .setDimensionIfNotNull( - DruidMetrics.TAGS, - spec.getContextValue(DruidMetrics.TAGS) - ) - .setMetric( - AUTOSCALER_SCALING_TIME_METRIC, - scaleActionStopwatch.millisElapsed() - )); - log.info("Changed taskCount to [%s] for supervisor[%s] for dataSource[%s].", desiredActiveTaskCount, supervisorId, dataSource); - return true; } - private void changeTaskCountInAutoScalerConfig(int desiredActiveTaskCount) + private void changeTaskCountInIOConfig(int desiredActiveTaskCount) { - // Sanity check. - if (autoScalerConfig == null) { - log.warn("autoScalerConfig is null but scale action is submitted, how can it be ?"); - return; - } ioConfig.setTaskCount(desiredActiveTaskCount); try { Optional supervisorManager = taskMaster.getSupervisorManager(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index f5e0f4c12a77..f21e073f6c4c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -192,9 +192,9 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig() @Override public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor) { - AutoScalerConfig autoScalerCfg = ingestionSchema.getIOConfig().getAutoScalerConfig(); - if (autoScalerCfg != null && autoScalerCfg.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) { - return autoScalerCfg.createAutoScaler(supervisor, this, emitter); + AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig(); + if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) { + return autoScalerConfig.createAutoScaler(supervisor, this, emitter); } return new NoopTaskAutoScaler(); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 538fd4a9e78f..bdc18f82b3f8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -27,12 +27,23 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.audit.AuditManager; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.incremental.RowIngestionMetersFactory; +import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; @@ -54,6 +65,7 @@ import org.junit.runner.RunWith; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.util.Collections; @@ -61,6 +73,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; @RunWith(EasyMockRunner.class) @@ -351,8 +364,10 @@ public void testSpecGetAllFull() Assert.assertTrue( specs.stream() .allMatch(spec -> - ("id1".equals(spec.getId()) && spec.getDataSource().equals("datasource1") && SPEC1.equals(spec.getSpec())) || - ("id2".equals(spec.getId()) && spec.getDataSource().equals("datasource2") && SPEC2.equals(spec.getSpec())) + ("id1".equals(spec.getId()) && spec.getDataSource().equals("datasource1") && SPEC1.equals( + spec.getSpec())) || + ("id2".equals(spec.getId()) && spec.getDataSource().equals("datasource2") && SPEC2.equals( + spec.getSpec())) ) ); } @@ -992,7 +1007,9 @@ public void testSpecGetHistory() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(3); EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1", null)).andReturn(versions1).times(1); EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2", null)).andReturn(versions2).times(1); - EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3", null)).andReturn(Collections.emptyList()).times(1); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3", null)) + .andReturn(Collections.emptyList()) + .times(1); setupMockRequest(); replayAll(); @@ -1083,7 +1100,9 @@ public void testSpecGetHistoryWithAuthFailure() EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1", null)).andReturn(versions1).times(1); EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2", null)).andReturn(versions2).times(1); EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3", null)).andReturn(versions3).times(1); - EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4", null)).andReturn(Collections.emptyList()).times(1); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4", null)) + .andReturn(Collections.emptyList()) + .times(1); setupMockRequestForUser("notdruid"); replayAll(); @@ -1183,12 +1202,18 @@ public void testSpecGetHistoryWithLimit() // Test with limit=0 (should return 400 Bad Request) response = supervisorResource.specGetHistory(request, "id1", 0); Assert.assertEquals(400, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("error", "Count must be greater than zero if set (count was 0)"), response.getEntity()); + Assert.assertEquals( + ImmutableMap.of("error", "Count must be greater than zero if set (count was 0)"), + response.getEntity() + ); // Test with negative limit (should return 400 Bad Request) response = supervisorResource.specGetHistory(request, "id1", -1); Assert.assertEquals(400, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("error", "Count must be greater than zero if set (count was -1)"), response.getEntity()); + Assert.assertEquals( + ImmutableMap.of("error", "Count must be greater than zero if set (count was -1)"), + response.getEntity() + ); // Test with limit larger than available history response = supervisorResource.specGetHistory(request, "id1", 100); @@ -1320,6 +1345,319 @@ public void testNoopSupervisorSpecSerde() throws Exception Assert.assertEquals(spec, specRoundTrip); } + + @Test + public void testSpecPostMergeUsesExistingTaskCountHigherPriorityHasBeenMissed() + { + // Tests that when a new spec is submitted without taskCountStart, the merge function + // follows the priority: provided taskCountStart > provided taskCount > existing taskCount > provided taskCountMin. + + // Create an existing spec with taskCount=5 (simulating a scaled state) + HashMap existingAutoScalerConfig = new HashMap<>(); + existingAutoScalerConfig.put("enableTaskAutoScaler", true); + existingAutoScalerConfig.put("taskCountMax", 7); + existingAutoScalerConfig.put("taskCountMin", 1); + // Note: taskCountStart is NOT set in the existing spec + + SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(existingIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes(); // existing taskCount is 5 + EasyMock.replay(existingIoConfig); + + DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(existingDataSchema); + + SeekableStreamSupervisorIngestionSpec existingIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); + EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); + EasyMock.replay(existingIngestionSchema); + + TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + existingIngestionSchema + ); + + // Create a new spec WITHOUT taskCountStart and WITHOUT taskCount in autoScalerConfig + HashMap newAutoScalerConfig = new HashMap<>(); + newAutoScalerConfig.put("enableTaskAutoScaler", true); + newAutoScalerConfig.put("taskCountMax", 8); + newAutoScalerConfig.put("taskCountMin", 2); + // Note: taskCountStart is NOT set + + SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(newIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes(); // new spec has no taskCount + newIoConfig.setTaskCount(5); // Expect merge to set taskCount to existing value (5) + EasyMock.expectLastCall().once(); + EasyMock.replay(newIoConfig); + + DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(newDataSchema); + + SeekableStreamSupervisorIngestionSpec newIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); + EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); + EasyMock.replay(newIngestionSchema); + + TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + newIngestionSchema + ) + { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + + // Set up mocks for SupervisorManager behavior + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + + // Mock createOrUpdateAndStartSupervisor to call merge + final SupervisorSpec existingSpecForMerge = existingSpec; + Capture capturedNewSpec = EasyMock.newCapture(); + EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) + .andAnswer(() -> { + SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; + arg.merge(existingSpecForMerge); + return true; + }); + + // Mock getSupervisorSpec to return the existing spec (simulating an update scenario) + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) + .andReturn(Optional.of(existingSpec)) + .anyTimes(); + + setupMockRequest(); + setupMockRequestForAudit(); + + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); + auditManager.doAudit(EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + + replayAll(); + + // Before merge, taskCountStart should be null in new spec + Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + + // When + Response response = supervisorResource.specPost(newSpec, false, request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + } + + @Test + public void testSpecPostMergeUsesProvidedTaskCountOverExistingTaskCount() + { + // Create an existing spec with taskCount=5 + HashMap existingAutoScalerConfig = new HashMap<>(); + existingAutoScalerConfig.put("enableTaskAutoScaler", true); + existingAutoScalerConfig.put("taskCountMax", 7); + existingAutoScalerConfig.put("taskCountMin", 1); + + SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(existingIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes(); + EasyMock.replay(existingIoConfig); + + DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(existingDataSchema); + + SeekableStreamSupervisorIngestionSpec existingIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); + EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); + EasyMock.replay(existingIngestionSchema); + + TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + existingIngestionSchema + ); + + // Create a new spec with taskCount=3 (provided taskCount should take precedence) + HashMap newAutoScalerConfig = new HashMap<>(); + newAutoScalerConfig.put("enableTaskAutoScaler", true); + newAutoScalerConfig.put("taskCountMax", 8); + newAutoScalerConfig.put("taskCountMin", 2); + + SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(newIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(newIoConfig.getTaskCount()).andReturn(3).anyTimes(); // provided taskCount=3 + newIoConfig.setTaskCount(3); // Expect merge to use provided taskCount (3) + EasyMock.expectLastCall().once(); + EasyMock.replay(newIoConfig); + + DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(newDataSchema); + + SeekableStreamSupervisorIngestionSpec newIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); + EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); + EasyMock.replay(newIngestionSchema); + + TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + newIngestionSchema + ) + { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + + final SupervisorSpec existingSpecForMerge = existingSpec; + Capture capturedNewSpec = EasyMock.newCapture(); + EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) + .andAnswer(() -> { + SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; + arg.merge(existingSpecForMerge); + return true; + }); + + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) + .andReturn(Optional.of(existingSpec)) + .anyTimes(); + + setupMockRequest(); + setupMockRequestForAudit(); + + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); + auditManager.doAudit(EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + + replayAll(); + + Response response = supervisorResource.specPost(newSpec, false, request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + } + + @Test + public void testSpecPostMergeFallsBackToProvidedTaskCountMin() + { + // Create an existing spec with no taskCount + HashMap existingAutoScalerConfig = new HashMap<>(); + existingAutoScalerConfig.put("enableTaskAutoScaler", true); + existingAutoScalerConfig.put("taskCountMax", 7); + existingAutoScalerConfig.put("taskCountMin", 1); + + SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(existingIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(null).anyTimes(); // existing has no taskCount + EasyMock.replay(existingIoConfig); + + DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(existingDataSchema); + + SeekableStreamSupervisorIngestionSpec existingIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); + EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); + EasyMock.replay(existingIngestionSchema); + + TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + existingIngestionSchema + ); + + // Create a new spec with taskCountMin=4, no taskCount + HashMap newAutoScalerConfig = new HashMap<>(); + newAutoScalerConfig.put("enableTaskAutoScaler", true); + newAutoScalerConfig.put("taskCountMax", 8); + newAutoScalerConfig.put("taskCountMin", 4); // provided taskCountMin + + SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(newIoConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); + EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes(); // no provided taskCount + newIoConfig.setTaskCount(4); // Expect merge to fall back to provided taskCountMin (4) + EasyMock.expectLastCall().once(); + EasyMock.replay(newIoConfig); + + DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(newDataSchema); + + SeekableStreamSupervisorIngestionSpec newIngestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); + EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); + EasyMock.replay(newIngestionSchema); + + TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( + "my-id", + newIngestionSchema + ) + { + @Override + public List getDataSources() + { + return Collections.singletonList("datasource1"); + } + }; + + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + + final SupervisorSpec existingSpecForMerge = existingSpec; + Capture capturedNewSpec = EasyMock.newCapture(); + EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) + .andAnswer(() -> { + SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; + arg.merge(existingSpecForMerge); + return true; + }); + + EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) + .andReturn(Optional.of(existingSpec)) + .anyTimes(); + + setupMockRequest(); + setupMockRequestForAudit(); + + EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); + auditManager.doAudit(EasyMock.anyObject()); + EasyMock.expectLastCall().once(); + + replayAll(); + + Response response = supervisorResource.specPost(newSpec, false, request); + verifyAll(); + + Assert.assertEquals(200, response.getStatus()); + } + private void setupMockRequest() { setupMockRequestForUser("druid"); @@ -1445,10 +1783,10 @@ public boolean equals(Object o) if (getId() != null ? !getId().equals(that.getId()) : that.getId() != null) { return false; } - if (supervisor != null ? !supervisor.equals(that.supervisor) : that.supervisor != null) { + if (!Objects.equals(supervisor, that.supervisor)) { return false; } - if (datasources != null ? !datasources.equals(that.datasources) : that.datasources != null) { + if (!Objects.equals(datasources, that.datasources)) { return false; } return isSuspended() == that.isSuspended(); @@ -1464,4 +1802,61 @@ public int hashCode() return result; } } + + static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec + { + public TestSeekableStreamSupervisorSpec( + @Nullable String id, + SeekableStreamSupervisorIngestionSpec ingestionSchema + ) + { + super( + id, + ingestionSchema, + null, + false, + EasyMock.createMock(TaskStorage.class), + EasyMock.createMock(TaskMaster.class), + EasyMock.createMock(IndexerMetadataStorageCoordinator.class), + EasyMock.createMock(SeekableStreamIndexTaskClientFactory.class), + OBJECT_MAPPER, + EasyMock.createMock(ServiceEmitter.class), + EasyMock.createMock(DruidMonitorSchedulerConfig.class), + EasyMock.createMock(RowIngestionMetersFactory.class), + EasyMock.createMock(SupervisorStateManagerConfig.class) + ); + } + + @Override + public Supervisor createSupervisor() + { + return null; + } + + @Override + public String getType() + { + return "test"; + } + + @Override + public String getSource() + { + return "test-stream"; + } + + @Override + protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) + { + return null; + } + + @JsonIgnore + @Nonnull + @Override + public Set getInputSourceResources() throws UnsupportedOperationException + { + return Collections.singleton(new ResourceAction(new Resource("test", ResourceType.EXTERNAL), Action.READ)); + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 65e28b1dfb07..58ffb8438e0d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -779,9 +779,8 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc supervisor.start(); autoScaler.start(); supervisor.runInternal(); - Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); - int taskCountBeforeScaleOut = supervisor.getIoConfig().getAutoScalerConfig().getTaskCountMin(); + int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountBeforeScaleOut); Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); @@ -1010,8 +1009,8 @@ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() t autoScaler.start(); supervisor.runInternal(); - Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); - Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, taskCountBeforeScaleOut); Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); @@ -1057,13 +1056,14 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin. Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); - Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); + supervisor.getIoConfig().setTaskCount(2); supervisor.start(); autoScaler.start(); supervisor.runInternal(); - Assert.assertNull(supervisor.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(2, taskCountBeforeScaleOut); Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); @@ -1113,17 +1113,15 @@ public void testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanParti ); // enable autoscaler so that taskcount config will be ignored and the init value of taskCount will use taskCountMin. - Assert.assertNotNull(supervisor.getIoConfig().getAutoScalerConfig()); - Assert.assertEquals( - supervisor.getIoConfig().getAutoScalerConfig().getTaskCountMin(), - (int) supervisor.getIoConfig().getTaskCount() - ); + Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); + supervisor.getIoConfig().setTaskCount(2); // When supervisor.start(); autoScaler.start(); supervisor.runInternal(); + Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount()); Thread.sleep(2000); // Then Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount()); @@ -1496,11 +1494,7 @@ public void test_validateSpecUpdateTo_ShortCircuits() DruidException.Category.INVALID_INPUT, "invalidInput" ).expectMessageIs( - StringUtils.format( - "Cannot update supervisor spec from type[%s] to type[%s]", - proposedSpec.getClass().getSimpleName(), - otherSpec.getClass().getSimpleName() - ) + StringUtils.format("Cannot update supervisor spec from type[%s] to type[%s]", proposedSpec.getClass().getSimpleName(), otherSpec.getClass().getSimpleName()) ) ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java deleted file mode 100644 index 09ac91a36da2..000000000000 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SupervisorResourceConfigMergeTest.java +++ /dev/null @@ -1,533 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.seekablestream; - -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Optional; -import org.apache.druid.audit.AuditManager; -import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; -import org.apache.druid.indexing.overlord.TaskMaster; -import org.apache.druid.indexing.overlord.TaskStorage; -import org.apache.druid.indexing.overlord.supervisor.Supervisor; -import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; -import org.apache.druid.indexing.overlord.supervisor.SupervisorResource; -import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; -import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; -import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; -import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; -import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; -import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; -import org.apache.druid.java.util.emitter.service.ServiceEmitter; -import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; -import org.apache.druid.segment.TestHelper; -import org.apache.druid.segment.incremental.RowIngestionMetersFactory; -import org.apache.druid.segment.indexing.DataSchema; -import org.apache.druid.server.security.Access; -import org.apache.druid.server.security.Action; -import org.apache.druid.server.security.AuthConfig; -import org.apache.druid.server.security.AuthenticationResult; -import org.apache.druid.server.security.Authorizer; -import org.apache.druid.server.security.AuthorizerMapper; -import org.apache.druid.server.security.Resource; -import org.apache.druid.server.security.ResourceAction; -import org.apache.druid.server.security.ResourceType; -import org.easymock.Capture; -import org.easymock.EasyMock; -import org.easymock.EasyMockRunner; -import org.easymock.EasyMockSupport; -import org.easymock.Mock; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.core.Response; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Set; - -@RunWith(EasyMockRunner.class) -public class SupervisorResourceConfigMergeTest extends EasyMockSupport -{ - private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper(); - @Mock - private TaskMaster taskMaster; - - @Mock - private SupervisorManager supervisorManager; - - @Mock - private HttpServletRequest request; - - @Mock - private AuthConfig authConfig; - - @Mock - private AuditManager auditManager; - - private SupervisorResource supervisorResource; - - @Before - public void setUp() - { - supervisorResource = new SupervisorResource( - taskMaster, - new AuthorizerMapper(null) - { - @Override - public Authorizer getAuthorizer(String name) - { - return (authenticationResult, resource, action) -> { - if (authenticationResult.getIdentity().equals("druid")) { - return Access.OK; - } else { - if (resource.getType().equals(ResourceType.DATASOURCE)) { - if (resource.getName().equals("datasource2")) { - return Access.deny("not authorized."); - } else { - return Access.OK; - } - } else if (resource.getType().equals(ResourceType.EXTERNAL)) { - if (resource.getName().equals("test")) { - return Access.deny("not authorized."); - } else { - return Access.OK; - } - } - return Access.OK; - } - }; - } - }, - OBJECT_MAPPER, - authConfig, - auditManager - ); - } - - /** - * Tests that when a new spec is submitted without taskCountStart, the merge function - * follows the priority: provided taskCountStart > provided taskCount > existing taskCount > provided taskCountMin. - * In this test, existing spec has taskCount=5, new spec has no taskCount set, - * so merge should use existing taskCount (5). - */ - @Test - public void testSpecPostMergeUsesExistingTaskCount() - { - // Create an existing spec with taskCount=5 (simulating a scaled state) - HashMap existingAutoScalerConfig = new HashMap<>(); - existingAutoScalerConfig.put("enableTaskAutoScaler", true); - existingAutoScalerConfig.put("taskCountMax", 7); - existingAutoScalerConfig.put("taskCountMin", 1); - // Note: taskCountStart is NOT set in existing spec - - SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(existingIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes(); // existing taskCount is 5 - EasyMock.replay(existingIoConfig); - - DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(existingDataSchema); - - SeekableStreamSupervisorIngestionSpec existingIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); - EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); - EasyMock.replay(existingIngestionSchema); - - TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - existingIngestionSchema - ); - - // Create a new spec WITHOUT taskCountStart and WITHOUT taskCount in autoScalerConfig - HashMap newAutoScalerConfig = new HashMap<>(); - newAutoScalerConfig.put("enableTaskAutoScaler", true); - newAutoScalerConfig.put("taskCountMax", 8); - newAutoScalerConfig.put("taskCountMin", 2); - // Note: taskCountStart is NOT set - - SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(newIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes(); // new spec has no taskCount - newIoConfig.setTaskCount(5); // Expect merge to set taskCount to existing value (5) - EasyMock.expectLastCall().once(); - EasyMock.replay(newIoConfig); - - DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(newDataSchema); - - SeekableStreamSupervisorIngestionSpec newIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); - EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); - EasyMock.replay(newIngestionSchema); - - TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - newIngestionSchema - ) - { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; - - // Set up mocks for SupervisorManager behavior - EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - - // Mock createOrUpdateAndStartSupervisor to call merge - final SupervisorSpec existingSpecForMerge = existingSpec; - Capture capturedNewSpec = EasyMock.newCapture(); - EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) - .andAnswer(() -> { - SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; - arg.merge(existingSpecForMerge); - return true; - }); - - // Mock getSupervisorSpec to return the existing spec (simulating an update scenario) - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) - .andReturn(Optional.of(existingSpec)) - .anyTimes(); - - setupMockRequest(); - setupMockRequestForAudit(); - - EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); - auditManager.doAudit(EasyMock.anyObject()); - EasyMock.expectLastCall().once(); - - replayAll(); - - // Before merge, taskCountStart should be null in new spec - Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart()); - - // When - Response response = supervisorResource.specPost(newSpec, false, request); - verifyAll(); - - Assert.assertEquals(200, response.getStatus()); - } - - /** - * Tests that when a new spec is submitted with taskCount set, merge uses provided taskCount - * over existing taskCount. Priority: provided taskCountStart > provided taskCount > existing taskCount > provided taskCountMin. - */ - @Test - public void testSpecPostMergeUsesProvidedTaskCount() - { - // Create an existing spec with taskCount=5 - HashMap existingAutoScalerConfig = new HashMap<>(); - existingAutoScalerConfig.put("enableTaskAutoScaler", true); - existingAutoScalerConfig.put("taskCountMax", 7); - existingAutoScalerConfig.put("taskCountMin", 1); - - SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(existingIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes(); - EasyMock.replay(existingIoConfig); - - DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(existingDataSchema); - - SeekableStreamSupervisorIngestionSpec existingIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); - EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); - EasyMock.replay(existingIngestionSchema); - - TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - existingIngestionSchema - ); - - // Create a new spec with taskCount=3 (provided taskCount should take precedence) - HashMap newAutoScalerConfig = new HashMap<>(); - newAutoScalerConfig.put("enableTaskAutoScaler", true); - newAutoScalerConfig.put("taskCountMax", 8); - newAutoScalerConfig.put("taskCountMin", 2); - - SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(newIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(newIoConfig.getTaskCount()).andReturn(3).anyTimes(); // provided taskCount=3 - newIoConfig.setTaskCount(3); // Expect merge to use provided taskCount (3) - EasyMock.expectLastCall().once(); - EasyMock.replay(newIoConfig); - - DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(newDataSchema); - - SeekableStreamSupervisorIngestionSpec newIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); - EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); - EasyMock.replay(newIngestionSchema); - - TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - newIngestionSchema - ) - { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; - - EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - - final SupervisorSpec existingSpecForMerge = existingSpec; - Capture capturedNewSpec = EasyMock.newCapture(); - EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) - .andAnswer(() -> { - SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; - arg.merge(existingSpecForMerge); - return true; - }); - - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) - .andReturn(Optional.of(existingSpec)) - .anyTimes(); - - setupMockRequest(); - setupMockRequestForAudit(); - - EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); - auditManager.doAudit(EasyMock.anyObject()); - EasyMock.expectLastCall().once(); - - replayAll(); - - Response response = supervisorResource.specPost(newSpec, false, request); - verifyAll(); - - Assert.assertEquals(200, response.getStatus()); - } - - /** - * Tests that when neither provided taskCount nor existing taskCount is set, - * merge falls back to provided taskCountMin. - */ - @Test - public void testSpecPostMergeFallsBackToProvidedTaskCountMin() - { - // Create an existing spec with no taskCount - HashMap existingAutoScalerConfig = new HashMap<>(); - existingAutoScalerConfig.put("enableTaskAutoScaler", true); - existingAutoScalerConfig.put("taskCountMax", 7); - existingAutoScalerConfig.put("taskCountMin", 1); - - SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(existingIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(null).anyTimes(); // existing has no taskCount - EasyMock.replay(existingIoConfig); - - DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(existingDataSchema); - - SeekableStreamSupervisorIngestionSpec existingIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); - EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); - EasyMock.replay(existingIngestionSchema); - - TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - existingIngestionSchema - ); - - // Create a new spec with taskCountMin=4, no taskCount - HashMap newAutoScalerConfig = new HashMap<>(); - newAutoScalerConfig.put("enableTaskAutoScaler", true); - newAutoScalerConfig.put("taskCountMax", 8); - newAutoScalerConfig.put("taskCountMin", 4); // provided taskCountMin - - SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(newIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes(); // no provided taskCount - newIoConfig.setTaskCount(4); // Expect merge to fall back to provided taskCountMin (4) - EasyMock.expectLastCall().once(); - EasyMock.replay(newIoConfig); - - DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(newDataSchema); - - SeekableStreamSupervisorIngestionSpec newIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); - EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); - EasyMock.replay(newIngestionSchema); - - TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - newIngestionSchema - ) - { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; - - EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - - final SupervisorSpec existingSpecForMerge = existingSpec; - Capture capturedNewSpec = EasyMock.newCapture(); - EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) - .andAnswer(() -> { - SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; - arg.merge(existingSpecForMerge); - return true; - }); - - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) - .andReturn(Optional.of(existingSpec)) - .anyTimes(); - - setupMockRequest(); - setupMockRequestForAudit(); - - EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); - auditManager.doAudit(EasyMock.anyObject()); - EasyMock.expectLastCall().once(); - - replayAll(); - - Response response = supervisorResource.specPost(newSpec, false, request); - verifyAll(); - - Assert.assertEquals(200, response.getStatus()); - } - - private void setupMockRequest() - { - EasyMock.expect(request.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).atLeastOnce(); - EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).atLeastOnce(); - EasyMock.expect(request.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)) - .andReturn(new AuthenticationResult("druid", "druid", null, null)) - .atLeastOnce(); - request.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true); - EasyMock.expectLastCall().anyTimes(); - } - - private void setupMockRequestForAudit() - { - EasyMock.expect(request.getHeader(AuditManager.X_DRUID_AUTHOR)).andReturn("author").once(); - EasyMock.expect(request.getHeader(AuditManager.X_DRUID_COMMENT)).andReturn("comment").once(); - - EasyMock.expect(request.getRemoteAddr()).andReturn("127.0.0.1").once(); - EasyMock.expect(request.getMethod()).andReturn("POST").once(); - EasyMock.expect(request.getRequestURI()).andReturn("supes").once(); - EasyMock.expect(request.getQueryString()).andReturn("a=b").once(); - } - - static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec - { - public TestSeekableStreamSupervisorSpec( - @Nullable String id, - SeekableStreamSupervisorIngestionSpec ingestionSchema - ) - { - super( - id, - ingestionSchema, - null, - false, - EasyMock.createMock(TaskStorage.class), - EasyMock.createMock(TaskMaster.class), - EasyMock.createMock(IndexerMetadataStorageCoordinator.class), - EasyMock.createMock(SeekableStreamIndexTaskClientFactory.class), - OBJECT_MAPPER, - EasyMock.createMock(ServiceEmitter.class), - EasyMock.createMock(DruidMonitorSchedulerConfig.class), - EasyMock.createMock(RowIngestionMetersFactory.class), - EasyMock.createMock(SupervisorStateManagerConfig.class) - ); - } - - @Override - public Supervisor createSupervisor() - { - return null; - } - - @Override - public String getType() - { - return "test"; - } - - @Override - public String getSource() - { - return "test-stream"; - } - - @Override - protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) - { - return null; - } - - @JsonIgnore - @Nonnull - @Override - public Set getInputSourceResources() throws UnsupportedOperationException - { - return Collections.singleton(new ResourceAction(new Resource("test", ResourceType.EXTERNAL), Action.READ)); - } - } - - -} From f3d20e010928a32e67155d807d8fdff966338083 Mon Sep 17 00:00:00 2001 From: Sasha Syrotenko Date: Thu, 11 Dec 2025 19:17:52 +0200 Subject: [PATCH 8/8] Cleanup test methods in resource test --- .../supervisor/SupervisorResourceTest.java | 356 ++++-------------- 1 file changed, 66 insertions(+), 290 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index bdc18f82b3f8..0737722d3414 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -364,10 +364,8 @@ public void testSpecGetAllFull() Assert.assertTrue( specs.stream() .allMatch(spec -> - ("id1".equals(spec.getId()) && spec.getDataSource().equals("datasource1") && SPEC1.equals( - spec.getSpec())) || - ("id2".equals(spec.getId()) && spec.getDataSource().equals("datasource2") && SPEC2.equals( - spec.getSpec())) + ("id1".equals(spec.getId()) && spec.getDataSource().equals("datasource1") && SPEC1.equals(spec.getSpec())) || + ("id2".equals(spec.getId()) && spec.getDataSource().equals("datasource2") && SPEC2.equals(spec.getSpec())) ) ); } @@ -1007,9 +1005,7 @@ public void testSpecGetHistory() EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(3); EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1", null)).andReturn(versions1).times(1); EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2", null)).andReturn(versions2).times(1); - EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3", null)) - .andReturn(Collections.emptyList()) - .times(1); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3", null)).andReturn(Collections.emptyList()).times(1); setupMockRequest(); replayAll(); @@ -1345,317 +1341,97 @@ public void testNoopSupervisorSpecSerde() throws Exception Assert.assertEquals(spec, specRoundTrip); } - @Test public void testSpecPostMergeUsesExistingTaskCountHigherPriorityHasBeenMissed() { - // Tests that when a new spec is submitted without taskCountStart, the merge function - // follows the priority: provided taskCountStart > provided taskCount > existing taskCount > provided taskCountMin. - - // Create an existing spec with taskCount=5 (simulating a scaled state) - HashMap existingAutoScalerConfig = new HashMap<>(); - existingAutoScalerConfig.put("enableTaskAutoScaler", true); - existingAutoScalerConfig.put("taskCountMax", 7); - existingAutoScalerConfig.put("taskCountMin", 1); - // Note: taskCountStart is NOT set in the existing spec - - SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(existingIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes(); // existing taskCount is 5 - EasyMock.replay(existingIoConfig); - - DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(existingDataSchema); - - SeekableStreamSupervisorIngestionSpec existingIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); - EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); - EasyMock.replay(existingIngestionSchema); - - TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - existingIngestionSchema - ); - - // Create a new spec WITHOUT taskCountStart and WITHOUT taskCount in autoScalerConfig - HashMap newAutoScalerConfig = new HashMap<>(); - newAutoScalerConfig.put("enableTaskAutoScaler", true); - newAutoScalerConfig.put("taskCountMax", 8); - newAutoScalerConfig.put("taskCountMin", 2); - // Note: taskCountStart is NOT set - - SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(newIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes(); // new spec has no taskCount - newIoConfig.setTaskCount(5); // Expect merge to set taskCount to existing value (5) - EasyMock.expectLastCall().once(); - EasyMock.replay(newIoConfig); - - DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(newDataSchema); - - SeekableStreamSupervisorIngestionSpec newIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); - EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); - EasyMock.replay(newIngestionSchema); - - TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - newIngestionSchema - ) - { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; - - // Set up mocks for SupervisorManager behavior - EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - - // Mock createOrUpdateAndStartSupervisor to call merge - final SupervisorSpec existingSpecForMerge = existingSpec; - Capture capturedNewSpec = EasyMock.newCapture(); - EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) - .andAnswer(() -> { - SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; - arg.merge(existingSpecForMerge); - return true; - }); - - // Mock getSupervisorSpec to return the existing spec (simulating an update scenario) - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) - .andReturn(Optional.of(existingSpec)) - .anyTimes(); - - setupMockRequest(); - setupMockRequestForAudit(); - - EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); - auditManager.doAudit(EasyMock.anyObject()); - EasyMock.expectLastCall().once(); - - replayAll(); - - // Before merge, taskCountStart should be null in new spec - Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + // New spec has no taskCount -> should use existing taskCount (5) + TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1); + TestSeekableStreamSupervisorSpec newSpec = createTestSpecWithExpectedMerge(null, 2, 5); - // When - Response response = supervisorResource.specPost(newSpec, false, request); - verifyAll(); - - Assert.assertEquals(200, response.getStatus()); + newSpec.merge(existingSpec); + EasyMock.verify(newSpec.getIoConfig()); } @Test public void testSpecPostMergeUsesProvidedTaskCountOverExistingTaskCount() { - // Create an existing spec with taskCount=5 - HashMap existingAutoScalerConfig = new HashMap<>(); - existingAutoScalerConfig.put("enableTaskAutoScaler", true); - existingAutoScalerConfig.put("taskCountMax", 7); - existingAutoScalerConfig.put("taskCountMin", 1); - - SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(existingIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes(); - EasyMock.replay(existingIoConfig); - - DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(existingDataSchema); - - SeekableStreamSupervisorIngestionSpec existingIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); - EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); - EasyMock.replay(existingIngestionSchema); - - TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - existingIngestionSchema - ); - - // Create a new spec with taskCount=3 (provided taskCount should take precedence) - HashMap newAutoScalerConfig = new HashMap<>(); - newAutoScalerConfig.put("enableTaskAutoScaler", true); - newAutoScalerConfig.put("taskCountMax", 8); - newAutoScalerConfig.put("taskCountMin", 2); + // New spec has taskCount=3 -> should use provided taskCount over existing (5) + TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1); + TestSeekableStreamSupervisorSpec newSpec = createTestSpecWithExpectedMerge(3, 2, 3); - SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(newIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(newIoConfig.getTaskCount()).andReturn(3).anyTimes(); // provided taskCount=3 - newIoConfig.setTaskCount(3); // Expect merge to use provided taskCount (3) - EasyMock.expectLastCall().once(); - EasyMock.replay(newIoConfig); - - DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(newDataSchema); - - SeekableStreamSupervisorIngestionSpec newIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); - EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); - EasyMock.replay(newIngestionSchema); - - TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - newIngestionSchema - ) - { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; - - EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); - - final SupervisorSpec existingSpecForMerge = existingSpec; - Capture capturedNewSpec = EasyMock.newCapture(); - EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) - .andAnswer(() -> { - SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; - arg.merge(existingSpecForMerge); - return true; - }); - - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) - .andReturn(Optional.of(existingSpec)) - .anyTimes(); - - setupMockRequest(); - setupMockRequestForAudit(); - - EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); - auditManager.doAudit(EasyMock.anyObject()); - EasyMock.expectLastCall().once(); - - replayAll(); - - Response response = supervisorResource.specPost(newSpec, false, request); - verifyAll(); - - Assert.assertEquals(200, response.getStatus()); + newSpec.merge(existingSpec); + EasyMock.verify(newSpec.getIoConfig()); } @Test public void testSpecPostMergeFallsBackToProvidedTaskCountMin() { - // Create an existing spec with no taskCount - HashMap existingAutoScalerConfig = new HashMap<>(); - existingAutoScalerConfig.put("enableTaskAutoScaler", true); - existingAutoScalerConfig.put("taskCountMax", 7); - existingAutoScalerConfig.put("taskCountMin", 1); - - SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(existingIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) - .anyTimes(); - EasyMock.expect(existingIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(null).anyTimes(); // existing has no taskCount - EasyMock.replay(existingIoConfig); - - DataSchema existingDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(existingDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(existingDataSchema); - - SeekableStreamSupervisorIngestionSpec existingIngestionSchema = - EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); - EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(existingDataSchema).anyTimes(); - EasyMock.replay(existingIngestionSchema); + // Neither has taskCount -> should fall back to taskCountMin (4) + TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(null, 1); + TestSeekableStreamSupervisorSpec newSpec = createTestSpecWithExpectedMerge(null, 4, 4); - TestSeekableStreamSupervisorSpec existingSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - existingIngestionSchema - ); - - // Create a new spec with taskCountMin=4, no taskCount - HashMap newAutoScalerConfig = new HashMap<>(); - newAutoScalerConfig.put("enableTaskAutoScaler", true); - newAutoScalerConfig.put("taskCountMax", 8); - newAutoScalerConfig.put("taskCountMin", 4); // provided taskCountMin + newSpec.merge(existingSpec); + EasyMock.verify(newSpec.getIoConfig()); + } - SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); - EasyMock.expect(newIoConfig.getAutoScalerConfig()) - .andReturn(OBJECT_MAPPER.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) + private TestSeekableStreamSupervisorSpec createTestSpec(Integer taskCount, int taskCountMin) + { + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("taskCountMax", 10); + autoScalerConfig.put("taskCountMin", taskCountMin); + + SeekableStreamSupervisorIOConfig ioConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(ioConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig, AutoScalerConfig.class)) .anyTimes(); - EasyMock.expect(newIoConfig.getStream()).andReturn("test-stream").anyTimes(); - EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes(); // no provided taskCount - newIoConfig.setTaskCount(4); // Expect merge to fall back to provided taskCountMin (4) - EasyMock.expectLastCall().once(); - EasyMock.replay(newIoConfig); + EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes(); + EasyMock.replay(ioConfig); - DataSchema newDataSchema = EasyMock.createMock(DataSchema.class); - EasyMock.expect(newDataSchema.getDataSource()).andReturn("datasource1").anyTimes(); - EasyMock.replay(newDataSchema); + DataSchema dataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(dataSchema); - SeekableStreamSupervisorIngestionSpec newIngestionSchema = + SeekableStreamSupervisorIngestionSpec ingestionSchema = EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); - EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); - EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(newDataSchema).anyTimes(); - EasyMock.replay(newIngestionSchema); - - TestSeekableStreamSupervisorSpec newSpec = new TestSeekableStreamSupervisorSpec( - "my-id", - newIngestionSchema - ) - { - @Override - public List getDataSources() - { - return Collections.singletonList("datasource1"); - } - }; + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.replay(ingestionSchema); - EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)); + return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema); + } - final SupervisorSpec existingSpecForMerge = existingSpec; - Capture capturedNewSpec = EasyMock.newCapture(); - EasyMock.expect(supervisorManager.createOrUpdateAndStartSupervisor(EasyMock.capture(capturedNewSpec))) - .andAnswer(() -> { - SupervisorSpec arg = (SupervisorSpec) EasyMock.getCurrentArguments()[0]; - arg.merge(existingSpecForMerge); - return true; - }); - - EasyMock.expect(supervisorManager.getSupervisorSpec("my-id")) - .andReturn(Optional.of(existingSpec)) + private TestSeekableStreamSupervisorSpec createTestSpecWithExpectedMerge( + Integer taskCount, + int taskCountMin, + int expectedTaskCount + ) + { + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("taskCountMax", 10); + autoScalerConfig.put("taskCountMin", taskCountMin); + + SeekableStreamSupervisorIOConfig ioConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(ioConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig, AutoScalerConfig.class)) .anyTimes(); - - setupMockRequest(); - setupMockRequestForAudit(); - - EasyMock.expect(authConfig.isEnableInputSourceSecurity()).andReturn(true); - auditManager.doAudit(EasyMock.anyObject()); + EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes(); + ioConfig.setTaskCount(expectedTaskCount); EasyMock.expectLastCall().once(); + EasyMock.replay(ioConfig); - replayAll(); + DataSchema dataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(dataSchema); - Response response = supervisorResource.specPost(newSpec, false, request); - verifyAll(); + SeekableStreamSupervisorIngestionSpec ingestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.replay(ingestionSchema); - Assert.assertEquals(200, response.getStatus()); + return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema); } private void setupMockRequest()