KAFKA-6897: Prevent producer from blocking indefinitely after close #5027

Merged
hachikuji merged 1 commit into apache:trunk from dhruvilshah3:producer-close on Jul 21, 2018

Conversation

@dhruvilshah3 (Contributor) commented May 17, 2018

After successful completion of KafkaProducer#close, it is possible that an application calls KafkaProducer#send. If the send is invoked for a topic for which we do not have any metadata, the producer will block until `max.block.ms` elapses; we do not expect to receive any metadata update in this case because the Sender (and NetworkClient) has already exited. It is only when RecordAccumulator#append is invoked that we notice the producer has already been closed and throw an exception. If `max.block.ms` is set to Long.MaxValue (or a sufficiently high value in general), the producer could block awaiting metadata indefinitely.

This patch makes sure `Metadata#awaitUpdate` periodically checks whether the network client has been closed, and if so bails out as soon as possible.
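The shape of the fix can be sketched roughly as follows. This is an illustrative sketch, not the actual patch: class, field, and method names are hypothetical, and the exception type was itself a point of discussion in the review.

```java
import java.util.concurrent.TimeoutException;

// Rough sketch of the idea (names illustrative, not the actual patch):
// awaitUpdate waits in bounded intervals and re-checks a closed flag
// between waits, so close() on another thread unblocks a caller that
// would otherwise wait for a metadata update that can never arrive.
public class MetadataSketch {
    private boolean isClosed = false;
    private int version = 1;

    public synchronized void close() {
        isClosed = true;
        notifyAll(); // wake any thread blocked in awaitUpdate
    }

    public synchronized int version() {
        return version;
    }

    public synchronized void update() {
        version++;
        notifyAll();
    }

    public synchronized void awaitUpdate(int lastVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastVersion) {
            if (isClosed)
                throw new IllegalStateException("Requested metadata update after close");
            long remainingMs = deadline - System.currentTimeMillis();
            if (remainingMs <= 0)
                throw new TimeoutException("Timed out waiting for metadata update");
            wait(Math.min(remainingMs, 100)); // bounded wait, then re-check isClosed
        }
    }
}
```

Without the bounded wait and the closed-flag check, a caller with a very large `max.block.ms` would sleep forever, since the closed NetworkClient will never deliver the update that triggers notifyAll().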

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@dhruvilshah3 dhruvilshah3 changed the title KAFKA-6897: Prevent producer from blocking indefinitely on close KAFKA-6897: Prevent producer from blocking indefinitely after close May 17, 2018

cmccabe commented May 17, 2018

The concept seems good. I'm not sure that IllegalStateException is the right one to throw, since the state is legal and reachable. Maybe something like TimeoutException?

Contributor:

Do we need to overload version for this purpose? Would a separate flag work? Maybe a more conventional approach would just use a close() method and have an isClosed field or something like that?

Contributor (Author):

I thought it might be a bit awkward to have a close() method for a class like Metadata that has no real underlying resources. That said, I also introduced close() for MetadataUpdater, so maybe we should do the same for Metadata as well. I will update the PR.


dhruvilshah3 commented May 17, 2018

@cmccabe I agree that IllegalStateException is probably not the most appropriate one to throw in this case. I was following what other KafkaProducer methods do when invoked after close; for example, RecordAccumulator#abortBatches and RecordAccumulator#append throw IllegalStateException as well. Maybe we should change all of these to throw a more suitable exception in a subsequent PR?

Contributor:

Probably not a big deal since awaitUpdate will not block in wait for longer than 100ms, but we may as well call notifyAll()?

Contributor:

Should Metadata extend Closeable?

Contributor:

I am wondering if we should throw KafkaException. This is an expected state since the producer is designed to block in send() to await metadata and there is not really any way for a user to avoid it. To be consistent, we could also raise KafkaException from RecordAccumulator in the similar scenario.

Member:

yeah, the existing IllegalStateException is confusing and we should fix it.

@dhruvilshah3 (Author):

Retest this please

@hachikuji (Contributor) left a comment:

Thanks, left a few comments. Also note the build failure. Might be a good idea to rebase and verify that the patch is still compiling.

Contributor:

Not sure it makes sense to change this one. In fact, close() should really be idempotent, so maybe we can just remove this check?

Contributor:

I think it's fine to leave this unchanged since it is only invoked at the start of the mock producer apis.

Contributor (Author):

Probably not a big deal, but I do see this being called from MockProducer#send for example so it might be worth keeping things consistent by throwing KafkaException as we do when KafkaProducer#send is called after close.

Contributor:

Not a big deal, but perhaps we could let MetadataUpdater implement Closeable? We can still override close() so that it doesn't throw an exception.

Contributor:

Don't we need to update this?

Contributor (Author):

Related to the other comment - Metadata#update throws IllegalStateException when invoked after close.

Contributor:

Since we have the notify in close(), do we still need this change?

Contributor (Author):

Good point, reverted.

Contributor:

Should we use KafkaException here as well?

@dhruvilshah3 (Author) commented May 18, 2018:

Hmm, I feel IllegalStateException is more appropriate in this case. We expect NetworkClient to not invoke Metadata#update after it has called Metadata#close().

Contributor:

This message seems a little low level for something which will get propagated back to the user. An alternative to consider would be to let awaitUpdate return a boolean indicating whether the update happened or not. That would allow us to raise an exception with a producer-specific message from send().

Contributor (Author):

I am now catching this exception in KafkaProducer#send and rethrowing with a more appropriate message.

Contributor:

Does this need to be public?

Contributor (Author):

I have KafkaProducer calling into this now, so needs to be public.

Contributor:

Hmm.. The old logic would let us continue fetching in the case of a timeout. Do you think that was not intentional?

Contributor (Author):

Even if we continued fetching, we would have failed the test at the end. tearDown() checks if we saw any background errors and fails the test if we did, so I thought that this change should be reasonable. Let me know if you think otherwise.

@dhruvilshah3 dhruvilshah3 force-pushed the producer-close branch 2 times, most recently from ed4be25 to efd3e9a Compare May 18, 2018 23:21
@hachikuji (Contributor):

Discussed offline, but we should try to distinguish legitimate illegal state errors from the case where a producer method is called after close() returns.

@dhruvilshah3 dhruvilshah3 force-pushed the producer-close branch 4 times, most recently from 19794a5 to 8e47fe3 Compare July 3, 2018 20:37
@hachikuji (Contributor) left a comment:

Sorry for the delay. Left a few more comments.

Contributor:

Can you explain why we need to catch this? It's generally a bad practice to ignore interrupts, so usually we either let the exception propagate or we reset the interrupt so that the caller has a chance to observe it.

Contributor (Author):

I am rethrowing as KafkaException if the interrupt was because of producer close; close() calls notifyAll() which could interrupt the wait() in this method. Does this seem reasonable?

Contributor:

The problem is that we are losing the indication that the interrupt has occurred. A caller up the stack may depend on seeing it. I think I would just let the exception be raised in all cases even if the producer is being closed.
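The convention the reviewer is describing is the standard one for Java interrupt handling. A minimal sketch (class and method names hypothetical): if the interrupt is translated into another exception rather than propagated, the thread's interrupt status must be restored first so callers further up the stack can still observe it.

```java
// Minimal sketch of the interrupt-handling convention discussed here
// (class and method names hypothetical): if wait() is interrupted,
// restore the thread's interrupt status before translating the
// exception, so callers up the stack can still observe the interrupt.
public class InterruptExample {
    public static void waitQuietly(Object lock, long timeoutMs) {
        synchronized (lock) {
            try {
                lock.wait(timeoutMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag for the caller
                throw new RuntimeException("Interrupted while waiting for metadata update", e);
            }
        }
    }
}
```

Swallowing the InterruptedException without the `interrupt()` call would silently erase the interrupt, which is exactly the problem raised above.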

Contributor:

Should Metadata extend Closeable?

Contributor:

nit: "after producer has been closed"?

Contributor:

Should we chain the caught exception? We expect this to be the close exception, but it could also be a timeout or an authentication failure. Might be useful to know in some scenarios.

Contributor:

Since it's trace level anyway, maybe we should print the stack trace instead of just the message.

Contributor:

I know it is from the original code, but asserting the error message seems dubious. Maybe we can just verify the exception type is KafkaException?


asfgit commented Jul 20, 2018

FAILURE
9289 tests run, 5 skipped, 0 failed.
--none--

@dhruvilshah3 (Author):

Retest this please

@hachikuji (Contributor) left a comment:

Thanks for the updates. Just one additional comment.

Contributor:

This feels a little brittle. If there is a delay before executing the task, then send() may raise the wrong exception. I think we could make it more reliable by waiting until the topic "test" has been added to Metadata.

Contributor (Author):

Good point. There is still some degree of uncertainty (even if much smaller than before) so I retained the sleep.

Contributor:

Can you elaborate? I don't see any point in the code where we would return between adding the topic and awaiting the update.

Contributor:

Can we use waitForCondition?
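A waitForCondition-style helper has roughly this shape. This is a sketch, not the actual `org.apache.kafka.test.TestUtils` signature: poll the condition until it holds or a deadline passes, rather than sleeping a fixed amount and hoping the background work has finished.

```java
import java.util.function.BooleanSupplier;

// Sketch of a waitForCondition-style test helper (not the actual
// TestUtils signature): poll until the condition holds or the
// deadline passes, instead of a fixed-duration sleep.
public class WaitUtil {
    public static void waitForCondition(BooleanSupplier condition, long maxWaitMs)
            throws InterruptedException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (!condition.getAsBoolean()) {
            if (System.currentTimeMillis() >= deadline)
                throw new AssertionError("Condition not met within " + maxWaitMs + " ms");
            Thread.sleep(10); // short poll interval bounds how stale the check can be
        }
    }
}
```

Polling like this keeps the test fast in the common case (it returns as soon as the condition holds) while still bounding the worst case, which is why it is less brittle than a fixed sleep.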

@hachikuji (Contributor) left a comment:

LGTM. Thanks for the patch.


ijuma commented Jul 21, 2018

@hachikuji is this ready to be merged? And is it just trunk or 2.0 as well?

@hachikuji (Contributor):

@ijuma Yes, I will merge to trunk and 2.0.

@hachikuji hachikuji merged commit d11f6f2 into apache:trunk Jul 21, 2018
hachikuji pushed a commit that referenced this pull request Jul 21, 2018
… closed (#5027)
ijuma added a commit to confluentinc/kafka that referenced this pull request Jul 23, 2018
* apache-github/2.0:
  MINOR: Close ZooKeeperClient if waitUntilConnected fails during construction (apache#5411)
  KAFKA-6897; Prevent KafkaProducer.send from blocking when producer is closed (apache#5027)
@dhruvilshah3 dhruvilshah3 deleted the producer-close branch July 23, 2018 07:02