Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions processing/src/main/java/io/druid/query/BaseQuery.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ public static <T> boolean getContextFinalize(Query<T> query, boolean defaultValu
return parseBoolean(query, "finalize", defaultValue);
}

public static <T> int getContextUncoveredIntervalsLimit(Query<T> query, int defaultValue)
{
return parseInt(query, "uncoveredIntervalsLimit", defaultValue);
}

private static <T> int parseInt(Query<T> query, String key, int defaultValue)
{
Object val = query.getContextValue(key);
Expand Down
67 changes: 44 additions & 23 deletions server/src/main/java/io/druid/client/CachingClusteredClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -166,33 +166,54 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> responseC
Set<Pair<ServerSelector, SegmentDescriptor>> segments = Sets.newLinkedHashSet();

List<TimelineObjectHolder<String, ServerSelector>> serversLookup = Lists.newLinkedList();
List<Interval> uncoveredIntervals = Lists.newLinkedList();

for (Interval interval : query.getIntervals()) {
Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval);
long startMillis = interval.getStartMillis();
long endMillis = interval.getEndMillis();
for (TimelineObjectHolder<String, ServerSelector> holder : lookup) {
Interval holderInterval = holder.getInterval();
long intervalStart = holderInterval.getStartMillis();
if (startMillis != intervalStart) {
uncoveredIntervals.add(new Interval(startMillis, intervalStart));

// Note that enabling this leads to putting uncovered intervals information in the response headers
// and might blow up in some cases https://github.com/druid-io/druid/issues/2108
int uncoveredIntervalsLimit = BaseQuery.getContextUncoveredIntervalsLimit(query, 0);

if (uncoveredIntervalsLimit > 0) {
List<Interval> uncoveredIntervals = Lists.newArrayListWithCapacity(uncoveredIntervalsLimit);
boolean uncoveredIntervalsOverflowed = false;

for (Interval interval : query.getIntervals()) {
Iterable<TimelineObjectHolder<String, ServerSelector>> lookup = timeline.lookup(interval);
long startMillis = interval.getStartMillis();
long endMillis = interval.getEndMillis();
for (TimelineObjectHolder<String, ServerSelector> holder : lookup) {
Interval holderInterval = holder.getInterval();
long intervalStart = holderInterval.getStartMillis();
if (!uncoveredIntervalsOverflowed && startMillis != intervalStart) {
if (uncoveredIntervalsLimit > uncoveredIntervals.size()) {
uncoveredIntervals.add(new Interval(startMillis, intervalStart));
} else {
uncoveredIntervalsOverflowed = true;
}
}
startMillis = holderInterval.getEndMillis();
serversLookup.add(holder);
}
startMillis = holderInterval.getEndMillis();
serversLookup.add(holder);
}

if (startMillis < endMillis) {
uncoveredIntervals.add(new Interval(startMillis, endMillis));
if (!uncoveredIntervalsOverflowed && startMillis < endMillis) {
if (uncoveredIntervalsLimit > uncoveredIntervals.size()) {
uncoveredIntervals.add(new Interval(startMillis, endMillis));
} else {
uncoveredIntervalsOverflowed = true;
}
}
}
}

if (!uncoveredIntervals.isEmpty()) {
// This returns intervals for which NO segment is present.
// Which is not necessarily an indication that the data doesn't exist or is
// incomplete. The data could exist and just not be loaded yet. In either
// case, though, this query will not include any data from the identified intervals.
responseContext.put("uncoveredIntervals", uncoveredIntervals);
if (!uncoveredIntervals.isEmpty()) {
// This returns intervals for which NO segment is present.
// Which is not necessarily an indication that the data doesn't exist or is
// incomplete. The data could exist and just not be loaded yet. In either
// case, though, this query will not include any data from the identified intervals.
responseContext.put("uncoveredIntervals", uncoveredIntervals);
responseContext.put("uncoveredIntervalsOverflowed", uncoveredIntervalsOverflowed);
}
} else {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do tests hit both branches here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests in CachingClusteredClientTest and CachingClusteredClientFunctionalityTest together hit both paths.

for (Interval interval : query.getIntervals()) {
Iterables.addAll(serversLookup, timeline.lookup(interval));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is racy between the other timeline.lookup call and here. There should only be one timeline.lookup call. And if this passes tests then we probably need more concurrency testing for caching clustered client.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is only one call ... the code flows are different when uncoveredIntervals are to be populated and not populated. its simpler this way bcoz the code for uncoveredIntervals disabled (else on line 213) is simple and better on its own.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad, will look at it closer here.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeline.lookup doesn't have a contract about always guaranteeing to return non-null but it does seem to. I'm adding a test for sanity validation of it in another PR.

}
}

// Let tool chest filter out unneeded segments
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package io.druid.client;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -76,12 +77,14 @@ public void setUp() throws Exception
public void testUncoveredInterval() throws Exception {
addToTimeline(new Interval("2015-01-02/2015-01-03"), "1");
addToTimeline(new Interval("2015-01-04/2015-01-05"), "1");
addToTimeline(new Interval("2015-02-04/2015-02-05"), "1");

final Druids.TimeseriesQueryBuilder builder = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.intervals("2015-01-02/2015-01-03")
.granularity("day")
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")));
.aggregators(Arrays.<AggregatorFactory>asList(new CountAggregatorFactory("rows")))
.context(ImmutableMap.<String, Object>of("uncoveredIntervalsLimit", 3));

Map<String, Object> responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
Expand All @@ -90,45 +93,51 @@ public void testUncoveredInterval() throws Exception {
builder.intervals("2015-01-01/2015-01-03");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-01/2015-01-02");
assertUncovered(responseContext, false, "2015-01-01/2015-01-02");

builder.intervals("2015-01-01/2015-01-04");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04");
assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04");

builder.intervals("2015-01-02/2015-01-04");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-03/2015-01-04");
assertUncovered(responseContext, false, "2015-01-03/2015-01-04");

builder.intervals("2015-01-01/2015-01-30");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
assertUncovered(responseContext, false, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");

builder.intervals("2015-01-02/2015-01-30");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");
assertUncovered(responseContext, false, "2015-01-03/2015-01-04", "2015-01-05/2015-01-30");

builder.intervals("2015-01-04/2015-01-30");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-05/2015-01-30");
assertUncovered(responseContext, false, "2015-01-05/2015-01-30");

builder.intervals("2015-01-10/2015-01-30");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
assertUncovered(responseContext, "2015-01-10/2015-01-30");
assertUncovered(responseContext, false, "2015-01-10/2015-01-30");

builder.intervals("2015-01-01/2015-02-25");
responseContext = new HashMap<>();
client.run(builder.build(), responseContext);
assertUncovered(responseContext, true, "2015-01-01/2015-01-02", "2015-01-03/2015-01-04", "2015-01-05/2015-02-04");
}

private void assertUncovered(Map<String, Object> context, String... intervals) {
private void assertUncovered(Map<String, Object> context, boolean uncoveredIntervalsOverflowed, String... intervals) {
List<Interval> expectedList = Lists.newArrayListWithExpectedSize(intervals.length);
for (String interval : intervals) {
expectedList.add(new Interval(interval));
}
Assert.assertEquals((Object) expectedList, context.get("uncoveredIntervals"));
Assert.assertEquals(uncoveredIntervalsOverflowed, context.get("uncoveredIntervalsOverflowed"));
}

private void addToTimeline(Interval interval, String version) {
Expand Down