Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -172,25 +172,27 @@ public Map<Integer, List<WorkerStats>> getWorkerStats()
public void stop(boolean interrupt)
{
if (state.compareAndSet(State.STARTED, State.STOPPED)) {
final List<ListenableFuture<?>> futures = new ArrayList<>();

// Send stop commands to all workers. This ensures they exit promptly, and do not get left in a zombie state.
// For this reason, the workerClient uses an unlimited retry policy. If a stop command is lost, a worker
// could get stuck in a zombie state without its controller. This state would persist until the server that
// ran the controller shuts down or restarts. At that time, the listener in DartWorkerRunner.BrokerListener calls
// "controllerFailed()" on the Worker, and the zombie worker would exit.

for (final String workerId : workerIds) {
futures.add(workerClient.stopWorker(workerId));
}

// Block until messages are acknowledged, or until the worker we're communicating with has failed.

try {
FutureUtils.getUnchecked(Futures.successfulAsList(futures), false);
}
catch (Throwable ignored) {
// Suppress errors.
if (interrupt) {
final List<ListenableFuture<?>> futures = new ArrayList<>();

// Send stop commands to all workers. This ensures they exit promptly, and do not get left in a zombie state.
// For this reason, the workerClient uses an unlimited retry policy. If a stop command is lost, a worker
// could get stuck in a zombie state without its controller. This state would persist until the server that
// ran the controller shuts down or restarts. At that time, the listener in DartWorkerRunner.BrokerListener
// calls "controllerFailed()" on the Worker, and the zombie worker would exit.

for (final String workerId : workerIds) {
futures.add(workerClient.stopWorker(workerId));
}

// Block until messages are acknowledged, or until the worker we're communicating with has failed.

try {
FutureUtils.getUnchecked(Futures.successfulAsList(futures), false);
}
catch (Throwable ignored) {
// Suppress errors.
}
}

CloseableUtils.closeAndSuppressExceptions(workerClient, e -> log.warn(e, "Failed to close workerClient"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
import org.apache.druid.msq.dart.worker.http.DartWorkerInfo;
import org.apache.druid.msq.dart.worker.http.GetWorkersResponse;
import org.apache.druid.msq.exec.Worker;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.rpc.ResourcePermissionMapper;
import org.apache.druid.msq.rpc.WorkerResource;
import org.apache.druid.query.QueryContext;
Expand Down Expand Up @@ -142,8 +141,7 @@ public Worker startWorker(
holder.worker.run();
}
catch (Throwable t) {
if (Thread.interrupted()
|| t instanceof MSQException && ((MSQException) t).getFault().getErrorCode().equals(CanceledFault.CODE)) {
if (Thread.interrupted() || MSQFaultUtils.isCanceledException(t)) {
log.debug(t, "Canceled, exiting thread.");
} else {
log.warn(t, "Worker for query[%s] failed and stopped.", queryId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,11 @@ enum State
*/
STARTED,

/**
* State entered upon failure of some work.
*/
FAILED,

/**
* State entered upon calling {@link #stop(Throwable)}.
*/
Expand Down Expand Up @@ -249,7 +254,8 @@ public void startAsync()
public void stop(@Nullable Throwable t) throws InterruptedException
{
if (state.compareAndSet(State.INIT, State.STOPPING)
|| state.compareAndSet(State.STARTED, State.STOPPING)) {
|| state.compareAndSet(State.STARTED, State.STOPPING)
|| state.compareAndSet(State.FAILED, State.STOPPING)) {
// Initiate stopping.
try {
exec.cancel(cancellationId);
Expand Down Expand Up @@ -562,7 +568,11 @@ public void onSuccess(final List<Object> workerResultAndOutputChannelsResolved)
@Override
public void onFailure(final Throwable t)
{
notifyListener(Either.error(t));
if (state.compareAndSet(State.STARTED, State.FAILED)) {
// Call notifyListener only if we were STARTED. In particular, if we were STOPPING, skip this and allow
// the stop() method to set its own Canceled error.
notifyListener(Either.error(t));
}
}
},
Execs.directExecutor()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,8 @@ public void postCleanupStage(final StageId stageId)
holder.finishProcessing(stageId);
final WorkerStageKernel kernel = holder.getKernelFor(stageId);
if (kernel != null) {
// Calling setStageFinished places the kernel into FINISHED state, which also means we'll ignore any
// "Canceled" errors generated by "holder.finishProcessing(stageId)". (See WorkerStageKernel.fail)
kernel.setStageFinished();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,11 @@ public static String getErrorCodeFromMessage(String message)
return message.split(ERROR_CODE_DELIMITER, 2)[0];
}

/**
* Returns whether the provided throwable is a {@link MSQException} with {@link CanceledFault}.
*/
public static boolean isCanceledException(final Throwable t)
{
return t instanceof MSQException && ((MSQException) t).getFault().getErrorCode().equals(CanceledFault.CODE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.indexing.error.MSQFaultUtils;
import org.apache.druid.msq.kernel.ShuffleKind;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageId;
Expand Down Expand Up @@ -201,12 +202,18 @@ public void fail(Throwable t)
{
Preconditions.checkNotNull(t, "t");

transitionTo(WorkerStagePhase.FAILED);
resultKeyStatisticsSnapshot = null;
resultPartitionBoundaries = null;

if (exceptionFromFail == null) {
exceptionFromFail = t;
if (WorkerStagePhase.FAILED.canTransitionFrom(phase)) {
transitionTo(WorkerStagePhase.FAILED);
resultKeyStatisticsSnapshot = null;
resultPartitionBoundaries = null;

if (exceptionFromFail == null) {
exceptionFromFail = t;
}
} else if (!MSQFaultUtils.isCanceledException(t)) {
// Current phase is already terminal. Log and suppress this error. It likely happened during cleanup.
// (Don't log CanceledFault though. Ignore those if they come after the kernel is in a terminal phase.)
log.warn(t, "Stage[%s] failed while in phase[%s]", getStageDefinition().getId(), phase);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public boolean canTransitionFrom(final WorkerStagePhase priorPhase)
@Override
public boolean canTransitionFrom(final WorkerStagePhase priorPhase)
{
return true;
return !priorPhase.isTerminal();
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,9 @@ public ListenableFuture<?> addChunk(final byte[] chunk)
public void setError(final Throwable t)
{
synchronized (lock) {
if (noMoreWrites) {
log.noStackTrace().warn(t, "Channel is no longer accepting writes, cannot propagate exception");
} else {
// Write error to the channel, unless "noMoreWrites" is set. If that's set, suppress errors, so regular channel
// shutdown doesn't trigger warnings in the log.
if (!noMoreWrites) {
chunks.clear();
chunks.add(Either.error(t));
nextCompressedFrameLength = UNKNOWN_LENGTH;
Expand Down