Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 124 additions & 0 deletions processing/src/main/java/io/druid/query/FluentQueryRunnerBuilder.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.query;

import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Builder that wraps a base {@link QueryRunner} in a fluent interface, so the usual
 * decoration chain (pre-merge decoration, merging, post-merge decoration, metrics,
 * post-processing) can be expressed as readable chained calls instead of deeply
 * nested constructor invocations.
 *
 * @param <T> the result type of the queries run by the built runners
 */
public class FluentQueryRunnerBuilder<T>
{
  // Tool chest supplying the decoration/merge logic shared by every runner built here.
  final QueryToolChest<T, Query<T>> toolChest;

  public FluentQueryRunnerBuilder(QueryToolChest<T, Query<T>> toolChest)
  {
    this.toolChest = toolChest;
  }

  /**
   * Entry point: wraps {@code baseRunner} so further decorations can be chained onto it.
   */
  public FluentQueryRunner create(QueryRunner<T> baseRunner)
  {
    return new FluentQueryRunner(baseRunner);
  }

  /**
   * A {@link QueryRunner} that delegates to a wrapped runner and exposes fluent
   * methods, each returning a new {@code FluentQueryRunner} around a further
   * decorated runner. Instances are immutable; every decoration produces a new one.
   */
  public class FluentQueryRunner implements QueryRunner<T>
  {
    private final QueryRunner<T> baseRunner;

    public FluentQueryRunner(QueryRunner<T> runner)
    {
      this.baseRunner = runner;
    }

    @Override
    public Sequence<T> run(
        Query<T> query, Map<String, Object> responseContext
    )
    {
      // Pure delegation: all behavior comes from the decorated runner chain.
      return baseRunner.run(query, responseContext);
    }

    /**
     * Wraps an arbitrary runner in the fluent interface; used by every decoration below.
     */
    public FluentQueryRunner from(QueryRunner<T> runner)
    {
      return new FluentQueryRunner(runner);
    }

    /**
     * Applies the tool chest's post-merge decoration and finalizes results.
     */
    public FluentQueryRunner applyPostMergeDecoration()
    {
      return from(
          new FinalizeResultsQueryRunner<T>(
              toolChest.postMergeQueryDecoration(
                  baseRunner
              ),
              toolChest
          )
      );
    }

    /**
     * Applies the tool chest's pre-merge decoration and adds union-query support.
     */
    public FluentQueryRunner applyPreMergeDecoration()
    {
      return from(
          new UnionQueryRunner<T>(
              toolChest.preMergeQueryDecoration(
                  baseRunner
              )
          )
      );
    }

    /**
     * Wraps the runner so that accumulated CPU time is emitted as a metric through
     * {@code emitter}, using the tool chest to build the metric event.
     */
    public FluentQueryRunner emitCPUTimeMetric(ServiceEmitter emitter)
    {
      return from(
          CPUTimeMetricQueryRunner.safeBuild(
              baseRunner,
              new Function<Query<T>, ServiceMetricEvent.Builder>()
              {
                @Nullable
                @Override
                public ServiceMetricEvent.Builder apply(Query<T> tQuery)
                {
                  return toolChest.makeMetricBuilder(tQuery);
                }
              },
              emitter,
              new AtomicLong(0L),
              true // report the metric when the sequence is fully consumed
          )
      );
    }

    /**
     * Applies an optional post-processing operator; a null operator is a no-op,
     * which lets callers pass through an absent "postProcessing" context value.
     */
    public FluentQueryRunner postProcess(PostProcessingOperator<T> postProcessing)
    {
      return from(
          postProcessing != null ?
          postProcessing.postProcess(baseRunner) : baseRunner
      );
    }

