Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ private static String makeTaskId(String dataSource, int randomBits)
}

@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,30 @@ public class TaskLock
private final String dataSource;
private final Interval interval;
private final String version;
private final int priority;
private final Integer priority;
private final boolean revoked;

public static TaskLock withPriority(TaskLock lock, int priority)
{
return new TaskLock(
lock.type,
lock.getGroupId(),
lock.getDataSource(),
lock.getInterval(),
lock.getVersion(),
priority,
lock.isRevoked()
);
}

@JsonCreator
public TaskLock(
@JsonProperty("type") @Nullable TaskLockType type, // nullable for backward compatibility
@JsonProperty("groupId") String groupId,
@JsonProperty("dataSource") String dataSource,
@JsonProperty("interval") Interval interval,
@JsonProperty("version") String version,
@JsonProperty("priority") int priority,
@JsonProperty("priority") @Nullable Integer priority,
@JsonProperty("revoked") boolean revoked
)
{
Expand Down Expand Up @@ -116,11 +129,17 @@ public String getVersion()
}

@JsonProperty
public int getPriority()
@Nullable
public Integer getPriority()
{
return priority;
}

public int getNonNullPriority()
{
return Preconditions.checkNotNull(priority, "priority");
}

@JsonProperty
public boolean isRevoked()
{
Expand All @@ -139,7 +158,7 @@ public boolean equals(Object o)
this.dataSource.equals(that.dataSource) &&
this.interval.equals(that.interval) &&
this.version.equals(that.version) &&
this.priority == that.priority &&
Objects.equal(this.priority, that.priority) &&
this.revoked == that.revoked;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ public AppenderatorDriverRealtimeIndexTask(
}

@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,9 @@ public String getType()
}

@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,9 +171,9 @@ public HadoopIndexTask(
}

@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ public IndexTask(
}

@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ public boolean apply(@Nullable DataSegment segment)
}

@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
}

@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
}

public static NoopTask create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,9 +161,9 @@ public RealtimeIndexTask(
}

@Override
public int getDefaultPriority()
public int getPriority()
{
return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,14 +98,6 @@ default int getPriority()
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
}

/**
* Returns the default task priority. It can vary depending on the task type.
*/
default int getDefaultPriority()
{
return Tasks.DEFAULT_TASK_PRIORITY;
}

/**
* Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
* require.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,23 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
final TaskLock savedTaskLock = taskAndLock.rhs;
if (savedTaskLock.getInterval().toDurationMillis() <= 0) {
// "Impossible", but you never know what crazy stuff can be restored from storage.
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
log.warn("WTF?! Got lock[%s] with empty interval for task: %s", savedTaskLock, task.getId());
continue;
}

final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock);
// Create a new taskLock if it doesn't have a proper priority,
// so that every taskLock in memory has the priority.
final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null
? TaskLock.withPriority(savedTaskLock, task.getPriority())
: savedTaskLock;

final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLockWithPriority);
if (taskLockPosse != null) {
taskLockPosse.addTask(task);

final TaskLock taskLock = taskLockPosse.getTaskLock();

if (savedTaskLock.getVersion().equals(taskLock.getVersion())) {
if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) {
taskLockCount++;
log.info(
"Reacquired lock[%s] for task: %s",
Expand All @@ -151,17 +157,17 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
taskLockCount++;
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
taskLock.getVersion(),
task.getId()
);
}
} else {
throw new ISE(
"Could not reacquire lock on interval[%s] version[%s] for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
savedTaskLockWithPriority.getInterval(),
savedTaskLockWithPriority.getVersion(),
task.getId()
);
}
Expand Down Expand Up @@ -382,11 +388,14 @@ private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock)
taskLock.getDataSource(),
task.getDataSource()
);
final int taskPriority = task.getPriority();
final int lockPriority = taskLock.getNonNullPriority();

Preconditions.checkArgument(
task.getPriority() == taskLock.getPriority(),
lockPriority == taskPriority,
"lock priority[%s] is different from task priority[%s]",
taskLock.getPriority(),
task.getPriority()
lockPriority,
taskPriority
);

return createOrFindLockPosse(
Expand All @@ -396,7 +405,7 @@ private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock)
taskLock.getDataSource(),
taskLock.getInterval(),
taskLock.getVersion(),
taskLock.getPriority(),
taskPriority,
taskLock.isRevoked()
);
}
Expand Down Expand Up @@ -925,7 +934,7 @@ private static boolean isAllRevocable(List<TaskLockPosse> lockPosses, int tryLoc
private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority)
{
final TaskLock existingLock = lockPosse.getTaskLock();
return existingLock.isRevoked() || existingLock.getPriority() < tryLockPriority;
return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority;
}

private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval)
Expand Down Expand Up @@ -986,7 +995,7 @@ TaskLock getTaskLock()
boolean addTask(Task task)
{
Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId()));
Preconditions.checkArgument(taskLock.getPriority() == task.getPriority());
Preconditions.checkArgument(taskLock.getNonNullPriority() == task.getPriority());
return taskIds.add(task.getId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
Expand Down Expand Up @@ -176,11 +175,6 @@ public Response taskPost(
public Response apply(TaskQueue taskQueue)
{
try {
// Set default priority if needed
final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
if (priority == null) {
task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority());
}
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
Expand Down
Loading