Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions processing/src/main/java/io/druid/query/QueryPlus.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@
import java.util.Map;

/**
* An immutable composite object of {@link Query} + extra stuff needed in {@link QueryRunner}s. This "extra stuff"
* is only {@link QueryMetrics} yet.
* An immutable composite object of {@link Query} + extra stuff needed in {@link QueryRunner}s.
*/
@PublicApi
public final class QueryPlus<T>
Expand All @@ -41,16 +40,18 @@ public final class QueryPlus<T>
public static <T> QueryPlus<T> wrap(Query<T> query)
{
Preconditions.checkNotNull(query);
return new QueryPlus<>(query, null);
return new QueryPlus<>(query, null, null);
}

private final Query<T> query;
private final QueryMetrics<?> queryMetrics;
private final String identity;

private QueryPlus(Query<T> query, QueryMetrics<?> queryMetrics)
private QueryPlus(Query<T> query, QueryMetrics<?> queryMetrics, String identity)
{
this.query = query;
this.queryMetrics = queryMetrics;
this.identity = identity;
}

public Query<T> getQuery()
Expand All @@ -64,6 +65,15 @@ public QueryMetrics<?> getQueryMetrics()
return queryMetrics;
}

/**
* Returns the same QueryPlus object with the identity replaced. This new identity will affect future calls to
* {@link #withoutQueryMetrics()} but will not affect any currently-existing queryMetrics.
*/
public QueryPlus<T> withIdentity(String identity)
{
return new QueryPlus<>(query, queryMetrics, identity);
}

/**
* Returns the same QueryPlus object, if it already has {@link QueryMetrics} ({@link #getQueryMetrics()} returns not
* null), or returns a new QueryPlus object with {@link Query} from this QueryPlus and QueryMetrics created using the
Expand All @@ -79,7 +89,13 @@ public QueryPlus<T> withQueryMetrics(QueryToolChest<T, ? extends Query<T>> query
if (queryMetrics != null) {
return this;
} else {
return new QueryPlus<>(query, ((QueryToolChest) queryToolChest).makeMetrics(query));
final QueryMetrics metrics = ((QueryToolChest) queryToolChest).makeMetrics(query);

if (identity != null) {
metrics.identity(identity);
}

return new QueryPlus<>(query, metrics, identity);
}
}

Expand All @@ -104,7 +120,7 @@ private QueryPlus<T> withoutQueryMetrics()
if (queryMetrics == null) {
return this;
} else {
return new QueryPlus<>(query, null);
return new QueryPlus<>(query, null, identity);
}
}

Expand All @@ -113,15 +129,15 @@ private QueryPlus<T> withoutQueryMetrics()
*/
public QueryPlus<T> withQuerySegmentSpec(QuerySegmentSpec spec)
{
return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics);
return new QueryPlus<>(query.withQuerySegmentSpec(spec), queryMetrics, identity);
}

/**
* Returns a QueryPlus object with {@link QueryMetrics} from this QueryPlus object, and the provided {@link Query}.
*/
public <U> QueryPlus<U> withQuery(Query<U> replacementQuery)
{
return new QueryPlus<>(replacementQuery, queryMetrics);
return new QueryPlus<>(replacementQuery, queryMetrics, identity);
}

public Sequence<T> run(QuerySegmentWalker walker, Map<String, Object> context)
Expand Down
47 changes: 26 additions & 21 deletions server/src/main/java/io/druid/server/QueryLifecycle.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,17 @@

package io.druid.server;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Iterables;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.client.DirectDruidClient;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.SequenceWrapper;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.DruidMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.Query;
Expand Down Expand Up @@ -80,7 +81,7 @@ public class QueryLifecycle
private State state = State.NEW;
private AuthenticationResult authenticationResult;
private QueryToolChest toolChest;
private QueryPlus queryPlus;
private Query baseQuery;

public QueryLifecycle(
final QueryToolChestWarehouse warehouse,
Expand Down Expand Up @@ -167,7 +168,7 @@ public void initialize(final Query baseQuery)
queryId = UUID.randomUUID().toString();
}

this.queryPlus = QueryPlus.wrap(baseQuery.withId(queryId));
this.baseQuery = baseQuery.withId(queryId);
this.toolChest = warehouse.getToolChest(baseQuery);
}

