Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
Expand Down Expand Up @@ -780,7 +779,7 @@ public boolean block() throws InterruptedException
if (hasTimeout) {
final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
if (thisTimeoutNanos < 0) {
throw new RE(new TimeoutException("QueuePusher timed out offering data"));
throw new QueryTimeoutException("QueuePusher timed out offering data");
}
success = queue.offer(item, thisTimeoutNanos, TimeUnit.NANOSECONDS);
} else {
Expand Down Expand Up @@ -1127,7 +1126,7 @@ public boolean block() throws InterruptedException
if (hasTimeout) {
final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
if (thisTimeoutNanos < 0) {
throw new RE(new TimeoutException("BlockingQueue cursor timed out waiting for data"));
throw new QueryTimeoutException("BlockingQueue cursor timed out waiting for data");
}
resultBatch = queue.poll(thisTimeoutNanos, TimeUnit.NANOSECONDS);
} else {
Expand Down
16 changes: 14 additions & 2 deletions core/src/main/java/org/apache/druid/query/QueryException.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;
import java.net.InetAddress;

/**
* Base serializable error response
Expand All @@ -35,7 +36,7 @@ public class QueryException extends RuntimeException
private final String errorClass;
private final String host;

public QueryException(Throwable cause, String errorCode, String errorClass, String host)
protected QueryException(Throwable cause, String errorCode, String errorClass, String host)
{
super(cause == null ? null : cause.getMessage(), cause);
this.errorCode = errorCode;
Expand All @@ -44,7 +45,7 @@ public QueryException(Throwable cause, String errorCode, String errorClass, Stri
}

@JsonCreator
public QueryException(
protected QueryException(
@JsonProperty("error") @Nullable String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") @Nullable String errorClass,
Expand Down Expand Up @@ -82,4 +83,15 @@ public String getHost()
{
return host;
}

@Nullable
protected static String resolveHostname()
{
try {
return InetAddress.getLocalHost().getCanonicalHostName();
}
catch (Exception e) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;
import java.net.InetAddress;

/**
* This exception is thrown when a query does not finish before the configured query timeout.
Expand Down Expand Up @@ -66,17 +65,4 @@ public QueryTimeoutException(String errorMessage, String host)
{
super(ERROR_CODE, errorMessage, ERROR_CLASS, host);
}


private static String resolveHostname()
{
String host;
try {
host = InetAddress.getLocalHost().getCanonicalHostName();
}
catch (Exception e) {
host = null;
}
return host;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import com.google.inject.Inject;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.query.Druids;
import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.server.QueryCapacityExceededException;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.CoordinatorResourceTestClient;
import org.apache.druid.testing.clients.QueryResourceTestClient;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonParseException;

public class BadJsonQueryException extends BadQueryException
{
public static final String ERROR_CODE = "Json parse failed";
public static final String ERROR_CLASS = JsonParseException.class.getName();

public BadJsonQueryException(JsonParseException e)
{
this(ERROR_CODE, e.getMessage(), ERROR_CLASS);
}

@JsonCreator
private BadJsonQueryException(
@JsonProperty("error") String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") String errorClass
)
{
super(errorCode, errorMessage, errorClass);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,21 @@
package org.apache.druid.query;

/**
* This exception is thrown when the requested operation cannot be completed due to a lack of available resources.
* An abstract class for all query exceptions that should return a bad request status code (400).
*
* See {@code BadRequestException} for non-query requests.
*/
public class InsufficientResourcesException extends RuntimeException
public abstract class BadQueryException extends QueryException
{
public InsufficientResourcesException(String message)
public static final int STATUS_CODE = 400;

protected BadQueryException(String errorCode, String errorMessage, String errorClass)
{
super(errorCode, errorMessage, errorClass, null);
}

protected BadQueryException(String errorCode, String errorMessage, String errorClass, String host)
{
super(message);
super(errorCode, errorMessage, errorClass, host);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
* under the License.
*/

package org.apache.druid.server;
package org.apache.druid.query;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.QueryException;

/**
* This exception is for {@link QueryResource} and SqlResource to surface when a query is cast away by
* {@link QueryScheduler}.
* This exception is for QueryResource and SqlResource to surface when a query is cast away after
* it hits a resource limit. It is currently used in 2 places:
*
* <ul>
* <li>When the query is rejected by QueryScheduler.</li>
* <li>When the query cannot acquire enough merge buffers for groupBy v2</li>
* </ul>
*
* As a {@link QueryException} it is expected to be serialied to a json response, but will be mapped to
* {@link #STATUS_CODE} instead of the default HTTP 500 status.
Expand All @@ -52,13 +56,24 @@ public QueryCapacityExceededException(String lane, int capacity)
super(ERROR_CODE, makeLaneErrorMessage(lane, capacity), ERROR_CLASS, null);
}

/**
* This method sets hostName unlike constructors because this can be called in historicals
* while those constructors are only used in brokers.
*/
public static QueryCapacityExceededException withErrorMessageAndResolvedHost(String errorMessage)
{
return new QueryCapacityExceededException(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
}

@JsonCreator
public QueryCapacityExceededException(
@JsonProperty("error") String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") String errorClass)
@JsonProperty("errorClass") String errorClass,
@JsonProperty("host") String host
)
{
super(errorCode, errorMessage, errorClass, null);
super(errorCode, errorMessage, errorClass, host);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,7 @@ public class QueryInterruptedException extends QueryException
{
public static final String QUERY_INTERRUPTED = "Query interrupted";
public static final String QUERY_CANCELLED = "Query cancelled";
public static final String RESOURCE_LIMIT_EXCEEDED = "Resource limit exceeded";
public static final String UNAUTHORIZED = "Unauthorized request.";
public static final String UNAUTHORIZED = "Unauthorized request";
public static final String UNSUPPORTED_OPERATION = "Unsupported operation";
public static final String TRUNCATED_RESPONSE_CONTEXT = "Truncated response context";
public static final String UNKNOWN_EXCEPTION = "Unknown exception";
Expand Down Expand Up @@ -98,8 +97,6 @@ private static String getErrorCodeFromThrowable(Throwable e)
return QUERY_INTERRUPTED;
} else if (e instanceof CancellationException) {
return QUERY_CANCELLED;
} else if (e instanceof ResourceLimitExceededException) {
return RESOURCE_LIMIT_EXCEEDED;
} else if (e instanceof UnsupportedOperationException) {
return UNSUPPORTED_OPERATION;
} else if (e instanceof TruncatedResponseContextException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,20 @@
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;
import java.net.InetAddress;

/**
* This exception is for the query engine to surface when a query cannot be run. This can be due to the
* following reasons: 1) The query is not supported yet. 2) The query is not something Druid would ever supports.
* For these cases, the exact causes and details should also be documented in Druid user facing documents.
*
* As a {@link QueryException} it is expected to be serialied to a json response, but will be mapped to
* {@link #STATUS_CODE} instead of the default HTTP 500 status.
* As a {@link QueryException} it is expected to be serialized to a json response with a proper HTTP error code
* ({@link #STATUS_CODE}).
*/
public class QueryUnsupportedException extends QueryException
{
private static final String ERROR_CLASS = QueryUnsupportedException.class.getName();
public static final String ERROR_CODE = "Unsupported query";
public static final int STATUS_CODE = 400;
public static final int STATUS_CODE = 501;

@JsonCreator
public QueryUnsupportedException(
Expand All @@ -55,16 +54,4 @@ public QueryUnsupportedException(String errorMessage)
{
super(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
}

private static String resolveHostname()
{
String host;
try {
host = InetAddress.getLocalHost().getCanonicalHostName();
}
catch (Exception e) {
host = null;
}
return host;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,34 @@

package org.apache.druid.query;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.java.util.common.StringUtils;

/**
* Exception indicating that an operation failed because it exceeded some configured resource limit.
*
* This is used as a marker exception by {@link QueryInterruptedException} to report the "Resource limit exceeded"
* error code.
* This is a {@link BadQueryException} because it likely indicates a user's misbehavior when this exception is thrown.
* The resource limitations set by Druid cluster operators are typically less flexible than the parameters of
* a user query, so when a user query requires too many resources, the likely remedy is that the user query
* should be modified to use fewer resources, or to reduce query volume.
*/
public class ResourceLimitExceededException extends RuntimeException
public class ResourceLimitExceededException extends BadQueryException
{
public static final String ERROR_CODE = "Resource limit exceeded";

public ResourceLimitExceededException(String message, Object... arguments)
{
super(StringUtils.nonStrictFormat(message, arguments));
this(ERROR_CODE, StringUtils.nonStrictFormat(message, arguments), ResourceLimitExceededException.class.getName());
}

@JsonCreator
private ResourceLimitExceededException(
@JsonProperty("error") String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") String errorClass
)
{
super(errorCode, errorMessage, errorClass, resolveHostname());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(
}
return mergeBufferHolder;
}
catch (QueryTimeoutException e) {
catch (QueryTimeoutException | ResourceLimitExceededException e) {
throw e;
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,16 @@
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.collect.Utils;
import org.apache.druid.java.util.common.guava.CloseQuietly;
import org.apache.druid.java.util.common.guava.LazySequence;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.DruidProcessingConfig;
import org.apache.druid.query.InsufficientResourcesException;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryCapacityExceededException;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
import org.apache.druid.query.QueryPlus;
Expand Down Expand Up @@ -133,7 +134,12 @@ public GroupByQueryResource prepareResource(GroupByQuery query)
mergeBufferHolders = mergeBufferPool.takeBatch(requiredMergeBufferNum);
}
if (mergeBufferHolders.isEmpty()) {
throw new InsufficientResourcesException("Cannot acquire enough merge buffers");
throw QueryCapacityExceededException.withErrorMessageAndResolvedHost(
StringUtils.format(
"Cannot acquire %s merge buffers. Try again after current running queries are finished.",
requiredMergeBufferNum
)
);
} else {
return new GroupByQueryResource(mergeBufferHolders);
}
Expand Down
Loading