KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread (#14629)
jolshan merged 11 commits into apache:trunk
Conversation
  // If the callback is actually executed on a request thread, we can directly execute
  // it without re-scheduling it.
- fun(T)
+ fun(requestLocal, T)
This is actually still potentially wrong. I will fix it.
This change was made for when we want to execute the callback early on the same thread as the one receiving the request. (Ie, before we do verification) In this case, the current request is actually the same. I think we should adjust the conditional to reflect that.
- if (threadCurrentRequest.get() != null) {
-   // If the callback is actually executed on a request thread, we can directly execute
+ if (threadCurrentRequest.get() == currentRequest) {
+   // If the callback is actually executed on the same request thread, we can directly execute
The case when the callback is executed on the same request thread only happens when the txn coordinator is not available. Since it's a rare case, I am wondering if we really need to optimize for that. It seems that it's simpler to always enqueue the callback. Then, we could probably get rid of the ThreadLocal stuff.
Without this, there were a ton of failing tests when David implemented his change. We never really figured out the issue. We also have to consider the case where we bypass the thread check (ie tests) where we don't have the callback queue set up.
@jolshan : I wasn't suggesting simply skipping the test threadCurrentRequest.get() == currentRequest. I was wondering if we could always add the callback to the callbackQueue through the following call, independent of whether the caller is on the same request thread or not.
requestChannel.sendCallbackRequest
So, wrap will just be the following.
def wrap[T](fun: T => Unit): T => Unit = {
  t => requestChannel.sendCallbackRequest(RequestChannel.CallbackRequest(() => fun(t), currentRequest))
}
Right -- we had that before and it caused issues; @dajac has more context.
I am concerned that executing the callback before we return from the request thread causes issues.
In addition, for tests, we would need to set up the request channel etc.
Thanks, Justine. It seems that we can't easily get rid of the ThreadLocal stuff. So, we can leave this as it is.
I am concerned that executing the callback before we return from the request thread causes issues.
With the current PR, it seems that it's possible for the callback to be called before the request handler thread finishes processing the request (that generates the callback), right? Is that causing any problem?
Sorry maybe I'm not explaining myself well.
We saw problems before when the callback was scheduled before the request returned. We fixed it by adding this check.
If we want to remove this check, we will need to do further investigation for that case.
My understanding is that concurrency is not a problem in the production logic (i.e. the fact that the callback might actually execute at any time before, after, or concurrently with the thread that passed it), but unit tests rely on a deterministic order of execution, so adding concurrency here makes them "flaky".
I agree in principle that if we could fold this logic into just one case (i.e. always schedule on a new request thread), it would make it simpler; when this change was proposed I looked into a unit test fix and it seemed more involved than adding this logic.
Thanks for the explanation, Justine and Artem. This sounds good then.
Can we file a JIRA for investigating the alternative? I agree with @junrao that it would be much simpler to reason about if we always schedule it.
    }
  }

  private def appendEntries(allEntries: Map[TopicPartition, MemoryRecords],
Could we add a comment that this method will potentially be called on a different request thread and that it should avoid accessing any thread-unsafe data structures?
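To make the requested comment concrete: the usual way to keep such a callback safe is to hand it an immutable snapshot of any mutable arguments rather than the live structures. A minimal sketch in Java for illustration (all names hypothetical, not actual Kafka code; errorsPerPartition stands in for whatever mutable state the real appendEntries touches):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CallbackSnapshot {
    // Capture an immutable snapshot synchronously, so a callback that later runs
    // on a different request thread never observes mutations by the submitter.
    static CompletableFuture<Integer> submit(Map<String, Integer> errorsPerPartition) {
        Map<String, Integer> snapshot = Map.copyOf(errorsPerPartition);
        // The callback may execute on an arbitrary pool thread.
        return CompletableFuture.supplyAsync(snapshot::size);
    }

    public static void main(String[] args) throws Exception {
        Map<String, Integer> errors = new HashMap<>();
        errors.put("topic-0", 1);
        CompletableFuture<Integer> pending = submit(errors);
        errors.put("topic-1", 2); // mutation after submit is not seen by the callback
        System.out.println(pending.get()); // prints 1
    }
}
```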
  producerEpoch = batchInfo.producerEpoch,
  topicPartitions = notYetVerifiedEntriesPerPartition.keySet.toSeq,
- callback = KafkaRequestHandler.wrap(appendEntries(entriesPerPartition)(_))
+ callback = KafkaRequestHandler.wrap(appendEntries(entriesPerPartition, internalTopicsAllowed, origin, requiredAcks, verificationGuards.toMap,
This approach works. It's a bit complicated since there are multiple callbacks (from KafkaApis and from ReplicaManager) wrapped at different levels. An alternative approach is to simply re-enqueue the original request with some kind of context from the result of pre-validation. When the pre-validation is done, we could just run the logic on the original request again through KafkaApis but with the new context, which would allow us to bypass the validation. This avoids the need to pass appendEntries as a callback. Would that be simpler?
I'm not sure exactly how this would work. I suppose we could have a "verified" flag in some sort of context, but where would that context be stored?
Right now the request considers that some partitions are verified and some are not. (I'm not sure the producer actually sends more than one partition, but the code is currently generalizable to handle multiple partitions.) If this is the case, the context needs to store which partitions are verified and which are not.
I agree that this is complicated, but I would need some time to think about how this would work. I'm also not sure about the implications of going through the .handle of the request path -- it would be as if we received another request in the metrics etc.
I think the way it's implemented right now is that we're just continuing processing from the current point after the network operation has completed, so as far as the "application" logic is concerned, the machinery to resume execution on the request pool is hidden from it. This allows an arbitrary number of non-blocking waits with minimal disruption of the application logic flow (the change is localized to the point of non-blocking wait).
In this case we could inject a new produce request (if I understand your question correctly) with some context saying that it's "verified", but I think it'd be more intrusive, as we'd need to implement some produce-specific "verified" context (that would have to include the verification guard and whatnot) and pass it down through all the functions. We'd also need to consider the offset commit path, which is a separate path that eventually comes to this function. Also, if we consider more general functionality (we don't have any, but hopefully, if we make Kafka more async, there will be more cases), we'd generally want to preserve the progress made up to the network call: if a request needs to do work XYZ and we have non-blocking network calls between stages, it would be good for the overall work to be X [RPC] Y [RPC] Z, rather than X [RPC] XY [RPC] XYZ.
Thanks, Justine and Artem. Agreed that the alternative approach has its own drawback. So, we could punt on this.
divijvaidya left a comment
Thanks for this PR Justine!
I understand that we have provided a fix for requestLocal here but I am concerned that the thread safety for other shared state is not protected by guardrails. We are relying on future contributors & reviewers to know that these states (such as errorsPerPartition Map) ought to be thread safe. It may lead to bugs.
I have left a comment with some other options that we can consider.
  internalTopicsAllowed: Boolean,
  origin: AppendOrigin,
  requiredAcks: Short,
  verificationGuards: Map[TopicPartition, VerificationGuard],
- Are we sure that the Maps being passed here are not concurrently modified by two threads? Asking because they are not thread safe (they aren't concurrent hash maps). I haven't looked in detail at the params passed here, so please feel free to say that you have already verified this.
- How can we prevent future bugs where someone mutates one of these parameters in other parts of the code, such as KafkaApis where appendEntries is called from, without knowing that all these data structures are supposed to be thread safe?

One option to prevent future bugs could be to pass a deep copy of these objects to the callback (instead of a copy of the reference). Another thing we can do is to restrict the number of parameters required by the callback.
Having said that, have we already considered not having this as a callback and instead appending synchronously? I understand that it has drawbacks since it is in the critical produce path, but maybe that would be acceptable given the complexity of ensuring thread safety here?
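The deep-copy option suggested here could be sketched in Java with a hypothetical immutable parameter bundle (AppendArgs is made up for illustration, standing in for the real Scala parameters): copying the map defensively at construction means later mutation by the caller cannot leak into the callback.

```java
import java.util.HashMap;
import java.util.Map;

public class AppendContext {
    // Hypothetical immutable parameter bundle. The defensive copy at construction
    // means the callback can never observe concurrent mutations of the original map.
    record AppendArgs(boolean internalTopicsAllowed, short requiredAcks,
                      Map<String, String> verificationGuards) {
        AppendArgs {
            verificationGuards = Map.copyOf(verificationGuards); // defensive, immutable copy
        }
    }

    public static void main(String[] args) {
        Map<String, String> live = new HashMap<>();
        live.put("topic-0", "guard");
        AppendArgs ctx = new AppendArgs(true, (short) -1, live);
        live.put("topic-1", "other"); // does not leak into the captured context
        System.out.println(ctx.verificationGuards().size()); // prints 1
    }
}
```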
These maps are not mutable right?
These maps are not mutable right?
Don't know. I haven't really checked. I wanted to ask you instead if you have verified that :)
Sorry for my phrasing. I was saying they are mutable. 😅
Sorry for my phrasing. I was saying they are mutable. 😅
Did you mean to say immutable here? If they are mutable, then we need to ensure that they are not concurrently mutated in different threads.
I agree with @junrao here.
The expectation is that queuing up the log record can be done synchronously.
However, that does not mean it must be written to the log synchronously. The minimum is to guarantee the ordering of the records based on the state changes.
I think we also discussed that the ordering of the log is not the important part but rather committing stale data.
Is it true that the ordering of the log records is not important? It seems that you want the latest log to reflect the latest state, right?
I was saying that appendEntries queues up a log record and we could use it there.
Yes, the only thing is whether appendEntries can queue up the log record in the right order. I am not sure simply acquiring the group lock when asynchronously calling appendEntries achieves this.
My understanding from David is that the main thing we must guarantee is that we can't update the in memory-state again until the corresponding records are written to the log. From that, I thought that as long as those steps are synchronous (and I suppose we need a lock for now) that is ok.
The ordering of the requests themselves is not guaranteed already.
The ordering of the requests themselves is not guaranteed already.
It's true that there is no strong ordering guarantee among different clients. However, the order in which the group coordinator serializes them in the log seems important.
Consider an example. A group coordinator receives a joinGroup request from client 1 first and generates a group record rec1=GroupA(members: client1). It then receives joinGroup request from client 2 and generates a group record rec2=GroupA(members: client1, client2). If rec1 comes after rec2 in the log and the coordinator fails over, the new coordinator will restore the group state to include only client1 as its members and lose the valid member client2.
Consider another example that involves consumer offsets. A group coordinator starts with a state in which client 1 owns partition 1. It receives an offset commit request from client 1 on partition 1. This is a valid request since client 1 owns partition 1. So, it generates rec3 = GroupA(offsetForPartition1=100). A rebalance happens. The coordinator re-assigns partition 1 to client 2 and generates rec4 = GroupA(partition1 owned by client 2). If rec3 comes after rec4 in the log and the coordinator fails over, when the new coordinator rebuilds its state, rec3 will look invalid since it happened when client1 no longer owned partition 1.
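The two failover examples above can be condensed into a toy Java replay (a hypothetical helper for illustration, not coordinator code): state is rebuilt by applying records in log order, so if rec1 lands after rec2 the replayed member set silently loses client2.

```java
import java.util.List;
import java.util.Set;

public class GroupReplay {
    // Rebuild group state by replaying full-snapshot membership records in log
    // order: after a coordinator failover, the last record in the log wins.
    static Set<String> replay(List<Set<String>> log) {
        Set<String> state = Set.of();
        for (Set<String> record : log) {
            state = record;
        }
        return state;
    }

    public static void main(String[] args) {
        Set<String> rec1 = Set.of("client1");
        Set<String> rec2 = Set.of("client1", "client2");
        // Correct order: rec2 is last, so client2 survives the failover.
        System.out.println(replay(List.of(rec1, rec2)).contains("client2")); // true
        // Reordered log: rec1 is last, so client2 is silently lost.
        System.out.println(replay(List.of(rec2, rec1)).contains("client2")); // false
    }
}
```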
The serialization can be done asynchronously, without holding locks and relying on synchronous execution. It looks like it shouldn't be that hard to support that in the new group coordinator #14705.
  * @return Wrapped callback that would execute `fun` on a request thread
  */
- def wrap[T](fun: T => Unit): T => Unit = {
+ def wrap[T](fun: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
Since this is a generic util, it would be useful for future users of this method to understand what kind of fun could be passed in. I was thinking that perhaps we could give fun a new interface type (sth like ThreadSafeCallback). We can document the expectations of ThreadSafeCallback (e.g., only executed once, but could be in an arbitrary request handler thread; if an implementation needs to access a shared data structure, the access needs to be thread safe, etc.). Then, anyone who uses this method in the future needs to be aware of this interface. This may address a bit of Divij's concern. Is that worth doing?
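A rough Java sketch of what such an interface could look like (ThreadSafeCallback and the demo around it are hypothetical, not Kafka code): the contract lives in the type itself, and implementations are nudged toward thread-safe state such as AtomicInteger.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class ThreadSafeCallbackDemo {
    // Hypothetical interface encoding the documented contract: executed exactly
    // once, possibly on an arbitrary request handler thread, so any shared state
    // the implementation touches must be accessed in a thread-safe way.
    @FunctionalInterface
    interface ThreadSafeCallback<T> {
        void onComplete(T result);
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger completions = new AtomicInteger(); // thread-safe shared state
        ThreadSafeCallback<String> cb = r -> completions.incrementAndGet();
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.submit(() -> cb.onComplete("done")).get(); // may run on any pool thread
        pool.shutdown();
        System.out.println(completions.get()); // prints 1
    }
}
```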
That seems reasonable to me.
I think ideally in a future change we'd get rid of passing RequestLocal as an argument and maybe make it a static thread local that could be accessed from the point of use rather than being passed through the whole stack.
There are a couple of problems that contributed to this issue:
- the functions here are already written for asynchronous completion (because we wait for replication), and in such cases the convention is generally that the arguments of a function are not bound to the executing thread (i.e. rooted on the call stack or globally)
- the point of use was outside of the core change, so folks didn't look into the specifics of the RequestLocal semantics (i.e. even if appendEntries had been an explicit function, as it now is, I'm not sure the problem would have been noticed).
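The static thread-local idea could look roughly like this in Java (all names hypothetical; a StringBuilder stands in for RequestLocal): the per-thread instance is fetched at the point of use, so it is never passed through the call stack and cannot be captured by a closure and carried to another thread.

```java
public class RequestLocalHolder {
    // Hypothetical static ThreadLocal looked up at the point of use, standing in
    // for RequestLocal: each thread only ever sees its own instance.
    static final ThreadLocal<StringBuilder> REQUEST_LOCAL =
        ThreadLocal.withInitial(StringBuilder::new);

    static String whoAmI() {
        StringBuilder rl = REQUEST_LOCAL.get(); // always this thread's instance
        rl.setLength(0);
        return rl.append(Thread.currentThread().getName()).toString();
    }

    public static void main(String[] args) throws Exception {
        String onMain = whoAmI();
        final String[] other = new String[1];
        Thread t = new Thread(() -> other[0] = whoAmI(), "handler-1");
        t.start();
        t.join();
        System.out.println(!onMain.equals(other[0])); // prints true: per-thread state
    }
}
```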
  T => {
- if (threadCurrentRequest.get() != null) {
    // If the callback is actually executed on a request thread, we can directly execute
+ if (threadCurrentRequest.get() == currentRequest) {
Ismael mentioned in #9229 (comment) that thread locals are most useful when one doesn't control the code. So, I am wondering if we could get rid of the two ThreadLocal: threadRequestChannel and threadCurrentRequest in KafkaRequestHandler introduced in KAFKA-14561. The reason for the former is to obtain requestChannel. We could simply pass in requestChannel to ReplicaManager.appendRecords. The reason for the latter is (1) to obtain currentRequest and (2) to make sure that the callback can be short-circuited if it's called on the same request handler thread. For (1), we could also pass in currentRequest to ReplicaManager.appendRecords. For (2), we could change the code such that KafkaRequestHandler.wrap is called only when the callback truly needs to be run from a different thread. Otherwise, we can just call the callback directly there.
Hmmm. I'm not sure it makes sense to try to pass the request channel and current request into every method we want to do a callback for.
As for 2 and short circuiting... Are you suggesting I move the wrap call into AddPartitionsToTxnManager? If so, I need to pass the appendEntries method in there as well. That may be trickier with the typing.
Hmmm. I'm not sure it makes sense to try to pass the request channel and current request into every method we want to do a callback for.
Currently, the callback is only needed for produce request in ReplicaManager. If you look at KafkaApis.handleProduceRequest, we already pass in both request channel and current request to ReplicaManager.appendRecords through sendResponseCallback.
As for 2 and short circuiting... Are you suggesting I move the wrap call into AddPartitionsToTxnManager? If so, I need to pass the appendEntries method in there as well. That may be trickier with the typing.
Yes. Currently, we already pass in appendEntries as a callback to AddPartitionsToTxnManager, right?
We pass the wrapped method, which has a simpler type: type AppendCallback = Map[TopicPartition, Errors] => Unit
As for just produce: yes, this is the case now, but I think Artem was trying to create the wrap method as a "general" callback mechanism.
Thanks, Justine and Artem. Yes, passing around the context has its own issues. The main thing with thread local is to make sure that we don't introduce any GC issues. Currently, it seems that we never remove threadRequestChannel. Since we allow dynamically changing the number of request handler threads, it's probably better to remove threadRequestChannel when the thread completes?
Hmm. We assign the request channel when the thread is created. I assume that request channel will remain with the thread during its lifetime. Is that incorrect?
True in the common case. I was wondering what happens if we dynamically reduce the number of request handler threads.
If we reduce then I assume the thread just stops?
I understand the issue. I have a commit below that clears the request local.
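The fix being described could be sketched like this in Java (HandlerThread is hypothetical, with a byte[] standing in for the per-thread request local): clearing the ThreadLocal in a finally block means a dynamically removed handler thread does not keep its per-thread state reachable after it stops.

```java
public class HandlerThread extends Thread {
    // Hypothetical per-thread state standing in for RequestLocal.
    static final ThreadLocal<byte[]> REQUEST_LOCAL = new ThreadLocal<>();
    volatile boolean clearedOnExit;

    @Override
    public void run() {
        try {
            REQUEST_LOCAL.set(new byte[1024]); // per-thread buffer
            // ... process requests until asked to stop ...
        } finally {
            REQUEST_LOCAL.remove(); // release the state when the thread completes
            clearedOnExit = (REQUEST_LOCAL.get() == null);
        }
    }

    public static void main(String[] args) throws Exception {
        HandlerThread t = new HandlerThread();
        t.start();
        t.join();
        System.out.println(t.clearedOnExit); // prints true
    }
}
```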
Following up with build failure here: #14545 (comment)
  * The RequestLocal passed in must belong to the request handler thread that is executing the callback.
  */
- def wrap[T](fun: T => Unit): T => Unit = {
+ class AsynchronousCompletionCallback[T](val fun: (RequestLocal, T) => Unit)
nit: could probably be a case class?
Jun wanted the explicit class so folks would see the comment about the request local etc.
We could do a case class otherwise
Looking at this again. I am not sure AsynchronousCompletionCallback adds much value. It makes the existing subtle code even harder to understand. So, we probably could just go back to the original way, but add the comments here to the javadoc for asyncCompletionCallback in wrap.
I've updated the PR to reflect this.
  internalTopicsAllowed: Boolean,
  origin: AppendOrigin,
  requiredAcks: Short,
  verificationGuards: Map[TopicPartition, VerificationGuard],
Hmm, without the lock, an offset commit may be reordered after a group state change. To make it safe, we would need to reacquire the group lock after validation completes so that we could check again whether the request should be accepted.
It seems the offset validation is a tricky question requiring more thought, so I will file a jira for it. Given that the offset verification is not in 3.6, I think it makes sense to merge the request local fix separately. Are we all in agreement for that part of the change? cc: @dajac @junrao @hachikuji
junrao left a comment
@jolshan : Thanks for the explanation. Given that the offset verification is not in 3.6, I agree that we can fix the request local issue first. Left a few minor comments.
Just to be clear. The txn verification for consumer offset is in trunk and still needs to be fixed, right?
Yes. We will need to fix this for 3.7. Here is the JIRA: https://issues.apache.org/jira/browse/KAFKA-15784
  * @return Wrapped callback that schedules `asyncCompletionCallback` on an arbitrary request thread
  */
- def wrap[T](fun: T => Unit): T => Unit = {
+ def executeOrRegisterAsyncCallback[T](asyncCompletionCallback: (RequestLocal, T) => Unit, requestLocal: RequestLocal): T => Unit = {
Sorry, just realized that this method doesn't execute or register the callback directly. It creates a wrapped callback for that instead. So, wrap is still the more appropriate name. We could probably change the description to sth like "Creates a wrapped callback to be executed synchronously on the calling request thread or asynchronously on an arbitrary request thread."
Yeah. I was thinking about that when I was renaming but I forgot that in the early return case, we still wrap. I'll change the description again to match this as well.
@jolshan : Are the 39 test failures related to the PR?
Given that I made minimal changes and the previous tests only had 10-15 or so regular flakes, I don't think so.
This one only had 4 failures from the usual suspects. I will run one more time to make sure the 35 weren't flaky and come back. EDIT: I realized I am silly and didn't wait for the build to complete. Here is the completed run for 18 with 12 failures I will look at.
I took a look at the most recent 19 failures. The majority of them are failing on trunk in the last few (5 or so) runs.
Looks like an issue with InitProducerId that doesn't seem related to this change.
Issue with the quorum controller that doesn't seem related to my change.
I feel like I have seen a similar issue in the past, but will double check my branch to make sure nothing is wrong.
@jolshan : Thanks for the analysis. If the test failures are unrelated, feel free to merge the PR. It would be useful to file jiras to track new transient test failures.
Hey @jolshan - You might already know this but sharing in case you don't. A tool I use to quickly find whether a test is flaky is https://ge.apache.org/scans/tests?search.rootProjectNames=kafka&search.tags=trunk&search.timeZoneId=Europe%2FBerlin&tests.container=kafka.zk.ZkMigrationIntegrationTest You can just replace the test class & name in the URL and view the last 7 days of results for that specific test, including the flakiness rate & cause of failures.
Thanks @divijvaidya. I was taking a look at Gradle Enterprise as well. But the link is helpful.
KAFKA-15653: Pass requestLocal as argument to callback so we use the correct one for the thread (#14629)

With the new callback mechanism we were accidentally passing context with the wrong request local. Now include a RequestLocal as an explicit argument to the callback. Also make the arguments passed through the callback clearer by separating the method out.

Added a test to ensure we use the request handler's request local and not the one passed in when the callback is executed via the request handler.

Reviewers: Ismael Juma <ismael@juma.me.uk>, Divij Vaidya <diviv@amazon.com>, David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io>, Artem Livshits <alivshits@confluent.io>, Jun Rao <junrao@gmail.com>