Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,10 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
final boolean shouldUpdateSpec = shouldUpdateSupervisor(spec);
possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
SupervisorSpec existingSpec = possiblyStopAndRemoveSupervisorInternal(spec.getId(), false);
if (existingSpec != null) {
spec.merge(existingSpec);
}
createAndStartSupervisorInternal(spec, shouldUpdateSpec);
return shouldUpdateSpec;
}
Expand All @@ -183,6 +186,7 @@ public boolean createOrUpdateAndStartSupervisor(SupervisorSpec spec)
/**
* Checks whether the submitted SupervisorSpec differs from the current spec in SupervisorManager's supervisor list.
* This is used in SupervisorResource specPost to determine whether the Supervisor needs to be restarted
*
* @param spec The spec submitted
* @return boolean - true only if the spec has been modified, false otherwise
*/
Expand Down Expand Up @@ -221,7 +225,7 @@ public boolean stopAndRemoveSupervisor(String id)

synchronized (lock) {
Preconditions.checkState(started, "SupervisorManager not started");
return possiblyStopAndRemoveSupervisorInternal(id, true);
return possiblyStopAndRemoveSupervisorInternal(id, true) != null;
}
}

Expand Down Expand Up @@ -299,7 +303,8 @@ public void stop()
log.info("SupervisorManager stopped.");
}

