Skip to content

[fix][broker] Fix message loss during topic compaction#20980

Merged
Technoboy- merged 4 commits intoapache:masterfrom
coderzc:fix_compacted_lost
Aug 14, 2023
Merged

[fix][broker] Fix message loss during topic compaction#20980
Technoboy- merged 4 commits intoapache:masterfrom
coderzc:fix_compacted_lost

Conversation

@coderzc
Copy link
Copy Markdown
Member

@coderzc coderzc commented Aug 11, 2023

Motivation

If a batch contains a message with a null value (i.e. payloadSize == 0) and PhaseOneResult.from haven't been set, then the compactor may skip the whole batch message according to PhaseOneResult.from in phase two, which will lead to message loss during the topic compact. The root cause is deletedMessage of whole batch be marked true when the batch contains one message with a null value.

Also, PhaseOneResult.to has the same problem, when finding the value of the message is null, PhaseOneResult.to will not be updated.

MessageId id = m.getMessageId();
boolean deletedMessage = false;
boolean replaceMessage = false;
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
try {
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
if (e != null) {
if (e.getRight() > 0) {
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
replaceMessage = old != null;
} else {
deletedMessage = true;
latestForKey.remove(e.getMiddle());
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}
} catch (IOException ioe) {
log.info("Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
}
} else {
Pair<String, Integer> keyAndSize = extractKeyAndSize(m);
if (keyAndSize != null) {
if (keyAndSize.getRight() > 0) {
MessageId old = latestForKey.put(keyAndSize.getLeft(), id);
replaceMessage = old != null;
} else {
deletedMessage = true;
latestForKey.remove(keyAndSize.getLeft());
}
}
if (replaceMessage || deletedMessage) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
}
}
MessageId first = firstMessageId.orElse(deletedMessage ? null : id);
MessageId to = deletedMessage ? toMessageId.orElse(null) : id;

Modifications

  • Fix deletedMessage calculate in the batch message, just when delete times are equal to the number of messages in the batch to mark deletedMessage=true.

  • Don't extract message with null key in phase one, since messages with null keys should be retained intact.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@coderzc coderzc changed the title [fix][broker] Fix data lost during topic compact [fix][broker] Fix data lost during topic compaction Aug 11, 2023
@github-actions github-actions Bot added the doc-not-needed Your PR changes do not impact docs label Aug 11, 2023
@coderzc coderzc force-pushed the fix_compacted_lost branch from e7f35b0 to 2d3b2be Compare August 11, 2023 12:51
@coderzc coderzc force-pushed the fix_compacted_lost branch from 2d3b2be to 982ea19 Compare August 11, 2023 12:59
@coderzc coderzc self-assigned this Aug 11, 2023
@coderzc coderzc added area/broker type/bug The PR fixed a bug or issue reported a bug labels Aug 11, 2023
@coderzc coderzc changed the title [fix][broker] Fix data lost during topic compaction [fix][broker] Fix message loss during topic compaction Aug 11, 2023
@coderzc coderzc closed this Aug 11, 2023
@coderzc coderzc reopened this Aug 11, 2023
}
});
if (to.equals(id)) {
if (lastReadId.equals(id)) {
Copy link
Copy Markdown
Member Author

@coderzc coderzc Aug 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be a 'return' missing in line-287, we may need to open another PR to fix it.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if we don't stop phase two, the next entry may be dropped because it does not exist in the latestForKey.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another PR will fix it, please see #20988

Copy link
Copy Markdown
Contributor

@gaoran10 gaoran10 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Left a question.

}
});
if (to.equals(id)) {
if (lastReadId.equals(id)) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if we don't stop phase two, the next entry may be dropped because it does not exist in the latestForKey.

Comment thread pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java Outdated
@coderzc coderzc force-pushed the fix_compacted_lost branch 2 times, most recently from b1854cc to c90dcd4 Compare August 12, 2023 07:39
@coderzc coderzc force-pushed the fix_compacted_lost branch from c90dcd4 to 696a6e5 Compare August 12, 2023 07:40
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Aug 12, 2023

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 73.12%. Comparing base (9862884) to head (df8b903).
Report is 1334 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #20980       +/-   ##
=============================================
+ Coverage     33.53%   73.12%   +39.59%     
- Complexity    12174    32252    +20078     
=============================================
  Files          1621     1875      +254     
  Lines        126919   139443    +12524     
  Branches      13851    15333     +1482     
=============================================
+ Hits          42561   101967    +59406     
+ Misses        78745    29414    -49331     
- Partials       5613     8062     +2449     
Flag Coverage Δ
inttests 24.24% <20.00%> (-0.03%) ⬇️
systests 25.13% <20.00%> (?)
unittests 72.41% <100.00%> (+40.36%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...g/apache/pulsar/client/impl/RawBatchConverter.java 93.50% <100.00%> (+19.82%) ⬆️
...rg/apache/pulsar/compaction/TwoPhaseCompactor.java 74.88% <100.00%> (+4.70%) ⬆️

... and 1520 files with indirect coverage changes

Copy link
Copy Markdown
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, left a minor comment.

Comment on lines +136 to +137
boolean singleDeletedMessage = false;
boolean singleReplaceMessage = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like we don't need add above 2 variables to make the code more easily to read

for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(m)) {
    if (e != null) {
        if (e.getRight() > 0) {
            MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
            if (old != null) {
                mxBean.addCompactionRemovedEvent(reader.getTopic());
            }
        } else {
            latestForKey.remove(e.getMiddle());
            deleteCnt++;
            mxBean.addCompactionRemovedEvent(reader.getTopic());
        }
    }
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea.

Copy link
Copy Markdown
Contributor

@Technoboy- Technoboy- left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just fix the above comment

@Technoboy- Technoboy- merged commit 3ab420c into apache:master Aug 14, 2023
coderzc added a commit that referenced this pull request Aug 14, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
coderzc added a commit that referenced this pull request Aug 15, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants