Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion api/src/main/java/io/druid/indexer/TaskStatusPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class TaskStatusPlus
private final TaskState state;
private final Long duration;
private final TaskLocation location;
private final String dataSource;

@JsonCreator
public TaskStatusPlus(
Expand All @@ -45,7 +46,8 @@ public TaskStatusPlus(
@JsonProperty("queueInsertionTime") DateTime queueInsertionTime,
@JsonProperty("statusCode") @Nullable TaskState state,
@JsonProperty("duration") @Nullable Long duration,
@JsonProperty("location") TaskLocation location
@JsonProperty("location") TaskLocation location,
@JsonProperty("dataSource") String dataSource
)
{
if (state != null && state.isComplete()) {
Expand All @@ -58,6 +60,7 @@ public TaskStatusPlus(
this.state = state;
this.duration = duration;
this.location = Preconditions.checkNotNull(location, "location");
this.dataSource = dataSource;
}

@JsonProperty
Expand Down Expand Up @@ -143,4 +146,11 @@ public int hashCode()
{
return Objects.hash(id, type, createdTime, queueInsertionTime, state, duration, location);
}

@JsonProperty
public String getDataSource()
{
return dataSource;
}

}
3 changes: 2 additions & 1 deletion api/src/test/java/io/druid/indexer/TaskStatusPlusTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public void testSerde() throws IOException
DateTimes.nowUtc(),
TaskState.RUNNING,
1000L,
TaskLocation.create("testHost", 1010, -1)
TaskLocation.create("testHost", 1010, -1),
"ds_test"
);
final String json = mapper.writeValueAsString(status);
Assert.assertEquals(status, mapper.readValue(json, TaskStatusPlus.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2060,12 +2060,14 @@ private static class TestTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final String taskType;
private final TaskLocation location;
private final String dataSource;

public TestTaskRunnerWorkItem(Task task, ListenableFuture<TaskStatus> result, TaskLocation location)
{
super(task.getId(), result);
this.taskType = task.getType();
this.location = location;
this.dataSource = task.getDataSource();
}

@Override
Expand All @@ -2079,6 +2081,13 @@ public String getTaskType()
{
return taskType;
}

@Override
public String getDataSource()
{
return dataSource;
}

}

private static class TestableKafkaSupervisor extends KafkaSupervisor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -772,6 +772,12 @@ public String getTaskType()
{
return task.getType();
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

formatting

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@Override
public String getDataSource()
{
return task.getDataSource();
}
}

private static class ProcessHolder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,8 @@ private RemoteTaskRunnerWorkItem addPendingTask(final Task task)
task.getId(),
task.getType(),
null,
null
null,
task.getDataSource()
);
pendingTaskPayloads.put(task.getId(), task);
pendingTasks.put(task.getId(), taskRunnerWorkItem);
Expand Down Expand Up @@ -966,7 +967,8 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th
taskId,
announcement.getTaskType(),
zkWorker.getWorker(),
TaskLocation.unknown()
TaskLocation.unknown(),
runningTasks.get(taskId).getDataSource()
);
final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
taskId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,32 +31,36 @@ public class RemoteTaskRunnerWorkItem extends TaskRunnerWorkItem
{
private final SettableFuture<TaskStatus> result;
private String taskType;
private final String dataSource;
private Worker worker;
private TaskLocation location;

public RemoteTaskRunnerWorkItem(
String taskId,
String taskType,
Worker worker,
TaskLocation location
TaskLocation location,
String dataSource
)
{
this(taskId, taskType, SettableFuture.<TaskStatus>create(), worker, location);
this(taskId, taskType, SettableFuture.<TaskStatus>create(), worker, location, dataSource);
}

private RemoteTaskRunnerWorkItem(
String taskId,
String taskType,
SettableFuture<TaskStatus> result,
Worker worker,
TaskLocation location
TaskLocation location,
String dataSource
)
{
super(taskId, result);
this.result = result;
this.taskType = taskType;
this.worker = worker;
this.location = location == null ? TaskLocation.unknown() : location;
this.dataSource = dataSource;
}

private RemoteTaskRunnerWorkItem(
Expand All @@ -66,14 +70,16 @@ private RemoteTaskRunnerWorkItem(
DateTime createdTime,
DateTime queueInsertionTime,
Worker worker,
TaskLocation location
TaskLocation location,
String dataSource
)
{
super(taskId, result, createdTime, queueInsertionTime);
this.result = result;
this.taskType = taskType;
this.worker = worker;
this.location = location == null ? TaskLocation.unknown() : location;
this.dataSource = dataSource;
}

public void setLocation(TaskLocation location)
Expand All @@ -97,6 +103,12 @@ public String getTaskType()
{
return taskType;
}

@Override
public String getDataSource()
{
return dataSource;
}

public void setWorker(Worker worker)
{
Expand All @@ -115,7 +127,7 @@ public void setResult(TaskStatus status)

public RemoteTaskRunnerWorkItem withQueueInsertionTime(DateTime time)
{
return new RemoteTaskRunnerWorkItem(getTaskId(), taskType, result, getCreatedTime(), time, worker, location);
return new RemoteTaskRunnerWorkItem(getTaskId(), taskType, result, getCreatedTime(), time, worker, location, dataSource);
}

public RemoteTaskRunnerWorkItem withWorker(Worker theWorker, TaskLocation location)
Expand All @@ -127,7 +139,8 @@ public RemoteTaskRunnerWorkItem withWorker(Worker theWorker, TaskLocation locati
getCreatedTime(),
getQueueInsertionTime(),
theWorker,
location
location,
dataSource
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ public DateTime getQueueInsertionTime()
*/
@Nullable
public abstract String getTaskType();
public abstract String getDataSource();

@Override
public String toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,10 @@ public Set<DataSegment> getInsertedSegments(final String taskid)
}
return segments;
}

public Pair<DateTime, String> getCreatedDateAndDataSource(String taskId)
{
return storage.getCreatedDateTimeAndDataSource(taskId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,12 @@ public String getTaskType()
{
return task.getType();
}

@Override
public String getDataSource()
{
return task.getDataSource();
}
}

private class ThreadPoolTaskRunnerCallable implements Callable<TaskStatus>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,7 @@ enum State
State state
)
{
super(taskId, taskType, worker, location);
super(taskId, task == null ? null : task.getType(), worker, location, task == null ? null : task.getDataSource());
this.state = Preconditions.checkNotNull(state);
Preconditions.checkArgument(task == null || taskType == null || taskType.equals(task.getType()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import io.druid.indexing.overlord.setup.WorkerBehaviorConfig;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.logger.Logger;
import io.druid.metadata.EntryExistsException;
Expand All @@ -70,6 +71,7 @@
import io.druid.server.security.ResourceType;
import io.druid.tasklogs.TaskLogStreamer;
import io.druid.timeline.DataSegment;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.servlet.http.HttpServletRequest;
Expand Down Expand Up @@ -460,7 +462,8 @@ public String apply(final TaskRunnerWorkItem workItem)
new WaitingTask(
task.getId(),
task.getType(),
SettableFuture.create()
SettableFuture.create(),
task.getDataSource()
)
{
@Override
Expand All @@ -481,15 +484,18 @@ public TaskLocation getLocation()
private static class WaitingTask extends TaskRunnerWorkItem
{
private final String taskType;
private final String dataSource;

WaitingTask(
String taskId,
String taskType,
ListenableFuture<TaskStatus> result
ListenableFuture<TaskStatus> result,
String dataSource
)
{
super(taskId, result, DateTimes.EPOCH, DateTimes.EPOCH);
this.taskType = taskType;
this.dataSource = dataSource;
}

@Override
Expand All @@ -503,6 +509,12 @@ public String getTaskType()
{
return taskType;
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

formatting

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@Override
public String getDataSource()
{
return dataSource;
}
}

@GET
Expand Down Expand Up @@ -595,20 +607,22 @@ public Response getCompleteTasks(
)
);

final List<TaskStatusPlus> completeTasks = recentlyFinishedTasks
.stream()
.map(status -> new TaskStatusPlus(
status.getId(),
taskFunction.apply(status.getId()).getType(),
taskStorageQueryAdapter.getCreatedTime(status.getId()),
// Would be nice to include the real queue insertion time, but the TaskStorage API doesn't yet allow it.
DateTimes.EPOCH,
status.getStatusCode(),
status.getDuration(),
TaskLocation.unknown()
)
)
.collect(Collectors.toList());
final List<TaskStatusPlus> completeTasks = Lists.newArrayList(Iterables.transform(
recentlyFinishedTasks,
status -> {
final Pair<DateTime, String> pair = taskStorageQueryAdapter.getCreatedDateAndDataSource(status.getId());
return new TaskStatusPlus(
status.getId(),
taskFunction.apply(status.getId()).getType(),
pair.lhs,
// Would be nice to include the real queue insertion time, but the
// TaskStorage API doesn't yet allow it.
DateTimes.EPOCH,
status.getStatusCode(),
status.getDuration(),
TaskLocation.unknown(),
pair.rhs);
}));

return Response.ok(completeTasks).build();
}
Expand Down Expand Up @@ -718,6 +732,26 @@ public Response doGetLog(
}
}

@GET
@Path("/dataSources/{dataSource}")
@Produces(MediaType.APPLICATION_JSON)
public Response getRunningTasksByDataSource(@PathParam("dataSource") String dataSource,
@Context HttpServletRequest request)
{
Optional<TaskRunner> ts = taskMaster.getTaskRunner();
if (!ts.isPresent()) {
return Response.status(Response.Status.NOT_FOUND).entity("No tasks are running").build();
}
Collection<? extends TaskRunnerWorkItem> runningTasks = ts.get().getRunningTasks();
if (runningTasks == null || runningTasks.isEmpty()) {
return Response.status(Response.Status.NOT_FOUND)
.entity("No running tasks found for the datasource : " + dataSource).build();
}
List<TaskRunnerWorkItem> taskRunnerWorkItemList = runningTasks.stream()
.filter(task -> dataSource.equals(task.getDataSource())).collect(Collectors.toList());
return Response.ok(taskRunnerWorkItemList).build();
}

private Response workItemsResponse(final Function<TaskRunner, Collection<? extends TaskRunnerWorkItem>> fn)
{
return asLeaderWith(
Expand All @@ -742,7 +776,8 @@ public TaskStatusPlus apply(TaskRunnerWorkItem workItem)
workItem.getQueueInsertionTime(),
null,
null,
workItem.getLocation()
workItem.getLocation(),
workItem.getDataSource()
);
}
}
Expand Down
Loading