Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -216,17 +216,16 @@ private int computeDesiredTaskCount(List<Long> lags)

int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
int desiredActiveTaskCount;
int partitionCount = supervisor.getPartitionCount();
if (partitionCount <= 0) {
log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
return -1;
}

if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
// Do Scale out
int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();

int partitionCount = supervisor.getPartitionCount();
if (partitionCount <= 0) {
log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
return -1;
}

int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
if (currentActiveTaskCount == actualTaskCountMax) {
log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
Expand All @@ -248,7 +247,8 @@ private int computeDesiredTaskCount(List<Long> lags)
if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
// Do Scale in
int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
if (currentActiveTaskCount == lagBasedAutoScalerConfig.getTaskCountMin()) {
int actualTaskCountMin = Math.min(lagBasedAutoScalerConfig.getTaskCountMin(), partitionCount);
if (currentActiveTaskCount == actualTaskCountMin) {
log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
dataSource
);
Expand All @@ -260,7 +260,7 @@ private int computeDesiredTaskCount(List<Long> lags)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
return -1;
} else {
desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin());
desiredActiveTaskCount = Math.max(taskCount, actualTaskCountMin);
}
return desiredActiveTaskCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -992,6 +992,58 @@ public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedExce
autoScaler.stop();
}

@Test
public void testSeekableStreamSupervisorSpecWithScaleInThresholdGreaterThanPartitions() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();

Check notice

Code scanning / CodeQL

Deprecated method or constructor invocation

Invoking [SeekableStreamSupervisorSpec.getDataSchema](1) should be avoided because it has been deprecated.
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.replay(spec);

EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);

EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);

TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);
Map<String, Object> modifiedScaleInProps = getScaleInProperties();

modifiedScaleInProps.put("taskCountMax", 20);
modifiedScaleInProps.put("taskCountMin", 15);

LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
DATASOURCE,
mapper.convertValue(
modifiedScaleInProps,
LagBasedAutoScalerConfig.class
),
spec,
emitter
);

// enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin.
Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
autoScaler.start();
supervisor.runInternal();

Assert.assertEquals(2, (int) supervisor.getIoConfig().getTaskCount());
Thread.sleep(2000);
Assert.assertEquals(10, (int) supervisor.getIoConfig().getTaskCount());

autoScaler.reset();
autoScaler.stop();
}

@Test
public void testSeekableStreamSupervisorSpecWithScaleInAlreadyAtMin() throws InterruptedException
{
Expand Down