diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 168b956afd2d..21d2a6265011 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -174,7 +174,10 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) synchronized (lock) { Preconditions.checkState(started, "SupervisorManager not started"); final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec); - possiblyStopAndRemoveSupervisorInternal(spec.getId(), false); + SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false); + if (existingSpec != null) { + spec.merge(existingSpec); + } createAndStartSupervisorInternal(spec, shouldUpdateSpec); return shouldUpdateSpec; } @@ -183,6 +186,7 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec) /** * Checks whether the submitted SupervisorSpec differs from the current spec in SupervisorManager's supervisor list. 
* This is used in SupervisorResource specPost to determine whether the Supervisor needs to be restarted + * * @param spec The spec submitted * @return boolean - true only if the spec has been modified, false otherwise */ @@ -221,7 +225,7 @@ public boolean stopAndRemoveSupervisor(String id) synchronized (lock) { Preconditions.checkState(started, "SupervisorManager not started"); - return possiblyStopAndRemoveSupervisorInternal(id, true); + return possiblyStopAndRemoveSupervisorInternal(id, true) != null; } } @@ -299,7 +303,8 @@ public void stop() log.info("SupervisorManager stopped."); } - public List getSupervisorHistoryForId(String id, @Nullable Integer limit) throws IllegalArgumentException + public List getSupervisorHistoryForId(String id, @Nullable Integer limit) + throws IllegalArgumentException { return metadataSupervisorManager.getAllForId(id, limit); } @@ -429,13 +434,14 @@ public boolean registerUpgradedPendingSegmentOnSupervisor( * Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be * starting, stopping, suspending and resuming supervisors. 
* - * @return true if a supervisor was stopped, false if there was no supervisor with this id + * @return reference to existing supervisor, if exists and was stopped, null if there was no supervisor with this id */ - private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean writeTombstone) + @Nullable + private SupervisorSpec possiblyStopAndRemoveSupervisorInternal(String id, boolean writeTombstone) { Pair pair = supervisors.get(id); - if (pair == null) { - return false; + if (pair == null || pair.rhs == null || pair.lhs == null) { + return null; } if (writeTombstone) { @@ -447,13 +453,13 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write pair.lhs.stop(true); supervisors.remove(id); - SupervisorTaskAutoScaler autoscler = autoscalers.get(id); - if (autoscler != null) { - autoscler.stop(); + SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); + if (autoscaler != null) { + autoscaler.stop(); autoscalers.remove(id); } - return true; + return pair.rhs; } /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0f2839e7fba6..f69dc52159b9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -135,8 +135,8 @@ import java.util.stream.Stream; /** - * this class is the parent class of both the Kafka and Kinesis supervisor. All the main run loop - * logic are similar enough so they're grouped together into this class. + * This class is the parent class of both the Kafka and Kinesis supervisor. All the main run loop + * logic is similar enough, so they're grouped together into this class. *

* Supervisor responsible for managing the SeekableStreamIndexTasks (Kafka/Kinesis) for a single dataSource. At a high level, the class accepts a * {@link SeekableStreamSupervisorSpec} which includes the stream name (topic / stream) and configuration as well as an ingestion spec which will @@ -541,10 +541,20 @@ public String getType() /** * This method determines how to do scale actions based on collected lag points. - * If scale action is triggered : - * First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing. - * Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in the next 'RunNotice'. - * Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage. + * If scale action is triggered: + *

    + *
  • First, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing. + *
  • Secondly, clear all the stateful data structures: + *
      + *
    • activelyReadingTaskGroups, + *
    • partitionGroups, + *
    • partitionOffsets, + *
    • pendingCompletionTaskGroups, + *
    • partitionIds. + *
    + * These structures will be rebuilt in the next 'RunNotice'. + *
  • Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage. + *
* After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor. * * @param desiredActiveTaskCount desired taskCount computed from AutoScaler @@ -916,7 +926,7 @@ public String getType() private volatile boolean lifecycleStarted = false; private final ServiceEmitter emitter; - // snapshots latest sequences from stream to be verified in next run cycle of inactive stream check + // snapshots latest sequences from the stream to be verified in the next run cycle of inactive stream check private final Map previousSequencesFromStream = new HashMap<>(); private long lastActiveTimeMillis; private final IdleConfig idleConfig; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 967652673f7c..f21e073f6c4c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -43,16 +43,18 @@ import org.apache.druid.segment.indexing.DataSchema; import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; import java.util.List; import java.util.Map; public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec { - protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = "Update of the input source stream from [%s] to [%s] is not supported for a running supervisor." - + "%nTo perform the update safely, follow these steps:" - + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " - + "%n(2) Create a new supervisor with the new input source stream." 
- + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."; + protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = + "Update of the input source stream from [%s] to [%s] is not supported for a running supervisor." + + "%nTo perform the update safely, follow these steps:" + + "%n(1) Suspend this supervisor, reset its offsets and then terminate it. " + + "%n(2) Create a new supervisor with the new input source stream." + + "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too."; private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema( SeekableStreamSupervisorIngestionSpec ingestionSchema @@ -183,6 +185,7 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig() /** * An autoScaler instance will be returned depending on the autoScalerConfig. In case autoScalerConfig is null or autoScaler is disabled then NoopTaskAutoScaler will be returned. + * * @param supervisor * @return autoScaler */ @@ -232,6 +235,7 @@ public boolean isSuspended() *
  • You cannot migrate between types of supervisors.
  • *
  • You cannot change the input source stream of a running supervisor.
  • * + * * @param proposedSpec the proposed supervisor spec * @throws DruidException if the proposed spec update is not allowed */ @@ -240,7 +244,9 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept { if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) { throw InvalidInput.exception( - "Cannot update supervisor spec from type[%s] to type[%s]", getClass().getSimpleName(), proposedSpec.getClass().getSimpleName() + "Cannot update supervisor spec from type[%s] to type[%s]", + getClass().getSimpleName(), + proposedSpec.getClass().getSimpleName() ); } SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) proposedSpec; @@ -255,6 +261,33 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept } } + @Override + public void merge(@NotNull SupervisorSpec existingSpec) + { + AutoScalerConfig thisAutoScalerConfig = this.getIoConfig().getAutoScalerConfig(); + // Either if autoscaler is absent or taskCountStart is specified - just return. + if (thisAutoScalerConfig == null || thisAutoScalerConfig.getTaskCountStart() != null) { + return; + } + + // Use a switch expression with pattern matching when we move to Java 21 as a minimum requirement. + if (existingSpec instanceof SeekableStreamSupervisorSpec) { + SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec) existingSpec; + AutoScalerConfig autoScalerConfig = spec.getIoConfig().getAutoScalerConfig(); + if (autoScalerConfig == null) { + return; + } + // provided `taskCountStart` > provided `taskCount` > existing `taskCount` > provided `taskCountMin`. 
+ int taskCount = thisAutoScalerConfig.getTaskCountMin(); + if (this.getIoConfig().getTaskCount() != null) { + taskCount = this.getIoConfig().getTaskCount(); + } else if (spec.getIoConfig().getTaskCount() != null) { + taskCount = spec.getIoConfig().getTaskCount(); + } + this.getIoConfig().setTaskCount(taskCount); + } + } + protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index b1446c3d4c14..142193ae63f6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -124,8 +124,10 @@ public void start() ); log.info( "LagBasedAutoScaler will collect lag every [%d] millis and will keep up to [%d] data points for the last [%d] millis for dataSource [%s]", - lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.maxSize(), - lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource + lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), + lagMetricsQueue.maxSize(), + lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), + dataSource ); } @@ -192,19 +194,25 @@ private Runnable computeAndCollectLag() /** * This method determines whether to do scale actions based on collected lag points. - * Current algorithm of scale is simple: - * First of all, compute the proportion of lag points higher/lower than scaleOutThreshold/scaleInThreshold, getting scaleOutThreshold/scaleInThreshold. - * Secondly, compare scaleOutThreshold/scaleInThreshold with triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. P.S. Scale out action has higher priority than scale in action. 
- * Finaly, if scaleOutThreshold/scaleInThreshold is higher than triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, scale out/in action would be triggered. + * The current algorithm of scale is straightforward: + *
      + *
    • First, compute the proportion of lag points higher/lower than {@code scaleOutThreshold/scaleInThreshold}, + * getting {@code scaleOutThreshold/scaleInThreshold}. + *
    • Secondly, compare {@code scaleOutThreshold/scaleInThreshold} with + * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}. + *
      • P.S. Scale out action has a higher priority than scale in action.
      + *
    • Finally, if {@code scaleOutThreshold/scaleInThreshold} is higher than + * {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}, scale out/in action would be triggered. + *
    * - * @param lags the lag metrics of Stream(Kafka/Kinesis) - * @return Integer. target number of tasksCount, -1 means skip scale action. + * @param lags the lag metrics of Stream (Kafka/Kinesis) + * @return Integer, target number of tasksCount. -1 means skip scale action. */ private int computeDesiredTaskCount(List lags) { - // if supervisor is not suspended, ensure required tasks are running + // if the supervisor is not suspended, ensure required tasks are running // if suspended, ensure tasks have been requested to gracefully stop - log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags); + log.debug("Computing the desired task count for [%s], based on following lags : [%s]", dataSource, lags); int beyond = 0; int within = 0; int metricsCount = lags.size(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java index b4a9b0e8891c..ad036dd0e10a 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java @@ -103,8 +103,8 @@ public LagBasedAutoScalerConfig( this.scaleInStep = scaleInStep != null ? scaleInStep : 1; this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2; - this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis - != null ? minTriggerScaleActionFrequencyMillis : 600000; + this.minTriggerScaleActionFrequencyMillis = + minTriggerScaleActionFrequencyMillis != null ? 
minTriggerScaleActionFrequencyMillis : 600000; Preconditions.checkArgument( stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 538fd4a9e78f..0737722d3414 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -27,12 +27,23 @@ import com.google.common.collect.ImmutableSet; import org.apache.druid.audit.AuditManager; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.TestSeekableStreamDataSourceMetadata; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.segment.TestHelper; +import 
org.apache.druid.segment.incremental.RowIngestionMetersFactory; +import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.Action; import org.apache.druid.server.security.AuthConfig; @@ -54,6 +65,7 @@ import org.junit.runner.RunWith; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.Response; import java.util.Collections; @@ -61,6 +73,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; @RunWith(EasyMockRunner.class) @@ -1083,7 +1096,9 @@ public void testSpecGetHistoryWithAuthFailure() EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id1", null)).andReturn(versions1).times(1); EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id2", null)).andReturn(versions2).times(1); EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id3", null)).andReturn(versions3).times(1); - EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4", null)).andReturn(Collections.emptyList()).times(1); + EasyMock.expect(supervisorManager.getSupervisorHistoryForId("id4", null)) + .andReturn(Collections.emptyList()) + .times(1); setupMockRequestForUser("notdruid"); replayAll(); @@ -1183,12 +1198,18 @@ public void testSpecGetHistoryWithLimit() // Test with limit=0 (should return 400 Bad Request) response = supervisorResource.specGetHistory(request, "id1", 0); Assert.assertEquals(400, response.getStatus()); - Assert.assertEquals(ImmutableMap.of("error", "Count must be greater than zero if set (count was 0)"), response.getEntity()); + Assert.assertEquals( + ImmutableMap.of("error", "Count must be greater than zero if set (count was 0)"), + response.getEntity() + ); // Test with negative limit (should return 400 Bad Request) response = supervisorResource.specGetHistory(request, "id1", -1); Assert.assertEquals(400, 
response.getStatus()); - Assert.assertEquals(ImmutableMap.of("error", "Count must be greater than zero if set (count was -1)"), response.getEntity()); + Assert.assertEquals( + ImmutableMap.of("error", "Count must be greater than zero if set (count was -1)"), + response.getEntity() + ); // Test with limit larger than available history response = supervisorResource.specGetHistory(request, "id1", 100); @@ -1320,6 +1341,99 @@ public void testNoopSupervisorSpecSerde() throws Exception Assert.assertEquals(spec, specRoundTrip); } + @Test + public void testSpecPostMergeUsesExistingTaskCountHigherPriorityHasBeenMissed() + { + // New spec has no taskCount -> should use existing taskCount (5) + TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1); + TestSeekableStreamSupervisorSpec newSpec = createTestSpecWithExpectedMerge(null, 2, 5); + + newSpec.merge(existingSpec); + EasyMock.verify(newSpec.getIoConfig()); + } + + @Test + public void testSpecPostMergeUsesProvidedTaskCountOverExistingTaskCount() + { + // New spec has taskCount=3 -> should use provided taskCount over existing (5) + TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(5, 1); + TestSeekableStreamSupervisorSpec newSpec = createTestSpecWithExpectedMerge(3, 2, 3); + + newSpec.merge(existingSpec); + EasyMock.verify(newSpec.getIoConfig()); + } + + @Test + public void testSpecPostMergeFallsBackToProvidedTaskCountMin() + { + // Neither has taskCount -> should fall back to taskCountMin (4) + TestSeekableStreamSupervisorSpec existingSpec = createTestSpec(null, 1); + TestSeekableStreamSupervisorSpec newSpec = createTestSpecWithExpectedMerge(null, 4, 4); + + newSpec.merge(existingSpec); + EasyMock.verify(newSpec.getIoConfig()); + } + + private TestSeekableStreamSupervisorSpec createTestSpec(Integer taskCount, int taskCountMin) + { + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("taskCountMax", 10); + 
autoScalerConfig.put("taskCountMin", taskCountMin); + + SeekableStreamSupervisorIOConfig ioConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(ioConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes(); + EasyMock.replay(ioConfig); + + DataSchema dataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(dataSchema); + + SeekableStreamSupervisorIngestionSpec ingestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.replay(ingestionSchema); + + return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema); + } + + private TestSeekableStreamSupervisorSpec createTestSpecWithExpectedMerge( + Integer taskCount, + int taskCountMin, + int expectedTaskCount + ) + { + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("taskCountMax", 10); + autoScalerConfig.put("taskCountMin", taskCountMin); + + SeekableStreamSupervisorIOConfig ioConfig = EasyMock.createMock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(ioConfig.getAutoScalerConfig()) + .andReturn(OBJECT_MAPPER.convertValue(autoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(ioConfig.getTaskCount()).andReturn(taskCount).anyTimes(); + ioConfig.setTaskCount(expectedTaskCount); + EasyMock.expectLastCall().once(); + EasyMock.replay(ioConfig); + + DataSchema dataSchema = EasyMock.createMock(DataSchema.class); + EasyMock.expect(dataSchema.getDataSource()).andReturn("datasource1").anyTimes(); + EasyMock.replay(dataSchema); + + 
SeekableStreamSupervisorIngestionSpec ingestionSchema = + EasyMock.createMock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(ioConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.replay(ingestionSchema); + + return new TestSeekableStreamSupervisorSpec("my-id", ingestionSchema); + } + private void setupMockRequest() { setupMockRequestForUser("druid"); @@ -1445,10 +1559,10 @@ public boolean equals(Object o) if (getId() != null ? !getId().equals(that.getId()) : that.getId() != null) { return false; } - if (supervisor != null ? !supervisor.equals(that.supervisor) : that.supervisor != null) { + if (!Objects.equals(supervisor, that.supervisor)) { return false; } - if (datasources != null ? !datasources.equals(that.datasources) : that.datasources != null) { + if (!Objects.equals(datasources, that.datasources)) { return false; } return isSuspended() == that.isSuspended(); @@ -1464,4 +1578,61 @@ public int hashCode() return result; } } + + static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec + { + public TestSeekableStreamSupervisorSpec( + @Nullable String id, + SeekableStreamSupervisorIngestionSpec ingestionSchema + ) + { + super( + id, + ingestionSchema, + null, + false, + EasyMock.createMock(TaskStorage.class), + EasyMock.createMock(TaskMaster.class), + EasyMock.createMock(IndexerMetadataStorageCoordinator.class), + EasyMock.createMock(SeekableStreamIndexTaskClientFactory.class), + OBJECT_MAPPER, + EasyMock.createMock(ServiceEmitter.class), + EasyMock.createMock(DruidMonitorSchedulerConfig.class), + EasyMock.createMock(RowIngestionMetersFactory.class), + EasyMock.createMock(SupervisorStateManagerConfig.class) + ); + } + + @Override + public Supervisor createSupervisor() + { + return null; + } + + @Override + public String getType() + { + return "test"; + } + + @Override + public String getSource() + { + return 
"test-stream"; + } + + @Override + protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) + { + return null; + } + + @JsonIgnore + @Nonnull + @Override + public Set getInputSourceResources() throws UnsupportedOperationException + { + return Collections.singleton(new ResourceAction(new Resource("test", ResourceType.EXTERNAL), Action.READ)); + } + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index e96e38873189..58ffb8438e0d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -566,14 +566,16 @@ public void testAutoScalerConfig() Exception e = null; try { - AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of( - "enableTaskAutoScaler", - "true", - "taskCountMax", - "1", - "taskCountMin", - "4" - ), AutoScalerConfig.class); + AutoScalerConfig autoScalerError = mapper.convertValue( + ImmutableMap.of( + "enableTaskAutoScaler", + "true", + "taskCountMax", + "1", + "taskCountMin", + "4" + ), AutoScalerConfig.class + ); } catch (RuntimeException ex) { e = ex; @@ -685,16 +687,18 @@ public void testDefaultAutoScalerConfigCreatedWithDefault() EasyMock.replay(ingestionSchema); EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig()) - .andReturn(mapper.convertValue(ImmutableMap.of( - "lagCollectionIntervalMillis", - "1", - "enableTaskAutoScaler", - true, - "taskCountMax", - "4", - "taskCountMin", - "1" - ), AutoScalerConfig.class)) + .andReturn(mapper.convertValue( + ImmutableMap.of( + "lagCollectionIntervalMillis", + "1", + "enableTaskAutoScaler", + true, + "taskCountMax", + "4", + "taskCountMin", + "1" + ), AutoScalerConfig.class + )) .anyTimes(); 
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes(); EasyMock.replay(seekableStreamSupervisorIOConfig); @@ -775,6 +779,7 @@ public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedExc supervisor.start(); autoScaler.start(); supervisor.runInternal(); + int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountBeforeScaleOut); Thread.sleep(1000); @@ -1003,9 +1008,11 @@ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() t supervisor.start(); autoScaler.start(); supervisor.runInternal(); + int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountBeforeScaleOut); Thread.sleep(1000); + int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScaleOut); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); @@ -1050,11 +1057,14 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin. Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); supervisor.getIoConfig().setTaskCount(2); + supervisor.start(); autoScaler.start(); supervisor.runInternal(); + int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountBeforeScaleOut); + Thread.sleep(1000); int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountAfterScaleOut); @@ -1102,15 +1112,18 @@ public void testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanParti emitter ); - // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin. + // enable autoscaler so that taskcount config will be ignored and the init value of taskCount will use taskCountMin. 
Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); supervisor.getIoConfig().setTaskCount(2); + + // When supervisor.start(); autoScaler.start(); supervisor.runInternal(); Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount()); Thread.sleep(2000); + // Then Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount()); emitter.verifyEmitted(SeekableStreamSupervisor.AUTOSCALER_SCALING_TIME_METRIC, 1); @@ -1580,6 +1593,97 @@ public String getSource() originalSpec.validateSpecUpdateTo(proposedSpecSameSource); } + @Test + public void testMergeSpecConfigs() + { + mockIngestionSchema(); + + // Given + // Create existing spec with autoscaler config and taskCount set to 5 + HashMap existingAutoScalerConfig = new HashMap<>(); + existingAutoScalerConfig.put("enableTaskAutoScaler", true); + existingAutoScalerConfig.put("taskCountMax", 8); + existingAutoScalerConfig.put("taskCountMin", 1); + + SeekableStreamSupervisorIOConfig existingIoConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(existingIoConfig.getAutoScalerConfig()) + .andReturn(mapper.convertValue(existingAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(existingIoConfig.getTaskCount()).andReturn(5).anyTimes(); + EasyMock.replay(existingIoConfig); + + SeekableStreamSupervisorIngestionSpec existingIngestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(existingIngestionSchema.getIOConfig()).andReturn(existingIoConfig).anyTimes(); + EasyMock.expect(existingIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(existingIngestionSchema.getTuningConfig()) + .andReturn(seekableStreamSupervisorTuningConfig) + .anyTimes(); + EasyMock.replay(existingIngestionSchema); + + TestSeekableStreamSupervisorSpec existingSpec = buildDefaultSupervisorSpecWithIngestionSchema( + "id123", + existingIngestionSchema + ); + + // Create new spec with autoscaler config that has 
taskCountStart not set (null) and no taskCount set + HashMap newAutoScalerConfig = new HashMap<>(); + newAutoScalerConfig.put("enableTaskAutoScaler", true); + newAutoScalerConfig.put("taskCountMax", 8); + newAutoScalerConfig.put("taskCountMin", 1); + + SeekableStreamSupervisorIOConfig newIoConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(newIoConfig.getAutoScalerConfig()) + .andReturn(mapper.convertValue(newAutoScalerConfig, AutoScalerConfig.class)) + .anyTimes(); + EasyMock.expect(newIoConfig.getTaskCount()).andReturn(null).anyTimes(); + newIoConfig.setTaskCount(5); + EasyMock.expectLastCall().once(); + EasyMock.replay(newIoConfig); + + SeekableStreamSupervisorIngestionSpec newIngestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(newIngestionSchema.getIOConfig()).andReturn(newIoConfig).anyTimes(); + EasyMock.expect(newIngestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(newIngestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(newIngestionSchema); + + TestSeekableStreamSupervisorSpec newSpec = buildDefaultSupervisorSpecWithIngestionSchema( + "id124", + newIngestionSchema + ); + + // Before merge, taskCountStart should be null + Assert.assertNull(newSpec.getIoConfig().getAutoScalerConfig().getTaskCountStart()); + + // When - merge should copy taskCount from existing spec since new spec has no taskCount + newSpec.merge(existingSpec); + + // Then - verify setTaskCount was called (EasyMock will verify the mock expectations) + EasyMock.verify(newIoConfig); + } + + private TestSeekableStreamSupervisorSpec buildDefaultSupervisorSpecWithIngestionSchema( + String id, + SeekableStreamSupervisorIngestionSpec ingestionSchema + ) + { + return new TestSeekableStreamSupervisorSpec( + ingestionSchema, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, 
+ emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + id + ); + } + private void mockIngestionSchema() { EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java index 9ff217d5404a..377223308b0f 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java @@ -27,6 +27,7 @@ import org.apache.druid.server.security.ResourceAction; import javax.annotation.Nonnull; +import javax.validation.constraints.NotNull; import java.util.List; import java.util.Set; @@ -113,4 +114,15 @@ default void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcep { // The default implementation does not do any validation checks. } + + /** + * Updates this supervisor spec by merging values from the given {@code existingSpec}. + * This method may be used to carry forward existing spec values when a supervisor is being resubmitted. + * + * @param existingSpec used spec to merge values from + */ + default void merge(@NotNull SupervisorSpec existingSpec) + { + // No-op by default + } }