KAFKA-14247: Consumer background thread base implementation#12672
KAFKA-14247: Consumer background thread base implementation#12672vvcephei merged 45 commits intoapache:trunkfrom
Conversation
kirktrue
left a comment
There was a problem hiding this comment.
Are ConsumerRequestEvent/ConsumerResponseEvent and ApplicationEvent/BackgroundEvent the same thing?
12ce01d to
7c7c072
Compare
0ae5431 to
af2727e
Compare
Documentation on the beahvior
Stubbed event handler
This reverts commit 625b16c.
This reverts commit 816b285.
clean up documentation implemented poll and commitSync clean up clean up
background thread
initialize backgorund thread. refactor stuff a bit Added unit tests more unit tests renaming and documentation clean up clean up clean up
network IO thread
88e3d39 to
422eed4
Compare
28a5814 to
6cfa912
Compare
guozhangwang
left a comment
There was a problem hiding this comment.
Thanks @philipnee for the PR. I made a first pass and left some comments.
| log.debug("{} started", getClass()); | ||
| while (running) { | ||
| runOnce(); | ||
| time.sleep(retryBackoffMs); |
There was a problem hiding this comment.
Instead of setting a sleep here, I think it's better to have a waitOnEmptyInputQueue function with a condition. Where the caller thread would notify when enqueuing a new item.
There was a problem hiding this comment.
@guozhangwang - I thought we wanted to maintain an active loop because we want to keep sending fetches and rebalance requests despite an empty ApplicationQueue.
There was a problem hiding this comment.
Yeah I think so -- please see my other comment below: #12672 (comment). But I still feel that instead of having a sleep for each iteration, we should just consider poll on the network client with a timeout, i.e. supposing our loop would look like:
- try to get some event from the queue, if yes, then handle it and potentially send the corresponding request;
- if the queue is empty, then try to send a fetch request.
- poll the network client in order to receive any responses ready-to-read on the socket.
Then our logic could be: if there's no new actions taken at step 1/2), i.e. we do not have any new items from the queue, and we do not yet need to send any new fetch/rebalance-related requests, then at step 3) we poll for a bit longer time until being notified by the caller that there's new items in the queue; otherwise, at step 3) we just poll without timeout and then immediately move on to the next iteration.
| applicationEventQueue, | ||
| backgroundEventQueue); | ||
| assertTrue(eventHandler.add(new NoopApplicationEvent("hello-world"))); | ||
| while (eventHandler.isEmpty()) { } |
There was a problem hiding this comment.
This is a general comment: I think it worth adding a poll(timeout) in the EventHandler interface since 1) for testing, we do not need the while loop any more, and 2) in production code, for the caller thread scenarios I think there are cases where a poll(timeout) would be useful as well.
|
|
||
| public enum EventType { | ||
| COMMIT, | ||
| NOOP, |
There was a problem hiding this comment.
Ditto, I think we should keep the testing-related element out of the production code eventually. If we are going to remove it later then that's fine.
There was a problem hiding this comment.
Thanks, I think the NOOP event will be moved to the test package in the future PR; I left it here to elucidate the purpose of a few methods in the background thread.
There was a problem hiding this comment.
Thanks for clarifying this, makes sense.
| /** | ||
| * Noop event. Intentionally left it here for demonstration purpose. | ||
| */ | ||
| public class NoopBackgroundEvent extends BackgroundEvent { |
There was a problem hiding this comment.
Ditto here, I think we should move it to test package, not main package.
d17fe3b to
8f2c870
Compare
|
Hey @guozhangwang , much thanks for the detail reviews, I tried to address some of the comments, please review them. In particular:
I left the NOOP event there, but I agree we should move it once we've got an actual event implemented, which should happen soon. |
|
cc John - @vvcephei |
kirktrue
left a comment
There was a problem hiding this comment.
I did another review of the code. I haven't gotten as far as the tests yet.
| log.error("The background thread failed due to unexpected error", | ||
| t); | ||
| if (t instanceof RuntimeException) | ||
| this.exception.set(Optional.of((RuntimeException) t)); |
There was a problem hiding this comment.
Do we want to use KafkaException instead of RuntimeException?
There was a problem hiding this comment.
We could, there are other places in the code that uses RTE so trying to be consistent with the existing code.
| } | ||
|
|
||
| public void close() { | ||
| this.wakeup(); |
There was a problem hiding this comment.
Does the order of these two lines (wakeup and running = false) want to be swapped?
close is called by the polling thread, right? If wakeup is called, it will throw an exception, but then the background thread might potentially loop back around to runOnce before the call to set running to false by the polling thread.
| Sensor fetcherThrottleTimeSensor) { | ||
| this.applicationEventQueue = applicationEventQueue; | ||
| this.backgroundEventQueue = backgroundEventQueue; | ||
| ConsumerMetadata metadata = bootstrapMetadata(logContext, |
There was a problem hiding this comment.
The call to bootstrapMetadata is blocking, right? Is that what we want? If so, can you add some comments (in the code) as to that behavior?
There was a problem hiding this comment.
It should be fine for this call to block, I think because all it does is create a Node object with id = -1 and an IP address. The actual metadata refresh happens during the background thread loop, not during this function call, since there are no network response deps to this call, so it shouldn't be blocking for too long.
|
Hey, @philipnee, @guozhangwang isn't available for reviews right now, so I'll pick it up. I reviewed it, and it all looks good to me. I had a large number of code convention corrections, so I just went ahead and applied the changes, rather than peppering you with nitpicks. I hope that's ok. Generally:
I know that's all arbitrary, and that there are merits to doing things other ways; I'm just trying to keep these files in line with the rest of AK. Thanks! Once the tests pass, I'll go ahead and merge. |
|
Unrelated test failure: |
|
Thanks @vvcephei Will be better at the basic styling requirements in the future... |
…2672) Adds skeleton of the background thread. 1-pager: https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor Continuation of apache#12663 Reviewers: Guozhang Wang <guozhang@apache.org>, Kirk True <kirk@mustardgrain.com>, John Roesler <vvcephei@apache.org>
…2672) Adds skeleton of the background thread. 1-pager: https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor Continuation of apache#12663 Reviewers: Guozhang Wang <guozhang@apache.org>, Kirk True <kirk@mustardgrain.com>, John Roesler <vvcephei@apache.org>
…2672) (#76) Adds skeleton of the background thread. 1-pager: https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor Continuation of apache#12663 Reviewers: Guozhang Wang <guozhang@apache.org>, Kirk True <kirk@mustardgrain.com>, John Roesler <vvcephei@apache.org> Co-authored-by: Philip Nee <pnee@confluent.io>
1-pager: https://cwiki.apache.org/confluence/display/KAFKA/Proposal%3A+Consumer+Threading+Model+Refactor
EventHandler runs a task to ingest the ApplicationEvents. The task runs on a background thread, and consumes events from the ApplicationEventQueue, and produces events to the BackgroundEventQueue.
The PR consist of the basic skeleton of the background thread with
Continuation of #12663
Committer Checklist (excluded from commit message)