Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@

import com.fasterxml.jackson.databind.Module;
import com.google.inject.Binder;
import org.apache.druid.client.indexing.HttpIndexingServiceClient;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.annotations.LoadScope;
import org.apache.druid.initialization.DruidModule;
Expand All @@ -48,7 +45,6 @@ public void configure(Binder binder)
LifecycleModule.register(binder, SqlTaskResource.class);
Jerseys.addResource(binder, SqlTaskResource.class);
LifecycleModule.register(binder, SqlStatementResource.class);
binder.bind(IndexingServiceClient.class).to(HttpIndexingServiceClient.class).in(LazySingleton.class);
Jerseys.addResource(binder, SqlStatementResource.class);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,7 @@ public List<TaskInfo<Task, TaskStatus>> getActiveTaskInfo(@Nullable String dataS
return listBuilder.build();
}

public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(
CompleteTaskLookup taskLookup,
@Nullable String datasource
)
public List<TaskInfo<Task, TaskStatus>> getRecentlyCreatedAlreadyFinishedTaskInfo(CompleteTaskLookup taskLookup)
{
final Ordering<TaskStuff> createdDateDesc = new Ordering<TaskStuff>()
{
Expand Down Expand Up @@ -216,17 +213,15 @@ public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
)
{
final List<TaskInfo<Task, TaskStatus>> tasks = new ArrayList<>();
taskLookups.forEach((type, lookup) -> {
if (type == TaskLookupType.COMPLETE) {
CompleteTaskLookup completeTaskLookup = (CompleteTaskLookup) lookup;
tasks.addAll(
getRecentlyCreatedAlreadyFinishedTaskInfo(
completeTaskLookup.hasTaskCreatedTimeFilter()
? completeTaskLookup
: completeTaskLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold()),
datasource
)
final Map<TaskLookupType, TaskLookup> processedTaskLookups =
TaskStorageUtils.processTaskLookups(
taskLookups,
DateTimes.nowUtc().minus(config.getRecentlyFinishedThreshold())
);

processedTaskLookups.forEach((type, lookup) -> {
if (type == TaskLookupType.COMPLETE) {
tasks.addAll(getRecentlyCreatedAlreadyFinishedTaskInfo((CompleteTaskLookup) lookup));
} else {
tasks.addAll(getActiveTaskInfo(datasource));
}
Expand Down Expand Up @@ -319,10 +314,10 @@ public void removeTasksOlderThan(final long timestamp)
// It is then possible that the same task will be queued for removal twice. Whilst not ideal,
// it will not cause any problems.
List<String> taskIds = tasks.entrySet().stream()
.filter(entry -> entry.getValue().getStatus().isComplete()
&& entry.getValue().getCreatedDate().isBefore(timestamp))
.map(Map.Entry::getKey)
.collect(Collectors.toList());
.filter(entry -> entry.getValue().getStatus().isComplete()
&& entry.getValue().getCreatedDate().isBefore(timestamp))
.map(Map.Entry::getKey)
.collect(Collectors.toList());

taskIds.forEach(tasks::remove);
synchronized (taskActions) {
Expand Down Expand Up @@ -395,11 +390,11 @@ private TaskStuff withStatus(TaskStatus _status)
static TaskInfo<Task, TaskStatus> toTaskInfo(TaskStuff taskStuff)
{
return new TaskInfo<>(
taskStuff.getTask().getId(),
taskStuff.getCreatedDate(),
taskStuff.getStatus(),
taskStuff.getDataSource(),
taskStuff.getTask()
taskStuff.getTask().getId(),
taskStuff.getCreatedDate(),
taskStuff.getStatus(),
taskStuff.getDataSource(),
taskStuff.getTask()
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import org.apache.druid.common.exception.DruidException;
import org.apache.druid.indexer.TaskInfo;
Expand All @@ -47,7 +46,6 @@
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.TaskLookup;
import org.apache.druid.metadata.TaskLookup.ActiveTaskLookup;
import org.apache.druid.metadata.TaskLookup.CompleteTaskLookup;
import org.apache.druid.metadata.TaskLookup.TaskLookupType;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -228,7 +226,11 @@ public List<TaskInfo<Task, TaskStatus>> getTaskInfos(
@Nullable String datasource
)
{
Map<TaskLookupType, TaskLookup> theTaskLookups = processTaskLookups(taskLookups);
Map<TaskLookupType, TaskLookup> theTaskLookups =
TaskStorageUtils.processTaskLookups(
taskLookups,
DateTimes.nowUtc().minus(config.getRecentlyFinishedThreshold())
);
return Collections.unmodifiableList(handler.getTaskInfos(theTaskLookups, datasource));
}

Expand All @@ -238,7 +240,12 @@ public List<TaskStatusPlus> getTaskStatusPlusList(
@Nullable String datasource
)
{
Map<TaskLookupType, TaskLookup> processedTaskLookups = processTaskLookups(taskLookups);
Map<TaskLookupType, TaskLookup> processedTaskLookups =
TaskStorageUtils.processTaskLookups(
taskLookups,
DateTimes.nowUtc().minus(config.getRecentlyFinishedThreshold())
);

return Collections.unmodifiableList(
handler.getTaskStatusList(processedTaskLookups, datasource)
.stream()
Expand All @@ -247,27 +254,6 @@ public List<TaskStatusPlus> getTaskStatusPlusList(
);
}

/**
 * Returns a copy of {@code taskLookups} where every {@link TaskLookupType#COMPLETE} lookup that lacks a
 * task-created-time filter is bounded by {@code config.getRecentlyFinishedThreshold()}. All other lookups
 * are carried over unchanged.
 */
private Map<TaskLookupType, TaskLookup> processTaskLookups(
    Map<TaskLookupType, TaskLookup> taskLookups
)
{
  final Map<TaskLookupType, TaskLookup> result = Maps.newHashMapWithExpectedSize(taskLookups.size());
  for (Entry<TaskLookupType, TaskLookup> lookupEntry : taskLookups.entrySet()) {
    final TaskLookupType lookupType = lookupEntry.getKey();
    TaskLookup lookup = lookupEntry.getValue();
    if (lookupType == TaskLookupType.COMPLETE) {
      final CompleteTaskLookup completeLookup = (CompleteTaskLookup) lookup;
      if (!completeLookup.hasTaskCreatedTimeFilter()) {
        // No explicit time filter: bound the query by the configured "recently finished" window.
        lookup = completeLookup.withDurationBeforeNow(config.getRecentlyFinishedThreshold());
      }
    }
    result.put(lookupType, lookup);
  }
  return result;
}

@Override
public void addLock(final String taskid, final TaskLock taskLock)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.indexing.overlord;

import org.apache.druid.indexing.common.config.TaskStorageConfig;
import org.apache.druid.metadata.TaskLookup;
import org.joda.time.DateTime;

import java.util.LinkedHashMap;
import java.util.Map;

public final class TaskStorageUtils
{
  private TaskStorageUtils()
  {
    // No instantiation.
  }

  /**
   * Process a map of {@link TaskLookup} to apply {@link TaskStorageConfig#getRecentlyFinishedThreshold()}, and to
   * remove lookups for which {@link TaskLookup#isNil()}.
   *
   * @param taskLookups          lookups from {@link TaskStorage#getTaskInfos(Map, String)}
   * @param minCreationTimestamp minimum creation time based on {@link TaskStorageConfig#getRecentlyFinishedThreshold()}
   *
   * @return new map with nil lookups removed; iteration order matches the iteration order of {@code taskLookups}
   */
  public static Map<TaskLookup.TaskLookupType, TaskLookup> processTaskLookups(
      final Map<TaskLookup.TaskLookupType, TaskLookup> taskLookups,
      final DateTime minCreationTimestamp
  )
  {
    // LinkedHashMap preserves the caller's iteration order in the returned map.
    final Map<TaskLookup.TaskLookupType, TaskLookup> retVal = new LinkedHashMap<>();

    for (Map.Entry<TaskLookup.TaskLookupType, TaskLookup> entry : taskLookups.entrySet()) {
      final TaskLookup lookup = entry.getValue();
      if (lookup.isNil()) {
        // Nil lookups match nothing, so drop them rather than issuing queries that return no rows.
        continue;
      }
      if (entry.getKey() == TaskLookup.TaskLookupType.COMPLETE) {
        // Bound complete-task lookups by the configured threshold when they carry no timestamp of their own,
        // so completed-task queries do not scan the entire task history.
        final TaskLookup.CompleteTaskLookup completeTaskLookup = (TaskLookup.CompleteTaskLookup) lookup;
        retVal.put(entry.getKey(), completeTaskLookup.withMinTimestampIfAbsent(minCreationTimestamp));
      } else {
        retVal.put(entry.getKey(), lookup);
      }
    }

    return retVal;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public class OverlordResource
private final AuthConfig authConfig;

private AtomicReference<WorkerBehaviorConfig> workerConfigRef = null;
private static final List API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete");
private static final List<String> API_TASK_STATES = ImmutableList.of("pending", "waiting", "running", "complete");

private enum TaskStateLookup
{
Expand Down Expand Up @@ -185,7 +185,7 @@ public OverlordResource(
}

/**
* Warning, magic: {@link org.apache.druid.client.indexing.HttpIndexingServiceClient#runTask} may call this method
* Warning, magic: {@link org.apache.druid.rpc.indexing.OverlordClient#runTask} may call this method
* remotely with {@link ClientTaskQuery} objects, but we deserialize {@link Task} objects. See the comment for {@link
* ClientTaskQuery} for details.
*/
Expand Down
Loading