diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java index 3d31d7e2c3ee..3ad8bf1f29a3 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java @@ -129,12 +129,12 @@ enum State STARTED, /** - * State entered upon calling {@link #stop()}. + * State entered upon calling {@link #stop(Throwable)}. */ STOPPING, /** - * State entered when a call to {@link #stop()} concludes. + * State entered when a call to {@link #stop(Throwable)} concludes. */ STOPPED } @@ -232,7 +232,7 @@ public void startAsync() setUpCompletionCallbacks(); } catch (Throwable t) { - stopUnchecked(); + stopUnchecked(t); } } @@ -242,64 +242,72 @@ public void startAsync() * are all properly cleaned up. * * Blocks until execution is fully stopped. + * + * @param t error to send to {@link RunWorkOrderListener#onFailure}, if success/failure has not already been sent. + * Will also be thrown at the end of this method. */ - public void stop() throws InterruptedException + public void stop(@Nullable Throwable t) throws InterruptedException { if (state.compareAndSet(State.INIT, State.STOPPING) || state.compareAndSet(State.STARTED, State.STOPPING)) { // Initiate stopping. - Throwable e = null; - try { exec.cancel(cancellationId); } catch (Throwable e2) { - e = e2; + if (t == null) { + t = e2; + } else { + t.addSuppressed(e2); + } } try { frameContext.close(); } catch (Throwable e2) { - if (e == null) { - e = e2; + if (t == null) { + t = e2; } else { - e.addSuppressed(e2); + t.addSuppressed(e2); } } try { - // notifyListener will ignore this cancellation error if work has already succeeded. - notifyListener(Either.error(new MSQException(CanceledFault.instance()))); + // notifyListener will ignore this error if work has already succeeded. + notifyListener(Either.error(t != null ? t : new MSQException(CanceledFault.instance()))); } catch (Throwable e2) { - if (e == null) { - e = e2; + if (t == null) { + t = e2; } else { - e.addSuppressed(e2); + t.addSuppressed(e2); } } stopLatch.countDown(); - - if (e != null) { - Throwables.throwIfInstanceOf(e, InterruptedException.class); - Throwables.throwIfUnchecked(e); - throw new RuntimeException(e); - } } stopLatch.await(); + + if (t != null) { + Throwables.throwIfInstanceOf(t, InterruptedException.class); + Throwables.throwIfUnchecked(t); + throw new RuntimeException(t); + } } /** - * Calls {@link #stop()}. If the call to {@link #stop()} throws {@link InterruptedException}, this method sets - * the interrupt flag and throws an unchecked exception. + * Calls {@link #stop(Throwable)}. If the call to {@link #stop(Throwable)} throws {@link InterruptedException}, + * this method sets the interrupt flag and throws an unchecked exception. + * + * @param t error to send to {@link RunWorkOrderListener#onFailure}, if success/failure has not already been sent. + * Will also be thrown at the end of this method. */ - public void stopUnchecked() + public void stopUnchecked(@Nullable final Throwable t) { try { - stop(); + stop(t); } catch (InterruptedException e) { Thread.currentThread().interrupt(); diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java index 906d9e041b3c..7be045542bc8 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/WorkerImpl.java @@ -405,7 +405,7 @@ private void handleNewWorkOrder( ); // Set up processorCloser (called when processing is done). - kernelHolder.processorCloser.register(runWorkOrder::stopUnchecked); + kernelHolder.processorCloser.register(() -> runWorkOrder.stopUnchecked(null)); // Start working on this stage immediately. kernel.startReading(); diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java new file mode 100644 index 000000000000..dbd6857b2722 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/RunWorkOrderTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.exec; + +import org.apache.druid.frame.processor.FrameProcessorExecutor; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.msq.indexing.error.MSQException; +import org.apache.druid.msq.kernel.FrameContext; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +public class RunWorkOrderTest +{ + private static final String CANCELLATION_ID = "my-cancellation-id"; + + @Test + public void test_stopUnchecked() throws InterruptedException + { + final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class); + final WorkerContext workerContext = Mockito.mock(WorkerContext.class); + final FrameContext frameContext = Mockito.mock(FrameContext.class); + final WorkerStorageParameters storageParameters = Mockito.mock(WorkerStorageParameters.class); + final RunWorkOrderListener listener = Mockito.mock(RunWorkOrderListener.class); + + Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters); + + final RunWorkOrder runWorkOrder = + new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, workerContext, frameContext, listener, false, false); + + runWorkOrder.stopUnchecked(null); + + // Calling a second time doesn't do anything special. + runWorkOrder.stopUnchecked(null); + + Mockito.verify(exec).cancel(CANCELLATION_ID); + Mockito.verify(frameContext).close(); + Mockito.verify(listener).onFailure(ArgumentMatchers.any(MSQException.class)); + } + + @Test + public void test_stopUnchecked_error() throws InterruptedException + { + final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class); + final WorkerContext workerContext = Mockito.mock(WorkerContext.class); + final FrameContext frameContext = Mockito.mock(FrameContext.class); + final WorkerStorageParameters storageParameters = Mockito.mock(WorkerStorageParameters.class); + final RunWorkOrderListener listener = Mockito.mock(RunWorkOrderListener.class); + + Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters); + + final RunWorkOrder runWorkOrder = + new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, workerContext, frameContext, listener, false, false); + + final ISE exception = new ISE("oops"); + + Assert.assertThrows( + IllegalStateException.class, + () -> runWorkOrder.stopUnchecked(exception) + ); + + // Calling a second time doesn't do anything special. We already tried our best. + runWorkOrder.stopUnchecked(null); + + Mockito.verify(exec).cancel(CANCELLATION_ID); + Mockito.verify(frameContext).close(); + Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception)); + } + + @Test + public void test_stopUnchecked_errorDuringExecCancel() throws InterruptedException + { + final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class); + final WorkerContext workerContext = Mockito.mock(WorkerContext.class); + final FrameContext frameContext = Mockito.mock(FrameContext.class); + final WorkerStorageParameters storageParameters = Mockito.mock(WorkerStorageParameters.class); + final RunWorkOrderListener listener = Mockito.mock(RunWorkOrderListener.class); + + final ISE exception = new ISE("oops"); + Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters); + Mockito.doThrow(exception).when(exec).cancel(CANCELLATION_ID); + + final RunWorkOrder runWorkOrder = + new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, workerContext, frameContext, listener, false, false); + + Assert.assertThrows( + IllegalStateException.class, + () -> runWorkOrder.stopUnchecked(null) + ); + + Mockito.verify(exec).cancel(CANCELLATION_ID); + Mockito.verify(frameContext).close(); + Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception)); + } + + @Test + public void test_stopUnchecked_errorDuringFrameContextClose() throws InterruptedException + { + final FrameProcessorExecutor exec = Mockito.mock(FrameProcessorExecutor.class); + final WorkerContext workerContext = Mockito.mock(WorkerContext.class); + final FrameContext frameContext = Mockito.mock(FrameContext.class); + final WorkerStorageParameters storageParameters = Mockito.mock(WorkerStorageParameters.class); + final RunWorkOrderListener listener = Mockito.mock(RunWorkOrderListener.class); + + final ISE exception = new ISE("oops"); + Mockito.when(frameContext.storageParameters()).thenReturn(storageParameters); + Mockito.doThrow(exception).when(frameContext).close(); + + final RunWorkOrder runWorkOrder = + new RunWorkOrder(null, null, null, exec, CANCELLATION_ID, workerContext, frameContext, listener, false, false); + + Assert.assertThrows( + IllegalStateException.class, + () -> runWorkOrder.stopUnchecked(null) + ); + + Mockito.verify(exec).cancel(CANCELLATION_ID); + Mockito.verify(frameContext).close(); + Mockito.verify(listener).onFailure(ArgumentMatchers.eq(exception)); + } +}