
KAFKA-13603: Allow the empty active segment to have missing offset index during recovery#11345

Merged
junrao merged 33 commits into apache:trunk from ccding:last on Jan 27, 2022

Conversation

@ccding (Contributor) commented Sep 20, 2021

Within a LogSegment, the TimeIndex and OffsetIndex are lazy indices that don't get created on disk until they are accessed for the first time. However, Log recovery logic expects the presence of an offset index file on disk for each segment, otherwise, the segment is considered corrupted.

This PR introduces a forceFlushActiveSegment boolean for the log.flush function to allow the shutdown process to flush the empty active segment, which makes sure the offset index file exists.
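The intended flush-range semantics can be sketched as follows (a minimal model with illustrative names, not the actual UnifiedLog/LocalLog code):

```java
// Minimal model of the flush-offset arithmetic discussed in this PR
// (illustrative names; the real logic lives in UnifiedLog/LocalLog).
// Segments with base offset in [recoveryPoint, upperBound) are flushed.
// Passing forceFlushActiveSegment=true during shutdown extends the upper
// bound past logEndOffset, so an empty active segment (whose base offset
// equals logEndOffset) is flushed too, creating its offset index file.
class FlushRangeModel {
    final long logEndOffset;
    final long recoveryPoint;

    FlushRangeModel(long logEndOffset, long recoveryPoint) {
        this.logEndOffset = logEndOffset;
        this.recoveryPoint = recoveryPoint;
    }

    long upperBound(boolean forceFlushActiveSegment) {
        return forceFlushActiveSegment ? logEndOffset + 1 : logEndOffset;
    }

    boolean segmentFlushed(long segmentBaseOffset, boolean forceFlushActiveSegment) {
        return segmentBaseOffset >= recoveryPoint
                && segmentBaseOffset < upperBound(forceFlushActiveSegment);
    }
}
```

Under this model, an empty active segment at base offset logEndOffset is skipped by a regular flush but included when forceFlushActiveSegment is set during shutdown.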

Co-Author: Kowshik Prakasam kowshik@gmail.com

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@ccding (Contributor, Author) commented Sep 20, 2021

cc @kowshik

@ijuma (Member) commented Sep 22, 2021

@ccding the PR does have unit tests though, so we should update that part of the PR message.

@ccding (Contributor, Author) commented Sep 22, 2021

Updated the Test section

@kowshik (Contributor) left a comment

@ccding Thanks for the PR. LGTM. Just a minor comment: could we please update the PR description to use kowshik@gmail.com as my email address?

Within a LogSegment, the TimeIndex and OffsetIndex are lazy indices that don't get created on disk until they are accessed for the first time. However, Log recovery logic expects the presence of offset index file on disk for each segment, otherwise the segment is considered corrupted.

Author:    Kowshik Prakasam <kowshik@gmail.com>
@ccding (Contributor, Author) commented Sep 23, 2021

@kowshik Thanks for the code review. Updated your email in the PR description as well as in the commit message.

@ccding (Contributor, Author) commented Sep 24, 2021

The failed tests are unrelated to this change and passed in my local run.

 Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.mirror.integration.IdentityReplicationIntegrationTest.testOneWayReplicationWithAutoOffsetSync()	4.7 sec	1
 Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithFewerReplicasThanBrokers	56 sec	1

@ccding (Contributor, Author) commented Sep 27, 2021

ping @junrao @ijuma for code review

@junrao (Contributor) left a comment

@ccding : Thanks for the PR. Left one comment below.

if (lazyOffsetIndex.file.exists) {
def sanityCheck(timeIndexFileNewlyCreated: Boolean, isActiveSegment: Boolean): Unit = {
// We allow for absence of offset index file only for an empty active segment.
if ((isActiveSegment && size == 0) || lazyOffsetIndex.file.exists) {
Contributor:

I am wondering why the active segment will be missing the offset index file during a clean shutdown. When we load the segments during broker restart, we call resizeIndexes() on the last segment. This should trigger the creation of the offset index file, which will be flushed on broker shutdown.

Contributor Author:

When we load the segments during broker restart, we call resizeIndexes() on the last segment. This should trigger the creation of the offset index file, which will be flushed on broker shutdown.

The sanityCheck is called before resizeIndexes.

It appears you are talking about the case where we start the broker and then immediately shut it down. In between, the active segment may have changed, and if the new one is empty, no index file is created.

Contributor:

I am still trying to understand whether the missing index is the result of a clean shutdown or a hard shutdown. When we roll a segment, the index on the new active segment is created lazily. However, during a clean shutdown, we force flush the active segment, which should trigger the creation of an empty index file, because the following method is used in segment flush.

def offsetIndex: OffsetIndex = lazyOffsetIndex.get
On a hard shutdown, it's possible for the offset index to be missing. However, in that case, the offset index can be missing even when the log is not empty. So, I am wondering how common the issue we are fixing is.
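The lazy-creation behavior at play here can be sketched as follows (a generic illustration of the pattern, not Kafka's actual LazyIndex implementation; names are illustrative):

```java
import java.util.function.Supplier;

// Generic sketch of the lazy-index pattern: the expensive resource (here,
// the index file) is only materialized on first get(). A flush path that
// calls get() creates the file as a side effect; a path that never touches
// the index leaves nothing on disk.
class LazyResource<T> {
    private final Supplier<T> loader; // creates the backing file as a side effect
    private T value;                  // null until first access

    LazyResource(Supplier<T> loader) {
        this.loader = loader;
    }

    boolean materialized() {
        return value != null;
    }

    T get() {
        if (value == null)
            value = loader.get(); // lazy creation happens here
        return value;
    }
}
```

This is why the flush path matters: `offsetIndex` (i.e. `lazyOffsetIndex.get`) forces the file into existence, while a code path that never accesses the index leaves it absent.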

Contributor:

@junrao: When the UnifiedLog is flushed during clean shutdown, we flush the LocalLog up to the logEndOffset. Here, an empty active segment is not included in the list of candidate segments to be flushed. The reason is that during LocalLog.flush(), the LogSegments.values(recoveryPoint, logEndOffset) call here does not select the empty active segment (doc), because the logEndOffset would match the base offset of the empty active segment and thus it gets omitted. So, if the empty active segment's offset index was never created before a clean shutdown, then the offset index will not be created during the clean shutdown either, because the empty active segment is never flushed.

The above is shown in the following passing unit test:

@Test
def testFlushEmptyActiveSegmentDoesNotCreateOffsetIndex(): Unit = {
    // Create an empty log.
    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
    val log = createLog(logDir, logConfig)
    val oneRecord = TestUtils.records(List(
      new SimpleRecord(mockTime.milliseconds, "a".getBytes, "value".getBytes)
    ))

    // Append a record and flush. Verify that there exists only 1 segment.
    log.appendAsLeader(oneRecord, leaderEpoch = 0)
    assertEquals(1, log.logEndOffset)
    log.flush()
    assertEquals(1, log.logSegments.size)
    assertTrue(UnifiedLog.logFile(logDir, 0).exists())
    assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
    assertFalse(UnifiedLog.logFile(logDir, 1).exists())
    assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())

    // Roll the log and verify that the new active segment's offset index is missing.
    log.roll()
    assertEquals(2, log.logSegments.size)
    assertTrue(UnifiedLog.logFile(logDir, 0).exists())
    assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
    assertTrue(UnifiedLog.logFile(logDir, 1).exists())
    assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())

    // Flush the log and once again verify that the active segment's offset index is still missing.
    log.flush()
    assertTrue(UnifiedLog.logFile(logDir, 0).exists())
    assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
    assertTrue(UnifiedLog.logFile(logDir, 1).exists())
    assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())

    // Close the log and verify that the active segment's offset index is still missing.
    log.close()
    assertTrue(UnifiedLog.logFile(logDir, 0).exists())
    assertTrue(UnifiedLog.offsetIndexFile(logDir, 0).exists())
    assertTrue(UnifiedLog.logFile(logDir, 1).exists())
    assertFalse(UnifiedLog.offsetIndexFile(logDir, 1).exists())
}

This PR mainly fixes a logging issue in the code. For example, one situation where the issue happens more frequently is the following: imagine a topic with very low ingress traffic in some or all partitions, and a retention setting that causes all existing segments to expire and get removed. In such a case, we roll the log to create a new active segment, which ensures there is at least one segment remaining in the LocalLog when the retention loop completes. However, we don't create the offset index for the active segment until the first append operation. Now, if the Kafka cluster is rolled before the first append, we will see a spurious corruption error message during recovery.

This PR fixes the logging problem by ignoring the absence of offset index for an empty active segment during recovery.
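The relaxed recovery-time condition described above can be summarized as follows (a sketch with illustrative names; the actual change lives in LogSegment.sanityCheck):

```java
// Sketch of the relaxed recovery check: a missing offset index file is
// tolerated only for an empty active segment; any other segment without
// an offset index file is still treated as corrupted and recovered.
class RecoveryCheck {
    static boolean passes(boolean isActiveSegment, long segmentSizeBytes,
                          boolean offsetIndexFileExists) {
        return (isActiveSegment && segmentSizeBytes == 0) || offsetIndexFileExists;
    }
}
```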

Contributor:

@kowshik : Thanks for the great explanation. It makes sense to me now.

If we don't flush the only empty log segment during clean shutdown, we could lose the log segment file as well, which causes the replica to lose track of logEndOffset. I am wondering if we should force flush the empty log segment during clean shutdown too.

Contributor Author:

Will def flush(): Unit = flush(logEndOffset + 1) trigger flushing empty active segments every time we roll a segment, not only during shutdown?

Contributor:

@ccding When we roll a segment, we explicitly flush only the old segment. See this LOC.

Contributor:

@kowshik : #8346 tries to avoid opening the index during close() when the index is not opened yet. This applies to existing segments on broker restart. For active segment, we typically need to open the index anyway. So, we probably don't need to optimize the rare case when it's empty. Plus, the danger of losing logEndOffset is a bigger concern than avoiding the cost of opening one index file.

Contributor:

@junrao Sounds good to me. We can flush the active segment during clean shutdown. That's a very elegant way to handle this problem.

@ccding Would you like to update the PR with the approach proposed by @junrao?

Contributor Author:

Updated. Please let me know if I misunderstood anything.

@junrao (Contributor) left a comment

@ccding : Thanks for the updated PR. One more comment below.

* Flush all local log segments
*/
def flush(): Unit = flush(logEndOffset)
def flush(): Unit = flush(logEndOffset + 1)
Contributor:

Could we add a comment why we need to flush to logEndOffset + 1? Also, could we have a test that verifies the index file is present after an empty segment is rolled and the broker is shut down?

Contributor Author:

Thanks for the comment. Will write it later. Was too busy yesterday and didn't have time to do so.

@kowshik (Contributor) left a comment

@ccding: Thanks for the updated PR. Just one comment below.

* Flush all local log segments
*/
def flush(): Unit = flush(logEndOffset)
def flush(): Unit = flush(logEndOffset + 1)
Contributor:

Can we add unit test coverage for this change in UnifiedLogTest.scala?

@ccding (Contributor, Author) commented Oct 4, 2021

While I am working on the test, I have a question:

The log.flush() function is also called by the periodic flush task and at

if (localLog.unflushedMessages >= config.flushInterval) flush()

These calls are not during a shutdown. Will they also flush empty active segments and hurt performance?

cc @junrao @kowshik

@kowshik (Contributor) commented Oct 5, 2021

@ccding During a call to log.flush(), we record here the offset up to which the log was flushed. So, a subsequent flush() does not cause additional disk I/O, due to this check, unless the logEndOffset has advanced. This means that empty active segments shouldn't add additional burden to the log.flush() operation during each call, unless new empty active segments are generated in between two calls, but that's quite uncommon (see relevant comment).
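The no-op behavior described above can be sketched as follows (illustrative names, not the actual UnifiedLog fields):

```java
// Sketch of the recovery-point short-circuit: a flush performs disk I/O
// only when the requested offset is past the last recorded recovery point,
// so repeated flush() calls without new appends are cheap no-ops.
class RecoveryPointGate {
    private long recoveryPoint = 0L;
    private int physicalFlushes = 0; // counts actual disk flushes, for illustration

    void flush(long offset) {
        if (offset > recoveryPoint) { // the check that makes repeat flushes no-ops
            physicalFlushes++;        // stand-in for flushing segments to disk
            recoveryPoint = offset;
        }
    }

    int physicalFlushes() { return physicalFlushes; }
    long recoveryPoint() { return recoveryPoint; }
}
```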

Furthermore, we typically don't configure the log to be flushed periodically or during appends. Even then, when we call log.flush() without passing an offset parameter, the intent was always to flush all data written to the log. It is just a corner case that "all data written to the log" needs to also include an empty active segment, for correctness reasons, since the logEndOffset is derived from it during recovery. Some related documentation details below:

(1) Periodic flush:

if (timeSinceLastFlush >= log.config.flushMs)
  log.flush()

Here, log.flush() executes only when timeSinceLastFlush >= log.config.flushMs. The flush.ms configuration is documented here and has a default value of Long.MaxValue (very high!) defined here. The doc recommends not overriding this config unless needed.

(2) Flush during appends:

if (localLog.unflushedMessages >= config.flushInterval) flush()

Here, flush() executes only when unflushedMessages >= config.flushInterval, with a similar explanation to the above. The flush.messages configuration is documented here and has a default value of Long.MaxValue (very high!) defined here. The doc recommends not overriding this config unless needed.

@kowshik (Contributor) commented Oct 21, 2021

@ccding Are you planning to add a unit test for this PR, and address the recent review comments?

@ccding (Contributor, Author) commented Oct 21, 2021

@kowshik I will find some time to work on it

@ccding (Contributor, Author) commented Oct 27, 2021

@junrao @kowshik I addressed the above comments. PTAL

@ccding (Contributor, Author) commented Oct 27, 2021

The change failed the recovery point check at

verifyRecoveredLog(log, lastOffset)

I am not sure what to do. Please advise.

@junrao (Contributor) commented Nov 1, 2021

@ccding : Thanks for reporting the test failure. This brings up a good point. It's kind of weird to ever have recovery point > log end offset. I am thinking that another potential way to fix this is for the flush() call in close() from recovery point to log end offset to be inclusive on both ends. This way, a flush of (log end offset, log end offset) will force a flush since it's needed for flushing the metadata of log end offset. All other flushes will still be exclusive on the right end since they don't need to preserve the metadata on the right end.

@junrao (Contributor) left a comment

@ccding : Thanks for the updated PR. A couple more comments.


/**
* Flush all local log segments
* We have to pass logEngOffset + 1 to the `def flush(offset: Long): Unit` function to flush empty
Contributor:

This comment is in the wrong place.

def close(): Unit = {
debug("Closing log")
lock synchronized {
flush(logEndOffset + 1)
Contributor:

This seems to have the same problem that the log recovery point could be moved to logEndOffset + 1, which is a bit weird?

Contributor Author:

Actually, this doesn't work. I am trying to figure out a proper solution.

@ccding (Contributor, Author) commented Dec 22, 2021

@kowshik @junrao @hachikuji I have addressed the review comments; please take a look. Thanks!

@ccding (Contributor, Author) commented Jan 6, 2022

ping @kowshik @junrao @hachikuji

@junrao (Contributor) left a comment

@ccding : Thanks for the updated PR. A few more comments.


override def flush(): Unit = {
log.flush()
override def flush(inclusive: Boolean): Unit = {
Contributor:

Should inclusive be renamed to forceFlushActiveSegment?


/**
* Flush local log segments for all offsets up to offset-1 if includingOffset=false; up to offset
* if includingOffset=true. The recovery point is set to offset-1.
Contributor:

The comment is inaccurate. The recovery point is always offset.

* Flush local log segments for all offsets up to offset-1 if includingOffset=false; up to offset
* if includingOffset=true. The recovery point is set to offset-1.
*
* @param offset The offset to flush up to (non-inclusive); the new recovery point
Contributor:

Should we get rid of "(non-inclusive)"?

private def flush(offset: Long, includingOffset: Boolean): Unit = {
val flushOffset = if (includingOffset) offset + 1 else offset
val newRecoveryPoint = offset
maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $flushOffset and recovery point $newRecoveryPoint") {
Contributor:

Instead of $flushOffset, perhaps it's clearer to use "$offset(ex/inclusive)"? Ditto for the debug logging below.

case _: NoSuchFileException =>
error(s"${params.logIdentifier}Could not find offset index file corresponding to log file" +
s" ${segment.log.file.getAbsolutePath}, recovering segment and rebuilding index files...")
if (segment.baseOffset < params.recoveryPointCheckpoint)
Contributor:

This condition is correct if hadCleanShutdown is false.

If hadCleanShutdown is true, it seems the condition should be segment.baseOffset <= params.recoveryPointCheckpoint. Or maybe we should just always log the error if hadCleanShutdown is true.

Contributor Author:

I think if hadCleanShutdown is true, it should never throw NoSuchFileException unless there is a bug in the code.

Added the hadCleanShutdown check anyways to catch potential issues: if (params.hadCleanShutdown || segment.baseOffset < params.recoveryPointCheckpoint)

* @param inclusive Whether the flush includes the log end offset. Should be `true` during close; otherwise false.
*/
void flush();
void flush(boolean inclusive);
Contributor:

We use forceFlushActiveSegment in UnifiedLog.flush(). Should we be consistent with the name?

assertEquals(lastOffset, log.recoveryPoint, s"Unexpected recovery point")
assertEquals(numMessages, log.logEndOffset, s"Should have $numMessages messages when log is reopened w/o recovery")
assertEquals(0, log.activeSegment.timeIndex.entries, "Should have same number of time index entries as before.")
log.activeSegment.sanityCheck(true) // this should not throw
Contributor:

Could we add a comment why this check won't throw after re-instantiating the log?

assertThrows(classOf[NoSuchFileException], () => log.activeSegment.sanityCheck(true))
var lastOffset = log.logEndOffset

log = createLog(logDir, logConfig, recoveryPoint = lastOffset, lastShutdownClean = false)
Contributor:

Should we call log.closeHandlers() before assigning a new value to log? Otherwise, it seems that we are leaking file handles.

@junrao (Contributor) left a comment

@ccding : Thanks for the updated PR. Just a couple more minor comments.

+ { if (includingOffset) "inclusive" else "exclusive" }
+ s") and recovery point $newRecoveryPoint") {
if (flushOffset > localLog.recoveryPoint) {
debug(s"Flushing log up to offset $flushOffset with recovery point $newRecoveryPoint, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
Contributor:

Instead of $flushOffset, could we change to "$offset(ex/inclusive)"?

/**
* Flush the current log to disk.
*
* @param forceFlushActiveSegment Whether the flush includes the log end offset. Should be `true` during close; otherwise false.
Contributor:

Whether the flush includes the log end offset => Whether to force flush the active segment?

@junrao (Contributor) left a comment

@ccding : Thanks for the updated PR. LGTM

@hachikuji : Any other comments from you?

@junrao (Contributor) commented Jan 19, 2022

@ccding : The PR is kind of large now. Could you associate the PR with a jira? Thanks.

@ccding ccding changed the title Allow empty last segment to have missing offset index during recovery KAFKA-13603: Allow empty active segment to have missing offset index during recovery Jan 20, 2022
@ccding ccding changed the title KAFKA-13603: Allow empty active segment to have missing offset index during recovery KAFKA-13603: Allow the empty active segment to have missing offset index during recovery Jan 20, 2022
@kowshik (Contributor) left a comment

@ccding Thanks for the PR. LGTM. Just a small comment below.

s") and recovery point $newRecoveryPoint") {
if (flushOffset > localLog.recoveryPoint) {
debug(s"Flushing log up to offset (" +
{ if (includingOffset) "inclusive" else "exclusive" } +
Contributor:

The clause { if (includingOffset) "inclusive" else "exclusive" } is redundant. It can be extracted into a separate variable, or eliminated entirely; instead you could just print the value of includingOffset.

@junrao (Contributor) commented Jan 26, 2022

@ccding : There is a conflict now. Could you rebase?

@junrao (Contributor) left a comment

@ccding : Thanks for rebasing. One more comment below. Also, do you know why all tests are failing?

val flushOffset = if (includingOffset) offset + 1 else offset
val newRecoveryPoint = offset
maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset=$offset, " +
s"includingOffset=$includingOffset, newRecoveryPoint=$newRecoveryPoint") {
Contributor:

I think the logging that you had before the last commit is more intuitive.

@kowshik (Contributor) left a comment

Thanks for the updated PR. I have a small comment below.

if (flushOffset > localLog.recoveryPoint) {
debug(s"Flushing log up to offset=$offset, includingOffset=$includingOffset, " +
s"newRecoveryPoint=$newRecoveryPoint, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
debug(s"Flushing log up to offset ($includingOffsetStr)" +
Contributor:

I think perhaps you meant to use offset=$offset?

@junrao merged commit a21aec8 into apache:trunk on Jan 27, 2022
@junrao (Contributor) commented Jan 27, 2022

@ccding : Thanks for the latest PR. LGTM. Merged to trunk.