Expand All @@ -186,7 +187,7 @@ public Access authorize(final AuthenticationResult authenticationResult)
AuthorizationUtils.authorizeAllResourceActions(
authenticationResult,
Iterables.transform(
queryPlus.getQuery().getDataSource().getNames(),
baseQuery.getDataSource().getNames(),
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
),
authorizerMapper
Expand All @@ -210,7 +211,7 @@ public Access authorize(HttpServletRequest req)
AuthorizationUtils.authorizeAllResourceActions(
req,
Iterables.transform(
queryPlus.getQuery().getDataSource().getNames(),
baseQuery.getDataSource().getNames(),
AuthorizationUtils.DATASOURCE_READ_RA_GENERATOR
),
authorizerMapper
Expand All @@ -220,6 +221,9 @@ public Access authorize(HttpServletRequest req)

private Access doAuthorize(final AuthenticationResult authenticationResult, final Access authorizationResult)
{
Preconditions.checkNotNull(authenticationResult, "authenticationResult");
Preconditions.checkNotNull(authorizationResult, "authorizationResult");

if (!authorizationResult.isAllowed()) {
// Not authorized; go straight to Jail, do not pass Go.
transition(State.AUTHORIZING, State.UNAUTHORIZED);
Expand All @@ -229,12 +233,6 @@ private Access doAuthorize(final AuthenticationResult authenticationResult, fina

this.authenticationResult = authenticationResult;

final QueryMetrics queryMetrics = queryPlus.getQueryMetrics();

if (queryMetrics != null) {
queryMetrics.identity(authenticationResult.getIdentity());
}

return authorizationResult;
}

Expand All @@ -250,11 +248,13 @@ public QueryResponse execute()
transition(State.AUTHORIZED, State.EXECUTING);

final Map<String, Object> responseContext = DirectDruidClient.makeResponseContextForQuery(
queryPlus.getQuery(),
baseQuery,
System.currentTimeMillis()
);

final Sequence res = queryPlus.run(texasRanger, responseContext);
final Sequence res = QueryPlus.wrap(baseQuery)
.withIdentity(authenticationResult.getIdentity())
.run(texasRanger, responseContext);

return new QueryResponse(res == null ? Sequences.empty() : res, responseContext);
}
Expand All @@ -273,26 +273,25 @@ public void emitLogsAndMetrics(
final long bytesWritten
)
{
if (queryPlus == null) {
if (baseQuery == null) {
// Never initialized, don't log or emit anything.
return;
}

if (state == State.DONE) {
log.warn("Tried to emit logs and metrics twice for query[%s]!", queryPlus.getQuery().getId());
log.warn("Tried to emit logs and metrics twice for query[%s]!", baseQuery.getId());
}

state = State.DONE;

final Query query = queryPlus != null ? queryPlus.getQuery() : null;
final boolean success = e == null;

try {
final long queryTimeNs = System.nanoTime() - startNs;
QueryMetrics queryMetrics = DruidMetrics.makeRequestMetrics(
queryMetricsFactory,
toolChest,
queryPlus.getQuery(),
baseQuery,
Strings.nullToEmpty(remoteAddress)
);
queryMetrics.success(success);
Expand All @@ -302,21 +301,27 @@ public void emitLogsAndMetrics(
queryMetrics.reportQueryBytes(bytesWritten);
}

if (authenticationResult != null) {
queryMetrics.identity(authenticationResult.getIdentity());
}

queryMetrics.emit(emitter);

final Map<String, Object> statsMap = new LinkedHashMap<>();
statsMap.put("query/time", TimeUnit.NANOSECONDS.toMillis(queryTimeNs));
statsMap.put("query/bytes", bytesWritten);
statsMap.put("success", success);

if (authenticationResult != null) {
statsMap.put("identity", authenticationResult.getIdentity());
}

if (e != null) {
statsMap.put("exception", e.toString());

if (e instanceof QueryInterruptedException) {
// Mimic behavior from QueryResource, where this code was originally taken from.
log.warn(e, "Exception while processing queryId [%s]", queryPlus.getQuery().getId());
log.warn(e, "Exception while processing queryId [%s]", baseQuery.getId());
statsMap.put("interrupted", true);
statsMap.put("reason", e.toString());
}
Expand All @@ -326,19 +331,19 @@ public void emitLogsAndMetrics(
new RequestLogLine(
DateTimes.utc(startMs),
Strings.nullToEmpty(remoteAddress),
queryPlus.getQuery(),
baseQuery,
new QueryStats(statsMap)
)
);
}
catch (Exception ex) {
log.error(ex, "Unable to log query [%s]!", query);
log.error(ex, "Unable to log query [%s]!", baseQuery);
}
}

public Query getQuery()
{
return queryPlus.getQuery();
return baseQuery;
}

private void transition(final State from, final State to)
Expand Down