Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,25 @@
package org.apache.kafka.common.metrics;

import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;

/**
* Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota
*/
public class QuotaViolationException extends KafkaException {

private static final long serialVersionUID = 1L;
private final MetricName metricName;
private final KafkaMetric metric;
private final double value;
private final double bound;

public QuotaViolationException(MetricName metricName, double value, double bound) {
this.metricName = metricName;
public QuotaViolationException(KafkaMetric metric, double value, double bound) {
this.metric = metric;
this.value = value;
this.bound = bound;
}

public MetricName metricName() {
return metricName;
public KafkaMetric metric() {
return metric;
}

public double value() {
Expand All @@ -51,7 +50,7 @@ public double bound() {
public String toString() {
return getClass().getName()
+ ": '"
+ metricName
+ metric.metricName()
+ "' violated quota. Actual: "
+ value
+ ", Threshold: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,7 @@ public void checkQuotas(long timeMs) {
if (quota != null) {
double value = metric.measurableValue(timeMs);
if (!quota.acceptable(value)) {
throw new QuotaViolationException(metric.metricName(), value,
quota.bound());
throw new QuotaViolationException(metric, value, quota.bound());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,14 @@ public long writeTo(GatheringByteChannel channel) throws IOException {
/**
 * Returns the current value of this send's {@code remaining} counter
 * (presumably the number of bytes left to write — confirm against {@code writeTo}).
 */
public long remaining() {
return remaining;
}

/**
 * Human-readable summary of this send: its destination plus the size,
 * remaining, and pending counters. Intended for logging and debugging.
 */
@Override
public String toString() {
    StringBuilder description = new StringBuilder("ByteBufferSend(");
    description.append("destination='").append(destination).append('\'');
    description.append(", size=").append(size);
    description.append(", remaining=").append(remaining);
    description.append(", pending=").append(pending);
    description.append(')');
    return description.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ public Map<TopicPartition, RecordConversionStats> recordConversionStats() {
return recordConversionStats;
}

/**
 * Human-readable summary of this multi-record send: its destination plus the
 * total size and bytes written so far. Intended for logging and debugging.
 */
@Override
public String toString() {
    StringBuilder description = new StringBuilder("MultiRecordsSend(");
    description.append("dest='").append(dest).append('\'');
    description.append(", size=").append(size);
    description.append(", totalWritten=").append(totalWritten);
    description.append(')');
    return description.toString();
}

private void updateRecordConversionStats(Send completedSend) {
// The underlying send might have accumulated statistics that need to be recorded. For example,
// LazyDownConversionRecordsSend accumulates statistics related to the number of bytes down-converted, the amount
Expand Down
41 changes: 19 additions & 22 deletions core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ object RequestChannel extends Logging {
@volatile var apiLocalCompleteTimeNanos = -1L
@volatile var responseCompleteTimeNanos = -1L
@volatile var responseDequeueTimeNanos = -1L
@volatile var apiRemoteCompleteTimeNanos = -1L
@volatile var messageConversionsTimeNanos = 0L
@volatile var apiThrottleTimeMs = 0L
@volatile var temporaryMemoryBytes = 0L
@volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None

Expand Down Expand Up @@ -170,16 +170,6 @@ object RequestChannel extends Logging {

def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = {
val endTimeNanos = Time.SYSTEM.nanoseconds
// In some corner cases, apiLocalCompleteTimeNanos may not be set when the request completes if the remote
// processing time is really small. This value is set in KafkaApis from a request handling thread.
// This may be read in a network thread before the actual update happens in KafkaApis which will cause us to
// see a negative value here. In that case, use responseCompleteTimeNanos as apiLocalCompleteTimeNanos.
if (apiLocalCompleteTimeNanos < 0)
apiLocalCompleteTimeNanos = responseCompleteTimeNanos
// If the apiRemoteCompleteTimeNanos is not set (i.e., for requests that do not go through a purgatory), then it is
// the same as responseCompleteTimeNanos.
if (apiRemoteCompleteTimeNanos < 0)
apiRemoteCompleteTimeNanos = responseCompleteTimeNanos

/**
* Converts nanos to millis with micros precision as additional decimal places in the request log have low
Expand All @@ -193,8 +183,7 @@ object RequestChannel extends Logging {

val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos)
val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos)
val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos)
val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos)
val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos)
val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos)
val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos)
val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos)
Expand All @@ -215,7 +204,7 @@ object RequestChannel extends Logging {
m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
m.localTimeHist.update(Math.round(apiLocalTimeMs))
m.remoteTimeHist.update(Math.round(apiRemoteTimeMs))
m.throttleTimeHist.update(Math.round(apiThrottleTimeMs))
m.throttleTimeHist.update(apiThrottleTimeMs)
m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs))
m.responseSendTimeHist.update(Math.round(responseSendTimeMs))
m.totalTimeHist.update(Math.round(totalTimeMs))
Expand Down Expand Up @@ -276,12 +265,6 @@ object RequestChannel extends Logging {
}

abstract class Response(val request: Request) {
locally {
val nowNs = Time.SYSTEM.nanoseconds
request.responseCompleteTimeNanos = nowNs
if (request.apiLocalCompleteTimeNanos == -1L)
request.apiLocalCompleteTimeNanos = nowNs
}

def processor: Int = request.processor

Expand Down Expand Up @@ -326,7 +309,7 @@ object RequestChannel extends Logging {
}
}

class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup {
class RequestChannel(val queueSize: Int, val metricNamePrefix : String, time: Time) extends KafkaMetricsGroup {
import RequestChannel._
val metrics = new RequestChannel.Metrics
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
Expand Down Expand Up @@ -362,6 +345,7 @@ class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends

/** Send a response back to the socket server to be sent over the network */
def sendResponse(response: RequestChannel.Response): Unit = {

if (isTraceEnabled) {
val requestHeader = response.request.header
val message = response match {
Expand All @@ -379,6 +363,18 @@ class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends
trace(message)
}

response match {
// We should only send one of the following per request
case _: SendResponse | _: NoOpResponse | _: CloseConnectionResponse =>
val request = response.request
val timeNanos = time.nanoseconds()
request.responseCompleteTimeNanos = timeNanos
if (request.apiLocalCompleteTimeNanos == -1L)
request.apiLocalCompleteTimeNanos = timeNanos
// For a given request, these may happen in addition to one in the previous section, skip updating the metrics
case _: StartThrottlingResponse | _: EndThrottlingResponse => ()
}

val processor = processors.get(response.processor)
// The processor may be null if it was shutdown. In this case, the connections
// are closed, so the response is dropped.
Expand Down Expand Up @@ -444,7 +440,8 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags)
// time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags)
// time a request is throttled
// time a request is throttled, not part of the request processing time (throttling is done at the client level
// for clients that support KIP-219 and by muting the channel for the rest)
val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags)
// time a response spent in a response queue
val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags)
Expand Down
13 changes: 5 additions & 8 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import java.util
import java.util.Optional
import java.util.concurrent._
import java.util.concurrent.atomic._
import java.util.function.Supplier

import kafka.cluster.{BrokerEndPoint, EndPoint}
import kafka.metrics.KafkaMetricsGroup
Expand Down Expand Up @@ -92,11 +91,12 @@ class SocketServer(val config: KafkaConfig,
// data-plane
private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix)
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time)
// control-plane
private var controlPlaneProcessorOpt : Option[Processor] = None
private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, ControlPlaneMetricPrefix))
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneMetricPrefix, time))

private var nextProcessorId = 0
private var connectionQuotas: ConnectionQuotas = _
Expand Down Expand Up @@ -908,10 +908,6 @@ private[kafka] class Processor(val id: Int,
}
}

private def nowNanosSupplier = new Supplier[java.lang.Long] {
override def get(): java.lang.Long = time.nanoseconds()
}

private def poll(): Unit = {
val pollTimeout = if (newConnections.isEmpty) 300 else 0
try selector.poll(pollTimeout)
Expand All @@ -929,7 +925,8 @@ private[kafka] class Processor(val id: Int,
openOrClosingChannel(receive.source) match {
case Some(channel) =>
val header = RequestHeader.parse(receive.payload)
if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier))
if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive,
() => time.nanoseconds()))
trace(s"Begin re-authentication: $channel")
else {
val nowNanos = time.nanoseconds()
Expand Down
26 changes: 12 additions & 14 deletions core/src/main/scala/kafka/server/ClientQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -250,18 +250,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
}

def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs: Long): Int = {
var throttleTimeMs = 0
val clientSensors = getOrCreateQuotaSensors(session, clientId)
try {
clientSensors.quotaSensor.record(value, timeMs)
0
} catch {
case _: QuotaViolationException =>
// Compute the delay
val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags))
throttleTimeMs = throttleTime(clientMetric).toInt
debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs))
case e: QuotaViolationException =>
val throttleTimeMs = throttleTime(e.value, e.bound, windowSize(e.metric, timeMs)).toInt
debug(s"Quota violated for sensor (${clientSensors.quotaSensor.name}). Delay time: ($throttleTimeMs)")
throttleTimeMs
}
throttleTimeMs
}

/** "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value
Expand Down Expand Up @@ -337,16 +335,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig,
* we need to add a delay of X to W such that O * W / (W + X) = T.
* Solving for X, we get X = (O - T)/T * W.
*/
protected def throttleTime(clientMetric: KafkaMetric): Long = {
val config = clientMetric.config
val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable())
val quota = config.quota()
val difference = clientMetric.metricValue.asInstanceOf[Double] - quota.bound
protected def throttleTime(quotaValue: Double, quotaBound: Double, windowSize: Long): Long = {
val difference = quotaValue - quotaBound
// Use the precise window used by the rate calculation
val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds())
throttleTimeMs.round
val throttleTimeMs = difference / quotaBound * windowSize
Math.round(throttleTimeMs)
}

