diff --git a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java
index 8638b3917c91..2a5c489cf72c 100644
--- a/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java
+++ b/core/src/main/java/org/apache/druid/java/util/common/guava/ParallelMergeCombiningSequence.java
@@ -46,7 +46,6 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
@@ -780,7 +779,7 @@ public boolean block() throws InterruptedException
if (hasTimeout) {
final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
if (thisTimeoutNanos < 0) {
- throw new RE(new TimeoutException("QueuePusher timed out offering data"));
+ throw new QueryTimeoutException("QueuePusher timed out offering data");
}
success = queue.offer(item, thisTimeoutNanos, TimeUnit.NANOSECONDS);
} else {
@@ -1127,7 +1126,7 @@ public boolean block() throws InterruptedException
if (hasTimeout) {
final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
if (thisTimeoutNanos < 0) {
- throw new RE(new TimeoutException("BlockingQueue cursor timed out waiting for data"));
+ throw new QueryTimeoutException("BlockingQueue cursor timed out waiting for data");
}
resultBatch = queue.poll(thisTimeoutNanos, TimeUnit.NANOSECONDS);
} else {
diff --git a/core/src/main/java/org/apache/druid/query/QueryException.java b/core/src/main/java/org/apache/druid/query/QueryException.java
index 8a835573a2c4..b8db5bd48d7b 100644
--- a/core/src/main/java/org/apache/druid/query/QueryException.java
+++ b/core/src/main/java/org/apache/druid/query/QueryException.java
@@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
+import java.net.InetAddress;
/**
* Base serializable error response
@@ -35,7 +36,7 @@ public class QueryException extends RuntimeException
private final String errorClass;
private final String host;
- public QueryException(Throwable cause, String errorCode, String errorClass, String host)
+ protected QueryException(Throwable cause, String errorCode, String errorClass, String host)
{
super(cause == null ? null : cause.getMessage(), cause);
this.errorCode = errorCode;
@@ -44,7 +45,7 @@ public QueryException(Throwable cause, String errorCode, String errorClass, Stri
}
@JsonCreator
- public QueryException(
+ protected QueryException(
@JsonProperty("error") @Nullable String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") @Nullable String errorClass,
@@ -82,4 +83,15 @@ public String getHost()
{
return host;
}
+
+ @Nullable
+ protected static String resolveHostname()
+ {
+ try {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ }
+ catch (Exception e) {
+ return null;
+ }
+ }
}
diff --git a/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java b/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
index 6eca438190fe..d3626e9c5e60 100644
--- a/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
+++ b/core/src/main/java/org/apache/druid/query/QueryTimeoutException.java
@@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
-import java.net.InetAddress;
/**
* This exception is thrown when a query does not finish before the configured query timeout.
@@ -66,17 +65,4 @@ public QueryTimeoutException(String errorMessage, String host)
{
super(ERROR_CODE, errorMessage, ERROR_CLASS, host);
}
-
-
- private static String resolveHostname()
- {
- String host;
- try {
- host = InetAddress.getLocalHost().getCanonicalHostName();
- }
- catch (Exception e) {
- host = null;
- }
- return host;
- }
}
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
index 936e79433310..b5652b9d90d5 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/query/ITWikipediaQueryTest.java
@@ -23,8 +23,8 @@
import com.google.inject.Inject;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.Druids;
+import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
-import org.apache.druid.server.QueryCapacityExceededException;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.QueryResourceTestClient;
diff --git a/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java b/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java
new file mode 100644
index 000000000000..8be18edf18a0
--- /dev/null
+++ b/processing/src/main/java/org/apache/druid/query/BadJsonQueryException.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonParseException;
+
+public class BadJsonQueryException extends BadQueryException
+{
+ public static final String ERROR_CODE = "Json parse failed";
+ public static final String ERROR_CLASS = JsonParseException.class.getName();
+
+ public BadJsonQueryException(JsonParseException e)
+ {
+ this(ERROR_CODE, e.getMessage(), ERROR_CLASS);
+ }
+
+ @JsonCreator
+ private BadJsonQueryException(
+ @JsonProperty("error") String errorCode,
+ @JsonProperty("errorMessage") String errorMessage,
+ @JsonProperty("errorClass") String errorClass
+ )
+ {
+ super(errorCode, errorMessage, errorClass);
+ }
+}
diff --git a/processing/src/main/java/org/apache/druid/query/InsufficientResourcesException.java b/processing/src/main/java/org/apache/druid/query/BadQueryException.java
similarity index 59%
rename from processing/src/main/java/org/apache/druid/query/InsufficientResourcesException.java
rename to processing/src/main/java/org/apache/druid/query/BadQueryException.java
index 05424bf47613..b115cc1170c3 100644
--- a/processing/src/main/java/org/apache/druid/query/InsufficientResourcesException.java
+++ b/processing/src/main/java/org/apache/druid/query/BadQueryException.java
@@ -20,12 +20,21 @@
package org.apache.druid.query;
/**
- * This exception is thrown when the requested operation cannot be completed due to a lack of available resources.
+ * An abstract class for all query exceptions that should return a bad request status code (400).
+ *
+ * See {@code BadRequestException} for non-query requests.
*/
-public class InsufficientResourcesException extends RuntimeException
+public abstract class BadQueryException extends QueryException
{
- public InsufficientResourcesException(String message)
+ public static final int STATUS_CODE = 400;
+
+ protected BadQueryException(String errorCode, String errorMessage, String errorClass)
+ {
+ super(errorCode, errorMessage, errorClass, null);
+ }
+
+ protected BadQueryException(String errorCode, String errorMessage, String errorClass, String host)
{
- super(message);
+ super(errorCode, errorMessage, errorClass, host);
}
}
diff --git a/server/src/main/java/org/apache/druid/server/QueryCapacityExceededException.java b/processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java
similarity index 74%
rename from server/src/main/java/org/apache/druid/server/QueryCapacityExceededException.java
rename to processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java
index 957ebcd23efb..f62eb9166d8a 100644
--- a/server/src/main/java/org/apache/druid/server/QueryCapacityExceededException.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryCapacityExceededException.java
@@ -17,17 +17,21 @@
* under the License.
*/
-package org.apache.druid.server;
+package org.apache.druid.query;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.query.QueryException;
/**
- * This exception is for {@link QueryResource} and SqlResource to surface when a query is cast away by
- * {@link QueryScheduler}.
+ * This exception is for QueryResource and SqlResource to surface when a query is cast away after
+ * it hits a resource limit. It is currently used in 2 places:
+ *
+ * <ul>
+ * <li>When the query is rejected by QueryScheduler.</li>
+ * <li>When the query cannot acquire enough merge buffers for groupBy v2</li>
+ * </ul>
*
* As a {@link QueryException} it is expected to be serialied to a json response, but will be mapped to
* {@link #STATUS_CODE} instead of the default HTTP 500 status.
@@ -52,13 +56,24 @@ public QueryCapacityExceededException(String lane, int capacity)
super(ERROR_CODE, makeLaneErrorMessage(lane, capacity), ERROR_CLASS, null);
}
+ /**
+ * This method sets hostName unlike constructors because this can be called in historicals
+ * while those constructors are only used in brokers.
+ */
+ public static QueryCapacityExceededException withErrorMessageAndResolvedHost(String errorMessage)
+ {
+ return new QueryCapacityExceededException(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
+ }
+
@JsonCreator
public QueryCapacityExceededException(
@JsonProperty("error") String errorCode,
@JsonProperty("errorMessage") String errorMessage,
- @JsonProperty("errorClass") String errorClass)
+ @JsonProperty("errorClass") String errorClass,
+ @JsonProperty("host") String host
+ )
{
- super(errorCode, errorMessage, errorClass, null);
+ super(errorCode, errorMessage, errorClass, host);
}
@VisibleForTesting
diff --git a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
index b174c23857c7..bf000f6ec83a 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryInterruptedException.java
@@ -45,8 +45,7 @@ public class QueryInterruptedException extends QueryException
{
public static final String QUERY_INTERRUPTED = "Query interrupted";
public static final String QUERY_CANCELLED = "Query cancelled";
- public static final String RESOURCE_LIMIT_EXCEEDED = "Resource limit exceeded";
- public static final String UNAUTHORIZED = "Unauthorized request.";
+ public static final String UNAUTHORIZED = "Unauthorized request";
public static final String UNSUPPORTED_OPERATION = "Unsupported operation";
public static final String TRUNCATED_RESPONSE_CONTEXT = "Truncated response context";
public static final String UNKNOWN_EXCEPTION = "Unknown exception";
@@ -98,8 +97,6 @@ private static String getErrorCodeFromThrowable(Throwable e)
return QUERY_INTERRUPTED;
} else if (e instanceof CancellationException) {
return QUERY_CANCELLED;
- } else if (e instanceof ResourceLimitExceededException) {
- return RESOURCE_LIMIT_EXCEEDED;
} else if (e instanceof UnsupportedOperationException) {
return UNSUPPORTED_OPERATION;
} else if (e instanceof TruncatedResponseContextException) {
diff --git a/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java b/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java
index 41126dcaf4f0..bde1f9d14e1d 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryUnsupportedException.java
@@ -24,21 +24,20 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import javax.annotation.Nullable;
-import java.net.InetAddress;
/**
* This exception is for the query engine to surface when a query cannot be run. This can be due to the
* following reasons: 1) The query is not supported yet. 2) The query is not something Druid would ever supports.
* For these cases, the exact causes and details should also be documented in Druid user facing documents.
*
- * As a {@link QueryException} it is expected to be serialied to a json response, but will be mapped to
- * {@link #STATUS_CODE} instead of the default HTTP 500 status.
+ * As a {@link QueryException} it is expected to be serialized to a json response with a proper HTTP error code
+ * ({@link #STATUS_CODE}).
*/
public class QueryUnsupportedException extends QueryException
{
private static final String ERROR_CLASS = QueryUnsupportedException.class.getName();
public static final String ERROR_CODE = "Unsupported query";
- public static final int STATUS_CODE = 400;
+ public static final int STATUS_CODE = 501;
@JsonCreator
public QueryUnsupportedException(
@@ -55,16 +54,4 @@ public QueryUnsupportedException(String errorMessage)
{
super(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
}
-
- private static String resolveHostname()
- {
- String host;
- try {
- host = InetAddress.getLocalHost().getCanonicalHostName();
- }
- catch (Exception e) {
- host = null;
- }
- return host;
- }
}
diff --git a/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java b/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java
index f24a2ff3e466..8a3ba000be93 100644
--- a/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java
+++ b/processing/src/main/java/org/apache/druid/query/ResourceLimitExceededException.java
@@ -19,18 +19,34 @@
package org.apache.druid.query;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;
/**
* Exception indicating that an operation failed because it exceeded some configured resource limit.
*
- * This is used as a marker exception by {@link QueryInterruptedException} to report the "Resource limit exceeded"
- * error code.
+ * This is a {@link BadQueryException} because it likely indicates a user's misbehavior when this exception is thrown.
+ * The resource limitations set by Druid cluster operators are typically less flexible than the parameters of
+ * a user query, so when a user query requires too many resources, the likely remedy is that the user query
+ * should be modified to use fewer resources, or to reduce query volume.
*/
-public class ResourceLimitExceededException extends RuntimeException
+public class ResourceLimitExceededException extends BadQueryException
{
+ public static final String ERROR_CODE = "Resource limit exceeded";
+
public ResourceLimitExceededException(String message, Object... arguments)
{
- super(StringUtils.nonStrictFormat(message, arguments));
+ this(ERROR_CODE, StringUtils.nonStrictFormat(message, arguments), ResourceLimitExceededException.class.getName());
+ }
+
+ @JsonCreator
+ private ResourceLimitExceededException(
+ @JsonProperty("error") String errorCode,
+ @JsonProperty("errorMessage") String errorMessage,
+ @JsonProperty("errorClass") String errorClass
+ )
+ {
+ super(errorCode, errorMessage, errorClass, resolveHostname());
}
}
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
index f9cd21b9c041..7e8c49c6c03c 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java
@@ -332,7 +332,7 @@ private List> getMergeBuffersHolder(
}
return mergeBufferHolder;
}
- catch (QueryTimeoutException e) {
+ catch (QueryTimeoutException | ResourceLimitExceededException e) {
throw e;
}
catch (Exception e) {
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index af452eb39c62..95338f5e6134 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -33,6 +33,7 @@
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.LazySequence;
@@ -40,8 +41,8 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
-import org.apache.druid.query.InsufficientResourcesException;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
@@ -133,7 +134,12 @@ public GroupByQueryResource prepareResource(GroupByQuery query)
mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum);
}
if (mergeBufferHolders.isEmpty()) {
- throw new InsufficientResourcesException("Cannot acquire enough merge buffers");
+ throw QueryCapacityExceededException.withErrorMessageAndResolvedHost(
+ StringUtils.format(
+ "Cannot acquire %s merge buffers. Try again after current running queries are finished.",
+ requiredMergeBufferNum
+ )
+ );
} else {
return new GroupByQueryResource(mergeBufferHolders);
}
diff --git a/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java b/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java
index 5116a4cafddb..b450b6ba7180 100644
--- a/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/QueryInterruptedExceptionTest.java
@@ -44,10 +44,6 @@ public void testErrorCode()
Assert.assertEquals("Unsupported operation", new QueryInterruptedException(new UOE("Unsupported")).getErrorCode());
Assert.assertEquals("Unknown exception", new QueryInterruptedException(null).getErrorCode());
Assert.assertEquals("Unknown exception", new QueryInterruptedException(new ISE("Something bad!")).getErrorCode());
- Assert.assertEquals(
- "Resource limit exceeded",
- new QueryInterruptedException(new ResourceLimitExceededException("too many!")).getErrorCode()
- );
Assert.assertEquals(
"Unknown exception",
new QueryInterruptedException(new QueryInterruptedException(new ISE("Something bad!"))).getErrorCode()
@@ -73,10 +69,6 @@ public void testErrorMessage()
null,
new QueryInterruptedException(null).getMessage()
);
- Assert.assertEquals(
- "too many!",
- new QueryInterruptedException(new ResourceLimitExceededException("too many!")).getMessage()
- );
Assert.assertEquals(
"Something bad!",
new QueryInterruptedException(new ISE("Something bad!")).getMessage()
@@ -102,10 +94,6 @@ public void testErrorClass()
"java.lang.InterruptedException",
new QueryInterruptedException(new InterruptedException()).getErrorClass()
);
- Assert.assertEquals(
- "org.apache.druid.query.ResourceLimitExceededException",
- new QueryInterruptedException(new ResourceLimitExceededException("too many!")).getErrorClass()
- );
Assert.assertEquals(
null,
new QueryInterruptedException(null).getErrorClass()
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
index 11238f1b8782..f98785231f39 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
@@ -30,7 +30,7 @@
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DruidProcessingConfig;
-import org.apache.druid.query.InsufficientResourcesException;
+import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryRunner;
@@ -243,7 +243,7 @@ public void testResourceLimitExceededOnBroker()
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
}
- @Test(timeout = 60_000L, expected = InsufficientResourcesException.class)
+ @Test(timeout = 60_000L)
public void testInsufficientResourcesOnBroker()
{
final GroupByQuery query = GroupByQuery
@@ -268,6 +268,8 @@ public void testInsufficientResourcesOnBroker()
List> holder = null;
try {
holder = MERGE_BUFFER_POOL.takeBatch(1, 10);
+ expectedException.expect(QueryCapacityExceededException.class);
+ expectedException.expectMessage("Cannot acquire 1 merge buffers. Try again after current running queries are finished.");
GroupByQueryRunnerTestHelper.runQuery(FACTORY, runner, query);
}
finally {
diff --git a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
index 8bc5d78f4c8a..23b9f046d128 100644
--- a/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
+++ b/server/src/main/java/org/apache/druid/client/DirectDruidClient.java
@@ -52,6 +52,7 @@
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryWatcher;
+import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.aggregation.MetricManipulatorFns;
import org.apache.druid.query.context.ConcurrentResponseContext;
import org.apache.druid.query.context.ResponseContext;
@@ -444,7 +445,7 @@ private void checkTotalBytesLimit(long bytes)
url
);
setupResponseReadFailure(msg, null);
- throw new RE(msg);
+ throw new ResourceLimitExceededException(msg);
}
}
};
diff --git a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
index 7ec4f1503014..c7f12e7fa1dc 100644
--- a/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
+++ b/server/src/main/java/org/apache/druid/client/JsonParserIterator.java
@@ -29,8 +29,11 @@
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryCapacityExceededException;
+import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
+import org.apache.druid.query.QueryUnsupportedException;
import org.apache.druid.query.ResourceLimitExceededException;
import javax.annotation.Nullable;
@@ -117,7 +120,7 @@ public T next()
timeoutException.addSuppressed(e);
throw timeoutException;
} else {
- throw interruptQuery(e);
+ throw convertException(e);
}
}
}
@@ -165,8 +168,11 @@ private void init()
} else if (checkTimeout()) {
throw timeoutQuery();
} else {
- // if we haven't timed out completing the future, then this is the likely cause
- throw interruptQuery(new ResourceLimitExceededException("url[%s] max bytes limit reached.", url));
+ // TODO: NettyHttpClient should check the actual cause of the failure and set it in the future properly.
+ throw new ResourceLimitExceededException(
+ "Possibly max scatter-gather bytes limit reached while reading from url[%s].",
+ url
+ );
}
final JsonToken nextToken = jp.nextToken();
@@ -174,15 +180,15 @@ private void init()
jp.nextToken();
objectCodec = jp.getCodec();
} else if (nextToken == JsonToken.START_OBJECT) {
- throw interruptQuery(jp.getCodec().readValue(jp, QueryInterruptedException.class));
+ throw convertException(jp.getCodec().readValue(jp, QueryException.class));
} else {
- throw interruptQuery(
+ throw convertException(
new IAE("Next token wasn't a START_ARRAY, was[%s] from url[%s]", jp.getCurrentToken(), url)
);
}
}
catch (IOException | InterruptedException | ExecutionException | CancellationException e) {
- throw interruptQuery(e);
+ throw convertException(e);
}
catch (TimeoutException e) {
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out!", queryId), host);
@@ -195,10 +201,73 @@ private QueryTimeoutException timeoutQuery()
return new QueryTimeoutException(StringUtils.nonStrictFormat("url[%s] timed out", url), host);
}
- private QueryInterruptedException interruptQuery(Exception cause)
+ /**
+ * Converts the given exception to a proper type of {@link QueryException}.
+ * The use cases of this method are:
+ * <ul>
+ * <li>All non-QueryExceptions are wrapped with {@link QueryInterruptedException}.</li>
+ * <li>The QueryException from {@link DirectDruidClient} is converted to a more specific type of QueryException
+ * based on {@link QueryException#getErrorCode()}. During conversion, {@link QueryException#host} is overridden
+ * by {@link #host}.</li></ul>
+ */
+ private QueryException convertException(Exception cause)
{
LOG.warn(cause, "Query [%s] to host [%s] interrupted", queryId, host);
- return new QueryInterruptedException(cause, host);
+ if (cause instanceof QueryException) {
+ final QueryException queryException = (QueryException) cause;
+ if (queryException.getErrorCode() == null) {
+ // errorCode should not be null now, but maybe could be null in the past..
+ return new QueryInterruptedException(
+ queryException.getErrorCode(),
+ queryException.getMessage(),
+ queryException.getErrorClass(),
+ host
+ );
+ }
+
+ // Note: this switch clause is to restore the 'type' information of QueryExceptions which is lost during
+ // JSON serialization. This is not a good way to restore the correct exception type. Rather, QueryException
+ // should store its type when it is serialized, so that we can know the exact type when it is deserialized.
+ switch (queryException.getErrorCode()) {
+ // The below is the list of exceptions that can be thrown in historicals and propagated to the broker.
+ case QueryTimeoutException.ERROR_CODE:
+ return new QueryTimeoutException(
+ queryException.getErrorCode(),
+ queryException.getMessage(),
+ queryException.getErrorClass(),
+ host
+ );
+ case QueryCapacityExceededException.ERROR_CODE:
+ return new QueryCapacityExceededException(
+ queryException.getErrorCode(),
+ queryException.getMessage(),
+ queryException.getErrorClass(),
+ host
+ );
+ case QueryUnsupportedException.ERROR_CODE:
+ return new QueryUnsupportedException(
+ queryException.getErrorCode(),
+ queryException.getMessage(),
+ queryException.getErrorClass(),
+ host
+ );
+ case ResourceLimitExceededException.ERROR_CODE:
+ return new ResourceLimitExceededException(
+ queryException.getErrorCode(),
+ queryException.getMessage(),
+ queryException.getErrorClass(),
+ host
+ );
+ default:
+ return new QueryInterruptedException(
+ queryException.getErrorCode(),
+ queryException.getMessage(),
+ queryException.getErrorClass(),
+ host
+ );
+ }
+ } else {
+ return new QueryInterruptedException(cause, host);
+ }
}
}
-
diff --git a/server/src/main/java/org/apache/druid/server/QueryResource.java b/server/src/main/java/org/apache/druid/server/QueryResource.java
index 75e68ad98996..6394395a1f0b 100644
--- a/server/src/main/java/org/apache/druid/server/QueryResource.java
+++ b/server/src/main/java/org/apache/druid/server/QueryResource.java
@@ -19,6 +19,8 @@
package org.apache.druid.server;
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.module.SimpleModule;
@@ -40,13 +42,17 @@
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.query.BadJsonQueryException;
+import org.apache.druid.query.BadQueryException;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.QueryUnsupportedException;
+import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.TruncatedResponseContextException;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.server.metrics.QueryCountStatsProvider;
@@ -70,6 +76,7 @@
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.IOException;
import java.io.InputStream;
@@ -347,6 +354,11 @@ public void write(OutputStream outputStream) throws WebApplicationException
queryLifecycle.emitLogsAndMetrics(unsupported, req.getRemoteAddr(), -1);
return ioReaderWriter.gotUnsupported(unsupported);
}
+ catch (BadJsonQueryException | ResourceLimitExceededException e) {
+ interruptedQueryCount.incrementAndGet();
+ queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1);
+ return ioReaderWriter.gotBadQuery(e);
+ }
catch (ForbiddenException e) {
// don't do anything for an authorization failure, ForbiddenExceptionMapper will catch this later and
// send an error response if this is thrown.
@@ -375,7 +387,13 @@ private Query> readQuery(
final ResourceIOReaderWriter ioReaderWriter
) throws IOException
{
- Query baseQuery = ioReaderWriter.getInputMapper().readValue(in, Query.class);
+ Query baseQuery;
+ try {
+ baseQuery = ioReaderWriter.getInputMapper().readValue(in, Query.class);
+ }
+ catch (JsonParseException e) {
+ throw new BadJsonQueryException(e);
+ }
String prevEtag = getPreviousEtag(req);
if (prevEtag != null) {
@@ -463,36 +481,36 @@ Response ok(Object object) throws IOException
Response gotError(Exception e) throws IOException
{
- return Response.serverError()
- .type(contentType)
- .entity(
- newOutputWriter(null, null, false)
- .writeValueAsBytes(QueryInterruptedException.wrapIfNeeded(e))
- )
- .build();
+ return buildNonOkResponse(
+ Status.INTERNAL_SERVER_ERROR.getStatusCode(),
+ QueryInterruptedException.wrapIfNeeded(e)
+ );
}
Response gotTimeout(QueryTimeoutException e) throws IOException
{
- return Response.status(QueryTimeoutException.STATUS_CODE)
- .type(contentType)
- .entity(
- newOutputWriter(null, null, false)
- .writeValueAsBytes(e)
- )
- .build();
+ return buildNonOkResponse(QueryTimeoutException.STATUS_CODE, e);
}
Response gotLimited(QueryCapacityExceededException e) throws IOException
{
- return Response.status(QueryCapacityExceededException.STATUS_CODE)
- .entity(newOutputWriter(null, null, false).writeValueAsBytes(e))
- .build();
+ return buildNonOkResponse(QueryCapacityExceededException.STATUS_CODE, e);
}
Response gotUnsupported(QueryUnsupportedException e) throws IOException
{
- return Response.status(QueryUnsupportedException.STATUS_CODE)
+ return buildNonOkResponse(QueryUnsupportedException.STATUS_CODE, e);
+ }
+
+ Response gotBadQuery(BadQueryException e) throws IOException
+ {
+ return buildNonOkResponse(BadQueryException.STATUS_CODE, e);
+ }
+
+ Response buildNonOkResponse(int status, Exception e) throws JsonProcessingException
+ {
+ return Response.status(status)
+ .type(contentType)
.entity(newOutputWriter(null, null, false).writeValueAsBytes(e))
.build();
}
diff --git a/server/src/main/java/org/apache/druid/server/QueryScheduler.java b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
index f50b50c80390..46f2580ba54a 100644
--- a/server/src/main/java/org/apache/druid/server/QueryScheduler.java
+++ b/server/src/main/java/org/apache/druid/server/QueryScheduler.java
@@ -33,6 +33,7 @@
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
diff --git a/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java b/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java
index 8badcabe5604..78d711c62bf2 100644
--- a/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java
+++ b/server/src/main/java/org/apache/druid/server/initialization/jetty/BadRequestException.java
@@ -19,6 +19,12 @@
package org.apache.druid.server.initialization.jetty;
+/**
+ * This class is for any exceptions that should return a bad request status code (400).
+ * See {@code BadQueryException} for query requests.
+ *
+ * @see BadRequestExceptionMapper
+ */
public class BadRequestException extends RuntimeException
{
public BadRequestException(String msg)
diff --git a/server/src/main/java/org/apache/druid/server/security/ForbiddenException.java b/server/src/main/java/org/apache/druid/server/security/ForbiddenException.java
index f278236e98fb..6cf326cc9895 100644
--- a/server/src/main/java/org/apache/druid/server/security/ForbiddenException.java
+++ b/server/src/main/java/org/apache/druid/server/security/ForbiddenException.java
@@ -19,6 +19,9 @@
package org.apache.druid.server.security;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
/**
* Throw this when a request is unauthorized and we want to send a 403 response back, Jersey exception mapper will
* take care of sending the response.
@@ -30,8 +33,15 @@ public ForbiddenException()
super("Unauthorized.");
}
- public ForbiddenException(String msg)
+ @JsonCreator
+ public ForbiddenException(@JsonProperty("errorMessage") String msg)
{
super(msg);
}
+
+ @JsonProperty
+ public String getErrorMessage()
+ {
+ return super.getMessage();
+ }
}
diff --git a/server/src/main/java/org/apache/druid/server/security/SecuritySanityCheckFilter.java b/server/src/main/java/org/apache/druid/server/security/SecuritySanityCheckFilter.java
index 2e23898a6463..cdc9e045785e 100644
--- a/server/src/main/java/org/apache/druid/server/security/SecuritySanityCheckFilter.java
+++ b/server/src/main/java/org/apache/druid/server/security/SecuritySanityCheckFilter.java
@@ -21,8 +21,6 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.query.QueryInterruptedException;
-import org.apache.druid.server.DruidNode;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
@@ -41,19 +39,12 @@ public class SecuritySanityCheckFilter implements Filter
private final String unauthorizedMessage;
- public SecuritySanityCheckFilter(
- ObjectMapper jsonMapper
- )
+ public SecuritySanityCheckFilter(ObjectMapper jsonMapper)
{
try {
- QueryInterruptedException unauthorizedError = new QueryInterruptedException(
- QueryInterruptedException.UNAUTHORIZED,
- null,
- null,
- DruidNode.getDefaultHost()
- );
- unauthorizedError.setStackTrace(new StackTraceElement[0]);
- this.unauthorizedMessage = jsonMapper.writeValueAsString(unauthorizedError);
+ ForbiddenException forbiddenException = new ForbiddenException();
+ forbiddenException.setStackTrace(new StackTraceElement[0]);
+ this.unauthorizedMessage = jsonMapper.writeValueAsString(forbiddenException);
}
catch (Exception e) {
throw new RuntimeException(e);
diff --git a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index 899aa14be940..ce3c032b6b2d 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -34,16 +34,20 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.BadJsonQueryException;
import org.apache.druid.query.DefaultGenericQueryMetricsFactory;
import org.apache.druid.query.DefaultQueryConfig;
import org.apache.druid.query.MapQueryToolChestWarehouse;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryCapacityExceededException;
+import org.apache.druid.query.QueryException;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChestWarehouse;
import org.apache.druid.query.QueryUnsupportedException;
+import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.Result;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.TruncatedResponseContextException;
@@ -76,6 +80,7 @@
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.StreamingOutput;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -490,7 +495,6 @@ public void testGoodQueryWithSmileAcceptHeader() throws IOException
EasyMock.verify(smileRequest);
}
-
@Test
public void testBadQuery() throws IOException
{
@@ -501,7 +505,29 @@ public void testBadQuery() throws IOException
testServletRequest
);
Assert.assertNotNull(response);
- Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
+ Assert.assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+ QueryException e = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryException.class);
+ Assert.assertEquals(BadJsonQueryException.ERROR_CODE, e.getErrorCode());
+ Assert.assertEquals(BadJsonQueryException.ERROR_CLASS, e.getErrorClass());
+ }
+
+ @Test
+ public void testResourceLimitExceeded() throws IOException
+ {
+ ByteArrayInputStream badQuery = EasyMock.createMock(ByteArrayInputStream.class);
+ EasyMock.expect(badQuery.read(EasyMock.anyObject(), EasyMock.anyInt(), EasyMock.anyInt()))
+ .andThrow(new ResourceLimitExceededException("You require too much of something"));
+ EasyMock.replay(badQuery, testServletRequest);
+ Response response = queryResource.doPost(
+ badQuery,
+ null /*pretty*/,
+ testServletRequest
+ );
+ Assert.assertNotNull(response);
+ Assert.assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
+ QueryException e = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryException.class);
+ Assert.assertEquals(ResourceLimitExceededException.ERROR_CODE, e.getErrorCode());
+ Assert.assertEquals(ResourceLimitExceededException.class.getName(), e.getErrorClass());
}
@Test
@@ -520,13 +546,7 @@ public void testUnsupportedQueryThrowsException() throws IOException
);
Assert.assertNotNull(response);
Assert.assertEquals(QueryUnsupportedException.STATUS_CODE, response.getStatus());
- QueryUnsupportedException ex;
- try {
- ex = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryUnsupportedException.class);
- }
- catch (IOException e) {
- throw new RuntimeException(e);
- }
+ QueryException ex = JSON_MAPPER.readValue((byte[]) response.getEntity(), QueryException.class);
Assert.assertEquals(errorMessage, ex.getMessage());
Assert.assertEquals(QueryUnsupportedException.ERROR_CODE, ex.getErrorCode());
}
diff --git a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
index 485275b07bf2..88683612bd68 100644
--- a/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
+++ b/server/src/test/java/org/apache/druid/server/QuerySchedulerTest.java
@@ -45,6 +45,7 @@
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
import org.apache.druid.query.Query;
+import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
diff --git a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
index 9453d71c5143..41deb6d25829 100644
--- a/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
+++ b/sql/src/main/java/org/apache/druid/sql/SqlLifecycle.java
@@ -159,8 +159,8 @@ public PrepareResult prepare(AuthenticationResult authenticationResult)
}
}
- public PlannerContext plan(AuthenticationResult authenticationResult)
- throws ValidationException, RelConversionException, SqlParseException
+ private PlannerContext plan(AuthenticationResult authenticationResult)
+ throws RelConversionException
{
synchronized (lock) {
transition(State.INITIALIZED, State.PLANNED);
@@ -168,12 +168,19 @@ public PlannerContext plan(AuthenticationResult authenticationResult)
this.plannerContext = planner.getPlannerContext();
this.plannerResult = planner.plan(sql);
}
+ // we can't collapse catch clauses since SqlPlanningException has type-sensitive constructors.
+ catch (SqlParseException e) {
+ throw new SqlPlanningException(e);
+ }
+ catch (ValidationException e) {
+ throw new SqlPlanningException(e);
+ }
return plannerContext;
}
}
- public PlannerContext plan(HttpServletRequest req)
- throws SqlParseException, RelConversionException, ValidationException
+ private PlannerContext plan(HttpServletRequest req)
+ throws RelConversionException
{
synchronized (lock) {
this.req = req;
@@ -225,7 +232,7 @@ private Access doAuthorize(final Access authorizationResult)
}
public PlannerContext planAndAuthorize(final AuthenticationResult authenticationResult)
- throws SqlParseException, RelConversionException, ValidationException
+ throws RelConversionException
{
PlannerContext plannerContext = plan(authenticationResult);
Access access = authorize();
@@ -236,7 +243,7 @@ public PlannerContext planAndAuthorize(final AuthenticationResult authentication
}
public PlannerContext planAndAuthorize(final HttpServletRequest req)
- throws SqlParseException, RelConversionException, ValidationException
+ throws RelConversionException
{
PlannerContext plannerContext = plan(req);
Access access = authorize();
@@ -260,7 +267,7 @@ public Sequence