Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ private static String makeTaskId(String dataSource, int randomBits)
}

@Override
public int getPriority()
public int getDefaultPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -64,17 +65,17 @@ protected AbstractTask(String id, String dataSource, Map<String, Object> context

protected AbstractTask(
String id,
String groupId,
TaskResource taskResource,
@Nullable String groupId,
@Nullable TaskResource taskResource,
String dataSource,
Map<String, Object> context
@Nullable Map<String, Object> context
)
{
this.id = Preconditions.checkNotNull(id, "id");
this.groupId = groupId == null ? id : groupId;
this.taskResource = taskResource == null ? new TaskResource(id, 1) : taskResource;
this.dataSource = Preconditions.checkNotNull(dataSource, "dataSource");
this.context = context;
this.context = context == null ? new HashMap<>() : context;
}

static String getOrMakeId(String id, final String typeName, String dataSource)
Expand Down Expand Up @@ -162,11 +163,13 @@ public void stopGracefully()
@Override
public String toString()
{
return Objects.toStringHelper(this)
.add("id", id)
.add("type", getType())
.add("dataSource", dataSource)
.toString();
return "AbstractTask{" +
"id='" + id + '\'' +
", groupId='" + groupId + '\'' +
", taskResource=" + taskResource +
", dataSource='" + dataSource + '\'' +
", context=" + context +
'}';
}

/**
Expand Down Expand Up @@ -207,13 +210,21 @@ public boolean equals(Object o)
return false;
}

return true;
if (!groupId.equals(that.groupId)) {
return false;
}

if (!dataSource.equals(that.dataSource)) {
return false;
}

return context.equals(that.context);
}

@Override
public int hashCode()
{
return id.hashCode();
return Objects.hashCode(id, groupId, dataSource, context);
}

static List<TaskLock> getTaskLocks(TaskActionClient client) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,13 @@
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexer.IngestionState;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import io.druid.indexing.common.TaskReport;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -202,9 +202,9 @@ public AppenderatorDriverRealtimeIndexTask(
}

@Override
public int getPriority()
public int getDefaultPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,9 @@ public String getType()
}

@Override
public int getPriority()
public int getDefaultPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,12 @@
import io.druid.indexer.MetadataStorageUpdaterJobHandler;
import io.druid.indexer.TaskMetricsGetter;
import io.druid.indexer.TaskMetricsUtils;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskReport;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockTryAcquireAction;
Expand Down Expand Up @@ -171,9 +171,9 @@ public HadoopIndexTask(
}

@Override
public int getPriority()
public int getDefaultPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,14 @@
import io.druid.data.input.Rows;
import io.druid.hll.HyperLogLogCollector;
import io.druid.indexer.IngestionState;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReport;
import io.druid.indexing.common.IngestionStatsAndErrorsTaskReportData;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import io.druid.indexing.common.TaskReport;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -240,9 +240,9 @@ public IndexTask(
}

@Override
public int getPriority()
public int getDefaultPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY);
return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.hash.Hashing;
import io.druid.indexing.common.TaskLock;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentListUsedAction;
import io.druid.indexing.common.actions.TaskActionClient;
Expand Down Expand Up @@ -133,9 +133,9 @@ public boolean apply(@Nullable DataSegment segment)
}

@Override
public int getPriority()
public int getDefaultPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_MERGE_TASK_PRIORITY);
return Tasks.DEFAULT_MERGE_TASK_PRIORITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ public TaskStatus run(TaskToolbox toolbox) throws Exception
}
}

@Override
public int getDefaultPriority()
{
return Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY;
}

public static NoopTask create()
{
return new NoopTask(null, null, 0, 0, null, null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
import io.druid.discovery.DiscoveryDruidNode;
import io.druid.discovery.DruidNodeDiscoveryProvider;
import io.druid.discovery.LookupNodeService;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexing.common.TaskRealtimeMetricsMonitorBuilder;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.LockAcquireAction;
import io.druid.indexing.common.actions.LockReleaseAction;
Expand Down Expand Up @@ -161,9 +161,9 @@ public RealtimeIndexTask(
}

@Override
public int getPriority()
public int getDefaultPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_REALTIME_TASK_PRIORITY);
return Tasks.DEFAULT_REALTIME_TASK_PRIORITY;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.druid.query.Query;
import io.druid.query.QueryRunner;

import javax.annotation.Nullable;
import java.util.Map;

/**
Expand Down Expand Up @@ -85,15 +84,28 @@ public interface Task
* Returns task priority. The task priority is currently used only for prioritized locking, but, in the future, it can
* be used for task scheduling, cluster resource management, etc.
*
* The task priority must be in taskContext if the task is submitted to the proper Overlord endpoint.
*
* It might not be in taskContext in rolling update. This returns {@link Tasks#DEFAULT_TASK_PRIORITY} in this case.
*
* @return task priority
*
* @see Tasks for default task priorities
* @see io.druid.indexing.overlord.http.OverlordResource#taskPost
*/
default int getPriority()
{
return getContextValue(Tasks.PRIORITY_KEY, Tasks.DEFAULT_TASK_PRIORITY);
}

/**
* Returns the default task priority. It can vary depending on the task type.
*/
default int getDefaultPriority()
{
return Tasks.DEFAULT_TASK_PRIORITY;
}

/**
* Returns a {@link TaskResource} for this task. Task resources define specific worker requirements a task may
* require.
Expand Down Expand Up @@ -180,13 +192,17 @@ default int getPriority()
*/
TaskStatus run(TaskToolbox toolbox) throws Exception;

@Nullable
default Map<String, Object> addToContext(String key, Object val)
{
getContext().put(key, val);
return getContext();
}

Map<String, Object> getContext();

@Nullable
default <ContextValueType> ContextValueType getContextValue(String key)
{
return getContext() == null ? null : (ContextValueType) getContext().get(key);
return (ContextValueType) getContext().get(key);
}

default <ContextValueType> ContextValueType getContextValue(String key, ContextValueType defaultValue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionHolder;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskMaster;
import io.druid.indexing.overlord.TaskQueue;
Expand Down Expand Up @@ -174,6 +175,11 @@ public Response taskPost(
public Response apply(TaskQueue taskQueue)
{
try {
// Set default priority if needed
final Integer priority = task.getContextValue(Tasks.PRIORITY_KEY);
if (priority == null) {
task.addToContext(Tasks.PRIORITY_KEY, task.getDefaultPriority());
}
taskQueue.add(task);
return Response.ok(ImmutableMap.of("task", task.getId())).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskLockType;
import io.druid.indexer.TaskStatus;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.Tasks;
import io.druid.indexing.overlord.HeapMemoryTaskStorage;
import io.druid.indexing.overlord.IndexerMetadataStorageAdapter;
import io.druid.indexing.overlord.TaskLockbox;
Expand Down Expand Up @@ -80,6 +81,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -231,6 +233,12 @@ public void testOverlordRun() throws Exception
Assert.assertEquals(200, response.getStatus());
Assert.assertEquals(ImmutableMap.of("task", taskId_0), response.getEntity());

final Map<String, Object> context = task_0.getContext();
Assert.assertEquals(1, context.size());
final Integer priority = (Integer) context.get(Tasks.PRIORITY_KEY);
Assert.assertNotNull(priority);
Assert.assertEquals(Tasks.DEFAULT_BATCH_INDEX_TASK_PRIORITY, priority.intValue());

// Duplicate task - should fail
response = overlordResource.taskPost(task_0, req);
Assert.assertEquals(400, response.getStatus());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.metadata;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import io.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.skife.jdbi.v2.Handle;
Expand All @@ -31,6 +32,7 @@
public class DerbyMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
extends SQLMetadataStorageActionHandler<EntryType, StatusType, LogType, LockType>
{
@VisibleForTesting
DerbyMetadataStorageActionHandler(
SQLMetadataConnector connector,
ObjectMapper jsonMapper,
Expand Down
Loading