public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id, @Nullable Integer limit) throws IllegalArgumentException
public List<VersionedSupervisorSpec> getSupervisorHistoryForId(String id, @Nullable Integer limit)
throws IllegalArgumentException
{
return metadataSupervisorManager.getAllForId(id, limit);
}
Expand Down Expand Up @@ -429,13 +434,14 @@ public boolean registerUpgradedPendingSegmentOnSupervisor(
* Caller should have acquired [lock] before invoking this method to avoid contention with other threads that may be
* starting, stopping, suspending and resuming supervisors.
*
* @return true if a supervisor was stopped, false if there was no supervisor with this id
* @return the spec of the existing supervisor if one existed and was stopped, null if there was no supervisor with this id
*/
private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean writeTombstone)
@Nullable
private SupervisorSpec possiblyStopAndRemoveSupervisorInternal(String id, boolean writeTombstone)
{
Pair<Supervisor, SupervisorSpec> pair = supervisors.get(id);
if (pair == null) {
return false;
if (pair == null || pair.rhs == null || pair.lhs == null) {
return null;
}

if (writeTombstone) {
Expand All @@ -447,13 +453,13 @@ private boolean possiblyStopAndRemoveSupervisorInternal(String id, boolean write
pair.lhs.stop(true);
supervisors.remove(id);

SupervisorTaskAutoScaler autoscler = autoscalers.get(id);
if (autoscler != null) {
autoscler.stop();
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
if (autoscaler != null) {
autoscaler.stop();
autoscalers.remove(id);
}

return true;
return pair.rhs;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@
import java.util.stream.Stream;

/**
* this class is the parent class of both the Kafka and Kinesis supervisor. All the main run loop
* logic are similar enough so they're grouped together into this class.
* This class is the parent class of both the Kafka and Kinesis supervisor. All the main run loop
* logic is similar enough, so they're grouped together into this class.
* <p>
* Supervisor responsible for managing the SeekableStreamIndexTasks (Kafka/Kinesis) for a single dataSource. At a high level, the class accepts a
* {@link SeekableStreamSupervisorSpec} which includes the stream name (topic / stream) and configuration as well as an ingestion spec which will
Expand Down Expand Up @@ -541,10 +541,20 @@ public String getType()

/**
* This method determines how to do scale actions based on collected lag points.
* If scale action is triggered :
* First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing.
* Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in the next 'RunNotice'.
* Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage.
* If scale action is triggered:
* <ul>
* <li>First, call <code>gracefulShutdownInternal()</code> which will change the state of current datasource ingest tasks from reading to publishing.
* <li>Secondly, clear all the stateful data structures:
* <ul>
* <li><code>activelyReadingTaskGroups</code>,
* <li><code>partitionGroups</code>,
* <li><code>partitionOffsets</code>,
* <li><code>pendingCompletionTaskGroups</code>,
* <li><code>partitionIds</code>.
* </ul>
* These structures will be rebuilt in the next 'RunNotice'.
* <li>Finally, change the <code>taskCount</code> in <code>SeekableStreamSupervisorIOConfig</code> and sync it to <code>MetadataStorage</code>.
* </ul>
* After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor.
*
* @param desiredActiveTaskCount desired taskCount computed from AutoScaler
Expand Down Expand Up @@ -916,7 +926,7 @@ public String getType()
private volatile boolean lifecycleStarted = false;
private final ServiceEmitter emitter;

// snapshots latest sequences from stream to be verified in next run cycle of inactive stream check
// snapshots latest sequences from the stream to be verified in the next run cycle of inactive stream check
private final Map<PartitionIdType, SequenceOffsetType> previousSequencesFromStream = new HashMap<>();
private long lastActiveTimeMillis;
private final IdleConfig idleConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,18 @@
import org.apache.druid.segment.indexing.DataSchema;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
import java.util.List;
import java.util.Map;

public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
{
protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE = "Update of the input source stream from [%s] to [%s] is not supported for a running supervisor."
+ "%nTo perform the update safely, follow these steps:"
+ "%n(1) Suspend this supervisor, reset its offsets and then terminate it. "
+ "%n(2) Create a new supervisor with the new input source stream."
+ "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.";
protected static final String ILLEGAL_INPUT_SOURCE_UPDATE_ERROR_MESSAGE =
"Update of the input source stream from [%s] to [%s] is not supported for a running supervisor."
+ "%nTo perform the update safely, follow these steps:"
+ "%n(1) Suspend this supervisor, reset its offsets and then terminate it. "
+ "%n(2) Create a new supervisor with the new input source stream."
+ "%nNote that doing the reset can cause data duplication or loss if any topic used in the old supervisor is included in the new one too.";

private static SeekableStreamSupervisorIngestionSpec checkIngestionSchema(
SeekableStreamSupervisorIngestionSpec ingestionSchema
Expand Down Expand Up @@ -183,6 +185,7 @@ public DruidMonitorSchedulerConfig getMonitorSchedulerConfig()

/**
* An autoScaler instance will be returned depending on the autoScalerConfig. In case autoScalerConfig is null or autoScaler is disabled then NoopTaskAutoScaler will be returned.
*
* @param supervisor
* @return autoScaler
*/
Expand Down Expand Up @@ -232,6 +235,7 @@ public boolean isSuspended()
* <li>You cannot migrate between types of supervisors.</li>
* <li>You cannot change the input source stream of a running supervisor.</li>
* </ul>
*
* @param proposedSpec the proposed supervisor spec
* @throws DruidException if the proposed spec update is not allowed
*/
Expand All @@ -240,7 +244,9 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
{
if (!(proposedSpec instanceof SeekableStreamSupervisorSpec)) {
throw InvalidInput.exception(
"Cannot update supervisor spec from type[%s] to type[%s]", getClass().getSimpleName(), proposedSpec.getClass().getSimpleName()
"Cannot update supervisor spec from type[%s] to type[%s]",
getClass().getSimpleName(),
proposedSpec.getClass().getSimpleName()
);
}
SeekableStreamSupervisorSpec other = (SeekableStreamSupervisorSpec) proposedSpec;
Expand All @@ -255,6 +261,33 @@ public void validateSpecUpdateTo(SupervisorSpec proposedSpec) throws DruidExcept
}
}

@Override
public void merge(@NotNull SupervisorSpec existingSpec)
{
AutoScalerConfig thisAutoScalerConfig = this.getIoConfig().getAutoScalerConfig();
// If the autoscaler is absent or taskCountStart is specified, just return.
if (thisAutoScalerConfig == null || thisAutoScalerConfig.getTaskCountStart() != null) {
return;
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also return early if this.ioConfig.getTaskCount() is specified?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, taskCountStart has higher priority :)

The priority order is: provided taskCountStart > provided taskCount > existing taskCount > provided taskCountMin.


// Use a switch expression with pattern matching when we move to Java 21 as a minimum requirement.
if (existingSpec instanceof SeekableStreamSupervisorSpec) {
SeekableStreamSupervisorSpec spec = (SeekableStreamSupervisorSpec) existingSpec;
AutoScalerConfig autoScalerConfig = spec.getIoConfig().getAutoScalerConfig();
if (autoScalerConfig == null) {
return;
}
// provided `taskCountStart` > provided `taskCount` > existing `taskCount` > provided `taskCountMin`.
int taskCount = thisAutoScalerConfig.getTaskCountMin();
if (this.getIoConfig().getTaskCount() != null) {
taskCount = this.getIoConfig().getTaskCount();
} else if (spec.getIoConfig().getTaskCount() != null) {
taskCount = spec.getIoConfig().getTaskCount();
}
this.getIoConfig().setTaskCount(taskCount);
}
}

protected abstract SeekableStreamSupervisorSpec toggleSuspend(boolean suspend);

}
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,10 @@ public void start()
);
log.info(
"LagBasedAutoScaler will collect lag every [%d] millis and will keep up to [%d] data points for the last [%d] millis for dataSource [%s]",
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.maxSize(),
lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
lagMetricsQueue.maxSize(),
lagBasedAutoScalerConfig.getLagCollectionRangeMillis(),
dataSource
);
}

Expand Down Expand Up @@ -192,19 +194,25 @@ private Runnable computeAndCollectLag()

/**
* This method determines whether to do scale actions based on collected lag points.
* Current algorithm of scale is simple:
* First of all, compute the proportion of lag points higher/lower than scaleOutThreshold/scaleInThreshold, getting scaleOutThreshold/scaleInThreshold.
* Secondly, compare scaleOutThreshold/scaleInThreshold with triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. P.S. Scale out action has higher priority than scale in action.
* Finaly, if scaleOutThreshold/scaleInThreshold is higher than triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, scale out/in action would be triggered.
* The current algorithm of scale is straightforward:
* <ul>
* <li>First, compute the proportion of lag points higher/lower than {@code scaleOutThreshold/scaleInThreshold},
* getting {@code scaleOutThreshold/scaleInThreshold}.
* <li>Secondly, compare {@code scaleOutThreshold/scaleInThreshold} with
* {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}.
* <ul><li>P.S. Scale out action has a higher priority than scale in action.</ul>
* <li>Finally, if {@code scaleOutThreshold/scaleInThreshold} is higher than
* {@code triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold}, a scale out/in action is triggered.
* </ul>
*
* @param lags the lag metrics of Stream(Kafka/Kinesis)
* @return Integer. target number of tasksCount, -1 means skip scale action.
* @param lags the lag metrics of Stream (Kafka/Kinesis)
* @return Integer, target number of tasksCount. -1 means skip scale action.
*/
private int computeDesiredTaskCount(List<Long> lags)
{
// if supervisor is not suspended, ensure required tasks are running
// if the supervisor is not suspended, ensure required tasks are running
// if suspended, ensure tasks have been requested to gracefully stop
log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
log.debug("Computing the desired task count for [%s], based on following lags : [%s]", dataSource, lags);
int beyond = 0;
int within = 0;
int metricsCount = lags.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ public LagBasedAutoScalerConfig(

this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis
!= null ? minTriggerScaleActionFrequencyMillis : 600000;
this.minTriggerScaleActionFrequencyMillis =
minTriggerScaleActionFrequencyMillis != null ? minTriggerScaleActionFrequencyMillis : 600000;

Preconditions.checkArgument(
stopTaskCountRatio == null || (stopTaskCountRatio > 0.0 && stopTaskCountRatio <= 1.0),
Expand Down
Loading
Loading