From 96b69668488f39a1e422ab9acb1dfc7868d5f835 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Sun, 2 Dec 2018 02:15:19 +0800 Subject: [PATCH 1/3] release mmap immediately after merge indexes --- .../org/apache/druid/segment/IndexMerger.java | 9 +++++ .../apache/druid/segment/IndexMergerV9.java | 36 +++++++++++++++++++ .../appenderator/AppenderatorImpl.java | 17 +++++++-- .../realtime/plumber/RealtimePlumber.java | 16 +++++++-- 4 files changed, 74 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java index 4661b652eeda..ff1298930a49 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java @@ -189,6 +189,15 @@ File persist( @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory ) throws IOException; + File mergeSegmentFiles( + File[] segmentFiles, + boolean rollup, + final AggregatorFactory[] metricAggs, + File outDir, + IndexSpec indexSpec, + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + ) throws IOException; + File mergeQueryableIndex( List indexes, boolean rollup, diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index 135f8cb1266d..ce39023c0bd4 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -798,6 +799,41 @@ public File persist( ); } + @Override + public File mergeSegmentFiles( + File[] segmentFiles, + boolean rollup, + AggregatorFactory[] metricAggs, + File outDir, + IndexSpec indexSpec, + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + ) throws IOException + { + List indexes = Lists.transform(Arrays.asList(segmentFiles), input -> { + try { + return indexIO.loadIndex(input); + } + catch (IOException e) { + Throwables.propagate(e); + } + return null; + }); + + try { + return mergeQueryableIndex(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator(), segmentWriteOutMediumFactory); + } + finally { + for (QueryableIndex index : indexes) { + try { + index.close(); + } + catch (Exception e) { + log.warn(e, "failed to close index when merge"); + } + } + } + } + @Override public File mergeQueryableIndex( List indexes, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 9c837b088161..8df64bfd388d 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -82,6 +82,7 @@ import javax.annotation.Nullable; import java.io.Closeable; import java.io.File; +import java.io.FilenameFilter; import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; @@ -716,8 +717,20 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink closer.register(segmentAndCloseable.rhs); } - mergedFile = indexMerger.mergeQueryableIndex( - indexes, + File[] hydrantDirs = persistDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File dir, String fileName) + { + // To avoid reading and listing of "merged" dir + return !(Ints.tryParse(fileName) == null); + } + } + ); + + mergedFile = indexMerger.mergeSegmentFiles( + hydrantDirs, schema.getGranularitySpec().isRollup(), schema.getAggregators(), mergedTarget, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index b0453d90c388..d8eed2d3ba2e 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -430,8 +430,20 @@ public void doRun() closer.register(segmentAndCloseable.rhs); } - mergedFile = indexMerger.mergeQueryableIndex( - indexes, + File[] hydrantDirs = persistDir.listFiles( + new FilenameFilter() + { + @Override + public boolean accept(File dir, String fileName) + { + // To avoid reading and listing of "merged" dir + return !(Ints.tryParse(fileName) == null); + } + } + ); + + mergedFile = indexMerger.mergeSegmentFiles( + hydrantDirs, schema.getGranularitySpec().isRollup(), schema.getAggregators(), mergedTarget, From 9544e18264113acb90c63e8ec007b78d7be52f73 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Sun, 2 Dec 2018 02:43:16 +0800 Subject: [PATCH 2/3] fix style --- .../src/main/java/org/apache/druid/segment/IndexMerger.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java index ff1298930a49..a961a8143a08 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java @@ -192,7 +192,7 @@ File persist( File mergeSegmentFiles( File[] segmentFiles, boolean rollup, - final AggregatorFactory[] metricAggs, + AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory From d1185420f5e9228ad11fd8c5cab29fbda73d081a Mon Sep 17 00:00:00 2001 From: kaijianding Date: Tue, 4 Dec 2018 01:23:55 +0800 Subject: [PATCH 3/3] remove Throwables.propagate --- .../src/main/java/org/apache/druid/segment/IndexMergerV9.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index ce39023c0bd4..850b10ca0112 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -814,9 +813,8 @@ public File mergeSegmentFiles( return indexIO.loadIndex(input); } catch (IOException e) { - Throwables.propagate(e); + throw new ISE(e, "load index %s failed!?", input.getAbsolutePath()); } - return null; }); try {