From e11a581d841c0e16df4f089c81717bb9cc1b1724 Mon Sep 17 00:00:00 2001 From: huxi Date: Sat, 25 Jan 2020 06:47:22 +0800 Subject: [PATCH] KAFKA-9254; Overridden topic configs are reset after dynamic default change (#7870) Currently, when a dynamic change is made to the broker-level default log configuration, existing log configs will be recreated with an empty overridden configs. In such case, when updating dynamic broker configs a second round, the topic-level configs are lost. This can cause unexpected data loss, for example, if the cleanup policy changes from "compact" to "delete." Reviewers: Rajini Sivaram , Jason Gustafson (cherry picked from commit 0e7f867041959c5d77727c7f5ce32d363fa09fc2) --- .../kafka/server/DynamicBrokerConfig.scala | 2 +- .../DynamicBrokerReconfigurationTest.scala | 36 ++++++++++++++++++- 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 5aaec89874af5..e42c8941cc128 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -540,7 +540,7 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi props ++= newBrokerDefaults.asScala props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains) - val logConfig = LogConfig(props.asJava) + val logConfig = LogConfig(props.asJava, log.config.overriddenConfigs) log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig) } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 5ef7cd2c0d460..8c3a657600fb5 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -292,6 +292,40 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet stopAndVerifyProduceConsume(producerThread, consumerThread) } + @Test + def testConsecutiveConfigChange(): Unit = { + val topic2 = "testtopic2" + val topicProps = new Properties + topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2") + TestUtils.createTopic(zkClient, topic2, 1, replicationFactor = numServers, servers, topicProps) + var log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found")) + assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp)) + assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) + + val props = new Properties + props.put(KafkaConfig.MinInSyncReplicasProp, "3") + // Make a broker-default config + reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MinInSyncReplicasProp, "3")) + // Verify that all broker defaults have been updated again + servers.foreach { server => + props.asScala.foreach { case (k, v) => + assertEquals(s"Not reconfigured $k", v, server.config.originals.get(k).toString) + } + } + + log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found")) + assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp)) + assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config survives + + // Make a second broker-default change + props.clear() + props.put(KafkaConfig.LogRetentionTimeMillisProp, "604800000") + reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogRetentionTimeMillisProp, "604800000")) + log = servers.head.logManager.getLog(new TopicPartition(topic2, 0)).getOrElse(throw new IllegalStateException("Log not found")) + assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp)) + assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) // Verify topic-level config still survives + } + @Test def testDefaultTopicConfig(): Unit = { val (producerThread, consumerThread) = startProduceConsume(retries = 0) @@ -498,7 +532,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet val partitions = (0 until numPartitions).map(i => new TopicPartition(topic, i)).filter { tp => zkClient.getLeaderForPartition(tp) == Some(leaderId) } - assertTrue(s"Partitons not found with leader $leaderId", partitions.nonEmpty) + assertTrue(s"Partitions not found with leader $leaderId", partitions.nonEmpty) partitions.foreach { tp => (1 to 2).foreach { i => val replicaFetcherManager = servers(i).replicaManager.replicaFetcherManager