From 573aa96bd6d84838025792fde22e85b270c24e67 Mon Sep 17 00:00:00 2001 From: Nishant Date: Tue, 15 Sep 2015 02:24:12 +0530 Subject: [PATCH] fix #1727 - Union bySegment queries fix Fixes #1727. revert to doing merging for results for union queries on broker. revert unrelated changes Add test for union query runner Add test remove unused imports fix imports fix renamed file fix test update docs. --- .../druid/timeline/UnionTimeLineLookup.java | 78 --------- .../VersionedIntervalTimelineTest.java | 75 --------- docs/content/querying/datasource.md | 1 + .../overlord/ThreadPoolTaskRunner.java | 52 +++--- .../java/io/druid/query/UnionQueryRunner.java | 27 +-- .../io/druid/query/QueryRunnerTestHelper.java | 30 +--- .../io/druid/query/UnionQueryRunnerTest.java | 86 ++++++++++ .../TimeSeriesUnionQueryRunnerTest.java | 155 ++++++++---------- .../io/druid/client/BrokerServerView.java | 23 +-- .../segment/realtime/RealtimeManager.java | 45 ++--- .../server/ClientQuerySegmentWalker.java | 12 +- .../server/coordination/ServerManager.java | 34 ++-- 12 files changed, 238 insertions(+), 380 deletions(-) delete mode 100644 common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java create mode 100644 processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java diff --git a/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java b/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java deleted file mode 100644 index 18302fe08bce..000000000000 --- a/common/src/main/java/io/druid/timeline/UnionTimeLineLookup.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.druid.timeline; - -import com.google.common.base.Function; -import com.google.common.base.Predicates; -import com.google.common.collect.Iterables; -import com.metamx.common.guava.Comparators; -import io.druid.timeline.partition.PartitionHolder; -import org.joda.time.Interval; - -import java.util.Comparator; - - -public class UnionTimeLineLookup implements TimelineLookup -{ - Iterable> delegates; - - public UnionTimeLineLookup(Iterable> delegates) - { - // delegate can be null in case there is no segment loaded for the dataSource on this node - this.delegates = Iterables.filter(delegates, Predicates.notNull()); - } - - @Override - public Iterable> lookup(final Interval interval) - { - return Iterables.mergeSorted( - Iterables.transform( - delegates, - new Function, Iterable>>() - { - @Override - public Iterable> apply(TimelineLookup input) - { - return input.lookup(interval); - } - } - ), - new Comparator>() - { - @Override - public int compare( - TimelineObjectHolder o1, TimelineObjectHolder o2 - ) - { - return Comparators.intervalsByStartThenEnd().compare(o1.getInterval(), o2.getInterval()); - } - } - ); - } - - public PartitionHolder findEntry(Interval interval, VersionType version) - { - for (TimelineLookup delegate : delegates) { - final PartitionHolder entry = delegate.findEntry(interval, version); - if (entry != null) { - return entry; - } - } - return null; - } -} diff --git a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java index c66511993510..2f19ad3b333c 100644 --- a/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java +++ b/common/src/test/java/io/druid/timeline/VersionedIntervalTimelineTest.java @@ -1596,79 +1596,4 @@ private VersionedIntervalTimeline makeStringIntegerTimeline() return new VersionedIntervalTimeline(Ordering.natural()); } - @Test - public void testUnionTimeLineLookup() - { - TimelineLookup lookup = new UnionTimeLineLookup( - Arrays.>asList( - timeline, - timeline - ) - ); - assertValues( - Arrays.asList( - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-06/2011-04-09", "3", 4), - createExpected("2011-04-06/2011-04-09", "3", 4) - ), - (List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09"))) - ); - } - - @Test - public void testUnionTimeLineLookupNonExistentDelegates() - { - TimelineLookup lookup = new UnionTimeLineLookup( - Arrays.>asList( - timeline, - null, - timeline, - null - ) - ); - assertValues( - Arrays.asList( - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-01/2011-04-02", "3", 5), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-02/2011-04-06", "2", 1), - createExpected("2011-04-06/2011-04-09", "3", 4), - createExpected("2011-04-06/2011-04-09", "3", 4) - ), - (List)Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-09"))) ); - } - - @Test - public void testUnionTimeLineLookupReturnsSortedValues() - { - timeline = makeStringIntegerTimeline(); - add("2011-04-02/2011-04-06", "1", 1); - add("2011-04-03/2011-04-09", "9", 2); - VersionedIntervalTimeline t1 = timeline; - timeline = makeStringIntegerTimeline(); - add("2011-04-01/2011-04-03", "2", 1); - add("2011-04-03/2011-04-10", "8", 2); - VersionedIntervalTimeline t2 = timeline; - TimelineLookup lookup = new UnionTimeLineLookup( - Arrays.>asList( - t1, t2 - ) - ); - assertValues( - Arrays.asList( - createExpected("2011-04-01/2011-04-03", "2", 1), - createExpected("2011-04-02/2011-04-03", "1", 1), - createExpected("2011-04-03/2011-04-09", "9", 2), - createExpected("2011-04-03/2011-04-10", "8", 2) - - ), - (List) Lists.newArrayList(lookup.lookup(new Interval("2011-04-01/2011-04-11"))) - ); - } - - - } diff --git a/docs/content/querying/datasource.md b/docs/content/querying/datasource.md index 045aae119299..68fd0d9dba47 100644 --- a/docs/content/querying/datasource.md +++ b/docs/content/querying/datasource.md @@ -28,6 +28,7 @@ This data source unions two or more table data sources. ``` Note that the data sources being unioned should have the same schema. +Union Queries should be always sent to the broker/router node and are *NOT* supported directly by the historical nodes. ### Query Data Source diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java index 8e598bee2d04..462179d0b156 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/ThreadPoolTaskRunner.java @@ -153,43 +153,29 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable QueryRunner getQueryRunnerImpl(final Query query) + private QueryRunner getQueryRunnerImpl(Query query) { - return new UnionQueryRunner<>( - Iterables.transform( - query.getDataSource().getNames(), new Function() - { - @Override - public QueryRunner apply(String queryDataSource) - { - QueryRunner queryRunner = null; + QueryRunner queryRunner = null; + final String queryDataSource = Iterables.getOnlyElement(query.getDataSource().getNames()); - for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { - final Task task = taskRunnerWorkItem.getTask(); - if (task.getDataSource().equals(queryDataSource)) { - final QueryRunner taskQueryRunner = task.getQueryRunner(query); + for (final ThreadPoolTaskRunnerWorkItem taskRunnerWorkItem : ImmutableList.copyOf(runningItems)) { + final Task task = taskRunnerWorkItem.getTask(); + if (task.getDataSource().equals(queryDataSource)) { + final QueryRunner taskQueryRunner = task.getQueryRunner(query); - if (taskQueryRunner != null) { - if (queryRunner == null) { - queryRunner = taskQueryRunner; - } else { - log.makeAlert("Found too many query runners for datasource") - .addData("dataSource", queryDataSource) - .emit(); - } - } - } - } - if (queryRunner != null) { - return queryRunner; - } else { - return new NoopQueryRunner(); - } - } - } - ), conglomerate.findFactory(query).getToolchest() - ); + if (taskQueryRunner != null) { + if (queryRunner == null) { + queryRunner = taskQueryRunner; + } else { + log.makeAlert("Found too many query runners for datasource") + .addData("dataSource", queryDataSource) + .emit(); + } + } + } + } + return queryRunner == null ? new NoopQueryRunner() : queryRunner; } private static class ThreadPoolTaskRunnerWorkItem extends TaskRunnerWorkItem diff --git a/processing/src/main/java/io/druid/query/UnionQueryRunner.java b/processing/src/main/java/io/druid/query/UnionQueryRunner.java index cb7cf7ca1773..d731bdc8b29e 100644 --- a/processing/src/main/java/io/druid/query/UnionQueryRunner.java +++ b/processing/src/main/java/io/druid/query/UnionQueryRunner.java @@ -18,7 +18,7 @@ package io.druid.query; import com.google.common.base.Function; -import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -26,35 +26,34 @@ public class UnionQueryRunner implements QueryRunner { - private final Iterable baseRunners; + private final QueryRunner baseRunner; private final QueryToolChest> toolChest; public UnionQueryRunner( - Iterable baseRunners, + QueryRunner baseRunner, QueryToolChest> toolChest ) { - this.baseRunners = baseRunners; + this.baseRunner = baseRunner; this.toolChest = toolChest; } @Override public Sequence run(final Query query, final Map responseContext) { - if (Iterables.size(baseRunners) == 1) { - return Iterables.getOnlyElement(baseRunners).run(query, responseContext); - } else { + DataSource dataSource = query.getDataSource(); + if (dataSource instanceof UnionDataSource) { return toolChest.mergeSequencesUnordered( Sequences.simple( - Iterables.transform( - baseRunners, - new Function>() + Lists.transform( + ((UnionDataSource) dataSource).getDataSources(), + new Function>() { @Override - public Sequence apply(QueryRunner singleRunner) + public Sequence apply(DataSource singleSource) { - return singleRunner.run( - query, + return baseRunner.run( + query.withDataSource(singleSource), responseContext ); } @@ -62,6 +61,8 @@ public Sequence apply(QueryRunner singleRunner) ) ) ); + } else { + return baseRunner.run(query, responseContext); } } diff --git a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java index f6e088159b26..65c7dbea899f 100644 --- a/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java +++ b/processing/src/test/java/io/druid/query/QueryRunnerTestHelper.java @@ -253,20 +253,19 @@ public static Collection makeUnionQueryRunners( return Arrays.asList( new Object[][]{ { - makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId), unionDataSource) + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndex, segmentId)) }, { - makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex), unionDataSource) + makeUnionQueryRunner(factory, new QueryableIndexSegment(segmentId, mMappedTestIndex)) }, { makeUnionQueryRunner( factory, - new QueryableIndexSegment(segmentId, mergedRealtimeIndex), - unionDataSource + new QueryableIndexSegment(segmentId, mergedRealtimeIndex) ) }, { - makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId), unionDataSource) + makeUnionQueryRunner(factory, new IncrementalIndexSegment(rtIndexOffheap, segmentId)) } } ); @@ -341,28 +340,17 @@ public static > QueryRunner makeQueryRunner( } public static QueryRunner makeUnionQueryRunner( - final QueryRunnerFactory> factory, - final Segment adapter, - final DataSource unionDataSource + QueryRunnerFactory> factory, + Segment adapter ) { return new FinalizeResultsQueryRunner( factory.getToolchest().postMergeQueryDecoration( factory.getToolchest().mergeResults( new UnionQueryRunner( - Iterables.transform( - unionDataSource.getNames(), new Function() - { - @Nullable - @Override - public QueryRunner apply(@Nullable String input) - { - return new BySegmentQueryRunner( - segmentId, adapter.getDataInterval().getStart(), - factory.createRunner(adapter) - ); - } - } + new BySegmentQueryRunner( + segmentId, adapter.getDataInterval().getStart(), + factory.createRunner(adapter) ), factory.getToolchest() ) diff --git a/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java new file mode 100644 index 000000000000..0cb24e6124b1 --- /dev/null +++ b/processing/src/test/java/io/druid/query/UnionQueryRunnerTest.java @@ -0,0 +1,86 @@ +/* + * Druid - a distributed column store. + * Copyright 2012 - 2015 Metamarkets Group Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.druid.query; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import io.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import junit.framework.Assert; +import org.junit.Test; + +public class UnionQueryRunnerTest +{ + @Test + public void testUnionQueryRunner() + { + QueryRunner baseRunner = new QueryRunner() + { + @Override + public Sequence run(Query query, Map responseContext) + { + // verify that table datasource is passed to baseQueryRunner + Assert.assertTrue(query.getDataSource() instanceof TableDataSource); + String dsName = Iterables.getOnlyElement(query.getDataSource().getNames()); + if (dsName.equals("ds1")) { + responseContext.put("ds1", "ds1"); + return Sequences.simple(Arrays.asList(1, 2, 3)); + } else if (dsName.equals("ds2")) { + responseContext.put("ds2", "ds2"); + return Sequences.simple(Arrays.asList(4, 5, 6)); + } else { + throw new AssertionError("Unexpected DataSource"); + } + } + }; + UnionQueryRunner runner = new UnionQueryRunner( + baseRunner, + new TimeseriesQueryQueryToolChest( + QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator() + ) + ); + // Make a dummy query with Union datasource + Query q = Druids.newTimeseriesQueryBuilder() + .dataSource( + new UnionDataSource( + Arrays.asList( + new TableDataSource("ds1"), + new TableDataSource("ds2") + ) + ) + ) + .intervals("2014-01-01T00:00:00Z/2015-01-01T00:00:00Z") + .aggregators(QueryRunnerTestHelper.commonAggregators) + .build(); + Map responseContext = Maps.newHashMap(); + Sequence result = runner.run(q, responseContext); + List res = Sequences.toList(result, Lists.newArrayList()); + Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5, 6), res); + + // verify response context + Assert.assertEquals(2, responseContext.size()); + Assert.assertEquals("ds1", responseContext.get("ds1")); + Assert.assertEquals("ds2", responseContext.get("ds2")); + } + +} diff --git a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java index 9e8032caa5af..68afc07bda13 100644 --- a/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java +++ b/processing/src/test/java/io/druid/query/timeseries/TimeSeriesUnionQueryRunnerTest.java @@ -137,99 +137,84 @@ public void testUnionResultMerging() ) ) .build(); - QueryToolChest toolChest = new TimeseriesQueryQueryToolChest( - QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); + QueryToolChest toolChest = new TimeseriesQueryQueryToolChest(QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()); QueryRunner mergingrunner = toolChest.mergeResults( new UnionQueryRunner>( - (Iterable) Arrays.asList( - new QueryRunner>() - { - @Override - public Sequence> run( - Query> query, - Map context - ) - { - return Sequences.simple( - Lists.newArrayList( - new Result<>( - new DateTime("2011-04-02"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 1L, - "idx", - 2L - ) - ) - ), - new Result<>( - new DateTime("2011-04-03"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 3L, - "idx", - 4L - ) - ) - ) - ) - ); - } - }, - new QueryRunner>() - { - - @Override - public Sequence> run( - Query> query, - Map context - ) - { - { - return Sequences.simple( - Lists.newArrayList( - new Result<>( - new DateTime("2011-04-01"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 5L, - "idx", - 6L - ) - + new QueryRunner>() + { + @Override + public Sequence> run(Query> query, + Map responseContext + ) + { + if (query.getDataSource().equals(new TableDataSource("ds1"))) { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 1L, + "idx", + 2L ) - ), - new Result<>( - new DateTime("2011-04-02"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 7L, - "idx", - 8L - ) + ) + ), + new Result( + new DateTime("2011-04-03"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 3L, + "idx", + 4L ) - ), - new Result<>( - new DateTime("2011-04-04"), - new TimeseriesResultValue( - ImmutableMap.of( - "rows", - 9L, - "idx", - 10L - ) + ) + ) + ) + ); + } else { + return Sequences.simple( + Lists.newArrayList( + new Result( + new DateTime("2011-04-01"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 5L, + "idx", + 6L + ) + ) + ), + new Result( + new DateTime("2011-04-02"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 7L, + "idx", + 8L + ) + ) + ), + new Result( + new DateTime("2011-04-04"), + new TimeseriesResultValue( + ImmutableMap.of( + "rows", + 9L, + "idx", + 10L ) ) ) - ); - } - } + ) + ); } - ), + } + }, toolChest ) ); diff --git a/server/src/main/java/io/druid/client/BrokerServerView.java b/server/src/main/java/io/druid/client/BrokerServerView.java index 9b788784f48f..706e7ee652d6 100644 --- a/server/src/main/java/io/druid/client/BrokerServerView.java +++ b/server/src/main/java/io/druid/client/BrokerServerView.java @@ -39,7 +39,6 @@ import io.druid.server.coordination.DruidServerMetadata; import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineLookup; -import io.druid.timeline.UnionTimeLineLookup; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; @@ -260,27 +259,11 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen @Override - public TimelineLookup getTimeline(DataSource dataSource) + public VersionedIntervalTimeline getTimeline(DataSource dataSource) { - final List tables = dataSource.getNames(); + String table = Iterables.getOnlyElement(dataSource.getNames()); synchronized (lock) { - if (tables.size() == 1) { - return timelines.get(tables.get(0)); - } else { - return new UnionTimeLineLookup<>( - Iterables.transform( - tables, new Function>() - { - - @Override - public TimelineLookup apply(String input) - { - return timelines.get(input); - } - } - ) - ); - } + return timelines.get(table); } } diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index 818aac2f519e..5566f14c387e 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -148,34 +148,23 @@ public QueryRunner getQueryRunnerForIntervals(final Query query, Itera public QueryRunner getQueryRunnerForSegments(final Query query, Iterable specs) { final QueryRunnerFactory> factory = conglomerate.findFactory(query); - final List names = query.getDataSource().getNames(); - return new UnionQueryRunner<>( - Iterables.transform( - names, new Function() - { - @Override - public QueryRunner apply(String input) - { - Iterable chiefsOfDataSource = chiefs.get(input); - return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( - factory.mergeRunners( - MoreExecutors.sameThreadExecutor(), - // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock - Iterables.transform( - chiefsOfDataSource, new Function>() - { - @Override - public QueryRunner apply(FireChief input) - { - return input.getQueryRunner(query); - } - } - ) - ) - ); - } - } - ), conglomerate.findFactory(query).getToolchest() + + Iterable chiefsOfDataSource = chiefs.get(Iterables.getOnlyElement(query.getDataSource().getNames())); + return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( + factory.mergeRunners( + MoreExecutors.sameThreadExecutor(), + // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock + Iterables.transform( + chiefsOfDataSource, new Function>() + { + @Override + public QueryRunner apply(FireChief input) + { + return input.getQueryRunner(query); + } + } + ) + ) ); } diff --git a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java index 4fced547feef..8af8b31dd2b2 100644 --- a/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java +++ b/server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java @@ -29,6 +29,7 @@ import io.druid.query.RetryQueryRunnerConfig; import io.druid.query.SegmentDescriptor; +import io.druid.query.UnionQueryRunner; import java.util.Map; import org.joda.time.Interval; @@ -82,19 +83,22 @@ private QueryRunner makeRunner(final Query query) final FinalizeResultsQueryRunner baseRunner = new FinalizeResultsQueryRunner( toolChest.postMergeQueryDecoration( toolChest.mergeResults( - toolChest.preMergeQueryDecoration( - new RetryQueryRunner( + new UnionQueryRunner( + toolChest.preMergeQueryDecoration( + new RetryQueryRunner( baseClient, toolChest, retryConfig, - objectMapper) + objectMapper + ) + ), + toolChest ) ) ), toolChest ); - final Map context = query.getContext(); PostProcessingOperator postProcessing = null; if (context != null) { diff --git a/server/src/main/java/io/druid/server/coordination/ServerManager.java b/server/src/main/java/io/druid/server/coordination/ServerManager.java index 982c65ea73db..0cbb504f6b38 100644 --- a/server/src/main/java/io/druid/server/coordination/ServerManager.java +++ b/server/src/main/java/io/druid/server/coordination/ServerManager.java @@ -51,6 +51,7 @@ import io.druid.query.ReferenceCountingSegmentQueryRunner; import io.druid.query.ReportTimelineMissingSegmentQueryRunner; import io.druid.query.SegmentDescriptor; +import io.druid.query.TableDataSource; import io.druid.query.spec.SpecificSegmentQueryRunner; import io.druid.query.spec.SpecificSegmentSpec; import io.druid.segment.ReferenceCountingSegment; @@ -60,7 +61,6 @@ import io.druid.timeline.DataSegment; import io.druid.timeline.TimelineLookup; import io.druid.timeline.TimelineObjectHolder; -import io.druid.timeline.UnionTimeLineLookup; import io.druid.timeline.VersionedIntervalTimeline; import io.druid.timeline.partition.PartitionChunk; import io.druid.timeline.partition.PartitionHolder; @@ -260,11 +260,12 @@ public QueryRunner getQueryRunnerForIntervals(Query query, Iterable timeline = getTimelineLookup(query.getDataSource()); + final VersionedIntervalTimeline timeline = dataSources.get(dataSourceName); if (timeline == null) { return new NoopQueryRunner(); @@ -334,26 +335,9 @@ public QueryRunner apply(PartitionChunk input) ); } - private TimelineLookup getTimelineLookup(DataSource dataSource) + private String getDataSourceName(DataSource dataSource) { - final List names = dataSource.getNames(); - if (names.size() == 1) { - return dataSources.get(names.get(0)); - } else { - return new UnionTimeLineLookup<>( - Iterables.transform( - names, new Function>() - { - - @Override - public TimelineLookup apply(String input) - { - return dataSources.get(input); - } - } - ) - ); - } + return Iterables.getOnlyElement(dataSource.getNames()); } @Override @@ -369,7 +353,11 @@ public QueryRunner getQueryRunnerForSegments(Query query, Iterable> toolChest = factory.getToolchest(); - final TimelineLookup timeline = getTimelineLookup(query.getDataSource()); + String dataSourceName = getDataSourceName(query.getDataSource()); + + final VersionedIntervalTimeline timeline = dataSources.get( + dataSourceName + ); if (timeline == null) { return new NoopQueryRunner();