From ec9fbecd04de6b4e71b9200241fcd4d77a588f02 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 31 Jul 2018 22:09:53 -0700 Subject: [PATCH 1/4] Fix TaskLockBox.syncFromStorage() when updating from 0.12.x to 0.12.2 --- .../druid/indexing/overlord/TaskLockbox.java | 47 +++++- .../indexing/overlord/TaskLockboxTest.java | 144 ++++++++++++++++++ 2 files changed, 183 insertions(+), 8 deletions(-) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 93f62b70eaf8..c9a3fd1bd8e4 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -34,7 +34,9 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.task.Task; +import io.druid.indexing.common.task.Tasks; import io.druid.java.util.common.DateTimes; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.StringUtils; @@ -382,12 +384,27 @@ private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock) taskLock.getDataSource(), task.getDataSource() ); - Preconditions.checkArgument( - task.getPriority() == taskLock.getPriority(), - "lock priority[%s] is different from task priority[%s]", - taskLock.getPriority(), - task.getPriority() - ); + + final int priority; + // Here, both task and taskLock can return 0 priority if the priority is not properly set, that is they are + // created in an older version of Druid. + if (task.getPriority() == taskLock.getPriority()) { + priority = task.getPriority(); + } else { + // The priority of task and taskLock can be different when updating cluster if the task doesn't have the + // priority in taskContext while taskLock has. In this case, we simply ignores the task priority and uses the + // taskLock priority. + if (task.getContextValue(Tasks.PRIORITY_KEY) == null && task.getDefaultPriority() == taskLock.getPriority()) { + log.warn("Task priority is missing. Using taskLock priority[%d] instead.", taskLock.getPriority()); + priority = taskLock.getPriority(); + } else { + throw new ISE( + "lock priority[%s] is different from task priority[%s]", + taskLock.getPriority(), + task.getPriority() + ); + } + } return createOrFindLockPosse( taskLock.getType(), @@ -396,7 +413,7 @@ private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock) taskLock.getDataSource(), taskLock.getInterval(), taskLock.getVersion(), - taskLock.getPriority(), + priority, taskLock.isRevoked() ); } @@ -986,7 +1003,21 @@ TaskLock getTaskLock() boolean addTask(Task task) { Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId())); - Preconditions.checkArgument(taskLock.getPriority() == task.getPriority()); + + // The priority of task and taskLock can be different when updating cluster if the task doesn't have the + // priority in taskContext while taskLock has. In this case, we simply ignores the task priority and uses the + // taskLock priority. + if (taskLock.getPriority() != task.getPriority()) { + if (task.getContextValue(Tasks.PRIORITY_KEY) == null && task.getDefaultPriority() == taskLock.getPriority()) { + log.warn("Task priority is missing. Using taskLock priority[%d] instead.", taskLock.getPriority()); + } else { + throw new IAE( + "Task priority[%d] is different from taskLock priority[%d]", + task.getPriority(), + taskLock.getPriority() + ); + } + } return taskIds.add(task.getId()); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index f2dceb0ee30e..ca35d84fc6b7 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -19,6 +19,9 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; import io.druid.indexer.TaskStatus; @@ -261,6 +264,84 @@ public void testSyncFromStorage() throws EntryExistsException Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); } + @Test + public void testSyncFromStorageWithMissingTaskLockPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLockWithoutPriority(task.getGroupId(), task.getDataSource(), Intervals.of("2017/2018"), "v1") + ); + + final List beforeLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + lockbox.syncFromStorage(); + + final List afterLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); + } + + @Test + public void testSyncFromStorageWithMissingTaskPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLock( + TaskLockType.EXCLUSIVE, + task.getGroupId(), + task.getDataSource(), + Intervals.of("2017/2018"), + "v1", + task.getDefaultPriority() + ) + ); + + final List beforeLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + lockbox.syncFromStorage(); + + final List afterLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); + } + + @Test + public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLock( + TaskLockType.EXCLUSIVE, + task.getGroupId(), + task.getDataSource(), + Intervals.of("2017/2018"), + "v1", + 10 + ) + ); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + expectedException.expect(ISE.class); + expectedException.expectMessage("lock priority[10] is different from task priority[0]"); + lockbox.syncFromStorage(); + } + @Test public void testRevokedLockSyncFromStorage() throws EntryExistsException { @@ -504,4 +585,67 @@ private Set getAllLocks(List tasks) .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) .collect(Collectors.toSet()); } + + private static class TaskLockWithoutPriority extends TaskLock + { + @JsonCreator + TaskLockWithoutPriority( + String groupId, + String dataSource, + Interval interval, + String version + ) + { + super(null, groupId, dataSource, interval, version, 0, false); + } + + @Override + @JsonProperty + public TaskLockType getType() + { + return super.getType(); + } + + @Override + @JsonProperty + public String getGroupId() + { + return super.getGroupId(); + } + + @Override + @JsonProperty + public String getDataSource() + { + return super.getDataSource(); + } + + @Override + @JsonProperty + public Interval getInterval() + { + return super.getInterval(); + } + + @Override + @JsonProperty + public String getVersion() + { + return super.getVersion(); + } + + @JsonIgnore + @Override + public int getPriority() + { + return super.getPriority(); + } + + @JsonIgnore + @Override + public boolean isRevoked() + { + return super.isRevoked(); + } + } } From 3f5a23c179a0229facde71f8db266445dd2c0f37 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 3 Aug 2018 09:17:03 -0700 Subject: [PATCH 2/4] Make the priority of taskLock nullable --- .../druid/indexing/kafka/KafkaIndexTask.java | 4 +- .../io/druid/indexing/common/TaskLock.java | 27 +++++-- .../AppenderatorDriverRealtimeIndexTask.java | 4 +- .../indexing/common/task/CompactionTask.java | 4 +- .../indexing/common/task/HadoopIndexTask.java | 4 +- .../druid/indexing/common/task/IndexTask.java | 4 +- .../indexing/common/task/MergeTaskBase.java | 4 +- .../druid/indexing/common/task/NoopTask.java | 4 +- .../common/task/RealtimeIndexTask.java | 4 +- .../io/druid/indexing/common/task/Task.java | 8 --- .../druid/indexing/overlord/TaskLockbox.java | 70 +++++++------------ .../overlord/http/OverlordResource.java | 6 -- .../indexing/overlord/TaskLockboxTest.java | 8 +-- 13 files changed, 67 insertions(+), 84 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index d3a58240d0f3..2f8f224289f1 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -165,9 +165,9 @@ private static String makeTaskId(String dataSource, int randomBits) } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java index e2d20281e04b..2ad8fae335f1 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java @@ -37,9 +37,22 @@ public class TaskLock private final String dataSource; private final Interval interval; private final String version; - private final int priority; + private final Integer priority; private final boolean revoked; + public static TaskLock withPriority(TaskLock lock, int priority) + { + return new TaskLock( + lock.type, + lock.getGroupId(), + lock.getDataSource(), + lock.getInterval(), + lock.getVersion(), + priority, + lock.isRevoked() + ); + } + @JsonCreator public TaskLock( @JsonProperty("type") @Nullable TaskLockType type, // nullable for backward compatibility @@ -47,7 +60,7 @@ public TaskLock( @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("version") String version, - @JsonProperty("priority") int priority, + @JsonProperty("priority") @Nullable Integer priority, @JsonProperty("revoked") boolean revoked ) { @@ -116,11 +129,17 @@ public String getVersion() } @JsonProperty - public int getPriority() + @Nullable + public Integer getPriority() { return priority; } + public int getNonNullPriority() + { + return Preconditions.checkNotNull(priority, "priority"); + } + @JsonProperty public boolean isRevoked() { @@ -139,7 +158,7 @@ public boolean equals(Object o) this.dataSource.equals(that.dataSource) && this.interval.equals(that.interval) && this.version.equals(that.version) && - this.priority == that.priority && + Objects.equal(this.priority, that.priority) && this.revoked == that.revoked; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java index 5dcc35bde4ae..fa930143f6aa 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTask.java @@ -202,9 +202,9 @@ public AppenderatorDriverRealtimeIndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index 05c9c578a0af..f81999f15f2d 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -183,9 +183,9 @@ public String getType() } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_MERGE_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); } @VisibleForTesting diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index a49239abcc7c..4a72114be82c 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -171,9 +171,9 @@ public HadoopIndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index ac1c94dae1c4..f91eaf9a27b2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -241,9 +241,9 @@ public IndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 675f27a64f64..b1f386c0bc91 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -133,9 +133,9 @@ public boolean apply(@Nullable DataSegment segment) } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_MERGE_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index fab5a9d6b06f..e704d6aa0662 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -152,9 +152,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } public static NoopTask create() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 5ea6c0810204..37a7ccf11391 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -161,9 +161,9 @@ public RealtimeIndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index 8848f7fa8a53..727c35e08544 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -98,14 +98,6 @@ default int getPriority() return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY); } - /** - * Returns the default task priority. It can vary depending on the task type. - */ - default int getDefaultPriority() - { - return Tasks.DEFAULT_TASK_PRIORITY; - } - /** * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may * require. diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index c9a3fd1bd8e4..c3cb42823c4b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -34,9 +34,7 @@ import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskLockType; import io.druid.indexing.common.task.Task; -import io.druid.indexing.common.task.Tasks; import io.druid.java.util.common.DateTimes; -import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.StringUtils; @@ -132,17 +130,23 @@ public int compare(Pair left, Pair right) final TaskLock savedTaskLock = taskAndLock.rhs; if (savedTaskLock.getInterval().toDurationMillis() <= 0) { // "Impossible", but you never know what crazy stuff can be restored from storage. - log.warn("WTF?! Got lock with empty interval for task: %s", task.getId()); + log.warn("WTF?! Got lock[%s] with empty interval for task: %s", savedTaskLock, task.getId()); continue; } - final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock); + // Create a new taskLock if it doesn't have a proper priority, + // so that every taskLock in memory has the priority. + final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null + ? TaskLock.withPriority(savedTaskLock, task.getPriority()) + : savedTaskLock; + + final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLockWithPriority); if (taskLockPosse != null) { taskLockPosse.addTask(task); final TaskLock taskLock = taskLockPosse.getTaskLock(); - if (savedTaskLock.getVersion().equals(taskLock.getVersion())) { + if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) { taskLockCount++; log.info( "Reacquired lock[%s] for task: %s", @@ -153,8 +157,8 @@ public int compare(Pair left, Pair right) taskLockCount++; log.info( "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s", - savedTaskLock.getInterval(), - savedTaskLock.getVersion(), + savedTaskLockWithPriority.getInterval(), + savedTaskLockWithPriority.getVersion(), taskLock.getVersion(), task.getId() ); @@ -162,8 +166,8 @@ public int compare(Pair left, Pair right) } else { throw new ISE( "Could not reacquire lock on interval[%s] version[%s] for task: %s", - savedTaskLock.getInterval(), - savedTaskLock.getVersion(), + savedTaskLockWithPriority.getInterval(), + savedTaskLockWithPriority.getVersion(), task.getId() ); } @@ -384,27 +388,15 @@ private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock) taskLock.getDataSource(), task.getDataSource() ); + final int taskPriority = task.getPriority(); + final int lockPriority = taskLock.getNonNullPriority(); - final int priority; - // Here, both task and taskLock can return 0 priority if the priority is not properly set, that is they are - // created in an older version of Druid. - if (task.getPriority() == taskLock.getPriority()) { - priority = task.getPriority(); - } else { - // The priority of task and taskLock can be different when updating cluster if the task doesn't have the - // priority in taskContext while taskLock has. In this case, we simply ignores the task priority and uses the - // taskLock priority. - if (task.getContextValue(Tasks.PRIORITY_KEY) == null && task.getDefaultPriority() == taskLock.getPriority()) { - log.warn("Task priority is missing. Using taskLock priority[%d] instead.", taskLock.getPriority()); - priority = taskLock.getPriority(); - } else { - throw new ISE( - "lock priority[%s] is different from task priority[%s]", - taskLock.getPriority(), - task.getPriority() - ); - } - } + Preconditions.checkArgument( + lockPriority == taskPriority, + "lock priority[%s] is different from task priority[%s]", + lockPriority, + taskPriority + ); return createOrFindLockPosse( taskLock.getType(), @@ -413,7 +405,7 @@ private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock) taskLock.getDataSource(), taskLock.getInterval(), taskLock.getVersion(), - priority, + taskPriority, taskLock.isRevoked() ); } @@ -942,7 +934,7 @@ private static boolean isAllRevocable(List lockPosses, int tryLoc private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority) { final TaskLock existingLock = lockPosse.getTaskLock(); - return existingLock.isRevoked() || existingLock.getPriority() < tryLockPriority; + return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority; } private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval) @@ -1003,21 +995,7 @@ TaskLock getTaskLock() boolean addTask(Task task) { Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId())); - - // The priority of task and taskLock can be different when updating cluster if the task doesn't have the - // priority in taskContext while taskLock has. In this case, we simply ignores the task priority and uses the - // taskLock priority. - if (taskLock.getPriority() != task.getPriority()) { - if (task.getContextValue(Tasks.PRIORITY_KEY) == null && task.getDefaultPriority() == taskLock.getPriority()) { - log.warn("Task priority is missing. Using taskLock priority[%d] instead.", taskLock.getPriority()); - } else { - throw new IAE( - "Task priority[%d] is different from taskLock priority[%d]", - task.getPriority(), - taskLock.getPriority() - ); - } - } + Preconditions.checkArgument(taskLock.getNonNullPriority() == task.getPriority()); return taskIds.add(task.getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index f3b7876523e2..8b8d5e74db6e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -44,7 +44,6 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; -import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskQueue; @@ -176,11 +175,6 @@ public Response taskPost( public Response apply(TaskQueue taskQueue) { try { - // Set default priority if needed - final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY); - if (priority == null) { - task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority()); - } taskQueue.add(task); return Response.ok(ImmutableMap.of("task", task.getId())).build(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index ca35d84fc6b7..442f304dc27b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -301,7 +301,7 @@ public void testSyncFromStorageWithMissingTaskPriority() throws EntryExistsExcep task.getDataSource(), Intervals.of("2017/2018"), "v1", - task.getDefaultPriority() + task.getPriority() ) ); @@ -337,8 +337,8 @@ public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException ); final TaskLockbox lockbox = new TaskLockbox(taskStorage); - expectedException.expect(ISE.class); - expectedException.expectMessage("lock priority[10] is different from task priority[0]"); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("lock priority[10] is different from task priority[50]"); lockbox.syncFromStorage(); } @@ -636,7 +636,7 @@ public String getVersion() @JsonIgnore @Override - public int getPriority() + public Integer getPriority() { return super.getPriority(); } From d60fad4c54720c6d1455c26b821d7373ad0308b7 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 3 Aug 2018 15:45:29 -0700 Subject: [PATCH 3/4] fix test --- .../java/io/druid/indexing/overlord/http/OverlordTest.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index f947bb21decc..2d7ebc59a6d0 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -233,12 +233,6 @@ public void testOverlordRun() throws Exception Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity()); - final Map context = task_0.getContext(); - Assert.assertEquals(1, context.size()); - final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY); - Assert.assertNotNull(priority); - Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue()); - // Duplicate task - should fail response = overlordResource.taskPost(task_0, req); Assert.assertEquals(400, response.getStatus()); From 31857e66593c9ebbca6f89567031429eaf378f3c Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 3 Aug 2018 15:58:16 -0700 Subject: [PATCH 4/4] fix build --- .../java/io/druid/indexing/overlord/http/OverlordTest.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index 2d7ebc59a6d0..4c08cce4bf2f 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -31,13 +31,12 @@ import io.druid.discovery.DruidLeaderSelector; import io.druid.indexer.TaskLocation; import io.druid.indexer.TaskState; -import io.druid.indexer.TaskStatusPlus; import io.druid.indexer.TaskStatus; +import io.druid.indexer.TaskStatusPlus; import io.druid.indexing.common.actions.TaskActionClientFactory; import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; -import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskLockbox; @@ -81,7 +80,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch;