Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -163,15 +163,15 @@ public Iterable<T> call() throws Exception
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query interrupted");
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,17 @@ public Void call() throws Exception
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
futures.cancel(true);
indexAccumulatorPair.lhs.close();
throw new QueryInterruptedException("Query interrupted");
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
indexAccumulatorPair.lhs.close();
throw new QueryInterruptedException("Query cancelled");
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
indexAccumulatorPair.lhs.close();
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
futures.cancel(true);
throw new QueryInterruptedException("Query timeout");
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
indexAccumulatorPair.lhs.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,88 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;

import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.TimeoutException;

public class QueryInterruptedException extends RuntimeException
{
public QueryInterruptedException() {
super();
}
public static final String QUERY_INTERRUPTED = "Query interrupted";
public static final String QUERY_TIMEOUT = "Query timeout";
public static final String QUERY_CANCELLED = "Query cancelled";
public static final String UNKNOWN_EXCEPTION = "Unknown exception";

private static final Set<String> listKnownException = ImmutableSet.of(
QUERY_CANCELLED,
QUERY_INTERRUPTED,
QUERY_TIMEOUT,
UNKNOWN_EXCEPTION
);

@JsonProperty
private final String causeMessage;
@JsonProperty
private final String host;

@JsonCreator
public QueryInterruptedException(@JsonProperty("error") String message)
public QueryInterruptedException(
@JsonProperty("error") String message,
@JsonProperty("causeMessage") String causeMessage,
@JsonProperty("host") String host
)
{
super(message);
this.causeMessage = causeMessage;
this.host = host;
}

public QueryInterruptedException(Throwable cause)
{
super(cause);
this(cause, null);
}

public QueryInterruptedException(Throwable e, String host)
{
super(e);
this.host = host;
causeMessage = e.getMessage();
}

@JsonProperty("error")
@Override
public String getMessage()
{
return super.getMessage();
if (this.getCause() == null) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a unit test for this method that hits the branches?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@drcrallen and @cheddar the current UT test this branch by this code if you have another idea in mind please let me know

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    if (this.getCause() instanceof QueryInterruptedException) {
      return getCause().getMessage();
    } else if (this.getCause() instanceof InterruptedException) {
      return QUERY_INTERRUPTED;
    }

neither of those two branches were hit in anything I could find.

I was asking if you could add a basic unit test to explicitly hit the branches in this method.

return super.getMessage();
} else if (this.getCause() instanceof QueryInterruptedException) {
return getCause().getMessage();
} else if (this.getCause() instanceof InterruptedException) {
return QUERY_INTERRUPTED;
} else if (this.getCause() instanceof CancellationException) {
return QUERY_CANCELLED;
} else if (this.getCause() instanceof TimeoutException) {
return QUERY_TIMEOUT;
} else {
return UNKNOWN_EXCEPTION;
}
}

@JsonProperty("causeMessage")
public String getCauseMessage()
{
return causeMessage;
}

@JsonProperty("host")
public String getHost()
{
return host;
}

