
KAFKA-5697: issue Consumer#wakeup during Streams shutdown #4930

Closed

vvcephei wants to merge 10 commits into apache:trunk from vvcephei:streams-client-wakeup-on-shutdown

Conversation

@vvcephei
Contributor

Wakeup consumers during shutdown to break them out of any internally blocking calls.

Semantically, it should be fine to treat a WakeupException as "no work to do", which will then continue the threads' polling loops, leading them to discover that they are supposed to shut down, which they will do gracefully.
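The shutdown pattern described above can be sketched with a toy model. This is a hypothetical, self-contained illustration: the `Consumer` interface and `WakeupException` class here are stand-ins, not the actual Kafka classes.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: a polling loop that treats a wakeup as "no records", so the
// thread falls through to its shutdown check instead of staying blocked
// inside poll().
public class WakeupLoopSketch {
    // Stand-in for org.apache.kafka.common.errors.WakeupException.
    static class WakeupException extends RuntimeException {}

    interface Consumer { List<String> poll(long timeoutMs); }

    static int runUntilShutdown(Consumer consumer, AtomicBoolean running) {
        int processed = 0;
        while (running.get()) {
            List<String> records;
            try {
                records = consumer.poll(100);
            } catch (WakeupException e) {
                records = List.of(); // treat wakeup as "no work to do"
            }
            processed += records.size();
        }
        return processed;
    }

    public static void main(String[] args) {
        AtomicBoolean running = new AtomicBoolean(true);
        Consumer consumer = timeout -> {
            running.set(false);          // shutdown is requested elsewhere...
            throw new WakeupException(); // ...and wakeup breaks the blocked poll
        };
        System.out.println(runUntilShutdown(consumer, running)); // prints 0
    }
}
```

The loop never treats the wakeup as an error; it simply re-checks the running flag and exits gracefully.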

The existing tests should be sufficient to verify no regressions.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vvcephei
Contributor Author

@guozhangwang @mjsax @bbejeck ,

Guozhang, Jason, and I had a discussion this morning and decided to go ahead and put this change in to address the immediate issue and allow the longer-term fix (#4855 and KIP-266) to proceed at their own pace.

I opted to go ahead and replace the Consumer#poll occurrences in our tests as well, so we can cleanly verify that there are no direct calls to Consumer#poll in Streams' code base.

Contributor

@guozhangwang guozhangwang left a comment

@vvcephei This is a meta comment on this approach:

1. For the global thread, in each loop (at the beginning of the loop we check isRunning):

   1.a: We may call consumer.poll once in pollAndUpdate.

   1.b: We may call consumer.poll multiple times in a while (offset < highWatermark) loop.

2. For the normal consumer, we only call poll once in each loop, after checking isRunning.

3. For the restore consumer:

   3.a: We call poll() once in maybeUpdateStandbyTasks; it is covered by checking if (state == State.RUNNING && taskManager.hasStandbyRunningTasks()) again.

   3.b: We call poll() once in updateNewAndRestoringTasks, which is called at most once in the loop.

So for 1.b, where poll is called multiple times in the loop, do we need to handle this case specially?
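The 1.b concern can be illustrated with a toy model (all names here are hypothetical): because Consumer#wakeup is a one-shot signal consumed by the next poll, a single wakeup cannot break an inner restore loop that keeps polling until the high watermark is reached.

```java
// Sketch: the wakeup flag is cleared once thrown, so an inner loop that
// swallows the WakeupException simply calls poll() again and keeps going,
// never reaching the outer isRunning check.
public class NonStickyWakeupSketch {
    static class WakeupException extends RuntimeException {}

    static class FakeConsumer {
        private boolean wakeupPending = false;
        long offset = 0;
        void wakeup() { wakeupPending = true; }
        long poll() {
            if (wakeupPending) {
                wakeupPending = false; // the wakeup is not "sticky"
                throw new WakeupException();
            }
            return ++offset; // pretend we restored one record
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        long highWatermark = 5;
        consumer.wakeup(); // shutdown signal arrives before the inner loop
        int wakeups = 0;
        while (consumer.offset < highWatermark) { // inner restore loop
            try {
                consumer.poll();
            } catch (WakeupException e) {
                wakeups++; // swallowed: the loop just carries on polling
            }
        }
        System.out.println(wakeups); // prints 1: only one poll was interrupted
    }
}
```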

Contributor

Do we need this constructor? Shouldn't a default constructor be auto-generated if no other constructors are given?

Contributor Author

The private constructor in a final class actually makes it impossible to instantiate the class, ensuring it can only be used for its static methods.

This is basically a pattern for declaring a "static-methods-only" class.
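For illustration, a minimal instance of the pattern (the class name and method below are made up for this example, not the actual code in the PR):

```java
// A "static-methods-only" class: final, with a private constructor, so it
// can neither be instantiated nor subclassed.
public final class ConsumerUtils {
    private ConsumerUtils() {} // no instances

    public static int recordCount(int[] batchSizes) {
        int total = 0;
        for (int size : batchSizes) {
            total += size;
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println(recordCount(new int[] {1, 2, 3})); // prints 6
    }
}
```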

Contributor

Ack, makes sense

Contributor

nit: rename to ConsumerUtils to be consistent with other classes?

@bbejeck
Member

bbejeck commented Apr 26, 2018

@vvcephei thanks for the patch; overall it looks good.

Test failures are relevant though.

@vvcephei
Contributor Author

@guozhangwang @bbejeck

The InternalTopicIntegrationTest was failing because there's no guarantee that the streams app would actually process anything (including creating internal topics) before the test makes assertions about said internal topics.

Actually, this was never guaranteed, but I guess the specific interactions would get the right metadata actions enqueued before the call to close, which would then block on them before the assertions were made. In other words, it was always a race, but now we're certain to lose it.

The only solution I could think of was to block until the processing is complete, and the only way I could think to do that is to wait for lag to hit 0.
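That blocking wait can be sketched as a simple poll-until-condition helper. This is a standalone stand-in for illustration only; Kafka's test utilities provide similar waitForCondition-style helpers.

```java
import java.util.function.BooleanSupplier;

// Sketch: poll a condition (e.g. consumer lag reaching 0) until it holds
// or a timeout expires, so the test only asserts after processing is done.
public class WaitForConditionSketch {
    static boolean waitForCondition(BooleanSupplier condition, long timeoutMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return true; // e.g. lag hit 0: processing has caught up
            }
            Thread.sleep(10); // back off briefly before re-checking
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    public static void main(String[] args) throws InterruptedException {
        long[] lag = {3};
        boolean caughtUp = waitForCondition(() -> {
            if (lag[0] > 0) lag[0]--; // simulate records being processed
            return lag[0] == 0;
        }, 1_000);
        System.out.println(caughtUp); // prints true
    }
}
```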

I was surprised that I couldn't get at the consumer metrics. Is this expected? I think there's some aspect of the system design I'm missing here. I just added them provisionally to the streams metrics, but I'm not sure if this is desired either (at the least, it might be kip-worthy).

@guozhangwang
Contributor

I think we have a similar situation in some other places as well, my proposal was to wait for the number of records being consumed from the sink topic with IntegrationTestUtils.waitXX, and then validate.

@guozhangwang
Contributor

So for 1.b which is called multiple times in the loop, do we need to special handle this case?

@vvcephei have you thought about this?

@vvcephei
Contributor Author

Ah, I saw those concerns previously, but overlooked them on this latest pass. Let me think about it, @guozhangwang .

@vvcephei
Contributor Author

@guozhangwang ,

Yeah, all the cases you listed are fine, except 1.b, which will just loop around and get stuck in poll again (since Wakeup isn't sticky). In the case of my other approach, async poll, it would be an infinite loop instead. I'll need some time to come up with an elegant solution.

Thanks for the catch!

@vvcephei
Contributor Author

vvcephei commented Apr 30, 2018

@guozhangwang,

Actually, the global state case differs even more from the other stream threads than I described. The GlobalStreamThread overrides Thread#start() and blocks until the state transitions to running. If there's an exception during this process, it surfaces it in the thread that called start().

By contrast, the local threads won't block the call to start(). Instead, if they have trouble initializing, they just throw the exception in their own thread, terminating the thread, and hopefully there's an uncaught exception handler to report it.

This is important because if we have any global state, we wouldn't be able to call KafkaStreams#close() at all, since the initialization would be hanging and blocking the call to KafkaStreams#start(). Both start and close synchronize on KafkaStreams, so there's no way to close if it's hung during startup.
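A minimal model of that deadlock (hypothetical classes, not the actual KafkaStreams code): if start() holds the object monitor while blocking on initialization, a concurrent close() can never acquire it.

```java
import java.util.concurrent.CountDownLatch;

// Sketch: both methods synchronize on the same object; start() never
// returns until initialization completes, so close() can never run.
public class StartCloseDeadlockSketch {
    private final CountDownLatch initialized = new CountDownLatch(1);

    public synchronized void start() throws InterruptedException {
        // Blocks until initialization completes. If init hangs (e.g. the
        // global consumer can't reach the broker), this holds the monitor forever.
        initialized.await();
    }

    public synchronized void close() {
        // Can only run once the monitor is free, i.e. once start() returns.
        initialized.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        StartCloseDeadlockSketch streams = new StartCloseDeadlockSketch();
        Thread starter = new Thread(() -> {
            try { streams.start(); } catch (InterruptedException ignored) {}
        });
        starter.start();
        Thread.sleep(100); // let start() grab the monitor and block
        Thread closer = new Thread(streams::close);
        closer.start();
        closer.join(200);
        System.out.println(closer.isAlive()); // prints true: close() is stuck
        starter.interrupt(); // clean up the demo threads
    }
}
```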

This seems like a problem, so I went ahead and removed that blocking code, instead just throwing in the Global thread if there's any trouble initializing. This is worthy of special scrutiny because I'm not sure why it was set up like that to begin with. I asked @dguy , but it's night for him right now, so he hasn't had a chance to answer yet.

It also turns out that GST runs into a blocking consumer call even before it calls poll, when it tries to list the partitions for its topics. The bummer is that there didn't seem to be any safe semantics that would let me treat a Wakeup like a "no-op" response: if I say that there aren't any partitions, we'd throw an exception to the top level, which is the right behavior normally, because missing partitions usually indicate a user mistake.

Also, the call stack between GST#run and GlobalStateManagerImpl#register is quite deep (7 calls) and passes through GST, StateConsumer, GlobalStateUpdateTask, GSMI, multiple StateStores, and AbstractProcessor. restore is void, so I could detect the shutdown and just return early, but I worried about the consequences of just aborting the restore early with so much different code interpreting its return as success. So I created an exception to signal shutdown, which would be thrown all the way back up the call chain to GST, where it could be properly handled.
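The shutdown-signal exception described here can be sketched like this (all names illustrative; the intermediate methods stand in for the real seven-call chain):

```java
// Sketch: a dedicated unchecked exception thrown deep inside the restore
// path unwinds the whole call chain back to the thread's run loop, where
// it is handled as a clean shutdown rather than an error.
public class ShutdownSignalSketch {
    static class ShutdownRequestedException extends RuntimeException {}

    static void restore(boolean shutdownRequested) {
        if (shutdownRequested) {
            // Abort the restore without pretending it completed.
            throw new ShutdownRequestedException();
        }
        // ... otherwise keep restoring records ...
    }

    // Stand-ins for the intermediate layers (state manager, update task, ...).
    static void register(boolean shutdownRequested) { restore(shutdownRequested); }
    static void initialize(boolean shutdownRequested) { register(shutdownRequested); }

    public static void main(String[] args) {
        try {
            initialize(true); // shutdown arrives mid-initialization
            System.out.println("completed");
        } catch (ShutdownRequestedException e) {
            // The run loop is the one place that knows this is benign.
            System.out.println("clean shutdown");
        }
    }
}
```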

All in all, I'm not too sure about this "solution", but after kicking around various options all day, I figured it was time to get other opinions.

@vvcephei
Contributor Author

@bbejeck For some reason, GH wouldn't let me mention you in the last comment... What do you think about it?

@vvcephei
Contributor Author

vvcephei commented May 1, 2018

@guozhangwang Ok, I talked to @dguy and he confirmed that the GlobalStreamThread does need to complete initialization before everything else starts.

I can add a thread to issue wakeups to the client on an interval and treat the wakeups as timeouts, checking the state and retrying until we get a connection (or until we run out of retries).

OR we can fix the main threads and leave the GST blocking startup until KIP-266 goes through, since the wakeups will then be unnecessary.

@guozhangwang
Contributor

@vvcephei Thanks for the detailed explanation. I agree that the global thread's case is different and that's partially why we introduced StreamsConfig.RETRIES_CONFIG as part of KIP-224 to remedy this before we have the final holy grail of KIP-266.

So let's only fix this in the main thread for KAFKA-5697 itself and consider the global thread as part of KIP-266 later.

@bbejeck
Member

bbejeck commented May 2, 2018

@vvcephei thanks for the details. I agree with @guozhangwang: at this point, with KIP-266 looming, we should just change the main thread and deal with GST when we have the timeouts in place.

@vvcephei
Contributor Author

vvcephei commented May 3, 2018

I've just confirmed that the wait-on-metrics method is correct wrt @guozhangwang 's earlier concern that the producer needs to flush before we check the lag. This already does happen (ultimately, in org.apache.kafka.streams.integration.utils.IntegrationTestUtils#produceKeyValuesSynchronouslyWithTimestamp(java.lang.String, java.util.Collection<org.apache.kafka.streams.KeyValue<K,V>>, java.util.Properties, java.lang.Long, boolean))

if (globalStreamThread != null) result.putAll(globalStreamThread.consumerMetrics());
result.putAll(metrics.metrics());
return Collections.unmodifiableMap(result);
}
Contributor Author

Note that I'm adding all the internal consumer metrics to the map returned by metrics. Is this ok?!?!?

I think so, since it's supposed to be a "map of all metrics", so I'd argue it's not a semantic change, and all the Streams metrics will still be there, so nothing will break.

But it's clearly not what we had before, so I want to ask explicitly.

I guess the alternative for anyone, like me, who wants to get the lag metrics is to register a metric reporter for the consumers and capture the metrics that way, but I would question the existence of this method if it only returns some of the metrics and you have to implement your own reporter to capture the others...

Member

I'm not sure seems fine to me, but I'll let others weigh in.

Contributor

Thanks for the catch.

I agree that we should be able to return all the embedded client's metrics as well; thinking about this a bit more, we should also include the producer metrics as well.

I'll go ahead and merge this PR as is while leaving a TODO marker for @vvcephei

@vvcephei
Contributor Author

vvcephei commented May 3, 2018

@guozhangwang @bbejeck @mjsax Call for final reviews.

Member

@bbejeck bbejeck left a comment

Thanks, @vvcephei just a couple of minor comments and one meta-comment. Overall looks good.

Do we want to add the special exception handling logic for GST now, or just do the main thread and wait until KIP-266 lands?

stateRestoreListener,
streamsConfig);
streamsConfig,
new GlobalStateManagerImpl.IsRunning() {
Member

nit: since this is used 4 times in the test create one instance?


@vvcephei
Contributor Author

vvcephei commented May 4, 2018

java 8 failure was:

java.lang.AssertionError: expected acls Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 has Allow permission for operations: Read from hosts: *, User:21 has Allow permission for operations: Read from hosts: *, User:39 has Allow permission for operations: Read from hosts: *, User:43 has Allow permission for operations: Read from hosts: *, User:3 has Allow permission for operations: Read from hosts: *, User:35 has Allow permission for operations: Read from hosts: *, User:15 has Allow permission for operations: Read from hosts: *, User:16 has Allow permission for operations: Read from hosts: *, User:22 has Allow permission for operations: Read from hosts: *, User:26 has Allow permission for operations: Read from hosts: *, User:11 has Allow permission for operations: Read from hosts: *, User:38 has Allow permission for operations: Read from hosts: *, User:8 has Allow permission for operations: Read from hosts: *, User:28 has Allow permission for operations: Read from hosts: *, User:32 has Allow permission for operations: Read from hosts: *, User:25 has Allow permission for operations: Read from hosts: *, User:41 has Allow permission for operations: Read from hosts: *, User:44 has Allow permission for operations: Read from hosts: *, User:48 has Allow permission for operations: Read from hosts: *, User:2 has Allow permission for operations: Read from hosts: *, User:9 has Allow permission for operations: Read from hosts: *, User:14 has Allow permission for operations: Read from hosts: *, User:46 has Allow permission for operations: Read from hosts: *, User:13 has Allow permission for operations: Read from hosts: *, User:5 has Allow permission for operations: Read from hosts: *, User:29 has Allow permission for operations: Read from hosts: *, User:45 has Allow permission for operations: Read from hosts: *, User:6 has Allow permission for operations: Read from hosts: *, User:37 has Allow permission for operations: Read from hosts: *, User:23 has 
Allow permission for operations: Read from hosts: *, User:19 has Allow permission for operations: Read from hosts: *, User:24 has Allow permission for operations: Read from hosts: *, User:17 has Allow permission for operations: Read from hosts: *, User:34 has Allow permission for operations: Read from hosts: *, User:12 has Allow permission for operations: Read from hosts: *, User:42 has Allow permission for operations: Read from hosts: *, User:4 has Allow permission for operations: Read from hosts: *, User:47 has Allow permission for operations: Read from hosts: *, User:18 has Allow permission for operations: Read from hosts: *, User:31 has Allow permission for operations: Read from hosts: *, User:49 has Allow permission for operations: Read from hosts: *, User:33 has Allow permission for operations: Read from hosts: *, User:1 has Allow permission for operations: Read from hosts: *, User:27 has Allow permission for operations: Read from hosts: *) but got Set(User:36 has Allow permission for operations: Read from hosts: *, User:7 has Allow permission for operations: Read from hosts: *, User:21 has Allow permission for operations: Read from hosts: *, User:39 has Allow permission for operations: Read from hosts: *, User:43 has Allow permission for operations: Read from hosts: *, User:3 has Allow permission for operations: Read from hosts: *, User:35 has Allow permission for operations: Read from hosts: *, User:15 has Allow permission for operations: Read from hosts: *, User:16 has Allow permission for operations: Read from hosts: *, User:22 has Allow permission for operations: Read from hosts: *, User:26 has Allow permission for operations: Read from hosts: *, User:11 has Allow permission for operations: Read from hosts: *, User:38 has Allow permission for operations: Read from hosts: *, User:8 has Allow permission for operations: Read from hosts: *, User:28 has Allow permission for operations: Read from hosts: *, User:32 has Allow permission for operations: Read from 
hosts: *, User:25 has Allow permission for operations: Read from hosts: *, User:41 has Allow permission for operations: Read from hosts: *, User:44 has Allow permission for operations: Read from hosts: *, User:48 has Allow permission for operations: Read from hosts: *, User:2 has Allow permission for operations: Read from hosts: *, User:9 has Allow permission for operations: Read from hosts: *, User:14 has Allow permission for operations: Read from hosts: *, User:46 has Allow permission for operations: Read from hosts: *, User:13 has Allow permission for operations: Read from hosts: *, User:5 has Allow permission for operations: Read from hosts: *, User:29 has Allow permission for operations: Read from hosts: *, User:45 has Allow permission for operations: Read from hosts: *, User:6 has Allow permission for operations: Read from hosts: *, User:37 has Allow permission for operations: Read from hosts: *, User:23 has Allow permission for operations: Read from hosts: *, User:19 has Allow permission for operations: Read from hosts: *, User:24 has Allow permission for operations: Read from hosts: *, User:17 has Allow permission for operations: Read from hosts: *, User:34 has Allow permission for operations: Read from hosts: *, User:12 has Allow permission for operations: Read from hosts: *, User:42 has Allow permission for operations: Read from hosts: *, User:4 has Allow permission for operations: Read from hosts: *, User:47 has Allow permission for operations: Read from hosts: *, User:18 has Allow permission for operations: Read from hosts: *, User:31 has Allow permission for operations: Read from hosts: *, User:49 has Allow permission for operations: Read from hosts: *, User:33 has Allow permission for operations: Read from hosts: *, User:27 has Allow permission for operations: Read from hosts: *)
	at kafka.utils.TestUtils$.fail(TestUtils.scala:357)
	at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:867)
	at kafka.utils.TestUtils$.waitAndVerifyAcls(TestUtils.scala:1251)
	at kafka.security.auth.SimpleAclAuthorizerTest.testHighConcurrencyModificationOfResourceAcls(SimpleAclAuthorizerTest.scala:350)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
	at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108)
	at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:146)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:128)
	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
	at java.lang.Thread.run(Thread.java:748)

java 10 failure was

org.apache.kafka.common.errors.IllegalSaslStateException: Unexpected handshake request with client mechanism PLAIN, enabled mechanisms are []

@vvcephei
Contributor Author

vvcephei commented May 4, 2018

@bbejeck I think that the wakeup/shutdown code still has some value in GST post-startup, but I'm also ok with ditching the code you mentioned.

I don't think I'll be able to check in again by the time the others weigh in, so I'll just say that you all should feel free to wait on this until I get back, or even modify it and merge it, if you don't like the current state. I'm fine any which way.


@guozhangwang
Contributor

Merged to trunk.

@mjsax mjsax added the streams label May 7, 2018
ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018
Wakeup consumers during shutdown to break them out of any internally blocking calls.

Semantically, it should be fine to treat a WakeupException as "no work to do", which will then continue the threads' polling loops, leading them to discover that they are supposed to shut down, which they will do gracefully.

The existing tests should be sufficient to verify no regressions.

Author: John Roesler <john@confluent.io>

Reviewers: Bill Bejeck <bbejeck@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes apache#4930 from vvcephei/streams-client-wakeup-on-shutdown

minor javadoc updates