KAFKA-16310 ListOffsets doesn't report the offset with maxTimestamp a… #15621

Merged
chia7712 merged 52 commits into apache:trunk from chia7712:KAFKA-16310 on Apr 10, 2024
Conversation

@chia7712
Member

We now iterate the records to find the offsetOfMaxTimestamp instead of returning the cached one when handling ListOffsetsRequest.MAX_TIMESTAMP, since it is hard to align all paths to produce the correct offsetOfMaxTimestamp. The known paths are shown below.

  1. convertAndAssignOffsetsNonCompressed -> we CAN get correct offsetOfMaxTimestamp when validating all records
  2. assignOffsetsNonCompressed -> ditto
  3. validateMessagesAndAssignOffsetsCompressed -> ditto
  4. validateMessagesAndAssignOffsetsCompressed#buildRecordsAndAssignOffsets -> ditto
  5. appendAsFollow#append#analyzeAndValidateRecords -> we CAN'T get correct offsetOfMaxTimestamp as iterating all records is expensive when fetching records from leader
  6. LogSegment#recover -> ditto
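
As a rough illustration of the deep scan described above, here is a minimal, self-contained sketch (plain Java; Rec and offsetOfMaxTimestamp are hypothetical stand-ins, not the real Kafka classes) of finding the offset of the first record carrying the maximum timestamp, instead of trusting a cached, batch-level value:

```java
import java.util.List;

public class MaxTimestampScan {
    // Hypothetical, simplified stand-in for one record: (offset, timestamp).
    record Rec(long offset, long timestamp) {}

    // Returns the offset of the FIRST record carrying the maximum timestamp,
    // mirroring the "iterate all records" approach rather than a cached,
    // shallow (batch-level) offset.
    static long offsetOfMaxTimestamp(List<Rec> records) {
        long maxTs = Long.MIN_VALUE;
        long offset = -1L;
        for (Rec r : records) {
            if (r.timestamp() > maxTs) { // strictly greater: keep the first occurrence
                maxTs = r.timestamp();
                offset = r.offset();
            }
        }
        return offset;
    }

    public static void main(String[] args) {
        List<Rec> batch = List.of(new Rec(0L, 100L), new Rec(1L, 300L), new Rec(2L, 300L));
        // The deep scan finds offset 1; a shallow, batch-level answer would
        // instead report the batch's last offset (2).
        System.out.println(offsetOfMaxTimestamp(batch)); // prints 1
    }
}
```

This also shows why path 5 above is expensive on the follower: getting this answer requires decompressing and walking every record, not just reading batch headers.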

https://issues.apache.org/jira/browse/KAFKA-16310

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@chia7712 chia7712 requested review from ijuma, junrao, and showuon on March 28, 2024 at 20:31
Contributor

@junrao junrao left a comment

@chia7712 : Thanks for the PR.

We also need to:

  1. revert all offsetForMaxTimestamp to shallowOffsetMaxTimestamp
  2. change/revert the implementation to set shallowOffsetMaxTimestamp accordingly
  3. add tests for follower appends

Comment thread core/src/main/scala/kafka/log/UnifiedLog.scala Outdated
@chia7712
Member Author

(1) revert all offsetForMaxTimestamp to shallowOffsetMaxTimestamp
(2) change/revert the implementation to set shallowOffsetMaxTimestamp accordingly.

Do we need to revert all of them? The paths we have fixed work well now.

  1. It seems to me that adding comments for both the "recover" and "follower" cases can remind readers that this offsetOfMaxTimestampMs is shallow.
  2. Alternatively, we can rename only offsetForMaxTimestamp back to shallowOffsetMaxTimestamp but keep the implementation.

@junrao WDYT?

(3) add tests for follower appends

Will complete it later.

// case 0: test the offsets from leader's append path
check()

// case 1: test the offsets from follower's append path.
Member Author

@junrao the extra tests are added. Please take a look.

@junrao
Contributor

junrao commented Mar 29, 2024

Do we need to revert all of them? The paths we have fixed work well now.

  1. It seems to me that adding comments for both the "recover" and "follower" cases can remind readers that this offsetOfMaxTimestampMs is shallow.
  2. Alternatively, we can rename only offsetForMaxTimestamp back to shallowOffsetMaxTimestamp but keep the implementation.

@chia7712 : I think both are important. First, it's important to be able to derive the same thing consistently from the leader and the follower log. This affects things like the time indexing entries. It will be confusing if the leader adds an offset in the middle of a batch while the follower adds an offset at the end of the batch. Second, it's important to name things as accurately as possible. Otherwise, future developers could make inaccurate assumptions.

@chia7712
Member Author

I think both are important. First, it's important to be able to derive the same thing consistently from the leader and the follower log. This affects things like the time indexing entries. It will be confusing if the leader adds an offset in the middle of a batch while the follower adds an offset at the end of the batch. Second, it's important to name things as accurately as possible. Otherwise, future developers could make inaccurate assumptions.

You are right. I have reverted the implementation and the naming. I have also added extra comments for the "spec" of offsetOfMaxTimestamp.

@chia7712
Member Author

chia7712 commented Apr 1, 2024

Rebased the code and applied Luke's patch from https://github.com/chia7712/kafka/pull/3/files

Contributor

@junrao junrao left a comment

@chia7712 : Thanks for the PR. Added a few comments.

Comment thread clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java Outdated
Comment thread clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java Outdated
Comment thread storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java Outdated
Comment thread storage/src/main/java/org/apache/kafka/storage/internals/log/LogSegment.java Outdated
Comment thread core/src/main/scala/kafka/log/UnifiedLog.scala Outdated
@chia7712
Member Author

chia7712 commented Apr 1, 2024

@junrao thanks for all your reviews and patience. All comments are addressed.

Contributor

@junrao junrao left a comment

@chia7712 : Thanks for the updated PR. A few more comments.

Comment thread storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java Outdated
Comment thread storage/src/main/java/org/apache/kafka/storage/internals/log/LogAppendInfo.java Outdated
Comment thread storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java Outdated
Comment thread storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java Outdated
Comment thread storage/src/main/java/org/apache/kafka/storage/internals/log/LogValidator.java Outdated
@junrao
Contributor

junrao commented Apr 7, 2024

@chia7712 : There are quite a few test failures on kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch().

@chia7712
Member Author

chia7712 commented Apr 7, 2024

There are quite a few test failures on kafka.server.ListOffsetsRequestTest.testResponseIncludesLeaderEpoch().

Yep, I have fixed it locally and will update the PR later. Thanks for the reminder :)

Contributor

@junrao junrao left a comment

@chia7712 : Thanks for the updated PR. Just a couple of minor comments.

Comment thread core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala Outdated
@chia7712
Member Author

chia7712 commented Apr 8, 2024

@junrao thanks for the reviews. Both comments are addressed in 581242c.

@junrao
Contributor

junrao commented Apr 8, 2024

@chia7712 : Thanks for the updated PR. The code looks good to me. There were 50 failed tests. Are any of them related to the PR? If not, have they all been tracked?

@chia7712
Member Author

chia7712 commented Apr 9, 2024

Thanks for the updated PR. The code looks good to me. There were 50 failed tests. Are any of them related to the PR? If not, have they all been tracked?

There are many timeout exceptions, so I suspect they were caused by a busy server. I will trigger QA again instead of creating a bunch of flaky-test issues.

@chia7712
Member Author

chia7712 commented Apr 9, 2024

Build / JDK 11 and Scala 2.13 / testLowMaxFetchSizeForRequestAndPartition(String, String).quorum=kraft+kip848.groupProtocol=consumer – kafka.api.PlaintextConsumerFetchTest

https://issues.apache.org/jira/browse/KAFKA-16494

Build / JDK 11 and Scala 2.13 / testFenceMultipleBrokers() – org.apache.kafka.controller.QuorumControllerTest

https://issues.apache.org/jira/browse/KAFKA-15898

Build / JDK 17 and Scala 2.13 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest
Build / JDK 8 and Scala 2.12 / testSyncTopicConfigs() – org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest

https://issues.apache.org/jira/browse/KAFKA-15945

Build / JDK 17 and Scala 2.13 / testReplicateFromLatest() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationExactlyOnceTest

https://issues.apache.org/jira/browse/KAFKA-16383

Build / JDK 8 and Scala 2.12 / testCoordinatorFailover(String, String).quorum=kraft.groupProtocol=classic – kafka.api.SslConsumerTest

https://issues.apache.org/jira/browse/KAFKA-16024

Build / JDK 8 and Scala 2.12 / "testCommitTransactionTimeout(String).quorum=kraft+kip848" – org.apache.kafka.tiered.storage.integration.TransactionsWithTieredStoreTest

https://issues.apache.org/jira/browse/KAFKA-16495

Build / JDK 8 and Scala 2.12 / testDescribeQuorumReplicationSuccessful [2] Type=Raft-Isolated, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest

https://issues.apache.org/jira/browse/KAFKA-15104

@junrao those failed tests pass locally, and they all have Jira tickets now. Please review this PR again. Thanks!

@junrao
Contributor

junrao commented Apr 9, 2024

@chia7712 : Thanks for triaging the failed tests. In the last run, it seems that JDK 21 and Scala 2.13 didn't complete. Could you trigger another build? Typically, this can be done by closing the PR, waiting for 20 secs, and reopening it.

@chia7712
Member Author

chia7712 commented Apr 9, 2024

In the last run, it seems that JDK 21 and Scala 2.13 didn't complete. Could you trigger another build? Typically, this can be done by closing the PR, waiting for 20 secs, and reopening it.

Thanks for the tip. I rebased the code to trigger QA and to make sure this PR works well with the latest code :)

@chia7712
Member Author

Build / JDK 21 and Scala 2.13 / testInvalidPasswordSaslScram() – org.apache.kafka.common.security.authenticator.SaslAuthenticatorFailureNoDelayTest

https://issues.apache.org/jira/browse/KAFKA-16497

Build / JDK 21 and Scala 2.13 / testReplicateFromLatest() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationSSLTest

https://issues.apache.org/jira/browse/KAFKA-16383

Build / JDK 21 and Scala 2.13 / testAlterSinkConnectorOffsetsZombieSinkTasks – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest

https://issues.apache.org/jira/browse/KAFKA-15917

Build / JDK 21 and Scala 2.13 / testGetSinkConnectorOffsets – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest

https://issues.apache.org/jira/browse/KAFKA-16498

Build / JDK 21 and Scala 2.13 / testResetSinkConnectorOffsetsOverriddenConsumerGroupId – org.apache.kafka.connect.integration.OffsetsApiIntegrationTest

https://issues.apache.org/jira/browse/KAFKA-15891

Build / JDK 21 and Scala 2.13 / testCacheEviction() – org.apache.kafka.server.ClientMetricsManagerTest

https://issues.apache.org/jira/browse/KAFKA-16499

Build / JDK 21 and Scala 2.13 / testTaskRequestWithOldStartMsGetsUpdated() – org.apache.kafka.trogdor.coordinator.CoordinatorTest

https://issues.apache.org/jira/browse/KAFKA-16136

Build / JDK 17 and Scala 2.13 / "testTrustStoreAlter(String).quorum=kraft" – kafka.server.DynamicBrokerReconfigurationTest

https://issues.apache.org/jira/browse/KAFKA-16500

Build / JDK 8 and Scala 2.12 / testConsumptionWithBrokerFailures() – kafka.api.ConsumerBounceTest

https://issues.apache.org/jira/browse/KAFKA-15146

Build / JDK 11 and Scala 2.13 / testReplicateSourceDefault() – org.apache.kafka.connect.mirror.integration.MirrorConnectorsIntegrationBaseTest

https://issues.apache.org/jira/browse/KAFKA-15927

Build / JDK 11 and Scala 2.13 / testSeparateOffsetsTopic – org.apache.kafka.connect.integration.ExactlyOnceSourceIntegrationTest

https://issues.apache.org/jira/browse/KAFKA-14089

Build / JDK 11 and Scala 2.13 / testConsumptionWithBrokerFailures() – kafka.api.ConsumerBounceTest

https://issues.apache.org/jira/browse/KAFKA-15146

Build / JDK 11 and Scala 2.13 / "testCreateUserWithDelegationToken(String).quorum=kraft" – kafka.api.DelegationTokenEndToEndAuthorizationWithOwnerTest

https://issues.apache.org/jira/browse/KAFKA-16501

Build / JDK 11 and Scala 2.13 / "testBrokerHeartbeatDuringMigration(MetadataVersion).metadataVersion=3.4-IV0" – org.apache.kafka.controller.QuorumControllerTest

https://issues.apache.org/jira/browse/KAFKA-15963

Build / JDK 11 and Scala 2.13 / shouldWorkWithUncleanShutdownWipeOutStateStore[exactly_once_v2] – org.apache.kafka.streams.integration.EOSUncleanShutdownIntegrationTest

https://issues.apache.org/jira/browse/KAFKA-16502

Build / JDK 11 and Scala 2.13 / testDescribeQuorumReplicationSuccessful [1] Type=Raft-Combined, MetadataVersion=3.8-IV0, Security=PLAINTEXT – org.apache.kafka.tools.MetadataQuorumCommandTest

https://issues.apache.org/jira/browse/KAFKA-15104

OK, all of these pass locally and they have Jira tickets.

Contributor

@junrao junrao left a comment

@chia7712 : Thanks for triaging the tests. LGTM

@chia7712 chia7712 merged commit 9a6760f into apache:trunk Apr 10, 2024
@chia7712
Member Author

chia7712 commented Apr 10, 2024

@junrao @showuon thanks for all your reviews and help!

@chia7712 chia7712 deleted the KAFKA-16310 branch April 26, 2024 22:38
val expectedOffsetOfMaxTimestamp = 1
assertEquals(expectedOffsetOfMaxTimestamp, validatingResults.offsetOfMaxTimestampMs,
s"Offset of max timestamp should be 1")
assertEquals(2, validatingResults.shallowOffsetOfMaxTimestamp)
Contributor

@chia7712 : There seems to be an existing bug. The method is checkNonCompressed(), but in line 370, we set the compression codec to GZIP.

Member Author

Yep, that is a typo, but it does not change the test. We do pass NONE when creating the LogValidator, so it still runs the assignOffsetsNonCompressed path.

However, I do observe a potential bug.

Context:

  1. The batches can have different compression types
  2. We take the compression from the last batch:
    if (batchCompression != CompressionType.NONE)

Potential bug:

topic-level compression = GZIP
batch_0 = NONE
batch_1 = GZIP

In this case, we don't rebuild the records according to the topic-level compression, since the compression of the "last batch" equals GZIP. Hence, batch_0 ends up with the wrong compression.

This bug does not produce corrupt records, so we could simply add comments/docs describing the issue. Alternatively, we can fix it by making sourceCompression a "collection" of all batches' compression types and then converting whenever any of them is mismatched.
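
To make the scenario concrete, here is a hedged, hypothetical sketch (plain Java enums standing in for Kafka's CompressionType; fromLastBatch and needsRecompression are illustrative names, not the real validation code) contrasting the last-batch view with an any-batch check:

```java
import java.util.List;

public class SourceCompressionCheck {
    // Stand-in for Kafka's CompressionType, reduced to the two values in the example.
    enum Compression { NONE, GZIP }

    // Current behaviour (simplified): a loop like
    //   if (batchCompression != CompressionType.NONE) source = batchCompression;
    // means later batches overwrite earlier ones, so the recorded source
    // compression effectively reflects only the last compressed batch.
    static Compression fromLastBatch(List<Compression> batches) {
        Compression source = Compression.NONE;
        for (Compression batchCompression : batches) {
            if (batchCompression != Compression.NONE)
                source = batchCompression;
        }
        return source;
    }

    // Sketched fix: the batch set needs re-compression if ANY batch deviates
    // from the topic-level target, not just the last one.
    static boolean needsRecompression(List<Compression> batches, Compression target) {
        return batches.stream().anyMatch(c -> c != target);
    }

    public static void main(String[] args) {
        // batch_0 = NONE, batch_1 = GZIP; topic-level compression = GZIP
        List<Compression> batches = List.of(Compression.NONE, Compression.GZIP);
        // The last-batch view matches the GZIP topic config, so no rebuild happens...
        System.out.println(fromLastBatch(batches));                        // GZIP
        // ...even though batch_0 is actually uncompressed and would need conversion.
        System.out.println(needsRecompression(batches, Compression.GZIP)); // true
    }
}
```

The any-batch check is the "collection of all batches' compression" idea above, collapsed to the one bit the validator actually needs.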

Member Author

On another note, LogValidator has already been moved to the storage module, but its unit test is still in the core module, which is a bit odd. We can rewrite it in Java along with the bug fix and then move it to the storage module. I have filed https://issues.apache.org/jira/browse/KAFKA-16689

Contributor

@junrao junrao May 8, 2024

yep, that is a "TYPO" but it does not change the test.

The issue is that the test is testing the wrong expected value. For magic of 1, the offset for max timestamp should be 1 instead of 2.

However, I do observe a potential bug.

Yes, this can lead to an inaccurate LogAppendInfo.sourceCompression, but it doesn't seem to have a real impact now. LogAppendInfo.sourceCompression is only used in LogValidator, which is only called by the leader, and in the leader we currently expect only one batch per producer.

Member Author

see #15904

Phuc-Hong-Tran pushed a commit to Phuc-Hong-Tran/kafka that referenced this pull request Jun 6, 2024
