diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java index 3abf423510021..7068d3115d795 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java @@ -17,7 +17,6 @@ package org.apache.kafka.common.metrics; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.MetricName; /** * Thrown when a sensor records a value that causes a metric to go outside the bounds configured as its quota @@ -25,18 +24,18 @@ public class QuotaViolationException extends KafkaException { private static final long serialVersionUID = 1L; - private final MetricName metricName; + private final KafkaMetric metric; private final double value; private final double bound; - public QuotaViolationException(MetricName metricName, double value, double bound) { - this.metricName = metricName; + public QuotaViolationException(KafkaMetric metric, double value, double bound) { + this.metric = metric; this.value = value; this.bound = bound; } - public MetricName metricName() { - return metricName; + public KafkaMetric metric() { + return metric; } public double value() { @@ -51,7 +50,7 @@ public double bound() { public String toString() { return getClass().getName() + ": '" - + metricName + + metric.metricName() + "' violated quota. Actual: " + value + ", Threshold: " diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index 1402d06dc6c69..209789be0b24e 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -209,8 +209,7 @@ public void checkQuotas(long timeMs) { if (quota != null) { double value = metric.measurableValue(timeMs); if (!quota.acceptable(value)) { - throw new QuotaViolationException(metric.metricName(), value, - quota.bound()); + throw new QuotaViolationException(metric, value, quota.bound()); } } } diff --git a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java index da57647d7f133..4572d5b06e97a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java @@ -68,4 +68,14 @@ public long writeTo(GatheringByteChannel channel) throws IOException { public long remaining() { return remaining; } + + @Override + public String toString() { + return "ByteBufferSend(" + + "destination='" + destination + "'" + + ", size=" + size + + ", remaining=" + remaining + + ", pending=" + pending + + ')'; + } } diff --git a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java index 2e78a17451d92..c75573b963833 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java @@ -118,6 +118,15 @@ public Map recordConversionStats() { return recordConversionStats; } + @Override + public String toString() { + return "MultiRecordsSend(" + + "dest='" + dest + "'" + + ", size=" + size + + ", totalWritten=" + totalWritten + + ')'; + } + private void updateRecordConversionStats(Send completedSend) { // The underlying send might have accumulated statistics that need to be recorded. For example, // LazyDownConversionRecordsSend accumulates statistics related to the number of bytes down-converted, the amount diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 17ea3f3ab2d01..b9fd8a8045b43 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -87,8 +87,8 @@ object RequestChannel extends Logging { @volatile var apiLocalCompleteTimeNanos = -1L @volatile var responseCompleteTimeNanos = -1L @volatile var responseDequeueTimeNanos = -1L - @volatile var apiRemoteCompleteTimeNanos = -1L @volatile var messageConversionsTimeNanos = 0L + @volatile var apiThrottleTimeMs = 0L @volatile var temporaryMemoryBytes = 0L @volatile var recordNetworkThreadTimeCallback: Option[Long => Unit] = None @@ -170,16 +170,6 @@ object RequestChannel extends Logging { def updateRequestMetrics(networkThreadTimeNanos: Long, response: Response): Unit = { val endTimeNanos = Time.SYSTEM.nanoseconds - // In some corner cases, apiLocalCompleteTimeNanos may not be set when the request completes if the remote - // processing time is really small. This value is set in KafkaApis from a request handling thread. - // This may be read in a network thread before the actual update happens in KafkaApis which will cause us to - // see a negative value here. In that case, use responseCompleteTimeNanos as apiLocalCompleteTimeNanos. - if (apiLocalCompleteTimeNanos < 0) - apiLocalCompleteTimeNanos = responseCompleteTimeNanos - // If the apiRemoteCompleteTimeNanos is not set (i.e., for requests that do not go through a purgatory), then it is - // the same as responseCompleteTimeNanos. - if (apiRemoteCompleteTimeNanos < 0) - apiRemoteCompleteTimeNanos = responseCompleteTimeNanos /** * Converts nanos to millis with micros precision as additional decimal places in the request log have low @@ -193,8 +183,7 @@ object RequestChannel extends Logging { val requestQueueTimeMs = nanosToMs(requestDequeueTimeNanos - startTimeNanos) val apiLocalTimeMs = nanosToMs(apiLocalCompleteTimeNanos - requestDequeueTimeNanos) - val apiRemoteTimeMs = nanosToMs(apiRemoteCompleteTimeNanos - apiLocalCompleteTimeNanos) - val apiThrottleTimeMs = nanosToMs(responseCompleteTimeNanos - apiRemoteCompleteTimeNanos) + val apiRemoteTimeMs = nanosToMs(responseCompleteTimeNanos - apiLocalCompleteTimeNanos) val responseQueueTimeMs = nanosToMs(responseDequeueTimeNanos - responseCompleteTimeNanos) val responseSendTimeMs = nanosToMs(endTimeNanos - responseDequeueTimeNanos) val messageConversionsTimeMs = nanosToMs(messageConversionsTimeNanos) @@ -215,7 +204,7 @@ object RequestChannel extends Logging { m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs)) m.localTimeHist.update(Math.round(apiLocalTimeMs)) m.remoteTimeHist.update(Math.round(apiRemoteTimeMs)) - m.throttleTimeHist.update(Math.round(apiThrottleTimeMs)) + m.throttleTimeHist.update(apiThrottleTimeMs) m.responseQueueTimeHist.update(Math.round(responseQueueTimeMs)) m.responseSendTimeHist.update(Math.round(responseSendTimeMs)) m.totalTimeHist.update(Math.round(totalTimeMs)) @@ -276,12 +265,6 @@ object RequestChannel extends Logging { } abstract class Response(val request: Request) { - locally { - val nowNs = Time.SYSTEM.nanoseconds - request.responseCompleteTimeNanos = nowNs - if (request.apiLocalCompleteTimeNanos == -1L) - request.apiLocalCompleteTimeNanos = nowNs - } def processor: Int = request.processor @@ -326,7 +309,7 @@ object RequestChannel extends Logging { } } -class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends KafkaMetricsGroup { +class RequestChannel(val queueSize: Int, val metricNamePrefix : String, time: Time) extends KafkaMetricsGroup { import RequestChannel._ val metrics = new RequestChannel.Metrics private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) @@ -362,6 +345,7 @@ class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends /** Send a response back to the socket server to be sent over the network */ def sendResponse(response: RequestChannel.Response): Unit = { + if (isTraceEnabled) { val requestHeader = response.request.header val message = response match { @@ -379,6 +363,18 @@ class RequestChannel(val queueSize: Int, val metricNamePrefix : String) extends trace(message) } + response match { + // We should only send one of the following per request + case _: SendResponse | _: NoOpResponse | _: CloseConnectionResponse => + val request = response.request + val timeNanos = time.nanoseconds() + request.responseCompleteTimeNanos = timeNanos + if (request.apiLocalCompleteTimeNanos == -1L) + request.apiLocalCompleteTimeNanos = timeNanos + // For a given request, these may happen in addition to one in the previous section, skip updating the metrics + case _: StartThrottlingResponse | _: EndThrottlingResponse => () + } + val processor = processors.get(response.processor) // The processor may be null if it was shutdown. In this case, the connections // are closed, so the response is dropped. @@ -444,7 +440,8 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup { val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags) // time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests) val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags) - // time a request is throttled + // time a request is throttled, not part of the request processing time (throttling is done at the client level + // for clients that support KIP-219 and by muting the channel for the rest) val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags) // time a response spent in a response queue val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 350e58d2110f6..8328c244b1b25 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -25,7 +25,6 @@ import java.util import java.util.Optional import java.util.concurrent._ import java.util.concurrent.atomic._ -import java.util.function.Supplier import kafka.cluster.{BrokerEndPoint, EndPoint} import kafka.metrics.KafkaMetricsGroup @@ -92,11 +91,12 @@ class SocketServer(val config: KafkaConfig, // data-plane private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]() - val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix) + val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time) // control-plane private var controlPlaneProcessorOpt : Option[Processor] = None private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None - val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, ControlPlaneMetricPrefix)) + val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => + new RequestChannel(20, ControlPlaneMetricPrefix, time)) private var nextProcessorId = 0 private var connectionQuotas: ConnectionQuotas = _ @@ -908,10 +908,6 @@ private[kafka] class Processor(val id: Int, } } - private def nowNanosSupplier = new Supplier[java.lang.Long] { - override def get(): java.lang.Long = time.nanoseconds() - } - private def poll(): Unit = { val pollTimeout = if (newConnections.isEmpty) 300 else 0 try selector.poll(pollTimeout) @@ -929,7 +925,8 @@ private[kafka] class Processor(val id: Int, openOrClosingChannel(receive.source) match { case Some(channel) => val header = RequestHeader.parse(receive.payload) - if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, nowNanosSupplier)) + if (header.apiKey == ApiKeys.SASL_HANDSHAKE && channel.maybeBeginServerReauthentication(receive, + () => time.nanoseconds())) trace(s"Begin re-authentication: $channel") else { val nowNanos = time.nanoseconds() diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala index 8da19d94f755b..7a1c55dc3a47d 100644 --- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala @@ -250,18 +250,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, } def recordAndGetThrottleTimeMs(session: Session, clientId: String, value: Double, timeMs: Long): Int = { - var throttleTimeMs = 0 val clientSensors = getOrCreateQuotaSensors(session, clientId) try { clientSensors.quotaSensor.record(value, timeMs) + 0 } catch { - case _: QuotaViolationException => - // Compute the delay - val clientMetric = metrics.metrics().get(clientRateMetricName(clientSensors.metricTags)) - throttleTimeMs = throttleTime(clientMetric).toInt - debug("Quota violated for sensor (%s). Delay time: (%d)".format(clientSensors.quotaSensor.name(), throttleTimeMs)) + case e: QuotaViolationException => + val throttleTimeMs = throttleTime(e.value, e.bound, windowSize(e.metric, timeMs)).toInt + debug(s"Quota violated for sensor (${clientSensors.quotaSensor.name}). Delay time: ($throttleTimeMs)") + throttleTimeMs } - throttleTimeMs } /** "Unrecord" the given value that has already been recorded for the given user/client by recording a negative value @@ -337,16 +335,16 @@ class ClientQuotaManager(private val config: ClientQuotaManagerConfig, * we need to add a delay of X to W such that O * W / (W + X) = T. * Solving for X, we get X = (O - T)/T * W. */ - protected def throttleTime(clientMetric: KafkaMetric): Long = { - val config = clientMetric.config - val rateMetric: Rate = measurableAsRate(clientMetric.metricName(), clientMetric.measurable()) - val quota = config.quota() - val difference = clientMetric.metricValue.asInstanceOf[Double] - quota.bound + protected def throttleTime(quotaValue: Double, quotaBound: Double, windowSize: Long): Long = { + val difference = quotaValue - quotaBound // Use the precise window used by the rate calculation - val throttleTimeMs = difference / quota.bound * rateMetric.windowSize(config, time.milliseconds()) - throttleTimeMs.round + val throttleTimeMs = difference / quotaBound * windowSize + Math.round(throttleTimeMs) } + private def windowSize(metric: KafkaMetric, timeMs: Long): Long = + measurableAsRate(metric.metricName, metric.measurable).windowSize(metric.config, timeMs) + // Casting to Rate because we only use Rate in Quota computation private def measurableAsRate(name: MetricName, measurable: Measurable): Rate = { measurable match { diff --git a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala index 817b7ce78ceec..3cc4f68df4a81 100644 --- a/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ClientRequestQuotaManager.scala @@ -46,17 +46,12 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, * @param request client request * @return Number of milliseconds to throttle in case of quota violation. Zero otherwise */ - def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request): Int = { - if (request.apiRemoteCompleteTimeNanos == -1) { - // When this callback is triggered, the remote API call has completed - request.apiRemoteCompleteTimeNanos = time.nanoseconds - } - + def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request, timeMs: Long): Int = { if (quotasEnabled) { request.recordNetworkThreadTimeCallback = Some(timeNanos => recordNoThrottle( getOrCreateQuotaSensors(request.session, request.header.clientId), nanosToPercentage(timeNanos))) recordAndGetThrottleTimeMs(request.session, request.header.clientId, - nanosToPercentage(request.requestThreadTimeNanos), time.milliseconds()) + nanosToPercentage(request.requestThreadTimeNanos), timeMs) } else { 0 } @@ -69,8 +64,8 @@ class ClientRequestQuotaManager(private val config: ClientQuotaManagerConfig, } } - override protected def throttleTime(clientMetric: KafkaMetric): Long = { - math.min(super.throttleTime(clientMetric), maxThrottleTimeMs) + override protected def throttleTime(quotaValue: Double, quotaBound: Double, windowSize: Long): Long = { + math.min(super.throttleTime(quotaValue, quotaBound, windowSize), maxThrottleTimeMs) } override protected def clientRateMetricName(quotaMetricTags: Map[String, String]): MetricName = { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index f61324809991d..049c4e09c609a 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -539,20 +539,21 @@ class KafkaApis(val requestChannel: RequestChannel, } } - // When this callback is triggered, the remote API call has completed - request.apiRemoteCompleteTimeNanos = time.nanoseconds - // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the quotas // have been violated. If both quotas have been violated, use the max throttle time between the two quotas. Note // that the request quota is not enforced if acks == 0. - val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, numBytesAppended, time.milliseconds) - val requestThrottleTimeMs = if (produceRequest.acks == 0) 0 else quotas.request.maybeRecordAndGetThrottleTimeMs(request) + val timeMs = time.milliseconds() + val bandwidthThrottleTimeMs = quotas.produce.maybeRecordAndGetThrottleTimeMs(request, numBytesAppended, timeMs) + val requestThrottleTimeMs = + if (produceRequest.acks == 0) 0 + else quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs) val maxThrottleTimeMs = Math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs) if (maxThrottleTimeMs > 0) { + request.apiThrottleTimeMs = maxThrottleTimeMs if (bandwidthThrottleTimeMs > requestThrottleTimeMs) { - quotas.produce.throttle(request, bandwidthThrottleTimeMs, sendResponse) + quotas.produce.throttle(request, bandwidthThrottleTimeMs, requestChannel.sendResponse) } else { - quotas.request.throttle(request, requestThrottleTimeMs, sendResponse) + quotas.request.throttle(request, requestThrottleTimeMs, requestChannel.sendResponse) } } @@ -741,10 +742,6 @@ class KafkaApis(val requestChannel: RequestChannel, } erroneous.foreach { case (tp, data) => partitions.put(tp, data) } - // When this callback is triggered, the remote API call has completed. - // Record time before any byte-rate throttling. - request.apiRemoteCompleteTimeNanos = time.nanoseconds - var unconvertedFetchResponse: FetchResponse[Records] = null def createResponse(throttleTimeMs: Int): FetchResponse[BaseRecords] = { @@ -794,19 +791,20 @@ class KafkaApis(val requestChannel: RequestChannel, // quotas have been violated. If both quotas have been violated, use the max throttle time between the two // quotas. When throttled, we unrecord the recorded bandwidth quota value val responseSize = fetchContext.getResponseSize(partitions, versionId) - val timeMs = time.milliseconds - val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request) + val timeMs = time.milliseconds() + val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs) val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs) val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs) if (maxThrottleTimeMs > 0) { + request.apiThrottleTimeMs = maxThrottleTimeMs // Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value // from the fetch quota because we are going to return an empty response. quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs) if (bandwidthThrottleTimeMs > requestThrottleTimeMs) { - quotas.fetch.throttle(request, bandwidthThrottleTimeMs, sendResponse) + quotas.fetch.throttle(request, bandwidthThrottleTimeMs, requestChannel.sendResponse) } else { - quotas.request.throttle(request, requestThrottleTimeMs, sendResponse) + quotas.request.throttle(request, requestThrottleTimeMs, requestChannel.sendResponse) } // If throttling is required, return an empty response. unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs) @@ -3012,17 +3010,23 @@ class KafkaApis(val requestChannel: RequestChannel, private def sendResponseMaybeThrottle(request: RequestChannel.Request, createResponse: Int => AbstractResponse, onComplete: Option[Send => Unit] = None): Unit = { - val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request) - quotas.request.throttle(request, throttleTimeMs, sendResponse) + val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request) + quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse) sendResponse(request, Some(createResponse(throttleTimeMs)), onComplete) } private def sendErrorResponseMaybeThrottle(request: RequestChannel.Request, error: Throwable): Unit = { - val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request) - quotas.request.throttle(request, throttleTimeMs, sendResponse) + val throttleTimeMs = maybeRecordAndGetThrottleTimeMs(request) + quotas.request.throttle(request, throttleTimeMs, requestChannel.sendResponse) sendErrorOrCloseConnection(request, error, throttleTimeMs) } + private def maybeRecordAndGetThrottleTimeMs(request: RequestChannel.Request): Int = { + val throttleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, time.milliseconds()) + request.apiThrottleTimeMs = throttleTimeMs + throttleTimeMs + } + private def sendResponseExemptThrottle(request: RequestChannel.Request, response: AbstractResponse, onComplete: Option[Send => Unit] = None): Unit = { @@ -3072,10 +3076,7 @@ class KafkaApis(val requestChannel: RequestChannel, case None => new RequestChannel.NoOpResponse(request) } - sendResponse(response) - } - private def sendResponse(response: RequestChannel.Response): Unit = { requestChannel.sendResponse(response) } diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala index 375cd4868b25d..7098849cc1fc6 100644 --- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala @@ -107,7 +107,8 @@ class ReplicationQuotaManager(val config: ReplicationQuotaManagerConfig, sensor().checkQuotas() } catch { case qve: QuotaViolationException => - trace("%s: Quota violated for sensor (%s), metric: (%s), metric-value: (%f), bound: (%f)".format(replicationType, sensor().name(), qve.metricName, qve.value, qve.bound)) + trace(s"$replicationType: Quota violated for sensor (${sensor().name}), metric: (${qve.metric.metricName}), " + + s"metric-value: (${qve.value}), bound: (${qve.bound})") return true } false diff --git a/core/src/main/scala/kafka/server/ThrottledChannel.scala b/core/src/main/scala/kafka/server/ThrottledChannel.scala index c46188f606e79..531ef5da43199 100644 --- a/core/src/main/scala/kafka/server/ThrottledChannel.scala +++ b/core/src/main/scala/kafka/server/ThrottledChannel.scala @@ -33,9 +33,11 @@ import org.apache.kafka.common.utils.Time * @param throttleTimeMs Delay associated with this request * @param channelThrottlingCallback Callback for channel throttling */ -class ThrottledChannel(val request: RequestChannel.Request, val time: Time, val throttleTimeMs: Int, channelThrottlingCallback: Response => Unit) +class ThrottledChannel(val request: RequestChannel.Request, val time: Time, val throttleTimeMs: Int, + channelThrottlingCallback: Response => Unit) extends Delayed with Logging { - var endTime = time.milliseconds + throttleTimeMs + + private val endTimeNanos = time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(throttleTimeMs) // Notify the socket server that throttling has started for this channel. channelThrottlingCallback(new RequestChannel.StartThrottlingResponse(request)) @@ -47,13 +49,11 @@ class ThrottledChannel(val request: RequestChannel.Request, val time: Time, val } override def getDelay(unit: TimeUnit): Long = { - unit.convert(endTime - time.milliseconds, TimeUnit.MILLISECONDS) + unit.convert(endTimeNanos - time.nanoseconds(), TimeUnit.NANOSECONDS) } override def compareTo(d: Delayed): Int = { val other = d.asInstanceOf[ThrottledChannel] - if (this.endTime < other.endTime) -1 - else if (this.endTime > other.endTime) 1 - else 0 + java.lang.Long.compare(this.endTimeNanos, other.endTimeNanos) } -} \ No newline at end of file +} diff --git a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala index 3dbfa8f3882c2..e463167268626 100644 --- a/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala +++ b/core/src/main/scala/kafka/utils/timer/TimerTaskList.scala @@ -122,12 +122,8 @@ private[timer] class TimerTaskList(taskCounter: AtomicInteger) extends Delayed { } def compareTo(d: Delayed): Int = { - val other = d.asInstanceOf[TimerTaskList] - - if(getExpiration < other.getExpiration) -1 - else if(getExpiration > other.getExpiration) 1 - else 0 + java.lang.Long.compare(getExpiration, other.getExpiration) } } @@ -159,7 +155,7 @@ private[timer] class TimerTaskEntry(val timerTask: TimerTask, val expirationMs: } override def compare(that: TimerTaskEntry): Int = { - this.expirationMs compare that.expirationMs + java.lang.Long.compare(expirationMs, that.expirationMs) } } diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index cbd61b3ec3076..fee5de7771757 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -18,7 +18,9 @@ import java.time.Duration import java.util.concurrent.TimeUnit import java.util.{Collections, HashMap, Properties} +import com.yammer.metrics.core.{Histogram, Meter} import kafka.api.QuotaTestClients._ +import kafka.metrics.KafkaYammerMetrics import kafka.server.{ClientQuotaManager, ClientQuotaManagerConfig, DynamicConfig, KafkaConfig, KafkaServer, QuotaType} import kafka.utils.TestUtils import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer} @@ -26,10 +28,13 @@ import org.apache.kafka.clients.producer._ import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback import org.apache.kafka.common.{Metric, MetricName, TopicPartition} import org.apache.kafka.common.metrics.{KafkaMetric, Quota} +import org.apache.kafka.common.protocol.ApiKeys import org.apache.kafka.common.security.auth.KafkaPrincipal import org.junit.Assert._ import org.junit.{Before, Test} +import org.scalatest.Assertions.fail +import scala.collection.Map import scala.jdk.CollectionConverters._ abstract class BaseQuotaTest extends IntegrationTestHarness { @@ -186,15 +191,11 @@ abstract class QuotaTestClients(topic: String, val producer: KafkaProducer[Array[Byte], Array[Byte]], val consumer: KafkaConsumer[Array[Byte], Array[Byte]]) { - def userPrincipal: KafkaPrincipal def overrideQuotas(producerQuota: Long, consumerQuota: Long, requestQuota: Double): Unit def removeQuotaOverrides(): Unit - def quotaMetricTags(clientId: String): Map[String, String] - - def quota(quotaManager: ClientQuotaManager, userPrincipal: KafkaPrincipal, clientId: String): Quota = { - quotaManager.quota(userPrincipal, clientId) - } + protected def userPrincipal: KafkaPrincipal + protected def quotaMetricTags(clientId: String): Map[String, String] def produceUntilThrottled(maxRecords: Int, waitForRequestCompletion: Boolean = true): Int = { var numProduced = 0 @@ -235,19 +236,38 @@ abstract class QuotaTestClients(topic: String, numConsumed } - def verifyProduceThrottle(expectThrottle: Boolean, verifyClientMetric: Boolean = true): Unit = { + private def quota(quotaManager: ClientQuotaManager, userPrincipal: KafkaPrincipal, clientId: String): Quota = { + quotaManager.quota(userPrincipal, clientId) + } + + private def verifyThrottleTimeRequestChannelMetric(apiKey: ApiKeys, metricNameSuffix: String, + clientId: String, expectThrottle: Boolean): Unit = { + val throttleTimeMs = brokerRequestMetricsThrottleTimeMs(apiKey, metricNameSuffix) + if (expectThrottle) + assertTrue(s"Client with id=$clientId should have been throttled, $throttleTimeMs", throttleTimeMs > 0) + else + assertEquals(s"Client with id=$clientId should not have been throttled", 0.0, throttleTimeMs, 0.0) + } + + def verifyProduceThrottle(expectThrottle: Boolean, verifyClientMetric: Boolean = true, + verifyRequestChannelMetric: Boolean = true): Unit = { verifyThrottleTimeMetric(QuotaType.Produce, producerClientId, expectThrottle) + if (verifyRequestChannelMetric) + verifyThrottleTimeRequestChannelMetric(ApiKeys.PRODUCE, "", producerClientId, expectThrottle) if (verifyClientMetric) verifyProducerClientThrottleTimeMetric(expectThrottle) } - def verifyConsumeThrottle(expectThrottle: Boolean, verifyClientMetric: Boolean = true): Unit = { + def verifyConsumeThrottle(expectThrottle: Boolean, verifyClientMetric: Boolean = true, + verifyRequestChannelMetric: Boolean = true): Unit = { verifyThrottleTimeMetric(QuotaType.Fetch, consumerClientId, expectThrottle) + if (verifyRequestChannelMetric) + verifyThrottleTimeRequestChannelMetric(ApiKeys.FETCH, "Consumer", consumerClientId, expectThrottle) if (verifyClientMetric) verifyConsumerClientThrottleTimeMetric(expectThrottle) } - def verifyThrottleTimeMetric(quotaType: QuotaType, clientId: String, expectThrottle: Boolean): Unit = { + private def verifyThrottleTimeMetric(quotaType: QuotaType, clientId: String, expectThrottle: Boolean): Unit = { val throttleMetricValue = metricValue(throttleMetric(quotaType, clientId)) if (expectThrottle) { assertTrue(s"Client with id=$clientId should have been throttled", throttleMetricValue > 0) @@ -256,7 +276,7 @@ abstract class QuotaTestClients(topic: String, } } - def throttleMetricName(quotaType: QuotaType, clientId: String): MetricName = { + private def throttleMetricName(quotaType: QuotaType, clientId: String): MetricName = { leaderNode.metrics.metricName("throttle-time", quotaType.toString, quotaMetricTags(clientId).asJava) @@ -266,12 +286,28 @@ abstract class QuotaTestClients(topic: String, leaderNode.metrics.metrics.get(throttleMetricName(quotaType, clientId)) } + private def brokerRequestMetricsThrottleTimeMs(apiKey: ApiKeys, metricNameSuffix: String): Double = { + def yammerMetricValue(name: String): Double = { + val allMetrics = KafkaYammerMetrics.defaultRegistry.allMetrics.asScala + val (_, metric) = allMetrics.find { case (metricName, _) => + metricName.getMBeanName.startsWith(name) + }.getOrElse(fail(s"Unable to find broker metric $name: allMetrics: ${allMetrics.keySet.map(_.getMBeanName)}")) + metric match { + case m: Meter => m.count.toDouble + case m: Histogram => m.max + case m => fail(s"Unexpected broker metric of class ${m.getClass}") + } + } + + yammerMetricValue(s"kafka.network:type=RequestMetrics,name=ThrottleTimeMs,request=${apiKey.name}$metricNameSuffix") + } + def exemptRequestMetric: KafkaMetric = { val metricName = leaderNode.metrics.metricName("exempt-request-time", QuotaType.Request.toString, "") leaderNode.metrics.metrics.get(metricName) } - def verifyProducerClientThrottleTimeMetric(expectThrottle: Boolean): Unit = { + private def verifyProducerClientThrottleTimeMetric(expectThrottle: Boolean): Unit = { val tags = new HashMap[String, String] tags.put("client-id", producerClientId) val avgMetric = producer.metrics.get(new MetricName("produce-throttle-time-avg", "producer-metrics", "", tags)) diff --git a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala index 1fdee6525e970..587e438dad097 100644 --- a/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala +++ b/core/src/test/scala/integration/kafka/api/CustomQuotaCallbackTest.scala @@ -264,11 +264,16 @@ class CustomQuotaCallbackTest extends IntegrationTestHarness with SaslSetup { def produceConsume(expectProduceThrottle: Boolean, expectConsumeThrottle: Boolean): Unit = { val numRecords = 1000 val produced = produceUntilThrottled(numRecords, waitForRequestCompletion = false) - verifyProduceThrottle(expectProduceThrottle, verifyClientMetric = false) + // don't verify request channel metrics as it's difficult to write non flaky assertions + // given the specifics of this test (throttle metric removal followed by produce/consume + // until throttled) + verifyProduceThrottle(expectProduceThrottle, verifyClientMetric = false, + verifyRequestChannelMetric = false) // make sure there are enough records on the topic to test consumer throttling produceWithoutThrottle(topic, numRecords - produced) consumeUntilThrottled(numRecords, waitForRequestCompletion = false) - verifyConsumeThrottle(expectConsumeThrottle, verifyClientMetric = false) + verifyConsumeThrottle(expectConsumeThrottle, verifyClientMetric = false, + verifyRequestChannelMetric = false) } def removeThrottleMetrics(): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index d41d4d81d8dc7..7dcb70f9725ee 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -1841,8 +1841,8 @@ class KafkaApisTest { } private def expectNoThrottling(): Capture[RequestChannel.Response] = { - EasyMock.expect(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(EasyMock.anyObject[RequestChannel.Request]())) - .andReturn(0) + EasyMock.expect(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(EasyMock.anyObject[RequestChannel.Request](), + EasyMock.anyObject[Long])).andReturn(0) EasyMock.expect(clientRequestQuotaManager.throttle(EasyMock.anyObject[RequestChannel.Request](), EasyMock.eq(0), EasyMock.anyObject[RequestChannel.Response => Unit]()))