Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -78,6 +79,31 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
{
private static final Logger LOG = new Logger(DruidInputSource.class);

/**
* A Comparator that orders {@link WindowedSegmentId} mainly by segmentId (which is important), and then by intervals
* (which is arbitrary, and only here for totality of ordering).
*/
private static final Comparator<WindowedSegmentId> WINDOWED_SEGMENT_ID_COMPARATOR =
Comparator.comparing(WindowedSegmentId::getSegmentId)
.thenComparing(windowedSegmentId -> windowedSegmentId.getIntervals().size())
.thenComparing(
(WindowedSegmentId a, WindowedSegmentId b) -> {
// Same segmentId, same intervals list size. Compare each interval.
int cmp = 0;

for (int i = 0; i < a.getIntervals().size(); i++) {
cmp = Comparators.intervalsByStartThenEnd()
.compare(a.getIntervals().get(i), b.getIntervals().get(i));

if (cmp != 0) {
return cmp;
}
}

return cmp;
}
);

private final String dataSource;
// Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly
// by the user creating this firehose and 'segmentIds' is used for sub-tasks if it is split for parallel
Expand Down Expand Up @@ -313,6 +339,8 @@ public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits(
//noinspection ConstantConditions
return Iterators.transform(
convertedSplitHintSpec.split(
// segmentIdToSize is sorted by segment ID; useful for grouping up segments from the same time chunk into
// the same input split.
segmentIdToSize.keySet().iterator(),
segmentId -> new InputFileAttribute(
Preconditions.checkNotNull(segmentIdToSize.get(segmentId), "segment size for [%s]", segmentId)
Expand All @@ -322,7 +350,10 @@ public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits(
);
}

private static Map<WindowedSegmentId, Long> createWindowedSegmentIdFromTimeline(
/**
* Returns a map of {@link WindowedSegmentId} to size, sorted by {@link WindowedSegmentId#getSegmentId()}.
*/
private static SortedMap<WindowedSegmentId, Long> createWindowedSegmentIdFromTimeline(
List<TimelineObjectHolder<String, DataSegment>> timelineHolders
)
{
Expand All @@ -335,9 +366,9 @@ private static Map<WindowedSegmentId, Long> createWindowedSegmentIdFromTimeline(
).addInterval(holder.getInterval());
}
}
// It is important to create this map after windowedSegmentIds is completely filled
// because WindowedSegmentId can be updated.
Map<WindowedSegmentId, Long> segmentSizeMap = new HashMap<>();
// It is important to create this map after windowedSegmentIds is completely filled, because WindowedSegmentIds
// can be updated while being constructed. (Intervals are added.)
SortedMap<WindowedSegmentId, Long> segmentSizeMap = new TreeMap<>(WINDOWED_SEGMENT_ID_COMPARATOR);
windowedSegmentIds.forEach((segment, segmentId) -> segmentSizeMap.put(segmentId, segment.getSize()));
return segmentSizeMap;
}
Expand Down