From b32ed89f8c59b8ac3f53235f00babf8896a2b070 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Sat, 5 Dec 2020 13:58:31 -0800 Subject: [PATCH] DruidInputSource: Sort segments by ID before grouping into splits. This is useful because it groups up segments for the same time chunk into the same splits, which in turn is useful because it minimizes the number of time chunks that each task will have to deal with. --- .../indexing/input/DruidInputSource.java | 39 +++++++++++++++++-- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java index 4388c705fb4f..f10e40b17f46 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java @@ -65,6 +65,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -78,6 +79,31 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI { private static final Logger LOG = new Logger(DruidInputSource.class); + /** + * A Comparator that orders {@link WindowedSegmentId} mainly by segmentId (which is important), and then by intervals + * (which is arbitrary, and only here for totality of ordering). + */ + private static final Comparator<WindowedSegmentId> WINDOWED_SEGMENT_ID_COMPARATOR = + Comparator.comparing(WindowedSegmentId::getSegmentId) + .thenComparing(windowedSegmentId -> windowedSegmentId.getIntervals().size()) + .thenComparing( + (WindowedSegmentId a, WindowedSegmentId b) -> { + // Same segmentId, same intervals list size. Compare each interval. 
+ int cmp = 0; + + for (int i = 0; i < a.getIntervals().size(); i++) { + cmp = Comparators.intervalsByStartThenEnd() + .compare(a.getIntervals().get(i), b.getIntervals().get(i)); + + if (cmp != 0) { + return cmp; + } + } + + return cmp; + } + ); + private final String dataSource; // Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly // by the user creating this firehose and 'segmentIds' is used for sub-tasks if it is split for parallel @@ -313,6 +339,8 @@ public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits( //noinspection ConstantConditions return Iterators.transform( convertedSplitHintSpec.split( + // segmentIdToSize is sorted by segment ID; useful for grouping up segments from the same time chunk into + // the same input split. segmentIdToSize.keySet().iterator(), segmentId -> new InputFileAttribute( Preconditions.checkNotNull(segmentIdToSize.get(segmentId), "segment size for [%s]", segmentId) @@ -322,7 +350,10 @@ public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits( ); } - private static Map<WindowedSegmentId, Long> createWindowedSegmentIdFromTimeline( + /** + * Returns a map of {@link WindowedSegmentId} to size, sorted by {@link WindowedSegmentId#getSegmentId()}. + */ + private static SortedMap<WindowedSegmentId, Long> createWindowedSegmentIdFromTimeline( List<TimelineObjectHolder<String, DataSegment>> timelineHolders ) { @@ -335,9 +366,9 @@ private static Map<WindowedSegmentId, Long> createWindowedSegmentIdFromTimeline( ).addInterval(holder.getInterval()); } } - // It is important to create this map after windowedSegmentIds is completely filled - // because WindowedSegmentId can be updated. - Map<WindowedSegmentId, Long> segmentSizeMap = new HashMap<>(); + // It is important to create this map after windowedSegmentIds is completely filled, because WindowedSegmentIds + // can be updated while being constructed. (Intervals are added.) + SortedMap<WindowedSegmentId, Long> segmentSizeMap = new TreeMap<>(WINDOWED_SEGMENT_ID_COMPARATOR); windowedSegmentIds.forEach((segment, segmentId) -> segmentSizeMap.put(segmentId, segment.getSize())); return segmentSizeMap; }