
Implementing perpetually running tasks for Streaming Ingestion #18466

Open
uds5501 wants to merge 49 commits into apache:master from uds5501:supervisor-reassignment-rollovers

Conversation

@uds5501
Contributor

@uds5501 uds5501 commented Sep 2, 2025

Proposed changes

As part of this PR, I aim to introduce perpetually running tasks for seekable stream ingestion (tasks that should never shut down). To enable this feature, users need to specify usePerpetuallyRunningTasks:true in the supervisor spec.
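Assuming the flag sits at the top level of the supervisor spec (its exact placement is an assumption; only the flag name comes from this PR), enabling it might look like:

```json
{
  "type": "kafka",
  "spec": {
    "ioConfig": {
      "topic": "my-topic",
      "taskDuration": "PT1H"
    }
  },
  "usePerpetuallyRunningTasks": true
}
```

With the flag set, taskDuration would be ignored as described below.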

Once users have enabled this flag, they can expect the following changes in Supervisor and SeekableStreamIndexTaskRunner behaviours:

Task rollovers

  • Existing behaviour: if the supervisor identifies that a task group has been running for longer than the configured duration, the group is checkpointed, moved to pendingTaskGroup, and eventually shut down.
  • New behaviour: the supervisor will not attempt to checkpoint and shut down the tasks in the task groups (ioConfig.taskDuration is ignored).

Auto Scaling

  • Existing behaviour: if a dynamic autoscale event is triggered, an early stop is requested, all existing tasks are shut down, in-memory data structures are cleared, and the supervisor's run loop creates new task groups in the next run.
  • New behaviour: the tasks will no longer be shut down; instead, the workflow will look like the one below.

[workflow diagram]

To facilitate the coordination of this flow, there will be a flag on each side: one on the supervisor and one on the task runner.

  • On the supervisor, the isDynamicAllocationOngoing flag ensures that another scale event is not accepted until the current one has finished, and it determines whether the checkpoint action needs to push the updated config to the other tasks.
  • On the task runner, waitForConfigUpdate is set to true when pauseAndCheckpoint is triggered and can only be unset via /configUpdate.

Note for scale-downs: for now, we'll shut down the extra tasks (so that we don't have to track that there's another running task group to which we could send the partitions).
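A minimal sketch of the two coordination flags above, assuming simple AtomicBoolean semantics (the classes and method names here are illustrative, not Druid's actual code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the supervisor-side and task-runner-side flags.
class SupervisorFlags
{
  private final AtomicBoolean isDynamicAllocationOngoing = new AtomicBoolean(false);

  /** Returns false if a scale event is already in flight. */
  boolean tryStartScaleEvent()
  {
    return isDynamicAllocationOngoing.compareAndSet(false, true);
  }

  void finishScaleEvent()
  {
    isDynamicAllocationOngoing.set(false);
  }
}

class TaskRunnerFlags
{
  private final AtomicBoolean waitForConfigUpdate = new AtomicBoolean(false);

  /** Set when pauseAndCheckpoint is triggered. */
  void onPauseAndCheckpoint()
  {
    waitForConfigUpdate.set(true);
  }

  /** Only the /configUpdate handler clears the flag and lets the task resume. */
  void onConfigUpdate()
  {
    waitForConfigUpdate.set(false);
  }

  boolean isWaitingForConfigUpdate()
  {
    return waitForConfigUpdate.get();
  }
}
```

The compareAndSet on the supervisor side is what rejects a second scale event while one is in flight.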

Checkpoint mechanism

  • Existing behaviour: /offsets/end used to resume the tasks after a new sequence was added in the task runner.
  • New behaviour: /offsets/end will update the end of the latest sequence but will not add a new sequence to the sequences list if the task runner is waiting for a config update. In the event of auto scaling, the responsibility for resuming the task now lands on the /configUpdate API.
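The behaviour change can be sketched roughly as follows, with offsets simplified to longs (names and structure here are illustrative, not the actual task runner code):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the /offsets/end change: when the runner is waiting for a config
// update, the latest sequence's end is updated but no new sequence is
// appended; resuming is deferred to /configUpdate.
class SequenceTracker
{
  static final class Seq
  {
    long startOffset;
    long endOffset;

    Seq(long startOffset, long endOffset)
    {
      this.startOffset = startOffset;
      this.endOffset = endOffset;
    }
  }

  final List<Seq> sequences = new ArrayList<>();
  boolean waitForConfigUpdate = false;

  void setEndOffsets(long newEnd)
  {
    Seq latest = sequences.get(sequences.size() - 1);
    latest.endOffset = newEnd; // always update the end of the latest sequence
    if (!waitForConfigUpdate) {
      // old behaviour: start a new open-ended sequence and resume reading
      sequences.add(new Seq(newEnd, Long.MAX_VALUE));
    }
  }
}
```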

Druid cluster upgrades

  • There won't be any change (if early handoffs are requested for a supervisor)

Partition <> Task Mapping assignment

  • Earlier it used to be mod-based mapping; it'll now be range-based mapping.
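A rough illustration of the difference, assuming contiguous ranges (the exact boundaries Druid will use are an assumption based on this description):

```java
// Contrast between the old mod-based mapping and a range-based mapping of
// partitions to task groups.
class PartitionMapping
{
  static int modBasedTaskGroupId(int partitionId, int taskCount)
  {
    return partitionId % taskCount;
  }

  // Contiguous ranges: with 6 partitions and 2 task groups, partitions 0-2
  // map to group 0 and partitions 3-5 map to group 1, so a scale event moves
  // as few partitions as possible.
  static int rangeBasedTaskGroupId(int partitionId, int totalPartitions, int taskCount)
  {
    int partitionsPerGroup = (totalPartitions + taskCount - 1) / taskCount; // ceiling division
    return partitionId / partitionsPerGroup;
  }
}
```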

Misc.

This will cover bits that are necessary to ensure that the supervisor doesn't end up killing valid tasks, and guards against other race conditions.

Supervisor

  • Once a config update is completed for a task, it's registered in memory and storage via TaskQueue.update() so that in the next runInternal() loop, the supervisor knows about the change of ioConfig that took place during task discovery.
  • Additionally, on an update, we update the task group's starting sequences in the activelyRuningTaskGroups.
  • isCheckpointSignatureValid() has been introduced and returns true for now. Earlier, the check verified that partitions and offsets are the same in the checkpoints; however, with the possible change of partition mapping in tasks, this no longer holds true.

Task Runner

  • All the points where the main run loop could've been broken have been patched (so that the loop never stops).

What the overall changes look like:

  • [TaskRunner] Implement a /pauseAndCheckpoint API.
  • [TaskRunner] Changes in the /offsets/end POST API.
    • Ensure the task is not resumed if the runner has been told that a config update will happen.
  • [TaskRunner] Expose current config being used by the task runner via /configs API.
  • [Supervisor] Ensure that task groups still recognize the updated tasks.
    • Allow change of task group's starting sequences.
    • Update validations during checkpoints in both supervisor and task runner to adhere to perpetually running task demands.
  • [Supervisor] Changes for partition <> task group assignment: it's currently done on a round-robin basis; we'll start doing it sequentially instead to ensure minimal movement when the autoscaler kicks in.
  • [Supervisor] Update the dynamic configuration step so that graceful shutdown is no longer triggered; instead, all tasks in all taskGroups are paused and /updateConfig with the updated partitions is called across task groups (this needs to be guarded behind a toggle).
  • [Overall] Writing embedded tests for autoscaler verifications.
  • [TaskStorage & TaskQueue] Provisions to ensure task ioConfig payloads can be updated for existing tasks.

Contributor

@kfaraz kfaraz left a comment

Thanks for the PR, @uds5501 !
The approach makes sense to me, let's try to add some embedded Kafka tests for this.

Comment on lines +1730 to +1734
this.ioConfig = req.getIoConfig();
this.stream = ioConfig.getStartSequenceNumbers().getStream();
this.endOffsets = new ConcurrentHashMap<>(ioConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap());
minMessageTime = Configs.valueOrDefault(ioConfig.getMinimumMessageTime(), DateTimes.MIN);
maxMessageTime = Configs.valueOrDefault(ioConfig.getMaximumMessageTime(), DateTimes.MAX);
Contributor

Maybe put this part in a new method so that the constructor can also reuse the code.

public Response updateConfig(TaskConfigUpdateRequest<PartitionIdType, SequenceOffsetType> req) throws InterruptedException
{
try {
requestPause();
Contributor

We should call pause() instead of requestPause(), since the latter only requests a pause but doesn't ensure that we reach a paused state.
If the pause() call returns non-OK response, we should return the same response immediately.

Contributor

Before starting with pause, we can log the new config that we are trying to update to.
After the update finishes, let's log the new config and also emit an event.

* Creates new sequences for the ingestion process. It currently accepts the ioConfig given by the request as the correct offsets
* and ignores the offsets it may have stored in currOffsets and endOffsets.
*/
private void createNewSequenceFromIoConfig(SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig)
Contributor

For the 2 new methods, the SeekableStreamIndexTaskRunner must already be performing these actions.
Let's try to put them in common methods so that we can use the same method in the normal flow as well as on update config.

Contributor

Consider looking at the Kafka StickyAssignor implementation for inspiration here.
The Kafka StickyAssignor is a partition assignment strategy for Kafka consumers within a consumer group. Its primary goal is to achieve both a balanced distribution of partitions among consumers and to minimize the movement of partitions during rebalances.

@kfaraz
Contributor

kfaraz commented Sep 4, 2025

@uds5501 , we should probably also add an API to get the latest config from the task since it would have diverged from the original task payload.

@uds5501 uds5501 changed the title Partition reassignment for a running task [WIP] Partition reassignment for a running task Sep 4, 2025
@uds5501 uds5501 force-pushed the supervisor-reassignment-rollovers branch from f9c98ee to aa67322 Compare September 5, 2025 09:53
@uds5501
Contributor Author

uds5501 commented Sep 8, 2025

Edit: This has now been included in the original approach :)


There's a flaw in the original approach. When the autoscaler event is triggered, the offsets are those of time t0. The supervisor then attempts to set the offsets present at t0; however, as part of updateConfig, a checkpoint is triggered by the runners, which advances the offsets to t1 (offset t1 >= offset t0), so the offsets being sent at updateConfig time are stale.

Instead, the chronology has to be:

  • [Supervisor] sets an internal isUpdatingConfig flag, then hits the updateConfig API.
  • [TaskRunner] sets isConfigChangeOngoing=true; the task runner is paused and a checkpoint is triggered.
  • [Supervisor] hits the setEndOffset API and checks whether an isUpdatingConfig was ongoing (if so, continue normally; if not, attempt to create the new sequence as part of this endOffset call and toggle off isConfigChangeOngoing in the runner).

Concerns:

  • However, this breaks the current design: the updateConfig API is not really updating config anymore; it's just performing a pause and forcing a checkpoint. So do we rename it to pauseAndCheckpoint?
  • This API will be very specific to catering to the autoscaler repartitioning of perpetually running tasks and won't be extensible (at least in any other way I could see) for general config updates.

@uds5501
Contributor Author

uds5501 commented Sep 9, 2025

New approach will perform the following:

  1. Supervisor will hit a new API /pauseAndCheckpoint on the task runners. [Might end up using /pause with a flag but for the POC, continuing with this]
  2. Supervisor waits till all the checkpoints are completed using a latch.
  3. Supervisor then triggers the /updateConfig with new partitions
  4. Task runner resumes the flow.
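Steps 1 and 2 above could be sketched with a CountDownLatch along these lines (the HTTP and executor plumbing is elided; class and method names are illustrative):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch of the supervisor blocking until every task runner has reported
// its checkpoint complete, before it triggers /updateConfig.
class CheckpointBarrier
{
  private final CountDownLatch latch;

  CheckpointBarrier(int numRunners)
  {
    this.latch = new CountDownLatch(numRunners);
  }

  // Called when a runner's /pauseAndCheckpoint call completes.
  void onCheckpointComplete()
  {
    latch.countDown();
  }

  // Supervisor waits here before triggering /updateConfig with new partitions.
  boolean awaitAllCheckpoints(long timeout, TimeUnit unit) throws InterruptedException
  {
    return latch.await(timeout, unit);
  }
}
```

A timeout on the await keeps the supervisor's run loop from hanging forever if one runner never checkpoints.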

@uds5501 uds5501 force-pushed the supervisor-reassignment-rollovers branch from 9f608bd to dcd3549 Compare September 11, 2025 07:30
@uds5501 uds5501 marked this pull request as ready for review September 18, 2025 17:09
Comment on lines +421 to +423
final int numSegments = Integer.parseInt(
cluster.runSql("SELECT COUNT(*) FROM sys.segments WHERE datasource = '%s'", dataSource)
);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException

Potential uncaught 'java.lang.NumberFormatException'.
Comment on lines +426 to +428
final int numRows = Integer.parseInt(
cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)
);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException

Potential uncaught 'java.lang.NumberFormatException'.
Comment on lines +235 to +237
final int numSegments = Integer.parseInt(
cluster.runSql("SELECT COUNT(*) FROM sys.segments WHERE datasource = '%s'", dataSource)
);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException

Potential uncaught 'java.lang.NumberFormatException'.
Comment on lines +240 to +242
final int numRows = Integer.parseInt(
cluster.runSql("SELECT COUNT(*) FROM %s", dataSource)
);

Check notice

Code scanning / CodeQL

Missing catch of NumberFormatException

Potential uncaught 'java.lang.NumberFormatException'.
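The CodeQL notices above flag the unguarded Integer.parseInt calls. In test code this may be acceptable, but a guarded variant (a hypothetical helper, not something in the PR) could look like:

```java
import java.util.OptionalInt;

// Defensive parse of a SQL count result: returns empty instead of letting
// NumberFormatException escape.
class SafeParse
{
  static OptionalInt tryParseInt(String s)
  {
    if (s == null) {
      return OptionalInt.empty();
    }
    try {
      return OptionalInt.of(Integer.parseInt(s.trim()));
    }
    catch (NumberFormatException e) {
      return OptionalInt.empty();
    }
  }
}
```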

SeekableStreamStartSequenceNumbers<KafkaTopicPartition, Long> startSequenceNumbers =
new SeekableStreamStartSequenceNumbers<>(
spec.getIoConfig().getStream(),

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [KafkaSupervisorSpec.getIoConfig](1) should be avoided because it has been deprecated.
existingTaskGroup.getMaximumMessageTime(),
spec.getIoConfig().getInputFormat(),
spec.getIoConfig().getConfigOverrides(),
spec.getIoConfig().isMultiTopic(),

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [KafkaSupervisorSpec.getIoConfig](1) should be avoided because it has been deprecated.
spec.getIoConfig().getInputFormat(),
spec.getIoConfig().getConfigOverrides(),
spec.getIoConfig().isMultiTopic(),
spec.getIoConfig().getTaskDuration().getStandardMinutes()

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [KafkaSupervisorSpec.getIoConfig](1) should be avoided because it has been deprecated.
}

SeekableStreamIndexTaskTuningConfig ss = spec.getSpec().getTuningConfig().convertToTaskTuningConfig();
SeekableStreamIndexTaskTuningConfig ss = spec.getSpec().getTuningConfig().convertToTaskTuningConfig(spec.usePerpetuallyRunningTasks());

Check notice

Code scanning / CodeQL

Unread local variable

Variable 'SeekableStreamIndexTaskTuningConfig ss' is never read.
/**
* Test implementation of OrderedSequenceNumber for Long values
*/
private static class TestSequenceNumber extends OrderedSequenceNumber<Long>

Check warning

Code scanning / CodeQL

Inconsistent compareTo

This class declares [compareTo](1) but inherits equals; the two could be inconsistent.
/**
* Test implementation that throws exceptions on comparison
*/
private static class TestExceptionSequenceNumber extends OrderedSequenceNumber<Long>

Check warning

Code scanning / CodeQL

Inconsistent compareTo

This class declares [compareTo](1) but inherits equals; the two could be inconsistent.
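The two "Inconsistent compareTo" warnings above are resolved by overriding equals (and hashCode) consistently with compareTo. An illustrative fix (this class is a standalone example, not the PR's TestSequenceNumber):

```java
// A Comparable wrapper whose equals/hashCode agree with compareTo:
// compareTo returns 0 exactly when equals returns true.
final class SeqNum implements Comparable<SeqNum>
{
  private final long value;

  SeqNum(long value)
  {
    this.value = value;
  }

  @Override
  public int compareTo(SeqNum o)
  {
    return Long.compare(value, o.value);
  }

  @Override
  public boolean equals(Object o)
  {
    return o instanceof SeqNum && ((SeqNum) o).value == value;
  }

  @Override
  public int hashCode()
  {
    return Long.hashCode(value);
  }
}
```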
@uds5501 uds5501 changed the title Implementing perpetually running tasks for Steaming Ingestion Implementing perpetually running tasks for Streaming Ingestion Sep 21, 2025
Contributor

@kfaraz kfaraz left a comment

Thanks for the changes, @uds5501 !
I have taken a pass through the SeekableStreamIndexTaskRunner, left some initial feedback.

Will take a look at the supervisor side changes after this.

Also, as discussed offline, I think we should not be trying to update all the task payloads whenever there is a scaling event.

Instead, we should perhaps do the following:

  • Add a new nullable field, say taskIoConfigBuilder (or some other better name) to SeekableStreamSupervisorSpec
  • this field should have enough info to create task ioConfigs as needed
  • include a String version inside the taskIoConfigBuilder
  • this same version should be sent to the tasks in TaskConfigUpdateRequest
  • update the supervisor spec whenever there is a scaling event

Persisting the update in the supervisor spec has multiple benefits:

  • Supervisor table is already versioned. So we have the history at our disposal.
  • We need not update all the task payloads in metadata store every time there is a scaling event.
    (There can be a large number of tasks. Also this breaks the current model of immutable Task payloads).
  • The version provides the supervisor with an easy way to quickly check if a task
    is running the same spec/ioConfig or not.

The only drawback is that changes made to the supervisor spec due to scaling
might be difficult to distinguish from changes made by the user.
But we can remedy that with some update message (The audit logs could also be used for this).
Haven't thought this through completely, will try to give it some more thought.

final int compareToEnd = this.compareTo(end);
return isEndOffsetExclusive ? compareToEnd < 0 : compareToEnd <= 0;
}
catch (Exception e) {
Contributor

What kind of exception can happen here? I don't think we should be catching it.

Contributor Author

Fair enough, we've already handled the end's sequence number to be null, we don't need to capture this exception.


@Test
@Timeout(60)
public void test_ingest20kRows_ofSelfClusterMetricsWithScaleOuts_andVerifyValues()
Contributor

I think it would make more sense to put these test methods in a new test class which is more focused on scaling, task duration etc. This test class KafkaClusterMetricsTest was mostly about ingesting metrics of a cluster.

The new test class can be called something like KafkaTaskScalingTest and it can use data directly published to Kafka topic (similar to KafkaSupervisorTest) rather than self cluster metrics.

Contributor

Some typical test cases can be something like:
(all of these cases should be added to a new class KafkaTaskScalingTest)

Case 1:

  • No auto-scaling
  • Task duration = 0.5s, perpetual = false
  • Start the supervisor
  • Do not publish any data to Kafka topic
  • Verify that tasks finish within 1s (+ completion time might take the total duration to 2 or 3s but definitely < 5s)
  • Stop the supervisor

Case 2:

  • No auto-scaling
  • Task duration = 0.5s, perpetual = true
  • Start the supervisor
  • Do not publish any data to Kafka topic
  • Verify that tasks do not finish even after 10s
  • Stop the supervisor

Case 3:

  • Auto-scaling enabled
  • Task duration = 0.5s, perpetual = true
  • Start the supervisor
  • Do not publish any data to Kafka topic
  • Verify that tasks are scaled down after some time
  • Stop the supervisor

Case 4:

  • Auto-scaling enabled
  • Task duration = 0.5s, perpetual = true
  • Start the supervisor
  • Publish some data and force the task to enter into some lag that triggers scale up action
    (you may use druid.unsafe.cluster.testing for this, see ClusterTestingModule)
  • Verify that tasks are scaled up after some time.
  • Verify all kinds of events here - that the tasks were paused and checkpointed, then they waited for config update, and then they were finally resumed
  • Don't send any more data
  • Verify that the tasks are scaled down
  • Verify the events again
  • Stop the supervisor

@Nullable
private final InputRowParser<ByteBuffer> parser;
private final String stream;
private String stream;
Contributor

The input source stream never changes for a supervisor. See SeekableStreamSupervisorSpec.validateSpecUpdateTo().

private volatile DateTime minMessageTime;
private volatile DateTime maxMessageTime;
private final ScheduledExecutorService rejectionPeriodUpdaterExec;
private AtomicBoolean waitForConfigUpdate = new AtomicBoolean(false);
Contributor

Suggested change
private AtomicBoolean waitForConfigUpdate = new AtomicBoolean(false);
private final AtomicBoolean waitForConfigUpdate = new AtomicBoolean(false);

//milliseconds waited for created segments to be handed off
long handoffWaitMs = 0L;

log.info("Task perpetually running: %s", task.isPerpetuallyRunning());
Contributor

Move this log line to runInternal.

Contributor Author

It's already in runInternal()

Contributor

👍🏻

Move this log line to the start of this method.

Suggested change
log.info("Task perpetually running: %s", task.isPerpetuallyRunning());
log.info("Running task[%s] in persisted[%s] mode.", task.getId(), task.isPerpetuallyRunning());

return Response.status(409).entity("Task must be paused for checkpoint completion before updating config").build();
}
try {
log.info("Attempting to update config to [%s]", request.getIoConfig());
Contributor

Please move the logic inside the try into a separate private method, and add a short javadoc to it outlining the steps involved.

log.info("Attempting to update config to [%s]", request.getIoConfig());

SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> newIoConfig = (SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType>)
toolbox.getJsonMapper().convertValue(request.getIoConfig(), SeekableStreamIndexTaskIOConfig.class);
Contributor

Why convert? Isn't the payload already a SeekableStreamTaskIOConfig?

Contributor Author

My bad, I thought it required conversion to understand the generics, going to remove the conversion.

*/
public class TaskConfigUpdateRequest
{
private final SeekableStreamIndexTaskIOConfig ioConfig;
Contributor

Should this field and the class have generic args for partition id type and offset type?

Contributor Author

I remember some discussion earlier where if we set the partition id types etc here, the supervisor may change the partition id type and that will be invalid later, so decided to strip generics from here.

Contributor

No, let's retain the generics. It will work as expected.

seekToStartingSequence(recordSupplier, assignment);
} else {
// if there is no assignment, It means that there was no partition assigned to this task after scaling down.
pause();
Contributor

Wouldn't the task already be in a paused state?

Contributor Author

It would be, fair enough

createNewSequenceFromIoConfig(newIoConfig);

assignment = assignPartitions(recordSupplier);
boolean shouldResume = true;
Contributor

Rather than this boolean, just call resume() inside the if.

Comment on lines +135 to +136
spec.getSpec().getIOConfig().getConfigOverrides(),
spec.getSpec().getIOConfig().isMultiTopic()
Contributor

I don't think this change is needed. spec.getIoConfig() already does the right thing.

Contributor Author

It's deprecated and IDE + github seems to be complaining about that.

Contributor

@kfaraz kfaraz Sep 23, 2025

Okay. Although, let's postpone it for a later PR if possible.

Comment on lines +145 to +155
if (spec.usePerpetuallyRunningTasks()) {
int taskGroupId = getRangeBasedTaskGroupId(partitionId, taskCount);
log.debug("Range-based assignment for partition [%s]: taskGroupId [%d] when taskCount is [%d]", partitionId, taskGroupId, taskCount);
return taskGroupId;
} else {
if (partitionId.isMultiTopicPartition()) {
return Math.abs(31 * partitionId.topic().hashCode() + partitionId.partition()) % taskCount;
} else {
return partitionId.partition() % taskCount;
}
}
Contributor

Please simplify this if:

Suggested change
if (spec.usePerpetuallyRunningTasks()) {
int taskGroupId = getRangeBasedTaskGroupId(partitionId, taskCount);
log.debug("Range-based assignment for partition [%s]: taskGroupId [%d] when taskCount is [%d]", partitionId, taskGroupId, taskCount);
return taskGroupId;
} else {
if (partitionId.isMultiTopicPartition()) {
return Math.abs(31 * partitionId.topic().hashCode() + partitionId.partition()) % taskCount;
} else {
return partitionId.partition() % taskCount;
}
}
if (partitionId.isMultiTopicPartition()) {
return Math.abs(31 * partitionId.topic().hashCode() + partitionId.partition()) % taskCount;
} else if (spec.usePerpetuallyRunningTasks()) {
return getRangeBasedTaskGroupId(partitionId, taskCount);
} else {
return partitionId.partition() % taskCount;
}

Contributor Author

removed this altogether.

{
int minPartitionsPerTaskGroup = totalPartitions / taskCount;

if (partitionId.isMultiTopicPartition()) {
Contributor

This method shouldn't need to handle multi topic stuff right now.

@jtuglu1 jtuglu1 self-requested a review September 24, 2025 04:54
return suspended;
}

public Optional<String> getVersion()

Check notice

Code scanning / CodeQL

Missing Override annotation

This method overrides [SupervisorSpec.getVersion](1); it is advisable to add an Override annotation.
Contributor

@jtuglu1 jtuglu1 left a comment

Did a quick initial pass, changes LGTM – thanks! I'll take another look at the partition re-assignment logic in a bit.

Have we tested on a rolling update between overlords? Or just a leadership change?

}
}

private Void updateEntryWithHandle(
Contributor

nit: perhaps add some javadoc heading like insertEntryWithHandle.

@JsonCreator
public TaskConfigUpdateRequest(
@JsonProperty("ioConfig") @Nullable SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig,
@JsonProperty("supervisorSpecVersion") String supervisorSpecVersion
Contributor

nit: can we assert this is non-null like we do for other task actions' required params?

)
{
this.ioConfig = ioConfig;
this.supervisorSpecVersion = supervisorSpecVersion;
Contributor

nit: same null check

}
if (isDynamicAllocationOngoing.get()) {
checkpointsToWaitFor -= setEndOffsetFutures.size();
if (checkpointsToWaitFor <= 0) {
Contributor

When can this be < 0?

checkpointsToWaitFor -= setEndOffsetFutures.size();
if (checkpointsToWaitFor <= 0) {
log.info("All tasks in current task groups have been checkpointed, resuming dynamic allocation");
pendingConfigUpdateHook.call();
Contributor

Is pendingConfigUpdateHook written/read across threads? Might want to make this volatile or an atomic ref
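The atomic-reference suggestion above could look like this minimal sketch (the holder class and method names are illustrative):

```java
import java.util.concurrent.atomic.AtomicReference;

// Holding the pending hook in an AtomicReference so the write (supervisor
// thread) and the take-and-run (checkpoint callback thread) are safely
// ordered, and the hook runs at most once.
class PendingHookHolder
{
  private final AtomicReference<Runnable> pendingConfigUpdateHook = new AtomicReference<>();

  void setHook(Runnable hook)
  {
    pendingConfigUpdateHook.set(hook);
  }

  // Atomically take the hook and run it, returning whether one was present.
  boolean runIfPresent()
  {
    Runnable hook = pendingConfigUpdateHook.getAndSet(null);
    if (hook != null) {
      hook.run();
      return true;
    }
    return false;
  }
}
```

getAndSet(null) guarantees the hook cannot be invoked twice even if two threads race on runIfPresent().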

// For end sequences, use NOT_SET to indicate open-ended reading
Map<KafkaTopicPartition, Long> endingSequences = new HashMap<>();
for (KafkaTopicPartition partition : partitions) {
endingSequences.put(partition, END_OF_PARTITION);
Contributor

END_OF_PARTITION or NOT_SET? Comment is a bit misleading

log.info("Task [%s] paused successfully & checkpoint requested successfully", id);
return deserializeOffsetsMap(r.getContent());
} else if (r.getStatus().equals(HttpResponseStatus.ACCEPTED)) {
return null;
Contributor

Do we expect to see this kind of response?

//milliseconds waited for created segments to be handed off
long handoffWaitMs = 0L;

log.info("Task perpetually running: %s", task.isPerpetuallyRunning());
Contributor

nit: add task ID to this log (unless thread ID is there).

return Response.ok().entity("Task is already paused for checkpoint completion").build();
}
Response pauseResponse = pause();
if (pauseResponse.getStatus() == 409) {
Contributor

nit:

.getStatus().equals(HttpResponseStatus.CONFLICT)

Response pauseResponse = pause();
if (pauseResponse.getStatus() == 409) {
waitForConfigUpdate.set(false);
return pauseResponse;
Contributor

possibly naive: do we need to resume() here?

}

@Override
public void update(String id, Task entry)
Contributor

Please remove this.

}

@Test
public void testUpdateTask()
Contributor

I don't think this is needed anymore.

// The default implementation does not do any validation checks.
}

default Optional<String> getVersion()
Contributor

We shouldn't put version into the SupervisorSpec since there is already a VersionedSupervisorSpec.
You can maintain the version as a separate variable in the supervisor or just use a VersionedSupervisorSpec.

this.interval = interval;
}

private TestTask(String id, String dataSource, Interval interval, Map<String, Object> context)
Contributor
Is this needed anymore?

TestSequenceNumber current = new TestSequenceNumber(null, false);
TestSequenceNumber end = new TestSequenceNumber(10L, false);

Assert.assertThrows(NullPointerException.class, () -> current.isMoreToReadBeforeReadingRecord(end, false));
Contributor
Please put args in separate lines.

Comment on lines +636 to +638
private boolean changeTaskCountForPerpetualTasks(int desiredActiveTaskCount,
Runnable successfulScaleAutoScalerCallback
)
Contributor
Please fix the formatting.

public void runInternal()
{
if (isDynamicAllocationOngoing.get()) {
log.info("Skipping run because dynamic allocation is ongoing.");
Contributor
This can get noisy as task scaling can be a slow operation.
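One common way to address a log line that fires on every run loop iteration is to rate-limit it. The sketch below is a minimal, assumed approach (the class and its use of caller-supplied timestamps are illustrative, not existing Druid utilities):

```java
// Hedged sketch: emit the "skipping run" message at most once per interval
// while dynamic allocation is ongoing, instead of on every iteration.
class ThrottledLog
{
  private final long minIntervalMs;
  private long lastLoggedAtMs;

  ThrottledLog(long minIntervalMs)
  {
    this.minIntervalMs = minIntervalMs;
    this.lastLoggedAtMs = -minIntervalMs; // ensures the very first call logs
  }

  /** Returns true if the caller should emit the log line at time nowMs. */
  boolean shouldLog(long nowMs)
  {
    if (nowMs - lastLoggedAtMs >= minIntervalMs) {
      lastLoggedAtMs = nowMs;
      return true;
    }
    return false;
  }
}
```

Alternatively, logging at debug level for the repeats would keep the run loop quiet without losing the signal entirely.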

.get()
.withDataSchema(schema -> schema.withTimestamp(new TimestampSpec("timestamp", "iso", null)))
.withTuningConfig(tuningConfig -> tuningConfig.withMaxRowsPerSegment(maxRowsPerSegment))
.withTuningConfig(tuningConfig -> tuningConfig
Contributor
Please revert all the changes to this file.

* Embedded test to verify task scaling behaviour of {@code KafkaSupervisor} ingesting from a custom kafka topic.
*/
@SuppressWarnings("resource")
public class KafkaTaskScalingTest extends EmbeddedClusterTestBase
Contributor
Why is this class separate from KafkaTaskAutoScalingTest? We can merge the two classes into one.

.build();
} else {
try {
// Don't acquire a lock if the task is already paused for checkpoint completion, avoiding deadlock
Contributor
This comment is not valid anymore

public Response setEndOffsets(
Map<PartitionIdType, SequenceOffsetType> sequenceNumbers,
boolean finish // this field is only for internal purposes, shouldn't be usually set by users
boolean finish, // this field is only for internal purposes, shouldn't be usually set by users
Contributor
This old comment does not add any value. Please remove it.

Suggested change
boolean finish, // this field is only for internal purposes, shouldn't be usually set by users
boolean finish,

@JsonCreator
public TaskConfigResponse(
@JsonProperty("ioConfig") @Nullable SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceOffsetType> ioConfig,
@JsonProperty("supervisorSpecVersion") String supervisorSpecVersion
Contributor
We might as well just call it supervisorVersion. The version always denotes the spec anyway.
Please make the same change in all the relevant places.
