Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,11 @@ trait EndToEndAuthorizationTest extends IntegrationTestHarness with SaslSetup {
override val producerCount = 1
override val consumerCount = 2
override val serverCount = 3
override val setClusterAcl = Some { () =>

override def setAclsBeforeServersStart() {
AclCommand.main(clusterAclArgs)
servers.foreach(s =>
TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, clusterResource)
)
}

val numRecords = 1
val group = "group"
val topic = "e2etopic"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,33 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
var brokerList: String = null
var alive: Array[Boolean] = null
val kafkaPrincipalType = KafkaPrincipal.USER_TYPE
val setClusterAcl: Option[() => Unit] = None

/**
* Implementations must override this method to return a set of KafkaConfigs. This method will be invoked for every
* test and should not reuse previous configurations unless they select their ports randomly when servers are started.
*/
def generateConfigs(): Seq[KafkaConfig]

/**
* Override this in case ACLs must be set before `servers` are started.
*
* This is required in some cases because of the topic creation in the setup of `IntegrationTestHarness`. If the ACLs
* are only set later, tests may fail. The failure could manifest itself as a cluster action
* authorization exception when processing an update metadata request (controller -> broker) or in more obscure
* ways (e.g. __consumer_offsets topic replication fails because the metadata cache has no brokers as a previous
* update metadata request failed due to an authorization exception).
*
* The default implementation of this method is a no-op.
*/
def setAclsBeforeServersStart() {}

def configs: Seq[KafkaConfig] = {
if (instanceConfigs == null)
instanceConfigs = generateConfigs()
instanceConfigs
}

def serverForId(id: Int) = servers.find(s => s.config.brokerId == id)
def serverForId(id: Int): Option[KafkaServer] = servers.find(s => s.config.brokerId == id)

protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
protected def trustStoreFile: Option[File] = None
Expand All @@ -61,23 +73,17 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
@Before
override def setUp() {
super.setUp
if (configs.size <= 0)

if (configs.isEmpty)
throw new KafkaException("Must supply at least one server config.")

// default implementation is a no-op, it is overridden by subclasses if required
setAclsBeforeServersStart()

servers = configs.map(TestUtils.createServer(_)).toBuffer
brokerList = TestUtils.getBrokerListStrFromServers(servers, securityProtocol)
alive = new Array[Boolean](servers.length)
Arrays.fill(alive, true)
// We need to set a cluster ACL in some cases here
// because of the topic creation in the setup of
// IntegrationTestHarness. If we don't, then tests
// fail with a cluster action authorization exception
// when processing an update metadata request
// (controller -> broker).
//
// The following method does nothing by default, but
// if the test case requires setting up a cluster ACL,
// then it needs to be implemented.
setClusterAcl.foreach(_.apply)
}

@After
Expand Down
10 changes: 10 additions & 0 deletions core/src/test/scala/unit/kafka/zk/EmbeddedZookeeper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ class EmbeddedZookeeper() {
def shutdown() {
CoreUtils.swallow(zookeeper.shutdown())
CoreUtils.swallow(factory.shutdown())

def isDown(): Boolean = {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are a few tests that only extend ZooKeeperTestHarness. Do we need to ensure that ZK is down when shutting down ZooKeeperTestHarness?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ZooKeeperTestHarness.tearDown calls this method:

if (zookeeper != null)
        CoreUtils.swallow(zookeeper.shutdown())

Isn't that good enough?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. The code looks good then.

try {
ZkFourLetterWords.sendStat("127.0.0.1", port, 3000)
false
} catch { case _: Throwable => true }
}

Iterator.continually(isDown()).exists(identity)

Utils.delete(logDir)
Utils.delete(snapshotDir)
}
Expand Down
13 changes: 0 additions & 13 deletions core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,6 @@ trait ZooKeeperTestHarness extends JUnitSuite with Logging {
CoreUtils.swallow(zkUtils.close())
if (zookeeper != null)
CoreUtils.swallow(zookeeper.shutdown())

def isDown(): Boolean = {
try {
ZkFourLetterWords.sendStat("127.0.0.1", zkPort, 3000)
false
} catch { case _: Throwable =>
debug("Server is down")
true
}
}

Iterator.continually(isDown()).exists(identity)

Configuration.setConfiguration(null)
}

Expand Down