From 9ffba6703df5eeba73373fb3a4e7daab2d3e446c Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 28 Apr 2016 10:22:17 -0700 Subject: [PATCH 1/4] KAFKA-3636: change default max session timeout to 3 minutes --- .../src/test/scala/integration/kafka/api/BaseConsumerTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index 23fcfa61570a1..d5b2952f1bb99 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -48,7 +48,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout - this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000") + this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "180000") this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") From b379915aba54d42004fe9f0c3497919442e61780 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 28 Apr 2016 11:15:51 -0700 Subject: [PATCH 2/4] change the right default setting --- core/src/main/scala/kafka/server/KafkaConfig.scala | 12 ++++++------ .../integration/kafka/api/BaseConsumerTest.scala | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 5e28bd7d16c07..373cbfdc857b4 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -130,9 +130,9 @@ object Defaults { val ControlledShutdownRetryBackoffMs = 5000 val ControlledShutdownEnable = true - /** ********* Consumer coordinator configuration ***********/ - val ConsumerMinSessionTimeoutMs = 6000 - val ConsumerMaxSessionTimeoutMs = 30000 + /** ********* Group coordinator configuration ***********/ + val GroupMinSessionTimeoutMs = 6000 + val GroupMaxSessionTimeoutMs = 180000 /** ********* Offset management configuration ***********/ val OffsetMetadataMaxSize = OffsetConfig.DefaultMaxMetadataSize @@ -371,7 +371,7 @@ object KafkaConfig { val AuthorizerClassNameDoc = "The authorizer class that should be used for authorization" /** ********* Socket Server Configuration ***********/ val PortDoc = "DEPRECATED: only used when `listeners` is not set. " + - "Use `listeners` instead. \n" + + "Use `listeners` instead. \n" + "the port to listen and accept connections on" val HostNameDoc = "DEPRECATED: only used when `listeners` is not set. " + "Use `listeners` instead. \n" + @@ -662,8 +662,8 @@ object KafkaConfig { .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc) /** ********* Consumer coordinator configuration ***********/ - .define(GroupMinSessionTimeoutMsProp, INT, Defaults.ConsumerMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc) - .define(GroupMaxSessionTimeoutMsProp, INT, Defaults.ConsumerMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc) + .define(GroupMinSessionTimeoutMsProp, INT, Defaults.GroupMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc) + .define(GroupMaxSessionTimeoutMsProp, INT, Defaults.GroupMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc) /** ********* Offset management configuration ***********/ .define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc) diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala index d5b2952f1bb99..23fcfa61570a1 100644 --- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala @@ -48,7 +48,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging { this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") // set small enough session timeout - this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "180000") + this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000") this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") From 454da6ef35efff9a26182b4d3ec613c56e9c27cc Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 28 Apr 2016 12:23:48 -0700 Subject: [PATCH 3/4] change default max session timeout to 5 minutes --- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 373cbfdc857b4..61448c080d9bb 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -132,7 +132,7 @@ object Defaults { /** ********* Group coordinator configuration ***********/ val GroupMinSessionTimeoutMs = 6000 - val GroupMaxSessionTimeoutMs = 180000 + val GroupMaxSessionTimeoutMs = 300000 /** ********* Offset management configuration ***********/ val OffsetMetadataMaxSize = OffsetConfig.DefaultMaxMetadataSize From b1a295942800e1f27727b706fb8bf09a5b230091 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Thu, 28 Apr 2016 13:33:28 -0700 Subject: [PATCH 4/4] more missed renaming --- core/src/main/scala/kafka/server/KafkaConfig.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 61448c080d9bb..c5da55d4acac6 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -484,8 +484,8 @@ object KafkaConfig { val ControlledShutdownRetryBackoffMsDoc = "Before each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying." val ControlledShutdownEnableDoc = "Enable controlled shutdown of the server" /** ********* Consumer coordinator configuration ***********/ - val ConsumerMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered consumers. Shorter timeouts leader to quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources." - val ConsumerMaxSessionTimeoutMsDoc = "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures." + val GroupMinSessionTimeoutMsDoc = "The minimum allowed session timeout for registered consumers. Shorter timeouts leader to quicker failure detection at the cost of more frequent consumer heartbeating, which can overwhelm broker resources." + val GroupMaxSessionTimeoutMsDoc = "The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages in between heartbeats at the cost of a longer time to detect failures." /** ********* Offset management configuration ***********/ val OffsetMetadataMaxSizeDoc = "The maximum size for a metadata entry associated with an offset commit" val OffsetsLoadBufferSizeDoc = "Batch size for reading from the offsets segments when loading offsets into the cache." @@ -661,9 +661,9 @@ object KafkaConfig { .define(ControlledShutdownRetryBackoffMsProp, LONG, Defaults.ControlledShutdownRetryBackoffMs, MEDIUM, ControlledShutdownRetryBackoffMsDoc) .define(ControlledShutdownEnableProp, BOOLEAN, Defaults.ControlledShutdownEnable, MEDIUM, ControlledShutdownEnableDoc) - /** ********* Consumer coordinator configuration ***********/ - .define(GroupMinSessionTimeoutMsProp, INT, Defaults.GroupMinSessionTimeoutMs, MEDIUM, ConsumerMinSessionTimeoutMsDoc) - .define(GroupMaxSessionTimeoutMsProp, INT, Defaults.GroupMaxSessionTimeoutMs, MEDIUM, ConsumerMaxSessionTimeoutMsDoc) + /** ********* Group coordinator configuration ***********/ + .define(GroupMinSessionTimeoutMsProp, INT, Defaults.GroupMinSessionTimeoutMs, MEDIUM, GroupMinSessionTimeoutMsDoc) + .define(GroupMaxSessionTimeoutMsProp, INT, Defaults.GroupMaxSessionTimeoutMs, MEDIUM, GroupMaxSessionTimeoutMsDoc) /** ********* Offset management configuration ***********/ .define(OffsetMetadataMaxSizeProp, INT, Defaults.OffsetMetadataMaxSize, HIGH, OffsetMetadataMaxSizeDoc)