From aab18d63de65f2541660ec83e5ef6f843e6ffbc9 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Wed, 2 Oct 2024 22:45:26 -0700 Subject: [PATCH] Dart: Smoother handling of stage early-exit. Stages can be instructed to exit before they finish, especially when a downstream stage includes a "LIMIT". This patch has improvements related to early-exiting stages. Bug fix: - WorkerStageKernel: Don't allow fail() to set an exception if the stage is already in a terminal state (FINISHED or FAILED). If fail() is called while in a terminal state, log the exception, then discard it (it is not rethrown). If it's a cancellation exception, don't even log it. This fixes a bug where a stage that exited early could transition to FINISHED and then to FAILED, causing the overall query to fail. Performance: - DartWorkerManager previously sent stopWorker commands to workers even when "interrupt" was false. Now it only sends those commands when "interrupt" is true. The method javadoc already claimed this is what the method did, but the implementation did not match the javadoc. This reduces the number of RPCs by 1 per worker per query. Quieter logging: - In ReadableByteChunksFrameChannel, skip logging the exception from setError if the channel has been closed. Channels are closed when readers are done with them, so at that point, we wouldn't be interested in the errors. - In RunWorkOrder, skip calling notifyListener on failure of the main work when stop() has already been called. The stop() method will set its own error using CanceledFault. This enables callers to detect when a stage was canceled vs. failed for some other reason. - In WorkerStageKernel, skip logging cancellation errors in fail(). This is made possible by the previous change in RunWorkOrder. 
--- .../dart/controller/DartWorkerManager.java | 40 ++++++++++--------- .../msq/dart/worker/DartWorkerRunner.java | 6 +-- .../apache/druid/msq/exec/RunWorkOrder.java | 14 ++++++- .../org/apache/druid/msq/exec/WorkerImpl.java | 2 + .../msq/indexing/error/MSQFaultUtils.java | 7 ++++ .../msq/kernel/worker/WorkerStageKernel.java | 19 ++++++--- .../msq/kernel/worker/WorkerStagePhase.java | 2 +- .../ReadableByteChunksFrameChannel.java | 6 +-- 8 files changed, 61 insertions(+), 35 deletions(-) diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java index 54e163862d62..c49e0b98aed6 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/controller/DartWorkerManager.java @@ -172,25 +172,27 @@ public Map> getWorkerStats() public void stop(boolean interrupt) { if (state.compareAndSet(State.STARTED, State.STOPPED)) { - final List> futures = new ArrayList<>(); - - // Send stop commands to all workers. This ensures they exit promptly, and do not get left in a zombie state. - // For this reason, the workerClient uses an unlimited retry policy. If a stop command is lost, a worker - // could get stuck in a zombie state without its controller. This state would persist until the server that - // ran the controller shuts down or restarts. At that time, the listener in DartWorkerRunner.BrokerListener calls - // "controllerFailed()" on the Worker, and the zombie worker would exit. - - for (final String workerId : workerIds) { - futures.add(workerClient.stopWorker(workerId)); - } - - // Block until messages are acknowledged, or until the worker we're communicating with has failed. 
- - try { - FutureUtils.getUnchecked(Futures.successfulAsList(futures), false); - } - catch (Throwable ignored) { - // Suppress errors. + if (interrupt) { + final List> futures = new ArrayList<>(); + + // Send stop commands to all workers. This ensures they exit promptly, and do not get left in a zombie state. + // For this reason, the workerClient uses an unlimited retry policy. If a stop command is lost, a worker + // could get stuck in a zombie state without its controller. This state would persist until the server that + // ran the controller shuts down or restarts. At that time, the listener in DartWorkerRunner.BrokerListener + // calls "controllerFailed()" on the Worker, and the zombie worker would exit. + + for (final String workerId : workerIds) { + futures.add(workerClient.stopWorker(workerId)); + } + + // Block until messages are acknowledged, or until the worker we're communicating with has failed. + + try { + FutureUtils.getUnchecked(Futures.successfulAsList(futures), false); + } + catch (Throwable ignored) { + // Suppress errors. 
+ } } CloseableUtils.closeAndSuppressExceptions(workerClient, e -> log.warn(e, "Failed to close workerClient")); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java index ae136196a0fc..d51a410fbb31 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/dart/worker/DartWorkerRunner.java @@ -36,8 +36,7 @@ import org.apache.druid.msq.dart.worker.http.DartWorkerInfo; import org.apache.druid.msq.dart.worker.http.GetWorkersResponse; import org.apache.druid.msq.exec.Worker; -import org.apache.druid.msq.indexing.error.CanceledFault; -import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.indexing.error.MSQFaultUtils; import org.apache.druid.msq.rpc.ResourcePermissionMapper; import org.apache.druid.msq.rpc.WorkerResource; import org.apache.druid.query.QueryContext; @@ -142,8 +141,7 @@ public Worker startWorker( holder.worker.run(); } catch (Throwable t) { - if (Thread.interrupted() - || t instanceof MSQException && ((MSQException) t).getFault().getErrorCode().equals(CanceledFault.CODE)) { + if (Thread.interrupted() || MSQFaultUtils.isCanceledException(t)) { log.debug(t, "Canceled, exiting thread."); } else { log.warn(t, "Worker for query[%s] failed and stopped.", queryId); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java index 3ad8bf1f29a3..15b1b5454e41 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java @@ -128,6 +128,11 @@ enum State */ STARTED, + 
/** + * State entered upon failure of some work. + */ + FAILED, + /** * State entered upon calling {@link #stop(Throwable)}. */ @@ -249,7 +254,8 @@ public void startAsync() public void stop(@Nullable Throwable t) throws InterruptedException { if (state.compareAndSet(State.INIT, State.STOPPING) - || state.compareAndSet(State.STARTED, State.STOPPING)) { + || state.compareAndSet(State.STARTED, State.STOPPING) + || state.compareAndSet(State.FAILED, State.STOPPING)) { // Initiate stopping. try { exec.cancel(cancellationId); @@ -562,7 +568,11 @@ public void onSuccess(final List workerResultAndOutputChannelsResolved) @Override public void onFailure(final Throwable t) { - notifyListener(Either.error(t)); + if (state.compareAndSet(State.STARTED, State.FAILED)) { + // Call notifyListener only if we were STARTED. In particular, if we were STOPPING, skip this and allow + // the stop() method to set its own Canceled error. + notifyListener(Either.error(t)); + } } }, Execs.directExecutor() diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 60325e640e5a..46e7e4a14491 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -625,6 +625,8 @@ public void postCleanupStage(final StageId stageId) holder.finishProcessing(stageId); final WorkerStageKernel kernel = holder.getKernelFor(stageId); if (kernel != null) { + // Calling setStageFinished places the kernel into FINISHED state, which also means we'll ignore any + // "Canceled" errors generated by "holder.finishProcessing(stageId)". 
(See WorkerStageKernel.fail) kernel.setStageFinished(); } }); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java index 781639b17adc..5385645d5ebb 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/error/MSQFaultUtils.java @@ -50,4 +50,11 @@ public static String getErrorCodeFromMessage(String message) return message.split(ERROR_CODE_DELIMITER, 2)[0]; } + /** + * Returns whether the provided throwable is a {@link MSQException} with {@link CanceledFault}. + */ + public static boolean isCanceledException(final Throwable t) + { + return t instanceof MSQException && ((MSQException) t).getFault().getErrorCode().equals(CanceledFault.CODE); + } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java index 992b90c02859..726333a2d1dc 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStageKernel.java @@ -25,6 +25,7 @@ import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.msq.indexing.error.MSQFaultUtils; import org.apache.druid.msq.kernel.ShuffleKind; import org.apache.druid.msq.kernel.StageDefinition; import org.apache.druid.msq.kernel.StageId; @@ -201,12 +202,18 @@ public void fail(Throwable t) { Preconditions.checkNotNull(t, "t"); - transitionTo(WorkerStagePhase.FAILED); - resultKeyStatisticsSnapshot = null; - 
resultPartitionBoundaries = null; - - if (exceptionFromFail == null) { - exceptionFromFail = t; + if (WorkerStagePhase.FAILED.canTransitionFrom(phase)) { + transitionTo(WorkerStagePhase.FAILED); + resultKeyStatisticsSnapshot = null; + resultPartitionBoundaries = null; + + if (exceptionFromFail == null) { + exceptionFromFail = t; + } + } else if (!MSQFaultUtils.isCanceledException(t)) { + // Current phase is already terminal. Log and suppress this error. It likely happened during cleanup. + // (Don't log CanceledFault though. Ignore those if they come after the kernel is in a terminal phase.) + log.warn(t, "Stage[%s] failed while in phase[%s]", getStageDefinition().getId(), phase); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java index 10543beeb069..797dd8d9f8ac 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/kernel/worker/WorkerStagePhase.java @@ -84,7 +84,7 @@ public boolean canTransitionFrom(final WorkerStagePhase priorPhase) @Override public boolean canTransitionFrom(final WorkerStagePhase priorPhase) { - return true; + return !priorPhase.isTerminal(); } }; diff --git a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java index 79ad621de280..71e844f2a832 100644 --- a/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java +++ b/processing/src/main/java/org/apache/druid/frame/channel/ReadableByteChunksFrameChannel.java @@ -182,9 +182,9 @@ public ListenableFuture addChunk(final byte[] chunk) public void setError(final Throwable t) { synchronized (lock) { - if (noMoreWrites) { - 
log.noStackTrace().warn(t, "Channel is no longer accepting writes, cannot propagate exception"); - } else { + // Write error to the channel, unless "noMoreWrites" is set. If that's set, suppress errors, so regular channel + // shutdown doesn't trigger warnings in the log. + if (!noMoreWrites) { chunks.clear(); chunks.add(Either.error(t)); nextCompressedFrameLength = UNKNOWN_LENGTH;