From 1d1e31f64f57b115d90863c3b6fccc953447108d Mon Sep 17 00:00:00 2001 From: kaijianding Date: Tue, 23 May 2017 11:22:51 +0800 Subject: [PATCH 1/4] fix TestKafkaExtractionCluster fail due to port already used --- .../java/io/druid/query/lookup/TestKafkaExtractionCluster.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java index a419efdc6b6d..7d62277a9b3c 100644 --- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java +++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java @@ -59,6 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Random; /** * @@ -128,6 +129,7 @@ public void close() throws IOException serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath); serverProperties.put("zookeeper.session.timeout.ms", "10000"); serverProperties.put("zookeeper.sync.time.ms", "200"); + serverProperties.put("port", String.valueOf(new Random().nextInt(9999) + 10000)); kafkaConfig = new KafkaConfig(serverProperties); From 6077867421dc16d776a3ff1fdaa3a7c6182f45c4 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Tue, 30 May 2017 09:17:31 +0800 Subject: [PATCH 2/4] explicitly unmap hydrant files when abandonSegment to recycle mmap memory --- .../java/io/druid/segment/realtime/plumber/RealtimePlumber.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 161cc6a3905a..a1270e4020d0 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ 
b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -431,6 +431,7 @@ public void doRun() mergedFile, sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions())) ); + index.close(); log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier()); segmentPublisher.publishSegment(segment); @@ -861,6 +862,7 @@ protected void abandonSegment(final long truncatedTime, final Sink sink) ); for (FireHydrant hydrant : sink) { cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); + hydrant.getSegment().close(); } synchronized (handoffCondition) { handoffCondition.notifyAll(); From f5ea52900800264c94c0d85da634455bf38783f0 Mon Sep 17 00:00:00 2001 From: kaijianding Date: Tue, 30 May 2017 16:08:33 +0800 Subject: [PATCH 3/4] address the comments --- .../lookup/TestKafkaExtractionCluster.java | 4 +- .../java/io/druid/segment/IndexMerger.java | 41 +++++++++++-------- .../realtime/plumber/RealtimePlumber.java | 6 +-- 3 files changed, 28 insertions(+), 23 deletions(-) diff --git a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java index 7d62277a9b3c..6b723ff5053d 100644 --- a/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java +++ b/extensions-core/kafka-extraction-namespace/src/test/java/io/druid/query/lookup/TestKafkaExtractionCluster.java @@ -59,7 +59,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; /** * @@ -129,7 +129,7 @@ public void close() throws IOException serverProperties.put("zookeeper.connect", zkTestServer.getConnectString() + zkKafkaPath); serverProperties.put("zookeeper.session.timeout.ms", "10000"); serverProperties.put("zookeeper.sync.time.ms", 
"200"); - serverProperties.put("port", String.valueOf(new Random().nextInt(9999) + 10000)); + serverProperties.put("port", String.valueOf(ThreadLocalRandom.current().nextInt(9999) + 10000)); kafkaConfig = new KafkaConfig(serverProperties); diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index bd271d599eeb..b0a23cf54d02 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -214,24 +214,8 @@ public File mergeQueryableIndex( ProgressIndicator progress ) throws IOException { - // We are materializing the list for performance reasons. Lists.transform - // only creates a "view" of the original list, meaning the function gets - // applied every time you access an element. - List indexAdapteres = Lists.newArrayList( - Iterables.transform( - indexes, - new Function() - { - @Override - public IndexableAdapter apply(final QueryableIndex input) - { - return new QueryableIndexIndexableAdapter(input); - } - } - ) - ); return merge( - indexAdapteres, + toIndexableAdapters(indexes), rollup, metricAggs, outDir, @@ -268,6 +252,24 @@ public Iterable apply(@Nullable IndexableAdapter input) ); } + private static List toIndexableAdapters(List indexes) + { + // We are materializing the list for performance reasons. Lists.transform + // only creates a "view" of the original list, meaning the function gets + // applied every time you access an element. 
+ return Lists.transform( + indexes, + new Function() + { + @Override + public IndexableAdapter apply(final QueryableIndex input) + { + return new QueryableIndexIndexableAdapter(input); + } + } + ); + } + private static List getLongestSharedDimOrder(List indexes) { int maxSize = 0; @@ -303,6 +305,11 @@ private static List getLongestSharedDimOrder(List inde return ImmutableList.copyOf(orderingCandidate); } + public static List getMergedDimensionsFromQueryableIndexes(List indexes) + { + return getMergedDimensions(toIndexableAdapters(indexes)); + } + public static List getMergedDimensions(List indexes) { if (indexes.size() == 0) { diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index a1270e4020d0..1e1b3cb18092 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -42,10 +42,10 @@ import io.druid.concurrent.TaskThreadPriority; import io.druid.data.input.Committer; import io.druid.data.input.InputRow; -import io.druid.java.util.common.granularity.Granularity; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.concurrent.ScheduledExecutors; +import io.druid.java.util.common.granularity.Granularity; import io.druid.query.Query; import io.druid.query.QueryRunner; import io.druid.query.QueryRunnerFactoryConglomerate; @@ -424,14 +424,12 @@ public void doRun() metrics.incrementMergeCpuTime(VMUtils.safeGetThreadCpuTime() - mergeThreadCpuTime); metrics.incrementMergeTimeMillis(mergeStopwatch.elapsed(TimeUnit.MILLISECONDS)); - QueryableIndex index = indexIO.loadIndex(mergedFile); log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier()); DataSegment segment = dataSegmentPusher.push( mergedFile, - 
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions())) + sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)) ); - index.close(); log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier()); segmentPublisher.publishSegment(segment); From e909687a6b5fa2c3e9ef0b94e9e3df84bf49139c Mon Sep 17 00:00:00 2001 From: kaijianding Date: Fri, 2 Jun 2017 04:41:01 +0800 Subject: [PATCH 4/4] apply to AppenderatorImpl --- .../java/io/druid/segment/IndexMerger.java | 22 ++++++++++--------- .../appenderator/AppenderatorImpl.java | 12 +++++++--- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java index b0a23cf54d02..8c8e3be841b9 100644 --- a/processing/src/main/java/io/druid/segment/IndexMerger.java +++ b/processing/src/main/java/io/druid/segment/IndexMerger.java @@ -257,16 +257,18 @@ private static List toIndexableAdapters(List i // We are materializing the list for performance reasons. Lists.transform // only creates a "view" of the original list, meaning the function gets // applied every time you access an element. 
- return Lists.transform( - indexes, - new Function() - { - @Override - public IndexableAdapter apply(final QueryableIndex input) - { - return new QueryableIndexIndexableAdapter(input); - } - } + return Lists.newArrayList( + Iterables.transform( + indexes, + new Function() + { + @Override + public IndexableAdapter apply(final QueryableIndex input) + { + return new QueryableIndexIndexableAdapter(input); + } + } + ) ); } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 1c3e2794b94d..eab71a0d5b74 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -570,11 +570,9 @@ private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink tuningConfig.getIndexSpec() ); - QueryableIndex index = indexIO.loadIndex(mergedFile); - DataSegment segment = dataSegmentPusher.push( mergedFile, - sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions())) + sink.getSegment().withDimensions(IndexMerger.getMergedDimensionsFromQueryableIndexes(indexes)) ); objectMapper.writeValue(descriptorFile, segment); @@ -920,6 +918,14 @@ public Object apply(@Nullable Object input) if (cache != null) { cache.close(SinkQuerySegmentWalker.makeHydrantCacheIdentifier(hydrant)); } + try { + hydrant.getSegment().close(); + } + catch (IOException e) { + log.makeAlert(e, "Failed to explicitly close segment[%s]", schema.getDataSource()) + .addData("identifier", hydrant.getSegment().getIdentifier()) + .emit(); + } } if (removeOnDiskData) {