Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of the `lagBased` auto scaler.|`dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|

If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
implements Supervisor
{
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
public static final String AUTOSCALER_SKIP_REASON_DIMENSION = "scalingSkipReason";
public static final String AUTOSCALER_REQUIRED_TASKS_METRIC = "task/autoScaler/requiredCount";

private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
Expand Down Expand Up @@ -403,11 +405,13 @@ public boolean equals(Object obj)
private class DynamicAllocationTasksNotice implements Notice
{
Callable<Integer> scaleAction;
ServiceEmitter emitter;
private static final String TYPE = "dynamic_allocation_tasks_notice";

DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
DynamicAllocationTasksNotice(Callable<Integer> scaleAction, ServiceEmitter emitter)
{
this.scaleAction = scaleAction;
this.emitter = emitter;
}

/**
Expand Down Expand Up @@ -448,17 +452,35 @@ public void handle()
return;
}
}
final Integer desiredTaskCount = scaleAction.call();
ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
log.info(
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it! desired task count is [%s], active task count is [%s]",
nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
dataSource
dataSource,
desiredTaskCount,
getActiveTaskGroupsCount()
);

if (desiredTaskCount > 0) {
emitter.emit(event.setDimension(
AUTOSCALER_SKIP_REASON_DIMENSION,
"minTriggerScaleActionFrequencyMillis not elapsed yet"
)
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
}
return;
}
final Integer desriedTaskCount = scaleAction.call();
boolean allocationSuccess = changeTaskCount(desriedTaskCount);

if (desiredTaskCount > 0) {
emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
}

boolean allocationSuccess = changeTaskCount(desiredTaskCount);
if (allocationSuccess) {
dynamicTriggerLastRunTime = nowTime;
}
Expand Down Expand Up @@ -1208,9 +1230,9 @@ public void tryInit()
}
}

public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction, ServiceEmitter emitter)
{
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction));
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter));
}

private Runnable buildRunTask()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
{
AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig();
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) {
return autoScalerConfig.createAutoScaler(supervisor, this);
return autoScalerConfig.createAutoScaler(supervisor, this, emitter);
}
return new NoopTaskAutoScaler();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;

@UnstableApi
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class)
Expand All @@ -38,6 +39,6 @@ public interface AutoScalerConfig
long getMinTriggerScaleActionFrequencyMillis();
int getTaskCountMax();
int getTaskCountMin();
SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec);
SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter);
}

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;

import java.util.ArrayList;
import java.util.List;
Expand All @@ -45,11 +48,17 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
private final SupervisorSpec spec;
private final SeekableStreamSupervisor supervisor;
private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
private final ServiceEmitter emitter;
private final ServiceMetricEvent.Builder metricBuilder;

private static final ReentrantLock LOCK = new ReentrantLock(true);

public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource,
LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
public LagBasedAutoScaler(
SeekableStreamSupervisor supervisor,
String dataSource,
LagBasedAutoScalerConfig autoScalerConfig,
SupervisorSpec spec,
ServiceEmitter emitter
)
{
this.lagBasedAutoScalerConfig = autoScalerConfig;
Expand All @@ -62,6 +71,10 @@ public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource
this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
this.spec = spec;
this.supervisor = supervisor;
this.emitter = emitter;
metricBuilder = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, this.supervisor.getIoConfig().getStream());
}

@Override
Expand Down Expand Up @@ -93,7 +106,7 @@ public void start()
TimeUnit.MILLISECONDS
);
allocationExec.scheduleAtFixedRate(
supervisor.buildDynamicAllocationTask(scaleAction),
supervisor.buildDynamicAllocationTask(scaleAction, emitter),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
.getLagCollectionRangeMillis(),
lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
Expand Down Expand Up @@ -214,6 +227,12 @@ private int computeDesiredTaskCount(List<Long> lags)
log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
dataSource
);
emitter.emit(metricBuilder
.setDimension(
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
"Already at max task count"
)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
return -1;
} else {
desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
Expand All @@ -228,6 +247,12 @@ private int computeDesiredTaskCount(List<Long> lags)
log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
dataSource
);
emitter.emit(metricBuilder
.setDimension(
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
"Already at min task count"
)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
return -1;
} else {
desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -154,9 +155,9 @@ public int getTaskCountMin()
}

@Override
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec)
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter)
{
return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec);
return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec, emitter);
}

@JsonProperty
Expand Down
Loading