    /**
     * Wraps the runner with the tool chest's result-merging logic.
     */
    public FluentQueryRunner mergeResults()
    {
      return from(
          toolChest.mergeResults(baseRunner)
      );
    }
  }
}
87 changes: 41 additions & 46 deletions processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,61 +442,56 @@ public static <T> QueryRunner<T> makeUnionQueryRunner(
Segment adapter
)
{
return new FinalizeResultsQueryRunner<T>(
factory.getToolchest().postMergeQueryDecoration(
factory.getToolchest().mergeResults(
new UnionQueryRunner<T>(
new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
)
return new FluentQueryRunnerBuilder<T>(factory.getToolchest())
.create(
new UnionQueryRunner<T>(
new BySegmentQueryRunner<T>(
segmentId, adapter.getDataInterval().getStart(),
factory.createRunner(adapter)
)
)
),
factory.getToolchest()
);
)
.mergeResults()
.applyPostMergeDecoration();
}

public static <T> QueryRunner<T> makeFilteringQueryRunner(
final VersionedIntervalTimeline<String, Segment> timeline,
final QueryRunnerFactory<T, Query<T>> factory) {

final QueryToolChest<T, Query<T>> toolChest = factory.getToolchest();
return new FinalizeResultsQueryRunner(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
toolChest.preMergeQueryDecoration(
new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
List<TimelineObjectHolder> segments = Lists.newArrayList();
for (Interval interval : query.getIntervals()) {
segments.addAll(timeline.lookup(interval));
}
List<Sequence<T>> sequences = Lists.newArrayList();
for (TimelineObjectHolder<String, Segment> holder : toolChest.filterSegments(query, segments)) {
Segment segment = holder.getObject().getChunk(0).getObject();
Query running = query.withQuerySegmentSpec(
new SpecificSegmentSpec(
new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
0
)
)
);
sequences.add(factory.createRunner(segment).run(running, responseContext));
}
return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences));
}
}
)
)
),
toolChest
);
return new FluentQueryRunnerBuilder<T>(toolChest)
.create(
new QueryRunner<T>()
{
@Override
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
List<TimelineObjectHolder> segments = Lists.newArrayList();
for (Interval interval : query.getIntervals()) {
segments.addAll(timeline.lookup(interval));
}
List<Sequence<T>> sequences = Lists.newArrayList();
for (TimelineObjectHolder<String, Segment> holder : toolChest.filterSegments(query, segments)) {
Segment segment = holder.getObject().getChunk(0).getObject();
Query running = query.withQuerySegmentSpec(
new SpecificSegmentSpec(
new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
0
)
)
);
sequences.add(factory.createRunner(segment).run(running, responseContext));
}
return new MergeSequence<>(query.getResultOrdering(), Sequences.simple(sequences));
}
}
)
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration();
}

public static IntervalChunkingQueryRunnerDecorator NoopIntervalChunkingQueryRunnerDecorator()
Expand Down
76 changes: 23 additions & 53 deletions server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,10 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingClusteredClient;
import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.FluentQueryRunnerBuilder;
import io.druid.query.PostProcessingOperator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
Expand All @@ -37,13 +34,8 @@
import io.druid.query.RetryQueryRunner;
import io.druid.query.RetryQueryRunnerConfig;
import io.druid.query.SegmentDescriptor;
import io.druid.query.UnionQueryRunner;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
*/
public class ClientQuerySegmentWalker implements QuerySegmentWalker
Expand Down Expand Up @@ -82,53 +74,31 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
return makeRunner(query);
}

private <T> QueryRunner<T> makeRunner(final Query<T> query)
private <T> QueryRunner<T> makeRunner(Query<T> query)
{
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final QueryRunner<T> baseRunner = CPUTimeMetricQueryRunner.safeBuild(
new FinalizeResultsQueryRunner<T>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
new UnionQueryRunner<T>(
toolChest.preMergeQueryDecoration(
new RetryQueryRunner<T>(
baseClient,
toolChest,
retryConfig,
objectMapper
)
)
)
)
),
toolChest
),
new Function<Query<T>, ServiceMetricEvent.Builder>()
QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
PostProcessingOperator<T> postProcessing = objectMapper.convertValue(
query.<String>getContextValue("postProcessing"),
new TypeReference<PostProcessingOperator<T>>()
{
@Nullable
@Override
public ServiceMetricEvent.Builder apply(Query<T> tQuery)
{
return toolChest.makeMetricBuilder(tQuery);
}
},
emitter,
new AtomicLong(0L),
true
}
);

final Map<String, Object> context = query.getContext();
PostProcessingOperator<T> postProcessing = null;
if (context != null) {
postProcessing = objectMapper.convertValue(
context.get("postProcessing"),
new TypeReference<PostProcessingOperator<T>>()
{
}
);
}

return postProcessing != null ?
postProcessing.postProcess(baseRunner) : baseRunner;
return new FluentQueryRunnerBuilder<>(toolChest)
.create(
new RetryQueryRunner<>(
baseClient,
toolChest,
retryConfig,
objectMapper
)
)
.applyPreMergeDecoration()
.mergeResults()
.applyPostMergeDecoration()
.emitCPUTimeMetric(emitter)
.postProcess(postProcessing);
}


}