Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/content/configuration/broker.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.broker.http.numConnections`|Size of connection pool for the Broker to connect to historical and real-time processes. If there are more queries than this number that all need to speak to the same node, then they will queue up.|20|
|`druid.broker.http.compressionCodec`|Compression codec the Broker uses to communicate with historical and real-time processes. May be "gzip" or "identity".|gzip|
|`druid.broker.http.readTimeout`|The timeout for data reads from historical and real-time processes.|PT15M|
|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for the `timeout` parameter. See [query context](query-context.html) for more information about `timeout`. A query is rejected if its `timeout` context value is greater than this maximum.|Long.MAX_VALUE|

#### Retry Policy

Expand Down
1 change: 1 addition & 0 deletions docs/content/configuration/historical.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.server.http.maxIdleTime`|The Jetty max idle time for a connection.|PT5m|
|`druid.server.http.enableRequestLimit`|If enabled, no requests would be queued in jetty queue and "HTTP 429 Too Many Requests" error response would be sent. |false|
|`druid.server.http.defaultQueryTimeout`|Query timeout in millis, beyond which unfinished queries will be cancelled|300000|
|`druid.server.http.maxQueryTimeout`|Maximum allowed value (in milliseconds) for the `timeout` parameter. See [query context](query-context.html) for more information about `timeout`. A query is rejected if its `timeout` context value is greater than this maximum.|Long.MAX_VALUE|

#### Processing

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@
import io.druid.query.QuerySegmentWalker;
import io.druid.query.SegmentDescriptor;
import io.druid.server.DruidNode;
import io.druid.server.SetAndVerifyContextQueryRunner;
import io.druid.server.initialization.ServerConfig;
import org.joda.time.Interval;

import java.util.Collection;
Expand Down Expand Up @@ -83,6 +85,7 @@ public class ThreadPoolTaskRunner implements TaskRunner, QuerySegmentWalker
private final CopyOnWriteArrayList<Pair<TaskRunnerListener, Executor>> listeners = new CopyOnWriteArrayList<>();
private final ServiceEmitter emitter;
private final TaskLocation location;
private final ServerConfig serverConfig;

private volatile boolean stopping = false;

Expand All @@ -91,13 +94,15 @@ public ThreadPoolTaskRunner(
TaskToolboxFactory toolboxFactory,
TaskConfig taskConfig,
ServiceEmitter emitter,
@Self DruidNode node
@Self DruidNode node,
ServerConfig serverConfig
)
{
this.toolboxFactory = Preconditions.checkNotNull(toolboxFactory, "toolboxFactory");
this.taskConfig = taskConfig;
this.emitter = Preconditions.checkNotNull(emitter, "emitter");
this.location = TaskLocation.create(node.getHost(), node.getPlaintextPort(), node.getTlsPort());
this.serverConfig = serverConfig;
}

@Override
Expand Down Expand Up @@ -362,7 +367,10 @@ private <T> QueryRunner<T> getQueryRunnerImpl(Query<T> query)
}
}

return queryRunner == null ? new NoopQueryRunner<T>() : queryRunner;
return new SetAndVerifyContextQueryRunner(
serverConfig,
queryRunner == null ? new NoopQueryRunner<T>() : queryRunner
);
}

private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@
import io.druid.server.coordination.DataSegmentAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.coordination.ServerType;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
Expand Down Expand Up @@ -622,7 +623,8 @@ private TaskRunner setUpThreadPoolTaskRunner(TaskToolboxFactory tb)
tb,
taskConfig,
emitter,
new DruidNode("dummy", "dummy", 10000, null, true, false)
new DruidNode("dummy", "dummy", 10000, null, true, false),
new ServerConfig()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.DruidNode;
import io.druid.server.initialization.IndexerZkConfig;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import org.apache.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -194,7 +195,8 @@ public List<StorageLocationConfig> getLocations()
),
taskConfig,
new NoopServiceEmitter(),
DUMMY_NODE
DUMMY_NODE,
new ServerConfig()
)
);
}
Expand Down
17 changes: 17 additions & 0 deletions processing/src/main/java/io/druid/query/QueryContexts.java
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,23 @@ public static <T> Query<T> withMaxScatterGatherBytes(Query<T> query, long maxSca
}
}

