@Technoboy-
Contributor

@Technoboy- Technoboy- commented May 28, 2022

Motivation

The current implementation of MessageDeduplication#purgeInactiveProducers:

public synchronized void purgeInactiveProducers() {
    long minimumActiveTimestamp = System.currentTimeMillis() - TimeUnit.MINUTES
            .toMillis(pulsar.getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes());
    Iterator<java.util.Map.Entry<String, Long>> mapIterator = inactiveProducers.entrySet().iterator();
    boolean hasInactive = false;
    while (mapIterator.hasNext()) {
        java.util.Map.Entry<String, Long> entry = mapIterator.next();
        String producerName = entry.getKey();
        long lastActiveTimestamp = entry.getValue();
        if (lastActiveTimestamp < minimumActiveTimestamp) {
            log.info("[{}] Purging dedup information for producer {}", topic.getName(), producerName);
            mapIterator.remove();
            highestSequencedPushed.remove(producerName);
            highestSequencedPersisted.remove(producerName);
            hasInactive = true;
        }
    }
    if (hasInactive) {
        takeSnapshot(getManagedCursor().getMarkDeletedPosition());
    }
}

If the MessageDeduplication status is not Enabled, the cursor is null, so the call to getManagedCursor().getMarkDeletedPosition() throws an NPE at line 475.

The cursor is only initialized when deduplication is being enabled:

} else if ((status == Status.Disabled || status == Status.Initialized) && shouldBeEnabled) {
    // Enable deduping
    CompletableFuture<Void> future = new CompletableFuture<>();
    managedLedger.asyncOpenCursor(PersistentTopic.DEDUPLICATION_CURSOR_NAME, new OpenCursorCallback() {
        @Override
        public void openCursorComplete(ManagedCursor cursor, Object ctx) {
            // We don't want to retain cache for this cursor
            cursor.setAlwaysInactive();
            managedCursor = cursor;
            recoverSequenceIdsMap().thenRun(() -> {
                status = Status.Enabled;
                future.complete(null);
                log.info("[{}] Enabled deduplication", topic.getName());
            }).exceptionally(ex -> {
                status = Status.Failed;
                log.warn("[{}] Failed to enable deduplication: {}", topic.getName(), ex.getMessage());
                future.completeExceptionally(ex);
                return null;
            });
        }
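
For illustration, here is a minimal, self-contained sketch of the failure mode and of guarding the snapshot on the enabled status. It is not the Pulsar code: DedupSketch, CursorSketch, producerRemoved and snapshotsTaken are hypothetical stand-ins, and the guard shown is just one way to avoid dereferencing a cursor that was never opened.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-in for ManagedCursor; only the one call used here is modeled.
interface CursorSketch {
    long getMarkDeletedPosition();
}

// Hypothetical, heavily reduced model of the deduplication state.
class DedupSketch {
    enum Status { Initialized, Disabled, Enabled }

    private final Map<String, Long> inactiveProducers = new HashMap<>();
    private Status status = Status.Disabled;
    private CursorSketch managedCursor; // stays null until enable() runs
    int snapshotsTaken = 0;

    synchronized void enable(CursorSketch cursor) {
        managedCursor = cursor;
        status = Status.Enabled;
    }

    synchronized void producerRemoved(String name, long lastActiveTimestamp) {
        inactiveProducers.put(name, lastActiveTimestamp);
    }

    synchronized void purgeInactiveProducers(long minimumActiveTimestamp) {
        boolean hasInactive = false;
        Iterator<Map.Entry<String, Long>> it = inactiveProducers.entrySet().iterator();
        while (it.hasNext()) {
            if (it.next().getValue() < minimumActiveTimestamp) {
                it.remove();
                hasInactive = true;
            }
        }
        // Guard: only touch the cursor once deduplication has been enabled.
        // Without the status check, a disabled topic would hit a null cursor here.
        if (hasInactive && status == Status.Enabled) {
            takeSnapshot(managedCursor.getMarkDeletedPosition());
        }
    }

    private void takeSnapshot(long position) {
        snapshotsTaken++;
    }
}
```

With this guard, purging on a topic whose deduplication was never enabled still removes the stale entries but skips the snapshot instead of throwing.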

StackTrace

May 25, 2022 @ 13:39:04.552	2022-05-25T05:39:04,546+0000 [pulsar-inactivity-monitor-22-1] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught
java.lang.NullPointerException: null
    at org.apache.pulsar.broker.service.persistent.MessageDeduplication.purgeInactiveProducers(MessageDeduplication.java:475)
    at java.util.Optional.ifPresent(Optional.java:183) ~[?:?]
    at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$78(BrokerService.java:1768)
    at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:272)
    at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:1766)
    at org.apache.pulsar.broker.service.BrokerService.checkMessageDeduplicationInfo(BrokerService.java:1735)
    at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32)
    at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]

Documentation

  • doc-not-needed
    (Please explain why)

@Technoboy- Technoboy- self-assigned this May 28, 2022
@Technoboy- Technoboy- added this to the 2.11.0 milestone May 28, 2022
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label May 28, 2022
@Technoboy- Technoboy- added release/2.9.3 release/2.8.4 release/2.10.1 and removed doc-not-needed Your PR changes do not impact docs labels May 28, 2022
@Technoboy- Technoboy- marked this pull request as ready for review May 28, 2022 02:37
@github-actions

@Technoboy-: Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@Technoboy- Technoboy- added doc-not-needed Your PR changes do not impact docs area/broker and removed doc-label-missing labels May 28, 2022
Contributor

@BewareMyPower BewareMyPower left a comment

I'm not sure this solves the NPE. Could there be a race condition like the one I annotated in the following snippet?

        if (hasInactive && isEnabled()) { // 1. isEnabled() returns true, managedCursor is not null
            // during 1 and 2, some other methods changed `managedCursor` to null
            takeSnapshot(getManagedCursor().getMarkDeletedPosition()); // 2. managedCursor might be null

I'd prefer the following way.

        final ManagedCursor cursor = managedCursor;
        if (hasInactive && cursor != null) {
            takeSnapshot(cursor.getMarkDeletedPosition(), cursor);
        }

    private void takeSnapshot(Position position) {
        takeSnapshot(position, getManagedCursor());
    }

    private void takeSnapshot(Position position, ManagedCursor cursor) {
        /* ... */
        cursor.asyncMarkDelete(position, snapshot, new MarkDeleteCallback() {
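
To make the suggested pattern concrete, here is a small self-contained sketch of reading the field once into a local variable and passing that reference along. It is an illustration only: LocalCopySketch, PositionSource, setCursor and lastSnapshotPosition are hypothetical names, and takeSnapshot just records the position instead of calling asyncMarkDelete.

```java
import java.util.Objects;

// Hypothetical stand-in for ManagedCursor; only the call used in this thread is modeled.
interface PositionSource {
    long getMarkDeletedPosition();
}

class LocalCopySketch {
    // In this sketch the field may be cleared by another code path at any time.
    private volatile PositionSource managedCursor;
    long lastSnapshotPosition = -1;

    void setCursor(PositionSource cursor) {
        managedCursor = cursor;
    }

    void purge(boolean hasInactive) {
        // Read the field exactly once; later changes to the field cannot
        // turn this local reference into null.
        final PositionSource cursor = managedCursor;
        if (hasInactive && cursor != null) {
            takeSnapshot(cursor.getMarkDeletedPosition(), cursor);
        }
    }

    private void takeSnapshot(long position, PositionSource cursor) {
        // The cursor is passed in rather than re-read from the field.
        Objects.requireNonNull(cursor, "cursor");
        lastSnapshotPosition = position;
    }
}
```

The point of the design is that the null check and the later use refer to the same reference, so the check cannot be invalidated between the two steps even if the field itself changes.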

@Technoboy-
Contributor Author

Both checkStatus and purgeInactiveProducers are synchronized, so there appears to be no race condition.
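
As a rough illustration of that argument, the sketch below shows that two synchronized instance methods on the same object exclude each other, so a check-then-use sequence inside one of them cannot be interleaved with the other. SyncSketch and all of its members are hypothetical, and the conclusion only carries over to real code if every writer of the field holds the same monitor.

```java
class SyncSketch {
    private Object managedCursor = new Object();
    private Thread disabler;

    // Stands in for a synchronized method that could clear the cursor.
    synchronized void disable() {
        managedCursor = null;
    }

    synchronized boolean cursorIsNull() {
        return managedCursor == null;
    }

    // Stands in for the synchronized purge: check the cursor, let another
    // thread try to clear it, then use the cursor again.
    synchronized boolean purgeWhileDisableIsAttempted() {
        if (managedCursor == null) {
            return false;
        }
        disabler = new Thread(this::disable);
        disabler.start();
        // Wait (bounded) until the other thread is blocked on this object's monitor.
        long deadline = System.nanoTime() + 5_000_000_000L;
        while (disabler.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
            Thread.yield();
        }
        // disable() cannot have run yet because this thread still holds the monitor.
        return managedCursor != null;
    }

    // Deliberately not synchronized: joining while holding the monitor would deadlock.
    void awaitDisabler() {
        try {
            disabler.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Once the synchronized method returns, the blocked thread acquires the monitor and clears the field, which is why the field is null only after awaitDisabler() completes.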

@Technoboy- Technoboy- merged commit 01d7bfa into apache:master Jun 1, 2022
hangc0276 pushed a commit that referenced this pull request Jun 7, 2022
@hangc0276 hangc0276 added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Jun 7, 2022
codelipenghui pushed a commit to codelipenghui/incubator-pulsar that referenced this pull request Jun 7, 2022
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jun 7, 2022
(cherry picked from commit 01d7bfa)
(cherry picked from commit 532aa85)
nicoloboschi pushed a commit to datastax/pulsar that referenced this pull request Jun 7, 2022
(cherry picked from commit 01d7bfa)
(cherry picked from commit b4c704e)
codelipenghui pushed a commit that referenced this pull request Jun 10, 2022
@codelipenghui codelipenghui added the cherry-picked/branch-2.9 Archived: 2.9 is end of life label Jun 10, 2022
@Technoboy- Technoboy- deleted the fix-msg-dup-npe branch August 10, 2022 05:50