Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.guice.annotations;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@BindingAnnotation
@Target({ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueryResourceWaitingPool
{
}
17 changes: 17 additions & 0 deletions server/src/main/java/io/druid/guice/DruidProcessingModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.druid.guice;

import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Binder;
Expand All @@ -35,6 +36,7 @@
import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Global;
import io.druid.guice.annotations.Processing;
import io.druid.guice.annotations.QueryResourceWaitingPool;
import io.druid.offheap.OffheapBufferPool;
import io.druid.query.DruidProcessingConfig;
import io.druid.query.MetricsEmittingExecutorService;
Expand Down Expand Up @@ -97,6 +99,21 @@ public ExecutorService getProcessingExecutorService(
);
}

@Provides
@QueryResourceWaitingPool
@LazySingleton
public ListeningExecutorService getQueryResourceWaitingPool() throws Exception
{
return MoreExecutors.listeningDecorator(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason listeningDecorator(..) is needed and ExecutorService is not enough?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if i am understanding it correct, its listeningexecutor so that the futures can be registered with QueryWatcher,
see QueryManager.registerQuery()

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I needed to register

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, thanks

Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setDaemon(false)
.setNameFormat("QueryResourceWaitingPool-%d")
.build()
)
);
}

@Provides
@LazySingleton
@Global
Expand Down
77 changes: 62 additions & 15 deletions server/src/main/java/io/druid/server/QueryResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MapMaker;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
Expand All @@ -31,11 +33,12 @@
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.guice.annotations.Json;
import io.druid.guice.annotations.QueryResourceWaitingPool;
import io.druid.guice.annotations.Smile;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
import io.druid.query.QueryContextKeys;
import io.druid.query.QueryInterruptedException;
import io.druid.query.DruidMetrics;
import io.druid.query.QuerySegmentWalker;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
Expand All @@ -59,6 +62,11 @@
import java.io.OutputStream;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
*/
Expand All @@ -76,6 +84,7 @@ public class QueryResource
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final QueryManager queryManager;
private final ListeningExecutorService waitingService;

@Inject
public QueryResource(
Expand All @@ -85,7 +94,9 @@ public QueryResource(
QuerySegmentWalker texasRanger,
ServiceEmitter emitter,
RequestLogger requestLogger,
QueryManager queryManager
QueryManager queryManager,
@QueryResourceWaitingPool
ListeningExecutorService waitingService
)
{
this.config = config;
Expand All @@ -95,12 +106,13 @@ public QueryResource(
this.emitter = emitter;
this.requestLogger = requestLogger;
this.queryManager = queryManager;
this.waitingService = waitingService;
}

@DELETE
@Path("{id}")
@Produces(MediaType.APPLICATION_JSON)
public Response getServer(@PathParam("id") String queryId)
public Response cancelQuery(@PathParam("id") String queryId)
{
queryManager.cancelQuery(queryId);
return Response.status(Response.Status.ACCEPTED).build();
Expand Down Expand Up @@ -145,6 +157,7 @@ public Response doPost(
)
);
}
final long timeout = Long.parseLong(query.getContextValue(QueryContextKeys.TIMEOUT).toString());

if (log.isDebugEnabled()) {
log.debug("Got query [%s]", query);
Expand All @@ -159,19 +172,53 @@ public Response doPost(
results = res;
}

final Yielder yielder = results.toYielder(
null,
new YieldingAccumulator()
{
@Override
public Object accumulate(Object accumulated, Object in)
final Yielder yielder;
try {
ListenableFuture<Yielder> future = waitingService.submit(
new Callable<Yielder>()
{
yield();
return in;
@Override
public Yielder call() throws Exception
{
return results.toYielder(
null,
new YieldingAccumulator()
{
@Override
public Object accumulate(Object accumulated, Object in)
{
yield();
return in;
}
}
);
}
}
}
);

);
queryManager.registerQuery(query, future);
yielder = future.get(timeout, TimeUnit.MILLISECONDS);
}
catch (TimeoutException ex) {
queryManager.cancelQuery(queryId);
throw new QueryInterruptedException("Query timeout");
}
catch (InterruptedException ex) {
queryManager.cancelQuery(queryId);
throw new QueryInterruptedException("Query interrupted");
}
catch (CancellationException ex) {
queryManager.cancelQuery(queryId);
throw new QueryInterruptedException("Query cancelled");
}
catch (ExecutionException ex) {
throw new QueryInterruptedException(
String.format(
"Query error [%s] : [%s]",
ex.getCause(),
ex.getCause().getMessage()
)
);
}
try {
final Query theQuery = query;
return Response
Expand All @@ -188,7 +235,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(jsonMapper, theQuery, req.getRemoteAddr())
.build("query/time", queryTime)
.build("query/time", queryTime)
);

requestLogger.log(
Expand Down
Loading