diff --git a/docs/content/ingestion/batch-ingestion.md b/docs/content/ingestion/batch-ingestion.md index d26612699699..d0b9cc8f5bfc 100644 --- a/docs/content/ingestion/batch-ingestion.md +++ b/docs/content/ingestion/batch-ingestion.md @@ -207,6 +207,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |useCombiner|Boolean|Use hadoop combiner to merge rows at mapper if possible.|no (default == false)| |jobProperties|Object|a map of properties to add to the Hadoop job configuration.|no (default == null)| |buildV9Directly|Boolean|Whether to build v9 index directly instead of building v8 index and convert it to v9 format|no (default = false)| +|persistBackgroundCount|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage, but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)| ### Partitioning specification diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index 522511d0a773..fd92fbe02d8e 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import io.druid.indexer.partitions.HashedPartitionsSpec; import io.druid.indexer.partitions.PartitionsSpec; @@ -43,6 +44,7 @@ public class HadoopTuningConfig implements TuningConfig private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000; private static final boolean DEFAULT_USE_COMBINER = false; private static final Boolean DEFAULT_BUILD_V9_DIRECTLY = Boolean.FALSE; + private static final int DEFAULT_PERSIST_BACKGROUND_COUNT = 0; public static HadoopTuningConfig makeDefaultTuningConfig() { @@ -61,7 +63,8 @@ public static HadoopTuningConfig makeDefaultTuningConfig() false, false, null, - DEFAULT_BUILD_V9_DIRECTLY + DEFAULT_BUILD_V9_DIRECTLY, + DEFAULT_PERSIST_BACKGROUND_COUNT ); } @@ -79,6 +82,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final boolean combineText; private final boolean useCombiner; private final Boolean buildV9Directly; + private final int persistBackgroundCount; @JsonCreator public HadoopTuningConfig( @@ -97,7 +101,8 @@ public HadoopTuningConfig( final @JsonProperty("useCombiner") Boolean useCombiner, // See https://github.com/druid-io/druid/pull/1922 final @JsonProperty("rowFlushBoundary") Integer maxRowsInMemoryCOMPAT, - final @JsonProperty("buildV9Directly") Boolean buildV9Directly + final @JsonProperty("buildV9Directly") Boolean buildV9Directly, + final @JsonProperty("persistBackgroundCount") Integer persistBackgroundCount ) { this.workingPath = workingPath; @@ -116,6 +121,8 @@ public HadoopTuningConfig( this.combineText = combineText; this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue(); this.buildV9Directly = buildV9Directly == null ? DEFAULT_BUILD_V9_DIRECTLY : buildV9Directly; + this.persistBackgroundCount = persistBackgroundCount == null ? DEFAULT_PERSIST_BACKGROUND_COUNT : persistBackgroundCount; + Preconditions.checkArgument(this.persistBackgroundCount >= 0, "Not support persistBackgroundCount < 0"); } @JsonProperty @@ -201,6 +208,12 @@ public Boolean getBuildV9Directly() { return buildV9Directly; } + @JsonProperty + public int getPersistBackgroundCount() + { + return persistBackgroundCount; + } + public HadoopTuningConfig withWorkingPath(String path) { return new HadoopTuningConfig( @@ -218,7 +231,8 @@ public HadoopTuningConfig withWorkingPath(String path) combineText, useCombiner, null, - buildV9Directly + buildV9Directly, + persistBackgroundCount ); } @@ -239,7 +253,8 @@ public HadoopTuningConfig withVersion(String ver) combineText, useCombiner, null, - buildV9Directly + buildV9Directly, + persistBackgroundCount ); } @@ -260,7 +275,8 @@ public HadoopTuningConfig withShardSpecs(Map> s combineText, useCombiner, null, - buildV9Directly + buildV9Directly, + persistBackgroundCount ); } } diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index 0a651f733695..dbbf162610c1 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -29,9 +29,15 @@ import com.google.common.hash.HashFunction; import com.google.common.hash.Hashing; import com.google.common.primitives.Longs; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.logger.Logger; +import io.druid.common.guava.ThreadRenamingRunnable; +import io.druid.concurrent.Execs; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.data.input.Rows; @@ -73,6 +79,15 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** */ @@ -531,6 +546,8 @@ protected void reduce( final Interval interval = config.getGranularitySpec().bucketInterval(bucket.time).get(); + ListeningExecutorService persistExecutor = null; + List> persistFutures = Lists.newArrayList(); IncrementalIndex index = makeIncrementalIndex( bucket, combiningAggs, @@ -550,6 +567,35 @@ protected void reduce( Set allDimensionNames = Sets.newLinkedHashSet(); final ProgressIndicator progressIndicator = makeProgressIndicator(context); + int persistBackgroundCount = config.getSchema().getTuningConfig().getPersistBackgroundCount(); + if (persistBackgroundCount > 0) { + final BlockingQueue queue = new SynchronousQueue<>(); + ExecutorService executorService = new ThreadPoolExecutor( + persistBackgroundCount, + persistBackgroundCount, + 0L, + TimeUnit.MILLISECONDS, + queue, + Execs.makeThreadFactory("IndexGeneratorJob_persist_%d"), + new RejectedExecutionHandler() + { + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) + { + try { + executor.getQueue().put(r); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RejectedExecutionException("Got Interrupted while adding to the Queue"); + } + } + } + ); + persistExecutor = MoreExecutors.listeningDecorator(executorService); + } else { + persistExecutor = MoreExecutors.sameThreadExecutor(); + } for (final BytesWritable bw : values) { context.progress(); @@ -575,9 +621,29 @@ protected void reduce( toMerge.add(file); context.progress(); - persist(index, interval, file, progressIndicator); - // close this index and make a new one, reusing same buffer - index.close(); + final IncrementalIndex persistIndex = index; + persistFutures.add( + persistExecutor.submit( + new ThreadRenamingRunnable(String.format("%s-persist", file.getName())) + { + @Override + public void doRun() + { + try { + persist(persistIndex, interval, file, progressIndicator); + } + catch (Exception e) { + log.error("persist index error", e); + throw Throwables.propagate(e); + } + finally { + // close this index + persistIndex.close(); + } + } + } + ) + ); index = makeIncrementalIndex( bucket, @@ -611,6 +677,9 @@ protected void reduce( toMerge.add(finalFile); } + Futures.allAsList(persistFutures).get(1, TimeUnit.HOURS); + persistExecutor.shutdown(); + for (File file : toMerge) { indexes.add(HadoopDruidIndexerConfig.INDEX_IO.loadIndex(file)); } @@ -665,8 +734,17 @@ indexes, aggregators, new File(baseFlushFile, "merged"), progressIndicator FileUtils.deleteDirectory(file); } } + catch (ExecutionException e) { + throw Throwables.propagate(e); + } + catch (TimeoutException e) { + throw Throwables.propagate(e); + } finally { index.close(); + if (persistExecutor != null) { + persistExecutor.shutdownNow(); + } } } } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 6860d703446e..301220017916 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -382,6 +382,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Mapof(), actual.getJobProperties()); Assert.assertEquals(true, actual.isCombineText()); Assert.assertEquals(true, actual.getUseCombiner()); + Assert.assertEquals(0, actual.getPersistBackgroundCount()); } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 42dbeddba39b..e1a5238d1392 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -502,7 +502,8 @@ public void setUp() throws Exception false, useCombiner, null, - buildV9Directly + buildV9Directly, + null ) ) ); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index 4116ea294642..024ef33742ae 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -116,6 +116,7 @@ public void setup() throws Exception false, false, null, + null, null ) ) diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index 77e31e79fe10..aef6ddd5400a 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -120,7 +120,7 @@ public void testAddInputPath() throws Exception jsonMapper ), new HadoopIOConfig(null, null, null), - new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null) + new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null, null) ); granularityPathSpec.setDataGranularity(Granularity.HOUR); diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index 126a37b1bfa4..3c7e92df8973 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -202,6 +202,7 @@ public InputStream openStream() throws IOException false, false, null, + null, null ) )