Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ public void testNoInitialStateWithAutoScaleOut() throws Exception
Thread.sleep(1 * 1000);
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScale);
autoscaler.stop();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was causing issues randomly because the autoscaler was kept running and was emitting events which was same listener for the whole suite of tests in this class.

}

@Test
Expand Down Expand Up @@ -435,6 +436,7 @@ public void testNoInitialStateWithAutoScaleIn() throws Exception
Thread.sleep(1 * 1000);
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountAfterScale);
autoscaler.stop();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,13 +422,19 @@ public boolean equals(Object obj)
// change taskCount without resubmitting.
private class DynamicAllocationTasksNotice implements Notice
{
Callable<Integer> scaleAction;
Callable<Integer> computeDesiredTaskCount;
ServiceEmitter emitter;
Runnable onSuccessfulScale;
private static final String TYPE = "dynamic_allocation_tasks_notice";

DynamicAllocationTasksNotice(Callable<Integer> scaleAction, ServiceEmitter emitter)
DynamicAllocationTasksNotice(
Callable<Integer> computeDesiredTaskCount,
Runnable onSuccessfulScale,
ServiceEmitter emitter
)
{
this.scaleAction = scaleAction;
this.computeDesiredTaskCount = computeDesiredTaskCount;
this.onSuccessfulScale = onSuccessfulScale;
this.emitter = emitter;
}

Expand Down Expand Up @@ -470,7 +476,7 @@ public void handle()
return;
}
}
final Integer desiredTaskCount = scaleAction.call();
final Integer desiredTaskCount = computeDesiredTaskCount.call();
ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
Expand Down Expand Up @@ -500,6 +506,7 @@ public void handle()

boolean allocationSuccess = changeTaskCount(desiredTaskCount);
Comment thread
suneet-s marked this conversation as resolved.
if (allocationSuccess) {
onSuccessfulScale.run();
dynamicTriggerLastRunTime = nowTime;
}
}
Expand Down Expand Up @@ -1260,9 +1267,13 @@ public void tryInit()
}
}

public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction, ServiceEmitter emitter)
public Runnable buildDynamicAllocationTask(
Callable<Integer> scaleAction,
Comment thread
ac9817 marked this conversation as resolved.
Runnable onSuccessfulScale,
ServiceEmitter emitter
)
{
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter));
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter));
}

private Runnable buildRunTask()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ public void start()
int desiredTaskCount = -1;
try {
desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue));

if (desiredTaskCount != -1) {
lagMetricsQueue.clear();
}
}
catch (Exception ex) {
log.warn(ex, "Exception while computing desired task count for [%s]", dataSource);
Expand All @@ -100,14 +96,27 @@ public void start()
return desiredTaskCount;
};

Runnable onSuccessfulScale = () -> {
LOCK.lock();
try {
lagMetricsQueue.clear();
}
catch (Exception ex) {
log.warn(ex, "Exception while clearing lags for [%s]", dataSource);
}
finally {
LOCK.unlock();
}
};

lagComputationExec.scheduleAtFixedRate(
computeAndCollectLag(),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
TimeUnit.MILLISECONDS
);
allocationExec.scheduleAtFixedRate(
supervisor.buildDynamicAllocationTask(scaleAction, emitter),
supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
.getLagCollectionRangeMillis(),
lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
Expand Down