diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index d141b23477ff..037ffe752eb7 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -232,6 +232,7 @@ The `tuningConfig` object is optional. If you don't specify the `tuningConfig` o |`intermediateHandoffPeriod`|ISO 8601 period|The period that determines how often tasks hand off segments. Handoff occurs if `maxRowsPerSegment` or `maxTotalRows` is reached or every `intermediateHandoffPeriod`, whichever happens first.|No|P2147483647D| |`intermediatePersistPeriod`|ISO 8601 period|The period that determines the rate at which intermediate persists occur.|No|PT10M| |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If a new intermediate persist exceeds this limit, Druid blocks ingestion until the currently running persist finishes. One persist can be running concurrently with ingestion, and none can be queued up. The maximum heap memory usage for indexing scales is `maxRowsInMemory * (2 + maxPendingPersists)`.|No|0| +|`numPersistThreads`|Integer|The number of threads to use to create and persist incremental segments on the disk. Higher ingestion data throughput results in a larger number of incremental segments, causing significant CPU time to be spent on the creation of the incremental segments on the disk. For datasources with number of columns running into hundreds or thousands, creation of the incremental segments may take up significant time, in the order of multiple seconds. In both of these scenarios, ingestion can stall or pause frequently, causing it to fall behind. You can use additional threads to parallelize the segment creation without blocking ingestion as long as there are sufficient CPU resources available.|No|1| |`indexSpec`|Object|Defines how Druid indexes the data. See [IndexSpec](#indexspec) for more information.|No|| |`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to use at indexing time for intermediate persisted temporary segments. You can use `indexSpecForIntermediatePersists` to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published. See [IndexSpec](#indexspec) for possible values.|No|Same as `indexSpec`| |`reportParseExceptions`|Boolean|DEPRECATED. If `true`, Druid throws exceptions encountered during parsing causing ingestion to halt. If `false`, Druid skips unparseable rows and fields. Setting `reportParseExceptions` to `true` overrides existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to not more than 1.|No|`false`| diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java index ab5379ffa6fa..04b332aac850 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java @@ -109,7 +109,8 @@ public static Task getTask() null, null, null, - 1L + 1L, + null ) ), null diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java index be3aac9ac25d..de4bc38fc7a5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java @@ -51,7 +51,8 @@ public KafkaIndexTaskTuningConfig( @Nullable Period intermediateHandoffPeriod, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions + @Nullable Integer maxSavedParseExceptions, + @Nullable Integer numPersistThreads ) { super( @@ -74,7 +75,8 @@ public KafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); } @@ -97,7 +99,8 @@ private KafkaIndexTaskTuningConfig( @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { this( @@ -119,7 +122,8 @@ private KafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); } @@ -145,7 +149,8 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), - getMaxSavedParseExceptions() + getMaxSavedParseExceptions(), + getNumPersistThreads() ); } @@ -171,7 +176,8 @@ public String toString() ", logParseExceptions=" + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + - '}'; + ", numPersistThreads=" + getNumPersistThreads() + + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index c8cc454bf272..d32d694ad540 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -64,6 +64,7 @@ public static KafkaSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -91,7 +92,8 @@ public KafkaSupervisorTuningConfig( @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { super( @@ -113,7 +115,8 @@ public KafkaSupervisorTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); this.workerThreads = workerThreads; this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES); @@ -199,6 +202,7 @@ public String toString() ", logParseExceptions=" + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + + ", numPersistThreads=" + getNumPersistThreads() + '}'; } @@ -224,7 +228,8 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), - getMaxSavedParseExceptions() + getMaxSavedParseExceptions(), + getNumPersistThreads() ); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index f1001174266b..b2a9702da5e9 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2851,7 +2851,8 @@ private KafkaIndexTask createTask( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + null ); if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) { final TreeMap> checkpoints = new TreeMap<>(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 88d8de80605b..5c67f6e50212 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -72,6 +72,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout()); + Assert.assertEquals(1, config.getNumPersistThreads()); } @Test @@ -89,7 +90,8 @@ public void testSerdeWithNonDefaults() throws Exception + " \"handoffConditionTimeout\": 100,\n" + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n" - + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n" + + " \"appendableIndexSpec\": { \"type\" : \"onheap\" },\n" + + " \"numPersistThreads\": 2\n" + "}"; KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue( @@ -120,6 +122,7 @@ public void testSerdeWithNonDefaults() throws Exception IndexSpec.builder().withDimensionCompression(CompressionStrategy.UNCOMPRESSED).build(), config.getIndexSpecForIntermediatePersists() ); + Assert.assertEquals(2, config.getNumPersistThreads()); } @Test @@ -148,7 +151,8 @@ public void testConvert() null, null, null, - null + null, + 2 ); KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); @@ -163,6 +167,7 @@ public void testConvert() Assert.assertEquals(IndexSpec.DEFAULT, copy.getIndexSpec()); Assert.assertEquals(true, copy.isReportParseExceptions()); Assert.assertEquals(5L, copy.getHandoffConditionTimeout()); + Assert.assertEquals(2, copy.getNumPersistThreads()); } @Test @@ -187,7 +192,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException null, true, 42, - 42 + 42, + 2 ); String serialized = mapper.writeValueAsString(base); @@ -212,6 +218,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions()); Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions()); Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); + Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); } @Test @@ -236,6 +243,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException true, 42, 42, + 2, "extra string" ); @@ -260,6 +268,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions()); Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions()); Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); + Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 21221185e76f..21264a9e3968 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -336,6 +336,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); @@ -489,6 +490,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException null, null, null, + null, null ), null @@ -3997,6 +3999,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ) ); @@ -4035,6 +4038,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ); @@ -4187,6 +4191,7 @@ public void testSequenceNameDoesNotChangeWithTaskId() null, null, null, + null, null ) ); @@ -4661,7 +4666,8 @@ public SeekableStreamIndexTaskClient build( null, null, null, - 10 + 10, + null ); return new TestableKafkaSupervisor( @@ -4774,6 +4780,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java index fd6cbb48b897..8e5ad243d2e5 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -55,6 +55,7 @@ public TestModifiedKafkaIndexTaskTuningConfig( @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, @JsonProperty("extra") String extra ) { @@ -77,7 +78,8 @@ public TestModifiedKafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); this.extra = extra; } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index 0c3cd7177522..a86541a04873 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -115,7 +115,8 @@ public KinesisIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + null ); this.recordBufferSize = recordBufferSize; this.recordBufferSizeBytes = recordBufferSizeBytes; @@ -362,6 +363,6 @@ public String toString() ", maxRecordsPerPoll=" + maxRecordsPerPoll + ", maxBytesPerPoll=" + maxBytesPerPoll + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + - '}'; + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 95154acf1590..4a117976e184 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -68,6 +68,8 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig private final int maxParseExceptions; private final int maxSavedParseExceptions; + private final int numPersistThreads; + public RealtimeAppenderatorTuningConfig( @Nullable AppendableIndexSpec appendableIndexSpec, Integer maxRowsInMemory, @@ -87,7 +89,8 @@ public RealtimeAppenderatorTuningConfig( @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions + @Nullable Integer maxSavedParseExceptions, + @Nullable Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; @@ -133,6 +136,8 @@ public RealtimeAppenderatorTuningConfig( this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; + this.numPersistThreads = numPersistThreads == null ? + DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS); } @JsonCreator @@ -154,7 +159,8 @@ private RealtimeAppenderatorTuningConfig( @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { this( @@ -176,7 +182,8 @@ private RealtimeAppenderatorTuningConfig( segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); } @@ -314,6 +321,13 @@ public int getMaxSavedParseExceptions() return maxSavedParseExceptions; } + @Override + @JsonProperty + public int getNumPersistThreads() + { + return numPersistThreads; + } + @Override public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir) { @@ -336,7 +350,8 @@ public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir) segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 6959de2809de..bb3bc6826f83 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -286,7 +286,8 @@ static CompactionTuningConfig getTuningConfig(TuningConfig tuningConfig) parallelIndexTuningConfig.getMaxParseExceptions(), parallelIndexTuningConfig.getMaxSavedParseExceptions(), parallelIndexTuningConfig.getMaxColumnsToMerge(), - parallelIndexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() + parallelIndexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(), + parallelIndexTuningConfig.getNumPersistThreads() ); } else if (tuningConfig instanceof IndexTuningConfig) { final IndexTuningConfig indexTuningConfig = (IndexTuningConfig) tuningConfig; @@ -320,7 +321,8 @@ static CompactionTuningConfig getTuningConfig(TuningConfig tuningConfig) indexTuningConfig.getMaxParseExceptions(), indexTuningConfig.getMaxSavedParseExceptions(), indexTuningConfig.getMaxColumnsToMerge(), - indexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() + indexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(), + indexTuningConfig.getNumPersistThreads() ); } else { throw new ISE( @@ -1383,7 +1385,8 @@ public static CompactionTuningConfig defaultConfig() null, null, null, - 0L + 0L, + null ); } @@ -1419,7 +1422,8 @@ public CompactionTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, - @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis + @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { super( @@ -1453,7 +1457,8 @@ public CompactionTuningConfig( maxSavedParseExceptions, maxColumnsToMerge, awaitSegmentAvailabilityTimeoutMillis, - null + null, + numPersistThreads ); Preconditions.checkArgument( @@ -1495,7 +1500,8 @@ public CompactionTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxColumnsToMerge(), - getAwaitSegmentAvailabilityTimeoutMillis() + getAwaitSegmentAvailabilityTimeoutMillis(), + getNumPersistThreads() ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index b18a6f0f1748..e90f531b58d4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1300,6 +1300,8 @@ public static class IndexTuningConfig implements AppenderatorConfig @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final int numPersistThreads; + @Nullable private static PartitionsSpec getPartitionsSpec( boolean forceGuaranteedRollup, @@ -1366,7 +1368,8 @@ public IndexTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, - @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis + @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { this( @@ -1394,7 +1397,8 @@ public IndexTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxColumnsToMerge, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + numPersistThreads ); Preconditions.checkArgument( @@ -1405,7 +1409,7 @@ public IndexTuningConfig( private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( @@ -1426,7 +1430,8 @@ private IndexTuningConfig( @Nullable Integer maxParseExceptions, @Nullable Integer maxSavedParseExceptions, @Nullable Integer maxColumnsToMerge, - @Nullable Long awaitSegmentAvailabilityTimeoutMillis + @Nullable Long awaitSegmentAvailabilityTimeoutMillis, + @Nullable Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; @@ -1472,6 +1477,8 @@ private IndexTuningConfig( } else { this.awaitSegmentAvailabilityTimeoutMillis = awaitSegmentAvailabilityTimeoutMillis; } + this.numPersistThreads = numPersistThreads == null ? + DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS); } @Override @@ -1495,7 +1502,8 @@ public IndexTuningConfig withBasePersistDirectory(File dir) maxParseExceptions, maxSavedParseExceptions, maxColumnsToMerge, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + numPersistThreads ); } @@ -1519,7 +1527,8 @@ public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) maxParseExceptions, maxSavedParseExceptions, maxColumnsToMerge, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + numPersistThreads ); } @@ -1708,6 +1717,13 @@ public long getAwaitSegmentAvailabilityTimeoutMillis() return awaitSegmentAvailabilityTimeoutMillis; } + @JsonProperty + @Override + public int getNumPersistThreads() + { + return numPersistThreads; + } + @Override public boolean equals(Object o) { @@ -1730,6 +1746,7 @@ public boolean equals(Object o) logParseExceptions == that.logParseExceptions && maxParseExceptions == that.maxParseExceptions && maxSavedParseExceptions == that.maxSavedParseExceptions && + numPersistThreads == that.numPersistThreads && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && @@ -1759,7 +1776,8 @@ public int hashCode() maxParseExceptions, maxSavedParseExceptions, segmentWriteOutMediumFactory, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + numPersistThreads ); } @@ -1784,6 +1802,7 @@ public String toString() ", maxSavedParseExceptions=" + maxSavedParseExceptions + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + ", awaitSegmentAvailabilityTimeoutMillis=" + awaitSegmentAvailabilityTimeoutMillis + + ", numPersistThreads=" + numPersistThreads + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 9940dfdcb4de..df987798aa72 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1272,7 +1272,8 @@ private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningC tuningConfig.getMaxParseExceptions(), tuningConfig.getMaxSavedParseExceptions(), tuningConfig.getMaxColumnsToMerge(), - tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() + tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(), + tuningConfig.getNumPersistThreads() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 1021244f6cfd..7e33261ee7ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -107,6 +107,7 @@ public static ParallelIndexTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -143,7 +144,8 @@ public ParallelIndexTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis, - @JsonProperty("maxAllowedLockCount") @Nullable Integer maxAllowedLockCount + @JsonProperty("maxAllowedLockCount") @Nullable Integer maxAllowedLockCount, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { super( @@ -170,7 +172,8 @@ public ParallelIndexTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxColumnsToMerge, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + numPersistThreads ); if (maxNumSubTasks != null && maxNumConcurrentSubTasks != null) { @@ -306,7 +309,8 @@ public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpe getMaxSavedParseExceptions(), getMaxColumnsToMerge(), getAwaitSegmentAvailabilityTimeoutMillis(), - getMaxAllowedLockCount() + getMaxAllowedLockCount(), + getNumPersistThreads() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index 904f2820af60..204d1b3e7a33 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -61,6 +61,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato private final int maxParseExceptions; private final int maxSavedParseExceptions; + private final int numPersistThreads; + public SeekableStreamIndexTaskTuningConfig( @Nullable AppendableIndexSpec appendableIndexSpec, @Nullable Integer maxRowsInMemory, @@ -81,7 +83,8 @@ public SeekableStreamIndexTaskTuningConfig( @Nullable Period intermediateHandoffPeriod, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions + @Nullable Integer maxSavedParseExceptions, + @Nullable Integer numPersistThreads ) { // Cannot be a static because default basePersistDirectory is unique per-instance @@ -134,6 +137,8 @@ public SeekableStreamIndexTaskTuningConfig( this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; + this.numPersistThreads = numPersistThreads == null ? + DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS); } @Override @@ -277,6 +282,13 @@ public boolean isSkipSequenceNumberAvailabilityCheck() return skipSequenceNumberAvailabilityCheck; } + @Override + @JsonProperty + public int getNumPersistThreads() + { + return numPersistThreads; + } + @Override public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir); @@ -302,6 +314,7 @@ public boolean equals(Object o) logParseExceptions == that.logParseExceptions && maxParseExceptions == that.maxParseExceptions && maxSavedParseExceptions == that.maxSavedParseExceptions && + numPersistThreads == that.numPersistThreads && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && @@ -333,7 +346,8 @@ public int hashCode() skipSequenceNumberAvailabilityCheck, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java index ef9a67562eaf..63bad86c1952 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java @@ -89,6 +89,7 @@ public TestIndexTask( null, null, null, + null, null ) ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index c29f482b66ea..b864fdb44a54 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1452,7 +1452,8 @@ private AppenderatorDriverRealtimeIndexTask makeRealtimeTask( null, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + null ); return new AppenderatorDriverRealtimeIndexTask( taskId, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorsTest.java index c0489c61b425..00e170d90f5d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorsTest.java @@ -176,7 +176,8 @@ public AppenderatorTester( 0L, OffHeapMemorySegmentWriteOutMediumFactory.instance(), IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, - basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory + basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, + null ); metrics = new FireDepartmentMetrics(); @@ -348,6 +349,7 @@ static class TestIndexTuningConfig implements AppenderatorConfig private final IndexSpec indexSpecForIntermediatePersists; @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final int numPersistThreads; public TestIndexTuningConfig( AppendableIndexSpec appendableIndexSpec, @@ -360,7 +362,8 @@ public TestIndexTuningConfig( Long pushTimeout, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, Integer maxColumnsToMerge, - File basePersistDirectory + File basePersistDirectory, + Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec; @@ -377,6 +380,8 @@ public TestIndexTuningConfig( this.partitionsSpec = null; this.indexSpecForIntermediatePersists = this.indexSpec; + + this.numPersistThreads = numPersistThreads == null ? DEFAULT_NUM_PERSIST_THREADS : numPersistThreads; } @Override @@ -465,6 +470,12 @@ public Period getIntermediatePersistPeriod() return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make much sense for batch jobs } + @Override + public int getNumPersistThreads() + { + return numPersistThreads; + } + @Override public boolean equals(Object o) { @@ -483,6 +494,7 @@ public boolean equals(Object o) maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && + numPersistThreads == that.numPersistThreads && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && @@ -506,7 +518,8 @@ public int hashCode() maxPendingPersists, reportParseExceptions, pushTimeout, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + numPersistThreads ); } @@ -526,6 +539,7 @@ public String toString() ", reportParseExceptions=" + reportParseExceptions + ", pushTimeout=" + pushTimeout + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + ", numPersistThreads=" + numPersistThreads + '}'; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java index f81007722350..18371b2afd54 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java @@ -181,7 +181,8 @@ public AppenderatorTester( 0L, OffHeapMemorySegmentWriteOutMediumFactory.instance(), IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, - basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory + basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, + null ); metrics = new FireDepartmentMetrics(); @@ -377,6 +378,7 @@ static class TestIndexTuningConfig implements AppenderatorConfig private final IndexSpec indexSpecForIntermediatePersists; @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final int numPersistThreads; public TestIndexTuningConfig( AppendableIndexSpec appendableIndexSpec, @@ -389,7 +391,8 @@ public TestIndexTuningConfig( Long pushTimeout, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, Integer maxColumnsToMerge, - File basePersistDirectory + File basePersistDirectory, + Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec; @@ -406,6 +409,8 @@ public TestIndexTuningConfig( this.partitionsSpec = null; this.indexSpecForIntermediatePersists = this.indexSpec; + + this.numPersistThreads = numPersistThreads == null ? DEFAULT_NUM_PERSIST_THREADS : numPersistThreads; } @Override @@ -494,6 +499,12 @@ public Period getIntermediatePersistPeriod() return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make much sense for batch jobs } + @Override + public int getNumPersistThreads() + { + return numPersistThreads; + } + @Override public boolean equals(Object o) { @@ -512,6 +523,7 @@ public boolean equals(Object o) maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && + numPersistThreads == that.numPersistThreads && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && @@ -535,7 +547,8 @@ public int hashCode() maxPendingPersists, reportParseExceptions, pushTimeout, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + numPersistThreads ); } @@ -555,6 +568,7 @@ public String toString() ", reportParseExceptions=" + reportParseExceptions + ", pushTimeout=" + pushTimeout + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + ", numPersistThreads=" + numPersistThreads + '}'; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index 5c58818529bf..fc581d4954b9 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -286,6 +286,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException null, 2, null, + null, null ) ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 78c4f1fabbf5..03994e35e518 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -369,6 +369,7 @@ public void testRunWithHashPartitioning() throws Exception null, null, null, + null, null ) ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 321ad5db3fd1..c90c08349c4d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -366,6 +366,7 @@ private static CompactionTask.CompactionTuningConfig createTuningConfig() null, null, null, + null, null ); } @@ -665,6 +666,7 @@ public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws null, null, null, + null, null ), null, @@ -747,6 +749,7 @@ public void testGetTuningConfigWithIndexTuningConfig() null, null, null, + null, null ); @@ -785,6 +788,7 @@ public void testGetTuningConfigWithIndexTuningConfig() null, null, null, + null, null ); @@ -831,6 +835,7 @@ public void testGetTuningConfigWithParallelIndexTuningConfig() null, null, null, + null, null ); @@ -869,6 +874,7 @@ public void testGetTuningConfigWithParallelIndexTuningConfig() null, null, null, + null, null ); @@ -1019,6 +1025,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, null, null, + null, null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( @@ -1095,6 +1102,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException null, null, null, + null, null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( @@ -1171,6 +1179,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException null, null, null, + null, null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( @@ -1832,6 +1841,7 @@ private void assertIngestionSchema( null, null, null, + null, null ), expectedSegmentGranularity, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTuningConfigTest.java index ace69c69433b..6831ea4adb60 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTuningConfigTest.java @@ -102,7 +102,8 @@ public void testSerdeWithNonZeroAwaitSegmentAvailabilityTimeoutMillis() null, null, null, - 5L + 5L, + null ); } @@ -144,7 +145,8 @@ public void testSerdeWithZeroAwaitSegmentAvailabilityTimeoutMillis() null, null, null, - 0L + 0L, + null ); Assert.assertEquals(0L, tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis()); } @@ -187,6 +189,7 @@ public void testSerdeWithNullAwaitSegmentAvailabilityTimeoutMillis() null, null, null, + null, null ); Assert.assertEquals(0L, tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java index 04075ea2b157..40d9cb4919ff 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java @@ -85,7 +85,8 @@ public void testSerdeTuningConfigWithDynamicPartitionsSpec() throws IOException 10, 100, 1234, - 0L + 0L, + null ); assertSerdeTuningConfig(tuningConfig); } @@ -122,7 +123,8 @@ public void testSerdeTuningConfigWithHashedPartitionsSpec() throws IOException 10, 100, null, - -1L + -1L, + null ); assertSerdeTuningConfig(tuningConfig); } @@ -159,7 +161,8 @@ public void testSerdeTuningConfigWithDeprecatedDynamicPartitionsSpec() throws IO 10, 100, null, - 1L + 1L, + null ); assertSerdeTuningConfig(tuningConfig); } @@ -196,6 +199,7 @@ public void testSerdeTuningConfigWithDeprecatedHashedPartitionsSpec() throws IOE 10, 100, 1234, + null, null ); assertSerdeTuningConfig(tuningConfig); @@ -235,6 +239,7 @@ public void testForceGuaranteedRollupWithDynamicPartitionsSpec() 10, 100, null, + null, null ); } @@ -273,6 +278,7 @@ public void testBestEffortRollupWithHashedPartitionsSpec() 10, 100, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index a172753ed922..b60ddb80970c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1618,6 +1618,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception 7, 7, null, + null, null ); @@ -1796,6 +1797,7 @@ public void testMultipleParseExceptionsFailure() throws Exception 2, 5, null, + null, null ); @@ -1938,6 +1940,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc 2, 5, null, + null, null ); @@ -2842,6 +2845,7 @@ static IndexTuningConfig createTuningConfig( null, 1, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index 14888b1327fe..2e3e7d5d8ce4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -862,6 +862,7 @@ private RealtimeIndexTask makeRealtimeTask( handoffTimeout, null, null, + null, null ); return new RealtimeIndexTask( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index d3deefc9ae78..8c0cbd925627 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -271,7 +271,8 @@ public void testIndexTaskSerde() throws Exception null, null, null, - 1L + 1L, + null ) ), null @@ -357,6 +358,7 @@ public void testIndexTaskwithResourceSerde() throws Exception null, null, null, + null, null ) ), @@ -426,6 +428,7 @@ public void testRealtimeIndexTaskSerde() throws Exception null, null, null, + null, null ) ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 0f165912e9a2..4a0707e3586f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -193,6 +193,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase 5, null, null, + null, null ); @@ -303,6 +304,7 @@ protected ParallelIndexTuningConfig newTuningConfig( 5, null, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 7bf894fff4d7..be75a32c878c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -202,6 +202,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, null ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index 4ee422a04144..266604a8542c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -454,6 +454,7 @@ private TestSupervisorTask newTask( null, null, null, + null, null ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index 6c83c225fa9f..7b92ba44bd9d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -271,6 +271,7 @@ ParallelIndexIngestionSpec build() null, null, null, + null, null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 2518c1053d16..6fb56aa33f7f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -271,6 +271,7 @@ public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA null, null, null, + null, null ); final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec( @@ -343,6 +344,7 @@ public void testFailToConstructWhenBothInputSourceAndParserAreSet() null, null, null, + null, null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java index 7f315ab33043..0a0d4d4692f0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java @@ -184,7 +184,8 @@ ParallelIndexTuningConfig build() 25, null, null, - null + null, + 2 ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index 380f826c18f8..a3f775509e2d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -103,7 +103,8 @@ public void testSerdeWithMaxRowsPerSegment() null, null, null, - null + null, + 2 ); final byte[] json = mapper.writeValueAsBytes(tuningConfig); final ParallelIndexTuningConfig fromJson = (ParallelIndexTuningConfig) mapper.readValue(json, TuningConfig.class); @@ -150,7 +151,8 @@ public void testSerdeWithMaxNumConcurrentSubTasks() throws IOException null, null, null, - null + null, + 2 ); final byte[] json = mapper.writeValueAsBytes(tuningConfig); final ParallelIndexTuningConfig fromJson = (ParallelIndexTuningConfig) mapper.readValue(json, TuningConfig.class); @@ -197,7 +199,8 @@ public void testSerdeWithMaxNumSubTasks() throws IOException null, null, null, - null + null, + 2 ); final byte[] json = mapper.writeValueAsBytes(tuningConfig); final ParallelIndexTuningConfig fromJson = (ParallelIndexTuningConfig) mapper.readValue(json, TuningConfig.class); @@ -246,6 +249,7 @@ public void testSerdeWithMaxNumSubTasksAndMaxNumConcurrentSubTasks() null, null, null, + null, null ); } @@ -292,6 +296,7 @@ public void testConstructorWithHashedPartitionsSpecAndNonForceGuaranteedRollupFa null, null, null, + null, null ); } @@ -338,6 +343,7 @@ public void testConstructorWithSingleDimensionPartitionsSpecAndNonForceGuarantee null, null, null, + null, null ); } @@ -384,6 +390,7 @@ public void testConstructorWithDynamicPartitionsSpecAndForceGuaranteedRollupFail null, null, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index d22769c1f791..0da7ed0934e6 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -586,6 +586,7 @@ public void testWith1MaxNumConcurrentSubTasks() null, null, null, + null, null ), VALID_INPUT_SOURCE_FILTER @@ -711,7 +712,8 @@ public void testMaxLocksWith1MaxNumConcurrentSubTasks() null, null, null, - 0 + 0, + null ), VALID_INPUT_SOURCE_FILTER ); @@ -772,7 +774,8 @@ public void testMaxLocksWith2MaxNumConcurrentSubTasks() null, null, null, - 0 + 0, + null ), VALID_INPUT_SOURCE_FILTER ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index 002c70262cb6..6c467bb7ac7c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -751,6 +751,7 @@ public void testIndexTask() null, null, null, + null, null ) ), @@ -835,6 +836,7 @@ public void testIndexTaskFailure() null, null, null, + null, null ) ), @@ -1386,6 +1388,7 @@ public void testResumeTasks() throws Exception null, null, null, + null, null ) ), @@ -1497,6 +1500,7 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception null, null, null, + null, null ) ), @@ -1655,6 +1659,7 @@ private RealtimeIndexTask newRealtimeIndexTask() null, null, null, + null, null ); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 9215f24a3858..12eb3c7e10ab 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -517,6 +517,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + null, null ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index cb653c6aa848..2a69ff064cb4 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -2019,6 +2019,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + null, null ) { diff --git a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java index c5bc20f45f21..c0ccb967dbdf 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java @@ -125,6 +125,21 @@ public static ExecutorService newBlockingSingleThreaded( final int capacity, final Integer priority ) + { + return newBlockingThreaded(nameFormat, 1, capacity, priority); + } + + public static ExecutorService newBlockingThreaded(final String nameFormat, int nThreads, final int capacity) + { + return newBlockingThreaded(nameFormat, nThreads, capacity, null); + } + + public static ExecutorService newBlockingThreaded( + final String nameFormat, + int nThreads, + final int capacity, + final Integer priority + ) { final BlockingQueue queue; if (capacity > 0) { @@ -133,8 +148,8 @@ public static ExecutorService newBlockingSingleThreaded( queue = new SynchronousQueue<>(); } return new ThreadPoolExecutor( - 1, - 1, + nThreads, + nThreads, 0L, TimeUnit.MILLISECONDS, queue, diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index 32c95ebc6add..a75a79b4dd9f 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -79,7 +79,8 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File DEFAULT_HANDOFF_CONDITION_TIMEOUT, DEFAULT_ALERT_TIMEOUT, null, - DEFAULT_DEDUP_COLUMN + DEFAULT_DEDUP_COLUMN, + DEFAULT_NUM_PERSIST_THREADS ); } @@ -105,6 +106,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; @Nullable private final String dedupColumn; + private final int numPersistThreads; public RealtimeTuningConfig( @Nullable AppendableIndexSpec appendableIndexSpec, @@ -126,7 +128,8 @@ public RealtimeTuningConfig( Long handoffConditionTimeout, Long alertTimeout, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - @Nullable String dedupColumn + @Nullable String dedupColumn, + @Nullable Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; @@ -164,6 +167,8 @@ public RealtimeTuningConfig( Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0"); this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; this.dedupColumn = dedupColumn == null ? DEFAULT_DEDUP_COLUMN : dedupColumn; + this.numPersistThreads = numPersistThreads == null ? + DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS); } @JsonCreator @@ -185,7 +190,8 @@ private RealtimeTuningConfig( @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @JsonProperty("alertTimeout") Long alertTimeout, @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - @JsonProperty("dedupColumn") @Nullable String dedupColumn + @JsonProperty("dedupColumn") @Nullable String dedupColumn, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { this( @@ -208,7 +214,8 @@ private RealtimeTuningConfig( handoffConditionTimeout, alertTimeout, segmentWriteOutMediumFactory, - dedupColumn + dedupColumn, + numPersistThreads ); } @@ -349,6 +356,13 @@ public String getDedupColumn() return dedupColumn; } + @Override + @JsonProperty + public int getNumPersistThreads() + { + return numPersistThreads; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -371,7 +385,8 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) handoffConditionTimeout, alertTimeout, segmentWriteOutMediumFactory, - dedupColumn + dedupColumn, + numPersistThreads ); } @@ -398,7 +413,8 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) handoffConditionTimeout, alertTimeout, segmentWriteOutMediumFactory, - dedupColumn + dedupColumn, + numPersistThreads ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index fff3466a94eb..ab1b09b772cd 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -28,12 +28,20 @@ public interface AppenderatorConfig extends TuningConfig { + + int DEFAULT_NUM_PERSIST_THREADS = 1; + boolean isReportParseExceptions(); int getMaxPendingPersists(); boolean isSkipBytesInMemoryOverheadCheck(); + default int getNumPersistThreads() + { + return DEFAULT_NUM_PERSIST_THREADS; + } + /** * Maximum number of rows in a single segment before pushing to deep storage */ diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 2268c00e2eb2..3fa5b8456c1b 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -188,7 +188,7 @@ public class AppenderatorImpl implements Appenderator */ private final Map> persistedHydrantMetadata = Collections.synchronizedMap(new IdentityHashMap<>()); - + /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. * @@ -1148,9 +1148,9 @@ private void initializeExecutors() if (persistExecutor == null) { // use a blocking single threaded executor to throttle the firehose when write to disk is slow persistExecutor = MoreExecutors.listeningDecorator( - Execs.newBlockingSingleThreaded( + Execs.newBlockingThreaded( "[" + StringUtils.encodeForFormat(myId) + "]-appenderator-persist", - maxPendingPersists + tuningConfig.getNumPersistThreads(), maxPendingPersists ) ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java index 4b1d384d0f7e..1609744e9e8a 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java @@ -214,11 +214,11 @@ private void initializeExecutors() log.debug("There will be up to[%d] pending persists", maxPendingPersists); if (persistExecutor == null) { - // use a blocking single threaded executor to throttle the firehose when write to disk is slow + log.info("Number of persist threads [%d]", tuningConfig.getNumPersistThreads()); persistExecutor = MoreExecutors.listeningDecorator( - Execs.newBlockingSingleThreaded( + Execs.newBlockingThreaded( "[" + StringUtils.encodeForFormat(myId) + "]-batch-appenderator-persist", - maxPendingPersists + tuningConfig.getNumPersistThreads(), maxPendingPersists ) ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 4ebe492ca234..0593e6e823bc 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -1176,11 +1176,11 @@ private void initializeExecutors() final int maxPendingPersists = tuningConfig.getMaxPendingPersists(); if (persistExecutor == null) { - // use a blocking single threaded executor to throttle the firehose when write to disk is slow + log.info("Number of persist threads [%d]", tuningConfig.getNumPersistThreads()); persistExecutor = MoreExecutors.listeningDecorator( - Execs.newBlockingSingleThreaded( + Execs.newBlockingThreaded( "[" + StringUtils.encodeForFormat(myId) + "]-appenderator-persist", - maxPendingPersists + tuningConfig.getNumPersistThreads(), maxPendingPersists ) ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 8b5b14823573..61993fbcfe56 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -572,6 +572,12 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() { return baseConfig.getSegmentWriteOutMediumFactory(); } + + @Override + public int getNumPersistThreads() + { + return baseConfig.getNumPersistThreads(); + } } private IndexMerger wrapIndexMerger(IndexMerger baseMerger) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index dc96e912dcf1..f795492819a8 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -92,6 +92,7 @@ public void setUp() throws Exception null, null, null, + null, null ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmensSinksBatchAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmensSinksBatchAppenderatorTester.java index c1d2bf7a3a5d..eb63abc95986 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmensSinksBatchAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmensSinksBatchAppenderatorTester.java @@ -180,7 +180,8 @@ public ClosedSegmensSinksBatchAppenderatorTester( 0L, OffHeapMemorySegmentWriteOutMediumFactory.instance(), IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, - basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory + basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, + null ); metrics = new FireDepartmentMetrics(); @@ -314,6 +315,7 @@ static class TestIndexTuningConfig implements AppenderatorConfig private final IndexSpec indexSpecForIntermediatePersists; @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final int numPersistThreads; public TestIndexTuningConfig( AppendableIndexSpec appendableIndexSpec, @@ -326,7 +328,8 @@ public TestIndexTuningConfig( Long pushTimeout, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, Integer maxColumnsToMerge, - File basePersistDirectory + File basePersistDirectory, + @Nullable Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec; @@ -343,6 +346,7 @@ public TestIndexTuningConfig( this.partitionsSpec = null; this.indexSpecForIntermediatePersists = this.indexSpec; + this.numPersistThreads = numPersistThreads == null ? DEFAULT_NUM_PERSIST_THREADS : numPersistThreads; } @Override @@ -430,6 +434,12 @@ public Period getIntermediatePersistPeriod() { return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make much sense for batch jobs } + + @Override + public int getNumPersistThreads() + { + return numPersistThreads; + } @Override public boolean equals(Object o) @@ -449,6 +459,7 @@ public boolean equals(Object o) maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && + numPersistThreads == that.numPersistThreads && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && @@ -472,7 +483,8 @@ public int hashCode() maxPendingPersists, reportParseExceptions, pushTimeout, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + numPersistThreads ); } @@ -492,6 +504,7 @@ public String toString() ", reportParseExceptions=" + reportParseExceptions + ", pushTimeout=" + pushTimeout + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + ", numPersistThreads=" + numPersistThreads + '}'; } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index c6c2069b681c..975784268128 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -146,6 +146,7 @@ public int getNumThreads() null, null, null, + null, null ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsAppenderatorTester.java index 552dc893baf8..8869eb907e0d 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsAppenderatorTester.java @@ -150,7 +150,8 @@ public OpenAndClosedSegmentsAppenderatorTester( 0L, OffHeapMemorySegmentWriteOutMediumFactory.instance(), IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, - basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory + basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, + null ); metrics = new FireDepartmentMetrics(); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java index e81fd9795d83..3a13a257ce72 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java @@ -152,6 +152,7 @@ public StreamAppenderatorTester( null, null, null, + null, null ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index eeea2a930e53..74a1217ed275 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -225,6 +225,7 @@ public void setUp() throws Exception null, null, null, + null, null ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java index 6caafa417160..0d3c9c23bfff 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java @@ -103,6 +103,7 @@ public void testSwap() throws Exception null, null, null, + null, null ); final Sink sink = new Sink( @@ -260,7 +261,8 @@ public void testDedup() throws Exception null, null, null, - "dedupColumn" + "dedupColumn", + null ); final Sink sink = new Sink( interval, @@ -396,7 +398,8 @@ public void testGetSinkSignature() throws IndexSizeExceededException null, null, null, - "dedupColumn" + "dedupColumn", + null ); final Sink sink = new Sink( interval, diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java index 232c87eb8801..b843465147eb 100644 --- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java @@ -173,7 +173,8 @@ public void testTaskValidator() throws Exception null, null, null, - null + null, + 2 ) ), null