diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index cfcf9fc431c6..ffafd7cab235 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.inject.Inject; -import io.druid.java.util.http.client.response.FullResponseHolder; import io.druid.discovery.DruidLeaderClient; import io.druid.indexer.TaskStatusPlus; import io.druid.java.util.common.DateTimes; @@ -32,6 +31,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.jackson.JacksonUtils; +import io.druid.java.util.http.client.response.FullResponseHolder; import io.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index c3d859c00d33..450511b1338c 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -21,6 +21,7 @@ import com.google.inject.Inject; import io.druid.client.indexing.IndexingServiceClient; +import io.druid.indexer.TaskStatusPlus; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import io.druid.server.coordinator.CoordinatorCompactionConfig; @@ -32,6 +33,8 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -73,32 +76,28 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) Map compactionConfigs = compactionConfigList .stream() .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); - final int numRunningCompactTasks = indexingServiceClient - .getRunningTasks() - .stream() - .filter(status -> { - final String taskType = status.getType(); - // taskType can be null if middleManagers are running with an older version. Here, we consevatively regard - // the tasks of the unknown taskType as the compactionTask. This is because it's important to not run - // compactionTasks more than the configured limit at any time which might impact to the ingestion - // performance. - return taskType == null || taskType.equals(COMPACT_TASK_TYPE); - }) - .collect(Collectors.toList()) - .size(); + final int numNonCompleteCompactionTasks = findNumNonCompleteCompactTasks( + indexingServiceClient.getRunningTasks(), + indexingServiceClient.getPendingTasks(), + indexingServiceClient.getWaitingTasks() + ); final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources); final int compactionTaskCapacity = (int) Math.min( indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(), dynamicConfig.getMaxCompactionTaskSlots() ); - final int numAvailableCompactionTaskSlots = numRunningCompactTasks > 0 ? - compactionTaskCapacity - numRunningCompactTasks : + final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0 ? + compactionTaskCapacity - numNonCompleteCompactionTasks : // compactionTaskCapacity might be 0 if totalWorkerCapacity is low. // This guarantees that at least one slot is available if // compaction is enabled and numRunningCompactTasks is 0. Math.max(1, compactionTaskCapacity); - LOG.info("Running tasks [%d/%d]", numRunningCompactTasks, compactionTaskCapacity); + LOG.info( + "Found [%d] available task slots for compaction out of [%d] max compaction task capacity", + numAvailableCompactionTaskSlots, + compactionTaskCapacity + ); if (numAvailableCompactionTaskSlots > 0) { stats.accumulate(doRun(compactionConfigs, numAvailableCompactionTaskSlots, iterator)); } else { @@ -116,6 +115,26 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) .build(); } + @SafeVarargs + private static int findNumNonCompleteCompactTasks(List...taskStatusStreams) + { + final List allTaskStatusPlus = new ArrayList<>(); + Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll); + + return allTaskStatusPlus + .stream() + .filter(status -> { + final String taskType = status.getType(); + // taskType can be null if middleManagers are running with an older version. Here, we consevatively regard + // the tasks of the unknown taskType as the compactionTask. This is because it's important to not run + // compactionTasks more than the configured limit at any time which might impact to the ingestion + // performance. + return taskType == null || taskType.equals(COMPACT_TASK_TYPE); + }) + .collect(Collectors.toList()) + .size(); + } + private CoordinatorStats doRun( Map compactionConfigs, int numAvailableCompactionTaskSlots, diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java index e352f126bc86..086e3a53ad64 100644 --- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java @@ -105,7 +105,19 @@ public String compactSegments( @Override public List getRunningTasks() { - return ImmutableList.of(); + return Collections.emptyList(); + } + + @Override + public List getPendingTasks() + { + return Collections.emptyList(); + } + + @Override + public List getWaitingTasks() + { + return Collections.emptyList(); } @Override