Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,19 @@ object AdminUtils extends Logging {
changeEntityConfig(zkUtils, ConfigType.Client, clientId, configs)
}

/**
* Update the config for a user and create a change notification so the change will propagate to other brokers
*
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param sanitizedUserPrincipal: The base64 encoded user principal for which configs are being changed
* @param configs: The final set of configs that will be applied to the user. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
*
*/
def changeUserConfig(zkUtils: ZkUtils, sanitizedUserPrincipal: String, configs: Properties) {
changeEntityConfig(zkUtils, ConfigType.User, sanitizedUserPrincipal, configs)
}

/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
*
Expand Down
36 changes: 23 additions & 13 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.Properties
import joptsimple._
import kafka.admin.TopicCommand._
import kafka.log.{Defaults, LogConfig}
import kafka.server.{ClientConfigOverride, ConfigType}
import kafka.server.{ClientConfigOverride, ClientQuotaManagerConfig, ConfigType, QuotaId}
import kafka.utils.{CommandLineUtils, ZkUtils}
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Utils
Expand All @@ -41,7 +41,7 @@ object ConfigCommand {
val opts = new ConfigCommandOptions(args)

if(args.length == 0)
CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topics/clients) configs")
CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topics/clients/users) configs")

opts.checkArgs()

Expand Down Expand Up @@ -76,12 +76,20 @@ object ConfigCommand {
configs.putAll(configsToBeAdded)
configsToBeDeleted.foreach(config => configs.remove(config))

if (entityType.equals(ConfigType.Topic)) {
AdminUtils.changeTopicConfig(zkUtils, entityName, configs)
println("Updated config for topic: \"%s\".".format(entityName))
} else {
AdminUtils.changeClientIdConfig(zkUtils, entityName, configs)
println("Updated config for clientId: \"%s\".".format(entityName))
entityType match {
case ConfigType.Topic =>
AdminUtils.changeTopicConfig(zkUtils, entityName, configs)
println("Updated config for topic: \"%s\".".format(entityName))
case ConfigType.Client =>
AdminUtils.changeClientIdConfig(zkUtils, entityName, configs)
println("Updated config for clientId: \"%s\".".format(entityName))
case ConfigType.User =>
// Set non-encoded name as property to identify record easily since the path contains base64-encoded name
configs.setProperty("user_principal", entityName)
AdminUtils.changeUserConfig(zkUtils, QuotaId.sanitize(ClientQuotaManagerConfig.User, entityName), configs)
println("Updated config for user principal: \"%s\".".format(entityName))
case _ =>
throw new IllegalArgumentException("Unknown entity type " + entityType)
}
}

Expand Down Expand Up @@ -145,18 +153,20 @@ object ConfigCommand {
.ofType(classOf[String])
val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
val describeOpt = parser.accepts("describe", "List configs for the given entity.")
val entityType = parser.accepts("entity-type", "Type of entity (topics/clients)")
val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users)")
.withRequiredArg
.ofType(classOf[String])
val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id)")
val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name)")
.withRequiredArg
.ofType(classOf[String])

val nl = System.getProperty("line.separator")
val addConfig = parser.accepts("add-config", "Key Value pairs configs to add 'k1=v1,k2=v2'. The following is a list of valid configurations: " +
"For entity_type '" + ConfigType.Topic + "': " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl +
"For entity_type '" + ConfigType.Client + "': " + nl + "\t" + ClientConfigOverride.ProducerOverride
+ nl + "\t" + ClientConfigOverride.ConsumerOverride)
+ nl + "\t" + ClientConfigOverride.ConsumerOverride +
"For entity_type '" + ConfigType.User + "': " + nl + "\t" + ClientConfigOverride.ProducerOverride
+ nl + "\t" + ClientConfigOverride.ConsumerOverride)
.withRequiredArg
.ofType(classOf[String])
.withValuesSeparatedBy(',')
Expand Down Expand Up @@ -190,8 +200,8 @@ object ConfigCommand {
throw new IllegalArgumentException("At least one of --add-config or --delete-config must be specified with --alter")
}
val entityTypeVal = options.valueOf(entityType)
if(! entityTypeVal.equals(ConfigType.Topic) && ! entityTypeVal.equals(ConfigType.Client)) {
throw new IllegalArgumentException("--entity-type must be '%s' or '%s'".format(ConfigType.Topic, ConfigType.Client))
if(! entityTypeVal.equals(ConfigType.Topic) && ! entityTypeVal.equals(ConfigType.Client) && !entityTypeVal.equals(ConfigType.User)) {
throw new IllegalArgumentException("--entity-type must be '%s', '%s' or '%s'".format(ConfigType.Topic, ConfigType.Client, ConfigType.User))
}
}
}
Expand Down
121 changes: 82 additions & 39 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package kafka.server

import javax.xml.bind.DatatypeConverter
import java.nio.charset.StandardCharsets
import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}

import kafka.utils.{ShutdownableThread, Logging}
Expand All @@ -35,19 +37,24 @@ private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor

