
KAFKA-6607: Commit correct offsets for transactional input data#8040

Merged
mjsax merged 6 commits into apache:2.5 from mjsax:kafka-6607-eos-commit
Feb 11, 2020

Conversation

@mjsax
Member

@mjsax mjsax commented Feb 5, 2020

Currently, Kafka Streams commits "offset + 1", which may lead to an incorrect "consumer lag" if the input topic is transactional, because the committed offset "steps on" the commit marker instead of skipping it.

With this PR, we commit the offset of the next record, i.e., consumer.position(), to step over potential transactional markers and fix this issue.

Call for review @guozhangwang @ableegoldman

This PR is against 2.5 branch on purpose to avoid conflict with the current Kafka Streams refactoring. After the refactoring is merged, we can port this PR to trunk.
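As a minimal, self-contained sketch of the lag problem (plain Java with illustrative offsets, not the actual Kafka client API): consider a transactional partition whose data records sit at offsets 0..2 and whose commit marker occupies offset 3, so the log-end offset is 4.

```java
public class CommitOffsetDemo {
    static final long LAST_PROCESSED_OFFSET = 2L; // last data record we processed
    static final long LOG_END_OFFSET = 4L;        // one past the commit marker at offset 3

    // Old behavior: commit "offset + 1", which lands on the marker's slot.
    static long oldCommittedOffset() {
        return LAST_PROCESSED_OFFSET + 1; // 3
    }

    // Fixed behavior: commit the offset of the next record to fetch
    // (what consumer.position() returns), stepping over the marker.
    static long newCommittedOffset() {
        return LOG_END_OFFSET; // 4
    }

    public static void main(String[] args) {
        // Although the app is fully caught up, the old scheme reports lag 1.
        System.out.println("old lag = " + (LOG_END_OFFSET - oldCommittedOffset()));
        System.out.println("new lag = " + (LOG_END_OFFSET - newCommittedOffset()));
    }
}
```

With the old scheme the reported lag never reaches zero on a fully caught-up transactional topic; with the fix it does.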

@mjsax mjsax added the streams label Feb 5, 2020
@mjsax mjsax changed the base branch from trunk to 2.5 February 5, 2020 01:16
Member Author

Some additional side cleanup.

Member Author

This new method is added for the fix.

Member Author

To reuse this condition, we move it to our test utils class.

Member Author

New method created in the test utils class to make it reusable.

Member Author

Need to set retries to a non-zero value for transactions...

Contributor

The default value for retries is Integer.MAX_VALUE anyway, right?

Member Author

Yes, but we set it to zero within TestUtils.producerConfig() (not sure why) -- should we remove it there instead?

Contributor

Hmm.. I am not sure either, but we can check whether removing it breaks any other tests --- with the new delivery timeout value we should no longer rely on that config; if we want, we should change delivery.timeout.ms, not this one.

I guess it's up to you :) if it is too much we can just keep it as is.

Member Author

I removed the retries overwrite and left the delivery timeout default. Locally all integration tests passed. Hence, I hope it's fine that way.

Member Author

The original method was too long and checkstyle failed -- we could also add a checkstyle exception... This was just a quick fix -- let me know what you think.

Contributor

Sounds good to me -- better not add more checkstyle exceptions :)

Member Author

Add verification for the "head record offset".

Member Author

Improve some existing tests, and add a couple more that were missing.

Member Author

Side cleanup

Member Author

Add "head record offset" verification.

Member Author

Just some test improvements.

Member Author

Because we call consumer.position() now, we need to fix the mock consumer setup in the TTD (TopologyTestDriver).

Member Author

We cannot share the consumer any longer, because the global task calls unsubscribe(), which nukes our setup from above.

Contributor

Not clear why? In MockConsumer we only do the following:

public synchronized void unsubscribe() {
    ensureNotClosed();
    committed.clear();
    subscriptions.unsubscribe();
}

And the beginningOffsets map is not nuked.

Member Author

The problem is that the subscription is nuked, and when we call position() the MockConsumer checks if the passed-in partition is in its subscription and fails before it tries to access the beginningOffsets map.

Contributor

Ah okay, got it.

Member Author

This was actually detected by the improved tests... Minor side fix.

Contributor

@guozhangwang guozhangwang left a comment

Thanks for the PR @mjsax! The proposed solution looks good to me. Just a few minor comments plus a meta one for consumer.position exception handling.

Contributor

We need to consider handling two exceptions that consumer.position may throw: KafkaException -> should be a fatal one; TimeoutException -> in this case we cannot commit, and we probably have to treat it as fatal, too.

Member Author

Ack. We can wrap KafkaException as StreamsException. A TimeoutException should never happen (compare my comment -- let me know if you think the comment is incorrect) -- hence, we can rethrow TimeoutException as IllegalStateException to flag potential bugs.
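The handling just described could look roughly like this (plain Java; the exception classes here are simplified stand-ins for the real Kafka hierarchy -- org.apache.kafka.common.KafkaException, its TimeoutException subclass, and Streams' StreamsException -- not the actual client types):

```java
public class PositionErrorHandling {
    // Simplified stand-ins for the real Kafka exception hierarchy.
    static class KafkaException extends RuntimeException {
        KafkaException(String msg) { super(msg); }
    }
    static class TimeoutException extends KafkaException {
        TimeoutException(String msg) { super(msg); }
    }
    static class StreamsException extends RuntimeException {
        StreamsException(String msg, Throwable cause) { super(msg, cause); }
    }

    interface Position { long get(); }

    static long positionOrThrow(Position consumerPosition) {
        try {
            return consumerPosition.get();
        } catch (TimeoutException bug) {
            // Should never happen here; surface it as a bug indicator.
            throw new IllegalStateException("Unexpected timeout on position()", bug);
        } catch (KafkaException fatal) {
            // Fatal: wrap and rethrow for Streams-level error handling.
            throw new StreamsException("Failed to retrieve consumer position", fatal);
        }
    }
}
```

Note the catch order: the TimeoutException subclass must be caught before its KafkaException parent.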


Contributor

nit: unkonwn-partition.



@mjsax
Member Author

mjsax commented Feb 6, 2020

Updated.

@guozhangwang
Contributor

LGTM!

@mjsax
Member Author

mjsax commented Feb 6, 2020

Java 8:

kafka.admin.ResetConsumerGroupOffsetTest.testResetOffsetsAllTopicsAllGroups
org.apache.kafka.streams.processor.internals.RecordQueueTest.shouldThrowOnNegativeTimestamp

Java 11:

kafka.admin.DeleteConsumerGroupsTest.testDeleteEmptyGroup
org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
org.apache.kafka.streams.processor.internals.RecordQueueTest.shouldThrowOnNegativeTimestamp
kafka.api.PlaintextProducerSendTest.testNonBlockingProducer
org.apache.kafka.streams.kstream.internals.KStreamImplTest.shouldSupportTriggerMaterializedWithKTableFromKStream

Retest this please.

@mjsax
Member Author

mjsax commented Feb 6, 2020

Java 8:

kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance

Java 11:

kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout
kafka.api.ConsumerBounceTest.testRollingBrokerRestartsWithSmallerMaxGroupSizeConfigDisruptsBigGroup
kafka.api.PlaintextConsumerTest.testLowMaxFetchSizeForRequestAndPartition
org.apache.kafka.streams.integration.SmokeTestDriverIntegrationTest.shouldWorkWithRebalance
org.apache.kafka.streams.kstream.internals.KStreamImplTest.shouldSupportTriggerMaterializedWithKTableFromKStream

Retest this please.

@mjsax mjsax force-pushed the kafka-6607-eos-commit branch from b551ea0 to fc3cc69 Compare February 8, 2020 03:07
@mjsax
Member Author

mjsax commented Feb 8, 2020

@guozhangwang The SmokeTestDriverIntegrationTest failed on this PR, exposing an issue: for an unknown reason, sometimes consumer.position() returned an offset one larger than expected. This seems to happen during a rebalance and leads to data loss -- the original thread did process data up to offset X, but commits offset X+2 (note that this test runs with at-least-once semantics, hence there are no tx-marker gaps), and thus the new thread starts at the wrong position and the record at offset X+1 is never processed. I talked to @hachikuji about this today; he was not sure why this happens either.

Long story short: to fix the issue, I changed this PR to get consumer.position() not when we commit, but each time we add data to the internal task queues. This allows us to track the correct consumer position.

However, with that fix, the EosIntegrationTest failed. The reason is that this test uses the consumer config max.poll.records = 1. This implies that each time we get one record at offset X, the consumer position is always X+1. When we reach the end of the input topic (with the last processed record at offset Y), we commit Y+1 instead of Y+2, because the consumer did not step over the commit marker yet. Only in the next poll() call would the consumer step over it. However, when the consumer steps over it, it does not return any data for the partition (as we are at the end of the partition), and thus we also don't update our tracked consumer position in the record queue. @hachikuji suggested to let the consumer return an empty list of consumer records for the corresponding partition in this case. This PR includes this consumer change.

Let me know what you think. Also @hachikuji for review of the consumer change.
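The approach of tracking the position at enqueue time can be sketched as follows (plain Java with simplified stand-in types, not the actual RecordQueue/PartitionGroup classes): capture consumer.position() right after poll(), when records are added to the task's queue, and commit that tracked position later.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class TrackedRecordQueue {
    private final Deque<Long> bufferedOffsets = new ArrayDeque<>();
    private long trackedPosition = -1L; // consumer position captured at enqueue time

    // 'positionAfterPoll' is consumer.position() taken right after poll();
    // it already points past any tx-marker the consumer stepped over,
    // even when 'batch' is empty (the "empty record list" case).
    void addRawRecords(List<Long> batch, long positionAfterPoll) {
        bufferedOffsets.addAll(batch);
        trackedPosition = positionAfterPoll;
    }

    // Offset to commit once everything buffered has been processed:
    // the tracked position, not lastProcessedOffset + 1.
    long offsetToCommit() {
        if (trackedPosition < 0L) {
            throw new IllegalStateException("no records were ever added");
        }
        return trackedPosition;
    }
}
```

The key point is that an empty batch still updates the tracked position, which is exactly what the consumer-side change below enables.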

Member Author

Even if records might not be empty, we need to filter out the dummy records we added to indicate tx-markers.

Member Author

We add a dummy record if we stepped over a tx-marker.

Member Author

Instead of nothing, we return an empty list if we step over a tx-marker. After a second fetchRecords() we return nothing.
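That "empty list once" behavior can be sketched like this (plain Java stand-in; fetchRecords and the marker flag are illustrative, not the actual Fetcher/MockConsumer code):

```java
import java.util.Collections;
import java.util.List;

public class StepOverMarkerFetch {
    private boolean justSteppedOverMarker;

    void markSteppedOverTxMarker() {
        justSteppedOverMarker = true;
    }

    // The first fetch after stepping over a tx-marker at the end of the
    // partition returns an empty list, so the caller can observe the
    // advanced position; subsequent fetches return null ("nothing").
    List<Long> fetchRecords() {
        if (justSteppedOverMarker) {
            justSteppedOverMarker = false;
            return Collections.emptyList();
        }
        return null;
    }
}
```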

Member Author

We now pass in the current consumerPosition to track it correctly.

Member Author

We track the consumer position explicitly now.

Member Author

This could happen if the buffer is empty and the consumer only stepped over a commit marker, passing in an empty list of records.

Member Author

Previously, we cleared the partitionGroup within closeTopology(), which we call above -- however, because of the consumer position tracking, we need to delay it until after the commit.

Member Author

We add the current consumer position when we add records to the queue now.

Member Author

I improved this test a little bit, adding more conditions.

@mjsax
Member Author

mjsax commented Feb 8, 2020

@guozhangwang @hachikuji This fix is for KS that does manual commits. I am wondering if consumer auto.offset.commit does the right thing already or if we would need a fix for this case, too?

@mjsax mjsax force-pushed the kafka-6607-eos-commit branch from f4cfcc7 to 1281c0b Compare February 8, 2020 20:14
@guozhangwang
Contributor

@mjsax Hmm.. after reading the PR I'm a bit inclined to fix the "unknown reason" that caused consumer.position to return the wrong value here; I'm still wondering if it is related to consumer only (I read through the code once again just now and cannot find an obvious root cause) or it is related to the interaction of streams / consumer. Can we reproduce this bug with a consumer smoke scenario?

@guozhangwang @hachikuji This fix is for KS that does manual commits. I am wondering if consumer auto.offset.commit does the right thing already or if we would need a fix for this case, too?

If we believe the consumer.position is buggy then it would affect auto.offset.commit too since we just rely on the position to commit when it triggers.

@mjsax
Member Author

mjsax commented Feb 10, 2020

I basically agree (I just wanted to make progress on this PR -- it's easy to revert it back :) -- even if I think it's too bad that we need the explicit position tracking -- I'm not sure about the change in the consumer itself...) -- however, I don't understand the consumer code well enough to fix the consumer (if there really is a bug). Not sure if you or @hachikuji would have time to investigate and verify whether there is a bug or if it's a usage issue in Streams?

This fix should go into 2.5 and code freeze is this week (Wednesday 2/12).

For me, SmokeTestDriverIntegrationTest failed in each run, so it should be easy to reproduce with an older version of this PR.

@mjsax mjsax force-pushed the kafka-6607-eos-commit branch from 1281c0b to ab3ba68 Compare February 11, 2020 00:22
@mjsax mjsax force-pushed the kafka-6607-eos-commit branch from ab3ba68 to 7f3c62b Compare February 11, 2020 01:44
@mjsax
Member Author

mjsax commented Feb 11, 2020

Updated this PR.

Contributor

@guozhangwang guozhangwang left a comment

Reviewed the latest commit, just one minor comment otherwise LGTM!

// if we are in PENDING_SHUTDOWN and don't find the task it implies that it was a newly assigned
// task that we just skipped to create;
// hence, we just skip adding the corresponding records
continue;
Contributor

maybe we can also log this as INFO for debugging purposes?

@mjsax
Member Author

mjsax commented Feb 11, 2020

Java 8 passed.
Java 11 failed with org.apache.kafka.connect.mirror.MirrorConnectorsIntegrationTest.testReplication

Retest this please.

@mjsax
Member Author

mjsax commented Feb 11, 2020

Java 8: kafka.admin.DescribeConsumerGroupTest.testDescribeGroupWithShortInitializationTimeout
Java 11: kafka.admin.DescribeConsumerGroupTest.testDescribeGroupMembersWithShortInitializationTimeout

Different test than above. Seems unrelated. Merging this PR.

@mjsax mjsax merged commit 4912a8d into apache:2.5 Feb 11, 2020
@mjsax mjsax deleted the kafka-6607-eos-commit branch February 11, 2020 21:59
@mjsax
Member Author

mjsax commented Feb 11, 2020

PR for trunk: #8091

stanislavkozlovski pushed a commit to stanislavkozlovski/kafka that referenced this pull request Feb 18, 2020