Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ public Quota(double bound, boolean upper) {
this.upper = upper;
}

public static Quota lessThan(double upperBound) {
public static Quota upperBound(double upperBound) {
return new Quota(upperBound, true);
}

public static Quota moreThan(double lowerBound) {
public static Quota lowerBound(double lowerBound) {
return new Quota(lowerBound, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public double combine(List<Sample> samples, MetricConfig config, long now) {
total += s.value;
count += s.eventCount;
}
return total / count;
return count == 0 ? 0 : total / count;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -320,8 +320,8 @@ public void testDuplicateMetricName() {
@Test
public void testQuotas() {
Sensor sensor = metrics.sensor("test");
sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lessThan(5.0)));
sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.moreThan(0.0)));
sensor.add(new MetricName("test1.total", "grp1"), new Total(), new MetricConfig().quota(Quota.upperBound(5.0)));
sensor.add(new MetricName("test2.total", "grp1"), new Total(), new MetricConfig().quota(Quota.lowerBound(0.0)));
sensor.record(5.0);
try {
sensor.record(1.0);
Expand All @@ -341,12 +341,12 @@ public void testQuotas() {

@Test
public void testQuotasEquality() {
    // Same numeric bound, but one upper and one lower quota: must not be equal.
    final Quota quota1 = Quota.upperBound(10.5);
    final Quota quota2 = Quota.lowerBound(10.5);

    assertFalse("Quotas with the same bound but different bound types (upper vs lower) shouldn't be equal", quota1.equals(quota2));

    // Same bound type and same value: must be equal.
    final Quota quota3 = Quota.lowerBound(10.5);

    assertTrue("Quotas with the same bound type and bound value should be equal", quota2.equals(quota3));
}
Expand Down
17 changes: 9 additions & 8 deletions core/src/main/scala/kafka/admin/AdminUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ object AdminUtils extends Logging {
/**
* Add partitions to existing topic with optional replica assignment
*
* @param zkClient Zookeeper client
* @param zkUtils Zookeeper utilities
* @param topic Topic for adding partitions to
* @param numPartitions Number of partitions to be set
* @param replicaAssignmentStr Manual replica assignment
Expand Down Expand Up @@ -177,7 +177,7 @@ object AdminUtils extends Logging {
/**
* Delete the whole directory of the given consumer group if the group is inactive.
*
* @param zkClient Zookeeper client
* @param zkUtils Zookeeper utilities
* @param group Consumer group
* @return whether or not we deleted the consumer group information
*/
Expand All @@ -194,7 +194,7 @@ object AdminUtils extends Logging {
* Delete the given consumer group's information for the given topic in Zookeeper if the group is inactive.
* If the consumer group consumes no other topics, delete the whole consumer group directory.
*
* @param zkClient Zookeeper client
* @param zkUtils Zookeeper utilities
* @param group Consumer group
* @param topic Topic of the consumer group information we wish to delete
* @return whether or not we deleted the consumer group information for the given topic
Expand All @@ -216,7 +216,7 @@ object AdminUtils extends Logging {
/**
* Delete every inactive consumer group's information about the given topic in Zookeeper.
*
* @param zkClient Zookeeper client
* @param zkUtils Zookeeper utilities
* @param topic Topic of the consumer group information we wish to delete
*/
def deleteAllConsumerGroupInfoForTopicInZK(zkUtils: ZkUtils, topic: String) {
Expand Down Expand Up @@ -294,7 +294,7 @@ object AdminUtils extends Logging {

/**
* Update the config for a client and create a change notification so the change will propagate to other brokers
* @param zkClient: The ZkClient handle used to write the new config to zookeeper
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param clientId: The clientId for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
Expand All @@ -306,7 +306,7 @@ object AdminUtils extends Logging {

/**
* Update the config for an existing topic and create a change notification so the change will propagate to other brokers
* @param zkClient: The ZkClient handle used to write the new config to zookeeper
* @param zkUtils Zookeeper utilities used to write the config to ZK
* @param topic: The topic for which configs are being changed
* @param configs: The final set of configs that will be applied to the topic. If any new configs need to be added or
* existing configs need to be deleted, it should be done prior to invoking this API
Expand Down Expand Up @@ -379,6 +379,9 @@ object AdminUtils extends Logging {
/**
 * Reads the config overrides of every topic registered in ZK.
 *
 * @param zkUtils Zookeeper utilities used to enumerate topics and read their configs
 * @return a map from topic name to that topic's override properties
 */
def fetchAllTopicConfigs(zkUtils: ZkUtils): Map[String, Properties] = {
  val allTopics = zkUtils.getAllTopics()
  allTopics.map { topic =>
    topic -> fetchEntityConfig(zkUtils, ConfigType.Topic, topic)
  }.toMap
}

/**
 * Reads the config overrides of every entity of the given type (e.g. topics, clients).
 *
 * @param zkUtils Zookeeper utilities used to enumerate entities and read their configs
 * @param entityType the config entity type to enumerate
 * @return a map from entity name to that entity's override properties
 */
def fetchAllEntityConfigs(zkUtils: ZkUtils, entityType: String): Map[String, Properties] = {
  val entities = zkUtils.getAllEntitiesWithConfig(entityType)
  entities.map { entity =>
    entity -> fetchEntityConfig(zkUtils, entityType, entity)
  }.toMap
}

/**
 * Fetches metadata for a single topic from ZK, starting from an empty
 * broker-info cache.
 */
def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils): TopicMetadata = {
  val emptyBrokerCache = new mutable.HashMap[Int, Broker]
  fetchTopicMetadataFromZk(topic, zkUtils, emptyBrokerCache)
}

Expand All @@ -387,8 +390,6 @@ object AdminUtils extends Logging {
topics.map(topic => fetchTopicMetadataFromZk(topic, zkUtils, cachedBrokerInfo))
}



private def fetchTopicMetadataFromZk(topic: String, zkUtils: ZkUtils, cachedBrokerInfo: mutable.HashMap[Int, Broker], protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT): TopicMetadata = {
if(zkUtils.pathExists(getTopicPath(topic))) {
val topicPartitionAssignment = zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic).get
Expand Down
73 changes: 44 additions & 29 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package kafka.server

import java.util.concurrent.{DelayQueue, TimeUnit}
import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}

import kafka.utils.{ShutdownableThread, Logging}
import org.apache.kafka.common.MetricName
Expand All @@ -36,27 +36,22 @@ private case class ClientSensors(quotaSensor: Sensor, throttleTimeSensor: Sensor
/**
* Configuration settings for quota management
* @param quotaBytesPerSecondDefault The default bytes per second quota allocated to any client
* @param quotaBytesPerSecondOverrides The comma separated overrides per client. "c1=X,c2=Y"
* @param numQuotaSamples The number of samples to retain in memory
* @param quotaWindowSizeSeconds The time span of each sample
*
*/
case class ClientQuotaManagerConfig(quotaBytesPerSecondDefault: Long =
ClientQuotaManagerConfig.QuotaBytesPerSecondDefault,
quotaBytesPerSecondOverrides: String =
ClientQuotaManagerConfig.QuotaBytesPerSecondOverrides,
numQuotaSamples: Int =
ClientQuotaManagerConfig.DefaultNumQuotaSamples,
quotaWindowSizeSeconds: Int =
ClientQuotaManagerConfig.DefaultQuotaWindowSizeSeconds)

object ClientQuotaManagerConfig {
val QuotaBytesPerSecondDefault = Long.MaxValue
val QuotaBytesPerSecondOverrides = ""
// Always have 10 whole windows + 1 current window
val DefaultNumQuotaSamples = 11
val DefaultQuotaWindowSizeSeconds = 1
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove MaxThrottleTimeSeconds since it is not used?

val MaxThrottleTimeSeconds = 30
// Purge sensors after 1 hour of inactivity
val InactiveSensorExpirationTimeSeconds = 3600
}
Expand All @@ -73,8 +68,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private val metrics: Metrics,
private val apiKey: String,
private val time: Time) extends Logging {
private val overriddenQuota = initQuotaMap(config.quotaBytesPerSecondOverrides)
private val defaultQuota = Quota.lessThan(config.quotaBytesPerSecondDefault)
private val overriddenQuota = new ConcurrentHashMap[String, Quota]()
private val defaultQuota = Quota.upperBound(config.quotaBytesPerSecondDefault)
private val lock = new ReentrantReadWriteLock()
private val delayQueue = new DelayQueue[ThrottledResponse]()
val throttledRequestReaper = new ThrottledRequestReaper(delayQueue)
Expand Down Expand Up @@ -124,13 +119,12 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
// Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientId))
throttleTimeMs = throttleTime(clientMetric, getQuotaMetricConfig(quota(clientId)))
clientSensors.throttleTimeSensor.record(throttleTimeMs)
delayQueue.add(new ThrottledResponse(time, throttleTimeMs, callback))
delayQueueSensor.record()
// If delayed, add the element to the delayQueue
logger.debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
}
// If the request is not throttled, a throttleTime of 0 ms is recorded
clientSensors.throttleTimeSensor.record(throttleTimeMs)
throttleTimeMs
}

Expand Down Expand Up @@ -160,10 +154,10 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}

/**
 * Returns the quota for the specified clientId: the per-client override when
 * one exists, otherwise the default quota.
 */
def quota(clientId: String): Quota = {
  // Read the override with a single atomic get. The original
  // containsKey-then-get pair is a check-then-act race on the
  // ConcurrentHashMap: a concurrent updateQuota() may remove the entry
  // between the two calls, making get() return null.
  Option(overriddenQuota.get(clientId)).getOrElse(defaultQuota)
}

/*
* This function either returns the sensors for a given client id or creates them if they don't exist
Expand All @@ -172,8 +166,8 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
private def getOrCreateQuotaSensors(clientId: String): ClientSensors = {

// Names of the sensors to access
val quotaSensorName = apiKey + "-" + clientId
val throttleTimeSensorName = apiKey + "ThrottleTime-" + clientId
val quotaSensorName = getQuotaSensorName(clientId)
val throttleTimeSensorName = getThrottleTimeSensorName(clientId)
var quotaSensor: Sensor = null
var throttleTimeSensor: Sensor = null

Expand Down Expand Up @@ -231,28 +225,49 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
ClientSensors(quotaSensor, throttleTimeSensor)
}

private def getThrottleTimeSensorName(clientId: String): String = apiKey + "ThrottleTime-" + clientId

private def getQuotaSensorName(clientId: String): String = apiKey + "-" + clientId

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A very minor comment here about style -- when we write a one-line function in Scala in Kafka, do we prefer to avoid "{}" and implement the function on a single line?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure we are very consistent about this but I have observed it both ways. I'll clean it up to make it a single line

/**
 * Builds a MetricConfig carrying the configured sampling window, sample
 * count, and the given quota bound.
 */
private def getQuotaMetricConfig(quota: Quota): MetricConfig =
  new MetricConfig()
    .timeWindow(config.quotaWindowSizeSeconds, TimeUnit.SECONDS)
    .samples(config.numQuotaSamples)
    .quota(quota)

/* Construct a Map of (clientId -> Quota)
* The input config is specified as a comma-separated K=V pairs
/**
* Overrides quotas per clientId
* @param clientId client to override
* @param quota custom quota to apply
*/
private def initQuotaMap(input: String): Map[String, Quota] = {
// If empty input, return an empty map
if (input.trim.length == 0)
Map[String, Quota]()
else
input.split(",").map(entry => {
val trimmedEntry = entry.trim
val pair: Array[String] = trimmedEntry.split("=")
if (pair.length != 2)
throw new IllegalArgumentException("Incorrectly formatted override entry (%s). Format is k1=v1,k2=v2".format(entry))
pair(0) -> new Quota(pair(1).toDouble, true)
}).toMap
def updateQuota(clientId: String, quota: Quota) = {
  /*
   * Acquire the write lock to apply changes in the quota objects.
   * This method changes the quota in the overriddenQuota map and applies the update on the actual KafkaMetric object (if it exists).
   * If the KafkaMetric hasn't been created, the most recent value will be used from the overriddenQuota map.
   * The write lock prevents quota update and creation at the same time. It also guards against concurrent quota change
   * notifications
   */
  lock.writeLock().lock()
  try {
    logger.info(s"Changing quota for clientId $clientId to ${quota.bound()}")

    // Storing an override equal to the default would be redundant: removing
    // the entry makes quota() fall back to defaultQuota instead.
    if (quota.equals(defaultQuota))
      this.overriddenQuota.remove(clientId)
    else
      this.overriddenQuota.put(clientId, quota)

    // Change the underlying metric config if the sensor has been created
    // (sensors are created lazily on first record for a clientId).
    val allMetrics = metrics.metrics()
    val quotaMetricName = clientRateMetricName(clientId)
    if (allMetrics.containsKey(quotaMetricName)) {
      logger.info(s"Sensor for clientId $clientId already exists. Changing quota to ${quota.bound()} in MetricConfig")
      allMetrics.get(quotaMetricName).config(getQuotaMetricConfig(quota))
    }
  } finally {
    // Always release the write lock, even if the metric update throws.
    lock.writeLock().unlock()
  }
}

private def clientRateMetricName(clientId: String): MetricName = {
Expand Down
30 changes: 22 additions & 8 deletions core/src/main/scala/kafka/server/ConfigHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@ import java.util.Properties

import kafka.common.TopicAndPartition
import kafka.log.{Log, LogConfig, LogManager}
import kafka.utils.Pool
import kafka.api.RequestKeys
import org.apache.kafka.common.metrics.Quota

import scala.collection.mutable
import scala.collection.Map

/**
* The ConfigHandler is used to process config change notifications received by the DynamicConfigManager
*/
trait ConfigHandler {
def processConfigChanges(entityName : String, value : Properties)
def processConfigChanges(entityName: String, value: Properties)
}

/**
* The TopicConfigHandler will process topic config changes in ZK.
* The callback provides the topic name and the full properties set read from ZK
*/
class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler{
class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandler {

def processConfigChanges(topic : String, topicConfig : Properties) {
val logs: mutable.Buffer[(TopicAndPartition, Log)] = logManager.logsByTopicPartition.toBuffer
Expand All @@ -55,15 +57,27 @@ class TopicConfigHandler(private val logManager: LogManager) extends ConfigHandl
}
}

/** Property keys recognized in per-client config overrides read from ZK. */
object ClientConfigOverride {
  // Byte-rate override applied to the produce-side quota manager.
  val ProducerOverride = "producer_byte_rate"
  // Byte-rate override applied to the fetch-side quota manager.
  val ConsumerOverride = "consumer_byte_rate"
}

/**
 * The ClientIdConfigHandler will process clientId config changes in ZK.
 * The callback provides the clientId and the full properties set read from ZK.
 * This implementation reports the overrides to the respective ClientQuotaManager objects.
 *
 * @param quotaManagers map from request key (produce/fetch) to the quota manager handling that api
 */
class ClientIdConfigHandler(private val quotaManagers: Map[Short, ClientQuotaManager]) extends ConfigHandler {

  def processConfigChanges(clientId: String, clientConfig: Properties): Unit = {
    // If the produce-side byte-rate override is present, push it to the
    // produce quota manager. Quota.upperBound matches the factory naming
    // introduced alongside this handler (was: new Quota(x, true)).
    if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride))
      quotaManagers(RequestKeys.ProduceKey).updateQuota(clientId,
        Quota.upperBound(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toDouble))

    // Same for the fetch-side byte-rate override.
    if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride))
      quotaManagers(RequestKeys.FetchKey).updateQuota(clientId,
        Quota.upperBound(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toDouble))
  }
}
6 changes: 4 additions & 2 deletions core/src/main/scala/kafka/server/DynamicConfigManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object ConfigType {
*
*/
class DynamicConfigManager(private val zkUtils: ZkUtils,
private val configHandler : Map[String, ConfigHandler],
private val configHandlers: Map[String, ConfigHandler],
private val changeExpirationMs: Long = 15*60*1000,
private val time: Time = SystemTime) extends Logging {
private var lastExecutedChange = -1L
Expand Down Expand Up @@ -138,7 +138,9 @@ class DynamicConfigManager(private val zkUtils: ZkUtils,
case Some(value: String) => value
case _ => throw new IllegalArgumentException("Config change notification does not specify 'entity_name'. Received: " + json)
}
configHandler(entityType).processConfigChanges(entity, AdminUtils.fetchEntityConfig(zkUtils, entityType, entity))
val entityConfig = AdminUtils.fetchEntityConfig(zkUtils, entityType, entity)
logger.info(s"Processing override for entityType: $entityType, entity: $entity with config: $entityConfig")
configHandlers(entityType).processConfigChanges(entity, entityConfig)

case o => throw new IllegalArgumentException("Config change notification has an unexpected value. The format is:" +
"{\"version\" : 1," +
Expand Down
5 changes: 1 addition & 4 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import kafka.network._
import kafka.network.RequestChannel.{Session, Response}
import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write}
import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.SecurityProtocol
import org.apache.kafka.common.requests.{HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse}
Expand All @@ -55,7 +54,7 @@ class KafkaApis(val requestChannel: RequestChannel,

this.logIdent = "[KafkaApi-%d] ".format(brokerId)
// Store all the quota managers for each type of request
private val quotaManagers = instantiateQuotaManagers(config)
val quotaManagers: Map[Short, ClientQuotaManager] = instantiateQuotaManagers(config)

/**
* Top-level method that handles all requests and multiplexes to the right api
Expand Down Expand Up @@ -784,14 +783,12 @@ class KafkaApis(val requestChannel: RequestChannel,
private def instantiateQuotaManagers(cfg: KafkaConfig): Map[Short, ClientQuotaManager] = {
val producerQuotaManagerCfg = ClientQuotaManagerConfig(
quotaBytesPerSecondDefault = cfg.producerQuotaBytesPerSecondDefault,
quotaBytesPerSecondOverrides = cfg.producerQuotaBytesPerSecondOverrides,
numQuotaSamples = cfg.numQuotaSamples,
quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
)

val consumerQuotaManagerCfg = ClientQuotaManagerConfig(
quotaBytesPerSecondDefault = cfg.consumerQuotaBytesPerSecondDefault,
quotaBytesPerSecondOverrides = cfg.consumerQuotaBytesPerSecondOverrides,
numQuotaSamples = cfg.numQuotaSamples,
quotaWindowSizeSeconds = cfg.quotaWindowSizeSeconds
)
Expand Down
Loading