Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
e552869
Update reset endpoint to return a map of skipped offsets for each par…
aho135 Mar 17, 2026
0c13ba8
Checkstyle fixes
aho135 Mar 17, 2026
d5da02c
Formatting fix for response
aho135 Mar 17, 2026
4034a98
Support unsupervised task for SeekableStreamIndexTask
aho135 Mar 18, 2026
89fc7a9
Minor refactor
aho135 Mar 18, 2026
84866f4
Use addSequence helper method
aho135 Mar 19, 2026
85ce022
Automate backfill task submission for KafkaSupervisor
aho135 Mar 19, 2026
543b9c7
Switch from java.util.Optional to com.google.common.base.Optional
aho135 Mar 19, 2026
fa4b7a0
Support for configurable number of backfill tasks
aho135 Mar 20, 2026
ca99b94
Rename skippedOffsetRange to backfillRange
aho135 Mar 20, 2026
1f88656
Null check for startOffset
aho135 Mar 20, 2026
20e3cae
Validate the main supervisor has useConcurrentLocks set to true
aho135 Mar 20, 2026
f94f804
Merge branch 'master' into reset-offsets-and-backfill
aho135 Mar 20, 2026
ef17672
javadoc update
aho135 Mar 20, 2026
7d114e2
Merge branch 'reset-offsets-and-backfill' of https://github.com/aho13…
aho135 Mar 20, 2026
e3375d8
Tweak logging
aho135 Mar 25, 2026
87d75d3
Address deprecation notices
aho135 Mar 26, 2026
fb0b041
Use existing useTransaction to disable checkpointing instead of new s…
aho135 Mar 26, 2026
2343b1b
Test coverage for calculateBackfillRange
aho135 Mar 26, 2026
ee2e619
Unit test fixes
aho135 Mar 26, 2026
8a5a61e
Refactoring and Unit tests for calculateBackfillRange
aho135 Mar 27, 2026
d46b5c8
Add test cases for resetSupervisorAndBackfill
aho135 Mar 27, 2026
fbb3516
Increase code coverage for SupervisorResourceTest
aho135 Mar 27, 2026
90f9d07
Unit tests for KafkaSupervisor submitBackfillTask
aho135 Mar 27, 2026
c368e6a
Fail early and don't reset if earliest/latest offsets are empty
aho135 Mar 27, 2026
66705c0
Update SupervisorManager.java
aho135 Mar 27, 2026
fe7e6bb
Refactor into separate resetOffsetsAndBackfill endpoint
aho135 Apr 10, 2026
4e9b68c
Set backfillTaskCount through API param instead of in IOConfig
aho135 Apr 10, 2026
7c4cef9
Unit test fix
aho135 Apr 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ protected Map<String, Long> getTimeLagPerPartition(Map<String, Long> currentOffs
}

@Override
protected RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<String, Long> map)
public RabbitStreamDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<String, Long> map)
{
return new RabbitStreamDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
Expand Down Expand Up @@ -374,7 +374,7 @@ public LagStats computeLagStats()
}

@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
{
getRecordSupplierLock().lock();

Expand All @@ -401,7 +401,7 @@ protected void updatePartitionLagFromStream()
}

@Override
protected Map<String, Long> getLatestSequencesFromStream()
public Map<String, Long> getLatestSequencesFromStream()
{
return latestSequenceFromStream != null ? latestSequenceFromStream : new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
Expand All @@ -41,6 +42,7 @@
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskQueue;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
Expand Down Expand Up @@ -265,6 +267,132 @@ protected List<SeekableStreamIndexTask<KafkaTopicPartition, Long, KafkaRecordEnt
return taskList;
}

