Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ class DynamicLogConfig(logManager: LogManager) extends Reconfigurable with Loggi
props ++= newBrokerDefaults.asScala
props ++= log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)

val logConfig = LogConfig(props.asJava)
val logConfig = LogConfig(props.asJava, log.config.overriddenConfigs)
log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,40 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
stopAndVerifyProduceConsume(producerThread, consumerThread)
}

@Test
def testConsecutiveConfigChange(): Unit = {
  // Regression test: a topic-level config override (min.insync.replicas=2) must
  // survive repeated cluster-wide (broker-default) dynamic reconfigurations,
  // not just the first one.
  val topic2 = "testtopic2"
  val tp = new TopicPartition(topic2, 0)

  // Looks up the partition's log on the first broker; the log must exist.
  def logOnFirstBroker =
    servers.head.logManager.getLog(tp).getOrElse(throw new IllegalStateException("Log not found"))

  // Asserts the topic-level min.insync.replicas override is still present and "2".
  def assertMinIsrOverrideIntact(): Unit = {
    val log = logOnFirstBroker
    assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
    assertEquals("2", log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString)
  }

  // Create the topic with a topic-level override.
  val topicProps = new Properties
  topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2")
  TestUtils.createTopic(zkClient, topic2, 1, replicationFactor = numServers, servers, topicProps)
  assertMinIsrOverrideIntact()

  // First broker-default change: bump the cluster-wide default to 3.
  val props = new Properties
  props.put(KafkaConfig.MinInSyncReplicasProp, "3")
  reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.MinInSyncReplicasProp, "3"))
  // Verify that all broker defaults have been updated.
  servers.foreach { server =>
    props.asScala.foreach { case (k, v) =>
      assertEquals(s"Not reconfigured $k", v, server.config.originals.get(k).toString)
    }
  }
  assertMinIsrOverrideIntact() // topic-level override survives the default change

  // Second broker-default change on an unrelated property; the topic-level
  // override must still be intact afterwards (this is the case the PR fixes).
  props.clear()
  props.put(KafkaConfig.LogRetentionTimeMillisProp, "604800000")
  reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogRetentionTimeMillisProp, "604800000"))
  assertMinIsrOverrideIntact() // topic-level override still survives
}

@Test
def testDefaultTopicConfig(): Unit = {
val (producerThread, consumerThread) = startProduceConsume(retries = 0)
Expand Down Expand Up @@ -498,7 +532,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
val partitions = (0 until numPartitions).map(i => new TopicPartition(topic, i)).filter { tp =>
zkClient.getLeaderForPartition(tp) == Some(leaderId)
}
assertTrue(s"Partitons not found with leader $leaderId", partitions.nonEmpty)
assertTrue(s"Partitions not found with leader $leaderId", partitions.nonEmpty)
partitions.foreach { tp =>
(1 to 2).foreach { i =>
val replicaFetcherManager = servers(i).replicaManager.replicaFetcherManager
Expand Down