From 5d60c59e1c4cd7142a95293fa331646e52419940 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Fri, 3 Aug 2018 17:35:23 -0700 Subject: [PATCH] [Backport] Fix IllegalArgumentException in TaskLockbox.syncFromStorage() when updating from 0.12.x to 0.12.2. --- .../druid/indexing/kafka/KafkaIndexTask.java | 4 +- .../io/druid/indexing/common/TaskLock.java | 27 +++- .../indexing/common/task/CompactionTask.java | 4 +- .../indexing/common/task/HadoopIndexTask.java | 4 +- .../druid/indexing/common/task/IndexTask.java | 4 +- .../indexing/common/task/MergeTaskBase.java | 4 +- .../druid/indexing/common/task/NoopTask.java | 4 +- .../common/task/RealtimeIndexTask.java | 4 +- .../io/druid/indexing/common/task/Task.java | 8 - .../druid/indexing/overlord/TaskLockbox.java | 35 +++-- .../overlord/http/OverlordResource.java | 6 - .../indexing/overlord/TaskLockboxTest.java | 144 ++++++++++++++++++ .../indexing/overlord/http/OverlordTest.java | 8 - 13 files changed, 203 insertions(+), 53 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 76468b67721d..5f039c0336b4 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -299,9 +299,9 @@ private static String makeTaskId(String dataSource, int randomBits) } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java index 5c7ca14b30d1..273335ed6542 100644 --- 
a/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/TaskLock.java @@ -37,9 +37,22 @@ public class TaskLock private final String dataSource; private final Interval interval; private final String version; - private final int priority; + private final Integer priority; private final boolean revoked; + public static TaskLock withPriority(TaskLock lock, int priority) + { + return new TaskLock( + lock.type, + lock.getGroupId(), + lock.getDataSource(), + lock.getInterval(), + lock.getVersion(), + priority, + lock.isRevoked() + ); + } + @JsonCreator public TaskLock( @JsonProperty("type") @Nullable TaskLockType type, // nullable for backward compatibility @@ -47,7 +60,7 @@ public TaskLock( @JsonProperty("dataSource") String dataSource, @JsonProperty("interval") Interval interval, @JsonProperty("version") String version, - @JsonProperty("priority") int priority, + @JsonProperty("priority") @Nullable Integer priority, @JsonProperty("revoked") boolean revoked ) { @@ -116,11 +129,17 @@ public String getVersion() } @JsonProperty - public int getPriority() + @Nullable + public Integer getPriority() { return priority; } + public int getNonNullPriority() + { + return Preconditions.checkNotNull(priority, "priority"); + } + @JsonProperty public boolean isRevoked() { @@ -139,7 +158,7 @@ public boolean equals(Object o) this.dataSource.equals(that.dataSource) && this.interval.equals(that.interval) && this.version.equals(that.version) && - this.priority == that.priority && + Objects.equal(this.priority, that.priority) && this.revoked == that.revoked; } } diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index ad7760110435..5e94285cd9f7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ 
b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -160,9 +160,9 @@ public String getType() } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_MERGE_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); } @VisibleForTesting diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 6a08a82811bb..4386b5b35953 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -121,9 +121,9 @@ public HadoopIndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 6cbc27811d96..b9ec27055106 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -170,9 +170,9 @@ public IndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 3a9fc9793946..acf1be019cab 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ 
b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -133,9 +133,9 @@ public boolean apply(@Nullable DataSegment segment) } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_MERGE_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index 6c6358df4beb..e2027747bf5e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -152,9 +152,9 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); } public static NoopTask create() diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 053df113ec94..b3a12f79c874 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -162,9 +162,9 @@ public RealtimeIndexTask( } @Override - public int getDefaultPriority() + public int getPriority() { - return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; + return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index c69b6b56e251..1c7a134eb84e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java 
+++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -97,14 +97,6 @@ default int getPriority() return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY); } - /** - * Returns the default task priority. It can vary depending on the task type. - */ - default int getDefaultPriority() - { - return Tasks.DEFAULT_TASK_PRIORITY; - } - /** * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may * require. diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java index 286296d1c4c9..8488257a7ee2 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/TaskLockbox.java @@ -130,17 +130,23 @@ public int compare(Pair left, Pair right) final TaskLock savedTaskLock = taskAndLock.rhs; if (savedTaskLock.getInterval().toDurationMillis() <= 0) { // "Impossible", but you never know what crazy stuff can be restored from storage. - log.warn("WTF?! Got lock with empty interval for task: %s", task.getId()); + log.warn("WTF?! Got lock[%s] with empty interval for task: %s", savedTaskLock, task.getId()); continue; } - final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLock); + // Create a new taskLock if it doesn't have a proper priority, + // so that every taskLock in memory has the priority. + final TaskLock savedTaskLockWithPriority = savedTaskLock.getPriority() == null + ? 
TaskLock.withPriority(savedTaskLock, task.getPriority()) + : savedTaskLock; + + final TaskLockPosse taskLockPosse = createOrFindLockPosse(task, savedTaskLockWithPriority); if (taskLockPosse != null) { taskLockPosse.addTask(task); final TaskLock taskLock = taskLockPosse.getTaskLock(); - if (savedTaskLock.getVersion().equals(taskLock.getVersion())) { + if (savedTaskLockWithPriority.getVersion().equals(taskLock.getVersion())) { taskLockCount++; log.info( "Reacquired lock[%s] for task: %s", @@ -151,8 +157,8 @@ public int compare(Pair left, Pair right) taskLockCount++; log.info( "Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s", - savedTaskLock.getInterval(), - savedTaskLock.getVersion(), + savedTaskLockWithPriority.getInterval(), + savedTaskLockWithPriority.getVersion(), taskLock.getVersion(), task.getId() ); @@ -160,8 +166,8 @@ public int compare(Pair left, Pair right) } else { throw new ISE( "Could not reacquire lock on interval[%s] version[%s] for task: %s", - savedTaskLock.getInterval(), - savedTaskLock.getVersion(), + savedTaskLockWithPriority.getInterval(), + savedTaskLockWithPriority.getVersion(), task.getId() ); } @@ -382,11 +388,14 @@ private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock) taskLock.getDataSource(), task.getDataSource() ); + final int taskPriority = task.getPriority(); + final int lockPriority = taskLock.getNonNullPriority(); + Preconditions.checkArgument( - task.getPriority() == taskLock.getPriority(), + lockPriority == taskPriority, "lock priority[%s] is different from task priority[%s]", - taskLock.getPriority(), - task.getPriority() + lockPriority, + taskPriority ); return createOrFindLockPosse( @@ -396,7 +405,7 @@ private TaskLockPosse createOrFindLockPosse(Task task, TaskLock taskLock) taskLock.getDataSource(), taskLock.getInterval(), taskLock.getVersion(), - taskLock.getPriority(), + taskPriority, taskLock.isRevoked() ); } @@ -925,7 +934,7 @@ private static boolean 
isAllRevocable(List lockPosses, int tryLoc private static boolean isRevocable(TaskLockPosse lockPosse, int tryLockPriority) { final TaskLock existingLock = lockPosse.getTaskLock(); - return existingLock.isRevoked() || existingLock.getPriority() < tryLockPriority; + return existingLock.isRevoked() || existingLock.getNonNullPriority() < tryLockPriority; } private TaskLockPosse getOnlyTaskLockPosseContainingInterval(Task task, Interval interval) @@ -986,7 +995,7 @@ TaskLock getTaskLock() boolean addTask(Task task) { Preconditions.checkArgument(taskLock.getGroupId().equals(task.getGroupId())); - Preconditions.checkArgument(taskLock.getPriority() == task.getPriority()); + Preconditions.checkArgument(taskLock.getNonNullPriority() == task.getPriority()); return taskIds.add(task.getId()); } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index aeb31418b4c7..8e4be31b8695 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -41,7 +41,6 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; -import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskQueue; @@ -165,11 +164,6 @@ public Response taskPost( public Response apply(TaskQueue taskQueue) { try { - // Set default priority if needed - final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY); - if (priority == null) { - task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority()); - } taskQueue.add(task); return Response.ok(ImmutableMap.of("task", task.getId())).build(); } diff --git 
a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java index 0aa90d6401e4..ed16b9be6f5d 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLockboxTest.java @@ -19,6 +19,9 @@ package io.druid.indexing.overlord; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Iterables; import io.druid.indexing.common.TaskLock; @@ -261,6 +264,84 @@ public void testSyncFromStorage() throws EntryExistsException Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); } + @Test + public void testSyncFromStorageWithMissingTaskLockPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLockWithoutPriority(task.getGroupId(), task.getDataSource(), Intervals.of("2017/2018"), "v1") + ); + + final List beforeLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + lockbox.syncFromStorage(); + + final List afterLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); + } + + @Test + public void testSyncFromStorageWithMissingTaskPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLock( + 
TaskLockType.EXCLUSIVE, + task.getGroupId(), + task.getDataSource(), + Intervals.of("2017/2018"), + "v1", + task.getPriority() + ) + ); + + final List beforeLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + lockbox.syncFromStorage(); + + final List afterLocksInStorage = taskStorage.getActiveTasks().stream() + .flatMap(t -> taskStorage.getLocks(t.getId()).stream()) + .collect(Collectors.toList()); + + Assert.assertEquals(beforeLocksInStorage, afterLocksInStorage); + } + + @Test + public void testSyncFromStorageWithInvalidPriority() throws EntryExistsException + { + final Task task = NoopTask.create(); + taskStorage.insert(task, TaskStatus.running(task.getId())); + taskStorage.addLock( + task.getId(), + new TaskLock( + TaskLockType.EXCLUSIVE, + task.getGroupId(), + task.getDataSource(), + Intervals.of("2017/2018"), + "v1", + 10 + ) + ); + + final TaskLockbox lockbox = new TaskLockbox(taskStorage); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("lock priority[10] is different from task priority[50]"); + lockbox.syncFromStorage(); + } + @Test public void testRevokedLockSyncFromStorage() throws EntryExistsException { @@ -504,4 +585,67 @@ private Set getAllLocks(List tasks) .flatMap(task -> taskStorage.getLocks(task.getId()).stream()) .collect(Collectors.toSet()); } + + private static class TaskLockWithoutPriority extends TaskLock + { + @JsonCreator + TaskLockWithoutPriority( + String groupId, + String dataSource, + Interval interval, + String version + ) + { + super(null, groupId, dataSource, interval, version, 0, false); + } + + @Override + @JsonProperty + public TaskLockType getType() + { + return super.getType(); + } + + @Override + @JsonProperty + public String getGroupId() + { + return super.getGroupId(); + } + + @Override + @JsonProperty + public String 
getDataSource() + { + return super.getDataSource(); + } + + @Override + @JsonProperty + public Interval getInterval() + { + return super.getInterval(); + } + + @Override + @JsonProperty + public String getVersion() + { + return super.getVersion(); + } + + @JsonIgnore + @Override + public Integer getPriority() + { + return super.getPriority(); + } + + @JsonIgnore + @Override + public boolean isRevoked() + { + return super.isRevoked(); + } + } } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index f389351ed385..8bfdc3cb23b4 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -37,7 +37,6 @@ import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; -import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskLockbox; @@ -81,7 +80,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -232,12 +230,6 @@ public void testOverlordRun() throws Exception Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity()); - final Map context = task_0.getContext(); - Assert.assertEquals(1, context.size()); - final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY); - Assert.assertNotNull(priority); - Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue()); - // Duplicate task - should fail response = 
overlordResource.taskPost(task_0, req); Assert.assertEquals(400, response.getStatus());