@Override
public void submitBackfillTask(
Map<KafkaTopicPartition, Long> startOffsets,
Map<KafkaTopicPartition, Long> endOffsets,
@Nullable Integer backfillTaskCount
)
{
if (startOffsets == null || startOffsets.isEmpty() || endOffsets == null || endOffsets.isEmpty()) {
log.info("No offsets to backfill, skipping backfill task submission");
return;
}

try {
String backfillSupervisorId = spec.getSpec().getDataSchema().getDataSource() + "_backfill";

// If backfillTaskCount is not provided, default to taskCount / 2
int taskCount = spec.getSpec().getIOConfig().getTaskCount();
int numBackfillTasks = backfillTaskCount != null ? backfillTaskCount : Math.max(1, taskCount / 2);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[P1] Reject non-positive backfill task counts before resetting offsets

backfillTaskCount comes directly from the new query parameter and can be 0 or negative. In that case numTasks becomes non-positive, the code later divides by numTasks, catches the exception, and silently skips backfill submission; because resetSupervisorAndBackfill has already reset the supervisor metadata to latest, this can acknowledge a reset while leaving the skipped range un-backfilled. Validate backfillTaskCount > 0 before performing the reset.

List<KafkaTopicPartition> partitions = new ArrayList<>(endOffsets.keySet());

// Determine actual number of tasks (can't have more tasks than partitions)
int numTasks = Math.min(numBackfillTasks, partitions.size());

log.info(
"Submitting %d backfill task(s) with supervisorId[%s] for %d partition(s)",
numTasks,
backfillSupervisorId,
partitions.size()
);

// Split partitions into groups for each task
int partitionsPerTask = partitions.size() / numTasks;
int remainder = partitions.size() % numTasks;

int startIdx = 0;
for (int taskNum = 0; taskNum < numTasks; taskNum++) {
// Distribute remainder across first few tasks
int taskPartitionCount = partitionsPerTask + (taskNum < remainder ? 1 : 0);
int endIdx = startIdx + taskPartitionCount;

List<KafkaTopicPartition> taskPartitions = partitions.subList(startIdx, endIdx);

// Create offset maps for this task's partitions only
Map<KafkaTopicPartition, Long> taskStartOffsets = new HashMap<>();
Map<KafkaTopicPartition, Long> taskEndOffsets = new HashMap<>();
for (KafkaTopicPartition partition : taskPartitions) {
Long startOffset = startOffsets.get(partition);
if (startOffset == null) {
log.info("No checkpoint has occurred before for partition [%s], setting startOffset equal to endOffset to skip data consumption", partition);
startOffset = endOffsets.get(partition);
}
taskStartOffsets.put(partition, startOffset);
taskEndOffsets.put(partition, endOffsets.get(partition));
}

String baseSequenceName = generateSequenceName(
taskStartOffsets,
null, // minimumMessageTime - process all data in range
null, // maximumMessageTime - process all data in range
spec.getSpec().getDataSchema(),
spec.getSpec().getTuningConfig()
);

KafkaSupervisorIOConfig kafkaIoConfig = spec.getSpec().getIOConfig();
KafkaIndexTaskIOConfig backfillIoConfig = new KafkaIndexTaskIOConfig(
taskNum, // taskGroupId
baseSequenceName,
null,
null,
new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getStream(), taskStartOffsets, Collections.emptySet()),
new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getStream(), taskEndOffsets),
kafkaIoConfig.getConsumerProperties(),
kafkaIoConfig.getPollTimeout(),
false, // useTransaction = false for backfill (no supervisor coordination)
null, // minimumMessageTime - no time filtering for backfill
null, // maximumMessageTime - no time filtering for backfill
kafkaIoConfig.getInputFormat(),
kafkaIoConfig.getConfigOverrides(),
kafkaIoConfig.isMultiTopic(),
null // refreshRejectionPeriodsInMinutes - don't refresh rejection periods for backfill
);

// Create backfill task with different supervisorId
String taskId = IdUtils.getRandomIdWithPrefix(baseSequenceName);
Map<String, Object> context = createBaseTaskContexts();
// Use APPEND locks to allow writing to intervals that may overlap with main supervisor
context.put("useConcurrentLocks", true);

KafkaIndexTask backfillTask = new KafkaIndexTask(
taskId,
backfillSupervisorId, // Use backfill supervisorId instead of spec.getId()
new TaskResource(baseSequenceName, 1),
spec.getSpec().getDataSchema(),
spec.getSpec().getTuningConfig(),
backfillIoConfig,
context,
sortingMapper,
null // no server priority for backfill tasks
);

Optional<TaskQueue> taskQueue = getTaskMaster().getTaskQueue();
if (taskQueue.isPresent()) {
log.info(
"Submitting backfill task[%s] (task %d of %d) with supervisorId[%s] for partitions %s, offsets from [%s] to [%s]",
taskId,
taskNum + 1,
numTasks,
backfillSupervisorId,
taskPartitions,
taskStartOffsets,
taskEndOffsets
);
taskQueue.get().add(backfillTask);
} else {
log.error("Failed to submit backfill task because I'm not the leader!");
break;
}

startIdx = endIdx;
}
}
catch (Exception e) {
log.error(e, "Failed to submit backfill task, skipping backfill");
}
}

