Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ protected KinesisSupervisorSpec createKinesisSupervisor(KinesisResource kinesis,
Period.seconds(60),
null, null, null, null, null, null, null, null,
false,
null,
null
),
Map.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private KinesisSupervisorSpec createKinesisSupervisorSpec(String dataSource, Str
Period.seconds(5),
null, null, null, null, null, null, null, null,
false,
null,
null
),
Map.of(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,33 @@ protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
return false;
}

/**
 * Returns whether {@code current} has reached or passed {@code target}.
 *
 * @param current the offset most recently observed
 * @param target  the offset to compare against
 * @return true once the current offset is at or past the target
 */
@Override
protected boolean isOffsetAtOrBeyond(Long current, Long target)
{
  // RabbitMQ delivery tags are plain Long sequence numbers, so ordering is a
  // straightforward numeric comparison.
  return current.compareTo(target) >= 0;
}

/**
 * Reconstructs a partition id from its serialized string form.
 *
 * @param partitionIdString the partition id as a string
 * @return the partition id; RabbitMQ identifies partitions by plain strings,
 *         so the input is returned unchanged
 */
@Override
protected String createPartitionIdFromString(String partitionIdString)
{
// RabbitMQ uses String as partition ID, so just return the string as-is
return partitionIdString;
}

/**
 * Normalizes a loosely-typed offset value to the {@link Long} sequence type
 * used by RabbitMQ. Jackson may deserialize JSON numbers as Integer (or Long)
 * depending on magnitude, and offsets may also arrive as quoted strings.
 *
 * @param offsetObj the raw offset, expected to be a {@link Number} or a numeric {@link String}
 * @return the offset as a Long
 * @throws IllegalArgumentException if the value is null, of an unsupported type,
 *         or a string that does not parse as a long (thrown as
 *         {@link NumberFormatException}, a subclass, so existing catch blocks still match)
 */
@Override
protected Long createSequenceOffsetFromObject(Object offsetObj)
{
  // Fail fast with a clear message instead of an NPE from offsetObj.getClass() below.
  if (offsetObj == null) {
    throw new IllegalArgumentException("Cannot convert null to Long offset");
  }
  if (offsetObj instanceof Number) {
    return ((Number) offsetObj).longValue();
  }
  if (offsetObj instanceof String) {
    try {
      return Long.parseLong((String) offsetObj);
    } catch (NumberFormatException e) {
      // Re-throw with the offending value for easier debugging; NumberFormatException
      // extends IllegalArgumentException, so the caller-visible contract is unchanged.
      throw new IllegalArgumentException("Cannot parse Long offset from string [" + offsetObj + "]", e);
    }
  }
  throw new IllegalArgumentException("Cannot convert " + offsetObj.getClass() + " to Long offset");
}

