diff --git a/processing/src/main/java/io/druid/query/QueryToolChest.java b/processing/src/main/java/io/druid/query/QueryToolChest.java
index e62348228fd5..ee9bcb9cad5f 100644
--- a/processing/src/main/java/io/druid/query/QueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/QueryToolChest.java
@@ -21,11 +21,13 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Function;
+import com.metamx.common.guava.Sequence;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.query.aggregation.MetricManipulationFn;
 import io.druid.timeline.LogicalSegment;
 
 import java.util.List;
+import org.joda.time.Interval;
 
 /**
  * The broker-side (also used by server in some cases) API for a specific Query type. This API is still undergoing
@@ -116,6 +118,22 @@ public CacheStrategy getCacheStrategy(QueryType qu
   {
     return null;
   }
+
+  /**
+   * Creates a Function that takes an uncovered Interval and returns a Sequence of ResultType in which every
+   * time point absent from the query result between valid data intervals is filled in with zeroed metrics.
+   *
+   * This exists because some clients want a query result whose time series contains no holes.
+   *
+   * This function is called when merging sequences from caches or servers on the Broker.
+   *
+   * @param query The Query that is currently being processed
+   * @return A function that fills out all metrics with zeros on absent data intervals, or null if unsupported.
+   */
+  public Function<Interval, Sequence<ResultType>> makeFillZerosFn(final QueryType query)
+  {
+    return null;
+  }
 
   /**
    * Wraps a QueryRunner. The input QueryRunner is the QueryRunner as it exists *before* being passed to
diff --git a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
index 1a61c067c977..1e8bff37ff47 100644
--- a/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
+++ b/processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -25,6 +25,8 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Ordering;
 import com.google.inject.Inject;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.guava.Sequences;
 import com.metamx.common.guava.nary.BinaryFn;
 import com.metamx.emitter.service.ServiceMetricEvent;
 import io.druid.granularity.QueryGranularity;
@@ -43,6 +45,7 @@
 import io.druid.query.aggregation.PostAggregator;
 import io.druid.query.filter.DimFilter;
 import org.joda.time.DateTime;
+import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
 import java.nio.ByteBuffer;
@@ -207,6 +210,47 @@ public Result apply(@Nullable Object input)
     };
   }
 
+  @Override
+  public Function<Interval, Sequence<Result<TimeseriesResultValue>>> makeFillZerosFn(final TimeseriesQuery query)
+  {
+    if (query.isSkipEmptyBuckets())
+    {
+      return null;
+    }
+
+    return new Function<Interval, Sequence<Result<TimeseriesResultValue>>>()
+    {
+      @Override
+      public Sequence<Result<TimeseriesResultValue>> apply(Interval interval)
+      {
+        final QueryGranularity granularity = query.getGranularity();
+        final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
+
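+        // For each granularity bucket in the uncovered interval, emit one result row
+        // whose aggregators are all zero, so the merged result has no time gaps.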
+        return Sequences.map(
+            Sequences.simple(granularity.iterable(interval.getStartMillis(), interval.getEndMillis())),
+            new Function<Long, Result<TimeseriesResultValue>>()
+            {
+              @Override
+              public Result<TimeseriesResultValue> apply(Long timestamp)
+              {
+                Iterator<AggregatorFactory> aggsIter = aggs.iterator();
+                Map<String, Object> retVal = Maps.newLinkedHashMap();
+
+                while (aggsIter.hasNext())
+                {
+                  final AggregatorFactory factory = aggsIter.next();
+                  retVal.put(factory.getName(), 0);
+                }
+
+                return new Result<TimeseriesResultValue>(
+                    granularity.toDateTime(timestamp),
+                    new TimeseriesResultValue(retVal));
+              }
+            });
+      }
+    };
+  }
+
   @Override
   public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
   {
diff --git a/server/src/main/java/io/druid/client/CachingClusteredClient.java b/server/src/main/java/io/druid/client/CachingClusteredClient.java
index 4e962a2f7776..e1abbb2748b6 100644
--- a/server/src/main/java/io/druid/client/CachingClusteredClient.java
+++ b/server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -170,9 +170,9 @@ public Sequence run(final Query query, final Map responseC
     // Note that enabling this leads to putting uncovered intervals information in the response headers
     // and might blow up in some cases https://github.com/druid-io/druid/issues/2108
     int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0);
+    final List<Interval> uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit);
 
     if (uncoveredIntervalsLimit > 0) {
-      List<Interval> uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit);
       boolean uncoveredIntervalsOverflowed = false;
 
       for (Interval interval : query.getIntervals()) {
@@ -314,8 +314,9 @@ public Sequence get()
            ArrayList<Sequence<T>> sequencesByInterval = Lists.newArrayList();
            addSequencesFromCache(sequencesByInterval);
            addSequencesFromServer(sequencesByInterval);
+           addSequencesFromUncoveredIntervals(sequencesByInterval);
 
-           return mergeCachedAndUncachedSequences(query, sequencesByInterval);
+           return mergeResultSequences(query, sequencesByInterval);
          }
 
          private void addSequencesFromCache(ArrayList<Sequence<T>> listOfSequences)
@@ -523,11 +524,20 @@ public void onFailure(Throwable throwable)
                listOfSequences.add(resultSeqToAdd);
              }
            }
+
+          private void addSequencesFromUncoveredIntervals(ArrayList<Sequence<T>> listOfSequences)
+          {
+            final Function<Interval, Sequence<T>> fillZerosFunction = toolChest.makeFillZerosFn(query);
+            if (fillZerosFunction != null)
+            {
+              listOfSequences.addAll(Lists.transform(uncoveredIntervals, fillZerosFunction));
+            }
+          }
         }// End of Supplier
     );
   }
 
-  protected Sequence<T> mergeCachedAndUncachedSequences(
+  protected Sequence<T> mergeResultSequences(
      Query<T> query,
      List<Sequence<T>> sequencesByInterval
  )
diff --git a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
index 4f09b05cfcfc..83579751234f 100644
--- a/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
+++ b/server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -692,6 +692,119 @@ client, new TimeseriesQueryQueryToolChest(
     );
   }
 
+  @Test
+  public void testTimeseriesZerofillingOnUncoveredIntervals() throws Exception
+  {
+    final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
+        .dataSource(DATA_SOURCE)
+        .intervals(SEG_SPEC)
+        .filters(DIM_FILTER)
+        .granularity(GRANULARITY)
+        .aggregators(AGGS)
+        .postAggregators(POST_AGGS)
+        .context(ImmutableMap.<String, Object>of(
+            "skipEmptyBuckets", "false",
+            "uncoveredIntervalsLimit", 10
+        ));
+
+    final QueryRunner runner = new FinalizeResultsQueryRunner(
+        client, new TimeseriesQueryQueryToolChest(
+            QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
+        )
+    );
+
+    timeline = new VersionedIntervalTimeline<>(Ordering.<String>natural());
+
+    Map<Interval, Iterable<Result<TimeseriesResultValue>>> mockResults = Maps.newHashMap();
+    mockResults.put(new Interval("2015-01-01/2015-01-03"), makeTimeResults(
+        new DateTime("2015-01-01T00"), 50, 5000,
+        new DateTime("2015-01-02T00"), 30, 6000
+    ));
+    mockResults.put(new Interval("2015-01-06/2015-01-08"), makeTimeResults(
+        new DateTime("2015-01-06T00"), 50, 5000,
+        new DateTime("2015-01-07T00"), 30, 6000
+    ));
+    mockResults.put(new Interval("2015-01-08/2015-01-11"), makeTimeResults(
+        new DateTime("2015-01-08T00"), 50, 5000,
+        new DateTime("2015-01-09T00"), 30, 6000,
+        new DateTime("2015-01-10T00"), 50, 5000
+    ));
+
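+    // The queried interval (2014-12-29/2015-01-13) is only partially covered by the mocked
+    // segments above; every granularity bucket outside those three intervals is expected to
+    // come back zero-filled.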
+    final Iterable<Result<TimeseriesResultValue>> exResults = makeTimeResults(
+        new DateTime("2014-12-29T00"), 0, 0,
+        new DateTime("2014-12-30T00"), 0, 0,
+        new DateTime("2014-12-31T00"), 0, 0,
+        new DateTime("2015-01-01T00"), 50, 5000,
+        new DateTime("2015-01-02T00"), 30, 6000,
+        new DateTime("2015-01-03T00"), 0, 0,
+        new DateTime("2015-01-04T00"), 0, 0,
+        new DateTime("2015-01-05T00"), 0, 0,
+        new DateTime("2015-01-06T00"), 50, 5000,
+        new DateTime("2015-01-07T00"), 30, 6000,
+        new DateTime("2015-01-08T00"), 50, 5000,
+        new DateTime("2015-01-09T00"), 30, 6000,
+        new DateTime("2015-01-10T00"), 50, 5000,
+        new DateTime("2015-01-11T00"), 0, 0,
+        new DateTime("2015-01-12T00"), 0, 0
+    );
+
+    List<Object> mocks = Lists.newArrayList();
+    mocks.add(serverView);
+
+    int i = 0;
+    for (Map.Entry<Interval, Iterable<Result<TimeseriesResultValue>>> entry : mockResults.entrySet()) {
+      ServerExpectation expectation = new ServerExpectation(
+          String.format("%s", entry.getKey()),
+          entry.getKey(),
+          makeMock(mocks, DataSegment.class),
+          entry.getValue()
+      );
+
+      DruidServer druidServer = servers[i++];
+      DataSegment dataSegment = makeMock(mocks, DataSegment.class);
+      ServerSelector selector = new ServerSelector(
+          expectation.getSegment(),
+          new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
+      );
+      selector.addServerAndUpdateSegment(new QueryableDruidServer(druidServer, null), selector.getSegment());
+      timeline.add(expectation.getInterval(), "0", new SingleElementPartitionChunk(selector));
+
+      QueryRunner queryable = makeMock(mocks, QueryRunner.class);
+      EasyMock.expect(queryable.run(EasyMock.anyObject(Query.class), EasyMock.anyObject(Map.class)))
+              .andReturn(toQueryableTimeseriesResults(
+                  true,
+                  ImmutableList.of(expectation.getSegmentId()),
+                  ImmutableList.of(expectation.getInterval()),
+                  ImmutableList.<Iterable<Result<TimeseriesResultValue>>>of(expectation.getResults())))
+              .once();
+
+      EasyMock.expect(serverView.getQueryRunner(druidServer))
+              .andReturn(queryable)
+              .once();
+    }
+
+    final TimeseriesQuery query = builder.build();
+    runWithMocks(
+        new Runnable()
+        {
+          @Override
+          public void run()
+          {
+            Map<String, Object> context = Maps.newHashMap();
+            Sequence<Result<TimeseriesResultValue>> acResults = runner.run(
+                query.withQuerySegmentSpec(
+                    new MultipleIntervalSegmentSpec(
+                        ImmutableList.of(new Interval("2014-12-29/2015-01-13")))),
+                context);
+            TestHelper.assertExpectedResults(exResults, acResults);
+          }
+        },
+        mocks.toArray()
+    );
+  }
+
   @Test
   public void testDisableUseCache() throws Exception
   {
@@ -925,7 +1038,7 @@ public void testOutOfOrderSequenceMerging() throws Exception
         new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
         new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
         ),
-        client.mergeCachedAndUncachedSequences(
+        client.mergeResultSequences(
            new TopNQueryBuilder()
                .dataSource("test")
                .intervals("2011-01-06/2011-01-10")
@@ -1962,7 +2075,8 @@ public Result apply(
       List<Result<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 3);
       for (int i = 0; i < objects.length; i += 3) {
-        double avg_impr = ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue();
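+        // Zero-filled buckets contribute zero rows; guard the division so the expected
+        // average is 0 rather than NaN for those buckets.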
+        double rows = ((Number) objects[i + 1]).doubleValue();
+        double avg_impr = (rows == 0) ? 0 : ((Number) objects[i + 2]).doubleValue() / rows;
         retVal.add(
             new Result<>(
                 (DateTime) objects[i],
@@ -1972,8 +2086,8 @@
                     .put("imps", objects[i + 2])
                     .put("impers", objects[i + 2])
                     .put("avg_imps_per_row", avg_impr)
-                    .put("avg_imps_per_row_half", avg_impr / 2)
-                    .put("avg_imps_per_row_double", avg_impr * 2)
+                    .put("avg_imps_per_row_half", avg_impr == 0 ? 0 : avg_impr / 2)
+                    .put("avg_imps_per_row_double", avg_impr == 0 ? 0 : avg_impr * 2)
                     .build()
                 )
             )