18 changes: 18 additions & 0 deletions processing/src/main/java/io/druid/query/QueryToolChest.java
@@ -21,11 +21,13 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.metamx.common.guava.Sequence;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.timeline.LogicalSegment;

import java.util.List;
import org.joda.time.Interval;

/**
* The broker-side (also used by server in some cases) API for a specific Query type. This API is still undergoing
@@ -116,6 +118,22 @@ public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType qu
{
return null;
}

/**
* Creates a Function that takes an uncovered Interval and returns a Sequence of ResultType in which every
* time point of that interval, at the query's granularity, is filled in with zero-valued metrics.
*
* This exists so that clients can receive a query result whose time continuum contains no holes.
*
* This function is called when merging sequences from caches or servers on the Broker.
*
* @param query The Query that is currently being processed
* @return A function that fills all metrics with zeros over an uncovered interval, or null if this query
*         type does not support zero-filling
*/
public Function<Interval, Sequence<ResultType>> makeFillZerosFn(final QueryType query)
{
return null;
}
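
A hedged sketch of how a broker-side caller is expected to use this hook (it mirrors the addSequencesFromUncoveredIntervals change to CachingClusteredClient further down in this PR; toolChest, uncoveredIntervals, and listOfSequences are the caller's names, assumed here for illustration):

    Function<Interval, Sequence<ResultType>> fillZeros = toolChest.makeFillZerosFn(query);
    if (fillZeros != null) {
      // A null return means this query type does not support zero-filling.
      for (Interval uncovered : uncoveredIntervals) {
        listOfSequences.add(fillZeros.apply(uncovered));
      }
    }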

/**
* Wraps a QueryRunner. The input QueryRunner is the QueryRunner as it exists *before* being passed to
processing/src/main/java/io/druid/query/timeseries/TimeseriesQueryQueryToolChest.java
@@ -25,6 +25,8 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.granularity.QueryGranularity;
@@ -43,6 +45,7 @@
import io.druid.query.aggregation.PostAggregator;
import io.druid.query.filter.DimFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;
@@ -207,6 +210,47 @@ public Result<TimeseriesResultValue> apply(@Nullable Object input)
};
}

@Override
public Function<Interval, Sequence<Result<TimeseriesResultValue>>> makeFillZerosFn(final TimeseriesQuery query)
{
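// skipEmptyBuckets explicitly requests holes in the result, so zero-filling is disabled.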
if (query.isSkipEmptyBuckets())
{
return null;
}

return new Function<Interval, Sequence<Result<TimeseriesResultValue>>>()
{
@Override
public Sequence<Result<TimeseriesResultValue>> apply(Interval interval)
{
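// Emit one zero-valued result per granularity bucket spanning the uncovered interval.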
final QueryGranularity granularity = query.getGranularity();
final List<AggregatorFactory> aggs = query.getAggregatorSpecs();

return Sequences.map(
Sequences.simple(granularity.iterable(interval.getStartMillis(), interval.getEndMillis())),
new Function<Long, Result<TimeseriesResultValue>>()
{
@Override
public Result<TimeseriesResultValue> apply(Long timestamp)
{
Iterator<AggregatorFactory> aggsIter = aggs.iterator();
Map<String, Object> retVal = Maps.newLinkedHashMap();

while (aggsIter.hasNext())
{
final AggregatorFactory factory = aggsIter.next();
retVal.put(factory.getName(), 0);
}

return new Result<TimeseriesResultValue>(
granularity.toDateTime(timestamp),
new TimeseriesResultValue(retVal));
}
});
}
};
}
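
A rough usage sketch of the function above (illustrative only; it assumes a day-granularity TimeseriesQuery and a toolChest reference, neither of which is part of this change):

    Function<Interval, Sequence<Result<TimeseriesResultValue>>> fillZeros = toolChest.makeFillZerosFn(query);
    List<Result<TimeseriesResultValue>> filled = Sequences.toList(
        fillZeros.apply(new Interval("2015-01-03/2015-01-06")),
        Lists.<Result<TimeseriesResultValue>>newArrayList()
    );
    // filled holds one Result per day bucket (2015-01-03, 2015-01-04, 2015-01-05),
    // each with every aggregator set to 0.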

@Override
public QueryRunner<Result<TimeseriesResultValue>> preMergeQueryDecoration(QueryRunner<Result<TimeseriesResultValue>> runner)
{
16 changes: 13 additions & 3 deletions server/src/main/java/io/druid/client/CachingClusteredClient.java
@@ -170,9 +170,9 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> responseC
// Note that enabling this leads to putting uncovered intervals information in the response headers
// and might blow up in some cases https://github.com/druid-io/druid/issues/2108
int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0);
final List<Interval> uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit);

if (uncoveredIntervalsLimit > 0) {
List<Interval> uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit);
boolean uncoveredIntervalsOverflowed = false;

for (Interval interval : query.getIntervals()) {
@@ -314,8 +314,9 @@ public Sequence<T> get()
ArrayList<Sequence<T>> sequencesByInterval = Lists.newArrayList();
addSequencesFromCache(sequencesByInterval);
addSequencesFromServer(sequencesByInterval);
addSequencesFromUncoveredIntervals(sequencesByInterval);

return mergeCachedAndUncachedSequences(query, sequencesByInterval);
return mergeResultSequences(query, sequencesByInterval);
}

private void addSequencesFromCache(ArrayList<Sequence<T>> listOfSequences)
@@ -523,11 +524,20 @@ public void onFailure(Throwable throwable)
listOfSequences.add(resultSeqToAdd);
}
}

private void addSequencesFromUncoveredIntervals(ArrayList<Sequence<T>> listOfSequences)
{
final Function<Interval, Sequence<T>> fillZerosFunction = toolChest.makeFillZerosFn(query);
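// A null function means the tool chest does not support zero-filling for this query type.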
if (fillZerosFunction != null)
{
listOfSequences.addAll(Lists.transform(uncoveredIntervals, fillZerosFunction));
}
}
}// End of Supplier
);
}

protected Sequence<T> mergeCachedAndUncachedSequences(
protected Sequence<T> mergeResultSequences(
Query<T> query,
List<Sequence<T>> sequencesByInterval
)
122 changes: 118 additions & 4 deletions server/src/test/java/io/druid/client/CachingClusteredClientTest.java
@@ -692,6 +692,119 @@ client, new TimeseriesQueryQueryToolChest(
);
}

@Test
public void testTimeseriesZerofillingOnUncoveredIntervals() throws Exception
{
final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource(DATA_SOURCE)
.intervals(SEG_SPEC)
.filters(DIM_FILTER)
.granularity(GRANULARITY)
.aggregators(AGGS)
.postAggregators(POST_AGGS)
.context(ImmutableMap.<String, Object>of(
"skipEmptyBuckets", "false",
"uncoveredIntervalsLimit", 10
));


final QueryRunner runner = new FinalizeResultsQueryRunner(
client, new TimeseriesQueryQueryToolChest(
QueryRunnerTestHelper.NoopIntervalChunkingQueryRunnerDecorator()
)
);

timeline = new VersionedIntervalTimeline<>(Ordering.natural());

Map<Interval, Iterable<Result<TimeseriesResultValue>>> mockResults = Maps.newHashMap();
mockResults.put(new Interval("2015-01-01/2015-01-03"), makeTimeResults(
new DateTime("2015-01-01T00"), 50, 5000,
new DateTime("2015-01-02T00"), 30, 6000
));
mockResults.put(new Interval("2015-01-06/2015-01-08"), makeTimeResults(
new DateTime("2015-01-06T00"), 50, 5000,
new DateTime("2015-01-07T00"), 30, 6000
));
mockResults.put(new Interval("2015-01-08/2015-01-11"), makeTimeResults(
new DateTime("2015-01-08T00"), 50, 5000,
new DateTime("2015-01-09T00"), 30, 6000,
new DateTime("2015-01-10T00"), 50, 5000
));

final Iterable<Result<TimeseriesResultValue>> exResults = makeTimeResults(
new DateTime("2014-12-29T00"), 0, 0,
new DateTime("2014-12-30T00"), 0, 0,
new DateTime("2014-12-31T00"), 0, 0,
new DateTime("2015-01-01T00"), 50, 5000,
new DateTime("2015-01-02T00"), 30, 6000,
new DateTime("2015-01-03T00"), 0, 0,
new DateTime("2015-01-04T00"), 0, 0,
new DateTime("2015-01-05T00"), 0, 0,
new DateTime("2015-01-06T00"), 50, 5000,
new DateTime("2015-01-07T00"), 30, 6000,
new DateTime("2015-01-08T00"), 50, 5000,
new DateTime("2015-01-09T00"), 30, 6000,
new DateTime("2015-01-10T00"), 50, 5000,
new DateTime("2015-01-11T00"), 0, 0,
new DateTime("2015-01-12T00"), 0, 0
);

List<Object> mocks = Lists.newArrayList();
mocks.add(serverView);

int i = 0;
for (Map.Entry<Interval, Iterable<Result<TimeseriesResultValue>>> entry : mockResults.entrySet()) {

ServerExpectation<TimeseriesResultValue> expectation = new ServerExpectation(
String.format("%s", entry.getKey()),
entry.getKey(),
makeMock(mocks, DataSegment.class),
entry.getValue()
);

DruidServer druidServer = servers[i++];
DataSegment dataSegment = makeMock(mocks, DataSegment.class);
ServerSelector selector = new ServerSelector(
expectation.getSegment(),
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
);
selector.addServerAndUpdateSegment(new QueryableDruidServer(druidServer, null), selector.getSegment());
timeline.add(expectation.getInterval(), "0", new SingleElementPartitionChunk<ServerSelector>(selector));

QueryRunner queryable = makeMock(mocks, QueryRunner.class);
EasyMock.expect(queryable.run(EasyMock.anyObject(Query.class), EasyMock.anyObject(Map.class)))
.andReturn(toQueryableTimeseriesResults(
true,
ImmutableList.<String>of(expectation.getSegmentId()),
ImmutableList.<Interval>of(expectation.getInterval()),
ImmutableList.<Iterable<Result<TimeseriesResultValue>>>of(expectation.getResults())))
.once();

EasyMock.expect(serverView.getQueryRunner(druidServer))
.andReturn(queryable)
.once();
}

final TimeseriesQuery query = builder.build();
runWithMocks(
new Runnable()
{
@Override
public void run()
{
Map<String, List> context = Maps.newHashMap();
Sequence<Result<TimeseriesResultValue>> acResults = runner.run(
query.withQuerySegmentSpec(
new MultipleIntervalSegmentSpec(
ImmutableList.of(new Interval("2014-12-29/2015-01-13")))),
context);
TestHelper.assertExpectedResults(exResults, acResults);
}
},
mocks.toArray()
);
}

@Test
public void testDisableUseCache() throws Exception
{
@@ -925,7 +1038,7 @@ public void testOutOfOrderSequenceMerging() throws Exception
new DateTime("2011-01-09"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983,
new DateTime("2011-01-09T01"), "a", 50, 4985, "b", 50, 4984, "c", 50, 4983
),
client.mergeCachedAndUncachedSequences(
client.mergeResultSequences(
new TopNQueryBuilder()
.dataSource("test")
.intervals("2011-01-06/2011-01-10")
@@ -1962,7 +2075,8 @@ public Result<TimeBoundaryResultValue> apply(

List<Result<TimeseriesResultValue>> retVal = Lists.newArrayListWithCapacity(objects.length / 3);
for (int i = 0; i < objects.length; i += 3) {
double avg_impr = ((Number) objects[i + 2]).doubleValue() / ((Number) objects[i + 1]).doubleValue();
double rows = ((Number) objects[i + 1]).doubleValue();
double avg_impr = (rows == 0) ? 0 : ((Number) objects[i + 2]).doubleValue() / rows;
retVal.add(
new Result<>(
(DateTime) objects[i],
@@ -1972,8 +2086,8 @@
.put("imps", objects[i + 2])
.put("impers", objects[i + 2])
.put("avg_imps_per_row", avg_impr)
.put("avg_imps_per_row_half", avg_impr / 2)
.put("avg_imps_per_row_double", avg_impr * 2)
.put("avg_imps_per_row_half", avg_impr == 0 ? 0 : avg_impr / 2)
.put("avg_imps_per_row_double", avg_impr == 0 ? 0 : avg_impr * 2)
.build()
)
)