/**
* Configuration settings for quota management
* @param quotaType Type of quota which indicates whether quotas are applied to user principal or clientIds
* @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client
* @param numQuotaSamples The number of samples to retain in memory
* @param quotaWindowSizeSeconds The time span of each sample
*
*/
case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long =
case class ClientQuotaManagerConfig(quotaType: String = ClientQuotaManagerConfig.DefaultQuotaType,
quotaBytesPerSecondDefault: Long =
ClientQuotaManagerConfig.QuotaBytesPerSecondDefault,
numQuotaSamples: Int =
ClientQuotaManagerConfig.DefaultNumQuotaSamples,
quotaWindowSizeSeconds: Int =
ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds)

object ClientQuotaManagerConfig {
val User = "user"
val ClientId = "client-id"
val DefaultQuotaType = ClientId
val QuotaBytesPerSecondDefault = Long.MaxValue
// Always have 10 whole windows + 1 current window
val DefaultNumQuotaSamples = 11
Expand All @@ -56,6 +63,22 @@ object ClientQuotaManagerConfig {
val InactiveSensorExpirationTimeSeconds = 3600
}

object QuotaId {
def sanitize(quotaType: String, quotaId: String) = {
if (quotaType == ClientQuotaManagerConfig.ClientId)
quotaId
else
DatatypeConverter.printBase64Binary(quotaId.getBytes(StandardCharsets.UTF_8))
}

def fromSanitizedId(quotaType: String, sanitizedId: String) = {
if (quotaType == ClientQuotaManagerConfig.ClientId)
sanitizedId
else
new String(DatatypeConverter.parseBase64Binary(sanitizedId), StandardCharsets.UTF_8)
}
}

/**
* Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics
* for all clients.
Expand All @@ -70,6 +93,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val time: Time) extends Logging {
private val overriddenQuota = new ConcurrentHashMap[String, Quota]()
private val defaultQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
private val metricsTag = config.quotaType
private val lock = new ReentrantReadWriteLock()
private val delayQueue = new DelayQueue[ThrottledResponse]()
val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
Expand Down Expand Up @@ -107,8 +131,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* @return Number of milliseconds to delay the response in case of Quota violation.
* Zero otherwise
*/
def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int = {
val clientSensors = getOrCreateQuotaSensors(clientId)
def recordAndMaybeThrottle(userPrincipal: String, clientId: String, value: Int, callback: Int => Unit): Int = {
val quotaEntity = QuotaEntity(if (config.quotaType == ClientQuotaManagerConfig.ClientId) clientId else userPrincipal)
val clientSensors = getOrCreateQuotaSensors(quotaEntity)
var throttleTimeMs = 0
try {
clientSensors.quotaSensor.record(value)
Expand All @@ -117,8 +142,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
} catch {
case qve: QuotaViolationException =>
// Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId)))
val clientMetric = metrics.metrics().get(quotaEntity.clientRateMetricName)
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(quotaEntity.quotaId)))
clientSensors.throttleTimeSensor.record(throttleTimeMs)
// If delayed, add the element to the delayQueue
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
Expand All @@ -128,6 +153,13 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
throttleTimeMs
}

/**
* Returns the quota for the client with the specified quota id
*/
def quota(quotaId: String) = {
if (overriddenQuota.containsKey(quotaId)) overriddenQuota.get(quotaId) else defaultQuota
}

/*
* This calculates the amount of time needed to bring the metric within quota
* assuming that no new metrics are recorded.
Expand All @@ -153,21 +185,37 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}
}

/**
* Returns the quota for the specified clientId
*/
def quota(clientId: String): Quota =
if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota;
case class QuotaEntity(quotaId: String) {
private val sanitizedId = QuotaId.sanitize(config.quotaType, quotaId)

def throttleTimeSensorName: String = apiKey + "ThrottleTime-" + config.quotaType + "-" + sanitizedId

def quotaSensorName: String = apiKey + "-" + config.quotaType + "-" + sanitizedId

def clientRateMetricName: MetricName = {
metrics.metricName("byte-rate", apiKey,
"Tracking byte-rate per " + config.quotaType,
metricsTag, sanitizedId)
}

def throttleMetricName: MetricName = {
metrics.metricName("throttle-time",
apiKey,
"Tracking average throttle-time per " + config.quotaType,
metricsTag,
sanitizedId)
}
}

/*
* This function either returns the sensors for a given client id or creates them if they don't exist
* First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor
*/
private def getOrCreateQuotaSensors(clientId: String): ClientSensors = {
private def getOrCreateQuotaSensors(quotaEntity: QuotaEntity): ClientSensors = {

// Names of the sensors to access
val quotaSensorName = getQuotaSensorName(clientId)
val throttleTimeSensorName = getThrottleTimeSensorName(clientId)
val quotaSensorName = quotaEntity.quotaSensorName
val throttleTimeSensorName = quotaEntity.throttleTimeSensorName
var quotaSensor: Sensor = null
var throttleTimeSensor: Sensor = null

Expand Down Expand Up @@ -209,19 +257,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
throttleTimeSensor = metrics.sensor(throttleTimeSensorName,
null,
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds)
throttleTimeSensor.add(metrics.metricName("throttle-time",
apiKey,
"Tracking average throttle-time per client",
"client-id",
clientId), new Avg())
throttleTimeSensor.add(quotaEntity.throttleMetricName, new Avg())
}


if (quotaSensor == null) {
quotaSensor = metrics.sensor(quotaSensorName,
getQuotaMetricConfig(quota(clientId)),
getQuotaMetricConfig(quota(quotaEntity.quotaId)),
ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds)
quotaSensor.add(clientRateMetricName(clientId), new Rate())
quotaSensor.add(quotaEntity.clientRateMetricName, new Rate())
}
} finally {
lock.writeLock().unlock()
Expand All @@ -231,10 +275,6 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
ClientSensors(quotaSensor, throttleTimeSensor)
}

private def getThrottleTimeSensorName(clientId: String): String = apiKey + "ThrottleTime-" + clientId

private def getQuotaSensorName(clientId: String): String = apiKey + "-" + clientId

private def getQuotaMetricConfig(quota: Quota): MetricConfig = {
new MetricConfig()
.timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
Expand All @@ -243,45 +283,48 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}

/**
* Overrides quotas per clientId
* @param clientId client to override
* Overrides quotas per clientId or user
* @param quotaId clientId or user principal to override
* @param quota custom quota to apply
*/
def updateQuota(clientId: String, quota: Quota) = {
def updateQuota(quotaId: String, quota: Quota) {
updateQuota(quotaId, quota, isDelete=false)
}

def removeQuota(quotaId: String) {
updateQuota(quotaId, defaultQuota, isDelete=true)
}

private def updateQuota(quotaId: String, quota: Quota, isDelete: Boolean) = {
/*
* Acquire the write lock to apply changes in the quota objects.
* This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
* If the KafkaMetric hasn't been created, the most recent value will be used from the overriddenQuota map.
* The write lock prevents quota update and creation at the same time. It also guards against concurrent quota change
* notifications
*/
val quotaEntity = QuotaEntity(quotaId)
lock.writeLock().lock()
try {
logger.info(s"Changing quota for clientId $clientId to ${quota.bound()}")
logger.info(s"Changing quota for quota-type ${config.quotaType} quota-id ${quotaId} to ${quota.bound}")

if (quota.equals(defaultQuota))
this.overriddenQuota.remove(clientId)
if (isDelete)
this.overriddenQuota.remove(quotaId)
else
this.overriddenQuota.put(clientId, quota)
this.overriddenQuota.put(quotaId, quota)

// Change the underlying metric config if the sensor has been created
val allMetrics = metrics.metrics()
val quotaMetricName = clientRateMetricName(clientId)
val quotaMetricName = quotaEntity.clientRateMetricName
if (allMetrics.containsKey(quotaMetricName)) {
logger.info(s"Sensor for clientId $clientId already exists. Changing quota to ${quota.bound()} in MetricConfig")
logger.info(s"Sensor for quota-type ${config.quotaType} quota-id ${quotaId} already exists. Changing quota to ${quota.bound} in MetricConfig")
allMetrics.get(quotaMetricName).config(getQuotaMetricConfig(quota))
}
} finally {
lock.writeLock().unlock()
}
}

private def clientRateMetricName(clientId: String): MetricName = {
metrics.metricName("byte-rate", apiKey,
"Tracking byte-rate per client",
"client-id", clientId)
}

def shutdown() = {
throttledRequestReaper.shutdown()
}
Expand Down
27 changes: 15 additions & 12 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,21 +72,24 @@ object ClientConfigOverride {
}

/**
* The ClientIdConfigHandler will process clientId config changes in ZK.
* The callback provides the clientId and the full properties set read from ZK.
* The QuotaConfigHandler will process clientId or user principal quota config changes in ZK.
* The callback provides the clientId or base64-encoded user principal and the full properties set read from ZK.
* This implementation reports the overrides to the respective ClientQuotaManager objects
*/
class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaManager]) extends ConfigHandler {
class QuotaConfigHandler(private val quotaType: String, private val quotaManagers: Map[Short, ClientQuotaManager]) extends ConfigHandler {

def processConfigChanges(clientId: String, clientConfig: Properties) = {
if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
quotaManagers(ApiKeys.PRODUCE.id).updateQuota(clientId,
new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true))
}
def processConfigChanges(sanitizedQuotaId: String, quotaConfig: Properties) = {
val quotaId = QuotaId.fromSanitizedId(quotaType, sanitizedQuotaId)
if (quotaConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
quotaManagers(ApiKeys.PRODUCE.id).updateQuota(quotaId,
new Quota(quotaConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true))
} else
quotaManagers(ApiKeys.PRODUCE.id).removeQuota(quotaId)

if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
quotaManagers(ApiKeys.FETCH.id).updateQuota(clientId,
new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true))
}
if (quotaConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
quotaManagers(ApiKeys.FETCH.id).updateQuota(quotaId,
new Quota(quotaConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true))
} else
quotaManagers(ApiKeys.FETCH.id).removeQuota(quotaId)
}
}
Loading