diff --git a/processing/src/main/java/io/druid/query/QueryPlus.java b/processing/src/main/java/io/druid/query/QueryPlus.java index d725048d0e68..facd59c909af 100644 --- a/processing/src/main/java/io/druid/query/QueryPlus.java +++ b/processing/src/main/java/io/druid/query/QueryPlus.java @@ -28,8 +28,7 @@ import java.util.Map; /** - * An immutable composite object of {@link Query} + extra stuff needed in {@link QueryRunner}s. This "extra stuff" - * is only {@link QueryMetrics} yet. + * An immutable composite object of {@link Query} + extra stuff needed in {@link QueryRunner}s. */ @PublicApi public final class QueryPlus @@ -41,16 +40,18 @@ public final class QueryPlus public static QueryPlus wrap(Query query) { Preconditions.checkNotNull(query); - return new QueryPlus<>(query, null); + return new QueryPlus<>(query, null, null); } private final Query query; private final QueryMetrics queryMetrics; + private final String identity; - private QueryPlus(Query query, QueryMetrics queryMetrics) + private QueryPlus(Query query, QueryMetrics queryMetrics, String identity) { this.query = query; this.queryMetrics = queryMetrics; + this.identity = identity; } public Query getQuery() @@ -64,6 +65,15 @@ public QueryMetrics getQueryMetrics() return queryMetrics; } + /** + * Returns the same QueryPlus object with the identity replaced. This new identity will affect future calls to + * {@link #withoutQueryMetrics()} but will not affect any currently-existing queryMetrics. + */ + public QueryPlus withIdentity(String identity) + { + return new QueryPlus<>(query, queryMetrics, identity); + } + /** * Returns the same QueryPlus object, if it already has {@link QueryMetrics} ({@link #getQueryMetrics()} returns not * null), or returns a new QueryPlus object with {@link Query} from this QueryPlus and QueryMetrics created using the @@ -79,7 +89,13 @@ public QueryPlus withQueryMetrics(QueryToolChest> query if (queryMetrics != null) { return this; } else { - return new QueryPlus<>(query, ((QueryToolChest) queryToolChest).makeMetrics(query)); + final QueryMetrics metrics = ((QueryToolChest) queryToolChest).makeMetrics(query); + + if (identity != null) { + metrics.identity(identity); + } + + return new QueryPlus<>(query, metrics, identity); } } @@ -104,7 +120,7 @@ private QueryPlus withoutQueryMetrics() if (queryMetrics == null) { return this; } else { - return new QueryPlus<>(query, null); + return new QueryPlus<>(query, null, identity); } } @@ -113,7 +129,7 @@ private QueryPlus withoutQueryMetrics() */ public QueryPlus withQuerySegmentSpec(QuerySegmentSpec spec) { - return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics); + return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics, identity); } /** @@ -121,7 +137,7 @@ public QueryPlus withQuerySegmentSpec(QuerySegmentSpec spec) */ public QueryPlus withQuery(Query replacementQuery) { - return new QueryPlus<>(replacementQuery, queryMetrics); + return new QueryPlus<>(replacementQuery, queryMetrics, identity); } public Sequence run(QuerySegmentWalker walker, Map context) diff --git a/server/src/main/java/io/druid/server/QueryLifecycle.java b/server/src/main/java/io/druid/server/QueryLifecycle.java index 70486b9b7bb9..435e376dfd3d 100644 --- a/server/src/main/java/io/druid/server/QueryLifecycle.java +++ b/server/src/main/java/io/druid/server/QueryLifecycle.java @@ -19,9 +19,9 @@ package io.druid.server; +import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Iterables; -import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.client.DirectDruidClient; import io.druid.java.util.common.DateTimes; import io.druid.java.util.common.ISE; @@ -29,6 +29,7 @@ import io.druid.java.util.common.guava.SequenceWrapper; import io.druid.java.util.common.guava.Sequences; import io.druid.java.util.common.logger.Logger; +import io.druid.java.util.emitter.service.ServiceEmitter; import io.druid.query.DruidMetrics; import io.druid.query.GenericQueryMetricsFactory; import io.druid.query.Query; @@ -80,7 +81,7 @@ public class QueryLifecycle private State state = State.NEW; private AuthenticationResult authenticationResult; private QueryToolChest toolChest; - private QueryPlus queryPlus; + private Query baseQuery; public QueryLifecycle( final QueryToolChestWarehouse warehouse, @@ -167,7 +168,7 @@ public void initialize(final Query baseQuery) queryId = UUID.randomUUID().toString(); } - this.queryPlus = QueryPlus.wrap(baseQuery.withId(queryId)); + this.baseQuery = baseQuery.withId(queryId); this.toolChest = warehouse.getToolChest(baseQuery); } @@ -186,7 +187,7 @@ public Access authorize(final AuthenticationResult authenticationResult) AuthorizationUtils.authorizeAllResourceActions( authenticationResult, Iterables.transform( - queryPlus.getQuery().getDataSource().getNames(), + baseQuery.getDataSource().getNames(), AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR ), authorizerMapper @@ -210,7 +211,7 @@ public Access authorize(HttpServletRequest req) AuthorizationUtils.authorizeAllResourceActions( req, Iterables.transform( - queryPlus.getQuery().getDataSource().getNames(), + baseQuery.getDataSource().getNames(), AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR ), authorizerMapper @@ -220,6 +221,9 @@ public Access authorize(HttpServletRequest req) private Access doAuthorize(final AuthenticationResult authenticationResult, final Access authorizationResult) { + Preconditions.checkNotNull(authenticationResult, "authenticationResult"); + Preconditions.checkNotNull(authorizationResult, "authorizationResult"); + if (!authorizationResult.isAllowed()) { // Not authorized; go straight to Jail, do not pass Go. transition(State.AUTHORIZING, State.UNAUTHORIZED); @@ -229,12 +233,6 @@ private Access doAuthorize(final AuthenticationResult authenticationResult, fina this.authenticationResult = authenticationResult; - final QueryMetrics queryMetrics = queryPlus.getQueryMetrics(); - - if (queryMetrics != null) { - queryMetrics.identity(authenticationResult.getIdentity()); - } - return authorizationResult; } @@ -250,11 +248,13 @@ public QueryResponse execute() transition(State.AUTHORIZED, State.EXECUTING); final Map responseContext = DirectDruidClient.makeResponseContextForQuery( - queryPlus.getQuery(), + baseQuery, System.currentTimeMillis() ); - final Sequence res = queryPlus.run(texasRanger, responseContext); + final Sequence res = QueryPlus.wrap(baseQuery) + .withIdentity(authenticationResult.getIdentity()) + .run(texasRanger, responseContext); return new QueryResponse(res == null ? Sequences.empty() : res, responseContext); } @@ -273,18 +273,17 @@ public void emitLogsAndMetrics( final long bytesWritten ) { - if (queryPlus == null) { + if (baseQuery == null) { // Never initialized, don't log or emit anything. return; } if (state == State.DONE) { - log.warn("Tried to emit logs and metrics twice for query[%s]!", queryPlus.getQuery().getId()); + log.warn("Tried to emit logs and metrics twice for query[%s]!", baseQuery.getId()); } state = State.DONE; - final Query query = queryPlus != null ? queryPlus.getQuery() : null; final boolean success = e == null; try { @@ -292,7 +291,7 @@ public void emitLogsAndMetrics( QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics( queryMetricsFactory, toolChest, - queryPlus.getQuery(), + baseQuery, Strings.nullToEmpty(remoteAddress) ); queryMetrics.success(success); @@ -302,21 +301,27 @@ public void emitLogsAndMetrics( queryMetrics.reportQueryBytes(bytesWritten); } + if (authenticationResult != null) { + queryMetrics.identity(authenticationResult.getIdentity()); + } + queryMetrics.emit(emitter); final Map statsMap = new LinkedHashMap<>(); statsMap.put("query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs)); statsMap.put("query/bytes", bytesWritten); statsMap.put("success", success); + if (authenticationResult != null) { statsMap.put("identity", authenticationResult.getIdentity()); } + if (e != null) { statsMap.put("exception", e.toString()); if (e instanceof QueryInterruptedException) { // Mimic behavior from QueryResource, where this code was originally taken from. - log.warn(e, "Exception while processing queryId [%s]", queryPlus.getQuery().getId()); + log.warn(e, "Exception while processing queryId [%s]", baseQuery.getId()); statsMap.put("interrupted", true); statsMap.put("reason", e.toString()); } @@ -326,19 +331,19 @@ public void emitLogsAndMetrics( new RequestLogLine( DateTimes.utc(startMs), Strings.nullToEmpty(remoteAddress), - queryPlus.getQuery(), + baseQuery, new QueryStats(statsMap) ) ); } catch (Exception ex) { - log.error(ex, "Unable to log query [%s]!", query); + log.error(ex, "Unable to log query [%s]!", baseQuery); } } public Query getQuery() { - return queryPlus.getQuery(); + return baseQuery; } private void transition(final State from, final State to)