Granularity interval materialization #10742
Conversation
Force-pushed from e866686 to 48b49a6
I will clean up the javadoc in the next commit.
The contract indicates that inputIntervals must be left "as is"... I will remove the sorting in the next commit.
Force-pushed from a1a334c to 9f81cf7
…iformGranularitySpec
… Travis forbidden method errors in IntervalsByGranularity
Force-pushed from 9f81cf7 to 86c3f54
ArrayList<Interval> condensedIntervals = JodaUtils.condenseIntervals(() -> uniqueIntervals.iterator());
intervalIterator = condensedIntervals.iterator();
} else {
  IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
Are we no longer condensing the intervals if segmentGranularity != null?
private final Iterable<Interval> intervalIterable;
private final TreeSet<Interval> intervals;

public LookupIntervalBuckets(Iterable<Interval> intervalIterable)
Is this class basically something like a lazy load of the TreeSet?
I added this class to more cleanly re-use the TreeSet logic for bucket extraction between the ArbitraryGranularitySpec and the UniformGranularitySpec. The goal of this PR is to avoid materialization of intervals in the Overlord only; we will deal with materialization issues elsewhere later. Thus, when code outside the Overlord needs to get a bucket for a particular DateTime, the code actually materializes all the intervals and stores them in a fast lookup data structure (like a TreeSet) to return the bucket. In the UniformGranularitySpec we could still iterate through all the intervals to find the bucket (since the intervals are sorted), but that would be linear in the number of intervals for every call, which is not acceptable given how frequently the bucket-lookup method is called. We feel we can improve on this, but we decided to divide the work and, at least for now, prevent the materialization of intervals in the Overlord (see the PR description).
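For intuition, here is a minimal sketch of the pattern described above: lazy materialization into a tree-ordered structure for logarithmic bucket lookup. The class and field names are illustrative, and the interval type is a simplified stand-in for Joda's Interval; this is not Druid's actual LookupIntervalBuckets code.

```java
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Simplified stand-in for Joda's Interval: half-open [start, end) in millis.
class SimpleInterval {
    final long start, end;
    SimpleInterval(long start, long end) { this.start = start; this.end = end; }
    boolean contains(long t) { return t >= start && t < end; }
}

// Lazy bucket lookup: nothing is materialized until the first bucketInterval()
// call, so callers that never ask for a bucket (e.g. the Overlord) pay nothing.
class LazyLookupBuckets {
    private final Iterable<SimpleInterval> source;
    private TreeMap<Long, SimpleInterval> byStart; // built on first lookup only

    LazyLookupBuckets(Iterable<SimpleInterval> source) { this.source = source; }

    Optional<SimpleInterval> bucketInterval(long timestamp) {
        if (byStart == null) {
            byStart = new TreeMap<>();
            for (SimpleInterval i : source) {
                byStart.put(i.start, i);
            }
        }
        // The candidate bucket is the one with the greatest start <= timestamp.
        Map.Entry<Long, SimpleInterval> floor = byStart.floorEntry(timestamp);
        return floor != null && floor.getValue().contains(timestamp)
            ? Optional.of(floor.getValue())
            : Optional.empty();
    }
}
```

TreeMap.floorEntry makes each lookup O(log n), versus the linear scan over sorted intervals mentioned above.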
return Optional.of(JodaUtils.condenseIntervals(setOptional.get()));
Iterable<Interval> bucketIntervals = schema.getDataSchema().getGranularitySpec().bucketIntervals();
if (bucketIntervals.iterator().hasNext()) {
  return Optional.of(JodaUtils.condenseIntervals(schema.getDataSchema().getGranularitySpec().bucketIntervals()));
nit: can reuse bucketIntervals variable here
…eated elements (see added unit tests that were broken before this change)
IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
intervalIterator = intervalsByGranularity.granularityIntervalsIterator();
// the following is calling a condense that does not materialize the intervals:
uniqueCondensedIntervals.addAll(JodaUtils.condenseIntervals(intervalsByGranularity.granularityIntervalsIterator()));
doesn't addAll materialize all the intervals? Is materializing the condensed intervals fine as long as we are not materializing the original intervals?
Yes, addAll is using the materialized intervals returned by the condenseIntervals call. The assumption here is that the condensed intervals are much fewer than the bucket intervals, and I think this assumption will hold in the majority of real cases. In the rare corner case where the original intervals cannot be condensed we would still have issues, but not in the majority of production cases.
Yes, addAll is using the materialized intervals being returned by the condenseIntervals call. The assumption here is that the condensed intervals are much fewer than the bucket intervals. I think this assumption will hold in the majority of real cases. I think that in a rare corner case where the original intervals cannot be condensed then we still would have issues, but not in the majority of production cases.
I agree but it doesn't seem hard to avoid materialization. How about adding a new method condensedIntervalsIterator() which returns an iterator that lazily computes condensed intervals? It could be something like this:
public static Iterator<Interval> condensedIntervalsIterator(Iterator<Interval> sortedIntervals)
{
  if (!sortedIntervals.hasNext()) {
    return Iterators.emptyIterator();
  }

  final PeekingIterator<Interval> peekingIterator = Iterators.peekingIterator(sortedIntervals);
  return new Iterator<Interval>()
  {
    @Override
    public boolean hasNext()
    {
      return peekingIterator.hasNext();
    }

    @Override
    public Interval next()
    {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }

      Interval currInterval = peekingIterator.next();
      while (peekingIterator.hasNext()) {
        Interval next = peekingIterator.peek();
        if (currInterval.abuts(next)) {
          currInterval = new Interval(currInterval.getStart(), next.getEnd());
          peekingIterator.next();
        } else if (currInterval.overlaps(next)) {
          DateTime nextEnd = next.getEnd();
          DateTime currEnd = currInterval.getEnd();
          currInterval = new Interval(
              currInterval.getStart(),
              nextEnd.isAfter(currEnd) ? nextEnd : currEnd
          );
          peekingIterator.next();
        } else {
          break;
        }
      }
      return currInterval;
    }
  };
}

Then, tryTimeChunkLock() doesn't have to materialize intervals at all after condensing.
protected boolean tryTimeChunkLock(TaskActionClient client, List<Interval> intervals) throws IOException
{
  // The given intervals are first converted to align with segment granularity. This is because,
  // when an overwriting task finds a version for a given input row, it expects the interval
  // associated to each version to be equal or larger than the time bucket where the input row falls in.
  // See ParallelIndexSupervisorTask.findVersion().
  final Iterator<Interval> intervalIterator;
  final Granularity segmentGranularity = getSegmentGranularity();
  if (segmentGranularity == null) {
    intervalIterator = JodaUtils.condenseIntervals(intervals).iterator();
  } else {
    IntervalsByGranularity intervalsByGranularity = new IntervalsByGranularity(intervals, segmentGranularity);
    // the following is calling a condense that does not materialize the intervals:
    intervalIterator = JodaUtils.condensedIntervalsIterator(intervalsByGranularity.granularityIntervalsIterator());
  }

  // Intervals are already condensed to avoid creating too many locks.
  // Intervals are also sorted and thus it's safe to compare only the previous interval and current one for dedup.
  Interval prev = null;
  while (intervalIterator.hasNext()) {
    final Interval cur = intervalIterator.next();
    if (prev != null && cur.equals(prev)) {
      continue;
    }
    prev = cur;
    final TaskLock lock = client.submit(new TimeChunkLockTryAcquireAction(TaskLockType.EXCLUSIVE, cur));
    if (lock == null) {
      return false;
    }
  }
  return true;
}
Hmm, on second thought, I think the current code is good enough. However, please add a comment explaining why it is OK to materialize intervals here; I think your comment above would be a good fit.
I like your suggestion so I followed it and will avoid materializing them.
/**
 * This method does not materialize the intervals represented by the
 * sortedIntervals iterator. However, the caller needs to ensure that sortedIntervals
 * is already sorted in ascending order.
Since Interval is not Comparable (it's not because there could be multiple ways to compare them), it would be nice to explicitly state the order should be Comparators.intervalsByStartThenEnd() to make it more clear.
How about adding a sanity check in the method, so that it can fail fast if this expectation isn't met? Otherwise, it will be hard to debug if something goes wrong in the future.
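Both suggestions can be sketched as follows, using a simplified interval type (start/end millis) rather than Joda's Interval; BY_START_THEN_END here is an illustrative stand-in for Druid's Comparators.intervalsByStartThenEnd(), and checkedSortedIterator is a hypothetical helper, not code from this PR.

```java
import java.util.Comparator;
import java.util.Iterator;

// Simplified stand-in for Joda's Interval; illustrative only.
class Iv {
    final long start, end;
    Iv(long start, long end) { this.start = start; this.end = end; }
}

class SortedIntervalChecks {
    // Stand-in for Comparators.intervalsByStartThenEnd(): order by start, break ties by end.
    static final Comparator<Iv> BY_START_THEN_END =
        Comparator.<Iv>comparingLong(i -> i.start).thenComparingLong(i -> i.end);

    // Sanity check for the lazy condense: wrap the input iterator and verify each
    // element does not precede its predecessor, failing fast instead of silently
    // producing wrong output when the sort-order precondition is violated.
    static Iterator<Iv> checkedSortedIterator(Iterator<Iv> inner) {
        return new Iterator<Iv>() {
            Iv prev = null;

            public boolean hasNext() { return inner.hasNext(); }

            public Iv next() {
                Iv cur = inner.next();
                if (prev != null && BY_START_THEN_END.compare(prev, cur) > 0) {
                    throw new IllegalArgumentException("intervals are not sorted by start then end");
                }
                prev = cur;
                return cur;
            }
        };
    }
}
```

The wrapper adds one comparison per element, so the check costs O(n) overall and keeps the iteration lazy.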
intervalSet.addAll(intervals);
this.sortedIntervals = new ArrayList<>(intervals.size());
this.sortedIntervals.addAll(intervalSet);
this.sortedIntervals.sort(Comparators.intervalsByStartThenEnd());
It seems that IntervalIterator would work only when the sortedIntervals don't overlap each other, because the intervals returned from IntervalIterator may not be sorted otherwise. Is this correct? If so, please add a sanity check after dedup to make sure there is no overlap.
Great catch! I will add the sanity check
@jihoonson @loquisgon How can we be sure that the intervals don't overlap each other?
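One possible shape for the requested sanity check, sketched with a simplified interval type rather than Joda's Interval: once the list is sorted by start (then end) and deduplicated, comparing each interval only with its predecessor is sufficient to detect any overlap. The class and method names are illustrative, not code from this PR.

```java
import java.util.List;

// Simplified stand-in for Joda's Interval: half-open [start, end) in millis.
class Span {
    final long start, end;
    Span(long start, long end) { this.start = start; this.end = end; }
}

class OverlapCheck {
    // After sorting by start (then end) and dedup, adjacent comparison suffices:
    // if sorted.get(i) starts before sorted.get(i - 1) ends, those two overlap,
    // and any non-adjacent overlap implies an adjacent one in a sorted list.
    static void checkNoOverlap(List<Span> sorted) {
        for (int i = 1; i < sorted.size(); i++) {
            if (sorted.get(i).start < sorted.get(i - 1).end) {
                throw new IllegalArgumentException("overlapping intervals at index " + i);
            }
        }
    }
}
```

The check is a single O(n) pass, so it adds negligible cost next to the sort that precedes it.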
if (currentIterator.hasNext()) {

// drop all subsequent intervals that are the same as the previous...
while (previous != null && previous.equals(currentIterator.peek())) {
Does the iterator created from granularity.getIterable() ever return the same interval? It doesn't seem so, and this check seems unnecessary.
I put comments in the code to illustrate when this can happen (also I have unit tests to catch this)
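To illustrate when duplicates can occur: two distinct input intervals that fall inside the same granularity bucket each map to that bucket, so concatenating the per-interval bucket iterators emits the bucket twice, which is what the peek-based dedup above drops. A sketch with hypothetical day-granularity bucketing over epoch millis (not Druid's Granularity API):

```java
import java.util.ArrayList;
import java.util.List;

class DuplicateBucketsDemo {
    static final long DAY = 24L * 3600 * 1000;

    // Hypothetical day-granularity bucketing: bucket start times covering [start, end).
    static List<Long> dayBucketStarts(long start, long end) {
        List<Long> out = new ArrayList<>();
        for (long s = Math.floorDiv(start, DAY) * DAY; s < end; s += DAY) {
            out.add(s);
        }
        return out;
    }

    // Buckets for several input intervals (each a {start, end} pair), concatenated:
    // duplicates appear whenever two inputs fall inside the same day.
    static List<Long> concatenatedBuckets(List<long[]> inputs) {
        List<Long> out = new ArrayList<>();
        for (long[] in : inputs) {
            out.addAll(dayBucketStarts(in[0], in[1]));
        }
        return out;
    }
}
```

For example, two sub-day intervals inside the same day both yield that day's bucket, so the concatenated sequence contains it twice even though each per-interval iterator is duplicate-free.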
if (sortedIntervals.isEmpty()) {
  ite = Collections.emptyIterator();
} else {
  ite = new IntervalIterator(sortedIntervals);
Does IntervalIterator just flatten the nested iterators and concatenate them? If so, I suggest using FluentIterable.from(sortedIntervals).transformAndConcat(interval -> granularity.getIterable(interval)) instead, because it's better to use a well-tested library than to reinvent the wheel.
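For reference, Guava's transformAndConcat is the iterable analogue of Stream.flatMap. A stdlib sketch under the same simplified day-bucket assumption used above (the helper names and the long[]-pair interval representation are illustrative, not Druid's API):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class FlattenDemo {
    static final long DAY = 24L * 3600 * 1000;

    // Hypothetical per-interval bucket expansion: day-bucket start times over [start, end).
    static List<Long> dayBucketStarts(long start, long end) {
        return LongStream
            .iterate(Math.floorDiv(start, DAY) * DAY, s -> s < end, s -> s + DAY)
            .boxed()
            .collect(Collectors.toList());
    }

    // Equivalent in spirit to
    //   FluentIterable.from(sortedIntervals).transformAndConcat(i -> granularity.getIterable(i)):
    // flatMap lazily concatenates the per-interval bucket streams in order.
    static List<Long> allBuckets(List<long[]> sortedIntervals) {
        return sortedIntervals.stream()
            .flatMap(iv -> dayBucketStarts(iv[0], iv[1]).stream())
            .collect(Collectors.toList());
    }
}
```

Either form avoids hand-rolling the nested-iterator bookkeeping that the reviewer's comment is cautioning against.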
groupByJob.setOutputFormatClass(SequenceFileOutputFormat.class);
groupByJob.setPartitionerClass(DetermineHashedPartitionsPartitioner.class);
if (!config.getSegmentGranularIntervals().isPresent()) {
if (!config.getSegmentGranularIntervals().iterator().hasNext()) {
This code just wants to know whether inputIntervals is set or not. Suggest adding a new method hasInputIntervals() in GranularitySpec for better readability.
Same comment for the other places that call iterator.hasNext() for this purpose.
Rather than adding a new method I decided to use GranularitySpec::inputIntervals().isEmpty(). Done.
interval = maybeInterval.get();
if (!bucketIntervals.get().contains(interval)) {
if (!Iterators.contains(bucketIntervals.iterator(), interval)) {
This method is called whenever subtasks need to allocate a new segment via the supervisor task; as a result, this code is never called in the Overlord. It might be better to do this check without materialization for less memory pressure, but that would degrade ingestion performance. I suggest keeping the current behaviour of materializing all bucket intervals here, because I'm not sure yet what the best way to handle this is. We need to think more about how to fix the OOM error in the task as a follow-up.
import java.util.Iterator;
import java.util.TreeSet;

public class LookupIntervalBuckets
Please add some javadoc explaining what this class does and what it is for.
 * @return Iterable of all time groups
 */
Optional<SortedSet<Interval>> bucketIntervals();
Iterable<Interval> bucketIntervals();
Suggest sortedBucketIntervals() to make it clear the results are sorted.
ingestionSchema,
getContext(),
intervalToPartitions
toolbox,
nit: I think this PR is OK, but just FYI, it's usually recommended not to fix code style unless you are modifying that area, because it makes it hard to find the real changes of the PR.
Probably that was an automatic IntelliJ change.
Forgot to mention one more thing. I think this PR won't fix all OOM errors in the Overlord, and the next place where we should fix is likely
… when element is null for consistency with other methods in this class (as well that null interval when condensing does not make sense)
jihoonson left a comment:
LGTM. +1 after CI. @loquisgon thank you!
Description
The UniformGranularitySpec currently materializes all intervals when it is constructed. This may cause OOM in the Overlord and other modules whenever the resulting interval list is very large. The changes here avoid that materialization in all places in the Overlord for the UniformGranularitySpec. There is still one method in the uniform granularity spec, public Optional<Interval> bucketInterval(DateTime dt), that materializes all the intervals, but care is taken that the intervals are not materialized unless that particular method is invoked. Since this method is only called outside the Overlord, the OOM issues should be less drastic than they are today.

Key changed/added classes in this PR

A public API change was made to the interface GranularitySpec. The method bucketIntervals() now returns an Iterable<Interval> rather than a materialized set of intervals, as it did before this change. The other significant change was to avoid materialization of the intervals in the constructor of the UniformGranularitySpec.

Most of the work is delegated to a new helper class, IntervalsByGranularity, which takes a Collection of intervals and a Granularity in its constructor to build an Iterator that avoids materializing the intervals. This class also ensures that the intervals returned by the iterator are appropriately sorted. It is then used in the critical places where materialization was taking place. The class ArbitraryGranularitySpec still materializes all intervals, so use that one with care.

This PR has: