diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 53b6dd72a0b6b..33e650faf4a68 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -472,6 +472,19 @@ object AdminUtils extends Logging { changeEntityConfig(zkUtils, ConfigType.Client, clientId, configs) } + /** + * Update the config for a user and create a change notification so the change will propagate to other brokers + * + * @param zkUtils Zookeeper utilities used to write the config to ZK + * @param sanitizedUserPrincipal: The base64 encoded user principal for which configs are being changed + * @param configs: The final set of configs that will be applied to the user. If any new configs need to be added or + * existing configs need to be deleted, it should be done prior to invoking this API + * + */ + def changeUserConfig(zkUtils: ZkUtils, sanitizedUserPrincipal: String, configs: Properties) { + changeEntityConfig(zkUtils, ConfigType.User, sanitizedUserPrincipal, configs) + } + /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers * diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index eaddd84e538d9..d28d76d3fd1da 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -22,7 +22,7 @@ import java.util.Properties import joptsimple._ import kafka.admin.TopicCommand._ import kafka.log.{Defaults, LogConfig} -import kafka.server.{ClientConfigOverride, ConfigType} +import kafka.server.{ClientConfigOverride, ClientQuotaManagerConfig, ConfigType, QuotaId} import kafka.utils.{CommandLineUtils, ZkUtils} import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.utils.Utils @@ -41,7 +41,7 @@ object ConfigCommand { val opts = new ConfigCommandOptions(args) if(args.length == 0) - CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topics/clients) configs") + CommandLineUtils.printUsageAndDie(opts.parser, "Add/Remove entity (topics/clients/users) configs") opts.checkArgs() @@ -76,12 +76,20 @@ object ConfigCommand { configs.putAll(configsToBeAdded) configsToBeDeleted.foreach(config => configs.remove(config)) - if (entityType.equals(ConfigType.Topic)) { - AdminUtils.changeTopicConfig(zkUtils, entityName, configs) - println("Updated config for topic: \"%s\".".format(entityName)) - } else { - AdminUtils.changeClientIdConfig(zkUtils, entityName, configs) - println("Updated config for clientId: \"%s\".".format(entityName)) + entityType match { + case ConfigType.Topic => + AdminUtils.changeTopicConfig(zkUtils, entityName, configs) + println("Updated config for topic: \"%s\".".format(entityName)) + case ConfigType.Client => + AdminUtils.changeClientIdConfig(zkUtils, entityName, configs) + println("Updated config for clientId: \"%s\".".format(entityName)) + case ConfigType.User => + // Set non-encoded name as property to identify record easily since the path contains base64-encoded name + configs.setProperty("user_principal", entityName) + AdminUtils.changeUserConfig(zkUtils, QuotaId.sanitize(ClientQuotaManagerConfig.User, entityName), configs) + println("Updated config for user principal: \"%s\".".format(entityName)) + case _ => + throw new IllegalArgumentException("Unknown entity type " + entityType) } } @@ -145,10 +153,10 @@ object ConfigCommand { .ofType(classOf[String]) val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") val describeOpt = parser.accepts("describe", "List configs for the given entity.") - val entityType = parser.accepts("entity-type", "Type of entity (topics/clients)") + val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users)") .withRequiredArg .ofType(classOf[String]) - val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id)") + val entityName = parser.accepts("entity-name", "Name of entity (topic name/client id/user principal name)") .withRequiredArg .ofType(classOf[String]) @@ -156,7 +164,9 @@ object ConfigCommand { val addConfig = parser.accepts("add-config", "Key Value pairs configs to add 'k1=v1,k2=v2'. The following is a list of valid configurations: " + "For entity_type '" + ConfigType.Topic + "': " + nl + LogConfig.configNames.map("\t" + _).mkString(nl) + nl + "For entity_type '" + ConfigType.Client + "': " + nl + "\t" + ClientConfigOverride.ProducerOverride - + nl + "\t" + ClientConfigOverride.ConsumerOverride) + + nl + "\t" + ClientConfigOverride.ConsumerOverride + + "For entity_type '" + ConfigType.User + "': " + nl + "\t" + ClientConfigOverride.ProducerOverride + + nl + "\t" + ClientConfigOverride.ConsumerOverride) .withRequiredArg .ofType(classOf[String]) .withValuesSeparatedBy(',') @@ -190,8 +200,8 @@ object ConfigCommand { throw new IllegalArgumentException("At least one of --add-config or --delete-config must be specified with --alter") } val entityTypeVal = options.valueOf(entityType) - if(! entityTypeVal.equals(ConfigType.Topic) && ! entityTypeVal.equals(ConfigType.Client)) { - throw new IllegalArgumentException("--entity-type must be '%s' or '%s'".format(ConfigType.Topic, ConfigType.Client)) + if(! entityTypeVal.equals(ConfigType.Topic) && ! entityTypeVal.equals(ConfigType.Client) && !entityTypeVal.equals(ConfigType.User)) { + throw new IllegalArgumentException("--entity-type must be '%s', '%s' or '%s'".format(ConfigType.Topic, ConfigType.Client, ConfigType.User)) } } } diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 5863c72c03739..9f6c4f9dc08a0 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -16,6 +16,8 @@ */ package kafka.server +import javax.xml.bind.DatatypeConverter +import java.nio.charset.StandardCharsets import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit} import kafka.utils.{ShutdownableThread, Logging} @@ -35,12 +37,14 @@ private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor /** * Configuration settings for quota management + * @param quotaType Type of quota which indicates whether quotas are applied to user principal or clientIds * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client * @param numQuotaSamples The number of samples to retain in memory * @param quotaWindowSizeSeconds The time span of each sample * */ -case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long = +case class ClientQuotaManagerConfig(quotaType: String = ClientQuotaManagerConfig.DefaultQuotaType, + quotaBytesPerSecondDefault: Long = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault, numQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples, @@ -48,6 +52,9 @@ case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds) object ClientQuotaManagerConfig { + val User = "user" + val ClientId = "client-id" + val DefaultQuotaType = ClientId val QuotaBytesPerSecondDefault = Long.MaxValue // Always have 10 whole windows + 1 current window val DefaultNumQuotaSamples = 11 @@ -56,6 +63,22 @@ object ClientQuotaManagerConfig { val InactiveSensorExpirationTimeSeconds = 3600 } +object QuotaId { + def sanitize(quotaType: String, quotaId: String) = { + if (quotaType == ClientQuotaManagerConfig.ClientId) + quotaId + else + DatatypeConverter.printBase64Binary(quotaId.getBytes(StandardCharsets.UTF_8)) + } + + def fromSanitizedId(quotaType: String, sanitizedId: String) = { + if (quotaType == ClientQuotaManagerConfig.ClientId) + sanitizedId + else + new String(DatatypeConverter.parseBase64Binary(sanitizedId), StandardCharsets.UTF_8) + } +} + /** * Helper class that records per-client metrics. It is also responsible for maintaining Quota usage statistics * for all clients. @@ -70,6 +93,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, private val time: Time) extends Logging { private val overriddenQuota = new ConcurrentHashMap[String, Quota]() private val defaultQuota = Quota.upperBound(config.quotaBytesPerSecondDefault) + private val metricsTag = config.quotaType private val lock = new ReentrantReadWriteLock() private val delayQueue = new DelayQueue[ThrottledResponse]() val throttledRequestReaper = new ThrottledRequestReaper(delayQueue) @@ -107,8 +131,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * @return Number of milliseconds to delay the response in case of Quota violation. * Zero otherwise */ - def recordAndMaybeThrottle(clientId: String, value: Int, callback: Int => Unit): Int = { - val clientSensors = getOrCreateQuotaSensors(clientId) + def recordAndMaybeThrottle(userPrincipal: String, clientId: String, value: Int, callback: Int => Unit): Int = { + val quotaEntity = QuotaEntity(if (config.quotaType == ClientQuotaManagerConfig.ClientId) clientId else userPrincipal) + val clientSensors = getOrCreateQuotaSensors(quotaEntity) var throttleTimeMs = 0 try { clientSensors.quotaSensor.record(value) @@ -117,8 +142,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } catch { case qve: QuotaViolationException => // Compute the delay - val clientMetric = metrics.metrics().get(clientRateMetricName(clientId)) - throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId))) + val clientMetric = metrics.metrics().get(quotaEntity.clientRateMetricName) + throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(quotaEntity.quotaId))) clientSensors.throttleTimeSensor.record(throttleTimeMs) // If delayed, add the element to the delayQueue delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback)) @@ -128,6 +153,13 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, throttleTimeMs } + /** + * Returns the quota for the client with the specified quota id + */ + def quota(quotaId: String) = { + if (overriddenQuota.containsKey(quotaId)) overriddenQuota.get(quotaId) else defaultQuota + } + /* * This calculates the amount of time needed to bring the metric within quota * assuming that no new metrics are recorded. @@ -153,21 +185,37 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } - /** - * Returns the quota for the specified clientId - */ - def quota(clientId: String): Quota = - if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota; + case class QuotaEntity(quotaId: String) { + private val sanitizedId = QuotaId.sanitize(config.quotaType, quotaId) + + def throttleTimeSensorName: String = apiKey + "ThrottleTime-" + config.quotaType + "-" + sanitizedId + + def quotaSensorName: String = apiKey + "-" + config.quotaType + "-" + sanitizedId + + def clientRateMetricName: MetricName = { + metrics.metricName("byte-rate", apiKey, + "Tracking byte-rate per " + config.quotaType, + metricsTag, sanitizedId) + } + + def throttleMetricName: MetricName = { + metrics.metricName("throttle-time", + apiKey, + "Tracking average throttle-time per " + config.quotaType, + metricsTag, + sanitizedId) + } + } /* * This function either returns the sensors for a given client id or creates them if they don't exist * First sensor of the tuple is the quota enforcement sensor. Second one is the throttle time sensor */ - private def getOrCreateQuotaSensors(clientId: String): ClientSensors = { + private def getOrCreateQuotaSensors(quotaEntity: QuotaEntity): ClientSensors = { // Names of the sensors to access - val quotaSensorName = getQuotaSensorName(clientId) - val throttleTimeSensorName = getThrottleTimeSensorName(clientId) + val quotaSensorName = quotaEntity.quotaSensorName + val throttleTimeSensorName = quotaEntity.throttleTimeSensorName var quotaSensor: Sensor = null var throttleTimeSensor: Sensor = null @@ -209,19 +257,15 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, throttleTimeSensor = metrics.sensor(throttleTimeSensorName, null, ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds) - throttleTimeSensor.add(metrics.metricName("throttle-time", - apiKey, - "Tracking average throttle-time per client", - "client-id", - clientId), new Avg()) + throttleTimeSensor.add(quotaEntity.throttleMetricName, new Avg()) } if (quotaSensor == null) { quotaSensor = metrics.sensor(quotaSensorName, - getQuotaMetricConfig(quota(clientId)), + getQuotaMetricConfig(quota(quotaEntity.quotaId)), ClientQuotaManagerConfig.InactiveSensorExpirationTimeSeconds) - quotaSensor.add(clientRateMetricName(clientId), new Rate()) + quotaSensor.add(quotaEntity.clientRateMetricName, new Rate()) } } finally { lock.writeLock().unlock() @@ -231,10 +275,6 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, ClientSensors(quotaSensor, throttleTimeSensor) } - private def getThrottleTimeSensorName(clientId: String): String = apiKey + "ThrottleTime-" + clientId - - private def getQuotaSensorName(clientId: String): String = apiKey + "-" + clientId - private def getQuotaMetricConfig(quota: Quota): MetricConfig = { new MetricConfig() .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS) @@ -243,11 +283,19 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } /** - * Overrides quotas per clientId - * @param clientId client to override + * Overrides quotas per clientId or user + * @param quotaId clientId or user principal to override * @param quota custom quota to apply */ - def updateQuota(clientId: String, quota: Quota) = { + def updateQuota(quotaId: String, quota: Quota) { + updateQuota(quotaId, quota, isDelete=false) + } + + def removeQuota(quotaId: String) { + updateQuota(quotaId, defaultQuota, isDelete=true) + } + + private def updateQuota(quotaId: String, quota: Quota, isDelete: Boolean) = { /* * Acquire the write lock to apply changes in the quota objects. * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists). @@ -255,20 +303,21 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * The write lock prevents quota update and creation at the same time. It also guards against concurrent quota change * notifications */ + val quotaEntity = QuotaEntity(quotaId) lock.writeLock().lock() try { - logger.info(s"Changing quota for clientId $clientId to ${quota.bound()}") + logger.info(s"Changing quota for quota-type ${config.quotaType} quota-id ${quotaId} to ${quota.bound}") - if (quota.equals(defaultQuota)) - this.overriddenQuota.remove(clientId) + if (isDelete) + this.overriddenQuota.remove(quotaId) else - this.overriddenQuota.put(clientId, quota) + this.overriddenQuota.put(quotaId, quota) // Change the underlying metric config if the sensor has been created val allMetrics = metrics.metrics() - val quotaMetricName = clientRateMetricName(clientId) + val quotaMetricName = quotaEntity.clientRateMetricName if (allMetrics.containsKey(quotaMetricName)) { - logger.info(s"Sensor for clientId $clientId already exists. Changing quota to ${quota.bound()} in MetricConfig") + logger.info(s"Sensor for quota-type ${config.quotaType} quota-id ${quotaId} already exists. Changing quota to ${quota.bound} in MetricConfig") allMetrics.get(quotaMetricName).config(getQuotaMetricConfig(quota)) } } finally { @@ -276,12 +325,6 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } } - private def clientRateMetricName(clientId: String): MetricName = { - metrics.metricName("byte-rate", apiKey, - "Tracking byte-rate per client", - "client-id", clientId) - } - def shutdown() = { throttledRequestReaper.shutdown() } diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index ab1d7825e1832..a3dfc6694fc45 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -72,21 +72,24 @@ object ClientConfigOverride { } /** - * The ClientIdConfigHandler will process clientId config changes in ZK. - * The callback provides the clientId and the full properties set read from ZK. + * The QuotaConfigHandler will process clientId or user principal quota config changes in ZK. + * The callback provides the clientId or base64-encoded user principal and the full properties set read from ZK. * This implementation reports the overrides to the respective ClientQuotaManager objects */ -class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaManager]) extends ConfigHandler { +class QuotaConfigHandler(private val quotaType: String, private val quotaManagers: Map[Short, ClientQuotaManager]) extends ConfigHandler { - def processConfigChanges(clientId: String, clientConfig: Properties) = { - if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) { - quotaManagers(ApiKeys.PRODUCE.id).updateQuota(clientId, - new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true)) - } + def processConfigChanges(sanitizedQuotaId: String, quotaConfig: Properties) = { + val quotaId = QuotaId.fromSanitizedId(quotaType, sanitizedQuotaId) + if (quotaConfig.containsKey(ClientConfigOverride.ProducerOverride)) { + quotaManagers(ApiKeys.PRODUCE.id).updateQuota(quotaId, + new Quota(quotaConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true)) + } else + quotaManagers(ApiKeys.PRODUCE.id).removeQuota(quotaId) - if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) { - quotaManagers(ApiKeys.FETCH.id).updateQuota(clientId, - new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true)) - } + if (quotaConfig.containsKey(ClientConfigOverride.ConsumerOverride)) { + quotaManagers(ApiKeys.FETCH.id).updateQuota(quotaId, + new Quota(quotaConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true)) + } else + quotaManagers(ApiKeys.FETCH.id).removeQuota(quotaId) } } diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index eb406affa7242..4f8c8acc7545d 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -37,6 +37,7 @@ import org.I0Itec.zkclient.{IZkStateListener, IZkChildListener, ZkClient} object ConfigType { val Topic = "topics" val Client = "clients" + val User = "users" } /** @@ -79,6 +80,19 @@ class DynamicConfigManager(private val zkUtils: ZkUtils, private val time: Time = SystemTime) extends Logging { private var lastExecutedChange = -1L + // Apply all existing client/user configs to the ClientIdConfigHandler/UserConfigHandler to bootstrap the overrides + configHandlers.foreach { + case (ConfigType.Client, handler) => + AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach { + case (clientId, properties) => handler.processConfigChanges(clientId, properties) + } + case (ConfigType.User, handler) => + AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.User).foreach { + case (userPrincipal, properties) => handler.processConfigChanges(userPrincipal, properties) + } + case _ => + } + object ConfigChangedNotificationHandler extends NotificationHandler { override def processNotification(json: String) = { Json.parseFull(json) match { @@ -92,6 +106,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils, val entityType = map.get("entity_type") match { case Some(ConfigType.Topic) => ConfigType.Topic case Some(ConfigType.Client) => ConfigType.Client + case Some(ConfigType.User) => ConfigType.User case _ => throw new IllegalArgumentException("Config change notification must have 'entity_type' set to either 'client' or 'topic'." + " Received: " + json) } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 1edc16242db5c..220f87aff2614 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -386,6 +386,7 @@ class KafkaApis(val requestChannel: RequestChannel, request.apiRemoteCompleteTimeMs = SystemTime.milliseconds quotaManagers(ApiKeys.PRODUCE.id).recordAndMaybeThrottle( + request.session.principal.getName, request.header.clientId, numBytesAppended, produceResponseCallback) @@ -482,7 +483,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (fetchRequest.isFromFollower) { fetchResponseCallback(0) } else { - quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(fetchRequest.clientId, + quotaManagers(ApiKeys.FETCH.id).recordAndMaybeThrottle(request.session.principal.getName, fetchRequest.clientId, FetchResponse.responseSize(mergedPartitionData.groupBy(_._1.topic), fetchRequest.versionId), fetchResponseCallback) @@ -971,12 +972,14 @@ class KafkaApis(val requestChannel: RequestChannel, */ private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = { val producerQuotaManagerCfg = ClientQuotaManagerConfig( + quotaType = cfg.quotaType, quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault, numQuotaSamples = cfg.numQuotaSamples, quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds ) val consumerQuotaManagerCfg = ClientQuotaManagerConfig( + quotaType = cfg.quotaType, quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault, numQuotaSamples = cfg.numQuotaSamples, quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index a6644847f41e2..0ca1184875a4d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -151,6 +151,7 @@ object Defaults { val ConsumerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds + val QuotaType = ClientQuotaManagerConfig.ClientId val DeleteTopicEnable = false @@ -311,6 +312,7 @@ object KafkaConfig { val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default" val NumQuotaSamplesProp = "quota.window.num" val QuotaWindowSizeSecondsProp = "quota.window.size.seconds" + val QuotaTypeProp = "quota.type" val DeleteTopicEnableProp = "delete.topic.enable" val CompressionTypeProp = "compression.type" @@ -507,6 +509,7 @@ object KafkaConfig { val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second" val NumQuotaSamplesDoc = "The number of samples to retain in memory" val QuotaWindowSizeSecondsDoc = "The time span of each sample" + val QuotaTypeDoc = "Selects the entity to which quotas are applied. Valid values are 'client-id' and 'user'" val DeleteTopicEnableDoc = "Enables delete topic. Delete topic through the admin tool will have no effect if this config is turned off" val CompressionTypeDoc = "Specify the final compression type for a given topic. This configuration accepts the standard compression codecs " + @@ -689,6 +692,7 @@ object KafkaConfig { .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc) .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc) .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc) + .define(QuotaTypeProp, STRING, Defaults.QuotaType, in(ClientQuotaManagerConfig.ClientId, ClientQuotaManagerConfig.User), LOW, QuotaTypeDoc) /** ********* SSL Configuration ****************/ @@ -919,6 +923,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp) val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp) val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp) + val quotaType = getString(KafkaConfig.QuotaTypeProp) val deleteTopicEnable = getBoolean(KafkaConfig.DeleteTopicEnableProp) val compressionType = getString(KafkaConfig.CompressionTypeProp) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index f95d9ef74039d..a9d7c8dc198d8 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -218,14 +218,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr Mx4jLoader.maybeLoad() /* start dynamic config manager */ - dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config), - ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers)) - - // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides - // TODO: Move this logic to DynamicConfigManager - AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach { - case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties) + val quotaConfigType = config.quotaType match { + case ClientQuotaManagerConfig.ClientId => ConfigType.Client + case ClientQuotaManagerConfig.User => ConfigType.User + case quotaType => throw new IllegalArgumentException(s"Invalid quota type $quotaType") } + dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config), + quotaConfigType -> new QuotaConfigHandler(config.quotaType, apis.quotaManagers)) // Create the config manager. start listening to notifications dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers) diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotasTest.scala similarity index 58% rename from core/src/test/scala/integration/kafka/api/QuotasTest.scala rename to core/src/test/scala/integration/kafka/api/BaseQuotasTest.scala index b6a0ae5a6d6fa..b884f329cca6a 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotasTest.scala @@ -16,10 +16,9 @@ package kafka.api import java.util.Properties -import kafka.admin.AdminUtils import kafka.consumer.SimpleConsumer import kafka.integration.KafkaServerTestHarness -import kafka.server.{ClientQuotaManager, ClientConfigOverride, KafkaConfig, KafkaServer} +import kafka.server.{ClientQuotaManager, ClientConfigOverride, KafkaConfig, KafkaServer, QuotaId} import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer._ @@ -27,38 +26,42 @@ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics.{Quota, KafkaMetric} import org.apache.kafka.common.protocol.ApiKeys -import org.junit.Assert.assertEquals -import org.junit.Assert.assertTrue +import org.junit.Assert._ import org.junit.{After, Before, Test} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.mutable +import kafka.server.ClientQuotaManagerConfig + +abstract class BaseQuotasTest extends IntegrationTestHarness { + + def quotaType: String + def clientPrincipal : String + def producerQuotaId : String + def consumerQuotaId : String + def changeQuota(quotaId: String, props: Properties) + + override val serverCount = 2 + val producerCount = 0 + val consumerCount = 0 -class QuotasTest extends KafkaServerTestHarness { private val producerBufferSize = 300000 - private val producerId1 = "QuotasTestProducer-1" - private val producerId2 = "QuotasTestProducer-2" - private val consumerId1 = "QuotasTestConsumer-1" - private val consumerId2 = "QuotasTestConsumer-2" + protected val producerClientId = "QuotasTestProducer-1" + protected val consumerClientId = "QuotasTestConsumer-1" - val numServers = 2 - val overridingProps = new Properties() + this.serverConfig.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "2") + this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") + this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "100") + this.serverConfig.setProperty(KafkaConfig.GroupMaxSessionTimeoutMsProp, "30000") // Low enough quota that a producer sending a small payload in a tight loop should get throttled - overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000") - overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500") - - override def generateConfigs() = { - FixedPortTestUtils.createBrokerConfigs(numServers, - zkConnect, - enableControlledShutdown = false) - .map(KafkaConfig.fromProps(_, overridingProps)) - } + this.serverConfig.setProperty(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000") + this.serverConfig.setProperty(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500") + this.serverConfig.setProperty(KafkaConfig.QuotaTypeProp, quotaType) - var producers = mutable.Buffer[KafkaProducer[Array[Byte], Array[Byte]]]() - var consumers = mutable.Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]() var replicaConsumers = mutable.Buffer[SimpleConsumer]() var leaderNode: KafkaServer = null @@ -68,28 +71,29 @@ class QuotasTest extends KafkaServerTestHarness { @Before override def setUp() { super.setUp() + val producerSecurityProps = TestUtils.producerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties) + val consumerSecurityProps = TestUtils.consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties) val producerProps = new Properties() + producerProps.putAll(producerSecurityProps) producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList) producerProps.put(ProducerConfig.ACKS_CONFIG, "0") producerProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, producerBufferSize.toString) - producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId1) + producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerClientId) producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArraySerializer]) producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps) - producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, producerId2) - producers += new KafkaProducer[Array[Byte], Array[Byte]](producerProps) - val numPartitions = 1 - val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers) + val leaders = TestUtils.createTopic(zkUtils, topic1, numPartitions, serverCount, servers) leaderNode = if (leaders(0).get == servers.head.config.brokerId) servers.head else servers(1) followerNode = if (leaders(0).get != servers.head.config.brokerId) servers.head else servers(1) assertTrue("Leader of all partitions of the topic should exist", leaders.values.forall(leader => leader.isDefined)) // Create consumers val consumerProps = new Properties + consumerProps.putAll(consumerSecurityProps) consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "QuotasTest") consumerProps.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") @@ -99,20 +103,14 @@ class QuotasTest extends KafkaServerTestHarness { consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[org.apache.kafka.common.serialization.ByteArrayDeserializer]) - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId1) + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerClientId) consumers += new KafkaConsumer(consumerProps) // Create replica consumers with the same clientId as the high level consumer. These requests should never be throttled - replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId1) - - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2) - consumers += new KafkaConsumer(consumerProps) - replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId2) + replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerClientId) } @After override def tearDown() { - producers.foreach( _.close ) - consumers.foreach( _.close ) replicaConsumers.foreach( _.close ) super.tearDown() } @@ -126,8 +124,8 @@ class QuotasTest extends KafkaServerTestHarness { val producerMetricName = leaderNode.metrics.metricName("throttle-time", ApiKeys.PRODUCE.name, - "Tracking throttle-time per client", - "client-id", producerId1) + "Tracking throttle-time per " + quotaType, + quotaMetricTag, producerMetricTagValue) assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0) // Consumer should read in a bursty manner and get throttled immediately @@ -137,52 +135,88 @@ class QuotasTest extends KafkaServerTestHarness { replicaConsumers.head.fetch(request) val consumerMetricName = leaderNode.metrics.metricName("throttle-time", ApiKeys.FETCH.name, - "Tracking throttle-time per client", - "client-id", consumerId1) + "Tracking throttle-time per " + quotaType, + quotaMetricTag, consumerMetricTagValue) assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0) } @Test def testProducerConsumerOverrideUnthrottled() { - // Give effectively unlimited quota for producerId2 and consumerId2 + // Give effectively unlimited quota for producer and consumer val props = new Properties() props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString) props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString) - AdminUtils.changeClientIdConfig(zkUtils, producerId2, props) - AdminUtils.changeClientIdConfig(zkUtils, consumerId2, props) + changeQuota(producerQuotaId, props) + changeQuota(consumerQuotaId, props) TestUtils.retry(10000) { val quotaManagers: Map[Short, ClientQuotaManager] = leaderNode.apis.quotaManagers - val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(producerId2) - val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(consumerId2) + val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(producerQuotaId) + val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(consumerQuotaId) - assertEquals(s"ClientId $producerId2 must have unlimited producer quota", Quota.upperBound(Long.MaxValue), overrideProducerQuota) - assertEquals(s"ClientId $consumerId2 must have unlimited consumer quota", Quota.upperBound(Long.MaxValue), overrideConsumerQuota) + assertEquals(s"$quotaType $producerQuotaId must have unlimited producer quota", Quota.upperBound(Long.MaxValue), overrideProducerQuota) + assertEquals(s"$quotaType $consumerQuotaId must have unlimited consumer quota", Quota.upperBound(Long.MaxValue), overrideConsumerQuota) } - val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala val numRecords = 1000 - produce(producers(1), numRecords) + produce(producers.head, numRecords) val producerMetricName = leaderNode.metrics.metricName("throttle-time", ApiKeys.PRODUCE.name, - "Tracking throttle-time per client", - "client-id", producerId2) + "Tracking throttle-time per " + quotaType, + quotaMetricTag, producerMetricTagValue) assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0) // The "client" consumer does not get throttled. - consume(consumers(1), numRecords) + consume(consumers.head, numRecords) // The replica consumer should not be throttled also. Create a fetch request which will exceed the quota immediately val request = new FetchRequestBuilder().addFetch(topic1, 0, 0, 1024*1024).replicaId(followerNode.config.brokerId).build() - replicaConsumers(1).fetch(request) + replicaConsumers.head.fetch(request) val consumerMetricName = leaderNode.metrics.metricName("throttle-time", ApiKeys.FETCH.name, - "Tracking throttle-time per client", - "client-id", consumerId2) + "Tracking throttle-time per " + quotaType, + quotaMetricTag, consumerMetricTagValue) assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0) } + @Test + def testQuotaOverrideDelete() { + // Override producer and consumer quotas to unlimited + val props = new Properties() + props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString) + props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString) + + changeQuota(producerQuotaId, props) + changeQuota(consumerQuotaId, props) + + val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala + val producerMetricName = leaderNode.metrics.metricName("throttle-time", + ApiKeys.PRODUCE.name, + "Tracking throttle-time per " + quotaType, + quotaMetricTag, producerMetricTagValue) + val consumerMetricName = leaderNode.metrics.metricName("throttle-time", + ApiKeys.FETCH.name, + "Tracking throttle-time per " + quotaType, + quotaMetricTag, consumerMetricTagValue) + val numRecords = 1000 + produce(producers.head, numRecords) + assertTrue("Should not have been throttled", allMetrics(producerMetricName).value() == 0) + consume(consumers.head, numRecords) + assertTrue("Should not have been throttled", allMetrics(consumerMetricName).value() == 0) + + // Delete producer and consumer quota overrides. Consumer and producer should now be + // throttled since broker defaults are very small + val emptyProps = new Properties() + changeQuota(producerQuotaId, emptyProps) + changeQuota(consumerQuotaId, emptyProps) + produce(producers.head, numRecords) + + assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0) + consume(consumers.head, numRecords) + assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0) + } + def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = { var numBytesProduced = 0 for (i <- 0 to count) { @@ -204,4 +238,8 @@ class QuotasTest extends KafkaServerTestHarness { } } } + + private def quotaMetricTag = quotaType + private def producerMetricTagValue = QuotaId.sanitize(quotaType, producerQuotaId) + private def consumerMetricTagValue = QuotaId.sanitize(quotaType, consumerQuotaId) } diff --git a/core/src/test/scala/integration/kafka/api/ClientIdQuotasTest.scala b/core/src/test/scala/integration/kafka/api/ClientIdQuotasTest.scala new file mode 100644 index 0000000000000..2120d97a9bfed --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/ClientIdQuotasTest.scala @@ -0,0 +1,39 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package kafka.api + +import java.util.Properties + +import kafka.admin.AdminUtils +import kafka.server.{KafkaConfig, ClientQuotaManagerConfig} +import org.apache.kafka.common.security.auth.KafkaPrincipal + +class ClientIdQuotasTest extends BaseQuotasTest { + + override def generateConfigs() = { + FixedPortTestUtils.createBrokerConfigs(serverCount, + zkConnect, + enableControlledShutdown = false) + .map(KafkaConfig.fromProps(_, this.serverConfig)) + } + + override def quotaType = ClientQuotaManagerConfig.ClientId + override val clientPrincipal = KafkaPrincipal.ANONYMOUS.getName + override val producerQuotaId = producerClientId + override val consumerQuotaId = consumerClientId + override def changeQuota(quotaId: String, props: Properties) { + AdminUtils.changeClientIdConfig(zkUtils, quotaId, props) + } +} diff --git a/core/src/test/scala/integration/kafka/api/UserQuotasTest.scala b/core/src/test/scala/integration/kafka/api/UserQuotasTest.scala new file mode 100644 index 0000000000000..631b3ede6bc0e --- /dev/null +++ b/core/src/test/scala/integration/kafka/api/UserQuotasTest.scala @@ -0,0 +1,41 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package kafka.api + +import java.io.File +import java.util.Properties + +import kafka.admin.AdminUtils + +import kafka.server.{KafkaConfig, ClientQuotaManagerConfig} + +import org.apache.kafka.common.protocol.SecurityProtocol +import kafka.server.QuotaId + +class UserQuotasTest extends BaseQuotasTest with SaslTestHarness { + + override protected def securityProtocol = SecurityProtocol.SASL_SSL + override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) + override protected val zkSaslEnabled = false + override protected val saslProperties = Some(kafkaSaslProperties(kafkaClientSaslMechanism, Some(kafkaServerSaslMechanisms))) + + override def quotaType: String = ClientQuotaManagerConfig.User + override val clientPrincipal = "client" + override val producerQuotaId = clientPrincipal + override val consumerQuotaId = clientPrincipal + override def changeQuota(quotaId: String, props: Properties) { + AdminUtils.changeUserConfig(zkUtils, QuotaId.sanitize(quotaType, quotaId), props) + } +} diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/BaseQuotaManagerTest.scala similarity index 62% rename from core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala rename to core/src/test/scala/unit/kafka/server/BaseQuotaManagerTest.scala index 69e83c03be88e..469700948c5d9 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/BaseQuotaManagerTest.scala @@ -23,10 +23,11 @@ import org.apache.kafka.common.utils.MockTime import org.junit.Assert.{assertEquals, assertTrue} import org.junit.{Before, Test} -class ClientQuotaManagerTest { +abstract class BaseQuotaManagerTest { private val time = new MockTime - private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500) + val quotaType: String + private var config: ClientQuotaManagerConfig = null var numCallbacks: Int = 0 def callback(delayTimeMs: Int) { @@ -36,38 +37,48 @@ class ClientQuotaManagerTest { @Before def beforeMethod() { numCallbacks = 0 + config = ClientQuotaManagerConfig(quotaType = quotaType, quotaBytesPerSecondDefault = 500) + } + + case class Client(id: String) { + val clientId = "client-" + id + val user = "user-" + id + val quotaId = if (quotaType == ClientQuotaManagerConfig.ClientId) clientId else user + def sanitizedQuotaId = QuotaId.sanitize(quotaType, quotaId) } @Test def testQuotaParsing() { val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time) + val p1 = Client("p1") + val p2 = Client("p2") // Case 1: Update the quota. Assert that the new quota value is returned - clientMetrics.updateQuota("p1", new Quota(2000, true)) - clientMetrics.updateQuota("p2", new Quota(4000, true)) + clientMetrics.updateQuota(p1.quotaId, new Quota(2000, true)) + clientMetrics.updateQuota(p2.quotaId, new Quota(4000, true)) try { assertEquals("Default producer quota should be 500", new Quota(500, true), clientMetrics.quota("random-client-id")) - assertEquals("Should return the overridden value (2000)", new Quota(2000, true), clientMetrics.quota("p1")) - assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota("p2")) + assertEquals("Should return the overridden value (2000)", new Quota(2000, true), clientMetrics.quota(p1.quotaId)) + assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota(p2.quotaId)) // p1 should be throttled using the overridden quota - var throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 2500 * config.numQuotaSamples, this.callback) + var throttleTimeMs = clientMetrics.recordAndMaybeThrottle(p1.user, p1.clientId, 2500 * config.numQuotaSamples, this.callback) assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0) // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created. // p1 should not longer be throttled after the quota change - clientMetrics.updateQuota("p1", new Quota(3000, true)) - assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota("p1")) + clientMetrics.updateQuota(p1.quotaId, new Quota(3000, true)) + assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota(p1.quotaId)) - throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback) + throttleTimeMs = clientMetrics.recordAndMaybeThrottle(p1.user, p1.clientId, 0, this.callback) assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs) // Case 3: Change quota back to default. Should be throttled again - clientMetrics.updateQuota("p1", new Quota(500, true)) - assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota("p1")) + clientMetrics.updateQuota(p1.quotaId, new Quota(500, true)) + assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota(p1.quotaId)) - throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback) + throttleTimeMs = clientMetrics.recordAndMaybeThrottle(p1.user, p1.clientId, 0, this.callback) assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0) } finally { clientMetrics.shutdown() @@ -84,7 +95,7 @@ class ClientQuotaManagerTest { * if we produce under the quota */ for (i <- 0 until 10) { - clientMetrics.recordAndMaybeThrottle("unknown", 400, callback) + clientMetrics.recordAndMaybeThrottle("anonymous", "unknown", 400, callback) time.sleep(1000) } assertEquals(10, numCallbacks) @@ -95,7 +106,7 @@ class ClientQuotaManagerTest { // (600 - quota)/quota*window-size = (600-500)/500*10.5 seconds = 2100 // 10.5 seconds because the last window is half complete time.sleep(500) - val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2300, callback) + val sleepTime = clientMetrics.recordAndMaybeThrottle("anonymous", "unknown", 2300, callback) assertEquals("Should be throttled", 2100, sleepTime) assertEquals(1, queueSizeMetric.value().toInt) @@ -111,12 +122,12 @@ class ClientQuotaManagerTest { // Could continue to see delays until the bursty sample disappears for (i <- 0 until 10) { - clientMetrics.recordAndMaybeThrottle("unknown", 400, callback) + clientMetrics.recordAndMaybeThrottle("anonymous", "unknown", 400, callback) time.sleep(1000) } assertEquals("Should be unthrottled since bursty sample has rolled over", - 0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback)) + 0, clientMetrics.recordAndMaybeThrottle("anonymous", "unknown", 0, callback)) } finally { clientMetrics.shutdown() } @@ -126,16 +137,18 @@ class ClientQuotaManagerTest { def testExpireThrottleTimeSensor() { val metrics = newMetrics val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time) + val client1 = Client("client1") + val throttleSensorName = s"producerThrottleTime-$quotaType-${client1.sanitizedQuotaId}" try { - clientMetrics.recordAndMaybeThrottle("client1", 100, callback) + clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId, 100, callback) // remove the throttle time sensor - metrics.removeSensor("producerThrottleTime-client1") + metrics.removeSensor(throttleSensorName) // should not throw an exception even if the throttle time sensor does not exist. - val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 10000, callback) + val throttleTime = clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId, 10000, callback) assertTrue("Should be throttled", throttleTime > 0) // the sensor should get recreated - val throttleTimeSensor = metrics.getSensor("producerThrottleTime-client1") - assertTrue("Throttle time sensor should exist", throttleTimeSensor != null) + val throttleTimeSensor = metrics.getSensor(throttleSensorName) + assertTrue(s"Throttle time sensor $throttleSensorName should exist", throttleTimeSensor != null) } finally { clientMetrics.shutdown() } @@ -145,21 +158,24 @@ class ClientQuotaManagerTest { def testExpireQuotaSensors() { val metrics = newMetrics val clientMetrics = new ClientQuotaManager(config, metrics, "producer", time) + val client1 = Client("client1") + val throttleSensorName = s"producerThrottleTime-$quotaType-${client1.sanitizedQuotaId}" + val rateSensorName = s"producer-$quotaType-${client1.sanitizedQuotaId}" try { - clientMetrics.recordAndMaybeThrottle("client1", 100, callback) + clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId, 100, callback) // remove all the sensors - metrics.removeSensor("producerThrottleTime-client1") - metrics.removeSensor("producer-client1") + metrics.removeSensor(throttleSensorName) + metrics.removeSensor(rateSensorName) // should not throw an exception - val throttleTime = clientMetrics.recordAndMaybeThrottle("client1", 10000, callback) + val throttleTime = clientMetrics.recordAndMaybeThrottle(client1.user, client1.clientId, 10000, callback) assertTrue("Should be throttled", throttleTime > 0) // all the sensors should get recreated - val throttleTimeSensor = metrics.getSensor("producerThrottleTime-client1") - assertTrue("Throttle time sensor should exist", throttleTimeSensor != null) + val throttleTimeSensor = metrics.getSensor(throttleSensorName) + assertTrue(s"Throttle time sensor $throttleSensorName should exist", throttleTimeSensor != null) - val byteRateSensor = metrics.getSensor("producer-client1") - assertTrue("Byte rate sensor should exist", byteRateSensor != null) + val byteRateSensor = metrics.getSensor(rateSensorName) + assertTrue(s"Byte rate sensor $rateSensorName should exist", byteRateSensor != null) } finally { clientMetrics.shutdown() } diff --git a/core/src/test/scala/unit/kafka/server/ClientIdQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientIdQuotaManagerTest.scala new file mode 100644 index 0000000000000..42b69eb02bdb7 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ClientIdQuotaManagerTest.scala @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +class ClientIdQuotaManagerTest extends BaseQuotaManagerTest { + + override val quotaType = ClientQuotaManagerConfig.ClientId +} diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index d1ad3a3e2ab14..5de6f98bc9558 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -67,7 +67,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { AdminUtils.changeClientIdConfig(zkUtils, clientId, props) TestUtils.retry(10000) { - val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler] + val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[QuotaConfigHandler] val quotaManagers: Map[Short, ClientQuotaManager] = servers(0).apis.quotaManagers val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId) val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId) diff --git a/core/src/test/scala/unit/kafka/server/UserQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/UserQuotaManagerTest.scala new file mode 100644 index 0000000000000..e0d7730f80476 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/UserQuotaManagerTest.scala @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.server + +class UserQuotaManagerTest extends BaseQuotaManagerTest { + + override val quotaType = ClientQuotaManagerConfig.User +}