From 6ee2340f5ab1227d5e61776de1333e9fb29587a6 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Wed, 16 Dec 2020 12:34:30 -0800 Subject: [PATCH 1/6] Multiphase merge for IndexMergerV9 --- .../indexing/IndexMergeBenchmark.java | 3 +- .../druid/indexer/IndexGeneratorJob.java | 2 +- .../indexing/common/task/CompactionTask.java | 3 +- .../druid/indexing/common/task/IndexTask.java | 31 +- .../parallel/ParallelIndexSupervisorTask.java | 3 +- .../parallel/ParallelIndexTuningConfig.java | 10 +- .../parallel/PartialSegmentMergeTask.java | 3 +- .../ClientCompactionTaskQuerySerdeTest.java | 1 + .../common/task/CompactionTaskRunTest.java | 1 + .../common/task/CompactionTaskTest.java | 6 + .../common/task/IndexTaskSerdeTest.java | 18 +- .../indexing/common/task/IndexTaskTest.java | 12 +- .../indexing/common/task/TaskSerdeTest.java | 2 + ...stractParallelIndexSupervisorTaskTest.java | 2 + .../ParallelIndexSupervisorTaskKillTest.java | 1 + ...rallelIndexSupervisorTaskResourceTest.java | 1 + .../ParallelIndexSupervisorTaskSerdeTest.java | 1 + .../ParallelIndexSupervisorTaskTest.java | 1 + .../parallel/ParallelIndexTestingFactory.java | 3 +- .../ParallelIndexTuningConfigTest.java | 7 + .../SinglePhaseParallelIndexingTest.java | 1 + .../indexing/overlord/TaskLifecycleTest.java | 4 + .../org/apache/druid/segment/IndexMerger.java | 10 +- .../apache/druid/segment/IndexMergerV9.java | 140 +++++++- .../aggregation/AggregationTestHelper.java | 2 +- .../apache/druid/segment/EmptyIndexTest.java | 3 +- .../apache/druid/segment/IndexBuilder.java | 3 +- .../druid/segment/IndexMergerRollupTest.java | 2 +- .../druid/segment/IndexMergerTestBase.java | 328 ++++++++++++++++-- .../IndexMergerV9WithSpatialIndexTest.java | 3 +- .../druid/segment/SchemalessIndexTest.java | 11 +- .../org/apache/druid/segment/TestIndex.java | 3 +- .../filter/SpatialFilterBonusTest.java | 3 +- .../segment/filter/SpatialFilterTest.java | 3 +- .../segment/generator/SegmentGenerator.java | 3 +- 
.../appenderator/AppenderatorConfig.java | 5 + .../appenderator/AppenderatorImpl.java | 3 +- .../UnifiedIndexerAppenderatorsManager.java | 12 +- .../realtime/plumber/RealtimePlumber.java | 3 +- 39 files changed, 575 insertions(+), 78 deletions(-) diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java index b677a966a1c2..ce34070cdbef 100644 --- a/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java +++ b/benchmarks/src/test/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java @@ -173,7 +173,8 @@ public void mergeV9(Blackhole blackhole) throws Exception schemaInfo.getAggsArray(), tmpFile, new IndexSpec(), - null + null, + -1 ); blackhole.consume(mergedFile); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index f3694710f6b9..2a2d7a6f2e58 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -609,7 +609,7 @@ protected File mergeQueryableIndex( { boolean rollup = config.getSchema().getDataSchema().getGranularitySpec().isRollup(); return HadoopDruidIndexerConfig.INDEX_MERGER_V9 - .mergeQueryableIndex(indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator, null); + .mergeQueryableIndex(indexes, rollup, aggs, file, config.getIndexSpec(), progressIndicator, null, -1); } @Override diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index e773b6f95d33..0fa006d96a23 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -243,7 +243,8 @@ static ParallelIndexTuningConfig getTuningConfig(TuningConfig tuningConfig) null, indexTuningConfig.isLogParseExceptions(), indexTuningConfig.getMaxParseExceptions(), - indexTuningConfig.getMaxSavedParseExceptions() + indexTuningConfig.getMaxSavedParseExceptions(), + indexTuningConfig.getMaxColumnsToMerge() ); } else { throw new ISE( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 69484b54f13a..5822664e04b6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -73,6 +73,7 @@ import org.apache.druid.java.util.common.guava.Comparators; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.apache.druid.segment.IndexMerger; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.incremental.AppendableIndexSpec; import org.apache.druid.segment.incremental.ParseExceptionHandler; @@ -1122,6 +1123,7 @@ public static class IndexTuningConfig implements AppenderatorConfig private final AppendableIndexSpec appendableIndexSpec; private final int maxRowsInMemory; private final long maxBytesInMemory; + private final int maxColumnsToMerge; // null if all partitionsSpec related params are null. see getDefaultPartitionsSpec() for details. 
@Nullable @@ -1210,7 +1212,8 @@ public IndexTuningConfig( SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge ) { this( @@ -1235,7 +1238,8 @@ public IndexTuningConfig( segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + maxColumnsToMerge ); Preconditions.checkArgument( @@ -1246,7 +1250,7 @@ public IndexTuningConfig( private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( @@ -1264,7 +1268,8 @@ private IndexTuningConfig( @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions + @Nullable Integer maxSavedParseExceptions, + @Nullable Integer maxColumnsToMerge ) { this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; @@ -1272,6 +1277,9 @@ private IndexTuningConfig( // initializing this to 0, it will be lazily initialized to a value // @see #getMaxBytesInMemoryOrDefault() this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; + this.maxColumnsToMerge = maxColumnsToMerge == null + ? IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE + : maxColumnsToMerge; this.partitionsSpec = partitionsSpec; this.indexSpec = indexSpec == null ? 
DEFAULT_INDEX_SPEC : indexSpec; this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? @@ -1320,7 +1328,8 @@ public IndexTuningConfig withBasePersistDirectory(File dir) segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + maxColumnsToMerge ); } @@ -1341,7 +1350,8 @@ public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + maxColumnsToMerge ); } @@ -1442,6 +1452,12 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() return segmentWriteOutMediumFactory; } + @Override + public int getMaxColumnsToMerge() + { + return maxColumnsToMerge; + } + @JsonProperty public boolean isLogParseExceptions() { @@ -1532,6 +1548,7 @@ public boolean equals(Object o) return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) && maxRowsInMemory == that.maxRowsInMemory && maxBytesInMemory == that.maxBytesInMemory && + maxColumnsToMerge == that.maxColumnsToMerge && maxPendingPersists == that.maxPendingPersists && forceGuaranteedRollup == that.forceGuaranteedRollup && reportParseExceptions == that.reportParseExceptions && @@ -1553,6 +1570,7 @@ public int hashCode() appendableIndexSpec, maxRowsInMemory, maxBytesInMemory, + maxColumnsToMerge, partitionsSpec, indexSpec, indexSpecForIntermediatePersists, @@ -1574,6 +1592,7 @@ public String toString() return "IndexTuningConfig{" + "maxRowsInMemory=" + maxRowsInMemory + ", maxBytesInMemory=" + maxBytesInMemory + + ", maxColumnsToMerge=" + maxColumnsToMerge + ", partitionsSpec=" + partitionsSpec + ", indexSpec=" + indexSpec + ", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 3d272b43813c..c9989e96d40f 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -991,7 +991,8 @@ private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningC tuningConfig.getSegmentWriteOutMediumFactory(), tuningConfig.isLogParseExceptions(), tuningConfig.getMaxParseExceptions(), - tuningConfig.getMaxSavedParseExceptions() + tuningConfig.getMaxSavedParseExceptions(), + tuningConfig.getMaxColumnsToMerge() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index ca7fdf13ffed..8e2cd8aa2653 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -99,6 +99,7 @@ public static ParallelIndexTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -131,7 +132,8 @@ public ParallelIndexTuningConfig( @JsonProperty("totalNumMergeTasks") @Nullable Integer totalNumMergeTasks, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge ) { super( @@ -155,7 +157,8 @@ public ParallelIndexTuningConfig( 
segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + maxColumnsToMerge ); if (maxNumSubTasks != null && maxNumConcurrentSubTasks != null) { @@ -276,7 +279,8 @@ public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpe getTotalNumMergeTasks(), isLogParseExceptions(), getMaxParseExceptions(), - getMaxSavedParseExceptions() + getMaxSavedParseExceptions(), + getMaxColumnsToMerge() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java index fad7b07f8314..8749a0ce419c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/PartialSegmentMergeTask.java @@ -339,7 +339,8 @@ private static Pair> mergeSegmentsInSamePartition( dataSchema.getAggregators(), outDir, tuningConfig.getIndexSpec(), - tuningConfig.getSegmentWriteOutMediumFactory() + tuningConfig.getSegmentWriteOutMediumFactory(), + tuningConfig.getMaxColumnsToMerge() ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 0daffc66bbe8..a25ae37648ae 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -238,6 +238,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException 100, null, null, + null, null ) ) diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 1fce094d110f..b466f3546c47 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -298,6 +298,7 @@ public void testRunWithHashPartitioning() throws Exception null, null, null, + null, null ) ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 14c0840be4bd..2e056034b4be 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -334,6 +334,7 @@ private static ParallelIndexTuningConfig createTuningConfig() null, null, null, + null, null ); } @@ -462,6 +463,7 @@ public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws null, null, null, + null, null ), null, @@ -614,6 +616,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, null, null, + null, null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( @@ -682,6 +685,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm null, null, null, + null, null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( @@ -750,6 +754,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment null, null, null, + null, null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( @@ -1139,6 +1144,7 @@ private void assertIngestionSchema( null, null, null, + null, null ), expectedSegmentGranularity diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java index bf2e44428f29..a65a7c6cf448 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java @@ -82,7 +82,8 @@ public void testSerdeTuningConfigWithDynamicPartitionsSpec() throws IOException OffHeapMemorySegmentWriteOutMediumFactory.instance(), true, 10, - 100 + 100, + 1234 ); assertSerdeTuningConfig(tuningConfig); } @@ -116,7 +117,8 @@ public void testSerdeTuningConfigWithHashedPartitionsSpec() throws IOException OffHeapMemorySegmentWriteOutMediumFactory.instance(), true, 10, - 100 + 100, + null ); assertSerdeTuningConfig(tuningConfig); } @@ -150,7 +152,8 @@ public void testSerdeTuningConfigWithDeprecatedDynamicPartitionsSpec() throws IO OffHeapMemorySegmentWriteOutMediumFactory.instance(), true, 10, - 100 + 100, + null ); assertSerdeTuningConfig(tuningConfig); } @@ -184,7 +187,8 @@ public void testSerdeTuningConfigWithDeprecatedHashedPartitionsSpec() throws IOE OffHeapMemorySegmentWriteOutMediumFactory.instance(), true, 10, - 100 + 100, + 1234 ); assertSerdeTuningConfig(tuningConfig); } @@ -220,7 +224,8 @@ public void testForceGuaranteedRollupWithDynamicPartitionsSpec() OffHeapMemorySegmentWriteOutMediumFactory.instance(), true, 10, - 100 + 100, + null ); } @@ -255,7 +260,8 @@ public void testBestEffortRollupWithHashedPartitionsSpec() OffHeapMemorySegmentWriteOutMediumFactory.instance(), true, 10, - 100 + 100, + null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index a04e94b17e00..2baece1be254 100644 --- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1135,7 +1135,8 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, true, 7, - 7 + 7, + null ); final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); @@ -1264,7 +1265,8 @@ public void testMultipleParseExceptionsFailure() throws Exception null, true, 2, - 5 + 5, + null ); final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); @@ -1385,7 +1387,8 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc null, true, 2, - 5 + 5, + null ); final TimestampSpec timestampSpec = new TimestampSpec("time", "auto", null); @@ -1830,7 +1833,8 @@ static IndexTuningConfig createTuningConfig( null, null, null, - 1 + 1, + null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index aa3d33bf2506..3facbb6d5022 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -267,6 +267,7 @@ public void testIndexTaskSerde() throws Exception null, null, null, + null, null ) ), @@ -348,6 +349,7 @@ public void testIndexTaskwithResourceSerde() throws Exception null, null, null, + null, null ) ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 45b0f383b110..4d638a9204a4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -167,6 +167,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase null, null, null, + null, null ); @@ -248,6 +249,7 @@ protected ParallelIndexTuningConfig newTuningConfig( null, null, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index fc193b0e1321..ee231b50d780 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -188,6 +188,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, null ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index be6b0a86737e..a4937774eebf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -437,6 +437,7 @@ private TestSupervisorTask newTask( null, null, null, + null, null ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index 
b4a712580062..70c34f9b94c8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -271,6 +271,7 @@ ParallelIndexIngestionSpec build() null, null, null, + null, null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 60882358cb1f..46663c6fba12 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -224,6 +224,7 @@ public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA null, false, null, + null, null ); final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java index c2f712320b91..aacb6b0111fb 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java @@ -182,7 +182,8 @@ ParallelIndexTuningConfig build() 22, logParseExceptions, maxParseExceptions, - 25 + 25, + null ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index a826ae9cd42d..cf862ff6a9e7 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -99,6 +99,7 @@ public void testSerdeWithMaxRowsPerSegment() null, false, null, + null, null ); final byte[] json = mapper.writeValueAsBytes(tuningConfig); @@ -142,6 +143,7 @@ public void testSerdeWithMaxNumConcurrentSubTasks() throws IOException null, false, null, + null, null ); final byte[] json = mapper.writeValueAsBytes(tuningConfig); @@ -185,6 +187,7 @@ public void testSerdeWithMaxNumSubTasks() throws IOException null, false, null, + null, null ); final byte[] json = mapper.writeValueAsBytes(tuningConfig); @@ -230,6 +233,7 @@ public void testSerdeWithMaxNumSubTasksAndMaxNumConcurrentSubTasks() null, false, null, + null, null ); } @@ -272,6 +276,7 @@ public void testConstructorWithHashedPartitionsSpecAndNonForceGuaranteedRollupFa null, false, null, + null, null ); } @@ -314,6 +319,7 @@ public void testConstructorWithSingleDimensionPartitionsSpecAndNonForceGuarantee null, false, null, + null, null ); } @@ -356,6 +362,7 @@ public void testConstructorWithDynamicPartitionsSpecAndForceGuaranteedRollupFail null, false, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index e444f8091839..d27c19bc1e19 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -362,6 +362,7 @@ public void testWith1MaxNumConcurrentSubTasks() null, null, null, + null, null ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 48f75d7a82c3..d48520ad3bb8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -764,6 +764,7 @@ public void testIndexTask() throws Exception null, null, null, + null, null ) ), @@ -845,6 +846,7 @@ public void testIndexTaskFailure() throws Exception null, null, null, + null, null ) ), @@ -1271,6 +1273,7 @@ public void testResumeTasks() throws Exception null, null, null, + null, null ) ), @@ -1379,6 +1382,7 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception null, null, null, + null, null ) ), diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java index 1b0079089403..f9fba3fc554e 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMerger.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMerger.java @@ -63,6 +63,7 @@ public interface IndexMerger SerializerUtils SERIALIZER_UTILS = new SerializerUtils(); int INVALID_ROW = -1; + int UNLIMITED_MAX_COLUMNS_TO_MERGE = -1; static List getMergedDimensionsFromQueryableIndexes(List indexes) { @@ -196,7 +197,8 @@ File mergeQueryableIndex( AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + int maxColumnsToMerge ) throws IOException; File mergeQueryableIndex( @@ -206,7 
+208,8 @@ File mergeQueryableIndex( File outDir, IndexSpec indexSpec, ProgressIndicator progress, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + int maxColumnsToMerge ) throws IOException; @VisibleForTesting @@ -215,7 +218,8 @@ File merge( boolean rollup, AggregatorFactory[] metricAggs, File outDir, - IndexSpec indexSpec + IndexSpec indexSpec, + int maxColumnsToMerge ) throws IOException; // Faster than IndexMaker diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index caf9f3c3f76d..c76a9d0812a0 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -28,6 +28,7 @@ import com.google.common.primitives.Ints; import com.google.inject.Inject; import org.apache.druid.common.config.NullHandling; +import org.apache.druid.common.utils.UUIDUtils; import org.apache.druid.io.ZeroCopyByteArrayOutputStream; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.FileUtils; @@ -861,7 +862,7 @@ public File persist( org.apache.commons.io.FileUtils.forceMkdir(outDir); log.debug("Starting persist for interval[%s], rows[%,d]", dataInterval, index.size()); - return merge( + return multiphaseMerge( Collections.singletonList( new IncrementalIndexAdapter( dataInterval, @@ -878,7 +879,8 @@ public File persist( outDir, indexSpec, progress, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + -1 ); } @@ -889,7 +891,8 @@ public File mergeQueryableIndex( final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + int maxColumnsToMerge ) throws IOException { return mergeQueryableIndex( @@ -899,7 
+902,8 @@ public File mergeQueryableIndex( outDir, indexSpec, new BaseProgressIndicator(), - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + maxColumnsToMerge ); } @@ -911,17 +915,19 @@ public File mergeQueryableIndex( File outDir, IndexSpec indexSpec, ProgressIndicator progress, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + int maxColumnsToMerge ) throws IOException { - return merge( + return multiphaseMerge( IndexMerger.toIndexableAdapters(indexes), rollup, metricAggs, outDir, indexSpec, progress, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + maxColumnsToMerge ); } @@ -931,25 +937,137 @@ public File merge( boolean rollup, final AggregatorFactory[] metricAggs, File outDir, - IndexSpec indexSpec + IndexSpec indexSpec, + int maxColumnsToMerge ) throws IOException { - return merge(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator(), null); + return multiphaseMerge(indexes, rollup, metricAggs, outDir, indexSpec, new BaseProgressIndicator(), null, maxColumnsToMerge); } - private File merge( + private File multiphaseMerge( List indexes, final boolean rollup, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, ProgressIndicator progress, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + int maxColumnsToMerge ) throws IOException { FileUtils.deleteDirectory(outDir); org.apache.commons.io.FileUtils.forceMkdir(outDir); + if (maxColumnsToMerge == IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE) { + return merge( + indexes, + rollup, + metricAggs, + outDir, + indexSpec, + progress, + segmentWriteOutMediumFactory + ); + } + + List> currentPhases = getMergePhases(indexes, maxColumnsToMerge); + List currentOutputs = new ArrayList<>(); + + while (true) { + for (List phase : currentPhases) { + File phaseOutDir; + if 
(currentPhases.size() == 1) { + // use the given outDir on the final merge phase + phaseOutDir = outDir; + } else { + phaseOutDir = java.nio.file.Files.createTempDirectory(outDir.toPath(), UUIDUtils.generateUuid()).toFile(); + } + File phaseOutput = merge( + phase, + rollup, + metricAggs, + phaseOutDir, + indexSpec, + progress, + segmentWriteOutMediumFactory + ); + currentOutputs.add(phaseOutput); + } + if (currentOutputs.size() == 1) { + // we're done, making a single File output + return currentOutputs.get(0); + } else { + // convert Files to QueryableIndexIndexableAdapter and do another merge phase + List qIndexAdapters = new ArrayList<>(); + for (File outputFile : currentOutputs) { + QueryableIndex qIndex = indexIO.loadIndex(outputFile, true); + qIndexAdapters.add(new QueryableIndexIndexableAdapter(qIndex)); + } + currentPhases = getMergePhases(qIndexAdapters, maxColumnsToMerge); + currentOutputs = new ArrayList<>(); + } + } + } + + private List> getMergePhases(List indexes, int maxColumnsToMerge) + { + List> toMerge = new ArrayList<>(); + // always merge at least two segments regardless of column limit + if (indexes.size() <= 2) { + if (getIndexColumnCount(indexes) > maxColumnsToMerge) { + log.warn("index pair has more columns than maxColumnsToMerge [%d].", maxColumnsToMerge); + } + toMerge.add(indexes); + } else { + List currentPhase = new ArrayList<>(); + int currentColumnCount = 0; + for (IndexableAdapter index : indexes) { + int indexColumnCount = getIndexColumnCount(index); + if (indexColumnCount > maxColumnsToMerge) { + log.warn("index has more columns [%d] than maxColumnsToMerge [%d]!", indexColumnCount, maxColumnsToMerge); + } + + // always merge at least two segments regardless of column limit + if (currentPhase.size() > 1 && currentColumnCount + indexColumnCount > maxColumnsToMerge) { + toMerge.add(currentPhase); + currentPhase = new ArrayList<>(); + currentColumnCount = indexColumnCount; + currentPhase.add(index); + } else { + 
currentPhase.add(index); + currentColumnCount += indexColumnCount; + } + } + toMerge.add(currentPhase); + } + return toMerge; + } + + private int getIndexColumnCount(IndexableAdapter indexableAdapter) + { + // +1 for the __time column + return 1 + indexableAdapter.getDimensionNames().size() + indexableAdapter.getMetricNames().size(); + } + + private int getIndexColumnCount(List indexableAdapters) + { + int count = 0; + for (IndexableAdapter indexableAdapter : indexableAdapters) { + count += getIndexColumnCount(indexableAdapter); + } + return count; + } + + private File merge( + List indexes, + final boolean rollup, + final AggregatorFactory[] metricAggs, + File outDir, + IndexSpec indexSpec, + ProgressIndicator progress, + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + ) throws IOException + { final List mergedDimensions = IndexMerger.getMergedDimensions(indexes); final List mergedMetrics = IndexMerger.mergeIndexed( diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java index 5417f458a344..c59933343f31 100644 --- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java @@ -532,7 +532,7 @@ public void createIndex( for (File file : toMerge) { indexes.add(indexIO.loadIndex(file)); } - indexMerger.mergeQueryableIndex(indexes, rollup, metrics, outDir, new IndexSpec(), null); + indexMerger.mergeQueryableIndex(indexes, rollup, metrics, outDir, new IndexSpec(), null, -1); for (QueryableIndex qi : indexes) { qi.close(); diff --git a/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java b/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java index e9e8a17e6b21..caeab9610ef7 100644 --- a/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java +++ 
b/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java @@ -87,7 +87,8 @@ public void testEmptyIndex() throws Exception true, new AggregatorFactory[0], tmpDir, - new IndexSpec() + new IndexSpec(), + -1 ); QueryableIndex emptyQueryableIndex = TestHelper.getTestIndexIO().loadIndex(tmpDir); diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java index 5813f6d298ec..2ed65fd443dd 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java @@ -176,7 +176,8 @@ public QueryableIndex buildMMappedMergedIndex() AggregatorFactory.class ), new File(tmpDir, StringUtils.format("testIndex-%s", UUID.randomUUID())), - indexSpec + indexSpec, + -1 ) ); for (QueryableIndex index : persisted) { diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java index 547abcbcfca8..3cd0e8620ced 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerRollupTest.java @@ -95,7 +95,7 @@ private void testStringFirstLastRollup( } File indexFile = indexMerger - .mergeQueryableIndex(indexes, true, aggregatorFactories, tempDir, indexSpec, null); + .mergeQueryableIndex(indexes, true, aggregatorFactories, tempDir, indexSpec, null, -1); try (QueryableIndex mergedIndex = indexIO.loadIndex(indexFile)) { Assert.assertEquals("Number of rows should be 1", 1, mergedIndex.getNumRows()); } diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java index 86e8503f997f..b16429f20ca5 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java +++ 
b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java @@ -322,7 +322,8 @@ public void testPersistMerge() throws Exception mergedAggregators, mergedDir, indexSpec, - null + null, + -1 ) ) ); @@ -387,7 +388,8 @@ public void testPersistEmptyColumn() throws Exception new AggregatorFactory[]{}, tmpDir3, indexSpec, - null + null, + -1 ) ) ); @@ -444,7 +446,8 @@ public void testMergeRetainsValues() throws Exception new AggregatorFactory[]{new CountAggregatorFactory("count")}, mergedDir, indexSpec, - null + null, + -1 ) ) ); @@ -500,7 +503,8 @@ public void testAppendRetainsValues() throws Exception mergedAggregators, mergedDir, indexSpec, - null + null, + -1 ) ) ); @@ -571,7 +575,8 @@ public void testMergeSpecChange() throws Exception mergedAggregators, mergedDir, newSpec, - null + null, + -1 ) ) ); @@ -784,7 +789,8 @@ public void testNonLexicographicDimOrderMerge() throws Exception new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec, - null + null, + -1 ) ) ); @@ -877,7 +883,8 @@ public void testMergeWithDimensionsList() throws Exception new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec, - null + null, + -1 ) ) ); @@ -949,7 +956,8 @@ public void testDisjointDimMerge() throws Exception new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec, - null + null, + -1 ) ) ); @@ -1078,7 +1086,8 @@ public void testJointDimMerge() throws Exception new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec, - null + null, + -1 ) ) ); @@ -1251,7 +1260,8 @@ public void testNoRollupMergeWithDuplicateRow() throws Exception new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec, - null + null, + -1 ) ) ); @@ -1381,7 +1391,8 @@ public void testMergeWithSupersetOrdering() throws Exception new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec, - null + null, + -1 ) ) ); @@ -1394,7 
+1405,8 @@ public void testMergeWithSupersetOrdering() throws Exception new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged2, indexSpec, - null + null, + -1 ) ) ); @@ -1524,7 +1536,8 @@ public void testMismatchedDimensions() throws IOException new LongSumAggregatorFactory("C", "C"), }, tmpDirMerged, - indexSpec + indexSpec, + -1 ); } @@ -1573,7 +1586,8 @@ public void testAddMetrics() throws IOException true, new AggregatorFactory[]{new LongSumAggregatorFactory("A", "A"), new LongSumAggregatorFactory("C", "C")}, tmpDirMerged, - indexSpec + indexSpec, + -1 ); final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(indexIO.loadIndex( merged))); @@ -1643,7 +1657,8 @@ public void testAddMetricsBothSidesNull() throws IOException new LongSumAggregatorFactory("C", "C") }, tmpDirMerged, - indexSpec + indexSpec, + -1 ); final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(closer.closeLater(indexIO.loadIndex( merged))); @@ -1706,7 +1721,8 @@ public void testMismatchedMetrics() throws IOException new LongSumAggregatorFactory("D", "D") }, tmpDirMerged, - indexSpec + indexSpec, + -1 ); // Since D was not present in any of the indices, it is not present in the output @@ -1750,7 +1766,8 @@ public void testMismatchedMetricsVarying() throws IOException new LongSumAggregatorFactory("D", "D") }, tmpDirMerged, - indexSpec + indexSpec, + -1 ); final QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter( closer.closeLater(indexIO.loadIndex(merged)) @@ -1784,7 +1801,8 @@ public void testMergeNumericDims() throws Exception new AggregatorFactory[]{new CountAggregatorFactory("count")}, tmpDirMerged, indexSpec, - null + null, + -1 ) ) ); @@ -2265,7 +2283,8 @@ public void testMultivalDim_mergeAcrossSegments_rollupWorks() throws Exception }, tmpDirMerged, indexSpec, - null + null, + -1 ) ) ); @@ -2494,7 +2513,8 @@ public void testMultivalDim_persistAndMerge_dimensionValueOrderingRules() 
throws }, tmpDirMerged, indexSpec, - null + null, + -1 ) ) ); @@ -2611,6 +2631,274 @@ public void testMultivalDim_persistAndMerge_dimensionValueOrderingRules() throws } } + private MapBasedInputRow getRowForTestMaxColumnsToMerge( + long ts, + String d1, + String d2, + String d3, + String d4, + String d5 + ) + { + return new MapBasedInputRow( + ts, + Arrays.asList("d1", "d2", "d3", "d4", "d5"), + ImmutableMap.of( + "d1", d1, + "d2", d2, + "d3", d3, + "d4", d4, + "d5", d5 + ) + ); + } + + private void validateTestMaxColumnsToMergeOutputSegment(QueryableIndex merged) + { + final QueryableIndexIndexableAdapter adapter = new QueryableIndexIndexableAdapter(merged); + final List rowList = RowIteratorHelper.toList(adapter.getRows()); + + Assert.assertEquals( + ImmutableList.of("d1", "d2", "d3", "d4", "d5"), + ImmutableList.copyOf(adapter.getDimensionNames()) + ); + + Assert.assertEquals(4, rowList.size()); + + Assert.assertEquals( + Arrays.asList("a", "b", "c", "d", "e"), + rowList.get(0).dimensionValues() + ); + Assert.assertEquals(1L, rowList.get(0).metricValues().get(0)); + + Assert.assertEquals( + Arrays.asList("aa", "bb", "cc", "dd", "ee"), + rowList.get(1).dimensionValues() + ); + Assert.assertEquals(1L, rowList.get(1).metricValues().get(0)); + + Assert.assertEquals( + Arrays.asList("aaa", "bbb", "ccc", "ddd", "eee"), + rowList.get(2).dimensionValues() + ); + Assert.assertEquals(1L, rowList.get(2).metricValues().get(0)); + + Assert.assertEquals( + Arrays.asList("1", "2", "3", "4", "5"), + rowList.get(3).dimensionValues() + ); + Assert.assertEquals(3L, rowList.get(3).metricValues().get(0)); + + checkBitmapIndex(Collections.singletonList(0), adapter.getBitmapIndex("d1", "a")); + checkBitmapIndex(Collections.singletonList(1), adapter.getBitmapIndex("d1", "aa")); + checkBitmapIndex(Collections.singletonList(2), adapter.getBitmapIndex("d1", "aaa")); + checkBitmapIndex(Collections.singletonList(2), adapter.getBitmapIndex("d1", "aaa")); + 
checkBitmapIndex(Collections.singletonList(3), adapter.getBitmapIndex("d1", "1")); + + checkBitmapIndex(Collections.singletonList(0), adapter.getBitmapIndex("d2", "b")); + checkBitmapIndex(Collections.singletonList(1), adapter.getBitmapIndex("d2", "bb")); + checkBitmapIndex(Collections.singletonList(2), adapter.getBitmapIndex("d2", "bbb")); + checkBitmapIndex(Collections.singletonList(3), adapter.getBitmapIndex("d2", "2")); + + checkBitmapIndex(Collections.singletonList(0), adapter.getBitmapIndex("d3", "c")); + checkBitmapIndex(Collections.singletonList(1), adapter.getBitmapIndex("d3", "cc")); + checkBitmapIndex(Collections.singletonList(2), adapter.getBitmapIndex("d3", "ccc")); + checkBitmapIndex(Collections.singletonList(3), adapter.getBitmapIndex("d3", "3")); + + checkBitmapIndex(Collections.singletonList(0), adapter.getBitmapIndex("d4", "d")); + checkBitmapIndex(Collections.singletonList(1), adapter.getBitmapIndex("d4", "dd")); + checkBitmapIndex(Collections.singletonList(2), adapter.getBitmapIndex("d4", "ddd")); + checkBitmapIndex(Collections.singletonList(3), adapter.getBitmapIndex("d4", "4")); + + checkBitmapIndex(Collections.singletonList(0), adapter.getBitmapIndex("d5", "e")); + checkBitmapIndex(Collections.singletonList(1), adapter.getBitmapIndex("d5", "ee")); + checkBitmapIndex(Collections.singletonList(2), adapter.getBitmapIndex("d5", "eee")); + checkBitmapIndex(Collections.singletonList(3), adapter.getBitmapIndex("d5", "5")); + } + + @Test + public void testMaxColumnsToMerge() throws Exception + { + IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() + .withMetrics(new CountAggregatorFactory("count")) + .withRollup(true) + .build(); + + IncrementalIndex toPersistA = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setMaxRowCount(1000) + .buildOnheap(); + toPersistA.add(getRowForTestMaxColumnsToMerge(10000, "a", "b", "c", "d", "e")); + toPersistA.add(getRowForTestMaxColumnsToMerge(99999, "1", "2", "3", "4", "5")); + 
+ IncrementalIndex toPersistB = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setMaxRowCount(1000) + .buildOnheap(); + toPersistB.add(getRowForTestMaxColumnsToMerge(20000, "aa", "bb", "cc", "dd", "ee")); + toPersistB.add(getRowForTestMaxColumnsToMerge(99999, "1", "2", "3", "4", "5")); + + IncrementalIndex toPersistC = new IncrementalIndex.Builder() + .setIndexSchema(indexSchema) + .setMaxRowCount(1000) + .buildOnheap(); + toPersistC.add(getRowForTestMaxColumnsToMerge(30000, "aaa", "bbb", "ccc", "ddd", "eee")); + toPersistC.add(getRowForTestMaxColumnsToMerge(99999, "1", "2", "3", "4", "5")); + + final File tmpDirA = temporaryFolder.newFolder(); + final File tmpDirB = temporaryFolder.newFolder(); + final File tmpDirC = temporaryFolder.newFolder(); + + QueryableIndex indexA = closer.closeLater( + indexIO.loadIndex(indexMerger.persist(toPersistA, tmpDirA, indexSpec, null)) + ); + + QueryableIndex indexB = closer.closeLater( + indexIO.loadIndex(indexMerger.persist(toPersistB, tmpDirB, indexSpec, null)) + ); + + QueryableIndex indexC = closer.closeLater( + indexIO.loadIndex(indexMerger.persist(toPersistC, tmpDirC, indexSpec, null)) + ); + + // no column limit + final File tmpDirMerged0 = temporaryFolder.newFolder(); + final QueryableIndex merged0 = closer.closeLater( + indexIO.loadIndex( + indexMerger.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexC), + true, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged0, + indexSpec, + null, + -1 + ) + ) + ); + validateTestMaxColumnsToMergeOutputSegment(merged0); + + // column limit is greater than total # of columns + final File tmpDirMerged1 = temporaryFolder.newFolder(); + final QueryableIndex merged1 = closer.closeLater( + indexIO.loadIndex( + indexMerger.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexC), + true, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged1, + indexSpec, + null, + 50 + ) + ) + ); + 
validateTestMaxColumnsToMergeOutputSegment(merged1); + + // column limit is greater than 2 segments worth of columns + final File tmpDirMerged2 = temporaryFolder.newFolder(); + final QueryableIndex merged2 = closer.closeLater( + indexIO.loadIndex( + indexMerger.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexC), + true, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged2, + indexSpec, + null, + 15 + ) + ) + ); + validateTestMaxColumnsToMergeOutputSegment(merged2); + + // column limit is between 1 and 2 segments worth of columns (merge two segments at once) + final File tmpDirMerged3 = temporaryFolder.newFolder(); + final QueryableIndex merged3 = closer.closeLater( + indexIO.loadIndex( + indexMerger.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexC), + true, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged3, + indexSpec, + null, + 9 + ) + ) + ); + validateTestMaxColumnsToMergeOutputSegment(merged3); + + // column limit is less than 1 segment + final File tmpDirMerged4 = temporaryFolder.newFolder(); + final QueryableIndex merged4 = closer.closeLater( + indexIO.loadIndex( + indexMerger.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexC), + true, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged4, + indexSpec, + null, + 3 + ) + ) + ); + validateTestMaxColumnsToMergeOutputSegment(merged4); + + // column limit is exactly 1 segment's worth of columns + final File tmpDirMerged5 = temporaryFolder.newFolder(); + final QueryableIndex merged5 = closer.closeLater( + indexIO.loadIndex( + indexMerger.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexC), + true, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged5, + indexSpec, + null, + 6 + ) + ) + ); + validateTestMaxColumnsToMergeOutputSegment(merged5); + + // column limit is exactly 2 segment's worth of columns + final File tmpDirMerged6 = temporaryFolder.newFolder(); + final 
QueryableIndex merged6 = closer.closeLater( + indexIO.loadIndex( + indexMerger.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexC), + true, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged6, + indexSpec, + null, + 12 + ) + ) + ); + validateTestMaxColumnsToMergeOutputSegment(merged6); + + // column limit is exactly the total number of columns + final File tmpDirMerged7 = temporaryFolder.newFolder(); + final QueryableIndex merged7 = closer.closeLater( + indexIO.loadIndex( + indexMerger.mergeQueryableIndex( + Arrays.asList(indexA, indexB, indexC), + true, + new AggregatorFactory[]{new CountAggregatorFactory("count")}, + tmpDirMerged7, + indexSpec, + null, + 18 + ) + ) + ); + validateTestMaxColumnsToMergeOutputSegment(merged7); + } + + private QueryableIndex persistAndLoad(List schema, InputRow... rows) throws IOException { IncrementalIndex toPersist = IncrementalIndexTest.createIndex(null, new DimensionsSpec(schema, null, null)); diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java index ccc6ff7b359d..7a0f8e787c68 100644 --- a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java @@ -508,7 +508,8 @@ private static QueryableIndex makeMergedQueryableIndex( METRIC_AGGS, mergedFile, indexSpec, - null + null, + -1 ) ); return mergedRealtime; diff --git a/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java b/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java index 0cf4d0dd134b..6acd72a033bc 100644 --- a/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java +++ b/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java @@ -218,7 +218,8 @@ public QueryableIndex getMergedIncrementalIndex() 
METRIC_AGGS, mergedFile, INDEX_SPEC, - null + null, + -1 ) ); @@ -265,7 +266,8 @@ public QueryableIndex getMergedIncrementalIndex(int index1, int index2) METRIC_AGGS, mergedFile, INDEX_SPEC, - null + null, + -1 ) ); @@ -301,7 +303,7 @@ public QueryableIndex getMergedIncrementalIndex(int[] indexes) } return indexIO.loadIndex( - indexMerger.mergeQueryableIndex(indexesToMerge, true, METRIC_AGGS, mergedFile, INDEX_SPEC, null) + indexMerger.mergeQueryableIndex(indexesToMerge, true, METRIC_AGGS, mergedFile, INDEX_SPEC, null, -1) ); } catch (IOException e) { @@ -562,7 +564,8 @@ public QueryableIndex apply(@Nullable File input) METRIC_AGGS, mergedFile, INDEX_SPEC, - null + null, + -1 ) ); } diff --git a/processing/src/test/java/org/apache/druid/segment/TestIndex.java b/processing/src/test/java/org/apache/druid/segment/TestIndex.java index 932a28c1cde0..58bf56c10f36 100644 --- a/processing/src/test/java/org/apache/druid/segment/TestIndex.java +++ b/processing/src/test/java/org/apache/druid/segment/TestIndex.java @@ -207,7 +207,8 @@ public class TestIndex METRIC_AGGS, mergedFile, INDEX_SPEC, - null + null, + -1 ) ); } diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java index a6207aa09e52..5cc81909dc3f 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java +++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java @@ -456,7 +456,8 @@ private static QueryableIndex makeMergedQueryableIndex( METRIC_AGGS, mergedFile, indexSpec, - null + null, + -1 ) ); diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java index 744b25d812f3..8b961b6de54f 100644 --- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java +++ 
b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java @@ -507,7 +507,8 @@ private static QueryableIndex makeMergedQueryableIndex(IndexSpec indexSpec) METRIC_AGGS, mergedFile, indexSpec, - null + null, + -1 ) ); diff --git a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java index e2bee73dc668..3170507b2bd3 100644 --- a/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java +++ b/processing/src/test/java/org/apache/druid/segment/generator/SegmentGenerator.java @@ -187,7 +187,8 @@ public QueryableIndex generate( .map(AggregatorFactory::getCombiningFactory) .toArray(AggregatorFactory[]::new), outDir, - new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null) + new IndexSpec(new RoaringBitmapSerdeFactory(true), null, null, null), + -1 ) ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index 3fc8164cdf18..685d6ec60301 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -61,4 +61,9 @@ default Long getMaxTotalRows() @Nullable SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory(); + + default int getMaxColumnsToMerge() + { + return -1; + } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index ec9200cf81da..211a3542c375 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -772,7 +772,8 @@ 
private DataSegment mergeAndPush( schema.getAggregators(), mergedTarget, tuningConfig.getIndexSpec(), - tuningConfig.getSegmentWriteOutMediumFactory() + tuningConfig.getSegmentWriteOutMediumFactory(), + tuningConfig.getMaxColumnsToMerge() ); mergeFinishTime = System.nanoTime(); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 087ac2f8534a..d7609a3fae6a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -511,7 +511,8 @@ public File mergeQueryableIndex( AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + int maxColumnsToMerge ) { ListenableFuture mergeFuture = mergeExecutor.submit( @@ -527,7 +528,8 @@ public File call() metricAggs, outDir, indexSpec, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + maxColumnsToMerge ); } catch (IOException ioe) { @@ -590,7 +592,8 @@ public File merge( boolean rollup, AggregatorFactory[] metricAggs, File outDir, - IndexSpec indexSpec + IndexSpec indexSpec, + int maxColumnsToMerge ) { throw new UOE(ERROR_MSG); @@ -650,7 +653,8 @@ public File mergeQueryableIndex( File outDir, IndexSpec indexSpec, ProgressIndicator progress, - @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory + @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + int maxColumnsToMerge ) { throw new UOE(ERROR_MSG); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java 
index 2aec7587a1bc..4ecab39cc933 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -437,7 +437,8 @@ public void doRun() schema.getAggregators(), mergedTarget, config.getIndexSpec(), - config.getSegmentWriteOutMediumFactory() + config.getSegmentWriteOutMediumFactory(), + -1 ); } catch (Throwable t) { From 25a8d5789a893e6257c066e9a6a72034d8d3cc25 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 17 Dec 2020 15:44:21 -0800 Subject: [PATCH 2/6] JSON fix --- .../java/org/apache/druid/indexing/common/task/IndexTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 5822664e04b6..78d469957bd9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1453,6 +1453,7 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() } @Override + @JsonProperty public int getMaxColumnsToMerge() { return maxColumnsToMerge; From 262f223fb71d6d4f2393d44d240cf300ed331108 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 17 Dec 2020 19:05:12 -0800 Subject: [PATCH 3/6] Cleanup temp files --- .../apache/druid/segment/IndexMergerV9.java | 78 ++++++++++++------- 1 file changed, 49 insertions(+), 29 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index c76a9d0812a0..341753f2d89a 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -28,7 +28,6 @@ import com.google.common.primitives.Ints; import 
com.google.inject.Inject; import org.apache.druid.common.config.NullHandling; -import org.apache.druid.common.utils.UUIDUtils; import org.apache.druid.io.ZeroCopyByteArrayOutputStream; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.FileUtils; @@ -958,6 +957,8 @@ private File multiphaseMerge( FileUtils.deleteDirectory(outDir); org.apache.commons.io.FileUtils.forceMkdir(outDir); + List tempDirs = new ArrayList<>(); + if (maxColumnsToMerge == IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE) { return merge( indexes, @@ -973,38 +974,57 @@ private File multiphaseMerge( List> currentPhases = getMergePhases(indexes, maxColumnsToMerge); List currentOutputs = new ArrayList<>(); - while (true) { - for (List phase : currentPhases) { - File phaseOutDir; - if (currentPhases.size() == 1) { - // use the given outDir on the final merge phase - phaseOutDir = outDir; + log.debug("base outDir: " + outDir); + + try { + while (true) { + for (List phase : currentPhases) { + File phaseOutDir; + if (currentPhases.size() == 1) { + // use the given outDir on the final merge phase + phaseOutDir = outDir; + } else { + phaseOutDir = FileUtils.createTempDir(); + tempDirs.add(phaseOutDir); + } + log.debug("phase outDir: " + phaseOutDir); + + File phaseOutput = merge( + phase, + rollup, + metricAggs, + phaseOutDir, + indexSpec, + progress, + segmentWriteOutMediumFactory + ); + currentOutputs.add(phaseOutput); + } + if (currentOutputs.size() == 1) { + // we're done, we made a single File output + return currentOutputs.get(0); } else { - phaseOutDir = java.nio.file.Files.createTempDirectory(outDir.toPath(), UUIDUtils.generateUuid()).toFile(); + // convert Files to QueryableIndexIndexableAdapter and do another merge phase + List qIndexAdapters = new ArrayList<>(); + for (File outputFile : currentOutputs) { + QueryableIndex qIndex = indexIO.loadIndex(outputFile, true); + qIndexAdapters.add(new QueryableIndexIndexableAdapter(qIndex)); + } + currentPhases = 
getMergePhases(qIndexAdapters, maxColumnsToMerge); + currentOutputs = new ArrayList<>(); } - File phaseOutput = merge( - phase, - rollup, - metricAggs, - phaseOutDir, - indexSpec, - progress, - segmentWriteOutMediumFactory - ); - currentOutputs.add(phaseOutput); } - if (currentOutputs.size() == 1) { - // we're done, making a single File output - return currentOutputs.get(0); - } else { - // convert Files to QueryableIndexIndexableAdapter and do another merge phase - List qIndexAdapters = new ArrayList<>(); - for (File outputFile : currentOutputs) { - QueryableIndex qIndex = indexIO.loadIndex(outputFile, true); - qIndexAdapters.add(new QueryableIndexIndexableAdapter(qIndex)); + } + finally { + for (File tempDir : tempDirs) { + if (tempDir.exists()) { + try { + FileUtils.deleteDirectory(tempDir); + } + catch (Exception e) { + log.warn(e, "Failed to remove directory[%s]", tempDir); + } } - currentPhases = getMergePhases(qIndexAdapters, maxColumnsToMerge); - currentOutputs = new ArrayList<>(); } } } From 76458ecb77b0b18bb0fc0a36e4bf6c389abce3ca Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 17 Dec 2020 19:26:52 -0800 Subject: [PATCH 4/6] Docs --- docs/ingestion/native-batch.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/ingestion/native-batch.md b/docs/ingestion/native-batch.md index b754967c88f9..4b5dfa49ae3f 100644 --- a/docs/ingestion/native-batch.md +++ b/docs/ingestion/native-batch.md @@ -204,6 +204,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |maxRowsPerSegment|Deprecated. Use `partitionsSpec` instead. Used in sharding. Determines how many rows are in each segment.|5000000|no| |maxRowsInMemory|Used in determining when intermediate persists to disk should occur. 
Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no| |maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no| +|maxColumnsToMerge|A parameter that limits how many segments can be merged in a single phase when merging segments for publishing. This limit is imposed on the total number of columns present in a set of segments being merged. If the limit is exceeded, segment merging will occur in multiple phases. At least 2 segments will be merged in a single phase, regardless of this setting.|-1 (unlimited)|no| |maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| |numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create when using a `hashed` `partitionsSpec`. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no| |splitHintSpec|Used to give a hint to control the amount of data that each first phase task reads. This hint could be ignored depending on the implementation of the input source. 
See [Split hint spec](#split-hint-spec) for more details.|size-based split hint spec|no| From 1e90ea3e56a921585268e1a4a4a6039328a977ff Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 24 Dec 2020 16:56:21 -0800 Subject: [PATCH 5/6] Address logging and add IT --- .../druid/tests/indexer/ITIndexerTest.java | 22 +++++ ...ia_index_with_merge_column_limit_task.json | 86 +++++++++++++++++++ .../apache/druid/segment/IndexMergerV9.java | 9 +- 3 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 integration-tests/src/test/resources/indexer/wikipedia_index_with_merge_column_limit_task.json diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index a602d4a5aa05..154dd7674c54 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -54,6 +54,10 @@ public class ITIndexerTest extends AbstractITBatchIndexTest private static final String MERGE_REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json"; private static final String MERGE_REINDEX_DATASOURCE = "wikipedia_merge_reindex_test"; + + private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_TASK = "/indexer/wikipedia_index_with_merge_column_limit_task.json"; + private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE = "wikipedia_index_with_merge_column_limit_test"; + @Test public void testIndexData() throws Exception { @@ -152,4 +156,22 @@ public void testMERGEIndexData() throws Exception ); } } + + + @Test + public void testIndexWithMergeColumnLimitData() throws Exception + { + try ( + final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()); + ) { + doIndexTest( + INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE, + INDEX_WITH_MERGE_COLUMN_LIMIT_TASK, + INDEX_QUERIES_RESOURCE, + false, + true, + true 
+ ); + } + } } diff --git a/integration-tests/src/test/resources/indexer/wikipedia_index_with_merge_column_limit_task.json b/integration-tests/src/test/resources/indexer/wikipedia_index_with_merge_column_limit_task.json new file mode 100644 index 000000000000..35b115c9f191 --- /dev/null +++ b/integration-tests/src/test/resources/indexer/wikipedia_index_with_merge_column_limit_task.json @@ -0,0 +1,86 @@ +{ + "type": "index", + "spec": { + "dataSchema": { + "dataSource": "%%DATASOURCE%%", + "metricsSpec": [ + { + "type": "count", + "name": "count" + }, + { + "type": "doubleSum", + "name": "added", + "fieldName": "added" + }, + { + "type": "doubleSum", + "name": "deleted", + "fieldName": "deleted" + }, + { + "type": "doubleSum", + "name": "delta", + "fieldName": "delta" + }, + { + "name": "thetaSketch", + "type": "thetaSketch", + "fieldName": "user" + }, + { + "name": "quantilesDoublesSketch", + "type": "quantilesDoublesSketch", + "fieldName": "delta" + }, + { + "name": "HLLSketchBuild", + "type": "HLLSketchBuild", + "fieldName": "user" + } + ], + "granularitySpec": { + "segmentGranularity": "DAY", + "queryGranularity": "second", + "intervals" : [ "2013-08-31/2013-09-02" ] + }, + "parser": { + "parseSpec": { + "format" : "json", + "timestampSpec": { + "column": "timestamp" + }, + "dimensionsSpec": { + "dimensions": [ + "page", + {"type": "string", "name": "language", "createBitmapIndex": false}, + "user", + "unpatrolled", + "newPage", + "robot", + "anonymous", + "namespace", + "continent", + "country", + "region", + "city" + ] + } + } + } + }, + "ioConfig": { + "type": "index", + "firehose": { + "type": "local", + "baseDir": "/resources/data/batch_index/json", + "filter": "wikipedia_index_data*" + } + }, + "tuningConfig": { + "type": "index", + "maxRowsPerSegment": 3, + "maxColumnsToMerge" : 30 + } + } +} \ No newline at end of file diff --git a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java 
b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java index 341753f2d89a..cb65289d78d6 100644 --- a/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java +++ b/processing/src/main/java/org/apache/druid/segment/IndexMergerV9.java @@ -977,16 +977,20 @@ private File multiphaseMerge( log.debug("base outDir: " + outDir); try { + int tierCounter = 0; while (true) { + log.info("Merging %d phases, tiers processed so far: %d.", currentPhases.size(), tierCounter); for (List phase : currentPhases) { File phaseOutDir; if (currentPhases.size() == 1) { // use the given outDir on the final merge phase phaseOutDir = outDir; + log.info("Performing final merge phase."); } else { phaseOutDir = FileUtils.createTempDir(); tempDirs.add(phaseOutDir); } + log.info("Merging phase with %d indexes.", phase.size()); log.debug("phase outDir: " + phaseOutDir); File phaseOutput = merge( @@ -1012,6 +1016,7 @@ private File multiphaseMerge( } currentPhases = getMergePhases(qIndexAdapters, maxColumnsToMerge); currentOutputs = new ArrayList<>(); + tierCounter += 1; } } } @@ -1035,7 +1040,7 @@ private List> getMergePhases(List index // always merge at least two segments regardless of column limit if (indexes.size() <= 2) { if (getIndexColumnCount(indexes) > maxColumnsToMerge) { - log.warn("index pair has more columns than maxColumnsToMerge [%d].", maxColumnsToMerge); + log.info("index pair has more columns than maxColumnsToMerge [%d].", maxColumnsToMerge); } toMerge.add(indexes); } else { @@ -1044,7 +1049,7 @@ private List> getMergePhases(List index for (IndexableAdapter index : indexes) { int indexColumnCount = getIndexColumnCount(index); if (indexColumnCount > maxColumnsToMerge) { - log.warn("index has more columns [%d] than maxColumnsToMerge [%d]!", indexColumnCount, maxColumnsToMerge); + log.info("index has more columns [%d] than maxColumnsToMerge [%d]!", indexColumnCount, maxColumnsToMerge); } // always merge at least two segments regardless of column
limit From ccf61c32c1275cee9f254169b15dbccc207a0c53 Mon Sep 17 00:00:00 2001 From: jon-wei Date: Thu, 24 Dec 2020 19:06:00 -0800 Subject: [PATCH 6/6] Fix spelling and test unloader datasource name --- .../java/org/apache/druid/tests/indexer/ITIndexerTest.java | 3 +-- website/.spelling | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java index 154dd7674c54..9563306a0a70 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java @@ -54,7 +54,6 @@ public class ITIndexerTest extends AbstractITBatchIndexTest private static final String MERGE_REINDEX_QUERIES_RESOURCE = "/indexer/wikipedia_merge_index_queries.json"; private static final String MERGE_REINDEX_DATASOURCE = "wikipedia_merge_reindex_test"; - private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_TASK = "/indexer/wikipedia_index_with_merge_column_limit_task.json"; private static final String INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE = "wikipedia_index_with_merge_column_limit_test"; @@ -162,7 +161,7 @@ public void testMERGEIndexData() throws Exception public void testIndexWithMergeColumnLimitData() throws Exception { try ( - final Closeable ignored1 = unloader(INDEX_DATASOURCE + config.getExtraDatasourceNameSuffix()); + final Closeable ignored1 = unloader(INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE + config.getExtraDatasourceNameSuffix()); ) { doIndexTest( INDEX_WITH_MERGE_COLUMN_LIMIT_DATASOURCE, diff --git a/website/.spelling b/website/.spelling index 1be0a2f3feb9..155f65f6747d 100644 --- a/website/.spelling +++ b/website/.spelling @@ -1032,6 +1032,7 @@ httpAuthenticationUsername ingestSegment InputSource DruidInputSource +maxColumnsToMerge maxInputSegmentBytesPerTask maxNumConcurrentSubTasks 
maxNumSegmentsToMerge