From 6ec0bd11c6c3caadcb1c0a49f20e26f0ab324d9d Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Thu, 8 Jun 2023 06:56:07 -0700 Subject: [PATCH 1/9] Rolling supervisor task publishing --- .../extensions-core/kafka-supervisor-operations.md | 6 +++--- .../supervisor/SeekableStreamSupervisor.java | 13 +++++++++++-- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/docs/development/extensions-core/kafka-supervisor-operations.md b/docs/development/extensions-core/kafka-supervisor-operations.md index dbfa05174fb3..f447b0ded546 100644 --- a/docs/development/extensions-core/kafka-supervisor-operations.md +++ b/docs/development/extensions-core/kafka-supervisor-operations.md @@ -157,12 +157,12 @@ as it takes to generate segments, push segments to deep storage, and have them b The number of reading tasks is controlled by `replicas` and `taskCount`. In general, there will be `replicas * taskCount` reading tasks, the exception being if taskCount > {numKafkaPartitions} in which case {numKafkaPartitions} tasks will -be used instead. When `taskDuration` elapses, these tasks will transition to publishing state and `replicas * taskCount` -new reading tasks will be created. Therefore to allow for reading tasks and publishing tasks to run concurrently, there +be used instead. When `taskDuration` elapses, one task and it's replicas will transition to publishing state and +new reading tasks will be created. Therefore, to allow for reading tasks and publishing tasks to run concurrently, there should be a minimum capacity of: ``` -workerCapacity = 2 * replicas * taskCount +workerCapacity = replicas * (taskCount + 1) ``` This value is for the ideal situation in which there is at most one set of tasks publishing while another set is reading.
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 0ae9aad9631e..2e9c533b7088 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2960,12 +2960,21 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException } - // if this task has run longer than the configured duration, signal all tasks in the group to persist - if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow() || stopTasksEarly) { + if (stopTasksEarly) { log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); futureGroupIds.add(groupId); futures.add(checkpointTaskGroup(group, true)); } + // if this task has run longer than the configured duration, signal all tasks in the group to persist + else if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { + int pendingTasks = pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum(); + if (pendingTasks <= 0) { + log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); + futureGroupIds.add(groupId); + futures.add(checkpointTaskGroup(group, true)); + } + break; + } } List>> results = coalesceAndAwait(futures); From 46c131354f44199ade9506215b7b7c1be0ea07e0 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Tue, 13 Jun 2023 22:14:14 -0700 Subject: [PATCH 2/9] add an option for number of task groups to roll over --- .../kafka/supervisor/KafkaSupervisorIOConfig.java | 6 ++++-- .../druid/indexing/kafka/KafkaSamplerSpecTest.java | 5 +++++ .../supervisor/KafkaSupervisorIOConfigTest.java | 4 +++- .../kafka/supervisor/KafkaSupervisorTest.java | 8 
++++++-- .../supervisor/KinesisSupervisorIOConfig.java | 3 ++- .../supervisor/SeekableStreamSupervisor.java | 2 +- .../SeekableStreamSupervisorIOConfig.java | 13 ++++++++++++- .../SeekableStreamSamplerSpecTest.java | 4 +++- .../SeekableStreamSupervisorSpecTest.java | 6 +++++- .../SeekableStreamSupervisorStateTest.java | 5 ++++- 10 files changed, 45 insertions(+), 11 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index f467cc55104d..24d8b7ac1c2c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -65,7 +65,8 @@ public KafkaSupervisorIOConfig( @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod, @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime, @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides, - @JsonProperty("idleConfig") IdleConfig idleConfig + @JsonProperty("idleConfig") IdleConfig idleConfig, + @JsonProperty("stopTaskCount") Integer stopTaskCount ) { super( @@ -82,7 +83,8 @@ public KafkaSupervisorIOConfig( earlyMessageRejectionPeriod, autoScalerConfig, lateMessageRejectionStartDateTime, - idleConfig + idleConfig, + stopTaskCount ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index 025ccc38584b..d3efa761eade 100644 --- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -178,6 +178,7 @@ public void testSample() null, null, null, + null, null ), null, @@ -237,6 +238,7 @@ public void testSampleKafkaInputFormat() null, null, null, + null, null ), null, @@ -338,6 +340,7 @@ public void testWithInputRowParser() throws IOException null, null, null, + null, null ), null, @@ -520,6 +523,7 @@ public void testInvalidKafkaConfig() null, null, null, + null, null ), null, @@ -574,6 +578,7 @@ public void testGetInputSourceResources() null, null, null, + null, null ), null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 5a4a15590c7f..154b85205475 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -307,6 +307,7 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException null, null, null, + null, null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); @@ -348,7 +349,8 @@ public void testIdleConfigSerde() throws JsonProcessingException null, null, null, - mapper.convertValue(idleConfig, IdleConfig.class) + mapper.convertValue(idleConfig, IdleConfig.class), + null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); KafkaSupervisorIOConfig kafkaSupervisorIOConfig1 = mapper.readValue(ioConfig, KafkaSupervisorIOConfig.class); diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 97297c085e22..3e66d47925db 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -309,7 +309,8 @@ public SeekableStreamIndexTaskClient build( null, null, null, - new IdleConfig(true, 1000L) + new IdleConfig(true, 1000L), + null ); final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig( @@ -4523,7 +4524,8 @@ private TestableKafkaSupervisor getTestableSupervisor( earlyMessageRejectionPeriod, null, null, - idleConfig + idleConfig, + null ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( @@ -4637,6 +4639,7 @@ private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent( earlyMessageRejectionPeriod, null, null, + null, null ); @@ -4755,6 +4758,7 @@ private KafkaSupervisor getSupervisor( earlyMessageRejectionPeriod, null, null, + null, null ); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java index 325283631047..a568aea263ca 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java @@ -93,7 +93,8 @@ public KinesisSupervisorIOConfig( earlyMessageRejectionPeriod, autoScalerConfig, 
lateMessageRejectionStartDateTime, - new IdleConfig(null, null) + new IdleConfig(null, null), + null ); this.endpoint = endpoint != null diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 2e9c533b7088..5c4fd164c1ca 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2968,7 +2968,7 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException // if this task has run longer than the configured duration, signal all tasks in the group to persist else if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { int pendingTasks = pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum(); - if (pendingTasks <= 0) { + if (pendingTasks <= ioConfig.getStopTaskCount()) { log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); futureGroupIds.add(groupId); futures.add(checkpointTaskGroup(group, true)); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index 700236a74151..2227aa128bb1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -50,6 +50,8 @@ public abstract class SeekableStreamSupervisorIOConfig @Nullable private final AutoScalerConfig autoScalerConfig; @Nullable private final 
IdleConfig idleConfig; + private final Integer stopTaskCount; + public SeekableStreamSupervisorIOConfig( String stream, @Nullable InputFormat inputFormat, @@ -64,7 +66,8 @@ public SeekableStreamSupervisorIOConfig( Period earlyMessageRejectionPeriod, @Nullable AutoScalerConfig autoScalerConfig, DateTime lateMessageRejectionStartDateTime, - @Nullable IdleConfig idleConfig + @Nullable IdleConfig idleConfig, + @Nullable Integer stopTaskCount ) { this.stream = Preconditions.checkNotNull(stream, "stream cannot be null"); @@ -78,6 +81,8 @@ public SeekableStreamSupervisorIOConfig( } else { this.taskCount = taskCount != null ? taskCount : 1; } + this.stopTaskCount = stopTaskCount == null ? this.taskCount : stopTaskCount; + Preconditions.checkArgument(this.stopTaskCount > 0, "stopTaskCount must be greater than 0"); this.taskDuration = defaultDuration(taskDuration, "PT1H"); this.startDelay = defaultDuration(startDelay, "PT5S"); this.period = defaultDuration(period, "PT30S"); @@ -199,4 +204,10 @@ public IdleConfig getIdleConfig() { return idleConfig; } + + @JsonProperty + public int getStopTaskCount() + { + return stopTaskCount; + } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java index 8bc91f7c1fa6..007f316e1440 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java @@ -347,7 +347,9 @@ private TestableSeekableStreamSupervisorIOConfig( earlyMessageRejectionPeriod, autoScalerConfig, lateMessageRejectionStartDateTime, - idleConfig + idleConfig, + // TODO: wire test code through + null ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 52e160058588..ed5ff6798865 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -877,6 +877,7 @@ public void testSeekableStreamSupervisorSpecWithScaleDisable() throws Interrupte null, null, null, + null, null ) { @@ -931,7 +932,8 @@ public void testEnablingIdleBeviourPerSupervisorWithOverlordConfigEnabled() null, null, null, - new IdleConfig(true, null) + new IdleConfig(true, null), + null ) { }; @@ -1097,6 +1099,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal null, mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null, + null, null ) { @@ -1116,6 +1119,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal null, mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null, + null, null ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index d622984b0bab..cd7b076f78e2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -490,7 +490,8 @@ public void testIdleStateTransition() throws Exception null, null, null, - new IdleConfig(true, 200L) + new IdleConfig(true, 200L), + null ) { }).anyTimes(); @@ -1062,6 +1063,7 @@ private void expectEmitterSupervisor(boolean suspended) throws EntryExistsExcept null, null, null, + null, null ) { @@ -1122,6 
+1124,7 @@ private static SeekableStreamSupervisorIOConfig getIOConfig() null, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null, + null, null ) { From 365e03dd4a08866eca704146e3954c8a12cc7707 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Tue, 13 Jun 2023 22:23:53 -0700 Subject: [PATCH 3/9] better --- .../supervisor/SeekableStreamSupervisor.java | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 5c4fd164c1ca..4d11a2d706c8 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1578,6 +1578,7 @@ public void gracefulShutdownInternal() throws ExecutionException, InterruptedExc } } } + earlyStopTime = DateTimes.EPOCH; checkTaskDuration(); } @@ -2938,6 +2939,14 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException final List>> futures = new ArrayList<>(); final List futureGroupIds = new ArrayList<>(); + boolean stopTasksEarly = false; + if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) { + log.info("Early stop requested - signalling tasks to complete"); + + earlyStopTime = null; + stopTasksEarly = true; + } + for (Entry entry : activelyReadingTaskGroups.entrySet()) { Integer groupId = entry.getKey(); TaskGroup group = entry.getValue(); @@ -2950,16 +2959,6 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException } } - - boolean stopTasksEarly = false; - if (earlyStopTime != null && (earlyStopTime.isBeforeNow() || earlyStopTime.isEqualNow())) { - log.info("Early stop requested - signalling 
tasks to complete"); - - earlyStopTime = null; - stopTasksEarly = true; - } - - if (stopTasksEarly) { log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); futureGroupIds.add(groupId); From b73dcd83e244e1f69ed11961fcd713f61690820a Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Wed, 21 Jun 2023 13:29:03 -0700 Subject: [PATCH 4/9] remove docs --- .../extensions-core/kafka-supervisor-operations.md | 4 ++-- .../supervisor/SeekableStreamSupervisorIOConfig.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/development/extensions-core/kafka-supervisor-operations.md b/docs/development/extensions-core/kafka-supervisor-operations.md index f447b0ded546..5c2196734c4f 100644 --- a/docs/development/extensions-core/kafka-supervisor-operations.md +++ b/docs/development/extensions-core/kafka-supervisor-operations.md @@ -157,8 +157,8 @@ as it takes to generate segments, push segments to deep storage, and have them b The number of reading tasks is controlled by `replicas` and `taskCount`. In general, there will be `replicas * taskCount` reading tasks, the exception being if taskCount > {numKafkaPartitions} in which case {numKafkaPartitions} tasks will -be used instead. When `taskDuration` elapses, one task and it's replicas will transition to publishing state and -new reading tasks will be created. Therefore, to allow for reading tasks and publishing tasks to run concurrently, there +be used instead. When `taskDuration` elapses, these tasks will transition to publishing state and `replicas * taskCount` +new reading tasks will be created. 
Therefore to allow for reading tasks and publishing tasks to run concurrently, there should be a minimum capacity of: ``` diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index 2227aa128bb1..d49ceaa260cf 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -50,7 +50,7 @@ public abstract class SeekableStreamSupervisorIOConfig @Nullable private final AutoScalerConfig autoScalerConfig; @Nullable private final IdleConfig idleConfig; - private final Integer stopTaskCount; + private final int stopTaskCount; public SeekableStreamSupervisorIOConfig( String stream, From 55e82eda8a441b16f7366d3e5057b0c8645d46bd Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Wed, 21 Jun 2023 13:30:48 -0700 Subject: [PATCH 5/9] oops --- docs/development/extensions-core/kafka-supervisor-operations.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/extensions-core/kafka-supervisor-operations.md b/docs/development/extensions-core/kafka-supervisor-operations.md index 5c2196734c4f..dbfa05174fb3 100644 --- a/docs/development/extensions-core/kafka-supervisor-operations.md +++ b/docs/development/extensions-core/kafka-supervisor-operations.md @@ -162,7 +162,7 @@ new reading tasks will be created. Therefore to allow for reading tasks and publ should be a minimum capacity of: ``` -workerCapacity = replicas * (taskCount + 1) +workerCapacity = 2 * replicas * taskCount ``` This value is for the ideal situation in which there is at most one set of tasks publishing while another set is reading. 
From d6500f2c6a39aa944a5efa456f9b00a776b9ea41 Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Wed, 21 Jun 2023 14:18:20 -0700 Subject: [PATCH 6/9] checkstyle --- .../supervisor/SeekableStreamSupervisor.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 4d11a2d706c8..f6d9a5f8e672 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -2963,11 +2963,12 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); futureGroupIds.add(groupId); futures.add(checkpointTaskGroup(group, true)); - } - // if this task has run longer than the configured duration, signal all tasks in the group to persist - else if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { - int pendingTasks = pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum(); - if (pendingTasks <= ioConfig.getStopTaskCount()) { + } else if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { + // if this task has run longer than the configured duration + // as long as the pending task groups are less than the configured stop task count. 
+ if (ioConfig.getStopTaskCount() == ioConfig.getTaskCount() + || pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum() + < ioConfig.getStopTaskCount()) { log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); futureGroupIds.add(groupId); futures.add(checkpointTaskGroup(group, true)); From 6cb9b5d4e7a3081e61ab933d314c1efdcbd543fd Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Fri, 21 Jul 2023 09:14:57 -0700 Subject: [PATCH 7/9] wip test --- .../kafka/supervisor/KafkaSupervisorTest.java | 2 +- .../supervisor/SeekableStreamSupervisor.java | 71 +++++++++++++------ .../SeekableStreamSamplerSpecTest.java | 1 - .../SeekableStreamSupervisorSpecTest.java | 36 ++++++++++ 4 files changed, 88 insertions(+), 22 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 6a1b8964d6d6..6572d6b23c1d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -310,7 +310,7 @@ public SeekableStreamIndexTaskClient build( null, null, new IdleConfig(true, 1000L), - null + 1 ); final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 523840d449a9..b39e93df41f2 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1415,6 +1415,27 @@ private List getCurrentParseErrors() return limitedParseErrors; } + @VisibleForTesting + public void addTaskGroupToActivelyReadingTaskGroup( + int taskGroupId, + ImmutableMap partitionOffsets, + Optional minMsgTime, + Optional maxMsgTime, + Set tasks, + Set exclusiveStartingSequencePartitions + ) + { + addTaskGroupToActivelyReadingTaskGroup( + taskGroupId, + partitionOffsets, + minMsgTime, + maxMsgTime, + tasks, + exclusiveStartingSequencePartitions, + null + ); + } + @VisibleForTesting public void addTaskGroupToActivelyReadingTaskGroup( int taskGroupId, @@ -1422,7 +1443,8 @@ public void addTaskGroupToActivelyReadingTaskGroup( Optional minMsgTime, Optional maxMsgTime, Set tasks, - Set exclusiveStartingSequencePartitions + Set exclusiveStartingSequencePartitions, + @Nullable DateTime taskStartTime ) { TaskGroup group = new TaskGroup( @@ -1434,6 +1456,13 @@ public void addTaskGroupToActivelyReadingTaskGroup( exclusiveStartingSequencePartitions ); group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData()))); + if (taskStartTime != null) { + int i =0; + for (TaskData td : group.tasks.values()) { + td.startTime = taskStartTime.minusSeconds(i); + i++; + } + } if (activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) { throw new ISE( "trying to add taskGroup with id [%s] to actively reading task groups, but group already exists.", @@ -2947,33 +2976,35 @@ private void checkTaskDuration() throws ExecutionException, InterruptedException stopTasksEarly = true; } + int stoppedTasks = 0; for (Entry entry : activelyReadingTaskGroups.entrySet()) { Integer groupId = entry.getKey(); TaskGroup group = entry.getValue(); - // find the longest running task from this group - DateTime earliestTaskStart = DateTimes.nowUtc(); - for (TaskData taskData : group.tasks.values()) { - if (taskData.startTime != null && 
earliestTaskStart.isAfter(taskData.startTime)) { - earliestTaskStart = taskData.startTime; - } - } - if (stopTasksEarly) { - log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); + log.info("Stopping task group [%d] early. It has run for [%s]", groupId, ioConfig.getTaskDuration()); futureGroupIds.add(groupId); futures.add(checkpointTaskGroup(group, true)); - } else if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { - // if this task has run longer than the configured duration - // as long as the pending task groups are less than the configured stop task count. - if (ioConfig.getStopTaskCount() == ioConfig.getTaskCount() - || pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum() - < ioConfig.getStopTaskCount()) { - log.info("Task group [%d] has run for [%s]", groupId, ioConfig.getTaskDuration()); - futureGroupIds.add(groupId); - futures.add(checkpointTaskGroup(group, true)); + } else { + // find the longest running task from this group + DateTime earliestTaskStart = DateTimes.nowUtc(); + for (TaskData taskData : group.tasks.values()) { + if (taskData.startTime != null && earliestTaskStart.isAfter(taskData.startTime)) { + earliestTaskStart = taskData.startTime; + } + } + + if (earliestTaskStart.plus(ioConfig.getTaskDuration()).isBeforeNow()) { + // if this task has run longer than the configured duration + // as long as the pending task groups are less than the configured stop task count. + if (pendingCompletionTaskGroups.values().stream().mapToInt(CopyOnWriteArrayList::size).sum() + stoppedTasks + < ioConfig.getStopTaskCount()) { + log.info("Task group [%d] has run for [%s]. 
Stopping.", groupId, ioConfig.getTaskDuration()); + futureGroupIds.add(groupId); + futures.add(checkpointTaskGroup(group, true)); + stoppedTasks++; + } } - break; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java index 007f316e1440..87cd196c268f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java @@ -348,7 +348,6 @@ private TestableSeekableStreamSupervisorIOConfig( autoScalerConfig, lateMessageRejectionStartDateTime, idleConfig, - // TODO: wire test code through null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index ed5ff6798865..bc7e05c826ae 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -812,6 +812,42 @@ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() t autoScaler.stop(); } + @Test + public void testSeekableStreamSupervisorWithStopTaskCount() + { + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), + 1, + 2, + new Period("PT1H"), + new Period("P1D"), + new 
Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), + null, + null, + 1 + ) + { + }).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3); + + supervisor.start(); + supervisor.runInternal(); + + } @Test public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException { From 36f94a7b99b15be64f5ec647ce2b6752b11973ee Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Thu, 3 Aug 2023 14:44:49 -0700 Subject: [PATCH 8/9] undo partial test change --- .../supervisor/SeekableStreamSupervisor.java | 31 +------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index d0ea0a21b491..a72ba05eacad 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -1411,27 +1411,6 @@ private List getCurrentParseErrors() return limitedParseErrors; } - @VisibleForTesting - public void addTaskGroupToActivelyReadingTaskGroup( - int taskGroupId, - ImmutableMap partitionOffsets, - Optional minMsgTime, - Optional maxMsgTime, - Set tasks, - Set exclusiveStartingSequencePartitions - ) - { - addTaskGroupToActivelyReadingTaskGroup( - taskGroupId, - partitionOffsets, - minMsgTime, - maxMsgTime, - tasks, - exclusiveStartingSequencePartitions, - null - ); - } - @VisibleForTesting public 
void addTaskGroupToActivelyReadingTaskGroup( int taskGroupId, @@ -1439,8 +1418,7 @@ public void addTaskGroupToActivelyReadingTaskGroup( Optional minMsgTime, Optional maxMsgTime, Set tasks, - Set exclusiveStartingSequencePartitions, - @Nullable DateTime taskStartTime + Set exclusiveStartingSequencePartitions ) { TaskGroup group = new TaskGroup( @@ -1452,13 +1430,6 @@ public void addTaskGroupToActivelyReadingTaskGroup( exclusiveStartingSequencePartitions ); group.tasks.putAll(tasks.stream().collect(Collectors.toMap(x -> x, x -> new TaskData()))); - if (taskStartTime != null) { - int i =0; - for (TaskData td : group.tasks.values()) { - td.startTime = taskStartTime.minusSeconds(i); - i++; - } - } if (activelyReadingTaskGroups.putIfAbsent(taskGroupId, group) != null) { throw new ISE( "trying to add taskGroup with id [%s] to actively reading task groups, but group already exists.", From 5c74bca1c8e9658f8503523a7e2da30f79a27c4b Mon Sep 17 00:00:00 2001 From: Suneet Saldanha Date: Thu, 3 Aug 2023 14:47:44 -0700 Subject: [PATCH 9/9] remove incomplete test --- .../SeekableStreamSupervisorSpecTest.java | 36 ------------------- 1 file changed, 36 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 6ec4004b88e1..05e6401a8f6b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -800,42 +800,6 @@ public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() t autoScaler.stop(); } - @Test - public void testSeekableStreamSupervisorWithStopTaskCount() - { - EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); - - 
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); - EasyMock.expect(spec.getIoConfig()).andReturn(new SeekableStreamSupervisorIOConfig( - "stream", - new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), - 1, - 2, - new Period("PT1H"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, - mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), - null, - null, - 1 - ) - { - }).anyTimes(); - EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); - EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); - EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); - EasyMock.replay(spec); - - TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3); - - supervisor.start(); - supervisor.runInternal(); - - } @Test public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException {