Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion api/src/main/java/io/druid/indexer/TaskStatusPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class TaskStatusPlus
@JsonCreator
public TaskStatusPlus(
@JsonProperty("id") String id,
@JsonProperty("type") String type,
@JsonProperty("type") @Nullable String type, // nullable for backward compatibility
@JsonProperty("createdTime") DateTime createdTime,
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@JsonProperty("state") @Nullable TaskState state,
Expand All @@ -65,6 +65,7 @@ public String getId()
return id;
}

@Nullable
@JsonProperty
public String getType()
{
Expand All @@ -83,12 +84,14 @@ public DateTime getQueueInsertionTime()
return queueInsertionTime;
}

@Nullable
@JsonProperty
public TaskState getState()
{
return state;
}

@Nullable
@JsonProperty
public Long getDuration()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;

import javax.annotation.Nullable;

/**
* A holder for a task and different components associated with the task
*/
Expand Down Expand Up @@ -85,6 +87,11 @@ public DateTime getQueueInsertionTime()
}

public abstract TaskLocation getLocation();

/**
* Returns the type of task. The return value can be null for backward compatibility.
*/
@Nullable
public abstract String getTaskType();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,10 @@ public Collection<? extends TaskRunnerWorkItem> apply(TaskRunner taskRunner)
} else {
workItems = taskRunner.getRunningTasks()
.stream()
.filter(workitem -> workitem.getTaskType().equals(taskType))
.filter(workitem -> {
final String itemType = workitem.getTaskType();
return itemType != null && itemType.equals(taskType);
})
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,14 @@ public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
final int numRunningCompactTasks = indexingServiceClient
.getRunningTasks()
.stream()
.filter(status -> status.getType().equals(COMPACT_TASK_TYPE))
.filter(status -> {
final String taskType = status.getType();
// taskType can be null if middleManagers are running with an older version. Here, we consevatively regard
// the tasks of the unknown taskType as the compactionTask. This is because it's important to not run
// compactionTasks more than the configured limit at any time which might impact to the ingestion
// performance.
return taskType == null || taskType.equals(COMPACT_TASK_TYPE);
})
.collect(Collectors.toList())
.size();
final CompactionSegmentIterator iterator = policy.reset(compactionConfigs, dataSources);
Expand Down