KAFKA-9751: Forward CreateTopicsRequest for FindCoordinator/Metadata when topic creation is needed#9579
Conversation
c60f843 to
b5f0b2f
Compare
b5f0b2f to
5e0a499
Compare
cdc9037 to
df0cc62
Compare
7982773 to
309d45a
Compare
becebdb to
ab42e9f
Compare
hachikuji
left a comment
There was a problem hiding this comment.
Did a quick overview and just want to make sure I understand the high-level details. It looks like this patch is handling two cases:
- automatic topic creation through
Metadata - automatic internal topic creation through
FindCoordinator
When the broker encounters one of these cases, it sends a CreateTopic request to the controller. However, the broker does not wait for the response before returning the client response. In the case of FindCoordinator, we return COORDINATOR_NOT_AVAILABLE; for Metadata, we return LEADER_NOT_AVAILABLE.
Do I have that right?
One question that occurred to me is whether we need logic to avoid spamming the controller with CreateTopic requests while we are waiting for the initial request to go through? For example, after seeing COORDINATOR_NOT_AVAILABLE, the consumer will backoff a few ms and then retry. Do we need somewhere to track the topics that are already awaiting creation?
There was a problem hiding this comment.
An alternative that we have done elsewhere would be to introduce an AbstractMetadataRequestTest which we can pull the common cases up to. A more elegant option might be to figure out how to use @ParameterizedTest so that we can provide config overrides. This would be a little difficult at the moment because we initialize brokers in a @Before method. Probably means we need to move away from this approach long term. For now, the abstract class seems preferable. Similar for CreateTopicsRequestWithForwardingTest.
There was a problem hiding this comment.
The controller should have these configurations as well. Perhaps it is better to use -1 for this and replication factor and let the controller fill them in?
There was a problem hiding this comment.
I agree it's equivalent, but I think we could be conservative here to keep the logic on broker side for now, to reduce logical change in this PR.
There was a problem hiding this comment.
I don't think it is equivalent, at least not completely. My thought was to reduce the reliance on the broker's configuration, which is more likely to be stale than the controller. This actually raises an interesting question about the CreateTopic API which I had not thought of before. If we receive a CreateTopic request for an internal topic, which configuration should we use? Currently it looks like we will apply the standard topic defaults, but that does not seem right. I filed https://issues.apache.org/jira/browse/KAFKA-12280, so we can consider this later.
There was a problem hiding this comment.
We seem to have lost this handling or am I missing something?
There was a problem hiding this comment.
We intentionally avoid using adminZkClient so that we could go through topic creation rules through zkAdminManager. TopicExistsException is handled there.
There was a problem hiding this comment.
Inside ZkAdminManager.createTopics, I see that we catch TopicExistsException. However, I do not see any logic to translate it to LEADER_NOT_AVAILABLE. Can you show me where this happens?
There was a problem hiding this comment.
The problem we have is that ZkAdminManager.createTopics only takes a callback instead of responding to you in realtime whether we hit TopicExists. Right now we are doing the topic creation async, so unless this is necessary to be fixed (which today we would just return UNKNOWN_PARTITION which seems to be semantically similar to LEADER_NOT_AVAILABLE), I think we could just returning unknown partition immediately without waiting for the async creation?
There was a problem hiding this comment.
Hmm.. In the old logic, we would attempt topic creation through zookeeper first. Then, if the topic was created successfully, we would return LEADER_NOT_AVAILABLE to give time for the controller to elect a leader. Now we return LEADER_NOT_AVAILABLE immediately and we send the CreateTopic request asynchronously. We don't know if the CreateTopic request will ultimately succeed or not. Perhaps it would be better to keep returning UNKNOWN_TOPIC_OR_PARTITION until we see that the topic exists.
0dcd806 to
b303f12
Compare
9b4521a to
2848e8d
Compare
|
|
||
| val channelManager = | ||
| if (enableForwarding) | ||
| Some(new BrokerToControllerChannelManager( |
There was a problem hiding this comment.
Let's leave this for a follow-up, but just want to mention that it is probably better if we can reuse the same BrokerToControllerChannelManager as ForwardingManager. Can you file a JIRA for a follow-up?
There was a problem hiding this comment.
Sure, we do have https://issues.apache.org/jira/browse/KAFKA-10348 to track.
There was a problem hiding this comment.
I don't think it is equivalent, at least not completely. My thought was to reduce the reliance on the broker's configuration, which is more likely to be stale than the controller. This actually raises an interesting question about the CreateTopic API which I had not thought of before. If we receive a CreateTopic request for an internal topic, which configuration should we use? Currently it looks like we will apply the standard topic defaults, but that does not seem right. I filed https://issues.apache.org/jira/browse/KAFKA-12280, so we can consider this later.
| } | ||
|
|
||
| override def createTopics(topics: Set[CreatableTopic], | ||
| controllerMutationQuota: ControllerMutationQuota): Unit = { |
There was a problem hiding this comment.
Eventually we need to figure out quota behavior for forwarded requests. I am wondering if it makes sense to apply the quota on each broker separately before sending the CreateTopic to the controller or if we rely on the controller exclusively.
cc @dajac
There was a problem hiding this comment.
@hachikuji Sorry for my late reply. I've missed the notification. We have to enforce the quota on the controller exclusively. It is a global quota and we can't really distribute it fairly in the cluster. In this case, it would be great if we could propagate the principal and clientId to the controller to enforce the quota. However, I wonder how we could propagate the error and the delay to the client if the topic creation is throttled. Perhaps, we could reply with UNKNOW_TOPIC_OR_PARTITION until the topic can be created.
|
@abbccdda I had one additional thought here. When we receive a normal |
2fc0011 to
c68c7d0
Compare
|
@abbccdda Thanks for the updates. I opened a PR with a few fixes to speed this along since we're trying to get it checked in today: abbccdda#6. The tests that were previously failing now seem to be passing (at least when testing locally). |
7e745da to
1559647
Compare
always define auto topic creation manager
1559647 to
abf30d8
Compare
| } | ||
| } | ||
|
|
||
| clearInflightRequests(creatableTopics) |
There was a problem hiding this comment.
Can you use a try/finally here?
|
I triggered a new build. Is there a reason why you aborted the previous one @abbccdda? |
|
only one no-related test failure in connect. Verified on local, will merge the PR. |
An accidental change of logging level for streams from #9579, correcting it. Reviewers: Bill Bejeck <bbejeck@gmail.com>
An accidental change of logging level for streams from apache#9579, correcting it. Reviewers: Bill Bejeck <bbejeck@gmail.com>
An accidental change of logging level for streams from #9579, correcting it. Reviewers: Bill Bejeck <bbejeck@gmail.com>
This PR forward the entire FindCoordinator request to the active controller when the internal topic being queried is not ready to be served yet.
Committer Checklist (excluded from commit message)