From 107245aa74df14cc78fad968a72084d170947e3c Mon Sep 17 00:00:00 2001 From: leventov Date: Mon, 15 May 2017 17:14:06 -0500 Subject: [PATCH 1/2] Don't pass QueryMetrics down in concurrent and async QueryRunners --- .../java/io/druid/query/AsyncQueryRunner.java | 3 ++- .../query/ChainedExecutionQueryRunner.java | 4 +-- .../druid/query/GroupByMergedQueryRunner.java | 8 +++--- .../main/java/io/druid/query/QueryPlus.java | 25 +++++++++++++++++++ .../GroupByMergingQueryRunnerV2.java | 8 +++--- .../SegmentMetadataQueryRunnerFactory.java | 8 ++++-- 6 files changed, 44 insertions(+), 12 deletions(-) diff --git a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java index 8de2a2b17d7c..c8e063bb1bde 100644 --- a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java +++ b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java @@ -51,6 +51,7 @@ public Sequence run(final QueryPlus queryPlus, final Map r { final Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); + final QueryPlus threadSafeQueryPlus = queryPlus.threadSafe(); final ListenableFuture> future = executor.submit(new AbstractPrioritizedCallable>(priority) { @Override @@ -58,7 +59,7 @@ public Sequence call() throws Exception { //Note: this is assumed that baseRunner does most of the work eagerly on call to the //run() method and resulting sequence accumulate/yield is fast. - return baseRunner.run(queryPlus, responseContext); + return baseRunner.run(threadSafeQueryPlus, responseContext); } }); queryWatcher.registerQuery(query, future); diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 7474844cb24d..6d20696ff8d1 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -94,7 +94,7 @@ public Sequence run(final QueryPlus queryPlus, final Map r Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); final Ordering ordering = query.getResultOrdering(); - + final QueryPlus threadSafeQueryPlus = queryPlus.threadSafe(); return new BaseSequence>( new BaseSequence.IteratorMaker>() { @@ -122,7 +122,7 @@ public ListenableFuture> apply(final QueryRunner input) public Iterable call() throws Exception { try { - Sequence result = input.run(queryPlus, responseContext); + Sequence result = input.run(threadSafeQueryPlus, responseContext); if (result == null) { throw new ISE("Got a null result! Segments are missing!"); } diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index 295e79eee23f..92c7a1cd06cb 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -92,8 +92,8 @@ public Sequence run(final QueryPlus queryPlus, final Map r final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); final boolean bySegment = QueryContexts.isBySegment(query); final int priority = QueryContexts.getPriority(query); - - ListenableFuture> futures = Futures.allAsList( + final QueryPlus threadSafeQueryPlus = queryPlus.threadSafe(); + final ListenableFuture> futures = Futures.allAsList( Lists.newArrayList( Iterables.transform( queryables, @@ -114,10 +114,10 @@ public Void call() throws Exception { try { if (bySegment) { - input.run(queryPlus, responseContext) + input.run(threadSafeQueryPlus, responseContext) .accumulate(bySegmentAccumulatorPair.lhs, bySegmentAccumulatorPair.rhs); } else { - input.run(queryPlus, responseContext) + input.run(threadSafeQueryPlus, responseContext) .accumulate(indexAccumulatorPair.lhs, indexAccumulatorPair.rhs); } diff --git a/processing/src/main/java/io/druid/query/QueryPlus.java b/processing/src/main/java/io/druid/query/QueryPlus.java index ae6977011776..7d6c50b377c4 100644 --- a/processing/src/main/java/io/druid/query/QueryPlus.java +++ b/processing/src/main/java/io/druid/query/QueryPlus.java @@ -76,6 +76,31 @@ public QueryPlus withQueryMetrics(QueryToolChest> query } } + /** + * Returns a QueryPlus object without the components which are unsafe for concurrent use from multiple threads, + * therefore couldn't be passed down in concurrent or async {@link QueryRunner}s. + * + * Currently the only unsafe component is {@link QueryMetrics}, i. e. {@code threadSafe()} call is equivalent to + * {@link #withoutQueryMetrics()}. + */ + public QueryPlus threadSafe() + { + return withoutQueryMetrics(); + } + + /** + * Returns the same QueryPlus object, if it doesn't have {@link QueryMetrics} ({@link #getQueryMetrics()} returns + * null), or returns a new QueryPlus object with {@link Query} from this QueryPlus and null as QueryMetrics. + */ + private QueryPlus withoutQueryMetrics() + { + if (queryMetrics == null) { + return this; + } else { + return new QueryPlus<>(query, null); + } + } + /** * Equivalent of withQuery(getQuery().withQuerySegmentSpec(spec)). */ diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 242915ddf13f..698deb226ec3 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -119,9 +119,11 @@ public Sequence run(final QueryPlus queryPlus, final Map queryPlusForRunners = queryPlus.withQuery( - query.withOverriddenContext(ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true)) - ); + final QueryPlus queryPlusForRunners = queryPlus + .withQuery( + query.withOverriddenContext(ImmutableMap.of(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true)) + ) + .threadSafe(); if (QueryContexts.isBySegment(query) || forceChainedExecution) { ChainedExecutionQueryRunner runner = new ChainedExecutionQueryRunner<>(exec, queryWatcher, queryables); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index b625fec927b0..ff31652c91a1 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -205,14 +205,18 @@ public Sequence run( { final Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); - ListenableFuture> future = queryExecutor.submit( + final QueryPlus threadSafeQueryPlus = queryPlus.threadSafe(); + final ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) { @Override public Sequence call() throws Exception { return Sequences.simple( - Sequences.toList(input.run(queryPlus, responseContext), new ArrayList<>()) + Sequences.toList( + input.run(threadSafeQueryPlus, responseContext), + new ArrayList<>() + ) ); } } From f5d3672e7cc419113626977d117a11b33c289c5b Mon Sep 17 00:00:00 2001 From: leventov Date: Fri, 19 May 2017 15:29:59 -0500 Subject: [PATCH 2/2] Rename QueryPlus.threadSafe() to withoutThreadUnsafeState(); Update QueryPlus.withQueryMetrics() Javadocs; Fix generics in MetricsEmittingQueryRunner and CpuTimeMetricQueryRunner; Make DefaultQueryMetrics to fail fast on modifications from concurrent threads --- .../java/io/druid/query/AsyncQueryRunner.java | 2 +- .../druid/query/CPUTimeMetricQueryRunner.java | 9 +- .../query/ChainedExecutionQueryRunner.java | 2 +- .../io/druid/query/DefaultQueryMetrics.java | 90 ++++++++++++------- .../druid/query/GroupByMergedQueryRunner.java | 2 +- .../query/MetricsEmittingQueryRunner.java | 22 ++--- .../main/java/io/druid/query/QueryPlus.java | 11 ++- .../groupby/DefaultGroupByQueryMetrics.java | 6 +- .../GroupByMergingQueryRunnerV2.java | 2 +- .../SegmentMetadataQueryRunnerFactory.java | 2 +- .../DefaultTimeseriesQueryMetrics.java | 4 +- .../query/topn/DefaultTopNQueryMetrics.java | 8 +- .../appenderator/SinkQuerySegmentWalker.java | 2 +- 13 files changed, 96 insertions(+), 66 deletions(-) diff --git a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java index c8e063bb1bde..182d18b1f0d8 100644 --- a/processing/src/main/java/io/druid/query/AsyncQueryRunner.java +++ b/processing/src/main/java/io/druid/query/AsyncQueryRunner.java @@ -51,7 +51,7 @@ public Sequence run(final QueryPlus queryPlus, final Map r { final Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); - final QueryPlus threadSafeQueryPlus = queryPlus.threadSafe(); + final QueryPlus threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState(); final ListenableFuture> future = executor.submit(new AbstractPrioritizedCallable>(priority) { @Override diff --git a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java index 141f2bd075bb..5f90615fba4a 100644 --- a/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java +++ b/processing/src/main/java/io/druid/query/CPUTimeMetricQueryRunner.java @@ -33,14 +33,14 @@ public class CPUTimeMetricQueryRunner implements QueryRunner { private final QueryRunner delegate; - private final QueryToolChest> queryToolChest; + private final QueryToolChest> queryToolChest; private final ServiceEmitter emitter; private final AtomicLong cpuTimeAccumulator; private final boolean report; private CPUTimeMetricQueryRunner( QueryRunner delegate, - QueryToolChest> queryToolChest, + QueryToolChest> queryToolChest, ServiceEmitter emitter, AtomicLong cpuTimeAccumulator, boolean report @@ -60,8 +60,7 @@ private CPUTimeMetricQueryRunner( @Override public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - final QueryPlus queryWithMetrics = - queryPlus.withQueryMetrics((QueryToolChest>) queryToolChest); + final QueryPlus queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest); final Sequence baseSequence = delegate.run(queryWithMetrics, responseContext); return Sequences.wrap( baseSequence, @@ -94,7 +93,7 @@ public void after(boolean isDone, Throwable thrown) throws Exception public static QueryRunner safeBuild( QueryRunner delegate, - QueryToolChest> queryToolChest, + QueryToolChest> queryToolChest, ServiceEmitter emitter, AtomicLong accumulator, boolean report diff --git a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java index 6d20696ff8d1..1312fb1b024e 100644 --- a/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/ChainedExecutionQueryRunner.java @@ -94,7 +94,7 @@ public Sequence run(final QueryPlus queryPlus, final Map r Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); final Ordering ordering = query.getResultOrdering(); - final QueryPlus threadSafeQueryPlus = queryPlus.threadSafe(); + final QueryPlus threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState(); return new BaseSequence>( new BaseSequence.IteratorMaker>() { diff --git a/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java b/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java index a19053be8789..57c6099f23bd 100644 --- a/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java +++ b/processing/src/main/java/io/druid/query/DefaultQueryMetrics.java @@ -31,17 +31,42 @@ import java.util.Map; import java.util.concurrent.TimeUnit; +/** + * DefaultQueryMetrics is unsafe for use from multiple threads. It fails with RuntimeException on access not from the + * thread where it was constructed. To "transfer" DefaultQueryMetrics from one thread to another {@link #ownerThread} + * field should be updated. + */ public class DefaultQueryMetrics> implements QueryMetrics { protected final ObjectMapper jsonMapper; protected final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder(); protected final Map metrics = new HashMap<>(); + /** Non final to give subclasses ability to reassign it. */ + protected Thread ownerThread = Thread.currentThread(); + public DefaultQueryMetrics(ObjectMapper jsonMapper) { this.jsonMapper = jsonMapper; } + protected void checkModifiedFromOwnerThread() + { + if (Thread.currentThread() != ownerThread) { + throw new IllegalStateException( + "DefaultQueryMetrics must not be modified from multiple threads. If it is needed to gather dimension or " + + "metric information from multiple threads or from an async thread, this information should explicitly be " + + "passed between threads (e. g. using Futures), or this DefaultQueryMetrics's ownerThread should be " + + "reassigned explicitly"); + } + } + + protected void setDimension(String dimension, String value) + { + checkModifiedFromOwnerThread(); + builder.setDimension(dimension, value); + } + @Override public void query(QueryType query) { @@ -56,18 +81,19 @@ public void query(QueryType query) @Override public void dataSource(QueryType query) { - builder.setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource())); + setDimension(DruidMetrics.DATASOURCE, DataSourceUtil.getMetricName(query.getDataSource())); } @Override public void queryType(QueryType query) { - builder.setDimension(DruidMetrics.TYPE, query.getType()); + setDimension(DruidMetrics.TYPE, query.getType()); } @Override public void interval(QueryType query) { + checkModifiedFromOwnerThread(); builder.setDimension( DruidMetrics.INTERVAL, query.getIntervals().stream().map(Interval::toString).toArray(String[]::new) @@ -77,32 +103,28 @@ public void interval(QueryType query) @Override public void hasFilters(QueryType query) { - builder.setDimension("hasFilters", String.valueOf(query.hasFilters())); + setDimension("hasFilters", String.valueOf(query.hasFilters())); } @Override public void duration(QueryType query) { - builder.setDimension("duration", query.getDuration().toString()); + setDimension("duration", query.getDuration().toString()); } @Override public void queryId(QueryType query) { - builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId())); + setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId())); } @Override public void context(QueryType query) { try { - builder.setDimension( + setDimension( "context", - jsonMapper.writeValueAsString( - query.getContext() == null - ? ImmutableMap.of() - : query.getContext() - ) + jsonMapper.writeValueAsString(query.getContext() == null ? ImmutableMap.of() : query.getContext()) ); } catch (JsonProcessingException e) { @@ -113,111 +135,115 @@ public void context(QueryType query) @Override public void server(String host) { - builder.setDimension("server", host); + setDimension("server", host); } @Override public void remoteAddress(String remoteAddress) { - builder.setDimension("remoteAddress", remoteAddress); + setDimension("remoteAddress", remoteAddress); } @Override public void status(String status) { - builder.setDimension(DruidMetrics.STATUS, status); + setDimension(DruidMetrics.STATUS, status); } @Override public void success(boolean success) { - builder.setDimension("success", String.valueOf(success)); + setDimension("success", String.valueOf(success)); } @Override public void segment(String segmentIdentifier) { - builder.setDimension("segment", segmentIdentifier); + setDimension("segment", segmentIdentifier); } @Override public void chunkInterval(Interval interval) { - builder.setDimension("chunkInterval", interval.toString()); + setDimension("chunkInterval", interval.toString()); } @Override public QueryMetrics reportQueryTime(long timeNs) { - return defaultTimeMetric("query/time", timeNs); + return reportMillisTimeMetric("query/time", timeNs); } @Override public QueryMetrics reportQueryBytes(long byteCount) { - metrics.put("query/bytes", byteCount); - return this; + return reportMetric("query/bytes", byteCount); } @Override public QueryMetrics reportWaitTime(long timeNs) { - return defaultTimeMetric("query/wait/time", timeNs); + return reportMillisTimeMetric("query/wait/time", timeNs); } @Override public QueryMetrics reportSegmentTime(long timeNs) { - return defaultTimeMetric("query/segment/time", timeNs); + return reportMillisTimeMetric("query/segment/time", timeNs); } @Override public QueryMetrics reportSegmentAndCacheTime(long timeNs) { - return defaultTimeMetric("query/segmentAndCache/time", timeNs); + return reportMillisTimeMetric("query/segmentAndCache/time", timeNs); } @Override public QueryMetrics reportIntervalChunkTime(long timeNs) { - return defaultTimeMetric("query/intervalChunk/time", timeNs); + return reportMillisTimeMetric("query/intervalChunk/time", timeNs); } @Override public QueryMetrics reportCpuTime(long timeNs) { - metrics.put("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs)); - return this; + return reportMetric("query/cpu/time", TimeUnit.NANOSECONDS.toMicros(timeNs)); } @Override public QueryMetrics reportNodeTimeToFirstByte(long timeNs) { - return defaultTimeMetric("query/node/ttfb", timeNs); + return reportMillisTimeMetric("query/node/ttfb", timeNs); } @Override public QueryMetrics reportNodeTime(long timeNs) { - return defaultTimeMetric("query/node/time", timeNs); + return reportMillisTimeMetric("query/node/time", timeNs); } - private QueryMetrics defaultTimeMetric(String metricName, long timeNs) + private QueryMetrics reportMillisTimeMetric(String metricName, long timeNs) { - metrics.put(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs)); + return reportMetric(metricName, TimeUnit.NANOSECONDS.toMillis(timeNs)); + } + + protected QueryMetrics reportMetric(String metricName, Number value) + { + checkModifiedFromOwnerThread(); + metrics.put(metricName, value); return this; } @Override public QueryMetrics reportNodeBytes(long byteCount) { - metrics.put("query/node/bytes", byteCount); - return this; + return reportMetric("query/node/bytes", byteCount); } @Override public void emit(ServiceEmitter emitter) { + checkModifiedFromOwnerThread(); for (Map.Entry metric : metrics.entrySet()) { emitter.emit(builder.build(metric.getKey(), metric.getValue())); } diff --git a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java index 92c7a1cd06cb..dda375b100be 100644 --- a/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java +++ b/processing/src/main/java/io/druid/query/GroupByMergedQueryRunner.java @@ -92,7 +92,7 @@ public Sequence run(final QueryPlus queryPlus, final Map r final Pair> bySegmentAccumulatorPair = GroupByQueryHelper.createBySegmentAccumulatorPair(); final boolean bySegment = QueryContexts.isBySegment(query); final int priority = QueryContexts.getPriority(query); - final QueryPlus threadSafeQueryPlus = queryPlus.threadSafe(); + final QueryPlus threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState(); final ListenableFuture> futures = Futures.allAsList( Lists.newArrayList( Iterables.transform( diff --git a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java index 06c563d89714..69d1db8094c2 100644 --- a/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java +++ b/processing/src/main/java/io/druid/query/MetricsEmittingQueryRunner.java @@ -34,19 +34,19 @@ public class MetricsEmittingQueryRunner implements QueryRunner { private final ServiceEmitter emitter; - private final QueryToolChest> queryToolChest; + private final QueryToolChest> queryToolChest; private final QueryRunner queryRunner; private final long creationTimeNs; - private final ObjLongConsumer>> reportMetric; - private final Consumer>> applyCustomDimensions; + private final ObjLongConsumer> reportMetric; + private final Consumer> applyCustomDimensions; private MetricsEmittingQueryRunner( ServiceEmitter emitter, - QueryToolChest> queryToolChest, + QueryToolChest> queryToolChest, QueryRunner queryRunner, long creationTimeNs, - ObjLongConsumer>> reportMetric, - Consumer>> applyCustomDimensions + ObjLongConsumer> reportMetric, + Consumer> applyCustomDimensions ) { this.emitter = emitter; @@ -59,10 +59,10 @@ private MetricsEmittingQueryRunner( public MetricsEmittingQueryRunner( ServiceEmitter emitter, - QueryToolChest> queryToolChest, + QueryToolChest> queryToolChest, QueryRunner queryRunner, - ObjLongConsumer>> reportMetric, - Consumer>> applyCustomDimensions + ObjLongConsumer> reportMetric, + Consumer> applyCustomDimensions ) { this(emitter, queryToolChest, queryRunner, -1, reportMetric, applyCustomDimensions); @@ -83,8 +83,8 @@ public MetricsEmittingQueryRunner withWaitMeasuredFromNow() @Override public Sequence run(final QueryPlus queryPlus, final Map responseContext) { - QueryPlus queryWithMetrics = queryPlus.withQueryMetrics((QueryToolChest>) queryToolChest); - final QueryMetrics> queryMetrics = (QueryMetrics>) queryWithMetrics.getQueryMetrics(); + QueryPlus queryWithMetrics = queryPlus.withQueryMetrics(queryToolChest); + final QueryMetrics queryMetrics = queryWithMetrics.getQueryMetrics(); applyCustomDimensions.accept(queryMetrics); diff --git a/processing/src/main/java/io/druid/query/QueryPlus.java b/processing/src/main/java/io/druid/query/QueryPlus.java index 7d6c50b377c4..f6453068bd82 100644 --- a/processing/src/main/java/io/druid/query/QueryPlus.java +++ b/processing/src/main/java/io/druid/query/QueryPlus.java @@ -66,6 +66,11 @@ public QueryMetrics getQueryMetrics() * Returns the same QueryPlus object, if it already has {@link QueryMetrics} ({@link #getQueryMetrics()} returns not * null), or returns a new QueryPlus object with {@link Query} from this QueryPlus and QueryMetrics created using the * given {@link QueryToolChest}, via {@link QueryToolChest#makeMetrics(Query)} method. + * + * By convention, callers of {@code withQueryMetrics()} must also call .getQueryMetrics().emit() on the returned + * QueryMetrics object, regardless if this object is the same as the object on which .withQueryMetrics() was initially + * called (i. e. it already had non-null QueryMetrics), or if it is a new QueryPlus object. See {@link + * MetricsEmittingQueryRunner} for example. */ public QueryPlus withQueryMetrics(QueryToolChest> queryToolChest) { @@ -80,10 +85,10 @@ public QueryPlus withQueryMetrics(QueryToolChest> query * Returns a QueryPlus object without the components which are unsafe for concurrent use from multiple threads, * therefore couldn't be passed down in concurrent or async {@link QueryRunner}s. * - * Currently the only unsafe component is {@link QueryMetrics}, i. e. {@code threadSafe()} call is equivalent to - * {@link #withoutQueryMetrics()}. + * Currently the only unsafe component is {@link QueryMetrics}, i. e. {@code withoutThreadUnsafeState()} call is + * equivalent to {@link #withoutQueryMetrics()}. */ - public QueryPlus threadSafe() + public QueryPlus withoutThreadUnsafeState() { return withoutQueryMetrics(); } diff --git a/processing/src/main/java/io/druid/query/groupby/DefaultGroupByQueryMetrics.java b/processing/src/main/java/io/druid/query/groupby/DefaultGroupByQueryMetrics.java index 5d8ee7321fc3..808269e949a1 100644 --- a/processing/src/main/java/io/druid/query/groupby/DefaultGroupByQueryMetrics.java +++ b/processing/src/main/java/io/druid/query/groupby/DefaultGroupByQueryMetrics.java @@ -43,19 +43,19 @@ public void query(GroupByQuery query) @Override public void numDimensions(GroupByQuery query) { - builder.setDimension("numDimensions", String.valueOf(query.getDimensions().size())); + setDimension("numDimensions", String.valueOf(query.getDimensions().size())); } @Override public void numMetrics(GroupByQuery query) { - builder.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size())); + setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size())); } @Override public void numComplexMetrics(GroupByQuery query) { int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()); - builder.setDimension("numComplexMetrics", String.valueOf(numComplexAggs)); + setDimension("numComplexMetrics", String.valueOf(numComplexAggs)); } } diff --git a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java index 698deb226ec3..c7b09faee993 100644 --- a/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java +++ b/processing/src/main/java/io/druid/query/groupby/epinephelinae/GroupByMergingQueryRunnerV2.java @@ -123,7 +123,7 @@ public Sequence run(final QueryPlus queryPlus, final Mapof(CTX_KEY_MERGE_RUNNERS_USING_CHAINED_EXECUTION, true)) ) - .threadSafe(); + .withoutThreadUnsafeState(); if (QueryContexts.isBySegment(query) || forceChainedExecution) { ChainedExecutionQueryRunner runner = new ChainedExecutionQueryRunner<>(exec, queryWatcher, queryables); diff --git a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java index ff31652c91a1..a3938574b064 100644 --- a/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java +++ b/processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java @@ -205,7 +205,7 @@ public Sequence run( { final Query query = queryPlus.getQuery(); final int priority = QueryContexts.getPriority(query); - final QueryPlus threadSafeQueryPlus = queryPlus.threadSafe(); + final QueryPlus threadSafeQueryPlus = queryPlus.withoutThreadUnsafeState(); final ListenableFuture> future = queryExecutor.submit( new AbstractPrioritizedCallable>(priority) { diff --git a/processing/src/main/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetrics.java b/processing/src/main/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetrics.java index addaac5a76f5..d8a015bd2a39 100644 --- a/processing/src/main/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetrics.java +++ b/processing/src/main/java/io/druid/query/timeseries/DefaultTimeseriesQueryMetrics.java @@ -42,13 +42,13 @@ public void query(TimeseriesQuery query) @Override public void numMetrics(TimeseriesQuery query) { - builder.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size())); + setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size())); } @Override public void numComplexMetrics(TimeseriesQuery query) { int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()); - builder.setDimension("numComplexMetrics", String.valueOf(numComplexAggs)); + setDimension("numComplexMetrics", String.valueOf(numComplexAggs)); } } diff --git a/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetrics.java b/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetrics.java index 2be769b6c1fd..9dadbbcfd480 100644 --- a/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetrics.java +++ b/processing/src/main/java/io/druid/query/topn/DefaultTopNQueryMetrics.java @@ -44,25 +44,25 @@ public void query(TopNQuery query) @Override public void threshold(TopNQuery query) { - builder.setDimension("threshold", String.valueOf(query.getThreshold())); + setDimension("threshold", String.valueOf(query.getThreshold())); } @Override public void dimension(TopNQuery query) { - builder.setDimension("dimension", query.getDimensionSpec().getDimension()); + setDimension("dimension", query.getDimensionSpec().getDimension()); } @Override public void numMetrics(TopNQuery query) { - builder.setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size())); + setDimension("numMetrics", String.valueOf(query.getAggregatorSpecs().size())); } @Override public void numComplexMetrics(TopNQuery query) { int numComplexAggs = DruidMetrics.findNumComplexAggs(query.getAggregatorSpecs()); - builder.setDimension("numComplexMetrics", String.valueOf(numComplexAggs)); + setDimension("numComplexMetrics", String.valueOf(numComplexAggs)); } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java index 37ef7489831f..0caa56316199 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/SinkQuerySegmentWalker.java @@ -275,7 +275,7 @@ public QueryRunner apply(final FireHydrant hydrant) */ private QueryRunner withPerSinkMetrics( final QueryRunner sinkRunner, - final QueryToolChest> queryToolChest, + final QueryToolChest> queryToolChest, final String sinkSegmentIdentifier, final AtomicLong cpuTimeAccumulator )