Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.utils.JvmUtils;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -187,7 +188,7 @@ public boolean hasNext()
{
final long thisTimeoutNanos = timeoutAtNanos - System.nanoTime();
if (hasTimeout && thisTimeoutNanos < 0) {
throw new RE(new TimeoutException("Sequence iterator timed out"));
throw new QueryTimeoutException("Sequence iterator timed out");
}

if (currentBatch != null && !currentBatch.isTerminalResult() && !currentBatch.isDrained()) {
Expand All @@ -202,7 +203,7 @@ public boolean hasNext()
}
}
if (currentBatch == null) {
throw new RE(new TimeoutException("Sequence iterator timed out waiting for data"));
throw new QueryTimeoutException("Sequence iterator timed out waiting for data");
}

if (cancellationGizmo.isCancelled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

import javax.annotation.Nullable;
import java.net.InetAddress;

/**
* This exception is thrown when a query does not finish before the configured query timeout.
* {@link java.util.concurrent.TimeoutException} exceptions encountered during the lifecycle of a query
* are rethrown as this exception.
* <p>
* As a {@link QueryException}, it is expected to be serialized to a json response, but will be mapped to
* {@link #STATUS_CODE} instead of the default HTTP 500 status.
*/

public class QueryTimeoutException extends QueryException
{
private static final String ERROR_CLASS = QueryTimeoutException.class.getName();
public static final String ERROR_CODE = "Query timeout";
public static final String ERROR_MESSAGE = "Query Timed Out!";
public static final int STATUS_CODE = 504;

@JsonCreator
public QueryTimeoutException(
@JsonProperty("error") @Nullable String errorCode,
@JsonProperty("errorMessage") String errorMessage,
@JsonProperty("errorClass") @Nullable String errorClass,
@JsonProperty("host") @Nullable String host
)
{
super(errorCode, errorMessage, errorClass, host);
}

public QueryTimeoutException()
{
super(ERROR_CODE, ERROR_MESSAGE, ERROR_CLASS, resolveHostname());
}

public QueryTimeoutException(String errorMessage)
{
super(ERROR_CODE, errorMessage, ERROR_CLASS, resolveHostname());
}

public QueryTimeoutException(String errorMessage, String host)
{
super(ERROR_CODE, errorMessage, ERROR_CLASS, host);
}


private static String resolveHostname()
{
String host;
try {
host = InetAddress.getLocalHost().getCanonicalHostName();
}
catch (Exception e) {
host = null;
}
return host;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.druid.common.guava.CombiningSequence;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.logger.Logger;
import org.hamcrest.Matchers;
import org.apache.druid.query.QueryTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -39,7 +39,6 @@
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
Expand Down Expand Up @@ -511,8 +510,7 @@ public void testTimeoutExceptionDueToStalledInput() throws Exception
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));
input.add(blockingSequence(someSize, 400, 500, 1, 500, true));
expectedException.expect(RuntimeException.class);
expectedException.expectCause(Matchers.instanceOf(TimeoutException.class));
expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Sequence iterator timed out waiting for data");

assertException(
Expand All @@ -534,8 +532,7 @@ public void testTimeoutExceptionDueToStalledReader() throws Exception
input.add(nonBlockingSequence(someSize));
input.add(nonBlockingSequence(someSize));

expectedException.expect(RuntimeException.class);
expectedException.expectCause(Matchers.instanceOf(TimeoutException.class));
expectedException.expect(QueryTimeoutException.class);
expectedException.expectMessage("Sequence iterator timed out");
assertException(input, 8, 64, 1000, 500);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.query;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;

public class QueryTimeoutExceptionTest
{
@Test
public void testSerde() throws IOException
{
final ObjectMapper mapper = new ObjectMapper();
QueryTimeoutException timeoutException = mapper.readValue(
mapper.writeValueAsBytes(new QueryTimeoutException()),
QueryTimeoutException.class
);
QueryTimeoutException timeoutExceptionWithMsg = mapper.readValue(mapper.writeValueAsBytes(new QueryTimeoutException(
"Another query timeout")), QueryTimeoutException.class);

Assert.assertEquals(
"Query timeout",
timeoutException.getErrorCode()
);
Assert.assertEquals(
"Query Timed Out!",
timeoutException.getMessage()
);
Assert.assertEquals(
"Another query timeout",
timeoutExceptionWithMsg.getMessage()
);
Assert.assertEquals(
"org.apache.druid.query.QueryTimeoutException",
timeoutExceptionWithMsg.getErrorClass()
);
}

@Test
public void testExceptionHost()
{
Assert.assertEquals(
"timeouthost",
new QueryTimeoutException("Timed out", "timeouthost").getHost()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.common.guava.GuavaUtils;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.BaseSequence;
import org.apache.druid.java.util.common.guava.MergeIterable;
import org.apache.druid.java.util.common.guava.Sequence;
Expand Down Expand Up @@ -132,6 +133,9 @@ public Iterable<T> call()
catch (QueryInterruptedException e) {
throw new RuntimeException(e);
}
catch (QueryTimeoutException e) {
throw e;
}
catch (Exception e) {
log.noStackTrace().error(e, "Exception with one of the sequences!");
Throwables.propagateIfPossible(e);
Expand Down Expand Up @@ -167,7 +171,7 @@ public Iterable<T> call()
catch (TimeoutException e) {
log.warn("Query timeout, cancelling pending results for query id [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.druid.data.input.Row;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.guava.Accumulator;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
Expand Down Expand Up @@ -201,7 +202,7 @@ private void waitForFutureCompletion(
closeOnFailure.close();
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,14 @@

import javax.annotation.Nullable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;

/**
* Exception representing a failed query. The name "QueryInterruptedException" is a misnomer; this is actually
* used on the client side for *all* kinds of failed queries.
*
* Fields:
* - "errorCode" is a well-defined errorCode code taken from a specific list (see the static constants). "Unknown exception"
* represents all wrapped exceptions other than interrupt, timeout, cancellation, resource limit exceeded, unauthorized
* represents all wrapped exceptions other than interrupt, cancellation, resource limit exceeded, unauthorized
* request, and unsupported operation.
* - "errorMessage" is the toString of the wrapped exception
* - "errorClass" is the class of the wrapped exception
Expand All @@ -45,7 +44,6 @@
public class QueryInterruptedException extends QueryException
{
public static final String QUERY_INTERRUPTED = "Query interrupted";
public static final String QUERY_TIMEOUT = "Query timeout";
public static final String QUERY_CANCELLED = "Query cancelled";
public static final String RESOURCE_LIMIT_EXCEEDED = "Resource limit exceeded";
public static final String UNAUTHORIZED = "Unauthorized request.";
Expand Down Expand Up @@ -100,8 +98,6 @@ private static String getErrorCodeFromThrowable(Throwable e)
return QUERY_INTERRUPTED;
} else if (e instanceof CancellationException) {
return QUERY_CANCELLED;
} else if (e instanceof TimeoutException) {
return QUERY_TIMEOUT;
} else if (e instanceof ResourceLimitExceededException) {
return RESOURCE_LIMIT_EXCEEDED;
} else if (e instanceof UnsupportedOperationException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.query.AbstractPrioritizedCallable;
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.orderby.DefaultLimitSpec;
Expand Down Expand Up @@ -361,10 +362,14 @@ public CloseableIterator<Entry<KeyType>> call()
final long timeout = queryTimeoutAt - System.currentTimeMillis();
return hasQueryTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
}
catch (InterruptedException | TimeoutException | CancellationException e) {
catch (InterruptedException | CancellationException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new QueryTimeoutException();
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
throw new RuntimeException(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.druid.query.QueryInterruptedException;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryWatcher;
import org.apache.druid.query.ResourceLimitExceededException;
import org.apache.druid.query.context.ResponseContext;
Expand Down Expand Up @@ -245,7 +246,7 @@ public AggregateResult call()
return input.run(queryPlusForRunners, responseContext)
.accumulate(AggregateResult.ok(), accumulator);
}
catch (QueryInterruptedException e) {
catch (QueryInterruptedException | QueryTimeoutException e) {
throw e;
}
catch (Exception e) {
Expand Down Expand Up @@ -321,16 +322,19 @@ private List<ReferenceCountingResourceHolder<ByteBuffer>> getMergeBuffersHolder(
if (hasTimeout) {
final long timeout = timeoutAt - System.currentTimeMillis();
if (timeout <= 0) {
throw new TimeoutException();
throw new QueryTimeoutException();
}
if ((mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers, timeout)).isEmpty()) {
throw new TimeoutException("Cannot acquire enough merge buffers");
throw new QueryTimeoutException("Cannot acquire enough merge buffers");
}
} else {
mergeBufferHolder = mergeBufferPool.takeBatch(numBuffers);
}
return mergeBufferHolder;
}
catch (QueryTimeoutException e) {
throw e;
}
catch (Exception e) {
throw new QueryInterruptedException(e);
}
Expand All @@ -350,7 +354,7 @@ private void waitForFutureCompletion(
}

if (hasTimeout && timeout <= 0) {
throw new TimeoutException();
throw new QueryTimeoutException();
}

final List<AggregateResult> results = hasTimeout ? future.get(timeout, TimeUnit.MILLISECONDS) : future.get();
Expand All @@ -374,7 +378,7 @@ private void waitForFutureCompletion(
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
throw new QueryTimeoutException();
}
catch (ExecutionException e) {
GuavaUtils.cancelAll(true, future, futures);
Expand Down
Loading