diff --git a/api/src/main/java/io/druid/indexer/TaskStatusPlus.java b/api/src/main/java/io/druid/indexer/TaskStatusPlus.java index cd295ce2e5a0..08c9741ade32 100644 --- a/api/src/main/java/io/druid/indexer/TaskStatusPlus.java +++ b/api/src/main/java/io/druid/indexer/TaskStatusPlus.java @@ -39,7 +39,7 @@ public class TaskStatusPlus @JsonCreator public TaskStatusPlus( @JsonProperty("id") String id, - @JsonProperty("type") String type, + @JsonProperty("type") @Nullable String type, // nullable for backward compatibility @JsonProperty("createdTime") DateTime createdTime, @JsonProperty("queueInsertionTime") DateTime queueInsertionTime, @JsonProperty("state") @Nullable TaskState state, @@ -65,6 +65,7 @@ public String getId() return id; } + @Nullable @JsonProperty public String getType() { @@ -83,12 +84,14 @@ public DateTime getQueueInsertionTime() return queueInsertionTime; } + @Nullable @JsonProperty public TaskState getState() { return state; } + @Nullable @JsonProperty public Long getDuration() { diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java index d2d8f6daf1ca..f2bbc4d21bb5 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskRunnerWorkItem.java @@ -27,6 +27,8 @@ import io.druid.java.util.common.DateTimes; import org.joda.time.DateTime; +import javax.annotation.Nullable; + /** * A holder for a task and different components associated with the task */ @@ -85,6 +87,11 @@ public DateTime getQueueInsertionTime() } public abstract TaskLocation getLocation(); + + /** + * Returns the type of task. The return value can be null for backward compatibility. + */ + @Nullable public abstract String getTaskType(); @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index b22a5626e317..1ddbe4a39546 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -517,7 +517,10 @@ public Collection apply(TaskRunner taskRunner) } else { workItems = taskRunner.getRunningTasks() .stream() - .filter(workitem -> workitem.getTaskType().equals(taskType)) + .filter(workitem -> { + final String itemType = workitem.getTaskType(); + return itemType != null && itemType.equals(taskType); + }) .collect(Collectors.toList()); } diff --git a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java index a703ad4c5b62..0afc8b880038 100644 --- a/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java +++ b/server/src/main/java/io/druid/server/coordinator/helper/DruidCoordinatorSegmentCompactor.java @@ -72,7 +72,14 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) final int numRunningCompactTasks = indexingServiceClient .getRunningTasks() .stream() - .filter(status -> status.getType().equals(COMPACT_TASK_TYPE)) + .filter(status -> { + final String taskType = status.getType(); + // taskType can be null if middleManagers are running with an older version. Here, we consevatively regard + // the tasks of the unknown taskType as the compactionTask. This is because it's important to not run + // compactionTasks more than the configured limit at any time which might impact to the ingestion + // performance. + return taskType == null || taskType.equals(COMPACT_TASK_TYPE); + }) .collect(Collectors.toList()) .size(); final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);