From b97e26e8d1701b74694d9c76cfd1d6ba16134635 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Tue, 18 Jun 2019 01:01:37 -0700 Subject: [PATCH 1/6] disable all compression in intermediate segment persists while ingestion --- .../kafka/KafkaIndexTaskTuningConfig.java | 4 ++ .../kafka/supervisor/KafkaSupervisorSpec.java | 1 + .../KafkaSupervisorTuningConfig.java | 3 ++ .../indexing/kafka/KafkaIndexTaskTest.java | 1 + .../kafka/KafkaIndexTaskTuningConfigTest.java | 4 ++ .../kafka/supervisor/KafkaSupervisorTest.java | 2 + ...estModifiedKafkaIndexTaskTuningConfig.java | 2 + .../indexer/HadoopDruidIndexerConfig.java | 5 +++ .../druid/indexer/HadoopIngestionSpec.java | 1 + .../druid/indexer/HadoopTuningConfig.java | 15 +++++++ .../druid/indexer/IndexGeneratorJob.java | 2 +- .../indexer/BatchDeltaIngestionTest.java | 1 + .../DetermineHashedPartitionsJobTest.java | 1 + .../indexer/DeterminePartitionsJobTest.java | 1 + .../indexer/HadoopDruidIndexerConfigTest.java | 2 + .../druid/indexer/HadoopTuningConfigTest.java | 1 + .../druid/indexer/IndexGeneratorJobTest.java | 1 + .../apache/druid/indexer/JobHelperTest.java | 1 + .../indexer/path/GranularityPathSpecTest.java | 1 + .../RealtimeAppenderatorTuningConfig.java | 12 ++++++ .../common/index/YeOldePlumberSchool.java | 2 +- .../druid/indexing/common/task/IndexTask.java | 40 +++++++++++++++---- .../parallel/ParallelIndexSupervisorTask.java | 1 + .../parallel/ParallelIndexTuningConfig.java | 3 ++ .../SeekableStreamIndexTaskTuningConfig.java | 13 ++++++ ...penderatorDriverRealtimeIndexTaskTest.java | 1 + .../common/task/CompactionTaskTest.java | 6 +++ .../indexing/common/task/IndexTaskTest.java | 4 ++ .../common/task/RealtimeIndexTaskTest.java | 1 + .../indexing/common/task/TaskSerdeTest.java | 3 ++ .../ParallelIndexSupervisorTaskKillTest.java | 1 + ...rallelIndexSupervisorTaskResourceTest.java | 1 + .../ParallelIndexSupervisorTaskSerdeTest.java | 1 + .../ParallelIndexSupervisorTaskTest.java | 2 + .../ParallelIndexTuningConfigTest.java | 1 + .../indexing/overlord/TaskLifecycleTest.java | 4 ++ .../SeekableStreamSupervisorStateTest.java | 1 + .../indexing/RealtimeTuningConfig.java | 14 +++++++ .../appenderator/AppenderatorConfig.java | 9 +++++ .../appenderator/AppenderatorImpl.java | 4 +- .../realtime/plumber/RealtimePlumber.java | 5 +-- .../segment/realtime/RealtimeManagerTest.java | 3 ++ .../appenderator/AppenderatorPlumberTest.java | 1 + .../appenderator/AppenderatorTester.java | 1 + ...DefaultOfflineAppenderatorFactoryTest.java | 1 + .../plumber/RealtimePlumberSchoolTest.java | 1 + .../segment/realtime/plumber/SinkTest.java | 2 + .../cli/validate/DruidJsonValidatorTest.java | 1 + 48 files changed, 171 insertions(+), 17 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java index 2104759f2966..f23ee7b0fced 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java @@ -41,6 +41,7 @@ public KafkaIndexTaskTuningConfig( @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @@ -62,6 +63,7 @@ public KafkaIndexTaskTuningConfig( basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, @@ -87,6 +89,7 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) dir, getMaxPendingPersists(), getIndexSpec(), + getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), getHandoffConditionTimeout(), @@ -112,6 +115,7 @@ public String toString() ", basePersistDirectory=" + getBasePersistDirectory() + ", maxPendingPersists=" + getMaxPendingPersists() + ", indexSpec=" + getIndexSpec() + + ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() + ", reportParseExceptions=" + isReportParseExceptions() + ", handoffConditionTimeout=" + getHandoffConditionTimeout() + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index ef6259f64121..60c187f07487 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -85,6 +85,7 @@ public KafkaSupervisorSpec( null, null, null, + null, null ), ioConfig, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 8e4c6e9f9137..27e9b295edfe 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -53,6 +53,7 @@ public KafkaSupervisorTuningConfig( @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @@ -80,6 +81,7 @@ public KafkaSupervisorTuningConfig( basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, @@ -186,6 +188,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() getBasePersistDirectory(), getMaxPendingPersists(), getIndexSpec(), + getIndexSpecForIntermediatePersists(), true, isReportParseExceptions(), getHandoffConditionTimeout(), diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 6c42b3b85627..4b746003d47d 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2467,6 +2467,7 @@ private KafkaIndexTask createTask( null, null, null, + null, true, reportParseExceptions, handoffConditionTimeout, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 63b0e985e0b7..4af6e01082c8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -117,6 +117,7 @@ public void testConvert() new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -159,6 +160,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -206,6 +208,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException new File("/tmp/xxx"), 4, new IndexSpec(), + new IndexSpec(), true, true, 5L, @@ -252,6 +255,7 @@ private static KafkaIndexTaskTuningConfig copy(KafkaIndexTaskTuningConfig config config.getBasePersistDirectory(), 0, config.getIndexSpec(), + config.getIndexSpecForIntermediatePersists(), true, config.isReportParseExceptions(), config.getHandoffConditionTimeout(), diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 2eff41c7106c..815c99b71486 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -220,6 +220,7 @@ public void setupTest() new File("/test"), null, null, + null, true, false, null, @@ -3016,6 +3017,7 @@ public void testIsTaskCurrent() new File("/test"), null, null, + null, true, false, null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java index 3cc124f6afd2..27e69e8e7e00 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -45,6 +45,7 @@ public TestModifiedKafkaIndexTaskTuningConfig( @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @@ -67,6 +68,7 @@ public TestModifiedKafkaIndexTaskTuningConfig( basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, true, reportParseExceptions, handoffConditionTimeout, diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java index fc9a5d966a3b..4aa8cd25d462 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopDruidIndexerConfig.java @@ -299,6 +299,11 @@ public IndexSpec getIndexSpec() return schema.getTuningConfig().getIndexSpec(); } + public IndexSpec getIndexSpecForIntermediatePersists() + { + return schema.getTuningConfig().getIndexSpecForIntermediatePersists(); + } + public boolean isOverwriteFiles() { return schema.getTuningConfig().isOverwriteFiles(); diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIngestionSpec.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIngestionSpec.java index 30dfdb4c65a6..65d34ea00195 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIngestionSpec.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopIngestionSpec.java @@ -26,6 +26,7 @@ import org.apache.druid.indexer.hadoop.DatasourceIngestionSpec; import org.apache.druid.indexer.hadoop.WindowedDataSegment; import org.apache.druid.indexer.path.UsedSegmentLister; +import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.indexing.IngestionSpec; import org.apache.druid.timeline.DataSegment; diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java index 5fd9b3d8f2fa..ca4b60231b68 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java @@ -30,6 +30,7 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.TuningConfig; +import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; import javax.annotation.Nullable; import java.util.List; @@ -55,6 +56,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() DEFAULT_PARTITIONS_SPEC, DEFAULT_SHARD_SPECS, DEFAULT_INDEX_SPEC, + AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, DEFAULT_ROW_FLUSH_BOUNDARY, 0L, false, @@ -81,6 +83,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() private final PartitionsSpec partitionsSpec; private final Map> shardSpecs; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final int rowFlushBoundary; private final long maxBytesInMemory; private final boolean leaveIntermediate; @@ -105,6 +108,7 @@ public HadoopTuningConfig( final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, final @JsonProperty("shardSpecs") Map> shardSpecs, final @JsonProperty("indexSpec") IndexSpec indexSpec, + final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, final @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, @@ -132,6 +136,8 @@ public HadoopTuningConfig( this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec; this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS : indexSpecForIntermediatePersists; this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null ? DEFAULT_ROW_FLUSH_BOUNDARY : maxRowsInMemoryCOMPAT : maxRowsInMemory; @@ -199,6 +205,12 @@ public IndexSpec getIndexSpec() return indexSpec; } + @JsonProperty + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @JsonProperty("maxRowsInMemory") public int getRowFlushBoundary() { @@ -314,6 +326,7 @@ public HadoopTuningConfig withWorkingPath(String path) partitionsSpec, shardSpecs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, @@ -343,6 +356,7 @@ public HadoopTuningConfig withVersion(String ver) partitionsSpec, shardSpecs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, @@ -372,6 +386,7 @@ public HadoopTuningConfig withShardSpecs(Map> specs partitionsSpec, specs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index 6c032b06e7bb..f4cad315270a 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -605,7 +605,7 @@ private File persist( ) throws IOException { return HadoopDruidIndexerConfig.INDEX_MERGER_V9 - .persist(index, interval, file, config.getIndexSpec(), progressIndicator, null); + .persist(index, interval, file, config.getIndexSpecForIntermediatePersists(), progressIndicator, null); } protected File mergeQueryableIndex( diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java index 3fcf50ad989c..d0d7180a6e4a 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java @@ -462,6 +462,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java index 59453fce8821..26498746c217 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -199,6 +199,7 @@ public DetermineHashedPartitionsJobTest( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java index e9265bcabf84..3d8b06bab777 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java @@ -262,6 +262,7 @@ public DeterminePartitionsJobTest( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java index 7573aeaa4d3a..9c723ac9abd9 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -86,6 +86,7 @@ public void testHashedBucketSelection() null, null, null, + null, false, false, false, @@ -164,6 +165,7 @@ public void testNoneShardSpecBucketSelection() null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java index 25535832054d..0c52ab533a13 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java @@ -43,6 +43,7 @@ public void testSerde() throws Exception null, null, null, + null, 100, null, true, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index 3925bcd730c9..a7757ac30340 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -521,6 +521,7 @@ public void setUp() throws Exception null, null, null, + null, maxRowsInMemory, maxBytesInMemory, true, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java index 3969e126a0b1..ff5aed625eaf 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java @@ -114,6 +114,7 @@ public void setup() throws Exception null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java index 7f2ce22f4019..4da9ee41b77d 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java @@ -62,6 +62,7 @@ public class GranularityPathSpecTest null, null, null, + null, false, false, false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 9d2bed80ead6..9ccd6c81a6f6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -63,6 +63,7 @@ private static File createNewBasePersistDirectory() private final int maxPendingPersists; private final ShardSpec shardSpec; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final boolean reportParseExceptions; private final long publishAndHandoffTimeout; private final long alertTimeout; @@ -84,6 +85,7 @@ public RealtimeAppenderatorTuningConfig( @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout, @JsonProperty("alertTimeout") Long alertTimeout, @@ -106,6 +108,8 @@ public RealtimeAppenderatorTuningConfig( this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS : indexSpecForIntermediatePersists; this.reportParseExceptions = reportParseExceptions == null ? defaultReportParseExceptions : reportParseExceptions; @@ -196,6 +200,13 @@ public IndexSpec getIndexSpec() return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @Override @JsonProperty public boolean isReportParseExceptions() @@ -253,6 +264,7 @@ public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir) maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, reportParseExceptions, publishAndHandoffTimeout, alertTimeout, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java index 6e0e1d861d16..f872042049ae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java @@ -236,7 +236,7 @@ private void spillIfSwappable() indexMergerV9.persist( indexToPersist.getIndex(), dirToPersist, - config.getIndexSpec(), + config.getIndexSpecForIntermediatePersists(), config.getSegmentWriteOutMediumFactory() ); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 458c621d3966..6944333eb83b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1270,6 +1270,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private final Integer numShards; private final List partitionDimensions; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final File basePersistDirectory; private final int maxPendingPersists; @@ -1305,6 +1306,7 @@ public IndexTuningConfig( @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("partitionDimensions") @Nullable List partitionDimensions, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @@ -1327,6 +1329,7 @@ public IndexTuningConfig( numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1346,7 +1349,7 @@ public IndexTuningConfig( private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( @@ -1357,6 +1360,7 @@ private IndexTuningConfig( @Nullable Integer numShards, @Nullable List partitionDimensions, @Nullable IndexSpec indexSpec, + @Nullable IndexSpec indexSpecForIntermediatePersists, @Nullable Integer maxPendingPersists, @Nullable Boolean forceGuaranteedRollup, @Nullable Boolean reportParseExceptions, @@ -1384,6 +1388,8 @@ private IndexTuningConfig( this.numShards = numShards == null || numShards.equals(-1) ? null : numShards; this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS : indexSpecForIntermediatePersists; this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; this.forceGuaranteedRollup = forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup; this.reportParseExceptions = reportParseExceptions == null @@ -1420,6 +1426,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir) numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1442,6 +1449,7 @@ public IndexTuningConfig withMaxTotalRows(Long maxTotalRows) numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1464,6 +1472,7 @@ public IndexTuningConfig withMaxRowsPerSegment(int maxRowsPerSegment) numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1533,6 +1542,13 @@ public IndexSpec getIndexSpec() return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @Override public File getBasePersistDirectory() { @@ -1618,19 +1634,22 @@ public boolean equals(Object o) } IndexTuningConfig that = (IndexTuningConfig) o; return maxRowsInMemory == that.maxRowsInMemory && - Objects.equals(maxTotalRows, that.maxTotalRows) && + maxBytesInMemory == that.maxBytesInMemory && maxPendingPersists == that.maxPendingPersists && forceGuaranteedRollup == that.forceGuaranteedRollup && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && + logParseExceptions == that.logParseExceptions && + maxParseExceptions == that.maxParseExceptions && + maxSavedParseExceptions == that.maxSavedParseExceptions && Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && + Objects.equals(maxTotalRows, that.maxTotalRows) && Objects.equals(numShards, that.numShards) && + Objects.equals(partitionDimensions, that.partitionDimensions) && Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && - Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && - logParseExceptions == that.logParseExceptions && - maxParseExceptions == that.maxParseExceptions && - maxSavedParseExceptions == that.maxSavedParseExceptions; + Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory); } @Override @@ -1639,18 +1658,21 @@ public int hashCode() return Objects.hash( maxRowsPerSegment, maxRowsInMemory, + maxBytesInMemory, maxTotalRows, numShards, + partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, basePersistDirectory, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, pushTimeout, - segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + segmentWriteOutMediumFactory ); } @@ -1663,7 +1685,9 @@ public String toString() ", maxBytesInMemory=" + maxBytesInMemory + ", maxTotalRows=" + maxTotalRows + ", numShards=" + numShards + + ", partitionDimensions=" + partitionDimensions + ", indexSpec=" + indexSpec + + ", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists + ", basePersistDirectory=" + basePersistDirectory + ", maxPendingPersists=" + maxPendingPersists + ", forceGuaranteedRollup=" + forceGuaranteedRollup + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 2a29726965c4..924be846d9c9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -333,6 +333,7 @@ private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningC tuningConfig.getNumShards(), null, tuningConfig.getIndexSpec(), + tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getMaxPendingPersists(), true, false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 2c415492277b..c432bb1d4301 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -71,6 +71,7 @@ public static ParallelIndexTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -84,6 +85,7 @@ public ParallelIndexTuningConfig( @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup, @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @@ -109,6 +111,7 @@ public ParallelIndexTuningConfig( numShards, null, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, null, forceGuaranteedRollup, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index b594e42a66cc..48269d2f40a6 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -47,6 +47,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi @Deprecated private final int maxPendingPersists; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final boolean reportParseExceptions; private final long handoffConditionTimeout; private final boolean resetOffsetAutomatically; @@ -68,6 +69,7 @@ public SeekableStreamIndexTaskTuningConfig( @Nullable File basePersistDirectory, @Nullable Integer maxPendingPersists, @Nullable IndexSpec indexSpec, + @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @Nullable Boolean reportParseExceptions, @@ -96,6 +98,8 @@ public SeekableStreamIndexTaskTuningConfig( this.basePersistDirectory = defaults.getBasePersistDirectory(); this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists; this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS : indexSpecForIntermediatePersists; this.reportParseExceptions = reportParseExceptions == null ? defaults.isReportParseExceptions() : reportParseExceptions; @@ -187,6 +191,13 @@ public IndexSpec getIndexSpec() return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + /** * Always returns true, doesn't affect the version being built. */ @@ -281,6 +292,7 @@ public boolean equals(Object o) Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod); } @@ -297,6 +309,7 @@ public int hashCode() basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 413e1228d706..e20d43acfcec 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1417,6 +1417,7 @@ private AppenderatorDriverRealtimeIndexTask makeRealtimeTask( null, null, null, + null, reportParseExceptions, handoffTimeout, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 69f10f6be400..bb6993848c94 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -288,6 +288,7 @@ private static IndexTuningConfig createTuningConfig() CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -462,6 +463,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -522,6 +524,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -582,6 +585,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -846,6 +850,7 @@ public void testTargetPartitionSizeWithPartitionConfig() throws IOException, Seg CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -1034,6 +1039,7 @@ private static void assertIngestionSchema( CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 97279f94b643..4bae4c8ed75b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1004,6 +1004,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, indexSpec, null, + null, true, true, false, @@ -1126,6 +1127,7 @@ public void testMultipleParseExceptionsFailure() throws Exception null, indexSpec, null, + null, true, false, false, @@ -1241,6 +1243,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc null, indexSpec, null, + null, true, true, false, @@ -1706,6 +1709,7 @@ static IndexTuningConfig createTuningConfig( partitionDimensions, indexSpec, null, + null, true, forceGuaranteedRollup, reportParseException, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index e2a24f582377..3a7611184e36 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -836,6 +836,7 @@ private RealtimeIndexTask makeRealtimeTask( null, null, null, + null, true, 0, 0, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index 1d15b6808b37..3f1fe795b9ab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -202,6 +202,7 @@ public void testIndexTaskSerde() throws Exception null, null, indexSpec, + null, 3, true, false, @@ -284,6 +285,7 @@ public void testIndexTaskwithResourceSerde() throws Exception null, null, indexSpec, + null, 3, true, false, @@ -394,6 +396,7 @@ public void testRealtimeIndexTaskSerde() throws Exception NoneShardSpec.instance(), indexSpec, null, + null, 0, 0, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 6f7c3b9e713d..c7c8d0b897e3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -188,6 +188,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, numTotalSubTasks, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 704c5fa4500b..04aa5a7a1b07 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -430,6 +430,7 @@ private TestSupervisorTask newTask( null, null, null, + null, NUM_SUB_TASKS, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index dd9096310d86..0fa747f6604f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -138,6 +138,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index cfd02ac18cee..cf7dd372bfdd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -250,6 +250,7 @@ public void testWith1MaxNumSubTasks() throws Exception null, null, null, + null, 1, null, null, @@ -290,6 +291,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index d587c6c0591c..2d393020b3a2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -71,6 +71,7 @@ public void testSerdeWithMaxRowsPerSegment() CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + new IndexSpec(), 1, false, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 6f2b99218a26..44c7010f0469 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -690,6 +690,7 @@ public void testIndexTask() throws Exception null, null, indexSpec, + null, 3, true, false, @@ -771,6 +772,7 @@ public void testIndexTaskFailure() throws Exception null, null, indexSpec, + null, 3, true, false, @@ -1160,6 +1162,7 @@ public void testResumeTasks() throws Exception null, indexSpec, null, + null, false, null, null, @@ -1291,6 +1294,7 @@ private RealtimeIndexTask newRealtimeIndexTask() null, null, null, + null, 0, 0, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 1058eb01e073..cfc5210b7038 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -655,6 +655,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + null, null ) { diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index 43be6e080a45..56f1dc970fcf 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -82,6 +82,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File defaultMaxPendingPersists, defaultShardSpec, defaultIndexSpec, + AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, true, 0, 0, @@ -103,6 +104,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File private final int maxPendingPersists; private final ShardSpec shardSpec; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final int persistThreadPriority; private final int mergeThreadPriority; private final boolean reportParseExceptions; @@ -125,6 +127,7 @@ public RealtimeTuningConfig( @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("persistThreadPriority") int persistThreadPriority, @@ -152,6 +155,8 @@ public RealtimeTuningConfig( this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS : indexSpecForIntermediatePersists; this.mergeThreadPriority = mergeThreadPriority; this.persistThreadPriority = persistThreadPriority; this.reportParseExceptions = reportParseExceptions == null @@ -233,6 +238,13 @@ public IndexSpec getIndexSpec() return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + /** * Always returns true, doesn't affect the version being built. */ @@ -302,6 +314,7 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, true, persistThreadPriority, mergeThreadPriority, @@ -326,6 +339,7 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, true, persistThreadPriority, mergeThreadPriority, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index 1e81792c4304..39b164497bd2 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -20,6 +20,7 @@ package org.apache.druid.segment.realtime.appenderator; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Period; @@ -28,6 +29,12 @@ public interface AppenderatorConfig { + IndexSpec DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS = new IndexSpec( + null, + CompressionStrategy.UNCOMPRESSED, + CompressionStrategy.NONE, + null); + boolean isReportParseExceptions(); /** @@ -67,6 +74,8 @@ default Long getMaxTotalRows() IndexSpec getIndexSpec(); + IndexSpec getIndexSpecForIntermediatePersists(); + File getBasePersistDirectory(); @Nullable diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 647340bbb502..136b24e84433 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -61,7 +61,6 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; -import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; @@ -1260,12 +1259,11 @@ private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec id final File persistedFile; final File persistDir = createPersistDirIfNeeded(identifier); - final IndexSpec indexSpec = tuningConfig.getIndexSpec(); persistedFile = indexMerger.persist( indexToPersist.getIndex(), identifier.getInterval(), new File(persistDir, String.valueOf(indexToPersist.getCount())), - indexSpec, + tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getSegmentWriteOutMediumFactory() ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index 8e0ad2b999f5..7aa59f6d8ccd 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -53,7 +53,6 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; -import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.Metadata; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; @@ -955,14 +954,12 @@ protected int persistHydrant( try { int numRows = indexToPersist.getIndex().size(); - final IndexSpec indexSpec = config.getIndexSpec(); - indexToPersist.getIndex().getMetadata().putAll(metadataElems); final File persistedFile = indexMerger.persist( indexToPersist.getIndex(), interval, new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), - indexSpec, + config.getIndexSpecForIntermediatePersists(), config.getSegmentWriteOutMediumFactory() ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java index 7df4c14984f3..25e6bf162e6b 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/RealtimeManagerTest.java @@ -204,6 +204,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws ParseExcept null, null, null, + null, 0, 0, null, @@ -269,6 +270,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws ParseExcept new LinearShardSpec(0), null, null, + null, 0, 0, null, @@ -290,6 +292,7 @@ public FirehoseV2 connect(InputRowParser parser, Object arg1) throws ParseExcept new LinearShardSpec(1), null, null, + null, 0, 0, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 15a650b38ce0..ee79dc1f2286 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -72,6 +72,7 @@ public AppenderatorPlumberTest() throws Exception null, null, null, + null, true, 0, 0, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java index af706dd53405..0d6cac57cca4 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -160,6 +160,7 @@ public AppenderatorTester( null, null, null, + null, 0, 0, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index e02ca6d657bc..b92bd1210422 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -144,6 +144,7 @@ public int columnCacheSizeBytes() null, null, null, + null, 0, 0, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 4052cc678f4a..31790305808c 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -206,6 +206,7 @@ public void setUp() throws Exception null, null, null, + null, true, 0, 0, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java index de55360a61d2..c1a4066c20fa 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java @@ -75,6 +75,7 @@ public void testSwap() throws Exception null, null, null, + null, 0, 0, null, @@ -229,6 +230,7 @@ public void testDedup() throws Exception null, null, null, + null, 0, 0, null, diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java index 79260b639eba..24a33b915b90 100644 --- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java @@ -167,6 +167,7 @@ public void testTaskValidator() throws Exception 1, NoneShardSpec.instance(), new IndexSpec(), + new IndexSpec(), null, 0, 0, From 055b1181896d13ab0b0dffa78b4b4f7986ebcdbb Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Wed, 19 Jun 2019 00:38:54 -0700 Subject: [PATCH 2/6] more changes and build fix --- .../materializedview/MaterializedViewSupervisorSpec.java | 1 + .../druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java | 3 +++ .../indexing/kinesis/supervisor/KinesisSupervisorSpec.java | 1 + .../kinesis/supervisor/KinesisSupervisorTuningConfig.java | 3 +++ .../apache/druid/indexing/kinesis/KinesisIndexTaskTest.java | 1 + .../indexing/kinesis/KinesisIndexTaskTuningConfigTest.java | 3 +++ .../indexing/kinesis/supervisor/KinesisSupervisorTest.java | 2 ++ .../kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java | 3 +++ .../java/org/apache/druid/indexer/HadoopIngestionSpec.java | 1 - 9 files changed, 17 insertions(+), 1 deletion(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index 4258fc9d9099..874ae6106513 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -179,6 +179,7 @@ public HadoopIndexTask createTask(Interval interval, String version, List Date: Thu, 4 Jul 2019 21:37:25 -0700 Subject: [PATCH 3/6] by default retain existing indexingSpec for intermediate persisted segments --- .../java/org/apache/druid/indexer/HadoopTuningConfig.java | 4 ++-- .../common/index/RealtimeAppenderatorTuningConfig.java | 2 +- .../org/apache/druid/indexing/common/task/IndexTask.java | 2 +- .../seekablestream/SeekableStreamIndexTaskTuningConfig.java | 2 +- .../apache/druid/segment/indexing/RealtimeTuningConfig.java | 4 ++-- .../segment/realtime/appenderator/AppenderatorConfig.java | 6 ------ 6 files changed, 7 insertions(+), 13 deletions(-) diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java index ca4b60231b68..c6974aa29a5a 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java @@ -56,7 +56,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig() DEFAULT_PARTITIONS_SPEC, DEFAULT_SHARD_SPECS, DEFAULT_INDEX_SPEC, - AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, + DEFAULT_INDEX_SPEC, DEFAULT_ROW_FLUSH_BOUNDARY, 0L, false, @@ -137,7 +137,7 @@ public HadoopTuningConfig( this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? - AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS : indexSpecForIntermediatePersists; + this.indexSpec : indexSpecForIntermediatePersists; this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null ? DEFAULT_ROW_FLUSH_BOUNDARY : maxRowsInMemoryCOMPAT : maxRowsInMemory; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 9ccd6c81a6f6..1676a2fa5c97 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -109,7 +109,7 @@ public RealtimeAppenderatorTuningConfig( this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? - AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS : indexSpecForIntermediatePersists; + this.indexSpec : indexSpecForIntermediatePersists; this.reportParseExceptions = reportParseExceptions == null ? defaultReportParseExceptions : reportParseExceptions; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index b441d456d868..e766effb4125 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1389,7 +1389,7 @@ private IndexTuningConfig( this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? - AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS : indexSpecForIntermediatePersists; + this.indexSpec : indexSpecForIntermediatePersists; this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; this.forceGuaranteedRollup = forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup; this.reportParseExceptions = reportParseExceptions == null diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index 48269d2f40a6..1d48ad9c8c80 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -99,7 +99,7 @@ public SeekableStreamIndexTaskTuningConfig( this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists; this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? - AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS : indexSpecForIntermediatePersists; + this.indexSpec : indexSpecForIntermediatePersists; this.reportParseExceptions = reportParseExceptions == null ? defaults.isReportParseExceptions() : reportParseExceptions; diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index 56f1dc970fcf..84edad165068 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -82,7 +82,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File defaultMaxPendingPersists, defaultShardSpec, defaultIndexSpec, - AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS, + defaultIndexSpec, true, 0, 0, @@ -156,7 +156,7 @@ public RealtimeTuningConfig( this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? - AppenderatorConfig.DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS : indexSpecForIntermediatePersists; + this.indexSpec : indexSpecForIntermediatePersists; this.mergeThreadPriority = mergeThreadPriority; this.persistThreadPriority = persistThreadPriority; this.reportParseExceptions = reportParseExceptions == null diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index 39b164497bd2..8903430bdf7a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -29,12 +29,6 @@ public interface AppenderatorConfig { - IndexSpec DEFAULT_INDEX_SPEC_FOR_INTERMEDIATE_PERSISTS = new IndexSpec( - null, - CompressionStrategy.UNCOMPRESSED, - CompressionStrategy.NONE, - null); - boolean isReportParseExceptions(); /** From 41564c4aada0d09290da867a7d1b4aee5fc9bb00 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 4 Jul 2019 22:52:30 -0700 Subject: [PATCH 4/6] document indexSpecForIntermediatePersists index tuning config --- .../development/extensions-core/kafka-ingestion.md | 3 ++- .../development/extensions-core/kinesis-ingestion.md | 3 ++- docs/content/ingestion/hadoop.md | 3 ++- docs/content/ingestion/native_tasks.md | 10 ++++++++++ 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index c070e466517a..ec1d046ee31b 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -139,7 +139,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| -|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)| |`reportParseExceptions`|Boolean|*DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|no (default == false)| |`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)| diff --git a/docs/content/development/extensions-core/kinesis-ingestion.md b/docs/content/development/extensions-core/kinesis-ingestion.md index 0578dd23b3b1..8c2ac335a1d7 100644 --- a/docs/content/development/extensions-core/kinesis-ingestion.md +++ b/docs/content/development/extensions-core/kinesis-ingestion.md @@ -135,7 +135,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| -|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| |`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer sequence numbers if the next sequence number that it is trying to fetch is less than the earliest available sequence number for that particular shard. The sequence number will be reset to either the earliest or latest sequence number depending on `useEarliestOffset` property of `KinesisSupervisorIOConfig` (see below). This situation typically occurs when messages in Kinesis are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular shard will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)| diff --git a/docs/content/ingestion/hadoop.md b/docs/content/ingestion/hadoop.md index b9a6d72e7989..1c0bfbc58c62 100644 --- a/docs/content/ingestion/hadoop.md +++ b/docs/content/ingestion/hadoop.md @@ -192,7 +192,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |combineText|Boolean|Use CombineTextInputFormat to combine multiple files into a file split. This can speed up Hadoop jobs when processing a large number of small files.|no (default == false)| |useCombiner|Boolean|Use Hadoop combiner to merge rows at mapper if possible.|no (default == false)| |jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)| -|indexSpec|Object|Tune how data is indexed. See below for more information.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)| |numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)| |forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitioning-specification). This option can be useful when you need to append more data to existing dataSource.|no (default = false)| |useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)| diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index ad7cac91359f..c5cd91bbec40 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -185,6 +185,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no| @@ -375,6 +376,14 @@ An example of the result is "metricCompression": "lz4", "longEncoding": "longs" }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "concise" + }, + "dimensionCompression": "lz4", + "metricCompression": "lz4", + "longEncoding": "longs" + }, "maxPendingPersists": 0, "reportParseExceptions": false, "pushTimeout": 0, @@ -555,6 +564,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../ingestion/index.html#roll-up-modes). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments. Note that the result segments would be hash-partitioned. You can set `forceExtendableShardSpecs` if you plan to append more data to the same time range in the future. This flag cannot be used with `appendToExisting` of IOConfig. For more details, see the below __Segment pushing modes__ section.|false|no| |reportParseExceptions|DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|false|no| From 60912289e3ca0956c97b09e9e8315084d3bc2842 Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Thu, 4 Jul 2019 23:37:28 -0700 Subject: [PATCH 5/6] fix build issues --- .../main/java/org/apache/druid/indexer/HadoopTuningConfig.java | 1 - .../druid/segment/realtime/appenderator/AppenderatorConfig.java | 1 - 2 files changed, 2 deletions(-) diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java index c6974aa29a5a..e61f912e9e8e 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/HadoopTuningConfig.java @@ -30,7 +30,6 @@ import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.indexing.TuningConfig; -import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig; import javax.annotation.Nullable; import java.util.List; diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index 8903430bdf7a..2889a988a4d7 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -20,7 +20,6 @@ package org.apache.druid.segment.realtime.appenderator; import org.apache.druid.segment.IndexSpec; -import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; import org.joda.time.Period; From 39b3c6a659ea4148ccf8feb72f4e5a8078b2028b Mon Sep 17 00:00:00 2001 From: Himanshu Gupta Date: Mon, 8 Jul 2019 13:01:32 -0700 Subject: [PATCH 6/6] update serde tests --- .../indexing/kafka/KafkaIndexTaskTuningConfigTest.java | 8 +++++++- .../supervisor/KafkaSupervisorTuningConfigTest.java | 8 +++++++- .../apache/druid/indexer/HadoopTuningConfigTest.java | 1 + .../task/batch/parallel/ParallelIndexTuningConfig.java | 1 - .../segment/indexing/RealtimeTuningConfigTest.java | 10 ++++++++-- 5 files changed, 23 insertions(+), 5 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 4af6e01082c8..57b4f372cf83 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -25,6 +25,7 @@ import org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.indexing.TuningConfig; import org.joda.time.Period; import org.junit.Assert; @@ -65,6 +66,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); Assert.assertEquals(0, config.getHandoffConditionTimeout()); } @@ -81,7 +83,9 @@ public void testSerdeWithNonDefaults() throws Exception + " \"intermediatePersistPeriod\": \"PT1H\",\n" + " \"maxPendingPersists\": 100,\n" + " \"reportParseExceptions\": true,\n" - + " \"handoffConditionTimeout\": 100\n" + + " \"handoffConditionTimeout\": 100,\n" + + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + "}"; KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue( @@ -103,6 +107,8 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertEquals(true, config.isReportParseExceptions()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); + Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index 3312a10aa944..5859f9035b06 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -24,6 +24,7 @@ import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.segment.indexing.TuningConfig; import org.joda.time.Duration; import org.joda.time.Period; @@ -63,6 +64,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); Assert.assertEquals(0, config.getHandoffConditionTimeout()); Assert.assertNull(config.getWorkerThreads()); @@ -90,7 +92,9 @@ public void testSerdeWithNonDefaults() throws Exception + " \"chatRetries\": 14,\n" + " \"httpTimeout\": \"PT15S\",\n" + " \"shutdownTimeout\": \"PT95S\",\n" - + " \"offsetFetchPeriod\": \"PT20S\"\n" + + " \"offsetFetchPeriod\": \"PT20S\",\n" + + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + "}"; KafkaSupervisorTuningConfig config = (KafkaSupervisorTuningConfig) mapper.readValue( @@ -116,6 +120,8 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(Duration.standardSeconds(15), config.getHttpTimeout()); Assert.assertEquals(Duration.standardSeconds(95), config.getShutdownTimeout()); Assert.assertEquals(Duration.standardSeconds(20), config.getOffsetFetchPeriod()); + Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); } } diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java index 0c52ab533a13..ef29cb738f85 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java @@ -71,6 +71,7 @@ public void testSerde() throws Exception Assert.assertNotNull(actual.getPartitionsSpec()); Assert.assertEquals(ImmutableMap.>of(), actual.getShardSpecs()); Assert.assertEquals(new IndexSpec(), actual.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), actual.getIndexSpecForIntermediatePersists()); Assert.assertEquals(100, actual.getRowFlushBoundary()); Assert.assertEquals(true, actual.isLeaveIntermediate()); Assert.assertEquals(true, actual.isCleanupOnFailure()); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index c432bb1d4301..9c480d26550e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -191,7 +191,6 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash( super.hashCode(), maxNumSubTasks, diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java index cf57072fe1de..373c4975242d 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Period; import org.junit.Assert; @@ -87,6 +88,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(0, config.getHandoffConditionTimeout()); Assert.assertEquals(0, config.getAlertTimeout()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec()); Assert.assertEquals(0, config.getMaxPendingPersists()); @@ -111,7 +113,9 @@ public void testSerdeWithNonDefaults() throws Exception + " \"mergeThreadPriority\": 100,\n" + " \"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" - + " \"alertTimeout\": 70\n" + + " \"alertTimeout\": 70,\n" + + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + "}"; ObjectMapper mapper = TestHelper.makeJsonMapper(); @@ -128,7 +132,6 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); Assert.assertEquals(70, config.getAlertTimeout()); - Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec()); Assert.assertEquals(100, config.getMaxPendingPersists()); @@ -137,5 +140,8 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(100, config.getPersistThreadPriority()); Assert.assertEquals(new Period("PT1H"), config.getWindowPeriod()); Assert.assertEquals(true, config.isReportParseExceptions()); + Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); + } }