// Window size (in ms) used by the metric's underlying Rate measurable, so
// throttle-time calculations use the precise window the rate itself was
// computed over.
private def windowSize(metric: KafkaMetric, timeMs: Long): Long = {
  val rateMeasurable = measurableAsRate(metric.metricName, metric.measurable)
  rateMeasurable.windowSize(metric.config, timeMs)
}

// Casting to Rate because we only use Rate in Quota computation
private def measurableAsRate(name: MetricName, measurable: Measurable): Rate = {
measurable match {
Expand Down
13 changes: 4 additions & 9 deletions core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,12 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
* @param request client request
* @return Number of milliseconds to throttle in case of quota violation. Zero otherwise
*/
def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request): Int = {
if (request.apiRemoteCompleteTimeNanos == -1) {
// When this callback is triggered, the remote API call has completed
request.apiRemoteCompleteTimeNanos = time.nanoseconds
}

def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request, timeMs: Long): Int = {
if (quotasEnabled) {
request.recordNetworkThreadTimeCallback = Some(timeNanos => recordNoThrottle(
getOrCreateQuotaSensors(request.session, request.header.clientId), nanosToPercentage(timeNanos)))
recordAndGetThrottleTimeMs(request.session, request.header.clientId,
nanosToPercentage(request.requestThreadTimeNanos), time.milliseconds())
nanosToPercentage(request.requestThreadTimeNanos), timeMs)
} else {
0
}
Expand All @@ -69,8 +64,8 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig,
}
}

override protected def throttleTime(clientMetric: KafkaMetric): Long = {
math.min(super.throttleTime(clientMetric), maxThrottleTimeMs)
override protected def throttleTime(quotaValue: Double, quotaBound: Double, windowSize: Long): Long = {
math.min(super.throttleTime(quotaValue, quotaBound, windowSize), maxThrottleTimeMs)
}

override protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = {
Expand Down
Loading