diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
index 4388c705fb4f..f10e40b17f46 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/input/DruidInputSource.java
@@ -65,6 +65,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -78,6 +79,31 @@ public class DruidInputSource extends AbstractInputSource implements SplittableI
 {
   private static final Logger LOG = new Logger(DruidInputSource.class);
 
+  /**
+   * A Comparator that orders {@link WindowedSegmentId} mainly by segmentId (which is important), and then by intervals
+   * (which is arbitrary, and only here for totality of ordering).
+   */
+  private static final Comparator<WindowedSegmentId> WINDOWED_SEGMENT_ID_COMPARATOR =
+      Comparator.comparing(WindowedSegmentId::getSegmentId)
+                .thenComparing(windowedSegmentId -> windowedSegmentId.getIntervals().size())
+                .thenComparing(
+                    (WindowedSegmentId a, WindowedSegmentId b) -> {
+                      // Same segmentId, same intervals list size. Compare each interval.
+                      int cmp = 0;
+
+                      for (int i = 0; i < a.getIntervals().size(); i++) {
+                        cmp = Comparators.intervalsByStartThenEnd()
+                                         .compare(a.getIntervals().get(i), b.getIntervals().get(i));
+
+                        if (cmp != 0) {
+                          return cmp;
+                        }
+                      }
+
+                      return cmp;
+                    }
+                );
+
   private final String dataSource;
   // Exactly one of interval and segmentIds should be non-null. Typically 'interval' is specified directly
   // by the user creating this firehose and 'segmentIds' is used for sub-tasks if it is split for parallel
@@ -313,6 +339,8 @@ public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits(
     //noinspection ConstantConditions
     return Iterators.transform(
         convertedSplitHintSpec.split(
+            // segmentIdToSize is sorted by segment ID; useful for grouping up segments from the same time chunk into
+            // the same input split.
             segmentIdToSize.keySet().iterator(),
             segmentId -> new InputFileAttribute(
                 Preconditions.checkNotNull(segmentIdToSize.get(segmentId), "segment size for [%s]", segmentId)
@@ -322,7 +350,10 @@ public static Iterator<InputSplit<List<WindowedSegmentId>>> createSplits(
     );
   }
 
-  private static Map<WindowedSegmentId, Long> createWindowedSegmentIdFromTimeline(
+  /**
+   * Returns a map of {@link WindowedSegmentId} to size, sorted by {@link WindowedSegmentId#getSegmentId()}.
+   */
+  private static SortedMap<WindowedSegmentId, Long> createWindowedSegmentIdFromTimeline(
       List<TimelineObjectHolder<String, DataSegment>> timelineHolders
   )
   {
@@ -335,9 +366,9 @@ private static Map<WindowedSegmentId, Long> createWindowedSegmentIdFromTimeline(
       ).addInterval(holder.getInterval());
     }
   }
-    // It is important to create this map after windowedSegmentIds is completely filled
-    // because WindowedSegmentId can be updated.
-    Map<WindowedSegmentId, Long> segmentSizeMap = new HashMap<>();
+    // It is important to create this map after windowedSegmentIds is completely filled, because WindowedSegmentIds
+    // can be updated while being constructed. (Intervals are added.)
+    SortedMap<WindowedSegmentId, Long> segmentSizeMap = new TreeMap<>(WINDOWED_SEGMENT_ID_COMPARATOR);
     windowedSegmentIds.forEach((segment, segmentId) -> segmentSizeMap.put(segmentId, segment.getSize()));
     return segmentSizeMap;
   }