@Override
public LagStats computeLagStats()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
Expand Down Expand Up @@ -66,7 +67,8 @@ public RabbitStreamSupervisorIOConfig(
@JsonProperty("earlyMessageRejectionPeriod") Period earlyMessageRejectionPeriod,
@JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime,
@JsonProperty("stopTaskCount") Integer stopTaskCount,
@Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer, Integer> serverPriorityToReplicas
@Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer, Integer> serverPriorityToReplicas,
@Nullable @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig
)
{
super(
Expand All @@ -86,7 +88,8 @@ public RabbitStreamSupervisorIOConfig(
lateMessageRejectionStartDateTime,
new IdleConfig(null, null),
stopTaskCount,
serverPriorityToReplicas
serverPriorityToReplicas,
boundedStreamConfig
);

this.consumerProperties = consumerProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,80 @@ public void testURIRequired() throws Exception
mapper.readValue(jsonStr, RabbitStreamSupervisorIOConfig.class);
}

@Test
public void testBoundedModeSerdeWithIntegerOffsets() throws Exception
{
  // Bounded-mode spec where both offset maps use bare JSON integers.
  final String json = "{\n"
      + "  \"type\": \"rabbit\",\n"
      + "  \"stream\": \"my-stream\",\n"
      + "  \"uri\": \"rabbitmq-stream://localhost:5552\",\n"
      + "  \"boundedStreamConfig\": {\n"
      + "    \"startSequenceNumbers\": {\"queue-0\": 100, \"queue-1\": 200},\n"
      + "    \"endSequenceNumbers\": {\"queue-0\": 500, \"queue-1\": 600}\n"
      + "  }\n"
      + "}";

  final RabbitStreamSupervisorIOConfig ioConfig =
      mapper.readValue(json, RabbitStreamSupervisorIOConfig.class);

  // The bounded config must round-trip with one entry per queue partition.
  Assert.assertNotNull(ioConfig.getBoundedStreamConfig());
  Assert.assertTrue(ioConfig.isBounded());
  Assert.assertEquals(2, ioConfig.getBoundedStreamConfig().getStartSequenceNumbers().size());
  Assert.assertEquals(2, ioConfig.getBoundedStreamConfig().getEndSequenceNumbers().size());
}

@Test
public void testBoundedModeSerdeWithStringOffsets() throws Exception
{
  // Bounded-mode spec where every offset is a quoted JSON string.
  final String json = "{\n"
      + "  \"type\": \"rabbit\",\n"
      + "  \"stream\": \"my-stream\",\n"
      + "  \"uri\": \"rabbitmq-stream://localhost:5552\",\n"
      + "  \"boundedStreamConfig\": {\n"
      + "    \"startSequenceNumbers\": {\"queue-0\": \"100\", \"queue-1\": \"200\"},\n"
      + "    \"endSequenceNumbers\": {\"queue-0\": \"500\", \"queue-1\": \"600\"}\n"
      + "  }\n"
      + "}";

  final RabbitStreamSupervisorIOConfig ioConfig =
      mapper.readValue(json, RabbitStreamSupervisorIOConfig.class);

  // String-typed offsets must deserialize just like integer ones.
  Assert.assertNotNull(ioConfig.getBoundedStreamConfig());
  Assert.assertTrue(ioConfig.isBounded());
  Assert.assertEquals(2, ioConfig.getBoundedStreamConfig().getStartSequenceNumbers().size());
  Assert.assertEquals(2, ioConfig.getBoundedStreamConfig().getEndSequenceNumbers().size());
}

@Test
public void testBoundedModeSerdeWithMixedOffsets() throws Exception
{
  // Bounded-mode spec mixing bare integers and quoted strings in the same maps.
  final String json = "{\n"
      + "  \"type\": \"rabbit\",\n"
      + "  \"stream\": \"my-stream\",\n"
      + "  \"uri\": \"rabbitmq-stream://localhost:5552\",\n"
      + "  \"boundedStreamConfig\": {\n"
      + "    \"startSequenceNumbers\": {\"queue-0\": 100, \"queue-1\": \"200\"},\n"
      + "    \"endSequenceNumbers\": {\"queue-0\": 500, \"queue-1\": \"600\"}\n"
      + "  }\n"
      + "}";

  final RabbitStreamSupervisorIOConfig ioConfig =
      mapper.readValue(json, RabbitStreamSupervisorIOConfig.class);

  // Mixed offset types must still produce a usable bounded config.
  Assert.assertNotNull(ioConfig.getBoundedStreamConfig());
  Assert.assertTrue(ioConfig.isBounded());
}

@Test
public void testUnboundedModeByDefault() throws Exception
{
  // Minimal spec with no boundedStreamConfig at all.
  final String json = "{\n"
      + "  \"type\": \"rabbit\",\n"
      + "  \"stream\": \"my-stream\",\n"
      + "  \"uri\": \"rabbitmq-stream://localhost:5552\"\n"
      + "}";

  final RabbitStreamSupervisorIOConfig ioConfig =
      mapper.readValue(json, RabbitStreamSupervisorIOConfig.class);

  // Omitting the config must leave the supervisor in unbounded (streaming) mode.
  Assert.assertNull(ioConfig.getBoundedStreamConfig());
  Assert.assertFalse(ioConfig.isBounded());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
Expand Down Expand Up @@ -212,6 +213,7 @@ private RabbitStreamSupervisor getSupervisor(
earlyMessageRejectionPeriod, // early message rejection
null, // latemessagerejectionstartdatetime
1,
null,
null
);
RabbitStreamIndexTaskClientFactory clientFactory = new RabbitStreamIndexTaskClientFactory(null,
Expand Down Expand Up @@ -278,6 +280,7 @@ public void testRecordSupplier()
null, // early message rejection
null, // latemessagerejectionstartdatetime
1,
null,
null
);
RabbitStreamIndexTaskClientFactory clientFactory = new RabbitStreamIndexTaskClientFactory(null,
Expand Down Expand Up @@ -421,6 +424,7 @@ public void testCreateTaskIOConfig()
null, // early message rejection
null, // latemessagerejectionstartdatetime
1,
null,
null
)
);
Expand Down Expand Up @@ -461,4 +465,74 @@ public void test_doesTaskMatchSupervisor()

Assert.assertFalse(supervisor.doesTaskMatchSupervisor(differentTaskType));
}

