diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala index 8a6b3ee212d3f..060c0b4d4aec8 100644 --- a/core/src/main/scala/kafka/zk/AdminZkClient.scala +++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala @@ -93,7 +93,6 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging { update: Boolean = false) { validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update) - // Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported if (!update) { // write out the config if there is any, this isn't transactional with the partition assignments zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config) diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala b/core/src/main/scala/kafka/zk/KafkaZkClient.scala index bb342945ea834..ec4932ab47bfb 100644 --- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala +++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala @@ -246,6 +246,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean /** * Sets or creates the entity znode path with the given configs depending * on whether it already exists or not. + * + * If this is method is called concurrently, the last writer wins. In cases where we update configs and then + * partition assignment (i.e. create topic), it's possible for one thread to set this and the other to set the + * partition assignment. As such, the recommendation is to never call create topic for the same topic with different + * configs/partition assignment concurrently. + * * @param rootEntityType entity type * @param sanitizedEntityName entity name * @throws KeeperException if there is an error while setting or creating the znode @@ -257,16 +263,19 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean retryRequestUntilConnected(setDataRequest) } - def create(configData: Array[Byte]) = { + def createOrSet(configData: Array[Byte]): Unit = { val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName) - createRecursive(path, ConfigEntityZNode.encode(config)) + try createRecursive(path, ConfigEntityZNode.encode(config)) + catch { + case _: NodeExistsException => set(configData).maybeThrow + } } val configData = ConfigEntityZNode.encode(config) val setDataResponse = set(configData) setDataResponse.resultCode match { - case Code.NONODE => create(configData) + case Code.NONODE => createOrSet(configData) case _ => setDataResponse.maybeThrow } } diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala index fe5fbff55d05d..39745e5e608dd 100644 --- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala @@ -28,8 +28,10 @@ import kafka.utils.TestUtils._ import kafka.utils.{Logging, TestUtils} import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness} import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException} import org.apache.kafka.common.metrics.Quota +import org.apache.kafka.test.{TestUtils => JTestUtils} import org.easymock.EasyMock import org.junit.Assert._ import org.junit.{After, Test} @@ -132,7 +134,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware } @Test - def testConcurrentTopicCreation() { + def testMockedConcurrentTopicCreation() { val topic = "test.topic" // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes @@ -147,6 +149,28 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware } } + @Test + def testConcurrentTopicCreation() { + val topic = "test-concurrent-topic-creation" + TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4)) + val props = new Properties + props.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2") + def createTopic(): Unit = { + try adminZkClient.createTopic(topic, 3, 1, props) + catch { case _: TopicExistsException => () } + val (_, partitionAssignment) = zkClient.getPartitionAssignmentForTopics(Set(topic)).head + assertEquals(3, partitionAssignment.size) + partitionAssignment.foreach { case (partition, replicas) => + assertEquals(s"Unexpected replication factor for $partition", 1, replicas.size) + } + val savedProps = zkClient.getEntityConfigs(ConfigType.Topic, topic) + assertEquals(props, savedProps) + } + + TestUtils.assertConcurrent("Concurrent topic creation failed", Seq(createTopic, createTopic), + JTestUtils.DEFAULT_MAX_WAIT_MS.toInt) + } + /** * This test creates a topic with a few config overrides and checks that the configs are applied to the new topic * then changes the config and checks that the new values take effect.