KAFKA-7719: Improve fairness in SocketServer processors (KIP-402)#6022
ijuma merged 4 commits into apache:trunk
Nit: we can take the chance and change this to use string interpolation.
@ijuma Thanks for the review. Updated all the strings in SocketServer.
ijuma left a comment
Thanks for the PR. One question.
Do we really want a name with a $?
@ijuma Thanks for the review. It is using string interpolation, metric name is AcceptorIdlePercent. Hope I haven't missed something.
My bad, it just looked a bit odd in the diff.
240b5c6 to 2d08883
2d08883 to 82c3168
ijuma left a comment
Thanks for the PR, left a couple of minor comments.
val IdlePercentMetricName = "IdlePercent"
val NetworkProcessorMetricTag = "networkProcessor"
val ListenerMetricTag = "listener"
val SocketServerMetricsGroup = "socket-server-metrics"
Should this be in the SocketServer companion object? It could then just be called MetricsGroup perhaps.
  brokerId: Int,
- connectionQuotas: ConnectionQuotas) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+ connectionQuotas: ConnectionQuotas,
+ metricPrefix: String = "") extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
I think it would be better not to have a default for metricPrefix.
  private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized {
    endpointOpt.foreach { endpoint =>
-     val controlPlaneAcceptor = createAcceptor(endpoint)
+     val controlPlaneAcceptor = createAcceptor(endpoint, "ControlPlane")
One more suggestion: we should probably create a constant for this (in the SocketServer companion object probably), move the following two constants to the same place and use them in metric names. If we had used constants when the control plane PR was introduced, it would have made this review easier and avoids bugs where people mistype stuff. We are also hardcoding the prefix in the thread name control-plane-kafka-socket-acceptor even though we pass the constant to KafkaRequestHandlerPool. Quite inconsistent.
val DataPlanePrefix = "data-plane"
val ControlPlanePrefix = "control-plane"

val blockedPercentMetric = blockedPercentMetrics.head.asInstanceOf[Meter]
val blockedPercent = blockedPercentMetric.meanRate
if (expectBlocked) {
  assertTrue(s"Acceptor idle percent not recorded: $blockedPercent", blockedPercent > 0.0)
Seems like we need to fix the text here and the following line.
override def poll(timeout: Long): Unit = {
  try {
    if (pollBlockMs > 0)
      Thread.sleep(pollBlockMs)
A bit unfortunate that we have to do this in what we class as a unit test. Should this be an integration test?
@ijuma Thanks for the reviews. I have rewritten the test to avoid the sleep.
ijuma left a comment
Thanks for the updates @rajinisivaram, a couple more comments/questions.
- config.numIoThreads, "RequestHandlerAvgIdlePercent", socketServer.DataPlanePrefix)
+ config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)

  config.controlPlaneListener.foreach { _ =>
Should we change this to do socketServer.controlPlaneRequestChannelOpt.foreach instead?
I saw that yesterday and wasn't sure why it was that way, so had left it. Updated.
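The constants discussed in this review could be grouped roughly as follows. This is only a sketch: `SocketServerNames` and its two helper methods are hypothetical names used for illustration (the review suggests the `SocketServer` companion object), the literal values are the ones quoted in the comments and hunks above, and the empty data-plane metric prefix is an assumption inferred from the unprefixed `RequestHandlerAvgIdlePercent` name.

```scala
// Hypothetical holder object; the review suggests the SocketServer companion object.
object SocketServerNames {
  val MetricsGroup = "socket-server-metrics"
  val DataPlaneThreadPrefix = "data-plane"
  val ControlPlaneThreadPrefix = "control-plane"
  // Assumption: data-plane metrics keep their existing, unprefixed names.
  val DataPlaneMetricPrefix = ""
  val ControlPlaneMetricPrefix = "ControlPlane"

  // String interpolation substitutes the values, so no '$' appears in the result.
  def metricName(metricPrefix: String, baseName: String): String =
    s"$metricPrefix$baseName"

  // Hypothetical helper: builds the thread name from the prefix constant
  // instead of hardcoding "control-plane-kafka-socket-acceptor".
  def acceptorThreadName(threadPrefix: String): String =
    s"$threadPrefix-kafka-socket-acceptor"
}
```

Building both metric names and thread names from the same constants means a mistyped prefix becomes a compile error rather than a silently misnamed metric.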
Thread.getAllStackTraces.asScala.exists { case (thread, stackTrace) =>
  thread.getName.contains("kafka-socket-acceptor") &&
    thread.getState == Thread.State.WAITING &&
    stackTrace.toList.toString.contains("ArrayBlockingQueue")
We are relying on some details that could change under us. Would the test fail if the thread name changed or if the queue implementation changed? And if so, would it output enough debugging information?
Yes, the test will fail in those cases. The relevant exceptions would have been processed by Processor and logged at ERROR level. I have changed the code to use assertions containing the relevant debugging info in the main test thread to make it more obvious.
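One way to make such a check report useful debugging information is sketched below. It is not the code from the PR: `BlockedThreadCheck`, `describeThreads` and `isBlockedOn` are hypothetical names, and the import assumes Scala 2.13 or later (older versions would use `scala.collection.JavaConverters`).

```scala
import scala.jdk.CollectionConverters._

// Hypothetical test helper, not part of the PR.
object BlockedThreadCheck {
  // Describes every live thread whose name contains nameFragment, so that a
  // failed assertion can show thread names, states and stack frames.
  def describeThreads(nameFragment: String): Seq[String] =
    Thread.getAllStackTraces.asScala.toSeq.collect {
      case (thread, stackTrace) if thread.getName.contains(nameFragment) =>
        s"${thread.getName} state=${thread.getState} stack=${stackTrace.mkString(" <- ")}"
    }

  // True if a matching thread is WAITING with a frame from a class whose name
  // contains frameFragment (for example "ArrayBlockingQueue").
  def isBlockedOn(nameFragment: String, frameFragment: String): Boolean =
    Thread.getAllStackTraces.asScala.exists { case (thread, stackTrace) =>
      thread.getName.contains(nameFragment) &&
        thread.getState == Thread.State.WAITING &&
        stackTrace.exists(_.getClassName.contains(frameFragment))
    }
}
```

A test could then assert `isBlockedOn("kafka-socket-acceptor", "ArrayBlockingQueue")` and pass `describeThreads("kafka-socket-acceptor")` as the failure message, so a renamed thread or a different queue implementation shows up directly in the test output.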
…s (KIP-402)
Adds a new listener config `max.connections` to limit the number of active connections on each listener. The config may be prefixed with listener prefix. This limit may be dynamically reconfigured without restarting the broker.
This is one of the PRs for KIP-402 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-402%3A+Improve+fairness+in+SocketServer+processors). Note that this is currently built on top of PR #6022
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
Reviewers: Gwen Shapira <cshapi@gmail.com>
Closes #6034 from rajinisivaram/KAFKA-7730-max-connections
* AK/trunk:
  fix typo (apache#5150)
  MINOR: Reduce replica.fetch.backoff.ms in ReassignPartitionsClusterTest (apache#5887)
  KAFKA-7766: Fail fast PR builds (apache#6059)
  KAFKA-7798: Expose embedded clientIds (apache#6107)
  KAFKA-7641; Introduce "group.max.size" config to limit group sizes (apache#6163)
  KAFKA-7433; Introduce broker options in TopicCommand to use AdminClient (KIP-377)
  MINOR: Fix some field definitions for ListOffsetReponse (apache#6214)
  KAFKA-7873; Always seek to beginning in KafkaBasedLog (apache#6203)
  KAFKA-7719: Improve fairness in SocketServer processors (KIP-402) (apache#6022)
  MINOR: fix checkstyle suppressions for generated RPC code to work on Windows
  KAFKA-7859: Use automatic RPC generation in LeaveGroups (apache#6188)
  KAFKA-7652: Part II; Add single-point query for SessionStore and use for flushing / getter (apache#6161)
  KAFKA-3522: Add RocksDBTimestampedStore (apache#6149)
  KAFKA-3522: Replace RecordConverter with TimestampedBytesStore (apache#6204)
…ache#6022)
Limit the number of new connections processed in each iteration of each Processor. Block Acceptor if the connection queue is full on all Processors. Added a metric to track accept blocked time percent. See KIP-402 for details.
Reviewers: Ismael Juma <ismael@juma.me.uk>
Limit the number of new connections processed in each iteration in `SocketServer` on each `Processor`. Block `Acceptor` if the connection queue is full on all Processors. Added a metric to track accept idle time percent. See KIP-402 for details.
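The behaviour described in this PR can be illustrated with a much-simplified sketch. It is not the `SocketServer` implementation: `SketchProcessor`, `SketchAcceptor`, `queueSize` and `maxPerIteration` are hypothetical names, connections are plain strings instead of socket channels, and blocked time is accumulated in a plain counter rather than a metric.

```scala
import java.util.concurrent.ArrayBlockingQueue

// Simplified stand-in for a Processor with a bounded queue of new connections.
final class SketchProcessor(queueSize: Int, maxPerIteration: Int) {
  private val newConnections = new ArrayBlockingQueue[String](queueSize)

  // Tries a non-blocking hand-off first. With mayBlock = true it waits for
  // space and reports the time spent waiting through recordBlockedNs.
  def accept(conn: String, mayBlock: Boolean, recordBlockedNs: Long => Unit): Boolean =
    if (newConnections.offer(conn)) true
    else if (mayBlock) {
      val startNs = System.nanoTime()
      newConnections.put(conn)
      recordBlockedNs(System.nanoTime() - startNs)
      true
    } else false

  // One iteration: take at most maxPerIteration queued connections, so
  // connections that are already established are not starved by new ones.
  def configureNewConnections(): Seq[String] =
    Iterator.continually(newConnections.poll())
      .take(maxPerIteration)
      .takeWhile(_ != null)
      .toVector
}

// Simplified stand-in for an Acceptor that assigns connections round-robin.
final class SketchAcceptor(processors: IndexedSeq[SketchProcessor]) {
  private var next = 0
  var blockedNs = 0L // total time spent blocked on full queues

  def assign(conn: String): Unit = {
    var retriesLeft = processors.length
    var assigned = false
    while (!assigned) {
      retriesLeft -= 1
      val processor = processors(next)
      next = (next + 1) % processors.length
      // Block only on the last candidate, i.e. once every queue was found full.
      assigned = processor.accept(conn, mayBlock = retriesLeft == 0, ns => blockedNs += ns)
    }
  }
}
```

In this sketch the acceptor only blocks after every processor queue has rejected the connection, and the accumulated `blockedNs` is the quantity a blocked-percent metric would be derived from.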