From 80a90048eb731c4a3f9fa22c0de659c948af489d Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Thu, 2 Feb 2023 22:01:02 +0800 Subject: [PATCH 1/5] [fix] [ml] Fix the incorrect total size if use ML interceptor --- .../bookkeeper/mledger/impl/OpAddEntry.java | 4 +- .../MangedLedgerInterceptorImplTest2.java | 85 +++++++++++++++++++ .../MangedLedgerInterceptorImplTest.java | 45 +++++++++- 3 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 14135b037920a..1906500b47a67 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -125,7 +125,7 @@ public void setCloseWhenDone(boolean closeWhenDone) { public void initiate() { if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) { - + long originalDataLen = data.readableBytes(); ByteBuf duplicateBuffer = data.retainedDuplicate(); // internally asyncAddEntry() will take the ownership of the buffer and release it at the end @@ -138,6 +138,8 @@ public void initiate() { duplicateBuffer = payloadProcessorHandle.getProcessedPayload(); } } + this.dataLength = duplicateBuffer.readableBytes(); + this.ml.currentLedgerSize += (dataLength - originalDataLen); ledger.asyncAddEntry(duplicateBuffer, this, addOpCount); } else { log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state); diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java new file mode 100644 index 0000000000000..080ef9ea4c5fe --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/MangedLedgerInterceptorImplTest2.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.impl; + +import static org.testng.Assert.assertEquals; +import static org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest.TestPayloadProcessor; +import java.util.HashSet; +import java.util.Set; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; +import org.apache.pulsar.broker.intercept.MangedLedgerInterceptorImplTest; +import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor; +import org.awaitility.Awaitility; +import org.testng.annotations.Test; + +/*** + * Differ to {@link MangedLedgerInterceptorImplTest}, this test can call {@link ManagedLedgerImpl}'s methods modified + * by "default". + */ +@Slf4j +@Test(groups = "broker") +public class MangedLedgerInterceptorImplTest2 extends MockedBookKeeperTestCase { + + public static void switchLedgerManually(ManagedLedgerImpl ledger){ + LedgerHandle originalLedgerHandle = ledger.currentLedger; + ledger.ledgerClosed(ledger.currentLedger); + ledger.createLedgerAfterClosed(); + Awaitility.await().until(() -> { + return ledger.state == ManagedLedgerImpl.State.LedgerOpened && ledger.currentLedger != originalLedgerHandle; + }); + } + + @Test + public void testCurrentLedgerSizeCorrectIfHasInterceptor() throws Exception { + final String mlName = "ml1"; + final String cursorName = "cursor1"; + + // Registry interceptor. + ManagedLedgerConfig config = new ManagedLedgerConfig(); + Set processors = new HashSet(); + processors.add(new TestPayloadProcessor()); + ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(new HashSet(), processors); + config.setManagedLedgerInterceptor(interceptor); + config.setMaxEntriesPerLedger(100); + + // Add one entry. + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName); + ledger.addEntry(new byte[1]); + + // Mark "currentLedgerSize" and switch ledger. + long currentLedgerSize = ledger.getCurrentLedgerSize(); + switchLedgerManually(ledger); + + // verify. + assertEquals(currentLedgerSize, MangedLedgerInterceptorImplTest.calculatePreciseSize(ledger)); + + // cleanup. + cursor.close(); + ledger.close(); + factory.getEntryCacheManager().clear(); + factory.shutdown(); + config.setManagedLedgerInterceptor(null); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java index ec9dd53685f87..0cf0adb5181ec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java @@ -20,6 +20,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.function.Predicate; import lombok.Cleanup; @@ -32,6 +33,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.OpAddEntry; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; @@ -60,7 +62,7 @@ public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase { private static final Logger log = LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class); - public class TestPayloadProcessor implements ManagedLedgerPayloadProcessor { + public static class TestPayloadProcessor implements ManagedLedgerPayloadProcessor { @Override public Processor inputProcessor() { return new Processor() { @@ -158,6 +160,47 @@ public void testMessagePayloadProcessor() throws Exception { config.setManagedLedgerInterceptor(null); } + @Test + public void testTotalSizeCorrectIfHasInterceptor() throws Exception { + final String mlName = "ml1"; + final String cursorName = "cursor1"; + + // Registry interceptor. + ManagedLedgerConfig config = new ManagedLedgerConfig(); + Set processors = new HashSet(); + processors.add(new TestPayloadProcessor()); + ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(new HashSet(), processors); + config.setManagedLedgerInterceptor(interceptor); + config.setMaxEntriesPerLedger(2); + + // Add many entries and consume. + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(mlName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(cursorName); + for (int i = 0; i < 5; i++){ + cursor.delete(ledger.addEntry(new byte[1])); + } + + // Trim ledgers. + CompletableFuture trimLedgerFuture = new CompletableFuture<>(); + ledger.trimConsumedLedgersInBackground(trimLedgerFuture); + trimLedgerFuture.join(); + + // verify. + assertEquals(ledger.getTotalSize(), calculatePreciseSize(ledger)); + + // cleanup. + cursor.close(); + ledger.close(); + factory.getEntryCacheManager().clear(); + factory.shutdown(); + config.setManagedLedgerInterceptor(null); + } + + public static long calculatePreciseSize(ManagedLedgerImpl ledger){ + return ledger.getLedgersInfo().values().stream() + .map(info -> info.getSize()).reduce((l1,l2) -> l1 + l2).orElse(0L) + ledger.getCurrentLedgerSize(); + } + @Test(timeOut = 20000) public void testRecoveryIndex() throws Exception { final int MOCK_BATCH_SIZE = 2; From e944c8950df58236fe59d1c771f197ae12f5135c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 3 Feb 2023 09:52:38 +0800 Subject: [PATCH 2/5] address comment --- .../java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 1906500b47a67..da021c1f2631b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -138,8 +138,10 @@ public void initiate() { duplicateBuffer = payloadProcessorHandle.getProcessedPayload(); } } - this.dataLength = duplicateBuffer.readableBytes(); - this.ml.currentLedgerSize += (dataLength - originalDataLen); + if (originalDataLen != duplicateBuffer.readableBytes()) { + this.dataLength = duplicateBuffer.readableBytes(); + this.ml.currentLedgerSize += (dataLength - originalDataLen); + } ledger.asyncAddEntry(duplicateBuffer, this, addOpCount); } else { log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state); From 6602f1168eaf77bed668db5ba5b067332d27c52c Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 3 Feb 2023 09:53:49 +0800 Subject: [PATCH 3/5] address comment --- .../java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index da021c1f2631b..89e720ad0cb2b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -138,7 +138,7 @@ public void initiate() { duplicateBuffer = payloadProcessorHandle.getProcessedPayload(); } } - if (originalDataLen != duplicateBuffer.readableBytes()) { + if (payloadProcessorHandle != null && originalDataLen != duplicateBuffer.readableBytes()) { this.dataLength = duplicateBuffer.readableBytes(); this.ml.currentLedgerSize += (dataLength - originalDataLen); } From ae6611237ecbda814c0530e2b24585e1e9fc3776 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 3 Feb 2023 09:56:54 +0800 Subject: [PATCH 4/5] address comment --- .../org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 89e720ad0cb2b..65cf0180f1ced 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -136,12 +136,13 @@ public void initiate() { duplicateBuffer); if (payloadProcessorHandle != null) { duplicateBuffer = payloadProcessorHandle.getProcessedPayload(); + // If data len of entry changes, correct "dataLength" and "currentLedgerSize". + if (originalDataLen != duplicateBuffer.readableBytes()) { + this.dataLength = duplicateBuffer.readableBytes(); + this.ml.currentLedgerSize += (dataLength - originalDataLen); + } } } - if (payloadProcessorHandle != null && originalDataLen != duplicateBuffer.readableBytes()) { - this.dataLength = duplicateBuffer.readableBytes(); - this.ml.currentLedgerSize += (dataLength - originalDataLen); - } ledger.asyncAddEntry(duplicateBuffer, this, addOpCount); } else { log.warn("[{}] initiate with unexpected state {}, expect OPEN state.", ml.getName(), state); From 2dec903d4a630de798b88243911b21adca3a68fe Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Fri, 3 Feb 2023 11:09:35 +0800 Subject: [PATCH 5/5] improve code --- .../java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 65cf0180f1ced..c6c341fd921d1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -125,13 +125,13 @@ public void setCloseWhenDone(boolean closeWhenDone) { public void initiate() { if (STATE_UPDATER.compareAndSet(OpAddEntry.this, State.OPEN, State.INITIATED)) { - long originalDataLen = data.readableBytes(); ByteBuf duplicateBuffer = data.retainedDuplicate(); // internally asyncAddEntry() will take the ownership of the buffer and release it at the end addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml); lastInitTime = System.nanoTime(); if (ml.getManagedLedgerInterceptor() != null) { + long originalDataLen = data.readableBytes(); payloadProcessorHandle = ml.getManagedLedgerInterceptor().processPayloadBeforeLedgerWrite(this, duplicateBuffer); if (payloadProcessorHandle != null) {