KAFKA-5697: prevent poll() from blocking forever #4855

Merged
hachikuji merged 51 commits into apache:trunk from vvcephei:kafka-5697-stream-thread-shutdown on May 26, 2018

Conversation

@vvcephei (Contributor Author) commented Apr 12, 2018

Add the new stricter-timeout version of poll proposed in KIP-266.

The pre-existing variant poll(long timeout) would block indefinitely for metadata
updates if they were needed, then it would issue a fetch and poll for timeout ms
for new records. The initial indefinite metadata block caused applications to become
stuck when the brokers became unavailable. The existence of the timeout parameter
made the indefinite block especially unintuitive.

This PR adds poll(Duration timeout) with the semantics:

  1. iff a metadata update is needed:
    1. send (asynchronous) metadata requests
    2. poll for metadata responses (counts against timeout)
      • if no response within timeout, return an empty collection immediately
  2. if there is fetch data available, return it immediately
  3. if there is no fetch request in flight, send fetch requests
  4. poll for fetch responses (counts against timeout)
    • if no response within timeout, return an empty collection (leaving async fetch request for the next poll)
    • if we get a response, return the response
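The shared-budget accounting described above can be sketched in plain Java. This is illustrative only; the class and method names are mine, not Kafka's. The idea is that the metadata wait and the fetch wait draw down one deadline-based budget, and the caller returns an empty collection once it is exhausted.

```java
import java.time.Duration;

// Illustrative sketch: one timeout budget shared by the metadata phase and the
// fetch phase, as the new poll(Duration) semantics imply. Names are hypothetical.
public class TimeoutBudget {
    private final long deadlineMs;

    public TimeoutBudget(long nowMs, Duration timeout) {
        this.deadlineMs = nowMs + timeout.toMillis();
    }

    // Milliseconds left in the budget, never negative.
    public long remainingMs(long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    public boolean expired(long nowMs) {
        return remainingMs(nowMs) == 0L;
    }

    public static void main(String[] args) {
        TimeoutBudget budget = new TimeoutBudget(0L, Duration.ofSeconds(5));
        // Phase 1: the metadata wait consumes 2s of the budget...
        System.out.println(budget.remainingMs(2_000)); // 3000 ms left for the fetch phase
        // Phase 2: the fetch wait consumes the rest; once expired, return [].
        System.out.println(budget.expired(5_000));     // true
    }
}
```

Passing explicit `nowMs` values (rather than reading the clock inside) keeps the sketch deterministic and mirrors how a deadline makes the timeout a hard end time across phases.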

The old method, poll(long timeout), is deprecated, but we do not change its semantics, so it remains:

  1. iff a metadata update is needed:
    1. send (asynchronous) metadata requests
    2. poll for metadata responses indefinitely until we get them
  2. if there is fetch data available, return it immediately
  3. if there is no fetch request in flight, send fetch requests
  4. poll for fetch responses (counts against timeout)
    • if no response within timeout, return an empty collection (leaving async fetch request for the next poll)
    • if we get a response, return the response

One notable usage is prohibited by the new poll: previously, you could call poll(0) to block for metadata updates, for example to initialize the client, supposedly without fetching records. Note, though, that this behavior is not according to any contract, and there is no guarantee that poll(0) won't return records the first time it's called. Therefore, it has always been unsafe to ignore the response.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@vvcephei (Contributor Author) commented Apr 12, 2018

@guozhangwang I think this change is a good solution to the problem you reported in https://issues.apache.org/jira/browse/KAFKA-5697. WDYT?

I've tried to make the change in a 'private' fashion so as not to change any public APIs, but let me know if you think it still needs a KIP.

Who else should review a change like this?

@hachikuji (Contributor)

Thanks for the patch. I knew we should be pretty close to being able to do this by now after some of the recent work. Note, however, that ConsumerCoordinator.refreshCommittedOffsetsIfNeeded still blocks.

The main thing to be careful of is probably the case you mentioned where users have depended on poll(0) in order to ensure they have a valid assignment. I think this is the only thing that has kept me from finishing this. We could do a KIP to be on the safe side, but I'm on the fence whether it's actually needed. Maybe the question is whether we should give users an alternative API to await an assignment.

@guozhangwang requested review from hachikuji and mjsax, April 12, 2018 01:19
@guozhangwang (Contributor)

I'll leave to @hachikuji and @mjsax to review and unblock you.

@vvcephei (Contributor Author)

Hey @hachikuji, thanks for taking a look!

I'll add a timeout to refreshCommittedOffsetsIfNeeded as well, and also look at the failing integration tests.

I was unaware that folks would use poll(0) to wait for an assignment without fetching. I was thinking it was just to "drain" the consumer of previously fetched records. Although, now that you mention it, that should have been obvious from the tests I changed.

In that light, I do think I should go ahead and do a KIP, since with this change there would be no way to wait for an assignment without polling. That will also be a good way to seek feedback on whether or not we need the drain-previously-fetched use case, since that will also no longer be possible. And of course, users may need to increase their timeout value since the metadata updates will now count against it.

If it's not too distracting to you, I'll keep this PR open to sketch out the new method and to update the tests and work through any failures while I write the KIP.

@hachikuji (Contributor)

@vvcephei Yeah, that sounds good to me. I'm not sure how common that usage is--it's definitely convenient in testing--but it's probably a good idea to get some more visibility for the change to be on the safe side. Thanks for picking this up, by the way!

@vvcephei (Contributor Author)

Just to record my current state on this...

I decided a good way to approach this was to lay the groundwork by introducing timeouts to every call in the poll path that can block, but not changing any semantics presented by the KafkaConsumer itself (by passing Long.MAX_VALUE to all the new internal timeout parameters).

I found in testing that altering the semantics of poll's timeouts was very disruptive to both our unit and integration tests, hence my taking the conservative approach for now.

Of course, the new timeouts are of no value unless they are exposed in some way, and that's what I'm going to work on tomorrow. Once I have poked around enough to have a decent proposal, then I'll write the promised KIP.

@vvcephei (Contributor Author)

rebased

@vvcephei (Contributor Author)

  • rebased to bring in the new scala streams tests (which failed last time).
  • fixed a race condition in ConsumerCoordinator

@vvcephei (Contributor Author) commented Apr 24, 2018

It's been a while since I've recorded status on this. I have a theory that poll can strike a balance between blocking and async by being fundamentally async and optionally blocking while it polls for async operations to complete.

Thus, suppose we need to refresh metadata, that the metadata operation would take 2s, and that we then have some records to grab, which would take 3s. Then all these scenarios would work as expected:

completely async:

poll(0) => [] // fires off the metadata update requests
sleep(1s)
poll(0) => [] // metadata updates are still pending
sleep(1s)
poll(0) => [] // metadata updates are complete. sends fetch request.
sleep(1s)
poll(0) => [] // fetch request is still pending
sleep(1s)
poll(0) => [] // fetch request is still pending
sleep(1s)
poll(0) => [record1, record2] // response was available

completely sync:

poll(5s) => [record1, record2] // sent metadata update request, waited 2s for response, sent fetch request, waited 3s for response

sync, but not long enough to complete the full operation:

poll(3s) => [] // sent metadata update request, waited 2s for response, sent fetch request, waited 1s for response
sleep(1s)
poll(3s) => [record1, record2] // waited 1s and got the fetch response.

In other words, we make poll completely async but leave in the option to block for responses, since that makes some use cases (especially tests and POCs) simpler.

Even with some timeout bigger than 0 but less than infinity, it's ok to block as long as we can and then return an empty list if we didn't finish, whether the thing we didn't finish was a metadata update or a fetch request. This is ok because callers are supposed to call poll in a loop, i.e., just keep calling it repeatedly until they get what they want or forever (whichever comes first).

I needed to validate that metadata updates can actually function in an async manner, so (after I added a timeout to the metadata update) I set the metadata update timeout to 0 and put it in a tight loop that would continue until it completed. This is equivalent to the current "block until metadata update completes", but it only works if async metadata updates are implemented correctly.
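The validation trick above can be sketched generically (a stand-alone illustration, not the actual Kafka code): drive an asynchronous step with a zero timeout in a tight loop until it reports completion. That is behaviorally equivalent to blocking, but it only terminates if the operation genuinely makes progress asynchronously.

```java
import java.util.function.LongPredicate;

// Illustrative sketch: a zero-timeout tight loop that is equivalent to blocking
// on an async operation. The operation and names here are hypothetical.
public class TightLoop {
    // tryAdvance attempts one non-blocking step with the given timeout (0 here)
    // and returns true once the operation has completed.
    public static int driveToCompletion(LongPredicate tryAdvance) {
        int attempts = 0;
        while (!tryAdvance.test(0L)) {
            attempts++;
        }
        return attempts + 1; // total calls, including the one that succeeded
    }

    public static void main(String[] args) {
        // Hypothetical async "metadata update" that completes on the third attempt.
        int[] calls = {0};
        int attempts = driveToCompletion(timeout -> ++calls[0] >= 3);
        System.out.println(attempts); // 3
    }
}
```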

@vvcephei (Contributor Author) commented Apr 24, 2018

The tests passed on JDK7 and JDK10.

The failure on JDK8 looks like an unrelated failure out of Zookeeper:

kafka.api.PlaintextConsumerTest.testMultiConsumerRoundRobinAssignment

org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /config/topics/topic
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:472)
	at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1424)
	at kafka.zk.KafkaZkClient.create$1(KafkaZkClient.scala:262)
	at kafka.zk.KafkaZkClient.setOrCreateEntityConfigs(KafkaZkClient.scala:269)
	at kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:99)
	at kafka.zk.AdminZkClient.createTopic(AdminZkClient.scala:56)
	at kafka.utils.TestUtils$.createTopic(TestUtils.scala:296)
	at kafka.integration.KafkaServerTestHarness.createTopic(KafkaServerTestHarness.scala:123)
	at kafka.api.BaseConsumerTest.setUp(BaseConsumerTest.scala:71)
	at sun.reflect.GeneratedMethodAccessor156.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at sun.reflect.GeneratedMethodAccessor10.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
	at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108)
	at sun.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:146)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:128)
	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
	at java.lang.Thread.run(Thread.java:748)

Retest this, please.

@vvcephei (Contributor Author)

Now that I've validated the approach, I've added the new methods I'm proposing and deprecated the old poll.

@vvcephei (Contributor Author)

One remaining task before this is mergeable is to update at least some tests to use either the new awaitAssignmentMetadata or the new poll, as appropriate.

@vvcephei (Contributor Author) commented May 4, 2018

@hachikuji @ConcurrencyPractitioner ,

I've updated this PR with the latest state from KIP-266, as a more concrete example of the proposal. WDYT?

Two things came up for me going through this...

  1. The methods Jason proposed to include actually already all use a config (request.timeout.ms) to set a timeout. I was previously unaware of the use of this config in some of Consumer's methods. I'm still in favor of the current approach, but just to point out that a less dramatic change exists: keep all the old method signatures and just make them respect request.timeout.ms.

  2. I've learned that Duration lets you choose durations much greater than Long.MAX_VALUE (which is 25 days). If you try to get millis out of it, though, you get an exception. In contrast, TimeUnit.toMillis will just coerce larger values to Long.MAX_VALUE. I flirted somewhat with using Duration "all the way down", and you can see some examples of that in KafkaConsumer, but I think it would become unwieldy. Instead, I think a good approach would be to check up front that the duration is "in bounds" and throw an IllegalArgumentException if not.

Alternatives: we could coerce to MAX_VALUE ourselves, or we could use a loop to actually wait for any duration specified.
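A minimal sketch of the bounds check described in point 2 (the helper name is mine): Duration.toMillis() throws on numeric overflow, whereas TimeUnit conversions saturate at Long.MAX_VALUE, so validating up front yields a predictable IllegalArgumentException instead of a surprise ArithmeticException deep in the call stack.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Illustrative sketch: reject out-of-bounds Durations up front. Names are mine.
public class DurationBounds {
    public static long boundedMillis(Duration timeout) {
        if (timeout.isNegative()) {
            throw new IllegalArgumentException("timeout must be non-negative");
        }
        // Reject durations whose millisecond representation would overflow a long.
        if (timeout.compareTo(Duration.ofMillis(Long.MAX_VALUE)) > 0) {
            throw new IllegalArgumentException("timeout is out of bounds");
        }
        return timeout.toMillis();
    }

    public static void main(String[] args) {
        System.out.println(boundedMillis(Duration.ofSeconds(30))); // 30000

        try {
            Duration.ofSeconds(Long.MAX_VALUE / 2).toMillis();
        } catch (ArithmeticException e) {
            System.out.println("toMillis overflowed"); // thrown for huge durations
        }

        // TimeUnit, in contrast, coerces to Long.MAX_VALUE instead of throwing.
        System.out.println(TimeUnit.DAYS.toMillis(Long.MAX_VALUE) == Long.MAX_VALUE); // true
    }
}
```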

I'm not sure either of those points is super relevant to the KIP discussion (unless you think so), so I'll leave it to you to bring them up if you're concerned.

I'll be out for a week, so I won't be available to reply to any comments until then.

Note that the tests for this PR won't pass until the CI agents are updated to Java 8 (but you should be able to build it yourself if desired).

@ConcurrencyPractitioner (Contributor)

Hi @vvcephei,

Thanks for setting up the basic methods in the interfaces, but you could just stick with implementing poll. I could take over the rest once you are done.

Contributor

Can you move this to after line 95?

Contributor

24.86 days is not a month :-)

Contributor Author

Truth! But I wanted to convey the ballpark to help readers and maintainers understand that unpredicated calls will block for quite a long time, but not "effectively forever", which is the mental mistake I made when I read "max long milliseconds". So now, I have this mental scale: "max long" seconds is effectively forever, with milliseconds it's a month-ish, and with nanoseconds it's about a week.

Contributor

Shouldn't the time spent in this loop be counted toward remaining?

Contributor Author

Good eye. But this is the "old" method, whose behavior I'm preserving. So this is a re-expression of the existing behavior, that metadata update will block arbitrarily long and not count against the timeout, followed by the fetch, to which the timeout applies.

@guozhangwang (Contributor)

@vvcephei KAFKA-5697 has been fixed via GitHub Pull Request #4930, so I'm wondering if this PR is now more for KIP-266 itself?

@tedyu (Contributor) commented May 6, 2018

retest this please

@vvcephei (Contributor Author)

@ConcurrencyPractitioner Sure thing; I'll pull the extra stuff back out. It wasn't as trivial as I thought it would be, as evidenced by the fact that the tests broke, so I agree that poll alone is a big enough piece for one PR.

@vvcephei (Contributor Author)

@tedyu Thanks for the review! The test failures were related. Something in the "extra methods" I popped in caused the test failures, but I'm about to take them back out, so we should be green again.

@vvcephei (Contributor Author)

Ok, bringing this PR back to life. I've pulled out the "extra" methods I'd added. I expect the tests to pass.

Next up, I need to write a few tests for the deprecated method and migrate the rest of the code base and tests away from it.

@hachikuji should I just migrate the clients code base and leave the rest of kafka alone? Or migrate the whole project? I'm not sure what the norm with this project is.

@guozhangwang (Contributor)

@vvcephei thanks for the updated PR, please ping me and @hachikuji whenever it is ready to be reviewed. Please do the following to help reviewers better understand the PR diffs:

  1. For the old API implementation, if we HAVE TO make any changes to it (personally I'd prefer not to make any changes, to keep its scope small, but I'll leave it to you), state clearly in the description what those changes are, making sure they do not change any semantics of the old API.

  2. For the new API implementation, describe its semantics: given a (possibly zero) timeout value, what will happen if a) it needs to send metadata, b) it needs to (re-)join the group, c) its metadata is fresh and the group is stable, and there is an inflight fetch request, d) its metadata is fresh and the group is stable, and there is NO inflight fetch request.

  3. Also it would help to add a few more bullet points on the actual changes in KafkaConsumer and AbstractCoordinator to help better understanding the diff details.

vvcephei added 5 commits May 22, 2018 11:44
Several tests depended on being able to send a timeout of 0, but still have the coordinator poll take non-zero time to do its work. I updated them to send a long enough timeout for the coordinator to do the required work.
@hachikuji (Contributor) left a comment

Thanks for the patch. Left a few comments.

// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.needRejoin())
if (coordinator.needRejoin()) {
Contributor

nit: avoiding the braces has been the convention in kafka. I was never too fond of the convention and I'm not strict about enforcing it for new code, but it's a little annoying and distracting to change the old code.

Contributor Author

ok, sorry; it happened in a few places when I applied auto-formatting to the methods I was touching. I'll make a pass through and revert formatting changes.

return this.interceptors.onConsume(new ConsumerRecords<>(records));
}
final long fetchEnd = time.milliseconds();
remainingMs = Math.max(0L, remainingMs - (fetchEnd - fetchStart));
Contributor

We don't need super precise timing control for any of these APIs, but the advantage of the previous approach is that the timeout is effectively a deadline and you won't accrue rounding errors while updating the time remaining. I'm not sure what the reason to change the logic was.

Contributor Author

Yeah... This is a result of double-refactoring. I got away from the "elapsed" approach when I introduced the "remainingTimeAtLeastZero" method, and then I made a recent change to avoid calling system time so much. I would have gone back to the original time-tracking approach, but I'd forgotten about it.

You are right, tracking elapsed time doesn't rack up rounding errors. I'll add another patch momentarily using this approach throughout my diff instead of "remainingTime".
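The rounding-error point can be made concrete with a toy example (illustrative, not the PR's code): if per-iteration elapsed time is truncated to whole milliseconds before being subtracted from the remaining budget, sub-millisecond work never counts against the timeout, whereas tracking against a fixed deadline cannot drift.

```java
// Illustrative sketch: why anchoring to a deadline avoids accruing rounding
// error, while repeatedly subtracting truncated elapsed time does not.
public class DeadlineVsRemaining {
    // Drift-prone style: measure each iteration's elapsed time, truncate to
    // whole milliseconds, and subtract it from the remaining budget.
    public static long remainingAfter(int iterations, long stepNanos, long timeoutMs) {
        long nowNanos = 0;
        long remainingMs = timeoutMs;
        for (int i = 0; i < iterations; i++) {
            long before = nowNanos;
            nowNanos += stepNanos;
            remainingMs -= (nowNanos - before) / 1_000_000; // rounds down each time
        }
        return remainingMs;
    }

    public static void main(String[] args) {
        // Each iteration really takes 0.9 ms; after 10 of them, 9 ms have passed,
        // yet the "remaining" budget never shrank.
        System.out.println(remainingAfter(10, 900_000, 5)); // still 5

        // Deadline style: compare against a fixed end time; nothing accrues.
        long deadlineNanos = 5 * 1_000_000L;
        long nowNanos = 10 * 900_000L;
        System.out.println(nowNanos >= deadlineNanos); // true: the timeout expired
    }
}
```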


long startMs = time.milliseconds();
coordinator.poll(startMs, timeout);
return updateFetchPositions(remainingMsAtLeastZero(startMs, timeout));
Contributor

Since Coordinator.poll() can block, we probably have to update startMs here.

Contributor Author

That's why we're calling remainingMsAtLeastZero for a new timeout here. Although I might be about to change this to use elapsed-time instead.

// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
if (!hasAllFetchPositions && pollTimeout > retryBackoffMs)
if (!subscriptions.hasAllFetchPositions() && pollTimeout > retryBackoffMs) {
Contributor

The previous code tried to avoid unnecessary passes over the assigned partitions since the set can get fairly large. I'm not sure how beneficial this optimization was. Maybe it's fine to lose it. Nevertheless, since poll can be called quite tightly, it would be nice to avoid redundant work.

Contributor Author

hmm. I'm not sure what a good way to do this is... hasAllFetchPositions was the result of updateAllFetchPositions before (which changed because the result now indicates whether it timed out or not). But updateAllFetchPositions is no longer called in this scope, so we can't consider keeping its result around.

I could cache the "hasAllFetchPositions" fact on a field in KafkaConsumer, but it seems a little risky. I think it would be fine... WDYT?

*/
protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
long remainingMs = timeoutMs;
public synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
Contributor

It was actually because of the point mentioned before about avoiding unnecessary system calls. It's just kind of annoying when you have to pass it down deep into the stack, so we may not have followed the pattern consistently.

Also, why change this to public?

}

private Set<TopicPartition> pendingCommittedOffsetRequest = null;
private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> pendingCommittedOffsetResponse = null;
Contributor

Can we move these to the top with the rest of the fields? Maybe we can choose clearer names?

Contributor Author

yep. I'll also wrap them in an inner class, since we're going to add generation too.


private Set<TopicPartition> pendingCommittedOffsetRequest = null;
private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> pendingCommittedOffsetResponse = null;

Contributor

I think the basic idea makes sense and is what I was expecting. It might not feel too elegant, but I think a simple approach is best initially. An interesting point to consider is what would happen if an offset fetch is in-flight while a rebalance is in progress. When it returns, the offsets may be stale. I am wondering if it makes sense to fence the response using the group generation. In other words, we record the generation at the time of the request and then verify when we receive the response that it matches the current group generation.
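The fencing idea described above can be sketched as follows (the types and names are mine, not Kafka's): record the group generation when the offset-fetch request goes out, and discard the response if a rebalance has changed the generation while it was in flight.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Illustrative sketch of generation fencing for in-flight offset fetches.
// All names here are hypothetical stand-ins for the real coordinator types.
public class OffsetFetchFencing {
    public record Generation(int generationId, String memberId) {}

    public record PendingRequest(Set<String> partitions, Generation sentInGeneration) {}

    private Generation currentGeneration;
    private PendingRequest pending;

    public OffsetFetchFencing(Generation initial) {
        this.currentGeneration = initial;
    }

    public void sendOffsetFetch(Set<String> partitions) {
        // Remember the generation the request was sent in.
        pending = new PendingRequest(partitions, currentGeneration);
    }

    public void onRebalance(Generation next) {
        currentGeneration = next;
    }

    // Accept the response only if no rebalance happened while it was in flight.
    public Optional<Map<String, Long>> onResponse(Map<String, Long> offsets) {
        boolean stale = pending == null
            || !pending.sentInGeneration().equals(currentGeneration);
        pending = null;
        return stale ? Optional.empty() : Optional.of(offsets);
    }

    public static void main(String[] args) {
        OffsetFetchFencing c = new OffsetFetchFencing(new Generation(1, "member-1"));
        c.sendOffsetFetch(Set.of("topic-0"));
        c.onRebalance(new Generation(2, "member-1")); // rebalance while in flight
        System.out.println(c.onResponse(Map.of("topic-0", 42L)).isEmpty()); // true: stale
    }
}
```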

*/
public void ensureFreshMetadata() {
if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0)
if (this.metadata.updateRequested() || this.metadata.timeToNextUpdate(time.milliseconds()) == 0) {
Contributor

Could we replace this with ensureFreshMetadata(Long.MAX_VALUE)?


boolean ensureFreshMetadata(final long timeout) {
if (timeout < 0) {
return false;
Contributor

Could we just raise IllegalArgumentException as we do in most other cases?

import static org.junit.Assert.fail;

@SuppressWarnings({"deprecation", "SameParameterValue"})
public class DeprecatedMethodsKafkaConsumerTest {
Contributor

There has to be a better way to do this. For example, maybe we can extend KafkaConsumerTest?

@vvcephei (Contributor Author)

Allrighty, @hachikuji,

Thanks so much for the thorough review. I think I've addressed all your concerns.

Regarding the timeout-vs-deadline discussion. I've refactored all my timeout handling in terms of elapsed time and minimized the system calls. I think this is good enough for now. I would agree with switching over to deadlines in the future, but it feels too big to lump in with this work.

Thanks,
-John

@hachikuji (Contributor) left a comment

Thanks for the updates. Just a few more comments.


// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
Contributor

We might also consider moving this into SubscriptionState? We can invalidate the value whenever the assignment changes or an offset is reset. That would also give us a shortcut for missingFetchPositions().

Contributor Author

I think we'd better defer this for later.

Contributor

Sounds good.
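For reference, the deferred idea might look roughly like this (a sketch under my own names, not Kafka's SubscriptionState): cache the answer and invalidate it on any mutation, so hot paths like poll() avoid re-scanning a large assignment.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative sketch: caching "has all fetch positions" inside the subscription
// state, invalidated whenever the assignment or a position changes.
public class CachingSubscriptionState {
    private final Map<String, Long> positions = new HashMap<>(); // partition -> position
    private Boolean cachedHasAllFetchPositions; // null = stale, must recompute

    public void assign(String partition) {
        positions.put(partition, null);    // newly assigned, no position yet
        cachedHasAllFetchPositions = null; // invalidate the cache
    }

    public void position(String partition, long offset) {
        positions.put(partition, offset);
        cachedHasAllFetchPositions = null; // invalidate the cache
    }

    public boolean hasAllFetchPositions() {
        if (cachedHasAllFetchPositions == null) {
            // Only scan the (possibly large) assignment when something changed.
            cachedHasAllFetchPositions =
                positions.values().stream().allMatch(Objects::nonNull);
        }
        return cachedHasAllFetchPositions;
    }

    public static void main(String[] args) {
        CachingSubscriptionState state = new CachingSubscriptionState();
        state.assign("topic-0");
        System.out.println(state.hasAllFetchPositions()); // false: no position yet
        state.position("topic-0", 5L);
        System.out.println(state.hasAllFetchPositions()); // true
    }
}
```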


private synchronized boolean rejoinIncomplete() {
return joinFuture != null;
protected synchronized boolean rejoinNeededOrPending() {
Contributor

We should probably update the javadoc

}

private boolean sameRequest(final Set<TopicPartition> currentRequest, final Generation currentGeneration) {
return currentGeneration.generationId == generation.generationId
Contributor

Maybe we can implement equals for Generation?

Contributor Author

I considered it, but wanted to make sure this was in fact the comparison we should be doing here. I guess it is.
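For reference, an equals/hashCode for a Generation-like class might look as follows. Only generationId appears in the diff above; the memberId field is my assumption, included to show that equals should cover every identity-bearing field.

```java
import java.util.Objects;

// Illustrative sketch of equals/hashCode for a Generation-like value class.
// Field names beyond generationId are assumptions, not Kafka's actual fields.
public final class Generation {
    public final int generationId;
    public final String memberId;

    public Generation(int generationId, String memberId) {
        this.generationId = generationId;
        this.memberId = memberId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Generation)) return false;
        Generation that = (Generation) o;
        return generationId == that.generationId
            && Objects.equals(memberId, that.memberId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(generationId, memberId);
    }

    public static void main(String[] args) {
        System.out.println(new Generation(5, "m1").equals(new Generation(5, "m1"))); // true
        System.out.println(new Generation(5, "m1").equals(new Generation(6, "m1"))); // false
    }
}
```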


// expose for tests
@Override
protected synchronized boolean ensureCoordinatorReady(final long timeoutMs) {
Contributor

Do we need this? Seems like the same signature as the parent.

Contributor Author

It's weird, but doing this makes the method accessible in the WorkerCoordinatorTest, which it isn't otherwise because the test is in a different package from AbstractCoordinator. WorkerCoordinator can see the superclass's method because it is a subclass.

Contributor

Makes sense.

assertTrue(s"Found unexpected threads during $context, allThreads=$threads", noUnexpected)
assertTrue(
s"Found unexpected threads during $context, allThreads=$threads, " +
s"unexpected=${threads.filterNot(t => unexpectedThreadNames.forall(s => !t.contains(s)))}",
Contributor

👍

@vvcephei
Contributor Author

Ok, @hachikuji, I think this PR is gtg now, assuming the tests pass.

// update metadata (if needed) and keep track of the metadata used for assignment so that
// we can check after rebalance completion whether anything has changed
client.ensureFreshMetadata();
if (!client.ensureFreshMetadata(Long.MAX_VALUE)) throw new TimeoutException();
Contributor

@hachikuji hachikuji May 26, 2018

I guess it's tough to avoid blocking here. All of this would be considerably easier if we moved the rebalance to the background thread. Another improvement for another time.
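The contrast here is between the unbounded `ensureFreshMetadata()` and a time-bounded variant. A generic deadline-based wait loop, of the kind a bounded call like this implies, might look like the following hypothetical helper (not the actual client code):

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper illustrating a time-bounded wait: poll a condition
// until it holds or the deadline passes, instead of blocking indefinitely.
// This is a sketch of the pattern, not the real ConsumerNetworkClient code.
public class BoundedWait {
    public static boolean awaitCondition(BooleanSupplier done, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!done.getAsBoolean()) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                return false; // deadline passed; caller decides whether to throw
            }
            try {
                Thread.sleep(Math.min(remaining, 10L));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

The caller turns the `false` return into whatever failure mode fits, as the `throw new TimeoutException()` above does.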

Contributor

@hachikuji hachikuji left a comment

LGTM. Thanks for the patch

@hachikuji hachikuji merged commit c470ff7 into apache:trunk May 26, 2018
@vvcephei vvcephei deleted the kafka-5697-stream-thread-shutdown branch May 29, 2018 15:26
private PendingCommittedOffsetRequest pendingCommittedOffsetRequest = null;

private static class PendingCommittedOffsetRequest {
private final Set<TopicPartition> request;
Contributor

The names here are a bit misleading, maybe we can fix them in #5087? cc @hachikuji . I'm thinking requestedPartitions and requestedGeneration?
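A sketch of the suggested rename, with `String` standing in for `TopicPartition` and an `int` for the generation so the example is self-contained — hypothetical, not the real class:

```java
import java.util.Collections;
import java.util.Set;

// Sketch of the rename proposed above: requestedPartitions/requestedGeneration
// instead of request/generation. String and int stand in for TopicPartition
// and Generation; this is illustrative only, not the actual Kafka class.
public class PendingCommittedOffsetRequest {
    public final Set<String> requestedPartitions;
    public final int requestedGenerationId;

    public PendingCommittedOffsetRequest(Set<String> requestedPartitions, int requestedGenerationId) {
        this.requestedPartitions = Collections.unmodifiableSet(requestedPartitions);
        this.requestedGenerationId = requestedGenerationId;
    }

    // Mirrors sameRequest(...): the pending request matches only if both the
    // partition set and the consumer-group generation are unchanged.
    public boolean sameRequest(Set<String> partitions, int generationId) {
        return requestedGenerationId == generationId
            && requestedPartitions.equals(partitions);
    }
}
```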

return true;
}

private long remainingTimeAtLeastZero(final long timeoutMs, final long elapsed) {
Contributor

We have three remainingTimeAtLeastZero functions: in AbstractCoordinator, ConsumerCoordinator, and KafkaConsumer. Is that intentional? If not, we could keep just one to avoid unintentional code divergence in the future. cc @vvcephei
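For context, the duplicated helper is a one-liner that clamps the remaining budget at zero so elapsed time never produces a negative timeout. A sketch of the shared form it could be consolidated into:

```java
// Sketch of the remainingTimeAtLeastZero helper duplicated across the three
// classes: the remaining budget after `elapsed` ms, clamped at zero so it can
// be passed straight to a poll/wait call without going negative.
public class Timeouts {
    public static long remainingTimeAtLeastZero(long timeoutMs, long elapsed) {
        return Math.max(0L, timeoutMs - elapsed);
    }
}
```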

ying-zheng pushed a commit to ying-zheng/kafka that referenced this pull request Jul 6, 2018

One notable usage is prohibited by the new `poll`: previously, you could call `poll(0)` to block for metadata updates, for example to initialize the client, supposedly without fetching records. Note, though, that this behavior is not according to any contract, and there is no guarantee that `poll(0)` won't return records the first time it's called. Therefore, it has always been unsafe to ignore the response.
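The `poll(Duration)` control flow can be modeled with a toy sketch. This is not the real `KafkaConsumer` — the boolean fields stand in for metadata/network state, and the fetch path is elided — but it shows how a metadata timeout now yields an empty result instead of blocking forever:

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Toy model of the poll(Duration) semantics described above. Fields stand in
// for network state so the flow is observable; this is a sketch, not the
// actual consumer implementation.
public class ToyConsumer {
    boolean metadataFresh = false;          // does the client still need a metadata update?
    boolean metadataArrivesInTime = false;  // stands in for a broker response within the timeout
    final Deque<String> buffered = new ArrayDeque<>();

    public List<String> poll(Duration timeout) {
        // 1. iff a metadata update is needed, wait at most `timeout` for it
        if (!metadataFresh) {
            if (!metadataArrivesInTime) {
                return Collections.emptyList(); // timed out: empty result, no indefinite block
            }
            metadataFresh = true;
        }
        // 2. if fetch data is already available, return it immediately
        if (!buffered.isEmpty()) {
            List<String> out = new ArrayList<>(buffered);
            buffered.clear();
            return out;
        }
        // 3./4. send a fetch and wait up to the remaining timeout (elided in this toy)
        return Collections.emptyList();
    }
}
```

Under the deprecated `poll(long)`, step 1 would instead spin until the metadata arrived, which is exactly the indefinite block this PR removes.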