
KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP #15213

Merged
showuon merged 3 commits into apache:trunk from clolov:kip-1005-kafka-16154 on Feb 29, 2024
Conversation

Contributor

@clolov clolov commented Jan 17, 2024

Summary

This is the first part of the implementation of KIP-1005.

The purpose of this pull request is for the broker to start returning the correct offset when it receives -5 as the timestamp in a ListOffsets API request.
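As a rough illustration of that sentinel handling (not Kafka's actual implementation; only the constant values mirror ListOffsetsRequest, and the log-state fields are assumed for the example):

```java
// Hypothetical sketch of dispatching on the special ListOffsets timestamps.
// The constant values mirror ListOffsetsRequest; resolve() and the log-state
// fields are illustrative stand-ins, not Kafka's actual code.
public class ListOffsetsSketch {
    static final long LATEST_TIMESTAMP = -1L;
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long EARLIEST_LOCAL_TIMESTAMP = -4L;  // KIP-405
    static final long LATEST_TIERED_TIMESTAMP = -5L;   // KIP-1005 (this PR)

    // Assumed log state: offsets 0..299 exist only in tiered storage,
    // and everything up to offset 499 has been uploaded to remote storage.
    long logStartOffset = 0L;
    long localLogStartOffset = 300L;
    long logEndOffset = 1000L;
    long highestOffsetInRemoteStorage = 499L; // -1 when nothing is tiered yet

    long resolve(long targetTimestamp) {
        if (targetTimestamp == LATEST_TIMESTAMP) return logEndOffset;
        if (targetTimestamp == EARLIEST_TIMESTAMP) return logStartOffset;
        if (targetTimestamp == EARLIEST_LOCAL_TIMESTAMP) return localLogStartOffset;
        if (targetTimestamp == LATEST_TIERED_TIMESTAMP) return highestOffsetInRemoteStorage;
        throw new IllegalArgumentException("real timestamps need a time-index lookup");
    }
}
```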

Comment on lines -1303 to -1311
-val earliestLocalLogEpochEntry = leaderEpochCache.asJava.flatMap(cache => {
+val epochOpt: Optional[Integer] = leaderEpochCache.asJava.flatMap(cache => {
   val epoch = cache.epochForOffset(curLocalLogStartOffset)
-  if (epoch.isPresent) cache.epochEntry(epoch.getAsInt) else Optional.empty[EpochEntry]()
+  if (epoch.isPresent) Optional.of[Integer](epoch.getAsInt) else Optional.empty[Integer]()
 })
-
-val epochOpt = if (earliestLocalLogEpochEntry.isPresent && earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset)
-  Optional.of[Integer](earliestLocalLogEpochEntry.get().epoch)
-else Optional.empty[Integer]()

Contributor Author

I didn't really see a point in the check earliestLocalLogEpochEntry.get().startOffset <= curLocalLogStartOffset. As far as I can tell, cache.epochForOffset already carries it out. Let me know in case I have misunderstood something.

Contributor

This is done for backward compatibility with older message format. You can go through the comments in the previous block:

The first cached epoch usually corresponds to the log start offset, but we have to verify this since it may not be true following a message format version bump as the epoch will not be available for log entries written in the older format.
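A hedged sketch of the check being discussed: after a message-format bump, the first cached epoch entry can start after the log start offset, because records written in the old format carry no epoch. EpochEntry and the method name here are illustrative stand-ins for Kafka's internals.

```java
import java.util.Optional;

public class EpochEntryCheck {
    // Stand-in for Kafka's internal EpochEntry (epoch, startOffset).
    static class EpochEntry {
        final int epoch;
        final long startOffset;
        EpochEntry(int epoch, long startOffset) {
            this.epoch = epoch;
            this.startOffset = startOffset;
        }
    }

    // Only report the entry's epoch if the entry actually covers the offset;
    // otherwise the offset predates epoch tracking (older message format).
    static Optional<Integer> epochIfCovered(EpochEntry first, long offset) {
        if (first.startOffset <= offset) {
            return Optional.of(first.epoch);
        }
        return Optional.empty();
    }
}
```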

Contributor

Subjective:

Can we update the method to something like the below, for readability/clarity:

if (remoteLogEnabled()) {
  val curHighestRemoteOffset = highestOffsetInRemoteStorage()
  var result: Optional[Integer] = Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
  if (leaderEpochCache.isDefined) {
    val epochOpt = leaderEpochCache.get.epochForOffset(curHighestRemoteOffset)
    if (epochOpt.isPresent) {
      result = Optional.of(epochOpt.getAsInt) 
    }
  }
  Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, highestOffsetInRemoteStorage(), result))
}

Note that when highestOffsetInRemoteStorage is -1 there is no corresponding leader epoch, so we have to return the NO_PARTITION_LEADER_EPOCH constant, which is -1.
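That defaulting rule can be sketched as follows; the names are illustrative, OptionalInt stands in for the leader epoch cache lookup, and only the -1 sentinel mirrors RecordBatch.NO_PARTITION_LEADER_EPOCH.

```java
import java.util.Optional;
import java.util.OptionalInt;

public class EpochDefaultSketch {
    static final int NO_PARTITION_LEADER_EPOCH = -1;

    // If nothing has been tiered yet (highest remote offset == -1), no epoch
    // lookup is possible and the response carries NO_PARTITION_LEADER_EPOCH.
    static Optional<Integer> epochForRemoteOffset(long highestRemoteOffset,
                                                  OptionalInt cachedEpoch) {
        Optional<Integer> result = Optional.of(NO_PARTITION_LEADER_EPOCH);
        if (highestRemoteOffset >= 0 && cachedEpoch.isPresent()) {
            result = Optional.of(cachedEpoch.getAsInt());
        }
        return result;
    }
}
```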

Contributor Author

Hopefully I have achieved both in the subsequent commit (reverted one of the changes and made both easier to read). Let me know if this isn't the case!

}

@Test
def testFetchLatestTieredTimestampWithRemoteStorage(): Unit = {
Contributor Author

This test could be combined with testFetchOffsetByTimestampFromRemoteStorage, as the only differences are on lines 2167, 2193, 2203 and 2204. Let me know your thoughts!

Contributor

Let's maintain two tests for clarity. Can you add a comment for L2197 log.fetchOffsetByTimestamp(secondTimestamp, Some(remoteLogManager)) noting that it searches for the offset in both remote and local storage?

 targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP &&
-targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP)
+targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP &&
+targetTimestamp != ListOffsetsRequest.LATEST_TIERED_TIMESTAMP)
Contributor

Should we allow the timestamps -4 and -5 when the IBP is less than 0.10? Tiered Storage is supported from IBP 2.8-IV1.

Contributor Author

I have removed them both, but I don't think it would have caused problems either way.


Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, highestOffsetInRemoteStorage(), optEpoch))
} else {
Option.empty
Contributor

If we return empty here, then an exception will be thrown back to the caller. Do we want to throw an error back to the caller, or return an empty response?

 Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))

Contributor Author

Good catch! In the KIP I have specified that Kafka should return no offset in such situations. I shall aim to add an integration test from the point of view of the client in an upcoming pull request.


log.appendAsLeader(TestUtils.singletonRecords(
value = TestUtils.randomBytes(10),
timestamp = firstTimestamp),
Contributor

Why are we using the first timestamp again?

Contributor Author

There is nothing preventing people from sending batches of records with out-of-order timestamps (in fact, entries in the time index take this into account). I also think that for this particular case it doesn't matter, as what I am trying to test here is that we don't get a response back. For the sake of keeping the test focused on what it is supposed to test, however, I have removed this 😊

@divijvaidya divijvaidya added the kip (Requires or implements a KIP) and tiered-storage (Related to the Tiered Storage feature) labels on Jan 23, 2024
@clolov clolov force-pushed the kip-1005-kafka-16154 branch from 5d62284 to 791f786 on January 23, 2024
Contributor Author

clolov commented Jan 23, 2024

Thanks a lot for the review @kamalcph! I have hopefully addressed everything 😊

Contributor

@kamalcph kamalcph left a comment

LGTM, thanks for the patch!

We can add integration tests once we add support to query the latest-tiered and earliest-local offsets from the admin-client.

Member

showuon commented Jan 31, 2024

Will check this this week or next. Thanks.

Member

@showuon showuon left a comment

Thanks for the PR. Left some comments.


if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) &&
targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP &&
targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP &&
Member

Ah, this is a bug fix, right?

Contributor Author

Correct, Kamal called this out.

Comment on lines +1304 to +1310
-else Optional.empty[Integer]()
+var epochResult: Optional[Integer] = Optional.of(RecordBatch.NO_PARTITION_LEADER_EPOCH)
Member

Why do we change the return for the "no leaderEpoch" case from empty to -1? I had a check, and it seems it won't change anything, because the default value of leaderEpoch in ListOffsetsPartitionResponse is -1. Any thoughts on this change?

Contributor Author

You are correct, this is a miss on my side from making this piece of code more readable; I am fixing it in the subsequent commit.

The logic should be

  • For EARLIEST_LOCAL_TIMESTAMP - return empty unless there is a leader epoch
  • For LATEST_TIERED_TIMESTAMP - if highest offset is -1 then return -1, if there is a highest offset then return empty unless there is a leader epoch
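The two rules above can be sketched as follows; the method names are illustrative stand-ins, and only the -1 sentinel mirrors RecordBatch.NO_PARTITION_LEADER_EPOCH.

```java
import java.util.Optional;
import java.util.OptionalInt;

public class EpochRules {
    // EARLIEST_LOCAL_TIMESTAMP: empty unless the epoch cache has an entry.
    static Optional<Integer> earliestLocalEpoch(OptionalInt cached) {
        return cached.isPresent() ? Optional.of(cached.getAsInt()) : Optional.empty();
    }

    // LATEST_TIERED_TIMESTAMP: a highest offset of -1 means "nothing tiered",
    // reported as epoch -1; otherwise the same empty-unless-cached rule applies.
    static Optional<Integer> latestTieredEpoch(long highestRemoteOffset, OptionalInt cached) {
        if (highestRemoteOffset == -1L) {
            return Optional.of(-1);
        }
        return cached.isPresent() ? Optional.of(cached.getAsInt()) : Optional.empty();
    }
}
```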

Member

satishd commented Feb 2, 2024

I will take a look at this PR by the 5th or 7th.

@clolov clolov force-pushed the kip-1005-kafka-16154 branch from 791f786 to 4fa7671 on February 7, 2024
Contributor Author

clolov commented Feb 7, 2024

Heya @showuon @kamalcph @satishd, I hope I have addressed the latest comments + rebased!

Member

@satishd satishd left a comment

Thanks @clolov for the PR. Overall LGTM, left a couple of minor comments.

Comment thread core/src/main/scala/kafka/log/UnifiedLog.scala Outdated
Comment thread core/src/main/scala/kafka/log/UnifiedLog.scala Outdated
@clolov clolov force-pushed the kip-1005-kafka-16154 branch from 4fa7671 to af4eb14 on February 8, 2024
Member

@satishd satishd left a comment

Thanks @clolov for addressing the review comments. LGTM.

Member

@showuon showuon left a comment

LGTM!

Member

showuon commented Feb 19, 2024

@clolov, there's a compilation error in JDK 8 / Scala 2.12. Could you help fix it? Maybe rebasing onto the latest trunk could solve it?
https://ci-builds.apache.org/blue/organizations/jenkins/Kafka%2Fkafka-pr/detail/PR-15213/4/pipeline

Member

showuon commented Feb 27, 2024

@clolov, do we have any update on the compilation error fix?

Contributor Author

clolov commented Feb 28, 2024

Apologies, I have had a lot of work recently; I will aim to provide an update by the end of the day today.

@clolov clolov force-pushed the kip-1005-kafka-16154 branch from af4eb14 to ac20141 on February 28, 2024
Contributor Author

clolov commented Feb 28, 2024

Heya @showuon, I tried to reproduce the compilation failure of the unrelated test class locally, but have been unable to. I have rebased and kicked off a new build in case this was a temporary failure. If it occurs again I will continue looking into it.

Member

showuon commented Feb 29, 2024

Failed tests are unrelated.

@showuon showuon merged commit 55a6d30 into apache:trunk Feb 29, 2024
Contributor

@junrao junrao left a comment

@clolov : Thanks for the PR and sorry for the late review. Left a comment below.

*/
public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;

public static final long LATEST_TIERED_TIMESTAMP = -5L;
Contributor

Typically, if we add a new targetTimestamp value, we will need to bump up the version of the ListOffsetsRequest. See https://github.com/apache/kafka/pull/10760/files. Otherwise, a client could be setting LATEST_TIERED_TIMESTAMP and assuming that the server supports it, but the server actually does not.
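The concern can be sketched from the client side as follows; the version numbers here are hypothetical, not the actual ListOffsetsRequest versions.

```java
// Sketch: a new sentinel timestamp should come with a request-version bump so
// that an old broker rejects the request rather than misinterpreting -5.
public class VersionGate {
    static final long LATEST_TIERED_TIMESTAMP = -5L;
    static final short MIN_VERSION_FOR_TIERED = 9; // assumed, for illustration

    // Oldest ListOffsets version a given target timestamp requires.
    static short requiredVersion(long targetTimestamp) {
        return targetTimestamp == LATEST_TIERED_TIMESTAMP ? MIN_VERSION_FOR_TIERED : 0;
    }

    // A client should only send -5 if the broker advertises a high enough
    // max version for the ListOffsets API.
    static boolean brokerSupports(short brokerMaxVersion, long targetTimestamp) {
        return brokerMaxVersion >= requiredVersion(targetTimestamp);
    }
}
```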

Contributor Author

Yup, thanks a lot for bringing this up on the mailing list and here; I will open a pull request to address this omission!

Contributor

Thanks for following up, @clolov !

clolov added a commit to clolov/kafka that referenced this pull request Apr 5, 2024
…e#15213)

This is the first part of the implementation of KIP-1005

The purpose of this pull request is for the broker to start returning the correct offset when it receives a -5 as a timestamp in a ListOffsets API request

Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>
Phuc-Hong-Tran pushed a commit to Phuc-Hong-Tran/kafka that referenced this pull request Jun 6, 2024
…e#15213)

This is the first part of the implementation of KIP-1005

The purpose of this pull request is for the broker to start returning the correct offset when it receives a -5 as a timestamp in a ListOffsets API request

Reviewers: Luke Chen <showuon@gmail.com>, Kamal Chandraprakash <kamal.chandraprakash@gmail.com>, Satish Duggana <satishd@apache.org>
jlprat added a commit to jlprat/kafka that referenced this pull request Jun 20, 2024
jlprat added a commit to jlprat/kafka that referenced this pull request Jun 30, 2024
jlprat added a commit that referenced this pull request Jul 2, 2024
…6400)

* Revert "KAFKA-16154: Broker returns offset for LATEST_TIERED_TIMESTAMP (#15213)"
* Set 3.8_IV0 as latest production version in 3.8
* Bring changes committed in KAFKA-16968
* Make ClusterTest annotation metadata default to 3.9

---------

Signed-off-by: Josep Prat <josep.prat@aiven.io>

Reviewers: Justine Olshan <jolshan@confluent.io>, Jun Rao <junrao@gmail.com>
@clolov clolov deleted the kip-1005-kafka-16154 branch January 27, 2026 11:42

Labels

kip: Requires or implements a KIP
tiered-storage: Related to the Tiered Storage feature
