Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/ingestion/supervisor.md
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ For configuration properties specific to Kafka and Kinesis, see [Kafka tuning co
|`logParseExceptions`|Boolean|If `true`, Druid logs an error message when a parsing exception occurs, containing information about the row where the error occurred.|No|`false`|
|`maxParseExceptions`|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Setting `reportParseExceptions` overrides this limit.|No|unlimited|
|`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid keeps track of the most recent parse exceptions. `maxSavedParseExceptions` limits the number of saved exception instances. These saved exceptions are available after the task finishes in the [task completion report](../ingestion/tasks.md#task-reports). Setting `reportParseExceptions` overrides this limit.|No|0|
|`maxColumnsToMerge`|Integer|Limit of the number of segments to merge in a single phase when merging segments for publishing. This limit affects the total number of columns present in a set of segments to merge. If the limit is exceeded, segment merging occurs in multiple phases. Druid merges at least 2 segments per phase, regardless of this setting.|No|-1|

## Start a supervisor

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ public RabbitStreamIndexTaskTuningConfig(
@Nullable Integer numPersistThreads,
@Nullable Integer recordBufferSize,
@Nullable Integer recordBufferOfferTimeout,
@Nullable Integer maxRecordsPerPoll)
@Nullable Integer maxRecordsPerPoll,
@Nullable Integer maxColumnsToMerge
)
{
super(
appendableIndexSpec,
Expand All @@ -97,7 +99,8 @@ public RabbitStreamIndexTaskTuningConfig(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads
numPersistThreads,
maxColumnsToMerge
);

this.recordBufferSize = recordBufferSize;
Expand Down Expand Up @@ -130,7 +133,8 @@ private RabbitStreamIndexTaskTuningConfig(
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
)
{
this(
Expand All @@ -156,7 +160,9 @@ private RabbitStreamIndexTaskTuningConfig(
numPersistThreads,
recordBufferSize,
recordBufferOfferTimeout,
maxRecordsPerPoll);
maxRecordsPerPoll,
maxColumnsToMerge
);
}

@Nullable
Expand Down Expand Up @@ -226,7 +232,8 @@ public RabbitStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
getNumPersistThreads(),
getRecordBufferSizeConfigured(),
getRecordBufferOfferTimeout(),
getMaxRecordsPerPollConfigured()
getMaxRecordsPerPollConfigured(),
getMaxColumnsToMerge()
);
}

Expand All @@ -253,6 +260,7 @@ public String toString()
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
", maxRecordsPerPole=" + getMaxRecordsPerPollConfigured() +
", maxColumnsToMerge=" + getMaxColumnsToMerge() +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public static RabbitStreamSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -99,7 +100,9 @@ public RabbitStreamSupervisorTuningConfig(
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll)
@JsonProperty("maxRecordsPerPoll") @Nullable Integer maxRecordsPerPoll,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
)
{
super(
appendableIndexSpec,
Expand All @@ -124,7 +127,8 @@ public RabbitStreamSupervisorTuningConfig(
numPersistThreads,
recordBufferSize,
recordBufferOfferTimeout,
maxRecordsPerPoll
maxRecordsPerPoll,
maxColumnsToMerge
);
this.workerThreads = workerThreads;
this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
Expand Down Expand Up @@ -210,6 +214,7 @@ public String toString()
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
", maxRecordsPerPoll=" + getMaxRecordsPerPollConfigured() +
", maxColumnsToMerge=" + getMaxColumnsToMerge() +
'}';
}

Expand Down Expand Up @@ -239,7 +244,8 @@ public RabbitStreamIndexTaskTuningConfig convertToTaskTuningConfig()
getRecordBufferSizeConfigured(),
getRecordBufferOfferTimeout(),
getMaxRecordsPerPollConfigured(),
getNumPersistThreads()
getNumPersistThreads(),
getMaxColumnsToMerge()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,8 @@ public void testtoString() throws Exception
"maxParseExceptions=0, " +
"maxSavedParseExceptions=0, " +
"numPersistThreads=1, " +
"maxRecordsPerPoll=null}";
"maxRecordsPerPoll=null, " +
"maxColumnsToMerge=-1}";


Assert.assertEquals(resStr, config.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ public void setupTest()
null,
null,
null,
100);
100,
null
);
rowIngestionMetersFactory = new TestUtils().getRowIngestionMetersFactory();
serviceEmitter = new StubServiceEmitter("RabbitStreamSupervisorTest", "localhost");
EmittingLogger.registerEmitter(serviceEmitter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public KafkaIndexTaskTuningConfig(
@Nullable Boolean logParseExceptions,
@Nullable Integer maxParseExceptions,
@Nullable Integer maxSavedParseExceptions,
@Nullable Integer numPersistThreads
@Nullable Integer numPersistThreads,
@Nullable Integer maxColumnsToMerge
)
{
super(
Expand All @@ -76,7 +77,8 @@ public KafkaIndexTaskTuningConfig(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads
numPersistThreads,
maxColumnsToMerge
);
}

Expand All @@ -100,7 +102,8 @@ private KafkaIndexTaskTuningConfig(
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
)
{
this(
Expand All @@ -123,7 +126,8 @@ private KafkaIndexTaskTuningConfig(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads
numPersistThreads,
maxColumnsToMerge
);
}

Expand All @@ -150,7 +154,8 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getNumPersistThreads()
getNumPersistThreads(),
getMaxColumnsToMerge()
);
}

Expand All @@ -177,7 +182,8 @@ public String toString()
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
'}';
", getMaxColumnsToMerge=" + getMaxColumnsToMerge() +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static KafkaSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -93,7 +94,8 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge
)
{
super(
Expand All @@ -116,7 +118,8 @@ public KafkaSupervisorTuningConfig(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads
numPersistThreads,
maxColumnsToMerge
);
this.workerThreads = workerThreads;
this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
Expand Down Expand Up @@ -229,7 +232,8 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getNumPersistThreads()
getNumPersistThreads(),
getMaxColumnsToMerge()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2861,6 +2861,7 @@ private KafkaIndexTask createTask(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
null,
null
);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
Assert.assertEquals(1, config.getNumPersistThreads());
Assert.assertEquals(-1, config.getMaxColumnsToMerge());
}

@Test
Expand Down Expand Up @@ -123,6 +124,7 @@ public void testSerdeWithNonDefaults() throws Exception
config.getIndexSpecForIntermediatePersists()
);
Assert.assertEquals(2, config.getNumPersistThreads());
Assert.assertEquals(-1, config.getMaxColumnsToMerge());
}

@Test
Expand Down Expand Up @@ -152,7 +154,8 @@ public void testConvert()
null,
null,
null,
2
2,
5
);
KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();

Expand All @@ -168,6 +171,7 @@ public void testConvert()
Assert.assertEquals(true, copy.isReportParseExceptions());
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
Assert.assertEquals(2, copy.getNumPersistThreads());
Assert.assertEquals(5, copy.getMaxColumnsToMerge());
}

@Test
Expand All @@ -193,7 +197,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
true,
42,
42,
2
2,
-1
);

String serialized = mapper.writeValueAsString(base);
Expand All @@ -219,6 +224,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads());
Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge());
}

@Test
Expand All @@ -244,6 +250,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
42,
42,
2,
-1,
"extra string"
);

Expand All @@ -269,6 +276,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads());
Assert.assertEquals(base.getMaxColumnsToMerge(), deserialized.getMaxColumnsToMerge());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,8 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
null,
null,
null,
null
null,
null
);

EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
Expand Down Expand Up @@ -497,6 +498,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException
null,
null,
null,
null,
null
),
null
Expand Down Expand Up @@ -4221,6 +4223,7 @@ public void testIsTaskCurrent()
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -4260,6 +4263,7 @@ public void testIsTaskCurrent()
null,
null,
null,
null,
null
);

Expand Down Expand Up @@ -4413,6 +4417,7 @@ public void testSequenceNameDoesNotChangeWithTaskId()
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -4888,6 +4893,7 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
null,
null,
10,
null,
null
);

Expand Down Expand Up @@ -5002,6 +5008,7 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
null,
null,
null,
null,
null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public TestModifiedKafkaIndexTaskTuningConfig(
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
@JsonProperty("extra") String extra
)
{
Expand All @@ -79,7 +80,8 @@ public TestModifiedKafkaIndexTaskTuningConfig(
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions,
numPersistThreads
numPersistThreads,
maxColumnsToMerge
);
this.extra = extra;
}
Expand Down
Loading