@Override
protected Map<KafkaTopicPartition, Long> getPartitionRecordLag()
{
Expand Down Expand Up @@ -355,7 +483,7 @@ protected Map<KafkaTopicPartition, Long> getTimeLagPerPartition(Map<KafkaTopicPa
}

@Override
protected KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<KafkaTopicPartition, Long> map)
public KafkaDataSourceMetadata createDataSourceMetaDataForReset(String topic, Map<KafkaTopicPartition, Long> map)
{
return new KafkaDataSourceMetadata(new SeekableStreamEndSequenceNumbers<>(topic, map));
}
Expand Down Expand Up @@ -516,7 +644,7 @@ private Map<KafkaTopicPartition, Long> getTimestampPerPartitionAtCurrentOffset(S
* </p>
*/
@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
{
if (getIoConfig().isEmitTimeLagMetrics()) {
updatePartitionTimeAndRecordLagFromStream();
Expand Down Expand Up @@ -565,7 +693,7 @@ private void updateOffsetSnapshot(
}

@Override
protected Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
public Map<KafkaTopicPartition, Long> getLatestSequencesFromStream()
{
return offsetSnapshotRef.get().getLatestOffsetsFromStream();
}
Expand Down Expand Up @@ -598,17 +726,17 @@ protected boolean isMultiTopic()
* Gets the offsets as stored in the metadata store. The map returned will only contain
* offsets from topic partitions that match the current supervisor config stream. This
* override is needed because in the case of multi-topic, a user could have updated the supervisor
* config from single topic to mult-topic, where the new multi-topic pattern regex matches the
* config from single topic to multi-topic, where the new multi-topic pattern regex matches the
* old config single topic. Without this override, the previously stored metadata for the single
topic would be deemed as different from the currently configured stream, and not be included in
* the offset map returned. This implementation handles these cases appropriately.
*
* @return the previoulsy stored offsets from metadata storage, possibly updated with offsets removed
* @return the previously stored offsets from metadata storage, possibly updated with offsets removed
* for topics that do not match the currently configured supervisor topic. Topic partition keys may also be
* updated to single topic or multi-topic depending on the supervisor config, as needed.
*/
@Override
protected Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
public Map<KafkaTopicPartition, Long> getOffsetsFromMetadataStorage()
{
final DataSourceMetadata dataSourceMetadata = retrieveDataSourceMetadata();
if (checkSourceMetadataMatch(dataSourceMetadata)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6203,6 +6203,144 @@ public int getPendingCompletionTaskGroupsCount(int groupId)
}
}

/**
 * Verifies the happy path of {@code submitBackfillTask}: with a null
 * backfillTaskCount and a supervisor created with taskCount=4, the default of
 * taskCount / 2 = 2 tasks is used, so the 3 partitions are split across exactly
 * two submitted KafkaIndexTasks, each carrying only its own partitions' offsets
 * and useTransaction=false.
 */
@Test
public void testSubmitBackfillTask()
{
// Stub leader/task infrastructure; no pre-existing tasks or stored metadata.
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();

// Capture every task submitted to the queue; exactly 2 submissions are expected.
Capture<Task> capturedTasks = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskQueue.add(EasyMock.capture(capturedTasks))).andReturn(true).times(2);

replayAll();

// taskCount=4 => default backfill task count is 4 / 2 = 2.
supervisor = getTestableSupervisor(2, 4, true, false, null, null, null);

// Create start and end offsets for 3 partitions
Map<KafkaTopicPartition, Long> startOffsets = ImmutableMap.of(
new KafkaTopicPartition(false, topic, 0), 100L,
new KafkaTopicPartition(false, topic, 1), 200L,
new KafkaTopicPartition(false, topic, 2), 300L
);

Map<KafkaTopicPartition, Long> endOffsets = ImmutableMap.of(
new KafkaTopicPartition(false, topic, 0), 150L,
new KafkaTopicPartition(false, topic, 1), 250L,
new KafkaTopicPartition(false, topic, 2), 350L
);

// null backfillTaskCount exercises the taskCount / 2 default.
supervisor.submitBackfillTask(startOffsets, endOffsets, null);

List<Task> tasks = capturedTasks.getValues();
Assert.assertEquals(2, tasks.size());

// Verify both tasks are KafkaIndexTask
for (Task task : tasks) {
Assert.assertTrue(task instanceof KafkaIndexTask);
KafkaIndexTask kafkaTask = (KafkaIndexTask) task;

// Verify useTransaction=false for backfill tasks
Assert.assertFalse(
"Backfill tasks should have useTransaction=false",
kafkaTask.getIOConfig().isUseTransaction()
);

// Verify task has correct datasource
Assert.assertEquals(DATASOURCE, kafkaTask.getDataSource());

// Verify offsets are within the expected range
Map<KafkaTopicPartition, Long> taskStartOffsets =
kafkaTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap();
Map<KafkaTopicPartition, Long> taskEndOffsets =
kafkaTask.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap();

// Each task's per-partition offsets must match the originals exactly.
for (Map.Entry<KafkaTopicPartition, Long> entry : taskStartOffsets.entrySet()) {
KafkaTopicPartition partition = entry.getKey();
Long startOffset = entry.getValue();
Long endOffset = taskEndOffsets.get(partition);

// Verify offsets are from our original maps
Assert.assertTrue(startOffsets.containsKey(partition));
Assert.assertEquals(startOffsets.get(partition), startOffset);
Assert.assertEquals(endOffsets.get(partition), endOffset);
}
}

verifyAll();
}

/**
 * Verifies that a partition absent from the start-offsets map (i.e. with no prior
 * checkpoint) has its start offset set equal to its end offset, so the backfill
 * task consumes no data for that partition while other partitions keep their
 * original ranges.
 */
@Test
public void testSubmitBackfillTaskWithNullStartOffset()
{
// Stub leader/task infrastructure; no pre-existing tasks or stored metadata.
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(null).anyTimes();

// Exactly one task is expected: taskCount=2 => default 1 backfill task.
Capture<Task> capturedTask = Capture.newInstance();
EasyMock.expect(taskQueue.add(EasyMock.capture(capturedTask))).andReturn(true).once();

replayAll();

supervisor = getTestableSupervisor(2, 2, true, false, null, null, null);

KafkaTopicPartition partition0 = new KafkaTopicPartition(false, topic, 0);
KafkaTopicPartition partition1 = new KafkaTopicPartition(false, topic, 1);

// partition0 has a start offset, partition1 does not (null in startOffsets map)
Map<KafkaTopicPartition, Long> startOffsets = ImmutableMap.of(
partition0, 100L
// partition1 intentionally missing - simulates no checkpoint for this partition
);

Map<KafkaTopicPartition, Long> endOffsets = ImmutableMap.of(
partition0, 150L,
partition1, 250L
);

supervisor.submitBackfillTask(startOffsets, endOffsets, null);

// Verify task was submitted
Task task = capturedTask.getValue();
Assert.assertTrue(task instanceof KafkaIndexTask);
KafkaIndexTask kafkaTask = (KafkaIndexTask) task;

Map<KafkaTopicPartition, Long> taskStartOffsets =
kafkaTask.getIOConfig().getStartSequenceNumbers().getPartitionSequenceNumberMap();
Map<KafkaTopicPartition, Long> taskEndOffsets =
kafkaTask.getIOConfig().getEndSequenceNumbers().getPartitionSequenceNumberMap();

// partition0 should use its start offset
Assert.assertEquals(Long.valueOf(100L), taskStartOffsets.get(partition0));
Assert.assertEquals(Long.valueOf(150L), taskEndOffsets.get(partition0));

// partition1 should have startOffset set equal to endOffset (since start was null)
Assert.assertEquals(Long.valueOf(250L), taskStartOffsets.get(partition1));
Assert.assertEquals(Long.valueOf(250L), taskEndOffsets.get(partition1));

verifyAll();
}

/**
 * Verifies that submitBackfillTask returns early when both offset maps are empty:
 * no expectation is registered on taskQueue.add, so verifyAll() fails if any task
 * submission is attempted.
 */
@Test
public void testSubmitBackfillTaskWithEmptyOffsets()
{
// Only leader/storage stubs; deliberately NO taskQueue.add expectation.
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();

replayAll();

supervisor = getTestableSupervisor(2, 2, true, false, null, null, null);

// Submit with empty offsets - should return early without submitting any tasks
supervisor.submitBackfillTask(ImmutableMap.of(), ImmutableMap.of(), null);

// Verify no tasks were submitted (taskQueue.add should never be called)
verifyAll();
}

private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor
{
private final boolean isTaskCurrentReturn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOf
}

@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
public SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
String stream,
Map<String, String> map
)
Expand All @@ -332,7 +332,7 @@ protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean i
}

@Override
protected void updatePartitionLagFromStream()
public void updatePartitionLagFromStream()
{
KinesisRecordSupplier supplier = (KinesisRecordSupplier) recordSupplier;
// this recordSupplier method is thread safe, so does not need to acquire the recordSupplierLock
Expand Down
Loading
Loading