Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> r
{
final Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
final QueryPlus<T> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
final ListenableFuture<Sequence<T>> future = executor.submit(new AbstractPrioritizedCallable<Sequence<T>>(priority)
{
@Override
public Sequence<T> call() throws Exception
{
//Note: this is assumed that baseRunner does most of the work eagerly on call to the
//run() method and resulting sequence accumulate/yield is fast.
return baseRunner.run(queryPlus, responseContext);
return baseRunner.run(threadSafeQueryPlus, responseContext);
}
});
queryWatcher.registerQuery(query, future);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@
public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> delegate;
private final QueryToolChest<?, ? super Query<T>> queryToolChest;
private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
private final ServiceEmitter emitter;
private final AtomicLong cpuTimeAccumulator;
private final boolean report;

private CPUTimeMetricQueryRunner(
QueryRunner<T> delegate,
QueryToolChest<?, ? super Query<T>> queryToolChest,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
ServiceEmitter emitter,
AtomicLong cpuTimeAccumulator,
boolean report
Expand All @@ -60,8 +60,7 @@ private CPUTimeMetricQueryRunner(
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
final QueryPlus<T> queryWithMetrics =
queryPlus.withQueryMetrics((QueryToolChest<T, ? extends Query<T>>) queryToolChest);
final QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
final Sequence<T> baseSequence = delegate.run(queryWithMetrics, responseContext);
return Sequences.wrap(
baseSequence,
Expand Down Expand Up @@ -94,7 +93,7 @@ public void after(boolean isDone, Throwable thrown) throws Exception

public static <T> QueryRunner<T> safeBuild(
QueryRunner<T> delegate,
QueryToolChest<?, ? super Query<T>> queryToolChest,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
ServiceEmitter emitter,
AtomicLong accumulator,
boolean report
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> r
Query<T> query = queryPlus.getQuery();
final int priority = QueryContexts.getPriority(query);
final Ordering ordering = query.getResultOrdering();

final QueryPlus<T> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
return new BaseSequence<T, Iterator<T>>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
{
Expand Down Expand Up @@ -122,7 +122,7 @@ public ListenableFuture<Iterable<T>> apply(final QueryRunner<T> input)
public Iterable<T> call() throws Exception
{
try {
Sequence<T> result = input.run(queryPlus, responseContext);
Sequence<T> result = input.run(threadSafeQueryPlus, responseContext);
if (result == null) {
throw new ISE("Got a null result! Segments are missing!");
}
Expand Down
90 changes: 58 additions & 32 deletions processing/src/main/java/io/druid/query/DefaultQueryMetrics.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,42 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
* DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the
* thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread}
* field should be updated.
*/
public class DefaultQueryMetrics<QueryType extends Query<?>> implements QueryMetrics<QueryType>
{
protected final ObjectMapper jsonMapper;
protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
protected final Map<String, Number> metrics = new HashMap<>();

/** Non final to give subclasses ability to reassign it. */
protected Thread ownerThread = Thread.currentThread();

public DefaultQueryMetrics(ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
}

protected void checkModifiedFromOwnerThread()
{
if (Thread.currentThread() != ownerThread) {
throw new IllegalStateException(
"DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or "
+ "metric information from multiple threads or from an async thread, this information should explicitly be "
+ "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be "
+ "reassigned explicitly");
}
}

protected void setDimension(String dimension, String value)
{
checkModifiedFromOwnerThread();
builder.setDimension(dimension, value);
}

@Override
public void query(QueryType query)
{
Expand All @@ -56,18 +81,19 @@ public void query(QueryType query)
@Override
public void dataSource(QueryType query)
{
builder.setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()));
setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource()));
}

@Override
public void queryType(QueryType query)
{
builder.setDimension(DruidMetrics.TYPE, query.getType());
setDimension(DruidMetrics.TYPE, query.getType());
}

@Override
public void interval(QueryType query)
{
checkModifiedFromOwnerThread();
builder.setDimension(
DruidMetrics.INTERVAL,
query.getIntervals().stream().map(Interval::toString).toArray(String[]::new)
Expand All @@ -77,32 +103,28 @@ public void interval(QueryType query)
@Override
public void hasFilters(QueryType query)
{
builder.setDimension("hasFilters", String.valueOf(query.hasFilters()));
setDimension("hasFilters", String.valueOf(query.hasFilters()));
}

@Override
public void duration(QueryType query)
{
builder.setDimension("duration", query.getDuration().toString());
setDimension("duration", query.getDuration().toString());
}

@Override
public void queryId(QueryType query)
{
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));
}

@Override
public void context(QueryType query)
{
try {
builder.setDimension(
setDimension(
"context",
jsonMapper.writeValueAsString(
query.getContext() == null
? ImmutableMap.of()
: query.getContext()
)
jsonMapper.writeValueAsString(query.getContext() == null ? ImmutableMap.of() : query.getContext())
);
}
catch (JsonProcessingException e) {
Expand All @@ -113,111 +135,115 @@ public void context(QueryType query)
@Override
public void server(String host)
{
builder.setDimension("server", host);
setDimension("server", host);
}

@Override
public void remoteAddress(String remoteAddress)
{
builder.setDimension("remoteAddress", remoteAddress);
setDimension("remoteAddress", remoteAddress);
}

@Override
public void status(String status)
{
builder.setDimension(DruidMetrics.STATUS, status);
setDimension(DruidMetrics.STATUS, status);
}

@Override
public void success(boolean success)
{
builder.setDimension("success", String.valueOf(success));
setDimension("success", String.valueOf(success));
}

@Override
public void segment(String segmentIdentifier)
{
builder.setDimension("segment", segmentIdentifier);
setDimension("segment", segmentIdentifier);
}

@Override
public void chunkInterval(Interval interval)
{
builder.setDimension("chunkInterval", interval.toString());
setDimension("chunkInterval", interval.toString());
}

@Override
public QueryMetrics<QueryType> reportQueryTime(long timeNs)
{
return defaultTimeMetric("query/time", timeNs);
return reportMillisTimeMetric("query/time", timeNs);
}

@Override
public QueryMetrics<QueryType> reportQueryBytes(long byteCount)
{
metrics.put("query/bytes", byteCount);
return this;
return reportMetric("query/bytes", byteCount);
}

@Override
public QueryMetrics<QueryType> reportWaitTime(long timeNs)
{
return defaultTimeMetric("query/wait/time", timeNs);
return reportMillisTimeMetric("query/wait/time", timeNs);
}

@Override
public QueryMetrics<QueryType> reportSegmentTime(long timeNs)
{
return defaultTimeMetric("query/segment/time", timeNs);
return reportMillisTimeMetric("query/segment/time", timeNs);
}

@Override
public QueryMetrics<QueryType> reportSegmentAndCacheTime(long timeNs)
{
return defaultTimeMetric("query/segmentAndCache/time", timeNs);
return reportMillisTimeMetric("query/segmentAndCache/time", timeNs);
}

@Override
public QueryMetrics<QueryType> reportIntervalChunkTime(long timeNs)
{
return defaultTimeMetric("query/intervalChunk/time", timeNs);
return reportMillisTimeMetric("query/intervalChunk/time", timeNs);
}

@Override
public QueryMetrics<QueryType> reportCpuTime(long timeNs)
{
metrics.put("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs));
return this;
return reportMetric("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs));
}

@Override
public QueryMetrics<QueryType> reportNodeTimeToFirstByte(long timeNs)
{
return defaultTimeMetric("query/node/ttfb", timeNs);
return reportMillisTimeMetric("query/node/ttfb", timeNs);
}

@Override
public QueryMetrics<QueryType> reportNodeTime(long timeNs)
{
return defaultTimeMetric("query/node/time", timeNs);
return reportMillisTimeMetric("query/node/time", timeNs);
}

private QueryMetrics<QueryType> defaultTimeMetric(String metricName, long timeNs)
private QueryMetrics<QueryType> reportMillisTimeMetric(String metricName, long timeNs)
{
metrics.put(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs));
return reportMetric(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs));
}

protected QueryMetrics<QueryType> reportMetric(String metricName, Number value)
{
checkModifiedFromOwnerThread();
metrics.put(metricName, value);
return this;
}

@Override
public QueryMetrics<QueryType> reportNodeBytes(long byteCount)
{
metrics.put("query/node/bytes", byteCount);
return this;
return reportMetric("query/node/bytes", byteCount);
}

@Override
public void emit(ServiceEmitter emitter)
{
checkModifiedFromOwnerThread();
for (Map.Entry<String, Number> metric : metrics.entrySet()) {
emitter.emit(builder.build(metric.getKey(), metric.getValue()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> r
final Pair<Queue, Accumulator<Queue, T>> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair();
final boolean bySegment = QueryContexts.isBySegment(query);
final int priority = QueryContexts.getPriority(query);

ListenableFuture<List<Void>> futures = Futures.allAsList(
final QueryPlus<T> threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState();
final ListenableFuture<List<Void>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
Expand All @@ -114,10 +114,10 @@ public Void call() throws Exception
{
try {
if (bySegment) {
input.run(queryPlus, responseContext)
input.run(threadSafeQueryPlus, responseContext)
.accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs);
} else {
input.run(queryPlus, responseContext)
input.run(threadSafeQueryPlus, responseContext)
.accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@
public class MetricsEmittingQueryRunner<T> implements QueryRunner<T>
{
private final ServiceEmitter emitter;
private final QueryToolChest<?, ? super Query<T>> queryToolChest;
private final QueryToolChest<T, ? extends Query<T>> queryToolChest;
private final QueryRunner<T> queryRunner;
private final long creationTimeNs;
private final ObjLongConsumer<? super QueryMetrics<? super Query<T>>> reportMetric;
private final Consumer<QueryMetrics<? super Query<T>>> applyCustomDimensions;
private final ObjLongConsumer<? super QueryMetrics<?>> reportMetric;
private final Consumer<QueryMetrics<?>> applyCustomDimensions;

private MetricsEmittingQueryRunner(
ServiceEmitter emitter,
QueryToolChest<?, ? super Query<T>> queryToolChest,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
QueryRunner<T> queryRunner,
long creationTimeNs,
ObjLongConsumer<? super QueryMetrics<? super Query<T>>> reportMetric,
Consumer<QueryMetrics<? super Query<T>>> applyCustomDimensions
ObjLongConsumer<? super QueryMetrics<?>> reportMetric,
Consumer<QueryMetrics<?>> applyCustomDimensions
)
{
this.emitter = emitter;
Expand All @@ -59,10 +59,10 @@ private MetricsEmittingQueryRunner(

public MetricsEmittingQueryRunner(
ServiceEmitter emitter,
QueryToolChest<?, ? super Query<T>> queryToolChest,
QueryToolChest<T, ? extends Query<T>> queryToolChest,
QueryRunner<T> queryRunner,
ObjLongConsumer<? super QueryMetrics<? super Query<T>>> reportMetric,
Consumer<QueryMetrics<? super Query<T>>> applyCustomDimensions
ObjLongConsumer<? super QueryMetrics<?>> reportMetric,
Consumer<QueryMetrics<?>> applyCustomDimensions
)
{
this(emitter, queryToolChest, queryRunner, -1, reportMetric, applyCustomDimensions);
Expand All @@ -83,8 +83,8 @@ public MetricsEmittingQueryRunner<T> withWaitMeasuredFromNow()
@Override
public Sequence<T> run(final QueryPlus<T> queryPlus, final Map<String, Object> responseContext)
{
QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics((QueryToolChest<T, ? extends Query<T>>) queryToolChest);
final QueryMetrics<? super Query<T>> queryMetrics = (QueryMetrics<? super Query<T>>) queryWithMetrics.getQueryMetrics();
QueryPlus<T> queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest);
final QueryMetrics<?> queryMetrics = queryWithMetrics.getQueryMetrics();

applyCustomDimensions.accept(queryMetrics);

Expand Down
Loading