From 62678b552f7f2a225569305daf8c74a7f841bca5 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Fri, 13 Jan 2017 19:01:53 +0000 Subject: [PATCH 1/2] KAFKA-4568: Simplify test code for multiple SASL mechanisms --- .../api/SaslEndToEndAuthorizationTest.scala | 12 ++++----- .../api/SaslMultiMechanismConsumerTest.scala | 3 +-- .../integration/kafka/api/SaslSetup.scala | 25 +++++++++++-------- .../kafka/api/SaslTestHarness.scala | 8 ++---- .../api/SslEndToEndAuthorizationTest.scala | 2 +- .../unit/kafka/utils/JaasTestUtils.scala | 14 +++++------ 6 files changed, 31 insertions(+), 33 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index 992649a0f9eff..380c0522b485d 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -16,10 +16,9 @@ */ package kafka.api -import java.io.File import java.util.Properties -import kafka.utils.{JaasTestUtils,TestUtils} +import kafka.utils.TestUtils import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.errors.GroupAuthorizationException @@ -37,17 +36,16 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { @Before override def setUp { - startSasl(Both, List(kafkaClientSaslMechanism), kafkaServerSaslMechanisms) + startSasl(Both, kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)) super.setUp } // Use JAAS configuration properties for clients so that dynamic JAAS configuration is also tested by this set of tests - override protected def setJaasConfiguration(mode: SaslSetupMode, serverMechanisms: List[String], clientMechanisms: List[String], - serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = None) { + override protected def setJaasConfiguration(mode: SaslSetupMode, serverMechanisms: List[String], clientMechanism: Option[String]) { // create static config with client login context with credentials for JaasTestUtils 'client2' - super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, clientMechanisms, serverKeytabFile, clientKeytabFile) + super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, clientMechanism) // set dynamic properties with credentials for JaasTestUtils 'client1' - val clientLoginContext = JaasTestUtils.clientLoginModule(kafkaClientSaslMechanism, clientKeytabFile) + val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism) producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext) } diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala index 5814e9463560c..71c922f936eee 100644 --- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala @@ -23,7 +23,6 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarne override protected val zkSaslEnabled = true override protected val kafkaClientSaslMechanism = "PLAIN" override protected val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN") - override protected def allKafkaClientSaslMechanisms = List("PLAIN", "GSSAPI") this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) @@ -35,7 +34,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarne val plainSaslProducer = producers.head val plainSaslConsumer = consumers.head - val gssapiSaslProperties = kafkaSaslProperties("GSSAPI") + val gssapiSaslProperties = kafkaSaslProperties("GSSAPI", dynamicJaasConfig=true) val gssapiSaslProducer = TestUtils.createNewProducer(brokerList, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index c1e2da2cbf797..e0480d8bd2350 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -43,32 +43,33 @@ trait SaslSetup { private val workDir = TestUtils.tempDir() private val kdcConf = MiniKdc.createConfig private var kdc: MiniKdc = null + var serverKeytabFile: Option[File] = None + var clientKeytabFile: Option[File] = None - def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String]) { + def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) { // Important if tests leak consumers, producers or brokers LoginManager.closeAll() - val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanisms.contains("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI")) + val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanism == Some("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI")) if (hasKerberos) { val serverKeytabFile = TestUtils.tempFile() val clientKeytabFile = TestUtils.tempFile() - setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanisms, Some(serverKeytabFile), Some(clientKeytabFile)) + this.clientKeytabFile = Some(clientKeytabFile) + this.serverKeytabFile = Some(serverKeytabFile) kdc = new MiniKdc(kdcConf, workDir) kdc.start() kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost") kdc.createPrincipal(clientKeytabFile, JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2) - } else { - setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanisms) } + setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanism) if (mode == Both || mode == ZkSasl) System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider") } - protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String], - serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = None) { + protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) { val jaasFile = mode match { case ZkSasl => JaasTestUtils.writeZkFile() - case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerSaslMechanisms, kafkaClientSaslMechanisms, serverKeytabFile, clientKeytabFile) - case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerSaslMechanisms, kafkaClientSaslMechanisms, serverKeytabFile, clientKeytabFile) + case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerSaslMechanisms, kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) + case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerSaslMechanisms, kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile) } // This will cause a reload of the Configuration singleton when `getConfiguration` is called Configuration.setConfiguration(null) @@ -85,13 +86,17 @@ trait SaslSetup { Configuration.setConfiguration(null) } - def kafkaSaslProperties(clientSaslMechanism: String, serverSaslMechanisms: Option[Seq[String]] = None) = { + def kafkaSaslProperties(clientSaslMechanism: String, serverSaslMechanisms: Option[Seq[String]] = None, dynamicJaasConfig: Boolean = false) = { val props = new Properties props.put(SaslConfigs.SASL_MECHANISM, clientSaslMechanism) serverSaslMechanisms.foreach { serverMechanisms => props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, clientSaslMechanism) props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverMechanisms.mkString(",")) } + if (dynamicJaasConfig) + props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(clientSaslMechanism)) props } + + def jaasClientLoginModule(clientSaslMechanism: String): String = JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile) } diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala index 8fd3eb4422e0e..440fdc49e3765 100644 --- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala @@ -20,16 +20,12 @@ trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup { protected val kafkaClientSaslMechanism = "GSSAPI" protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism) - // Override this list to enable client login modules for multiple mechanisms for testing - // of multi-mechanism brokers with clients using different mechanisms in a single JVM - protected def allKafkaClientSaslMechanisms = List(kafkaClientSaslMechanism) - @Before override def setUp() { if (zkSaslEnabled) - startSasl(Both, kafkaServerSaslMechanisms, allKafkaClientSaslMechanisms) + startSasl(Both, kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)) else - startSasl(KafkaSasl, kafkaServerSaslMechanisms, allKafkaClientSaslMechanisms) + startSasl(KafkaSasl, kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)) super.setUp } diff --git a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala index 365c0ba5b5886..064e783508130 100644 --- a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala @@ -29,7 +29,7 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { @Before override def setUp { - startSasl(ZkSasl, List.empty, List.empty) + startSasl(ZkSasl, List.empty, None) super.setUp } } diff --git a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala index 7055b7e2b835b..0949eb793b4e5 100644 --- a/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/JaasTestUtils.scala @@ -128,16 +128,16 @@ object JaasTestUtils { jaasFile.getCanonicalPath } - def writeKafkaFile(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = { + def writeKafkaFile(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = { val jaasFile = TestUtils.tempFile() - val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanisms, clientKeyTabLocation)) + val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) writeToFile(jaasFile, kafkaSections) jaasFile.getCanonicalPath } - def writeZkAndKafkaFiles(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = { + def writeZkAndKafkaFiles(kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String], serverKeyTabLocation: Option[File], clientKeyTabLocation: Option[File]): String = { val jaasFile = TestUtils.tempFile() - val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanisms, clientKeyTabLocation)) + val kafkaSections = Seq(kafkaServerSection(kafkaServerSaslMechanisms, serverKeyTabLocation), kafkaClientSection(kafkaClientSaslMechanism, clientKeyTabLocation)) writeToFile(jaasFile, kafkaSections ++ zkSections) jaasFile.getCanonicalPath } @@ -209,9 +209,9 @@ object JaasTestUtils { /* * Used for the static JAAS configuration and it uses the credentials for client#2 */ - private def kafkaClientSection(mechanisms: List[String], keytabLocation: Option[File]): JaasSection = { - new JaasSection(KafkaClientContextName, mechanisms.map(m => - kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2))) + private def kafkaClientSection(mechanism: Option[String], keytabLocation: Option[File]): JaasSection = { + new JaasSection(KafkaClientContextName, mechanism.map(m => + kafkaClientModule(m, keytabLocation, KafkaClientPrincipal2, KafkaPlainUser2, KafkaPlainPassword2, KafkaScramUser2, KafkaScramPassword2)).toSeq) } private def jaasSectionsToString(jaasSections: Seq[JaasSection]): String = From 90db5d48ed5b45a6123e49d4f5d6b1bd02eda780 Mon Sep 17 00:00:00 2001 From: Rajini Sivaram Date: Tue, 17 Jan 2017 20:26:03 +0000 Subject: [PATCH 2/2] KAFKA-4568: Address review comments --- .../kafka/api/BaseProducerSendTest.scala | 4 ++-- .../kafka/api/EndToEndAuthorizationTest.scala | 2 +- .../kafka/api/EndToEndClusterIdTest.scala | 2 +- .../kafka/api/IntegrationTestHarness.scala | 10 +++++----- .../kafka/api/PlaintextConsumerTest.scala | 2 +- .../api/SaslEndToEndAuthorizationTest.scala | 5 +++-- .../api/SaslMultiMechanismConsumerTest.scala | 5 +++-- .../api/SaslPlainPlaintextConsumerTest.scala | 3 ++- .../integration/kafka/api/SaslSetup.scala | 20 ++++++++++++------- .../kafka/api/SaslTestHarness.scala | 4 ++-- .../integration/kafka/api/UserQuotaTest.scala | 3 ++- .../integration/KafkaServerTestHarness.scala | 3 ++- .../unit/kafka/server/BaseRequestTest.scala | 2 +- .../server/SaslApiVersionsRequestTest.scala | 3 ++- 14 files changed, 40 insertions(+), 28 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index 82409bbcd88fc..9ebc7e3bf3b87 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -43,7 +43,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { val numServers = 2 overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString) TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile, saslProperties = saslProperties).map(KafkaConfig.fromProps(_, overridingProps)) + trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps)) } private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _ @@ -69,7 +69,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = { val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile, - saslProperties = saslProperties, retries = retries, lingerMs = lingerMs, props = props) + saslProperties = clientSaslProperties, retries = retries, lingerMs = lingerMs, props = props) registerProducer(producer) } diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 3e391d3911490..3866cc14e847a 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -169,7 +169,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas maxBlockMs = 3000L, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, - saslProperties = this.saslProperties, + saslProperties = this.clientSaslProperties, props = Some(producerConfig)) } diff --git a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala index 9e03e2781cf84..d885d9b2894f9 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndClusterIdTest.scala @@ -104,7 +104,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness { override def generateConfigs() = { val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile, saslProperties = saslProperties) + trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) cfgs.foreach(_.putAll(serverConfig)) cfgs.map(KafkaConfig.fromProps) } diff --git a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala index ee556d71d94da..92088f8210cc0 100644 --- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala @@ -47,15 +47,15 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { override def generateConfigs() = { val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile, saslProperties = saslProperties) + trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) cfgs.foreach(_.putAll(serverConfig)) cfgs.map(KafkaConfig.fromProps) } @Before override def setUp() { - val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties) - val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties) + val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) + val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties) super.setUp() producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) @@ -81,7 +81,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { TestUtils.createNewProducer(brokerList, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, - saslProperties = this.saslProperties, + saslProperties = this.clientSaslProperties, props = Some(producerConfig)) } @@ -89,7 +89,7 @@ trait IntegrationTestHarness extends KafkaServerTestHarness { TestUtils.createNewConsumer(brokerList, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, - saslProperties = this.saslProperties, + saslProperties = this.clientSaslProperties, props = Some(consumerConfig)) } diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala index aefe5bd15ba25..282d67cf5c741 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala @@ -528,7 +528,7 @@ class PlaintextConsumerTest extends BaseConsumerTest { producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name) producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString) val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile, - saslProperties = saslProperties, retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps)) + saslProperties = clientSaslProperties, retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps)) (0 until numRecords).foreach { i => producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes)) } diff --git a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala index 380c0522b485d..826eb5ce8babf 100644 --- a/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslEndToEndAuthorizationTest.scala @@ -29,7 +29,8 @@ import scala.collection.JavaConverters._ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { override protected def securityProtocol = SecurityProtocol.SASL_SSL - override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) + override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) protected def kafkaClientSaslMechanism: String protected def kafkaServerSaslMechanisms: List[String] @@ -68,7 +69,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest { val consumer2 = TestUtils.createNewConsumer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile, - saslProperties = saslProperties, + saslProperties = clientSaslProperties, props = Some(consumer2Config)) consumers += consumer2 diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala index 71c922f936eee..3ff133f228ac4 100644 --- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala @@ -26,7 +26,8 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarne this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) - override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) + override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) @Test def testMultipleBrokerMechanisms() { @@ -34,7 +35,7 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarne val plainSaslProducer = producers.head val plainSaslConsumer = consumers.head - val gssapiSaslProperties = kafkaSaslProperties("GSSAPI", dynamicJaasConfig=true) + val gssapiSaslProperties = kafkaClientSaslProperties("GSSAPI", dynamicJaasConfig = true) val gssapiSaslProducer = TestUtils.createNewProducer(brokerList, securityProtocol = this.securityProtocol, trustStoreFile = this.trustStoreFile, diff --git a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala index bdca577c0e878..125d4318ebe8b 100644 --- a/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslPlainPlaintextConsumerTest.scala @@ -23,5 +23,6 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarne this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) - override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) + override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) } diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala b/core/src/test/scala/integration/kafka/api/SaslSetup.scala index e0480d8bd2350..36b9d41e0584d 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala @@ -43,8 +43,8 @@ trait SaslSetup { private val workDir = TestUtils.tempDir() private val kdcConf = MiniKdc.createConfig private var kdc: MiniKdc = null - var serverKeytabFile: Option[File] = None - var clientKeytabFile: Option[File] = None + private var serverKeytabFile: Option[File] = null + private var clientKeytabFile: Option[File] = null def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) { // Important if tests leak consumers, producers or brokers @@ -59,6 +59,9 @@ trait SaslSetup { kdc.start() kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost") kdc.createPrincipal(clientKeytabFile, JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2) + } else { + this.clientKeytabFile = None + this.serverKeytabFile = None } setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanism) if (mode == Both || mode == ZkSasl) @@ -86,13 +89,16 @@ trait SaslSetup { Configuration.setConfiguration(null) } - def kafkaSaslProperties(clientSaslMechanism: String, serverSaslMechanisms: Option[Seq[String]] = None, dynamicJaasConfig: Boolean = false) = { + def kafkaServerSaslProperties(serverSaslMechanisms: Seq[String], interBrokerSaslMechanism: String) = { + val props = new Properties + props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, interBrokerSaslMechanism) + props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverSaslMechanisms.mkString(",")) + props + } + + def kafkaClientSaslProperties(clientSaslMechanism: String, dynamicJaasConfig: Boolean = false) = { val props = new Properties props.put(SaslConfigs.SASL_MECHANISM, clientSaslMechanism) - serverSaslMechanisms.foreach { serverMechanisms => - props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, clientSaslMechanism) - props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverMechanisms.mkString(",")) - } if (dynamicJaasConfig) props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(clientSaslMechanism)) props diff --git a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala index 440fdc49e3765..97faa36e7fa2c 100644 --- a/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala +++ b/core/src/test/scala/integration/kafka/api/SaslTestHarness.scala @@ -23,9 +23,9 @@ trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup { @Before override def setUp() { if (zkSaslEnabled) - startSasl(Both, kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)) + startSasl(Both, kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)) else - startSasl(KafkaSasl, kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism)) + startSasl(KafkaSasl, kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)) super.setUp } diff --git a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala index 4677c8ce9ab2b..c8d0a77aac146 100644 --- a/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/UserQuotaTest.scala @@ -29,7 +29,8 @@ class UserQuotaTest extends BaseQuotaTest with SaslTestHarness { override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) override protected val zkSaslEnabled = false - override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) + override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) override val userPrincipal = JaasTestUtils.KafkaClientPrincipalUnqualifiedName2 override val producerQuotaId = QuotaId(Some(userPrincipal), None) diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala index 270fca2e01624..bfaff0bb343e3 100755 --- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala +++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala @@ -75,7 +75,8 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness { protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT protected def listenerName: ListenerName = ListenerName.forSecurityProtocol(securityProtocol) protected def trustStoreFile: Option[File] = None - protected def saslProperties: Option[Properties] = None + protected def serverSaslProperties: Option[Properties] = None + protected def clientSaslProperties: Option[Properties] = None @Before override def setUp() { diff --git a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala index 3825489ddb737..b864e5d0875fb 100644 --- a/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseRequestTest.scala @@ -42,7 +42,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness { val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect, enableControlledShutdown = false, enableDeleteTopic = true, interBrokerSecurityProtocol = Some(securityProtocol), - trustStoreFile = trustStoreFile, saslProperties = saslProperties) + trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties) props.foreach(propertyOverrides) props.map(KafkaConfig.fromProps) } diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 34d7d147c1cb7..07e03e3824ef9 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -33,7 +33,8 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness { override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT override protected val kafkaClientSaslMechanism = "PLAIN" override protected val kafkaServerSaslMechanisms = List("PLAIN") - override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) + override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) + override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism)) override protected val zkSaslEnabled = false override def numBrokers = 1