Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion core/src/main/scala/kafka/zk/AdminZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
update: Boolean = false) {
validateCreateOrUpdateTopic(topic, partitionReplicaAssignment, config, update)

// Configs only matter if a topic is being created. Changing configs via AlterTopic is not supported
if (!update) {
// write out the config if there is any, this isn't transactional with the partition assignments
zkClient.setOrCreateEntityConfigs(ConfigType.Topic, topic, config)
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/scala/kafka/zk/KafkaZkClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
/**
* Sets or creates the entity znode path with the given configs depending
* on whether it already exists or not.
*
* If this is method is called concurrently, the last writer wins. In cases where we update configs and then
* partition assignment (i.e. create topic), it's possible for one thread to set this and the other to set the
* partition assignment. As such, the recommendation is to never call create topic for the same topic with different
* configs/partition assignment concurrently.
*
* @param rootEntityType entity type
* @param sanitizedEntityName entity name
* @throws KeeperException if there is an error while setting or creating the znode
Expand All @@ -257,16 +263,19 @@ class KafkaZkClient private (zooKeeperClient: ZooKeeperClient, isSecure: Boolean
retryRequestUntilConnected(setDataRequest)
}

def create(configData: Array[Byte]) = {
def createOrSet(configData: Array[Byte]): Unit = {
val path = ConfigEntityZNode.path(rootEntityType, sanitizedEntityName)
createRecursive(path, ConfigEntityZNode.encode(config))
try createRecursive(path, ConfigEntityZNode.encode(config))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The second parameter can be replaced with configData, right ?

catch {
case _: NodeExistsException => set(configData).maybeThrow
}
}

val configData = ConfigEntityZNode.encode(config)

val setDataResponse = set(configData)
setDataResponse.resultCode match {
case Code.NONODE => create(configData)
case Code.NONODE => createOrSet(configData)
case _ => setDataResponse.maybeThrow
}
}
Expand Down
26 changes: 25 additions & 1 deletion core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import kafka.utils.TestUtils._
import kafka.utils.{Logging, TestUtils}
import kafka.zk.{AdminZkClient, KafkaZkClient, ZooKeeperTestHarness}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.{InvalidReplicaAssignmentException, InvalidTopicException, TopicExistsException}
import org.apache.kafka.common.metrics.Quota
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.{After, Test}
Expand Down Expand Up @@ -132,7 +134,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
}

@Test
def testConcurrentTopicCreation() {
def testMockedConcurrentTopicCreation() {
val topic = "test.topic"

// simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
Expand All @@ -147,6 +149,28 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
}
}

@Test
def testConcurrentTopicCreation() {
val topic = "test-concurrent-topic-creation"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
val props = new Properties
props.setProperty(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")
def createTopic(): Unit = {
try adminZkClient.createTopic(topic, 3, 1, props)
catch { case _: TopicExistsException => () }
val (_, partitionAssignment) = zkClient.getPartitionAssignmentForTopics(Set(topic)).head
assertEquals(3, partitionAssignment.size)
partitionAssignment.foreach { case (partition, replicas) =>
assertEquals(s"Unexpected replication factor for $partition", 1, replicas.size)
}
val savedProps = zkClient.getEntityConfigs(ConfigType.Topic, topic)
assertEquals(props, savedProps)
}

TestUtils.assertConcurrent("Concurrent topic creation failed", Seq(createTopic, createTopic),
JTestUtils.DEFAULT_MAX_WAIT_MS.toInt)
}

/**
* This test creates a topic with a few config overrides and checks that the configs are applied to the new topic
* then changes the config and checks that the new values take effect.
Expand Down