diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index f467cc55104d..24d8b7ac1c2c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -65,7 +65,8 @@ public KafkaSupervisorIOConfig( @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod, @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime, @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides, - @JsonProperty("idleConfig") IdleConfig idleConfig + @JsonProperty("idleConfig") IdleConfig idleConfig, + @JsonProperty("stopTaskCount") Integer stopTaskCount ) { super( @@ -82,7 +83,8 @@ public KafkaSupervisorIOConfig( earlyMessageRejectionPeriod, autoScalerConfig, lateMessageRejectionStartDateTime, - idleConfig + idleConfig, + stopTaskCount ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index 025ccc38584b..d3efa761eade 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -178,6 +178,7 @@ public void testSample() null, null, null, + null, null ), null, @@ -237,6 +238,7 @@ public void testSampleKafkaInputFormat() null, null, null, + null, null ), null, @@ -338,6 +340,7 @@ public void testWithInputRowParser() throws IOException null, null, null, + null, null ), null, @@ -520,6 +523,7 @@ public void testInvalidKafkaConfig() null, null, null, + null, null ), null, @@ -574,6 +578,7 @@ public void testGetInputSourceResources() null, null, null, + null, null ), null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 5a4a15590c7f..154b85205475 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -307,6 +307,7 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException null, null, null, + null, null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); @@ -348,7 +349,8 @@ public void testIdleConfigSerde() throws JsonProcessingException null, null, null, - mapper.convertValue(idleConfig, IdleConfig.class) + mapper.convertValue(idleConfig, IdleConfig.class), + null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 171883103f18..f473c0412505 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -308,7 +308,8 @@ public SeekableStreamIndexTaskClient build( null, null, null, - new IdleConfig(true, 1000L) + new IdleConfig(true, 1000L), + 1 ); final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig( @@ -4516,7 +4517,8 @@ private TestableKafkaSupervisor getTestableSupervisor( earlyMessageRejectionPeriod, null, null, - idleConfig + idleConfig, + null ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( @@ -4627,6 +4629,7 @@ private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent( earlyMessageRejectionPeriod, null, null, + null, null ); @@ -4742,6 +4745,7 @@ private KafkaSupervisor getSupervisor( earlyMessageRejectionPeriod, null, null, + null, null ); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java index 325283631047..a568aea263ca 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java @@ -93,7 +93,8 @@ public KinesisSupervisorIOConfig( earlyMessageRejectionPeriod, autoScalerConfig, lateMessageRejectionStartDateTime, - new IdleConfig(null, null) + new IdleConfig(null, null), + null ); this.endpoint = endpoint != null diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index ea444bd734c5..a72ba05eacad 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1574,6 +1574,7 @@ public void gracefulShutdownInternal() throws ExecutionException, InterruptedExc } } } + earlyStopTime = DateTimes.EPOCH; checkTaskDuration(); } @@ -2934,33 +2935,43 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException final List>> futures = new ArrayList<>(); final List futureGroupIds = new ArrayList<>(); + boolean stopTasksEarly = false; + if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) { + log.info("Early stop requested - signalling tasks to complete"); + + earlyStopTime = null; + stopTasksEarly = true; + } + + int stoppedTasks = 0; for (Entry entry : activelyReadingTaskGroups.entrySet()) { Integer groupId = entry.getKey(); TaskGroup group = entry.getValue(); - // find the longest running task from this group - DateTime earliestTaskStart = DateTimes.nowUtc(); - for (TaskData taskData : group.tasks.values()) { - if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) { - earliestTaskStart = taskData.startTime; - } - } - - - boolean stopTasksEarly = false; - if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) { - log.info("Early stop requested - signalling tasks to complete"); - - earlyStopTime = null; - stopTasksEarly = true; - } - - - // if this task has run longer than the configured duration, signal all tasks in the group to persist - if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || stopTasksEarly) { - log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); + if (stopTasksEarly) { + log.info("Stopping task group [%d] early. It has run for [%s]", groupId, ioConfig.getTaskDuration()); futureGroupIds.add(groupId); futures.add(checkpointTaskGroup(group, true)); + } else { + // find the longest running task from this group + DateTime earliestTaskStart = DateTimes.nowUtc(); + for (TaskData taskData : group.tasks.values()) { + if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) { + earliestTaskStart = taskData.startTime; + } + } + + if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { + // if this task has run longer than the configured duration + // as long as the pending task groups are less than the configured stop task count. + if (pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum() + stoppedTasks + < ioConfig.getStopTaskCount()) { + log.info("Task group [%d] has run for [%s]. Stopping.", groupId, ioConfig.getTaskDuration()); + futureGroupIds.add(groupId); + futures.add(checkpointTaskGroup(group, true)); + stoppedTasks++; + } + } } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index 700236a74151..d49ceaa260cf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -50,6 +50,8 @@ public abstract class SeekableStreamSupervisorIOConfig @Nullable private final AutoScalerConfig autoScalerConfig; @Nullable private final IdleConfig idleConfig; + private final int stopTaskCount; + public SeekableStreamSupervisorIOConfig( String stream, @Nullable InputFormat inputFormat, @@ -64,7 +66,8 @@ public SeekableStreamSupervisorIOConfig( Period earlyMessageRejectionPeriod, @Nullable AutoScalerConfig autoScalerConfig, DateTime lateMessageRejectionStartDateTime, - @Nullable IdleConfig idleConfig + @Nullable IdleConfig idleConfig, + @Nullable Integer stopTaskCount ) { this.stream = Preconditions.checkNotNull(stream, "stream cannot be null"); @@ -78,6 +81,8 @@ public SeekableStreamSupervisorIOConfig( } else { this.taskCount = taskCount != null ? taskCount : 1; } + this.stopTaskCount = stopTaskCount == null ? this.taskCount : stopTaskCount; + Preconditions.checkArgument(this.stopTaskCount > 0, "stopTaskCount must be greater than 0"); this.taskDuration = defaultDuration(taskDuration, "PT1H"); this.startDelay = defaultDuration(startDelay, "PT5S"); this.period = defaultDuration(period, "PT30S"); @@ -199,4 +204,10 @@ public IdleConfig getIdleConfig() { return idleConfig; } + + @JsonProperty + public int getStopTaskCount() + { + return stopTaskCount; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java index 8bc91f7c1fa6..87cd196c268f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java @@ -347,7 +347,8 @@ private TestableSeekableStreamSupervisorIOConfig( earlyMessageRejectionPeriod, autoScalerConfig, lateMessageRejectionStartDateTime, - idleConfig + idleConfig, + null ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 1253310a0a3e..05e6401a8f6b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -865,6 +865,7 @@ public void testSeekableStreamSupervisorSpecWithScaleDisable() throws Interrupte null, null, null, + null, null ) { @@ -919,7 +920,8 @@ public void testEnablingIdleBeviourPerSupervisorWithOverlordConfigEnabled() null, null, null, - new IdleConfig(true, null) + new IdleConfig(true, null), + null ) { }; @@ -1085,6 +1087,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal null, mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null, + null, null ) { @@ -1104,6 +1107,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal null, mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null, + null, null ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index b12c744ef4d0..c592b9375fda 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -492,7 +492,8 @@ public void testIdleStateTransition() throws Exception null, null, null, - new IdleConfig(true, 200L) + new IdleConfig(true, 200L), + null ) { }).anyTimes(); @@ -1088,6 +1089,7 @@ private void expectEmitterSupervisor(boolean suspended) throws EntryExistsExcept null, null, null, + null, null ) { @@ -1148,6 +1150,7 @@ private static SeekableStreamSupervisorIOConfig getIOConfig() null, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null, + null, null ) {