From 5d5189a837ab68b0d27dd8813cd9d6bb6039bdbe Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Wed, 31 Aug 2022 18:35:51 +0800 Subject: [PATCH 01/11] PIP-180 Part VI, Add ShadowManagedLedgerImpl - finish processSourceManagedLedgerInfo - finish internalAsyncAddEntry - ShadowTopicTest pass - ShadowReplicatorTest pass Add test cases --- .../mledger/ManagedLedgerConfig.java | 5 + .../mledger/ManagedLedgerException.java | 15 + .../mledger/impl/ManagedLedgerImpl.java | 79 ++-- .../bookkeeper/mledger/impl/MetaStore.java | 17 + .../mledger/impl/MetaStoreImpl.java | 49 ++- .../bookkeeper/mledger/impl/OpAddEntry.java | 12 + .../mledger/impl/ShadowManagedLedgerImpl.java | 337 +++++++++++++++++- .../pulsar/broker/service/AbstractTopic.java | 17 +- .../pulsar/broker/service/BrokerService.java | 2 +- .../pulsar/broker/service/Producer.java | 64 +++- .../pulsar/broker/service/ServerCnx.java | 8 +- .../service/persistent/PersistentTopic.java | 11 + .../persistent/ShadowReplicatorTest.java | 15 +- .../service/persistent/ShadowTopicTest.java | 163 ++++++++- 14 files changed, 709 insertions(+), 85 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index c02a781fda354..f41e4206238b0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -33,6 +33,7 @@ import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.commons.collections4.MapUtils; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; /** @@ -86,6 +87,10 @@ public class ManagedLedgerConfig { private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; + @Getter + @Setter + private TopicName topicName = null; + public boolean isCreateIfMissing() { return createIfMissing; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 1fa565d6ec788..66f88626db444 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -89,6 +89,21 @@ public ManagedLedgerFencedException(Exception e) { } } + public static class ManagedLedgerSourceNotReadyException extends ManagedLedgerException { + + public ManagedLedgerSourceNotReadyException(String msg) { + super(msg); + } + + public ManagedLedgerSourceNotReadyException(Throwable e) { + super(e); + } + + public ManagedLedgerSourceNotReadyException(String msg, Throwable e) { + super(msg, e); + } + } + public static class ManagedLedgerNotFoundException extends ManagedLedgerException { public ManagedLedgerNotFoundException(Exception e) { super(e); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 46f79a7146279..f322ac9d2ae29 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -152,7 +152,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected final BookKeeper bookKeeper; protected final String name; private final Map ledgerMetadata; - private final BookKeeper.DigestType digestType; + protected final BookKeeper.DigestType digestType; protected ManagedLedgerConfig config; protected Map propertiesMap; @@ -164,7 +164,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { .concurrencyLevel(1) // number of sections .build(); protected final NavigableMap ledgers = new ConcurrentSkipListMap<>(); - private volatile Stat ledgersStat; + protected volatile Stat ledgersStat; // contains all cursors, where durable cursors are ordered by mark delete position private final ManagedCursorContainer cursors = new ManagedCursorContainer(); @@ -215,9 +215,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final CallbackMutex offloadMutex = new CallbackMutex(); private static final CompletableFuture NULL_OFFLOAD_PROMISE = CompletableFuture .completedFuture(PositionImpl.LATEST); - private volatile LedgerHandle currentLedger; - private long currentLedgerEntries = 0; - private long currentLedgerSize = 0; + protected volatile LedgerHandle currentLedger; + protected long currentLedgerEntries = 0; + protected long currentLedgerSize = 0; private volatile long lastLedgerCreatedTimestamp = 0; private volatile long lastLedgerCreationFailureTimestamp = 0; private long lastLedgerCreationInitiationTimestamp = 0; @@ -236,9 +236,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { volatile PositionImpl lastConfirmedEntry; - private ManagedLedgerInterceptor managedLedgerInterceptor; + protected ManagedLedgerInterceptor managedLedgerInterceptor; - private volatile long lastAddEntryTimeMs = 0; + protected volatile long lastAddEntryTimeMs = 0; private long inactiveLedgerRollOverTimeMs = 0; protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3; @@ -285,7 +285,7 @@ public enum PositionBound { startIncluded, startExcluded } - private static final AtomicReferenceFieldUpdater STATE_UPDATER = + protected static final AtomicReferenceFieldUpdater STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state"); protected volatile State state = null; private volatile boolean migrated = false; @@ -294,7 +294,7 @@ public enum PositionBound { private final OrderedScheduler scheduledExecutor; @Getter - private final OrderedExecutor executor; + protected final OrderedExecutor executor; @Getter private final ManagedLedgerFactoryImpl factory; @@ -465,7 +465,11 @@ public void operationFailed(MetaStoreException e) { scheduleTimeoutTask(); } - private synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { + protected boolean isLedgersReadonly() { + return false; + } + + protected synchronized void initializeBookKeeper(final ManagedLedgerInitializeLedgerCallback callback) { if (log.isDebugEnabled()) { log.debug("[{}] initializing bookkeeper; ledgers {}", name, ledgers); } @@ -550,7 +554,7 @@ public void operationFailed(MetaStoreException e) { }, ledgerMetadata); } - private void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) { + protected void initializeCursors(final ManagedLedgerInitializeLedgerCallback callback) { if (log.isDebugEnabled()) { log.debug("[{}] initializing cursors", name); } @@ -791,7 +795,7 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback })); } - private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { if (!beforeAddEntry(addOperation)) { return; } @@ -861,7 +865,7 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { lastAddEntryTimeMs = System.currentTimeMillis(); } - private boolean beforeAddEntry(OpAddEntry addOperation) { + protected boolean beforeAddEntry(OpAddEntry addOperation) { // if no interceptor, just return true to make sure addOperation will be initiate() if (managedLedgerInterceptor == null) { return true; @@ -1609,7 +1613,7 @@ public void operationFailed(MetaStoreException e) { } } - private void handleBadVersion(Throwable e) { + protected void handleBadVersion(Throwable e) { if (e instanceof BadVersionException) { setFenced(); } @@ -1649,7 +1653,7 @@ void createNewOpAddEntryForNewLedger() { } while (existsOp != null && --pendingSize > 0); } - private synchronized void updateLedgersIdsComplete() { + protected synchronized void updateLedgersIdsComplete() { STATE_UPDATER.set(this, State.LedgerOpened); updateLastLedgerCreatedTimeAndScheduleRolloverTask(); @@ -2542,8 +2546,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { return; } - List ledgersToDelete = new ArrayList(); - List offloadedLedgersToDelete = new ArrayList(); + List ledgersToDelete = new ArrayList<>(); + List offloadedLedgersToDelete = new ArrayList<>(); Optional optionalOffloadPolicies = Optional.ofNullable(config.getLedgerOffloader() != null && config.getLedgerOffloader() != NullLedgerOffloader.INSTANCE ? config.getLedgerOffloader().getOffloadPolicies() @@ -2671,23 +2675,8 @@ void internalTrimLedgers(boolean isTruncate, CompletableFuture promise) { return; } - PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; - // Update metadata - for (LedgerInfo ls : ledgersToDelete) { - if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) { - // this info is relevant because the lastMessageId won't be available anymore - log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be " - + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry); - } + doDeleteLedgers(ledgersToDelete); - invalidateReadHandle(ls.getLedgerId()); - - ledgers.remove(ls.getLedgerId()); - NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries()); - TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize()); - - entryCache.invalidateAllEntries(ls.getLedgerId()); - } for (LedgerInfo ls : offloadedLedgersToDelete) { LedgerInfo.Builder newInfoBuilder = ls.toBuilder(); newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true); @@ -2737,6 +2726,26 @@ public void operationFailed(MetaStoreException e) { } } + protected void doDeleteLedgers(List ledgersToDelete) { + PositionImpl currentLastConfirmedEntry = lastConfirmedEntry; + // Update metadata + for (LedgerInfo ls : ledgersToDelete) { + if (currentLastConfirmedEntry != null && ls.getLedgerId() == currentLastConfirmedEntry.getLedgerId()) { + // this info is relevant because the lastMessageId won't be available anymore + log.info("[{}] Ledger {} contains the current last confirmed entry {}, and it is going to be " + + "deleted", name, ls.getLedgerId(), currentLastConfirmedEntry); + } + + invalidateReadHandle(ls.getLedgerId()); + + ledgers.remove(ls.getLedgerId()); + NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, -ls.getEntries()); + TOTAL_SIZE_UPDATER.addAndGet(this, -ls.getSize()); + + entryCache.invalidateAllEntries(ls.getLedgerId()); + } + } + /** * Non-durable cursors have to be moved forward when data is trimmed since they are not retain that data. * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped @@ -3696,7 +3705,7 @@ public NavigableMap getLedgersInfo() { return ledgers; } - private ManagedLedgerInfo getManagedLedgerInfo() { + protected ManagedLedgerInfo getManagedLedgerInfo() { return buildManagedLedgerInfo(ledgers); } @@ -4284,7 +4293,7 @@ public CompletableFuture> getEnsemblesAsync(long ledgerId) { }); } - private void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { + protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { this.lastLedgerCreatedTimestamp = clock.millis(); if (config.getMaximumRolloverTimeMs() > 0) { if (checkLedgerRollTask != null && !checkLedgerRollTask.isDone()) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index b4c1383b77273..21e12d81a727d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -43,6 +43,10 @@ interface MetaStoreCallback { void operationFailed(MetaStoreException e); } + interface UpdateCallback { + void onUpdate(T result, Stat stat); + } + /** * Get the metadata used by the ManagedLedger. * @@ -71,6 +75,19 @@ default void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, Map properties, MetaStoreCallback callback); + /** + * Watch the metadata used by the ManagedLedger. + * @param ledgerName + * @param callback + */ + void watchManagedLedgerInfo(String ledgerName, UpdateCallback callback); + + /** + * Unwatch the metadata changes for ledgerName. + * @param ledgerName + */ + void unwatchManagedLedgerInfo(String ledgerName); + /** * * @param ledgerName diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index 94f332bd931d5..1ac6c3cbb0877 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -30,7 +30,9 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; +import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.mledger.ManagedLedgerException; @@ -47,10 +49,12 @@ import org.apache.pulsar.common.compression.CompressionCodecProvider; import org.apache.pulsar.metadata.api.MetadataStore; import org.apache.pulsar.metadata.api.MetadataStoreException; +import org.apache.pulsar.metadata.api.Notification; +import org.apache.pulsar.metadata.api.NotificationType; import org.apache.pulsar.metadata.api.Stat; @Slf4j -public class MetaStoreImpl implements MetaStore { +public class MetaStoreImpl implements MetaStore, Consumer { private static final String BASE_NODE = "/managed-ledgers"; private static final String PREFIX = BASE_NODE + "/"; @@ -62,11 +66,15 @@ public class MetaStoreImpl implements MetaStore { private final CompressionType ledgerInfoCompressionType; private final CompressionType cursorInfoCompressionType; + private final Map> managedLedgerInfoUpdateCallbackMap; + public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) { this.store = store; this.executor = executor; this.ledgerInfoCompressionType = CompressionType.NONE; this.cursorInfoCompressionType = CompressionType.NONE; + managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>(); + store.registerListener(this); } public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledgerInfoCompressionType, @@ -75,6 +83,8 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledge this.executor = executor; this.ledgerInfoCompressionType = parseCompressionType(ledgerInfoCompressionType); this.cursorInfoCompressionType = parseCompressionType(cursorInfoCompressionType); + managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>(); + store.registerListener(this); } private CompressionType parseCompressionType(String value) { @@ -327,6 +337,43 @@ public CompletableFuture asyncExists(String path) { return store.exists(PREFIX + path); } + @Override + public void watchManagedLedgerInfo(String ledgerName, UpdateCallback callback) { + managedLedgerInfoUpdateCallbackMap.put(PREFIX + ledgerName, callback); + } + + @Override + public void unwatchManagedLedgerInfo(String ledgerName) { + managedLedgerInfoUpdateCallbackMap.remove(PREFIX + ledgerName); + } + + @Override + public void accept(Notification notification) { + if (!notification.getPath().startsWith(PREFIX) || notification.getType() != NotificationType.Modified) { + return; + } + UpdateCallback callback = managedLedgerInfoUpdateCallbackMap.get(notification.getPath()); + if (callback == null) { + return; + } + String ledgerName = notification.getPath().substring(PREFIX.length()); + store.get(notification.getPath()).thenAcceptAsync(optResult -> { + if (optResult.isPresent()) { + ManagedLedgerInfo info; + try { + info = parseManagedLedgerInfo(optResult.get().getValue()); + info = updateMLInfoTimestamp(info); + callback.onUpdate(info, optResult.get().getStat()); + } catch (InvalidProtocolBufferException e) { + log.error("[{}] Error when parseManagedLedgerInfo", ledgerName, e); + } + } + }, executor.chooseThread(ledgerName)).exceptionally(ex -> { + log.error("[{}] Error when read ManagedLedgerInfo", ledgerName, ex); + return null; + }); + } + // // update timestamp if missing or 0 // 3 cases - timestamp does not exist for ledgers serialized before diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 43ad349ca84b9..ca36ab944c8b9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -33,6 +33,7 @@ import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.util.SafeRun; import org.apache.bookkeeper.util.SafeRunnable; @@ -143,6 +144,17 @@ public void initiate() { } } + public void initiateShadowWrite() { + if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) { + addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml); + lastInitTime = System.nanoTime(); + //Use entryId in PublishContext and call addComplete directly. + this.addComplete(BKException.Code.OK, ledger, ((Position) ctx).getEntryId(), addOpCount); + } else { + log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state); + } + } + public void failed(ManagedLedgerException e) { AddEntryCallback cb = callbackUpdater.getAndSet(this, null); if (cb != null) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index 4bc0b1c87251e..28a427677581c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -18,40 +18,365 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException; +import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.AsyncCallback; +import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.metadata.api.Stat; /** * Working in progress until PIP-180 is finished. - * Currently, it works nothing different with ManagedLedgerImpl. */ @Slf4j public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { private final TopicName shadowSource; private final String sourceMLName; + private volatile Stat sourceLedgersStat; public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store, ManagedLedgerConfig config, OrderedScheduler scheduledExecutor, String name, final Supplier mlOwnershipChecker) { super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); - this.shadowSource = TopicName.get(config.getShadowSource()); - this.sourceMLName = shadowSource.getPersistenceNamingEncoding(); + if (config.getTopicName().isPartitioned() && TopicName.getPartitionIndex(config.getShadowSource()) == -1) { + this.shadowSource = + TopicName.get(config.getShadowSource()).getPartition(config.getTopicName().getPartitionIndex()); + } else { + this.shadowSource = TopicName.get(config.getShadowSource()); + } + this.sourceMLName = + shadowSource.getPersistenceNamingEncoding(); } + /** + * ShadowManagedLedger init steps: + * 1. this.initialize : read source managedLedgerInfo + * 2. super.initialize : read its own read source managedLedgerInfo + * 3. this.initializeBookKeeper + * 4. super.initializeCursors + * @param callback + * @param ctx + */ @Override synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { - // TODO: ShadowManagedLedger has different initialize process from normal ManagedLedger, - // which is complicated and will be implemented in the next PRs. - super.initialize(callback, ctx); + log.info("Opening shadow managed ledger {} with source={}", name, sourceMLName); + + executor.executeOrdered(name, safeRun(() -> doInitialize(callback, ctx))); + } + + private void doInitialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { + // Fetch the list of existing ledgers in the source managed ledger + store.watchManagedLedgerInfo(sourceMLName, (managedLedgerInfo, stat) -> + executor.executeOrdered(name, safeRun(() -> processSourceManagedLedgerInfo(managedLedgerInfo, stat))) + ); + store.getManagedLedgerInfo(sourceMLName, false, null, new MetaStore.MetaStoreCallback<>() { + @Override + public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { + if (log.isDebugEnabled()) { + log.debug("[{}][{}] Source ML info:{}", name, sourceMLName, mlInfo); + } + sourceLedgersStat = stat; + // Fails if init with empty ledger. Very small chance here, since shadow topic is + // created when source topic exists. + if (mlInfo.getLedgerInfoCount() == 0) { + log.warn("[{}] Source topic ledger list is empty! source={},mlInfo={},stat={}", name, sourceMLName, + mlInfo, stat); +// callback.initializeFailed(new ManagedLedgerException.ManagedLedgerSourceNotReadyException( +// "Source managed ledger " + sourceMLName + " is not ready yet.")); + ShadowManagedLedgerImpl.super.initialize(callback, ctx); + return; + } + + if (mlInfo.hasTerminatedPosition()) { + state = State.Terminated; + lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition()); + log.info("[{}][{}] Recovering managed ledger terminated at {}", name, sourceMLName, + lastConfirmedEntry); + } + + for (LedgerInfo ls : mlInfo.getLedgerInfoList()) { + ledgers.put(ls.getLedgerId(), ls); + } + + final long lastLedgerId = ledgers.lastKey(); + mbean.startDataLedgerOpenOp(); + AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> executor.executeOrdered(name, safeRun(() -> { + mbean.endDataLedgerOpenOp(); + if (log.isDebugEnabled()) { + log.debug("[{}] Opened source ledger {}", name, lastLedgerId); + } + if (rc == BKException.Code.OK) { + LedgerInfo info = + LedgerInfo.newBuilder() + .setLedgerId(lastLedgerId) + .setEntries(lh.getLastAddConfirmed() + 1) + .setSize(lh.getLength()) + .setTimestamp(clock.millis()).build(); + ledgers.put(lastLedgerId, info); + + //Always consider the last ledger is opened in source. + STATE_UPDATER.set(ShadowManagedLedgerImpl.this, State.LedgerOpened); + currentLedger = lh; + + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh) + .thenRun(() -> ShadowManagedLedgerImpl.super.initialize(callback, ctx)) + .exceptionally(ex -> { + callback.initializeFailed( + new ManagedLedgerException.ManagedLedgerInterceptException( + ex.getCause())); + return null; + }); + } else { + ShadowManagedLedgerImpl.super.initialize(callback, ctx); + } + } else if (isNoSuchLedgerExistsException(rc)) { + log.warn("[{}] Source ledger not found: {}", name, lastLedgerId); + ledgers.remove(lastLedgerId); + ShadowManagedLedgerImpl.super.initialize(callback, ctx); + } else { + log.error("[{}] Failed to open source ledger {}: {}", name, lastLedgerId, + BKException.getMessage(rc)); + callback.initializeFailed(createManagedLedgerException(rc)); + } + })); + //open ledger in readonly mode. + bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, config.getPassword(), opencb, null); + + } + + @Override + public void operationFailed(ManagedLedgerException.MetaStoreException e) { + if (e instanceof ManagedLedgerException.MetadataNotFoundException) { + callback.initializeFailed(new ManagedLedgerException.ManagedLedgerNotFoundException(e)); + } else { + callback.initializeFailed(new ManagedLedgerException(e)); + } + } + }); } public TopicName getShadowSource() { return shadowSource; } + + @Override + protected boolean isLedgersReadonly() { + return true; + } + + @Override + protected synchronized void initializeBookKeeper(ManagedLedgerInitializeLedgerCallback callback) { + if (log.isDebugEnabled()) { + log.debug("[{}] initializing bookkeeper for shadowManagedLedger; ledgers {}", name, ledgers); + } + + // Calculate total entries and size + Iterator iterator = ledgers.values().iterator(); + while (iterator.hasNext()) { + LedgerInfo li = iterator.next(); + if (li.getEntries() > 0) { + NUMBER_OF_ENTRIES_UPDATER.addAndGet(this, li.getEntries()); + TOTAL_SIZE_UPDATER.addAndGet(this, li.getSize()); + } else if (li.getLedgerId() != currentLedger.getId()) { + //do not remove the last empty ledger. + iterator.remove(); + } + } + + initLastConfirmedEntry(); + // Save it back to ensure all nodes exist and properties are persisted. + store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, new MetaStore.MetaStoreCallback<>() { + @Override + public void operationComplete(Void result, Stat stat) { + ledgersStat = stat; + initializeCursors(callback); + } + + @Override + public void operationFailed(ManagedLedgerException.MetaStoreException e) { + handleBadVersion(e); + callback.initializeFailed(new ManagedLedgerException(e)); + } + }); + } + + private void initLastConfirmedEntry() { + if (lastConfirmedEntry != null || currentLedger == null) { + return; + } + lastConfirmedEntry = new PositionImpl(currentLedger.getId(), currentLedger.getLastAddConfirmed()); + // bypass empty ledgers, find last ledger with Message if possible. + while (lastConfirmedEntry.getEntryId() == -1) { + Map.Entry formerLedger = ledgers.lowerEntry(lastConfirmedEntry.getLedgerId()); + if (formerLedger != null) { + LedgerInfo ledgerInfo = formerLedger.getValue(); + lastConfirmedEntry = PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1); + } else { + break; + } + } + } + + @Override + protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { + if (!beforeAddEntry(addOperation)) { + return; + } + if (state == State.Terminated) { + addOperation.failed(new ManagedLedgerException.ManagedLedgerTerminatedException( + "Managed ledger was already terminated")); + return; + } + if (state != State.LedgerOpened) { + addOperation.failed(new ManagedLedgerException("Managed ledger is not opened")); + return; + } + + if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) { + addOperation.failed(new ManagedLedgerException("Illegal addOperation context object.")); + return; + } + + if (log.isDebugEnabled()) { + log.debug("[{}] Add entry into shadow ledger lh={} entries={}, pos=({},{})", + name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId()); + } + pendingAddEntries.add(addOperation); + if (position.getLedgerId() == currentLedger.getId()) { + // Write into lastLedger + addOperation.setLedger(currentLedger); + currentLedgerEntries = position.getEntryId(); + currentLedgerSize += addOperation.data.readableBytes(); + addOperation.initiateShadowWrite(); + } + lastAddEntryTimeMs = System.currentTimeMillis(); + } + + /** + * Handle source ManagedLedgerInfo updates. + * Update types: + * 1. new ledgers. + * 2. old ledgers deleted. + * 3. old ledger offload info updated (including ledger deleted from bookie by offloader) + * + */ + private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { + + if (log.isDebugEnabled()) { + log.debug("[{}][{}] new SourceManagedLedgerInfo:{}", name, sourceMLName, mlInfo); + } + + sourceLedgersStat = stat; + + if (mlInfo.hasTerminatedPosition()) { + state = State.Terminated; + lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition()); + log.info("[{}][{}] Process managed ledger terminated at {}", name, sourceMLName, lastConfirmedEntry); + } + + TreeMap newLedgerInfos = new TreeMap<>(); + for (LedgerInfo ls : mlInfo.getLedgerInfoList()) { + newLedgerInfos.put(ls.getLedgerId(), ls); + } + + for (Map.Entry ledgerInfoEntry : newLedgerInfos.entrySet()) { + Long ledgerId = ledgerInfoEntry.getKey(); + LedgerInfo ledgerInfo = ledgerInfoEntry.getValue(); + if (ledgerInfo.getEntries() > 0) { + LedgerInfo oldLedgerInfo = ledgers.put(ledgerId, ledgerInfo); + if (oldLedgerInfo == null) { + log.info("[{}]Read new ledger info from source,ledgerId={}", name, ledgerId); + } else { + if (!oldLedgerInfo.equals(ledgerInfo)) { + log.info("[{}] Old ledger info updated in source,ledgerId={}", name, ledgerId); + // ledger deleted from bookkeeper by offloader. + if (ledgerInfo.hasOffloadContext() + && ledgerInfo.getOffloadContext().getBookkeeperDeleted() + && (!oldLedgerInfo.hasOffloadContext() || !oldLedgerInfo.getOffloadContext() + .getBookkeeperDeleted())) { + log.info("[{}] Old ledger removed from bookkeeper by offloader in source,ledgerId={}", name, + ledgerId); + invalidateReadHandle(ledgerId); + } + } + } + } + } + Long lastLedgerId = newLedgerInfos.lastKey(); + // open the last ledger. + if (lastLedgerId != null && !(currentLedger != null && currentLedger.getId() == lastLedgerId)) { + ledgers.put(lastLedgerId, newLedgerInfos.get(lastLedgerId)); + mbean.startDataLedgerOpenOp(); + //open ledger in readonly mode. + bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, config.getPassword(), + (rc, lh, ctx1) -> executor.executeOrdered(name, safeRun(() -> { + mbean.endDataLedgerOpenOp(); + if (log.isDebugEnabled()) { + log.debug("[{}] Opened new source ledger {}", name, lastLedgerId); + } + if (rc == BKException.Code.OK) { + LedgerInfo info = LedgerInfo.newBuilder() + .setLedgerId(lastLedgerId) + .setEntries(lh.getLastAddConfirmed() + 1) + .setSize(lh.getLength()) + .setTimestamp(clock.millis()).build(); + ledgers.put(lastLedgerId, info); + currentLedger = lh; + currentLedgerEntries = 0; + currentLedgerSize = 0; + initLastConfirmedEntry(); + updateLedgersIdsComplete(); + maybeUpdateCursorBeforeTrimmingConsumedLedger(); + } else if (isNoSuchLedgerExistsException(rc)) { + log.warn("[{}] Source ledger not found: {}", name, lastLedgerId); + ledgers.remove(lastLedgerId); + } else { + log.error("[{}] Failed to open source ledger {}: {}", name, lastLedgerId, + BKException.getMessage(rc)); + } + })), null); + } + + //handle old ledgers deleted. + List ledgersToDelete = new ArrayList<>(ledgers.headMap(newLedgerInfos.firstKey(), false).values()); + if (!ledgersToDelete.isEmpty()) { + log.info("[{}]ledgers deleted in source, size={}", name, ledgersToDelete.size()); + try { + advanceCursorsIfNecessary(ledgersToDelete); + } catch (ManagedLedgerException.LedgerNotExistException e) { + log.info("[{}] First non deleted Ledger is not found, advanceCursors fails", name); + } + doDeleteLedgers(ledgersToDelete); + } + } + + + @Override + public synchronized void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) { + store.unwatchManagedLedgerInfo(sourceMLName); + super.asyncClose(callback, ctx); + } + + @Override + protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { + // do nothing. + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 02e9adcca3722..c9f95ab524f55 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -620,13 +620,15 @@ public String getReplicatorPrefix() { return replicatorPrefix; } + protected String getSchemaId() { + String base = TopicName.get(getName()).getPartitionedTopicName(); + return TopicName.get(base).getSchemaName(); + } @Override public CompletableFuture hasSchema() { - String base = TopicName.get(getName()).getPartitionedTopicName(); - String id = TopicName.get(base).getSchemaName(); return brokerService.pulsar() .getSchemaRegistryService() - .getSchema(id).thenApply(Objects::nonNull); + .getSchema(getSchemaId()).thenApply(Objects::nonNull); } @Override @@ -635,8 +637,7 @@ public CompletableFuture addSchema(SchemaData schema) { return CompletableFuture.completedFuture(SchemaVersion.Empty); } - String base = TopicName.get(getName()).getPartitionedTopicName(); - String id = TopicName.get(base).getSchemaName(); + String id = getSchemaId(); SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); if (allowAutoUpdateSchema()) { @@ -667,8 +668,7 @@ private boolean allowAutoUpdateSchema() { @Override public CompletableFuture deleteSchema() { - String base = TopicName.get(getName()).getPartitionedTopicName(); - String id = TopicName.get(base).getSchemaName(); + String id = getSchemaId(); SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id)) .thenCompose(schema -> { @@ -687,8 +687,7 @@ public CompletableFuture deleteSchema() { @Override public CompletableFuture checkSchemaCompatibleForConsumer(SchemaData schema) { - String base = TopicName.get(getName()).getPartitionedTopicName(); - String id = TopicName.get(base).getSchemaName(); + String id = getSchemaId(); return brokerService.pulsar() .getSchemaRegistryService() .checkConsumerCompatibility(id, schema, getSchemaCompatibilityStrategy()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 2015128b9e800..a5a7c4ec3a7fa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1779,7 +1779,7 @@ public CompletableFuture getManagedLedgerConfig(TopicName t serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); managedLedgerConfig.setNewEntriesCheckDelayInMillis( serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); - + managedLedgerConfig.setTopicName(topicName); return managedLedgerConfig; }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index d7b99f9e8146e..e6957e86e7671 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -36,6 +36,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException; import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; @@ -85,6 +86,7 @@ public class Producer { private final boolean isRemote; private final String remoteCluster; private final boolean isNonPersistentTopic; + private final boolean isShadowTopic; private final boolean isEncrypted; private final ProducerAccessMode accessMode; @@ -116,6 +118,8 @@ public Producer(Topic topic, TransportCnx cnx, long producerId, String producerN this.chunkedMessageRate = new Rate(); this.isNonPersistentTopic = topic instanceof NonPersistentTopic; this.msgDrop = this.isNonPersistentTopic ? new Rate() : null; + this.isShadowTopic = + topic instanceof PersistentTopic && ((PersistentTopic) topic).getShadowSourceTopic().isPresent(); this.metadata = metadata != null ? metadata : Collections.emptyMap(); @@ -182,14 +186,14 @@ public boolean isSuccessorTo(Producer other) { } public void publishMessage(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, - boolean isChunked, boolean isMarker) { - if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize)) { - publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker); + boolean isChunked, boolean isMarker, Position position) { + if (checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, position)) { + publishMessageToTopic(headersAndPayload, sequenceId, batchSize, isChunked, isMarker, position); } } public void publishMessage(long producerId, long lowestSequenceId, long highestSequenceId, - ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) { + ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker, Position position) { if (lowestSequenceId > highestSequenceId) { cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError, @@ -198,13 +202,22 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS }); return; } - if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize)) { + if (checkAndStartPublish(producerId, highestSequenceId, headersAndPayload, batchSize, position)) { publishMessageToTopic(headersAndPayload, lowestSequenceId, highestSequenceId, batchSize, isChunked, - isMarker); + isMarker, position); } } - public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize) { + public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf headersAndPayload, long batchSize, + Position position) { + if (isShadowTopic && position == null || !isShadowTopic && position != null) { + cnx.execute(() -> { + cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError, + "Only shadow topic supports sending messages with messageId"); + cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes()); + }); + return false; + } if (isClosed) { cnx.execute(() -> { cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.PersistenceError, @@ -246,20 +259,20 @@ public boolean checkAndStartPublish(long producerId, long sequenceId, ByteBuf he } private void publishMessageToTopic(ByteBuf headersAndPayload, long sequenceId, long batchSize, boolean isChunked, - boolean isMarker) { + boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, msgIn, headersAndPayload.readableBytes(), - batchSize, isChunked, System.nanoTime(), isMarker); + batchSize, isChunked, System.nanoTime(), isMarker, position); this.cnx.getBrokerService().getInterceptor() .onMessagePublish(this, headersAndPayload, messagePublishContext); topic.publishMessage(headersAndPayload, messagePublishContext); } private void publishMessageToTopic(ByteBuf headersAndPayload, long lowestSequenceId, long highestSequenceId, - long batchSize, boolean isChunked, boolean isMarker) { + long batchSize, boolean isChunked, boolean isMarker, Position position) { MessagePublishContext messagePublishContext = MessagePublishContext.get(this, lowestSequenceId, highestSequenceId, msgIn, headersAndPayload.readableBytes(), batchSize, - isChunked, System.nanoTime(), isMarker); + isChunked, System.nanoTime(), isMarker, position); this.cnx.getBrokerService().getInterceptor() .onMessagePublish(this, headersAndPayload, messagePublishContext); topic.publishMessage(headersAndPayload, messagePublishContext); @@ -335,7 +348,7 @@ public TransportCnx getCnx() { return this.cnx; } - private static final class MessagePublishContext implements PublishContext, Runnable { + private static final class MessagePublishContext implements PublishContext, Runnable, Position { /* * To store context information built by message payload * processors (time duration, size etc), if any configured @@ -361,6 +374,21 @@ private static final class MessagePublishContext implements PublishContext, Runn private long entryTimestamp; + @Override + public Position getNext() { + return null; + } + + @Override + public long getLedgerId() { + return ledgerId; + } + + @Override + public long getEntryId() { + return entryId; + } + public String getProducerName() { return producer.getProducerName(); } @@ -505,7 +533,7 @@ public void run() { } static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn, int msgSize, - long batchSize, boolean chunked, long startTimeNs, boolean isMarker) { + long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = sequenceId; @@ -517,6 +545,8 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn callback.originalSequenceId = -1L; callback.startTimeNs = startTimeNs; callback.isMarker = isMarker; + callback.ledgerId = position == null ? -1 : position.getLedgerId(); + callback.entryId = position == null ? -1 : position.getEntryId(); if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -524,7 +554,7 @@ static MessagePublishContext get(Producer producer, long sequenceId, Rate rateIn } static MessagePublishContext get(Producer producer, long lowestSequenceId, long highestSequenceId, Rate rateIn, - int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker) { + int msgSize, long batchSize, boolean chunked, long startTimeNs, boolean isMarker, Position position) { MessagePublishContext callback = RECYCLER.get(); callback.producer = producer; callback.sequenceId = lowestSequenceId; @@ -537,6 +567,8 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long callback.startTimeNs = startTimeNs; callback.chunked = chunked; callback.isMarker = isMarker; + callback.ledgerId = position == null ? -1 : position.getLedgerId(); + callback.entryId = position == null ? -1 : position.getEntryId(); if (callback.propertyMap != null) { callback.propertyMap.clear(); } @@ -757,10 +789,10 @@ public void checkEncryption() { public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId, ByteBuf headersAndPayload, long batchSize, boolean isChunked, boolean isMarker) { - checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize); + checkAndStartPublish(producerId, sequenceId, headersAndPayload, batchSize, null); MessagePublishContext messagePublishContext = MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn, - headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker); + headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime(), isMarker, null); this.cnx.getBrokerService().getInterceptor() .onMessagePublish(this, headersAndPayload, messagePublishContext); topic.publishTxnMessage(txnID, headersAndPayload, messagePublishContext); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 3ce02e23489ed..15842f4598549 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1596,13 +1596,17 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) { return; } + // This position is only used for shadow replicator + Position position = send.hasMessageId() + ? PositionImpl.get(send.getMessageId().getLedgerId(), send.getMessageId().getEntryId()) : null; + // Persist the message if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) { producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(), - headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker()); + headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } else { producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload, - send.getNumMessages(), send.isIsChunk(), send.isMarker()); + send.getNumMessages(), send.isIsChunk(), send.isMarker(), position); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ea20d413484cf..d3206cf03b89b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1767,6 +1767,17 @@ public int getNumberOfSameAddressConsumers(final String clientAddress) { return getNumberOfSameAddressConsumers(clientAddress, subscriptions.values()); } + @Override + protected String getSchemaId() { + if (shadowSourceTopic == null) { + return super.getSchemaId(); + } else { + //reuse schema from shadow source. + String base = shadowSourceTopic.getPartitionedTopicName(); + return TopicName.get(base).getSchemaName(); + } + } + @Override public ConcurrentOpenHashMap getSubscriptions() { return subscriptions; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java index 0ab645f089048..cc169f355f023 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowReplicatorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service.persistent; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; @@ -30,7 +31,6 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.policies.data.TenantInfoImpl; -import org.assertj.core.util.Lists; import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -62,10 +62,14 @@ public void testShadowReplication() throws Exception { String shadowTopicName = "persistent://prop1/ns-shadow/shadow-topic"; String shadowTopicName2 = "persistent://prop1/ns-shadow/shadow-topic-2"; + admin.topics().createNonPartitionedTopic(sourceTopicName); + admin.topics().createShadowTopic(shadowTopicName, sourceTopicName); + admin.topics().createShadowTopic(shadowTopicName2, sourceTopicName); + admin.topics().setShadowTopics(sourceTopicName, Lists.newArrayList(shadowTopicName, shadowTopicName2)); + @Cleanup Producer producer = pulsarClient.newProducer().topic(sourceTopicName).create(); - // NOTE: shadow topic is not ready yet. So use normal topic instead. - // The only difference for consumer should be that the message id is changed. + @Cleanup Consumer shadowConsumer = pulsarClient.newConsumer().topic(shadowTopicName).subscriptionName("shadow-sub") @@ -78,8 +82,6 @@ public void testShadowReplication() throws Exception { PersistentTopic sourceTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(sourceTopicName).get().get(); - admin.topics().setShadowTopics(sourceTopicName, Lists.newArrayList(shadowTopicName, shadowTopicName2)); - Awaitility.await().untilAsserted(()->Assert.assertEquals(sourceTopic.getShadowReplicators().size(), 2)); ShadowReplicator @@ -129,7 +131,6 @@ public void testShadowReplication() throws Exception { //`replicatedFrom` is set as localClusterName in shadow topic. Assert.assertNotEquals(shadowMessage.getReplicatedFrom(), sourceMessage.getReplicatedFrom()); - //Currently, msg is copied in BK. So the message id is not the same. - Assert.assertNotEquals(shadowMessage.getMessageId(), sourceMessage.getMessageId()); + Assert.assertEquals(shadowMessage.getMessageId(), sourceMessage.getMessageId()); } } \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java index 9026f406b4312..281e4b8d255bc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java @@ -19,10 +19,25 @@ package org.apache.pulsar.broker.service.persistent; +import com.google.common.collect.Lists; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import lombok.Cleanup; +import lombok.Data; +import lombok.NoArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ShadowManagedLedgerImpl; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.common.naming.TopicName; +import org.awaitility.Awaitility; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -43,10 +58,14 @@ protected void cleanup() throws Exception { internalCleanup(); } - @Test() + private String newShadowSourceTopicName() { + return "persistent://" + newTopicName(); + } + + @Test public void testNonPartitionedShadowTopicSetup() throws Exception { - String sourceTopic = "persistent://prop/ns-abc/source"; - String shadowTopic = "persistent://prop/ns-abc/shadow"; + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; //1. test shadow topic setting in topic creation. admin.topics().createNonPartitionedTopic(sourceTopic); admin.topics().createShadowTopic(shadowTopic, sourceTopic); @@ -65,20 +84,22 @@ public void testNonPartitionedShadowTopicSetup() throws Exception { Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic); } - @Test() + @Test public void testPartitionedShadowTopicSetup() throws Exception { - String sourceTopic = "persistent://prop/ns-abc/source-p"; - String shadowTopic = "persistent://prop/ns-abc/shadow-p"; + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + String sourceTopicPartition = TopicName.get(sourceTopic).getPartition(0).toString(); String shadowTopicPartition = TopicName.get(shadowTopic).getPartition(0).toString(); //1. test shadow topic setting in topic creation. admin.topics().createPartitionedTopic(sourceTopic, 2); admin.topics().createShadowTopic(shadowTopic, sourceTopic); pulsarClient.newProducer().topic(shadowTopic).create().close();//trigger loading partitions. + PersistentTopic brokerShadowTopic = (PersistentTopic) pulsar.getBrokerService() .getTopicIfExists(shadowTopicPartition).get().get(); Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl); - Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic); + Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopicPartition); Assert.assertEquals(admin.topics().getShadowSource(shadowTopic), sourceTopic); //2. test shadow topic could be properly loaded after unload. @@ -89,8 +110,134 @@ public void testPartitionedShadowTopicSetup() throws Exception { brokerShadowTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(shadowTopicPartition).get().get(); Assert.assertTrue(brokerShadowTopic.getManagedLedger() instanceof ShadowManagedLedgerImpl); - Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopic); + Assert.assertEquals(brokerShadowTopic.getShadowSourceTopic().get().toString(), sourceTopicPartition); + } + + @Test + public void testShadowTopicNotWritable() throws Exception { + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + @Cleanup + Producer producer = pulsarClient.newProducer().topic(shadowTopic).create(); + Assert.expectThrows(PulsarClientException.NotAllowedException.class, ()-> producer.send(new byte[]{1,2,3})); + } + + private void awaitUntilShadowReplicatorReady(String sourceTopic, String shadowTopic) { + Awaitility.await().untilAsserted(()->{ + PersistentTopic sourcePersistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(sourceTopic).get().get(); + ShadowReplicator + replicator = (ShadowReplicator) sourcePersistentTopic.getShadowReplicators().get(shadowTopic); + if (replicator == null) { + return; + } + Assert.assertEquals(String.valueOf(replicator.getState()), "Started"); + }); + } + @Test + public void testShadowTopicConsuming() throws Exception { + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + awaitUntilShadowReplicatorReady(sourceTopic, shadowTopic); + + @Cleanup Producer producer = pulsarClient.newProducer().topic(sourceTopic).create(); + @Cleanup Consumer consumer = + pulsarClient.newConsumer().topic(shadowTopic).subscriptionName("sub").subscribe(); + byte[] content = "Hello Shadow Topic".getBytes(StandardCharsets.UTF_8); + MessageId id = producer.send(content); + Message msg = consumer.receive(5, TimeUnit.SECONDS); + Assert.assertEquals(msg.getMessageId(), id); + Assert.assertEquals(msg.getValue(), content); + } + + + @Test + public void testShadowTopicConsumingWithStringSchema() throws Exception { + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + awaitUntilShadowReplicatorReady(sourceTopic, shadowTopic); + + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + @Cleanup Consumer consumer = + pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + String content = "Hello Shadow Topic"; + MessageId id = producer.send(content); + Message msg = consumer.receive(); + Assert.assertEquals(msg.getMessageId(), id); + Assert.assertEquals(msg.getValue(), content); + + for (int i = 0; i < 10; i++) { + producer.send(content + i); + } + for (int i = 0; i < 10; i++) { + Assert.assertEquals(consumer.receive().getValue(), content + i); + } } + @AllArgsConstructor + @NoArgsConstructor + @Data + private static class Point { + int x; + int y; + } + @Test + public void testShadowTopicConsumingWithJsonSchema() throws Exception { + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + admin.topics().createNonPartitionedTopic(sourceTopic); + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + awaitUntilShadowReplicatorReady(sourceTopic, shadowTopic); + + @Cleanup Producer producer = + pulsarClient.newProducer(Schema.JSON(Point.class)).topic(sourceTopic).create(); + @Cleanup Consumer consumer = + pulsarClient.newConsumer(Schema.JSON(Point.class)).topic(shadowTopic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + Point content = new Point(1, 2); + MessageId id = producer.send(content); + Message msg = consumer.receive(); + Assert.assertEquals(msg.getMessageId(), id); + Assert.assertEquals(msg.getValue(), content); + } + + @Test + public void testConsumeShadowMessageWithoutCache() throws Exception { + String sourceTopic = newShadowSourceTopicName(); + String shadowTopic = sourceTopic + "-shadow"; + admin.topics().createNonPartitionedTopic(sourceTopic); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(sourceTopic).create(); + String content = "Hello Shadow Topic"; + MessageId id = producer.send(content); + for (int i = 0; i < 10; i++) { + producer.send(content + i); + } + + admin.topics().createShadowTopic(shadowTopic, sourceTopic); + // disable shadow replicator + // admin.topics().setShadowTopics(sourceTopic, Lists.newArrayList(shadowTopic)); + @Cleanup Consumer consumer = + pulsarClient.newConsumer(Schema.STRING).topic(shadowTopic).subscriptionName("sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Message msg = consumer.receive(); + Assert.assertEquals(msg.getMessageId(), id); + Assert.assertEquals(msg.getValue(), content); + + for (int i = 0; i < 10; i++) { + Assert.assertEquals(consumer.receive().getValue(), content + i); + } + } } From 07d964985e85bae4c8bafa88596115edce1f9aaa Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 4 Nov 2022 17:39:56 +0800 Subject: [PATCH 02/11] fix test --- .../org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java | 8 ++++++-- .../java/org/apache/pulsar/client/impl/ProducerImpl.java | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index 1ac6c3cbb0877..bcb73553324dd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -74,7 +74,9 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor) { this.ledgerInfoCompressionType = CompressionType.NONE; this.cursorInfoCompressionType = CompressionType.NONE; managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>(); - store.registerListener(this); + if (store != null) { + store.registerListener(this); + } } public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledgerInfoCompressionType, @@ -84,7 +86,9 @@ public MetaStoreImpl(MetadataStore store, OrderedExecutor executor, String ledge this.ledgerInfoCompressionType = parseCompressionType(ledgerInfoCompressionType); this.cursorInfoCompressionType = parseCompressionType(cursorInfoCompressionType); managedLedgerInfoUpdateCallbackMap = new ConcurrentHashMap<>(); - store.registerListener(this); + if (store != null) { + store.registerListener(this); + } } private CompressionType parseCompressionType(String value) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index e82bcb7cad6e7..182e0c22c3ac8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1505,7 +1505,9 @@ void setBatchSizeByte(long batchSizeByte) { void setMessageId(long ledgerId, long entryId, int partitionIndex) { if (msg != null) { - msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex)); + if (msg.getMessageId() != null) { + msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex)); + } } else if (msgs.size() == 1) { // If there is only one message in batch, the producer will publish messages like non-batch msgs.get(0).setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex)); From 1f570e18bb09b0b2c6c3776642894f0f3c33c4e4 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Sat, 5 Nov 2022 17:49:37 +0800 Subject: [PATCH 03/11] fix testCompactionRetentionOnTopicCreationWithTopicPolicies --- .../main/java/org/apache/pulsar/client/impl/ProducerImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 182e0c22c3ac8..1c1d50e9352f8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -1505,7 +1505,7 @@ void setBatchSizeByte(long batchSizeByte) { void setMessageId(long ledgerId, long entryId, int partitionIndex) { if (msg != null) { - if (msg.getMessageId() != null) { + if (msg.getMessageId() == null) { msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex)); } } else if (msgs.size() == 1) { From cb1fc4066dceb234a3bd5e940e5bb03144e89f4a Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Sat, 5 Nov 2022 21:38:19 +0800 Subject: [PATCH 04/11] fix testBlockIfQueueFullWhenChunking --- .../apache/pulsar/client/impl/ProducerImpl.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java index 1c1d50e9352f8..dceaf50214182 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java @@ -533,6 +533,8 @@ public void sendAsync(Message message, SendCallback callback) { ? msg.getMessageBuilder().getSchemaVersion() : null; byte[] orderingKey = totalChunks > 1 && msg.getMessageBuilder().hasOrderingKey() ? msg.getMessageBuilder().getOrderingKey() : null; + // msg.messageId will be reset if previous message chunk is sent successfully. + final MessageId messageId = msg.getMessageId(); for (int chunkId = 0; chunkId < totalChunks; chunkId++) { // Need to reset the schemaVersion, because the schemaVersion is based on a ByteBuf object in // `MessageMetadata`, if we want to re-serialize the `SEND` command using a same `MessageMetadata`, @@ -555,7 +557,7 @@ public void sendAsync(Message message, SendCallback callback) { synchronized (this) { serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks, readStartIndex, payloadChunkSize, compressedPayload, compressed, - compressedPayload.readableBytes(), callback, chunkedMessageCtx); + compressedPayload.readableBytes(), callback, chunkedMessageCtx, messageId); readStartIndex = ((chunkId + 1) * payloadChunkSize); } } @@ -617,7 +619,8 @@ private void serializeAndSendMessage(MessageImpl msg, boolean compressed, int compressedPayloadSize, SendCallback callback, - ChunkedMessageCtx chunkedMessageCtx) throws IOException { + ChunkedMessageCtx chunkedMessageCtx, + MessageId messageId) throws IOException { ByteBuf chunkPayload = compressedPayload; MessageMetadata msgMetadata = msg.getMessageBuilder(); if (totalChunks > 1 && TopicName.get(topic).isPersistent()) { @@ -686,14 +689,14 @@ private void serializeAndSendMessage(MessageImpl msg, : 1; final OpSendMsg op; if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) { - ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, msg.getMessageId(), msgMetadata, + ByteBufPair cmd = sendMessage(producerId, sequenceId, numMessages, messageId, msgMetadata, encryptedPayload); op = OpSendMsg.create(msg, cmd, sequenceId, callback); } else { op = OpSendMsg.create(msg, null, sequenceId, callback); final MessageMetadata finalMsgMetadata = msgMetadata; op.rePopulate = () -> { - op.cmd = sendMessage(producerId, sequenceId, numMessages, msg.getMessageId(), finalMsgMetadata, + op.cmd = sendMessage(producerId, sequenceId, numMessages, messageId, finalMsgMetadata, encryptedPayload); }; } @@ -1505,9 +1508,7 @@ void setBatchSizeByte(long batchSizeByte) { void setMessageId(long ledgerId, long entryId, int partitionIndex) { if (msg != null) { - if (msg.getMessageId() == null) { - msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex)); - } + msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex)); } else if (msgs.size() == 1) { // If there is only one message in batch, the producer will publish messages like non-batch msgs.get(0).setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex)); From f31e1c0b817cf5b83ba3176c219bdda89af11751 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 18 Nov 2022 11:11:00 +0800 Subject: [PATCH 05/11] address comments --- .../bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java | 8 +++----- .../java/org/apache/pulsar/broker/service/Producer.java | 3 +++ 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index 28a427677581c..ab8815264373a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -93,13 +93,10 @@ public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) log.debug("[{}][{}] Source ML info:{}", name, sourceMLName, mlInfo); } sourceLedgersStat = stat; - // Fails if init with empty ledger. Very small chance here, since shadow topic is - // created when source topic exists. if (mlInfo.getLedgerInfoCount() == 0) { + // Small chance here, since shadow topic is created after source topic exists. log.warn("[{}] Source topic ledger list is empty! source={},mlInfo={},stat={}", name, sourceMLName, mlInfo, stat); -// callback.initializeFailed(new ManagedLedgerException.ManagedLedgerSourceNotReadyException( -// "Source managed ledger " + sourceMLName + " is not ready yet.")); ShadowManagedLedgerImpl.super.initialize(callback, ctx); return; } @@ -281,7 +278,8 @@ protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { if (log.isDebugEnabled()) { - log.debug("[{}][{}] new SourceManagedLedgerInfo:{}", name, sourceMLName, mlInfo); + log.debug("[{}][{}] new SourceManagedLedgerInfo:{}, prevStat={},stat={}", name, sourceMLName, mlInfo, + sourceLedgersStat, stat); } sourceLedgersStat = stat; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index e6957e86e7671..fe2249679a84c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -348,6 +348,9 @@ public TransportCnx getCnx() { return this.cnx; } + /** + * MessagePublishContext implements Position because that ShadowManagedLedger need to know the source position info. + */ private static final class MessagePublishContext implements PublishContext, Runnable, Position { /* * To store context information built by message payload From 9023336084acd7c72157a25adbc042b246b73f6f Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 25 Nov 2022 18:51:58 +0800 Subject: [PATCH 06/11] move topic name related logic in managed-ledger to broker layer. --- .../mledger/ManagedLedgerConfig.java | 2 +- .../impl/ManagedLedgerFactoryImpl.java | 1 + .../mledger/impl/ShadowManagedLedgerImpl.java | 68 ++++++++----------- .../pulsar/broker/service/BrokerService.java | 19 +++++- .../service/persistent/PersistentTopic.java | 2 +- .../service/persistent/ShadowTopicTest.java | 2 +- .../pulsar/common/naming/TopicName.java | 8 +++ 7 files changed, 56 insertions(+), 46 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index f41e4206238b0..28b8e054eceb1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -89,7 +89,7 @@ public class ManagedLedgerConfig { @Getter @Setter - private TopicName topicName = null; + private String shadowSourceName; public boolean isCreateIfMissing() { return createIfMissing; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 9f3fe9bb0c4a7..c7c83ca4c843a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -81,6 +81,7 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index ab8815264373a..7e05591c579fa 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -37,7 +37,6 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.metadata.api.Stat; /** @@ -45,8 +44,6 @@ */ @Slf4j public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { - - private final TopicName shadowSource; private final String sourceMLName; private volatile Stat sourceLedgersStat; @@ -55,14 +52,7 @@ public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper book OrderedScheduler scheduledExecutor, String name, final Supplier mlOwnershipChecker) { super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker); - if (config.getTopicName().isPartitioned() && TopicName.getPartitionIndex(config.getShadowSource()) == -1) { - this.shadowSource = - TopicName.get(config.getShadowSource()).getPartition(config.getTopicName().getPartitionIndex()); - } else { - this.shadowSource = TopicName.get(config.getShadowSource()); - } - this.sourceMLName = - shadowSource.getPersistenceNamingEncoding(); + this.sourceMLName = config.getShadowSourceName(); } /** @@ -71,6 +61,7 @@ public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper book * 2. super.initialize : read its own read source managedLedgerInfo * 3. this.initializeBookKeeper * 4. super.initializeCursors + * * @param callback * @param ctx */ @@ -170,10 +161,6 @@ public void operationFailed(ManagedLedgerException.MetaStoreException e) { }); } - public TopicName getShadowSource() { - return shadowSource; - } - @Override protected boolean isLedgersReadonly() { return true; @@ -273,7 +260,6 @@ protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { * 1. new ledgers. * 2. old ledgers deleted. * 3. old ledger offload info updated (including ledger deleted from bookie by offloader) - * */ private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) { @@ -326,31 +312,31 @@ private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLe //open ledger in readonly mode. bookKeeper.asyncOpenLedgerNoRecovery(lastLedgerId, digestType, config.getPassword(), (rc, lh, ctx1) -> executor.executeOrdered(name, safeRun(() -> { - mbean.endDataLedgerOpenOp(); - if (log.isDebugEnabled()) { - log.debug("[{}] Opened new source ledger {}", name, lastLedgerId); - } - if (rc == BKException.Code.OK) { - LedgerInfo info = LedgerInfo.newBuilder() - .setLedgerId(lastLedgerId) - .setEntries(lh.getLastAddConfirmed() + 1) - .setSize(lh.getLength()) - .setTimestamp(clock.millis()).build(); - ledgers.put(lastLedgerId, info); - currentLedger = lh; - currentLedgerEntries = 0; - currentLedgerSize = 0; - initLastConfirmedEntry(); - updateLedgersIdsComplete(); - maybeUpdateCursorBeforeTrimmingConsumedLedger(); - } else if (isNoSuchLedgerExistsException(rc)) { - log.warn("[{}] Source ledger not found: {}", name, lastLedgerId); - ledgers.remove(lastLedgerId); - } else { - log.error("[{}] Failed to open source ledger {}: {}", name, lastLedgerId, - BKException.getMessage(rc)); - } - })), null); + mbean.endDataLedgerOpenOp(); + if (log.isDebugEnabled()) { + log.debug("[{}] Opened new source ledger {}", name, lastLedgerId); + } + if (rc == BKException.Code.OK) { + LedgerInfo info = LedgerInfo.newBuilder() + .setLedgerId(lastLedgerId) + .setEntries(lh.getLastAddConfirmed() + 1) + .setSize(lh.getLength()) + .setTimestamp(clock.millis()).build(); + ledgers.put(lastLedgerId, info); + currentLedger = lh; + currentLedgerEntries = 0; + currentLedgerSize = 0; + initLastConfirmedEntry(); + updateLedgersIdsComplete(); + maybeUpdateCursorBeforeTrimmingConsumedLedger(); + } else if (isNoSuchLedgerExistsException(rc)) { + log.warn("[{}] Source ledger not found: {}", name, lastLedgerId); + ledgers.remove(lastLedgerId); + } else { + log.error("[{}] Failed to open source ledger {}: {}", name, lastLedgerId, + BKException.getMessage(rc)); + } + })), null); } //handle old ledgers deleted. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index a5a7c4ec3a7fa..c5f3d508a56db 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -19,6 +19,7 @@ package org.apache.pulsar.broker.service; import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.bookkeeper.mledger.ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import static org.apache.commons.collections4.CollectionUtils.isEmpty; import static org.apache.commons.lang3.StringUtils.isNotBlank; @@ -85,6 +86,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.util.Futures; +import org.apache.commons.collections4.MapUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; import org.apache.pulsar.broker.PulsarServerException; @@ -1452,8 +1454,17 @@ protected CompletableFuture> fetchTopicPropertiesAsync(Topic if (metadata.partitions == PartitionedTopicMetadata.NON_PARTITIONED) { return managedLedgerFactory.getManagedLedgerPropertiesAsync( topicName.getPersistenceNamingEncoding()); + } else { + // Check if the partitioned topic is a ShadowTopic + if (MapUtils.getString(metadata.properties, PROPERTY_SOURCE_TOPIC_KEY) != null) { + String sourceTopic = metadata.properties.get(PROPERTY_SOURCE_TOPIC_KEY); + Map result = new HashMap<>(); + result.put(PROPERTY_SOURCE_TOPIC_KEY, TopicName.getTopicPartitionNameString( + sourceTopic, topicName.getPartitionIndex())); + return CompletableFuture.completedFuture(result); + } + return CompletableFuture.completedFuture(null); } - return CompletableFuture.completedFuture(metadata.properties); }); } } @@ -1473,6 +1484,7 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean propertiesFuture = CompletableFuture.completedFuture(properties); } propertiesFuture.thenAccept(finalProperties -> + //TODO add topicName in properties? createPersistentTopic(topic, createIfMissing, topicFuture, finalProperties) ).exceptionally(throwable -> { log.warn("[{}] Read topic property failed", topic, throwable); @@ -1528,6 +1540,10 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, } managedLedgerConfig.setCreateIfMissing(createIfMissing); managedLedgerConfig.setProperties(properties); + String shadowSource = managedLedgerConfig.getShadowSource(); + if (shadowSource != null) { + managedLedgerConfig.setShadowSourceName(TopicName.get(shadowSource).getPersistenceNamingEncoding()); + } // Once we have the configuration, we can proceed with the async open operation managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, @@ -1779,7 +1795,6 @@ public CompletableFuture getManagedLedgerConfig(TopicName t serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); managedLedgerConfig.setNewEntriesCheckDelayInMillis( serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); - managedLedgerConfig.setTopicName(topicName); return managedLedgerConfig; }); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d3206cf03b89b..7cf9ad722676a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -306,7 +306,7 @@ public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerS } transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry()); if (ledger instanceof ShadowManagedLedgerImpl) { - shadowSourceTopic = ((ShadowManagedLedgerImpl) ledger).getShadowSource(); + shadowSourceTopic = TopicName.get(ledger.getConfig().getShadowSource()); } else { shadowSourceTopic = null; } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java index 281e4b8d255bc..3a7740af1ee89 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java @@ -150,7 +150,7 @@ public void testShadowTopicConsuming() throws Exception { pulsarClient.newConsumer().topic(shadowTopic).subscriptionName("sub").subscribe(); byte[] content = "Hello Shadow Topic".getBytes(StandardCharsets.UTF_8); MessageId id = producer.send(content); - Message msg = consumer.receive(5, TimeUnit.SECONDS); + Message msg = consumer.receive(); Assert.assertEquals(msg.getMessageId(), id); Assert.assertEquals(msg.getValue(), content); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index 52c3ef3a3cada..7987a54630442 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -290,6 +290,14 @@ public static int getPartitionIndex(String topic) { return partitionIndex; } + /** + * A helper method to get a partition name of a topic in String. + * @return topic + "-partition-" + partition. + */ + public static String getTopicPartitionNameString(String topic, int partitionIndex) { + return topic + PARTITIONED_TOPIC_SUFFIX + partitionIndex; + } + /** * Returns the http rest path for use in the admin web service. * Eg: From 1dd5c7b709a75cc99e68d234bdb18abd644083b0 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 25 Nov 2022 18:58:20 +0800 Subject: [PATCH 07/11] Add version check when process mlInfo from sourceML --- .../mledger/impl/ShadowManagedLedgerImpl.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index 7e05591c579fa..14ac9b72e5300 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -83,6 +83,11 @@ public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) if (log.isDebugEnabled()) { log.debug("[{}][{}] Source ML info:{}", name, sourceMLName, mlInfo); } + if (sourceLedgersStat != null && sourceLedgersStat.getVersion() >= stat.getVersion()) { + log.warn("Newer version of mlInfo is already processed. Previous stat={}, current stat={}", + sourceLedgersStat, stat); + return; + } sourceLedgersStat = stat; if (mlInfo.getLedgerInfoCount() == 0) { // Small chance here, since shadow topic is created after source topic exists. @@ -267,7 +272,11 @@ private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLe log.debug("[{}][{}] new SourceManagedLedgerInfo:{}, prevStat={},stat={}", name, sourceMLName, mlInfo, sourceLedgersStat, stat); } - + if (sourceLedgersStat != null && sourceLedgersStat.getVersion() >= stat.getVersion()) { + log.warn("Newer version of mlInfo is already processed. Previous stat={}, current stat={}", + sourceLedgersStat, stat); + return; + } sourceLedgersStat = stat; if (mlInfo.hasTerminatedPosition()) { From 48ebdeb09582db87482671786a55d6ef70d9400f Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 25 Nov 2022 20:33:38 +0800 Subject: [PATCH 08/11] Removed Terminated state from ShadowManagedLedger. --- .../mledger/ManagedLedgerException.java | 15 --------------- .../mledger/impl/ShadowManagedLedgerImpl.java | 17 ++++++++++------- 2 files changed, 10 insertions(+), 22 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 66f88626db444..1fa565d6ec788 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -89,21 +89,6 @@ public ManagedLedgerFencedException(Exception e) { } } - public static class ManagedLedgerSourceNotReadyException extends ManagedLedgerException { - - public ManagedLedgerSourceNotReadyException(String msg) { - super(msg); - } - - public ManagedLedgerSourceNotReadyException(Throwable e) { - super(e); - } - - public ManagedLedgerSourceNotReadyException(String msg, Throwable e) { - super(msg, e); - } - } - public static class ManagedLedgerNotFoundException extends ManagedLedgerException { public ManagedLedgerNotFoundException(Exception e) { super(e); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index 14ac9b72e5300..cf3a9aeab4e46 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -98,7 +98,6 @@ public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) } if (mlInfo.hasTerminatedPosition()) { - state = State.Terminated; lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition()); log.info("[{}][{}] Recovering managed ledger terminated at {}", name, sourceMLName, lastConfirmedEntry); @@ -229,11 +228,6 @@ protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { if (!beforeAddEntry(addOperation)) { return; } - if (state == State.Terminated) { - addOperation.failed(new ManagedLedgerException.ManagedLedgerTerminatedException( - "Managed ledger was already terminated")); - return; - } if (state != State.LedgerOpened) { addOperation.failed(new ManagedLedgerException("Managed ledger is not opened")); return; @@ -259,6 +253,16 @@ protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { lastAddEntryTimeMs = System.currentTimeMillis(); } + /** + * terminate is not allowed on shadow topic. + * @param callback + * @param ctx + */ + @Override + public synchronized void asyncTerminate(AsyncCallbacks.TerminateCallback callback, Object ctx) { + callback.terminateFailed(new ManagedLedgerException("Terminate is not allowed on shadow topic."), ctx); + } + /** * Handle source ManagedLedgerInfo updates. * Update types: @@ -280,7 +284,6 @@ private synchronized void processSourceManagedLedgerInfo(MLDataFormats.ManagedLe sourceLedgersStat = stat; if (mlInfo.hasTerminatedPosition()) { - state = State.Terminated; lastConfirmedEntry = new PositionImpl(mlInfo.getTerminatedPosition()); log.info("[{}][{}] Process managed ledger terminated at {}", name, sourceMLName, lastConfirmedEntry); } From b9a2916592620cb2a8308381c887b63fbc199432 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 25 Nov 2022 23:12:15 +0800 Subject: [PATCH 09/11] Add ShadowManagedLedgerImplTest --- .../mledger/impl/ManagedLedgerImpl.java | 2 +- .../bookkeeper/mledger/impl/OpAddEntry.java | 25 ++-- .../mledger/impl/ShadowManagedLedgerImpl.java | 47 ++++-- .../impl/ShadowManagedLedgerImplTest.java | 140 ++++++++++++++++++ .../service/persistent/ShadowTopicTest.java | 3 +- 5 files changed, 195 insertions(+), 22 deletions(-) create mode 100644 managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index f322ac9d2ae29..a487443f72bb3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -218,7 +218,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { protected volatile LedgerHandle currentLedger; protected long currentLedgerEntries = 0; protected long currentLedgerSize = 0; - private volatile long lastLedgerCreatedTimestamp = 0; + protected volatile long lastLedgerCreatedTimestamp = 0; private volatile long lastLedgerCreationFailureTimestamp = 0; private long lastLedgerCreationInitiationTimestamp = 0; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index ca36ab944c8b9..3f7a2a80f9a62 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -169,7 +169,6 @@ public void failed(ManagedLedgerException e) { @Override public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) { - if (!STATE_UPDATER.compareAndSet(OpAddEntry.this, State.INITIATED, State.COMPLETED)) { log.warn("[{}] The add op is terminal legacy callback for entry {}-{} adding.", ml.getName(), lh.getId(), entryId); @@ -177,11 +176,13 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) return; } - if (ledger.getId() != lh.getId()) { - log.warn("[{}] ledgerId {} doesn't match with acked ledgerId {}", ml.getName(), ledger.getId(), lh.getId()); + if (ledger != null && lh != null) { + if (ledger.getId() != lh.getId()) { + log.warn("[{}] ledgerId {} doesn't match with acked ledgerId {}", ml.getName(), ledger.getId(), lh.getId()); + } + checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(), + lh.getId()); } - checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(), - lh.getId()); if (!checkAndCompleteOp(ctx)) { // means callback might have been completed by different thread (timeout task thread).. so do nothing @@ -191,7 +192,7 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) this.entryId = entryId; if (log.isDebugEnabled()) { log.debug("[{}] [{}] write-complete: ledger-id={} entry-id={} size={} rc={}", this, ml.getName(), - lh.getId(), entryId, dataLength, rc); + lh == null ? -1 : lh.getId(), entryId, dataLength, rc); } if (rc != BKException.Code.OK) { @@ -220,23 +221,27 @@ public void safeRun() { ManagedLedgerImpl.NUMBER_OF_ENTRIES_UPDATER.incrementAndGet(ml); ManagedLedgerImpl.TOTAL_SIZE_UPDATER.addAndGet(ml, dataLength); + + long ledgerId = ledger != null ? ledger.getId() : ((Position) ctx).getLedgerId(); if (ml.hasActiveCursors()) { // Avoid caching entries if no cursor has been created - EntryImpl entry = EntryImpl.create(ledger.getId(), entryId, data); + EntryImpl entry = EntryImpl.create(ledgerId, entryId, data); // EntryCache.insert: duplicates entry by allocating new entry and data. so, recycle entry after calling // insert ml.entryCache.insert(entry); entry.release(); } - PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId); + PositionImpl lastEntry = PositionImpl.get(ledgerId, entryId); ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml); ml.lastConfirmedEntry = lastEntry; if (closeWhenDone) { - log.info("[{}] Closing ledger {} for being full", ml.getName(), ledger.getId()); + log.info("[{}] Closing ledger {} for being full", ml.getName(), ledgerId); // `data` will be released in `closeComplete` - ledger.asyncClose(this, ctx); + if (ledger != null) { + ledger.asyncClose(this, ctx); + } } else { updateLatency(); AddEntryCallback cb = callbackUpdater.getAndSet(this, null); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java index cf3a9aeab4e46..6831d8680be93 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java @@ -40,7 +40,7 @@ import org.apache.pulsar.metadata.api.Stat; /** - * Working in progress until PIP-180 is finished. + * Detailed design can be found in PIP-180. */ @Slf4j public class ShadowManagedLedgerImpl extends ManagedLedgerImpl { @@ -61,14 +61,10 @@ public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper book * 2. super.initialize : read its own read source managedLedgerInfo * 3. this.initializeBookKeeper * 4. super.initializeCursors - * - * @param callback - * @param ctx */ @Override synchronized void initialize(ManagedLedgerInitializeLedgerCallback callback, Object ctx) { log.info("Opening shadow managed ledger {} with source={}", name, sourceMLName); - executor.executeOrdered(name, safeRun(() -> doInitialize(callback, ctx))); } @@ -207,7 +203,7 @@ public void operationFailed(ManagedLedgerException.MetaStoreException e) { } private void initLastConfirmedEntry() { - if (lastConfirmedEntry != null || currentLedger == null) { + if (currentLedger == null) { return; } lastConfirmedEntry = new PositionImpl(currentLedger.getId(), currentLedger.getLastAddConfirmed()); @@ -243,13 +239,15 @@ protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { name, currentLedger.getId(), currentLedgerEntries, position.getLedgerId(), position.getEntryId()); } pendingAddEntries.add(addOperation); - if (position.getLedgerId() == currentLedger.getId()) { + if (position.getLedgerId() <= currentLedger.getId()) { // Write into lastLedger - addOperation.setLedger(currentLedger); + if (position.getLedgerId() == currentLedger.getId()) { + addOperation.setLedger(currentLedger); + } currentLedgerEntries = position.getEntryId(); currentLedgerSize += addOperation.data.readableBytes(); addOperation.initiateShadowWrite(); - } + } // for addOperation with ledgerId > currentLedger, will be processed in `updateLedgersIdsComplete` lastAddEntryTimeMs = System.currentTimeMillis(); } @@ -371,8 +369,37 @@ public synchronized void asyncClose(AsyncCallbacks.CloseCallback callback, Objec super.asyncClose(callback, ctx); } + @Override + protected synchronized void updateLedgersIdsComplete() { + STATE_UPDATER.set(this, State.LedgerOpened); + updateLastLedgerCreatedTimeAndScheduleRolloverTask(); + + if (log.isDebugEnabled()) { + log.debug("[{}] Resending {} pending messages", name, pendingAddEntries.size()); + } + + createNewOpAddEntryForNewLedger(); + + // Process all the pending addEntry requests + for (OpAddEntry op : pendingAddEntries) { + Position position = (Position) op.getCtx(); + if (position.getLedgerId() <= currentLedger.getId()) { + if (position.getLedgerId() == currentLedger.getId()) { + op.setLedger(currentLedger); + } else { + op.setLedger(null); + } + currentLedgerEntries = position.getEntryId(); + currentLedgerSize += op.data.readableBytes(); + op.initiateShadowWrite(); + } else { + break; + } + } + } + @Override protected void updateLastLedgerCreatedTimeAndScheduleRolloverTask() { - // do nothing. + this.lastLedgerCreatedTimestamp = clock.millis(); } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java new file mode 100644 index 0000000000000..2780765038d32 --- /dev/null +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java @@ -0,0 +1,140 @@ +package org.apache.bookkeeper.mledger.impl; + +import static org.testng.Assert.*; +import io.netty.buffer.ByteBuf; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +@Slf4j +public class ShadowManagedLedgerImplTest extends MockedBookKeeperTestCase { + + private ShadowManagedLedgerImpl openShadowManagedLedger(String name, String sourceName) + throws ManagedLedgerException, InterruptedException { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setShadowSourceName(sourceName); + Map properties = new HashMap<>(); + properties.put(ManagedLedgerConfig.PROPERTY_SOURCE_TOPIC_KEY, "source_topic"); + config.setProperties(properties); + ManagedLedger shadowML = factory.open(name, config); + assertTrue(shadowML instanceof ShadowManagedLedgerImpl); + return (ShadowManagedLedgerImpl) shadowML; + } + + @Test + public void testShadowWrites() throws Exception { + ManagedLedgerImpl sourceML = (ManagedLedgerImpl) factory.open("source_ML", new ManagedLedgerConfig() + .setMaxEntriesPerLedger(2) + .setRetentionTime(-1, TimeUnit.DAYS) + .setRetentionSizeInMB(-1)); + byte[] data = new byte[10]; + List positions = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Position pos = sourceML.addEntry(data); + log.info("pos={}", pos); + positions.add(pos); + } + log.info("currentLedgerId:{}", sourceML.currentLedger.getId()); + assertEquals(sourceML.ledgers.size(), 3); + + ShadowManagedLedgerImpl shadowML = openShadowManagedLedger("shadow_ML", "source_ML"); + //After init, the state should be the same. + assertEquals(shadowML.ledgers.size(), 3); + assertEquals(sourceML.currentLedger.getId(), shadowML.currentLedger.getId()); + assertEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + + //Add new data to source ML + Position newPos = sourceML.addEntry(data); + + // The state should not be the same. + log.info("Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + assertNotEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + + //Add new data to source ML, and a new ledger rolled + newPos = sourceML.addEntry(data); + assertEquals(sourceML.ledgers.size(), 4); + Awaitility.await().untilAsserted(()->assertEquals(shadowML.ledgers.size(), 4)); + log.info("Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + Awaitility.await().untilAsserted(()->assertEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry)); + + {// test write entry with ledgerId < currentLedger + CompletableFuture future = new CompletableFuture<>(); + shadowML.asyncAddEntry(data, new AsyncCallbacks.AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + future.complete(position); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, positions.get(2)); + assertEquals(future.get(), positions.get(2)); + // LCE is not updated. + log.info("1.Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + assertNotEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + } + + {// test write entry with ledgerId == currentLedger + newPos = sourceML.addEntry(data); + assertEquals(sourceML.ledgers.size(), 4); + assertNotEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + + CompletableFuture future = new CompletableFuture<>(); + shadowML.asyncAddEntry(data, new AsyncCallbacks.AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + future.complete(position); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, newPos); + assertEquals(future.get(), newPos); + // LCE should be updated. + log.info("2.Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + assertEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + } + + {// test write entry with ledgerId > currentLedger + PositionImpl fakePos = PositionImpl.get(newPos.getLedgerId() + 1, newPos.getEntryId()); + + CompletableFuture future = new CompletableFuture<>(); + shadowML.asyncAddEntry(data, new AsyncCallbacks.AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + future.complete(position); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + future.completeExceptionally(exception); + } + }, fakePos); + //This write will be queued unit new ledger is rolled in source. + + newPos = sourceML.addEntry(data); // new ledger rolled. + newPos = sourceML.addEntry(data); + Awaitility.await().untilAsserted(() -> assertEquals(shadowML.ledgers.size(), 5)); + assertEquals(future.get(), fakePos); + // LCE should be updated. + log.info("3.Source.LCE={},Shadow.LCE={}", sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + assertEquals(sourceML.lastConfirmedEntry, shadowML.lastConfirmedEntry); + } + } +} \ No newline at end of file diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java index 3a7740af1ee89..5f8e8f6ffd9b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ShadowTopicTest.java @@ -150,7 +150,8 @@ public void testShadowTopicConsuming() throws Exception { pulsarClient.newConsumer().topic(shadowTopic).subscriptionName("sub").subscribe(); byte[] content = "Hello Shadow Topic".getBytes(StandardCharsets.UTF_8); MessageId id = producer.send(content); - Message msg = consumer.receive(); + log.info("msg send to source topic, id={}", id); + Message msg = consumer.receive(5, TimeUnit.SECONDS); Assert.assertEquals(msg.getMessageId(), id); Assert.assertEquals(msg.getValue(), content); } From d11e5021c7b7ddabca1e5e894332a54b96981fe3 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 25 Nov 2022 23:30:05 +0800 Subject: [PATCH 10/11] fix header --- .../impl/ShadowManagedLedgerImplTest.java | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java index 2780765038d32..4482e9944c0ce 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImplTest.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ package org.apache.bookkeeper.mledger.impl; import static org.testng.Assert.*; From 536f10a7219824b09f2b52634279cc15a24978c6 Mon Sep 17 00:00:00 2001 From: JiangHaiting Date: Fri, 25 Nov 2022 23:38:44 +0800 Subject: [PATCH 11/11] fix compile --- .../org/apache/bookkeeper/mledger/ManagedLedgerConfig.java | 1 - .../bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java | 1 - .../org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 7 ++++--- 3 files changed, 4 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 28b8e054eceb1..6e88a8e650d58 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -33,7 +33,6 @@ import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; import org.apache.commons.collections4.MapUtils; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; /** diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index c7c83ca4c843a..9f3fe9bb0c4a7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -81,7 +81,6 @@ import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig; import org.apache.pulsar.common.util.DateFormatter; import org.apache.pulsar.common.util.FutureUtil; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 3f7a2a80f9a62..34d94efb4942b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -178,10 +178,11 @@ public void addComplete(int rc, final LedgerHandle lh, long entryId, Object ctx) if (ledger != null && lh != null) { if (ledger.getId() != lh.getId()) { - log.warn("[{}] ledgerId {} doesn't match with acked ledgerId {}", ml.getName(), ledger.getId(), lh.getId()); + log.warn("[{}] ledgerId {} doesn't match with acked ledgerId {}", ml.getName(), ledger.getId(), + lh.getId()); } - checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(), - lh.getId()); + checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", + ledger.getId(), lh.getId()); } if (!checkAndCompleteOp(ctx)) {