From b0990a6378c149861afff942e65e06befc9582a8 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 25 Apr 2018 16:24:22 -0700 Subject: [PATCH 1/3] Consider waiting and pending compaction tasks as well as running tasks in DruidCoordinatorSegmentCompactor --- .../indexing/IndexingServiceClient.java | 2 +- .../DruidCoordinatorSegmentCompactor.java | 47 ++++++++++++------- .../DruidCoordinatorSegmentCompactorTest.java | 14 +++++- 3 files changed, 45 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java index cfcf9fc431c6..ffafd7cab235 100644 --- a/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java +++ b/server/src/main/java/io/druid/client/indexing/IndexingServiceClient.java @@ -24,7 +24,6 @@ import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.inject.Inject; -import io.druid.java.util.http.client.response.FullResponseHolder; import io.druid.discovery.DruidLeaderClient; import io.druid.indexer.TaskStatusPlus; import io.druid.java.util.common.DateTimes; @@ -32,6 +31,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.jackson.JacksonUtils; +import io.druid.java.util.http.client.response.FullResponseHolder; import io.druid.timeline.DataSegment; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index c3d859c00d33..cbc5aa6efeec 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ 
-21,6 +21,7 @@ import com.google.inject.Inject; import io.druid.client.indexing.IndexingServiceClient; +import io.druid.indexer.TaskStatusPlus; import io.druid.java.util.common.ISE; import io.druid.java.util.common.logger.Logger; import io.druid.server.coordinator.CoordinatorCompactionConfig; @@ -32,6 +33,8 @@ import it.unimi.dsi.fastutil.objects.Object2LongMap; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.function.Function; @@ -73,32 +76,24 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) Map compactionConfigs = compactionConfigList .stream() .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); - final int numRunningCompactTasks = indexingServiceClient - .getRunningTasks() - .stream() - .filter(status -> { - final String taskType = status.getType(); - // taskType can be null if middleManagers are running with an older version. Here, we consevatively regard - // the tasks of the unknown taskType as the compactionTask. This is because it's important to not run - // compactionTasks more than the configured limit at any time which might impact to the ingestion - // performance. - return taskType == null || taskType.equals(COMPACT_TASK_TYPE); - }) - .collect(Collectors.toList()) - .size(); + final int numNonCompleteCompactionTasks = findNumNonCompleteCompactTasks( + indexingServiceClient.getRunningTasks(), + indexingServiceClient.getPendingTasks(), + indexingServiceClient.getWaitingTasks() + ); final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources); final int compactionTaskCapacity = (int) Math.min( indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(), dynamicConfig.getMaxCompactionTaskSlots() ); - final int numAvailableCompactionTaskSlots = numRunningCompactTasks > 0 ? 
- compactionTaskCapacity - numRunningCompactTasks : + final int numAvailableCompactionTaskSlots = numNonCompleteCompactionTasks > 0 ? + compactionTaskCapacity - numNonCompleteCompactionTasks : // compactionTaskCapacity might be 0 if totalWorkerCapacity is low. // This guarantees that at least one slot is available if // compaction is enabled and numRunningCompactTasks is 0. Math.max(1, compactionTaskCapacity); - LOG.info("Running tasks [%d/%d]", numRunningCompactTasks, compactionTaskCapacity); + LOG.info("Running tasks [%d/%d]", numNonCompleteCompactionTasks, compactionTaskCapacity); if (numAvailableCompactionTaskSlots > 0) { stats.accumulate(doRun(compactionConfigs, numAvailableCompactionTaskSlots, iterator)); } else { @@ -116,6 +111,26 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) .build(); } + @SafeVarargs + private static int findNumNonCompleteCompactTasks(List ... taskStatusStreams) + { + final List allTaskStatusPlus = new ArrayList<>(); + Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll); + + return allTaskStatusPlus + .stream() + .filter(status -> { + final String taskType = status.getType(); + // taskType can be null if middleManagers are running with an older version. Here, we conservatively regard + // the tasks of the unknown taskType as the compactionTask. This is because it's important to not run + // compactionTasks more than the configured limit at any time which might impact the ingestion + // performance. 
+ return taskType == null || taskType.equals(COMPACT_TASK_TYPE); + }) + .collect(Collectors.toList()) + .size(); + } + private CoordinatorStats doRun( Map compactionConfigs, int numAvailableCompactionTaskSlots, diff --git a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java index e352f126bc86..086e3a53ad64 100644 --- a/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactorTest.java @@ -105,7 +105,19 @@ public String compactSegments( @Override public List getRunningTasks() { - return ImmutableList.of(); + return Collections.emptyList(); + } + + @Override + public List getPendingTasks() + { + return Collections.emptyList(); + } + + @Override + public List getWaitingTasks() + { + return Collections.emptyList(); } @Override From 3d0fee710ba17bdceb009d450273d47b2f10a65c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Wed, 25 Apr 2018 16:47:32 -0700 Subject: [PATCH 2/3] fix build --- .../coordinator/helper/DruidCoordinatorSegmentCompactor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index cbc5aa6efeec..b8cac4a7cd01 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -112,7 +112,7 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) } @SafeVarargs - private static int findNumNonCompleteCompactTasks(List ... 
taskStatusStreams) + private static int findNumNonCompleteCompactTasks(List...taskStatusStreams) { final List allTaskStatusPlus = new ArrayList<>(); Arrays.stream(taskStatusStreams).forEach(allTaskStatusPlus::addAll); From 8dff36da654ebc1cf48b258d536f1589a1402275 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Thu, 3 May 2018 10:52:21 -0700 Subject: [PATCH 3/3] fix logging --- .../helper/DruidCoordinatorSegmentCompactor.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index b8cac4a7cd01..450511b1338c 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -93,7 +93,11 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) // This guarantees that at least one slot is available if // compaction is enabled and numRunningCompactTasks is 0. Math.max(1, compactionTaskCapacity); - LOG.info("Running tasks [%d/%d]", numNonCompleteCompactionTasks, compactionTaskCapacity); + LOG.info( + "Found [%d] available task slots for compaction out of [%d] max compaction task capacity", + numAvailableCompactionTaskSlots, + compactionTaskCapacity + ); if (numAvailableCompactionTaskSlots > 0) { stats.accumulate(doRun(compactionConfigs, numAvailableCompactionTaskSlots, iterator)); } else {