From dfda88c4aaa210a497b9059ff8240698df18289e Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Thu, 23 Apr 2026 16:29:59 -0700 Subject: [PATCH 1/9] Initial implementation for BoundedStreamConfig --- .../indexing/StreamIndexTestBase.java | 1 + .../kinesis/KinesisDataFormatsTest.java | 1 + .../supervisor/RabbitStreamSupervisor.java | 29 ++ .../RabbitStreamSupervisorIOConfig.java | 7 +- .../RabbitStreamSupervisorTest.java | 3 + .../kafka/supervisor/KafkaSupervisor.java | 25 ++ .../supervisor/KafkaSupervisorIOConfig.java | 7 +- .../supervisor/KafkaIOConfigBuilder.java | 1 + .../KafkaSupervisorIOConfigTest.java | 3 + .../kafka/supervisor/KafkaSupervisorTest.java | 8 +- .../kinesis/supervisor/KinesisSupervisor.java | 23 + .../supervisor/KinesisSupervisorIOConfig.java | 7 +- .../kinesis/KinesisSamplerSpecTest.java | 2 + .../supervisor/KinesisSupervisorTest.java | 8 + .../supervisor/BoundedStreamConfig.java | 90 ++++ .../supervisor/SeekableStreamSupervisor.java | 397 +++++++++++++++++- .../SeekableStreamSupervisorIOConfig.java | 17 +- .../SeekableStreamSupervisorSpec.java | 39 ++ .../SeekableStreamSamplerSpecTest.java | 1 + .../SeekableStreamSupervisorIOConfigTest.java | 13 +- .../SeekableStreamSupervisorSpecTest.java | 4 + .../SeekableStreamSupervisorStateTest.java | 46 +- .../SeekableStreamSupervisorTestBase.java | 19 + .../supervisor/SupervisorStateManager.java | 15 +- 24 files changed, 747 insertions(+), 19 deletions(-) create mode 100644 indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfig.java diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java index 821d776947dc..ee826936b2b4 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/StreamIndexTestBase.java @@ -120,6 +120,7 @@ protected KinesisSupervisorSpec createKinesisSupervisor(KinesisResource kinesis, Period.seconds(60), null, null, null, null, null, null, null, null, false, + null, null ), Map.of(), diff --git a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java index 2c5cfb67d9c3..9a37c73573a6 100644 --- a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java +++ b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/kinesis/KinesisDataFormatsTest.java @@ -87,6 +87,7 @@ private KinesisSupervisorSpec createKinesisSupervisorSpec(String dataSource, Str Period.seconds(5), null, null, null, null, null, null, null, null, false, + null, null ), Map.of(), diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java index 9be4a77a8d4e..f72e55147548 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java @@ -362,6 +362,35 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() return false; } + @Override + protected boolean isOffsetAtOrBeyond(Long current, Long target) + { + throw new UnsupportedOperationException( + "Bounded stream processing is not yet supported for RabbitMQ. " + + "This feature is currently only available for Kafka supervisors." + ); + } + + @Override + protected String createPartitionIdFromString(String partitionIdString) + { + // RabbitMQ uses String as partition ID, so just return the string as-is + return partitionIdString; + } + + @Override + protected Long createSequenceOffsetFromObject(Object offsetObj) + { + // RabbitMQ uses Long as sequence offset + if (offsetObj instanceof Number) { + return ((Number) offsetObj).longValue(); + } + if (offsetObj instanceof String) { + return Long.parseLong((String) offsetObj); + } + throw new IllegalArgumentException("Cannot convert " + offsetObj.getClass() + " to Long offset"); + } + @Override public LagStats computeLagStats() { diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java index 8aad5b762219..20c4e92988b6 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; @@ -66,7 +67,8 @@ public RabbitStreamSupervisorIOConfig( @JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod, @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime, @JsonProperty("stopTaskCount") Integer stopTaskCount, - @Nullable @JsonProperty("serverPriorityToReplicas") Map serverPriorityToReplicas + @Nullable @JsonProperty("serverPriorityToReplicas") Map serverPriorityToReplicas, + @Nullable @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig ) { super( @@ -86,7 +88,8 @@ public RabbitStreamSupervisorIOConfig( lateMessageRejectionStartDateTime, new IdleConfig(null, null), stopTaskCount, - serverPriorityToReplicas + serverPriorityToReplicas, + boundedStreamConfig ); this.consumerProperties = consumerProperties; diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index 82e0b164471a..d66313848609 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -212,6 +212,7 @@ private RabbitStreamSupervisor getSupervisor( earlyMessageRejectionPeriod, // early message rejection null, // latemessagerejectionstartdatetime 1, + null, null ); RabbitStreamIndexTaskClientFactory clientFactory = new RabbitStreamIndexTaskClientFactory(null, @@ -278,6 +279,7 @@ public void testRecordSupplier() null, // early message rejection null, // latemessagerejectionstartdatetime 1, + null, null ); RabbitStreamIndexTaskClientFactory clientFactory = new RabbitStreamIndexTaskClientFactory(null, @@ -421,6 +423,7 @@ public void testCreateTaskIOConfig() null, // early message rejection null, // latemessagerejectionstartdatetime 1, + null, null ) ); diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index e2f62ed8d750..d85e56fc4918 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -390,6 +390,31 @@ protected boolean isShardExpirationMarker(Long seqNum) return false; } + @Override + protected boolean isOffsetAtOrBeyond(Long current, Long target) + { + return current >= target; + } + + @Override + protected KafkaTopicPartition createPartitionIdFromString(String partitionIdString) + { + return KafkaTopicPartition.fromString(partitionIdString); + } + + @Override + protected Long createSequenceOffsetFromObject(Object offsetObj) + { + // Jackson may deserialize numbers as Integer if they fit, but Kafka needs Long + if (offsetObj instanceof Number) { + return ((Number) offsetObj).longValue(); + } + if (offsetObj instanceof String) { + return Long.parseLong((String) offsetObj); + } + throw new IllegalArgumentException("Cannot convert " + offsetObj.getClass() + " to Long offset"); + } + @Override protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index 992ff292694a..7e9fffb7e79c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; @@ -78,7 +79,8 @@ public KafkaSupervisorIOConfig( @JsonProperty("idleConfig") IdleConfig idleConfig, @JsonProperty("stopTaskCount") Integer stopTaskCount, @Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics, - @Nullable @JsonProperty("serverPriorityToReplicas") Map serverPriorityToReplicas + @Nullable @JsonProperty("serverPriorityToReplicas") Map serverPriorityToReplicas, + @Nullable @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig ) { super( @@ -98,7 +100,8 @@ public KafkaSupervisorIOConfig( lateMessageRejectionStartDateTime, idleConfig, stopTaskCount, - serverPriorityToReplicas + serverPriorityToReplicas, + boundedStreamConfig ); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java index 24c1656fc7e6..dd62c8385f45 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java @@ -93,6 +93,7 @@ public KafkaSupervisorIOConfig build() idleConfig, stopTaskCount, null, + null, null ); } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 6295d41937e8..049b9959088c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -341,6 +341,7 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException null, null, false, + null, null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); @@ -377,6 +378,7 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException null, null, false, + null, null ); Assert.assertEquals(5, kafkaSupervisorIOConfig.getTaskCount().intValue()); @@ -430,6 +432,7 @@ public void testIdleConfigSerde() throws JsonProcessingException mapper.convertValue(idleConfig, IdleConfig.class), null, false, + null, null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index b5e00bcaab4b..9e11053939d9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -5401,7 +5401,8 @@ public void testComputeUnassignedServerPriorities_whenMultipleReplicasPerPriorit Map.of( 10, 2, 20, 3 - ) + ), + null ); Assert.assertEquals(5, (int) kafkaSupervisorIOConfig.getReplicas()); @@ -5686,7 +5687,8 @@ private TestableKafkaSupervisor getTestableSupervisor( idleConfig, null, true, - serverPriorityToReplicas + serverPriorityToReplicas, + null ); KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( @@ -5781,6 +5783,7 @@ private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent( null, null, false, + null, null ); @@ -5876,6 +5879,7 @@ private KafkaSupervisor createSupervisor( null, null, false, + null, null ); diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 08491caa8ff5..262f4b39100d 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -381,6 +381,29 @@ protected boolean isShardExpirationMarker(String seqNum) return KinesisSequenceNumber.EXPIRED_MARKER.equals(seqNum); } + @Override + protected boolean isOffsetAtOrBeyond(String current, String target) + { + throw new UnsupportedOperationException( + "Bounded stream processing is not yet supported for Kinesis. " + + "This feature is currently only available for Kafka supervisors." + ); + } + + @Override + protected String createPartitionIdFromString(String partitionIdString) + { + // Kinesis uses String as partition ID, so just return the string as-is + return partitionIdString; + } + + @Override + protected String createSequenceOffsetFromObject(Object offsetObj) + { + // Kinesis uses String as sequence offset + return offsetObj.toString(); + } + @Override protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() { diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java index 6c325bd0744d..e20d3a261cfb 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java @@ -26,6 +26,7 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig; import org.apache.druid.indexing.kinesis.KinesisRegion; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; @@ -79,7 +80,8 @@ public KinesisSupervisorIOConfig( @JsonProperty("awsExternalId") String awsExternalId, @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig, @JsonProperty("deaggregate") @Deprecated boolean deaggregate, - @Nullable @JsonProperty("serverPriorityToReplicas") Map serverPriorityToReplicas + @Nullable @JsonProperty("serverPriorityToReplicas") Map serverPriorityToReplicas, + @Nullable @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig ) { super( @@ -99,7 +101,8 @@ public KinesisSupervisorIOConfig( lateMessageRejectionStartDateTime, new IdleConfig(null, null), null, - serverPriorityToReplicas + serverPriorityToReplicas, + boundedStreamConfig ); this.endpoint = endpoint != null diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index 4c9f4a1c8e2c..6f4410f3a8be 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -140,6 +140,7 @@ public void testSample() throws InterruptedException null, null, false, + null, null ), null, @@ -195,6 +196,7 @@ public void testGetInputSourceResources() null, null, false, + null, null ), null, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 522aebad4cf6..9c82671abccb 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -468,6 +468,7 @@ public void testRecordSupplier() null, null, false, + null, null ); KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER); @@ -534,6 +535,7 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation() null, null, false, + null, null ); @@ -562,6 +564,7 @@ public void testKinesisIOConfigInitAndAutoscalerConfigCreation() null, OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), false, + null, null ); @@ -4219,6 +4222,7 @@ public void testCorrectInputSources() null, null, false, + null, null ), null, @@ -5184,6 +5188,7 @@ private TestableKinesisSupervisor getTestableSupervisor( null, null, false, + null, null ); @@ -5327,6 +5332,7 @@ private TestableKinesisSupervisor getTestableSupervisor( null, autoScalerConfig, false, + null, null ); @@ -5414,6 +5420,7 @@ private TestableKinesisSupervisor getTestableSupervisorCustomIsTaskCurrent( null, null, false, + null, null ); @@ -5503,6 +5510,7 @@ private KinesisSupervisor createSupervisor( null, null, false, + null, null ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfig.java new file mode 100644 index 000000000000..6d8caf0686d3 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfig.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; + +import java.util.Map; + +/** + * Configuration for bounded (one-time) stream processing with explicit start/end offsets. + * + * When configured, the supervisor will: + * 1. Create tasks starting at the specified startSequenceNumbers + * 2. Tasks will automatically stop when they reach endSequenceNumbers + * 3. Supervisor will not recreate tasks after they complete + * 4. Supervisor will auto-terminate when all tasks are done + * + * This is useful for: + * - Backfill processing + * - Historical reprocessing + * - One-time migration tasks + */ +public class BoundedStreamConfig +{ + private final Map startSequenceNumbers; // Partition -> Start Offset + private final Map endSequenceNumbers; // Partition -> End Offset + + @JsonCreator + public BoundedStreamConfig( + @JsonProperty("startSequenceNumbers") Map startSequenceNumbers, + @JsonProperty("endSequenceNumbers") Map endSequenceNumbers + ) + { + this.startSequenceNumbers = Preconditions.checkNotNull(startSequenceNumbers, "startSequenceNumbers"); + this.endSequenceNumbers = Preconditions.checkNotNull(endSequenceNumbers, "endSequenceNumbers"); + + // Validation + Preconditions.checkArgument( + !startSequenceNumbers.isEmpty(), + "startSequenceNumbers cannot be empty" + ); + + Preconditions.checkArgument( + startSequenceNumbers.keySet().equals(endSequenceNumbers.keySet()), + "startSequenceNumbers and endSequenceNumbers must have matching partition sets. Start: %s, End: %s", + startSequenceNumbers.keySet(), + endSequenceNumbers.keySet() + ); + } + + @JsonProperty + public Map getStartSequenceNumbers() + { + return startSequenceNumbers; + } + + @JsonProperty + public Map getEndSequenceNumbers() + { + return endSequenceNumbers; + } + + @Override + public String toString() + { + return "BoundedStreamConfig{" + + "startSequenceNumbers=" + startSequenceNumbers + + ", endSequenceNumbers=" + endSequenceNumbers + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 58d366d2306e..b006e90119d1 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -206,6 +206,10 @@ public class TaskGroup // task groups have nothing but closed partitions in their assignments. final ImmutableMap unfilteredStartingSequencesForSequenceName; + // End sequences for bounded mode - null for streaming mode + @Nullable + final ImmutableMap endSequences; + final ConcurrentHashMap tasks = new ConcurrentHashMap<>(); final ConcurrentHashMap taskIdToServerPriority = new ConcurrentHashMap<>(); final DateTime minimumMessageTime; @@ -230,6 +234,37 @@ public class TaskGroup groupId, startingSequences, unfilteredStartingSequencesForSequenceName, + null, // endSequences - null for streaming mode + minimumMessageTime, + maximumMessageTime, + exclusiveStartSequenceNumberPartitions, + generateSequenceName( + unfilteredStartingSequencesForSequenceName == null + ? startingSequences + : unfilteredStartingSequencesForSequenceName, + minimumMessageTime, + maximumMessageTime, + spec.getDataSchema(), + taskTuningConfig + ) + ); + } + + TaskGroup( + int groupId, + ImmutableMap startingSequences, + @Nullable ImmutableMap unfilteredStartingSequencesForSequenceName, + @Nullable ImmutableMap endSequences, + @Nullable DateTime minimumMessageTime, + @Nullable DateTime maximumMessageTime, + @Nullable Set exclusiveStartSequenceNumberPartitions + ) + { + this( + groupId, + startingSequences, + unfilteredStartingSequencesForSequenceName, + endSequences, minimumMessageTime, maximumMessageTime, exclusiveStartSequenceNumberPartitions, @@ -249,6 +284,7 @@ public class TaskGroup int groupId, ImmutableMap startingSequences, @Nullable ImmutableMap unfilteredStartingSequencesForSequenceName, + @Nullable ImmutableMap endSequences, DateTime minimumMessageTime, DateTime maximumMessageTime, Set exclusiveStartSequenceNumberPartitions, @@ -260,6 +296,7 @@ public class TaskGroup this.unfilteredStartingSequencesForSequenceName = unfilteredStartingSequencesForSequenceName == null ? startingSequences : unfilteredStartingSequencesForSequenceName; + this.endSequences = endSequences; this.minimumMessageTime = minimumMessageTime; this.maximumMessageTime = maximumMessageTime; this.checkpointSequences.put(0, startingSequences); @@ -1347,6 +1384,17 @@ public void tryInit() try { recordSupplier = setupRecordSupplier(); + // Initialize bounded partitions BEFORE first run + if (ioConfig.isBounded()) { + try { + initializeBoundedPartitionGroups(); + } + catch (Exception e) { + log.error(e, "Failed to initialize bounded partition groups"); + throw new RuntimeException(e); + } + } + exec.submit( () -> { try { @@ -1851,6 +1899,12 @@ public void runInternal() } } + // Check for bounded completion after tasks have been created/managed + if (isBoundedWorkComplete()) { + handleBoundedCompletion(); + return; + } + logDebugReport(); } catch (Exception e) { @@ -2338,6 +2392,17 @@ public Boolean apply(Pair endSequences = null; + if (seekableStreamIndexTask.getIOConfig().getEndSequenceNumbers() != null) { + endSequences = ImmutableMap.copyOf( + seekableStreamIndexTask.getIOConfig() + .getEndSequenceNumbers() + .getPartitionSequenceNumberMap() + ); + } + return new TaskGroup( taskGroupId, ImmutableMap.copyOf( @@ -2346,6 +2411,7 @@ public Boolean apply(Pair previousPartitionIds = new ArrayList<>(partitionIds); Set partitionIdsFromSupplier; recordSupplierLock.lock(); @@ -3161,6 +3234,59 @@ protected Map getLatestSequencesFromStream( return new HashMap<>(); } + /** + * Converts a map with string keys from BoundedStreamConfig to a properly typed map. + * The BoundedStreamConfig uses Map which Jackson deserializes as Map. + * This method converts string keys to partition IDs and offset values to the appropriate types. + * + * @param rawMap the raw map from BoundedStreamConfig + * @return a map with properly typed partition IDs and sequence offsets + */ + private Map convertBoundedConfigMap(Map rawMap) + { + Map result = new HashMap<>(); + for (Map.Entry entry : rawMap.entrySet()) { + PartitionIdType partition = createPartitionIdFromString(entry.getKey().toString()); + SequenceOffsetType offset = createSequenceOffsetFromObject(entry.getValue()); + result.put(partition, offset); + } + return result; + } + + /** + * Initialize partitionGroups from bounded config instead of from stream discovery. + * This prevents the supervisor from trying to recreate tasks as they complete. + * Only called when in bounded mode during supervisor startup. + */ + private void initializeBoundedPartitionGroups() + { + if (!ioConfig.isBounded()) { + return; + } + + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map configuredPartitions = + convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers()); + + for (PartitionIdType partition : configuredPartitions.keySet()) { + int taskGroupId = getTaskGroupIdForPartition(partition); + + partitionGroups.computeIfAbsent(taskGroupId, k -> new HashSet<>()).add(partition); + partitionIds.add(partition); + partitionOffsets.put(partition, getNotSetMarker()); + + log.info("Bounded mode: initialized partition[%s] in taskGroup[%d]", partition, taskGroupId); + } + + assignRecordSupplierToPartitionIds(); + + log.info( + "Bounded mode: initialized [%d] partitions in [%d] task groups", + configuredPartitions.size(), + partitionGroups.size() + ); + } + private void assignRecordSupplierToPartitionIds() { recordSupplierLock.lock(); @@ -4052,6 +4178,29 @@ private void createNewTasks() throws JsonProcessingException // check that there is a current task group for each group of partitions in [partitionGroups] for (Integer groupId : partitionGroups.keySet()) { if (!activelyReadingTaskGroups.containsKey(groupId)) { + + // In bounded mode, distinguish between completion and failure + if (ioConfig.isBounded()) { + if (hasTaskGroupReachedBoundedEnd(groupId)) { + // Task group completed successfully - don't recreate + log.debug( + "Bounded taskGroup[%d] has reached end offsets, skipping recreation", + groupId + ); + continue; // Skip creating new task group + } else { + // Task group hasn't reached end - task must have failed, recreate it + log.info( + "Bounded taskGroup[%d] has not reached end offsets (current: %s, target: %s). " + + "Task may have failed, recreating to continue processing.", + groupId, + getCurrentOffsetsForGroup(groupId), + getEndOffsetsForGroup(groupId) + ); + // Fall through to create new task group + } + } + log.info("Creating new taskGroup[%d] for partitions[%s].", groupId, partitionGroups.get(groupId)); final DateTime minimumMessageTime; if (ioConfig.getLateMessageRejectionStartDateTime().isPresent()) { @@ -4118,13 +4267,25 @@ private void createNewTasks() throws JsonProcessingException .collect(Collectors.toSet()); } - log.info("Initializing taskGroup[%d] with startingOffsets[%s].", groupId, simpleStartingOffsets); + // NEW: Extract end offsets for bounded mode + ImmutableMap endOffsets = null; + if (ioConfig.isBounded()) { + endOffsets = ImmutableMap.copyOf(getEndOffsetsForGroup(groupId)); + } + + log.info( + "Initializing taskGroup[%d] with startingOffsets[%s] and endOffsets[%s]", + groupId, + simpleStartingOffsets, + endOffsets + ); activelyReadingTaskGroups.put( groupId, new TaskGroup( groupId, simpleStartingOffsets, simpleUnfilteredStartingOffsets, + endOffsets, minimumMessageTime, maximumMessageTime, exclusiveStartSequenceNumberPartitions @@ -4188,6 +4349,8 @@ private Map> generate int groupId ) { + // Existing logic for both streaming and bounded mode + // Bounded mode will fall back to bounded start offsets in getOffsetFromStorageForPartition() ImmutableMap.Builder> builder = ImmutableMap.builder(); final Map metadataOffsets = getOffsetsFromMetadataStorage(); for (PartitionIdType partitionId : partitionGroups.get(groupId)) { @@ -4255,6 +4418,23 @@ private OrderedSequenceNumber getOffsetFromStorageForPartiti } return makeSequenceNumber(sequence, useExclusiveStartSequenceNumberForNonFirstSequence()); } else { + // NEW: In bounded mode, if no checkpoint exists (task failed before first checkpoint), + // fall back to bounded start offset + if (ioConfig.isBounded()) { + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map startOffsets = + convertBoundedConfigMap(boundedConfig.getStartSequenceNumbers()); + SequenceOffsetType startOffset = startOffsets.get(partition); + if (startOffset != null) { + log.info( + "Bounded mode: no checkpoint found for partition[%s], using configured start offset[%s]", + partition, + startOffset + ); + return makeSequenceNumber(startOffset, false); + } + } + boolean useEarliestSequenceNumber = ioConfig.isUseEarliestSequenceNumber(); if (subsequentlyDiscoveredPartitions.contains(partition)) { log.info( @@ -4298,6 +4478,179 @@ && checkSourceMetadataMatch(dataSourceMetadata)) { return Collections.emptyMap(); } + /** + * Check if all partitions in a task group have reached their bounded end offsets. + * Used to determine if the task group completed successfully vs failed midway. + * + * @param groupId The task group ID to check + * @return true if all partitions in the group have reached their end offsets, false otherwise + */ + private boolean hasTaskGroupReachedBoundedEnd(int groupId) + { + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map endOffsets = + convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers()); + Map currentOffsets = getOffsetsFromMetadataStorage(); + + log.info( + "Bounded mode: checking completion for taskGroup[%d]. Current offsets from metadata: %s, End offsets: %s", + groupId, + currentOffsets, + endOffsets + ); + + if (currentOffsets == null || currentOffsets.isEmpty()) { + log.debug("No checkpointed offsets found, taskGroup[%d] has not completed", groupId); + return false; // No progress yet, task hasn't completed + } + + Set partitionsInGroup = partitionGroups.get(groupId); + if (partitionsInGroup == null || partitionsInGroup.isEmpty()) { + return false; + } + + // Check if ALL partitions in this group have reached their end offsets + for (PartitionIdType partition : partitionsInGroup) { + SequenceOffsetType endOffset = endOffsets.get(partition); + SequenceOffsetType currentOffset = currentOffsets.get(partition); + + if (currentOffset == null) { + log.debug( + "Partition[%s] in taskGroup[%d] has no checkpointed offset, not complete", + partition, + groupId + ); + return false; // Partition hasn't started processing + } + + if (!isOffsetAtOrBeyond(currentOffset, endOffset)) { + log.debug( + "Partition[%s] in taskGroup[%d] at offset[%s], has not reached end[%s]", + partition, + groupId, + currentOffset, + endOffset + ); + return false; // This partition hasn't reached its end + } + } + + log.info( + "All partitions in taskGroup[%d] have reached their end offsets", + groupId + ); + return true; // All partitions have reached their end offsets + } + + /** + * Get current offsets for all partitions in a task group from metadata storage. + */ + private Map getCurrentOffsetsForGroup(int groupId) + { + Map allOffsets = getOffsetsFromMetadataStorage(); + if (allOffsets == null || allOffsets.isEmpty()) { + return Collections.emptyMap(); + } + + Set partitionsInGroup = partitionGroups.get(groupId); + if (partitionsInGroup == null) { + return Collections.emptyMap(); + } + + return partitionsInGroup.stream() + .filter(allOffsets::containsKey) + .collect(Collectors.toMap( + p -> p, + allOffsets::get + )); + } + + /** + * Get end offsets for all partitions in a task group from bounded config. + */ + private Map getEndOffsetsForGroup(int groupId) + { + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + Map endOffsets = + convertBoundedConfigMap(boundedConfig.getEndSequenceNumbers()); + Set partitionsInGroup = partitionGroups.get(groupId); + + if (partitionsInGroup == null) { + return Collections.emptyMap(); + } + + return partitionsInGroup.stream() + .filter(endOffsets::containsKey) + .collect(Collectors.toMap( + p -> p, + endOffsets::get + )); + } + + /** + * Check if all bounded tasks have completed. + * Called after createNewTasks() in runInternal to ensure tasks have been created first. + * + * For bounded supervisors, we determine completion by checking if new tasks would be created. + * In createNewTasks(), bounded mode checks hasTaskGroupReachedBoundedEnd() before creating tasks. + * If that returns true (offsets reached), no new tasks are created. + * + * So completion is: no active tasks, no pending tasks, and createNewTasks() chose not to create any. + * This is indicated by empty task groups after createNewTasks() has run. + * + * We do NOT separately check metadata storage here because: + * 1. Metadata may contain stale offsets from previous supervisor runs + * 2. createNewTasks() already does the offset checking logic + * 3. If tasks were killed/failed and work is incomplete, createNewTasks() will recreate them + * + * @return true if all bounded work is complete, false otherwise + */ + private boolean isBoundedWorkComplete() + { + if (!ioConfig.isBounded()) { + return false; + } + + // Check if task groups are empty (no tasks active or pending) + boolean noActiveTasks = activelyReadingTaskGroups.isEmpty(); + boolean noPendingTasks = pendingCompletionTaskGroups.values().stream().allMatch(List::isEmpty); + + if (!noActiveTasks || !noPendingTasks) { + return false; + } + + // At this point, no tasks are running. Since createNewTasks() already ran, + // if tasks aren't running it means either: + // A) Tasks completed successfully and offset targets were reached (don't recreate) + // B) Tasks failed/killed and haven't reached targets (will recreate next run) + // + // To distinguish, we check if createNewTasks() would create new tasks. + // If hasTaskGroupReachedBoundedEnd() returns false for any group, createNewTasks() + // will create tasks next iteration, so we're not complete. + for (Integer groupId : partitionGroups.keySet()) { + if (!hasTaskGroupReachedBoundedEnd(groupId)) { + log.debug("TaskGroup[%d] has not reached bounded end, tasks will be recreated", groupId); + return false; + } + } + + // All groups have reached their end offsets and no tasks are running. + // Work is complete! + log.info("All bounded tasks completed for supervisor[%s]", supervisorId); + return true; + } + + /** + * Handle bounded processing completion by shutting down the supervisor. + * At this point, all task groups are already empty (verified by isBoundedWorkComplete), + * so we just need to mark the supervisor as completed. + */ + private void handleBoundedCompletion() + { + log.info("Bounded processing complete for supervisor[%s]. Marking as COMPLETED.", supervisorId); + stateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED); + } + protected DataSourceMetadata retrieveDataSourceMetadata() { return indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(supervisorId); @@ -4333,9 +4686,18 @@ private void createTasksForGroup(int groupId, int replicas) TaskGroup group = activelyReadingTaskGroups.get(groupId); Map startPartitions = group.startingSequences; Map endPartitions = new HashMap<>(); - for (PartitionIdType partition : startPartitions.keySet()) { - endPartitions.put(partition, getEndOfPartitionMarker()); + + if (group.endSequences != null && !group.endSequences.isEmpty()) { + // Bounded mode: use explicit end offsets from task group + endPartitions.putAll(group.endSequences); + log.info("Creating bounded tasks for taskGroup[%d] with endOffsets: %s", groupId, group.endSequences); + } else { + // Streaming mode: use exclusive end (effectively no end) + for (PartitionIdType partition : startPartitions.keySet()) { + endPartitions.put(partition, getEndOfPartitionMarker()); + } } + Set exclusiveStartSequenceNumberPartitions = activelyReadingTaskGroups .get(groupId) .exclusiveStartSequenceNumberPartitions; @@ -4692,6 +5054,25 @@ protected abstract List serverPrioritiesToAssign ) throws JsonProcessingException; + /** + * Converts a string representation of a partition ID to the typed partition ID. + * Used for deserializing bounded stream config where partition keys come as strings. + * + * @param partitionIdString string representation of partition ID + * @return typed partition ID + */ + protected abstract PartitionIdType createPartitionIdFromString(String partitionIdString); + + /** + * Converts an object (typically Number) to the typed sequence offset. + * Used for deserializing bounded stream config where offset values may come as Integer, Long, String, etc. + * Jackson may deserialize numeric values as Integer if they fit, but implementations like Kafka need Long. + * + * @param offsetObj the offset object from deserialization + * @return typed sequence offset + */ + protected abstract SequenceOffsetType createSequenceOffsetFromObject(Object offsetObj); + /** * calculates the taskgroup id that the given partition belongs to. * different between Kafka/Kinesis since Kinesis uses String as partition id @@ -5049,6 +5430,16 @@ protected LagStats aggregatePartitionLags(Map partitionLa */ protected abstract boolean isShardExpirationMarker(SequenceOffsetType seqNum); + /** + * Compares if current offset has reached or exceeded the target offset. + * Used to determine if a bounded task group has completed successfully. + * + * @param current Current offset from metadata storage + * @param target Target end offset from bounded config + * @return true if current >= target + */ + protected abstract boolean isOffsetAtOrBeyond(SequenceOffsetType current, SequenceOffsetType target); + /** * Returns true if the start sequence number should be exclusive for the non-first sequences for the whole partition. * For example, in Kinesis, the start offsets are inclusive for the first sequence, but exclusive for following diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index 421a885b294d..4bd806c49595 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -54,6 +54,7 @@ public abstract class SeekableStreamSupervisorIOConfig @Nullable private final IdleConfig idleConfig; @Nullable private final Integer stopTaskCount; @Nullable private final Map serverPriorityToReplicas; + @Nullable private final BoundedStreamConfig boundedStreamConfig; private final LagAggregator lagAggregator; private final boolean autoScalerEnabled; @@ -75,7 +76,8 @@ public SeekableStreamSupervisorIOConfig( DateTime lateMessageRejectionStartDateTime, @Nullable IdleConfig idleConfig, @Nullable Integer stopTaskCount, - @Nullable Map serverPriorityToReplicas + @Nullable Map serverPriorityToReplicas, + @Nullable BoundedStreamConfig boundedStreamConfig ) { this.stream = Preconditions.checkNotNull(stream, "stream cannot be null"); @@ -128,6 +130,7 @@ public SeekableStreamSupervisorIOConfig( this.idleConfig = idleConfig; this.serverPriorityToReplicas = serverPriorityToReplicas; + this.boundedStreamConfig = boundedStreamConfig; if (this.serverPriorityToReplicas != null) { int serverPriorityReplicas = 0; @@ -283,4 +286,16 @@ public int getMaxAllowedStops() } return stopTaskCount == null ? taskCount : stopTaskCount; } + + @Nullable + @JsonProperty + public BoundedStreamConfig getBoundedStreamConfig() + { + return boundedStreamConfig; + } + + public boolean isBounded() + { + return boundedStreamConfig != null; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index f21e073f6c4c..bee2994d7b6c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; @@ -49,6 +50,8 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec { + private static final Logger log = new Logger(SeekableStreamSupervisorSpec.class); + protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = "Update of the input source stream from [%s] to [%s] is not supported for a running supervisor." + "%nTo perform the update safely, follow these steps:" @@ -259,6 +262,42 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept if (!this.getSource().equals(other.getSource())) { throw InvalidInput.exception(ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE, this.getSource(), other.getSource()); } + + // Validate bounded stream configuration + validateBoundedStreamConfig(other); + } + + /** + * Validates bounded stream configuration for the supervisor spec. + * + * @param spec the supervisor spec to validate + * @throws DruidException if the bounded stream configuration is invalid + */ + protected void validateBoundedStreamConfig(SeekableStreamSupervisorSpec spec) throws DruidException + { + SeekableStreamSupervisorIOConfig ioConfig = spec.getIoConfig(); + + if (ioConfig.isBounded()) { + // Validate partition consistency + BoundedStreamConfig boundedConfig = ioConfig.getBoundedStreamConfig(); + if (!boundedConfig.getStartSequenceNumbers().keySet().equals(boundedConfig.getEndSequenceNumbers().keySet())) { + throw InvalidInput.exception( + "Bounded stream config has mismatched partitions. Start: %s, End: %s", + boundedConfig.getStartSequenceNumbers().keySet(), + boundedConfig.getEndSequenceNumbers().keySet() + ); + } + + // Warn if useConcurrentLocks is not enabled + Map context = spec.getContext(); + if (context == null || !Boolean.TRUE.equals(context.get("useConcurrentLocks"))) { + log.warn( + "Bounded stream processing without 'useConcurrentLocks=true' may fail " + + "if other supervisors are running or segments already exist for these intervals. " + + "Consider setting useConcurrentLocks=true in the supervisor context." + ); + } + } } @Override diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java index 5af21ca7b6aa..6fd6307e50c7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSamplerSpecTest.java @@ -333,6 +333,7 @@ private TestableSeekableStreamSupervisorIOConfig( lateMessageRejectionStartDateTime, idleConfig, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java index 3974b9bebca9..c0b7dc1d753a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java @@ -69,6 +69,7 @@ public void testAllDefaults() null, null, null, + null, null ) { @@ -121,6 +122,7 @@ public void testAutoScalerEnabledPreservesTaskCountWhenNonNull() null, null, null, + null, null ) { @@ -146,6 +148,7 @@ public void testAutoScalerEnabledPreservesTaskCountWhenNonNull() null, null, null, + null, null ) { @@ -178,6 +181,7 @@ public void testBothLateMessageRejectionPeriodAndStartDateTime() DateTimes.nowUtc(), null, null, + null, null ) { @@ -213,6 +217,7 @@ public void testNullAggregatorThrows() null, null, null, + null, null ) { @@ -246,6 +251,7 @@ public void testGetMaxAllowedStopsScalingDisabled() null, null, null, + null, null ) { @@ -270,6 +276,7 @@ public void testGetMaxAllowedStopsScalingDisabled() null, null, 3, + null, null ) { @@ -306,6 +313,7 @@ public void testGetMaxAllowedStopsScalingEnabled() null, null, 1, + null, null ) { @@ -339,6 +347,7 @@ public void testGetMaxAllowedStopsScalingEnabled() null, null, 1, + null, null ) { @@ -369,6 +378,7 @@ public void testGetMaxAllowedStopsScalingEnabled() null, null, null, + null, null ) { @@ -456,7 +466,8 @@ private SeekableStreamSupervisorIOConfig makeSeekableStreamSupervisorIOConfig(@N null, null, null, - serverPriorityToReplicas + serverPriorityToReplicas, + null ) { }; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index 7c20855b033d..0efad16bc276 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -785,6 +785,7 @@ public void testSeekableStreamSupervisorSpecWithScaleDisable() throws Interrupte null, null, null, + null, null ) { @@ -843,6 +844,7 @@ public void testEnablingIdleBeviourPerSupervisorWithOverlordConfigEnabled() null, new IdleConfig(true, null), null, + null, null ) { @@ -1455,6 +1457,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal null, null, null, + null, null ) { @@ -1477,6 +1480,7 @@ private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scal null, null, null, + null, null ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index d59e2711ce88..a17d59759e76 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -697,6 +697,7 @@ public void testIdleStateTransition() throws Exception null, new IdleConfig(true, 200L), null, + null, null ) { @@ -805,6 +806,7 @@ public void testIdleOnStartUpAndTurnsToRunningAfterLagUpdates() null, new IdleConfig(true, 200L), null, + null, null ) { @@ -1105,6 +1107,7 @@ public void testCheckpointForActiveTaskGroup() throws InterruptedException, Json null, new IdleConfig(true, 200L), null, + null, null ) {}; @@ -1324,6 +1327,7 @@ public void testEarlyStoppingOfTaskGroupBasedOnStopTaskCount() throws Interrupte null, new IdleConfig(true, 200L), stopTaskCount, + null, null ) { @@ -1560,6 +1564,7 @@ public void testSupervisorStopTaskGroupEarly() throws JsonProcessingException, I null, new IdleConfig(true, 200L), null, + null, null ) { @@ -2609,6 +2614,24 @@ public LagStats computeLagStats() { return new LagStats(0, 0, 0); } + + @Override + protected boolean isOffsetAtOrBeyond(String current, String target) + { + return Long.parseLong(current) >= Long.parseLong(target); + } + + @Override + protected String createPartitionIdFromString(String partitionIdString) + { + return partitionIdString; + } + + @Override + protected String createSequenceOffsetFromObject(Object offsetObj) + { + return offsetObj.toString(); + } }; supervisor.scheduleReporting(executorService); EasyMock.verify(executorService, spec); @@ -2715,6 +2738,7 @@ private void expectEmitterSupervisor(boolean suspended) null, null, null, + null, null ) { @@ -2779,6 +2803,7 @@ public void testMaxAllowedStopsWithStopTaskCountRatio() null, null, 1, // ensure this is overridden + null, null ) { @@ -2934,7 +2959,8 @@ private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig( null, null, null, - serverPriorityToReplicas + serverPriorityToReplicas, + null ) { }; @@ -3273,6 +3299,24 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() { return false; } + + @Override + protected boolean isOffsetAtOrBeyond(String current, String target) + { + return Long.parseLong(current) >= Long.parseLong(target); + } + + @Override + protected String createPartitionIdFromString(String partitionIdString) + { + return partitionIdString; + } + + @Override + protected String createSequenceOffsetFromObject(Object offsetObj) + { + return offsetObj.toString(); + } } private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java index 35d063e88e1e..dad426a755c5 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorTestBase.java @@ -328,6 +328,24 @@ public int getPartitionCount() { return partitionNumbers; } + + @Override + protected boolean isOffsetAtOrBeyond(String current, String target) + { + return Long.parseLong(current) >= Long.parseLong(target); + } + + @Override + protected String createPartitionIdFromString(String partitionIdString) + { + return partitionIdString; + } + + @Override + protected String createSequenceOffsetFromObject(Object offsetObj) + { + return offsetObj.toString(); + } } class StateOverrideTestSeekableStreamSupervisor extends TestSeekableStreamSupervisor @@ -537,6 +555,7 @@ protected SeekableStreamSupervisorIOConfig createIOConfig(int taskCount, CostBas null, null, null, + null, null ) { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java index f64f390fc566..437ea6b92922 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManager.java @@ -64,7 +64,8 @@ public enum BasicState implements State RUNNING(true, false), IDLE(true, false), SUSPENDED(true, false), - STOPPING(true, false); + STOPPING(true, false), + COMPLETED(true, false); private final boolean healthy; private final boolean firstRunOnly; @@ -122,14 +123,18 @@ public SupervisorStateManager(SupervisorStateManagerConfig supervisorStateManage /** * Certain states are only valid if the supervisor hasn't had a successful iteration. This method checks if there's * been at least one successful iteration, and if applicable, sets supervisor state to an appropriate new state. - * A STOPPING supervisor cannot transition to any other state as this state is final. + * STOPPING and COMPLETED are terminal states that cannot transition to any other state. * This method must be thread-safe as multiple threads trying to update may lead to an invalid state. */ public synchronized void maybeSetState(State proposedState) { - if (BasicState.STOPPING.equals(this.supervisorState) || BasicState.STOPPING.equals(proposedState)) { - // STOPPING takes precedence over all other states - supervisorState = BasicState.STOPPING; + // Terminal states (STOPPING, COMPLETED) take precedence over all other states + if (BasicState.STOPPING.equals(this.supervisorState) || BasicState.COMPLETED.equals(this.supervisorState)) { + // Already in a terminal state, cannot transition + return; + } else if (BasicState.STOPPING.equals(proposedState) || BasicState.COMPLETED.equals(proposedState)) { + // Transitioning to a terminal state + supervisorState = proposedState; return; } From 4180acee9a8170c54159a41ad919856b737cb303 Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Thu, 23 Apr 2026 16:34:35 -0700 Subject: [PATCH 2/9] Implement isOffsetAtOrBeyond for Rabbit and Kinesis --- .../rabbitstream/supervisor/RabbitStreamSupervisor.java | 6 ++---- .../indexing/kinesis/supervisor/KinesisSupervisor.java | 7 +++---- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java index f72e55147548..d3907f16008d 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java @@ -365,10 +365,8 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() @Override protected boolean isOffsetAtOrBeyond(Long current, Long target) { - throw new UnsupportedOperationException( - "Bounded stream processing is not yet supported for RabbitMQ. " + - "This feature is currently only available for Kafka supervisors." - ); + // RabbitMQ uses Long sequence numbers (delivery tags) + return current >= target; } @Override diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 262f4b39100d..d62872e01d51 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -384,10 +384,9 @@ protected boolean isShardExpirationMarker(String seqNum) @Override protected boolean isOffsetAtOrBeyond(String current, String target) { - throw new UnsupportedOperationException( - "Bounded stream processing is not yet supported for Kinesis. " + - "This feature is currently only available for Kafka supervisors." - ); + // Kinesis sequence numbers are comparable strings + // They can be compared lexicographically to determine order + return current.compareTo(target) >= 0; } @Override From 8cb75f699ea5228069aa49ef57443bbd693a247d Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Fri, 24 Apr 2026 10:21:12 -0700 Subject: [PATCH 3/9] Unit test coverage --- ...RabbitStreamSupervisorBoundedModeTest.java | 204 +++++++++++++++++ .../RabbitStreamSupervisorIOConfigTest.java | 76 +++++++ .../RabbitStreamSupervisorTest.java | 41 ++++ .../KafkaSupervisorBoundedModeTest.java | 206 ++++++++++++++++++ .../KafkaSupervisorIOConfigTest.java | 128 +++++++++++ .../kafka/supervisor/KafkaSupervisorTest.java | 92 ++++++++ .../KinesisSupervisorBoundedModeTest.java | 202 +++++++++++++++++ .../KinesisSupervisorIOConfigTest.java | 73 +++++++ .../supervisor/KinesisSupervisorTest.java | 95 ++++++++ .../supervisor/BoundedStreamConfigTest.java | 175 +++++++++++++++ .../SeekableStreamSupervisorIOConfigTest.java | 101 +++++++++ .../SupervisorStateManagerTest.java | 84 +++++++ 12 files changed, 1477 insertions(+) create mode 100644 extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorBoundedModeTest.java create mode 100644 extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorBoundedModeTest.java create mode 100644 extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorBoundedModeTest.java create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorBoundedModeTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorBoundedModeTest.java new file mode 100644 index 000000000000..81420e7fe91f --- /dev/null +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorBoundedModeTest.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.rabbitstream.supervisor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class RabbitStreamSupervisorBoundedModeTest +{ + @Test + public void testCreatePartitionIdFromString() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + String partition = supervisor.createPartitionIdFromString("queue-0"); + + Assert.assertEquals("queue-0", partition); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithInteger() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + Long offset = supervisor.createSequenceOffsetFromObject(100); + + Assert.assertEquals(Long.valueOf(100L), offset); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithLong() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + Long offset = supervisor.createSequenceOffsetFromObject(100L); + + Assert.assertEquals(Long.valueOf(100L), offset); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithString() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + Long offset = supervisor.createSequenceOffsetFromObject("100"); + + Assert.assertEquals(Long.valueOf(100L), offset); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithInvalidType() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + IllegalArgumentException ex = Assert.assertThrows( + IllegalArgumentException.class, + () -> supervisor.createSequenceOffsetFromObject(new Object()) + ); + + Assert.assertTrue(ex.getMessage().contains("Cannot convert")); + } + + @Test + public void testIsOffsetAtOrBeyondEqual() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(100L, 100L)); + } + + @Test + public void testIsOffsetAtOrBeyondGreater() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(200L, 100L)); + } + + @Test + public void testIsOffsetAtOrBeyondLess() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + Assert.assertFalse(supervisor.isOffsetAtOrBeyond(50L, 100L)); + } + + @Test + public void testConvertBoundedConfigMapWithIntegerValues() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + Map rawMap = new HashMap<>(); + rawMap.put("queue-0", 100); + rawMap.put("queue-1", 200); + + Map converted = supervisor.convertBoundedConfigMap(rawMap); + + Assert.assertEquals(2, converted.size()); + Assert.assertEquals(Long.valueOf(100L), converted.get("queue-0")); + Assert.assertEquals(Long.valueOf(200L), converted.get("queue-1")); + } + + @Test + public void testConvertBoundedConfigMapWithStringValues() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + Map rawMap = new HashMap<>(); + rawMap.put("queue-0", "100"); + rawMap.put("queue-1", "200"); + + Map converted = supervisor.convertBoundedConfigMap(rawMap); + + Assert.assertEquals(2, converted.size()); + Assert.assertEquals(Long.valueOf(100L), converted.get("queue-0")); + Assert.assertEquals(Long.valueOf(200L), converted.get("queue-1")); + } + + @Test + public void testConvertBoundedConfigMapWithMixedValues() + { + TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); + + Map rawMap = new HashMap<>(); + rawMap.put("queue-0", 100); + rawMap.put("queue-1", "200"); + rawMap.put("queue-2", 300L); + + Map converted = supervisor.convertBoundedConfigMap(rawMap); + + Assert.assertEquals(3, converted.size()); + Assert.assertEquals(Long.valueOf(100L), converted.get("queue-0")); + Assert.assertEquals(Long.valueOf(200L), converted.get("queue-1")); + Assert.assertEquals(Long.valueOf(300L), converted.get("queue-2")); + } + + /** + * Minimal testable subclass that exposes protected methods for testing. + */ + private static class TestableRabbitStreamSupervisor extends RabbitStreamSupervisor + { + public TestableRabbitStreamSupervisor() + { + super( + null, + null, + null, + null, + null, + null, + null + ); + } + + @Override + public String createPartitionIdFromString(String partitionIdString) + { + return super.createPartitionIdFromString(partitionIdString); + } + + @Override + public Long createSequenceOffsetFromObject(Object offsetObj) + { + return super.createSequenceOffsetFromObject(offsetObj); + } + + @Override + public boolean isOffsetAtOrBeyond(Long current, Long target) + { + return super.isOffsetAtOrBeyond(current, target); + } + + public Map convertBoundedConfigMap(Map rawMap) + { + Map result = new HashMap<>(); + for (Map.Entry entry : rawMap.entrySet()) { + String partition = createPartitionIdFromString(entry.getKey().toString()); + Long offset = createSequenceOffsetFromObject(entry.getValue()); + result.put(partition, offset); + } + return result; + } + } +} diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java index 347152a4bb28..0dca7f084f0a 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorIOConfigTest.java @@ -136,4 +136,80 @@ public void testURIRequired() throws Exception mapper.readValue(jsonStr, RabbitStreamSupervisorIOConfig.class); } + @Test + public void testBoundedModeSerdeWithIntegerOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"rabbit\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"uri\": \"rabbitmq-stream://localhost:5552\",\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"queue-0\": 100, \"queue-1\": 200},\n" + + " \"endSequenceNumbers\": {\"queue-0\": 500, \"queue-1\": 600}\n" + + " }\n" + + "}"; + + RabbitStreamSupervisorIOConfig config = mapper.readValue(jsonStr, RabbitStreamSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } + + @Test + public void testBoundedModeSerdeWithStringOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"rabbit\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"uri\": \"rabbitmq-stream://localhost:5552\",\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"queue-0\": \"100\", \"queue-1\": \"200\"},\n" + + " \"endSequenceNumbers\": {\"queue-0\": \"500\", \"queue-1\": \"600\"}\n" + + " }\n" + + "}"; + + RabbitStreamSupervisorIOConfig config = mapper.readValue(jsonStr, RabbitStreamSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } + + @Test + public void testBoundedModeSerdeWithMixedOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"rabbit\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"uri\": \"rabbitmq-stream://localhost:5552\",\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"queue-0\": 100, \"queue-1\": \"200\"},\n" + + " \"endSequenceNumbers\": {\"queue-0\": 500, \"queue-1\": \"600\"}\n" + + " }\n" + + "}"; + + RabbitStreamSupervisorIOConfig config = mapper.readValue(jsonStr, RabbitStreamSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + } + + @Test + public void testUnboundedModeByDefault() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"rabbit\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"uri\": \"rabbitmq-stream://localhost:5552\"\n" + + "}"; + + RabbitStreamSupervisorIOConfig config = mapper.readValue(jsonStr, RabbitStreamSupervisorIOConfig.class); + + Assert.assertFalse(config.isBounded()); + Assert.assertNull(config.getBoundedStreamConfig()); + } + } diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index d66313848609..b9ab01e13248 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -42,6 +42,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; import org.apache.druid.java.util.common.granularity.Granularities; import org.apache.druid.java.util.common.parsers.JSONPathSpec; @@ -464,4 +465,44 @@ public void test_doesTaskMatchSupervisor() Assert.assertFalse(supervisor.doesTaskMatchSupervisor(differentTaskType)); } + + @Test + public void testBoundedModeConfiguration() + { + ImmutableMap startOffsets = ImmutableMap.of( + "queue-0", 100, + "queue-1", 200 + ); + ImmutableMap endOffsets = ImmutableMap.of( + "queue-0", 500, + "queue-1", 600 + ); + + final RabbitStreamSupervisorIOConfig rabbitSupervisorIOConfig = new RabbitStreamSupervisorIOConfig( + STREAM, + URI, + INPUT_FORMAT, + 1, + 1, + new Period("PT1H"), + null, + null, + null, + new Period("PT30M"), + null, + null, + null, + null, + null, + null, + 1000, + null, + new BoundedStreamConfig(startOffsets, endOffsets) + ); + + Assert.assertTrue(rabbitSupervisorIOConfig.isBounded()); + Assert.assertNotNull(rabbitSupervisorIOConfig.getBoundedStreamConfig()); + Assert.assertEquals(2, rabbitSupervisorIOConfig.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, rabbitSupervisorIOConfig.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorBoundedModeTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorBoundedModeTest.java new file mode 100644 index 000000000000..b654a68dadd6 --- /dev/null +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorBoundedModeTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kafka.supervisor; + +import org.apache.druid.data.input.kafka.KafkaTopicPartition; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class KafkaSupervisorBoundedModeTest +{ + @Test + public void testCreatePartitionIdFromString() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + KafkaTopicPartition partition = supervisor.createPartitionIdFromString("my-topic:5"); + + Assert.assertEquals("my-topic", partition.topic()); + Assert.assertEquals(5, partition.partition()); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithInteger() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + Long offset = supervisor.createSequenceOffsetFromObject(100); + + Assert.assertEquals(Long.valueOf(100L), offset); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithLong() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + Long offset = supervisor.createSequenceOffsetFromObject(100L); + + Assert.assertEquals(Long.valueOf(100L), offset); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithString() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + Long offset = supervisor.createSequenceOffsetFromObject("100"); + + Assert.assertEquals(Long.valueOf(100L), offset); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithInvalidType() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + IllegalArgumentException ex = Assert.assertThrows( + IllegalArgumentException.class, + () -> supervisor.createSequenceOffsetFromObject(new Object()) + ); + + Assert.assertTrue(ex.getMessage().contains("Cannot convert")); + } + + @Test + public void testIsOffsetAtOrBeyondEqual() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(100L, 100L)); + } + + @Test + public void testIsOffsetAtOrBeyondGreater() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(200L, 100L)); + } + + @Test + public void testIsOffsetAtOrBeyondLess() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + Assert.assertFalse(supervisor.isOffsetAtOrBeyond(50L, 100L)); + } + + @Test + public void testConvertBoundedConfigMapWithIntegerValues() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + Map rawMap = new HashMap<>(); + rawMap.put("my-topic:0", 100); + rawMap.put("my-topic:1", 200); + + Map converted = supervisor.convertBoundedConfigMap(rawMap); + + Assert.assertEquals(2, converted.size()); + Assert.assertEquals(Long.valueOf(100L), converted.get(new KafkaTopicPartition(false, "my-topic", 0))); + Assert.assertEquals(Long.valueOf(200L), converted.get(new KafkaTopicPartition(false, "my-topic", 1))); + } + + @Test + public void testConvertBoundedConfigMapWithStringValues() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + Map rawMap = new HashMap<>(); + rawMap.put("my-topic:0", "100"); + rawMap.put("my-topic:1", "200"); + + Map converted = supervisor.convertBoundedConfigMap(rawMap); + + Assert.assertEquals(2, converted.size()); + Assert.assertEquals(Long.valueOf(100L), converted.get(new KafkaTopicPartition(false, "my-topic", 0))); + Assert.assertEquals(Long.valueOf(200L), converted.get(new KafkaTopicPartition(false, "my-topic", 1))); + } + + @Test + public void testConvertBoundedConfigMapWithMixedValues() + { + TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); + + Map rawMap = new HashMap<>(); + rawMap.put("my-topic:0", 100); + rawMap.put("my-topic:1", "200"); + rawMap.put("my-topic:2", 300L); + + Map converted = supervisor.convertBoundedConfigMap(rawMap); + + Assert.assertEquals(3, converted.size()); + Assert.assertEquals(Long.valueOf(100L), converted.get(new KafkaTopicPartition(false, "my-topic", 0))); + Assert.assertEquals(Long.valueOf(200L), converted.get(new KafkaTopicPartition(false, "my-topic", 1))); + Assert.assertEquals(Long.valueOf(300L), converted.get(new KafkaTopicPartition(false, "my-topic", 2))); + } + + /** + * Minimal testable subclass that exposes protected methods for testing. + */ + private static class TestableKafkaSupervisor extends KafkaSupervisor + { + public TestableKafkaSupervisor() + { + super( + null, + null, + null, + null, + null, + null, + null + ); + } + + @Override + public KafkaTopicPartition createPartitionIdFromString(String partitionIdString) + { + return super.createPartitionIdFromString(partitionIdString); + } + + @Override + public Long createSequenceOffsetFromObject(Object offsetObj) + { + return super.createSequenceOffsetFromObject(offsetObj); + } + + @Override + public boolean isOffsetAtOrBeyond(Long current, Long target) + { + return super.isOffsetAtOrBeyond(current, target); + } + + public Map convertBoundedConfigMap(Map rawMap) + { + Map result = new HashMap<>(); + for (Map.Entry entry : rawMap.entrySet()) { + KafkaTopicPartition partition = createPartitionIdFromString(entry.getKey().toString()); + Long offset = createSequenceOffsetFromObject(entry.getValue()); + result.put(partition, offset); + } + return result; + } + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 049b9959088c..79c7804991dc 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -27,6 +27,7 @@ import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.indexing.kafka.KafkaRecordSupplier; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig; @@ -442,4 +443,131 @@ public void testIdleConfigSerde() throws JsonProcessingException Assert.assertTrue(kafkaSupervisorIOConfig1.getIdleConfig().isEnabled()); Assert.assertEquals(Long.valueOf(600000), kafkaSupervisorIOConfig1.getIdleConfig().getInactiveAfterMillis()); } + + @Test + public void testBoundedModeSerdeWithIntegerOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"0\": 100, \"1\": 200},\n" + + " \"endSequenceNumbers\": {\"0\": 500, \"1\": 600}\n" + + " }\n" + + "}"; + + KafkaSupervisorIOConfig config = mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } + + @Test + public void testBoundedModeSerdeWithStringOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"0\": \"100\", \"1\": \"200\"},\n" + + " \"endSequenceNumbers\": {\"0\": \"500\", \"1\": \"600\"}\n" + + " }\n" + + "}"; + + KafkaSupervisorIOConfig config = mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } + + @Test + public void testBoundedModeSerdeWithMixedOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"},\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"0\": 100, \"1\": \"200\"},\n" + + " \"endSequenceNumbers\": {\"0\": 500, \"1\": \"600\"}\n" + + " }\n" + + "}"; + + KafkaSupervisorIOConfig config = mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + } + + @Test + public void testUnboundedModeByDefault() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topic\": \"my-topic\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n" + + "}"; + + KafkaSupervisorIOConfig config = mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class); + + Assert.assertFalse(config.isBounded()); + Assert.assertNull(config.getBoundedStreamConfig()); + } + + @Test + public void testBoundedModeRoundTrip() throws Exception + { + final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); + consumerProperties.put("bootstrap.servers", "localhost:8082"); + + Map startOffsets = new HashMap<>(); + startOffsets.put("0", 100); + startOffsets.put("1", 200); + + Map endOffsets = new HashMap<>(); + endOffsets.put("0", 500); + endOffsets.put("1", 600); + + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + KafkaSupervisorIOConfig original = new KafkaSupervisorIOConfig( + "test-topic", + null, + null, + 1, + 1, + new Period("PT1H"), + consumerProperties, + null, + LagAggregator.DEFAULT, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + null, + null, + null, + false, + null, + boundedConfig + ); + + String json = mapper.writeValueAsString(original); + KafkaSupervisorIOConfig deserialized = mapper.readValue(json, KafkaSupervisorIOConfig.class); + + Assert.assertTrue(deserialized.isBounded()); + Assert.assertNotNull(deserialized.getBoundedStreamConfig()); + Assert.assertEquals(2, deserialized.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, deserialized.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 9e11053939d9..daef2fe13d00 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -5477,6 +5477,98 @@ public void testComputeUnassignedServerPriorities_whenMultipleReplicasPerPriorit Assert.assertEquals(List.of(20, 20), supervisor.computeUnassignedServerPriorities(taskGroup3, 2)); } + @Test + public void testBoundedModeCreateTasksWithCorrectOffsets() throws JsonProcessingException + { + Map startOffsets = ImmutableMap.of( + topic + ":0", 100, + topic + ":1", 200, + topic + ":2", 300 + ); + Map endOffsets = ImmutableMap.of( + topic + ":0", 500, + topic + ":1", 600, + topic + ":2", 700 + ); + + final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); + consumerProperties.put("bootstrap.servers", kafkaHost); + + final KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( + topic, + null, + INPUT_FORMAT, + 1, + 1, + new Period("PT1H"), + consumerProperties, + null, + null, + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + null, + null, + null, + true, + null, + new org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig(startOffsets, endOffsets) + ); + + Assert.assertTrue(kafkaSupervisorIOConfig.isBounded()); + + final KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(null, null); + final KafkaSupervisorSpec spec = new KafkaSupervisorSpec( + null, + null, + dataSchema, + KafkaSupervisorTuningConfig.defaultConfig(), + kafkaSupervisorIOConfig, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory, + new SupervisorStateManagerConfig() + ); + + supervisor = new TestableKafkaSupervisor( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + spec, + rowIngestionMetersFactory + ); + + // Test type conversion methods + KafkaTopicPartition partition0 = supervisor.createPartitionIdFromString(topic + ":0"); + Assert.assertEquals(topic, partition0.topic()); + Assert.assertEquals(0, partition0.partition()); + + Long offset = supervisor.createSequenceOffsetFromObject(100); + Assert.assertEquals(Long.valueOf(100L), offset); + + offset = supervisor.createSequenceOffsetFromObject("200"); + Assert.assertEquals(Long.valueOf(200L), offset); + + // Test offset comparison + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(500L, 100L)); + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(100L, 100L)); + Assert.assertFalse(supervisor.isOffsetAtOrBeyond(50L, 100L)); + } + private void addSomeEvents(int numEventsPerPartition) throws Exception { // create topic manually diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorBoundedModeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorBoundedModeTest.java new file mode 100644 index 000000000000..8c62e39e3fb9 --- /dev/null +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorBoundedModeTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.kinesis.supervisor; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class KinesisSupervisorBoundedModeTest +{ + @Test + public void testCreatePartitionIdFromString() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + String partition = supervisor.createPartitionIdFromString("shardId-000000000000"); + + Assert.assertEquals("shardId-000000000000", partition); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithString() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + String offset = supervisor.createSequenceOffsetFromObject("49590338271490256608559692538361571095921575989136588898"); + + Assert.assertEquals("49590338271490256608559692538361571095921575989136588898", offset); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithInteger() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + String offset = supervisor.createSequenceOffsetFromObject(100); + + Assert.assertEquals("100", offset); + } + + @Test + public void testCreateSequenceOffsetFromObjectWithLong() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + String offset = supervisor.createSequenceOffsetFromObject(100L); + + Assert.assertEquals("100", offset); + } + + @Test + public void testIsOffsetAtOrBeyondEqual() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + Assert.assertTrue(supervisor.isOffsetAtOrBeyond("49590338271490256608559692538361571095921575989136588898", "49590338271490256608559692538361571095921575989136588898")); + } + + @Test + public void testIsOffsetAtOrBeyondGreater() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + Assert.assertTrue(supervisor.isOffsetAtOrBeyond("49590338271512257353759162668991891722121171891717232706", "49590338271490256608559692538361571095921575989136588898")); + } + + @Test + public void testIsOffsetAtOrBeyondLess() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + Assert.assertFalse(supervisor.isOffsetAtOrBeyond("49590338271490256608559692538361571095921575989136588898", "49590338271512257353759162668991891722121171891717232706")); + } + + @Test + public void testIsOffsetAtOrBeyondWithSimpleStrings() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + Assert.assertTrue(supervisor.isOffsetAtOrBeyond("200", "100")); + Assert.assertFalse(supervisor.isOffsetAtOrBeyond("100", "200")); + Assert.assertTrue(supervisor.isOffsetAtOrBeyond("100", "100")); + } + + @Test + public void testConvertBoundedConfigMapWithStringValues() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + Map rawMap = new HashMap<>(); + rawMap.put("shardId-000000000000", "49590338271490256608559692538361571095921575989136588898"); + rawMap.put("shardId-000000000001", "49590338271512257353759162668991891722121171891717232706"); + + Map converted = supervisor.convertBoundedConfigMap(rawMap); + + Assert.assertEquals(2, converted.size()); + Assert.assertEquals("49590338271490256608559692538361571095921575989136588898", converted.get("shardId-000000000000")); + Assert.assertEquals("49590338271512257353759162668991891722121171891717232706", converted.get("shardId-000000000001")); + } + + @Test + public void testConvertBoundedConfigMapWithNumericValues() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + Map rawMap = new HashMap<>(); + rawMap.put("shardId-000000000000", 100); + rawMap.put("shardId-000000000001", 200L); + + Map converted = supervisor.convertBoundedConfigMap(rawMap); + + Assert.assertEquals(2, converted.size()); + Assert.assertEquals("100", converted.get("shardId-000000000000")); + Assert.assertEquals("200", converted.get("shardId-000000000001")); + } + + @Test + public void testConvertBoundedConfigMapWithMixedValues() + { + TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); + + Map rawMap = new HashMap<>(); + rawMap.put("shardId-000000000000", "49590338271490256608559692538361571095921575989136588898"); + rawMap.put("shardId-000000000001", 100); + rawMap.put("shardId-000000000002", 200L); + + Map converted = supervisor.convertBoundedConfigMap(rawMap); + + Assert.assertEquals(3, converted.size()); + Assert.assertEquals("49590338271490256608559692538361571095921575989136588898", converted.get("shardId-000000000000")); + Assert.assertEquals("100", converted.get("shardId-000000000001")); + Assert.assertEquals("200", converted.get("shardId-000000000002")); + } + + /** + * Minimal testable subclass that exposes protected methods for testing. + */ + private static class TestableKinesisSupervisor extends KinesisSupervisor + { + public TestableKinesisSupervisor() + { + super( + null, + null, + null, + null, + null, + null, + null, + null + ); + } + + @Override + public String createPartitionIdFromString(String partitionIdString) + { + return super.createPartitionIdFromString(partitionIdString); + } + + @Override + public String createSequenceOffsetFromObject(Object offsetObj) + { + return super.createSequenceOffsetFromObject(offsetObj); + } + + @Override + public boolean isOffsetAtOrBeyond(String current, String target) + { + return super.isOffsetAtOrBeyond(current, target); + } + + public Map convertBoundedConfigMap(Map rawMap) + { + Map result = new HashMap<>(); + for (Map.Entry entry : rawMap.entrySet()) { + String partition = createPartitionIdFromString(entry.getKey().toString()); + String offset = createSequenceOffsetFromObject(entry.getValue()); + result.put(partition, offset); + } + return result; + } + } +} diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java index aa922b008a55..7683cacb97f3 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule; import org.apache.druid.indexing.kinesis.KinesisRegion; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.hamcrest.CoreMatchers; import org.joda.time.Duration; @@ -132,4 +133,76 @@ public void testTopicRequired() throws Exception mapper.readValue(jsonStr, KinesisSupervisorIOConfig.class); } + @Test + public void testBoundedModeSerdeWithStringOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kinesis\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"shardId-000000000000\": \"49590338271490256608559692538361571095921575989136588898\", \"shardId-000000000001\": \"49590338271512257353759162668991891722121171891717232706\"},\n" + + " \"endSequenceNumbers\": {\"shardId-000000000000\": \"49590338271534258098958632799622211348320767794297876514\", \"shardId-000000000001\": \"49590338271556258844158102930252531974520363696878520322\"}\n" + + " }\n" + + "}"; + + KinesisSupervisorIOConfig config = mapper.readValue(jsonStr, KinesisSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } + + @Test + public void testBoundedModeSerdeWithNumericOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kinesis\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"shardId-000000000000\": 100, \"shardId-000000000001\": 200},\n" + + " \"endSequenceNumbers\": {\"shardId-000000000000\": 500, \"shardId-000000000001\": 600}\n" + + " }\n" + + "}"; + + KinesisSupervisorIOConfig config = mapper.readValue(jsonStr, KinesisSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getBoundedStreamConfig().getEndSequenceNumbers().size()); + } + + @Test + public void testBoundedModeSerdeWithMixedOffsets() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kinesis\",\n" + + " \"stream\": \"my-stream\",\n" + + " \"boundedStreamConfig\": {\n" + + " \"startSequenceNumbers\": {\"shardId-000000000000\": \"49590338271490256608559692538361571095921575989136588898\", \"shardId-000000000001\": 200},\n" + + " \"endSequenceNumbers\": {\"shardId-000000000000\": 500, \"shardId-000000000001\": \"49590338271556258844158102930252531974520363696878520322\"}\n" + + " }\n" + + "}"; + + KinesisSupervisorIOConfig config = mapper.readValue(jsonStr, KinesisSupervisorIOConfig.class); + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + } + + @Test + public void testUnboundedModeByDefault() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kinesis\",\n" + + " \"stream\": \"my-stream\"\n" + + "}"; + + KinesisSupervisorIOConfig config = mapper.readValue(jsonStr, KinesisSupervisorIOConfig.class); + + Assert.assertFalse(config.isBounded()); + Assert.assertNull(config.getBoundedStreamConfig()); + } + } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 9c82671abccb..0dfba54fa07c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -65,6 +65,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData; @@ -4732,6 +4733,100 @@ public void test_doesTaskMatchSupervisor() Assert.assertFalse(supervisor.doesTaskMatchSupervisor(differentTaskType)); } + @Test + public void testBoundedModeCreateTasksWithCorrectOffsets() + { + Map startOffsets = ImmutableMap.of( + "shardId-000000000000", "49590338271490256608559692538361571095921575989136588898", + "shardId-000000000001", "49590338271512257353759162668991891722121171891717232706" + ); + Map endOffsets = ImmutableMap.of( + "shardId-000000000000", "49590338271534258098958632799622211348320767794297876514", + "shardId-000000000001", "49590338271556258844158102930252531974520363696878520322" + ); + final KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30S"), + null, + new Period("PT30M"), + null, + null, + null, + null, + null, + null, + 0, + null, + null, + null, + true, + null, + new BoundedStreamConfig(startOffsets, endOffsets) + ); + + Assert.assertTrue(kinesisSupervisorIOConfig.isBounded()); + + final KinesisIndexTaskClientFactory taskClientFactory = new KinesisIndexTaskClientFactory(null, null); + final KinesisSupervisorSpec spec = new KinesisSupervisorSpec( + null, + null, + dataSchema, + KinesisSupervisorTuningConfig.defaultConfig(), + kinesisSupervisorIOConfig, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory, + null, + new SupervisorStateManagerConfig() + ); + + supervisor = new TestableKinesisSupervisor( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + spec, + rowIngestionMetersFactory + ); + + // Test type conversion methods + String shardId = supervisor.createPartitionIdFromString("shardId-000000000000"); + Assert.assertEquals("shardId-000000000000", shardId); + + String offset = supervisor.createSequenceOffsetFromObject("49590338271490256608559692538361571095921575989136588898"); + Assert.assertEquals("49590338271490256608559692538361571095921575989136588898", offset); + + offset = supervisor.createSequenceOffsetFromObject(100); + Assert.assertEquals("100", offset); + + // Test offset comparison (lexicographic) + Assert.assertTrue(supervisor.isOffsetAtOrBeyond( + "49590338271512257353759162668991891722121171891717232706", + "49590338271490256608559692538361571095921575989136588898" + )); + Assert.assertTrue(supervisor.isOffsetAtOrBeyond( + "49590338271490256608559692538361571095921575989136588898", + "49590338271490256608559692538361571095921575989136588898" + )); + Assert.assertFalse(supervisor.isOffsetAtOrBeyond( + "49590338271490256608559692538361571095921575989136588898", + "49590338271512257353759162668991891722121171891717232706" + )); + } + private List testShardMergePhaseOne() throws Exception { supervisorRecordSupplier.assign(EasyMock.anyObject()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java new file mode 100644 index 000000000000..63ec54124227 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +public class BoundedStreamConfigTest +{ + private final ObjectMapper mapper = new ObjectMapper(); + + @Test + public void testConstructorWithValidMaps() + { + Map startOffsets = new HashMap<>(); + startOffsets.put("0", 100L); + startOffsets.put("1", 200L); + + Map endOffsets = new HashMap<>(); + endOffsets.put("0", 500L); + endOffsets.put("1", 600L); + + BoundedStreamConfig config = new BoundedStreamConfig(startOffsets, endOffsets); + + Assert.assertEquals(startOffsets, config.getStartSequenceNumbers()); + Assert.assertEquals(endOffsets, config.getEndSequenceNumbers()); + } + + @Test + public void testConstructorWithNullStartSequenceNumbers() + { + Map endOffsets = new HashMap<>(); + endOffsets.put("0", 500L); + + IllegalArgumentException ex = Assert.assertThrows( + IllegalArgumentException.class, + () -> new BoundedStreamConfig(null, endOffsets) + ); + + Assert.assertEquals("startSequenceNumbers cannot be null", ex.getMessage()); + } + + @Test + public void testConstructorWithNullEndSequenceNumbers() + { + Map startOffsets = new HashMap<>(); + startOffsets.put("0", 100L); + + IllegalArgumentException ex = Assert.assertThrows( + IllegalArgumentException.class, + () -> new BoundedStreamConfig(startOffsets, null) + ); + + Assert.assertEquals("endSequenceNumbers cannot be null", ex.getMessage()); + } + + @Test + public void testConstructorWithEmptyStartSequenceNumbers() + { + Map startOffsets = new HashMap<>(); + Map endOffsets = new HashMap<>(); + endOffsets.put("0", 500L); + + IllegalArgumentException ex = Assert.assertThrows( + IllegalArgumentException.class, + () -> new BoundedStreamConfig(startOffsets, endOffsets) + ); + + Assert.assertEquals("startSequenceNumbers cannot be empty", ex.getMessage()); + } + + @Test + public void testConstructorWithEmptyEndSequenceNumbers() + { + Map startOffsets = new HashMap<>(); + startOffsets.put("0", 100L); + Map endOffsets = new HashMap<>(); + + IllegalArgumentException ex = Assert.assertThrows( + IllegalArgumentException.class, + () -> new BoundedStreamConfig(startOffsets, endOffsets) + ); + + Assert.assertEquals("endSequenceNumbers cannot be empty", ex.getMessage()); + } + + @Test + public void testSerializationDeserialization() throws Exception + { + Map startOffsets = new HashMap<>(); + startOffsets.put(0, 100); + startOffsets.put(1, 200); + + Map endOffsets = new HashMap<>(); + endOffsets.put(0, 500); + endOffsets.put(1, 600); + + BoundedStreamConfig config = new BoundedStreamConfig(startOffsets, endOffsets); + + String json = mapper.writeValueAsString(config); + BoundedStreamConfig deserialized = mapper.readValue(json, BoundedStreamConfig.class); + + Assert.assertEquals(config.getStartSequenceNumbers(), deserialized.getStartSequenceNumbers()); + Assert.assertEquals(config.getEndSequenceNumbers(), deserialized.getEndSequenceNumbers()); + } + + @Test + public void testDeserializationWithIntegerValues() throws Exception + { + String json = "{" + + "\"startSequenceNumbers\": {\"0\": 100, \"1\": 200}," + + "\"endSequenceNumbers\": {\"0\": 500, \"1\": 600}" + + "}"; + + BoundedStreamConfig config = mapper.readValue(json, BoundedStreamConfig.class); + + Assert.assertNotNull(config.getStartSequenceNumbers()); + Assert.assertNotNull(config.getEndSequenceNumbers()); + Assert.assertEquals(2, config.getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getEndSequenceNumbers().size()); + } + + @Test + public void testDeserializationWithStringValues() throws Exception + { + String json = "{" + + "\"startSequenceNumbers\": {\"0\": \"100\", \"1\": \"200\"}," + + "\"endSequenceNumbers\": {\"0\": \"500\", \"1\": \"600\"}" + + "}"; + + BoundedStreamConfig config = mapper.readValue(json, BoundedStreamConfig.class); + + Assert.assertNotNull(config.getStartSequenceNumbers()); + Assert.assertNotNull(config.getEndSequenceNumbers()); + Assert.assertEquals(2, config.getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getEndSequenceNumbers().size()); + } + + @Test + public void testDeserializationWithMixedTypes() throws Exception + { + String json = "{" + + "\"startSequenceNumbers\": {\"0\": 100, \"1\": \"200\"}," + + "\"endSequenceNumbers\": {\"0\": 500, \"1\": \"600\"}" + + "}"; + + BoundedStreamConfig config = mapper.readValue(json, BoundedStreamConfig.class); + + Assert.assertNotNull(config.getStartSequenceNumbers()); + Assert.assertNotNull(config.getEndSequenceNumbers()); + Assert.assertEquals(2, config.getStartSequenceNumbers().size()); + Assert.assertEquals(2, config.getEndSequenceNumbers().size()); + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java index c0b7dc1d753a..2c018b18f544 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfigTest.java @@ -472,4 +472,105 @@ private SeekableStreamSupervisorIOConfig makeSeekableStreamSupervisorIOConfig(@N { }; } + + @Test + public void testBoundedModeWithValidConfig() + { + Map startOffsets = Map.of("0", 100, "1", 200); + Map endOffsets = Map.of("0", 500, "1", 600); + BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + LagAggregator lagAggregator = mock(LagAggregator.class); + + SeekableStreamSupervisorIOConfig config = new SeekableStreamSupervisorIOConfig( + "stream", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + lagAggregator, + null, + null, + null, + null, + boundedConfig + ) + { + }; + + Assert.assertTrue(config.isBounded()); + Assert.assertNotNull(config.getBoundedStreamConfig()); + Assert.assertEquals(boundedConfig, config.getBoundedStreamConfig()); + } + + @Test + public void testUnboundedModeByDefault() + { + LagAggregator lagAggregator = mock(LagAggregator.class); + + SeekableStreamSupervisorIOConfig config = new SeekableStreamSupervisorIOConfig( + "stream", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + lagAggregator, + null, + null, + null, + null, + null + ) + { + }; + + Assert.assertFalse(config.isBounded()); + Assert.assertNull(config.getBoundedStreamConfig()); + } + + @Test + public void testBoundedModeWithNullConfig() + { + LagAggregator lagAggregator = mock(LagAggregator.class); + + SeekableStreamSupervisorIOConfig config = new SeekableStreamSupervisorIOConfig( + "stream", + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + lagAggregator, + null, + null, + null, + null, + null + ) + { + }; + + Assert.assertFalse(config.isBounded()); + Assert.assertNull(config.getBoundedStreamConfig()); + } } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java index 367577a291dc..043992998f72 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorStateManagerTest.java @@ -66,4 +66,88 @@ public void testIdleConfigSerde() Assert.assertTrue(stateManagerConfig.isIdleConfigEnabled()); Assert.assertEquals(60000, stateManagerConfig.getInactiveAfterMillis()); } + + @Test + public void testCompletedStateIsTerminal() + { + stateManagerConfig = new SupervisorStateManagerConfig(); + SupervisorStateManager supervisorStateManager = new SupervisorStateManager( + stateManagerConfig, + false + ); + + // Start in PENDING state + Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisorStateManager.getSupervisorState()); + + // Transition to COMPLETED + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED); + Assert.assertEquals(SupervisorStateManager.BasicState.COMPLETED, supervisorStateManager.getSupervisorState()); + + // Attempt to transition out of COMPLETED should be ignored + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.RUNNING); + Assert.assertEquals(SupervisorStateManager.BasicState.COMPLETED, supervisorStateManager.getSupervisorState()); + + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE); + Assert.assertEquals(SupervisorStateManager.BasicState.COMPLETED, supervisorStateManager.getSupervisorState()); + + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.PENDING); + Assert.assertEquals(SupervisorStateManager.BasicState.COMPLETED, supervisorStateManager.getSupervisorState()); + } + + @Test + public void testStoppingStateIsTerminal() + { + stateManagerConfig = new SupervisorStateManagerConfig(); + SupervisorStateManager supervisorStateManager = new SupervisorStateManager( + stateManagerConfig, + false + ); + + // Start in PENDING state + Assert.assertEquals(SupervisorStateManager.BasicState.PENDING, supervisorStateManager.getSupervisorState()); + + // Transition to STOPPING + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.STOPPING); + Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisorStateManager.getSupervisorState()); + + // Attempt to transition out of STOPPING should be ignored + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.RUNNING); + Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisorStateManager.getSupervisorState()); + + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.IDLE); + Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisorStateManager.getSupervisorState()); + + // Cannot transition to COMPLETED from STOPPING + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED); + Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisorStateManager.getSupervisorState()); + } + + @Test + public void testCompletedStateIsHealthy() + { + stateManagerConfig = new SupervisorStateManagerConfig(); + SupervisorStateManager supervisorStateManager = new SupervisorStateManager( + stateManagerConfig, + false + ); + + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED); + + Assert.assertTrue(supervisorStateManager.isHealthy()); + Assert.assertEquals(SupervisorStateManager.BasicState.COMPLETED, supervisorStateManager.getSupervisorState()); + } + + @Test + public void testCompletedStateIsNotFirstRunOnly() + { + stateManagerConfig = new SupervisorStateManagerConfig(); + SupervisorStateManager supervisorStateManager = new SupervisorStateManager( + stateManagerConfig, + false + ); + + supervisorStateManager.maybeSetState(SupervisorStateManager.BasicState.COMPLETED); + + Assert.assertFalse(SupervisorStateManager.BasicState.COMPLETED.isFirstRunOnly()); + } } From 300ebe3233f9f07764167631443113eab70de1d1 Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Fri, 24 Apr 2026 11:10:45 -0700 Subject: [PATCH 4/9] Fix BoundedStreamConfigTest --- .../supervisor/BoundedStreamConfigTest.java | 42 +++++++++++-------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java index 63ec54124227..9c86b4d9a6d7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/BoundedStreamConfigTest.java @@ -53,12 +53,12 @@ public void testConstructorWithNullStartSequenceNumbers() Map endOffsets = new HashMap<>(); endOffsets.put("0", 500L); - IllegalArgumentException ex = Assert.assertThrows( - IllegalArgumentException.class, + NullPointerException ex = Assert.assertThrows( + NullPointerException.class, () -> new BoundedStreamConfig(null, endOffsets) ); - Assert.assertEquals("startSequenceNumbers cannot be null", ex.getMessage()); + Assert.assertTrue(ex.getMessage().contains("startSequenceNumbers")); } @Test @@ -67,12 +67,12 @@ public void testConstructorWithNullEndSequenceNumbers() Map startOffsets = new HashMap<>(); startOffsets.put("0", 100L); - IllegalArgumentException ex = Assert.assertThrows( - IllegalArgumentException.class, + NullPointerException ex = Assert.assertThrows( + NullPointerException.class, () -> new BoundedStreamConfig(startOffsets, null) ); - Assert.assertEquals("endSequenceNumbers cannot be null", ex.getMessage()); + Assert.assertTrue(ex.getMessage().contains("endSequenceNumbers")); } @Test @@ -87,42 +87,50 @@ public void testConstructorWithEmptyStartSequenceNumbers() () -> new BoundedStreamConfig(startOffsets, endOffsets) ); - Assert.assertEquals("startSequenceNumbers cannot be empty", ex.getMessage()); + Assert.assertTrue(ex.getMessage().contains("startSequenceNumbers cannot be empty")); } @Test - public void testConstructorWithEmptyEndSequenceNumbers() + public void testConstructorWithMismatchedPartitions() { Map startOffsets = new HashMap<>(); startOffsets.put("0", 100L); Map endOffsets = new HashMap<>(); + endOffsets.put("1", 500L); IllegalArgumentException ex = Assert.assertThrows( IllegalArgumentException.class, () -> new BoundedStreamConfig(startOffsets, endOffsets) ); - Assert.assertEquals("endSequenceNumbers cannot be empty", ex.getMessage()); + Assert.assertTrue(ex.getMessage().contains("must have matching partition sets")); } @Test public void testSerializationDeserialization() throws Exception { - Map startOffsets = new HashMap<>(); - startOffsets.put(0, 100); - startOffsets.put(1, 200); + Map startOffsets = new HashMap<>(); + startOffsets.put("0", 100); + startOffsets.put("1", 200); - Map endOffsets = new HashMap<>(); - endOffsets.put(0, 500); - endOffsets.put(1, 600); + Map endOffsets = new HashMap<>(); + endOffsets.put("0", 500); + endOffsets.put("1", 600); BoundedStreamConfig config = new BoundedStreamConfig(startOffsets, endOffsets); String json = mapper.writeValueAsString(config); BoundedStreamConfig deserialized = mapper.readValue(json, BoundedStreamConfig.class); - Assert.assertEquals(config.getStartSequenceNumbers(), deserialized.getStartSequenceNumbers()); - Assert.assertEquals(config.getEndSequenceNumbers(), deserialized.getEndSequenceNumbers()); + // Check sizes + Assert.assertEquals(2, deserialized.getStartSequenceNumbers().size()); + Assert.assertEquals(2, deserialized.getEndSequenceNumbers().size()); + + // Check that deserialized maps contain expected values (keys will be Strings after deserialization) + Assert.assertEquals(100, deserialized.getStartSequenceNumbers().get("0")); + Assert.assertEquals(200, deserialized.getStartSequenceNumbers().get("1")); + Assert.assertEquals(500, deserialized.getEndSequenceNumbers().get("0")); + Assert.assertEquals(600, deserialized.getEndSequenceNumbers().get("1")); } @Test From 9af972979f528b08d8b8aa8f69023a0d920e119e Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Fri, 24 Apr 2026 11:11:42 -0700 Subject: [PATCH 5/9] Remove unused import --- .../kinesis/supervisor/KinesisSupervisorIOConfigTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java index 7683cacb97f3..a93bdf6871bc 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule; import org.apache.druid.indexing.kinesis.KinesisRegion; -import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.hamcrest.CoreMatchers; import org.joda.time.Duration; From e0ffef671c6717acc5f95fbcae0b69ae06753053 Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Fri, 24 Apr 2026 11:33:38 -0700 Subject: [PATCH 6/9] Remove unneeded tests --- ...RabbitStreamSupervisorBoundedModeTest.java | 204 ----------------- .../KafkaSupervisorBoundedModeTest.java | 206 ------------------ .../KinesisSupervisorBoundedModeTest.java | 202 ----------------- 3 files changed, 612 deletions(-) delete mode 100644 extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorBoundedModeTest.java delete mode 100644 extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorBoundedModeTest.java delete mode 100644 extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorBoundedModeTest.java diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorBoundedModeTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorBoundedModeTest.java deleted file mode 100644 index 81420e7fe91f..000000000000 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorBoundedModeTest.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.rabbitstream.supervisor; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -public class RabbitStreamSupervisorBoundedModeTest -{ - @Test - public void testCreatePartitionIdFromString() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - String partition = supervisor.createPartitionIdFromString("queue-0"); - - Assert.assertEquals("queue-0", partition); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithInteger() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - Long offset = supervisor.createSequenceOffsetFromObject(100); - - Assert.assertEquals(Long.valueOf(100L), offset); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithLong() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - Long offset = supervisor.createSequenceOffsetFromObject(100L); - - Assert.assertEquals(Long.valueOf(100L), offset); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithString() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - Long offset = supervisor.createSequenceOffsetFromObject("100"); - - Assert.assertEquals(Long.valueOf(100L), offset); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithInvalidType() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - IllegalArgumentException ex = Assert.assertThrows( - IllegalArgumentException.class, - () -> supervisor.createSequenceOffsetFromObject(new Object()) - ); - - Assert.assertTrue(ex.getMessage().contains("Cannot convert")); - } - - @Test - public void testIsOffsetAtOrBeyondEqual() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - Assert.assertTrue(supervisor.isOffsetAtOrBeyond(100L, 100L)); - } - - @Test - public void testIsOffsetAtOrBeyondGreater() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - Assert.assertTrue(supervisor.isOffsetAtOrBeyond(200L, 100L)); - } - - @Test - public void testIsOffsetAtOrBeyondLess() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - Assert.assertFalse(supervisor.isOffsetAtOrBeyond(50L, 100L)); - } - - @Test - public void testConvertBoundedConfigMapWithIntegerValues() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - Map rawMap = new HashMap<>(); - rawMap.put("queue-0", 100); - rawMap.put("queue-1", 200); - - Map converted = supervisor.convertBoundedConfigMap(rawMap); - - Assert.assertEquals(2, converted.size()); - Assert.assertEquals(Long.valueOf(100L), converted.get("queue-0")); - Assert.assertEquals(Long.valueOf(200L), converted.get("queue-1")); - } - - @Test - public void testConvertBoundedConfigMapWithStringValues() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - Map rawMap = new HashMap<>(); - rawMap.put("queue-0", "100"); - rawMap.put("queue-1", "200"); - - Map converted = supervisor.convertBoundedConfigMap(rawMap); - - Assert.assertEquals(2, converted.size()); - Assert.assertEquals(Long.valueOf(100L), converted.get("queue-0")); - Assert.assertEquals(Long.valueOf(200L), converted.get("queue-1")); - } - - @Test - public void testConvertBoundedConfigMapWithMixedValues() - { - TestableRabbitStreamSupervisor supervisor = new TestableRabbitStreamSupervisor(); - - Map rawMap = new HashMap<>(); - rawMap.put("queue-0", 100); - rawMap.put("queue-1", "200"); - rawMap.put("queue-2", 300L); - - Map converted = supervisor.convertBoundedConfigMap(rawMap); - - Assert.assertEquals(3, converted.size()); - Assert.assertEquals(Long.valueOf(100L), converted.get("queue-0")); - Assert.assertEquals(Long.valueOf(200L), converted.get("queue-1")); - Assert.assertEquals(Long.valueOf(300L), converted.get("queue-2")); - } - - /** - * Minimal testable subclass that exposes protected methods for testing. - */ - private static class TestableRabbitStreamSupervisor extends RabbitStreamSupervisor - { - public TestableRabbitStreamSupervisor() - { - super( - null, - null, - null, - null, - null, - null, - null - ); - } - - @Override - public String createPartitionIdFromString(String partitionIdString) - { - return super.createPartitionIdFromString(partitionIdString); - } - - @Override - public Long createSequenceOffsetFromObject(Object offsetObj) - { - return super.createSequenceOffsetFromObject(offsetObj); - } - - @Override - public boolean isOffsetAtOrBeyond(Long current, Long target) - { - return super.isOffsetAtOrBeyond(current, target); - } - - public Map convertBoundedConfigMap(Map rawMap) - { - Map result = new HashMap<>(); - for (Map.Entry entry : rawMap.entrySet()) { - String partition = createPartitionIdFromString(entry.getKey().toString()); - Long offset = createSequenceOffsetFromObject(entry.getValue()); - result.put(partition, offset); - } - return result; - } - } -} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorBoundedModeTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorBoundedModeTest.java deleted file mode 100644 index b654a68dadd6..000000000000 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorBoundedModeTest.java +++ /dev/null @@ -1,206 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.kafka.supervisor; - -import org.apache.druid.data.input.kafka.KafkaTopicPartition; -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -public class KafkaSupervisorBoundedModeTest -{ - @Test - public void testCreatePartitionIdFromString() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - KafkaTopicPartition partition = supervisor.createPartitionIdFromString("my-topic:5"); - - Assert.assertEquals("my-topic", partition.topic()); - Assert.assertEquals(5, partition.partition()); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithInteger() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - Long offset = supervisor.createSequenceOffsetFromObject(100); - - Assert.assertEquals(Long.valueOf(100L), offset); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithLong() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - Long offset = supervisor.createSequenceOffsetFromObject(100L); - - Assert.assertEquals(Long.valueOf(100L), offset); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithString() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - Long offset = supervisor.createSequenceOffsetFromObject("100"); - - Assert.assertEquals(Long.valueOf(100L), offset); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithInvalidType() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - IllegalArgumentException ex = Assert.assertThrows( - IllegalArgumentException.class, - () -> supervisor.createSequenceOffsetFromObject(new Object()) - ); - - Assert.assertTrue(ex.getMessage().contains("Cannot convert")); - } - - @Test - public void testIsOffsetAtOrBeyondEqual() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - Assert.assertTrue(supervisor.isOffsetAtOrBeyond(100L, 100L)); - } - - @Test - public void testIsOffsetAtOrBeyondGreater() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - Assert.assertTrue(supervisor.isOffsetAtOrBeyond(200L, 100L)); - } - - @Test - public void testIsOffsetAtOrBeyondLess() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - Assert.assertFalse(supervisor.isOffsetAtOrBeyond(50L, 100L)); - } - - @Test - public void testConvertBoundedConfigMapWithIntegerValues() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - Map rawMap = new HashMap<>(); - rawMap.put("my-topic:0", 100); - rawMap.put("my-topic:1", 200); - - Map converted = supervisor.convertBoundedConfigMap(rawMap); - - Assert.assertEquals(2, converted.size()); - Assert.assertEquals(Long.valueOf(100L), converted.get(new KafkaTopicPartition(false, "my-topic", 0))); - Assert.assertEquals(Long.valueOf(200L), converted.get(new KafkaTopicPartition(false, "my-topic", 1))); - } - - @Test - public void testConvertBoundedConfigMapWithStringValues() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - Map rawMap = new HashMap<>(); - rawMap.put("my-topic:0", "100"); - rawMap.put("my-topic:1", "200"); - - Map converted = supervisor.convertBoundedConfigMap(rawMap); - - Assert.assertEquals(2, converted.size()); - Assert.assertEquals(Long.valueOf(100L), converted.get(new KafkaTopicPartition(false, "my-topic", 0))); - Assert.assertEquals(Long.valueOf(200L), converted.get(new KafkaTopicPartition(false, "my-topic", 1))); - } - - @Test - public void testConvertBoundedConfigMapWithMixedValues() - { - TestableKafkaSupervisor supervisor = new TestableKafkaSupervisor(); - - Map rawMap = new HashMap<>(); - rawMap.put("my-topic:0", 100); - rawMap.put("my-topic:1", "200"); - rawMap.put("my-topic:2", 300L); - - Map converted = supervisor.convertBoundedConfigMap(rawMap); - - Assert.assertEquals(3, converted.size()); - Assert.assertEquals(Long.valueOf(100L), converted.get(new KafkaTopicPartition(false, "my-topic", 0))); - Assert.assertEquals(Long.valueOf(200L), converted.get(new KafkaTopicPartition(false, "my-topic", 1))); - Assert.assertEquals(Long.valueOf(300L), converted.get(new KafkaTopicPartition(false, "my-topic", 2))); - } - - /** - * Minimal testable subclass that exposes protected methods for testing. - */ - private static class TestableKafkaSupervisor extends KafkaSupervisor - { - public TestableKafkaSupervisor() - { - super( - null, - null, - null, - null, - null, - null, - null - ); - } - - @Override - public KafkaTopicPartition createPartitionIdFromString(String partitionIdString) - { - return super.createPartitionIdFromString(partitionIdString); - } - - @Override - public Long createSequenceOffsetFromObject(Object offsetObj) - { - return super.createSequenceOffsetFromObject(offsetObj); - } - - @Override - public boolean isOffsetAtOrBeyond(Long current, Long target) - { - return super.isOffsetAtOrBeyond(current, target); - } - - public Map convertBoundedConfigMap(Map rawMap) - { - Map result = new HashMap<>(); - for (Map.Entry entry : rawMap.entrySet()) { - KafkaTopicPartition partition = createPartitionIdFromString(entry.getKey().toString()); - Long offset = createSequenceOffsetFromObject(entry.getValue()); - result.put(partition, offset); - } - return result; - } - } -} diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorBoundedModeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorBoundedModeTest.java deleted file mode 100644 index 8c62e39e3fb9..000000000000 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorBoundedModeTest.java +++ /dev/null @@ -1,202 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.druid.indexing.kinesis.supervisor; - -import org.junit.Assert; -import org.junit.Test; - -import java.util.HashMap; -import java.util.Map; - -public class KinesisSupervisorBoundedModeTest -{ - @Test - public void testCreatePartitionIdFromString() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - String partition = supervisor.createPartitionIdFromString("shardId-000000000000"); - - Assert.assertEquals("shardId-000000000000", partition); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithString() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - String offset = supervisor.createSequenceOffsetFromObject("49590338271490256608559692538361571095921575989136588898"); - - Assert.assertEquals("49590338271490256608559692538361571095921575989136588898", offset); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithInteger() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - String offset = supervisor.createSequenceOffsetFromObject(100); - - Assert.assertEquals("100", offset); - } - - @Test - public void testCreateSequenceOffsetFromObjectWithLong() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - String offset = supervisor.createSequenceOffsetFromObject(100L); - - Assert.assertEquals("100", offset); - } - - @Test - public void testIsOffsetAtOrBeyondEqual() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - Assert.assertTrue(supervisor.isOffsetAtOrBeyond("49590338271490256608559692538361571095921575989136588898", "49590338271490256608559692538361571095921575989136588898")); - } - - @Test - public void testIsOffsetAtOrBeyondGreater() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - Assert.assertTrue(supervisor.isOffsetAtOrBeyond("49590338271512257353759162668991891722121171891717232706", "49590338271490256608559692538361571095921575989136588898")); - } - - @Test - public void testIsOffsetAtOrBeyondLess() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - Assert.assertFalse(supervisor.isOffsetAtOrBeyond("49590338271490256608559692538361571095921575989136588898", "49590338271512257353759162668991891722121171891717232706")); - } - - @Test - public void testIsOffsetAtOrBeyondWithSimpleStrings() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - Assert.assertTrue(supervisor.isOffsetAtOrBeyond("200", "100")); - Assert.assertFalse(supervisor.isOffsetAtOrBeyond("100", "200")); - Assert.assertTrue(supervisor.isOffsetAtOrBeyond("100", "100")); - } - - @Test - public void testConvertBoundedConfigMapWithStringValues() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - Map rawMap = new HashMap<>(); - rawMap.put("shardId-000000000000", "49590338271490256608559692538361571095921575989136588898"); - rawMap.put("shardId-000000000001", "49590338271512257353759162668991891722121171891717232706"); - - Map converted = supervisor.convertBoundedConfigMap(rawMap); - - Assert.assertEquals(2, converted.size()); - Assert.assertEquals("49590338271490256608559692538361571095921575989136588898", converted.get("shardId-000000000000")); - Assert.assertEquals("49590338271512257353759162668991891722121171891717232706", converted.get("shardId-000000000001")); - } - - @Test - public void testConvertBoundedConfigMapWithNumericValues() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - Map rawMap = new HashMap<>(); - rawMap.put("shardId-000000000000", 100); - rawMap.put("shardId-000000000001", 200L); - - Map converted = supervisor.convertBoundedConfigMap(rawMap); - - Assert.assertEquals(2, converted.size()); - Assert.assertEquals("100", converted.get("shardId-000000000000")); - Assert.assertEquals("200", converted.get("shardId-000000000001")); - } - - @Test - public void testConvertBoundedConfigMapWithMixedValues() - { - TestableKinesisSupervisor supervisor = new TestableKinesisSupervisor(); - - Map rawMap = new HashMap<>(); - rawMap.put("shardId-000000000000", "49590338271490256608559692538361571095921575989136588898"); - rawMap.put("shardId-000000000001", 100); - rawMap.put("shardId-000000000002", 200L); - - Map converted = supervisor.convertBoundedConfigMap(rawMap); - - Assert.assertEquals(3, converted.size()); - Assert.assertEquals("49590338271490256608559692538361571095921575989136588898", converted.get("shardId-000000000000")); - Assert.assertEquals("100", converted.get("shardId-000000000001")); - Assert.assertEquals("200", converted.get("shardId-000000000002")); - } - - /** - * Minimal testable subclass that exposes protected methods for testing. - */ - private static class TestableKinesisSupervisor extends KinesisSupervisor - { - public TestableKinesisSupervisor() - { - super( - null, - null, - null, - null, - null, - null, - null, - null - ); - } - - @Override - public String createPartitionIdFromString(String partitionIdString) - { - return super.createPartitionIdFromString(partitionIdString); - } - - @Override - public String createSequenceOffsetFromObject(Object offsetObj) - { - return super.createSequenceOffsetFromObject(offsetObj); - } - - @Override - public boolean isOffsetAtOrBeyond(String current, String target) - { - return super.isOffsetAtOrBeyond(current, target); - } - - public Map convertBoundedConfigMap(Map rawMap) - { - Map result = new HashMap<>(); - for (Map.Entry entry : rawMap.entrySet()) { - String partition = createPartitionIdFromString(entry.getKey().toString()); - String offset = createSequenceOffsetFromObject(entry.getValue()); - result.put(partition, offset); - } - return result; - } - } -} From 162e1f339be911d5bb6a51530afe98461d2129d8 Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Fri, 24 Apr 2026 13:15:59 -0700 Subject: [PATCH 7/9] Unit test fix --- .../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index daef2fe13d00..7c5a0610c8db 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -5554,7 +5554,7 @@ public void testBoundedModeCreateTasksWithCorrectOffsets() throws JsonProcessing // Test type conversion methods KafkaTopicPartition partition0 = supervisor.createPartitionIdFromString(topic + ":0"); - Assert.assertEquals(topic, partition0.topic()); + Assert.assertEquals(topic, partition0.topic().get()); Assert.assertEquals(0, partition0.partition()); Long offset = supervisor.createSequenceOffsetFromObject(100); From 3ea2b0b394dc3160cf3198655773a767a8911b3a Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Fri, 24 Apr 2026 14:23:08 -0700 Subject: [PATCH 8/9] Fix import and add coverage for RabbitStreamSupervisor --- .../RabbitStreamSupervisorTest.java | 30 +++++++++++++++++++ .../kafka/supervisor/KafkaSupervisorTest.java | 3 +- 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java index b9ab01e13248..19a5cb86d298 100644 --- a/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java +++ b/extensions-contrib/rabbit-stream-indexing-service/src/test/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisorTest.java @@ -504,5 +504,35 @@ public void testBoundedModeConfiguration() Assert.assertNotNull(rabbitSupervisorIOConfig.getBoundedStreamConfig()); Assert.assertEquals(2, rabbitSupervisorIOConfig.getBoundedStreamConfig().getStartSequenceNumbers().size()); Assert.assertEquals(2, rabbitSupervisorIOConfig.getBoundedStreamConfig().getEndSequenceNumbers().size()); + + // Create supervisor to test type conversion methods + supervisor = getSupervisor( + "supervisorId", + 1, + 1, + false, + "PT1H", + null, + null, + dataSchema, + tuningConfig + ); + + // Test createPartitionIdFromString + String queueName = supervisor.createPartitionIdFromString("queue-0"); + Assert.assertEquals("queue-0", queueName); + + // Test createSequenceOffsetFromObject with Integer + Long offset = supervisor.createSequenceOffsetFromObject(100); + Assert.assertEquals(Long.valueOf(100L), offset); + + // Test createSequenceOffsetFromObject with String + offset = supervisor.createSequenceOffsetFromObject("200"); + Assert.assertEquals(Long.valueOf(200L), offset); + + // Test isOffsetAtOrBeyond + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(500L, 100L)); + Assert.assertTrue(supervisor.isOffsetAtOrBeyond(100L, 100L)); + Assert.assertFalse(supervisor.isOffsetAtOrBeyond(50L, 100L)); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 7c5a0610c8db..1f3bedae78fb 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -70,6 +70,7 @@ import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; @@ -5517,7 +5518,7 @@ public void testBoundedModeCreateTasksWithCorrectOffsets() throws JsonProcessing null, true, null, - new org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig(startOffsets, endOffsets) + new BoundedStreamConfig(startOffsets, endOffsets) ); Assert.assertTrue(kafkaSupervisorIOConfig.isBounded()); From 8e3e81cf2c52e30ef7c4b9cf3fab4dd998c6a9ab Mon Sep 17 00:00:00 2001 From: Andrew Ho Date: Fri, 24 Apr 2026 15:11:46 -0700 Subject: [PATCH 9/9] Test coverage for validateBoundedStreamConfig --- .../SeekableStreamSupervisorSpecTest.java | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java index 0efad16bc276..b914da4ebb17 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpecTest.java @@ -1171,6 +1171,151 @@ public String getSource() originalSpec.validateSpecUpdateTo(proposedSpecSameSource); } + @Test + public void test_validateBoundedStreamConfig_WithValidConfig() + { + mockIngestionSchema(); + + Map startOffsets = ImmutableMap.of("0", 100, "1", 200); + Map endOffsets = ImmutableMap.of("0", 500, "1", 600); + BoundedStreamConfig validConfig = new BoundedStreamConfig(startOffsets, endOffsets); + + SeekableStreamSupervisorIOConfig ioConfigWithValidBounded = new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false), + 1, + 1, + new Period("PT1H"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + LagAggregator.DEFAULT, + null, + null, + null, + null, + validConfig + ) + { + }; + + SeekableStreamSupervisorIngestionSpec ingestionWithValidBounded = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(ingestionWithValidBounded.getIOConfig()).andReturn(ioConfigWithValidBounded).anyTimes(); + EasyMock.expect(ingestionWithValidBounded.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionWithValidBounded.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionWithValidBounded); + + TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + ImmutableMap.of("key", "value"), + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ); + + TestSeekableStreamSupervisorSpec specWithValidBounded = new TestSeekableStreamSupervisorSpec( + ingestionWithValidBounded, + ImmutableMap.of("key", "value"), + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ); + + // Should not throw + spec.validateBoundedStreamConfig(specWithValidBounded); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void test_validateBoundedStreamConfig_WithMismatchedPartitions() + { + mockIngestionSchema(); + + // Create a mock BoundedStreamConfig that returns mismatched partition sets + BoundedStreamConfig mismatchedConfig = EasyMock.mock(BoundedStreamConfig.class); + Map startMap = ImmutableMap.of("0", 100L, "1", 200L); + Map endMap = ImmutableMap.of("0", 500L, "2", 600L); + EasyMock.expect(mismatchedConfig.getStartSequenceNumbers()).andStubReturn(startMap); + EasyMock.expect(mismatchedConfig.getEndSequenceNumbers()).andStubReturn(endMap); + EasyMock.replay(mismatchedConfig); + + SeekableStreamSupervisorIOConfig ioConfigWithMismatchedBounded = EasyMock.mock(SeekableStreamSupervisorIOConfig.class); + EasyMock.expect(ioConfigWithMismatchedBounded.isBounded()).andReturn(true).anyTimes(); + EasyMock.expect(ioConfigWithMismatchedBounded.getBoundedStreamConfig()).andReturn(mismatchedConfig).anyTimes(); + EasyMock.replay(ioConfigWithMismatchedBounded); + + SeekableStreamSupervisorIngestionSpec ingestionWithMismatchedBounded = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class); + EasyMock.expect(ingestionWithMismatchedBounded.getIOConfig()).andReturn(ioConfigWithMismatchedBounded).anyTimes(); + EasyMock.expect(ingestionWithMismatchedBounded.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionWithMismatchedBounded.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionWithMismatchedBounded); + + TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec( + ingestionSchema, + ImmutableMap.of("key", "value"), + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ); + + TestSeekableStreamSupervisorSpec specWithMismatchedBounded = new TestSeekableStreamSupervisorSpec( + ingestionWithMismatchedBounded, + ImmutableMap.of("key", "value"), + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1" + ); + + MatcherAssert.assertThat( + assertThrows(DruidException.class, () -> spec.validateBoundedStreamConfig(specWithMismatchedBounded)), + new DruidExceptionMatcher( + DruidException.Persona.USER, + DruidException.Category.INVALID_INPUT, + "invalidInput" + ).expectMessageContains("Bounded stream config has mismatched partitions") + ); + } + @Test public void test_dynamicAllocationNotice_skipsScalingAndEmitsReason_ifTasksArePublishing() throws InterruptedException {