diff --git a/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java b/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java index 0f6bb8bf3482..7ad5d1091de9 100644 --- a/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java +++ b/core/src/main/java/org/apache/druid/java/util/emitter/EmittingLogger.java @@ -83,11 +83,13 @@ public AlertBuilder makeAlert(@Nullable Throwable t, String message, Object... o StringUtils.nonStrictFormat(message, objects) ); - error(errorMessage); ISE e = new ISE(errorMessage); if (t != null) { e.addSuppressed(t); } + + error(e, errorMessage); + throw e; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java index e10f25007e65..f9aa6652d233 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/TaskRunnerWorkItem.java @@ -86,6 +86,7 @@ public DateTime getQueueInsertionTime() return queueInsertionTime; } + @JsonProperty public abstract TaskLocation getLocation(); /** diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java index b0d91bc511f2..bc0ba7f81c70 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/config/HttpRemoteTaskRunnerConfig.java @@ -30,9 +30,6 @@ public class HttpRemoteTaskRunnerConfig extends RemoteTaskRunnerConfig @JsonProperty private int workerSyncNumThreads = 5; - @JsonProperty - private Period waitForWorkerSlot = new Period("PT1M"); - @JsonProperty private int shutdownRequestMaxRetries = 3; @@ -56,11 +53,6 @@ public int getWorkerSyncNumThreads() return workerSyncNumThreads; } - public Period getWaitForWorkerSlot() - { - return waitForWorkerSlot; - } - public int getShutdownRequestMaxRetries() { return shutdownRequestMaxRetries; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java index 3020d3c4c01e..e5549f6c032d 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunner.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.overlord.hrtr; +import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.base.Optional; @@ -36,6 +37,7 @@ import com.google.common.util.concurrent.ListenableScheduledFuture; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; +import com.google.errorprone.annotations.concurrent.GuardedBy; import org.apache.curator.framework.CuratorFramework; import org.apache.druid.concurrent.LifecycleLock; import org.apache.druid.discovery.DiscoveryDruidNode; @@ -106,7 +108,7 @@ /** * A Remote TaskRunner to manage tasks on Middle Manager nodes using internal-discovery({@link DruidNodeDiscoveryProvider}) * to discover them and Http. - * Middle Managers manages list of assigned/completed tasks on disk and exposes 3 HTTP endpoints + * Middle Managers manages list of assigned/completed tasks on disk and expose 3 HTTP endpoints * 1. POST request for assigning a task * 2. POST request for shutting down a task * 3. GET request for getting list of assigned, running, completed tasks on Middle Manager and its enable/disable status. @@ -126,10 +128,18 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer // Executor for assigning pending tasks to workers. private final ExecutorService pendingTasksExec; - // All known tasks + // All known tasks, TaskID -> HttpRemoteTaskRunnerWorkItem + // This is a ConcurrentMap as some of the reads are done without holding the lock. + @GuardedBy("statusLock") private final ConcurrentMap tasks = new ConcurrentHashMap<>(); - // All discovered workers. + // This is the list of pending tasks in the order they arrived, exclusively manipulated/used by thread that + // gives a new task to this class and threads in pendingTasksExec that are responsible for assigning tasks to + // workers. + @GuardedBy("statusLock") + private final List pendingTaskIds = new ArrayList<>(); + + // All discovered workers, "host:port" -> WorkerHolder private final ConcurrentMap workers = new ConcurrentHashMap<>(); // Executor for syncing state of each worker. @@ -143,15 +153,15 @@ public class HttpRemoteTaskRunner implements WorkerTaskRunner, TaskLogStreamer // workers which were assigned a task and are yet to acknowledge same. // Map: workerId -> taskId + // all writes are guarded + @GuardedBy("statusLock") private final ConcurrentMap workersWithUnacknowledgedTask = new ConcurrentHashMap<>(); // Executor to complete cleanup of workers which have disappeared. private final ListeningScheduledExecutorService cleanupExec; private final ConcurrentMap removedWorkerCleanups = new ConcurrentHashMap<>(); - // Guards the pending/running/complete lists of tasks and list of workers - // statusLock.notifyAll() is called whenever there is a possibility of worker slot to run task becoming available. - // statusLock.notifyAll() is called whenever a task status or location changes. + private final Object statusLock = new Object(); // task runner listeners @@ -243,6 +253,9 @@ public void start() provisioningService = provisioningStrategy.makeProvisioningService(this); scheduleSyncMonitoring(); + + startPendingTaskHandling(); + lifecycleLock.started(); log.info("Started."); @@ -313,6 +326,31 @@ private void scheduleCompletedTaskStatusCleanupFromZk() ); } + /** + * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} + */ + @SuppressWarnings("GuardedBy") // Read on workersWithUnacknowledgedTask is safe + Map getWorkersEligibleToRunTasks() + { + // In this class, this method is called with statusLock held. + // writes to workersWithUnacknowledgedTask are always guarded by statusLock. + // however writes to lazyWorker/blacklistedWorkers aren't necessarily guarded by same lock, so technically there + // could be races in that a task could get assigned to a worker which in another thread is concurrently being + // marked lazy/blacklisted , but that is ok because that is equivalent to this worker being picked for task and + // being assigned lazy/blacklisted right after even when the two threads hold a mutually exclusive lock. + return Maps.transformEntries( + Maps.filterEntries( + workers, + input -> !lazyWorkers.containsKey(input.getKey()) && + !workersWithUnacknowledgedTask.containsKey(input.getKey()) && + !blackListedWorkers.containsKey(input.getKey()) && + input.getValue().isInitialized() && + input.getValue().isEnabled() + ), + (String key, WorkerHolder value) -> value.toImmutable() + ); + } + private ImmutableWorkerInfo findWorkerToRunTask(Task task) { WorkerBehaviorConfig workerConfig = workerConfigRef.get(); @@ -326,17 +364,7 @@ private ImmutableWorkerInfo findWorkerToRunTask(Task task) return strategy.findWorkerForTask( config, - ImmutableMap.copyOf( - Maps.transformEntries( - Maps.filterEntries( - workers, - input -> !lazyWorkers.containsKey(input.getKey()) && - !workersWithUnacknowledgedTask.containsKey(input.getKey()) && - !blackListedWorkers.containsKey(input.getKey()) - ), - (String key, WorkerHolder value) -> value.toImmutable() - ) - ), + ImmutableMap.copyOf(getWorkersEligibleToRunTasks()), task ); } @@ -344,7 +372,7 @@ private ImmutableWorkerInfo findWorkerToRunTask(Task task) private boolean runTaskOnWorker( final HttpRemoteTaskRunnerWorkItem workItem, final String workerHost - ) throws Exception + ) throws InterruptedException { String taskId = workItem.getTaskId(); WorkerHolder workerHolder = workers.get(workerHost); @@ -363,8 +391,7 @@ private boolean runTaskOnWorker( long waitStart = System.currentTimeMillis(); boolean isTaskAssignmentTimedOut = false; synchronized (statusLock) { - while (tasks.containsKey(taskId) - && tasks.get(taskId).getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) { + while (tasks.containsKey(taskId) && tasks.get(taskId).getState().isPending()) { long remaining = waitMs - (System.currentTimeMillis() - waitStart); if (remaining > 0) { statusLock.wait(remaining); @@ -382,6 +409,7 @@ private boolean runTaskOnWorker( taskId, config.getTaskAssignmentTimeout() ).emit(); + // taskComplete(..) must be called outside of statusLock, see comments on method. taskComplete(workItem, workerHolder, TaskStatus.failure(taskId)); } @@ -611,6 +639,7 @@ private void scheduleTasksCleanupForWorker(final String workerHostAndPort) taskItem.getTaskId(), config.getTaskCleanupTimeout() ); + // taskComplete(..) must be called outside of statusLock, see comments on method. taskComplete(taskItem, null, TaskStatus.failure(taskItem.getTaskId())); } } @@ -689,7 +718,7 @@ private void scheduleSyncMonitoring() * This method returns the debugging information exposed by {@link HttpRemoteTaskRunnerResource} and meant * for that use only. It must not be used for any other purpose. */ - public Map getDebugInfo() + Map getWorkerSyncerDebugInfo() { Preconditions.checkArgument(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); @@ -849,7 +878,7 @@ public Collection getPendingTaskPayloads() synchronized (statusLock) { return tasks.values() .stream() - .filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) + .filter(item -> item.getState().isPending()) .map(HttpRemoteTaskRunnerWorkItem::getTask) .collect(Collectors.toList()); } @@ -858,6 +887,7 @@ public Collection getPendingTaskPayloads() @Override public Optional streamTaskLog(String taskId, long offset) { + @SuppressWarnings("GuardedBy") // Read on tasks is safe HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId); Worker worker = null; if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) { @@ -904,6 +934,7 @@ public InputStream openStream() throws IOException @Override public Optional streamTaskReports(String taskId) { + @SuppressWarnings("GuardedBy") // Read on tasks is safe HttpRemoteTaskRunnerWorkItem taskRunnerWorkItem = tasks.get(taskId); Worker worker = null; if (taskRunnerWorkItem != null && taskRunnerWorkItem.getState() != HttpRemoteTaskRunnerWorkItem.State.COMPLETE) { @@ -1019,92 +1050,175 @@ public ListenableFuture run(Task task) HttpRemoteTaskRunnerWorkItem.State.PENDING ); tasks.put(task.getId(), taskRunnerWorkItem); - addPendingTaskToExecutor(task.getId()); + pendingTaskIds.add(task.getId()); + + statusLock.notifyAll(); + return taskRunnerWorkItem.getResult(); } } } - private void addPendingTaskToExecutor(final String taskId) + private void startPendingTaskHandling() { - pendingTasksExec.execute( - () -> { - while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { - ImmutableWorkerInfo immutableWorker; - HttpRemoteTaskRunnerWorkItem taskItem = null; + for (int i = 0; i < config.getPendingTasksRunnerNumThreads(); i++) { + pendingTasksExec.submit( + () -> { try { - synchronized (statusLock) { - taskItem = tasks.get(taskId); + if (!lifecycleLock.awaitStarted()) { + log.makeAlert("Lifecycle not started, PendingTaskExecution loop will not run.").emit(); + return; + } - if (taskItem == null) { - log.info( - "Task[%s] work item not found. Probably user asked to shutdown before. Not assigning.", - taskId - ); - return; - } + pendingTasksExecutionLoop(); + } + catch (Throwable t) { + log.makeAlert(t, "Error while waiting for lifecycle start. PendingTaskExecution loop will not run") + .emit(); + } + finally { + log.info("PendingTaskExecution loop exited."); + } + } + ); + } + } - if (taskItem.getState() != HttpRemoteTaskRunnerWorkItem.State.PENDING) { - log.info( - "Task[%s] is in state[%s]. Probably some worker already reported it. Not assigning.", - taskId, - taskItem.getState() - ); - return; - } + private void pendingTasksExecutionLoop() + { + while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { + try { + // Find one pending task to run and a worker to run on + HttpRemoteTaskRunnerWorkItem taskItem = null; + ImmutableWorkerInfo immutableWorker = null; - if (taskItem.getTask() == null) { - throw new ISE("WTF! couldn't find Task instance for taskId[%s].", taskId); - } - immutableWorker = findWorkerToRunTask(taskItem.getTask()); - - if (immutableWorker == null) { - // no free worker, wait for some worker to become free - statusLock.wait(config.getWaitForWorkerSlot().toStandardDuration().getMillis()); - continue; - } else if (workersWithUnacknowledgedTask.putIfAbsent( - immutableWorker.getWorker().getHost(), - taskId - ) != null) { - // there was a race and someone else took this worker slot, try again - continue; - } - } + synchronized (statusLock) { + Iterator iter = pendingTaskIds.iterator(); + while (iter.hasNext()) { + String taskId = iter.next(); + HttpRemoteTaskRunnerWorkItem ti = tasks.get(taskId); + + if (ti == null || !ti.getState().isPending()) { + // happens if the task was shutdown, failed or observed running by a worker + iter.remove(); + continue; + } - try { - // this will send HTTP request to worker for assigning task and hence kept - // outside the synchronized block. - if (runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) { - return; - } - } - finally { - workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost()); - synchronized (statusLock) { - statusLock.notifyAll(); - } - } + if (ti.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { + // picked up by another pending task executor thread which is in the process of trying to + // run it on a worker, skip to next. + continue; } - catch (InterruptedException ex) { - log.info("Got InterruptedException while assigning task[%s].", taskId); - Thread.currentThread().interrupt(); - return; + if (ti.getTask() == null) { + // this is not supposed to happen except for a bug, we want to mark this task failed but + // taskComplete(..) can not be called while holding statusLock. See the javadoc on that + // method. + // so this will get marked failed afterwards outside of current synchronized block. + taskItem = ti; + break; } - catch (Throwable th) { - log.makeAlert(th, "Exception while trying to assign task") - .addData("taskId", taskId) - .emit(); - if (taskItem != null) { - taskComplete(taskItem, null, TaskStatus.failure(taskId)); - } + immutableWorker = findWorkerToRunTask(ti.getTask()); + if (immutableWorker == null) { + continue; + } + + String prevUnackedTaskId = workersWithUnacknowledgedTask.putIfAbsent( + immutableWorker.getWorker().getHost(), + taskId + ); + if (prevUnackedTaskId != null) { + log.makeAlert( + "Found worker[%s] with unacked task[%s] but still was identified to run task[%s].", + immutableWorker.getWorker().getHost(), + prevUnackedTaskId, + taskId + ).emit(); + } + + // set state to PENDING_WORKER_ASSIGN before releasing the lock so that this task item is not picked + // up by another task execution thread. + // note that we can't simply delete this task item from pendingTaskIds or else we would have to add it + // back if this thread couldn't run this task for any reason, which we will know at some later time + // and also we will need to add it back to its old position in the list. that becomes complex quickly. + // Instead we keep the PENDING_WORKER_ASSIGN to notify other task execution threads not to pick this one up. + // And, it is automatically removed by any of the task execution threads when they notice that + // ti.getState().isPending() is false (at the beginning of this loop) + ti.setState(HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN); + taskItem = ti; + break; + } + + if (taskItem == null) { + // Either no pending task is found or no suitable worker is found for any of the pending tasks. + // statusLock.notifyAll() is called whenever a new task shows up or if there is a possibility for a task + // to successfully get worker to run, for example when a new worker shows up, a task slot opens up + // because some task completed etc. + statusLock.wait(TimeUnit.MINUTES.toMillis(1)); + continue; + } + } + + String taskId = taskItem.getTaskId(); + + if (taskItem.getTask() == null) { + log.makeAlert("No Task obj found in TaskItem for taskID[%s]. Failed.", taskId).emit(); + // taskComplete(..) must be called outside of statusLock, see comments on method. + taskComplete(taskItem, null, TaskStatus.failure(taskId)); + continue; + } + + if (immutableWorker == null) { + throw new ISE("WTH! NULL immutableWorker"); + } - return; + try { + // this will send HTTP request to worker for assigning task + if (!runTaskOnWorker(taskItem, immutableWorker.getWorker().getHost())) { + if (taskItem.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING_WORKER_ASSIGN) { + taskItem.revertStateFromPendingWorkerAssignToPending(); } } } - ); + catch (InterruptedException ex) { + log.info("Got InterruptedException while assigning task[%s].", taskId); + throw ex; + } + catch (Throwable th) { + log.makeAlert(th, "Exception while trying to assign task") + .addData("taskId", taskId) + .emit(); + + // taskComplete(..) must be called outside of statusLock, see comments on method. + taskComplete(taskItem, null, TaskStatus.failure(taskId)); + } + finally { + synchronized (statusLock) { + workersWithUnacknowledgedTask.remove(immutableWorker.getWorker().getHost()); + statusLock.notifyAll(); + } + } + + } + catch (InterruptedException ex) { + log.info("Interrupted, will Exit."); + Thread.currentThread().interrupt(); + } + catch (Throwable th) { + log.makeAlert(th, "Unknown Exception while trying to assign tasks.").emit(); + } + } + } + + /** + * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} + */ + List getPendingTasksList() + { + synchronized (statusLock) { + return ImmutableList.copyOf(pendingTaskIds); + } } @Override @@ -1161,6 +1275,7 @@ public void stop() } @Override + @SuppressWarnings("GuardedBy") // Read on tasks is safe public Collection getRunningTasks() { return tasks.values() @@ -1170,11 +1285,12 @@ public Collection getRunningTasks() } @Override + @SuppressWarnings("GuardedBy") // Read on tasks is safe public Collection getPendingTasks() { return tasks.values() .stream() - .filter(item -> item.getState() == HttpRemoteTaskRunnerWorkItem.State.PENDING) + .filter(item -> item.getState().isPending()) .collect(Collectors.toList()); } @@ -1186,6 +1302,7 @@ public Collection getKnownTasks() } } + @SuppressWarnings("GuardedBy") // Read on tasks is safe public Collection getCompletedTasks() { return tasks.values() @@ -1196,26 +1313,19 @@ public Collection getCompletedTasks() @Nullable @Override + @SuppressWarnings("GuardedBy") // Read on tasks is safe public RunnerTaskState getRunnerTaskState(String taskId) { final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId); if (workItem == null) { return null; } else { - switch (workItem.state) { - case PENDING: - return RunnerTaskState.PENDING; - case RUNNING: - return RunnerTaskState.RUNNING; - case COMPLETE: - return RunnerTaskState.NONE; - default: - throw new ISE("Unknown state[%s]", workItem.state); - } + return workItem.getState().toRunnerTaskState(); } } @Override + @SuppressWarnings("GuardedBy") // Read on tasks is safe public TaskLocation getTaskLocation(String taskId) { final HttpRemoteTaskRunnerWorkItem workItem = tasks.get(taskId); @@ -1233,6 +1343,15 @@ public List getBlacklistedWorkers() ).collect(Collectors.toList()); } + /** + * Must not be used outside of this class and {@link HttpRemoteTaskRunnerResource} , used for read only. + */ + @SuppressWarnings("GuardedBy") + Map getWorkersWithUnacknowledgedTasks() + { + return workersWithUnacknowledgedTask; + } + @Override public Optional getScalingStats() { @@ -1313,6 +1432,7 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder case RUNNING: switch (taskItem.getState()) { case PENDING: + case PENDING_WORKER_ASSIGN: taskItem.setWorker(worker); taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING); log.info("Task[%s] started RUNNING on worker[%s].", taskId, worker.getHost()); @@ -1361,6 +1481,7 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder case SUCCESS: switch (taskItem.getState()) { case PENDING: + case PENDING_WORKER_ASSIGN: taskItem.setWorker(worker); taskItem.setState(HttpRemoteTaskRunnerWorkItem.State.RUNNING); log.info("Task[%s] finished on worker[%s].", taskId, worker.getHost()); @@ -1413,6 +1534,7 @@ void taskAddedOrUpdated(final TaskAnnouncement announcement, final WorkerHolder } if (isTaskCompleted) { + // taskComplete(..) must be called outside of statusLock, see comments on method. taskComplete(taskItem, workerHolder, announcement.getTaskStatus()); } @@ -1430,15 +1552,35 @@ private static class HttpRemoteTaskRunnerWorkItem extends RemoteTaskRunnerWorkIt { enum State { - PENDING(0), - RUNNING(1), - COMPLETE(2); + // Task has been given to HRTR, but a worker to run this task hasn't been identified yet. + PENDING(0, true, RunnerTaskState.PENDING), + + // A Worker has been identified to run this task, but request to run task hasn't been made to worker yet + // or worker hasn't acknowledged the task yet. + PENDING_WORKER_ASSIGN(1, true, RunnerTaskState.PENDING), - int index; + RUNNING(2, false, RunnerTaskState.RUNNING), + COMPLETE(3, false, RunnerTaskState.NONE); - State(int index) + private final int index; + private final boolean isPending; + private final RunnerTaskState runnerTaskState; + + State(int index, boolean isPending, RunnerTaskState runnerTaskState) { this.index = index; + this.isPending = isPending; + this.runnerTaskState = runnerTaskState; + } + + boolean isPending() + { + return isPending; + } + + RunnerTaskState toRunnerTaskState() + { + return runnerTaskState; } } @@ -1478,6 +1620,7 @@ public void setTask(Task task) } } + @JsonProperty public State getState() { return state; @@ -1499,7 +1642,25 @@ public void setState(State state) state ); + setStateUnconditionally(state); + } + + public void revertStateFromPendingWorkerAssignToPending() + { + Preconditions.checkState( + this.state == State.PENDING_WORKER_ASSIGN, + "Can't move state from [%s] to [%s]", + this.state, + State.PENDING + ); + + setStateUnconditionally(State.PENDING); + } + + private void setStateUnconditionally(State state) + { if (log.isDebugEnabled()) { + // Exception is logged to know what led to this call. log.debug( new RuntimeException("Stacktrace..."), "Setting task[%s] work item state from [%s] to [%s].", diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java index 0266ba4beb21..fc2c5ced2542 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerResource.java @@ -34,6 +34,8 @@ /** * Collection of http endpoits to introspect state of HttpRemoteTaskRunner instance for debugging. + * Also, generic TaskRunner state can be introspected by the endpoints in + * {@link org.apache.druid.indexing.overlord.http.OverlordResource} */ @Path("/druid-internal/v1/httpRemoteTaskRunner") @ResourceFilters(StateResourceFilter.class) @@ -48,15 +50,42 @@ public HttpRemoteTaskRunnerResource(TaskMaster taskMaster) } @GET + @Path("/knownTasks") @Produces(MediaType.APPLICATION_JSON) - public Response getDebugInfo() + public Response getAllKnownTasks() { HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner(); if (httpRemoteTaskRunner == null) { return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build(); } - return Response.ok().entity(httpRemoteTaskRunner.getDebugInfo()).build(); + return Response.ok().entity(httpRemoteTaskRunner.getKnownTasks()).build(); + } + + @GET + @Path("/pendingTasksQueue") + @Produces(MediaType.APPLICATION_JSON) + public Response getPendingTasksQueue() + { + HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner(); + if (httpRemoteTaskRunner == null) { + return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build(); + } + + return Response.ok().entity(httpRemoteTaskRunner.getPendingTasksList()).build(); + } + + @GET + @Path("/workerSyncerDebugInfo") + @Produces(MediaType.APPLICATION_JSON) + public Response getWorkerSyncerDebugInfo() + { + HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner(); + if (httpRemoteTaskRunner == null) { + return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build(); + } + + return Response.ok().entity(httpRemoteTaskRunner.getWorkerSyncerDebugInfo()).build(); } @GET @@ -72,6 +101,45 @@ public Response getBlacklistedWorkers() return Response.ok().entity(httpRemoteTaskRunner.getBlacklistedWorkers()).build(); } + @GET + @Path("/lazyWorkers") + @Produces(MediaType.APPLICATION_JSON) + public Response getLazyWorkers() + { + HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner(); + if (httpRemoteTaskRunner == null) { + return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build(); + } + + return Response.ok().entity(httpRemoteTaskRunner.getLazyWorkers()).build(); + } + + @GET + @Path("/workersWithUnacknowledgedTasks") + @Produces(MediaType.APPLICATION_JSON) + public Response getWorkersWithUnacknowledgedTasks() + { + HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner(); + if (httpRemoteTaskRunner == null) { + return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build(); + } + + return Response.ok().entity(httpRemoteTaskRunner.getWorkersWithUnacknowledgedTasks()).build(); + } + + @GET + @Path("/workersEilgibleToRunTasks") + @Produces(MediaType.APPLICATION_JSON) + public Response getWorkersEilgibleToRunTasks() + { + HttpRemoteTaskRunner httpRemoteTaskRunner = getHttpRemoteTaskRunner(); + if (httpRemoteTaskRunner == null) { + return Response.status(Response.Status.FORBIDDEN).entity("HttpRemoteTaskRunner is NULL.").build(); + } + + return Response.ok().entity(httpRemoteTaskRunner.getWorkersEligibleToRunTasks()).build(); + } + private HttpRemoteTaskRunner getHttpRemoteTaskRunner() { Optional taskRunnerOpt = taskMaster.getTaskRunner(); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java index cfb618066bb0..630b0f032202 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/hrtr/WorkerHolder.java @@ -332,6 +332,22 @@ public void waitForInitialization() throws InterruptedException } } + public boolean isInitialized() + { + try { + return syncer.awaitInitialization(1, TimeUnit.MILLISECONDS); + } + catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + return false; + } + } + + public boolean isEnabled() + { + return !disabled.get(); + } + public ChangeRequestHttpSyncer getUnderlyingSyncer() { return syncer; diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java index 745f3e5cf9a9..fe1df770a19a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/hrtr/HttpRemoteTaskRunnerTest.java @@ -1349,6 +1349,12 @@ public void stop() } + @Override + public boolean isInitialized() + { + return true; + } + @Override public void waitForInitialization() {