KAFKA-9441: Add internal StreamsProducer#8105
Conversation
There was a problem hiding this comment.
This allow us to simplify RecordCollectorImplTest significantly -- we can register a exception that we want to throw on the next invocation of the corresponding method.
There was a problem hiding this comment.
I think we should do the regular state check first before we throw any test ingested exception.
There was a problem hiding this comment.
That depends on the purpose of this class. According to the meta comment:
By default this mock will synchronously complete each send call successfully. However it can be configured to allow the user to control the completion of the call and supply an optional error for the producer to throw.
We are definitely not concerned too much about the internal correctness of MockProducer, as long as it is reproducing the error we injected IMHO.
There was a problem hiding this comment.
We just replaced this with the commitTransactionException -- a user can just set a ProducerFencedException to get the same behavior
There was a problem hiding this comment.
We use StreamsProducer now that encapsulates the actually producer, does transaction state handling, and error handling. RecordCollectorImpl is now agnostic to transactions as desired.
There was a problem hiding this comment.
Note that we don't even need to call a "maybeInitTx" on StreamsProducer -- it embeds the corresponding logic internally and fully automatic.
There was a problem hiding this comment.
This comment is unclear to me, that is why I added the text -- can you explain what it means @guozhangwang ?
There was a problem hiding this comment.
These first line should be removed -- I did it in my current on-going PR.
There was a problem hiding this comment.
I think we could also set to null here -- not sure which one would be better?
There was a problem hiding this comment.
Seems the comment does not make sense any longer? Only side cleanup in this class
There was a problem hiding this comment.
To avoid redundant code, we setup everything for non-eos and eos case upfront (even if not every test needs everything).
There was a problem hiding this comment.
We can avoid to create "collectors" in almost all tests now reducing the overall complexity of the test
There was a problem hiding this comment.
I added stricter verification to all tests, to make sure the right exception was triggered.
There was a problem hiding this comment.
MockProducer does not support the correct handling of callback exceptions, hence, we need to do it manually. (Did not think it worth to extend MockProducer -- let me know what you think)
There was a problem hiding this comment.
The name and the test does not really match as it throws in beginTransaction. In any case, we have test for both cases already and thus I removed this duplicate test (please verify that I did not reduce test coverage)
There was a problem hiding this comment.
The two cases are differ that one throwing KafkaException (fatal) and the other throwing ProducerFencedException (task-migrated).
There was a problem hiding this comment.
My argument was that the old test was named "send" but did throw in "beginTx" (and there was already a corresponding "beginTx" test in the old code -> hence "redundant" / or at least need a fix). However, the new StreamsProducerTest should cover all cases for "send" as well as "beginTx":
BeginTx:
shouldThrowTaskMigrateExceptionOnEosBeginTxnFencedshouldThrowTaskMigrateExceptionOnEosBeginTxnErrorshouldFailOnEosBeginTxnFatal
send:
shouldThrowTaskMigratedExceptionOnEosSendFencedshouldThrowTaskMigratedExceptionOnEosSendUnknownPidshouldThrowTaskMigrateExceptionOnEosSendOffsetFencedshouldThrowStreamsExceptionOnEosSendOffsetErrorshouldFailOnEosSendOffsetFatal
There was a problem hiding this comment.
This is actually an incorrect test -- on commit, we would never do anything with StandbyTasks because they always return false on commitNeeded() -- the used StateMachineTask got a corresponding fix.
There was a problem hiding this comment.
Hmm... maybe that's correct, but that would also mean that the standby tasks are not committed at any occasions except closing which sounds like a lurking bug to me?
There was a problem hiding this comment.
Sounds correct -- I was double checking this code and this issue was introduced in the refactoring via 4090f9a
To not block this PR further, I would suggest to keep this change as-is and do a follow up PR to fix the issue.
There was a problem hiding this comment.
I will make a separate PR to fix this.
There was a problem hiding this comment.
Removed this test for the same reason as above.
There was a problem hiding this comment.
This is the fix for StateMachineTask -- it should behave the same way as an actual active or standby task
There was a problem hiding this comment.
After John's TDD fix, this is not used any longer (just cleanup to remove it).
|
Are the test failures in |
There was a problem hiding this comment.
nit: can we consolidate producerFencedOnCommitTxn to the more-general commitTransactionException? I.e. if you want to fence on commit, you just register the commitTransactionException as a ProducerFencedException
There was a problem hiding this comment.
The passed in value would be true for both alpha and beta right? if yes we can just name it eosEnabled.
There was a problem hiding this comment.
These first line should be removed -- I did it in my current on-going PR.
There was a problem hiding this comment.
When are we removing the entry upon task closure? If it never cleans up we could potentially have an ever-growing map.
There was a problem hiding this comment.
The TaskManager does this in handleAssignment() and handleLostAll().
There was a problem hiding this comment.
Regret to see this ... I guess in order to allow mocking the task creation inside threadTest we'd have to do this, but it also makes the code less readable. Maybe we can augment the createTask interface to pass in all parameters so that the preparation logic can then be pushed into the StreamThread?
There was a problem hiding this comment.
Also the taskProducers map is reference in three different classes: 1) creator to populate it, 2) task-manager to remove from it, 3) thread to get metrics from it (this is to be fixed).
I think we can let it to be purely owned by the task-manager, after we fixed 3) (cc @vvcephei ), as for 1) if we can augment the createTask above, we can then push the addition logic into TaskManager and creates the record-collector with the passed-in producer, and the only pass-in the record collector into createTask interface.
WDYT?
There was a problem hiding this comment.
I did not want to do more refactoring this PR is already quite big -- it would be best to push TaskCreator stuff completely into TaskManager.
There was a problem hiding this comment.
Fair enough :) could you add a TODO marker here?
There was a problem hiding this comment.
nit: in KIP-447 we would not only use applicationId but also consumer's whole ConsumerGroupMetadata, so for future-uses I'm thinking maybe we can just pass in the consumer object here, and then before KIP-447 we can just get the applicationId from its consumer.metadata.
There was a problem hiding this comment.
It's for sure an intermediate state -- will get changed in follow up PRs.
There was a problem hiding this comment.
The passed in task-id are used for two purposes: 1) define eosAlphaEnabled, 2) use in logging (btw there are still some place where we do not check taskId == null). And in #8058 we no longer log the task-id in TaskMigrated since when it happens we would migrate all the tasks anyways. So we do not need it in the TaskMigrated exception.
So I'd suggest the following:
a) we keep a String for taskIds which would be a single task-id if the passed in taskId != null or default to "all owned active tasks" for logging purposes only.
b) we keep the eosEnabled checking the taskId as is.
And moving forward as we remove the alpha we would replace this parameter with a boolean only since it would always be all owned active tasks.
There was a problem hiding this comment.
Ditto here: even later when we introduce the eosBeta the logic here would be the same as we wrap the logic in TransactionProducer right? In that case we can just name this boolean as eosEnabled.
There was a problem hiding this comment.
nit: rename to isProducerFenced?
There was a problem hiding this comment.
An UnknownProducerIdException does not imply fencing? Does it?
There was a problem hiding this comment.
Well yes and no: it is not fenced by txn coordinator, but by broker :) Anyways I'm just a feeling a bit awkward that in the caller we say if it is recoverable, we always handle it as task-migrated --- it is a very nit so your call
There was a problem hiding this comment.
It was called isRecoverable() before -- think I just leave it.
|
Updates this PR -- also fixed the TDD issue. |
8242fa0 to
32f4eca
Compare
|
Rebase to resolve merge conflict. Call for review @guozhangwang |
abbccdda
left a comment
There was a problem hiding this comment.
Take an initial pass. I put an architectural idea there which we could discuss further after the long weekend.
There was a problem hiding this comment.
nit: could be simplified as eosEnabled
There was a problem hiding this comment.
That depends on the purpose of this class. According to the meta comment:
By default this mock will synchronously complete each send call successfully. However it can be configured to allow the user to control the completion of the call and supply an optional error for the producer to throw.
We are definitely not concerned too much about the internal correctness of MockProducer, as long as it is reproducing the error we injected IMHO.
There was a problem hiding this comment.
"Task " + taskId + " could not get partition information for topic " + topic
There was a problem hiding this comment.
It looks weird to let transactionManager to send record when the application is not on EOS. What's the issue with name StreamProducer?
There was a problem hiding this comment.
We can also go back to StreamProducer -- I just had the impression that TransactionManager is the better name. You are correct, that if EOS is disabled, the TM has nothing to manage :) -- I don't feel strong about the name either way.
There was a problem hiding this comment.
should be abortTransactionIfOngoing or maybeAbortTransaction as there may not be one on-going txn.
There was a problem hiding this comment.
Well, the idea of the TM is "hide" those details -- as a caller, I don't want to think about it, I just say, something was wrong, please abort. That the call might be a no-op should be an implementation detail to the caller.
There was a problem hiding this comment.
I don't think the comment is necessary
There was a problem hiding this comment.
Agree -- copied this from RecrodCollectorImpl...
There was a problem hiding this comment.
On stream this could only be consumer, so we could use consumerGroupId
There was a problem hiding this comment.
appid == groupid and in KS we always use appid.
There was a problem hiding this comment.
nit: as we are using full names elsewhere, we could also name it maybeInitTransaction
|
Thanks for the review @abbccdda -- updated this PR. |
There was a problem hiding this comment.
Needed to change the constructor to make it shorter (checkstyle failed). Hence, some variables cannot be final any longer.
9a94b2b to
e181ee8
Compare
|
Java 8: Java 11: (No need to retest atm. Waiting for reviews.) |
|
retest this please |
|
I couldn't find one of my important previous comments anywhere, so I will just rephrase it here: I'm thinking whether it makes sense to define a WDYT? |
guozhangwang
left a comment
There was a problem hiding this comment.
@mjsax I made another pass (this time including the tests) and most are minor. Please feel free to merge after you've replied them.
There was a problem hiding this comment.
We should remove K9113 here since in the scope of KAFKA-9113 we are going to treat this as fatal (which is already incorporated now). If we want to use the buffer-and-retry logic then it would be out of 9113's scope.
There was a problem hiding this comment.
Well yes and no: it is not fenced by txn coordinator, but by broker :) Anyways I'm just a feeling a bit awkward that in the caller we say if it is recoverable, we always handle it as task-migrated --- it is a very nit so your call
There was a problem hiding this comment.
nit: If we rename it to StreamProducer then maybe this can be renamed to embeddedProducer or sth.
There was a problem hiding this comment.
Are you fine with kafkaProducer() ?
There was a problem hiding this comment.
If the serialization failed we wrap it as a StreamsException -- with this PR it seems a regression that we do not do it any more.
There was a problem hiding this comment.
Brackets looks unnecessary here.
There was a problem hiding this comment.
I reuse the variable name thrown twice and want to make it final -- hence the braces
There was a problem hiding this comment.
qq: not sure what does this mean.
There was a problem hiding this comment.
It means functional test, ie, we that something should work (instead of negative test that test that something should not work/throw an error). Are you fine with this or do you have a suggestion on how to rephrase it?
There was a problem hiding this comment.
nit: shouldFailOnInitTxnWhenEOSDisabled.
There was a problem hiding this comment.
Ditto here. Not sure what does positive mean.
abbccdda
left a comment
There was a problem hiding this comment.
Blocking the merge here as we want to at least rename the class to StreamProducer.
e181ee8 to
f4d4103
Compare
|
Update this PR. Let me know if I addresses all comments sufficiently. If yes, I hope we can merge if Jenkins is green. |
|
LGTM. |
Upfront refactoring for KIP-447. Introduces `TransactionManager` that allows to share a producer over mulitple tasks and track the TX status.
f4d4103 to
ffe90a4
Compare
|
Rebase this PR to resolve merge conflicts. |
* apache-github/trunk: (23 commits) KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (apache#8154) HOTFIX: fix NPE in Kafka Streams IQ (apache#8158) MINOR: set scala version automatically based on gradle.properties KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (apache#8142) KAFKA-9441: Add internal TransactionManager (apache#8105) MINOR: Document endpoints for connector topic tracking (KIP-558) MINOR: Standby task commit needed when offsets updated (apache#8146) KAFKA-9206; Throw KafkaException on CORRUPT_MESSAGE error in Fetch response (apache#8111) MINOR: Remove unwanted regexReplace on tests/kafkatest/__init__.py KAFKA-9586: Fix errored json filename in ops documentation KAFKA-9575: Mention ZooKeeper 3.5.7 upgrade KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted (apache#8058) HOTFIX: don't try to remove uninitialized changelogs from assignment & don't prematurely mark task closed (apache#8140) MINOR: Fix javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion (apache#7337) MINOR: Improve EOS example exception handling (apache#8052) MINOR: Fix a number of warnings in clients test (apache#8073) MINOR: Update shell scripts to support z/OS system (apache#7913) MINOR: Wording fix in Streams DSL docs (apache#5692) MINOR: Add missing @test annotation to MetadataTest#testMetadataMerge (apache#8141) KAFKA-9533: ValueTransform forwards `null` values (apache#8108) ...
…etrics-common * confluent/master: (76 commits) KAFKA-9530; Fix flaky test `testDescribeGroupWithShortInitializationTimeout` (apache#8154) HOTFIX: fix NPE in Kafka Streams IQ (apache#8158) MINOR: set scala version automatically based on gradle.properties KAFKA-9577; SaslClientAuthenticator incorrectly negotiates SASL_HANDSHAKE version (apache#8142) KAFKA-9441: Add internal TransactionManager (apache#8105) MINOR: Document endpoints for connector topic tracking (KIP-558) MINOR: Standby task commit needed when offsets updated (apache#8146) Changes to migrate to Artifactory (#263) KAFKA-9206; Throw KafkaException on CORRUPT_MESSAGE error in Fetch response (apache#8111) MINOR: Remove unwanted regexReplace on tests/kafkatest/__init__.py KAFKA-9586: Fix errored json filename in ops documentation KAFKA-9575: Mention ZooKeeper 3.5.7 upgrade KAFKA-9481: Graceful handling TaskMigrated and TaskCorrupted (apache#8058) HOTFIX: don't try to remove uninitialized changelogs from assignment & don't prematurely mark task closed (apache#8140) MINOR: Fix javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion (apache#7337) MINOR: Improve EOS example exception handling (apache#8052) MINOR: Fix a number of warnings in clients test (apache#8073) MINOR: Update shell scripts to support z/OS system (apache#7913) MINOR: Wording fix in Streams DSL docs (apache#5692) MINOR: Add missing @test annotation to MetadataTest#testMetadataMerge (apache#8141) ...
Upfront refactoring for KIP-447.
Introduces
StreamsProducerthat allows to share a producer over multiple tasks and track the TX status.Call for review @guozhangwang @abbccdda