diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java
index 7ab591a7d3d2..8638b3917c91 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java
@@ -25,6 +25,7 @@
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.utils.JvmUtils;
import javax.annotation.Nullable;
@@ -187,7 +188,7 @@ public boolean hasNext()
{
final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
if (hasTimeout && thisTimeoutNanos < 0) {
- throw new RE(new TimeoutException("Sequence iterator timed out"));
+ throw new QueryTimeoutException("Sequence iterator timed out");
}
if (currentBatch != null && !currentBatch.isTerminalResult() && !currentBatch.isDrained()) {
@@ -202,7 +203,7 @@ public boolean hasNext()
}
}
if (currentBatch == null) {
- throw new RE(new TimeoutException("Sequence iterator timed out waiting for data"));
+ throw new QueryTimeoutException("Sequence iterator timed out waiting for data");
}
if (cancellationGizmo.isCancelled()) {
diff --git a/processing/src/main/java/org/apache/druid/query/QueryException.java b/core/src/main/java/org/apache/druid/query/QueryException.java
similarity index 100%
rename from processing/src/main/java/org/apache/druid/query/QueryException.java
rename to core/src/main/java/org/apache/druid/query/QueryException.java
diff --git a/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java b/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
new file mode 100644
index 000000000000..6eca438190fe
--- /dev/null
+++ b/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.net.InetAddress;
+
+/**
+ * This exception is thrown when a query does not finish before the configured query timeout.
+ * {@link java.util.concurrent.TimeoutException} exceptions encountered during the lifecycle of a query
+ * are rethrown as this exception.
+ *
+ * As a {@link QueryException}, it is expected to be serialized to a json response, but will be mapped to
+ * {@link #STATUS_CODE} instead of the default HTTP 500 status.
+ */
+
+public class QueryTimeoutException extends QueryException
+{
+ private static final String ERROR_CLASS = QueryTimeoutException.class.getName();
+ public static final String ERROR_CODE = "Query timeout";
+ public static final String ERROR_MESSAGE = "Query Timed Out!";
+ public static final int STATUS_CODE = 504;
+
+ @JsonCreator
+ public QueryTimeoutException(
+ @JsonProperty("error") @Nullable String errorCode,
+ @JsonProperty("errorMessage") String errorMessage,
+ @JsonProperty("errorClass") @Nullable String errorClass,
+ @JsonProperty("host") @Nullable String host
+ )
+ {
+ super(errorCode, errorMessage, errorClass, host);
+ }
+
+ public QueryTimeoutException()
+ {
+ super(ERROR_CODE, ERROR_MESSAGE, ERROR_CLASS, resolveHostname());
+ }
+
+ public QueryTimeoutException(String errorMessage)
+ {
+ super(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
+ }
+
+ public QueryTimeoutException(String errorMessage, String host)
+ {
+ super(ERROR_CODE, errorMessage, ERROR_CLASS, host);
+ }
+
+
+ private static String resolveHostname()
+ {
+ String host;
+ try {
+ host = InetAddress.getLocalHost().getCanonicalHostName();
+ }
+ catch (Exception e) {
+ host = null;
+ }
+ return host;
+ }
+}
diff --git a/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java b/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java
index e459c38db893..4f9f9e2dac57 100644
--- a/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java
+++ b/core/src/test/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequenceTest.java
@@ -23,7 +23,7 @@
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
-import org.hamcrest.Matchers;
+import org.apache.druid.query.QueryTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -39,7 +39,6 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
@@ -511,8 +510,7 @@ public void testTimeoutExceptionDueToStalledInput() throws Exception
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
input.add(blockingSequence(someSize, 400, 500, 1, 500, true));
- expectedException.expect(RuntimeException.class);
- expectedException.expectCause(Matchers.instanceOf(TimeoutException.class));
+ expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Sequence iterator timed out waiting for data");
assertException(
@@ -534,8 +532,7 @@ public void testTimeoutExceptionDueToStalledReader() throws Exception
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
- expectedException.expect(RuntimeException.class);
- expectedException.expectCause(Matchers.instanceOf(TimeoutException.class));
+ expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Sequence iterator timed out");
assertException(input, 8, 64, 1000, 500);
}
diff --git a/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java b/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java
new file mode 100644
index 000000000000..ab187a1fb428
--- /dev/null
+++ b/core/src/test/java/org/apache/druid/query/QueryTimeoutExceptionTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class QueryTimeoutExceptionTest
+{
+ @Test
+ public void testSerde() throws IOException
+ {
+ final ObjectMapper mapper = new ObjectMapper();
+ QueryTimeoutException timeoutException = mapper.readValue(
+ mapper.writeValueAsBytes(new QueryTimeoutException()),
+ QueryTimeoutException.class
+ );
+ QueryTimeoutException timeoutExceptionWithMsg = mapper.readValue(mapper.writeValueAsBytes(new QueryTimeoutException(
+ "Another query timeout")), QueryTimeoutException.class);
+
+ Assert.assertEquals(
+ "Query timeout",
+ timeoutException.getErrorCode()
+ );
+ Assert.assertEquals(
+ "Query Timed Out!",
+ timeoutException.getMessage()
+ );
+ Assert.assertEquals(
+ "Another query timeout",
+ timeoutExceptionWithMsg.getMessage()
+ );
+ Assert.assertEquals(
+ "org.apache.druid.query.QueryTimeoutException",
+ timeoutExceptionWithMsg.getErrorClass()
+ );
+ }
+
+ @Test
+ public void testExceptionHost()
+ {
+ Assert.assertEquals(
+ "timeouthost",
+ new QueryTimeoutException("Timed out", "timeouthost").getHost()
+ );
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
index e30e9d1d8e5b..6269493dfd16 100644
--- a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
+++ b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
@@ -29,6 +29,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.MergeIterable;
import org.apache.druid.java.util.common.guava.Sequence;
@@ -132,6 +133,9 @@ public Iterable call()
catch (QueryInterruptedException e) {
throw new RuntimeException(e);
}
+ catch (QueryTimeoutException e) {
+ throw e;
+ }
catch (Exception e) {
log.noStackTrace().error(e, "Exception with one of the sequences!");
Throwables.propagateIfPossible(e);
@@ -167,7 +171,7 @@ public Iterable call()
catch (TimeoutException e) {
log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
- throw new QueryInterruptedException(e);
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
diff --git a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java
index 8845f2a3a36a..85a96014ef70 100644
--- a/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java
+++ b/processing/src/main/java/org/apache/druid/query/GroupByMergedQueryRunner.java
@@ -35,6 +35,7 @@
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -201,7 +202,7 @@ private void waitForFutureCompletion(
closeOnFailure.close();
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
- throw new QueryInterruptedException(e);
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
diff --git a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
index 206e11453335..b174c23857c7 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
@@ -25,7 +25,6 @@
import javax.annotation.Nullable;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.TimeoutException;
/**
* Exception representing a failed query. The name "QueryInterruptedException" is a misnomer; this is actually
@@ -33,7 +32,7 @@
*
* Fields:
* - "errorCode" is a well-defined errorCode code taken from a specific list (see the static constants). "Unknown exception"
- * represents all wrapped exceptions other than interrupt, timeout, cancellation, resource limit exceeded, unauthorized
+ * represents all wrapped exceptions other than interrupt, cancellation, resource limit exceeded, unauthorized
* request, and unsupported operation.
* - "errorMessage" is the toString of the wrapped exception
* - "errorClass" is the class of the wrapped exception
@@ -45,7 +44,6 @@
public class QueryInterruptedException extends QueryException
{
public static final String QUERY_INTERRUPTED = "Query interrupted";
- public static final String QUERY_TIMEOUT = "Query timeout";
public static final String QUERY_CANCELLED = "Query cancelled";
public static final String RESOURCE_LIMIT_EXCEEDED = "Resource limit exceeded";
public static final String UNAUTHORIZED = "Unauthorized request.";
@@ -100,8 +98,6 @@ private static String getErrorCodeFromThrowable(Throwable e)
return QUERY_INTERRUPTED;
} else if (e instanceof CancellationException) {
return QUERY_CANCELLED;
- } else if (e instanceof TimeoutException) {
- return QUERY_TIMEOUT;
} else if (e instanceof ResourceLimitExceededException) {
return RESOURCE_LIMIT_EXCEEDED;
} else if (e instanceof UnsupportedOperationException) {
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
index 73c1b64397e4..980b34ef4d29 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouper.java
@@ -34,6 +34,7 @@
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.AbstractPrioritizedCallable;
import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
@@ -361,10 +362,14 @@ public CloseableIterator> call()
final long timeout = queryTimeoutAt - System.currentTimeMillis();
return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
}
- catch (InterruptedException | TimeoutException | CancellationException e) {
+ catch (InterruptedException | CancellationException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
+ catch (TimeoutException e) {
+ GuavaUtils.cancelAll(true, future, futures);
+ throw new QueryTimeoutException();
+ }
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new RuntimeException(e.getCause());
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
index 6d0563cdf519..f9cd21b9c041 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
@@ -49,6 +49,7 @@
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.context.ResponseContext;
@@ -245,7 +246,7 @@ public AggregateResult call()
return input.run(queryPlusForRunners, responseContext)
.accumulate(AggregateResult.ok(), accumulator);
}
- catch (QueryInterruptedException e) {
+ catch (QueryInterruptedException | QueryTimeoutException e) {
throw e;
}
catch (Exception e) {
@@ -321,16 +322,19 @@ private List> getMergeBuffersHolder(
if (hasTimeout) {
final long timeout = timeoutAt - System.currentTimeMillis();
if (timeout <= 0) {
- throw new TimeoutException();
+ throw new QueryTimeoutException();
}
if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) {
- throw new TimeoutException("Cannot acquire enough merge buffers");
+ throw new QueryTimeoutException("Cannot acquire enough merge buffers");
}
} else {
mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers);
}
return mergeBufferHolder;
}
+ catch (QueryTimeoutException e) {
+ throw e;
+ }
catch (Exception e) {
throw new QueryInterruptedException(e);
}
@@ -350,7 +354,7 @@ private void waitForFutureCompletion(
}
if (hasTimeout && timeout <= 0) {
- throw new TimeoutException();
+ throw new QueryTimeoutException();
}
final List results = hasTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
@@ -374,7 +378,7 @@ private void waitForFutureCompletion(
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
- throw new QueryInterruptedException(e);
+ throw new QueryTimeoutException();
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouper.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouper.java
index a5ca158d9e29..573ec2d21a81 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouper.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouper.java
@@ -26,6 +26,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.BufferAggregator;
import org.apache.druid.segment.ColumnSelectorFactory;
@@ -33,7 +34,6 @@
import java.nio.ByteBuffer;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/**
* A streaming grouper which can aggregate sorted inputs. This grouper can aggregate while its iterator is being
@@ -302,7 +302,7 @@ private void increaseWriteIndex()
// The below condition is checked in a while loop instead of using a lock to avoid frequent thread park.
while ((nextReadIndex == -1 || nextReadIndex == 0) && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
- throw new RuntimeException(new TimeoutException());
+ throw new QueryTimeoutException();
}
// Thread.yield() should not be called from the very beginning
if (spinTimeoutNs <= 0L) {
@@ -321,7 +321,7 @@ private void increaseWriteIndex()
// The below condition is checked in a while loop instead of using a lock to avoid frequent thread park.
while ((nextWriteIndex == nextReadIndex) && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
- throw new RuntimeException(new TimeoutException());
+ throw new QueryTimeoutException();
}
// Thread.yield() should not be called from the very beginning
if (spinTimeoutNs <= 0L) {
@@ -470,7 +470,7 @@ private void increaseReadIndexTo(int target)
while ((curWriteIndex == -1 || target == curWriteIndex) &&
!finished && !Thread.currentThread().isInterrupted()) {
if (timeoutNs <= 0L) {
- throw new RuntimeException(new TimeoutException());
+ throw new QueryTimeoutException();
}
// Thread.yield() should not be called from the very beginning
if (spinTimeoutNs <= 0L) {
diff --git a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
index 5e2ba0e54a1c..24980fea8dd9 100644
--- a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
+++ b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
@@ -25,6 +25,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
@@ -37,6 +38,7 @@
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerFactory;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.aggregation.AggregatorFactory;
@@ -237,7 +239,7 @@ public Sequence call()
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
- throw new QueryInterruptedException(e);
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
throw new RuntimeException(e);
diff --git a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
index b09f731eea11..8fa5dc0c8b4b 100644
--- a/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
+++ b/processing/src/main/java/org/apache/druid/query/scan/ScanQueryEngine.java
@@ -25,13 +25,14 @@
import com.google.common.collect.Sets;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.UOE;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.QueryContexts;
-import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.filter.Filter;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
@@ -52,7 +53,6 @@
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
-import java.util.concurrent.TimeoutException;
public class ScanQueryEngine
{
@@ -174,7 +174,7 @@ public ScanResultValue next()
throw new NoSuchElementException();
}
if (hasTimeout && System.currentTimeMillis() >= timeoutAt) {
- throw new QueryInterruptedException(new TimeoutException());
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
final long lastOffset = offset;
final Object events;
diff --git a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
index b2385044b560..cf822927fc67 100644
--- a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
@@ -269,14 +269,14 @@ public void run()
ListenableFuture future = capturedFuture.getValue();
// wait for query to time out
- QueryInterruptedException cause = null;
+ QueryTimeoutException cause = null;
try {
resultFuture.get();
}
catch (ExecutionException e) {
- Assert.assertTrue(e.getCause() instanceof QueryInterruptedException);
- Assert.assertEquals("Query timeout", ((QueryInterruptedException) e.getCause()).getErrorCode());
- cause = (QueryInterruptedException) e.getCause();
+ Assert.assertTrue(e.getCause() instanceof QueryTimeoutException);
+ Assert.assertEquals("Query timeout", ((QueryTimeoutException) e.getCause()).getErrorCode());
+ cause = (QueryTimeoutException) e.getCause();
}
queriesInterrupted.await();
Assert.assertNotNull(cause);
diff --git a/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java b/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java
index 11ec31aebe3e..5116a4cafddb 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java
@@ -27,7 +27,6 @@
import org.junit.Test;
import java.util.concurrent.CancellationException;
-import java.util.concurrent.TimeoutException;
public class QueryInterruptedExceptionTest
{
@@ -42,7 +41,6 @@ public void testErrorCode()
);
Assert.assertEquals("Query cancelled", new QueryInterruptedException(new CancellationException()).getErrorCode());
Assert.assertEquals("Query interrupted", new QueryInterruptedException(new InterruptedException()).getErrorCode());
- Assert.assertEquals("Query timeout", new QueryInterruptedException(new TimeoutException()).getErrorCode());
Assert.assertEquals("Unsupported operation", new QueryInterruptedException(new UOE("Unsupported")).getErrorCode());
Assert.assertEquals("Unknown exception", new QueryInterruptedException(null).getErrorCode());
Assert.assertEquals("Unknown exception", new QueryInterruptedException(new ISE("Something bad!")).getErrorCode());
@@ -71,10 +69,6 @@ public void testErrorMessage()
null,
new QueryInterruptedException(new InterruptedException()).getMessage()
);
- Assert.assertEquals(
- null,
- new QueryInterruptedException(new TimeoutException()).getMessage()
- );
Assert.assertEquals(
null,
new QueryInterruptedException(null).getMessage()
@@ -108,10 +102,6 @@ public void testErrorClass()
"java.lang.InterruptedException",
new QueryInterruptedException(new InterruptedException()).getErrorClass()
);
- Assert.assertEquals(
- "java.util.concurrent.TimeoutException",
- new QueryInterruptedException(new TimeoutException()).getErrorClass()
- );
Assert.assertEquals(
"org.apache.druid.query.ResourceLimitExceededException",
new QueryInterruptedException(new ResourceLimitExceededException("too many!")).getErrorClass()
@@ -162,10 +152,6 @@ public void testSerde()
"java.lang.InterruptedException",
roundTrip(new QueryInterruptedException(new InterruptedException())).getErrorClass()
);
- Assert.assertEquals(
- "java.util.concurrent.TimeoutException",
- roundTrip(new QueryInterruptedException(new TimeoutException())).getErrorClass()
- );
Assert.assertEquals(
null,
roundTrip(new QueryInterruptedException(null)).getErrorClass()
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
index 70806e6e55d2..11238f1b8782 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
@@ -33,16 +33,15 @@
import org.apache.druid.query.InsufficientResourcesException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
-import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.dimension.DefaultDimensionSpec;
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
-import org.hamcrest.CoreMatchers;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
@@ -56,7 +55,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeoutException;
@RunWith(Parameterized.class)
public class GroupByQueryRunnerFailureTest
@@ -182,8 +180,7 @@ public GroupByQueryRunnerFailureTest(QueryRunner runner)
@Test(timeout = 60_000L)
public void testNotEnoughMergeBuffersOnQueryable()
{
- expectedException.expect(QueryInterruptedException.class);
- expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
+ expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Cannot acquire enough merge buffers");
final GroupByQuery query = GroupByQuery
@@ -283,8 +280,7 @@ public void testInsufficientResourcesOnBroker()
@Test(timeout = 60_000L)
public void testTimeoutExceptionOnQueryable()
{
- expectedException.expect(QueryInterruptedException.class);
- expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
+ expectedException.expect(QueryTimeoutException.class);
final GroupByQuery query = GroupByQuery
.builder()
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
index cd94d8166006..a93237e5afe8 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/ConcurrentGrouperTest.java
@@ -27,6 +27,7 @@
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.dimension.DimensionSpec;
@@ -170,6 +171,63 @@ public void run()
grouper.close();
}
+ @Test
+ public void testGrouperTimeout() throws Exception
+ {
+ final ConcurrentGrouper grouper = new ConcurrentGrouper<>(
+ bufferSupplier,
+ TEST_RESOURCE_HOLDER,
+ KEY_SERDE_FACTORY,
+ KEY_SERDE_FACTORY,
+ NULL_FACTORY,
+ new AggregatorFactory[] {new CountAggregatorFactory("cnt")},
+ 1024,
+ 0.7f,
+ 1,
+ new LimitedTemporaryStorage(temporaryFolder.newFolder(), 1024 * 1024),
+ new DefaultObjectMapper(),
+ 8,
+ null,
+ false,
+ MoreExecutors.listeningDecorator(SERVICE),
+ 0,
+ true,
+ 1,
+ 4,
+ 8
+ );
+ grouper.init();
+
+ final int numRows = 1000;
+
+ Future>[] futures = new Future[8];
+
+ for (int i = 0; i < 8; i++) {
+ futures[i] = SERVICE.submit(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ for (long i = 0; i < numRows; i++) {
+ grouper.aggregate(i);
+ }
+ }
+ });
+ }
+
+ for (Future eachFuture : futures) {
+ eachFuture.get();
+ }
+ try {
+ grouper.iterator(true);
+ }
+ catch (RuntimeException e) {
+ Assert.assertTrue(e instanceof QueryTimeoutException);
+ Assert.assertEquals("Query timeout", ((QueryTimeoutException) e).getErrorCode());
+ }
+ grouper.close();
+ }
+
static class TestResourceHolder extends ReferenceCountingResourceHolder
{
private boolean taken;
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouperTest.java b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouperTest.java
index 15712510db18..831db4da1ed7 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouperTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/epinephelinae/StreamingMergeSortedGrouperTest.java
@@ -28,11 +28,12 @@
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.data.input.MapBasedRow;
import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.groupby.epinephelinae.Grouper.Entry;
-import org.hamcrest.CoreMatchers;
+import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -45,9 +46,8 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
-public class StreamingMergeSortedGrouperTest
+public class StreamingMergeSortedGrouperTest extends InitializedNullHandlingTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
@@ -161,8 +161,7 @@ public void testNotEnoughBuffer()
@Test
public void testTimeout()
{
- expectedException.expect(RuntimeException.class);
- expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
+ expectedException.expect(QueryTimeoutException.class);
final TestColumnSelectorFactory columnSelectorFactory = GrouperTestUtil.newColumnSelectorFactory();
final StreamingMergeSortedGrouper grouper = newGrouper(columnSelectorFactory, 100);
diff --git a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
index 739420980ad9..a08d23b216f3 100644
--- a/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/scan/ScanQueryRunnerTest.java
@@ -38,10 +38,14 @@
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.TableDataSource;
+import org.apache.druid.query.context.DefaultResponseContext;
+import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.expression.TestExprMacroTable;
import org.apache.druid.query.extraction.MapLookupExtractor;
import org.apache.druid.query.filter.AndDimFilter;
@@ -900,6 +904,25 @@ public void testFullOnSelectWithFilterLimitAndDescendingTimeOrderingCompactedLis
}
}
+ @Test
+ public void testScanQueryTimeout()
+ {
+ ScanQuery query = newTestQuery()
+ .intervals(I_0112_0114)
+ .virtualColumns(EXPR_COLUMN)
+ .context(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))
+ .build();
+ ResponseContext responseContext = DefaultResponseContext.createEmpty();
+ responseContext.add(ResponseContext.Key.TIMEOUT_AT, System.currentTimeMillis());
+ try {
+ runner.run(QueryPlus.wrap(query), responseContext).toList();
+ }
+ catch (RuntimeException e) {
+ Assert.assertTrue(e instanceof QueryTimeoutException);
+ Assert.assertEquals("Query timeout", ((QueryTimeoutException) e).getErrorCode());
+ }
+ }
+
private List>> toFullEvents(final String[]... valueSet)
{
return toEvents(
diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index fa6c45e9af3f..8bc5d78f4c8a 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -48,6 +48,7 @@
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
@@ -211,7 +212,7 @@ private InputStream dequeue() throws InterruptedException
{
final InputStreamHolder holder = queue.poll(checkQueryTimeout(), TimeUnit.MILLISECONDS);
if (holder == null) {
- throw new RE("Query[%s] url[%s] timed out.", query.getId(), url);
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url));
}
final long currentQueuedByteCount = queuedByteCount.addAndGet(-holder.getLength());
@@ -428,7 +429,7 @@ private long checkQueryTimeout()
if (timeLeft <= 0) {
String msg = StringUtils.format("Query[%s] url[%s] timed out.", query.getId(), url);
setupResponseReadFailure(msg, null);
- throw new RE(msg);
+ throw new QueryTimeoutException(msg);
} else {
return timeLeft;
}
@@ -451,7 +452,7 @@ private void checkTotalBytesLimit(long bytes)
long timeLeft = timeoutAt - System.currentTimeMillis();
if (timeLeft <= 0) {
- throw new RE("Query[%s] url[%s] timed out.", query.getId(), url);
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query[%s] url[%s] timed out.", query.getId(), url));
}
future = httpClient.go(
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 5fac43bf66f8..7ec4f1503014 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -30,6 +30,7 @@
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.ResourceLimitExceededException;
import javax.annotation.Nullable;
@@ -112,9 +113,9 @@ public T next()
catch (IOException e) {
// check for timeout, a failure here might be related to a timeout, so lets just attribute it
if (checkTimeout()) {
- TimeoutException timeoutException = timeoutQuery();
+ QueryTimeoutException timeoutException = timeoutQuery();
timeoutException.addSuppressed(e);
- throw interruptQuery(timeoutException);
+ throw timeoutException;
} else {
throw interruptQuery(e);
}
@@ -155,14 +156,14 @@ private void init()
try {
long timeLeftMillis = timeoutAt - System.currentTimeMillis();
if (checkTimeout(timeLeftMillis)) {
- throw interruptQuery(timeoutQuery());
+ throw timeoutQuery();
}
InputStream is = hasTimeout ? future.get(timeLeftMillis, TimeUnit.MILLISECONDS) : future.get();
if (is != null) {
jp = objectMapper.getFactory().createParser(is);
} else if (checkTimeout()) {
- throw interruptQuery(timeoutQuery());
+ throw timeoutQuery();
} else {
// if we haven't timed out completing the future, then this is the likely cause
throw interruptQuery(new ResourceLimitExceededException("url[%s] max bytes limit reached.", url));
@@ -180,15 +181,18 @@ private void init()
);
}
}
- catch (IOException | InterruptedException | ExecutionException | CancellationException | TimeoutException e) {
+ catch (IOException | InterruptedException | ExecutionException | CancellationException e) {
throw interruptQuery(e);
}
+ catch (TimeoutException e) {
+ throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out!", queryId), host);
+ }
}
}
- private TimeoutException timeoutQuery()
+ private QueryTimeoutException timeoutQuery()
{
- return new TimeoutException(StringUtils.format("url[%s] timed out", url));
+ return new QueryTimeoutException(StringUtils.nonStrictFormat("url[%s] timed out", url), host);
}
private QueryInterruptedException interruptQuery(Exception cause)
diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
index cf6cde5081c9..a82bbf8c4e98 100644
--- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
+++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
@@ -40,6 +40,7 @@
import org.apache.druid.query.QueryMetrics;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.context.ResponseContext;
@@ -331,7 +332,7 @@ public void emitLogsAndMetrics(
if (e != null) {
statsMap.put("exception", e.toString());
log.noStackTrace().warn(e, "Exception while processing queryId [%s]", baseQuery.getId());
- if (e instanceof QueryInterruptedException) {
+ if (e instanceof QueryInterruptedException || e instanceof QueryTimeoutException) {
// Mimic behavior from QueryResource, where this code was originally taken from.
statsMap.put("interrupted", true);
statsMap.put("reason", e.toString());
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java
index cb8746c2d881..27ac513bd6e1 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -44,6 +44,7 @@
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.TruncatedResponseContextException;
@@ -330,6 +331,11 @@ public void write(OutputStream outputStream) throws WebApplicationException
queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
return ioReaderWriter.gotError(e);
}
+ catch (QueryTimeoutException timeout) {
+ interruptedQueryCount.incrementAndGet();
+ queryLifecycle.emitLogsAndMetrics(timeout, req.getRemoteAddr(), -1);
+ return ioReaderWriter.gotTimeout(timeout);
+ }
catch (QueryCapacityExceededException cap) {
failedQueryCount.incrementAndGet();
queryLifecycle.emitLogsAndMetrics(cap, req.getRemoteAddr(), -1);
@@ -465,6 +471,17 @@ Response gotError(Exception e) throws IOException
.build();
}
+ Response gotTimeout(QueryTimeoutException e) throws IOException
+ {
+ return Response.status(QueryTimeoutException.STATUS_CODE)
+ .type(contentType)
+ .entity(
+ newOutputWriter(null, null, false)
+ .writeValueAsBytes(e)
+ )
+ .build();
+ }
+
Response gotLimited(QueryCapacityExceededException e) throws IOException
{
return Response.status(QueryCapacityExceededException.STATUS_CODE)
diff --git a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
index 6c50249693f8..c38bcd4ce9a1 100644
--- a/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
+++ b/server/src/test/java/org/apache/druid/client/DirectDruidClientTest.java
@@ -40,6 +40,7 @@
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunnerTestHelper;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.ReflectionQueryToolChestWarehouse;
import org.apache.druid.query.Result;
import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
@@ -93,8 +94,8 @@ public void setup()
{
httpClient = EasyMock.createMock(HttpClient.class);
serverSelector = new ServerSelector(
- dataSegment,
- new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
+ dataSegment,
+ new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);
client = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
@@ -354,7 +355,7 @@ public void testQueryTimeoutBeforeFuture() throws IOException, InterruptedExcept
in
);
- QueryInterruptedException actualException = null;
+ QueryTimeoutException actualException = null;
try {
out.write(StringUtils.toUtf8("[{\"timestamp\":\"2014-01-01T01:02:03Z\"}"));
Thread.sleep(250);
@@ -362,7 +363,7 @@ public void testQueryTimeoutBeforeFuture() throws IOException, InterruptedExcept
out.close();
results.toList();
}
- catch (QueryInterruptedException e) {
+ catch (QueryTimeoutException e) {
actualException = e;
}
Assert.assertNotNull(actualException);
@@ -399,16 +400,16 @@ public void testQueryTimeoutFromFuture()
Sequence results = client.run(QueryPlus.wrap(query));
- QueryInterruptedException actualException = null;
+ QueryTimeoutException actualException = null;
try {
results.toList();
}
- catch (QueryInterruptedException e) {
+ catch (QueryTimeoutException e) {
actualException = e;
}
Assert.assertNotNull(actualException);
Assert.assertEquals("Query timeout", actualException.getErrorCode());
- Assert.assertEquals("Timeout waiting for task.", actualException.getMessage());
+ Assert.assertEquals(StringUtils.format("Query [%s] timed out!", queryId), actualException.getMessage());
Assert.assertEquals(hostName, actualException.getHost());
EasyMock.verify(httpClient);
}
diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index 0a24b0edb8ad..5c501077ab26 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -41,6 +41,7 @@
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.Result;
@@ -629,6 +630,62 @@ public Access authorize(AuthenticationResult authenticationResult, Resource reso
);
}
+ @Test
+ public void testQueryTimeoutException() throws Exception
+ {
+ final QuerySegmentWalker timeoutSegmentWalker = new QuerySegmentWalker()
+ {
+ @Override
+ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable intervals)
+ {
+ throw new QueryTimeoutException();
+ }
+
+ @Override
+ public QueryRunner getQueryRunnerForSegments(Query query, Iterable specs)
+ {
+ return getQueryRunnerForIntervals(null, null);
+ }
+ };
+
+ final QueryResource timeoutQueryResource = new QueryResource(
+ new QueryLifecycleFactory(
+ WAREHOUSE,
+ timeoutSegmentWalker,
+ new DefaultGenericQueryMetricsFactory(),
+ new NoopServiceEmitter(),
+ testRequestLogger,
+ new AuthConfig(),
+ AuthTestUtils.TEST_AUTHORIZER_MAPPER,
+ Suppliers.ofInstance(new DefaultQueryConfig(ImmutableMap.of()))
+ ),
+ JSON_MAPPER,
+ JSON_MAPPER,
+ queryScheduler,
+ new AuthConfig(),
+ null,
+ ResponseContextConfig.newConfig(true),
+ DRUID_NODE
+ );
+ expectPermissiveHappyPathAuth();
+ Response response = timeoutQueryResource.doPost(
+ new ByteArrayInputStream(SIMPLE_TIMESERIES_QUERY.getBytes(StandardCharsets.UTF_8)),
+ null /*pretty*/,
+ testServletRequest
+ );
+ Assert.assertNotNull(response);
+ Assert.assertEquals(QueryTimeoutException.STATUS_CODE, response.getStatus());
+ QueryTimeoutException ex;
+ try {
+ ex = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryTimeoutException.class);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ Assert.assertEquals("Query Timed Out!", ex.getMessage());
+ Assert.assertEquals(QueryTimeoutException.ERROR_CODE, ex.getErrorCode());
+ }
+
@Test(timeout = 60_000L)
public void testSecuredCancelQuery() throws Exception
{
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
index 4c9135941ca0..9453d71c5143 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
@@ -36,6 +36,7 @@
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.server.QueryStats;
import org.apache.druid.server.RequestLogLine;
import org.apache.druid.server.log.RequestLogger;
@@ -340,7 +341,7 @@ public void emitLogsAndMetrics(
if (e != null) {
statsMap.put("exception", e.toString());
- if (e instanceof QueryInterruptedException) {
+ if (e instanceof QueryInterruptedException || e instanceof QueryTimeoutException) {
statsMap.put("interrupted", true);
statsMap.put("reason", e.toString());
}
diff --git a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
index 1c950c94819c..03173eea3bad 100644
--- a/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
+++ b/sql/src/main/java/org/apache/druid/sql/http/SqlResource.java
@@ -33,6 +33,7 @@
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.server.QueryCapacityExceededException;
import org.apache.druid.server.security.ForbiddenException;
@@ -182,6 +183,10 @@ public Response doPost(
lifecycle.emitLogsAndMetrics(unsupported, remoteAddr, -1);
return Response.status(QueryUnsupportedException.STATUS_CODE).entity(jsonMapper.writeValueAsBytes(unsupported)).build();
}
+ catch (QueryTimeoutException timeout) {
+ lifecycle.emitLogsAndMetrics(timeout, remoteAddr, -1);
+ return Response.status(QueryTimeoutException.STATUS_CODE).entity(jsonMapper.writeValueAsBytes(timeout)).build();
+ }
catch (ForbiddenException e) {
throw e; // let ForbiddenExceptionMapper handle this
}
diff --git a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
index cc3bce3f6b26..123277e491e5 100644
--- a/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/http/SqlResourceTest.java
@@ -38,9 +38,11 @@
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.server.QueryCapacityExceededException;
@@ -801,6 +803,25 @@ public void testTooManyRequests() throws Exception
Assert.assertEquals(3, testRequestLogger.getSqlQueryLogs().size());
}
+ @Test
+ public void testQueryTimeoutException() throws Exception
+ {
+ Map queryContext = ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1);
+ final QueryException timeoutException = doPost(
+ new SqlQuery(
+ "SELECT CAST(__time AS DATE), dim1, dim2, dim3 FROM druid.foo GROUP by __time, dim1, dim2, dim3 ORDER BY dim2 DESC",
+ ResultFormat.OBJECT,
+ false,
+ queryContext,
+ null
+ )
+ ).lhs;
+ Assert.assertNotNull(timeoutException);
+ Assert.assertEquals(timeoutException.getErrorCode(), QueryTimeoutException.ERROR_CODE);
+ Assert.assertEquals(timeoutException.getErrorClass(), QueryTimeoutException.class.getName());
+
+ }
+
@SuppressWarnings("unchecked")
private void checkSqlRequestLog(boolean success)
{