diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 14135b037920a..c6c341fd921d1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -125,17 +125,22 @@ public void setCloseWhenDone(boolean closeWhenDone) { public void initiate() { if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) { - ByteBuf duplicateBuffer = data.retainedDuplicate(); // internally asyncAddEntry() will take the ownership of the buffer and release it at the end addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml); lastInitTime = System.nanoTime(); if (ml.getManagedLedgerInterceptor() != null) { + long originalDataLen = data.readableBytes(); payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this, duplicateBuffer); if (payloadProcessorHandle != null) { duplicateBuffer = payloadProcessorHandle.getProcessedPayload(); + // If data len of entry changes, correct "dataLength" and "currentLedgerSize". + if (originalDataLen != duplicateBuffer.readableBytes()) { + this.dataLength = duplicateBuffer.readableBytes(); + this.ml.currentLedgerSize += (dataLength - originalDataLen); + } } } ledger.asyncAddEntry(duplicateBuffer, this, addOpCount); diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java new file mode 100644 index 0000000000000..080ef9ea4c5fe --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.testng.Assert.assertEquals; +import static org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest.TestPayloadProcessor; +import java.util.HashSet; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; +import org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest; +import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +/*** + * Differ to {@link MangedLedgerInterceptorImplTest}, this test can call {@link ManagedLedgerImpl}'s methods modified + * by "default". + */ +@Slf4j +@Test(groups = "broker") +public class MangedLedgerInterceptorImplTest2 extends MockedBookKeeperTestCase { + + public static void switchLedgerManually(ManagedLedgerImpl ledger){ + LedgerHandle originalLedgerHandle = ledger.currentLedger; + ledger.ledgerClosed(ledger.currentLedger); + ledger.createLedgerAfterClosed(); + Awaitility.await().until(() -> { + return ledger.state == ManagedLedgerImpl.State.LedgerOpened && ledger.currentLedger != originalLedgerHandle; + }); + } + + @Test + public void testCurrentLedgerSizeCorrectIfHasInterceptor() throws Exception { + final String mlName = "ml1"; + final String cursorName = "cursor1"; + + // Registry interceptor. + ManagedLedgerConfig config = new ManagedLedgerConfig(); + Set processors = new HashSet(); + processors.add(new TestPayloadProcessor()); + ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(new HashSet(), processors); + config.setManagedLedgerInterceptor(interceptor); + config.setMaxEntriesPerLedger(100); + + // Add one entry. + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName); + ledger.addEntry(new byte[1]); + + // Mark "currentLedgerSize" and switch ledger. + long currentLedgerSize = ledger.getCurrentLedgerSize(); + switchLedgerManually(ledger); + + // verify. + assertEquals(currentLedgerSize, MangedLedgerInterceptorImplTest.calculatePreciseSize(ledger)); + + // cleanup. + cursor.close(); + ledger.close(); + factory.getEntryCacheManager().clear(); + factory.shutdown(); + config.setManagedLedgerInterceptor(null); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java index ec9dd53685f87..0cf0adb5181ec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.function.Predicate; import lombok.Cleanup; @@ -32,6 +33,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.OpAddEntry; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; @@ -60,7 +62,7 @@ public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase { private static final Logger log = LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class); - public class TestPayloadProcessor implements ManagedLedgerPayloadProcessor { + public static class TestPayloadProcessor implements ManagedLedgerPayloadProcessor { @Override public Processor inputProcessor() { return new Processor() { @@ -158,6 +160,47 @@ public void testMessagePayloadProcessor() throws Exception { config.setManagedLedgerInterceptor(null); } + @Test + public void testTotalSizeCorrectIfHasInterceptor() throws Exception { + final String mlName = "ml1"; + final String cursorName = "cursor1"; + + // Registry interceptor. + ManagedLedgerConfig config = new ManagedLedgerConfig(); + Set processors = new HashSet(); + processors.add(new TestPayloadProcessor()); + ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(new HashSet(), processors); + config.setManagedLedgerInterceptor(interceptor); + config.setMaxEntriesPerLedger(2); + + // Add many entries and consume. + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName); + for (int i = 0; i < 5; i++){ + cursor.delete(ledger.addEntry(new byte[1])); + } + + // Trim ledgers. + CompletableFuture trimLedgerFuture = new CompletableFuture<>(); + ledger.trimConsumedLedgersInBackground(trimLedgerFuture); + trimLedgerFuture.join(); + + // verify. + assertEquals(ledger.getTotalSize(), calculatePreciseSize(ledger)); + + // cleanup. + cursor.close(); + ledger.close(); + factory.getEntryCacheManager().clear(); + factory.shutdown(); + config.setManagedLedgerInterceptor(null); + } + + public static long calculatePreciseSize(ManagedLedgerImpl ledger){ + return ledger.getLedgersInfo().values().stream() + .map(info -> info.getSize()).reduce((l1,l2) -> l1 + l2).orElse(0L) + ledger.getCurrentLedgerSize(); + } + @Test(timeOut = 20000) public void testRecoveryIndex() throws Exception { final int MOCK_BATCH_SIZE = 2;