diff --git a/processing/src/main/java/io/druid/guice/annotations/QueryResourceWaitingPool.java b/processing/src/main/java/io/druid/guice/annotations/QueryResourceWaitingPool.java new file mode 100644 index 000000000000..6d27a1427443 --- /dev/null +++ b/processing/src/main/java/io/druid/guice/annotations/QueryResourceWaitingPool.java @@ -0,0 +1,34 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.guice.annotations; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@BindingAnnotation +@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface QueryResourceWaitingPool +{ +} diff --git a/server/src/main/java/io/druid/guice/DruidProcessingModule.java b/server/src/main/java/io/druid/guice/DruidProcessingModule.java index 1e2065ae808c..58e1a5ac26c5 100644 --- a/server/src/main/java/io/druid/guice/DruidProcessingModule.java +++ b/server/src/main/java/io/druid/guice/DruidProcessingModule.java @@ -18,6 +18,7 @@ package io.druid.guice; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Binder; @@ -35,6 +36,7 @@ import io.druid.guice.annotations.BackgroundCaching; import io.druid.guice.annotations.Global; import io.druid.guice.annotations.Processing; +import io.druid.guice.annotations.QueryResourceWaitingPool; import io.druid.offheap.OffheapBufferPool; import io.druid.query.DruidProcessingConfig; import io.druid.query.MetricsEmittingExecutorService; @@ -97,6 +99,21 @@ public ExecutorService getProcessingExecutorService( ); } + @Provides + @QueryResourceWaitingPool + @LazySingleton + public ListeningExecutorService getQueryResourceWaitingPool() throws Exception + { + return MoreExecutors.listeningDecorator( + Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(false) + .setNameFormat("QueryResourceWaitingPool-%d") + .build() + ) + ); + } + @Provides @LazySingleton @Global diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index bd859c98b883..c2c3ae87fdce 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -23,6 +23,8 @@ import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.MapMaker; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.inject.Inject; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -31,11 +33,12 @@ import com.metamx.emitter.EmittingLogger; import com.metamx.emitter.service.ServiceEmitter; import io.druid.guice.annotations.Json; +import io.druid.guice.annotations.QueryResourceWaitingPool; import io.druid.guice.annotations.Smile; +import io.druid.query.DruidMetrics; import io.druid.query.Query; import io.druid.query.QueryContextKeys; import io.druid.query.QueryInterruptedException; -import io.druid.query.DruidMetrics; import io.druid.query.QuerySegmentWalker; import io.druid.server.initialization.ServerConfig; import io.druid.server.log.RequestLogger; @@ -59,6 +62,11 @@ import java.io.OutputStream; import java.util.Map; import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** */ @@ -76,6 +84,7 @@ public class QueryResource private final ServiceEmitter emitter; private final RequestLogger requestLogger; private final QueryManager queryManager; + private final ListeningExecutorService waitingService; @Inject public QueryResource( @@ -85,7 +94,9 @@ public QueryResource( QuerySegmentWalker texasRanger, ServiceEmitter emitter, RequestLogger requestLogger, - QueryManager queryManager + QueryManager queryManager, + @QueryResourceWaitingPool + ListeningExecutorService waitingService ) { this.config = config; @@ -95,12 +106,13 @@ public QueryResource( this.emitter = emitter; this.requestLogger = requestLogger; this.queryManager = queryManager; + this.waitingService = waitingService; } @DELETE @Path("{id}") @Produces(MediaType.APPLICATION_JSON) - public Response getServer(@PathParam("id") String queryId) + public Response cancelQuery(@PathParam("id") String queryId) { queryManager.cancelQuery(queryId); return Response.status(Response.Status.ACCEPTED).build(); @@ -145,6 +157,7 @@ public Response doPost( ) ); } + final long timeout = Long.parseLong(query.getContextValue(QueryContextKeys.TIMEOUT).toString()); if (log.isDebugEnabled()) { log.debug("Got query [%s]", query); @@ -159,19 +172,53 @@ public Response doPost( results = res; } - final Yielder yielder = results.toYielder( - null, - new YieldingAccumulator() - { - @Override - public Object accumulate(Object accumulated, Object in) + final Yielder yielder; + try { + ListenableFuture future = waitingService.submit( + new Callable() { - yield(); - return in; + @Override + public Yielder call() throws Exception + { + return results.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Object accumulate(Object accumulated, Object in) + { + yield(); + return in; + } + } + ); + } } - } - ); - + ); + queryManager.registerQuery(query, future); + yielder = future.get(timeout, TimeUnit.MILLISECONDS); + } + catch (TimeoutException ex) { + queryManager.cancelQuery(queryId); + throw new QueryInterruptedException("Query timeout"); + } + catch (InterruptedException ex) { + queryManager.cancelQuery(queryId); + throw new QueryInterruptedException("Query interrupted"); + } + catch (CancellationException ex) { + queryManager.cancelQuery(queryId); + throw new QueryInterruptedException("Query cancelled"); + } + catch (ExecutionException ex) { + throw new QueryInterruptedException( + String.format( + "Query error [%s] : [%s]", + ex.getCause(), + ex.getCause().getMessage() + ) + ); + } try { final Query theQuery = query; return Response @@ -188,7 +235,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE final long queryTime = System.currentTimeMillis() - start; emitter.emit( DruidMetrics.makeQueryTimeMetric(jsonMapper, theQuery, req.getRemoteAddr()) - .build("query/time", queryTime) + .build("query/time", queryTime) ); requestLogger.log( diff --git a/server/src/test/java/io/druid/server/QueryResourceTest.java b/server/src/test/java/io/druid/server/QueryResourceTest.java index 16ad638ea82c..20340960f9b4 100644 --- a/server/src/test/java/io/druid/server/QueryResourceTest.java +++ b/server/src/test/java/io/druid/server/QueryResourceTest.java @@ -18,8 +18,16 @@ package io.druid.server; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.metamx.common.StringUtils; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; +import com.metamx.common.guava.SimpleSequence; import com.metamx.emitter.service.ServiceEmitter; import io.druid.jackson.DefaultObjectMapper; import io.druid.query.Query; @@ -37,12 +45,17 @@ import org.junit.BeforeClass; import org.junit.Test; +import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.io.ByteArrayInputStream; import java.io.IOException; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; /** * @@ -65,6 +78,52 @@ public Period getMaxIdleTime() } }; private final HttpServletRequest testServletRequest = EasyMock.createMock(HttpServletRequest.class); + public static final AtomicLong SLEEPY_SEGMENT_WALKER_COUNTER = new AtomicLong(0L); + public static final QuerySegmentWalker SLEEPY_SEGMENT_WALKER = new QuerySegmentWalker() + { + @Override + public QueryRunner getQueryRunnerForIntervals( + Query query, final Iterable intervals + ) + { + return new QueryRunner() + { + @Override + public Sequence run( + Query query, Map responseContext + ) + { + return Sequences.map( + SimpleSequence.simple(ImmutableList.of("result")), + new Function() + { + @Nullable + @Override + public T apply(String input) + { + SLEEPY_SEGMENT_WALKER_COUNTER.incrementAndGet(); + try { + Thread.sleep(10_000); + } + catch (InterruptedException e) { + throw Throwables.propagate(e); + } + return null; + } + } + ); + } + }; + } + + @Override + public QueryRunner getQueryRunnerForSegments( + Query query, Iterable specs + ) + { + return getQueryRunnerForIntervals(null, null); + } + }; public static final QuerySegmentWalker testSegmentWalker = new QuerySegmentWalker() { @Override @@ -134,7 +193,8 @@ public void testGoodQuery() throws IOException testSegmentWalker, new NoopServiceEmitter(), new NoopRequestLogger(), - new QueryManager() + new QueryManager(), + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()) ); Response respone = queryResource.doPost( new ByteArrayInputStream(simpleTimeSeriesQuery.getBytes("UTF-8")), @@ -155,7 +215,8 @@ public void testBadQuery() throws IOException testSegmentWalker, new NoopServiceEmitter(), new NoopRequestLogger(), - new QueryManager() + new QueryManager(), + MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()) ); Response respone = queryResource.doPost( new ByteArrayInputStream("Meka Leka Hi Meka Hiney Ho".getBytes("UTF-8")), @@ -165,4 +226,100 @@ public void testBadQuery() throws IOException Assert.assertNotNull(respone); Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), respone.getStatus()); } + + @Test + public void testTimeoutInYielder() throws IOException + { + final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + final QueryResource sleepyQueryResource = new QueryResource( + serverConfig, + jsonMapper, + jsonMapper, + SLEEPY_SEGMENT_WALKER, + new NoopServiceEmitter(), + new NoopRequestLogger(), + new QueryManager(), + executorService + ); + final String query = "{\n" + + " \"queryType\": \"timeseries\",\n" + + " \"dataSource\": \"mmx_metrics\",\n" + + " \"granularity\": \"hour\",\n" + + " \"intervals\": [\n" + + " \"2014-12-17/2015-12-30\"\n" + + " ],\n" + + " \"aggregations\": [\n" + + " {\n" + + " \"type\": \"count\",\n" + + " \"name\": \"rows\"\n" + + " }\n" + + " ]\n" + + " ,\"context\":{\"timeout\":10}" + + "}"; + Response response = sleepyQueryResource.doPost( + new ByteArrayInputStream(StringUtils.toUtf8(query)), + null, + testServletRequest + ); + Assert.assertEquals(500, response.getStatus()); + Assert.assertEquals("{\"error\":\"Query timeout\"}", StringUtils.fromUtf8((byte[]) response.getEntity())); + } + + @Test + public void testCancel() throws IOException, ExecutionException, InterruptedException + { + final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + final QueryResource sleepyQueryResource = new QueryResource( + serverConfig, + jsonMapper, + jsonMapper, + SLEEPY_SEGMENT_WALKER, + new NoopServiceEmitter(), + new NoopRequestLogger(), + new QueryManager(), + executorService + ); + final String queryId = "test cancel id"; + final String query = "{\n" + + " \"queryType\": \"timeseries\",\n" + + " \"dataSource\": \"mmx_metrics\",\n" + + " \"granularity\": \"hour\",\n" + + " \"intervals\": [\n" + + " \"2014-12-17/2015-12-30\"\n" + + " ],\n" + + " \"aggregations\": [\n" + + " {\n" + + " \"type\": \"count\",\n" + + " \"name\": \"rows\"\n" + + " }\n" + + " ]\n" + + " ,\"context\":{\"timeout\":10,\"queryId\":\"" + queryId + "\"}" + + "}"; + final long pre = SLEEPY_SEGMENT_WALKER_COUNTER.get(); + final ListeningExecutorService canceller = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + final ListenableFuture cancelFuture = canceller.submit( + new Callable() + { + @Override + public Response call() throws Exception + { + while (SLEEPY_SEGMENT_WALKER_COUNTER.get() == pre) { + ; + } + return sleepyQueryResource.cancelQuery(queryId); + } + } + ); + Response response = sleepyQueryResource.doPost( + new ByteArrayInputStream(StringUtils.toUtf8(query)), + null, + testServletRequest + ); + + Assert.assertEquals(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus()); + Assert.assertEquals("{\"error\":\"Query cancelled\"}", StringUtils.fromUtf8((byte[]) response.getEntity())); + + final Response cancelResponse = cancelFuture.get(); + Assert.assertEquals(Response.Status.ACCEPTED.getStatusCode(), cancelResponse.getStatus()); + } }