From da4030cd81c22ae276f526358e7aa8438ff81de6 Mon Sep 17 00:00:00 2001 From: Pramod Immaneni Date: Mon, 19 Jul 2021 11:32:32 -0700 Subject: [PATCH 1/8] Added parallel index persists of segment shards during ingestion --- .../java/util/common/concurrent/Execs.java | 19 +++++++++++++++++-- .../appenderator/AppenderatorImpl.java | 18 ++++++++++++++++-- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java index c5bc20f45f21..c0ccb967dbdf 100644 --- a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java +++ b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java @@ -125,6 +125,21 @@ public static ExecutorService newBlockingSingleThreaded( final int capacity, final Integer priority ) + { + return newBlockingThreaded(nameFormat, 1, capacity, priority); + } + + public static ExecutorService newBlockingThreaded(final String nameFormat, int nThreads, final int capacity) + { + return newBlockingThreaded(nameFormat, nThreads, capacity, null); + } + + public static ExecutorService newBlockingThreaded( + final String nameFormat, + int nThreads, + final int capacity, + final Integer priority + ) { final BlockingQueue queue; if (capacity > 0) { @@ -133,8 +148,8 @@ public static ExecutorService newBlockingSingleThreaded( queue = new SynchronousQueue<>(); } return new ThreadPoolExecutor( - 1, - 1, + nThreads, + nThreads, 0L, TimeUnit.MILLISECONDS, queue, diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 1003467c6e16..886b11d91cfe 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -188,6 +188,8 @@ public class AppenderatorImpl implements Appenderator private final Map> persistedHydrantMetadata = Collections.synchronizedMap(new IdentityHashMap<>()); + private int persistThreads = 1; + /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. * @@ -247,6 +249,18 @@ public class AppenderatorImpl implements Appenderator } else { log.debug("Running closed segments appenderator"); } + + try { + String pThreads = System.getProperty("druid.exp.persist.threads"); + if (pThreads != null) { + persistThreads = Integer.parseInt(pThreads); + } + } + catch (Exception e) { + log.warn(e, "Error getting persist threads, defaulting"); + } + + log.info("Number of persist threads [%d]", persistThreads); } @Override @@ -1146,9 +1160,9 @@ private void initializeExecutors() if (persistExecutor == null) { // use a blocking single threaded executor to throttle the firehose when write to disk is slow persistExecutor = MoreExecutors.listeningDecorator( - Execs.newBlockingSingleThreaded( + Execs.newBlockingThreaded( "[" + StringUtils.encodeForFormat(myId) + "]-appenderator-persist", - maxPendingPersists + persistThreads, maxPendingPersists ) ); } From 48231d754b9a5fb24d8f7b45a4c39d6650a07c92 Mon Sep 17 00:00:00 2001 From: Pramod Immaneni Date: Mon, 20 Jun 2022 03:08:42 -0700 Subject: [PATCH 2/8] Added ingestion spec for parallelization to persistence of incremental segments --- .../k8s/overlord/common/K8sTestUtils.java | 3 +- .../kafka/KafkaIndexTaskTuningConfig.java | 18 ++++++---- .../KafkaSupervisorTuningConfig.java | 11 +++++-- .../indexing/kafka/KafkaIndexTaskTest.java | 3 +- .../kafka/KafkaIndexTaskTuningConfigTest.java | 15 +++++++-- .../kafka/supervisor/KafkaSupervisorTest.java | 8 ++++- ...estModifiedKafkaIndexTaskTuningConfig.java | 4 ++- .../kinesis/KinesisIndexTaskTuningConfig.java | 18 ++++++---- .../KinesisSupervisorTuningConfig.java | 11 +++++-- .../kinesis/KinesisIndexTaskSerdeTest.java | 1 + .../kinesis/KinesisIndexTaskTest.java | 3 +- .../KinesisIndexTaskTuningConfigTest.java | 17 +++++++--- .../supervisor/KinesisSupervisorTest.java | 3 ++ ...tModifiedKinesisIndexTaskTuningConfig.java | 7 ++-- .../RealtimeAppenderatorTuningConfig.java | 23 ++++++++++--- .../indexing/common/task/CompactionTask.java | 18 ++++++---- .../druid/indexing/common/task/IndexTask.java | 33 +++++++++++++++---- .../parallel/ParallelIndexSupervisorTask.java | 3 +- .../parallel/ParallelIndexTuningConfig.java | 10 ++++-- .../SeekableStreamIndexTaskTuningConfig.java | 18 ++++++++-- .../druid/indexing/common/TestIndexTask.java | 1 + ...penderatorDriverRealtimeIndexTaskTest.java | 3 +- .../common/task/AppenderatorsTest.java | 20 +++++++++-- .../common/task/BatchAppenderatorsTest.java | 20 +++++++++-- .../ClientCompactionTaskQuerySerdeTest.java | 1 + .../common/task/CompactionTaskRunTest.java | 1 + .../common/task/CompactionTaskTest.java | 10 ++++++ .../task/CompactionTuningConfigTest.java | 7 ++-- .../common/task/IndexTaskSerdeTest.java | 12 +++++-- .../indexing/common/task/IndexTaskTest.java | 4 +++ .../common/task/RealtimeIndexTaskTest.java | 1 + .../indexing/common/task/TaskSerdeTest.java | 5 ++- ...stractParallelIndexSupervisorTaskTest.java | 2 ++ .../ParallelIndexSupervisorTaskKillTest.java | 1 + ...rallelIndexSupervisorTaskResourceTest.java | 1 + .../ParallelIndexSupervisorTaskSerdeTest.java | 1 + .../ParallelIndexSupervisorTaskTest.java | 2 ++ .../parallel/ParallelIndexTestingFactory.java | 3 +- .../ParallelIndexTuningConfigTest.java | 13 ++++++-- .../SinglePhaseParallelIndexingTest.java | 7 ++-- .../indexing/overlord/TaskLifecycleTest.java | 5 +++ .../SeekableStreamSupervisorSpecTest.java | 1 + .../SeekableStreamSupervisorStateTest.java | 1 + .../indexing/RealtimeTuningConfig.java | 28 ++++++++++++---- .../appenderator/AppenderatorConfig.java | 8 +++++ .../appenderator/BatchAppenderator.java | 6 ++-- .../appenderator/StreamAppenderator.java | 6 ++-- .../UnifiedIndexerAppenderatorsManager.java | 6 ++++ .../appenderator/AppenderatorPlumberTest.java | 1 + ...edSegmensSinksBatchAppenderatorTester.java | 19 +++++++++-- ...DefaultOfflineAppenderatorFactoryTest.java | 1 + ...enAndClosedSegmentsAppenderatorTester.java | 3 +- .../StreamAppenderatorTester.java | 1 + .../plumber/RealtimePlumberSchoolTest.java | 1 + .../segment/realtime/plumber/SinkTest.java | 4 ++- .../cli/validate/DruidJsonValidatorTest.java | 3 +- 56 files changed, 344 insertions(+), 92 deletions(-) diff --git a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java index 0f454102986d..0fb23b6052a7 100644 --- a/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java +++ b/extensions-contrib/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/common/K8sTestUtils.java @@ -135,7 +135,8 @@ public static Task getTask() null, null, null, - 1L + 1L, + null ) ), null diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java index be3aac9ac25d..de4bc38fc7a5 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java @@ -51,7 +51,8 @@ public KafkaIndexTaskTuningConfig( @Nullable Period intermediateHandoffPeriod, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions + @Nullable Integer maxSavedParseExceptions, + @Nullable Integer numPersistThreads ) { super( @@ -74,7 +75,8 @@ public KafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); } @@ -97,7 +99,8 @@ private KafkaIndexTaskTuningConfig( @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { this( @@ -119,7 +122,8 @@ private KafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); } @@ -145,7 +149,8 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir) getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), - getMaxSavedParseExceptions() + getMaxSavedParseExceptions(), + getNumPersistThreads() ); } @@ -171,7 +176,8 @@ public String toString() ", logParseExceptions=" + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + - '}'; + ", numPersistThreads=" + getNumPersistThreads() + + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index c99c36b8d456..c0975de94ae7 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -69,6 +69,7 @@ public static KafkaSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -98,7 +99,8 @@ public KafkaSupervisorTuningConfig( @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { super( @@ -120,7 +122,8 @@ public KafkaSupervisorTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); this.workerThreads = workerThreads; this.chatAsync = chatAsync; @@ -233,6 +236,7 @@ public String toString() ", logParseExceptions=" + isLogParseExceptions() + ", maxParseExceptions=" + getMaxParseExceptions() + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + + ", numPersistThreads=" + getNumPersistThreads() + '}'; } @@ -258,7 +262,8 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig() getIntermediateHandoffPeriod(), isLogParseExceptions(), getMaxParseExceptions(), - getMaxSavedParseExceptions() + getMaxSavedParseExceptions(), + getNumPersistThreads() ); } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java index 0ba17cdc3889..334ee8e33c3a 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -2806,7 +2806,8 @@ private KafkaIndexTask createTask( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + null ); if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) { final TreeMap> checkpoints = new TreeMap<>(); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java index 5033929830a9..5cfeafd1b7b1 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java @@ -72,6 +72,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertEquals(new IndexSpec(), config.getIndexSpecForIntermediatePersists()); Assert.assertEquals(false, config.isReportParseExceptions()); Assert.assertEquals(0, config.getHandoffConditionTimeout()); + Assert.assertEquals(1, config.getNumPersistThreads()); } @Test @@ -89,7 +90,8 @@ public void testSerdeWithNonDefaults() throws Exception + " \"handoffConditionTimeout\": 100,\n" + " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n" + " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n" - + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n" + + " \"appendableIndexSpec\": { \"type\" : \"onheap\" },\n" + + " \"numPersistThreads\": 2\n" + "}"; KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue( @@ -114,6 +116,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(100, config.getHandoffConditionTimeout()); Assert.assertEquals(new IndexSpec(null, null, CompressionStrategy.NONE, null), config.getIndexSpec()); Assert.assertEquals(new IndexSpec(null, CompressionStrategy.UNCOMPRESSED, null, null), config.getIndexSpecForIntermediatePersists()); + Assert.assertEquals(2, config.getNumPersistThreads()); } @Test @@ -144,7 +147,8 @@ public void testConvert() null, null, null, - null + null, + 2 ); KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); @@ -159,6 +163,7 @@ public void testConvert() Assert.assertEquals(new IndexSpec(), copy.getIndexSpec()); Assert.assertEquals(true, copy.isReportParseExceptions()); Assert.assertEquals(5L, copy.getHandoffConditionTimeout()); + Assert.assertEquals(2, copy.getNumPersistThreads()); } @Test @@ -183,7 +188,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException null, true, 42, - 42 + 42, + 2 ); String serialized = mapper.writeValueAsString(base); @@ -208,6 +214,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions()); Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions()); Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); + Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); } @Test @@ -232,6 +239,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException true, 42, 42, + 2, "extra string" ); @@ -256,6 +264,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions()); Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions()); Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); + Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); } @Test diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 28b915d22cd4..a106c76022da 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -333,6 +333,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); @@ -479,6 +480,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException null, null, null, + null, null ), null @@ -3993,6 +3995,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ) ); @@ -4033,6 +4036,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ); @@ -4561,7 +4565,8 @@ public SeekableStreamIndexTaskClient build( null, null, null, - 10 + 10, + null ); return new TestableKafkaSupervisor( @@ -4675,6 +4680,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java index fd6cbb48b897..8e5ad243d2e5 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java @@ -55,6 +55,7 @@ public TestModifiedKafkaIndexTaskTuningConfig( @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, @JsonProperty("extra") String extra ) { @@ -77,7 +78,8 @@ public TestModifiedKafkaIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); this.extra = extra; } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index 06d8cf537230..b1ab31a63433 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -90,7 +90,8 @@ public KinesisIndexTaskTuningConfig( @Nullable Integer maxParseExceptions, @Nullable Integer maxSavedParseExceptions, @Nullable Integer maxRecordsPerPoll, - @Nullable Period intermediateHandoffPeriod + @Nullable Period intermediateHandoffPeriod, + @Nullable Integer numPersistThreads ) { super( @@ -113,7 +114,8 @@ public KinesisIndexTaskTuningConfig( intermediateHandoffPeriod, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); this.recordBufferSize = recordBufferSize; this.recordBufferOfferTimeout = recordBufferOfferTimeout == null @@ -154,7 +156,8 @@ private KinesisIndexTaskTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, - @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { this( @@ -182,7 +185,8 @@ private KinesisIndexTaskTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + numPersistThreads ); } @@ -270,7 +274,8 @@ public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir) getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), - getIntermediateHandoffPeriod() + getIntermediateHandoffPeriod(), + getNumPersistThreads() ); } @@ -333,6 +338,7 @@ public String toString() ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", maxRecordsPerPoll=" + maxRecordsPerPoll + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + - '}'; + ", numPersistThreads=" + getNumPersistThreads() + + '}'; } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index c9d00b77e986..d4add7695375 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -79,6 +79,7 @@ public static KinesisSupervisorTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -116,7 +117,8 @@ public KinesisSupervisorTuningConfig( @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, - @JsonProperty("useListShards") Boolean useListShards + @JsonProperty("useListShards") Boolean useListShards, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { super( @@ -144,7 +146,8 @@ public KinesisSupervisorTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + numPersistThreads ); this.workerThreads = workerThreads; @@ -271,6 +274,7 @@ public String toString() ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() + ", useListShards=" + isUseListShards() + + ", numPersistThreads=" + getNumPersistThreads() + '}'; } @@ -302,7 +306,8 @@ public KinesisIndexTaskTuningConfig convertToTaskTuningConfig() getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), - getIntermediateHandoffPeriod() + getIntermediateHandoffPeriod(), + getNumPersistThreads() ); } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 9d9b3dd417d5..6a13ae003d5b 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -72,6 +72,7 @@ public class KinesisIndexTaskSerdeTest null, null, null, + null, null ); private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 17beea1cceb1..a90ba03a5b21 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2360,7 +2360,8 @@ private KinesisIndexTask createTask( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + null ); return createTask(taskId, dataSchema, ioConfig, tuningConfig, context); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 2892ea51c372..7483213a5fc0 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -83,6 +83,7 @@ public void testSerdeWithDefaults() throws Exception Assert.assertNull(config.getFetchThreads()); Assert.assertFalse(config.isSkipSequenceNumberAvailabilityCheck()); Assert.assertFalse(config.isResetOffsetAutomatically()); + Assert.assertEquals(1, config.getNumPersistThreads()); } @Test @@ -103,7 +104,8 @@ public void testSerdeWithNonDefaults() throws Exception + " \"resetOffsetAutomatically\": false,\n" + " \"skipSequenceNumberAvailabilityCheck\": true,\n" + " \"fetchThreads\": 2,\n" - + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n" + + " \"appendableIndexSpec\": { \"type\" : \"onheap\" },\n" + + " \"numPersistThreads\": 2\n" + "}"; KinesisIndexTaskTuningConfig config = (KinesisIndexTaskTuningConfig) mapper.readValue( @@ -131,6 +133,7 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(2, (int) config.getFetchThreads()); Assert.assertTrue(config.isSkipSequenceNumberAvailabilityCheck()); Assert.assertFalse(config.isResetOffsetAutomatically()); + Assert.assertEquals(2, config.getNumPersistThreads()); } @Test @@ -161,7 +164,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException 500, 500, 6000, - new Period("P3D") + new Period("P3D"), + 2 ); String serialized = mapper.writeValueAsString(base); @@ -190,6 +194,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured()); Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); + Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); } @Test @@ -220,7 +225,8 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException 500, 500, 6000, - new Period("P3D") + new Period("P3D"), + 2 ); String serialized = mapper.writeValueAsString(new TestModifiedKinesisIndexTaskTuningConfig(base, "loool")); @@ -248,6 +254,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured()); Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); + Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); } @Test @@ -313,7 +320,8 @@ public void testConvert() null, null, null, - null + null, + 2 ); KinesisIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); @@ -336,6 +344,7 @@ public void testConvert() Assert.assertTrue(copy.isResetOffsetAutomatically()); Assert.assertEquals(10, (int) copy.getMaxRecordsPerPollConfigured()); Assert.assertEquals(new Period().withDays(Integer.MAX_VALUE), copy.getIntermediateHandoffPeriod()); + Assert.assertEquals(2, copy.getNumPersistThreads()); } @Test diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 265a9fc144f8..0cd7291eab26 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -209,6 +209,7 @@ public void setupTest() null, null, null, + null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -3979,6 +3980,7 @@ public void testIsTaskCurrent() null, null, null, + null, null ); @@ -5036,6 +5038,7 @@ public SeekableStreamIndexTaskClient build( null, null, null, + null, null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index 70611266efe3..05ee17df2ca8 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -61,6 +61,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, @JsonProperty("extra") String extra ) { @@ -89,7 +90,8 @@ public TestModifiedKinesisIndexTaskTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod + intermediateHandoffPeriod, + numPersistThreads ); this.extra = extra; } @@ -121,7 +123,8 @@ public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig bas base.getMaxParseExceptions(), base.getMaxSavedParseExceptions(), base.getMaxRecordsPerPollConfigured(), - base.getIntermediateHandoffPeriod() + base.getIntermediateHandoffPeriod(), + base.getNumPersistThreads() ); this.extra = extra; } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index d91daa6d92a6..0de1532fea76 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -68,6 +68,8 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig private final int maxParseExceptions; private final int maxSavedParseExceptions; + private final int numPersistThreads; + public RealtimeAppenderatorTuningConfig( @Nullable AppendableIndexSpec appendableIndexSpec, Integer maxRowsInMemory, @@ -87,7 +89,8 @@ public RealtimeAppenderatorTuningConfig( @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions + @Nullable Integer maxSavedParseExceptions, + @Nullable Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; @@ -133,6 +136,8 @@ public RealtimeAppenderatorTuningConfig( this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; + this.numPersistThreads = numPersistThreads == null ? + DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS); } @JsonCreator @@ -154,7 +159,8 @@ private RealtimeAppenderatorTuningConfig( @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, - @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { this( @@ -176,7 +182,8 @@ private RealtimeAppenderatorTuningConfig( segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); } @@ -314,6 +321,13 @@ public int getMaxSavedParseExceptions() return maxSavedParseExceptions; } + @Override + @JsonProperty + public int getNumPersistThreads() + { + return numPersistThreads; + } + @Override public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir) { @@ -336,7 +350,8 @@ public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir) segmentWriteOutMediumFactory, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java index 9dc559657f06..243c61435679 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/CompactionTask.java @@ -292,7 +292,8 @@ static CompactionTuningConfig getTuningConfig(TuningConfig tuningConfig) parallelIndexTuningConfig.getMaxParseExceptions(), parallelIndexTuningConfig.getMaxSavedParseExceptions(), parallelIndexTuningConfig.getMaxColumnsToMerge(), - parallelIndexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() + parallelIndexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(), + parallelIndexTuningConfig.getNumPersistThreads() ); } else if (tuningConfig instanceof IndexTuningConfig) { final IndexTuningConfig indexTuningConfig = (IndexTuningConfig) tuningConfig; @@ -326,7 +327,8 @@ static CompactionTuningConfig getTuningConfig(TuningConfig tuningConfig) indexTuningConfig.getMaxParseExceptions(), indexTuningConfig.getMaxSavedParseExceptions(), indexTuningConfig.getMaxColumnsToMerge(), - indexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() + indexTuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(), + indexTuningConfig.getNumPersistThreads() ); } else { throw new ISE( @@ -1388,7 +1390,8 @@ public static CompactionTuningConfig defaultConfig() null, null, null, - 0L + 0L, + null ); } @@ -1424,7 +1427,8 @@ public CompactionTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, - @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis + @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { super( @@ -1458,7 +1462,8 @@ public CompactionTuningConfig( maxSavedParseExceptions, maxColumnsToMerge, awaitSegmentAvailabilityTimeoutMillis, - null + null, + numPersistThreads ); Preconditions.checkArgument( @@ -1500,7 +1505,8 @@ public CompactionTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxColumnsToMerge(), - getAwaitSegmentAvailabilityTimeoutMillis() + getAwaitSegmentAvailabilityTimeoutMillis(), + getNumPersistThreads() ); } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java index 31acf6e6b6a2..56bafce54e7b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/IndexTask.java @@ -1251,6 +1251,8 @@ public static class IndexTuningConfig implements AppenderatorConfig @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final int numPersistThreads; + @Nullable private static PartitionsSpec getPartitionsSpec( boolean forceGuaranteedRollup, @@ -1317,7 +1319,8 @@ public IndexTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, - @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis + @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { this( @@ -1345,7 +1348,8 @@ public IndexTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxColumnsToMerge, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + numPersistThreads ); Preconditions.checkArgument( @@ -1356,7 +1360,7 @@ public IndexTuningConfig( private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( @@ -1377,7 +1381,8 @@ private IndexTuningConfig( @Nullable Integer maxParseExceptions, @Nullable Integer maxSavedParseExceptions, @Nullable Integer maxColumnsToMerge, - @Nullable Long awaitSegmentAvailabilityTimeoutMillis + @Nullable Long awaitSegmentAvailabilityTimeoutMillis, + @Nullable Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; @@ -1423,6 +1428,8 @@ private IndexTuningConfig( } else { this.awaitSegmentAvailabilityTimeoutMillis = awaitSegmentAvailabilityTimeoutMillis; } + this.numPersistThreads = numPersistThreads == null ? + DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS); } @Override @@ -1446,7 +1453,8 @@ public IndexTuningConfig withBasePersistDirectory(File dir) maxParseExceptions, maxSavedParseExceptions, maxColumnsToMerge, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + numPersistThreads ); } @@ -1470,7 +1478,8 @@ public IndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpec) maxParseExceptions, maxSavedParseExceptions, maxColumnsToMerge, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + numPersistThreads ); } @@ -1659,6 +1668,13 @@ public long getAwaitSegmentAvailabilityTimeoutMillis() return awaitSegmentAvailabilityTimeoutMillis; } + @JsonProperty + @Override + public int getNumPersistThreads() + { + return numPersistThreads; + } + @Override public boolean equals(Object o) { @@ -1681,6 +1697,7 @@ public boolean equals(Object o) logParseExceptions == that.logParseExceptions && maxParseExceptions == that.maxParseExceptions && maxSavedParseExceptions == that.maxSavedParseExceptions && + numPersistThreads == that.numPersistThreads && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && @@ -1710,7 +1727,8 @@ public int hashCode() maxParseExceptions, maxSavedParseExceptions, segmentWriteOutMediumFactory, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + numPersistThreads ); } @@ -1735,6 +1753,7 @@ public String toString() ", maxSavedParseExceptions=" + maxSavedParseExceptions + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + ", awaitSegmentAvailabilityTimeoutMillis=" + awaitSegmentAvailabilityTimeoutMillis + + ", numPersistThreads=" + numPersistThreads + '}'; } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java index 90850eb2bdb2..59c6ca438799 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTask.java @@ -1249,7 +1249,8 @@ private static IndexTuningConfig convertToIndexTuningConfig(ParallelIndexTuningC tuningConfig.getMaxParseExceptions(), tuningConfig.getMaxSavedParseExceptions(), tuningConfig.getMaxColumnsToMerge(), - tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis() + tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis(), + tuningConfig.getNumPersistThreads() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java index 1021244f6cfd..7e33261ee7ce 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfig.java @@ -107,6 +107,7 @@ public static ParallelIndexTuningConfig defaultConfig() null, null, null, + null, null ); } @@ -143,7 +144,8 @@ public ParallelIndexTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge, @JsonProperty("awaitSegmentAvailabilityTimeoutMillis") @Nullable Long awaitSegmentAvailabilityTimeoutMillis, - @JsonProperty("maxAllowedLockCount") @Nullable Integer maxAllowedLockCount + @JsonProperty("maxAllowedLockCount") @Nullable Integer maxAllowedLockCount, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { super( @@ -170,7 +172,8 @@ public ParallelIndexTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxColumnsToMerge, - awaitSegmentAvailabilityTimeoutMillis + awaitSegmentAvailabilityTimeoutMillis, + numPersistThreads ); if (maxNumSubTasks != null && maxNumConcurrentSubTasks != null) { @@ -306,7 +309,8 @@ public ParallelIndexTuningConfig withPartitionsSpec(PartitionsSpec partitionsSpe getMaxSavedParseExceptions(), getMaxColumnsToMerge(), getAwaitSegmentAvailabilityTimeoutMillis(), - getMaxAllowedLockCount() + getMaxAllowedLockCount(), + getNumPersistThreads() ); } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java index 904f2820af60..204d1b3e7a33 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java @@ -61,6 +61,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato private final int maxParseExceptions; private final int maxSavedParseExceptions; + private final int numPersistThreads; + public SeekableStreamIndexTaskTuningConfig( @Nullable AppendableIndexSpec appendableIndexSpec, @Nullable Integer maxRowsInMemory, @@ -81,7 +83,8 @@ public SeekableStreamIndexTaskTuningConfig( @Nullable Period intermediateHandoffPeriod, @Nullable Boolean logParseExceptions, @Nullable Integer maxParseExceptions, - @Nullable Integer maxSavedParseExceptions + @Nullable Integer maxSavedParseExceptions, + @Nullable Integer numPersistThreads ) { // Cannot be a static because default basePersistDirectory is unique per-instance @@ -134,6 +137,8 @@ public SeekableStreamIndexTaskTuningConfig( this.logParseExceptions = logParseExceptions == null ? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS : logParseExceptions; + this.numPersistThreads = numPersistThreads == null ? + DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS); } @Override @@ -277,6 +282,13 @@ public boolean isSkipSequenceNumberAvailabilityCheck() return skipSequenceNumberAvailabilityCheck; } + @Override + @JsonProperty + public int getNumPersistThreads() + { + return numPersistThreads; + } + @Override public abstract SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir); @@ -302,6 +314,7 @@ public boolean equals(Object o) logParseExceptions == that.logParseExceptions && maxParseExceptions == that.maxParseExceptions && maxSavedParseExceptions == that.maxSavedParseExceptions && + numPersistThreads == that.numPersistThreads && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(intermediatePersistPeriod, that.intermediatePersistPeriod) && Objects.equals(basePersistDirectory, that.basePersistDirectory) && @@ -333,7 +346,8 @@ public int hashCode() skipSequenceNumberAvailabilityCheck, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + numPersistThreads ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java index 0dbc6f718c8a..5b9b79ee1a13 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestIndexTask.java @@ -89,6 +89,7 @@ public TestIndexTask( null, null, null, + null, null ) ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 8daedf81c9c7..0acfe1b77264 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -1435,7 +1435,8 @@ private AppenderatorDriverRealtimeIndexTask makeRealtimeTask( null, logParseExceptions, maxParseExceptions, - maxSavedParseExceptions + maxSavedParseExceptions, + null ); return new AppenderatorDriverRealtimeIndexTask( taskId, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorsTest.java index c7b312f190b4..954247fac251 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/AppenderatorsTest.java @@ -175,7 +175,8 @@ public AppenderatorTester( 0L, OffHeapMemorySegmentWriteOutMediumFactory.instance(), IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, - basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory + basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, + null ); metrics = new FireDepartmentMetrics(); @@ -350,6 +351,7 @@ static class TestIndexTuningConfig implements AppenderatorConfig private final IndexSpec indexSpecForIntermediatePersists; @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final int numPersistThreads; public TestIndexTuningConfig( AppendableIndexSpec appendableIndexSpec, @@ -362,7 +364,8 @@ public TestIndexTuningConfig( Long pushTimeout, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, Integer maxColumnsToMerge, - File basePersistDirectory + File basePersistDirectory, + Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec; @@ -379,6 +382,8 @@ public TestIndexTuningConfig( this.partitionsSpec = null; this.indexSpecForIntermediatePersists = this.indexSpec; + + this.numPersistThreads = numPersistThreads == null ? DEFAULT_NUM_PERSIST_THREADS : numPersistThreads; } @Override @@ -467,6 +472,12 @@ public Period getIntermediatePersistPeriod() return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make much sense for batch jobs } + @Override + public int getNumPersistThreads() + { + return numPersistThreads; + } + @Override public boolean equals(Object o) { @@ -485,6 +496,7 @@ public boolean equals(Object o) maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && + numPersistThreads == that.numPersistThreads && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && @@ -508,7 +520,8 @@ public int hashCode() maxPendingPersists, reportParseExceptions, pushTimeout, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + numPersistThreads ); } @@ -528,6 +541,7 @@ public String toString() ", reportParseExceptions=" + reportParseExceptions + ", pushTimeout=" + pushTimeout + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + ", numPersistThreads=" + numPersistThreads + '}'; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java index 1eae933df6e8..beb08b834184 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/BatchAppenderatorsTest.java @@ -180,7 +180,8 @@ public AppenderatorTester( 0L, OffHeapMemorySegmentWriteOutMediumFactory.instance(), IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, - basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory + basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, + null ); metrics = new FireDepartmentMetrics(); @@ -379,6 +380,7 @@ static class TestIndexTuningConfig implements AppenderatorConfig private final IndexSpec indexSpecForIntermediatePersists; @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final int numPersistThreads; public TestIndexTuningConfig( AppendableIndexSpec appendableIndexSpec, @@ -391,7 +393,8 @@ public TestIndexTuningConfig( Long pushTimeout, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, Integer maxColumnsToMerge, - File basePersistDirectory + File basePersistDirectory, + Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec; @@ -408,6 +411,8 @@ public TestIndexTuningConfig( this.partitionsSpec = null; this.indexSpecForIntermediatePersists = this.indexSpec; + + this.numPersistThreads = numPersistThreads == null ? DEFAULT_NUM_PERSIST_THREADS : numPersistThreads; } @Override @@ -496,6 +501,12 @@ public Period getIntermediatePersistPeriod() return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make much sense for batch jobs } + @Override + public int getNumPersistThreads() + { + return numPersistThreads; + } + @Override public boolean equals(Object o) { @@ -514,6 +525,7 @@ public boolean equals(Object o) maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && + numPersistThreads == that.numPersistThreads && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && @@ -537,7 +549,8 @@ public int hashCode() maxPendingPersists, reportParseExceptions, pushTimeout, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + numPersistThreads ); } @@ -557,6 +570,7 @@ public String toString() ", reportParseExceptions=" + reportParseExceptions + ", pushTimeout=" + pushTimeout + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + ", numPersistThreads=" + numPersistThreads + '}'; } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java index c6b6d6fdf0a4..bb67231ae89f 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ClientCompactionTaskQuerySerdeTest.java @@ -290,6 +290,7 @@ public void testCompactionTaskToClientCompactionTaskQuery() throws IOException null, 2, null, + null, null ) ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java index 702074aad930..811e3d6f0caf 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskRunTest.java @@ -337,6 +337,7 @@ public void testRunWithHashPartitioning() throws Exception null, null, null, + null, null ) ) diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java index 374adcb43841..cf30d6d05030 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java @@ -350,6 +350,7 @@ private static CompactionTask.CompactionTuningConfig createTuningConfig() null, null, null, + null, null ); } @@ -643,6 +644,7 @@ public void testSerdeWithOldTuningConfigSuccessfullyDeserializeToNewOne() throws null, null, null, + null, null ), null, @@ -706,6 +708,7 @@ public void testGetTuningConfigWithIndexTuningConfig() null, null, null, + null, null ); @@ -744,6 +747,7 @@ public void testGetTuningConfigWithIndexTuningConfig() null, null, null, + null, null ); @@ -790,6 +794,7 @@ public void testGetTuningConfigWithParallelIndexTuningConfig() null, null, null, + null, null ); @@ -828,6 +833,7 @@ public void testGetTuningConfigWithParallelIndexTuningConfig() null, null, null, + null, null ); @@ -977,6 +983,7 @@ public void testCreateIngestionSchemaWithTargetPartitionSize() throws IOExceptio null, null, null, + null, null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( @@ -1052,6 +1059,7 @@ public void testCreateIngestionSchemaWithMaxTotalRows() throws IOException null, null, null, + null, null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( @@ -1127,6 +1135,7 @@ public void testCreateIngestionSchemaWithNumShards() throws IOException null, null, null, + null, null ); final List ingestionSpecs = CompactionTask.createIngestionSchema( @@ -1875,6 +1884,7 @@ private void assertIngestionSchema( null, null, null, + null, null ), expectedSegmentGranularity, diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTuningConfigTest.java index 2370d4c94ac5..47c9bb397f65 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTuningConfigTest.java @@ -102,7 +102,8 @@ public void testSerdeWithNonZeroAwaitSegmentAvailabilityTimeoutMillis() null, null, null, - 5L + 5L, + null ); } @@ -144,7 +145,8 @@ public void testSerdeWithZeroAwaitSegmentAvailabilityTimeoutMillis() null, null, null, - 0L + 0L, + null ); Assert.assertEquals(0L, tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis()); } @@ -187,6 +189,7 @@ public void testSerdeWithNullAwaitSegmentAvailabilityTimeoutMillis() null, null, null, + null, null ); Assert.assertEquals(0L, tuningConfig.getAwaitSegmentAvailabilityTimeoutMillis()); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java index 29faed025b94..42a114cd571d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskSerdeTest.java @@ -85,7 +85,8 @@ public void testSerdeTuningConfigWithDynamicPartitionsSpec() throws IOException 10, 100, 1234, - 0L + 0L, + null ); assertSerdeTuningConfig(tuningConfig); } @@ -122,7 +123,8 @@ public void testSerdeTuningConfigWithHashedPartitionsSpec() throws IOException 10, 100, null, - -1L + -1L, + null ); assertSerdeTuningConfig(tuningConfig); } @@ -159,7 +161,8 @@ public void testSerdeTuningConfigWithDeprecatedDynamicPartitionsSpec() throws IO 10, 100, null, - 1L + 1L, + null ); assertSerdeTuningConfig(tuningConfig); } @@ -196,6 +199,7 @@ public void testSerdeTuningConfigWithDeprecatedHashedPartitionsSpec() throws IOE 10, 100, 1234, + null, null ); assertSerdeTuningConfig(tuningConfig); @@ -235,6 +239,7 @@ public void testForceGuaranteedRollupWithDynamicPartitionsSpec() 10, 100, null, + null, null ); } @@ -273,6 +278,7 @@ public void testBestEffortRollupWithHashedPartitionsSpec() 10, 100, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java index 5b6dcbfcc421..771405f68a48 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java @@ -1569,6 +1569,7 @@ public void testMultipleParseExceptionsSuccess() throws Exception 7, 7, null, + null, null ); @@ -1747,6 +1748,7 @@ public void testMultipleParseExceptionsFailure() throws Exception 2, 5, null, + null, null ); @@ -1889,6 +1891,7 @@ public void testMultipleParseExceptionsFailureAtDeterminePartitions() throws Exc 2, 5, null, + null, null ); @@ -2727,6 +2730,7 @@ static IndexTuningConfig createTuningConfig( null, 1, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java index c3c1f4b7800e..ae3d0f846e66 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -849,6 +849,7 @@ private RealtimeIndexTask makeRealtimeTask( handoffTimeout, null, null, + null, null ); return new RealtimeIndexTask( diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java index 7bc6548dc209..3bb1d7739ac8 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java @@ -271,7 +271,8 @@ public void testIndexTaskSerde() throws Exception null, null, null, - 1L + 1L, + null ) ), null @@ -357,6 +358,7 @@ public void testIndexTaskwithResourceSerde() throws Exception null, null, null, + null, null ) ), @@ -426,6 +428,7 @@ public void testRealtimeIndexTaskSerde() throws Exception null, null, null, + null, null ) ), diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java index 109a99fb36e9..c8585644968a 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractParallelIndexSupervisorTaskTest.java @@ -192,6 +192,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase 5, null, null, + null, null ); @@ -320,6 +321,7 @@ protected ParallelIndexTuningConfig newTuningConfig( 5, null, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java index 7bf894fff4d7..be75a32c878c 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskKillTest.java @@ -202,6 +202,7 @@ private ParallelIndexSupervisorTask newTask( null, null, null, + null, null ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java index b11d939f760f..9cb5790d2904 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskResourceTest.java @@ -450,6 +450,7 @@ private TestSupervisorTask newTask( null, null, null, + null, null ) ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java index 6c83c225fa9f..7b92ba44bd9d 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskSerdeTest.java @@ -271,6 +271,7 @@ ParallelIndexIngestionSpec build() null, null, null, + null, null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java index 02966e623bf7..bd7b69bd8cc0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java @@ -271,6 +271,7 @@ public void testFailToConstructWhenBothAppendToExistingAndForceGuaranteedRollupA null, null, null, + null, null ); final ParallelIndexIngestionSpec indexIngestionSpec = new ParallelIndexIngestionSpec( @@ -343,6 +344,7 @@ public void testFailToConstructWhenBothInputSourceAndParserAreSet() null, null, null, + null, null ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java index 7f315ab33043..0a0d4d4692f0 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTestingFactory.java @@ -184,7 +184,8 @@ ParallelIndexTuningConfig build() 25, null, null, - null + null, + 2 ); } } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java index 6bbe5fe2f976..294b45fb06d2 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexTuningConfigTest.java @@ -103,7 +103,8 @@ public void testSerdeWithMaxRowsPerSegment() null, null, null, - null + null, + 2 ); final byte[] json = mapper.writeValueAsBytes(tuningConfig); final ParallelIndexTuningConfig fromJson = (ParallelIndexTuningConfig) mapper.readValue(json, TuningConfig.class); @@ -150,7 +151,8 @@ public void testSerdeWithMaxNumConcurrentSubTasks() throws IOException null, null, null, - null + null, + 2 ); final byte[] json = mapper.writeValueAsBytes(tuningConfig); final ParallelIndexTuningConfig fromJson = (ParallelIndexTuningConfig) mapper.readValue(json, TuningConfig.class); @@ -197,7 +199,8 @@ public void testSerdeWithMaxNumSubTasks() throws IOException null, null, null, - null + null, + 2 ); final byte[] json = mapper.writeValueAsBytes(tuningConfig); final ParallelIndexTuningConfig fromJson = (ParallelIndexTuningConfig) mapper.readValue(json, TuningConfig.class); @@ -246,6 +249,7 @@ public void testSerdeWithMaxNumSubTasksAndMaxNumConcurrentSubTasks() null, null, null, + null, null ); } @@ -292,6 +296,7 @@ public void testConstructorWithHashedPartitionsSpecAndNonForceGuaranteedRollupFa null, null, null, + null, null ); } @@ -338,6 +343,7 @@ public void testConstructorWithSingleDimensionPartitionsSpecAndNonForceGuarantee null, null, null, + null, null ); } @@ -384,6 +390,7 @@ public void testConstructorWithDynamicPartitionsSpecAndForceGuaranteedRollupFail null, null, null, + null, null ); } diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java index 0b5fbf2dce51..9b638962ef40 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java @@ -574,6 +574,7 @@ public void testWith1MaxNumConcurrentSubTasks() null, null, null, + null, null ), VALID_INPUT_SOURCE_FILTER @@ -699,7 +700,8 @@ public void testMaxLocksWith1MaxNumConcurrentSubTasks() null, null, null, - 0 + 0, + null ), VALID_INPUT_SOURCE_FILTER ); @@ -760,7 +762,8 @@ public void testMaxLocksWith2MaxNumConcurrentSubTasks() null, null, null, - 0 + 0, + null ), VALID_INPUT_SOURCE_FILTER ); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java index eea61893fc07..9935e834d83b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/TaskLifecycleTest.java @@ -799,6 +799,7 @@ public void testIndexTask() throws Exception null, null, null, + null, null ) ), @@ -883,6 +884,7 @@ public void testIndexTaskFailure() throws Exception null, null, null, + null, null ) ), @@ -1319,6 +1321,7 @@ public void testResumeTasks() throws Exception null, null, null, + null, null ) ), @@ -1430,6 +1433,7 @@ public void testUnifiedAppenderatorsManagerCleanup() throws Exception null, null, null, + null, null ) ), @@ -1597,6 +1601,7 @@ private RealtimeIndexTask newRealtimeIndexTask() null, null, null, + null, null ); FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java index 52e160058588..32013b005d82 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -512,6 +512,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + null, null ) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index d622984b0bab..c78851c7bd97 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1223,6 +1223,7 @@ public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() null, null, null, + null, null ) { diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java index 55ae04bd7aec..a16a999ad3ff 100644 --- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java @@ -78,7 +78,8 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File DEFAULT_HANDOFF_CONDITION_TIMEOUT, DEFAULT_ALERT_TIMEOUT, null, - DEFAULT_DEDUP_COLUMN + DEFAULT_DEDUP_COLUMN, + DEFAULT_NUM_PERSIST_THREADS ); } @@ -104,6 +105,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable File private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; @Nullable private final String dedupColumn; + private final int numPersistThreads; public RealtimeTuningConfig( @Nullable AppendableIndexSpec appendableIndexSpec, @@ -125,7 +127,8 @@ public RealtimeTuningConfig( Long handoffConditionTimeout, Long alertTimeout, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - @Nullable String dedupColumn + @Nullable String dedupColumn, + @Nullable Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec; @@ -163,6 +166,8 @@ public RealtimeTuningConfig( Preconditions.checkArgument(this.alertTimeout >= 0, "alertTimeout must be >= 0"); this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory; this.dedupColumn = dedupColumn == null ? DEFAULT_DEDUP_COLUMN : dedupColumn; + this.numPersistThreads = numPersistThreads == null ? + DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS); } @JsonCreator @@ -184,7 +189,8 @@ private RealtimeTuningConfig( @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout, @JsonProperty("alertTimeout") Long alertTimeout, @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, - @JsonProperty("dedupColumn") @Nullable String dedupColumn + @JsonProperty("dedupColumn") @Nullable String dedupColumn, + @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads ) { this( @@ -207,7 +213,8 @@ private RealtimeTuningConfig( handoffConditionTimeout, alertTimeout, segmentWriteOutMediumFactory, - dedupColumn + dedupColumn, + numPersistThreads ); } @@ -348,6 +355,13 @@ public String getDedupColumn() return dedupColumn; } + @Override + @JsonProperty + public int getNumPersistThreads() + { + return numPersistThreads; + } + public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) { return new RealtimeTuningConfig( @@ -370,7 +384,8 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy) handoffConditionTimeout, alertTimeout, segmentWriteOutMediumFactory, - dedupColumn + dedupColumn, + numPersistThreads ); } @@ -397,7 +412,8 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir) handoffConditionTimeout, alertTimeout, segmentWriteOutMediumFactory, - dedupColumn + dedupColumn, + numPersistThreads ); } } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java index fff3466a94eb..ab1b09b772cd 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -28,12 +28,20 @@ public interface AppenderatorConfig extends TuningConfig { + + int DEFAULT_NUM_PERSIST_THREADS = 1; + boolean isReportParseExceptions(); int getMaxPendingPersists(); boolean isSkipBytesInMemoryOverheadCheck(); + default int getNumPersistThreads() + { + return DEFAULT_NUM_PERSIST_THREADS; + } + /** * Maximum number of rows in a single segment before pushing to deep storage */ diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java index 9e0a2672f0d4..60ac99dbca61 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BatchAppenderator.java @@ -214,11 +214,11 @@ private void initializeExecutors() log.debug("There will be up to[%d] pending persists", maxPendingPersists); if (persistExecutor == null) { - // use a blocking single threaded executor to throttle the firehose when write to disk is slow + log.info("Number of persist threads [%d]", tuningConfig.getNumPersistThreads()); persistExecutor = MoreExecutors.listeningDecorator( - Execs.newBlockingSingleThreaded( + Execs.newBlockingThreaded( "[" + StringUtils.encodeForFormat(myId) + "]-batch-appenderator-persist", - maxPendingPersists + tuningConfig.getNumPersistThreads(), maxPendingPersists ) ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java index 70a60a77d85e..324c7f35b103 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderator.java @@ -1066,11 +1066,11 @@ private void initializeExecutors() final int maxPendingPersists = tuningConfig.getMaxPendingPersists(); if (persistExecutor == null) { - // use a blocking single threaded executor to throttle the firehose when write to disk is slow + log.info("Number of persist threads [%d]", tuningConfig.getNumPersistThreads()); persistExecutor = MoreExecutors.listeningDecorator( - Execs.newBlockingSingleThreaded( + Execs.newBlockingThreaded( "[" + StringUtils.encodeForFormat(myId) + "]-appenderator-persist", - maxPendingPersists + tuningConfig.getNumPersistThreads(), maxPendingPersists ) ); } diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java index 0a728eb890c3..042f86ac59f0 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/UnifiedIndexerAppenderatorsManager.java @@ -567,6 +567,12 @@ public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory() { return baseConfig.getSegmentWriteOutMediumFactory(); } + + @Override + public int getNumPersistThreads() + { + return baseConfig.getNumPersistThreads(); + } } private IndexMerger wrapIndexMerger(IndexMerger baseMerger) diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index dc96e912dcf1..f795492819a8 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -92,6 +92,7 @@ public void setUp() throws Exception null, null, null, + null, null ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmensSinksBatchAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmensSinksBatchAppenderatorTester.java index 01895f077f8f..f0c5781c9d1e 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmensSinksBatchAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/ClosedSegmensSinksBatchAppenderatorTester.java @@ -179,7 +179,8 @@ public ClosedSegmensSinksBatchAppenderatorTester( 0L, OffHeapMemorySegmentWriteOutMediumFactory.instance(), IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, - basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory + basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, + null ); metrics = new FireDepartmentMetrics(); @@ -316,6 +317,7 @@ static class TestIndexTuningConfig implements AppenderatorConfig private final IndexSpec indexSpecForIntermediatePersists; @Nullable private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory; + private final int numPersistThreads; public TestIndexTuningConfig( AppendableIndexSpec appendableIndexSpec, @@ -328,7 +330,8 @@ public TestIndexTuningConfig( Long pushTimeout, @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, Integer maxColumnsToMerge, - File basePersistDirectory + File basePersistDirectory, + @Nullable Integer numPersistThreads ) { this.appendableIndexSpec = appendableIndexSpec; @@ -345,6 +348,7 @@ public TestIndexTuningConfig( this.partitionsSpec = null; this.indexSpecForIntermediatePersists = this.indexSpec; + this.numPersistThreads = numPersistThreads == null ? DEFAULT_NUM_PERSIST_THREADS : numPersistThreads; } @Override @@ -432,6 +436,12 @@ public Period getIntermediatePersistPeriod() { return new Period(Integer.MAX_VALUE); // intermediate persist doesn't make much sense for batch jobs } + + @Override + public int getNumPersistThreads() + { + return numPersistThreads; + } @Override public boolean equals(Object o) @@ -451,6 +461,7 @@ public boolean equals(Object o) maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && pushTimeout == that.pushTimeout && + numPersistThreads == that.numPersistThreads && Objects.equals(partitionsSpec, that.partitionsSpec) && Objects.equals(indexSpec, that.indexSpec) && Objects.equals(indexSpecForIntermediatePersists, that.indexSpecForIntermediatePersists) && @@ -474,7 +485,8 @@ public int hashCode() maxPendingPersists, reportParseExceptions, pushTimeout, - segmentWriteOutMediumFactory + segmentWriteOutMediumFactory, + numPersistThreads ); } @@ -494,6 +506,7 @@ public String toString() ", reportParseExceptions=" + reportParseExceptions + ", pushTimeout=" + pushTimeout + ", segmentWriteOutMediumFactory=" + segmentWriteOutMediumFactory + + ", numPersistThreads=" + numPersistThreads + '}'; } } diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index cf8f2eea0d38..f9121e668def 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -151,6 +151,7 @@ public int columnCacheSizeBytes() null, null, null, + null, null ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsAppenderatorTester.java index e7e94d3a7e8e..52c5a27ca7c7 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/OpenAndClosedSegmentsAppenderatorTester.java @@ -150,7 +150,8 @@ public OpenAndClosedSegmentsAppenderatorTester( 0L, OffHeapMemorySegmentWriteOutMediumFactory.instance(), IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE, - basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory + basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, + null ); metrics = new FireDepartmentMetrics(); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java index 413f315f50a7..bc224a99fc63 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java @@ -147,6 +147,7 @@ public StreamAppenderatorTester( null, null, null, + null, null ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index eeea2a930e53..74a1217ed275 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -225,6 +225,7 @@ public void setUp() throws Exception null, null, null, + null, null ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java index 2d3bdc4ae7ee..013d5bd1d37d 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java @@ -84,6 +84,7 @@ public void testSwap() throws Exception null, null, null, + null, null ); final Sink sink = new Sink( @@ -241,7 +242,8 @@ public void testDedup() throws Exception null, null, null, - "dedupColumn" + "dedupColumn", + null ); final Sink sink = new Sink( interval, diff --git a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java index 3b920dfa656b..35584da1f363 100644 --- a/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/org/apache/druid/cli/validate/DruidJsonValidatorTest.java @@ -173,7 +173,8 @@ public void testTaskValidator() throws Exception null, null, null, - null + null, + 2 ) ), null From 96e03e70cf0af36149c0f3af869d756828f8b94d Mon Sep 17 00:00:00 2001 From: Pramod Immaneni Date: Thu, 1 Jun 2023 18:13:39 -0700 Subject: [PATCH 3/8] Cleaned up earlier implementation attempt that is superseded --- .../realtime/appenderator/AppenderatorImpl.java | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java index 886b11d91cfe..9d7fd6766763 100644 --- a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -187,9 +187,6 @@ public class AppenderatorImpl implements Appenderator */ private final Map> persistedHydrantMetadata = Collections.synchronizedMap(new IdentityHashMap<>()); - - private int persistThreads = 1; - /** * This constructor allows the caller to provide its own SinkQuerySegmentWalker. * @@ -249,18 +246,6 @@ public class AppenderatorImpl implements Appenderator } else { log.debug("Running closed segments appenderator"); } - - try { - String pThreads = System.getProperty("druid.exp.persist.threads"); - if (pThreads != null) { - persistThreads = Integer.parseInt(pThreads); - } - } - catch (Exception e) { - log.warn(e, "Error getting persist threads, defaulting"); - } - - log.info("Number of persist threads [%d]", persistThreads); } @Override @@ -1162,7 +1147,7 @@ private void initializeExecutors() persistExecutor = MoreExecutors.listeningDecorator( Execs.newBlockingThreaded( "[" + StringUtils.encodeForFormat(myId) + "]-appenderator-persist", - persistThreads, maxPendingPersists + tuningConfig.getNumPersistThreads(), maxPendingPersists ) ); } From d6de2dfa17fb71ae668262e496de5e9866b3e9c6 Mon Sep 17 00:00:00 2001 From: Pramod Immaneni Date: Fri, 2 Jun 2023 15:18:07 -0700 Subject: [PATCH 4/8] Added documentation --- docs/development/extensions-core/kafka-supervisor-reference.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index d2ac7afcbe16..0c96801ce54b 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -198,6 +198,7 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning | `maxTotalRows` | Long | The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier. | no (default == unlimited) | | `intermediatePersistPeriod` | ISO8601 Period | The period that determines the rate at which intermediate persists occur. | no (default == PT10M) | | `maxPendingPersists` | Integer | Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with `maxRowsInMemory` * (2 + `maxPendingPersists`). | no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up) | +| numPersistThreads | Integer | The number of threads to use to create and persist incremental segments on the disk. Higher ingestion data throughput results in larger number of incremental segments, causing significant cpu time to be spent on the creation of the incremental segments on the disk. For datasources with number of columns running into hundreds or thousands, creation of the incremental segments may take up significant time, in the order of multiple seconds. Both these scenarios can cause ingestion can pause frequently or stall causing it to fall behind. With more threads the segment creation can be parallelized without blocking ingestion as long as there are sufficient cpu resources available. | no (default == 1) | | `indexSpec` | Object | Tune how data is indexed. See [IndexSpec](#indexspec) for more information. | no | | `indexSpecForIntermediatePersists`| | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values. | no (default = same as `indexSpec`) | | `reportParseExceptions` | Boolean | *DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1. | no (default == false) | From c20736b9f3005322dc159f1daf03a1cf3b3ca0e6 Mon Sep 17 00:00:00 2001 From: Pramod Immaneni Date: Mon, 31 Jul 2023 06:52:36 -0700 Subject: [PATCH 5/8] Incorporated documentation feedback --- docs/development/extensions-core/kafka-supervisor-reference.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index 0c96801ce54b..627418add2a9 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -198,7 +198,7 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning | `maxTotalRows` | Long | The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier. | no (default == unlimited) | | `intermediatePersistPeriod` | ISO8601 Period | The period that determines the rate at which intermediate persists occur. | no (default == PT10M) | | `maxPendingPersists` | Integer | Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with `maxRowsInMemory` * (2 + `maxPendingPersists`). | no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up) | -| numPersistThreads | Integer | The number of threads to use to create and persist incremental segments on the disk. Higher ingestion data throughput results in larger number of incremental segments, causing significant cpu time to be spent on the creation of the incremental segments on the disk. For datasources with number of columns running into hundreds or thousands, creation of the incremental segments may take up significant time, in the order of multiple seconds. Both these scenarios can cause ingestion can pause frequently or stall causing it to fall behind. With more threads the segment creation can be parallelized without blocking ingestion as long as there are sufficient cpu resources available. | no (default == 1) | +| numPersistThreads | Integer | The number of threads to use to create and persist incremental segments on the disk. Higher ingestion data throughput results in a larger number of incremental segments, causing significant CPU time to be spent on the creation of the incremental segments on the disk. For datasources with number of columns running into hundreds or thousands, creation of the incremental segments may take up significant time, in the order of multiple seconds. In both of these scenarios, ingestion can stall or pause frequently, causing it to fall behind. You can use additional threads to parallelize the segment creation without blocking ingestion as long as there are sufficient CPU resources available. | no (default == 1) | | `indexSpec` | Object | Tune how data is indexed. See [IndexSpec](#indexspec) for more information. | no | | `indexSpecForIntermediatePersists`| | Defines segment storage format options to be used at indexing time for intermediate persisted temporary segments. This can be used to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while they are used before getting merged into final segment published, see [IndexSpec](#indexspec) for possible values. | no (default = same as `indexSpec`) | | `reportParseExceptions` | Boolean | *DEPRECATED*. If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped. Setting `reportParseExceptions` to true will override existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more than 1. | no (default == false) | From f6bf56dba7742a7020bae7dfcd49a58afba06e82 Mon Sep 17 00:00:00 2001 From: Pramod Immaneni Date: Sat, 3 Feb 2024 07:22:25 -0800 Subject: [PATCH 6/8] Revert incremental segment persistence parallelization for Kinesis as requested --- .../kinesis/KinesisIndexTaskTuningConfig.java | 15 +++++---------- .../supervisor/KinesisSupervisorTuningConfig.java | 11 +++-------- .../kinesis/KinesisIndexTaskSerdeTest.java | 1 - .../indexing/kinesis/KinesisIndexTaskTest.java | 3 +-- .../kinesis/KinesisIndexTaskTuningConfigTest.java | 15 +++------------ .../kinesis/supervisor/KinesisSupervisorTest.java | 3 --- .../TestModifiedKinesisIndexTaskTuningConfig.java | 7 ++----- 7 files changed, 14 insertions(+), 41 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java index b1ab31a63433..69c357171466 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java @@ -90,8 +90,7 @@ public KinesisIndexTaskTuningConfig( @Nullable Integer maxParseExceptions, @Nullable Integer maxSavedParseExceptions, @Nullable Integer maxRecordsPerPoll, - @Nullable Period intermediateHandoffPeriod, - @Nullable Integer numPersistThreads + @Nullable Period intermediateHandoffPeriod ) { super( @@ -115,7 +114,7 @@ public KinesisIndexTaskTuningConfig( logParseExceptions, maxParseExceptions, maxSavedParseExceptions, - numPersistThreads + null ); this.recordBufferSize = recordBufferSize; this.recordBufferOfferTimeout = recordBufferOfferTimeout == null @@ -156,8 +155,7 @@ private KinesisIndexTaskTuningConfig( @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, - @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, - @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod ) { this( @@ -185,8 +183,7 @@ private KinesisIndexTaskTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod, - numPersistThreads + intermediateHandoffPeriod ); } @@ -274,8 +271,7 @@ public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir) getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), - getIntermediateHandoffPeriod(), - getNumPersistThreads() + getIntermediateHandoffPeriod() ); } @@ -338,7 +334,6 @@ public String toString() ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + ", maxRecordsPerPoll=" + maxRecordsPerPoll + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + - ", numPersistThreads=" + getNumPersistThreads() + '}'; } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java index 580c6623f8c6..357bc9e57ca4 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java @@ -74,7 +74,6 @@ public static KinesisSupervisorTuningConfig defaultConfig() null, null, null, - null, null ); } @@ -110,8 +109,7 @@ public KinesisSupervisorTuningConfig( @JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod, @JsonProperty("repartitionTransitionDuration") Period repartitionTransitionDuration, @JsonProperty("offsetFetchPeriod") Period offsetFetchPeriod, - @JsonProperty("useListShards") Boolean useListShards, - @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads + @JsonProperty("useListShards") Boolean useListShards ) { super( @@ -139,8 +137,7 @@ public KinesisSupervisorTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod, - numPersistThreads + intermediateHandoffPeriod ); this.workerThreads = workerThreads; @@ -240,7 +237,6 @@ public String toString() ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + ", repartitionTransitionDuration=" + getRepartitionTransitionDuration() + ", useListShards=" + isUseListShards() + - ", numPersistThreads=" + getNumPersistThreads() + '}'; } @@ -272,8 +268,7 @@ public KinesisIndexTaskTuningConfig convertToTaskTuningConfig() getMaxParseExceptions(), getMaxSavedParseExceptions(), getMaxRecordsPerPollConfigured(), - getIntermediateHandoffPeriod(), - getNumPersistThreads() + getIntermediateHandoffPeriod() ); } } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java index 3bb43ba0d3bc..2cba3f54187f 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java @@ -76,7 +76,6 @@ public class KinesisIndexTaskSerdeTest null, null, null, - null, null ); private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java index 37cbd2f0f9f6..69516979f3e1 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java @@ -2362,8 +2362,7 @@ private KinesisIndexTask createTask( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod, - null + intermediateHandoffPeriod ); return createTask(taskId, dataSchema, ioConfig, tuningConfig, context); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index abe432743d8d..3eaa8e81f90c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -84,7 +84,6 @@ public void testSerdeWithDefaults() throws Exception Assert.assertNull(config.getFetchThreads()); Assert.assertFalse(config.isSkipSequenceNumberAvailabilityCheck()); Assert.assertFalse(config.isResetOffsetAutomatically()); - Assert.assertEquals(1, config.getNumPersistThreads()); } @Test @@ -106,7 +105,6 @@ public void testSerdeWithNonDefaults() throws Exception + " \"skipSequenceNumberAvailabilityCheck\": true,\n" + " \"fetchThreads\": 2,\n" + " \"appendableIndexSpec\": { \"type\" : \"onheap\" },\n" - + " \"numPersistThreads\": 2\n" + "}"; KinesisIndexTaskTuningConfig config = (KinesisIndexTaskTuningConfig) mapper.readValue( @@ -134,7 +132,6 @@ public void testSerdeWithNonDefaults() throws Exception Assert.assertEquals(2, (int) config.getFetchThreads()); Assert.assertTrue(config.isSkipSequenceNumberAvailabilityCheck()); Assert.assertFalse(config.isResetOffsetAutomatically()); - Assert.assertEquals(2, config.getNumPersistThreads()); } @Test @@ -165,8 +162,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException 500, 500, 6000, - new Period("P3D"), - 2 + new Period("P3D") ); String serialized = mapper.writeValueAsString(base); @@ -195,7 +191,6 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured()); Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); - Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); } @Test @@ -226,8 +221,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException 500, 500, 6000, - new Period("P3D"), - 2 + new Period("P3D") ); String serialized = mapper.writeValueAsString(new TestModifiedKinesisIndexTaskTuningConfig(base, "loool")); @@ -255,7 +249,6 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured()); Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); - Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads()); } @Test @@ -319,8 +312,7 @@ public void testConvert() null, null, null, - null, - 2 + null ); KinesisIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); @@ -343,7 +335,6 @@ public void testConvert() Assert.assertTrue(copy.isResetOffsetAutomatically()); Assert.assertEquals(10, (int) copy.getMaxRecordsPerPollConfigured()); Assert.assertEquals(new Period().withDays(Integer.MAX_VALUE), copy.getIntermediateHandoffPeriod()); - Assert.assertEquals(2, copy.getNumPersistThreads()); } @Test diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index a73873013291..39d943dbebef 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -211,7 +211,6 @@ public void setupTest() null, null, null, - null, null ); rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory(); @@ -3979,7 +3978,6 @@ public void testIsTaskCurrent() null, null, null, - null, null ); @@ -5087,7 +5085,6 @@ public SeekableStreamIndexTaskClient build( null, null, null, - null, null ); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index 05ee17df2ca8..70611266efe3 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -61,7 +61,6 @@ public TestModifiedKinesisIndexTaskTuningConfig( @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions, @JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll, @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, - @JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads, @JsonProperty("extra") String extra ) { @@ -90,8 +89,7 @@ public TestModifiedKinesisIndexTaskTuningConfig( maxParseExceptions, maxSavedParseExceptions, maxRecordsPerPoll, - intermediateHandoffPeriod, - numPersistThreads + intermediateHandoffPeriod ); this.extra = extra; } @@ -123,8 +121,7 @@ public TestModifiedKinesisIndexTaskTuningConfig(KinesisIndexTaskTuningConfig bas base.getMaxParseExceptions(), base.getMaxSavedParseExceptions(), base.getMaxRecordsPerPollConfigured(), - base.getIntermediateHandoffPeriod(), - base.getNumPersistThreads() + base.getIntermediateHandoffPeriod() ); this.extra = extra; } From 42e8291364fdb8a563d0ceed5a5efb9ad5e34e57 Mon Sep 17 00:00:00 2001 From: Pramod Immaneni Date: Sat, 3 Feb 2024 12:48:29 -0800 Subject: [PATCH 7/8] Fix errors after merge --- .../druid/indexing/kafka/supervisor/KafkaSupervisorTest.java | 1 + .../org/apache/druid/segment/realtime/plumber/SinkTest.java | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 21a1bea4083f..21264a9e3968 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -4191,6 +4191,7 @@ public void testSequenceNameDoesNotChangeWithTaskId() null, null, null, + null, null ) ); diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java index b286c04645d3..0d3c9c23bfff 100644 --- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/SinkTest.java @@ -398,7 +398,8 @@ public void testGetSinkSignature() throws IndexSizeExceededException null, null, null, - "dedupColumn" + "dedupColumn", + null ); final Sink sink = new Sink( interval, From 41a50eb1c625bbcbdc254157dda8b9f4428959ea Mon Sep 17 00:00:00 2001 From: Pramod Immaneni Date: Mon, 5 Feb 2024 11:13:31 -0800 Subject: [PATCH 8/8] Fixed unit test --- .../indexing/kinesis/KinesisIndexTaskTuningConfigTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 934cd06002d5..b61c5cf2ae48 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -106,7 +106,7 @@ public void testSerdeWithNonDefaults() throws Exception + " \"resetOffsetAutomatically\": false,\n" + " \"skipSequenceNumberAvailabilityCheck\": true,\n" + " \"fetchThreads\": 2,\n" - + " \"appendableIndexSpec\": { \"type\" : \"onheap\" },\n" + + " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n" + "}"; KinesisIndexTaskTuningConfig config = (KinesisIndexTaskTuningConfig) mapper.readValue(