Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.google.inject.Inject;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
import io.druid.query.QueryMetrics;
import io.druid.query.GenericQueryMetricsFactory;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.aggregation.MetricManipulationFn;
Expand All @@ -39,6 +40,14 @@ public class ScanQueryQueryToolChest extends QueryToolChest<ScanResultValue, Sca
{
};

private final GenericQueryMetricsFactory queryMetricsFactory;

@Inject
public ScanQueryQueryToolChest(GenericQueryMetricsFactory queryMetricsFactory)
{
this.queryMetricsFactory = queryMetricsFactory;
}

@Override
public QueryRunner<ScanResultValue> mergeResults(final QueryRunner<ScanResultValue> runner)
{
Expand Down Expand Up @@ -74,9 +83,9 @@ public void cleanup(ScanQueryLimitRowIterator iterFromMake)
}

@Override
public ServiceMetricEvent.Builder makeMetricBuilder(ScanQuery query)
public QueryMetrics<Query<?>> makeMetrics(ScanQuery query)
{
return DruidMetrics.makePartialQueryTimeMetric(query);
return queryMetricsFactory.makeMetrics(query);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.druid.java.util.common.guava.MergeSequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
Expand Down Expand Up @@ -63,7 +64,9 @@
@RunWith(Parameterized.class)
public class MultiSegmentScanQueryTest
{
private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest();
private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest(
DefaultGenericQueryMetricsFactory.instance()
);

private static final QueryRunnerFactory<ScanResultValue, ScanQuery> factory = new ScanQueryRunnerFactory(
toolChest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.Sets;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequences;
import io.druid.query.DefaultGenericQueryMetricsFactory;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerTestHelper;
import io.druid.query.TableDataSource;
Expand Down Expand Up @@ -95,7 +96,9 @@ public class ScanQueryRunnerTest
);
public static final String[] V_0112_0114 = ObjectArrays.concat(V_0112, V_0113, String.class);

private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest();
private static final ScanQueryQueryToolChest toolChest = new ScanQueryQueryToolChest(
DefaultGenericQueryMetricsFactory.instance()
);

@Parameterized.Parameters(name = "{0}")
public static Iterable<Object[]> constructorFeeder() throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

package io.druid.query;

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.common.utils.VMUtils;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
Expand All @@ -36,14 +33,14 @@
public class CPUTimeMetricQueryRunner<T> implements QueryRunner<T>
{
private final QueryRunner<T> delegate;
private final Function<Query<T>, ServiceMetricEvent.Builder> builderFn;
private final QueryToolChest<?, ? super Query<T>> queryToolChest;
private final ServiceEmitter emitter;
private final AtomicLong cpuTimeAccumulator;
private final boolean report;

private CPUTimeMetricQueryRunner(
QueryRunner<T> delegate,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryToolChest<?, ? super Query<T>> queryToolChest,
ServiceEmitter emitter,
AtomicLong cpuTimeAccumulator,
boolean report
Expand All @@ -53,7 +50,7 @@ private CPUTimeMetricQueryRunner(
throw new ISE("Cpu time must enabled");
}
this.delegate = delegate;
this.builderFn = builderFn;
this.queryToolChest = queryToolChest;
this.emitter = emitter;
this.cpuTimeAccumulator = cpuTimeAccumulator == null ? new AtomicLong(0L) : cpuTimeAccumulator;
this.report = report;
Expand Down Expand Up @@ -85,10 +82,9 @@ public <RetType> RetType wrap(Supplier<RetType> sequenceProcessing)
public void after(boolean isDone, Throwable thrown) throws Exception
{
if (report) {
final long cpuTime = cpuTimeAccumulator.get();
if (cpuTime > 0) {
final ServiceMetricEvent.Builder builder = Preconditions.checkNotNull(builderFn.apply(query));
emitter.emit(builder.build("query/cpu/time", cpuTimeAccumulator.get() / 1000));
final long cpuTimeNs = cpuTimeAccumulator.get();
if (cpuTimeNs > 0) {
queryToolChest.makeMetrics(query).reportCpuTime(cpuTimeNs).emit(emitter);
}
}
}
Expand All @@ -98,7 +94,7 @@ public void after(boolean isDone, Throwable thrown) throws Exception

public static <T> QueryRunner<T> safeBuild(
QueryRunner<T> delegate,
Function<Query<T>, ServiceMetricEvent.Builder> builderFn,
QueryToolChest<?, ? super Query<T>> queryToolChest,
ServiceEmitter emitter,
AtomicLong accumulator,
boolean report
Expand All @@ -107,7 +103,7 @@ public static <T> QueryRunner<T> safeBuild(
if (!VMUtils.isThreadCpuTimeEnabled()) {
return delegate;
} else {
return new CPUTimeMetricQueryRunner<>(delegate, builderFn, emitter, accumulator, report);
return new CPUTimeMetricQueryRunner<>(delegate, queryToolChest, emitter, accumulator, report);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import io.druid.jackson.DefaultObjectMapper;

public class DefaultGenericQueryMetricsFactory implements GenericQueryMetricsFactory
{
private static final GenericQueryMetricsFactory INSTANCE =
new DefaultGenericQueryMetricsFactory(new DefaultObjectMapper());

/**
* Should be used only in tests, directly or indirectly (e. g. in {@link
* io.druid.query.search.SearchQueryQueryToolChest#SearchQueryQueryToolChest(
* io.druid.query.search.search.SearchQueryConfig, IntervalChunkingQueryRunnerDecorator)}).
*/
@VisibleForTesting
public static GenericQueryMetricsFactory instance()
{
return INSTANCE;
}

private final ObjectMapper jsonMapper;

@Inject
public DefaultGenericQueryMetricsFactory(ObjectMapper jsonMapper)
{
this.jsonMapper = jsonMapper;
}

@Override
public QueryMetrics<Query<?>> makeMetrics(Query<?> query)
{
DefaultQueryMetrics<Query<?>> queryMetrics = new DefaultQueryMetrics<>(jsonMapper);
queryMetrics.query(query);
return queryMetrics;
}

}
Loading