diff --git a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java index 1cfe559ee1b2..0f831f77eeef 100644 --- a/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java +++ b/benchmarks/src/main/java/io/druid/benchmark/query/SqlBenchmark.java @@ -38,7 +38,6 @@ import io.druid.query.dimension.DimensionSpec; import io.druid.query.groupby.GroupByQuery; import io.druid.segment.QueryableIndex; -import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.DruidPlanner; import io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.planner.PlannerFactory; @@ -116,11 +115,10 @@ public void setup() throws Exception plannerFactory = new PlannerFactory( CalciteTests.createMockSchema(walker, plannerConfig), - walker, + CalciteTests.createMockQueryLifecycleFactory(walker), CalciteTests.createOperatorTable(), CalciteTests.createExprMacroTable(), - plannerConfig, - new ServerConfig() + plannerConfig ); groupByQuery = GroupByQuery .builder() diff --git a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java index b38c4c93ffa5..807c7b434f16 100644 --- a/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java +++ b/extensions-core/histogram/src/test/java/io/druid/query/aggregation/histogram/sql/QuantileSqlAggregatorTest.java @@ -26,7 +26,6 @@ import io.druid.java.util.common.granularity.Granularities; import io.druid.java.util.common.guava.Sequences; import io.druid.query.Druids; -import io.druid.query.QueryContexts; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.FilteredAggregatorFactory; @@ -45,7 +44,6 @@ import 
io.druid.segment.column.ValueType; import io.druid.segment.incremental.IncrementalIndexSchema; import io.druid.segment.virtual.ExpressionVirtualColumn; -import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.filtration.Filtration; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; @@ -131,11 +129,10 @@ public void setUp() throws Exception ); plannerFactory = new PlannerFactory( druidSchema, - walker, + CalciteTests.createMockQueryLifecycleFactory(walker), operatorTable, CalciteTests.createExprMacroTable(), - plannerConfig, - new ServerConfig() + plannerConfig ); } @@ -223,11 +220,7 @@ public void testQuantileOnFloatAndLongs() throws Exception new QuantilePostAggregator("a7", "a5:agg", 0.999f), new QuantilePostAggregator("a8", "a8:agg", 0.50f) )) - .context(ImmutableMap.of( - "skipEmptyBuckets", true, - QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, - QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE - )) + .context(ImmutableMap.of("skipEmptyBuckets", true)) .build(), Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) ); @@ -287,11 +280,7 @@ public void testQuantileOnComplexColumn() throws Exception new QuantilePostAggregator("a5", "a5:agg", 0.999f), new QuantilePostAggregator("a6", "a4:agg", 0.999f) )) - .context(ImmutableMap.of( - "skipEmptyBuckets", true, - QueryContexts.DEFAULT_TIMEOUT_KEY, QueryContexts.DEFAULT_TIMEOUT_MILLIS, - QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, Long.MAX_VALUE - )) + .context(ImmutableMap.of("skipEmptyBuckets", true)) .build(), Iterables.getOnlyElement(queryLogHook.getRecordedQueries()) ); diff --git a/server/src/main/java/io/druid/client/DirectDruidClient.java b/server/src/main/java/io/druid/client/DirectDruidClient.java index 9e2c0c6fe77e..5c1dfef58d93 100644 --- a/server/src/main/java/io/druid/client/DirectDruidClient.java +++ b/server/src/main/java/io/druid/client/DirectDruidClient.java @@ -128,6 +128,15 @@ 
public static > QueryType withDefaultTimeoutAndMax ); } + /** + * Removes the magical fields added by {@link #makeResponseContextForQuery(Query, long)}. + */ + public static void removeMagicResponseContextFields(Map responseContext) + { + responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME); + responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED); + } + public static Map makeResponseContextForQuery(Query query, long startTimeMillis) { final Map responseContext = new MapMaker().makeMap(); diff --git a/server/src/main/java/io/druid/server/BrokerQueryResource.java b/server/src/main/java/io/druid/server/BrokerQueryResource.java index d94966a902aa..402e59f4356c 100644 --- a/server/src/main/java/io/druid/server/BrokerQueryResource.java +++ b/server/src/main/java/io/druid/server/BrokerQueryResource.java @@ -22,19 +22,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.inject.Inject; -import com.metamx.emitter.service.ServiceEmitter; import com.sun.jersey.spi.container.ResourceFilters; import io.druid.client.ServerViewUtil; import io.druid.client.TimelineServerView; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; import io.druid.query.Query; -import io.druid.query.GenericQueryMetricsFactory; -import io.druid.query.QuerySegmentWalker; -import io.druid.query.QueryToolChestWarehouse; import io.druid.server.http.security.StateResourceFilter; -import io.druid.server.initialization.ServerConfig; -import io.druid.server.log.RequestLogger; import io.druid.server.security.AuthConfig; import javax.servlet.http.HttpServletRequest; @@ -59,30 +53,20 @@ public class BrokerQueryResource extends QueryResource @Inject public BrokerQueryResource( - QueryToolChestWarehouse warehouse, - ServerConfig config, + QueryLifecycleFactory queryLifecycleFactory, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, - QuerySegmentWalker texasRanger, - 
ServiceEmitter emitter, - RequestLogger requestLogger, QueryManager queryManager, AuthConfig authConfig, - GenericQueryMetricsFactory queryMetricsFactory, TimelineServerView brokerServerView ) { super( - warehouse, - config, + queryLifecycleFactory, jsonMapper, smileMapper, - texasRanger, - emitter, - requestLogger, queryManager, - authConfig, - queryMetricsFactory + authConfig ); this.brokerServerView = brokerServerView; } diff --git a/server/src/main/java/io/druid/server/QueryLifecycle.java b/server/src/main/java/io/druid/server/QueryLifecycle.java new file mode 100644 index 000000000000..4104e1638e25 --- /dev/null +++ b/server/src/main/java/io/druid/server/QueryLifecycle.java @@ -0,0 +1,363 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.server; + +import com.google.common.base.Strings; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.client.DirectDruidClient; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.guava.Sequence; +import io.druid.java.util.common.guava.SequenceWrapper; +import io.druid.java.util.common.guava.Sequences; +import io.druid.java.util.common.logger.Logger; +import io.druid.query.DruidMetrics; +import io.druid.query.GenericQueryMetricsFactory; +import io.druid.query.Query; +import io.druid.query.QueryInterruptedException; +import io.druid.query.QueryMetrics; +import io.druid.query.QueryPlus; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChest; +import io.druid.query.QueryToolChestWarehouse; +import io.druid.server.initialization.ServerConfig; +import io.druid.server.log.RequestLogger; +import io.druid.server.security.Access; +import io.druid.server.security.Action; +import io.druid.server.security.AuthConfig; +import io.druid.server.security.AuthorizationInfo; +import io.druid.server.security.Resource; +import io.druid.server.security.ResourceType; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * Class that helps a Druid server (broker, historical, etc) manage the lifecycle of a query that it is handling. It + * ensures that a query goes through the following stages, in the proper order: + * + *
+ * <ol>
+ * <li>Initialization ({@link #initialize(Query)})</li>
+ * <li>Authorization ({@link #authorize(AuthorizationInfo)})</li>
+ * <li>Execution ({@link #execute()})</li>
+ * <li>Logging ({@link #emitLogsAndMetrics(Throwable, String, long)})</li>
+ * </ol>
+ * + * This object is not thread-safe. + */ +public class QueryLifecycle +{ + private static final Logger log = new Logger(QueryLifecycle.class); + + private final QueryToolChestWarehouse warehouse; + private final QuerySegmentWalker texasRanger; + private final GenericQueryMetricsFactory queryMetricsFactory; + private final ServiceEmitter emitter; + private final RequestLogger requestLogger; + private final ServerConfig serverConfig; + private final AuthConfig authConfig; + private final long startMs; + private final long startNs; + + private State state = State.NEW; + private QueryToolChest toolChest; + private QueryPlus queryPlus; + + public QueryLifecycle( + final QueryToolChestWarehouse warehouse, + final QuerySegmentWalker texasRanger, + final GenericQueryMetricsFactory queryMetricsFactory, + final ServiceEmitter emitter, + final RequestLogger requestLogger, + final ServerConfig serverConfig, + final AuthConfig authConfig, + final long startMs, + final long startNs + ) + { + this.warehouse = warehouse; + this.texasRanger = texasRanger; + this.queryMetricsFactory = queryMetricsFactory; + this.emitter = emitter; + this.requestLogger = requestLogger; + this.serverConfig = serverConfig; + this.authConfig = authConfig; + this.startMs = startMs; + this.startNs = startNs; + } + + /** + * For callers where simplicity is desired over flexibility. This method does it all in one call. If the request + * is unauthorized, an IllegalStateException will be thrown. Logs and metrics are emitted when the Sequence is + * either fully iterated or throws an exception. + * + * @param query the query + * @param authorizationInfo authorization info from the request; or null if none is present. This must be non-null + * if security is enabled, or the request will be considered unauthorized. 
+ * @param remoteAddress remote address, for logging; or null if unknown + * + * @return results + */ + @SuppressWarnings("unchecked") + public Sequence runSimple( + final Query query, + @Nullable final AuthorizationInfo authorizationInfo, + @Nullable final String remoteAddress + ) + { + initialize(query); + + final Sequence results; + + try { + final Access access = authorize(authorizationInfo); + if (!access.isAllowed()) { + throw new ISE("Unauthorized"); + } + + final QueryLifecycle.QueryResponse queryResponse = execute(); + results = queryResponse.getResults(); + } + catch (Throwable e) { + emitLogsAndMetrics(e, remoteAddress, -1); + throw e; + } + + return Sequences.wrap( + results, + new SequenceWrapper() + { + @Override + public void after(final boolean isDone, final Throwable thrown) throws Exception + { + emitLogsAndMetrics(thrown, remoteAddress, -1); + } + } + ); + } + + /** + * Initializes this object to execute a specific query. Does not actually execute the query. + * + * @param baseQuery the query + */ + @SuppressWarnings("unchecked") + public void initialize(final Query baseQuery) + { + transition(State.NEW, State.INITIALIZED); + + String queryId = baseQuery.getId(); + if (queryId == null) { + queryId = UUID.randomUUID().toString(); + } + + this.queryPlus = QueryPlus.wrap( + (Query) DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( + baseQuery.withId(queryId), + serverConfig + ) + ); + this.toolChest = warehouse.getToolChest(baseQuery); + } + + /** + * Authorize the query. Will return an Access object denoting whether the query is authorized or not. + * + * @param authorizationInfo authorization info from the request; or null if none is present. This must be non-null + * if security is enabled, or the request will be considered unauthorized. 
+ * + * @return authorization result + * + * @throws IllegalStateException if security is enabled and authorizationInfo is null + */ + public Access authorize(@Nullable final AuthorizationInfo authorizationInfo) + { + transition(State.INITIALIZED, State.AUTHORIZING); + + if (authConfig.isEnabled()) { + // This is an experimental feature, see - https://github.com/druid-io/druid/pull/2424 + if (authorizationInfo != null) { + for (String dataSource : queryPlus.getQuery().getDataSource().getNames()) { + Access authResult = authorizationInfo.isAuthorized( + new Resource(dataSource, ResourceType.DATASOURCE), + Action.READ + ); + if (!authResult.isAllowed()) { + // Not authorized; go straight to Jail, do not pass Go. + transition(State.AUTHORIZING, State.DONE); + return authResult; + } + } + + transition(State.AUTHORIZING, State.AUTHORIZED); + return new Access(true); + } else { + throw new ISE("WTF?! Security is enabled but no authorization info found in the request"); + } + } else { + transition(State.AUTHORIZING, State.AUTHORIZED); + return new Access(true); + } + } + + /** + * Execute the query. Can only be called if the query has been authorized. Note that query logs and metrics will + * not be emitted automatically when the Sequence is fully iterated. It is the caller's responsibility to call + * {@link #emitLogsAndMetrics(Throwable, String, long)} to emit logs and metrics. + * + * @return result sequence and response context + */ + public QueryResponse execute() + { + transition(State.AUTHORIZED, State.EXECUTING); + + final Map responseContext = DirectDruidClient.makeResponseContextForQuery( + queryPlus.getQuery(), + System.currentTimeMillis() + ); + + final Sequence res = queryPlus.run(texasRanger, responseContext); + + return new QueryResponse(res == null ? Sequences.empty() : res, responseContext); + } + + /** + * Emit logs and metrics for this query. 
+ * + * @param e exception that occurred while processing this query + * @param remoteAddress remote address, for logging; or null if unknown + * @param bytesWritten number of bytes written; will become a query/bytes metric if >= 0 + */ + @SuppressWarnings("unchecked") + public void emitLogsAndMetrics( + @Nullable final Throwable e, + @Nullable final String remoteAddress, + final long bytesWritten + ) + { + if (queryPlus == null) { + // Never initialized, don't log or emit anything. + return; + } + + if (state == State.DONE) { + log.warn("Tried to emit logs and metrics twice for query[%s]!", queryPlus.getQuery().getId()); + } + + state = State.DONE; + + final Query query = queryPlus != null ? queryPlus.getQuery() : null; + final boolean success = e == null; + + try { + final long queryTimeNs = System.nanoTime() - startNs; + QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( + queryMetricsFactory, + toolChest, + queryPlus.getQuery(), + Strings.nullToEmpty(remoteAddress) + ); + queryMetrics.success(success); + queryMetrics.reportQueryTime(queryTimeNs); + + if (bytesWritten >= 0) { + queryMetrics.reportQueryBytes(bytesWritten); + } + + queryMetrics.emit(emitter); + + final Map statsMap = new LinkedHashMap<>(); + statsMap.put("query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs)); + statsMap.put("query/bytes", bytesWritten); + statsMap.put("success", success); + if (e != null) { + statsMap.put("exception", e.toString()); + + if (e instanceof QueryInterruptedException) { + // Mimic behavior from QueryResource, where this code was originally taken from. 
+ log.warn(e, "Exception while processing queryId [%s]", queryPlus.getQuery().getId()); + statsMap.put("interrupted", true); + statsMap.put("reason", e.toString()); + } + } + + requestLogger.log( + new RequestLogLine( + new DateTime(startMs), + Strings.nullToEmpty(remoteAddress), + queryPlus.getQuery(), + new QueryStats(statsMap) + ) + ); + } + catch (Exception ex) { + log.error(ex, "Unable to log query [%s]!", query); + } + } + + public Query getQuery() + { + return queryPlus.getQuery(); + } + + private void transition(final State from, final State to) + { + if (state != from) { + throw new ISE("Cannot transition from[%s] to[%s].", from, to); + } + + state = to; + } + + enum State + { + NEW, + INITIALIZED, + AUTHORIZING, + AUTHORIZED, + EXECUTING, + DONE + } + + public static class QueryResponse + { + private final Sequence results; + private final Map responseContext; + + private QueryResponse(final Sequence results, final Map responseContext) + { + this.results = results; + this.responseContext = responseContext; + } + + public Sequence getResults() + { + return results; + } + + public Map getResponseContext() + { + return responseContext; + } + } +} diff --git a/server/src/main/java/io/druid/server/QueryLifecycleFactory.java b/server/src/main/java/io/druid/server/QueryLifecycleFactory.java new file mode 100644 index 000000000000..a6fbc3cca719 --- /dev/null +++ b/server/src/main/java/io/druid/server/QueryLifecycleFactory.java @@ -0,0 +1,77 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server; + +import com.google.inject.Inject; +import com.metamx.emitter.service.ServiceEmitter; +import io.druid.guice.LazySingleton; +import io.druid.query.GenericQueryMetricsFactory; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChestWarehouse; +import io.druid.server.initialization.ServerConfig; +import io.druid.server.log.RequestLogger; +import io.druid.server.security.AuthConfig; + +@LazySingleton +public class QueryLifecycleFactory +{ + private final QueryToolChestWarehouse warehouse; + private final QuerySegmentWalker texasRanger; + private final GenericQueryMetricsFactory queryMetricsFactory; + private final ServiceEmitter emitter; + private final RequestLogger requestLogger; + private final ServerConfig serverConfig; + private final AuthConfig authConfig; + + @Inject + public QueryLifecycleFactory( + final QueryToolChestWarehouse warehouse, + final QuerySegmentWalker texasRanger, + final GenericQueryMetricsFactory queryMetricsFactory, + final ServiceEmitter emitter, + final RequestLogger requestLogger, + final ServerConfig serverConfig, + final AuthConfig authConfig + ) + { + this.warehouse = warehouse; + this.texasRanger = texasRanger; + this.queryMetricsFactory = queryMetricsFactory; + this.emitter = emitter; + this.requestLogger = requestLogger; + this.serverConfig = serverConfig; + this.authConfig = authConfig; + } + + public QueryLifecycle factorize() + { + return new QueryLifecycle( + warehouse, + texasRanger, + queryMetricsFactory, + emitter, + requestLogger, + serverConfig, + 
authConfig, + System.currentTimeMillis(), + System.nanoTime() + ); + } +} diff --git a/server/src/main/java/io/druid/server/QueryResource.java b/server/src/main/java/io/druid/server/QueryResource.java index 1e6e8c0e457e..84caf56ee70f 100644 --- a/server/src/main/java/io/druid/server/QueryResource.java +++ b/server/src/main/java/io/druid/server/QueryResource.java @@ -30,29 +30,17 @@ import com.google.common.io.CountingOutputStream; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; -import com.metamx.emitter.service.ServiceEmitter; import io.druid.client.DirectDruidClient; import io.druid.guice.LazySingleton; import io.druid.guice.annotations.Json; import io.druid.guice.annotations.Smile; -import io.druid.java.util.common.ISE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.guava.Sequence; -import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.guava.Yielder; import io.druid.java.util.common.guava.Yielders; -import io.druid.query.DruidMetrics; -import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; import io.druid.query.QueryContexts; import io.druid.query.QueryInterruptedException; -import io.druid.query.QueryMetrics; -import io.druid.query.QueryPlus; -import io.druid.query.QuerySegmentWalker; -import io.druid.query.QueryToolChest; -import io.druid.query.QueryToolChestWarehouse; -import io.druid.server.initialization.ServerConfig; -import io.druid.server.log.RequestLogger; import io.druid.server.metrics.QueryCountStatsProvider; import io.druid.server.security.Access; import io.druid.server.security.Action; @@ -80,8 +68,6 @@ import java.io.OutputStream; import java.util.Map; import java.util.Set; -import java.util.UUID; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /** @@ -99,48 +85,33 @@ public class QueryResource implements QueryCountStatsProvider public static final String HEADER_IF_NONE_MATCH = "If-None-Match"; public 
static final String HEADER_ETAG = "ETag"; - protected final QueryToolChestWarehouse warehouse; - protected final ServerConfig config; + protected final QueryLifecycleFactory queryLifecycleFactory; protected final ObjectMapper jsonMapper; protected final ObjectMapper smileMapper; protected final ObjectMapper serializeDateTimeAsLongJsonMapper; protected final ObjectMapper serializeDateTimeAsLongSmileMapper; - protected final QuerySegmentWalker texasRanger; - protected final ServiceEmitter emitter; - protected final RequestLogger requestLogger; protected final QueryManager queryManager; protected final AuthConfig authConfig; - private final GenericQueryMetricsFactory queryMetricsFactory; private final AtomicLong successfulQueryCount = new AtomicLong(); private final AtomicLong failedQueryCount = new AtomicLong(); private final AtomicLong interruptedQueryCount = new AtomicLong(); @Inject public QueryResource( - QueryToolChestWarehouse warehouse, - ServerConfig config, + QueryLifecycleFactory queryLifecycleFactory, @Json ObjectMapper jsonMapper, @Smile ObjectMapper smileMapper, - QuerySegmentWalker texasRanger, - ServiceEmitter emitter, - RequestLogger requestLogger, QueryManager queryManager, - AuthConfig authConfig, - GenericQueryMetricsFactory queryMetricsFactory + AuthConfig authConfig ) { - this.warehouse = warehouse; - this.config = config; + this.queryLifecycleFactory = queryLifecycleFactory; this.jsonMapper = jsonMapper; this.smileMapper = smileMapper; this.serializeDateTimeAsLongJsonMapper = serializeDataTimeAsLong(jsonMapper); this.serializeDateTimeAsLongSmileMapper = serializeDataTimeAsLong(smileMapper); - this.texasRanger = texasRanger; - this.emitter = emitter; - this.requestLogger = requestLogger; this.queryManager = queryManager; this.authConfig = authConfig; - this.queryMetricsFactory = queryMetricsFactory; } @DELETE @@ -181,35 +152,21 @@ public Response getServer(@PathParam("id") String queryId, @Context final HttpSe 
@Produces({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE}) @Consumes({MediaType.APPLICATION_JSON, SmileMediaTypes.APPLICATION_JACKSON_SMILE, APPLICATION_SMILE}) public Response doPost( - InputStream in, - @QueryParam("pretty") String pretty, + final InputStream in, + @QueryParam("pretty") final String pretty, @Context final HttpServletRequest req // used to get request content-type, remote address and AuthorizationInfo ) throws IOException { - final long startNs = System.nanoTime(); + final QueryLifecycle queryLifecycle = queryLifecycleFactory.factorize(); Query query = null; - QueryToolChest toolChest = null; - String queryId = null; final ResponseContext context = createContext(req.getContentType(), pretty != null); final String currThreadName = Thread.currentThread().getName(); try { - - query = context.getObjectMapper().readValue(in, Query.class); - queryId = query.getId(); - if (queryId == null) { - queryId = UUID.randomUUID().toString(); - query = query.withId(queryId); - } - - query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes(query, config); - final Map responseContext = DirectDruidClient.makeResponseContextForQuery( - query, - System.currentTimeMillis() - ); - - toolChest = warehouse.getToolChest(query); + queryLifecycle.initialize(readQuery(req, in, context)); + query = queryLifecycle.getQuery(); + final String queryId = query.getId(); Thread.currentThread() .setName(StringUtils.format("%s[%s_%s_%s]", currThreadName, query.getType(), query.getDataSource().getNames(), queryId)); @@ -217,49 +174,23 @@ public Response doPost( log.debug("Got query [%s]", query); } - if (authConfig.isEnabled()) { - // This is an experimental feature, see - https://github.com/druid-io/druid/pull/2424 - AuthorizationInfo authorizationInfo = (AuthorizationInfo) req.getAttribute(AuthConfig.DRUID_AUTH_TOKEN); - if (authorizationInfo != null) { - for (String dataSource : query.getDataSource().getNames()) { - Access authResult = 
authorizationInfo.isAuthorized( - new Resource(dataSource, ResourceType.DATASOURCE), - Action.READ - ); - if (!authResult.isAllowed()) { - return Response.status(Response.Status.FORBIDDEN).header("Access-Check-Result", authResult).build(); - } - } - } else { - throw new ISE("WTF?! Security is enabled but no authorization info found in the request"); - } + final Access authResult = queryLifecycle.authorize((AuthorizationInfo) req.getAttribute(AuthConfig.DRUID_AUTH_TOKEN)); + if (!authResult.isAllowed()) { + return Response.status(Response.Status.FORBIDDEN).header("Access-Check-Result", authResult).build(); } - String prevEtag = req.getHeader(HEADER_IF_NONE_MATCH); - if (prevEtag != null) { - query = query.withOverriddenContext( - ImmutableMap.of (HEADER_IF_NONE_MATCH, prevEtag) - ); - } - - final Sequence res = QueryPlus.wrap(query).run(texasRanger, responseContext); + final QueryLifecycle.QueryResponse queryResponse = queryLifecycle.execute(); + final Sequence results = queryResponse.getResults(); + final Map responseContext = queryResponse.getResponseContext(); + final String prevEtag = getPreviousEtag(req); if (prevEtag != null && prevEtag.equals(responseContext.get(HEADER_ETAG))) { return Response.notModified().build(); } - final Sequence results; - if (res == null) { - results = Sequences.empty(); - } else { - results = res; - } - - final Yielder yielder = Yielders.each(results); + final Yielder yielder = Yielders.each(results); try { - final Query theQuery = query; - final QueryToolChest theToolChest = toolChest; boolean shouldFinalize = QueryContexts.isFinalize(query, true); boolean serializeDateTimeAsLong = QueryContexts.isSerializeDateTimeAsLong(query, false) @@ -272,8 +203,7 @@ public Response doPost( @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { - boolean success = false; - String exceptionStr = ""; + Exception e = null; CountingOutputStream os = new CountingOutputStream(outputStream); try { @@ 
-282,63 +212,21 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE os.flush(); // Some types of OutputStream suppress flush errors in the .close() method. os.close(); - - success = true; } catch (Exception ex) { - exceptionStr = ex.toString(); + e = ex; log.error(ex, "Unable to send query response."); throw Throwables.propagate(ex); } finally { - try { - if (success) { - successfulQueryCount.incrementAndGet(); - } else { - failedQueryCount.incrementAndGet(); - } - - final long queryTimeNs = System.nanoTime() - startNs; - QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( - queryMetricsFactory, - theToolChest, - theQuery, - req.getRemoteAddr() - ); - queryMetrics.success(success); - queryMetrics.reportQueryTime(queryTimeNs).emit(emitter); - - DruidMetrics.makeRequestMetrics( - queryMetricsFactory, - theToolChest, - theQuery, - req.getRemoteAddr() - ).reportQueryBytes(os.getCount()).emit(emitter); - - ImmutableMap.Builder statsMapBuilder = ImmutableMap.builder(); - statsMapBuilder.put("query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs)); - statsMapBuilder.put("query/bytes", os.getCount()); - statsMapBuilder.put("success", success); - if (!success) { - statsMapBuilder.put("exception", exceptionStr); - } - - requestLogger.log( - new RequestLogLine( - new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)), - req.getRemoteAddr(), - theQuery, - new QueryStats( - statsMapBuilder.build() - ) - ) - ); - } - catch (Exception ex) { - log.error(ex, "Unable to log query [%s]!", theQuery); - } - finally { - Thread.currentThread().setName(currThreadName); + Thread.currentThread().setName(currThreadName); + + queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), os.getCount()); + + if (e == null) { + successfulQueryCount.incrementAndGet(); + } else { + failedQueryCount.incrementAndGet(); } } } @@ -352,8 +240,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE responseContext.remove(HEADER_ETAG); } - 
responseContext.remove(DirectDruidClient.QUERY_FAIL_TIME); - responseContext.remove(DirectDruidClient.QUERY_TOTAL_BYTES_GATHERED); + DirectDruidClient.removeMagicResponseContextFields(responseContext); //Limit the response-context header, see https://github.com/druid-io/druid/issues/2331 //Note that Response.ResponseBuilder.header(String key,Object value).build() calls value.toString() @@ -379,86 +266,17 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE } } catch (QueryInterruptedException e) { - try { - log.warn(e, "Exception while processing queryId [%s]", queryId); - interruptedQueryCount.incrementAndGet(); - final long queryTimeNs = System.nanoTime() - startNs; - QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( - queryMetricsFactory, - toolChest, - query, - req.getRemoteAddr() - ); - queryMetrics.success(false); - queryMetrics.reportQueryTime(queryTimeNs).emit(emitter); - requestLogger.log( - new RequestLogLine( - new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)), - req.getRemoteAddr(), - query, - new QueryStats( - ImmutableMap.of( - "query/time", - TimeUnit.NANOSECONDS.toMillis(queryTimeNs), - "success", - false, - "interrupted", - true, - "reason", - e.toString() - ) - ) - ) - ); - } - catch (Exception e2) { - log.error(e2, "Unable to log query [%s]!", query); - } + interruptedQueryCount.incrementAndGet(); + queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1); return context.gotError(e); } catch (Exception e) { - // Input stream has already been consumed by the json object mapper if query == null - final String queryString = - query == null - ? 
"unparsable query" - : query.toString(); - - log.warn(e, "Exception occurred on request [%s]", queryString); failedQueryCount.incrementAndGet(); - - try { - final long queryTimeNs = System.nanoTime() - startNs; - QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( - queryMetricsFactory, - toolChest, - query, - req.getRemoteAddr() - ); - queryMetrics.success(false); - queryMetrics.reportQueryTime(queryTimeNs).emit(emitter); - requestLogger.log( - new RequestLogLine( - new DateTime(TimeUnit.NANOSECONDS.toMillis(startNs)), - req.getRemoteAddr(), - query, - new QueryStats(ImmutableMap.of( - "query/time", - TimeUnit.NANOSECONDS.toMillis(queryTimeNs), - "success", - false, - "exception", - e.toString() - )) - ) - ); - } - catch (Exception e2) { - log.error(e2, "Unable to log query [%s]!", queryString); - } + queryLifecycle.emitLogsAndMetrics(e, req.getRemoteAddr(), -1); log.makeAlert(e, "Exception handling request") .addData("exception", e.toString()) - .addData("query", queryString) + .addData("query", query != null ? 
query.toString() : "unparseable query") .addData("peer", req.getRemoteAddr()) .emit(); @@ -469,6 +287,29 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE } } + private static Query readQuery( + final HttpServletRequest req, + final InputStream in, + final ResponseContext context + ) throws IOException + { + Query baseQuery = context.getObjectMapper().readValue(in, Query.class); + String prevEtag = getPreviousEtag(req); + + if (prevEtag != null) { + baseQuery = baseQuery.withOverriddenContext( + ImmutableMap.of(HEADER_IF_NONE_MATCH, prevEtag) + ); + } + + return baseQuery; + } + + private static String getPreviousEtag(final HttpServletRequest req) + { + return req.getHeader(HEADER_IF_NONE_MATCH); + } + protected ObjectMapper serializeDataTimeAsLong(ObjectMapper mapper) { return mapper.copy().registerModule(new SimpleModule().addSerializer(DateTime.class, new DateTimeSerializer())); diff --git a/server/src/main/java/io/druid/server/security/SystemAuthorizationInfo.java b/server/src/main/java/io/druid/server/security/SystemAuthorizationInfo.java new file mode 100644 index 000000000000..f41983edc696 --- /dev/null +++ b/server/src/main/java/io/druid/server/security/SystemAuthorizationInfo.java @@ -0,0 +1,39 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.server.security; + +/** + * An AuthorizationInfo that is useful for actions generated internally by the system. It allows everything. + */ +public class SystemAuthorizationInfo implements AuthorizationInfo +{ + public static final SystemAuthorizationInfo INSTANCE = new SystemAuthorizationInfo(); + + private SystemAuthorizationInfo() + { + // Singleton. + } + + @Override + public Access isAuthorized(final Resource resource, final Action action) + { + return new Access(true); + } +} diff --git a/server/src/test/java/io/druid/server/QueryResourceTest.java b/server/src/test/java/io/druid/server/QueryResourceTest.java index 3344cddb0390..1dc226b979f8 100644 --- a/server/src/test/java/io/druid/server/QueryResourceTest.java +++ b/server/src/test/java/io/druid/server/QueryResourceTest.java @@ -132,16 +132,19 @@ public void setup() EasyMock.expect(testServletRequest.getRemoteAddr()).andReturn("localhost").anyTimes(); queryManager = new QueryManager(); queryResource = new QueryResource( - warehouse, - serverConfig, + new QueryLifecycleFactory( + warehouse, + testSegmentWalker, + new DefaultGenericQueryMetricsFactory(jsonMapper), + new NoopServiceEmitter(), + new NoopRequestLogger(), + serverConfig, + new AuthConfig() + ), jsonMapper, jsonMapper, - testSegmentWalker, - new NoopServiceEmitter(), - new NoopRequestLogger(), queryManager, - new AuthConfig(), - new DefaultGenericQueryMetricsFactory(jsonMapper) + new AuthConfig() ); } @@ -163,6 +166,16 @@ public void setup() @Test public void testGoodQuery() throws IOException { + EasyMock.expect(testServletRequest.getAttribute(EasyMock.anyString())).andReturn( + new AuthorizationInfo() + { + @Override + public Access isAuthorized(Resource resource, Action action) + { + return new Access(true); + } + } + ).times(1); EasyMock.replay(testServletRequest); Response response = queryResource.doPost( new 
ByteArrayInputStream(simpleTimeSeriesQuery.getBytes("UTF-8")), @@ -207,16 +220,19 @@ public Access isAuthorized( EasyMock.replay(testServletRequest); queryResource = new QueryResource( - warehouse, - serverConfig, + new QueryLifecycleFactory( + warehouse, + testSegmentWalker, + new DefaultGenericQueryMetricsFactory(jsonMapper), + new NoopServiceEmitter(), + new NoopRequestLogger(), + serverConfig, + new AuthConfig(true) + ), jsonMapper, jsonMapper, - testSegmentWalker, - new NoopServiceEmitter(), - new NoopRequestLogger(), queryManager, - new AuthConfig(true), - new DefaultGenericQueryMetricsFactory(jsonMapper) + new AuthConfig(true) ); Response response = queryResource.doPost( @@ -278,16 +294,19 @@ public Access isAuthorized( EasyMock.replay(testServletRequest); queryResource = new QueryResource( - warehouse, - serverConfig, + new QueryLifecycleFactory( + warehouse, + testSegmentWalker, + new DefaultGenericQueryMetricsFactory(jsonMapper), + new NoopServiceEmitter(), + new NoopRequestLogger(), + serverConfig, + new AuthConfig(true) + ), jsonMapper, jsonMapper, - testSegmentWalker, - new NoopServiceEmitter(), - new NoopRequestLogger(), queryManager, - new AuthConfig(true), - new DefaultGenericQueryMetricsFactory(jsonMapper) + new AuthConfig(true) ); final String queryString = "{\"queryType\":\"timeBoundary\", \"dataSource\":\"allow\"," @@ -375,16 +394,19 @@ public Access isAuthorized( EasyMock.replay(testServletRequest); queryResource = new QueryResource( - warehouse, - serverConfig, + new QueryLifecycleFactory( + warehouse, + testSegmentWalker, + new DefaultGenericQueryMetricsFactory(jsonMapper), + new NoopServiceEmitter(), + new NoopRequestLogger(), + serverConfig, + new AuthConfig(true) + ), jsonMapper, jsonMapper, - testSegmentWalker, - new NoopServiceEmitter(), - new NoopRequestLogger(), queryManager, - new AuthConfig(true), - new DefaultGenericQueryMetricsFactory(jsonMapper) + new AuthConfig(true) ); final String queryString = "{\"queryType\":\"timeBoundary\", 
\"dataSource\":\"allow\"," diff --git a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java index 7ce0254b8a32..105c08809de0 100644 --- a/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java +++ b/sql/src/main/java/io/druid/sql/calcite/planner/PlannerFactory.java @@ -21,8 +21,7 @@ import com.google.inject.Inject; import io.druid.math.expr.ExprMacroTable; -import io.druid.query.QuerySegmentWalker; -import io.druid.server.initialization.ServerConfig; +import io.druid.server.QueryLifecycleFactory; import io.druid.sql.calcite.rel.QueryMaker; import io.druid.sql.calcite.schema.DruidSchema; import org.apache.calcite.avatica.util.Casing; @@ -50,35 +49,32 @@ public class PlannerFactory .build(); private final DruidSchema druidSchema; - private final QuerySegmentWalker walker; + private final QueryLifecycleFactory queryLifecycleFactory; private final DruidOperatorTable operatorTable; private final ExprMacroTable macroTable; private final PlannerConfig plannerConfig; - private final ServerConfig serverConfig; @Inject public PlannerFactory( final DruidSchema druidSchema, - final QuerySegmentWalker walker, + final QueryLifecycleFactory queryLifecycleFactory, final DruidOperatorTable operatorTable, final ExprMacroTable macroTable, - final PlannerConfig plannerConfig, - final ServerConfig serverConfig + final PlannerConfig plannerConfig ) { this.druidSchema = druidSchema; - this.walker = walker; + this.queryLifecycleFactory = queryLifecycleFactory; this.operatorTable = operatorTable; this.macroTable = macroTable; this.plannerConfig = plannerConfig; - this.serverConfig = serverConfig; } public DruidPlanner createPlanner(final Map queryContext) { final SchemaPlus rootSchema = Calcites.createRootSchema(druidSchema); final PlannerContext plannerContext = PlannerContext.create(operatorTable, macroTable, plannerConfig, queryContext); - final QueryMaker queryMaker = new QueryMaker(walker, 
plannerContext, serverConfig); + final QueryMaker queryMaker = new QueryMaker(queryLifecycleFactory, plannerContext); final FrameworkConfig frameworkConfig = Frameworks .newConfigBuilder() .parserConfig(PARSER_CONFIG) diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidNestedGroupBy.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidNestedGroupBy.java index 0cf203bf6d8d..3ada2785f16a 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidNestedGroupBy.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidNestedGroupBy.java @@ -98,7 +98,6 @@ public Sequence runQuery() if (queryDataSource != null) { return getQueryMaker().runQuery( queryDataSource, - sourceRel.getOutputRowSignature(), queryBuilder ); } else { diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java index fb49416ded25..64d91a1e5498 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/DruidQueryRel.java @@ -164,7 +164,7 @@ public int getQueryCount() @Override public Sequence runQuery() { - return getQueryMaker().runQuery(druidTable.getDataSource(), druidTable.getRowSignature(), queryBuilder); + return getQueryMaker().runQuery(druidTable.getDataSource(), queryBuilder); } @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java index c7a59b12513f..d9ff5a2641be 100644 --- a/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java +++ b/sql/src/main/java/io/druid/sql/calcite/rel/QueryMaker.java @@ -25,7 +25,6 @@ import com.google.common.collect.Iterables; import com.google.common.primitives.Doubles; import com.google.common.primitives.Ints; -import io.druid.client.DirectDruidClient; import io.druid.common.guava.GuavaUtils; import io.druid.data.input.Row; import io.druid.java.util.common.ISE; @@ -33,9 +32,8 @@ import 
io.druid.java.util.common.guava.Sequences; import io.druid.math.expr.Evals; import io.druid.query.DataSource; +import io.druid.query.Query; import io.druid.query.QueryDataSource; -import io.druid.query.QueryPlus; -import io.druid.query.QuerySegmentWalker; import io.druid.query.Result; import io.druid.query.groupby.GroupByQuery; import io.druid.query.select.EventHolder; @@ -48,10 +46,9 @@ import io.druid.query.topn.TopNQuery; import io.druid.query.topn.TopNResultValue; import io.druid.segment.column.Column; -import io.druid.server.initialization.ServerConfig; +import io.druid.server.QueryLifecycleFactory; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.PlannerContext; -import io.druid.sql.calcite.table.RowSignature; import org.apache.calcite.avatica.ColumnMetaData; import org.apache.calcite.rel.type.RelDataTypeField; import org.apache.calcite.runtime.Hook; @@ -69,19 +66,16 @@ public class QueryMaker { - private final QuerySegmentWalker walker; + private final QueryLifecycleFactory queryLifecycleFactory; private final PlannerContext plannerContext; - private final ServerConfig serverConfig; public QueryMaker( - final QuerySegmentWalker walker, - final PlannerContext plannerContext, - final ServerConfig serverConfig + final QueryLifecycleFactory queryLifecycleFactory, + final PlannerContext plannerContext ) { - this.walker = walker; + this.queryLifecycleFactory = queryLifecycleFactory; this.plannerContext = plannerContext; - this.serverConfig = serverConfig; } public PlannerContext getPlannerContext() @@ -91,7 +85,6 @@ public PlannerContext getPlannerContext() public Sequence runQuery( final DataSource dataSource, - final RowSignature sourceRowSignature, final DruidQueryBuilder queryBuilder ) { @@ -161,31 +154,20 @@ public boolean hasNext() @Override public Sequence next() { - final SelectQuery queryWithPagination = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( - baseQuery.withPagingSpec( - new PagingSpec( - 
pagingIdentifiers.get(), - plannerContext.getPlannerConfig().getSelectThreshold(), - true - ) - ), - serverConfig + final SelectQuery queryWithPagination = baseQuery.withPagingSpec( + new PagingSpec( + pagingIdentifiers.get(), + plannerContext.getPlannerConfig().getSelectThreshold(), + true + ) ); - Hook.QUERY_PLAN.run(queryWithPagination); - morePages.set(false); final AtomicBoolean gotResult = new AtomicBoolean(); return Sequences.concat( Sequences.map( - QueryPlus.wrap(queryWithPagination) - .run(walker, - DirectDruidClient.makeResponseContextForQuery( - queryWithPagination, - plannerContext.getQueryStartTimeMillis() - ) - ), + runQuery(queryWithPagination), new Function, Sequence>() { @Override @@ -244,30 +226,29 @@ public void remove() return Sequences.concat(sequenceOfSequences); } + @SuppressWarnings("unchecked") + private Sequence runQuery(final Query query) + { + Hook.QUERY_PLAN.run(query); + + // Authorization really should be applied in planning. At this point the query has already begun to execute. + // So, use "null" authorizationInfo to force the query to fail if security is enabled. + return queryLifecycleFactory.factorize().runSimple(query, null, null); + } + private Sequence executeTimeseries( final DruidQueryBuilder queryBuilder, - final TimeseriesQuery baseQuery + final TimeseriesQuery query ) { - final TimeseriesQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( - baseQuery, - serverConfig - ); - final List fieldList = queryBuilder.getRowType().getFieldList(); final String timeOutputName = queryBuilder.getGrouping().getDimensions().isEmpty() ? 
null : Iterables.getOnlyElement(queryBuilder.getGrouping().getDimensions()) .getOutputName(); - Hook.QUERY_PLAN.run(query); - return Sequences.map( - QueryPlus.wrap(query) - .run( - walker, - DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis()) - ), + runQuery(query), new Function, Object[]>() { @Override @@ -293,25 +274,14 @@ public Object[] apply(final Result result) private Sequence executeTopN( final DruidQueryBuilder queryBuilder, - final TopNQuery baseQuery + final TopNQuery query ) { - final TopNQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( - baseQuery, - serverConfig - ); - final List fieldList = queryBuilder.getRowType().getFieldList(); - Hook.QUERY_PLAN.run(query); - return Sequences.concat( Sequences.map( - QueryPlus.wrap(query) - .run( - walker, - DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis()) - ), + runQuery(query), new Function, Sequence>() { @Override @@ -339,23 +309,13 @@ public Sequence apply(final Result result) private Sequence executeGroupBy( final DruidQueryBuilder queryBuilder, - final GroupByQuery baseQuery + final GroupByQuery query ) { - final GroupByQuery query = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( - baseQuery, - serverConfig - ); - final List fieldList = queryBuilder.getRowType().getFieldList(); - Hook.QUERY_PLAN.run(query); return Sequences.map( - QueryPlus.wrap(query) - .run( - walker, - DirectDruidClient.makeResponseContextForQuery(query, plannerContext.getQueryStartTimeMillis()) - ), + runQuery(query), new Function() { @Override diff --git a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java index 0f0d5b41181f..41e07dbb47b5 100644 --- a/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java +++ b/sql/src/main/java/io/druid/sql/calcite/schema/DruidSchema.java @@ -30,7 +30,6 @@ import 
com.google.common.util.concurrent.MoreExecutors; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; -import io.druid.client.DirectDruidClient; import io.druid.client.ServerView; import io.druid.client.TimelineServerView; import io.druid.guice.ManageLifecycle; @@ -41,8 +40,6 @@ import io.druid.java.util.common.guava.Yielders; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; -import io.druid.query.QueryPlus; -import io.druid.query.QuerySegmentWalker; import io.druid.query.TableDataSource; import io.druid.query.metadata.metadata.AllColumnIncluderator; import io.druid.query.metadata.metadata.ColumnAnalysis; @@ -50,8 +47,9 @@ import io.druid.query.metadata.metadata.SegmentMetadataQuery; import io.druid.query.spec.MultipleSpecificSegmentSpec; import io.druid.segment.column.ValueType; +import io.druid.server.QueryLifecycleFactory; import io.druid.server.coordination.DruidServerMetadata; -import io.druid.server.initialization.ServerConfig; +import io.druid.server.security.SystemAuthorizationInfo; import io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.table.DruidTable; import io.druid.sql.calcite.table.RowSignature; @@ -91,13 +89,12 @@ public class DruidSchema extends AbstractSchema private static final EmittingLogger log = new EmittingLogger(DruidSchema.class); private static final int MAX_SEGMENTS_PER_QUERY = 15000; - private final QuerySegmentWalker walker; + private final QueryLifecycleFactory queryLifecycleFactory; private final TimelineServerView serverView; private final PlannerConfig config; private final ViewManager viewManager; private final ExecutorService cacheExec; private final ConcurrentMap tables; - private final ServerConfig serverConfig; // For awaitInitialization. 
private final CountDownLatch initializationLatch = new CountDownLatch(1); @@ -124,20 +121,18 @@ public class DruidSchema extends AbstractSchema @Inject public DruidSchema( - final QuerySegmentWalker walker, + final QueryLifecycleFactory queryLifecycleFactory, final TimelineServerView serverView, final PlannerConfig config, - final ViewManager viewManager, - final ServerConfig serverConfig + final ViewManager viewManager ) { - this.walker = Preconditions.checkNotNull(walker, "walker"); + this.queryLifecycleFactory = Preconditions.checkNotNull(queryLifecycleFactory, "queryLifecycleFactory"); this.serverView = Preconditions.checkNotNull(serverView, "serverView"); this.config = Preconditions.checkNotNull(config, "config"); this.viewManager = Preconditions.checkNotNull(viewManager, "viewManager"); this.cacheExec = ScheduledExecutors.fixed(1, "DruidSchema-Cache-%d"); this.tables = Maps.newConcurrentMap(); - this.serverConfig = serverConfig; } @LifecycleStart @@ -404,8 +399,7 @@ private Set refreshSegmentsForDataSource( final Set retVal = new HashSet<>(); final Sequence sequence = runSegmentMetadataQuery( - walker, - serverConfig, + queryLifecycleFactory, Iterables.limit(segments, MAX_SEGMENTS_PER_QUERY) ); @@ -475,8 +469,7 @@ private DruidTable buildDruidTable(final String dataSource) } private static Sequence runSegmentMetadataQuery( - final QuerySegmentWalker walker, - final ServerConfig serverConfig, + final QueryLifecycleFactory queryLifecycleFactory, final Iterable segments ) { @@ -491,28 +484,19 @@ private static Sequence runSegmentMetadataQuery( .map(DataSegment::toDescriptor).collect(Collectors.toList()) ); - final SegmentMetadataQuery segmentMetadataQuery = DirectDruidClient.withDefaultTimeoutAndMaxScatterGatherBytes( - new SegmentMetadataQuery( - new TableDataSource(dataSource), - querySegmentSpec, - new AllColumnIncluderator(), - false, - ImmutableMap.of(), - EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), - false, - false - ), - serverConfig + final 
SegmentMetadataQuery segmentMetadataQuery = new SegmentMetadataQuery( + new TableDataSource(dataSource), + querySegmentSpec, + new AllColumnIncluderator(), + false, + ImmutableMap.of(), + EnumSet.noneOf(SegmentMetadataQuery.AnalysisType.class), + false, + false ); - return QueryPlus.wrap(segmentMetadataQuery) - .run( - walker, - DirectDruidClient.makeResponseContextForQuery( - segmentMetadataQuery, - System.currentTimeMillis() - ) - ); + // Use SystemAuthorizationInfo since this is a query generated by Druid itself. + return queryLifecycleFactory.factorize().runSimple(segmentMetadataQuery, SystemAuthorizationInfo.INSTANCE, null); } private static RowSignature analysisToRowSignature(final SegmentAnalysis analysis) diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java index 47e874a85279..da84d198f808 100644 --- a/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java +++ b/sql/src/test/java/io/druid/sql/avatica/DruidAvaticaHandlerTest.java @@ -120,7 +120,13 @@ public void setUp() throws Exception final DruidOperatorTable operatorTable = CalciteTests.createOperatorTable(); final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); druidMeta = new DruidMeta( - new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()), + new PlannerFactory( + druidSchema, + CalciteTests.createMockQueryLifecycleFactory(walker), + operatorTable, + macroTable, + plannerConfig + ), AVATICA_CONFIG ); final DruidAvaticaHandler handler = new DruidAvaticaHandler( @@ -545,7 +551,13 @@ public int getMaxRowsPerFrame() final List frames = new ArrayList<>(); DruidMeta smallFrameDruidMeta = new DruidMeta( - new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()), + new PlannerFactory( + druidSchema, + CalciteTests.createMockQueryLifecycleFactory(walker), + operatorTable, + macroTable, + 
plannerConfig + ), smallFrameConfig ) { diff --git a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java index 7cde6e2012e0..a3757ebd7633 100644 --- a/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java +++ b/sql/src/test/java/io/druid/sql/avatica/DruidStatementTest.java @@ -22,7 +22,6 @@ import com.google.common.base.Function; import com.google.common.collect.Lists; import io.druid.math.expr.ExprMacroTable; -import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerConfig; @@ -68,11 +67,10 @@ public void setUp() throws Exception final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); plannerFactory = new PlannerFactory( druidSchema, - walker, + CalciteTests.createMockQueryLifecycleFactory(walker), operatorTable, macroTable, - plannerConfig, - new ServerConfig() + plannerConfig ); } diff --git a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java index 5d0ee07e1c3f..ebf93328300c 100644 --- a/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/CalciteQueryTest.java @@ -81,7 +81,6 @@ import io.druid.segment.column.Column; import io.druid.segment.column.ValueType; import io.druid.segment.virtual.ExpressionVirtualColumn; -import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.filtration.Filtration; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; @@ -5354,11 +5353,10 @@ private List getResults( final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); final PlannerFactory plannerFactory = new PlannerFactory( druidSchema, - walker, + CalciteTests.createMockQueryLifecycleFactory(walker), operatorTable, macroTable, - 
plannerConfig, - new ServerConfig() + plannerConfig ); viewManager.createView( diff --git a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java index 8280ab682b00..ab75a343d84d 100644 --- a/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/http/SqlResourceTest.java @@ -29,7 +29,6 @@ import io.druid.math.expr.ExprMacroTable; import io.druid.query.QueryInterruptedException; import io.druid.query.ResourceLimitExceededException; -import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.DruidOperatorTable; import io.druid.sql.calcite.planner.PlannerConfig; @@ -80,7 +79,13 @@ public void setUp() throws Exception final ExprMacroTable macroTable = CalciteTests.createExprMacroTable(); resource = new SqlResource( JSON_MAPPER, - new PlannerFactory(druidSchema, walker, operatorTable, macroTable, plannerConfig, new ServerConfig()) + new PlannerFactory( + druidSchema, + CalciteTests.createMockQueryLifecycleFactory(walker), + operatorTable, + macroTable, + plannerConfig + ) ); } diff --git a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java index c3ad3ea8cf8d..49497ac5fa47 100644 --- a/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java +++ b/sql/src/test/java/io/druid/sql/calcite/schema/DruidSchemaTest.java @@ -31,7 +31,6 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; -import io.druid.server.initialization.ServerConfig; import io.druid.sql.calcite.planner.Calcites; import io.druid.sql.calcite.planner.PlannerConfig; import io.druid.sql.calcite.table.DruidTable; @@ -141,11 +140,10 @@ public void setUp() throws Exception ); schema = new DruidSchema( - walker, + 
CalciteTests.createMockQueryLifecycleFactory(walker), new TestServerInventoryView(walker.getSegments()), PLANNER_CONFIG_DEFAULT, - new NoopViewManager(), - new ServerConfig() + new NoopViewManager() ); schema.start(); diff --git a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java index 0786d1a5bbbd..2b74cf523968 100644 --- a/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/io/druid/sql/calcite/util/CalciteTests.java @@ -30,6 +30,8 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; +import com.metamx.emitter.core.NoopEmitter; +import com.metamx.emitter.service.ServiceEmitter; import io.druid.collections.StupidPool; import io.druid.data.input.InputRow; import io.druid.data.input.impl.DimensionsSpec; @@ -40,12 +42,16 @@ import io.druid.guice.ExpressionModule; import io.druid.guice.annotations.Json; import io.druid.math.expr.ExprMacroTable; +import io.druid.query.DefaultGenericQueryMetricsFactory; import io.druid.query.DefaultQueryRunnerFactoryConglomerate; import io.druid.query.DruidProcessingConfig; import io.druid.query.Query; import io.druid.query.QueryRunnerFactory; import io.druid.query.QueryRunnerFactoryConglomerate; import io.druid.query.QueryRunnerTestHelper; +import io.druid.query.QuerySegmentWalker; +import io.druid.query.QueryToolChest; +import io.druid.query.QueryToolChestWarehouse; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.DoubleSumAggregatorFactory; import io.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory; @@ -77,7 +83,10 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.TestHelper; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.server.QueryLifecycleFactory; import io.druid.server.initialization.ServerConfig; +import io.druid.server.log.NoopRequestLogger; +import 
io.druid.server.security.AuthConfig; import io.druid.sql.calcite.aggregation.SqlAggregator; import io.druid.sql.calcite.expression.SqlOperatorConversion; import io.druid.sql.calcite.planner.DruidOperatorTable; @@ -272,6 +281,26 @@ public static QueryRunnerFactoryConglomerate queryRunnerFactoryConglomerate() return CONGLOMERATE; } + public static QueryLifecycleFactory createMockQueryLifecycleFactory(final QuerySegmentWalker walker) + { + return new QueryLifecycleFactory( + new QueryToolChestWarehouse() + { + @Override + public > QueryToolChest getToolChest(final QueryType query) + { + return CONGLOMERATE.findFactory(query).getToolchest(); + } + }, + walker, + new DefaultGenericQueryMetricsFactory(INJECTOR.getInstance(Key.get(ObjectMapper.class, Json.class))), + new ServiceEmitter("dummy", "dummy", new NoopEmitter()), + new NoopRequestLogger(), + new ServerConfig(), + new AuthConfig() + ); + } + public static SpecificSegmentsQuerySegmentWalker createMockWalker(final File tmpDir) { final QueryableIndex index1 = IndexBuilder.create() @@ -353,11 +382,10 @@ public static DruidSchema createMockSchema( ) { final DruidSchema schema = new DruidSchema( - walker, + CalciteTests.createMockQueryLifecycleFactory(walker), new TestServerInventoryView(walker.getSegments()), plannerConfig, - viewManager, - new ServerConfig() + viewManager ); schema.start();