diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index a8a143b22d7d..f32be51d1ea3 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -163,15 +163,15 @@ public Iterable call() throws Exception catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); futures.cancel(true); - throw new QueryInterruptedException("Query interrupted"); + throw new QueryInterruptedException(e); } catch (CancellationException e) { - throw new QueryInterruptedException("Query cancelled"); + throw new QueryInterruptedException(e); } catch (TimeoutException e) { log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); futures.cancel(true); - throw new QueryInterruptedException("Query timeout"); + throw new QueryInterruptedException(e); } catch (ExecutionException e) { throw Throwables.propagate(e.getCause()); diff --git a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java index 81929de10a47..125b61774f34 100644 --- a/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByParallelQueryRunner.java @@ -151,17 +151,17 @@ public Void call() throws Exception log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); futures.cancel(true); indexAccumulatorPair.lhs.close(); - throw new QueryInterruptedException("Query interrupted"); + throw new QueryInterruptedException(e); } catch (CancellationException e) { indexAccumulatorPair.lhs.close(); - throw new QueryInterruptedException("Query cancelled"); + throw new QueryInterruptedException(e); } catch (TimeoutException e) { indexAccumulatorPair.lhs.close(); log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); futures.cancel(true); - throw new QueryInterruptedException("Query timeout"); + throw new QueryInterruptedException(e); } catch (ExecutionException e) { indexAccumulatorPair.lhs.close(); diff --git a/processing/src/main/java/io/druid/query/QueryInterruptedException.java b/processing/src/main/java/io/druid/query/QueryInterruptedException.java index 13a46652744f..1d4b4c76550d 100644 --- a/processing/src/main/java/io/druid/query/QueryInterruptedException.java +++ b/processing/src/main/java/io/druid/query/QueryInterruptedException.java @@ -21,28 +21,88 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; + +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.TimeoutException; public class QueryInterruptedException extends RuntimeException { - public QueryInterruptedException() { - super(); - } + public static final String QUERY_INTERRUPTED = "Query interrupted"; + public static final String QUERY_TIMEOUT = "Query timeout"; + public static final String QUERY_CANCELLED = "Query cancelled"; + public static final String UNKNOWN_EXCEPTION = "Unknown exception"; + + private static final Set listKnownException = ImmutableSet.of( + QUERY_CANCELLED, + QUERY_INTERRUPTED, + QUERY_TIMEOUT, + UNKNOWN_EXCEPTION + ); + + @JsonProperty + private final String causeMessage; + @JsonProperty + private final String host; @JsonCreator - public QueryInterruptedException(@JsonProperty("error") String message) + public QueryInterruptedException( + @JsonProperty("error") String message, + @JsonProperty("causeMessage") String causeMessage, + @JsonProperty("host") String host + ) { super(message); + this.causeMessage = causeMessage; + this.host = host; } public QueryInterruptedException(Throwable cause) { - super(cause); + this(cause, null); + } + + public QueryInterruptedException(Throwable e, String host) + { + super(e); + this.host = host; + causeMessage = e.getMessage(); } @JsonProperty("error") @Override public String getMessage() { - return super.getMessage(); + if (this.getCause() == null) { + return super.getMessage(); + } else if (this.getCause() instanceof QueryInterruptedException) { + return getCause().getMessage(); + } else if (this.getCause() instanceof InterruptedException) { + return QUERY_INTERRUPTED; + } else if (this.getCause() instanceof CancellationException) { + return QUERY_CANCELLED; + } else if (this.getCause() instanceof TimeoutException) { + return QUERY_TIMEOUT; + } else { + return UNKNOWN_EXCEPTION; + } + } + + @JsonProperty("causeMessage") + public String getCauseMessage() + { + return causeMessage; + } + + @JsonProperty("host") + public String getHost() + { + return host; + } + + public boolean isNotKnown() + { + return !listKnownException.contains(getMessage()); } } diff --git a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java index ebeda93598ac..dc93702e5871 100644 --- a/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/groupby/GroupByQueryRunnerFactory.java @@ -156,15 +156,15 @@ public Void call() throws Exception catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); future.cancel(true); - throw new QueryInterruptedException("Query interrupted"); + throw new QueryInterruptedException(e); } catch (CancellationException e) { - throw new QueryInterruptedException("Query cancelled"); + throw new QueryInterruptedException(e); } catch (TimeoutException e) { log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); future.cancel(true); - throw new QueryInterruptedException("Query timeout"); + throw new QueryInterruptedException(e); } catch (ExecutionException e) { throw Throwables.propagate(e.getCause()); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index 8017446c9a6c..0e846b89e1d6 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -184,15 +184,15 @@ public Sequence call() throws Exception catch (InterruptedException e) { log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId()); future.cancel(true); - throw new QueryInterruptedException("Query interrupted"); + throw new QueryInterruptedException(e); } - catch (CancellationException e) { - throw new QueryInterruptedException("Query cancelled"); + catch(CancellationException e) { + throw new QueryInterruptedException(e); } catch (TimeoutException e) { log.info("Query timeout, cancelling pending results for query id [%s]", query.getId()); future.cancel(true); - throw new QueryInterruptedException("Query timeout"); + throw new QueryInterruptedException(e); } catch (ExecutionException e) { throw Throwables.propagate(e.getCause()); diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java index f9a6467e68c9..22984501242b 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java @@ -339,7 +339,7 @@ public DateTime getTime() public void advance() { if (Thread.interrupted()) { - throw new QueryInterruptedException(); + throw new QueryInterruptedException(new InterruptedException()); } cursorOffset.increment(); } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 3cc5825568c5..b1975c3a7b1c 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -264,7 +264,7 @@ public void advance() while (baseIter.hasNext()) { if (Thread.interrupted()) { - throw new QueryInterruptedException(); + throw new QueryInterruptedException(new InterruptedException()); } currEntry.set(baseIter.next()); @@ -307,7 +307,7 @@ public void reset() } if (Thread.interrupted()) { - throw new QueryInterruptedException(); + throw new QueryInterruptedException( new InterruptedException()); } boolean foundMatched = false; diff --git a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java index 6b658fa6701e..e532f3a6e30b 100644 --- a/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/ChainedExecutionQueryRunnerTest.java @@ -343,7 +343,7 @@ public Sequence run(Query query, Map responseC interrupted = true; interruptedRunners.offer(this); stop.countDown(); - throw new QueryInterruptedException("I got killed"); + throw new QueryInterruptedException(e); } } diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 83cb2c983a09..9f26b1de48ef 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -478,8 +478,14 @@ private void init() jp = objectMapper.getFactory().createParser(future.get()); final JsonToken nextToken = jp.nextToken(); if (nextToken == JsonToken.START_OBJECT) { - QueryInterruptedException e = jp.getCodec().readValue(jp, QueryInterruptedException.class); - throw e; + QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class); + //case we get an exception with an unknown message. + if (cause.isNotKnown()) { + throw new QueryInterruptedException(QueryInterruptedException.UNKNOWN_EXCEPTION, cause.getMessage(), host); + } else { + throw new QueryInterruptedException(cause, host); + } + } else if (nextToken != JsonToken.START_ARRAY) { throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url); } else { @@ -491,7 +497,7 @@ private void init() throw new RE(e, "Failure getting results from[%s] because of [%s]", url, e.getMessage()); } catch (CancellationException e) { - throw new QueryInterruptedException("Query cancelled"); + throw new QueryInterruptedException(e, host); } } } diff --git a/server/src/test/java/io/druid/client/DirectDruidClientTest.java b/server/src/test/java/io/druid/client/DirectDruidClientTest.java index 413e6e25517d..1648e59584e6 100644 --- a/server/src/test/java/io/druid/client/DirectDruidClientTest.java +++ b/server/src/test/java/io/druid/client/DirectDruidClientTest.java @@ -19,6 +19,8 @@ package io.druid.client; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.Futures; @@ -259,4 +261,73 @@ public void testCancel() throws Exception EasyMock.verify(httpClient); } + + @Test + public void testQueryInterruptionExceptionLogMessage() throws JsonProcessingException + { + HttpClient httpClient = EasyMock.createMock(HttpClient.class); + SettableFuture interruptionFuture = SettableFuture.create(); + Capture capturedRequest = EasyMock.newCapture(); + String hostName = "localhost:8080"; + EasyMock.expect( + httpClient.go( + EasyMock.capture(capturedRequest), + EasyMock.anyObject() + ) + ) + .andReturn(interruptionFuture) + .anyTimes(); + + EasyMock.replay(httpClient); + + DataSegment dataSegment = new DataSegment( + "test", + new Interval("2013-01-01/2013-01-02"), + new DateTime("2013-01-01").toString(), + Maps.newHashMap(), + Lists.newArrayList(), + Lists.newArrayList(), + new NoneShardSpec(), + 0, + 0L + ); + final ServerSelector serverSelector = new ServerSelector( dataSegment + , + new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy()) + ); + + DirectDruidClient client1 = new DirectDruidClient( + new ReflectionQueryToolChestWarehouse(), + QueryRunnerTestHelper.NOOP_QUERYWATCHER, + new DefaultObjectMapper(), + httpClient, + hostName, + new NoopServiceEmitter() + ); + + QueryableDruidServer queryableDruidServer = new QueryableDruidServer( + new DruidServer("test1", hostName, 0, "historical", DruidServer.DEFAULT_TIER, 0), + client1 + ); + + serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment); + + TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build(); + HashMap context = Maps.newHashMap(); + interruptionFuture.set(new ByteArrayInputStream("{\"error\":\"testing\"}".getBytes())); + Sequence results = client1.run(query, context); + + QueryInterruptedException actualException = null; + try { + Sequences.toList(results, Lists.newArrayList()); + } + catch (QueryInterruptedException e) { + actualException = e; + } + Assert.assertNotNull(actualException); + Assert.assertEquals(actualException.getMessage(), QueryInterruptedException.UNKNOWN_EXCEPTION); + Assert.assertEquals(actualException.getCauseMessage(), "testing"); + Assert.assertEquals(actualException.getHost(), hostName); + EasyMock.verify(httpClient); + } }