KAFKA-9441: Unify committing within TaskManager #8218
Conversation
This is not really relevant for this PR, but we need to add it for KIP-447 eventually, thus I just include it in this PR.
We moved this to TaskManager
Please see my other comment above --- I think it is cleaner to just call foreach(active-task) task.recordCollector.commit inside the task-manager; and inside RecordCollectorImpl we check that eosEnabled is always true, otherwise an IllegalStateException is thrown.
In the next PR where we have the thread-producer, we could then create only a single recordCollector object that is shared among all active tasks and wraps the thread-producer; the calling taskManager code can then just get one active task and call its record-collector's commit function, knowing that this is sufficient to actually commit for all tasks since everyone is using the same record-collector.
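The shape of this suggestion can be sketched as follows. This is a hedged illustration only: `RecordCollector`, the `String -> Long` offset map, and the exposed `committed` field are simplified stand-ins, not the actual Kafka Streams classes.

```java
import java.util.HashMap;
import java.util.Map;

// The task manager would loop over all active tasks and call each task's
// record collector's commit(); the collector itself guards against being
// called without EOS enabled.
interface RecordCollector {
    void commit(Map<String, Long> offsetsAndMetadata);
}

class RecordCollectorImpl implements RecordCollector {
    private final boolean eosEnabled;
    Map<String, Long> committed; // exposed for illustration only

    RecordCollectorImpl(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    @Override
    public void commit(final Map<String, Long> offsetsAndMetadata) {
        if (!eosEnabled) {
            // committing through the collector only makes sense with EOS
            throw new IllegalStateException("commit() called although EOS is disabled");
        }
        // stand-in for producer.sendOffsetsToTransaction(...) + commitTransaction()
        committed = new HashMap<>(offsetsAndMetadata);
    }
}
```

With a shared thread-producer, every active task's collector would wrap the same producer, so calling any one of them commits for all tasks.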
WDYT?
After syncing offline about this, I think I'm convinced now that moving this logic into TaskManager is better.
On suspend() and prepareCommit() we don't commit yet, but return the offsets that need to be committed
We don't commit and thus don't throw any longer
Frankly, not sure if this is correct any longer. What do we want to record with this sensor exactly? Flushing can be expensive and we might want to record it as part of committing -- but I am not sure.
I am not happy with this rewrite (but as I know that John did some changes in this class in another PR, I just did this hack here for now -- needs some cleanup after a rebase)
We could also do a second loop over all tasks, after calling commit(..) below -- not sure if this is ok as-is?
I would prefer a second loop to guarantee a consistent reflection on the task committed state.
Moved both tests to TaskManagerTest
move all 4 tests to TaskManagerTest
Checkmark for proving the 6 tests are all migrated.
Could we have a summary for this PR?

The summary is not longer than the PR title. We make

The title sounds pretty vague to me. The description could at least include what the committing behavior looks like under
We need to allow committing in SUSPENDED state now as we first suspend all tasks and then commit. Cf. TaskManager#handleRevocation()
Minor improvement: we include writing the checkpoint, and the caller can indicate if it should be written or not.
This issue was introduced in the PR that introduced StreamsProducer -- we forgot to close them. Fixing this on the side.
Sounds good, just mark that depending on John's fix, we probably don't need to handle this.
We call closeClean below -- just fixing the comment here for now (\cc @guozhangwang)
Note that we don't commit offsets for this case any longer -- previously, committing offsets "might" have been done with closeClean() (even if I believe that the task would be marked as "commitNeeded == false"). We don't let the TaskManager commit offsets here, as it should not be required.
Similar to above: this issue was introduced in the StreamsProducer PR. We need to close the producer when we remove it.
Not sure why we use an iterator here. Simplifying the code with a for-loop
We need to commit explicitly in TTD now to mimic the TaskManager. Hence, we need access to the consumer and streamsProducer.
guozhangwang left a comment
I'm still in the middle of reviewing this PR, just left a meta question here: previously the commit ordering is
- flush store, flush producer
- commit offset
- write checkpoint file
Now it becomes 1/3/2. It means if we have a crash between 2 and 3, in the past we would likely read from an old checkpointed offset and replay the changelogs, which is fine with or without EOS (with EOS we would just read committed data still so it is safe).
But now if we crash between 3 and 2, it means the checkpoint file has been written, but the offsets are not committed yet, which means we could potentially lose some data. Right?
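The two orderings under discussion can be sketched as follows; this is an illustration only, with `flush`/`commitOffsets`/`writeCheckpoint` as stand-ins for the actual store flush, offset commit, and checkpoint-file write, and the method bodies merely recording each step so the crash windows are easy to see.

```java
import java.util.ArrayList;
import java.util.List;

class CommitOrdering {
    final List<String> steps = new ArrayList<>();

    void flush()           { steps.add("flush"); }            // flush store + producer
    void commitOffsets()   { steps.add("commit-offsets"); }
    void writeCheckpoint() { steps.add("write-checkpoint"); }

    // previous order (1/2/3): a crash after the commit but before the
    // checkpoint only means replaying the changelog from an older
    // checkpoint -- safe with or without EOS
    void previousOrder() { flush(); commitOffsets(); writeCheckpoint(); }

    // new order (1/3/2): a crash after the checkpoint but before the
    // commit leaves a checkpoint ahead of the committed offsets -- the
    // potential data-loss concern raised in the comment above
    void newOrder() { flush(); writeCheckpoint(); commitOffsets(); }
}
```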
abbccdda left a comment
Thanks for the PR, left some comments
Did you already update the KIP for the new config?
What's the benefit of building this as a static helper?
We will need this later (i.e., in a follow-up PR) and it reduces code duplication.
We should have done this from the beginning... (it's just a "side fix")
Add a comment describing the new return statement.
What's the reasoning here for only wrapping the consumer offset commit case, not the EOS case?
You mean exception handling? For the producer all exception handling is done within StreamsProducer (note that threadProducer above is a StreamsProducer, not a KafkaProducer)
Always feels better for one less parameter :)
b083c8d to 86aa7f6
86aa7f6 to 0b5ae1b
This is an open question: we don't want to remove this sensor; however, it was unclear to me how to handle this metric after we split "task committing" into three steps (prepareCommit; taskManager#commit; postCommit).
Should we attempt to add more fine-grained metrics for 3 stages then?
Frankly, I have no good idea atm... Also, if we change metrics, we need to update the KIP and it's getting more complicated. If possible, I would prefer to not change any metric, but not sure if it is possible...
I actually think that we can remove this DEBUG-level per-task commit metrics, since we already have the INFO-level per-thread commit metric and this one does not provide much more additional information?
Simplification to avoid passing in eosEnabled and to reduce the constructor parameter list -- we just piggy-back on the application.id, which shall be null for non-eos.
It seems a bit roundabout to have to remember we should send a null application.id as the constructor argument to indicate that eos is enabled. What's wrong with saying "eos is enabled" when you want eos to be enabled?
Subjectively I'd +1 that adding one more parameter to avoid piggy-backing on the applicationId is better.
It's a personal preference I guess. But seems you don't like it. Will revert it.
Side cleanup: all those methods can actually be package private.
Removing this state -- this is an open question if I did this correctly. \cc @vvcephei
After we have addressed the question of how we want to do metrics, we can update these tests.
Because we make all methods in StreamsProducer package private but need access to commit(), we add TestDriverProducer to get access.
Just added to give public access to commitTransaction() to TTD (it's more elegant than making StreamsProducer#commitTransaction public IMHO)
Why is making commitTransaction public less elegant? I thought that was fine since StreamsProducer is inside the internals package anyway? In fact, in TTD we have access to InternalTopologyBuilder and access its functions (we used to also have a wrapper of InternalTopologyBuilder which we removed later), so I thought that was the agreed pattern.
It's obviously subjective -- personally, even if something is internal, we should not just declare stuff as public but try to keep it to a minimum to follow the idea of encapsulation (not always possible). If you want me to remove this class and make the method public I can do it in a follow up PR. Not sure if we have an agreed pattern, though.
Cool, in that sense let's just keep it then -- do not add it in one PR and remove it immediately in the next.
0b5ae1b to 2aeb998
nit: we could log thread-id here for easier log search.
The threadId is already added to the log prefix when the log object is created in StreamsThread
Could we internalize this state check inside the task to simplify the logic here?
prepareCloseClean() already does a state check and returns emptyMap if state is CREATED.
The point of this check is that we don't add anything to the consumedOffsetsAndMetadataPerTask map -- this is important for the corner case in which all tasks are in state CREATED and thus no transaction was initialized. For this case we cannot call producer.addOffsetsToTransaction() and must skip this step entirely. Note that we have a corresponding check below to not call commitOffsetsOrTransaction if the map is empty.
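A minimal sketch of that guard; this is illustrative only, with a `String -> Long` offset map standing in for the real `TopicPartition -> OffsetAndMetadata` map and a boolean flag standing in for the actual commit call.

```java
import java.util.Map;

// If all tasks are still in CREATED state, no offsets were collected and
// no transaction was ever initialized, so producer.addOffsetsToTransaction()
// must not be called and the whole commit step is skipped.
class CommitSkipSketch {
    boolean commitTriggered = false;

    void maybeCommit(final Map<String, Long> consumedOffsetsAndMetadataPerTask) {
        if (!consumedOffsetsAndMetadataPerTask.isEmpty()) {
            commitTriggered = true; // stand-in for commitOffsetsOrTransaction(...)
        }
    }
}
```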
See my other comments: we should not commit in CREATED, RESTORING and SUSPENDED state, and it's better just to let the prepareXX function to indicate if there's anything to commit based on its state internally than letting task-manager to branch on the task state -- more specifically, here the prepareClose call should not return the map of checkpoints but the map of partition -> partition-timestamps (if empty it means nothing to commit), since the checkpoint map are not needed at task-manager at all and post commit, if the offsets should be empty it would still be empty.
Similarly here, this state check could be internalized.
I think we should just let the prepareXX function to return the map of partitions -> partition-timestamp to indicate if it should be included in the map of committing offsets, so that we do not need to leak the state into task-manager here. Also we only need to call mainConsumer.position once for all tasks -- please see my other comment above.
Also: we should not try to commit state if we are in RESTORING but only flush the store and write checkpoints (I think this is already the behavior in trunk), since the partitions are paused on the main-consumer before restoration is done --- maybe it is partly causing some unit test flakiness.
nit: let's order the functions as
- prepareCloseClean
- closeClean
- prepareCloseDirty
- closeDirty
Prepare to uncleanly close a task that we may not own.
I try to keep "order" and group test methods to keep an overview if test coverage is complete.
// generic tests
// functional
// exception handling
// non-EOS tests
// functional
// exception handling
// EOS tests
// functional
// exception handling
Looks like we lack test coverage for TimeoutException and KafkaException cases
Yeah, this PR does not yet add all required tests...
Covered via shouldCommitNextOffsetFromQueueIfAvailable and shouldCommitConsumerPositionIfRecordQueueIsEmpty
We don't have unit test coverage for this exception case
Added test shouldThrowWhenHandlingClosingTasksOnProducerCloseError
We lack unit test coverage for this case
Could we verify the assignment stack and lost stack separately, by verifying handleAssignment first before calling handleLost?
Not sure if I can follow? The comments just mark which setup calls belong to which test call, nothing more. All setup is done upfront before we call the actual methods under test.
This is also a meta comment: I'm trying to think of a way where we do not need to mess up the hierarchy of task / task-manager classes which was just cleaned.
One wild thought following my previous comment, with the goal to avoid all of those prepare/post orderings etc., is to abstract this logic away from the task / task-manager and handle it inside the record-collector. In other words, the task still follows the same pattern as in today's trunk, e.g. in commitState, calling recordCollector.commit(consumedOffsetsAndMetadata);. However, the recordCollector does not call the corresponding commitTxn immediately but waits until all expected commit calls have been triggered, and then calls the function with the aggregated offset map. More specifically:
- We let the txn-manager keep a reference to the shared record-collector.
- We add a "pre-commit" function inside record-collector which is passed the set of expected tasks (or partitions?) to be committed; then when commit is called, if there is no set of expected tasks, record-collector triggers immediately; otherwise, it waits until all the expected elements have been passed in, and then triggers.
- In these scenarios:
  3.a) Suspend: Although we may only suspend a subset of tasks, we'd still have to commit ALL tasks under eos-beta, so we just call record-collector.preCommit with all the tasks, and then forloop task.suspend.
  3.b) Commit: we would have to commit all tasks, so we just call record-collector.preCommit with all the tasks, and then forloop task.commit.
  3.c) CloseClean: no matter what task(s) we are closing, we need to commit for all, so the same as above.
  3.d) CloseDirty: we do not need to commit at all, so we do not need to call record-collector.preCommit since we know its commit function would not be triggered.
I admit it is not ideal since we are sort of poking a hole inside record-collector to be task-aware, but it saves all the code changes we'd have to introduce in task and most of the task-manager messiness.
@mjsax sorry for going back and forth on the high-level code design here, I know changing 1000+ LOC is not an easy job... but I just want to make sure we introduce as little tech complexity in our code base as possible to achieve the same thing.
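The pre-commit aggregation idea above can be sketched roughly as follows. This is a hedged illustration only: `AggregatingCollector`, the `preCommit`/`commit` signatures, and the `String`/`Long` types are invented stand-ins, not actual Kafka Streams APIs.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// The shared record collector buffers per-task offsets after
// preCommit(expectedTasks) and only triggers the actual (transactional)
// commit once every expected task has called commit().
class AggregatingCollector {
    private Set<String> expectedTasks = Collections.emptySet();
    private final Map<String, Long> pending = new HashMap<>();
    Map<String, Long> lastCommitted; // exposed for illustration only

    void preCommit(final Set<String> tasks) {
        expectedTasks = new HashSet<>(tasks);
    }

    void commit(final String taskId, final Map<String, Long> offsets) {
        if (expectedTasks.isEmpty()) {
            doCommit(new HashMap<>(offsets)); // no aggregation requested
            return;
        }
        pending.putAll(offsets);
        expectedTasks.remove(taskId);
        if (expectedTasks.isEmpty()) { // all expected tasks reported in
            doCommit(new HashMap<>(pending));
            pending.clear();
        }
    }

    private void doCommit(final Map<String, Long> offsets) {
        // stand-in for producer.sendOffsetsToTransaction + commitTransaction
        lastCommitted = offsets;
    }
}
```

As the comment itself concedes, this makes the collector tasks-aware, which is the main design cost of this alternative.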
nit: add a check that taskId exists in taskProducers to make sure we do not return null.
Actually, on second thought, I'm wondering if the following inside TaskManager is cleaner:

for (task <- taskManager.activeTasks)
    task.recordCollector().commit(taskToCommit.getTask(task.id));

instead of:

activeTaskCreator.streamsProducerForTask(taskToCommit.getKey()).commitTransaction(taskToCommit.getValue());

My gut feeling is that it is cleaner to not access the task creator for its created stream-producers (and hence here we would need to change the task-producer map to streamsProducers), but just access each task's record collector and call its commit --- today we already have a StreamTask#recordCollector method.
I think it's unclean to let the RecordCollector commit (note that this PR removes RecordCollector not as a side refactoring but on purpose) -- to me the RecordCollector has the responsibility to bridge the gap between the runtime code (that is typed) and the Producer that uses <byte[],byte[]> (i.e., it serializes the data and manages errors from send) -- why would a collector know anything about committing (for which it also needs a handle to the consumer)?
About accessing the ActiveTaskCreator: we could also expose the StreamsProducer via the RecordCollector, though (or directly via the task)? That would be cleaner, I guess.
This is a meta comment: I think we can consolidate prepareCommit, prepareClose, and prepareSuspend here by introducing a clean parameter to the function, since their logic is very similar (where they differ a bit, it can be pushed to the post logic). On task-manager during commit:
- for each task -> task.prepareCommit(true)
- commit
- for each task -> task.postCommit(true)

During close:

if (clean)
  1) for each task -> task.prepareCommit(true)
  2) commit()
  3) for each task -> task.postCommit(true)
else
  1) for each task -> task.prepareCommit(false)
  2) // do not commit
  3) for each task -> task.postCommit(false)
4) tasks.close(flag)

And the same for suspension.
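The consolidated close flow described above can be sketched as follows; this is an illustrative sketch, with `SimplifiedTask`, `CloseFlow`, and the `Runnable` commit hook as invented stand-ins for the real `Task`/`TaskManager` types.

```java
import java.util.List;

// One prepareCommit(clean) / postCommit(clean) pair replaces the separate
// prepareSuspend / prepareCloseClean / prepareCloseDirty variants.
interface SimplifiedTask {
    void prepareCommit(boolean clean);
    void postCommit(boolean clean);
    void close(boolean clean);
}

class CloseFlow {
    static void closeTasks(final List<SimplifiedTask> tasks,
                           final boolean clean,
                           final Runnable commit) {
        for (final SimplifiedTask t : tasks) {
            t.prepareCommit(clean);
        }
        if (clean) {
            commit.run(); // only a clean close commits
        }
        for (final SimplifiedTask t : tasks) {
            t.postCommit(clean);
        }
        for (final SimplifiedTask t : tasks) {
            t.close(clean);
        }
    }
}
```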
Will do this in a follow up PR.
SG.
I think in this PR we can still do the change to let prepareXX return the map of partitions -> partition-timestamp to indicate whether this task should be included in committing.
Addressed review comments and fixed a bug exposed by system tests. Also added more tests. New system test run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3834/
LGTM. Please feel free to merge after green builds.
The failed test seems to be just flaky.
Actually, the same test failed in both previous runs -- but it passes locally for me. As this PR has nothing to do with the failing test, I might just go ahead and merge. The last PR that changed the failing test was merged recently: 605d55d. There is one concerning comment on that PR: seems the nightly Jenkins job also fails on this test:
Did this PR break the Jenkins job? \cc @guozhangwang
test this please
3daa9a4 to 693b33d
System test failure. Given that some more fixes got pushed to
@mjsax only two
Also, the two already-failing tests (
@ableegoldman Thanks for pointing it out! Pushed a fix and updated the unit test accordingly. \cc @guozhangwang New system test run: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3835/
Seems to be unrelated to this PR. Are we good to merge?
The task-level commit metrics were removed without deprecation in KIP-447 and the corresponding PR #8218. However, we forgot to update the docs and to remove the code that creates the task-level commit sensor. This PR removes the task-level commit metrics from the docs and removes the code that creates the task-level commit sensor. The code was effectively dead since it was only used in tests. Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <matthias@confluent.io>
With KIP-447 we need to commit all tasks at once using eos-beta, as all tasks share the same producer. Right now, each task is committed individually (by making multiple calls to `consumer.commitSync`, or individual calls to `producer.commitTransaction` for the corresponding task producer).

To allow for a unified commit logic, we move the commit logic into `TaskManager`. For non-eos, we collect all offsets to be committed and do a single call to `consumer.commitSync` -- for eos-alpha we still commit each transaction individually (but now triggered by the `TaskManager` to have all code in one place).

To allow for a unified commit logic, we need to split existing methods on the `Task` interface into pre/post parts to allow committing in between, in particular:
- `commit()` -> `prepareCommit()` and `postCommit()`
- `suspend()` -> `prepareSuspend()` and `suspend()` (we keep the name as it still suspends the task)
- `closeClean()` -> `prepareCloseClean()` and `closeClean(Map<TopicPartition, Long> checkpoint)` (we keep the name as it still closes the task)
- `closeDirty()` -> `prepareCloseDirty()` and `closeDirty()` (we keep the name as it still closes the task)

`prepareCloseClean()` returns checkpoint information to allow checkpointing after the `TaskManager` did the commit within `closeClean()`.

In a follow-up PR, we will introduce eos-beta and commit a single transaction over all tasks using the shared producer.
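The split described above can be summarized as a minimal interface sketch. Method names follow the PR description; the `Map` types are simplified stand-ins (the real code uses `TopicPartition` keys and `OffsetAndMetadata` / `Long` values), so treat this as an illustration, not the actual `Task` interface.

```java
import java.util.Map;

interface SplitTask {
    // commit() -> prepareCommit() + postCommit()
    Map<String, Long> prepareCommit();   // returns what needs to be committed
    void postCommit();

    // suspend() -> prepareSuspend() + suspend()
    Map<String, Long> prepareSuspend();
    void suspend();

    // closeClean() -> prepareCloseClean() + closeClean(checkpoint);
    // prepareCloseClean() returns checkpoint information so the TaskManager
    // can commit in between and pass the checkpoint back in
    Map<String, Long> prepareCloseClean();
    void closeClean(Map<String, Long> checkpoint);

    // closeDirty() -> prepareCloseDirty() + closeDirty()
    void prepareCloseDirty();
    void closeDirty();
}
```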
Call for review @guozhangwang @abbccdda