/**
 * Validates that the {@code timeout} set in the given query's context does not exceed the
 * server-enforced {@code maxQueryTimeout} limit.
 *
 * @param query           the query whose context timeout is checked
 * @param maxQueryTimeout the maximum timeout (in milliseconds) the server allows
 * @return the same query, unchanged, when its timeout is within the limit
 * @throws IAE if the query's timeout is greater than {@code maxQueryTimeout}
 */
public static <T> Query<T> verifyMaxQueryTimeout(Query<T> query, long maxQueryTimeout)
{
  final long requestedTimeout = getTimeout(query);
  if (requestedTimeout <= maxQueryTimeout) {
    return query;
  }
  // Reject rather than silently clamp, so the caller learns the configured limit.
  throw new IAE(
      "configured [%s = %s] is more than enforced limit of maxQueryTimeout [%s].",
      TIMEOUT_KEY,
      requestedTimeout,
      maxQueryTimeout
  );
}



public static <T> long getMaxScatterGatherBytes(Query<T> query)
{
return parseLong(query, MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE);
Expand Down
35 changes: 35 additions & 0 deletions processing/src/test/java/io/druid/query/QueryContextsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,20 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.Intervals;
import io.druid.query.spec.MultipleIntervalSegmentSpec;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.HashMap;

public class QueryContextsTest
{
@Rule
public final ExpectedException exception = ExpectedException.none();

@Test
public void testDefaultQueryTimeout()
Expand Down Expand Up @@ -72,4 +77,34 @@ public void testQueryTimeout()
query = QueryContexts.withDefaultTimeout(query, 1_000_000);
Assert.assertEquals(1000, QueryContexts.getTimeout(query));
}

@Test
public void testQueryMaxTimeout()
{
  // Build the fixture first: if ExpectedException were armed before this point, an
  // unrelated IAE thrown during query construction would falsely pass the test.
  Query<?> query = new TestQuery(
      new TableDataSource("test"),
      new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
      false,
      ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1000)
  );

  // Arm expectations immediately before the call under test.
  exception.expect(IAE.class);
  exception.expectMessage("configured [timeout = 1000] is more than enforced limit of maxQueryTimeout [100].");
  QueryContexts.verifyMaxQueryTimeout(query, 100);
}

@Test
public void testMaxScatterGatherBytes()
{
  // Build the fixture first: if ExpectedException were armed before this point, an
  // unrelated IAE thrown during query construction would falsely pass the test.
  Query<?> query = new TestQuery(
      new TableDataSource("test"),
      new MultipleIntervalSegmentSpec(ImmutableList.of(Intervals.of("0/100"))),
      false,
      ImmutableMap.of(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 1000)
  );

  // Arm expectations immediately before the call under test.
  exception.expect(IAE.class);
  exception.expectMessage("configured [maxScatterGatherBytes = 1000] is more than enforced limit of [100].");
  QueryContexts.withMaxScatterGatherBytes(query, 100);
}
}
15 changes: 0 additions & 15 deletions server/src/main/java/io/druid/client/DirectDruidClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import io.druid.query.ResourceLimitExceededException;
import io.druid.query.Result;
import io.druid.query.aggregation.MetricManipulatorFns;
import io.druid.server.initialization.ServerConfig;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;
import org.jboss.netty.handler.codec.http.HttpChunk;
Expand Down Expand Up @@ -116,20 +115,6 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private final AtomicInteger openConnections;
private final boolean isSmile;

public static <T, QueryType extends Query<T>> QueryType withDefaultTimeoutAndMaxScatterGatherBytes(
final QueryType query,
ServerConfig serverConfig
)
{
return (QueryType) QueryContexts.withMaxScatterGatherBytes(
QueryContexts.withDefaultTimeout(
(Query) query,
serverConfig.getDefaultQueryTimeout()
),
serverConfig.getMaxScatterGatherBytes()
);
}

/**
* Removes the magical fields added by {@link #makeResponseContextForQuery(Query, long)}.
*/
Expand Down
17 changes: 12 additions & 5 deletions server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import io.druid.query.RetryQueryRunner;
import io.druid.query.RetryQueryRunnerConfig;
import io.druid.query.SegmentDescriptor;
import io.druid.server.initialization.ServerConfig;
import org.joda.time.Interval;

