Summary of this patch (text before the first file header is ignored by patch tools):

  * DartWorkerManager.stop: stop commands are sent to workers, and their acknowledgement awaited,
    only when "interrupt" is true. The workerClient is closed in both cases.
  * DartWorkerRunner.startWorker: the inline "is this a Canceled fault?" check is replaced by the
    new helper MSQFaultUtils.isCanceledException.
  * RunWorkOrder: adds State.FAILED. onFailure notifies the listener only when it wins the
    STARTED -> FAILED transition, and stop() additionally accepts FAILED -> STOPPING.
  * WorkerImpl.postCleanupStage: comment only.
  * MSQFaultUtils: adds isCanceledException(Throwable).
  * WorkerStageKernel.fail: transitions to FAILED only when that transition is allowed from the
    current phase; otherwise the error is logged at WARN unless it is a Canceled fault.
  * WorkerStagePhase: canTransitionFrom of the last phase constant (presumably FAILED; the constant
    name lies outside the hunk) now rejects terminal prior phases instead of always returning true.
  * ReadableByteChunksFrameChannel.setError: no longer logs a warning when "noMoreWrites" is set;
    the error is silently dropped in that case.

NOTE(review): this patch was recovered from a copy whose line breaks, indentation and generic type
arguments had been stripped. Line structure was rebuilt so that every hunk matches the line counts in
its header. The generic arguments (in hunk-header context and on the "futures" declaration) and the
indentation depth of nested code are reconstructions; confirm them against the target tree, or apply
with whitespace-insensitive matching.

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
index 54e163862d62..c49e0b98aed6 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java
@@ -172,25 +172,27 @@ public Map<Integer, List<WorkerStats>> getWorkerStats()
   public void stop(boolean interrupt)
   {
     if (state.compareAndSet(State.STARTED, State.STOPPED)) {
-      final List<ListenableFuture<?>> futures = new ArrayList<>();
-
-      // Send stop commands to all workers. This ensures they exit promptly, and do not get left in a zombie state.
-      // For this reason, the workerClient uses an unlimited retry policy. If a stop command is lost, a worker
-      // could get stuck in a zombie state without its controller. This state would persist until the server that
-      // ran the controller shuts down or restarts. At that time, the listener in DartWorkerRunner.BrokerListener calls
-      // "controllerFailed()" on the Worker, and the zombie worker would exit.
-
-      for (final String workerId : workerIds) {
-        futures.add(workerClient.stopWorker(workerId));
-      }
-
-      // Block until messages are acknowledged, or until the worker we're communicating with has failed.
-
-      try {
-        FutureUtils.getUnchecked(Futures.successfulAsList(futures), false);
-      }
-      catch (Throwable ignored) {
-        // Suppress errors.
+      if (interrupt) {
+        final List<ListenableFuture<?>> futures = new ArrayList<>();
+
+        // Send stop commands to all workers. This ensures they exit promptly, and do not get left in a zombie state.
+        // For this reason, the workerClient uses an unlimited retry policy. If a stop command is lost, a worker
+        // could get stuck in a zombie state without its controller. This state would persist until the server that
+        // ran the controller shuts down or restarts. At that time, the listener in DartWorkerRunner.BrokerListener
+        // calls "controllerFailed()" on the Worker, and the zombie worker would exit.
+
+        for (final String workerId : workerIds) {
+          futures.add(workerClient.stopWorker(workerId));
+        }
+
+        // Block until messages are acknowledged, or until the worker we're communicating with has failed.
+
+        try {
+          FutureUtils.getUnchecked(Futures.successfulAsList(futures), false);
+        }
+        catch (Throwable ignored) {
+          // Suppress errors.
+        }
       }
 
       CloseableUtils.closeAndSuppressExceptions(workerClient, e -> log.warn(e, "Failed to close workerClient"));
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
index ae136196a0fc..d51a410fbb31 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java
@@ -36,8 +36,7 @@
 import org.apache.druid.msq.dart.worker.http.DartWorkerInfo;
 import org.apache.druid.msq.dart.worker.http.GetWorkersResponse;
 import org.apache.druid.msq.exec.Worker;
-import org.apache.druid.msq.indexing.error.CanceledFault;
-import org.apache.druid.msq.indexing.error.MSQException;
+import org.apache.druid.msq.indexing.error.MSQFaultUtils;
 import org.apache.druid.msq.rpc.ResourcePermissionMapper;
 import org.apache.druid.msq.rpc.WorkerResource;
 import org.apache.druid.query.QueryContext;
@@ -142,8 +141,7 @@ public Worker startWorker(
           holder.worker.run();
         }
         catch (Throwable t) {
-          if (Thread.interrupted()
-              || t instanceof MSQException && ((MSQException) t).getFault().getErrorCode().equals(CanceledFault.CODE)) {
+          if (Thread.interrupted() || MSQFaultUtils.isCanceledException(t)) {
             log.debug(t, "Canceled, exiting thread.");
           } else {
             log.warn(t, "Worker for query[%s] failed and stopped.", queryId);
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
index 3ad8bf1f29a3..15b1b5454e41 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java
@@ -128,6 +128,11 @@ enum State
      */
     STARTED,
 
+    /**
+     * State entered upon failure of some work.
+     */
+    FAILED,
+
     /**
      * State entered upon calling {@link #stop(Throwable)}.
      */
@@ -249,7 +254,8 @@ public void startAsync()
   public void stop(@Nullable Throwable t) throws InterruptedException
   {
     if (state.compareAndSet(State.INIT, State.STOPPING)
-        || state.compareAndSet(State.STARTED, State.STOPPING)) {
+        || state.compareAndSet(State.STARTED, State.STOPPING)
+        || state.compareAndSet(State.FAILED, State.STOPPING)) {
       // Initiate stopping.
       try {
         exec.cancel(cancellationId);
@@ -562,7 +568,11 @@ public void onSuccess(final List<Object> workerResultAndOutputChannelsResolved)
           @Override
           public void onFailure(final Throwable t)
           {
-            notifyListener(Either.error(t));
+            if (state.compareAndSet(State.STARTED, State.FAILED)) {
+              // Call notifyListener only if we were STARTED. In particular, if we were STOPPING, skip this and allow
+              // the stop() method to set its own Canceled error.
+              notifyListener(Either.error(t));
+            }
           }
         },
         Execs.directExecutor()
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
index 60325e640e5a..46e7e4a14491 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java
@@ -625,6 +625,8 @@ public void postCleanupStage(final StageId stageId)
       holder.finishProcessing(stageId);
       final WorkerStageKernel kernel = holder.getKernelFor(stageId);
       if (kernel != null) {
+        // Calling setStageFinished places the kernel into FINISHED state, which also means we'll ignore any
+        // "Canceled" errors generated by "holder.finishProcessing(stageId)". (See WorkerStageKernel.fail)
         kernel.setStageFinished();
       }
     });
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java
index 781639b17adc..5385645d5ebb 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java
@@ -50,4 +50,11 @@ public static String getErrorCodeFromMessage(String message)
     return message.split(ERROR_CODE_DELIMITER, 2)[0];
   }
 
+  /**
+   * Returns whether the provided throwable is a {@link MSQException} with {@link CanceledFault}.
+   */
+  public static boolean isCanceledException(final Throwable t)
+  {
+    return t instanceof MSQException && ((MSQException) t).getFault().getErrorCode().equals(CanceledFault.CODE);
+  }
 }
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
index 992b90c02859..726333a2d1dc 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java
@@ -25,6 +25,7 @@
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.msq.indexing.error.MSQFaultUtils;
 import org.apache.druid.msq.kernel.ShuffleKind;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.kernel.StageId;
@@ -201,12 +202,18 @@ public void fail(Throwable t)
   {
     Preconditions.checkNotNull(t, "t");
 
-    transitionTo(WorkerStagePhase.FAILED);
-    resultKeyStatisticsSnapshot = null;
-    resultPartitionBoundaries = null;
-
-    if (exceptionFromFail == null) {
-      exceptionFromFail = t;
+    if (WorkerStagePhase.FAILED.canTransitionFrom(phase)) {
+      transitionTo(WorkerStagePhase.FAILED);
+      resultKeyStatisticsSnapshot = null;
+      resultPartitionBoundaries = null;
+
+      if (exceptionFromFail == null) {
+        exceptionFromFail = t;
+      }
+    } else if (!MSQFaultUtils.isCanceledException(t)) {
+      // Current phase is already terminal. Log and suppress this error. It likely happened during cleanup.
+      // (Don't log CanceledFault though. Ignore those if they come after the kernel is in a terminal phase.)
+      log.warn(t, "Stage[%s] failed while in phase[%s]", getStageDefinition().getId(), phase);
     }
   }
 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
index 10543beeb069..797dd8d9f8ac 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java
@@ -84,7 +84,7 @@ public boolean canTransitionFrom(final WorkerStagePhase priorPhase)
     @Override
     public boolean canTransitionFrom(final WorkerStagePhase priorPhase)
     {
-      return true;
+      return !priorPhase.isTerminal();
     }
   };
 
diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
index 79ad621de280..71e844f2a832 100644
--- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
+++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java
@@ -182,9 +182,9 @@ public ListenableFuture<?> addChunk(final byte[] chunk)
   public void setError(final Throwable t)
   {
     synchronized (lock) {
-      if (noMoreWrites) {
-        log.noStackTrace().warn(t, "Channel is no longer accepting writes, cannot propagate exception");
-      } else {
+      // Write error to the channel, unless "noMoreWrites" is set. If that's set, suppress errors, so regular channel
+      // shutdown doesn't trigger warnings in the log.
+      if (!noMoreWrites) {
         chunks.clear();
         chunks.add(Either.error(t));
         nextCompressedFrameLength = UNKNOWN_LENGTH;