@@ -71,7 +71,6 @@
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
@@ -123,7 +122,6 @@

import javax.annotation.Nullable;
import java.io.IOException;
import java.lang.reflect.Method;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
@@ -5218,12 +5216,10 @@ public void test_autoScaler_doesNotRepeatScaleDownActions_ifTasksAreStillPublish
hasPendingTasks
);

// Now call clearAllocationInfo() - this is where the bug was
// The bug was that this method cleared pendingCompletionTaskGroups
supervisor.testClearAllocationInfo();
// Clear the partition assignments (this is called when task count has changed)
supervisor.clearPartitionAssignmentsForScaling();

// THE KEY ASSERTION: Verify pendingCompletionTaskGroups is still NOT empty after clearAllocationInfo
// This is the fix - clearAllocationInfo should preserve pendingCompletionTaskGroups
// Verify that pendingCompletionTaskGroups has not been cleared
boolean stillHasPendingTasks = false;
for (int groupId = 0; groupId < 3; groupId++) {
if (supervisor.getPendingCompletionTaskGroupsCount(groupId) > 0) {
@@ -5243,6 +5239,9 @@
0,
supervisor.getActivelyReadingTaskGroupsCount()
);

// Verify that partitionOffsets have not been cleared either
Assert.assertFalse(supervisor.getPartitionOffsets().isEmpty());
}

private void addSomeEvents(int numEventsPerPartition) throws Exception
@@ -5962,13 +5961,6 @@ public int getPendingCompletionTaskGroupsCount(int groupId)
CopyOnWriteArrayList<?> groups = getPendingCompletionTaskGroups(groupId);
return groups != null ? groups.size() : 0;
}

public void testClearAllocationInfo() throws Exception
{
Method method = SeekableStreamSupervisor.class.getDeclaredMethod("clearAllocationInfo");
method.setAccessible(true);
method.invoke(this);
}
}

private static class TestableKafkaSupervisorWithCustomIsTaskCurrent extends TestableKafkaSupervisor
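Note on the deleted helper above: testClearAllocationInfo() needed reflection to reach a private method, whereas the renamed clearPartitionAssignmentsForScaling() is public and marked @VisibleForTesting, so the test now calls it directly. A minimal standalone sketch of the two access patterns, using a hypothetical Subject class rather than the actual supervisor:

import java.lang.reflect.Method;

public class ReflectionVsDirectAccess
{
  static class Subject
  {
    // Stands in for clearPartitionAssignmentsForScaling(); in the old code the
    // equivalent method was private, forcing the reflection workaround below.
    public void clear()
    {
      System.out.println("cleared directly");
    }

    private void hidden()
    {
      System.out.println("cleared via reflection");
    }
  }

  public static void main(String[] args) throws Exception
  {
    Subject subject = new Subject();

    // Old pattern: unlock and invoke a private method reflectively.
    Method method = Subject.class.getDeclaredMethod("hidden");
    method.setAccessible(true);
    method.invoke(subject);

    // New pattern: the method is public (@VisibleForTesting in Druid), so tests call it directly.
    subject.clear();
  }
}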
@@ -585,7 +585,7 @@ private boolean changeTaskCount(int desiredActiveTaskCount)
final Stopwatch scaleActionStopwatch = Stopwatch.createStarted();
gracefulShutdownInternal();
changeTaskCountInIOConfig(desiredActiveTaskCount);
clearAllocationInfo();
clearPartitionAssignmentsForScaling();
emitter.emit(ServiceMetricEvent.builder()
.setDimension(DruidMetrics.SUPERVISOR_ID, supervisorId)
.setDimension(DruidMetrics.DATASOURCE, dataSource)
@@ -621,18 +621,30 @@ private void changeTaskCountInIOConfig(int desiredActiveTaskCount)
}

/**
* Clears allocation information including active task groups, partition groups, partition offsets, and partition IDs.
* Clears previous partition assignments in preparation for an upcoming scaling event.
* <p>
* Note: Does not clear {@link #pendingCompletionTaskGroups} so that the supervisor remembers that these
* tasks are publishing and auto-scaler does not repeatedly attempt a scale down until these tasks
* complete. If this is cleared, the next {@link #discoverTasks()} might add these tasks to
* {@link #activelyReadingTaskGroups}.
* <p>
* Also does not clear {@link #partitionOffsets} so that the new tasks remember
* where the previous tasks had left off.
* <p>
* Since both of these are in-memory structures, a change in Overlord leadership
* might cause duplicate scaling actions and/or intermittent task failures if
* the new generation tasks start with stale offsets.
*/
private void clearAllocationInfo()
@VisibleForTesting
public void clearPartitionAssignmentsForScaling()
{
// All previous tasks should now be publishing and not actively reading anymore
activelyReadingTaskGroups.clear();

// partitionGroups will change as taskCount has changed due to scaling
partitionGroups.clear();
partitionOffsets.clear();

// partitionIds will be rediscovered from the stream and assigned to respective taskGroups
partitionIds.clear();
}
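A stripped-down model of what the selective clear keeps and drops may help; the field names follow the diff, but the class, value types, and main method below are illustrative only, not Druid code:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ScalingClearSketch
{
  // Field names mirror the supervisor's; value types are simplified placeholders.
  final Map<Integer, String> activelyReadingTaskGroups = new ConcurrentHashMap<>();
  final Map<Integer, String> pendingCompletionTaskGroups = new ConcurrentHashMap<>();
  final Map<Integer, String> partitionGroups = new ConcurrentHashMap<>();
  final Map<Integer, Long> partitionOffsets = new ConcurrentHashMap<>();
  final List<Integer> partitionIds = new ArrayList<>();

  void clearPartitionAssignmentsForScaling()
  {
    // Rebuilt for the new task count on the next run loop iteration.
    activelyReadingTaskGroups.clear();
    partitionGroups.clear();
    partitionIds.clear();
    // Deliberately NOT cleared: pendingCompletionTaskGroups (publishing tasks stay
    // tracked, so the auto-scaler does not attempt another scale down) and
    // partitionOffsets (new tasks resume where the previous generation stopped).
  }

  public static void main(String[] args)
  {
    ScalingClearSketch s = new ScalingClearSketch();
    s.activelyReadingTaskGroups.put(0, "reading-task");
    s.pendingCompletionTaskGroups.put(0, "publishing-task");
    s.partitionGroups.put(0, "partitions-for-group-0");
    s.partitionOffsets.put(0, 42L);
    s.partitionIds.add(0);

    s.clearPartitionAssignmentsForScaling();

    // These survive the scaling event, which is exactly what the new test asserts.
    System.out.println(s.pendingCompletionTaskGroups); // {0=publishing-task}
    System.out.println(s.partitionOffsets);            // {0=42}
  }
}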

@@ -3337,6 +3349,13 @@ public Boolean apply(@Nullable DateTime startTime)
}
}

/**
* Checks the duration of {@link #activelyReadingTaskGroups}, requests them
* to checkpoint themselves if they have exceeded the specified run duration
* or if early stop has been requested. If checkpoint is successful, the
* {@link #partitionOffsets} are updated and checkpointed tasks are moved to
* {@link #pendingCompletionTaskGroups}.
*/
private void checkTaskDuration() throws ExecutionException, InterruptedException
{
final List<ListenableFuture<Map<PartitionIdType, SequenceOffsetType>>> futures = new ArrayList<>();
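The new javadoc above describes a handoff: groups past their run duration (or asked to stop early) are checkpointed, the checkpointed offsets are folded into partitionOffsets, and the group moves to pendingCompletionTaskGroups. A rough sketch of that bookkeeping step, with simplified types and a hypothetical onCheckpointSucceeded() entry point in place of the real future-handling code:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskDurationRolloverSketch
{
  // Simplified stand-ins for the supervisor's bookkeeping; a "group" here is just
  // its latest offsets, keyed by partition id.
  final Map<Integer, Map<Integer, Long>> activelyReadingTaskGroups = new HashMap<>();
  final Map<Integer, List<Map<Integer, Long>>> pendingCompletionTaskGroups = new HashMap<>();
  final Map<Integer, Long> partitionOffsets = new HashMap<>();

  // Models the bookkeeping after a successful checkpoint: record the checkpointed
  // offsets and move the group from "actively reading" to "pending completion".
  void onCheckpointSucceeded(int groupId, Map<Integer, Long> checkpointedOffsets)
  {
    partitionOffsets.putAll(checkpointedOffsets);
    Map<Integer, Long> group = activelyReadingTaskGroups.remove(groupId);
    if (group != null) {
      pendingCompletionTaskGroups.computeIfAbsent(groupId, id -> new ArrayList<>()).add(group);
    }
  }

  public static void main(String[] args)
  {
    TaskDurationRolloverSketch s = new TaskDurationRolloverSketch();
    s.activelyReadingTaskGroups.put(0, Map.of(0, 100L));
    s.onCheckpointSucceeded(0, Map.of(0, 100L));
    System.out.println(s.pendingCompletionTaskGroups); // {0=[{0=100}]}
    System.out.println(s.partitionOffsets);            // {0=100}
  }
}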
@@ -3463,7 +3482,7 @@ void maybeScaleDuringTaskRollover()
changeTaskCountInIOConfig(rolloverTaskCount);
// Here force reset the supervisor state to be re-calculated on the next iteration of runInternal() call.
// This seems the best way to inject task amount recalculation during the rollover.
clearAllocationInfo();
clearPartitionAssignmentsForScaling();

ServiceMetricEvent.Builder event = ServiceMetricEvent
.builder()
@@ -3976,6 +3995,7 @@ private void createNewTasks() throws JsonProcessingException
.collect(Collectors.toSet());
}

log.info("Initializing taskGroup[%d] with startingOffsets[%s].", groupId, simpleStartingOffsets);
activelyReadingTaskGroups.put(
groupId,
new TaskGroup(
@@ -2310,21 +2310,26 @@ protected SegmentPublishResult updateDataSourceMetadataInTransaction(
startMetadataMatchesExisting = startMetadata.asStartMetadata().matches(oldCommitMetadataFromDb.asStartMetadata());
}

if (startMetadataGreaterThanExisting && !startMetadataMatchesExisting) {
// Offsets stored in startMetadata is greater than the last commited metadata.
if (startMetadataMatchesExisting) {
// Proceed with the commit
} else if (startMetadataGreaterThanExisting) {
// Offsets stored in startMetadata are greater than the last committed metadata.
// This can happen because the previous task is still publishing its segments and can resolve once
// the previous task finishes publishing.
return SegmentPublishResult.retryableFailure(
"The new start metadata state[%s] is ahead of the last committed"
+ " end state[%s]. Try resetting the supervisor.",
startMetadata, oldCommitMetadataFromDb
);
}

if (!startMetadataMatchesExisting) {
// Not in the desired start state.
} else {
// startMetadata is older than committed metadata
// The task trying to publish is probably a replica trying to commit offsets already published by another task.
// OR the metadata has been updated manually
return SegmentPublishResult.fail(
"Inconsistency between stored metadata state[%s] and target state[%s]. Try resetting the supervisor.",
"Stored metadata state[%s] has already been updated by other tasks and"
+ " has diverged from the expected start metadata state[%s]."
+ " This task will be replaced by the supervisor with a new task using updated start offsets."
+ " Try resetting the supervisor if the issue persists.",
oldCommitMetadataFromDb, startMetadata
);
}
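The restructured branch reads as a three-way ordering on start metadata: equal to the committed state proceeds with the commit, ahead of it fails retryably (the predecessor is still publishing), and behind it fails permanently (a replica or a manual edit already advanced the pointer). A sketch of that decision using plain longs in place of Druid's metadata objects, with made-up names throughout:

public class StartMetadataCheckSketch
{
  enum Outcome { COMMIT, RETRYABLE_FAILURE, FAILURE }

  // 'committedOffset' and 'startOffset' stand in for oldCommitMetadataFromDb and
  // startMetadata; comparing longs replaces Druid's metadata matching logic.
  static Outcome check(long committedOffset, long startOffset)
  {
    if (startOffset == committedOffset) {
      // Start state matches the last committed end state: proceed with the commit.
      return Outcome.COMMIT;
    } else if (startOffset > committedOffset) {
      // Ahead of the committed state: the previous task is likely still publishing,
      // so the publish can be retried once it finishes.
      return Outcome.RETRYABLE_FAILURE;
    } else {
      // Behind the committed state: a replica already committed these offsets (or
      // the metadata was edited manually), so the supervisor replaces this task.
      return Outcome.FAILURE;
    }
  }

  public static void main(String[] args)
  {
    System.out.println(check(10, 10)); // COMMIT
    System.out.println(check(10, 12)); // RETRYABLE_FAILURE
    System.out.println(check(10, 7));  // FAILURE
  }
}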
@@ -945,8 +945,10 @@ public void testTransactionalAnnounceFailDbNotNullWantNull()
);
Assert.assertEquals(
SegmentPublishResult.fail(
"Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}]"
+ " and target state[ObjectMetadata{theObject=null}]. Try resetting the supervisor."
"Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has already been updated by other tasks"
+ " and has diverged from the expected start metadata state[ObjectMetadata{theObject=null}]."
+ " This task will be replaced by the supervisor with a new task using updated start offsets."
+ " Try resetting the supervisor if the issue persists."
),
result2
);
@@ -1093,8 +1095,10 @@ public void testTransactionalAnnounceFailDbNotNullWantDifferent()
);
Assert.assertEquals(
SegmentPublishResult.fail(
"Inconsistency between stored metadata state[ObjectMetadata{theObject={foo=baz}}] and "
+ "target state[ObjectMetadata{theObject={foo=qux}}]. Try resetting the supervisor."
"Stored metadata state[ObjectMetadata{theObject={foo=baz}}] has already been updated by other tasks"
+ " and has diverged from the expected start metadata state[ObjectMetadata{theObject={foo=qux}}]."
+ " This task will be replaced by the supervisor with a new task using updated start offsets."
+ " Try resetting the supervisor if the issue persists."
),
result2
);