diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 5f039c0336b4..76468b67721d 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -299,9 +299,9 @@ private static String makeTaskId(String dataSource, int randomBits) } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); + return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java index cbf10f4134ca..adde5e97ba7a 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/AbstractTask.java @@ -36,6 +36,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,17 +65,17 @@ protected AbstractTask(String id, String dataSource, Map context protected AbstractTask( String id, - String groupId, - TaskResource taskResource, + @Nullable String groupId, + @Nullable TaskResource taskResource, String dataSource, - Map context + @Nullable Map context ) { this.id = Preconditions.checkNotNull(id, "id"); this.groupId = groupId == null ? id : groupId; this.taskResource = taskResource == null ? new TaskResource(id, 1) : taskResource; this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource"); - this.context = context; + this.context = context == null ? new HashMap<>() : context; } static String getOrMakeId(String id, final String typeName, String dataSource) @@ -162,11 +163,13 @@ public void stopGracefully() @Override public String toString() { - return Objects.toStringHelper(this) - .add("id", id) - .add("type", getType()) - .add("dataSource", dataSource) - .toString(); + return "AbstractTask{" + + "id='" + id + '\'' + + ", groupId='" + groupId + '\'' + + ", taskResource=" + taskResource + + ", dataSource='" + dataSource + '\'' + + ", context=" + context + + '}'; } /** @@ -207,13 +210,21 @@ public boolean equals(Object o) return false; } - return true; + if (!groupId.equals(that.groupId)) { + return false; + } + + if (!dataSource.equals(that.dataSource)) { + return false; + } + + return context.equals(that.context); } @Override public int hashCode() { - return id.hashCode(); + return Objects.hashCode(id, groupId, dataSource, context); } static List getTaskLocks(TaskActionClient client) throws IOException diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java index 5e94285cd9f7..ad7760110435 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/CompactionTask.java @@ -160,9 +160,9 @@ public String getType() } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); + return Tasks.DEFAULT_MERGE_TASK_PRIORITY; } @VisibleForTesting diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java index 4386b5b35953..6a08a82811bb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/HadoopIndexTask.java @@ -121,9 +121,9 @@ public HadoopIndexTask( } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); + return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 1d88733f9522..6cbc27811d96 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -67,9 +67,9 @@ import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorConfig; -import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.appenderator.SegmentIdentifier; @@ -170,9 +170,9 @@ public IndexTask( } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY); + return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java index 454899f5c774..3a9fc9793946 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/MergeTaskBase.java @@ -34,9 +34,6 @@ import com.google.common.collect.Ordering; import com.google.common.collect.Sets; import com.google.common.hash.Hashing; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.emitter.service.ServiceEmitter; -import io.druid.java.util.emitter.service.ServiceMetricEvent; import io.druid.indexing.common.TaskLock; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; @@ -45,8 +42,11 @@ import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; -import io.druid.segment.writeout.SegmentWriteOutMediumFactory; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.emitter.service.ServiceEmitter; +import io.druid.java.util.emitter.service.ServiceMetricEvent; import io.druid.segment.IndexIO; +import io.druid.segment.writeout.SegmentWriteOutMediumFactory; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NoneShardSpec; import org.joda.time.DateTime; @@ -133,9 +133,9 @@ public boolean apply(@Nullable DataSegment segment) } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY); + return Tasks.DEFAULT_MERGE_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java index e060f3c2893b..6c6358df4beb 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/NoopTask.java @@ -151,6 +151,12 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception } } + @Override + public int getDefaultPriority() + { + return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY; + } + public static NoopTask create() { return new NoopTask(null, null, 0, 0, null, null, null); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java index 2a6b3b4723ac..053df113ec94 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/RealtimeIndexTask.java @@ -28,7 +28,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.primitives.Ints; -import io.druid.java.util.emitter.EmittingLogger; import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; @@ -45,6 +44,7 @@ import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.CloseQuietly; +import io.druid.java.util.emitter.EmittingLogger; import io.druid.query.DruidMetrics; import io.druid.query.FinalizeResultsQueryRunner; import io.druid.query.Query; @@ -162,9 +162,9 @@ public RealtimeIndexTask( } @Override - public int getPriority() + public int getDefaultPriority() { - return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY); + return Tasks.DEFAULT_REALTIME_TASK_PRIORITY; } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java index aeb05781ec5b..c69b6b56e251 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/Task.java @@ -27,7 +27,6 @@ import io.druid.query.Query; import io.druid.query.QueryRunner; -import javax.annotation.Nullable; import java.util.Map; /** @@ -84,15 +83,28 @@ public interface Task * Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can * be used for task scheduling, cluster resource management, etc. * + * The task priority must be in taskContext if the task is submitted to the proper Overlord endpoint. + * + * It might not be in taskContext in rolling update. This returns {@link Tasks#DEFAULT_TASK_PRIORITY} in this case. + * * @return task priority * * @see Tasks for default task priorities + * @see io.druid.indexing.overlord.http.OverlordResource#taskPost */ default int getPriority() { return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY); } + /** + * Returns the default task priority. It can vary depending on the task type. + */ + default int getDefaultPriority() + { + return Tasks.DEFAULT_TASK_PRIORITY; + } + /** * Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may * require. @@ -179,13 +191,17 @@ default int getPriority() */ TaskStatus run(TaskToolbox toolbox) throws Exception; - @Nullable + default Map addToContext(String key, Object val) + { + getContext().put(key, val); + return getContext(); + } + Map getContext(); - @Nullable default ContextValueType getContextValue(String key) { - return getContext() == null ? null : (ContextValueType) getContext().get(key); + return (ContextValueType) getContext().get(key); } default ContextValueType getContextValue(String key, ContextValueType defaultValue) diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java index 8e4be31b8695..aeb31418b4c7 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/http/OverlordResource.java @@ -41,6 +41,7 @@ import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.actions.TaskActionHolder; import io.druid.indexing.common.task.Task; +import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskQueue; @@ -164,6 +165,11 @@ public Response taskPost( public Response apply(TaskQueue taskQueue) { try { + // Set default priority if needed + final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY); + if (priority == null) { + task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority()); + } taskQueue.add(task); return Response.ok(ImmutableMap.of("task", task.getId())).build(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java index 8bfdc3cb23b4..f389351ed385 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/http/OverlordTest.java @@ -37,6 +37,7 @@ import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.common.task.NoopTask; import io.druid.indexing.common.task.Task; +import io.druid.indexing.common.task.Tasks; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.IndexerMetadataStorageAdapter; import io.druid.indexing.overlord.TaskLockbox; @@ -80,6 +81,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; @@ -230,6 +232,12 @@ public void testOverlordRun() throws Exception Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity()); + final Map context = task_0.getContext(); + Assert.assertEquals(1, context.size()); + final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY); + Assert.assertNotNull(priority); + Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue()); + // Duplicate task - should fail response = overlordResource.taskPost(task_0, req); Assert.assertEquals(400, response.getStatus()); diff --git a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java index 62acf4965ee4..cc79261128e8 100644 --- a/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java +++ b/server/src/main/java/io/druid/metadata/DerbyMetadataStorageActionHandler.java @@ -20,6 +20,7 @@ package io.druid.metadata; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import io.druid.java.util.common.StringUtils; import org.joda.time.DateTime; import org.skife.jdbi.v2.Handle; @@ -31,6 +32,7 @@ public class DerbyMetadataStorageActionHandler extends SQLMetadataStorageActionHandler { + @VisibleForTesting DerbyMetadataStorageActionHandler( SQLMetadataConnector connector, ObjectMapper jsonMapper, diff --git a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java index 66208be95692..7b9c9513baf3 100644 --- a/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java +++ b/server/src/main/java/io/druid/metadata/SQLMetadataStorageActionHandler.java @@ -22,9 +22,8 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; -import com.google.common.base.Predicate; -import com.google.common.base.Throwables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.druid.java.util.emitter.EmittingLogger; @@ -121,53 +120,42 @@ public void insert( ) throws EntryExistsException { try { - connector.retryWithHandle( - new HandleCallback() - { - @Override - public Void withHandle(Handle handle) throws Exception - { - handle.createStatement( - StringUtils.format( - "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", - entryTable - ) - ) - .bind("id", id) - .bind("created_date", timestamp.toString()) - .bind("datasource", dataSource) - .bind("payload", jsonMapper.writeValueAsBytes(entry)) - .bind("active", active) - .bind("status_payload", jsonMapper.writeValueAsBytes(status)) - .execute(); - return null; - } + getConnector().retryWithHandle( + (HandleCallback) handle -> { + final String sql = StringUtils.format( + "INSERT INTO %s (id, created_date, datasource, payload, active, status_payload) " + + "VALUES (:id, :created_date, :datasource, :payload, :active, :status_payload)", + getEntryTable() + ); + handle.createStatement(sql) + .bind("id", id) + .bind("created_date", timestamp.toString()) + .bind("datasource", dataSource) + .bind("payload", jsonMapper.writeValueAsBytes(entry)) + .bind("active", active) + .bind("status_payload", jsonMapper.writeValueAsBytes(status)) + .execute(); + return null; }, - new Predicate() - { - @Override - public boolean apply(Throwable e) - { - final boolean isStatementException = e instanceof StatementException || - (e instanceof CallbackFailedException - && e.getCause() instanceof StatementException); - return connector.isTransientException(e) && !(isStatementException && getEntry(id).isPresent()); - } - } + e -> getConnector().isTransientException(e) && !(isStatementException(e) && getEntry(id).isPresent()) ); } catch (Exception e) { - final boolean isStatementException = e instanceof StatementException || - (e instanceof CallbackFailedException - && e.getCause() instanceof StatementException); - if (isStatementException && getEntry(id).isPresent()) { + if (isStatementException(e) && getEntry(id).isPresent()) { throw new EntryExistsException(id, e); } else { - throw Throwables.propagate(e); + throw new RuntimeException(e); } } } + @VisibleForTesting + protected static boolean isStatementException(Throwable e) + { + return e instanceof StatementException || + (e instanceof CallbackFailedException && e.getCause() instanceof StatementException); + } + @Override public boolean setStatus(final String entryId, final boolean active, final StatusType status) {