Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
val numServers = 2
overridingProps.put(KafkaConfig.NumPartitionsProp, 4.toString)
TestUtils.createBrokerConfigs(numServers, zkConnect, false, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = saslProperties).map(KafkaConfig.fromProps(_, overridingProps))
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties).map(KafkaConfig.fromProps(_, overridingProps))
}

private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = _
Expand All @@ -69,7 +69,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {

protected def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
saslProperties = saslProperties, retries = retries, lingerMs = lingerMs, props = props)
saslProperties = clientSaslProperties, retries = retries, lingerMs = lingerMs, props = props)
registerProducer(producer)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
maxBlockMs = 3000L,
securityProtocol = this.securityProtocol,
trustStoreFile = this.trustStoreFile,
saslProperties = this.saslProperties,
saslProperties = this.clientSaslProperties,
props = Some(producerConfig))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class EndToEndClusterIdTest extends KafkaServerTestHarness {

override def generateConfigs() = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = saslProperties)
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach(_.putAll(serverConfig))
cfgs.map(KafkaConfig.fromProps)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,15 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {

override def generateConfigs() = {
val cfgs = TestUtils.createBrokerConfigs(serverCount, zkConnect, interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = saslProperties)
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
cfgs.foreach(_.putAll(serverConfig))
cfgs.map(KafkaConfig.fromProps)
}

@Before
override def setUp() {
val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile, clientSaslProperties)
super.setUp()
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer])
Expand All @@ -81,15 +81,15 @@ trait IntegrationTestHarness extends KafkaServerTestHarness {
TestUtils.createNewProducer(brokerList,
securityProtocol = this.securityProtocol,
trustStoreFile = this.trustStoreFile,
saslProperties = this.saslProperties,
saslProperties = this.clientSaslProperties,
props = Some(producerConfig))
}

def createNewConsumer: KafkaConsumer[Array[Byte], Array[Byte]] = {
TestUtils.createNewConsumer(brokerList,
securityProtocol = this.securityProtocol,
trustStoreFile = this.trustStoreFile,
saslProperties = this.saslProperties,
saslProperties = this.clientSaslProperties,
props = Some(consumerConfig))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name)
producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString)
val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
saslProperties = saslProperties, retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
saslProperties = clientSaslProperties, retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
(0 until numRecords).foreach { i =>
producer.send(new ProducerRecord(tp.topic, tp.partition, i.toLong, s"key $i".getBytes, s"value $i".getBytes))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
*/
package kafka.api

import java.io.File
import java.util.Properties

import kafka.utils.{JaasTestUtils,TestUtils}
import kafka.utils.TestUtils
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.errors.GroupAuthorizationException
Expand All @@ -30,24 +29,24 @@ import scala.collection.JavaConverters._

abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms)))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))

protected def kafkaClientSaslMechanism: String
protected def kafkaServerSaslMechanisms: List[String]

@Before
override def setUp {
startSasl(Both, List(kafkaClientSaslMechanism), kafkaServerSaslMechanisms)
startSasl(Both, kafkaServerSaslMechanisms, Option(kafkaClientSaslMechanism))
super.setUp
}

// Use JAAS configuration properties for clients so that dynamic JAAS configuration is also tested by this set of tests
override protected def setJaasConfiguration(mode: SaslSetupMode, serverMechanisms: List[String], clientMechanisms: List[String],
serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = None) {
override protected def setJaasConfiguration(mode: SaslSetupMode, serverMechanisms: List[String], clientMechanism: Option[String]) {
// create static config with client login context with credentials for JaasTestUtils 'client2'
super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, clientMechanisms, serverKeytabFile, clientKeytabFile)
super.setJaasConfiguration(mode, kafkaServerSaslMechanisms, clientMechanism)
// set dynamic properties with credentials for JaasTestUtils 'client1'
val clientLoginContext = JaasTestUtils.clientLoginModule(kafkaClientSaslMechanism, clientKeytabFile)
val clientLoginContext = jaasClientLoginModule(kafkaClientSaslMechanism)
producerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
consumerConfig.put(SaslConfigs.SASL_JAAS_CONFIG, clientLoginContext)
}
Expand All @@ -70,7 +69,7 @@ abstract class SaslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {
val consumer2 = TestUtils.createNewConsumer(brokerList,
securityProtocol = securityProtocol,
trustStoreFile = trustStoreFile,
saslProperties = saslProperties,
saslProperties = clientSaslProperties,
props = Some(consumer2Config))
consumers += consumer2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslTestHarne
override protected val zkSaslEnabled = true
override protected val kafkaClientSaslMechanism = "PLAIN"
override protected val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
override protected def allKafkaClientSaslMechanisms = List("PLAIN", "GSSAPI")
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms)))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))

