KAFKA-4453: add request prioritization #2181

Closed
onurkaraman wants to merge 2 commits into apache:trunk from onurkaraman:KAFKA-4453

Conversation

@onurkaraman
Contributor

Today all requests (client requests, broker requests, controller requests) to a broker are put into the same queue. They all have the same priority. So a backlog of requests ahead of a controller request will delay the processing of controller requests. This causes requests in front of the controller request to get processed based on stale state.

Side effects may include giving clients stale metadata, rejecting ProduceRequests and FetchRequests, and data loss (for some unofficial definition of data loss in terms of messages beyond the high watermark).

We'd like to minimize the number of requests processed based on stale state. With request prioritization, controller requests get processed before regular queued up requests, so requests can get processed with up-to-date state.

Request prioritization can happen at the network layer with the RequestChannel. The RequestChannel can categorize the request as regular or prioritized based on the request id. If the incoming request id matches that of UpdateMetadataRequest, LeaderAndIsrRequest, or StopReplicaRequest, the request can get prioritized.

One solution is to simply add a prioritized request queue to supplement the existing request queue in the RequestChannel and add request prioritization-aware logic to both the sendRequest and receiveRequest operations of RequestChannel. sendRequest puts the request into the respective queue based on whether the request is prioritized or not. receiveRequest can optimistically check the prioritized request queue and otherwise fall back to the regular request queue. One subtlety here is whether to do a timed poll on just the regular request queue or on both the prioritized request queue and regular request queue sequentially. Only applying the timed poll to the regular request queue punishes a prioritized request that arrives before a regular request but moments after the prioritized request check. Applying the timed poll to both queues sequentially results in a guaranteed latency increase on a regular request.

An alternative is to replace RequestChannel’s existing request queue with a prioritization-aware blocking queue. This approach avoids the earlier stated subtlety by allowing the timed poll to apply to either prioritized or regular requests in low-throughput scenarios while still allowing queued prioritized requests to go ahead of queued regular requests.

This patch goes with the latter approach to avoid punishing late-arriving prioritized requests.
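The chosen approach can be sketched as a single lock guarding two internal deques, with the take side always draining prioritized elements first. The following is a minimal Java sketch of the idea only, not the patch's actual Scala code; all names are illustrative:

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative prioritization-aware blocking queue: one lock, two internal
// deques; take() always drains prioritized elements before regular ones.
public class PriorityAwareQueue<E> {
    private final int capacity;
    private final ArrayDeque<E> prioritized = new ArrayDeque<>();
    private final ArrayDeque<E> regular = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public PriorityAwareQueue(int capacity) { this.capacity = capacity; }

    public void put(E e, boolean isPrioritized) throws InterruptedException {
        lock.lock();
        try {
            while (prioritized.size() + regular.size() == capacity)
                notFull.await();
            (isPrioritized ? prioritized : regular).add(e);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            // One blocking wait covers both queues, so a late-arriving
            // prioritized request is picked up as soon as it is enqueued,
            // avoiding the sequential two-queue poll subtlety above.
            while (prioritized.isEmpty() && regular.isEmpty())
                notEmpty.await();
            E e = prioritized.isEmpty() ? regular.poll() : prioritized.poll();
            notFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }
}
```

Because a single condition variable guards emptiness, a timed poll on this queue naturally serves whichever request class arrives first in low-throughput scenarios.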

@becketqin
Contributor

Thanks for the patch. LGTM. Regarding the boot sequence issue that the broker may receive a request from a client before it receives the controller's request, could you file another JIRA ticket to track it?

Contributor

Do we want to prioritize ControlledShutdownRequest as well?

Member

Nit: I'd have a set containing the prioritised request ids and then simply use contains instead of having a potentially large number of condition checks.
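The suggested refactor replaces a chain of equality checks with a single set membership test. A minimal sketch follows; the numeric ids are stand-ins for the `ApiKeys` constants, not taken from this patch:

```java
import java.util.Set;

// Illustrative: a set of prioritized api keys replaces a potentially long
// chain of == comparisons with one contains() lookup.
public class PrioritizedKeys {
    // Stand-in ids for ApiKeys.LEADER_AND_ISR.id, ApiKeys.STOP_REPLICA.id,
    // and ApiKeys.UPDATE_METADATA_KEY.id (illustrative values).
    static final short LEADER_AND_ISR = 4;
    static final short STOP_REPLICA = 5;
    static final short UPDATE_METADATA = 6;

    static final Set<Short> PRIORITIZED =
        Set.of(LEADER_AND_ISR, STOP_REPLICA, UPDATE_METADATA);

    static boolean isPrioritized(short apiKey) {
        return PRIORITIZED.contains(apiKey);  // one lookup instead of N checks
    }
}
```

Adding a fourth prioritized request type then means extending the set, not editing a conditional.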

Contributor

@ijuma Same thought 👍

Contributor

@becketqin probably not since this is to prioritize controller-to-broker requests only.

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Nov 30, 2016

What testing has been done to validate this approach? cc @junrao

Contributor

I understand the motivation of this custom queue, but I would really like to avoid it if we can. Let us brainstorm offline. You probably saw this - seems hacky, but also works.

Member

@onurkaraman, what was the conclusion of the offline discussion?

Contributor Author

The discussion happened 9 months ago, so it's hard to say for sure. From what I remember, we tried just using/composing existing concurrent data structures to get the desired effect but they were all unpleasant hacks.

@onurkaraman onurkaraman force-pushed the KAFKA-4453 branch 2 times, most recently from b3a25dc to 21126c1 on December 1, 2016 23:49
@onurkaraman
Contributor Author

This PR had merge conflicts so I rebased and pushed.

@onurkaraman
Contributor Author

By the way, can we resurface this discussion?

@asfgit

asfgit commented Jun 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/5350/
Test FAILed (JDK 7 and Scala 2.11).

@asfgit

asfgit commented Jun 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/5335/
Test PASSed (JDK 8 and Scala 2.12).

Today all requests (client requests, broker requests, controller requests) to a broker are put into the same queue. They all have the same priority. So a backlog of requests ahead of a controller request will delay the processing of controller requests. This causes requests in front of the controller request to get processed based on stale state.

Side effects may include giving clients stale metadata, rejecting ProduceRequests and FetchRequests, and data loss (for some unofficial definition of data loss in terms of messages beyond the high watermark).

We'd like to minimize the number of requests processed based on stale state. With request prioritization, controller requests get processed before regular queued up requests, so requests can get processed with up-to-date state.

Request prioritization can happen at the network layer with the RequestChannel. The RequestChannel can categorize the request as regular or prioritized based on the api key. If the incoming request api key matches that of UpdateMetadataRequest, LeaderAndIsrRequest, or StopReplicaRequest, the request can get prioritized.

One solution is to simply add a prioritized request queue to supplement the existing request queue in the RequestChannel and add request prioritization-aware logic to both the sendRequest and receiveRequest operations of RequestChannel. sendRequest puts the request into the respective queue based on whether the request is prioritized or not. receiveRequest can optimistically check the prioritized request queue and otherwise fall back to the regular request queue. One subtlety here is whether to do a timed poll on just the regular request queue or on both the prioritized request queue and regular request queue sequentially. Only applying the timed poll to the regular request queue punishes a prioritized request that arrives before a regular request but moments after the prioritized request check. Applying the timed poll to both queues sequentially results in a guaranteed latency increase on a regular request.

An alternative is to replace RequestChannel’s existing request queue with a prioritization-aware blocking queue. This approach avoids the earlier stated subtlety by allowing the timed poll to apply to either prioritized or regular requests in low-throughput scenarios while still allowing queued prioritized requests to go ahead of queued regular requests.

This patch goes with the latter approach to avoid punishing late-arriving prioritized requests.
@onurkaraman
Contributor Author

This PR had merge conflicts so I rebased and pushed.

Contributor

@becketqin becketqin left a comment

Thanks for updating the patch. Made a pass and left some comments.

*
* @return potentially prioritized element from the head of the queue
*/
private def dequeue: E = {
Contributor

It seems that by Scala convention this method should have parentheses?

* Prioritized elements in the queue get polled before regular elements in the queue.
*/
private[network] class PrioritizationAwareBlockingQueue[E](private val capacity: Int) {
private var count: Int = 0
Contributor

It looks like the count is shared by the prioritized and normal requests. When there are too many normal requests, it seems that a prioritized request may not really be prioritized when being put into the queue? So the queue is actually prioritized only on the get side. This is probably OK for most cases. But if the queue is moving slowly and there are N putting threads, each thread may have to wait for N/2 on average. Do we want to consider prioritizing on the put side as well, i.e. adding another set of condition variables and a count?
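The put-side idea discussed here can be sketched by tracking how many prioritized puts are pending, so a regular put parks whenever the queue is full or a prioritized producer is waiting. This is a minimal Java sketch under those assumptions, not the patch's code; it broadcasts with signalAll for simplicity:

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: prioritize on the put side too, while keeping a single shared
// capacity bound (so an existing "queued.max.requests" limit still holds).
public class PutPrioritizedQueue<E> {
    private final int capacity;
    private final ArrayDeque<E> prioritized = new ArrayDeque<>();
    private final ArrayDeque<E> regular = new ArrayDeque<>();
    private int pendingPrioritizedPuts = 0;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public PutPrioritizedQueue(int capacity) { this.capacity = capacity; }

    private int count() { return prioritized.size() + regular.size(); }

    public void put(E e, boolean isPrioritized) throws InterruptedException {
        lock.lock();
        try {
            if (isPrioritized) pendingPrioritizedPuts++;
            try {
                // Regular puts wait while the queue is full OR a prioritized
                // put is pending, so prioritized producers overtake them.
                while (count() == capacity
                        || (!isPrioritized && pendingPrioritizedPuts > 0))
                    notFull.await();
            } finally {
                if (isPrioritized) pendingPrioritizedPuts--;
            }
            (isPrioritized ? prioritized : regular).add(e);
            notEmpty.signal();
            // Once no prioritized put is pending, parked regular putters may
            // proceed; broadcast so none of them stays stranded.
            if (isPrioritized && pendingPrioritizedPuts == 0 && count() < capacity)
                notFull.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (count() == 0)
                notEmpty.await();
            E e = prioritized.isEmpty() ? regular.poll() : prioritized.poll();
            // signalAll rather than signal: a lone signal could wake only a
            // regular putter that immediately goes back to sleep.
            notFull.signalAll();
            return e;
        } finally {
            lock.unlock();
        }
    }
}
```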

Contributor Author

This is an interesting point. Request prioritization would be most useful when the system is under heavy load, meaning it would be most useful when the queue is already full.

I was hoping to avoid a config change. The current implementation tries to provide a solution while still honoring the existing "queued.max.requests" config.

Contributor Author

Oh I think I see what you mean. We can add prioritization to the put and still honor the existing "queued.max.requests".

Contributor Author

If we actually want prioritization on both the get-side and put-side, I have some code ready. It's just a matter of deciding which behavior we want.

Contributor

@onurkaraman Sorry for the late reply. I think it may be worth prioritizing on the put side as well. Could you update the patch with that logic?

Contributor Author

Just added the commit to the PR.

import scala.reflect.ClassTag

object RequestChannel extends Logging {
val PrioritizedApiKeys = Set(ApiKeys.LEADER_AND_ISR.id, ApiKeys.UPDATE_METADATA_KEY.id, ApiKeys.STOP_REPLICA.id)
Contributor

I remember we discussed this before, but can you remind me why ControlledShutdownRequest is not prioritized?

Contributor Author

Yeah you commented on this in an above review a long time ago. Joel's reason is why:
#2181 (comment)

The PR tries to keep request prioritization simple and only prioritize controller-to-broker requests.

Contributor

Hmm, is prioritizing just the controller requests enough? From a client's perspective, it wants not only the metadata to be up to date, but also other things like all live replicas to be in sync. Otherwise, some produce requests with acks=all and min.isr could fail even with up-to-date metadata. So, shouldn't we be prioritizing replica fetch requests as well?

Contributor

@junrao The original idea was to separate the control plane and the data plane so the state transitions in the cluster can be propagated faster. Treating the replication traffic differently from the user traffic could be useful. But I feel it is more of a quota problem than a priority issue. If we prioritize the replica fetch requests, it might be possible that no client requests can be served because there are always replica fetches in the queue.

Contributor

Hmm, if you can use quota to solve the replication issue, can't you use it to solve the controller request issue? Basically, broker internal communication won't be throttled and client traffic will be throttled.

Also, if the issue is too many requests from the controller, Onur's patch in KAFKA-5642 will reduce the controller request rate during a controlled shutdown significantly.

Contributor

It seems the use case for quotas is mostly throughput control, while what we are trying to achieve is improved state change latency.

The issue we are trying to solve is that when a broker has many requests queued up and moving slowly, the handling of controller requests is delayed for a long time. This may cause a long period of inconsistent state within the cluster because some of the brokers may see the state change much later than the other brokers. It is not only for controlled shutdown but would help with other admin operations in a busy cluster as well. In this case, we actually want to improve the state propagation latency; quotas may indirectly help but will not work as well as prioritized requests.

@becketqin
Contributor

cc @junrao @ijuma @jjkoshy to see if you have any concerns about the change.

The patch is trying to solve the issue that when requests are queued up in some brokers, cluster state changes are also delayed because all the controller requests share the same queue with the client requests. This may result in an extended period of double leader/no leader, discrepancies in metadata, etc. The patch uses two different queues to prioritize the controller requests over client requests. With the change there will be a better guarantee on the state change propagation delay.

Contributor

@junrao junrao left a comment

@onurkaraman : Thanks for the patch. A few comments below.

* @return potentially prioritized element from the head of the queue
*/
private def dequeue: E = {
val e = if (prioritizedElements.isEmpty) regularElements.poll else prioritizedElements.poll
Contributor

Hmm, when an item is added to an empty queue, there could be multiple request handler threads calling poll(). They could all get notified at the same time and all see prioritizedElements.isEmpty being false and one of them will then be blocked forever. One implication of this is that the requestHandlerIdleAvg metric won't be updated.

Also, how does this work with KIP-72? Ideally, we probably want to bound the request queue by memory, instead of by the number of requests. The issue is that in KIP-72, we delay the processing of an incoming request in the socket layer, at which point we don't know the request type yet. So, it's a bit hard to prioritize based on request type there.

Contributor Author

@onurkaraman onurkaraman Aug 31, 2017

Hmm, when an item is added to an empty queue, there could be multiple request handler threads calling poll(). They could all get notified at the same time and all see prioritizedElements.isEmpty being false and one of them will then be blocked forever. One implication of this is that the requestHandlerIdleAvg metric won't be updated.

A few comments on the above:

  1. A put only calls signal, not signalAll, so only a single waiting thread is woken up. Note that the code would still work even if signalAll were to be called.
  2. Regardless of the signal variant called, the thread(s) that are woken up re-acquire the lock before returning from await. This means only one thread is active in the critical section with the lock at any point in time. Access to all of our internal data structures is synchronized under the lock.
  3. RequestChannel today already uses ArrayBlockingQueue. ArrayBlockingQueue's put API will block indefinitely until it's interrupted or no longer full. The PrioritizationAwareBlockingQueue behaves the same way. You can compare ArrayBlockingQueue's implementation with PrioritizationAwareBlockingQueue if you want.

Contributor

Sorry, didn't see the lock. So, this is fine then.

Contributor

@junrao Good point about KIP-72. If we follow the original idea to separate the control plane and data plane, the controller-to-broker connection should have a dedicated selector which should never get throttled. It is likely OK because we know that at any given point there should only be one controller and at most one controller request in process.

private[network] class PrioritizationAwareBlockingQueue[E](private val capacity: Int) {
private var count: Int = 0
private val prioritizedElements: ArrayDeque[E] = new ArrayDeque[E]()
private val regularElements: ArrayDeque[E] = new ArrayDeque[E]()
Contributor

Should we add a metric so that we can monitor both queues' size?

if (nanos <= 0) {
return null.asInstanceOf[E]
}
nanos = notEmpty.awaitNanos(nanos)
Contributor

We could be in the while loop multiple times. So, we probably need to recalculate the remaining time in each loop?

Contributor Author

I'm already doing this by using the return value of awaitNanos. Here are the docs:
https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html#awaitNanos(long)
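The pattern described here, feeding awaitNanos's return value back into the next wait so the remaining time shrinks on every wakeup, can be sketched generically (illustrative code, not the patch itself):

```java
import java.util.ArrayDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch of a timed poll: Condition.awaitNanos returns an estimate of the
// time still remaining, so looping on its return value recomputes the
// deadline after every spurious or stolen wakeup.
public class TimedPoll<E> {
    private final ArrayDeque<E> elements = new ArrayDeque<>();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();

    public void add(E e) {
        lock.lock();
        try {
            elements.add(e);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (elements.isEmpty()) {
                if (nanos <= 0)
                    return null;                      // deadline passed
                nanos = notEmpty.awaitNanos(nanos);   // remaining time
            }
            return elements.poll();
        } finally {
            lock.unlock();
        }
    }
}
```

This mirrors how java.util.concurrent's own bounded queues implement their timed poll.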

@becketqin
Contributor

@junrao Do you have further concerns on the request prioritization?

@junrao
Contributor

junrao commented Sep 14, 2017

@becketqin : I am still not sure about the patch.

In general, I agree that we want to protect the broker internal traffic more than the client traffic. Intuitively, it seems that we want to protect all internal traffic including all broker to broker communication, not just the LeaderAndIsrRequest and the UpdateMetadataRequest.

The next question is on the approach. First of all, if the broker doesn't have the resources to process at least all the internal traffic, none of the approaches will be effective. If the broker does have enough resources, we could either (1) prioritize internal traffic or (2) give the internal traffic enough quota. I am not sure if you could solve the problem by just having (1), though, since it seems that ongoing client traffic without throttling could still delay the processing of the internal traffic.

Finally, if the concern is that the controller is sending too many requests, we probably should just address that directly?

@becketqin
Contributor

becketqin commented Sep 15, 2017

@junrao Let me try to convince you that we should give the highest priority to the controller requests (and potentially admin requests).

The main idea behind this patch is not to distinguish between internal/external traffic. It is about separating Control from Data. They exist in both internal and external traffic. More specifically,

  • For broker to broker (internal) communication, control means state changes (controller requests), data means replication traffic.
  • For client to broker (external) communication, control means admin requests, data means Produce/Consume.

It is important to prioritize control requests because it gives Kafka agility and usability regardless of the workload. Arguably I think it is "OK" for a Kafka cluster to have under-replicated partitions because that is a well-defined state. But it is not OK for different brokers to report different metadata, or to not be able to shut down a broker, etc. An analogy would be running an application on a laptop: it is OK for the application to get stuck, but it should not cause the entire machine to hang, so the user can still do a kill -9 on it.

Regarding the data, I agree that we should give priority to the internal traffic, but that is more of a discussion within the data traffic instead of the focus of this patch.

@ijuma
Member

ijuma commented Sep 15, 2017

By the way, I also agree that we should separate the control and data planes. Is this the right approach though? Do we have enough visibility into queue sizes for control versus data planes and other observability metrics? I think it would make sense to do some thinking of how this should work and to propose a KIP IMO.

@becketqin
Contributor

@ijuma Yeah, for the entire control/data plane separation a KIP would be more appropriate, as we may need a separate socket (even port), dedicated request handlers, etc. We were just trying to see if there is a simple approach to solve the most pressing issue so we can include it in 1.0.0. Let's do the KIP given there are concerns about this approach.

pendingPrioritizedPuts += 1
}
while (count == capacity || (!prioritized && pendingPrioritizedPuts > 0)) {
notFull.await()
Contributor

I think it's possible to end up with a deadlock.
Say the queue is already full, and two threads A and B are trying to put items into the queue, and both of them got blocked at this line waiting for the queue to become notFull. Say thread A is trying to put a regular item and thread B is trying to put a prioritized item. Then another thread C polls one item out of the queue and calls notFull.signal, which will wake up ONLY ONE blocked thread. If thread A is woken up, the while condition check will put it back to sleep again. Thread B never gets woken up, and therefore does not have a chance to proceed.

Using notFull.signalAll in the dequeue function can resolve this problem, but calling signalAll can have performance impacts. We should probably think about other options.
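One such option is to park the two classes of putters on separate condition variables, so the dequeue side can wake a pending prioritized putter specifically instead of broadcasting to everyone. A sketch of that idea (illustrative names, not a proposed final design):

```java
import java.util.ArrayDeque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Sketch: one condition per putter class. A regular putter can never swallow
// a prioritized putter's wakeup, so the lost-signal deadlock above cannot
// occur, and no signalAll broadcast is needed on the hot path.
public class TwoConditionQueue<E> {
    private final int capacity;
    private final ArrayDeque<E> prioritized = new ArrayDeque<>();
    private final ArrayDeque<E> regular = new ArrayDeque<>();
    private int waitingPrioritizedPuts = 0;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition prioritizedNotFull = lock.newCondition();
    private final Condition regularNotFull = lock.newCondition();

    public TwoConditionQueue(int capacity) { this.capacity = capacity; }

    private int count() { return prioritized.size() + regular.size(); }

    public void put(E e, boolean isPrioritized) throws InterruptedException {
        lock.lock();
        try {
            if (isPrioritized) {
                waitingPrioritizedPuts++;
                try {
                    while (count() == capacity)
                        prioritizedNotFull.await();
                } finally {
                    waitingPrioritizedPuts--;
                }
                prioritized.add(e);
                // No prioritized putter left waiting: let a regular one in.
                if (waitingPrioritizedPuts == 0 && count() < capacity)
                    regularNotFull.signal();
            } else {
                // Regular puts also yield to any parked prioritized putter.
                while (count() == capacity || waitingPrioritizedPuts > 0)
                    regularNotFull.await();
                regular.add(e);
            }
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (count() == 0)
                notEmpty.await();
            E e = prioritized.isEmpty() ? regular.poll() : prioritized.poll();
            // Wake exactly one putter, preferring the prioritized class.
            if (waitingPrioritizedPuts > 0)
                prioritizedNotFull.signal();
            else
                regularNotFull.signal();
            return e;
        } finally {
            lock.unlock();
        }
    }
}
```

The cost is bookkeeping for the waiter count and a second condition object; the benefit is a targeted signal in place of a broadcast.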

@ijuma
Member

ijuma commented Feb 21, 2019

Implemented via #5921

@ijuma ijuma closed this Feb 21, 2019