KAFKA-7285: Create new producer on each rebalance if EOS enabled #5501

Merged
mjsax merged 6 commits into apache:trunk from mjsax:kafka-7285-producer-fence-fix
Aug 16, 2018

Conversation

@mjsax (Member) commented Aug 13, 2018

  • close producer on suspend() if EOS enabled
  • create new producer on resume() if EOS enabled
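
Conceptually, the change ties the producer lifecycle to the rebalance lifecycle. A minimal sketch of that lifecycle, using a stand-in producer class and illustrative names rather than the actual StreamTask code:

```java
// Minimal sketch of the intended EOS producer lifecycle. The Producer class
// is a stand-in for KafkaProducer; all names are illustrative.
class EosTaskSketch {
    // Stand-in for KafkaProducer, to keep the sketch self-contained.
    static class Producer {
        private boolean closed;
        void close() { closed = true; }
        boolean isClosed() { return closed; }
    }

    private final boolean eosEnabled;
    private Producer producer;

    EosTaskSketch(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
        this.producer = new Producer();
    }

    void suspend() {
        if (eosEnabled) {
            producer.close(); // close the old producer when the task is suspended
            producer = null;
        }
    }

    void resume() {
        if (eosEnabled) {
            producer = new Producer(); // fresh producer after the rebalance
        }
    }

    Producer producer() { return producer; }
}
```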

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

mjsax added the streams label Aug 13, 2018
@mjsax (Member Author) commented Aug 13, 2018

Call for review @guozhangwang @bbejeck @vvcephei @hachikuji

@guozhangwang (Contributor) left a comment

Overall looks fine to me.

I'm a bit concerned about our current intertwined logic in the close(clean, isZombie) calls. But that is sort of tech debt and out of scope here.

    public void resume() {
        // nothing to do; new transaction will be started only after topology is initialized
        log.debug("Resuming");
        if (eosEnabled) {

@guozhangwang (Contributor) commented:

nit: the above comment can be removed.
@mjsax (Member Author) commented Aug 14, 2018

@guozhangwang Can you create a JIRA about the close(clean, isZombie) issue so we don't forget about it?

@vvcephei (Contributor) commented:

The system tests failed: http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2018-08-13--001.1534210196--mjsax--kafka-7285-producer-fence-fix--285a344/report.html

looking at:

Module: kafkatest.tests.streams.streams_broker_down_resilience_test
Class:  StreamsBrokerDownResilience
Method: test_streams_should_failover_while_brokers_down

http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2018-08-13--001.1534210196--mjsax--kafka-7285-producer-fence-fix--285a344/StreamsBrokerDownResilience/test_streams_should_failover_while_brokers_down/11.tgz

Worker got:

[2018-08-13 23:50:21,502] ERROR stream-thread [failover_with_broker_down-2427c47c-9846-472d-a6f6-e80efbadee6a-StreamThread-1] Failed to process stream task 0_0 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=streamsResilienceSource, partition=0, offset=0
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:356)
	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:96)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
	at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:764)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:733)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key null value 2 timestamp 1534204221595) to topic streamsResilienceSink due to java.lang.IllegalStateException: Cannot perform operation after producer has been closed
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:214)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:101)
	at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:89)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:144)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:127)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:91)
	at org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:144)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:127)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:91)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:340)
	... 6 more
Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
	at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:835)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:844)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:828)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
	... 18 more
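
The root cause in the trace above is a send() reaching a producer reference that was already closed during suspension: KafkaProducer rejects every operation once it has been closed. A minimal stand-in reproducing just that failure mode (this mimics the guard, it is not the real client code):

```java
// Stand-in reproducing the failure mode in the trace above: any send()
// after close() fails with IllegalStateException, mirroring the
// throwIfProducerClosed() guard in KafkaProducer. Not the real client.
class ClosedProducerSketch {
    private boolean closed;

    void close() { closed = true; }

    void send(final String record) {
        if (closed) {
            throw new IllegalStateException("Cannot perform operation after producer has been closed");
        }
        // normal send path elided
    }
}
```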

Picking another random test:

Module: kafkatest.tests.streams.streams_standby_replica_test
Class:  StreamsStandbyTask
Method: test_standby_tasks_rebalance

http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2018-08-13--001.1534210196--mjsax--kafka-7285-producer-fence-fix--285a344/StreamsStandbyTask/test_standby_tasks_rebalance/14.tgz

Looks like the same failure (worker6):

[2018-08-13 23:54:47,422] ERROR stream-thread [kafka-streams-standby-tasks-fc378783-475d-41b0-8011-3e755d34f114-StreamThread-1] Failed to process stream task 0_0 due to the following error: (org.apache.kafka.streams.processor.internals.AssignedStreamsTasks)
org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=standbyTaskSource1, partition=0, offset=10256
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:356)
	at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:96)
	at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:413)
	at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:957)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:832)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:764)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:733)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] Abort sending since an error caught with a previous record (key 5 value [B@40d1b4ea timestamp 1534204483891) to topic kafka-streams-standby-tasks-in-memory-store-changelog due to java.lang.IllegalStateException: Cannot perform operation after producer has been closed
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:214)
	at org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:59)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:66)
	at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:100)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.access$000(CachingKeyValueStore.java:38)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore$1.apply(CachingKeyValueStore.java:83)
	at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141)
	at org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:234)
	at org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:241)
	at org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:151)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:227)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
	at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
	at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:155)
	at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:94)
	at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:115)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:144)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:130)
	at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:91)
	at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
	at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:340)
	... 6 more
Caused by: java.lang.IllegalStateException: Cannot perform operation after producer has been closed
	at org.apache.kafka.clients.producer.KafkaProducer.throwIfProducerClosed(KafkaProducer.java:835)
	at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:844)
	at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:828)
	at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:162)
	... 27 more
[2018-08-13 23:54:47,424] INFO stream-thread [kafka-streams-standby-tasks-fc378783-475d-41b0-8011-3e755d34f114-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN (org.apache.kafka.streams.processor.internals.StreamThread)

@bbejeck (Member) left a comment:

Thanks, @mjsax. LGTM, just one minor question/nit.

    if (!isZombie && transactionInFlight) {
        producer.abortTransaction();
    }
    transactionInFlight = false;

@bbejeck (Member) commented:
nit: do we want to consider setting producer to null here as well if eosEnabled? I realize this branch of the code should only get exercised when closing, but just in case we make changes I don't think it will hurt.
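
The null-out suggested here can be sketched as follows (a defensive pattern with illustrative names, not the actual StreamTask fields):

```java
// Sketch of the defensive null-out suggested above: after closing the
// producer under EOS, drop the reference so any accidental later use fails
// fast with an NPE instead of hitting a closed producer. Illustrative names
// only, not the actual StreamTask fields.
class NullOutSketch {
    static class Producer { void close() { } } // stand-in for KafkaProducer

    private final boolean eosEnabled;
    private Producer producer = new Producer();

    NullOutSketch(final boolean eosEnabled) { this.eosEnabled = eosEnabled; }

    void closeProducer() {
        producer.close();
        if (eosEnabled) {
            producer = null; // fail fast on accidental reuse
        }
    }

    boolean producerCleared() { return producer == null; }
}
```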

@mjsax (Member Author) commented Aug 14, 2018

Updated this. Re-triggered system tests:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1911/

        }
        try {
            if (!isZombie) {
                recordCollector.close();

@vvcephei (Contributor) commented:
Should we avoid calling recordCollector.close() unless eosEnabled?

It seems like this block used to be guarded by if(eosEnabled), but it's not anymore.

@mjsax (Member Author) replied:
Good catch!

 - fixed close logic
 - added more tests
@mjsax (Member Author) commented Aug 15, 2018

Updated again. Added more tests, and discovered and fixed some more issues with close/suspend.

@mjsax (Member Author) commented Aug 15, 2018

Previous system test passed.

Re-triggered again: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1912/

@guozhangwang (Contributor) commented:

Are the Jenkins failures relevant?

00:31:14 [ant:checkstyle] [ERROR] /home/jenkins/jenkins-slave/workspace/kafka-pr-jdk10-scala2.12/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:558:5: Cyclomatic Complexity is 17 (max allowed is 16). [CyclomaticComplexity]
00:31:15 
00:31:15 > Task :streams:checkstyleMain FAILED

@mjsax (Member Author) commented Aug 15, 2018

Seems like... will have a look.

@mjsax (Member Author) commented Aug 15, 2018

Updated.

    if (eosEnabled && !isZombie) {
        if (transactionInFlight) {
            try {
                producer.abortTransaction();

@guozhangwang (Contributor) commented:
If suspend fails, we will always call task.close(..) again inside AssignedTasks, in which we will abort transactions and close record collectors. So do we have to do it here?

@mjsax (Member Author) replied Aug 15, 2018:

That's a good point. If we do the cleanup twice, we might call producer.abortTransaction() and producer.close() again after a successful producer.close(), and thus log unnecessary error messages (the exceptions themselves would get swallowed).

However, we need to make sure that we don't fail a second time with closeTopology() when task.close(...) calls task.suspend(false,...) a second time.

Will update the PR.
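
The double-cleanup concern can be addressed by making the cleanup idempotent; a minimal sketch of that idea (illustrative names and counters, not the actual fix):

```java
// Sketch of idempotent EOS cleanup: if close() runs the cleanup again after
// a failed suspend(), the transaction is aborted and the producer closed
// only once, avoiding the spurious error logs discussed above. The counters
// exist only to make the single-execution property observable; names are
// illustrative, not the actual StreamTask code.
class IdempotentCleanupSketch {
    private boolean transactionInFlight = true;
    private boolean producerClosed;
    int aborts;
    int closes;

    void abortTransactionAndCloseProducer() {
        if (transactionInFlight) {
            aborts++; // producer.abortTransaction() in the real code
            transactionInFlight = false;
        }
        if (!producerClosed) {
            closes++; // producer.close() in the real code
            producerClosed = true;
        }
    }
}
```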

@mjsax (Member Author) commented Aug 15, 2018

Previous system tests passed.

Updated and retriggered: https://jenkins.confluent.io/job/system-test-kafka-branch-builder/1913/

@bbejeck (Member) commented Aug 15, 2018

failure unrelated

retest this please

@bbejeck (Member) commented Aug 15, 2018

@mjsax no further comments, the updated code looks good to me.

@mjsax (Member Author) commented Aug 15, 2018

Retest this please

@guozhangwang (Contributor) commented:

LGTM. Please feel free to merge after tests pass.

@mjsax (Member Author) commented Aug 15, 2018

Failed again: kafka.api.AdminClientIntegrationTest.testForceClose

Retest this please

@mjsax (Member Author) commented Aug 15, 2018

kafka.api.AdminClientIntegrationTest.testMinimumRequestTimeouts
kafka.api.SaslOAuthBearerSslEndToEndAuthorizationTest.testTwoConsumersWithDifferentSaslCredentials

Retest this please

@mjsax (Member Author) commented Aug 16, 2018

Java8:

BUILD SUCCESSFUL in 2h 5m 39s

...

ERROR: Step 'Publish coverage to GitHub' aborted due to exception:
23:01:34 java.io.IOException: Exceeded rate limit for repository
23:01:34 	at com.github.terma.jenkins.githubprcoveragestatus.GitHubPullRequestRepository.getGitHubRepository(GitHubPullRequestRepository.java:46)
23:01:34 Caused: java.io.IOException: Error while accessing rate limit API
23:01:34 	at com.github.terma.jenkins.githubprcoveragestatus.GitHubPullRequestRepository.getGitHubRepository(GitHubPullRequestRepository.java:51)
23:01:34 	at com.github.terma.jenkins.githubprcoveragestatus.CompareCoverageAction.perform(CompareCoverageAction.java:110)
23:01:34 	at hudson.tasks.BuildStepCompatibilityLayer.perform(BuildStepCompatibilityLayer.java:81)
23:01:34 	at hudson.tasks.BuildStepMonitor$1.perform(BuildStepMonitor.java:20)
23:01:34 	at hudson.model.AbstractBuild$AbstractBuildExecution.perform(AbstractBuild.java:744)
23:01:34 	at hudson.model.AbstractBuild$AbstractBuildExecution.performAllBuildSteps(AbstractBuild.java:690)
23:01:34 	at hudson.model.Build$BuildExecution.post2(Build.java:186)
23:01:34 	at hudson.model.AbstractBuild$AbstractBuildExecution.post(AbstractBuild.java:635)
23:01:34 	at hudson.model.Run.execute(Run.java:1819)
23:01:34 	at hudson.model.FreeStyleBuild.run(FreeStyleBuild.java:43)
23:01:34 	at hudson.model.ResourceController.execute(ResourceController.java:97)
23:01:34 	at hudson.model.Executor.run(Executor.java:429)

@mjsax mjsax merged commit 3c14feb into apache:trunk Aug 16, 2018
@mjsax mjsax deleted the kafka-7285-producer-fence-fix branch August 16, 2018 00:13
mjsax added a commit that referenced this pull request Aug 16, 2018
Reviewers: Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>
@mjsax (Member Author) commented Aug 16, 2018

Merged to trunk and cherry-picked to 2.0 and 1.1 (cherry-picking to 1.0 had too many conflicts)

mjsax added a commit that referenced this pull request Aug 16, 2018
Reviewers: Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>
guozhangwang pushed a commit that referenced this pull request Sep 5, 2018
Back porting #5501 broke some tests.

Reviewers: John Roesler <vvcephei@users.noreply.github.com>, Guozhang Wang <wangguoz@gmail.com>
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…che#5501)

Reviewers: Guozhang Wang <guozhang@confluent.io>, John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>