KAFKA-12268; Return from KafkaConsumer.poll only when records available or timeout#10022

Closed
rajinisivaram wants to merge 1 commit into apache:trunk from rajinisivaram:KAFKA-12268-fetch

@rajinisivaram
Contributor

Changes from https://issues.apache.org/jira/browse/KAFKA-10866 cause KafkaConsumer.poll() to return early even when no records are available. We should respect the timeout specified in poll() and return only when records are available.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@chia7712
Member

chia7712 commented Feb 2, 2021

How about making Fetcher#fetchedRecords update metadata only if records is not empty? In other words, the Fetcher.java#L643 can be moved to Fetcher.java#L659. The FetchedRecords having no records always gets discarded. Hence, we don't need to set metadata of FetchedRecords when records is empty.

@rajinisivaram
Contributor Author

@chia7712 Thanks for the review. There is a test that verifies that we get metadata even when there are no records (the one containing the comment "// ... but we can still get metadata that was in the fetch response"). So I assumed that we wanted to populate this for some reason. I wasn't sure if we also rely on the consumer returning early to use the metadata, so I will wait for @vvcephei to take a look.

Contributor

@vvcephei vvcephei left a comment

Hey @rajinisivaram and @chia7712,

The intent of my change was specifically to return early when we get some fetch response metadata even if we didn't get any records. The rationale for this was part of https://cwiki.apache.org/confluence/display/KAFKA/KIP-695%3A+Further+Improve+Kafka+Streams+Timestamp+Synchronization

Do you think the design is wrong, or is the problem just that it broke assumptions that are in the system tests? In the latter case, should we just fix the system tests? My thinking was that it's not reasonable for any caller to just assume that there will be records returned when poll returns, but I'm certainly not 100% sure on that.

By the way, I'm sorry that I didn't run the Client system tests as part of my work, and I didn't notice the failures after I merged.


  final FetchedRecords<K, V> records = pollForFetches(timer);
- if (!records.isEmpty()) {
+ if (!records.records().isEmpty()) {

Contributor

I made this change so that we would immediately send off the next round of fetches after handling any fetch responses, not just any fetch responses that contain records.

Was my reasoning incorrect?
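
The difference between the two conditions in the diff above can be sketched with a stand-in type. This is only an illustration: `FetchedRecordsSketch`, its fields, and the assumed definition of `isEmpty()` (empty only when both records and metadata are absent) are hypothetical and are not the client's actual internal class.

```java
import java.util.List;
import java.util.Map;

// Stand-in for the consumer's internal FetchedRecords type. The field layout
// and the isEmpty() definition are assumptions made for illustration.
final class FetchedRecordsSketch {
    private final List<String> records;
    private final Map<String, Long> metadata;

    FetchedRecordsSketch(List<String> records, Map<String, Long> metadata) {
        this.records = records;
        this.metadata = metadata;
    }

    List<String> records() { return records; }

    // Assumed semantics: empty only if the response carried neither records nor metadata.
    boolean isEmpty() { return records.isEmpty() && metadata.isEmpty(); }

    // Condition before this PR: any fetch response, even metadata-only, ends the wait.
    static boolean returnsOnAnyResponse(FetchedRecordsSketch fetched) {
        return !fetched.isEmpty();
    }

    // Condition proposed in this PR: only actual records end the wait.
    static boolean returnsOnRecordsOnly(FetchedRecordsSketch fetched) {
        return !fetched.records().isEmpty();
    }
}
```

Under these assumptions, a metadata-only response satisfies the first condition but not the second, which is the behavioral difference under discussion.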

Member

Not sure whether this counts as a behavior change. The docs of KafkaConsumer#poll indicate that the timeout is used to wait for available records.

     * This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
     * If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
     * timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.

Maybe we can introduce a new API poll(Duration, Options) (similar to KafkaAdmin). The options would enable us to adjust the poll behavior for specific use cases. It also leaves room for other poll variants in the future.
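
One way such an option could look is sketched below. Everything here is hypothetical: neither `PollOptionsSketch` nor a `returnOnMetadataOnly` flag exists in the Kafka client, and the names are chosen only to illustrate an opt-in switch whose default keeps the documented behavior.

```java
// Hypothetical options holder for a poll(Duration, Options) overload.
// Not part of the Kafka API; all names are assumptions for illustration.
final class PollOptionsSketch {
    // Default keeps the documented behavior: wait for records or the timeout.
    private boolean returnOnMetadataOnly = false;

    PollOptionsSketch returnOnMetadataOnly(boolean value) {
        this.returnOnMetadataOnly = value;
        return this;
    }

    boolean returnOnMetadataOnly() {
        return returnOnMetadataOnly;
    }

    // Decides whether poll should return before the timeout expires.
    static boolean shouldReturnEarly(boolean hasRecords, boolean hasMetadata, PollOptionsSketch options) {
        return hasRecords || (options.returnOnMetadataOnly() && hasMetadata);
    }
}
```

With this shape, existing callers would keep the records-or-timeout contract, and only callers that opt in would see early returns on metadata-only responses.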

Contributor

I had thought about that with @vvcephei on the original PR that changes this behavior:

#9836 (comment)

We decided this would not impact users much in practice and went ahead with it, since at the end of the day callers would still condition on ConsumerRecords#isEmpty or use a for loop to process records if there are any. I admit that I did not catch the javadoc statement back then.

At the moment, I think we could change the javadoc along with the PR if so far we've only seen our own tests break because they rely on this guarantee. If you have any common use cases that may be impacted by this behavior change, I'd love to hear them and revisit.

Member

> At the moment, I think we could change the javadoc along with the PR if so far we've only seen our tests being broken because it relies on this guarantee; if you have any common use cases that may be impacted by this behavior change, I'd love to hear and revisit.

Just imagine a use case :)

Suppose users want random access to a record by offset. The consumer is then NOT used in a loop. Instead, it works like a getter method.

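import scala.jdk.CollectionConverters._

// `consumer`, `tp`, and the key/value types K and V are assumed to be in scope.
def randomAccess(offset: Long, duration: Duration): Seq[ConsumerRecord[K, V]] = {
  consumer.seek(tp, offset)
  consumer.poll(duration).asScala.toSeq
}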

In this case, users expect the method to return an empty result "only if" there are no available records. With the new behavior, users may receive empty results before the timeout expires, and they have to rewrite the code as shown below.

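// Requires scala.jdk.CollectionConverters._ for .asScala; `consumer`, `tp`, K and V are assumed to be in scope.
def randomAccess(offset: Long, duration: Duration): Seq[ConsumerRecord[K, V]] = {
  consumer.seek(tp, offset)
  val endTime = System.currentTimeMillis() + duration.toMillis
  while (System.currentTimeMillis() <= endTime) {
    // Poll only for the time that is left, so the total wait stays within `duration`.
    val remaining = math.max(0L, endTime - System.currentTimeMillis())
    val records = consumer.poll(Duration.ofMillis(remaining)).asScala.toSeq
    if (records.nonEmpty) return records
  }
  Seq.empty
}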
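
For Java callers, the same workaround might be sketched as a small helper. This is a minimal sketch under stated assumptions: `BoundedPoll` and `pollUntilRecords` are hypothetical names, and the `poll` parameter stands in for a call such as `d -> consumer.poll(d)` adapted to return a list, so the helper can be shown without the Kafka client on the classpath.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical helper, not part of the Kafka API.
final class BoundedPoll {
    // Re-polls until records arrive or the overall timeout has elapsed.
    // `poll` is a stand-in for the consumer's poll call.
    static <T> List<T> pollUntilRecords(Function<Duration, List<T>> poll, Duration timeout) {
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (true) {
            // Never wait longer than what is left of the caller's timeout.
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            List<T> records = poll.apply(Duration.ofNanos(remainingNanos));
            if (!records.isEmpty()) {
                return records;
            }
            if (remainingNanos == 0L) {
                // The deadline passed and the final poll was still empty.
                return Collections.emptyList();
            }
        }
    }
}
```

The helper always polls at least once, even with a zero timeout. If the underlying poll keeps returning empty results immediately, the loop spins until the deadline, so a real caller may want a short pause between attempts.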

Contributor Author

Fixing the system tests would be straightforward, but as @chia7712 said, this could impact existing applications which rely on the current behaviour. For example, applications may call poll(longTimeout) and exit if no records are returned, treating it as an error condition, similar to the system tests. Adding a new API with options sounds like the safe alternative to maintain compatibility. But it may be good to follow up on the mailing list, perhaps on the discussion thread of KIP-695, to get wider opinion. Either way, the change proposed in this PR seems unsuitable: we can either fix the system tests or make the new behaviour optional.

Contributor

Thanks, all. I agree with the conclusion that the system is operating as designed, so we should close this PR and either change the design or the system tests.

I also think @rajinisivaram's suggestion to discuss it on the mailing list is a good one, so that the discussion will be part of the history of the KIP and not lost to the sands of time in this thread.

I'll send a new response to the vote thread to re-initialize that discussion.

Contributor

Ok, @chia7712 and @rajinisivaram, I've restarted the VOTE thread with a new message.

Hopefully, we can wrap up that discussion quickly, so I can circle back to either change the feature or the tests.

Thanks for that counter-example, @chia7712! Actually, we were aware of that kind of case, and your proposed workaround is exactly what we had to do in the integration tests: https://github.com/apache/kafka/pull/9836/files#diff-735dcc2179315ebd78a7c75fd21b70b0ae81b90f3d5ec761740bc80abeae891fR1875-R1888 :)

The key question, which I tried to pose in the mailing list, is whether this is really a "real" use case we have to support, or whether it's just something we happen to do in some tests or are able to imagine. We can certainly add a new method to the interface, but that also has nontrivial usability costs, as users need to understand the differences of those two methods and we also have to maintain and test both code paths. If it's not that likely that someone outside of our own project will be harmed, it seems better to just make the change in place.

Anyway, we should discuss on the mailing list; I just wanted to acknowledge your response.

@rajinisivaram
Contributor Author

@vvcephei Thank you! I will close this PR since the discussion can continue on the mailing list.

4 participants