KAFKA-10199: Implement adding active tasks to the state updater #12128
This PR adds the default implementation of the state updater. For now, the implementation only supports adding active tasks to the state updater.
guozhangwang left a comment
Thanks @cadonna! I made a pass on this, and I will merge the PR as-is once we get green builds.
Please feel free to reply to those comments later on, and if some of them make sense, we can incorporate them in the next PR.
 */
Set<TopicPartition> completedChangelogs();

boolean allChangelogsCompleted();
nit: also add a javadoc explanation, especially emphasizing that a standby task's changelogs are never completed, and hence this function only returns true if there are no standby tasks.
public class DefaultStateUpdater implements StateUpdater {

private final static String BUG_ERROR_MESSAGE = "This indicates a bug. " +
Maybe we can move this static field to the StateUpdater interface?
I would not do that since it is actually an implementation detail. I would rather move it to a global location where it is accessible from multiple classes, since it is quite a general message. However, I am not aware of a class for global constants in Streams.
private final ChangelogReader changelogReader;
private final AtomicBoolean isRunning = new AtomicBoolean(true);
private final java.util.function.Consumer<Set<TopicPartition>> offsetResetter;
nit: why not also import java.util.function.Consumer?
I saw that we did it in other places like #10000, but I cannot remember why.
Here it is not needed and I removed it. In other places, I think there might be a name clash between java.util.function.Consumer and org.apache.kafka.clients.consumer.Consumer.
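To illustrate the kind of name clash mentioned above, here is a minimal sketch. It assumes a hypothetical nested interface named Consumer that stands in for org.apache.kafka.clients.consumer.Consumer, so the block compiles without Kafka on the classpath. Once a type with the simple name Consumer is already in scope, the functional interface has to be referenced by its fully qualified name.

```java
class NameClashSketch {
    // Hypothetical stand-in for org.apache.kafka.clients.consumer.Consumer.
    // It is NOT the real Kafka interface; it only occupies the simple name.
    interface Consumer<K> {
        K poll();
    }

    static String run() {
        // The simple name "Consumer" resolves to the nested interface above.
        final Consumer<String> client = () -> "record";

        final StringBuilder sink = new StringBuilder();
        // The functional interface must therefore be spelled out in full.
        final java.util.function.Consumer<String> callback = s -> sink.append(s);

        callback.accept(client.poll());
        return sink.toString();
    }

    public static void main(final String[] args) {
        System.out.println(run());
    }
}
```

When only one of the two types is used in a file, as in this PR, a plain import is sufficient.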
@Override
public void add(final Task task) {
    if (stateUpdaterThread == null) {
I think in the end state, we may want to have more than just state-updater as the prefix, since we would have multiple such threads. But this can be added later.
    }
}

private void performActionsOnTasks() throws InterruptedException {
I'm assuming throws InterruptedException is there for other actions in future PRs, since ADD does not throw any yet.
Same assumption elsewhere.
Those were indeed not needed.
    }
}

private boolean isStateless(final Task task) {
This is a meta thought: I think in the end state, we should let the stream thread make the call and skip sending stateless tasks to the tasksAndActions queue, rather than letting such tasks ping-pong between the two sides. WDYT?
Yeah, that is an option. Let's see how we feel when we integrate the state updater with the rest of the code.
    return task.changelogPartitions().isEmpty() && task.isActive();
}

private void endRestorationIfChangelogsCompletelyRead(final Task task,
nit: rename to CompleteTaskRestorationIfPossible?
I renamed it to maybeCompleteRestoration(). It is a bit shorter 🙂.
restoredActiveTasksLock.lock();
try {
    while (restoredActiveTasks.isEmpty() && now <= deadline) {
        final boolean elapsed = restoredActiveTasksCondition.await(deadline - now, TimeUnit.MILLISECONDS);
It seems elapsed is not used.
I know, but IIRC I got a SpotBugs error if I did not use the return value.
Hmm. I think we have lots of places where return values are not used, so I don't know why SpotBugs is sensitive about this one (maybe we just disabled this rule in other classes :P ). Thanks for letting me know.
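For context on the unused return value: Condition.await(long, TimeUnit) returns false if the waiting time detectably elapsed before the call returned, and true otherwise. One way to avoid an unused local is to loop on Condition.awaitNanos, whose return value is an estimate of the remaining wait time. The following is only a sketch of that alternative, not what this PR does; the class name, the String task ids, and the method names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class AwaitDeadlineSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition nonEmpty = lock.newCondition();
    // Hypothetical stand-in for restoredActiveTasks, holding task ids as strings.
    private final Queue<String> restored = new ArrayDeque<>();

    void add(final String taskId) {
        lock.lock();
        try {
            restored.add(taskId);
            nonEmpty.signalAll();
        } finally {
            lock.unlock();
        }
    }

    // Waits until a task is available or the timeout expires.
    // Returns true if at least one task is available on return.
    boolean awaitRestored(final long timeoutMs) {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        lock.lock();
        try {
            while (restored.isEmpty() && remainingNanos > 0) {
                // awaitNanos returns an estimate of the time still left to wait,
                // so the return value drives the loop and is never unused.
                remainingNanos = nonEmpty.awaitNanos(remainingNanos);
            }
            return !restored.isEmpty();
        } catch (final InterruptedException e) {
            // Restore the interrupt flag and report that nothing was observed.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    public static void main(final String[] args) {
        final AwaitDeadlineSketch sketch = new AwaitDeadlineSketch();
        System.out.println(sketch.awaitRestored(50));
        sketch.add("0_0");
        System.out.println(sketch.awaitRestored(50));
    }
}
```

The loop also handles spurious wakeups, because the emptiness check is re-evaluated after every return from awaitNanos.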
        final boolean elapsed = restoredActiveTasksCondition.await(deadline - now, TimeUnit.MILLISECONDS);
        now = time.milliseconds();
    }
    while (!restoredActiveTasks.isEmpty()) {
Can we just call result.addAll and then restoredActiveTasks.clear(), guarded by a single if condition?
Good point! We do not even need the if condition.
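The suggestion can be sketched as follows. This is a minimal illustration that assumes a lock-guarded list of String task ids in place of the real restoredActiveTasks collection; the class and method names are hypothetical. Because addAll on an empty collection is a no-op, no emptiness check is needed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class DrainSketch {
    private final Lock lock = new ReentrantLock();
    // Hypothetical stand-in for restoredActiveTasks, holding task ids as strings.
    private final List<String> restored = new ArrayList<>();

    void addRestored(final String taskId) {
        lock.lock();
        try {
            restored.add(taskId);
        } finally {
            lock.unlock();
        }
    }

    // Moves everything collected so far into the result in one step.
    Set<String> drain() {
        final Set<String> result = new HashSet<>();
        lock.lock();
        try {
            result.addAll(restored); // no-op when nothing has been restored yet
            restored.clear();
        } finally {
            lock.unlock();
        }
        return result;
    }

    public static void main(final String[] args) {
        final DrainSketch sketch = new DrainSketch();
        sketch.addRestored("0_0");
        sketch.addRestored("0_1");
        System.out.println(sketch.drain().size());
        System.out.println(sketch.drain().isEmpty());
    }
}
```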
stateUpdaterThread.isRunning.set(false);
stateUpdaterThread.interrupt();
try {
    stateUpdaterThread.join(timeout.toMillis());
Generally we use a latch to check whether the thread terminates successfully in time, since the join function returns void and we cannot tell from it whether the thread died within the timeout. If it did not terminate within the timeout, we could consider throwing an exception.
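A minimal sketch of the latch-based approach described above, under stated assumptions: the Worker class, the shutdownGate field, and the shutdown helper are hypothetical names and do not come from this PR. The worker counts the latch down in a finally block when its run loop exits, and the caller awaits the latch with a timeout, which yields a boolean result.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownLatchSketch {
    // Hypothetical worker thread standing in for the state updater thread.
    static class Worker extends Thread {
        final AtomicBoolean isRunning = new AtomicBoolean(true);
        final CountDownLatch shutdownGate = new CountDownLatch(1);

        @Override
        public void run() {
            try {
                while (isRunning.get()) {
                    Thread.sleep(10); // placeholder for real work
                }
            } catch (final InterruptedException e) {
                // Interrupted as part of shutdown; fall through to the finally block.
            } finally {
                shutdownGate.countDown(); // signal that the run loop has exited
            }
        }
    }

    // Returns true if the worker left its run loop within the timeout.
    static boolean shutdown(final Worker worker, final Duration timeout) {
        worker.isRunning.set(false);
        worker.interrupt();
        try {
            return worker.shutdownGate.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(final String[] args) {
        final Worker worker = new Worker();
        worker.start();
        System.out.println(shutdown(worker, Duration.ofSeconds(5)));
    }
}
```

A caller could throw an exception when shutdown returns false. Note that the latch reports that the run loop has exited, which happens slightly before the thread itself is fully terminated.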