diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java index 4661b652eeda..a961a8143a08 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java @@ -189,6 +189,15 @@ File persist( @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory ) throws IOException; + File mergeSegmentFiles( + File[] segmentFiles, + boolean rollup, + AggregatorFactory[] metricAggs, + File outDir, + IndexSpec indexSpec, + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + ) throws IOException; + File mergeQueryableIndex( List indexes, boolean rollup, diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index 135f8cb1266d..850b10ca0112 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -798,6 +798,40 @@ public File persist( ); } + @Override + public File mergeSegmentFiles( + File[] segmentFiles, + boolean rollup, + AggregatorFactory[] metricAggs, + File outDir, + IndexSpec indexSpec, + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + ) throws IOException + { + List indexes = Lists.transform(Arrays.asList(segmentFiles), input -> { + try { + return indexIO.loadIndex(input); + } + catch (IOException e) { + throw new ISE(e, "load index %s failed!?", input.getAbsolutePath()); + } + }); + + try { + return mergeQueryableIndex(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator(), segmentWriteOutMediumFactory); + } + finally { + for (QueryableIndex index : indexes) { + try { + index.close(); + } + catch (Exception e) { + log.warn(e, "failed to close index when merge"); + } + } + } + } + @Override public File mergeQueryableIndex( List indexes, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 9c837b088161..8df64bfd388d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -82,6 +82,7 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; @@ -716,8 +717,20 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink closer.register(segmentAndCloseable.rhs); } - mergedFile = indexMerger.mergeQueryableIndex( - indexes, + File[] hydrantDirs = persistDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File dir, String fileName) + { + // To avoid reading and listing of "merged" dir + return !(Ints.tryParse(fileName) == null); + } + } + ); + + mergedFile = indexMerger.mergeSegmentFiles( + hydrantDirs, schema.getGranularitySpec().isRollup(), schema.getAggregators(), mergedTarget, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index b0453d90c388..d8eed2d3ba2e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -430,8 +430,20 @@ public void doRun() closer.register(segmentAndCloseable.rhs); } - mergedFile = indexMerger.mergeQueryableIndex( - indexes, + File[] hydrantDirs = persistDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File dir, String fileName) + { + // To avoid reading and listing of "merged" dir + return !(Ints.tryParse(fileName) == null); + } + } + ); + + mergedFile = indexMerger.mergeSegmentFiles( + hydrantDirs, schema.getGranularitySpec().isRollup(), schema.getAggregators(), mergedTarget,