@Test
def testMultipleBrokerMechanisms() {

val plainSaslProducer = producers.head
val plainSaslConsumer = consumers.head

val gssapiSaslProperties = kafkaSaslProperties("GSSAPI")
val gssapiSaslProperties = kafkaClientSaslProperties("GSSAPI", dynamicJaasConfig = true)
val gssapiSaslProducer = TestUtils.createNewProducer(brokerList,
securityProtocol = this.securityProtocol,
trustStoreFile = this.trustStoreFile,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,6 @@ class SaslPlainPlaintextConsumerTest extends BaseConsumerTest with SaslTestHarne
this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms)))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
}
37 changes: 24 additions & 13 deletions core/src/test/scala/integration/kafka/api/SaslSetup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,32 +43,36 @@ trait SaslSetup {
private val workDir = TestUtils.tempDir()
private val kdcConf = MiniKdc.createConfig
private var kdc: MiniKdc = null
private var serverKeytabFile: Option[File] = null
private var clientKeytabFile: Option[File] = null

def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String]) {
def startSasl(mode: SaslSetupMode = Both, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) {
// Important if tests leak consumers, producers or brokers
LoginManager.closeAll()
val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanisms.contains("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI"))
val hasKerberos = mode != ZkSasl && (kafkaClientSaslMechanism == Some("GSSAPI") || kafkaServerSaslMechanisms.contains("GSSAPI"))
if (hasKerberos) {
val serverKeytabFile = TestUtils.tempFile()
val clientKeytabFile = TestUtils.tempFile()
setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanisms, Some(serverKeytabFile), Some(clientKeytabFile))
this.clientKeytabFile = Some(clientKeytabFile)
this.serverKeytabFile = Some(serverKeytabFile)
kdc = new MiniKdc(kdcConf, workDir)
kdc.start()
kdc.createPrincipal(serverKeytabFile, JaasTestUtils.KafkaServerPrincipalUnqualifiedName + "/localhost")
kdc.createPrincipal(clientKeytabFile, JaasTestUtils.KafkaClientPrincipalUnqualifiedName, JaasTestUtils.KafkaClientPrincipalUnqualifiedName2)
} else {
setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanisms)
this.clientKeytabFile = None
this.serverKeytabFile = None
}
setJaasConfiguration(mode, kafkaServerSaslMechanisms, kafkaClientSaslMechanism)
if (mode == Both || mode == ZkSasl)
System.setProperty("zookeeper.authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider")
}

protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanisms: List[String],
serverKeytabFile: Option[File] = None, clientKeytabFile: Option[File] = None) {
protected def setJaasConfiguration(mode: SaslSetupMode, kafkaServerSaslMechanisms: List[String], kafkaClientSaslMechanism: Option[String]) {
val jaasFile = mode match {
case ZkSasl => JaasTestUtils.writeZkFile()
case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerSaslMechanisms, kafkaClientSaslMechanisms, serverKeytabFile, clientKeytabFile)
case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerSaslMechanisms, kafkaClientSaslMechanisms, serverKeytabFile, clientKeytabFile)
case KafkaSasl => JaasTestUtils.writeKafkaFile(kafkaServerSaslMechanisms, kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
case Both => JaasTestUtils.writeZkAndKafkaFiles(kafkaServerSaslMechanisms, kafkaClientSaslMechanism, serverKeytabFile, clientKeytabFile)
}
// This will cause a reload of the Configuration singleton when `getConfiguration` is called
Configuration.setConfiguration(null)
Expand All @@ -85,13 +89,20 @@ trait SaslSetup {
Configuration.setConfiguration(null)
}

def kafkaSaslProperties(clientSaslMechanism: String, serverSaslMechanisms: Option[Seq[String]] = None) = {
def kafkaServerSaslProperties(serverSaslMechanisms: Seq[String], interBrokerSaslMechanism: String) = {
val props = new Properties
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, interBrokerSaslMechanism)
props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverSaslMechanisms.mkString(","))
props
}

