KAFKA-9051: Prematurely complete source offset read requests for stopped tasks (#7532)
rhauch merged 7 commits into apache:trunk
Conversation
I think we should try to retain using the interface everywhere; extend the interface if you have to.
I think this could return stale values: if this future is prematurely completed, we might still return a stale value for a key. We should be careful that this doesn't break any of the existing contracts of OffsetReader.
Yes, this is noted in the description. However, if we want to stick strictly to the API for the OffsetStorageReader, one option we have is to simply return an empty map to all callers blocked on the future.
Can we call this method stop() or forceStop()?
How about forceComplete? stop is a little ambiguous with respect to whether threads blocked on get() encounter a CancellationException or continue normally.
We should cache the fact that this future has been force-stopped, and if so, any subsequent calls to get() should immediately return with an empty map. Right now, it looks like it will retry catching up with the remote log.
I agree with the gist of this comment, but I think the right place for that caching behavior is in the offset reader class, not in the backing store (since each task is given its own offset reader, but the backing store is shared among all of them).
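A minimal sketch of that idea, using hypothetical names rather than the actual Connect classes: the per-task reader caches its own closed state, so once it has been force-stopped it fails fast instead of querying the shared backing store again. (The actual PR throws from the reader once closed, per the description; returning an empty map would be the alternative discussed above.)

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical per-task reader: the closed flag lives here, not in the
// shared backing store, since each task gets its own reader instance.
public class CachingCloseableReader {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public Map<String, String> offsets(Map<String, String> partitions) {
        if (closed.get()) {
            // Fail fast rather than re-reading from the shared backing store.
            throw new IllegalStateException("Offset reader already closed");
        }
        return partitions; // placeholder for the real offset lookup
    }

    public void close() {
        closed.set(true);
    }
}
```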
Thanks for the review, @liukrimhrim and @wicknicks! I've incorporated most of your comments and responded to the rest; ready for another round when you have time.
…turning null/empty map
@wicknicks I've altered the functionality here to be a little more permissive for tasks in a healthy Kafka cluster and (hopefully) a little less likely to provide bad data to zombie tasks in ways that may negatively impact running (or to-be-run) tasks. Would you mind doing another round when you have time?
ncliang left a comment
Looks good in general. Thanks for implementing the changes we discussed yesterday! Just one comment about testing the change.
    }
    this.cancelled = true;
    finishedLatch.countDown();
    return true;
Do we have tests to test the cancellation logic?
ncliang left a comment
Looks great! Just a few more comments.
    }

    protected static void runSeparateTestThread(Runnable task) {
        Thread t = new Thread(task);
You can use the ExecutorService API and replace calls to this method with just executorService.submit().
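A sketch of the reviewer's suggestion, with illustrative names: submitting the task to an ExecutorService replaces the hand-rolled thread helper and also gives the test a Future with a bounded wait, so a hung task fails the test instead of hanging it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustrative replacement for runSeparateTestThread(Runnable).
public class ExecutorExample {
    public static String runOnExecutor(Callable<String> task) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Bounded wait keeps the test from hanging if the task blocks.
            return executor.submit(task).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```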
    });
    assertFalse(testCallback.isDone());
    testCallback.get();
    if (testThreadException.get() != null) {
Shouldn't the get() be canceled and throw CancellationException above? When will the code below be executed?
It shouldn't be executed, but just in case something goes wrong, it seems more appropriate to throw that exception than just to fail the test because the CancellationException wasn't thrown from get.
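The expected behavior in this exchange can be sketched independently of the PR's test code (names here are illustrative, not the actual test): get() on a cancelled future throws CancellationException, and the line after get() is where a test would call fail().

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;

public class CancelledGetExample {
    // Returns true iff get() on a cancelled future throws CancellationException.
    public static boolean getThrowsCancellation() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.cancel(true);
        try {
            future.get();
            return false; // in a real test this would be fail("expected CancellationException")
        } catch (CancellationException e) {
            return true;  // the expected path
        } catch (Exception e) {
            return false; // any other exception is a test failure
        }
    }
}
```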
    @Override
    public boolean cancel(boolean b) {
-       return false;
+       if (!b) {
This implementation doesn't strictly adhere to the Future#cancel() contract - https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html#cancel-boolean-
> After this method returns, subsequent calls to isDone() will always return true.

Before, we were ignoring the mayInterruptIfRunning flag and always cancelling when cancel() is called. I think I'd prefer that behavior to not actually cancelling when cancel(false) is called.
> Before, we were ignoring the mayInterruptIfRunning flag and always cancelling when cancel() is called
I don't believe this is correct. Before, cancel was just a no-op and no cancellation occurred, regardless of the value of mayInterruptIfRunning. That also violated the contract of the Future interface; I don't see this as a huge issue since we know that no other parts of the code base rely on the cancel method anyway.
Still, after reading the javadocs more carefully, I think it should be possible to implement cancel correctly if mayInterruptIfRunning is false.
I was referring to a previous snapshot where you didn't check the flag.
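For reference, a minimal sketch (not the actual ConvertingFutureCallback code) of a cancel() that follows the Future contract as discussed above: cancellation succeeds whether or not mayInterruptIfRunning is set, isDone() reports true afterwards, and only the interrupt behavior differs between the two flag values.

```java
import java.util.concurrent.CountDownLatch;

public class CancellableCallback {
    private final CountDownLatch finishedLatch = new CountDownLatch(1);
    private volatile boolean cancelled = false;
    private volatile Thread worker; // thread to interrupt, if one is registered

    public synchronized boolean cancel(boolean mayInterruptIfRunning) {
        if (isDone()) {
            return false; // already completed or already cancelled
        }
        cancelled = true;
        if (mayInterruptIfRunning && worker != null) {
            worker.interrupt(); // only interrupt when the caller allows it
        }
        finishedLatch.countDown(); // wake up threads blocked in get()
        return true;
    }

    public boolean isCancelled() {
        return cancelled;
    }

    public boolean isDone() {
        return finishedLatch.getCount() == 0;
    }
}
```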
    testCallback.onCompletion(expectedError, null);
    assertEquals(0, testCallback.numberOfConversions());
    try {
        testCallback.get();

    testCallback.onCompletion(null, "420");
    assertEquals(0, testCallback.numberOfConversions());
    try {
        testCallback.get();
ditto, missing fail() call
    });
    assertFalse(testCallback.isDone());
    try {
        testCallback.get();
|
@ncliang thanks, ready for the next round
    }

    public void close() {
        synchronized (offsetReadFutures) {
We should first check to see if this is closed, and if so simply return.
Ack, will address.
    synchronized (offsetReadFutures) {
        closed.set(true);
        for (Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture : offsetReadFutures) {
            offsetReadFuture.cancel(true);
It is now possible with your changes for ConvertingFutureCallback to throw an exception during cancel(boolean). If that happens, then this reader instance will not be properly/completely closed, so that needs to be handled.
Ack, will address.
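A sketch combining both review points above, with illustrative names rather than the actual OffsetStorageReaderImpl code: close() returns immediately if already closed, and a future whose cancel() throws must not prevent the remaining futures from being cancelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class OffsetReadFutureTracker {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final List<Future<?>> offsetReadFutures = new ArrayList<>();

    public void track(Future<?> future) {
        synchronized (offsetReadFutures) {
            offsetReadFutures.add(future);
        }
    }

    public void close() {
        if (!closed.compareAndSet(false, true)) {
            return; // already closed; make close() idempotent
        }
        synchronized (offsetReadFutures) {
            for (Future<?> future : offsetReadFutures) {
                try {
                    future.cancel(true);
                } catch (RuntimeException e) {
                    // Swallow (and in real code, log) so one failing cancel()
                    // doesn't leave the reader only partially closed.
                }
            }
            offsetReadFutures.clear();
        }
    }

    public boolean isClosed() {
        return closed.get();
    }
}
```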
Prematurely complete source offset read requests for stopped tasks (#7532)

Prematurely complete source offset read requests for stopped tasks, and added unit tests.

Author: Chris Egerton <chrise@confluent.io>
Reviewers: Arjun Satish <arjun@confluent.io>, Nigel Liang <nigel@nigelliang.com>, Jinxin Liu <liukrimhim@gmail.com>, Randall Hauch <rhauch@gmail.com>
Jira
The changes here cause source offset readers to forcefully close when tasks fail to shut down within the graceful shutdown timeout period. When this happens, all pending and future offset read requests will throw an exception.
This is in line with the API for the OffsetStorageReader class, which states that "The only case when an exception will be thrown is if the entire request failed, e.g. because the underlying storage was unavailable." If a task is blocked on reading offsets from Kafka to the point where it has failed to shut down within the graceful shutdown timeout period, it's safe to say that the offset read request has failed and, as a result, to throw an exception.
Initially, I considered just returning null values from the offset reader once closed; however, this may cause source tasks to mutate some external state as if there were no offset for the requested source partitions and may negatively impact other tasks that have since been started by this connector. An exception is safer, and with the distinction between throwing one when a task has exceeded its graceful shutdown period vs. just being scheduled to stop, should not damage the functionality of tasks running in a healthy environment.