@Test
public void testBoundedModeConfiguration()
{
  // Per-partition start/end offsets delimiting the bounded ingestion window.
  final ImmutableMap<String, Integer> startSequences = ImmutableMap.of(
      "queue-0", 100,
      "queue-1", 200
  );
  final ImmutableMap<String, Integer> endSequences = ImmutableMap.of(
      "queue-0", 500,
      "queue-1", 600
  );

  final RabbitStreamSupervisorIOConfig ioConfig = new RabbitStreamSupervisorIOConfig(
      STREAM,
      URI,
      INPUT_FORMAT,
      1,
      1,
      new Period("PT1H"),
      null,
      null,
      null,
      new Period("PT30M"),
      null,
      null,
      null,
      null,
      null,
      null,
      1000,
      null,
      new BoundedStreamConfig(startSequences, endSequences)
  );

  // The config must report bounded mode and retain both offset maps intact.
  Assert.assertTrue(ioConfig.isBounded());
  Assert.assertNotNull(ioConfig.getBoundedStreamConfig());
  Assert.assertEquals(2, ioConfig.getBoundedStreamConfig().getStartSequenceNumbers().size());
  Assert.assertEquals(2, ioConfig.getBoundedStreamConfig().getEndSequenceNumbers().size());

  // Build a supervisor so the type-conversion hooks can be exercised directly.
  supervisor = getSupervisor(
      "supervisorId",
      1,
      1,
      false,
      "PT1H",
      null,
      null,
      dataSchema,
      tuningConfig
  );

  // Partition ids are plain strings and must pass through untouched.
  final String partitionId = supervisor.createPartitionIdFromString("queue-0");
  Assert.assertEquals("queue-0", partitionId);

  // Integer offsets are widened to Long.
  Long sequence = supervisor.createSequenceOffsetFromObject(100);
  Assert.assertEquals(Long.valueOf(100L), sequence);

  // Numeric strings are parsed to Long.
  sequence = supervisor.createSequenceOffsetFromObject("200");
  Assert.assertEquals(Long.valueOf(200L), sequence);

  // The at-or-beyond predicate is inclusive of the target itself.
  Assert.assertTrue(supervisor.isOffsetAtOrBeyond(500L, 100L));
  Assert.assertTrue(supervisor.isOffsetAtOrBeyond(100L, 100L));
  Assert.assertFalse(supervisor.isOffsetAtOrBeyond(50L, 100L));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,31 @@ protected boolean isShardExpirationMarker(Long seqNum)
return false;
}

/**
 * Returns whether {@code current} has reached or passed {@code target}.
 *
 * @param current the offset most recently observed
 * @param target  the offset to compare against
 * @return true once the current offset is at or past the target
 */
