diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md index c070e466517a..ec1d046ee31b 100644 --- a/docs/content/development/extensions-core/kafka-ingestion.md +++ b/docs/content/development/extensions-core/kafka-ingestion.md @@ -139,7 +139,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| -|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. 
However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published. See [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)| |`reportParseExceptions`|Boolean|*DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|no (default == false)| |`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. 
If set to false then ingestion for that particular partition will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)| diff --git a/docs/content/development/extensions-core/kinesis-ingestion.md b/docs/content/development/extensions-core/kinesis-ingestion.md index 0578dd23b3b1..8c2ac335a1d7 100644 --- a/docs/content/development/extensions-core/kinesis-ingestion.md +++ b/docs/content/development/extensions-core/kinesis-ingestion.md @@ -135,7 +135,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |`maxTotalRows`|Long|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)| |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)| -|`indexSpec`|Object|Tune how data is indexed, see 'IndexSpec' below for more details.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. 
However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published. See [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)| |`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)| |`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)| |`resetOffsetAutomatically`|Boolean|Whether to reset the consumer sequence numbers if the next sequence number that it is trying to fetch is less than the earliest available sequence number for that particular shard. The sequence number will be reset to either the earliest or latest sequence number depending on `useEarliestOffset` property of `KinesisSupervisorIOConfig` (see below). This situation typically occurs when messages in Kinesis are no longer available for consumption and therefore won't be ingested into Druid. If set to false then ingestion for that particular shard will halt and manual intervention is required to correct the situation, please see `Reset Supervisor` API below.|no (default == false)| diff --git a/docs/content/ingestion/hadoop.md b/docs/content/ingestion/hadoop.md index b9a6d72e7989..1c0bfbc58c62 100644 --- a/docs/content/ingestion/hadoop.md +++ b/docs/content/ingestion/hadoop.md @@ -192,7 +192,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |combineText|Boolean|Use CombineTextInputFormat to combine multiple files into a file split. 
This can speed up Hadoop jobs when processing a large number of small files.|no (default == false)| |useCombiner|Boolean|Use Hadoop combiner to merge rows at mapper if possible.|no (default == false)| |jobProperties|Object|A map of properties to add to the Hadoop job configuration, see below for details.|no (default == null)| -|indexSpec|Object|Tune how data is indexed. See below for more information.|no| +|indexSpec|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for more information.|no| +|indexSpecForIntermediatePersists|Object|Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published. See [IndexSpec](#indexspec) for possible values.|no (default = same as indexSpec)| |numBackgroundPersistThreads|Integer|The number of new background threads to use for incremental persists. Using this feature causes a notable increase in memory pressure and cpu usage but will make the job finish more quickly. If changing from the default of 0 (use current thread for persists), we recommend setting it to 1.|no (default == 0)| |forceExtendableShardSpecs|Boolean|Forces use of extendable shardSpecs. Hash-based partitioning always uses an extendable shardSpec. For single-dimension partitioning, this option should be set to true to use an extendable shardSpec. For partitioning, please check [Partitioning specification](#partitioning-specification). 
This option can be useful when you need to append more data to existing dataSource.|no (default = false)| |useExplicitVersion|Boolean|Forces HadoopIndexTask to use version.|no (default = false)| diff --git a/docs/content/ingestion/native_tasks.md b/docs/content/ingestion/native_tasks.md index ad7cac91359f..c5cd91bbec40 100644 --- a/docs/content/ingestion/native_tasks.md +++ b/docs/content/ingestion/native_tasks.md @@ -185,6 +185,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |maxTotalRows|Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no| |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. 
Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |reportParseExceptions|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|false|no| |pushTimeout|Milliseconds to wait for pushing segments. It must be >= 0, where 0 means to wait forever.|0|no| @@ -375,6 +376,14 @@ An example of the result is "metricCompression": "lz4", "longEncoding": "longs" }, + "indexSpecForIntermediatePersists": { + "bitmap": { + "type": "concise" + }, + "dimensionCompression": "lz4", + "metricCompression": "lz4", + "longEncoding": "longs" + }, "maxPendingPersists": 0, "reportParseExceptions": false, "pushTimeout": 0, @@ -555,6 +564,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if maxRowsPerSegment is set.|null|no| |partitionDimensions|The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no| |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no| +|indexSpecForIntermediatePersists|defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. this can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. 
however, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values.|same as indexSpec|no| |maxPendingPersists|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|0 (meaning one persist can be running concurrently with ingestion, and none can be queued up)|no| |forceGuaranteedRollup|Forces guaranteeing the [perfect rollup](../ingestion/index.html#roll-up-modes). The perfect rollup optimizes the total size of generated segments and querying time while indexing time will be increased. If this is set to true, the index task will read the entire input data twice: one for finding the optimal number of partitions per time chunk and one for generating segments. Note that the result segments would be hash-partitioned. You can set `forceExtendableShardSpecs` if you plan to append more data to the same time range in the future. This flag cannot be used with `appendToExisting` of IOConfig. For more details, see the below __Segment pushing modes__ section.|false|no| |reportParseExceptions|DEPRECATED. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. 
Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1.|false|no| diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java index 4258fc9d9099..874ae6106513 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpec.java @@ -179,6 +179,7 @@ public HadoopIndexTask createTask(Interval interval, String version, List> shardSpecs; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final int rowFlushBoundary; private final long maxBytesInMemory; private final boolean leaveIntermediate; @@ -105,6 +107,7 @@ public HadoopTuningConfig( final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec, final @JsonProperty("shardSpecs") Map> shardSpecs, final @JsonProperty("indexSpec") IndexSpec indexSpec, + final @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, final @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, @@ -132,6 +135,8 @@ public HadoopTuningConfig( this.partitionsSpec = partitionsSpec == null ? DEFAULT_PARTITIONS_SPEC : partitionsSpec; this.shardSpecs = shardSpecs == null ? DEFAULT_SHARD_SPECS : shardSpecs; this.indexSpec = indexSpec == null ? 
DEFAULT_INDEX_SPEC : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null ? DEFAULT_ROW_FLUSH_BOUNDARY : maxRowsInMemoryCOMPAT : maxRowsInMemory; @@ -199,6 +204,12 @@ public IndexSpec getIndexSpec() return indexSpec; } + @JsonProperty + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @JsonProperty("maxRowsInMemory") public int getRowFlushBoundary() { @@ -314,6 +325,7 @@ public HadoopTuningConfig withWorkingPath(String path) partitionsSpec, shardSpecs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, @@ -343,6 +355,7 @@ public HadoopTuningConfig withVersion(String ver) partitionsSpec, shardSpecs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, @@ -372,6 +385,7 @@ public HadoopTuningConfig withShardSpecs(Map> specs partitionsSpec, specs, indexSpec, + indexSpecForIntermediatePersists, rowFlushBoundary, maxBytesInMemory, leaveIntermediate, diff --git a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java index 6c032b06e7bb..f4cad315270a 100644 --- a/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/org/apache/druid/indexer/IndexGeneratorJob.java @@ -605,7 +605,7 @@ private File persist( ) throws IOException { return HadoopDruidIndexerConfig.INDEX_MERGER_V9 - .persist(index, interval, file, config.getIndexSpec(), progressIndicator, null); + .persist(index, interval, file, config.getIndexSpecForIntermediatePersists(), progressIndicator, null); } protected File mergeQueryableIndex( diff --git 
a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java index 3fcf50ad989c..d0d7180a6e4a 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/BatchDeltaIngestionTest.java @@ -462,6 +462,7 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java index 59453fce8821..26498746c217 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -199,6 +199,7 @@ public DetermineHashedPartitionsJobTest( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java index e9265bcabf84..3d8b06bab777 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/DeterminePartitionsJobTest.java @@ -262,6 +262,7 @@ public DeterminePartitionsJobTest( null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java index 7573aeaa4d3a..9c723ac9abd9 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java +++ 
b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -86,6 +86,7 @@ public void testHashedBucketSelection() null, null, null, + null, false, false, false, @@ -164,6 +165,7 @@ public void testNoneShardSpecBucketSelection() null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java index 25535832054d..ef29cb738f85 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/HadoopTuningConfigTest.java @@ -43,6 +43,7 @@ public void testSerde() throws Exception null, null, null, + null, 100, null, true, @@ -70,6 +71,7 @@ public void testSerde() throws Exception Assert.assertNotNull(actual.getPartitionsSpec()); Assert.assertEquals(ImmutableMap.>of(), actual.getShardSpecs()); Assert.assertEquals(new IndexSpec(), actual.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), actual.getIndexSpecForIntermediatePersists()); Assert.assertEquals(100, actual.getRowFlushBoundary()); Assert.assertEquals(true, actual.isLeaveIntermediate()); Assert.assertEquals(true, actual.isCleanupOnFailure()); diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java index 3925bcd730c9..a7757ac30340 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/IndexGeneratorJobTest.java @@ -521,6 +521,7 @@ public void setUp() throws Exception null, null, null, + null, maxRowsInMemory, maxBytesInMemory, true, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java index 
3969e126a0b1..ff5aed625eaf 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/JobHelperTest.java @@ -114,6 +114,7 @@ public void setup() throws Exception null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java index 7f2ce22f4019..4da9ee41b77d 100644 --- a/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/org/apache/druid/indexer/path/GranularityPathSpecTest.java @@ -62,6 +62,7 @@ public class GranularityPathSpecTest null, null, null, + null, false, false, false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 9d2bed80ead6..1676a2fa5c97 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -63,6 +63,7 @@ private static File createNewBasePersistDirectory() private final int maxPendingPersists; private final ShardSpec shardSpec; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final boolean reportParseExceptions; private final long publishAndHandoffTimeout; private final long alertTimeout; @@ -84,6 +85,7 @@ public RealtimeAppenderatorTuningConfig( @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists, @JsonProperty("reportParseExceptions") Boolean reportParseExceptions, @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout, @JsonProperty("alertTimeout") Long alertTimeout, @@ -106,6 +108,8 @@ public RealtimeAppenderatorTuningConfig( this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.reportParseExceptions = reportParseExceptions == null ? defaultReportParseExceptions : reportParseExceptions; @@ -196,6 +200,13 @@ public IndexSpec getIndexSpec() return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @Override @JsonProperty public boolean isReportParseExceptions() @@ -253,6 +264,7 @@ public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir) maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, reportParseExceptions, publishAndHandoffTimeout, alertTimeout, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java index 6e0e1d861d16..f872042049ae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/YeOldePlumberSchool.java @@ -236,7 +236,7 @@ private void spillIfSwappable() indexMergerV9.persist( indexToPersist.getIndex(), dirToPersist, - config.getIndexSpec(), + config.getIndexSpecForIntermediatePersists(), config.getSegmentWriteOutMediumFactory() ); diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 27df20a6a476..e766effb4125 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1270,6 +1270,7 @@ public static class IndexTuningConfig implements TuningConfig, AppenderatorConfi private final Integer numShards; private final List partitionDimensions; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final File basePersistDirectory; private final int maxPendingPersists; @@ -1305,6 +1306,7 @@ public IndexTuningConfig( @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("partitionDimensions") @Nullable List partitionDimensions, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, // This parameter is left for compatibility when reading existing JSONs, to be removed in Druid 0.12. 
@JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @@ -1327,6 +1329,7 @@ public IndexTuningConfig( numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1346,7 +1349,7 @@ public IndexTuningConfig( private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( @@ -1357,6 +1360,7 @@ private IndexTuningConfig( @Nullable Integer numShards, @Nullable List partitionDimensions, @Nullable IndexSpec indexSpec, + @Nullable IndexSpec indexSpecForIntermediatePersists, @Nullable Integer maxPendingPersists, @Nullable Boolean forceGuaranteedRollup, @Nullable Boolean reportParseExceptions, @@ -1384,6 +1388,8 @@ private IndexTuningConfig( this.numShards = numShards == null || numShards.equals(-1) ? null : numShards; this.partitionDimensions = partitionDimensions == null ? Collections.emptyList() : partitionDimensions; this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.maxPendingPersists = maxPendingPersists == null ? DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists; this.forceGuaranteedRollup = forceGuaranteedRollup == null ? 
DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup; this.reportParseExceptions = reportParseExceptions == null @@ -1420,6 +1426,7 @@ public IndexTuningConfig withBasePersistDirectory(File dir) numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1442,6 +1449,7 @@ public IndexTuningConfig withMaxTotalRows(Long maxTotalRows) numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1464,6 +1472,7 @@ public IndexTuningConfig withMaxRowsPerSegment(int maxRowsPerSegment) numShards, partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, @@ -1533,6 +1542,13 @@ public IndexSpec getIndexSpec() return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + @Override public File getBasePersistDirectory() { @@ -1618,19 +1634,22 @@ public boolean equals(Object o) } IndexTuningConfig that = (IndexTuningConfig) o; return maxRowsInMemory == that.maxRowsInMemory && - Objects.equals(maxTotalRows, that.maxTotalRows) && + maxBytesInMemory == that.maxBytesInMemory && maxPendingPersists == that.maxPendingPersists && forceGuaranteedRollup == that.forceGuaranteedRollup && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && + logParseExceptions == that.logParseExceptions && + maxParseExceptions == that.maxParseExceptions && + maxSavedParseExceptions == that.maxSavedParseExceptions && Objects.equals(maxRowsPerSegment, that.maxRowsPerSegment) && + Objects.equals(maxTotalRows, that.maxTotalRows) && Objects.equals(numShards, that.numShards) && + Objects.equals(partitionDimensions, that.partitionDimensions) && Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(indexSpecForIntermediatePersists, 
that.indexSpecForIntermediatePersists) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && - Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && - logParseExceptions == that.logParseExceptions && - maxParseExceptions == that.maxParseExceptions && - maxSavedParseExceptions == that.maxSavedParseExceptions; + Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory); } @Override @@ -1639,18 +1658,21 @@ public int hashCode() return Objects.hash( maxRowsPerSegment, maxRowsInMemory, + maxBytesInMemory, maxTotalRows, numShards, + partitionDimensions, indexSpec, + indexSpecForIntermediatePersists, basePersistDirectory, maxPendingPersists, forceGuaranteedRollup, reportParseExceptions, pushTimeout, - segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + segmentWriteOutMediumFactory ); } @@ -1663,7 +1685,9 @@ public String toString() ", maxBytesInMemory=" + maxBytesInMemory + ", maxTotalRows=" + maxTotalRows + ", numShards=" + numShards + + ", partitionDimensions=" + partitionDimensions + ", indexSpec=" + indexSpec + + ", indexSpecForIntermediatePersists=" + indexSpecForIntermediatePersists + ", basePersistDirectory=" + basePersistDirectory + ", maxPendingPersists=" + maxPendingPersists + ", forceGuaranteedRollup=" + forceGuaranteedRollup + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 2a29726965c4..924be846d9c9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -333,6 +333,7 @@ private static IndexTuningConfig 
convertToIndexTuningConfig(ParallelIndexTuningC tuningConfig.getNumShards(), null, tuningConfig.getIndexSpec(), + tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getMaxPendingPersists(), true, false, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 2c415492277b..9c480d26550e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -71,6 +71,7 @@ public static ParallelIndexTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -84,6 +85,7 @@ public ParallelIndexTuningConfig( @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @JsonProperty("numShards") @Nullable Integer numShards, @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, @JsonProperty("forceGuaranteedRollup") @Nullable Boolean forceGuaranteedRollup, @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, @@ -109,6 +111,7 @@ public ParallelIndexTuningConfig( numShards, null, indexSpec, + indexSpecForIntermediatePersists, maxPendingPersists, null, forceGuaranteedRollup, @@ -188,7 +191,6 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash( super.hashCode(), maxNumSubTasks, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index 
b594e42a66cc..1d48ad9c8c80 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -47,6 +47,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements TuningConfi @Deprecated private final int maxPendingPersists; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final boolean reportParseExceptions; private final long handoffConditionTimeout; private final boolean resetOffsetAutomatically; @@ -68,6 +69,7 @@ public SeekableStreamIndexTaskTuningConfig( @Nullable File basePersistDirectory, @Nullable Integer maxPendingPersists, @Nullable IndexSpec indexSpec, + @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. @Deprecated @JsonProperty("buildV9Directly") @Nullable Boolean buildV9Directly, @Deprecated @Nullable Boolean reportParseExceptions, @@ -96,6 +98,8 @@ public SeekableStreamIndexTaskTuningConfig( this.basePersistDirectory = defaults.getBasePersistDirectory(); this.maxPendingPersists = maxPendingPersists == null ? 0 : maxPendingPersists; this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.reportParseExceptions = reportParseExceptions == null ? defaults.isReportParseExceptions() : reportParseExceptions; @@ -187,6 +191,13 @@ public IndexSpec getIndexSpec() return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + /** * Always returns true, doesn't affect the version being built. 
*/ @@ -281,6 +292,7 @@ public boolean equals(Object o) Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && Objects.equals(indexSpec, that.indexSpec) && + Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && Objects.equals(segmentWriteOutMediumFactory, that.segmentWriteOutMediumFactory) && Objects.equals(intermediateHandoffPeriod, that.intermediateHandoffPeriod); } @@ -297,6 +309,7 @@ public int hashCode() basePersistDirectory, maxPendingPersists, indexSpec, + indexSpecForIntermediatePersists, reportParseExceptions, handoffConditionTimeout, resetOffsetAutomatically, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index c1d682e1e280..d675069b1022 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1416,6 +1416,7 @@ private AppenderatorDriverRealtimeIndexTask makeRealtimeTask( null, null, null, + null, reportParseExceptions, handoffTimeout, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 69f10f6be400..bb6993848c94 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -288,6 +288,7 @@ private static IndexTuningConfig createTuningConfig() CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -462,6 +463,7 @@ 
public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -522,6 +524,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException, Segm CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -582,6 +585,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException, Segment CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -846,6 +850,7 @@ public void testTargetPartitionSizeWithPartitionConfig() throws IOException, Seg CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, @@ -1034,6 +1039,7 @@ private static void assertIngestionSchema( CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + null, 5000, true, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 97279f94b643..4bae4c8ed75b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1004,6 +1004,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception null, indexSpec, null, + null, true, true, false, @@ -1126,6 +1127,7 @@ public void testMultipleParseExceptionsFailure() throws Exception null, indexSpec, null, + null, true, false, false, @@ -1241,6 +1243,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc null, indexSpec, null, + null, true, true, false, @@ -1706,6 +1709,7 @@ static IndexTuningConfig createTuningConfig( partitionDimensions, indexSpec, null, + null, true, forceGuaranteedRollup, reportParseException, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 6901543de1c4..daeeaab63bbf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -835,6 +835,7 @@ private RealtimeIndexTask makeRealtimeTask( null, null, null, + null, true, 0, 0, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index a09dd65d95ab..a58b2951e008 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -202,6 +202,7 @@ public void testIndexTaskSerde() throws Exception null, null, indexSpec, + null, 3, true, false, @@ -284,6 +285,7 @@ public void testIndexTaskwithResourceSerde() throws Exception null, null, indexSpec, + null, 3, true, false, @@ -393,6 +395,7 @@ public void testRealtimeIndexTaskSerde() throws Exception NoneShardSpec.instance(), indexSpec, null, + null, 0, 0, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 6f7c3b9e713d..c7c8d0b897e3 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -188,6 +188,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, numTotalSubTasks, null, null, diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 704c5fa4500b..04aa5a7a1b07 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -430,6 +430,7 @@ private TestSupervisorTask newTask( null, null, null, + null, NUM_SUB_TASKS, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index dd9096310d86..0fa747f6604f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -138,6 +138,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index cfd02ac18cee..cf7dd372bfdd 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -250,6 +250,7 @@ public void testWith1MaxNumSubTasks() throws Exception null, null, 
null, + null, 1, null, null, @@ -290,6 +291,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, 2, null, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index d587c6c0591c..2d393020b3a2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -71,6 +71,7 @@ public void testSerdeWithMaxRowsPerSegment() CompressionStrategy.LZF, LongEncodingStrategy.LONGS ), + new IndexSpec(), 1, false, true, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 6bdfbe4371b9..d8c0af320a9e 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -690,6 +690,7 @@ public void testIndexTask() throws Exception null, null, indexSpec, + null, 3, true, false, @@ -771,6 +772,7 @@ public void testIndexTaskFailure() throws Exception null, null, indexSpec, + null, 3, true, false, @@ -1160,6 +1162,7 @@ public void testResumeTasks() throws Exception null, indexSpec, null, + null, false, null, null, @@ -1290,6 +1293,7 @@ private RealtimeIndexTask newRealtimeIndexTask() null, null, null, + null, 0, 0, null, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 
ae55fffc637e..6381d0fb1f5c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -661,6 +661,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + null, null ) { diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index 43be6e080a45..84edad165068 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -82,6 +82,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File defaultMaxPendingPersists, defaultShardSpec, defaultIndexSpec, + defaultIndexSpec, true, 0, 0, @@ -103,6 +104,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File private final int maxPendingPersists; private final ShardSpec shardSpec; private final IndexSpec indexSpec; + private final IndexSpec indexSpecForIntermediatePersists; private final int persistThreadPriority; private final int mergeThreadPriority; private final boolean reportParseExceptions; @@ -125,6 +127,7 @@ public RealtimeTuningConfig( @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @JsonProperty("shardSpec") ShardSpec shardSpec, @JsonProperty("indexSpec") IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, // This parameter is left for compatibility when reading existing configs, to be removed in Druid 0.12. 
@JsonProperty("buildV9Directly") Boolean buildV9Directly, @JsonProperty("persistThreadPriority") int persistThreadPriority, @@ -152,6 +155,8 @@ public RealtimeTuningConfig( this.maxPendingPersists = maxPendingPersists == null ? defaultMaxPendingPersists : maxPendingPersists; this.shardSpec = shardSpec == null ? defaultShardSpec : shardSpec; this.indexSpec = indexSpec == null ? defaultIndexSpec : indexSpec; + this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists == null ? + this.indexSpec : indexSpecForIntermediatePersists; this.mergeThreadPriority = mergeThreadPriority; this.persistThreadPriority = persistThreadPriority; this.reportParseExceptions = reportParseExceptions == null @@ -233,6 +238,13 @@ public IndexSpec getIndexSpec() return indexSpec; } + @JsonProperty + @Override + public IndexSpec getIndexSpecForIntermediatePersists() + { + return indexSpecForIntermediatePersists; + } + /** * Always returns true, doesn't affect the version being built. */ @@ -302,6 +314,7 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, true, persistThreadPriority, mergeThreadPriority, @@ -326,6 +339,7 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) maxPendingPersists, shardSpec, indexSpec, + indexSpecForIntermediatePersists, true, persistThreadPriority, mergeThreadPriority, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index 1e81792c4304..2889a988a4d7 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -67,6 +67,8 @@ default Long getMaxTotalRows() IndexSpec getIndexSpec(); + IndexSpec getIndexSpecForIntermediatePersists(); + File 
getBasePersistDirectory(); @Nullable diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 647340bbb502..136b24e84433 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -61,7 +61,6 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; -import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; import org.apache.druid.segment.Segment; @@ -1260,12 +1259,11 @@ private int persistHydrant(FireHydrant indexToPersist, SegmentIdWithShardSpec id final File persistedFile; final File persistDir = createPersistDirIfNeeded(identifier); - final IndexSpec indexSpec = tuningConfig.getIndexSpec(); persistedFile = indexMerger.persist( indexToPersist.getIndex(), identifier.getInterval(), new File(persistDir, String.valueOf(indexToPersist.getCount())), - indexSpec, + tuningConfig.getIndexSpecForIntermediatePersists(), tuningConfig.getSegmentWriteOutMediumFactory() ); diff --git a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java index 8e0ad2b999f5..7aa59f6d8ccd 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/plumber/RealtimePlumber.java @@ -53,7 +53,6 @@ import org.apache.druid.query.SegmentDescriptor; import org.apache.druid.segment.IndexIO; import org.apache.druid.segment.IndexMerger; -import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.Metadata; import 
org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexSegment; @@ -955,14 +954,12 @@ protected int persistHydrant( try { int numRows = indexToPersist.getIndex().size(); - final IndexSpec indexSpec = config.getIndexSpec(); - indexToPersist.getIndex().getMetadata().putAll(metadataElems); final File persistedFile = indexMerger.persist( indexToPersist.getIndex(), interval, new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), - indexSpec, + config.getIndexSpecForIntermediatePersists(), config.getSegmentWriteOutMediumFactory() ); diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java index cf57072fe1de..373c4975242d 100644 --- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java +++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.data.CompressionStrategy; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.joda.time.Period; import org.junit.Assert; @@ -87,6 +88,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(0, config.getHandoffConditionTimeout()); Assert.assertEquals(0, config.getAlertTimeout()); Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec()); Assert.assertEquals(0, config.getMaxPendingPersists()); @@ -111,7 +113,9 @@ public void testSerdeWithNonDefaults() throws Exception + " \"mergeThreadPriority\": 100,\n" + " 
\"reportParseExceptions\": true,\n" + " \"handoffConditionTimeout\": 100,\n" - + " \"alertTimeout\": 70\n" + + " \"alertTimeout\": 70,\n" + + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" }\n" + "}"; ObjectMapper mapper = TestHelper.makeJsonMapper(); @@ -128,7 +132,6 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals("/tmp/xxx", config.getBasePersistDirectory().toString()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); Assert.assertEquals(70, config.getAlertTimeout()); - Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod()); Assert.assertEquals(new NumberedShardSpec(0, 1), config.getShardSpec()); Assert.assertEquals(100, config.getMaxPendingPersists()); @@ -137,5 +140,8 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(100, config.getPersistThreadPriority()); Assert.assertEquals(new Period("PT1H"), config.getWindowPeriod()); Assert.assertEquals(true, config.isReportParseExceptions()); + Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); + Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); + } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index 15a650b38ce0..ee79dc1f2286 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -72,6 +72,7 @@ public AppenderatorPlumberTest() throws Exception null, null, null, + null, true, 0, 0, diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java index af706dd53405..0d6cac57cca4 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -160,6 +160,7 @@ public AppenderatorTester( null, null, null, + null, 0, 0, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index e02ca6d657bc..b92bd1210422 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -144,6 +144,7 @@ public int columnCacheSizeBytes() null, null, null, + null, 0, 0, null, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 4052cc678f4a..31790305808c 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -206,6 +206,7 @@ public void setUp() throws Exception null, null, null, + null, true, 0, 0, diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java index de55360a61d2..c1a4066c20fa 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java +++ 
b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java @@ -75,6 +75,7 @@ public void testSwap() throws Exception null, null, null, + null, 0, 0, null, @@ -229,6 +230,7 @@ public void testDedup() throws Exception null, null, null, + null, 0, 0, null, diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java index f9159fbcadb3..bb759b6ddf7c 100644 --- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java @@ -166,6 +166,7 @@ public void testTaskValidator() throws Exception 1, NoneShardSpec.instance(), new IndexSpec(), + new IndexSpec(), null, 0, 0,