
KAFKA-10866: Add metadata to ConsumerRecords #9836

Merged: vvcephei merged 10 commits into trunk from kafka-10866-consumerrecords-metadata on Jan 28, 2021

Conversation

vvcephei (Contributor) commented Jan 6, 2021:

Expose fetched metadata via the ConsumerRecords
object as described in KIP-695.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

vvcephei (Contributor, Author) left a comment:

Hey @guozhangwang , do you have time to review this?

I'll send a follow-on PR for the Streams code.

By the way, I pushed this branch to the Apache repo so that I can concurrently send the follow-on PR based on it.

Comment thread on build.gradle:
testCompile libs.junitJupiterApi
testCompile libs.junitVintageEngine
testCompile libs.mockitoCore
testCompile libs.hamcrest
vvcephei (Author):

To get access to more Matchers

Member:

We actually removed these from clients when we migrated to JUnit 5...

vvcephei (Author):

Ah, my apologies. I'll send a new PR to back this out.

Comment on lines +103 to +106
files="MemoryRecordsTest|MetricsTest|RequestResponseTest|TestSslUtils|AclAuthorizerBenchmark|MockConsumer"/>

<suppress checks="CyclomaticComplexity"
files="MockConsumer"/>
vvcephei (Author):

I've added a couple more branches to MockConsumer, which pushed it over the line.

private final Map<TopicPartition, List<ConsumerRecord<K, V>>> records;
private final Map<TopicPartition, Metadata> metadata;

public static final class Metadata {
vvcephei (Author):

I made this a static inner class, since "metadata" is such an abstract concept. This way, the scope is clear.
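A minimal sketch of the shape being described, so the scoping point is concrete. The field and accessor names here (receivedTimestamp, position, endOffset) are illustrative stand-ins, not the exact KIP-695 API; the point is that nesting makes callers read the name as ConsumerRecords.Metadata rather than a bare, abstract Metadata.

```java
// Illustrative sketch: nesting Metadata inside the records class scopes the
// abstract name. Field names are assumptions, not the final KIP-695 API.
final class ConsumerRecordsSketch {
    public static final class Metadata {
        private final long receivedTimestamp; // when the fetch response arrived
        private final long position;          // consumer position after this fetch
        private final long endOffset;         // log end offset reported by the broker

        Metadata(long receivedTimestamp, long position, long endOffset) {
            this.receivedTimestamp = receivedTimestamp;
            this.position = position;
            this.endOffset = endOffset;
        }

        public long receivedTimestamp() {
            return receivedTimestamp;
        }

        // Derived value: how far behind the end of the log this consumer is.
        public long lag() {
            return endOffset - position;
        }
    }

    public static void main(String[] args) {
        ConsumerRecordsSketch.Metadata m = new ConsumerRecordsSketch.Metadata(1_000L, 90L, 100L);
        System.out.println(m.lag()); // prints 10
    }
}
```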

}
}

private static <K, V> Map<TopicPartition, Metadata> extractMetadata(final FetchedRecords<K, V> fetchedRecords) {
vvcephei (Author):

This is where we translate from the internal result container to the public one. It could be moved to a utility class, but this seemed fine.

* @throws KafkaException if the rebalance callback throws exception
*/
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollForFetches(Timer timer) {
private FetchedRecords<K, V> pollForFetches(Timer timer) {
vvcephei (Author):

Added an internal struct so that the Fetcher can also return the desired metadata.
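Such an internal result container could look roughly like this. It is a simplified sketch, not the real clients-internal FetchedRecords: the partition key is a plain String and FetchMetadata holds a single illustrative field, but it shows the idea of pairing the per-partition records with per-partition metadata in one return value.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of an internal result struct: the fetcher accumulates both records
// and metadata per partition, then hands the whole thing back in one object.
final class FetchedRecordsSketch<T> {
    static final class FetchMetadata {
        final long endOffset; // illustrative; the real struct carries more

        FetchMetadata(long endOffset) {
            this.endOffset = endOffset;
        }
    }

    private final Map<String, List<T>> records = new HashMap<>();
    private final Map<String, FetchMetadata> metadata = new HashMap<>();

    void addRecords(String partition, List<T> recs) {
        records.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(recs);
    }

    void addMetadata(String partition, FetchMetadata meta) {
        metadata.put(partition, meta);
    }

    Map<String, List<T>> records() {
        return records;
    }

    Map<String, FetchMetadata> metadata() {
        return metadata;
    }
}
```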

Comment on lines +674 to 662
if (!records.isEmpty()) {
fetched.addRecords(partition, records);
recordsRemaining -= records.size();
}
vvcephei (Author):

Then, we get the records (note the main logic is now internal to FetchedRecords).


KafkaConsumer<String, String> consumer = newConsumerNoAutoCommit(time, client, subscription, metadata);
consumer.assign(Arrays.asList(tp0, tp1));
consumer.assign(asList(tp0, tp1));
vvcephei (Author):

A side effect of adding static imports.


ConsumerRecords<String, String> records = consumer.poll(Duration.ZERO);
assertEquals(0, records.count());
assertThat(records.metadata(), equalTo(emptyMap()));
vvcephei (Author):

I dropped in a couple of extra assertions where it seemed appropriate. There are also some new tests farther down to really exercise the new code.

Comment on lines +2507 to +2508
long logFirstOffset;
long logLastOffset;
vvcephei (Author):

I considered inferring these from the sequence of other mock interactions, but it seemed more sensible to just have a way to specify them explicitly.


// the first fetchedRecords() should return the first valid message
assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
assertEquals(1, fetcher.fetchedRecords().records().get(tp0).size());
vvcephei (Author):

Just a consequence of moving the records inside of a response struct.

@mjsax mjsax added the kip Requires or implements a KIP label Jan 8, 2021
@vvcephei vvcephei force-pushed the kafka-10866-consumerrecords-metadata branch from 41900e1 to ddef3d1 Compare January 12, 2021 00:44
guozhangwang (Contributor):

@vvcephei I'm starting to review this PR now (sorry for the delay!). Could you rebase the PR a bit?

@vvcephei vvcephei force-pushed the kafka-10866-consumerrecords-metadata branch 2 times, most recently from 552131c to 2e46bce Compare January 20, 2021 15:52
guozhangwang (Contributor) left a comment:

Mostly minor comments, otherwise LGTM!


final Map<TopicPartition, ConsumerRecords.Metadata> metadata = new HashMap<>();
for (final TopicPartition partition : subscriptions.assignedPartitions()) {
if (subscriptions.hasValidPosition(partition) && beginningOffsets.containsKey(partition) && endOffsets.containsKey(partition)) {
guozhangwang (Contributor):

Why also check beginningOffsets here?

vvcephei (Author):

Ah, this is from before I removed it.

if (fetchMetadata == null
|| !fetchMetadata.position().offsetEpoch.isPresent()
|| fetchPosition.offsetEpoch.isPresent()
&& fetchMetadata.position().offsetEpoch.get() <= fetchPosition.offsetEpoch.get()) {
guozhangwang (Contributor):

Interesting, why do we not want to update the metadata if the epoch is stale?

guozhangwang (Contributor):

Also, would <= accept a null from fetchMetadata.position().offsetEpoch.get() on the left-hand side?

vvcephei (Author):

Ah, good catch. It looks like this was also leftover from a previous version.

I used to directly populate the returned metadata from the fetch response, but now I'm just populating the returned metadata from the subscription state, which initializeCompletedFetch has already updated.

The benefit is that we don't have to worry about cases like this, since they've already been checked.

fetched.put(partition, newRecords);
TopicPartition partition = nextInLineFetch.partition;

if (subscriptions.isAssigned(partition)) {
guozhangwang (Contributor):

Could this condition ever fail, except in mocking tests?

vvcephei (Author):

I copied this check from fetchRecords, which says "this can happen when a rebalance happened before fetched records are returned to the consumer's poll call". I.e., it seems like it can actually happen, but a comment is called for, at least. I'll add it.



@Test
public void testPollMetadataWithExtraPartitions() {
guozhangwang (Contributor):

Does the test cover the 1) stale epoch and 2) no previous value cases?

vvcephei (Author):

Good question. Thanks to your other comment, I've removed the stale epoch check.

By the "no prev value" case, are you referring to what happens when we get a fetch response for a partition for the first time? This is actually all we're testing here.

guozhangwang (Contributor):

Cool, thanks!

vvcephei (Author):

Huh, I can't get the PlaintextConsumerTest to fail locally...

guozhangwang (Contributor):

Made another pass on the latest commit and it LGTM. Also triggered the test again.

Once it passes, we can merge as-is.

@vvcephei vvcephei force-pushed the kafka-10866-consumerrecords-metadata branch from 1cc0bb7 to ffbaeab Compare January 22, 2021 15:55
vvcephei (Author):

Thanks, @guozhangwang !

I ran the PlaintextConsumerTest a bunch more times, and also searched through the trunk build logs. I think this PR did make the test more flaky, and I suspect the reason is that the consumer long-poll will now return "early" if we get a metadata-only fetch response. I've adjusted the test to account for this, and we'll see how the build does now.

guozhangwang (Contributor):

> I ran the PlaintextConsumerTest a bunch more times, and also searched through the trunk build logs. I think this PR did make the test more flaky, and I suspect the reason is that the consumer long-poll will now return "early" if we get a metadata-only fetch response. I've adjusted the test to account for this, and we'll see how the build does now.

Got it. How about this:

In KafkaConsumer.java:

if (!records.isEmpty()) {
                    // before returning the fetched records, we can send off the next round of fetches
                    // and avoid block waiting for their responses to enable pipelining while the user
                    // is handling the fetched records.
                    //
                    // NOTE: since the consumed position has already been updated, we must not allow
                    // wakeups or any other errors to be triggered prior to returning the fetched records.
                    if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) {
                        client.transmitSends();
                    }

                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                }

Could we change the condition to records.records().isEmpty()?

vvcephei (Author):

Thanks @guozhangwang ,

I considered that, but I think for this work, we actually do want to return the metadata if we have fetched some. Since the semantics of poll(timeout) are that it will wait up to the timeout to return and that it may or may not return any records, it seems this PR's behavior is just fine.

All that is wrong is that this particular test expects to get records back after a single poll. Among our integration tests, this is a pretty unique expectation, so I felt good about relaxing it. Does that seem ok to you?

guozhangwang (Contributor):

Hey @vvcephei, I was not actually concerned about the tests; it just occurred to me that we should maybe be paranoid about side effects on a tighter consumer loop in user code like:

while (running()) {
    records = consumer.poll();
    if (!records.isEmpty()) // process them
}

Before, this loop iterated say N times every second, and now it could be M times every second where M >> N, while we are still returning the same number of records per second. If the user's processing logic does not have a guard like if (!records.isEmpty()), or runs some computation for every returned records variable no matter whether it is empty, then users may see their CPU usage increase surprisingly.

But I also realized that the change I proposed above would effectively "kill" the purpose of this approach. So we probably do not have a better solution ATM without even larger API changes; we'd just bite the bullet and see if there are any surprises.

vvcephei (Author):

Thanks @guozhangwang . I share that concern.

It's a little different than what you pointed out, but to call attention to it: I actually didn't change the definition of ConsumerRecords#isEmpty. It is still just return records.isEmpty(), so it is true only when there are no records. The specific rationale is not to break logic similar to what you quoted.

To your point, it's true that M > N, but I doubt that M >> N. The rate at which we return is still bounded by the latency * concurrency of fetch request-response lifecycles that return some records or metadata. As long as there's no new information to return, the Consumer's long-poll will still be effective.

And to the final point, I really doubt that any real code could have an expectation that they can just call poll and always get nonzero records back. It just doesn't seem like a reasonable expectation from a user's perspective.
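The isEmpty point above can be sketched with a simplified stand-in (the String/Integer field types here are illustrative, not the real ConsumerRecords generics): a metadata-only poll result still reads as empty, so loops guarded by !records.isEmpty() behave as before, while the metadata remains accessible.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for ConsumerRecords: isEmpty() consults only the
// records map, never the metadata map.
final class SimpleConsumerRecords {
    private final Map<String, Integer> records;  // partition -> record count (illustrative)
    private final Map<String, Long> metadata;    // partition -> end offset (illustrative)

    SimpleConsumerRecords(Map<String, Integer> records, Map<String, Long> metadata) {
        this.records = records;
        this.metadata = metadata;
    }

    boolean isEmpty() {
        return records.isEmpty(); // metadata deliberately not consulted
    }

    Map<String, Long> metadata() {
        return metadata;
    }

    public static void main(String[] args) {
        Map<String, Long> meta = new HashMap<>();
        meta.put("topic-0", 42L);
        // A metadata-only result: no records fetched, but lag info arrived.
        SimpleConsumerRecords result = new SimpleConsumerRecords(new HashMap<>(), meta);
        System.out.println(result.isEmpty());            // prints true: guarded loops skip it
        System.out.println(result.metadata().isEmpty()); // prints false: metadata still there
    }
}
```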

guozhangwang (Contributor):

Yup, I think we are on the same page for:

  1. ConsumerRecords#isEmpty: I was not suggesting that it behaves differently when only metadata changed; I meant that if users do not have the records.isEmpty() condition before their processing logic, they may now waste some more cycles.
  2. Users should not expect that consumer.poll always returns non-zero records: that is also true. Like I said, I'm just wondering whether this is a paranoid thought about a rare case.

For M vs. N, I think you're right that, even with different consumer min.bytes and max.wait configurations, the difference should be small, if any. So all clear on my side.

@vvcephei vvcephei force-pushed the kafka-10866-consumerrecords-metadata branch from ffbaeab to bb23614 Compare January 26, 2021 22:27
vvcephei (Author):

That last build was green, but it was a few days ago. Just rebased and pushed again to make sure I don't break the build when I merge this.

@vvcephei vvcephei force-pushed the kafka-10866-consumerrecords-metadata branch from bb23614 to fc3ec40 Compare January 27, 2021 20:08
vvcephei (Author):

Most of those failures were known flaky tests, but one was an EasyMock error. I'm not able to repro it locally after a rebase, though. Rebased, pushed, and trying one more time to get a clean build.

vvcephei (Author):

Flaky test failures:

    Build / JDK 11 / org.apache.kafka.clients.consumer.internals.FetcherTest.testEarlierOffsetResetArrivesLate()
    Build / JDK 11 / org.apache.kafka.clients.producer.KafkaProducerTest.testHeadersWithExtendedClasses()
    Build / JDK 15 / kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete()

The most concerning one is the FetcherTest, but it's also failing on trunk.

@vvcephei vvcephei merged commit fdcf8fb into trunk Jan 28, 2021
@vvcephei vvcephei deleted the kafka-10866-consumerrecords-metadata branch January 28, 2021 00:18
ijuma (Member) commented Feb 2, 2021:

Have we verified that this doesn't cause perf regressions? It seems to have caused system test failures: https://issues.apache.org/jira/browse/KAFKA-12268

vvcephei (Author) commented Feb 3, 2021:

Thanks @ijuma; we did run the Streams system tests and benchmarks and verified there was no perf regression. I should have run the Client system tests too; sorry about that. I'll follow up on #10022

ijuma (Member) commented Feb 3, 2021:

Thanks @vvcephei !

vvcephei pushed a commit to vvcephei/kafka that referenced this pull request Feb 12, 2021
vvcephei pushed a commit that referenced this pull request Feb 12, 2021
This reverts commit fdcf8fb.

Closes #10119

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
vvcephei pushed a commit to vvcephei/kafka that referenced this pull request Feb 17, 2021
vvcephei added a commit that referenced this pull request Mar 2, 2021
Implements KIP-695

Reverts a previous behavior change to Consumer.poll and replaces
it with a new Consumer.currentLag API, which returns the client's
currently cached lag.

Uses this new API to implement the desired task idling semantics
improvement from KIP-695.

Reverts fdcf8fb / KAFKA-10866: Add metadata to ConsumerRecords (#9836)

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Guozhang Wang <guozhang@apache.org>
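The cached-lag idea from this commit message can be sketched without the real client. The class and method below are hypothetical stand-ins, not the actual Consumer#currentLag implementation; the point is the contract: return the lag already cached from fetch responses if one exists, and an empty value otherwise, without any network call.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Hypothetical sketch of a currentLag-style lookup over cached fetch metadata.
final class LagCache {
    private final Map<String, Long> cachedLag = new HashMap<>();

    // Called when a fetch response updates our view of a partition.
    void update(String partition, long endOffset, long position) {
        cachedLag.put(partition, endOffset - position);
    }

    // Returns the cached lag if known; empty if we have no data yet.
    OptionalLong currentLag(String partition) {
        Long lag = cachedLag.get(partition);
        return lag == null ? OptionalLong.empty() : OptionalLong.of(lag);
    }
}
```

The key design point mirrored here is that the lookup is purely local: polling behavior stays untouched, and callers that want lag information ask for it explicitly instead of receiving it piggybacked on every poll result.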
vvcephei added a commit to confluentinc/kafka that referenced this pull request Mar 3, 2021

Labels: consumer, kip (Requires or implements a KIP)