From 8db1d420bc7df1b7a5f6b206d79ba2fbe504cb03 Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> Date: Tue, 15 Aug 2023 13:36:47 +0530 Subject: [PATCH 1/3] Refine docs and tests --- .../kafka-supervisor-reference.md | 8 +- .../indexing/kafka/KafkaRecordSupplier.java | 2 +- .../kafka/KafkaRecordSupplierTest.java | 125 +++++++++++------- 3 files changed, 87 insertions(+), 48 deletions(-) diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index be87a3dff9ce..d0d338c42579 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -37,7 +37,7 @@ This topic contains configuration reference information for the Apache Kafka sup |Field|Type|Description|Required| |-----|----|-----------|--------| -|`topic`|String|The Kafka topic to read from. Must be a specific topic. Topic patterns are not supported.|yes| +|`topic`|String|The Kafka topic to read from. Must be a specific topic. Topic patterns are supported if `multiTopic` is enabled.|yes| |`inputFormat`|Object|`inputFormat` to define input data parsing. See [Specifying data format](#specifying-data-format) for details about specifying the input format.|yes| |`consumerProperties`|Map|A map of properties to pass to the Kafka consumer. See [More on consumer properties](#more-on-consumerproperties).|yes| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)| @@ -144,6 +144,12 @@ impressions, you will set `topic` to `clicks|impressions` in the IO config. If n match the regex, druid will automatically start ingesting from those new topics. If you enable multi-topic ingestion for a datasource, downgrading will cause the ingestion to fail for that datasource. +When ingesting from multiple topics, the partitions are assigned based on the hashcode of topic and the id of the +partition within that topic. The partition assignment might not be uniform across all the tasks. It's also assumed +that partitions across individual topics have similar load. It is recommended that you have a higher number of +partitions for a high load topic and a lower number of partitions for a low load topic. Assuming that you want to +ingest from both high and low load topic in the same supervisor. + ## More on consumerProperties Consumer properties must contain a property `bootstrap.servers` with a list of Kafka brokers in the form: `:,:,...`. diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java index 96d7a849af20..a88300552e2a 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaRecordSupplier.java @@ -231,7 +231,7 @@ public Set getPartitionIds(String stream) throw DruidException.forPersona(DruidException.Persona.OPERATOR) .ofCategory(DruidException.Category.INVALID_INPUT) .build("Topic [%s] is not found." - + "Check that the topic exists in Kafka cluster", stream); + + " Check that the topic exists in Kafka cluster", stream); } } return allPartitions.stream() diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java index 0718f3d08c94..bfd81464ba2f 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaRecordSupplierTest.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; import org.apache.curator.test.TestingCluster; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.data.input.kafka.KafkaTopicPartition; @@ -57,6 +58,7 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.regex.Pattern; import java.util.stream.Collectors; public class KafkaRecordSupplierTest @@ -67,6 +69,9 @@ public class KafkaRecordSupplierTest private static final int POLL_RETRY = 5; private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper(); + private static KafkaTopicPartition PARTITION_0 = new KafkaTopicPartition(false, null, 0); + private static KafkaTopicPartition PARTITION_1 = new KafkaTopicPartition(false, null, 1); + private static String TOPIC = "topic"; private static int TOPIC_POS_FIX = 0; private static TestingCluster ZK_SERVER; @@ -78,21 +83,21 @@ public class KafkaRecordSupplierTest private static List> generateRecords(String topic) { return ImmutableList.of( - new ProducerRecord<>(TOPIC, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(TOPIC, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(TOPIC, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(TOPIC, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(TOPIC, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(TOPIC, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), - new ProducerRecord<>(TOPIC, 0, null, StringUtils.toUtf8("unparseable")), - new ProducerRecord<>(TOPIC, 0, null, StringUtils.toUtf8("unparseable2")), - new ProducerRecord<>(TOPIC, 0, null, null), - new ProducerRecord<>(TOPIC, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(TOPIC, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")), - new ProducerRecord<>(TOPIC, 1, null, jb("2049", "f", "y", "10", "notanumber", "1.0")), - new ProducerRecord<>(TOPIC, 1, null, jb("2049", "f", "y", "10", "20.0", "notanumber")), - new ProducerRecord<>(TOPIC, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")), - new ProducerRecord<>(TOPIC, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0")) + new ProducerRecord<>(topic, 0, null, jb("2008", "a", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2009", "b", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2010", "c", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2011", "d", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2011", "e", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("246140482-04-24T15:36:27.903Z", "x", "z", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable")), + new ProducerRecord<>(topic, 0, null, StringUtils.toUtf8("unparseable2")), + new ProducerRecord<>(topic, 0, null, null), + new ProducerRecord<>(topic, 0, null, jb("2013", "f", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 0, null, jb("2049", "f", "y", "notanumber", "20.0", "1.0")), + new ProducerRecord<>(topic, 1, null, jb("2049", "f", "y", "10", "notanumber", "1.0")), + new ProducerRecord<>(topic, 1, null, jb("2049", "f", "y", "10", "20.0", "notanumber")), + new ProducerRecord<>(topic, 1, null, jb("2012", "g", "y", "10", "20.0", "1.0")), + new ProducerRecord<>(topic, 1, null, jb("2011", "h", "y", "10", "20.0", "1.0")) ); } @@ -229,8 +234,8 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException insertData(); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -241,12 +246,40 @@ public void testSupplierSetup() throws ExecutionException, InterruptedException recordSupplier.assign(partitions); Assert.assertEquals(partitions, recordSupplier.getAssignment()); - Assert.assertEquals(ImmutableSet.of(new KafkaTopicPartition(false, TOPIC, 0), new KafkaTopicPartition(false, TOPIC, 1)), + Assert.assertEquals(ImmutableSet.of(PARTITION_0, PARTITION_1), recordSupplier.getPartitionIds(TOPIC)); recordSupplier.close(); } + @Test + public void testMultiTopicSupplierSetup() throws ExecutionException, InterruptedException + { + // Insert data into TOPIC + insertData(); + + // Insert data into other topic + String otherTopic = nextTopicName(); + records = generateRecords(otherTopic); + insertData(); + + KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( + KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, true); + + String stream = Pattern.quote(TOPIC) + "|" + Pattern.quote(otherTopic); + Set partitions = recordSupplier.getPartitionIds(stream); + Set diff = Sets.difference( + ImmutableSet.of( + new KafkaTopicPartition(true, TOPIC, 0), + new KafkaTopicPartition(true, TOPIC, 1), + new KafkaTopicPartition(true, otherTopic, 0), + new KafkaTopicPartition(true, otherTopic, 1) + ), + partitions + ); + Assert.assertEquals(diff.toString(), 0, diff.size()); + } + @Test public void testSupplierSetupCustomDeserializer() throws ExecutionException, InterruptedException { @@ -255,8 +288,8 @@ public void testSupplierSetupCustomDeserializer() throws ExecutionException, Int insertData(); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); Map properties = KAFKA_SERVER.consumerProperties(); @@ -275,7 +308,7 @@ public void testSupplierSetupCustomDeserializer() throws ExecutionException, Int recordSupplier.assign(partitions); Assert.assertEquals(partitions, recordSupplier.getAssignment()); - Assert.assertEquals(ImmutableSet.of(new KafkaTopicPartition(false, TOPIC, 0), new KafkaTopicPartition(false, TOPIC, 1)), + Assert.assertEquals(ImmutableSet.of(PARTITION_0, PARTITION_1), recordSupplier.getPartitionIds(TOPIC)); recordSupplier.close(); @@ -329,8 +362,8 @@ public void testPollCustomDeserializer() throws InterruptedException, ExecutionE insertData(); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); Map properties = KAFKA_SERVER.consumerProperties(); @@ -371,8 +404,8 @@ public void testPoll() throws InterruptedException, ExecutionException insertData(); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -424,8 +457,8 @@ public void testPollAfterMoreDataAdded() throws InterruptedException, ExecutionE } Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); @@ -491,12 +524,12 @@ public void testSeek() throws InterruptedException, ExecutionException // Insert data insertData(); - StreamPartition partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); - StreamPartition partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)); + StreamPartition partition0 = StreamPartition.of(TOPIC, PARTITION_0); + StreamPartition partition1 = StreamPartition.of(TOPIC, PARTITION_1); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -534,12 +567,12 @@ public void testSeekToLatest() throws InterruptedException, ExecutionException // Insert data insertData(); - StreamPartition partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); - StreamPartition partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)); + StreamPartition partition0 = StreamPartition.of(TOPIC, PARTITION_0); + StreamPartition partition1 = StreamPartition.of(TOPIC, PARTITION_1); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -568,11 +601,11 @@ public void testSeekUnassigned() throws InterruptedException, ExecutionException } } - StreamPartition partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); - StreamPartition partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)); + StreamPartition partition0 = StreamPartition.of(TOPIC, PARTITION_0); + StreamPartition partition1 = StreamPartition.of(TOPIC, PARTITION_1); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)) + StreamPartition.of(TOPIC, PARTITION_0) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -593,12 +626,12 @@ public void testPosition() throws ExecutionException, InterruptedException // Insert data insertData(); - StreamPartition partition0 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); - StreamPartition partition1 = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)); + StreamPartition partition0 = StreamPartition.of(TOPIC, PARTITION_0); + StreamPartition partition1 = StreamPartition.of(TOPIC, PARTITION_1); Set> partitions = ImmutableSet.of( - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)), - StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 1)) + StreamPartition.of(TOPIC, PARTITION_0), + StreamPartition.of(TOPIC, PARTITION_1) ); KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( @@ -638,7 +671,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetShoul { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); - StreamPartition streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); + StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -650,7 +683,7 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseEarliestOffsetSho { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); - StreamPartition streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); + StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); recordSupplier.seekToEarliest(partitions); @@ -662,7 +695,7 @@ public void getLatestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShouldR { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); - StreamPartition streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); + StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); recordSupplier.seekToLatest(partitions); @@ -674,7 +707,7 @@ public void getEarliestSequenceNumberWhenPartitionIsEmptyAndUseLatestOffsetShoul { KafkaRecordSupplier recordSupplier = new KafkaRecordSupplier( KAFKA_SERVER.consumerProperties(), OBJECT_MAPPER, null, false); - StreamPartition streamPartition = StreamPartition.of(TOPIC, new KafkaTopicPartition(false, TOPIC, 0)); + StreamPartition streamPartition = StreamPartition.of(TOPIC, PARTITION_0); Set> partitions = ImmutableSet.of(streamPartition); recordSupplier.assign(partitions); recordSupplier.seekToLatest(partitions); From 998e24c1abec67df08abb7b5618bf3e7948897fb Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> Date: Wed, 16 Aug 2023 14:53:52 +0530 Subject: [PATCH 2/3] Use a separate field for topic pattern --- .../kafka-supervisor-reference.md | 19 +++-- .../kafka/supervisor/KafkaSupervisor.java | 6 +- .../supervisor/KafkaSupervisorIOConfig.java | 45 ++++++++-- .../kafka/supervisor/KafkaSupervisorSpec.java | 2 +- .../indexing/kafka/KafkaSamplerSpecTest.java | 64 ++++++++++++-- .../KafkaSupervisorIOConfigTest.java | 35 +++++++- .../supervisor/KafkaSupervisorSpecTest.java | 84 +++++++++++++++++++ .../kafka/supervisor/KafkaSupervisorTest.java | 8 +- 8 files changed, 231 insertions(+), 32 deletions(-) diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md index d0d338c42579..536f1ade95be 100644 --- a/docs/development/extensions-core/kafka-supervisor-reference.md +++ b/docs/development/extensions-core/kafka-supervisor-reference.md @@ -37,7 +37,8 @@ This topic contains configuration reference information for the Apache Kafka sup |Field|Type|Description|Required| |-----|----|-----------|--------| -|`topic`|String|The Kafka topic to read from. Must be a specific topic. Topic patterns are supported if `multiTopic` is enabled.|yes| +|`topic`|String|The Kafka topic to read from. Must be a specific topic. Use this setting when you want to ingest from a single kafka topic.|yes| +|`topicPattern`|String|A regex pattern that can used to select multiple kafka topics to ingest data from. Either this or `topic` can be used in a spec. See [Ingesting from multiple topics](#ingesting-from-multiple-topics) for more details.|yes| |`inputFormat`|Object|`inputFormat` to define input data parsing. See [Specifying data format](#specifying-data-format) for details about specifying the input format.|yes| |`consumerProperties`|Map|A map of properties to pass to the Kafka consumer. See [More on consumer properties](#more-on-consumerproperties).|yes| |`pollTimeout`|Long|The length of time to wait for the Kafka consumer to poll records, in milliseconds|no (default == 100)| @@ -53,7 +54,6 @@ This topic contains configuration reference information for the Apache Kafka sup |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)| |`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#task-autoscaler-properties).|no (default == null)| |`idleConfig`|Object|Defines how and when Kafka Supervisor can become idle. See [Idle Supervisor Configuration](#idle-supervisor-configuration) for more details.|no (default == null)| -|`multiTopic`|Boolean|Set this to true if you want to ingest data from multiple Kafka topics using a single supervisor. See [Ingesting from multiple topics](#ingesting-from-multiple-topics) for more details.|no (default == false)| ## Task Autoscaler Properties @@ -138,13 +138,16 @@ The following example demonstrates supervisor spec with `lagBased` autoScaler an } ``` ## Ingesting from multiple topics -To ingest from multiple topics, you have to set `multiTopic` in the supervisor IO config to `true`. Multiple topics -can be passed as a regex pattern as the value for `topic` in the IO config. For example, to ingest data from clicks and -impressions, you will set `topic` to `clicks|impressions` in the IO config. If new topics are added to the cluster that -match the regex, druid will automatically start ingesting from those new topics. If you enable multi-topic -ingestion for a datasource, downgrading will cause the ingestion to fail for that datasource. -When ingesting from multiple topics, the partitions are assigned based on the hashcode of topic and the id of the +To ingest data from multiple topics, you have to set `topicPattern` in the supervisor IO config and not set `topic`. +Multiple topics can be passed as a regex pattern as the value for `topicPattern` in the IO config. For example, to +ingest data from clicks and impressions, you will set `topicPattern` to `clicks|impressions` in the IO config. +Similarly, you can use `metrics-.*` as the value for `topicPattern` if you want to ingest from all the topics that +start with `metrics-`. If new topics are added to the cluster that match the regex, Druid will automatically start +ingesting from those new topics. If you enable multi-topic ingestion for a datasource, downgrading to a version +lesser than 28.0.0 will cause the ingestion for that datasource to fail. + +When ingesting data from multiple topics, the partitions are assigned based on the hashcode of topic and the id of the partition within that topic. The partition assignment might not be uniform across all the tasks. It's also assumed that partitions across individual topics have similar load. It is recommended that you have a higher number of partitions for a high load topic and a lower number of partitions for a low load topic. Assuming that you want to diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index d79d5f3211c4..a146e090b55f 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -169,7 +169,7 @@ protected SeekableStreamSupervisorReportPayload creat Map partitionLag = getRecordLagPerPartitionInLatestSequences(getHighestCurrentOffsets()); return new KafkaSupervisorReportPayload( spec.getDataSchema().getDataSource(), - ioConfig.getTopic(), + ioConfig.getStream(), numPartitions, ioConfig.getReplicas(), ioConfig.getTaskDuration().getMillis() / 1000, @@ -204,8 +204,8 @@ protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( baseSequenceName, null, null, - new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getTopic(), startPartitions, Collections.emptySet()), - new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getTopic(), endPartitions), + new SeekableStreamStartSequenceNumbers<>(kafkaIoConfig.getStream(), startPartitions, Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(kafkaIoConfig.getStream(), endPartitions), kafkaIoConfig.getConsumerProperties(), kafkaIoConfig.getPollTimeout(), true, diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index c244fea95c7f..fbf55f4ab5ed 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.error.InvalidInput; import org.apache.druid.indexing.seekablestream.extension.KafkaConfigOverrides; import org.apache.druid.indexing.seekablestream.supervisor.IdleConfig; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; @@ -48,11 +49,13 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig private final Map consumerProperties; private final long pollTimeout; private final KafkaConfigOverrides configOverrides; - private final boolean multiTopic; + private final String topic; + private final String topicPattern; @JsonCreator public KafkaSupervisorIOConfig( @JsonProperty("topic") String topic, + @JsonProperty("topicPattern") String topicPattern, @JsonProperty("inputFormat") InputFormat inputFormat, @JsonProperty("replicas") Integer replicas, @JsonProperty("taskCount") Integer taskCount, @@ -69,12 +72,11 @@ public KafkaSupervisorIOConfig( @JsonProperty("lateMessageRejectionStartDateTime") DateTime lateMessageRejectionStartDateTime, @JsonProperty("configOverrides") KafkaConfigOverrides configOverrides, @JsonProperty("idleConfig") IdleConfig idleConfig, - @JsonProperty("multiTopic") Boolean multiTopic, @JsonProperty("stopTaskCount") Integer stopTaskCount ) { super( - Preconditions.checkNotNull(topic, "topic"), + checkTopicArguments(topic, topicPattern), inputFormat, replicas, taskCount, @@ -98,13 +100,26 @@ public KafkaSupervisorIOConfig( ); this.pollTimeout = pollTimeout != null ? pollTimeout : DEFAULT_POLL_TIMEOUT_MILLIS; this.configOverrides = configOverrides; - this.multiTopic = multiTopic != null ? multiTopic : DEFAULT_IS_MULTI_TOPIC; + this.topic = topic; + this.topicPattern = topicPattern; } + /** + * Only used in testing or serialization/deserialization + */ @JsonProperty public String getTopic() { - return getStream(); + return topic; + } + + /** + * Only used in testing or serialization/deserialization + */ + @JsonProperty + public String getTopicPattern() + { + return topicPattern; } @JsonProperty @@ -131,10 +146,9 @@ public KafkaConfigOverrides getConfigOverrides() return configOverrides; } - @JsonProperty public boolean isMultiTopic() { - return multiTopic; + return topicPattern != null; } @Override @@ -142,6 +156,7 @@ public String toString() { return "KafkaSupervisorIOConfig{" + "topic='" + getTopic() + '\'' + + "topicPattern='" + getTopicPattern() + '\'' + ", replicas=" + getReplicas() + ", taskCount=" + getTaskCount() + ", taskDuration=" + getTaskDuration() + @@ -157,7 +172,23 @@ public String toString() ", lateMessageRejectionStartDateTime=" + getLateMessageRejectionStartDateTime() + ", configOverrides=" + getConfigOverrides() + ", idleConfig=" + getIdleConfig() + + ", stopTaskCount=" + getStopTaskCount() + '}'; } + private static String checkTopicArguments(String topic, String topicPattern) + { + if (topic == null && topicPattern == null) { + throw InvalidInput.exception("Either topic or topicPattern must be specified"); + } + if (topic != null && topicPattern != null) { + throw InvalidInput.exception( + "Only one of topic [%s] or topicPattern [%s] must be specified", + topic, + topicPattern + ); + } + return topic != null ? topic : topicPattern; + } + } diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index af6f69ab2ec2..9337175378f8 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -114,7 +114,7 @@ public Set getInputSourceResources() @Override public String getSource() { - return getIoConfig() != null ? getIoConfig().getTopic() : null; + return getIoConfig() != null ? getIoConfig().getStream() : null; } @Override diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index a6731f06a777..9a6fd03726d0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -72,6 +72,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; public class KafkaSamplerSpecTest extends InitializedNullHandlingTest { @@ -163,6 +164,60 @@ public void testSample() null, new KafkaSupervisorIOConfig( TOPIC, + null, + new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), + null, + null, + null, + kafkaServer.consumerProperties(), + null, + null, + null, + null, + true, + null, + null, + null, + null, + null, + null, + null + ), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec( + supervisorSpec, + new SamplerConfig(5, 5_000, null, null), + new InputSourceSampler(OBJECT_MAPPER), + OBJECT_MAPPER + ); + + runSamplerAndCompareResponse(samplerSpec, true); + } + + @Test + public void testSampleWithTopicPattern() + { + insertData(generateRecords(TOPIC)); + + KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec( + null, + DATA_SCHEMA, + null, + new KafkaSupervisorIOConfig( + null, + Pattern.quote(TOPIC), new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null, null, @@ -179,7 +234,6 @@ public void testSample() null, null, null, - false, null ), null, @@ -216,6 +270,7 @@ public void testSampleKafkaInputFormat() null, new KafkaSupervisorIOConfig( TOPIC, + null, new KafkaInputFormat( null, null, @@ -240,7 +295,6 @@ public void testSampleKafkaInputFormat() null, null, null, - false, null ), null, @@ -331,6 +385,7 @@ public void testWithInputRowParser() throws IOException null, null, null, + null, kafkaServer.consumerProperties(), null, null, @@ -343,7 +398,6 @@ public void testWithInputRowParser() throws IOException null, null, null, - false, null ), null, @@ -508,6 +562,7 @@ public void testInvalidKafkaConfig() null, new KafkaSupervisorIOConfig( TOPIC, + null, new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null, null, @@ -527,7 +582,6 @@ public void testInvalidKafkaConfig() null, null, null, - false, null ), null, @@ -564,6 +618,7 @@ public void testGetInputSourceResources() null, new KafkaSupervisorIOConfig( TOPIC, + null, new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null), null, null, @@ -583,7 +638,6 @@ public void testGetInputSourceResources() null, null, null, - false, null ), null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java index 51453e4335af..231705418f53 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfigTest.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; +import org.apache.druid.error.DruidException; import org.apache.druid.indexing.kafka.KafkaConsumerConfigs; import org.apache.druid.indexing.kafka.KafkaIndexTaskModule; import org.apache.druid.indexing.kafka.KafkaRecordSupplier; @@ -75,6 +76,7 @@ public void testSerdeWithDefaults() throws Exception ); Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertNull(config.getTopicPattern()); Assert.assertEquals(1, (int) config.getReplicas()); Assert.assertEquals(1, (int) config.getTaskCount()); Assert.assertEquals(Duration.standardMinutes(60), config.getTaskDuration()); @@ -89,6 +91,28 @@ public void testSerdeWithDefaults() throws Exception Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent()); } + @Test + public void testSerdeWithTopicPattern() throws Exception + { + String jsonStr = "{\n" + + " \"type\": \"kafka\",\n" + + " \"topicPattern\": \"my-topic.*\",\n" + + " \"consumerProperties\": {\"bootstrap.servers\":\"localhost:9092\"}\n" + + "}"; + + KafkaSupervisorIOConfig config = mapper.readValue( + mapper.writeValueAsString( + mapper.readValue( + jsonStr, + KafkaSupervisorIOConfig.class + ) + ), KafkaSupervisorIOConfig.class + ); + + Assert.assertEquals("my-topic.*", config.getTopicPattern()); + Assert.assertNull(config.getTopic()); + } + @Test public void testSerdeWithNonDefaultsWithLateMessagePeriod() throws Exception { @@ -118,6 +142,7 @@ public void testSerdeWithNonDefaultsWithLateMessagePeriod() throws Exception ); Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertNull(config.getTopicPattern()); Assert.assertEquals(3, (int) config.getReplicas()); Assert.assertEquals(9, (int) config.getTaskCount()); Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration()); @@ -160,6 +185,7 @@ public void testSerdeWithNonDefaultsWithLateMessageStartDateTime() throws Except ); Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertNull(config.getTopicPattern()); Assert.assertEquals(3, (int) config.getReplicas()); Assert.assertEquals(9, (int) config.getTaskCount()); Assert.assertEquals(Duration.standardMinutes(30), config.getTaskDuration()); @@ -189,6 +215,7 @@ public void testSerdeForConsumerPropertiesWithPasswords() throws Exception KafkaRecordSupplier.addConsumerPropertiesFromConfig(props, mapper, config.getConsumerProperties()); Assert.assertEquals("my-topic", config.getTopic()); + Assert.assertNull(config.getTopicPattern()); Assert.assertEquals("localhost:9092", props.getProperty("bootstrap.servers")); Assert.assertEquals("mytruststorepassword", props.getProperty("ssl.truststore.password")); Assert.assertEquals("mykeystorepassword", props.getProperty("ssl.keystore.password")); @@ -204,8 +231,8 @@ public void testTopicRequired() throws Exception + "}"; exception.expect(JsonMappingException.class); - exception.expectCause(CoreMatchers.isA(NullPointerException.class)); - exception.expectMessage(CoreMatchers.containsString("topic")); + exception.expectCause(CoreMatchers.isA(DruidException.class)); + exception.expectMessage(CoreMatchers.containsString("Either topic or topicPattern must be specified")); mapper.readValue(jsonStr, KafkaSupervisorIOConfig.class); } @@ -293,6 +320,7 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( "test", null, + null, 1, 1, new Period("PT1H"), @@ -308,7 +336,6 @@ public void testAutoScalingConfigSerde() throws JsonProcessingException null, null, null, - false, null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); @@ -336,6 +363,7 @@ public void testIdleConfigSerde() throws JsonProcessingException KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( "test", null, + null, 1, 1, new Period("PT1H"), @@ -351,7 +379,6 @@ public void testIdleConfigSerde() throws JsonProcessingException null, null, mapper.convertValue(idleConfig, IdleConfig.class), - false, null ); String ioConfig = mapper.writeValueAsString(kafkaSupervisorIOConfig); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java index 7874a11c588d..9ff0c0bd7840 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java @@ -129,6 +129,7 @@ public void testSerde() throws IOException Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); Assert.assertNotNull(spec.getIoConfig()); Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNull(spec.getIoConfig().getTopicPattern()); Assert.assertNotNull(spec.getTuningConfig()); Assert.assertNull(spec.getContext()); Assert.assertFalse(spec.isSuspended()); @@ -147,6 +148,85 @@ public void testSerde() throws IOException Assert.assertEquals(serialized, stable); } + @Test + public void testSerdeWithTopicPattern() throws IOException + { + String json = "{\n" + + " \"type\": \"kafka\",\n" + + " \"dataSchema\": {\n" + + " \"dataSource\": \"metrics-kafka\",\n" + + " \"parser\": {\n" + + " \"type\": \"string\",\n" + + " \"parseSpec\": {\n" + + " \"format\": \"json\",\n" + + " \"timestampSpec\": {\n" + + " \"column\": \"timestamp\",\n" + + " \"format\": \"auto\"\n" + + " },\n" + + " \"dimensionsSpec\": {\n" + + " \"dimensions\": [],\n" + + " \"dimensionExclusions\": [\n" + + " \"timestamp\",\n" + + " \"value\"\n" + + " ]\n" + + " }\n" + + " }\n" + + " },\n" + + " \"metricsSpec\": [\n" + + " {\n" + + " \"name\": \"count\",\n" + + " \"type\": \"count\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_sum\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleSum\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_min\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMin\"\n" + + " },\n" + + " {\n" + + " \"name\": \"value_max\",\n" + + " \"fieldName\": \"value\",\n" + + " \"type\": \"doubleMax\"\n" + + " }\n" + + " ],\n" + + " \"granularitySpec\": {\n" + + " \"type\": \"uniform\",\n" + + " \"segmentGranularity\": \"HOUR\",\n" + + " \"queryGranularity\": \"NONE\"\n" + + " }\n" + + " },\n" + + " \"ioConfig\": {\n" + + " \"topicPattern\": \"metrics.*\",\n" + + " \"consumerProperties\": {\n" + + " \"bootstrap.servers\": \"localhost:9092\"\n" + + " },\n" + + " \"taskCount\": 1\n" + + " }\n" + + "}"; + KafkaSupervisorSpec spec = mapper.readValue(json, KafkaSupervisorSpec.class); + + Assert.assertNotNull(spec); + Assert.assertNotNull(spec.getDataSchema()); + Assert.assertEquals("metrics.*", spec.getIoConfig().getTopicPattern()); + Assert.assertNull(spec.getIoConfig().getTopic()); + Assert.assertNotNull(spec.getTuningConfig()); + Assert.assertNull(spec.getContext()); + String serialized = mapper.writeValueAsString(spec); + + // expect default values populated in reserialized string + Assert.assertTrue(serialized.contains("\"topicPattern\":\"metrics.*\"")); + Assert.assertTrue(serialized, serialized.contains("\"topic\":null")); + + KafkaSupervisorSpec spec2 = mapper.readValue(serialized, KafkaSupervisorSpec.class); + + String stable = mapper.writeValueAsString(spec2); + + Assert.assertEquals(serialized, stable); + } @Test public void testSerdeWithInputFormat() throws IOException { @@ -215,6 +295,7 @@ public void testSerdeWithInputFormat() throws IOException Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); Assert.assertNotNull(spec.getIoConfig()); Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNull(spec.getIoConfig().getTopicPattern()); Assert.assertNotNull(spec.getTuningConfig()); Assert.assertNull(spec.getContext()); Assert.assertFalse(spec.isSuspended()); @@ -301,6 +382,7 @@ public void testSerdeWithSpec() throws IOException Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); Assert.assertNotNull(spec.getIoConfig()); Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNull(spec.getIoConfig().getTopicPattern()); Assert.assertNotNull(spec.getTuningConfig()); Assert.assertNull(spec.getContext()); Assert.assertFalse(spec.isSuspended()); @@ -389,6 +471,7 @@ public void testSerdeWithSpecAndInputFormat() throws IOException Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); Assert.assertNotNull(spec.getIoConfig()); Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNull(spec.getIoConfig().getTopicPattern()); Assert.assertNotNull(spec.getTuningConfig()); Assert.assertNull(spec.getContext()); Assert.assertFalse(spec.isSuspended()); @@ -473,6 +556,7 @@ public void testSuspendResume() throws IOException Assert.assertEquals(4, spec.getDataSchema().getAggregators().length); Assert.assertNotNull(spec.getIoConfig()); Assert.assertEquals("metrics", spec.getIoConfig().getTopic()); + Assert.assertNull(spec.getIoConfig().getTopicPattern()); Assert.assertNotNull(spec.getTuningConfig()); Assert.assertNull(spec.getContext()); Assert.assertFalse(spec.isSuspended()); diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 7ee0ed9f1e43..5f1311085cab 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -293,6 +293,7 @@ public SeekableStreamIndexTaskClient build( KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, + null, INPUT_FORMAT, replicas, 1, @@ -309,7 +310,6 @@ public SeekableStreamIndexTaskClient build( null, null, new IdleConfig(true, 1000L), - false, 1 ); @@ -4501,6 +4501,7 @@ private TestableKafkaSupervisor getTestableSupervisor( consumerProperties.put("bootstrap.servers", kafkaHost); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, + null, INPUT_FORMAT, replicas, taskCount, @@ -4517,7 +4518,6 @@ private TestableKafkaSupervisor getTestableSupervisor( null, null, idleConfig, - false, null ); @@ -4614,6 +4614,7 @@ private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent( consumerProperties.put("isolation.level", "read_committed"); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, + null, INPUT_FORMAT, replicas, taskCount, @@ -4630,7 +4631,6 @@ private TestableKafkaSupervisor getTestableSupervisorCustomIsTaskCurrent( null, null, null, - false, null ); @@ -4731,6 +4731,7 @@ private KafkaSupervisor getSupervisor( consumerProperties.put("isolation.level", "read_committed"); KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( topic, + null, INPUT_FORMAT, replicas, taskCount, @@ -4747,7 +4748,6 @@ private KafkaSupervisor getSupervisor( null, null, null, - false, null ); From caab61390db2ab4a53718db366ed8d611157f6db Mon Sep 17 00:00:00 2001 From: Abhishek Agarwal <1477457+abhishekagarwal87@users.noreply.github.com> Date: Wed, 16 Aug 2023 15:02:00 +0530 Subject: [PATCH 3/3] fix spell check --- website/.spelling | 1 + 1 file changed, 1 insertion(+) diff --git a/website/.spelling b/website/.spelling index 26d7049ef44e..5a80fa3dba1d 100644 --- a/website/.spelling +++ b/website/.spelling @@ -328,6 +328,7 @@ gzip gzipped hadoop hasher +hashcode hashtable high-QPS historicals