
KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions #11046

Merged
hachikuji merged 5 commits into apache:trunk from C0urante:kafka-12980 on Dec 9, 2021

@C0urante
Contributor

@C0urante C0urante commented Jul 14, 2021

Jira

This is useful for reads to the end of a topic that contains aborted or empty transactions. If an aborted transaction is at the end of the topic, the consumer can now be expected to return from poll if it advances past that aborted transaction, and users can query the consumer's latest position for the relevant topic partitions to see if it has managed to make it past the end of the topic (or rather, what was the end of the topic when the attempt to read to the end of the topic began).

For a concrete example of this logic, see the KafkaBasedLog::readToLogEnd method that Connect employs to refresh its view of internal topics.
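The read-to-end pattern described above can be sketched without a broker. This is a minimal model, not Connect's actual `KafkaBasedLog` code: `PositionedReader` and `FakeReader` are hypothetical stand-ins for the consumer, and `null` log entries stand in for control records or aborted transactional data that the consumer skips. The point it illustrates is that progress is judged by position rather than by whether `poll` returned records.

```java
import java.util.ArrayList;
import java.util.List;

final class ReadToEndSketch {
    /** Hypothetical stand-in for the slice of the consumer API used here; not the Kafka interface. */
    interface PositionedReader {
        List<String> poll();   // user-visible records from one fetch; may be empty
        long position();       // offset of the next entry to read
        long endOffset();      // current end of the log
    }

    /** Toy single-partition log; null entries model control records or aborted data. */
    static final class FakeReader implements PositionedReader {
        private final String[] log;
        private int position = 0;

        FakeReader(String... log) { this.log = log; }

        public List<String> poll() {
            List<String> out = new ArrayList<>();
            if (position >= log.length) {
                return out; // nothing left: models a poll that times out
            }
            // One "fetch" is either a run of visible records or a run of skipped entries.
            boolean skipping = log[position] == null;
            while (position < log.length && (log[position] == null) == skipping) {
                if (!skipping) {
                    out.add(log[position]);
                }
                position++;
            }
            return out;
        }

        public long position() { return position; }
        public long endOffset() { return log.length; }
    }

    /** Reads until the position reaches the end offset captured when the read began. */
    static List<String> readToEnd(PositionedReader reader) {
        long target = reader.endOffset();
        List<String> seen = new ArrayList<>();
        while (reader.position() < target) {
            // An empty result is acceptable here; the loop exits on position, not on records.
            seen.addAll(reader.poll());
        }
        return seen;
    }

    public static void main(String[] args) {
        FakeReader reader = new FakeReader("a", "b", null, null);
        System.out.println(readToEnd(reader) + " position=" + reader.position());
    }
}
```

With a real consumer, the same loop would presumably compare `position(TopicPartition)` against a snapshot taken from `endOffsets(...)` after each `poll(Duration)`.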

No new unit tests are added, but many existing ones are modified to ensure that aborted and empty transactions are detected and reported correctly by Fetcher::collectFetch.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@C0urante
Contributor Author

C0urante commented Oct 7, 2021

@hachikuji could you take a look at this when you have a chance?

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java Outdated
@C0urante
Contributor Author

Thanks for the review @hachikuji; I agree that it's simpler and more future-proof to make the logic more general and not worry about the specific case of aborted transactions. I've updated the PR (and title) accordingly.

Contributor

@hachikuji hachikuji left a comment

Looks pretty good overall. Left some small comments.

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetch.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
    final Fetch<K, V> fetch = pollForFetches(timer);
    if (!fetch.isEmpty()) {
        if (fetch.records().isEmpty()) {
            log.debug(
Contributor

I'm not so sure about the value of this log line. Maybe it would be more useful to ensure that we have enough logging in Fetcher that we can see when the position gets advanced without any record data? That would tell us not only why a poll() returned with empty data, but which partition caused it to do so. What do you think?

Contributor Author

I think that works. My goal with this line was to help people understand the cause for this new behavior, as it may seem like a bug at first if poll returns empty batches before the poll timeout has elapsed. I've taken a stab at a message that has functional value (i.e., naming the specific topic partition whose position has advanced without any user-visible records) and still tries to let the user know about how/why this may affect poll behavior; LMKWYT.

Also, this section was pretty buggy as-was since it skipped the call to client.transmitSends and didn't pass the empty batch through the consumer's interceptors. I've fixed both of these; LMK if you think we should skip passing an empty batch to interceptors, though.

@C0urante
Contributor Author

Thanks @hachikuji. I've addressed the nits and tried to address the more substantial comment regarding logging; interested in your thoughts when you have a moment.

* This method returns immediately if there are records available. Otherwise, it will await the passed timeout.
* If the timeout expires, an empty record set will be returned. Note that this method may block beyond the
* timeout in order to execute custom {@link ConsumerRebalanceListener} callbacks.
* This method returns immediately if there are records available or if the position advances past control records.
Contributor

nit: past control records or aborted transactions when isolation.level=READ_COMMITTED

Contributor Author

@C0urante C0urante Dec 3, 2021

Ack, will add.
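For readers unfamiliar with the setting mentioned in this nit, here is a small sketch of consumer properties that opt into transactional isolation. `isolation.level`, `bootstrap.servers`, and `group.id` are standard consumer configuration keys; the broker address and group name are placeholders, and the class name is hypothetical. Note that the configuration value itself is written in lowercase.

```java
import java.util.Properties;

final class ReadCommittedConfigSketch {
    /** Builds consumer properties that hide aborted transactional records from the application. */
    static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        props.setProperty("group.id", "example-group");           // placeholder group name
        // With read_committed, the consumer skips aborted transactions, so its position
        // can advance even though no records are handed to the application.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("isolation.level"));
    }
}
```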

if (partRecords.isEmpty()) {
    log.debug(
        "Advanced position for partition {} without receiving any user-visible records. "
            + "This is likely due to skipping over control records in the current fetch, "
Contributor

How about this. First, we can augment the message above to something like this:

  log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`", 
    position, nextPosition, completedFetch.partition, partRecords.size());

This gives us enough information in the logs to see which partitions caused the poll() to return, and it tells us exactly where in the log the aborted transaction/control records exist. Second, maybe we can add back your previous log line, but make it a little more terse and put it at trace level:

  log.trace("Returning empty records from `poll()` since the consumer's position has advanced "
    + "for at least one topic partition");

Contributor Author

Sounds good to me 👍

Contributor

@hachikuji hachikuji left a comment

LGTM overall. Just two minor comments.

Comment thread clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java Outdated
        completedFetch.partition
    );
    log.trace("Returning empty records from `poll()` "
        + "since the consumer's position has advanced for at least one topic partition");
Contributor

nit: I think this comment made more sense in its original location in KafkaConsumer. At this level, it seems redundant after the changes in the log message above. We would already say "... and returning 0 records from poll()"

Contributor Author

This is true. I was hoping we could have something in here that explicitly states that this can happen because of the change in behavior implemented in this PR (i.e., skipping control records or aborted transactions). If you think it's worth it to call that out in a log message we can do that here, otherwise the entire if (partRecords.isEmpty()) branch is unnecessary.

Contributor

I'm inclined to either remove the log line entirely or move it back to its former location in KafkaConsumer. Will leave it up to you.

Contributor Author

Moved it to KafkaConsumer.

Contributor

@hachikuji hachikuji left a comment

LGTM. Thanks for the updates.

@hachikuji hachikuji merged commit ddb6959 into apache:trunk Dec 9, 2021
@C0urante C0urante deleted the kafka-12980 branch December 9, 2021 16:46
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…ion advances due to aborted transactions (apache#11046)

Return empty record batch from Consumer::poll when position advances due to aborted transactions or control records. This is useful for reads to the end of a topic that contains aborted or empty transactions. If an aborted transaction is at the end of the topic, the consumer can now be expected to return from `poll` if it advances past that aborted transaction, and users can query the consumer's latest `position` for the relevant topic partitions to see if it has managed to make it past the end of the topic (or rather, what was the end of the topic when the attempt to read to the end of the topic began).

Reviewers: Jason Gustafson <jason@confluent.io>
@nbali

nbali commented Mar 28, 2023

@C0urante FYI, this causes a loss of functionality. When KafkaConsumer.poll(Duration) returns empty, you can no longer tell whether that was due to a timeout (in which case the topic is most likely fully consumed) or not. Right now even a single empty fetch (numRecords = 0, positionAdvanced = true) will trigger a return. Deduplicated KTables can return fetches like that.
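One possible way for a caller to tell the two cases apart, offered here only as a sketch and not something proposed in this thread, is to compare the partition's position before and after the call. The `classify` helper and its `Outcome` enum are hypothetical; with a real consumer the inputs would presumably come from `ConsumerRecords.count()` and from `position(TopicPartition)` sampled around `poll(Duration)`.

```java
final class EmptyPollSketch {
    enum Outcome { RECORDS, ADVANCED_WITHOUT_RECORDS, NO_PROGRESS }

    /** Classifies a single poll from its record count and the position before and after it. */
    static Outcome classify(int recordCount, long positionBefore, long positionAfter) {
        if (recordCount > 0) {
            return Outcome.RECORDS;
        }
        // Empty result: either the position moved past skipped data, or nothing happened
        // (for example, the poll timed out).
        return positionAfter > positionBefore
            ? Outcome.ADVANCED_WITHOUT_RECORDS
            : Outcome.NO_PROGRESS;
    }

    public static void main(String[] args) {
        System.out.println(classify(0, 10L, 12L)); // empty poll, position moved
        System.out.println(classify(0, 12L, 12L)); // empty poll, position unchanged
    }
}
```

This costs an extra position lookup per assigned partition on each empty poll, which may or may not be acceptable for a given application.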

@C0urante
Contributor Author

@nbali this PR was merged over two years ago and I've lost almost all of the context around it. If you are seeing problems because of the changes made here, please file a Jira ticket describing what's going wrong (including examples and/or a practical use case if possible) and link it to the ticket for this change.

@nbali

nbali commented Mar 29, 2023