def kafkaClientSaslProperties(clientSaslMechanism: String, dynamicJaasConfig: Boolean = false) = {
val props = new Properties
props.put(SaslConfigs.SASL_MECHANISM, clientSaslMechanism)
serverSaslMechanisms.foreach { serverMechanisms =>
props.put(KafkaConfig.SaslMechanismInterBrokerProtocolProp, clientSaslMechanism)
props.put(SaslConfigs.SASL_ENABLED_MECHANISMS, serverMechanisms.mkString(","))
}
if (dynamicJaasConfig)
props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasClientLoginModule(clientSaslMechanism))
props
}

def jaasClientLoginModule(clientSaslMechanism: String): String = JaasTestUtils.clientLoginModule(clientSaslMechanism, clientKeytabFile)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@ trait SaslTestHarness extends ZooKeeperTestHarness with SaslSetup {
protected val kafkaClientSaslMechanism = "GSSAPI"
protected val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)

// Override this list to enable client login modules for multiple mechanisms for testing
// of multi-mechanism brokers with clients using different mechanisms in a single JVM
protected def allKafkaClientSaslMechanisms = List(kafkaClientSaslMechanism)

@Before
override def setUp() {
if (zkSaslEnabled)
startSasl(Both, kafkaServerSaslMechanisms, allKafkaClientSaslMechanisms)
startSasl(Both, kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism))
else
startSasl(KafkaSasl, kafkaServerSaslMechanisms, allKafkaClientSaslMechanisms)
startSasl(KafkaSasl, kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism))
super.setUp
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class SslEndToEndAuthorizationTest extends EndToEndAuthorizationTest {

@Before
override def setUp {
startSasl(ZkSasl, List.empty, List.empty)
startSasl(ZkSasl, List.empty, None)
super.setUp
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ class UserQuotaTest extends BaseQuotaTest with SaslTestHarness {
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
override protected val zkSaslEnabled = false
override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms)))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))

override val userPrincipal = JaasTestUtils.KafkaClientPrincipalUnqualifiedName2
override val producerQuotaId = QuotaId(Some(userPrincipal), None)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ trait KafkaServerTestHarness extends ZooKeeperTestHarness {
protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
protected def listenerName: ListenerName = ListenerName.forSecurityProtocol(securityProtocol)
protected def trustStoreFile: Option[File] = None
protected def saslProperties: Option[Properties] = None
protected def serverSaslProperties: Option[Properties] = None
protected def clientSaslProperties: Option[Properties] = None

@Before
override def setUp() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ abstract class BaseRequestTest extends KafkaServerTestHarness {
val props = TestUtils.createBrokerConfigs(numBrokers, zkConnect,
enableControlledShutdown = false, enableDeleteTopic = true,
interBrokerSecurityProtocol = Some(securityProtocol),
trustStoreFile = trustStoreFile, saslProperties = saslProperties)
trustStoreFile = trustStoreFile, saslProperties = serverSaslProperties)
props.foreach(propertyOverrides)
props.map(KafkaConfig.fromProps)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ class SaslApiVersionsRequestTest extends BaseRequestTest with SaslTestHarness {
override protected def securityProtocol = SecurityProtocol.SASL_PLAINTEXT
override protected val kafkaClientSaslMechanism = "PLAIN"
override protected val kafkaServerSaslMechanisms = List("PLAIN")
override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms)))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
override protected val clientSaslProperties = Some(kafkaClientSaslProperties(kafkaClientSaslMechanism))
override protected val zkSaslEnabled = false
override def numBrokers = 1

Expand Down
Loading