Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
Expand Down Expand Up @@ -429,9 +428,10 @@ protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetad
}

@Override
public LagMetric getLagMetricForAutoScaler()
public long computeLagForAutoScaler()
{
return LagMetric.MAX;
LagStats lagStats = computeLagStats();
return lagStats == null ? 0L : lagStats.getMaxLag();
}

private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.StringUtils;
Expand Down Expand Up @@ -155,13 +154,8 @@ private Runnable computeAndCollectLag()
LOCK.lock();
try {
if (!spec.isSuspended()) {
LagStats lagStats = supervisor.computeLagStats();
if (lagStats == null) {
lagMetricsQueue.offer(0L);
} else {
long lag = lagStats.get(supervisor.getLagMetricForAutoScaler());
lagMetricsQueue.offer(lag > 0 ? lag : 0L);
}
long lag = supervisor.computeLagForAutoScaler();
lagMetricsQueue.offer(lag > 0 ? lag : 0L);

Check notice

Code scanning / CodeQL

Ignored error status of call

Method run ignores exceptional return value of CircularFifoQueue<Long>.offer.
log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue);
} else {
log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.segment.incremental.ParseExceptionReport;

Expand Down Expand Up @@ -95,11 +94,12 @@ default Boolean isHealthy()
LagStats computeLagStats();

/**
* Used by AutoScaler to either scale by max/total/avg.
* Used by AutoScaler to make scaling decisions.
*/
default LagMetric getLagMetricForAutoScaler()
default long computeLagForAutoScaler()
{
return LagMetric.TOTAL;
LagStats lagStats = computeLagStats();
return lagStats == null ? 0L : lagStats.getTotalLag();
}

int getActiveTaskGroupsCount();
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,4 @@ public long getAvgLag()
{
return avgLag;
}

public long get(LagMetric metric)
{
switch (metric) {
case AVERAGE:
return avgLag;
case TOTAL:
return totalLag;
case MAX:
return maxLag;
}
throw new IllegalStateException("Unknown Metric");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,22 @@

package org.apache.druid.indexing.overlord.supervisor;

import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

public class LagStatsTest
public class SupervisorTest
{

@Test
public void lagStatsByMetric()
public void testAutoScalerLagComputation()
{
int max = 1;
int avg = 2;
int total = 3;
LagStats lag = new LagStats(max, total, avg);
Supervisor supervisor = Mockito.spy(Supervisor.class);

Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2, 3));
Assert.assertEquals(2, supervisor.computeLagForAutoScaler());

Assert.assertEquals(max, lag.get(LagMetric.MAX));
Assert.assertEquals(total, lag.get(LagMetric.TOTAL));
Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE));
Mockito.when(supervisor.computeLagStats()).thenReturn(null);
Assert.assertEquals(0, supervisor.computeLagForAutoScaler());
}
}