diff --git a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
index 9be4a77a8d4e..32ac2b9c8651 100644
--- a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
+++ b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java
@@ -321,7 +321,7 @@ protected Map getTimeLagPerPartition(Map currentOffs
}
@Override
- protected RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map)
+ public RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map)
{
return new RabbitStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
@@ -374,7 +374,7 @@ public LagStats computeLagStats()
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
getRecordSupplierLock().lock();
@@ -401,7 +401,7 @@ protected void updatePartitionLagFromStream()
}
@Override
- protected Map getLatestSequencesFromStream()
+ public Map getLatestSequencesFromStream()
{
return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
index e2f62ed8d750..ea1aa0caa151 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java
@@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
@@ -41,6 +42,7 @@
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
+import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@@ -265,6 +267,132 @@ protected List startOffsets,
+ Map endOffsets,
+ @Nullable Integer backfillTaskCount
+ )
+ {
+ if (startOffsets == null || startOffsets.isEmpty() || endOffsets == null || endOffsets.isEmpty()) {
+ log.info("No offsets to backfill, skipping backfill task submission");
+ return;
+ }
+
+ try {
+ String backfillSupervisorId = spec.getSpec().getDataSchema().getDataSource() + "_backfill";
+
+ // If backfillTaskCount is not provided, default to taskCount / 2 (but always at least 1 task)
+ int taskCount = spec.getSpec().getIOConfig().getTaskCount();
+ int numBackfillTasks = backfillTaskCount != null ? backfillTaskCount : Math.max(1, taskCount / 2);
+ List partitions = new ArrayList<>(endOffsets.keySet());
+
+ // Determine actual number of tasks (can't have more tasks than partitions)
+ int numTasks = Math.min(numBackfillTasks, partitions.size());
+
+ log.info(
+ "Submitting %d backfill task(s) with supervisorId[%s] for %d partition(s)",
+ numTasks,
+ backfillSupervisorId,
+ partitions.size()
+ );
+
+ // Split partitions into groups for each task
+ int partitionsPerTask = partitions.size() / numTasks;
+ int remainder = partitions.size() % numTasks;
+
+ int startIdx = 0;
+ for (int taskNum = 0; taskNum < numTasks; taskNum++) {
+ // Distribute remainder across first few tasks
+ int taskPartitionCount = partitionsPerTask + (taskNum < remainder ? 1 : 0);
+ int endIdx = startIdx + taskPartitionCount;
+
+ List taskPartitions = partitions.subList(startIdx, endIdx);
+
+ // Create offset maps for this task's partitions only
+ Map taskStartOffsets = new HashMap<>();
+ Map taskEndOffsets = new HashMap<>();
+ for (KafkaTopicPartition partition : taskPartitions) {
+ Long startOffset = startOffsets.get(partition);
+ if (startOffset == null) {
+ log.info("No checkpoint has occurred before for partition [%s], setting startOffset equal to endOffset to skip data consumption", partition);
+ startOffset = endOffsets.get(partition);
+ }
+ taskStartOffsets.put(partition, startOffset);
+ taskEndOffsets.put(partition, endOffsets.get(partition));
+ }
+
+ String baseSequenceName = generateSequenceName(
+ taskStartOffsets,
+ null, // minimumMessageTime - process all data in range
+ null, // maximumMessageTime - process all data in range
+ spec.getSpec().getDataSchema(),
+ spec.getSpec().getTuningConfig()
+ );
+
+ KafkaSupervisorIOConfig kafkaIoConfig = spec.getSpec().getIOConfig();
+ KafkaIndexTaskIOConfig backfillIoConfig = new KafkaIndexTaskIOConfig(
+ taskNum, // taskGroupId
+ baseSequenceName,
+ null,
+ null,
+ new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getStream(), taskStartOffsets, Collections.emptySet()),
+ new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getStream(), taskEndOffsets),
+ kafkaIoConfig.getConsumerProperties(),
+ kafkaIoConfig.getPollTimeout(),
+ false, // useTransaction = false for backfill (no supervisor coordination)
+ null, // minimumMessageTime - no time filtering for backfill
+ null, // maximumMessageTime - no time filtering for backfill
+ kafkaIoConfig.getInputFormat(),
+ kafkaIoConfig.getConfigOverrides(),
+ kafkaIoConfig.isMultiTopic(),
+ null // refreshRejectionPeriodsInMinutes - don't refresh rejection periods for backfill
+ );
+
+ // Create backfill task with different supervisorId
+ String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName);
+ Map context = createBaseTaskContexts();
+ // Enable concurrent locks so backfill tasks can write to intervals that may overlap with the main supervisor's tasks
+ context.put("useConcurrentLocks", true);
+
+ KafkaIndexTask backfillTask = new KafkaIndexTask(
+ taskId,
+ backfillSupervisorId, // Use backfill supervisorId instead of spec.getId()
+ new TaskResource(baseSequenceName, 1),
+ spec.getSpec().getDataSchema(),
+ spec.getSpec().getTuningConfig(),
+ backfillIoConfig,
+ context,
+ sortingMapper,
+ null // no server priority for backfill tasks
+ );
+
+ Optional taskQueue = getTaskMaster().getTaskQueue();
+ if (taskQueue.isPresent()) {
+ log.info(
+ "Submitting backfill task[%s] (task %d of %d) with supervisorId[%s] for partitions %s, offsets from [%s] to [%s]",
+ taskId,
+ taskNum + 1,
+ numTasks,
+ backfillSupervisorId,
+ taskPartitions,
+ taskStartOffsets,
+ taskEndOffsets
+ );
+ taskQueue.get().add(backfillTask);
+ } else {
+ log.error("Failed to submit backfill task because I'm not the leader!");
+ break;
+ }
+
+ startIdx = endIdx;
+ }
+ }
+ catch (Exception e) {
+ log.error(e, "Failed to submit backfill task, skipping backfill");
+ }
+ }
+
@Override
protected Map getPartitionRecordLag()
{
@@ -355,7 +483,7 @@ protected Map getTimeLagPerPartition(Map map)
+ public KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map map)
{
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
@@ -516,7 +644,7 @@ private Map getTimestampPerPartitionAtCurrentOffset(S
*
*/
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
if (getIoConfig().isEmitTimeLagMetrics()) {
updatePartitionTimeAndRecordLagFromStream();
@@ -565,7 +693,7 @@ private void updateOffsetSnapshot(
}
@Override
- protected Map getLatestSequencesFromStream()
+ public Map getLatestSequencesFromStream()
{
return offsetSnapshotRef.get().getLatestOffsetsFromStream();
}
@@ -598,17 +726,17 @@ protected boolean isMultiTopic()
* Gets the offsets as stored in the metadata store. The map returned will only contain
* offsets from topic partitions that match the current supervisor config stream. This
* override is needed because in the case of multi-topic, a user could have updated the supervisor
- * config from single topic to mult-topic, where the new multi-topic pattern regex matches the
+ * config from single topic to multi-topic, where the new multi-topic pattern regex matches the
* old config single topic. Without this override, the previously stored metadata for the single
* topic would be deemed as different from the currently configure stream, and not be included in
* the offset map returned. This implementation handles these cases appropriately.
*
- * @return the previoulsy stored offsets from metadata storage, possibly updated with offsets removed
+ * @return the previously stored offsets from metadata storage, possibly updated with offsets removed
* for topics that do not match the currently configured supervisor topic. Topic partition keys may also be
* updated to single topic or multi-topic depending on the supervisor config, as needed.
*/
@Override
- protected Map getOffsetsFromMetadataStorage()
+ public Map getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (checkSourceMetadataMatch(dataSourceMetadata)) {
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index b5e00bcaab4b..c085d5f0b2e9 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -6203,6 +6203,144 @@ public int getPendingCompletionTaskGroupsCount(int groupId)
}
}
+ @Test
+ public void testSubmitBackfillTask()
+ {
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
+
+ Capture capturedTasks = Capture.newInstance(CaptureType.ALL);
+ EasyMock.expect(taskQueue.add(EasyMock.capture(capturedTasks))).andReturn(true).times(2);
+
+ replayAll();
+
+ supervisor = getTestableSupervisor(2, 4, true, false, null, null, null);
+
+ // Create start and end offsets for 3 partitions
+ Map startOffsets = ImmutableMap.of(
+ new KafkaTopicPartition(false, topic, 0), 100L,
+ new KafkaTopicPartition(false, topic, 1), 200L,
+ new KafkaTopicPartition(false, topic, 2), 300L
+ );
+
+ Map endOffsets = ImmutableMap.of(
+ new KafkaTopicPartition(false, topic, 0), 150L,
+ new KafkaTopicPartition(false, topic, 1), 250L,
+ new KafkaTopicPartition(false, topic, 2), 350L
+ );
+
+ supervisor.submitBackfillTask(startOffsets, endOffsets, null);
+
+ List tasks = capturedTasks.getValues();
+ Assert.assertEquals(2, tasks.size());
+
+ // Verify both tasks are KafkaIndexTask
+ for (Task task : tasks) {
+ Assert.assertTrue(task instanceof KafkaIndexTask);
+ KafkaIndexTask kafkaTask = (KafkaIndexTask) task;
+
+ // Verify useTransaction=false for backfill tasks
+ Assert.assertFalse(
+ "Backfill tasks should have useTransaction=false",
+ kafkaTask.getIOConfig().isUseTransaction()
+ );
+
+ // Verify task has correct datasource
+ Assert.assertEquals(DATASOURCE, kafkaTask.getDataSource());
+
+ // Verify offsets are within the expected range
+ Map taskStartOffsets =
+ kafkaTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap();
+ Map taskEndOffsets =
+ kafkaTask.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap();
+
+ for (Map.Entry entry : taskStartOffsets.entrySet()) {
+ KafkaTopicPartition partition = entry.getKey();
+ Long startOffset = entry.getValue();
+ Long endOffset = taskEndOffsets.get(partition);
+
+ // Verify offsets are from our original maps
+ Assert.assertTrue(startOffsets.containsKey(partition));
+ Assert.assertEquals(startOffsets.get(partition), startOffset);
+ Assert.assertEquals(endOffsets.get(partition), endOffset);
+ }
+ }
+
+ verifyAll();
+ }
+
+ @Test
+ public void testSubmitBackfillTaskWithNullStartOffset()
+ {
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+ EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();
+
+ Capture capturedTask = Capture.newInstance();
+ EasyMock.expect(taskQueue.add(EasyMock.capture(capturedTask))).andReturn(true).once();
+
+ replayAll();
+
+ supervisor = getTestableSupervisor(2, 2, true, false, null, null, null);
+
+ KafkaTopicPartition partition0 = new KafkaTopicPartition(false, topic, 0);
+ KafkaTopicPartition partition1 = new KafkaTopicPartition(false, topic, 1);
+
+ // partition0 has a start offset, partition1 does not (null in startOffsets map)
+ Map startOffsets = ImmutableMap.of(
+ partition0, 100L
+ // partition1 intentionally missing - simulates no checkpoint for this partition
+ );
+
+ Map endOffsets = ImmutableMap.of(
+ partition0, 150L,
+ partition1, 250L
+ );
+
+ supervisor.submitBackfillTask(startOffsets, endOffsets, null);
+
+ // Verify task was submitted
+ Task task = capturedTask.getValue();
+ Assert.assertTrue(task instanceof KafkaIndexTask);
+ KafkaIndexTask kafkaTask = (KafkaIndexTask) task;
+
+ Map taskStartOffsets =
+ kafkaTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap();
+ Map taskEndOffsets =
+ kafkaTask.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap();
+
+ // partition0 should use its start offset
+ Assert.assertEquals(Long.valueOf(100L), taskStartOffsets.get(partition0));
+ Assert.assertEquals(Long.valueOf(150L), taskEndOffsets.get(partition0));
+
+ // partition1 should have startOffset set equal to endOffset (since start was null)
+ Assert.assertEquals(Long.valueOf(250L), taskStartOffsets.get(partition1));
+ Assert.assertEquals(Long.valueOf(250L), taskEndOffsets.get(partition1));
+
+ verifyAll();
+ }
+
+ @Test
+ public void testSubmitBackfillTaskWithEmptyOffsets()
+ {
+ EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+ EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
+ EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
+
+ replayAll();
+
+ supervisor = getTestableSupervisor(2, 2, true, false, null, null, null);
+
+ // Submit with empty offsets - should return early without submitting any tasks
+ supervisor.submitBackfillTask(ImmutableMap.of(), ImmutableMap.of(), null);
+
+ // Verify no tasks were submitted (taskQueue.add should never be called)
+ verifyAll();
+ }
+
private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor
{
private final boolean isTaskCurrentReturn;
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 08491caa8ff5..9b434da54180 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -317,7 +317,7 @@ protected Map getTimeLagPerPartition(Map currentOf
}
@Override
- protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
+ public SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset(
String stream,
Map map
)
@@ -332,7 +332,7 @@ protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean i
}
@Override
- protected void updatePartitionLagFromStream()
+ public void updatePartitionLagFromStream()
{
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
// this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
index 51abb814a6dd..81e361bd057f 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java
@@ -21,8 +21,10 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
@@ -37,6 +39,8 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@@ -52,6 +56,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -303,7 +308,7 @@ public void stop()
Preconditions.checkState(started, "SupervisorManager not started");
List> stopFutures = new ArrayList<>();
synchronized (lock) {
- log.info("Stopping [%d] supervisors", supervisors.keySet().size());
+ log.info("Stopping [%d] supervisors", supervisors.size());
for (String id : supervisors.keySet()) {
try {
stopFutures.add(supervisors.get(id).lhs.stopAsync());
@@ -395,6 +400,137 @@ public boolean resetSupervisor(String id, @Nullable DataSourceMetadata resetData
return true;
}
+ /**
+ * Resets a supervisor to latest offsets and backfills the skipped offset ranges.
+ * Requirements:
+ * - Supervisor must be a SeekableStreamSupervisor
+ * - useEarliestOffset must be false (otherwise supervisor always starts from earliest)
+ * - useConcurrentLocks must be true
+ * - Supervisor must be RUNNING - needs active stream connection
+ * @param id supervisor ID
+ * @return Map containing supervisorId and skipped offset ranges
+ * @throws IllegalArgumentException if the supervisor doesn't exist, is not a SeekableStreamSupervisor, has useEarliestOffset set to true, or does not have useConcurrentLocks enabled
+ * @throws IllegalStateException if supervisor is not running or if either checkpointed or latest offsets is empty
+ */
+ public Map resetSupervisorAndBackfill(String id, @Nullable Integer backfillTaskCount)
+ {
+ Preconditions.checkState(started, "SupervisorManager not started");
+ Preconditions.checkNotNull(id, "id");
+
+ Pair supervisorPair = supervisors.get(id);
+ if (supervisorPair == null || supervisorPair.lhs == null || supervisorPair.rhs == null) {
+ throw new IAE("Supervisor[%s] does not exist", id);
+ }
+ if (!(supervisorPair.lhs instanceof SeekableStreamSupervisor)) {
+ throw new IAE("Supervisor[%s] is not a SeekableStreamSupervisor", id);
+ }
+ SeekableStreamSupervisor streamSupervisor = (SeekableStreamSupervisor) supervisorPair.lhs;
+ SeekableStreamSupervisorSpec streamSpec = (SeekableStreamSupervisorSpec) supervisorPair.rhs;
+
+ // Verify useEarliestOffset is false
+ if (streamSupervisor.getIoConfig().isUseEarliestSequenceNumber()) {
+ throw new IAE("Reset with skipped offsets is not supported when useEarliestOffset is true.");
+ }
+
+ // Verify useConcurrentLocks is enabled
+ if (streamSpec.getContext() == null || !Boolean.TRUE.equals(streamSpec.getContext().get("useConcurrentLocks"))) {
+ throw new IAE(
+ "Backfill tasks require 'useConcurrentLocks' to be set to true in the supervisor context to allow concurrent writes with the main supervisor tasks"
+ );
+ }
+
+ // We need an active recordSupplier to query the latest offsets from the stream
+ if (supervisorPair.lhs.getState() != SupervisorStateManager.BasicState.RUNNING) {
+ throw new ISE("A running supervisor is required to query the latest offsets from the stream");
+ }
+
+ log.info("Capturing latest offsets from stream for supervisor[%s]", id);
+ streamSupervisor.updatePartitionLagFromStream();
+ Map, ?> latestOffsets = streamSupervisor.getLatestSequencesFromStream();
+
+ log.info("Capturing checkpointed offsets for supervisor[%s]", id);
+ Map, ?> startOffsets = streamSupervisor.getOffsetsFromMetadataStorage();
+
+ // Validate that we successfully retrieved offsets
+ if (latestOffsets == null || latestOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get latest offsets from stream for supervisor[%s]", id);
+ }
+ if (startOffsets == null || startOffsets.isEmpty()) {
+ throw new ISE("Skipping reset: Failed to get checkpointed offsets for supervisor[%s]", id);
+ }
+
+ log.info("Resetting supervisor[%s] metadata to latest offsets", id);
+ DataSourceMetadata resetMetadata = streamSupervisor.createDataSourceMetaDataForReset(
+ streamSupervisor.getIoConfig().getStream(),
+ latestOffsets
+ );
+
+ streamSupervisor.resetOffsets(resetMetadata);
+
+ // Reset autoscaler if present
+ SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
+ if (autoscaler != null) {
+ autoscaler.reset();
+ }
+
+ Map, Object> backfillRange = calculateBackfillRange(startOffsets, latestOffsets);
+
+ streamSupervisor.submitBackfillTask(startOffsets, latestOffsets, backfillTaskCount);
+
+ log.info("Successfully reset supervisor[%s] to latest. Backfill range: %s", id, backfillRange);
+
+ return ImmutableMap.of(
+ "id", id,
+ "backfillRange", backfillRange
+ );
+ }
+
+ /**
+ * Calculates the backfill range between start and end offsets for display purposes.
+ * Returns a map with partition ID as key and offset range details as value.
+ *
+ * @param startOffsets Starting offsets (last checkpointed)
+ * @param endOffsets Ending offsets (latest from stream)
+ * @return Map of partition ID to offset range [start, end]
+ */
+ @VisibleForTesting
+ public Map, Object> calculateBackfillRange(
+ Map, ?> startOffsets,
+ Map, ?> endOffsets
+ )
+ {
+ Map