public boolean isNotKnown()
{
return !listKnownException.contains(getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,15 @@ public Void call() throws Exception
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query interrupted");
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query timeout");
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,15 +184,15 @@ public Sequence<SegmentAnalysis> call() throws Exception
catch (InterruptedException e) {
log.warn(e, "Query interrupted, cancelling pending results, query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query interrupted");
throw new QueryInterruptedException(e);
}
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
catch(CancellationException e) {
throw new QueryInterruptedException(e);
}
catch (TimeoutException e) {
log.info("Query timeout, cancelling pending results for query id [%s]", query.getId());
future.cancel(true);
throw new QueryInterruptedException("Query timeout");
throw new QueryInterruptedException(e);
}
catch (ExecutionException e) {
throw Throwables.propagate(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ public DateTime getTime()
public void advance()
{
if (Thread.interrupted()) {
throw new QueryInterruptedException();
throw new QueryInterruptedException(new InterruptedException());
}
cursorOffset.increment();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ public void advance()

while (baseIter.hasNext()) {
if (Thread.interrupted()) {
throw new QueryInterruptedException();
throw new QueryInterruptedException(new InterruptedException());
}

currEntry.set(baseIter.next());
Expand Down Expand Up @@ -307,7 +307,7 @@ public void reset()
}

if (Thread.interrupted()) {
throw new QueryInterruptedException();
throw new QueryInterruptedException( new InterruptedException());
}

boolean foundMatched = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ public Sequence<Integer> run(Query<Integer> query, Map<String, Object> responseC
interrupted = true;
interruptedRunners.offer(this);
stop.countDown();
throw new QueryInterruptedException("I got killed");
throw new QueryInterruptedException(e);
}
}

Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/io/druid/client/DirectDruidClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -478,8 +478,14 @@ private void init()
jp = objectMapper.getFactory().createParser(future.get());
final JsonToken nextToken = jp.nextToken();
if (nextToken == JsonToken.START_OBJECT) {
QueryInterruptedException e = jp.getCodec().readValue(jp, QueryInterruptedException.class);
throw e;
QueryInterruptedException cause = jp.getCodec().readValue(jp, QueryInterruptedException.class);
//case we get an exception with an unknown message.
if (cause.isNotKnown()) {
throw new QueryInterruptedException(QueryInterruptedException.UNKNOWN_EXCEPTION, cause.getMessage(), host);
} else {
throw new QueryInterruptedException(cause, host);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will be a QueryInterruptedException in a QueryInterruptedException.

I think that's ok, but is it intended?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that this is deserializing a remote exception, this "re-wrapping" works as a demarcation point between the remote and the local. Seems potentially useful to me... If nothing else, it might just be some extra stuff on the exception, but it shouldn't make it more difficult to track down what happened.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very reasonable. Just wanted to make sure that was the intended use here.

}

} else if (nextToken != JsonToken.START_ARRAY) {
throw new IAE("Next token wasn't a START_ARRAY, was[%s] from url [%s]", jp.getCurrentToken(), url);
} else {
Expand All @@ -491,7 +497,7 @@ private void init()
throw new RE(e, "Failure getting results from[%s] because of [%s]", url, e.getMessage());
}
catch (CancellationException e) {
throw new QueryInterruptedException("Query cancelled");
throw new QueryInterruptedException(e, host);
}
}
}
Expand Down
71 changes: 71 additions & 0 deletions server/src/test/java/io/druid/client/DirectDruidClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package io.druid.client;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -259,4 +261,73 @@ public void testCancel() throws Exception

EasyMock.verify(httpClient);
}

@Test
public void testQueryInterruptionExceptionLogMessage() throws JsonProcessingException
{
HttpClient httpClient = EasyMock.createMock(HttpClient.class);
SettableFuture<Object> interruptionFuture = SettableFuture.create();
Capture<Request> capturedRequest = EasyMock.newCapture();
String hostName = "localhost:8080";
EasyMock.expect(
httpClient.go(
EasyMock.capture(capturedRequest),
EasyMock.<HttpResponseHandler>anyObject()
)
)
.andReturn(interruptionFuture)
.anyTimes();

EasyMock.replay(httpClient);

DataSegment dataSegment = new DataSegment(
"test",
new Interval("2013-01-01/2013-01-02"),
new DateTime("2013-01-01").toString(),
Maps.<String, Object>newHashMap(),
Lists.<String>newArrayList(),
Lists.<String>newArrayList(),
new NoneShardSpec(),
0,
0L
);
final ServerSelector serverSelector = new ServerSelector( dataSegment
,
new HighestPriorityTierSelectorStrategy(new ConnectionCountServerSelectorStrategy())
);

DirectDruidClient client1 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
hostName,
new NoopServiceEmitter()
);

QueryableDruidServer queryableDruidServer = new QueryableDruidServer(
new DruidServer("test1", hostName, 0, "historical", DruidServer.DEFAULT_TIER, 0),
client1
);

serverSelector.addServerAndUpdateSegment(queryableDruidServer, dataSegment);

TimeBoundaryQuery query = Druids.newTimeBoundaryQueryBuilder().dataSource("test").build();
HashMap<String, List> context = Maps.newHashMap();
interruptionFuture.set(new ByteArrayInputStream("{\"error\":\"testing\"}".getBytes()));
Sequence results = client1.run(query, context);

QueryInterruptedException actualException = null;
try {
Sequences.toList(results, Lists.newArrayList());
}
catch (QueryInterruptedException e) {
actualException = e;
}
Assert.assertNotNull(actualException);
Assert.assertEquals(actualException.getMessage(), QueryInterruptedException.UNKNOWN_EXCEPTION);
Assert.assertEquals(actualException.getCauseMessage(), "testing");
Assert.assertEquals(actualException.getHost(), hostName);
EasyMock.verify(httpClient);
}
}