/**
Expand All @@ -45,21 +46,24 @@ public class ClientQuerySegmentWalker implements QuerySegmentWalker
private final QueryToolChestWarehouse warehouse;
private final RetryQueryRunnerConfig retryConfig;
private final ObjectMapper objectMapper;
private final ServerConfig serverConfig;

@Inject
public ClientQuerySegmentWalker(
ServiceEmitter emitter,
CachingClusteredClient baseClient,
QueryToolChestWarehouse warehouse,
RetryQueryRunnerConfig retryConfig,
ObjectMapper objectMapper
ObjectMapper objectMapper,
ServerConfig serverConfig
)
{
this.emitter = emitter;
this.baseClient = baseClient;
this.warehouse = warehouse;
this.retryConfig = retryConfig;
this.objectMapper = objectMapper;
this.serverConfig = serverConfig;
}

@Override
Expand All @@ -86,10 +90,13 @@ private <T> QueryRunner<T> makeRunner(Query<T> query, QueryRunner<T> baseClientR

return new FluentQueryRunnerBuilder<>(toolChest)
.create(
new RetryQueryRunner<>(
baseClientRunner,
retryConfig,
objectMapper
new SetAndVerifyContextQueryRunner(
serverConfig,
new RetryQueryRunner<>(
baseClientRunner,
retryConfig,
objectMapper
)
)
)
.applyPreMergeDecoration()
Expand Down
13 changes: 2 additions & 11 deletions server/src/main/java/io/druid/server/QueryLifecycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.security.Access;
import io.druid.server.security.AuthenticationResult;
Expand All @@ -58,7 +57,7 @@
*
* <ol>
* <li>Initialization ({@link #initialize(Query)})</li>
* <li>Authorization ({@link #authorize(String, String, HttpServletRequest)}</li>
* <li>Authorization ({@link #authorize(HttpServletRequest)}</li>
* <li>Execution ({@link #execute()}</li>
* <li>Logging ({@link #emitLogsAndMetrics(Throwable, String, long)}</li>
* </ol>
Expand All @@ -74,7 +73,6 @@ public class QueryLifecycle
private final GenericQueryMetricsFactory queryMetricsFactory;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final ServerConfig serverConfig;
private final AuthorizerMapper authorizerMapper;
private final long startMs;
private final long startNs;
Expand All @@ -90,7 +88,6 @@ public QueryLifecycle(
final GenericQueryMetricsFactory queryMetricsFactory,
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final ServerConfig serverConfig,
final AuthorizerMapper authorizerMapper,
final long startMs,
final long startNs
Expand All @@ -101,7 +98,6 @@ public QueryLifecycle(
this.queryMetricsFactory = queryMetricsFactory;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.serverConfig = serverConfig;
this.authorizerMapper = authorizerMapper;
this.startMs = startMs;
this.startNs = startNs;
Expand Down Expand Up @@ -171,12 +167,7 @@ public void initialize(final Query baseQuery)
queryId = UUID.randomUUID().toString();
}

this.queryPlus = QueryPlus.wrap(
(Query) DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(
baseQuery.withId(queryId),
serverConfig
)
);
this.queryPlus = QueryPlus.wrap(baseQuery.withId(queryId));
this.toolChest = warehouse.getToolChest(baseQuery);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.security.AuthConfig;
import io.druid.server.security.AuthorizerMapper;
Expand All @@ -38,7 +37,6 @@ public class QueryLifecycleFactory
private final GenericQueryMetricsFactory queryMetricsFactory;
private final ServiceEmitter emitter;
private final RequestLogger requestLogger;
private final ServerConfig serverConfig;
private final AuthorizerMapper authorizerMapper;

@Inject
Expand All @@ -48,7 +46,6 @@ public QueryLifecycleFactory(
final GenericQueryMetricsFactory queryMetricsFactory,
final ServiceEmitter emitter,
final RequestLogger requestLogger,
final ServerConfig serverConfig,
final AuthConfig authConfig,
final AuthorizerMapper authorizerMapper
)
Expand All @@ -58,7 +55,6 @@ public QueryLifecycleFactory(
this.queryMetricsFactory = queryMetricsFactory;
this.emitter = emitter;
this.requestLogger = requestLogger;
this.serverConfig = serverConfig;
this.authorizerMapper = authorizerMapper;
}

Expand All @@ -70,7 +66,6 @@ public QueryLifecycle factorize()
queryMetricsFactory,
emitter,
requestLogger,
serverConfig,
authorizerMapper,
System.currentTimeMillis(),
System.nanoTime()
Expand Down
Loading