@Override
protected boolean isOffsetAtOrBeyond(Long current, Long target)
{
  // Kafka offsets are monotonically increasing longs; numeric order decides this.
  return current.compareTo(target) >= 0;
}

/**
 * Reconstructs a partition id from its serialized string form by delegating to
 * {@link KafkaTopicPartition#fromString}.
 *
 * @param partitionIdString the serialized partition id
 * @return the parsed partition id
 */
// NOTE(review): parse/error behavior is defined by KafkaTopicPartition.fromString,
// which is outside this file — confirm how malformed input is handled there.
@Override
protected KafkaTopicPartition createPartitionIdFromString(String partitionIdString)
{
return KafkaTopicPartition.fromString(partitionIdString);
}

/**
 * Normalizes a loosely-typed offset value to the {@link Long} sequence type used
 * by Kafka. Jackson may deserialize numbers as Integer if they fit, and offsets
 * may also arrive as quoted strings.
 *
 * @param offsetObj the raw offset, expected to be a {@link Number} or a numeric {@link String}
 * @return the offset as a Long
 * @throws IllegalArgumentException if the value is null, of an unsupported type,
 *         or a string that does not parse as a long (thrown as
 *         {@link NumberFormatException}, a subclass, so existing catch blocks still match)
 */
@Override
protected Long createSequenceOffsetFromObject(Object offsetObj)
{
  // Fail fast with a clear message instead of an NPE from offsetObj.getClass() below.
  if (offsetObj == null) {
    throw new IllegalArgumentException("Cannot convert null to Long offset");
  }
  // Jackson may deserialize numbers as Integer if they fit, but Kafka needs Long
  if (offsetObj instanceof Number) {
    return ((Number) offsetObj).longValue();
  }
  if (offsetObj instanceof String) {
    try {
      return Long.parseLong((String) offsetObj);
    } catch (NumberFormatException e) {
      // Re-throw with the offending value for easier debugging; NumberFormatException
      // extends IllegalArgumentException, so the caller-visible contract is unchanged.
      throw new IllegalArgumentException("Cannot parse Long offset from string [" + offsetObj + "]", e);
    }
  }
  throw new IllegalArgumentException("Cannot convert " + offsetObj.getClass() + " to Long offset");
}

@Override
protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides;
import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.LagAggregator;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
Expand Down Expand Up @@ -78,7 +79,8 @@ public KafkaSupervisorIOConfig(
@JsonProperty("idleConfig") IdleConfig idleConfig,
@JsonProperty("stopTaskCount") Integer stopTaskCount,
@Nullable @JsonProperty("emitTimeLagMetrics") Boolean emitTimeLagMetrics,
@Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer, Integer> serverPriorityToReplicas
@Nullable @JsonProperty("serverPriorityToReplicas") Map<Integer, Integer> serverPriorityToReplicas,
@Nullable @JsonProperty("boundedStreamConfig") BoundedStreamConfig boundedStreamConfig
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do the bounds interplay with the supervisor’s behavior with other properties like taskDuration, completionTimeout, etc.?

I suppose if the bounds are too wide, tasks would still roll over and all existing properties would be honored as-is. The only additional behavioral change is when this optional config is specified and the end sequence numbers are reached, the supervisor eventually moves to a terminal COMPLETED state?

Copy link
Copy Markdown
Contributor

@abhishekrb19 abhishekrb19 Apr 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would also be good to update some user-facing documentation in supervisor.md for this functionality

)
{
super(
Expand All @@ -98,7 +100,8 @@ public KafkaSupervisorIOConfig(
lateMessageRejectionStartDateTime,
idleConfig,
stopTaskCount,
serverPriorityToReplicas
serverPriorityToReplicas,
boundedStreamConfig
);

this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public KafkaSupervisorIOConfig build()
idleConfig,
stopTaskCount,
null,
null,
null
);
}
Expand Down
Loading
Loading