From c495e76789fb7fccf22b0c2e7304516a236bf8af Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 12 Oct 2015 11:39:20 -0700 Subject: [PATCH 01/10] KAFKA-2209 - Dynamically change quotas per clientId --- .../kafka/server/ClientQuotaManager.scala | 66 +++++++++++------- .../scala/kafka/server/ConfigHandler.scala | 27 ++++++-- .../kafka/server/DynamicConfigManager.scala | 4 +- .../main/scala/kafka/server/KafkaApis.scala | 12 ++-- .../main/scala/kafka/server/KafkaConfig.scala | 12 ---- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../integration/kafka/api/QuotasTest.scala | 4 +- .../kafka/server/ClientQuotaManagerTest.scala | 68 ++++--------------- .../server/DynamicConfigChangeTest.scala | 22 +++--- .../unit/kafka/server/KafkaConfigTest.scala | 2 - 10 files changed, 104 insertions(+), 115 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 24f294d2b43df..f24d70a8e0326 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -16,7 +16,7 @@ */ package kafka.server -import java.util.concurrent.{DelayQueue, TimeUnit} +import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit} import kafka.utils.{ShutdownableThread, Logging} import org.apache.kafka.common.MetricName @@ -36,15 +36,12 @@ private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor /** * Configuration settings for quota management * @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client - * @param quotaBytesPerSecondOverrides The comma separated overrides per client. "c1=X,c2=Y" * @param numQuotaSamples The number of samples to retain in memory * @param quotaWindowSizeSeconds The time span of each sample * */ case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault, - quotaBytesPerSecondOverrides: String = - ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides, numQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples, quotaWindowSizeSeconds: Int = @@ -52,7 +49,6 @@ case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long = object ClientQuotaManagerConfig { val QuotaBytesPerSecondDefault = Long.MaxValue - val QuotaBytesPerSecondOverrides = "" // Always have 10 whole windows + 1 current window val DefaultNumQuotaSamples = 11 val DefaultQuotaWindowSizeSeconds = 1 @@ -73,7 +69,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, private val metrics: Metrics, private val apiKey: String, private val time: Time) extends Logging { - private val overriddenQuota = initQuotaMap(config.quotaBytesPerSecondOverrides) + private val overriddenQuota = new ConcurrentHashMap[String, Quota]() private val defaultQuota = Quota.lessThan(config.quotaBytesPerSecondDefault) private val lock = new ReentrantReadWriteLock() private val delayQueue = new DelayQueue[ThrottledResponse]() @@ -163,7 +159,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * Returns the consumer quota for the specified clientId * @return */ - private[server] def quota(clientId: String): Quota = overriddenQuota.getOrElse(clientId, defaultQuota) + private[server] def quota(clientId: String): Quota = overriddenQuota.getOrDefault(clientId, defaultQuota) /* * This function either returns the sensors for a given client id or creates them if they don't exist @@ -172,8 +168,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, private def getOrCreateQuotaSensors(clientId: String): ClientSensors = { // Names of the sensors to access - val quotaSensorName = apiKey + "-" + clientId - val throttleTimeSensorName = apiKey + "ThrottleTime-" + clientId + val quotaSensorName = getQuotaSensorName(clientId) + val throttleTimeSensorName = getThrottleTimeSensorName(clientId) var quotaSensor: Sensor = null var throttleTimeSensor: Sensor = null @@ -231,6 +227,14 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, ClientSensors(quotaSensor, throttleTimeSensor) } + private def getThrottleTimeSensorName(clientId: String): String = { + apiKey + "ThrottleTime-" + clientId + } + + private def getQuotaSensorName(clientId: String): String = { + apiKey + "-" + clientId + } + private def getQuotaMetricConfig(quota: Quota): MetricConfig = { new MetricConfig() .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS) @@ -238,21 +242,37 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, .quota(quota) } - /* Construct a Map of (clientId -> Quota) - * The input config is specified as a comma-separated K=V pairs + /** + * Overrides quotas per clientId + * @param clientId client to override + * @param quota custom quota to apply */ - private def initQuotaMap(input: String): Map[String, Quota] = { - // If empty input, return an empty map - if (input.trim.length == 0) - Map[String, Quota]() - else - input.split(",").map(entry => { - val trimmedEntry = entry.trim - val pair: Array[String] = trimmedEntry.split("=") - if (pair.length != 2) - throw new IllegalArgumentException("Incorrectly formatted override entry (%s). Format is k1=v1,k2=v2".format(entry)) - pair(0) -> new Quota(pair(1).toDouble, true) - }).toMap + def updateQuota(clientId: String, quota: Quota) = { + /* + * Acquire the write lock to apply changes in the quota objects. + * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object. + * The write lock prevents quota update and creation at the same time. It also guards against concurrent quota change + * notifications + */ + lock.writeLock().lock() + try { + logger.info(s"Changing quota for clientId $clientId to ${quota.bound()}") + + if (quota.equals(defaultQuota)) + this.overriddenQuota.remove(clientId) + else + this.overriddenQuota.put(clientId, quota) + + // Change the underlying metric config if the sensor has been created + val allMetrics = metrics.metrics() + val quotaMetricName = clientRateMetricName(clientId) + if (allMetrics.containsKey(quotaMetricName)) { + logger.info(s"Sensor for clientId $clientId already exists. Changing quota to ${quota.bound()} in MetricConfig") + allMetrics.get(quotaMetricName).config(getQuotaMetricConfig(quota)) + } + } finally { + lock.writeLock().unlock() + } } private def clientRateMetricName(clientId: String): MetricName = { diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 8347a69a34c38..58d5a5f6022fc 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -21,7 +21,8 @@ import java.util.Properties import kafka.common.TopicAndPartition import kafka.log.{Log, LogConfig, LogManager} -import kafka.utils.Pool +import kafka.api.RequestKeys +import org.apache.kafka.common.metrics.Quota import scala.collection.mutable @@ -36,7 +37,7 @@ trait ConfigHandler { * The TopicConfigHandler will process topic config changes in ZK. * The callback provides the topic name and the full properties set read from ZK */ -class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler{ +class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler { def processConfigChanges(topic : String, topicConfig : Properties) { val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer @@ -55,15 +56,27 @@ class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandl } } +object ClientConfigOverride { + val ProducerOverride = "producer_byte_rate" + val ConsumerOverride = "consumer_byte_rate" +} + /** * The ClientIdConfigHandler will process clientId config changes in ZK. * The callback provides the clientId and the full properties set read from ZK. - * This implementation does nothing currently. In the future, it will change quotas per client + * This implementation reports the overrides to the respective ClientQuotaManager objects */ -class ClientIdConfigHandler extends ConfigHandler { - val configPool = new Pool[String, Properties]() +class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaManager]) extends ConfigHandler { - def processConfigChanges(clientId : String, clientConfig : Properties): Unit = { - configPool.put(clientId, clientConfig) + def processConfigChanges(clientId : String, clientConfig : Properties) = { + if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) { + quotaManagers.get(RequestKeys.ProduceKey).get.updateQuota(clientId, + new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toInt, true)) + } + + if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) { + quotaManagers.get(RequestKeys.FetchKey).get.updateQuota(clientId, + new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toInt, true)) + } } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index d443a1feb9a49..82e7e1923ecf0 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -138,7 +138,9 @@ class DynamicConfigManager(private val zkUtils: ZkUtils, case Some(value: String) => value case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json) } - configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkUtils, entityType, entity)) + val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity) + logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig") + configHandler(entityType).processConfigChanges(entity, entityConfig) case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" + "{\"version\" : 1," + diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index c80bd4683c300..a1e6627fd8522 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -19,6 +19,8 @@ package kafka.server import java.nio.ByteBuffer +import org.apache.kafka.common.TopicPartition +import kafka.api._ import kafka.admin.AdminUtils import kafka.api._ import kafka.common._ @@ -55,7 +57,7 @@ class KafkaApis(val requestChannel: RequestChannel, this.logIdent = "[KafkaApi-%d] ".format(brokerId) // Store all the quota managers for each type of request - private val quotaManagers = instantiateQuotaManagers(config) + val quotaManagers: immutable.Map[Short, ClientQuotaManager] = instantiateQuotaManagers(config) /** * Top-level method that handles all requests and multiplexes to the right api @@ -781,22 +783,22 @@ class KafkaApis(val requestChannel: RequestChannel, /* * Returns a Map of all quota managers configured. The request Api key is the key for the Map */ - private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = { + private def instantiateQuotaManagers(cfg: KafkaConfig): immutable.Map[Short, ClientQuotaManager] = { val producerQuotaManagerCfg = ClientQuotaManagerConfig( quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault, - quotaBytesPerSecondOverrides = cfg.producerQuotaBytesPerSecondOverrides, + // quotaBytesPerSecondOverrides = cfg.producerQuotaBytesPerSecondOverrides, numQuotaSamples = cfg.numQuotaSamples, quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds ) val consumerQuotaManagerCfg = ClientQuotaManagerConfig( quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault, - quotaBytesPerSecondOverrides = cfg.consumerQuotaBytesPerSecondOverrides, + // quotaBytesPerSecondOverrides = cfg.consumerQuotaBytesPerSecondOverrides, numQuotaSamples = cfg.numQuotaSamples, quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds ) - val quotaManagers = Map[Short, ClientQuotaManager]( + val quotaManagers = immutable.Map[Short, ClientQuotaManager]( RequestKeys.ProduceKey -> new ClientQuotaManager(producerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.ProduceKey), new org.apache.kafka.common.utils.SystemTime), RequestKeys.FetchKey -> diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index b054f48089363..5b311e22117bb 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -142,8 +142,6 @@ object Defaults { /** ********* Quota Configuration ***********/ val ProducerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault val ConsumerQuotaBytesPerSecondDefault = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault - val ProducerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides - val ConsumerQuotaBytesPerSecondOverrides = ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides val NumQuotaSamples: Int = ClientQuotaManagerConfig.DefaultNumQuotaSamples val QuotaWindowSizeSeconds: Int = ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds @@ -294,8 +292,6 @@ object KafkaConfig { /** ********* Quota Configuration ***********/ val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default" val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default" - val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides" - val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides" val NumQuotaSamplesProp = "quota.window.num" val QuotaWindowSizeSecondsProp = "quota.window.size.seconds" @@ -468,10 +464,6 @@ object KafkaConfig { /** ********* Quota Configuration ***********/ val ProducerQuotaBytesPerSecondDefaultDoc = "Any producer distinguished by clientId will get throttled if it produces more bytes than this value per-second" val ConsumerQuotaBytesPerSecondDefaultDoc = "Any consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-second" - val ProducerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default producer quota. " + - "Example: clientIdX=10485760,clientIdY=10485760" - val ConsumerQuotaBytesPerSecondOverridesDoc = "Comma separated list of clientId:quotaBytesPerSecond to override the default consumer quota. " + - "Example: clientIdX=10485760,clientIdY=10485760" val NumQuotaSamplesDoc = "The number of samples to retain in memory" val QuotaWindowSizeSecondsDoc = "The time span of each sample" @@ -644,8 +636,6 @@ object KafkaConfig { /** ********* Quota configuration ***********/ .define(ProducerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ProducerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ProducerQuotaBytesPerSecondDefaultDoc) .define(ConsumerQuotaBytesPerSecondDefaultProp, LONG, Defaults.ConsumerQuotaBytesPerSecondDefault, atLeast(1), HIGH, ConsumerQuotaBytesPerSecondDefaultDoc) - .define(ProducerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ProducerQuotaBytesPerSecondOverrides, HIGH, ProducerQuotaBytesPerSecondOverridesDoc) - .define(ConsumerQuotaBytesPerSecondOverridesProp, STRING, Defaults.ConsumerQuotaBytesPerSecondOverrides, HIGH, ConsumerQuotaBytesPerSecondOverridesDoc) .define(NumQuotaSamplesProp, INT, Defaults.NumQuotaSamples, atLeast(1), LOW, NumQuotaSamplesDoc) .define(QuotaWindowSizeSecondsProp, INT, Defaults.QuotaWindowSizeSeconds, atLeast(1), LOW, QuotaWindowSizeSecondsDoc) @@ -846,8 +836,6 @@ case class KafkaConfig (props: java.util.Map[_, _]) extends AbstractConfig(Kafka /** ********* Quota Configuration **************/ val producerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp) val consumerQuotaBytesPerSecondDefault = getLong(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp) - val producerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp) - val consumerQuotaBytesPerSecondOverrides = getString(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp) val numQuotaSamples = getInt(KafkaConfig.NumQuotaSamplesProp) val quotaWindowSizeSeconds = getInt(KafkaConfig.QuotaWindowSizeSecondsProp) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 84d48cb319b3b..03e59885b3ec0 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -209,7 +209,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr /* start dynamic config manager */ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager), - ConfigType.Client -> new ClientIdConfigHandler) + ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers)) dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers) dynamicConfigManager.startup() diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala index 735a3b2564d84..649419d48f04b 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala @@ -48,8 +48,8 @@ class QuotasTest extends KafkaServerTestHarness { overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500") // un-throttled - overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, producerId2 + "=" + Long.MaxValue) - overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp, consumerId2 + "=" + Long.MaxValue) + //overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, producerId2 + "=" + Long.MaxValue) + //overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp, consumerId2 + "=" + Long.MaxValue) override def generateConfigs() = { FixedPortTestUtils.createBrokerConfigs(numServers, diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index 75e856a0ba221..c63b84efa3ea6 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -26,8 +26,7 @@ import org.junit.{Assert, Before, Test} class ClientQuotaManagerTest { private val time = new MockTime - private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500, - quotaBytesPerSecondOverrides = "p1=2000,p2=4000") + private val config = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500) var numCallbacks: Int = 0 def callback(delayTimeMs: Int) { @@ -42,6 +41,9 @@ class ClientQuotaManagerTest { @Test def testQuotaParsing() { val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time) + clientMetrics.updateQuota("p1", new Quota(2000, true)); + clientMetrics.updateQuota("p2", new Quota(4000, true)); + try { Assert.assertEquals("Default producer quota should be 500", new Quota(500, true), clientMetrics.quota("random-client-id")) @@ -49,6 +51,16 @@ class ClientQuotaManagerTest { new Quota(2000, true), clientMetrics.quota("p1")) Assert.assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota("p2")) + + // change quota again + clientMetrics.updateQuota("p1", new Quota(3000, true)); + Assert.assertEquals("Should return the newly overridden value (3000)", + new Quota(3000, true), clientMetrics.quota("p1")) + + // Change back to default + clientMetrics.updateQuota("p1", new Quota(500, true)); + Assert.assertEquals("Should return the default value (500)", + new Quota(500, true), clientMetrics.quota("p1")) } finally { clientMetrics.shutdown() } @@ -102,58 +114,6 @@ class ClientQuotaManagerTest { } } - @Test - def testOverrideParse() { - var testConfig = ClientQuotaManagerConfig() - var clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time) - - try { - // Case 1 - Default config - Assert.assertEquals(new Quota(ClientQuotaManagerConfig.QuotaBytesPerSecondDefault, true), - clientMetrics.quota("p1")) - } finally { - clientMetrics.shutdown() - } - - - // Case 2 - Empty override - testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500, - quotaBytesPerSecondOverrides = "p1=2000,p2=4000,,") - - clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time) - try { - Assert.assertEquals(new Quota(2000, true), clientMetrics.quota("p1")) - Assert.assertEquals(new Quota(4000, true), clientMetrics.quota("p2")) - } finally { - clientMetrics.shutdown() - } - - // Case 3 - NumberFormatException for override - testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500, - quotaBytesPerSecondOverrides = "p1=2000,p2=4000,p3=p4") - try { - clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "consumer", time) - Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides) - } - catch { - // Swallow. - case nfe: NumberFormatException => - } - - // Case 4 - IllegalArgumentException for override - testConfig = ClientQuotaManagerConfig(quotaBytesPerSecondDefault = 500, - quotaBytesPerSecondOverrides = "p1=2000=3000") - try { - clientMetrics = new ClientQuotaManager(testConfig, newMetrics, "producer", time) - Assert.fail("Should fail to parse invalid config " + testConfig.quotaBytesPerSecondOverrides) - } - catch { - // Swallow. - case nfe: IllegalArgumentException => - } - - } - def newMetrics: Metrics = { new Metrics(new MetricConfig(), Collections.emptyList(), time) } diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 6061e66567207..23b03c8d1ea35 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -19,6 +19,8 @@ package kafka.server import java.util.Properties import junit.framework.Assert._ +import kafka.api.RequestKeys +import org.apache.kafka.common.metrics.Quota import org.easymock.{Capture, EasyMock} import org.junit.Test import kafka.integration.KafkaServerTestHarness @@ -52,22 +54,26 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { } } - // For now client config changes do not do anything. Simply verify that the call was made @Test - def testClientConfigChange() { + def testClientQuotaConfigChange() { assertTrue("Should contain a ConfigHandler for topics", this.servers(0).dynamicConfigHandlers.contains(ConfigType.Client)) val clientId = "testClient" val props = new Properties() - props.put("a.b", "c") - props.put("x.y", "z") + props.put(ClientConfigOverride.ProducerOverride, "1000") + props.put(ClientConfigOverride.ConsumerOverride, "2000") AdminUtils.changeClientIdConfig(zkUtils, clientId, props) + TestUtils.retry(10000) { val configHandler = this.servers(0).dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler] - assertTrue("ClientId testClient must exist", configHandler.configPool.contains(clientId)) - assertEquals("ClientId testClient must be the only override", 1, configHandler.configPool.size) - assertEquals("c", configHandler.configPool.get(clientId).getProperty("a.b")) - assertEquals("z", configHandler.configPool.get(clientId).getProperty("x.y")) + val quotaManagers: Map[Short, ClientQuotaManager] = servers(0).apis.quotaManagers + val overrideProducerQuota = quotaManagers.get(RequestKeys.ProduceKey).get.quota(clientId) + val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(clientId) + + assertEquals(s"ClientId $clientId must have overridden producer quota of 1000", + Quota.lessThan(1000), overrideProducerQuota) + assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000", + Quota.lessThan(2000), overrideConsumerQuota) } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 1c3e55da98c79..3e277fab2ec54 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -479,8 +479,6 @@ class KafkaConfigTest { case KafkaConfig.OffsetCommitRequiredAcksProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "-2") case KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") - case KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp => // ignore string - case KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp => // ignore string case KafkaConfig.NumQuotaSamplesProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") case KafkaConfig.QuotaWindowSizeSecondsProp => assertPropertyInvalid(getBaseProperties(), name, "not_a_number", "0") From 721e746f3fabc101c4dc812312bb410b08a1493f Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 12 Oct 2015 15:08:39 -0700 Subject: [PATCH 02/10] Minor fixes --- .../kafka/server/ClientQuotaManager.scala | 5 ++- .../scala/kafka/server/ConfigHandler.scala | 4 +- .../main/scala/kafka/server/KafkaApis.scala | 2 - .../integration/kafka/api/QuotasTest.scala | 39 +++++++++++++------ .../kafka/server/ClientQuotaManagerTest.scala | 17 +++++++- 5 files changed, 47 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index f24d70a8e0326..fd6b971740c85 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -159,7 +159,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * Returns the consumer quota for the specified clientId * @return */ - private[server] def quota(clientId: String): Quota = overriddenQuota.getOrDefault(clientId, defaultQuota) + def quota(clientId: String): Quota = overriddenQuota.getOrDefault(clientId, defaultQuota) /* * This function either returns the sensors for a given client id or creates them if they don't exist @@ -250,7 +250,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, def updateQuota(clientId: String, quota: Quota) = { /* * Acquire the write lock to apply changes in the quota objects. - * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object. + * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists). + * If the KafkaMetric hasn't been created, the most recent value will be used from the overriddenQuota map. * The write lock prevents quota update and creation at the same time. It also guards against concurrent quota change * notifications */ diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index 58d5a5f6022fc..cd601aa0fec0b 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -71,12 +71,12 @@ class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaMan def processConfigChanges(clientId : String, clientConfig : Properties) = { if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) { quotaManagers.get(RequestKeys.ProduceKey).get.updateQuota(clientId, - new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toInt, true)) + new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true)) } if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) { quotaManagers.get(RequestKeys.FetchKey).get.updateQuota(clientId, - new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toInt, true)) + new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true)) } } } \ No newline at end of file diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index a1e6627fd8522..44a2ef98ea8b2 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -786,14 +786,12 @@ class KafkaApis(val requestChannel: RequestChannel, private def instantiateQuotaManagers(cfg: KafkaConfig): immutable.Map[Short, ClientQuotaManager] = { val producerQuotaManagerCfg = ClientQuotaManagerConfig( quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault, - // quotaBytesPerSecondOverrides = cfg.producerQuotaBytesPerSecondOverrides, numQuotaSamples = cfg.numQuotaSamples, quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds ) val consumerQuotaManagerCfg = ClientQuotaManagerConfig( quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault, - // quotaBytesPerSecondOverrides = cfg.consumerQuotaBytesPerSecondOverrides, numQuotaSamples = cfg.numQuotaSamples, quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds ) diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala index 649419d48f04b..597432e9e0c10 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala @@ -16,17 +16,18 @@ package kafka.api import java.util.Properties -import junit.framework.Assert +import kafka.admin.AdminUtils import kafka.consumer.SimpleConsumer import kafka.integration.KafkaServerTestHarness -import kafka.server.{KafkaServer, KafkaConfig} +import kafka.server._ import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer._ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.common.MetricName -import org.apache.kafka.common.metrics.KafkaMetric -import org.junit.Assert._ +import org.apache.kafka.common.metrics.{Quota, KafkaMetric} +import org.junit.Assert.assertEquals +import org.junit.Assert.assertTrue import org.junit.{After, Before, Test} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -39,6 +40,7 @@ class QuotasTest extends KafkaServerTestHarness { private val producerId2 = "QuotasTestProducer-2" private val consumerId1 = "QuotasTestConsumer-1" private val consumerId2 = "QuotasTestConsumer-2" + private val EPS = 0.000001 val numServers = 2 val overridingProps = new Properties() @@ -47,10 +49,6 @@ class QuotasTest extends KafkaServerTestHarness { overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp, "8000") overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp, "2500") - // un-throttled - //overridingProps.put(KafkaConfig.ProducerQuotaBytesPerSecondOverridesProp, producerId2 + "=" + Long.MaxValue) - //overridingProps.put(KafkaConfig.ConsumerQuotaBytesPerSecondOverridesProp, consumerId2 + "=" + Long.MaxValue) - override def generateConfigs() = { FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect, @@ -111,6 +109,23 @@ class QuotasTest extends KafkaServerTestHarness { consumers += new KafkaConsumer(consumerProps) replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId2) + // Give effectively unlimited quota for producerId2 and consumerId2 + val props = new Properties() + props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString) + props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString) + + AdminUtils.changeClientIdConfig(zkClient, producerId2, props) + AdminUtils.changeClientIdConfig(zkClient, consumerId2, props) + + TestUtils.retry(10000) { + val quotaManagers: Map[Short, ClientQuotaManager] = leaderNode.apis.quotaManagers + val overrideProducerQuota = quotaManagers.get(RequestKeys.ProduceKey).get.quota(producerId2) + val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(consumerId2) + + assertEquals(s"ClientId $producerId2 must have unlimited producer quota", Quota.lessThan(Long.MaxValue), overrideProducerQuota) + assertEquals(s"ClientId $consumerId2 must have unlimited consumer quota", Quota.lessThan(Long.MaxValue), overrideConsumerQuota) + } + } @After @@ -132,7 +147,7 @@ class QuotasTest extends KafkaServerTestHarness { RequestKeys.nameForKey(RequestKeys.ProduceKey), "Tracking throttle-time per client", "client-id", producerId1) - Assert.assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0) + assertTrue("Should have been throttled", allMetrics(producerMetricName).value() > 0) // Consumer should read in a bursty manner and get throttled immediately consume(consumers.head, numRecords) @@ -143,7 +158,7 @@ class QuotasTest extends KafkaServerTestHarness { RequestKeys.nameForKey(RequestKeys.FetchKey), "Tracking throttle-time per client", "client-id", consumerId1) - Assert.assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0) + assertTrue("Should have been throttled", allMetrics(consumerMetricName).value() > 0) } @Test @@ -155,7 +170,7 @@ class QuotasTest extends KafkaServerTestHarness { RequestKeys.nameForKey(RequestKeys.ProduceKey), "Tracking throttle-time per client", "client-id", producerId2) - Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value()) + assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), EPS) // The "client" consumer does not get throttled. consume(consumers(1), numRecords) @@ -166,7 +181,7 @@ class QuotasTest extends KafkaServerTestHarness { RequestKeys.nameForKey(RequestKeys.FetchKey), "Tracking throttle-time per client", "client-id", consumerId2) - Assert.assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value()) + assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), EPS) } def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = { diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index c63b84efa3ea6..ec135032a2dc2 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -41,6 +41,8 @@ class ClientQuotaManagerTest { @Test def testQuotaParsing() { val clientMetrics = new ClientQuotaManager(config, newMetrics, "producer", time) + + // Case 1: Update the quota. Assert that the new quota value is returned clientMetrics.updateQuota("p1", new Quota(2000, true)); clientMetrics.updateQuota("p2", new Quota(4000, true)); @@ -52,15 +54,26 @@ class ClientQuotaManagerTest { Assert.assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota("p2")) - // change quota again + // p1 should be throttled using the default quota + var throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 2500 * config.numQuotaSamples, this.callback) + Assert.assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0) + + // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created. + // p1 should not longer be throttled after the quota change clientMetrics.updateQuota("p1", new Quota(3000, true)); Assert.assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota("p1")) - // Change back to default + throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback) + Assert.assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs) + + // Case 3: Change quota back to default. Should be throttled again clientMetrics.updateQuota("p1", new Quota(500, true)); Assert.assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota("p1")) + + throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback) + Assert.assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0) } finally { clientMetrics.shutdown() } From da73c0d0b222d036662f2a0a1b32879f5b02d94a Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 12 Oct 2015 15:11:38 -0700 Subject: [PATCH 03/10] Minor change --- core/src/test/scala/integration/kafka/api/QuotasTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala index 597432e9e0c10..3cb23964c0d82 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala @@ -19,7 +19,7 @@ import java.util.Properties import kafka.admin.AdminUtils import kafka.consumer.SimpleConsumer import kafka.integration.KafkaServerTestHarness -import kafka.server._ +import kafka.server.{ClientQuotaManager, ClientConfigOverride, KafkaConfig, KafkaServer} import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} import org.apache.kafka.clients.producer._ From b6defc57935a7f6f77b25b903c05d1c7a3809455 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Sun, 18 Oct 2015 09:48:24 -0700 Subject: [PATCH 04/10] Addressing Jun and Dongs comments --- .../apache/kafka/common/metrics/Quota.java | 4 +- .../kafka/common/metrics/MetricsTest.java | 10 ++--- .../main/scala/kafka/admin/AdminUtils.scala | 7 ++-- .../kafka/server/ClientQuotaManager.scala | 17 +++----- .../scala/kafka/server/ConfigHandler.scala | 5 ++- .../kafka/server/DynamicConfigManager.scala | 4 +- .../main/scala/kafka/server/KafkaApis.scala | 6 +-- .../main/scala/kafka/server/KafkaServer.scala | 8 ++++ .../integration/kafka/api/QuotasTest.scala | 37 ++++++++-------- .../scala/unit/kafka/admin/AdminTest.scala | 36 +++++++++++++++- .../kafka/server/ClientQuotaManagerTest.scala | 42 +++++++++---------- .../server/DynamicConfigChangeTest.scala | 8 ++-- 12 files changed, 111 insertions(+), 73 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java index 235b5994cb907..8431e50ef50a1 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Quota.java @@ -29,11 +29,11 @@ public Quota(double bound, boolean upper) { this.upper = upper; } - public static Quota lessThan(double upperBound) { + public static Quota upperBound(double upperBound) { return new Quota(upperBound, true); } - public static Quota moreThan(double lowerBound) { + public static Quota lowerBound(double lowerBound) { return new Quota(lowerBound, false); } diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java index bd84ebe5873ad..d465c98b6eb60 100644 --- a/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java @@ -320,8 +320,8 @@ public void testDuplicateMetricName() { @Test public void testQuotas() { Sensor sensor = metrics.sensor("test"); - sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lessThan(5.0))); - sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.moreThan(0.0))); + sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0))); + sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0))); sensor.record(5.0); try { sensor.record(1.0); @@ -341,12 +341,12 @@ public void testQuotas() { @Test public void testQuotasEquality() { - final Quota quota1 = Quota.lessThan(10.5); - final Quota quota2 = Quota.moreThan(10.5); + final Quota quota1 = Quota.upperBound(10.5); + final Quota quota2 = Quota.lowerBound(10.5); assertFalse("Quota with different upper values shouldn't be equal", quota1.equals(quota2)); - final Quota quota3 = Quota.moreThan(10.5); + final Quota quota3 = Quota.lowerBound(10.5); assertTrue("Quota with same upper and bound values should be equal", quota2.equals(quota3)); } diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index ecc5b9dec1515..2472f45d448c6 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -379,16 +379,17 @@ object AdminUtils extends Logging { def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] = zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap + def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] = + ZkUtils.getAllEntitiesWithConfig(zkClient, entityType).map(entity => (entity, fetchEntityConfig(zkClient, entityType, entity))).toMap + def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): TopicMetadata = - fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker]) + fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker]) def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[TopicMetadata] = { val cachedBrokerInfo = new mutable.HashMap[Int, Broker]() topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo)) } - - private def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = { if(zkUtils.pathExists(getTopicPath(topic))) { val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index fd6b971740c85..db873c69b86bc 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -52,7 +52,6 @@ object ClientQuotaManagerConfig { // Always have 10 whole windows + 1 current window val DefaultNumQuotaSamples = 11 val DefaultQuotaWindowSizeSeconds = 1 - val MaxThrottleTimeSeconds = 30 // Purge sensors after 1 hour of inactivity val InactiveSensorExpirationTimeSeconds = 3600 } @@ -70,7 +69,7 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, private val apiKey: String, private val time: Time) extends Logging { private val overriddenQuota = new ConcurrentHashMap[String, Quota]() - private val defaultQuota = Quota.lessThan(config.quotaBytesPerSecondDefault) + private val defaultQuota = Quota.upperBound(config.quotaBytesPerSecondDefault) private val lock = new ReentrantReadWriteLock() private val delayQueue = new DelayQueue[ThrottledResponse]() val throttledRequestReaper = new ThrottledRequestReaper(delayQueue) @@ -156,10 +155,10 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } /** - * Returns the consumer quota for the specified clientId - * @return + * Returns the quota for the specified clientId */ - def quota(clientId: String): Quota = overriddenQuota.getOrDefault(clientId, defaultQuota) + def quota(clientId: String): Quota = + if (overriddenQuota.containsKey(clientId)) overriddenQuota.get(clientId) else defaultQuota; /* * This function either returns the sensors for a given client id or creates them if they don't exist @@ -227,13 +226,9 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, ClientSensors(quotaSensor, throttleTimeSensor) } - private def getThrottleTimeSensorName(clientId: String): String = { - apiKey + "ThrottleTime-" + clientId - } + private def getThrottleTimeSensorName(clientId: String): String = apiKey + "ThrottleTime-" + clientId - private def getQuotaSensorName(clientId: String): String = { - apiKey + "-" + clientId - } + private def getQuotaSensorName(clientId: String): String = apiKey + "-" + clientId private def getQuotaMetricConfig(quota: Quota): MetricConfig = { new MetricConfig() diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index cd601aa0fec0b..cb1c9ea91083b 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -25,6 +25,7 @@ import kafka.api.RequestKeys import org.apache.kafka.common.metrics.Quota import scala.collection.mutable +import scala.collection.Map /** * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager @@ -70,12 +71,12 @@ class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaMan def processConfigChanges(clientId : String, clientConfig : Properties) = { if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) { - quotaManagers.get(RequestKeys.ProduceKey).get.updateQuota(clientId, + quotaManagers(RequestKeys.ProduceKey).updateQuota(clientId, new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true)) } if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) { - quotaManagers.get(RequestKeys.FetchKey).get.updateQuota(clientId, + quotaManagers(RequestKeys.FetchKey).updateQuota(clientId, new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true)) } } diff --git a/core/src/main/scala/kafka/server/DynamicConfigManager.scala b/core/src/main/scala/kafka/server/DynamicConfigManager.scala index 82e7e1923ecf0..cb4b8f18391fd 100644 --- a/core/src/main/scala/kafka/server/DynamicConfigManager.scala +++ b/core/src/main/scala/kafka/server/DynamicConfigManager.scala @@ -71,7 +71,7 @@ object ConfigType { * */ class DynamicConfigManager(private val zkUtils: ZkUtils, - private val configHandler : Map[String, ConfigHandler], + private val configHandlers: Map[String, ConfigHandler], private val changeExpirationMs: Long = 15*60*1000, private val time: Time = SystemTime) extends Logging { private var lastExecutedChange = -1L @@ -140,7 +140,7 @@ class DynamicConfigManager(private val zkUtils: ZkUtils, } val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity) logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig") - configHandler(entityType).processConfigChanges(entity, entityConfig) + configHandlers(entityType).processConfigChanges(entity, entityConfig) case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" + "{\"version\" : 1," + diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 44a2ef98ea8b2..d746467c5c00b 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -57,7 +57,7 @@ class KafkaApis(val requestChannel: RequestChannel, this.logIdent = "[KafkaApi-%d] ".format(brokerId) // Store all the quota managers for each type of request - val quotaManagers: immutable.Map[Short, ClientQuotaManager] = instantiateQuotaManagers(config) + val quotaManagers: Map[Short, ClientQuotaManager] = instantiateQuotaManagers(config) /** * Top-level method that handles all requests and multiplexes to the right api @@ -783,7 +783,7 @@ class KafkaApis(val requestChannel: RequestChannel, /* * Returns a Map of all quota managers configured. The request Api key is the key for the Map */ - private def instantiateQuotaManagers(cfg: KafkaConfig): immutable.Map[Short, ClientQuotaManager] = { + private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = { val producerQuotaManagerCfg = ClientQuotaManagerConfig( quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault, numQuotaSamples = cfg.numQuotaSamples, @@ -796,7 +796,7 @@ class KafkaApis(val requestChannel: RequestChannel, quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds ) - val quotaManagers = immutable.Map[Short, ClientQuotaManager]( + val quotaManagers = Map[Short, ClientQuotaManager]( RequestKeys.ProduceKey -> new ClientQuotaManager(producerQuotaManagerCfg, metrics, RequestKeys.nameForKey(RequestKeys.ProduceKey), new org.apache.kafka.common.utils.SystemTime), RequestKeys.FetchKey -> diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 03e59885b3ec0..9087f34188e03 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -210,6 +210,14 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr /* start dynamic config manager */ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager), ConfigType.Client -> new ClientIdConfigHandler(apis.quotaManagers)) + + // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides + // TODO: Move this logic to DynamicConfigManager + AdminUtils.fetchAllEntityConfigs(zkClient, ConfigType.Client).foreach { + case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties) + } + + // Create the config manager. start listening to notifications dynamicConfigManager = new DynamicConfigManager(zkUtils, dynamicConfigHandlers) dynamicConfigManager.startup() diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala index 3cb23964c0d82..78080b09ef8e6 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala @@ -32,6 +32,7 @@ import org.junit.{After, Before, Test} import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ +import scala.collection.Map import scala.collection.mutable class QuotasTest extends KafkaServerTestHarness { @@ -108,24 +109,6 @@ class QuotasTest extends KafkaServerTestHarness { consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, consumerId2) consumers += new KafkaConsumer(consumerProps) replicaConsumers += new SimpleConsumer("localhost", leaderNode.boundPort(), 1000000, 64*1024, consumerId2) - - // Give effectively unlimited quota for producerId2 and consumerId2 - val props = new Properties() - props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString) - props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString) - - AdminUtils.changeClientIdConfig(zkClient, producerId2, props) - AdminUtils.changeClientIdConfig(zkClient, consumerId2, props) - - TestUtils.retry(10000) { - val quotaManagers: Map[Short, ClientQuotaManager] = leaderNode.apis.quotaManagers - val overrideProducerQuota = quotaManagers.get(RequestKeys.ProduceKey).get.quota(producerId2) - val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(consumerId2) - - assertEquals(s"ClientId $producerId2 must have unlimited producer quota", Quota.lessThan(Long.MaxValue), overrideProducerQuota) - assertEquals(s"ClientId $consumerId2 must have unlimited consumer quota", Quota.lessThan(Long.MaxValue), overrideConsumerQuota) - } - } @After @@ -163,6 +146,24 @@ class QuotasTest extends KafkaServerTestHarness { @Test def testProducerConsumerOverrideUnthrottled() { + // Give effectively unlimited quota for producerId2 and consumerId2 + val props = new Properties() + props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString) + props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString) + + AdminUtils.changeClientIdConfig(zkClient, producerId2, props) + AdminUtils.changeClientIdConfig(zkClient, consumerId2, props) + + TestUtils.retry(10000) { + val quotaManagers: Map[Short, ClientQuotaManager] = leaderNode.apis.quotaManagers + val overrideProducerQuota = quotaManagers.get(RequestKeys.ProduceKey).get.quota(producerId2) + val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(consumerId2) + + assertEquals(s"ClientId $producerId2 must have unlimited producer quota", Quota.upperBound(Long.MaxValue), overrideProducerQuota) + assertEquals(s"ClientId $consumerId2 must have unlimited consumer quota", Quota.upperBound(Long.MaxValue), overrideConsumerQuota) + } + + val allMetrics: mutable.Map[MetricName, KafkaMetric] = leaderNode.metrics.metrics().asScala val numRecords = 1000 produce(producers(1), numRecords) diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 52ea580b5439a..44229e11e21c1 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -17,6 +17,8 @@ package kafka.admin import junit.framework.Assert._ +import kafka.api.RequestKeys +import org.apache.kafka.common.metrics.Quota import org.junit.Test import java.util.Properties import kafka.utils._ @@ -28,6 +30,7 @@ import kafka.server.{ConfigType, KafkaServer, KafkaConfig} import java.io.File import TestUtils._ +import scala.collection.{Map, immutable} class AdminTest extends ZooKeeperTestHarness with Logging { @@ -102,7 +105,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging { 10 -> List(1, 2, 3), 11 -> List(1, 3, 4) ) - val leaderForPartitionMap = Map( + val leaderForPartitionMap = immutable.Map( 0 -> 0, 1 -> 1, 2 -> 2, @@ -417,4 +420,35 @@ class AdminTest extends ZooKeeperTestHarness with Logging { server.config.logDirs.foreach(CoreUtils.rm(_)) } } + + /** + * This test simulates a client config change in ZK whose notification has been purged. + * Basically, it asserts that notifications are bootstrapped from ZK + */ + @Test + def testBootstrapClientIdConfig() { + val clientId = "my-client" + val props = new Properties() + props.setProperty("producer_byte_rate", "1000") + props.setProperty("consumer_byte_rate", "2000") + + // Write config without notification to ZK. + val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000") + val map = Map("version" -> 1, "config" -> configMap) + ZkUtils.updatePersistentPath(zkClient, ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encode(map)) + + val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkClient, ConfigType.Client) + assertEquals("Must have 1 overriden client config", 1, configInZk.size) + assertEquals(props, configInZk(clientId)) + + // Test that the existing clientId overrides are read + val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) + try { + assertEquals(new Quota(1000, true), server.apis.quotaManagers(RequestKeys.ProduceKey).quota(clientId)); + assertEquals(new Quota(2000, true), server.apis.quotaManagers(RequestKeys.FetchKey).quota(clientId)); + } finally { + server.shutdown() + server.config.logDirs.foreach(CoreUtils.rm(_)) + } + } } diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index ec135032a2dc2..c5e5687892009 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -21,7 +21,8 @@ import java.util.Collections import org.apache.kafka.common.MetricName import org.apache.kafka.common.metrics.{MetricConfig, Metrics, Quota} import org.apache.kafka.common.utils.MockTime -import org.junit.{Assert, Before, Test} +import org.junit.Assert.{assertEquals, assertTrue} +import org.junit.{Before, Test} class ClientQuotaManagerTest { private val time = new MockTime @@ -47,33 +48,28 @@ class ClientQuotaManagerTest { clientMetrics.updateQuota("p2", new Quota(4000, true)); try { - Assert.assertEquals("Default producer quota should be 500", - new Quota(500, true), clientMetrics.quota("random-client-id")) - Assert.assertEquals("Should return the overridden value (2000)", - new Quota(2000, true), clientMetrics.quota("p1")) - Assert.assertEquals("Should return the overridden value (4000)", - new Quota(4000, true), clientMetrics.quota("p2")) - - // p1 should be throttled using the default quota + assertEquals("Default producer quota should be 500", new Quota(500, true), clientMetrics.quota("random-client-id")) + assertEquals("Should return the overridden value (2000)", new Quota(2000, true), clientMetrics.quota("p1")) + assertEquals("Should return the overridden value (4000)", new Quota(4000, true), clientMetrics.quota("p2")) + + // p1 should be throttled using the overridden quota var throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 2500 * config.numQuotaSamples, this.callback) - Assert.assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0) + assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0) // Case 2: Change quota again. The quota should be updated within KafkaMetrics as well since the sensor was created. // p1 should not longer be throttled after the quota change clientMetrics.updateQuota("p1", new Quota(3000, true)); - Assert.assertEquals("Should return the newly overridden value (3000)", - new Quota(3000, true), clientMetrics.quota("p1")) + assertEquals("Should return the newly overridden value (3000)", new Quota(3000, true), clientMetrics.quota("p1")) throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback) - Assert.assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs) + assertEquals(s"throttleTimeMs should be 0. was $throttleTimeMs", 0, throttleTimeMs) // Case 3: Change quota back to default. Should be throttled again clientMetrics.updateQuota("p1", new Quota(500, true)); - Assert.assertEquals("Should return the default value (500)", - new Quota(500, true), clientMetrics.quota("p1")) + assertEquals("Should return the default value (500)", new Quota(500, true), clientMetrics.quota("p1")) throttleTimeMs = clientMetrics.recordAndMaybeThrottle("p1", 0, this.callback) - Assert.assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0) + assertTrue(s"throttleTimeMs should be > 0. was $throttleTimeMs", throttleTimeMs > 0) } finally { clientMetrics.shutdown() } @@ -92,8 +88,8 @@ class ClientQuotaManagerTest { clientMetrics.recordAndMaybeThrottle("unknown", 400, callback) time.sleep(1000) } - Assert.assertEquals(10, numCallbacks) - Assert.assertEquals(0, queueSizeMetric.value().toInt) + assertEquals(10, numCallbacks) + assertEquals(0, queueSizeMetric.value().toInt) // Create a spike. // 400*10 + 2000 + 300 = 6300/10.5 = 600 bytes per second. @@ -106,13 +102,13 @@ class ClientQuotaManagerTest { Assert.assertEquals(1, queueSizeMetric.value().toInt) // After a request is delayed, the callback cannot be triggered immediately clientMetrics.throttledRequestReaper.doWork() - Assert.assertEquals(10, numCallbacks) + assertEquals(10, numCallbacks) time.sleep(sleepTime) // Callback can only be triggered after the the delay time passes clientMetrics.throttledRequestReaper.doWork() - Assert.assertEquals(0, queueSizeMetric.value().toInt) - Assert.assertEquals(11, numCallbacks) + assertEquals(0, queueSizeMetric.value().toInt) + assertEquals(11, numCallbacks) // Could continue to see delays until the bursty sample disappears for (i <- 0 until 10) { @@ -120,8 +116,8 @@ class ClientQuotaManagerTest { time.sleep(1000) } - Assert.assertEquals("Should be unthrottled since bursty sample has rolled over", - 0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback)) + assertEquals("Should be unthrottled since bursty sample has rolled over", + 0, clientMetrics.recordAndMaybeThrottle("unknown", 0, callback)) } finally { clientMetrics.shutdown() } diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 23b03c8d1ea35..0d41b2e70034f 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -29,6 +29,8 @@ import kafka.common._ import kafka.log.LogConfig import kafka.admin.{AdminOperationException, AdminUtils} +import scala.collection.Map + class DynamicConfigChangeTest extends KafkaServerTestHarness { def generateConfigs() = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) @@ -71,9 +73,9 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { val overrideConsumerQuota = quotaManagers.get(RequestKeys.FetchKey).get.quota(clientId) assertEquals(s"ClientId $clientId must have overridden producer quota of 1000", - Quota.lessThan(1000), overrideProducerQuota) - assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000", - Quota.lessThan(2000), overrideConsumerQuota) + Quota.upperBound(1000), overrideProducerQuota) + assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000", + Quota.upperBound(2000), overrideConsumerQuota) } } From 329a9eed41a2a34a6fe7596714bcd3368697927c Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Sun, 18 Oct 2015 10:13:27 -0700 Subject: [PATCH 05/10] Minor --- .../test/scala/unit/kafka/server/ClientQuotaManagerTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala index c5e5687892009..fadcd5ae3f9fb 100644 --- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala @@ -98,8 +98,8 @@ class ClientQuotaManagerTest { time.sleep(500) val sleepTime = clientMetrics.recordAndMaybeThrottle("unknown", 2300, callback) - Assert.assertEquals("Should be throttled", 2100, sleepTime) - Assert.assertEquals(1, queueSizeMetric.value().toInt) + assertEquals("Should be throttled", 2100, sleepTime) + assertEquals(1, queueSizeMetric.value().toInt) // After a request is delayed, the callback cannot be triggered immediately clientMetrics.throttledRequestReaper.doWork() assertEquals(10, numCallbacks) From ded04973e8d2f6527af403fce354b9fcdea5a796 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Sun, 18 Oct 2015 10:18:08 -0700 Subject: [PATCH 06/10] Indentation changes --- core/src/main/scala/kafka/server/ConfigHandler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala index cb1c9ea91083b..606156a15197c 100644 --- a/core/src/main/scala/kafka/server/ConfigHandler.scala +++ b/core/src/main/scala/kafka/server/ConfigHandler.scala @@ -31,7 +31,7 @@ import scala.collection.Map * The ConfigHandler is used to process config change notifications received by the DynamicConfigManager */ trait ConfigHandler { - def processConfigChanges(entityName : String, value : Properties) + def processConfigChanges(entityName: String, value: Properties) } /** @@ -69,7 +69,7 @@ object ClientConfigOverride { */ class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaManager]) extends ConfigHandler { - def processConfigChanges(clientId : String, clientConfig : Properties) = { + def processConfigChanges(clientId: String, clientConfig: Properties) = { if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) { quotaManagers(RequestKeys.ProduceKey).updateQuota(clientId, new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true)) From e2b4daf743eee524a097f1e07678245663bd83ad Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Mon, 19 Oct 2015 23:01:59 -0700 Subject: [PATCH 07/10] Addressing some minor comments from jun --- .../main/java/org/apache/kafka/common/metrics/stats/Avg.java | 2 +- core/src/main/scala/kafka/server/ClientQuotaManager.scala | 3 +-- core/src/test/scala/integration/kafka/api/QuotasTest.scala | 5 ++--- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java index ed6767f369e02..0fe7380fe3762 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/stats/Avg.java @@ -39,7 +39,7 @@ public double combine(List samples, MetricConfig config, long now) { total += s.value; count += s.eventCount; } - return total / count; + return count == 0 ? 0 : total / count; } } diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index db873c69b86bc..82fec73f80a3c 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -119,13 +119,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, // Compute the delay val clientMetric = metrics.metrics().get(clientRateMetricName(clientId)) throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId))) + clientSensors.throttleTimeSensor.record(throttleTimeMs) delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback)) delayQueueSensor.record() // If delayed, add the element to the delayQueue logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs)) } - // If the request is not throttled, a throttleTime of 0 ms is recorded - clientSensors.throttleTimeSensor.record(throttleTimeMs) throttleTimeMs } diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala index 78080b09ef8e6..5d2cd542ded67 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala @@ -41,7 +41,6 @@ class QuotasTest extends KafkaServerTestHarness { private val producerId2 = "QuotasTestProducer-2" private val consumerId1 = "QuotasTestConsumer-1" private val consumerId2 = "QuotasTestConsumer-2" - private val EPS = 0.000001 val numServers = 2 val overridingProps = new Properties() @@ -171,7 +170,7 @@ class QuotasTest extends KafkaServerTestHarness { RequestKeys.nameForKey(RequestKeys.ProduceKey), "Tracking throttle-time per client", "client-id", producerId2) - assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), EPS) + assertEquals("Should not have been throttled", 0.0, allMetrics(producerMetricName).value(), 0.0) // The "client" consumer does not get throttled. consume(consumers(1), numRecords) @@ -182,7 +181,7 @@ class QuotasTest extends KafkaServerTestHarness { RequestKeys.nameForKey(RequestKeys.FetchKey), "Tracking throttle-time per client", "client-id", consumerId2) - assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), EPS) + assertEquals("Should not have been throttled", 0.0, allMetrics(consumerMetricName).value(), 0.0) } def produce(p: KafkaProducer[Array[Byte], Array[Byte]], count: Int): Int = { From 54d9e8c504ae7dc9447610b6a06f80d108eccdb1 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Tue, 20 Oct 2015 20:18:01 -0700 Subject: [PATCH 08/10] Rebasing --- core/src/main/scala/kafka/admin/AdminUtils.scala | 16 ++++++++-------- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../scala/integration/kafka/api/QuotasTest.scala | 4 ++-- .../test/scala/unit/kafka/admin/AdminTest.scala | 4 ++-- .../kafka/server/DynamicConfigChangeTest.scala | 2 +- 5 files changed, 14 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 2472f45d448c6..6fff176cbaaab 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -98,7 +98,7 @@ object AdminUtils extends Logging { /** * Add partitions to existing topic with optional replica assignment * - * @param zkClient Zookeeper client + * @param zkUtils Zookeeper utilities * @param topic Topic for adding partitions to * @param numPartitions Number of partitions to be set * @param replicaAssignmentStr Manual replica assignment @@ -177,7 +177,7 @@ object AdminUtils extends Logging { /** * Delete the whole directory of the given consumer group if the group is inactive. * - * @param zkClient Zookeeper client + * @param zkUtils Zookeeper utilities * @param group Consumer group * @return whether or not we deleted the consumer group information */ @@ -194,7 +194,7 @@ object AdminUtils extends Logging { * Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive. * If the consumer group consumes no other topics, delete the whole consumer group directory. * - * @param zkClient Zookeeper client + * @param zkUtils Zookeeper utilities * @param group Consumer group * @param topic Topic of the consumer group information we wish to delete * @return whether or not we deleted the consumer group information for the given topic @@ -216,7 +216,7 @@ object AdminUtils extends Logging { /** * Delete every inactive consumer group's information about the given topic in Zookeeper. * - * @param zkClient Zookeeper client + * @param zkUtils Zookeeper utilities * @param topic Topic of the consumer group information we wish to delete */ def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) { @@ -294,7 +294,7 @@ object AdminUtils extends Logging { /** * Update the config for a client and create a change notification so the change will propagate to other brokers - * @param zkClient: The ZkClient handle used to write the new config to zookeeper + * @param zkUtils Zookeeper utilities used to write the config to ZK * @param clientId: The clientId for which configs are being changed * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or * existing configs need to be deleted, it should be done prior to invoking this API @@ -306,7 +306,7 @@ object AdminUtils extends Logging { /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers - * @param zkClient: The ZkClient handle used to write the new config to zookeeper + * @param zkUtils Zookeeper utilities used to write the config to ZK * @param topic: The topic for which configs are being changed * @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or * existing configs need to be deleted, it should be done prior to invoking this API @@ -380,10 +380,10 @@ object AdminUtils extends Logging { zkUtils.getAllTopics().map(topic => (topic, fetchEntityConfig(zkUtils, ConfigType.Topic, topic))).toMap def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] = - ZkUtils.getAllEntitiesWithConfig(zkClient, entityType).map(entity => (entity, fetchEntityConfig(zkClient, entityType, entity))).toMap + zkUtils.getAllEntitiesWithConfig(entityType).map(entity => (entity, fetchEntityConfig(zkUtils, entityType, entity))).toMap def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): TopicMetadata = - fetchTopicMetadataFromZk(topic, zkClient, new mutable.HashMap[Int, Broker]) + fetchTopicMetadataFromZk(topic, zkUtils, new mutable.HashMap[Int, Broker]) def fetchTopicMetadataFromZk(topics: Set[String], zkUtils: ZkUtils): Set[TopicMetadata] = { val cachedBrokerInfo = new mutable.HashMap[Int, Broker]() diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 9087f34188e03..d2a1e61a415f0 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -213,7 +213,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr // Apply all existing client configs to the ClientIdConfigHandler to bootstrap the overrides // TODO: Move this logic to DynamicConfigManager - AdminUtils.fetchAllEntityConfigs(zkClient, ConfigType.Client).foreach { + AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client).foreach { case (clientId, properties) => dynamicConfigHandlers(ConfigType.Client).processConfigChanges(clientId, properties) } diff --git a/core/src/test/scala/integration/kafka/api/QuotasTest.scala b/core/src/test/scala/integration/kafka/api/QuotasTest.scala index 5d2cd542ded67..649c9277eacbe 100644 --- a/core/src/test/scala/integration/kafka/api/QuotasTest.scala +++ b/core/src/test/scala/integration/kafka/api/QuotasTest.scala @@ -150,8 +150,8 @@ class QuotasTest extends KafkaServerTestHarness { props.put(ClientConfigOverride.ProducerOverride, Long.MaxValue.toString) props.put(ClientConfigOverride.ConsumerOverride, Long.MaxValue.toString) - AdminUtils.changeClientIdConfig(zkClient, producerId2, props) - AdminUtils.changeClientIdConfig(zkClient, consumerId2, props) + AdminUtils.changeClientIdConfig(zkUtils, producerId2, props) + AdminUtils.changeClientIdConfig(zkUtils, consumerId2, props) TestUtils.retry(10000) { val quotaManagers: Map[Short, ClientQuotaManager] = leaderNode.apis.quotaManagers diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 44229e11e21c1..0570c793e5d6a 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -435,9 +435,9 @@ class AdminTest extends ZooKeeperTestHarness with Logging { // Write config without notification to ZK. val configMap = Map[String, String] ("producer_byte_rate" -> "1000", "consumer_byte_rate" -> "2000") val map = Map("version" -> 1, "config" -> configMap) - ZkUtils.updatePersistentPath(zkClient, ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encode(map)) + zkUtils.updatePersistentPath(ZkUtils.getEntityConfigPath(ConfigType.Client, clientId), Json.encode(map)) - val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkClient, ConfigType.Client) + val configInZk: Map[String, Properties] = AdminUtils.fetchAllEntityConfigs(zkUtils, ConfigType.Client) assertEquals("Must have 1 overriden client config", 1, configInZk.size) assertEquals(props, configInZk(clientId)) diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 0d41b2e70034f..6b49c4ea2bf8d 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -18,7 +18,7 @@ package kafka.server import java.util.Properties -import junit.framework.Assert._ +import org.junit.Assert._ import kafka.api.RequestKeys import org.apache.kafka.common.metrics.Quota import org.easymock.{Capture, EasyMock} From 3c3d950ad42793c67f65b0cf4456452aa4d2c2e3 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Wed, 21 Oct 2015 13:54:10 -0700 Subject: [PATCH 09/10] Minor --- core/src/main/scala/kafka/server/KafkaApis.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index d746467c5c00b..2494fee22ba7e 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -19,7 +19,6 @@ package kafka.server import java.nio.ByteBuffer -import org.apache.kafka.common.TopicPartition import kafka.api._ import kafka.admin.AdminUtils import kafka.api._ @@ -32,7 +31,6 @@ import kafka.network._ import kafka.network.RequestChannel.{Session, Response} import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write} import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils} -import org.I0Itec.zkclient.ZkClient import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.SecurityProtocol import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse} From 5c3aa7eedde59b790215ee9a7e948344032324e7 Mon Sep 17 00:00:00 2001 From: Aditya Auradkar Date: Wed, 21 Oct 2015 14:28:38 -0700 Subject: [PATCH 10/10] Minor --- core/src/main/scala/kafka/server/KafkaApis.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 2494fee22ba7e..2ef97304d531f 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -19,7 +19,6 @@ package kafka.server import java.nio.ByteBuffer -import kafka.api._ import kafka.admin.AdminUtils import kafka.api._ import kafka.common._