From 1918e00eabf6daa6a8a48a651465b868ea91b331 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Mon, 21 Dec 2020 17:27:27 +0800 Subject: [PATCH 01/12] add offset for broker entry metadata --- .../pulsar/common/api/proto/PulsarApi.java | 57 +++++++++++++++++++ pulsar-common/src/main/proto/PulsarApi.proto | 1 + 2 files changed, 58 insertions(+) diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index aa9f2c1d4e6a0..e2d5680d48333 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -7074,6 +7074,10 @@ public interface BrokerEntryMetadataOrBuilder // optional uint64 broker_timestamp = 1; boolean hasBrokerTimestamp(); long getBrokerTimestamp(); + + // optional uint64 offset = 2; + boolean hasOffset(); + long getOffset(); } public static final class BrokerEntryMetadata extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -7120,8 +7124,19 @@ public long getBrokerTimestamp() { return brokerTimestamp_; } + // optional uint64 offset = 2; + public static final int OFFSET_FIELD_NUMBER = 2; + private long offset_; + public boolean hasOffset() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public long getOffset() { + return offset_; + } + private void initFields() { brokerTimestamp_ = 0L; + offset_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -7143,6 +7158,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeUInt64(1, brokerTimestamp_); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt64(2, offset_); + } } private int memoizedSerializedSize = -1; @@ -7155,6 +7173,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeUInt64Size(1, brokerTimestamp_); } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeUInt64Size(2, offset_); + } memoizedSerializedSize = size; return size; } @@ -7270,6 +7292,8 @@ public Builder clear() { super.clear(); brokerTimestamp_ = 0L; bitField0_ = (bitField0_ & ~0x00000001); + offset_ = 0L; + bitField0_ = (bitField0_ & ~0x00000002); return this; } @@ -7307,6 +7331,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.BrokerEntryMetadata buildPar to_bitField0_ |= 0x00000001; } result.brokerTimestamp_ = brokerTimestamp_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.offset_ = offset_; result.bitField0_ = to_bitField0_; return result; } @@ -7316,6 +7344,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.BrokerEntr if (other.hasBrokerTimestamp()) { setBrokerTimestamp(other.getBrokerTimestamp()); } + if (other.hasOffset()) { + setOffset(other.getOffset()); + } return this; } @@ -7350,6 +7381,11 @@ public Builder mergeFrom( brokerTimestamp_ = input.readUInt64(); break; } + case 16: { + bitField0_ |= 0x00000002; + offset_ = input.readUInt64(); + break; + } } } } @@ -7377,6 +7413,27 @@ public Builder clearBrokerTimestamp() { return this; } + // optional uint64 offset = 2; + private long offset_ ; + public boolean hasOffset() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + public long getOffset() { + return offset_; + } + public Builder setOffset(long value) { + bitField0_ |= 0x00000002; + offset_ = value; + + return this; + } + public Builder clearOffset() { + bitField0_ = (bitField0_ & ~0x00000002); + offset_ = 0L; + + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.BrokerEntryMetadata) } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index b99882864f3d8..7767e9b173097 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -183,6 +183,7 @@ message SingleMessageMetadata { // metadata added for entry from broker message BrokerEntryMetadata { optional uint64 broker_timestamp = 1; + optional uint64 offset = 2; } enum ServerError { From 74a9a083cf4bc59fcd4ab498f9e2b395c50bffdb Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 22 Dec 2020 10:50:54 +0800 Subject: [PATCH 02/12] add managedledger interceptor --- .../bookkeeper/mledger/impl/OpAddEntry.java | 12 ++- .../interceptor/ManagedLedgerInterceptor.java | 33 +++++++ .../ManagedLedgerInterceptorImpl.java | 91 +++++++++++++++++++ .../pulsar/common/protocol/Commands.java | 33 +++++++ 4 files changed, 167 insertions(+), 2 deletions(-) create mode 100644 managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 08d188fc66dac..c46b1d02d72d8 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -44,7 +44,7 @@ * Handles the life-cycle of an addEntry() operation. * */ -class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { +public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback { protected ManagedLedgerImpl ml; LedgerHandle ledger; private long entryId; @@ -272,7 +272,15 @@ void close() { public State getState() { return state; } - + + public ByteBuf getData() { + return data; + } + + public void setData(ByteBuf data) { + this.data = data; + } + private final Handle recyclerHandle; private OpAddEntry(Handle recyclerHandle) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java new file mode 100644 index 0000000000000..e36519af4bf35 --- /dev/null +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.bookkeeper.mledger.interceptor; + +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.mledger.impl.OpAddEntry; + +import java.util.Map; + +/** + * Interceptor for ManagedLedger. + * */ +public interface ManagedLedgerInterceptor { + OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize); + void onManagedLedgerPropertiesInitialize(Map propertiesMap); + void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle); +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java new file mode 100644 index 0000000000000..3634e09cdcf14 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java @@ -0,0 +1,91 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.intercept; + +import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.client.api.LedgerEntries; +import org.apache.bookkeeper.client.api.LedgerEntry; +import org.apache.bookkeeper.mledger.impl.OpAddEntry; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; +import org.apache.pulsar.common.protocol.Commands; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; + + +public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor { + private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class); + private static final String OFFSET = "offset"; + + private AtomicLong offsetGenerator; + private Set brokerEntryMetadataInterceptors; + + public ManagedLedgerInterceptorImpl() { + offsetGenerator = new AtomicLong(-1); + } + + public ManagedLedgerInterceptorImpl(AtomicLong offsetGenerator, + Set brokerEntryMetadataInterceptors) { + this.offsetGenerator = offsetGenerator; + this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors; + } + + @Override + public OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize) { + assert op != null; + assert batchSize > 0; + op.setData(Commands.addBrokerEntryMetadata(op.getData(), + brokerEntryMetadataInterceptors, offsetGenerator, batchSize)); + return op; + } + + @Override + public void onManagedLedgerPropertiesInitialize(Map propertiesMap) { + assert propertiesMap != null; + assert propertiesMap.size() > 0; + + if (propertiesMap.containsKey(OFFSET)) { + offsetGenerator.set(Long.parseLong(propertiesMap.get(OFFSET))); + } + } + + @Override + public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) { + try { + LedgerEntries ledgerEntries = + lh.read(lh.getLastAddConfirmed() - 1, lh.getLastAddConfirmed()); + for (LedgerEntry entry : ledgerEntries) { + PulsarApi.BrokerEntryMetadata brokerEntryMetadata = + Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer()); + if (brokerEntryMetadata != null) { + if (brokerEntryMetadata.hasOffset() && offsetGenerator.get() < brokerEntryMetadata.getOffset()) { + offsetGenerator.set(brokerEntryMetadata.getOffset()); + } + } + } + } catch (org.apache.bookkeeper.client.api.BKException | InterruptedException e) { + log.error("[{}] Read last entry error.", name, e); + } + } +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 7ec913244a5ee..742468f4fa1c1 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; @@ -1962,6 +1963,38 @@ public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload, return compositeByteBuf; } + public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload, + Set brokerInterceptors, + AtomicLong offsetGenerator, + int batchSize) { + // | BROKER_ENTRY_METADATA_MAGIC_NUMBER | BROKER_ENTRY_METADATA_SIZE | BROKER_ENTRY_METADATA | + // | 2 bytes | 4 bytes | BROKER_ENTRY_METADATA_SIZE bytes | + + PulsarApi.BrokerEntryMetadata.Builder brokerMetadataBuilder = PulsarApi.BrokerEntryMetadata.newBuilder(); + for (BrokerEntryMetadataInterceptor interceptor : brokerInterceptors) { + interceptor.intercept(brokerMetadataBuilder); + } + brokerMetadataBuilder.setOffset(offsetGenerator.addAndGet(batchSize)); + PulsarApi.BrokerEntryMetadata brokerEntryMetadata = brokerMetadataBuilder.build(); + int brokerMetaSize = brokerEntryMetadata.getSerializedSize(); + ByteBuf brokerMeta = + PulsarByteBufAllocator.DEFAULT.buffer(brokerMetaSize + 6, brokerMetaSize + 6); + brokerMeta.writeShort(Commands.magicBrokerEntryMetadata); + brokerMeta.writeInt(brokerMetaSize); + ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get(brokerMeta); + try { + brokerEntryMetadata.writeTo(outStream); + } catch (IOException e) { + // This is in-memory serialization, should not fail + throw new RuntimeException(e); + } + outStream.recycle(); + + CompositeByteBuf compositeByteBuf = PulsarByteBufAllocator.DEFAULT.compositeBuffer(); + compositeByteBuf.addComponents(true, brokerMeta, headerAndPayload); + return compositeByteBuf; + } + public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf headerAndPayloadWithBrokerEntryMetadata) { int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex(); if (headerAndPayloadWithBrokerEntryMetadata.readShort() == magicBrokerEntryMetadata) { From c38bba15f646654783fd7a27c8bf0ec498cb2384 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 22 Dec 2020 11:09:01 +0800 Subject: [PATCH 03/12] initialize intercptor for ML --- .../apache/bookkeeper/mledger/ManagedLedgerConfig.java | 9 +++++++++ .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 6 ++++++ .../org/apache/pulsar/broker/service/BrokerService.java | 6 ++++++ 3 files changed, 21 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index b1f25122abbe5..7f982b15e88d0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -33,6 +33,7 @@ import org.apache.bookkeeper.common.annotation.InterfaceStability; import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; /** @@ -75,6 +76,7 @@ public class ManagedLedgerConfig { private LedgerOffloader ledgerOffloader = NullLedgerOffloader.INSTANCE; private int newEntriesCheckDelayInMillis = 10; private Clock clock = Clock.systemUTC(); + private ManagedLedgerInterceptor managedLedgerInterceptor; public boolean isCreateIfMissing() { return createIfMissing; @@ -637,4 +639,11 @@ public void setNewEntriesCheckDelayInMillis(int newEntriesCheckDelayInMillis) { this.newEntriesCheckDelayInMillis = newEntriesCheckDelayInMillis; } + public ManagedLedgerInterceptor getManagedLedgerInterceptor() { + return managedLedgerInterceptor; + } + + public void setManagedLedgerInterceptor(ManagedLedgerInterceptor managedLedgerInterceptor) { + this.managedLedgerInterceptor = managedLedgerInterceptor; + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ba50d2ba4270c..89eb90d41b190 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -108,6 +108,7 @@ import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.offload.OffloadUtils; import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; @@ -195,6 +196,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { volatile PositionImpl lastConfirmedEntry; + private ManagedLedgerInterceptor managedLedgerInterceptor; + protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3; protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60; @@ -283,6 +286,7 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0)); this.mlOwnershipChecker = mlOwnershipChecker; this.propertiesMap = Maps.newHashMap(); + this.managedLedgerInterceptor = config.getManagedLedgerInterceptor(); } synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) { @@ -310,6 +314,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { propertiesMap.put(property.getKey(), property.getValue()); } } + managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap); // Last ledger stat may be zeroed, we must update it if (ledgers.size() > 0) { @@ -325,6 +330,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { .setEntries(lh.getLastAddConfirmed() + 1).setSize(lh.getLength()) .setTimestamp(clock.millis()).build(); ledgers.put(id, info); + managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh); initializeBookKeeper(callback); } else if (isNoSuchLedgerExistsException(rc)) { log.warn("[{}] Ledger not found: {}", name, ledgers.lastKey()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6cfdf38b75ca3..8e4d55dd9247e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -83,6 +83,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.StringUtils; @@ -98,6 +99,7 @@ import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader; import org.apache.pulsar.broker.intercept.BrokerInterceptor; +import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; import org.apache.pulsar.broker.loadbalance.LoadManager; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; @@ -1092,6 +1094,10 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, } getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { + // init managedLedger interceptor + ManagedLedgerInterceptor mlInterceptor = + new ManagedLedgerInterceptorImpl(new AtomicLong(-1), brokerEntryMetadataInterceptors); + managedLedgerConfig.setManagedLedgerInterceptor(mlInterceptor); managedLedgerConfig.setCreateIfMissing(createIfMissing); // Once we have the configuration, we can proceed with the async open operation From 483cee9faa435addb5c127b2996f73bb4e4e58da Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 22 Dec 2020 11:37:00 +0800 Subject: [PATCH 04/12] interceptor entry before add to bookie to generator offset --- .../bookkeeper/mledger/ManagedLedger.java | 15 ++++++++ .../mledger/ManagedLedgerException.java | 6 ++++ .../mledger/impl/ManagedLedgerImpl.java | 36 ++++++++++++++++++- .../bookkeeper/mledger/impl/OpAddEntry.java | 31 ++++++++++++++++ .../service/persistent/PersistentTopic.java | 23 +----------- 5 files changed, 88 insertions(+), 23 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 4b06b5ee9299a..b125bf33d3263 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -132,6 +132,21 @@ public interface ManagedLedger { */ void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx); + /** + * Append a new entry asynchronously. + * + * @see #addEntry(byte[]) + * @param buffer + * buffer with the data entry + * @param batchSize + * batch size for data entry + * @param callback + * callback object + * @param ctx + * opaque context + */ + void asyncAddEntry(ByteBuf buffer, int batchSize, AddEntryCallback callback, Object ctx); + /** * Open a ManagedCursor in this ManagedLedger. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 14202cb824f24..0f56b993912c3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -157,6 +157,12 @@ public CursorNotFoundException(String msg) { } } + public static class ManagedLedgerInterceptException extends ManagedLedgerException { + public ManagedLedgerInterceptException(String msg) { + super(msg); + } + } + @Override public synchronized Throwable fillInStackTrace() { // Disable stack traces to be filled in diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 89eb90d41b190..dbe68e9b14136 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -64,6 +64,8 @@ import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; + +import io.netty.util.ReferenceCountUtil; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.AsyncCallback.CreateCallback; import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; @@ -98,6 +100,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.InvalidCursorPositionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerInterceptException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; @@ -615,6 +618,18 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation))); } + @Override + public void asyncAddEntry(ByteBuf buffer, int batchSize, AddEntryCallback callback, Object ctx) { + if (log.isDebugEnabled()) { + log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); + } + + OpAddEntry addOperation = OpAddEntry.create(this, buffer, batchSize, callback, ctx); + + // Jump to specific thread to avoid contention from writers writing from different threads + executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation))); + } + private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { pendingAddEntries.add(addOperation); final State state = STATE_UPDATER.get(this); @@ -678,8 +693,27 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { addOperation.setCloseWhenDone(true); STATE_UPDATER.set(this, State.ClosingLedger); } + // interceptor entry before add to bookie + if (beforeAddEntry(addOperation)) { + addOperation.initiate(); + } + } + } - addOperation.initiate(); + private boolean beforeAddEntry(OpAddEntry addOperation) { + // if no interceptor, just return true to make sure addOperation will be initiate() + if (managedLedgerInterceptor == null) { + return true; + } + try { + managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getBatchSize()); + return true; + } catch (Exception e) { + addOperation.failed( + new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed.")); + ReferenceCountUtil.release(addOperation.data); + log.error("[{}] Failed to interceptor entry before add to bookie.", name, e); + return false; } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index c46b1d02d72d8..903bf28f274dd 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -48,6 +48,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba protected ManagedLedgerImpl ml; LedgerHandle ledger; private long entryId; + private int batchSize; @SuppressWarnings("unused") private static final AtomicReferenceFieldUpdater callbackUpdater = @@ -95,6 +96,27 @@ public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCall return op; } + public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int batchSize, AddEntryCallback callback, Object ctx) { + OpAddEntry op = RECYCLER.get(); + op.ml = ml; + op.ledger = null; + op.batchSize = batchSize; + op.data = data.retain(); + op.dataLength = data.readableBytes(); + op.callback = callback; + op.ctx = ctx; + op.addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml); + op.closeWhenDone = false; + op.entryId = -1; + op.startTime = System.nanoTime(); + op.state = State.OPEN; + ml.mbean.addAddEntrySample(op.dataLength); + if (log.isDebugEnabled()) { + log.debug("Created new OpAddEntry {}", op); + } + return op; + } + public void setLedger(LedgerHandle ledger) { this.ledger = ledger; } @@ -277,6 +299,14 @@ public ByteBuf getData() { return data; } + public int getBatchSize() { + return batchSize; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + public void setData(ByteBuf data) { this.data = data; } @@ -298,6 +328,7 @@ public void recycle() { ml = null; ledger = null; data = null; + batchSize = 0; dataLength = -1; callback = null; ctx = null; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 676f98cb8b41d..3dd2d64e45840 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -357,10 +357,7 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont messageDeduplication.isDuplicate(publishContext, headersAndPayload); switch (status) { case NotDup: - // intercept headersAndPayload and add entry metadata - if (appendBrokerEntryMetadata(headersAndPayload, publishContext)) { - ledger.asyncAddEntry(headersAndPayload, this, publishContext); - } + ledger.asyncAddEntry(headersAndPayload, this, publishContext); break; case Dup: // Immediately acknowledge duplicated message @@ -374,24 +371,6 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont } } - private boolean appendBrokerEntryMetadata(ByteBuf headersAndPayload, PublishContext publishContext) { - // just return true if BrokerEntryMetadata is not enabled - if (!brokerService.isBrokerEntryMetadataEnabled()) { - return true; - } - - try { - headersAndPayload = Commands.addBrokerEntryMetadata(headersAndPayload, - brokerService.getBrokerEntryMetadataInterceptors()); - } catch (Exception e) { - decrementPendingWriteOpsAndCheck(); - publishContext.completed(new BrokerServiceException.AddEntryMetadataException(e), -1, -1); - log.error("[{}] Failed to add broker entry metadata.", topic, e); - return false; - } - return true; - } - public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { if (ledger instanceof ManagedLedgerImpl) { ((ManagedLedgerImpl) ledger).asyncReadEntry(position, callback, ctx); From 01da511fbfefecf2411d9038ae98e3dab4b6e569 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 22 Dec 2020 16:50:02 +0800 Subject: [PATCH 05/12] add asyncFindPosition method for ManagedLedger --- .../bookkeeper/mledger/ManagedLedger.java | 8 ++++ .../mledger/impl/ManagedLedgerImpl.java | 45 +++++++++++++++++++ .../bookkeeper/mledger/impl/OpFindNewest.java | 28 +++++++++--- 3 files changed, 76 insertions(+), 5 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index b125bf33d3263..6e65a6e73fb07 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -22,6 +22,8 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.function.BiFunction; +import java.util.function.Predicate; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; @@ -32,6 +34,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; +import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; /** @@ -535,4 +538,9 @@ void asyncSetProperties(Map properties, final AsyncCallbacks.Upd * Roll current ledger if it is full */ void rollCurrentLedgerIfFull(); + + /** + * Find position by sequenceId. + * */ + CompletableFuture asyncFindPosition(com.google.common.base.Predicate predicate); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index dbe68e9b14136..46ca8bfd3ed56 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -79,6 +79,7 @@ import org.apache.bookkeeper.common.util.OrderedExecutor; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.common.util.Retries; +import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -121,9 +122,12 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; +import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.metadata.api.Stat; +import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1510,6 +1514,47 @@ public void closeComplete(int rc, LedgerHandle lh, Object o) { } } + @Override + public CompletableFuture asyncFindPosition(com.google.common.base.Predicate predicate) { + + + CompletableFuture future = new CompletableFuture(); + Long firstLedgerId = ledgers.firstKey(); + final PositionImpl startPosition = firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0); + if (startPosition == null) { + future.complete(null); + return future; + } + AsyncCallbacks.FindEntryCallback findEntryCallback = new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + final Position finalPosition; + if (position == null) { + finalPosition = startPosition; + if (finalPosition == null) { + log.warn("[{}] Unable to find position for predicate {}.", name, predicate); + future.complete(null); + return; + } + log.info("[{}] Unable to find position for predicate {}. Use the first position {} instead.", name, predicate, startPosition); + } else { + finalPosition = getNextValidPosition((PositionImpl) position); + } + future.complete((PositionImpl) finalPosition); + } + + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { + log.warn("[{}] Unable to find position for predicate {}.", name, predicate); + future.complete(null); + } + }; + long max = getNumberOfEntries() - 1; + OpFindNewest op = new OpFindNewest(this, startPosition, predicate, max, findEntryCallback, null); + op.find(); + return future; + } + void clearPendingAddEntries(ManagedLedgerException e) { while (!pendingAddEntries.isEmpty()) { OpAddEntry op = pendingAddEntries.poll(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java index 861d2472e05d6..cbecedd1640c2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java @@ -31,6 +31,7 @@ class OpFindNewest implements ReadEntryCallback { private final ManagedCursorImpl cursor; + private final ManagedLedgerImpl ledger; private final PositionImpl startPosition; private final FindEntryCallback callback; private final Predicate condition; @@ -49,6 +50,23 @@ enum State { public OpFindNewest(ManagedCursorImpl cursor, PositionImpl startPosition, Predicate condition, long numberOfEntries, FindEntryCallback callback, Object ctx) { this.cursor = cursor; + this.ledger = cursor.ledger; + this.startPosition = startPosition; + this.callback = callback; + this.condition = condition; + this.ctx = ctx; + + this.min = 0; + this.max = numberOfEntries; + + this.searchPosition = startPosition; + this.state = State.checkFirst; + } + + public OpFindNewest(ManagedLedgerImpl ledger, PositionImpl startPosition, Predicate condition, + long numberOfEntries, FindEntryCallback callback, Object ctx) { + this.cursor = null; + this.ledger = ledger; this.startPosition = startPosition; this.callback = callback; this.condition = condition; @@ -77,7 +95,7 @@ public void readEntryComplete(Entry entry, Object ctx) { // check last entry state = State.checkLast; - searchPosition = cursor.ledger.getPositionAfterN(searchPosition, max, PositionBound.startExcluded); + searchPosition = ledger.getPositionAfterN(searchPosition, max, PositionBound.startExcluded); find(); } break; @@ -88,7 +106,7 @@ public void readEntryComplete(Entry entry, Object ctx) { } else { // start binary search state = State.searching; - searchPosition = cursor.ledger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded); + searchPosition = ledger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded); find(); } break; @@ -106,7 +124,7 @@ public void readEntryComplete(Entry entry, Object ctx) { callback.findEntryComplete(lastMatchedPosition, OpFindNewest.this.ctx); return; } - searchPosition = cursor.ledger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded); + searchPosition = ledger.getPositionAfterN(startPosition, mid(), PositionBound.startExcluded); find(); } } @@ -117,8 +135,8 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { } public void find() { - if (cursor.hasMoreEntries(searchPosition)) { - cursor.ledger.asyncReadEntry(searchPosition, this, null); + if (cursor != null ? cursor.hasMoreEntries(searchPosition) : ledger.hasMoreEntries(searchPosition)) { + ledger.asyncReadEntry(searchPosition, this, null); } else { callback.findEntryComplete(lastMatchedPosition, OpFindNewest.this.ctx); } From 4d1dfb397de9e156dec579d4aba8ad57b3bfa86f Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 22 Dec 2020 19:15:01 +0800 Subject: [PATCH 06/12] support append offset in managedInterceptor --- .../mledger/impl/ManagedLedgerImpl.java | 15 +++--- .../ManagedLedgerInterceptorImpl.java | 45 ++++++++-------- .../pulsar/broker/service/BrokerService.java | 19 +++++-- ...endBrokerTimestampMetadataInterceptor.java | 8 +++ .../AppendOffsetMetadataInterceptor.java | 51 +++++++++++++++++++ .../BrokerEntryMetadataInterceptor.java | 2 + .../pulsar/common/protocol/Commands.java | 3 +- 7 files changed, 111 insertions(+), 32 deletions(-) create mode 100644 pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 46ca8bfd3ed56..de4527ba6671b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -122,12 +122,9 @@ import org.apache.bookkeeper.mledger.util.CallbackMutex; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.lang3.tuple.Pair; -import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; -import org.apache.pulsar.common.protocol.Commands; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.pulsar.metadata.api.Stat; -import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -293,7 +290,9 @@ public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper this.maximumRolloverTimeMs = (long) (config.getMaximumRolloverTimeMs() * (1 + random.nextDouble() * 5 / 100.0)); this.mlOwnershipChecker = mlOwnershipChecker; this.propertiesMap = Maps.newHashMap(); - this.managedLedgerInterceptor = config.getManagedLedgerInterceptor(); + if (config.getManagedLedgerInterceptor() != null) { + this.managedLedgerInterceptor = config.getManagedLedgerInterceptor(); + } } synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callback, final Object ctx) { @@ -321,7 +320,9 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { propertiesMap.put(property.getKey(), property.getValue()); } } - managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap); + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onManagedLedgerPropertiesInitialize(propertiesMap); + } // Last ledger stat may be zeroed, we must update it if (ledgers.size() > 0) { @@ -337,7 +338,9 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { .setEntries(lh.getLastAddConfirmed() + 1).setSize(lh.getLength()) .setTimestamp(clock.millis()).build(); ledgers.put(id, info); - managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh); + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onManagedLedgerLastLedgerInitialize(name, lh); + } initializeBookKeeper(callback); } else if (isNoSuchLedgerExistsException(rc)) { log.warn("[{}] Ledger not found: {}", name, ledgers.lastKey()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java index 3634e09cdcf14..83700df2fa1f7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java @@ -24,6 +24,7 @@ import org.apache.bookkeeper.mledger.impl.OpAddEntry; import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.intercept.AppendOffsetMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import org.apache.pulsar.common.protocol.Commands; import org.slf4j.Logger; @@ -31,32 +32,27 @@ import java.util.Map; import java.util.Set; -import java.util.concurrent.atomic.AtomicLong; public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor { private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class); private static final String OFFSET = "offset"; - private AtomicLong offsetGenerator; - private Set brokerEntryMetadataInterceptors; - public ManagedLedgerInterceptorImpl() { - offsetGenerator = new AtomicLong(-1); - } + private final Set brokerEntryMetadataInterceptors; + - public ManagedLedgerInterceptorImpl(AtomicLong offsetGenerator, - Set brokerEntryMetadataInterceptors) { - this.offsetGenerator = offsetGenerator; + public ManagedLedgerInterceptorImpl(Set brokerEntryMetadataInterceptors) { this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors; } + @Override public OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize) { assert op != null; assert batchSize > 0; - op.setData(Commands.addBrokerEntryMetadata(op.getData(), - brokerEntryMetadataInterceptors, offsetGenerator, batchSize)); + + op.setData(Commands.addBrokerEntryMetadata(op.getData(), brokerEntryMetadataInterceptors, batchSize)); return op; } @@ -66,22 +62,31 @@ public void onManagedLedgerPropertiesInitialize(Map propertiesMa assert propertiesMap.size() > 0; if (propertiesMap.containsKey(OFFSET)) { - offsetGenerator.set(Long.parseLong(propertiesMap.get(OFFSET))); + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if (interceptor instanceof AppendOffsetMetadataInterceptor) { + ((AppendOffsetMetadataInterceptor) interceptor) + .recoveryOffsetGenerator(Long.parseLong(propertiesMap.get(OFFSET))); + } + } } } @Override public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) { try { - LedgerEntries ledgerEntries = - lh.read(lh.getLastAddConfirmed() - 1, lh.getLastAddConfirmed()); - for (LedgerEntry entry : ledgerEntries) { - PulsarApi.BrokerEntryMetadata brokerEntryMetadata = - Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer()); - if (brokerEntryMetadata != null) { - if (brokerEntryMetadata.hasOffset() && offsetGenerator.get() < brokerEntryMetadata.getOffset()) { - offsetGenerator.set(brokerEntryMetadata.getOffset()); + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if (interceptor instanceof AppendOffsetMetadataInterceptor) { + LedgerEntries ledgerEntries = + lh.read(lh.getLastAddConfirmed() - 1, lh.getLastAddConfirmed()); + for (LedgerEntry entry : ledgerEntries) { + PulsarApi.BrokerEntryMetadata brokerEntryMetadata = + Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer()); + if (brokerEntryMetadata != null && brokerEntryMetadata.hasOffset()) { + ((AppendOffsetMetadataInterceptor) interceptor) + .recoveryOffsetGenerator(brokerEntryMetadata.getOffset()); + } } + } } } catch (org.apache.bookkeeper.client.api.BKException | InterruptedException e) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 8e4d55dd9247e..574d70fdb8a78 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -128,6 +128,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.FieldContext; +import org.apache.pulsar.common.intercept.AppendOffsetMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -1094,10 +1095,20 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, } getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { - // init managedLedger interceptor - ManagedLedgerInterceptor mlInterceptor = - new ManagedLedgerInterceptorImpl(new AtomicLong(-1), brokerEntryMetadataInterceptors); - managedLedgerConfig.setManagedLedgerInterceptor(mlInterceptor); + if (isBrokerEntryMetadataEnabled()) { + // init managedLedger interceptor + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if (interceptor instanceof AppendOffsetMetadataInterceptor) { + // add individual AppendOffsetMetadataInterceptor for each topic + brokerEntryMetadataInterceptors.remove(interceptor); + brokerEntryMetadataInterceptors.add(new AppendOffsetMetadataInterceptor()); + } + } + ManagedLedgerInterceptor mlInterceptor = + new ManagedLedgerInterceptorImpl(brokerEntryMetadataInterceptors); + managedLedgerConfig.setManagedLedgerInterceptor(mlInterceptor); + } + managedLedgerConfig.setCreateIfMissing(createIfMissing); // Once we have the configuration, we can proceed with the async open operation diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java index 6043f26f61c9c..78cdfc845e7c2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendBrokerTimestampMetadataInterceptor.java @@ -30,4 +30,12 @@ public class AppendBrokerTimestampMetadataInterceptor implements BrokerEntryMeta public PulsarApi.BrokerEntryMetadata.Builder intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata) { return brokerMetadata.setBrokerTimestamp(System.currentTimeMillis()); } + + @Override + public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize( + PulsarApi.BrokerEntryMetadata.Builder brokerMetadata, + int batchSize) { + // do nothing, just return brokerMetadata + return brokerMetadata; + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java new file mode 100644 index 0000000000000..3555133b35325 --- /dev/null +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.common.intercept; + +import org.apache.pulsar.common.api.proto.PulsarApi; + +import java.util.concurrent.atomic.AtomicLong; + +public class AppendOffsetMetadataInterceptor implements BrokerEntryMetadataInterceptor{ + private final AtomicLong offsetGenerator; + + public AppendOffsetMetadataInterceptor() { + this.offsetGenerator = new AtomicLong(-1); + } + + public void recoveryOffsetGenerator(long offset) { + if (offsetGenerator.get() < offset) { + offsetGenerator.set(offset); + } + } + + @Override + public PulsarApi.BrokerEntryMetadata.Builder intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata) { + // do nothing, just return brokerMetadata + return brokerMetadata; + } + + @Override + public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize( + PulsarApi.BrokerEntryMetadata.Builder brokerMetadata, + int batchSize) { + return brokerMetadata.setOffset(offsetGenerator.addAndGet(batchSize)); + } + +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java index 42dfc8d30ed0b..6dcb794b19265 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/BrokerEntryMetadataInterceptor.java @@ -26,4 +26,6 @@ */ public interface BrokerEntryMetadataInterceptor { PulsarApi.BrokerEntryMetadata.Builder intercept(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata); + PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize(PulsarApi.BrokerEntryMetadata.Builder brokerMetadata, + int batchSize); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 742468f4fa1c1..7dfb13104a767 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -1965,7 +1965,6 @@ public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload, public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload, Set brokerInterceptors, - AtomicLong offsetGenerator, int batchSize) { // | BROKER_ENTRY_METADATA_MAGIC_NUMBER | BROKER_ENTRY_METADATA_SIZE | BROKER_ENTRY_METADATA | // | 2 bytes | 4 bytes | BROKER_ENTRY_METADATA_SIZE bytes | @@ -1973,8 +1972,8 @@ public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload, PulsarApi.BrokerEntryMetadata.Builder brokerMetadataBuilder = PulsarApi.BrokerEntryMetadata.newBuilder(); for (BrokerEntryMetadataInterceptor interceptor : brokerInterceptors) { interceptor.intercept(brokerMetadataBuilder); + interceptor.interceptWithBatchSize(brokerMetadataBuilder, batchSize); } - brokerMetadataBuilder.setOffset(offsetGenerator.addAndGet(batchSize)); PulsarApi.BrokerEntryMetadata brokerEntryMetadata = brokerMetadataBuilder.build(); int brokerMetaSize = brokerEntryMetadata.getSerializedSize(); ByteBuf brokerMeta = From d8cbd22610081e0683896641c6627971589c4907 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 22 Dec 2020 19:27:54 +0800 Subject: [PATCH 07/12] interceptor pending write request --- .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index de4527ba6671b..ebcd1537922f9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -1404,10 +1404,12 @@ public synchronized void updateLedgersIdsComplete(Stat stat) { // If op is used by another ledger handle, we need to close it and create a new one if (existsOp.ledger != null) { existsOp.close(); - existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.callback, existsOp.ctx); + existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getBatchSize(), existsOp.callback, existsOp.ctx); } existsOp.setLedger(currentLedger); - pendingAddEntries.add(existsOp); + if (beforeAddEntry(existsOp)) { + pendingAddEntries.add(existsOp); + } } } while (existsOp != null && --pendingSize > 0); From 5137d6f42409a7943d8986ee0fb3c817a6ae8f01 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 22 Dec 2020 19:36:19 +0800 Subject: [PATCH 08/12] add offet to ManagedLedgerInfo --- .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 3 +++ .../mledger/interceptor/ManagedLedgerInterceptor.java | 1 + .../broker/intercept/ManagedLedgerInterceptorImpl.java | 9 +++++++++ .../intercept/AppendOffsetMetadataInterceptor.java | 3 +++ 4 files changed, 16 insertions(+) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index ebcd1537922f9..d1acf3f945b78 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3175,6 +3175,9 @@ private ManagedLedgerInfo getManagedLedgerInfo() { mlInfo.setTerminatedPosition(NestedPositionInfo.newBuilder().setLedgerId(lastConfirmedEntry.getLedgerId()) .setEntryId(lastConfirmedEntry.getEntryId())); } + if (managedLedgerInterceptor != null) { + managedLedgerInterceptor.onUpdateManagedLedgerInfo(propertiesMap); + } for (Map.Entry property : propertiesMap.entrySet()) { mlInfo.addProperties(MLDataFormats.KeyValue.newBuilder() .setKey(property.getKey()).setValue(property.getValue())); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java index e36519af4bf35..69706a07a1b2a 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java @@ -30,4 +30,5 @@ public interface ManagedLedgerInterceptor { OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize); void onManagedLedgerPropertiesInitialize(Map propertiesMap); void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle); + void onUpdateManagedLedgerInfo(Map propertiesMap); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java index 83700df2fa1f7..fd83162f9567a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java @@ -93,4 +93,13 @@ public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) { log.error("[{}] Read last entry error.", name, e); } } + + @Override + public void onUpdateManagedLedgerInfo(Map propertiesMap) { + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if (interceptor instanceof AppendOffsetMetadataInterceptor) { + propertiesMap.put(OFFSET, String.valueOf(((AppendOffsetMetadataInterceptor)interceptor).getOffset())); + } + } + } } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java index 3555133b35325..8db5a7eb5a5e6 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java @@ -48,4 +48,7 @@ public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize( return brokerMetadata.setOffset(offsetGenerator.addAndGet(batchSize)); } + public long getOffset() { + return offsetGenerator.get(); + } } From 4205436ad809dcee2e19539744e2019f63ef43b1 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 22 Dec 2020 19:59:12 +0800 Subject: [PATCH 09/12] add test for offset --- .../common/protocol/CommandUtilsTests.java | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java index df5bf76d7525b..9fbce8123b5ab 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java @@ -145,14 +145,19 @@ public void testByteBufComposite() throws Exception { @Test public void testAddBrokerEntryMetadata() throws Exception { + int MOCK_BATCH_SIZE = 10; String data = "test-message"; ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); PulsarApi.BrokerEntryMetadata brokerMetadata = - PulsarApi.BrokerEntryMetadata.newBuilder().setBrokerTimestamp(System.currentTimeMillis()).build(); + PulsarApi.BrokerEntryMetadata + .newBuilder() + .setBrokerTimestamp(System.currentTimeMillis()) + .setOffset(MOCK_BATCH_SIZE - 1) + .build(); ByteBuf dataWithBrokerEntryMetadata = - Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors()); + Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE); assertEquals(brokerMetadata.getSerializedSize() + data.length() + 6, dataWithBrokerEntryMetadata.readableBytes()); @@ -167,7 +172,7 @@ public void testSkipBrokerEntryMetadata() throws Exception { ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); ByteBuf dataWithBrokerEntryMetadata = - Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors()); + Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), 11); Commands.skipBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata); assertEquals(data.length(), dataWithBrokerEntryMetadata.readableBytes()); @@ -179,15 +184,17 @@ public void testSkipBrokerEntryMetadata() throws Exception { @Test public void testParseBrokerEntryMetadata() throws Exception { + int MOCK_BATCH_SIZE = 10; String data = "test-message"; ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); ByteBuf dataWithBrokerEntryMetadata = - Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors()); + Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE); PulsarApi.BrokerEntryMetadata brokerMetadata = Commands.parseBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata); assertTrue(brokerMetadata.getBrokerTimestamp() <= System.currentTimeMillis()); + assertEquals(brokerMetadata.getOffset(), MOCK_BATCH_SIZE - 1); assertEquals(data.length(), dataWithBrokerEntryMetadata.readableBytes()); byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()]; @@ -198,6 +205,7 @@ public void testParseBrokerEntryMetadata() throws Exception { public Set getBrokerEntryMetadataInterceptors() { Set interceptorNames = new HashSet<>(); interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendOffsetMetadataInterceptor"); return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, Thread.currentThread().getContextClassLoader()); } From a52038b0eaf234175b9803e4f7c63e51eaecc2c4 Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Tue, 22 Dec 2020 21:06:04 +0800 Subject: [PATCH 10/12] add test for ManagedLedgerInterceptor --- .../bookkeeper/mledger/ManagedLedger.java | 54 +++++ .../mledger/impl/ManagedLedgerImpl.java | 52 ++++ .../ManagedLedgerInterceptorImpl.java | 28 ++- .../MangedLedgerInterceptorImplTest.java | 228 ++++++++++++++++++ .../service/PersistentMessageFinderTest.java | 9 +- 5 files changed, 357 insertions(+), 14 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 6e65a6e73fb07..0c1a71c2f4ee1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -35,6 +35,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; /** @@ -77,6 +78,18 @@ public interface ManagedLedger { */ Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException; + /** + * Append a new entry to the end of a managed ledger. + * + * @param data + * data entry to be persisted + * @param batchSize + * batchSize of entry + * @return the Position at which the entry has been inserted + * @throws ManagedLedgerException + */ + Position addEntry(byte[] data, int batchSize) throws InterruptedException, ManagedLedgerException; + /** * Append a new entry asynchronously. * @@ -105,6 +118,22 @@ public interface ManagedLedger { */ Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException; + /** + * Append a new entry to the end of a managed ledger. + * + * @param data + * data entry to be persisted + * @param batchSize + * batchSize of entry + * @param offset + * offset in the data array + * @param length + * number of bytes + * @return the Position at which the entry has been inserted + * @throws ManagedLedgerException + */ + Position addEntry(byte[] data, int batchSize, int offset, int length) throws InterruptedException, ManagedLedgerException; + /** * Append a new entry asynchronously. * @@ -122,6 +151,26 @@ public interface ManagedLedger { */ void asyncAddEntry(byte[] data, int offset, int length, AddEntryCallback callback, Object ctx); + /** + * Append a new entry asynchronously. + * + * @see #addEntry(byte[]) + * @param data + * data entry to be persisted + * @param batchSize + * batchSize of entry + * @param offset + * offset in the data array + * @param length + * number of bytes + * @param callback + * callback object + * @param ctx + * opaque context + */ + void asyncAddEntry(byte[] data, int batchSize, int offset, int length, AddEntryCallback callback, Object ctx); + + /** * Append a new entry asynchronously. * @@ -543,4 +592,9 @@ void asyncSetProperties(Map properties, final AsyncCallbacks.Upd * Find position by sequenceId. * */ CompletableFuture asyncFindPosition(com.google.common.base.Predicate predicate); + + /** + * Get the ManagedLedgerInterceptor for ManagedLedger. + * */ + ManagedLedgerInterceptor getManagedLedgerInterceptor(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index d1acf3f945b78..9d7e4144a1555 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -566,6 +566,11 @@ public Position addEntry(byte[] data) throws InterruptedException, ManagedLedger return addEntry(data, 0, data.length); } + @Override + public Position addEntry(byte[] data, int batchSize) throws InterruptedException, ManagedLedgerException { + return addEntry(data, batchSize,0, data.length); + } + @Override public Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException { final CountDownLatch counter = new CountDownLatch(1); @@ -601,6 +606,41 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { return result.position; } + @Override + public Position addEntry(byte[] data, int batchSize, int offset, int length) throws InterruptedException, ManagedLedgerException { + final CountDownLatch counter = new CountDownLatch(1); + // Result list will contain the status exception and the resulting + // position + class Result { + ManagedLedgerException status = null; + Position position = null; + } + final Result result = new Result(); + + asyncAddEntry(data, batchSize, offset, length, new AddEntryCallback() { + @Override + public void addComplete(Position position, Object ctx) { + result.position = position; + counter.countDown(); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + result.status = exception; + counter.countDown(); + } + }, null); + + counter.await(); + + if (result.status != null) { + log.error("[{}] Error adding entry", name, result.status); + throw result.status; + } + + return result.position; + } + @Override public void asyncAddEntry(final byte[] data, final AddEntryCallback callback, final Object ctx) { asyncAddEntry(data, 0, data.length, callback, ctx); @@ -613,6 +653,13 @@ public void asyncAddEntry(final byte[] data, int offset, int length, final AddEn asyncAddEntry(buffer, callback, ctx); } + @Override + public void asyncAddEntry(final byte[] data, int batchSize, int offset, int length, final AddEntryCallback callback, + final Object ctx) { + ByteBuf buffer = Unpooled.wrappedBuffer(data, offset, length); + asyncAddEntry(buffer, batchSize, callback, ctx); + } + @Override public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) { if (log.isDebugEnabled()) { @@ -1560,6 +1607,11 @@ public void findEntryFailed(ManagedLedgerException exception, Optional return future; } + @Override + public ManagedLedgerInterceptor getManagedLedgerInterceptor() { + return managedLedgerInterceptor; + } + void clearPendingAddEntries(ManagedLedgerException e) { while (!pendingAddEntries.isEmpty()) { OpAddEntry op = pendingAddEntries.poll(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java index fd83162f9567a..4b90ce310832e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.intercept; +import java.util.Map; +import java.util.Set; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.LedgerEntries; import org.apache.bookkeeper.client.api.LedgerEntry; @@ -30,10 +32,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Map; -import java.util.Set; - - public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor { private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class); private static final String OFFSET = "offset"; @@ -46,20 +44,30 @@ public ManagedLedgerInterceptorImpl(Set brokerEn this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors; } + public long getOffset() { + long offset = -1; + for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { + if (interceptor instanceof AppendOffsetMetadataInterceptor) { + offset = ((AppendOffsetMetadataInterceptor) interceptor).getOffset(); + } + } + return offset; + } @Override public OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize) { - assert op != null; - assert batchSize > 0; - + if (op == null || batchSize <= 0) { + return op; + } op.setData(Commands.addBrokerEntryMetadata(op.getData(), brokerEntryMetadataInterceptors, batchSize)); return op; } @Override public void onManagedLedgerPropertiesInitialize(Map propertiesMap) { - assert propertiesMap != null; - assert propertiesMap.size() > 0; + if (propertiesMap == null || propertiesMap.size() == 0) { + return; + } if (propertiesMap.containsKey(OFFSET)) { for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { @@ -98,7 +106,7 @@ public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) { public void onUpdateManagedLedgerInfo(Map propertiesMap) { for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { if (interceptor instanceof AppendOffsetMetadataInterceptor) { - propertiesMap.put(OFFSET, String.valueOf(((AppendOffsetMetadataInterceptor)interceptor).getOffset())); + propertiesMap.put(OFFSET, String.valueOf(((AppendOffsetMetadataInterceptor) interceptor).getOffset())); } } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java new file mode 100644 index 0000000000000..9b5c2071b1321 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.intercept; + +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; +import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; +import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; +import org.apache.pulsar.common.protocol.Commands; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import static com.sun.org.apache.xml.internal.serialize.OutputFormat.Defaults.Encoding; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotEquals; +import static org.testng.Assert.assertNotNull; + +public class MangedLedgerInterceptorImplTest extends MockedBookKeeperTestCase { + private static final Logger log = LoggerFactory.getLogger(MangedLedgerInterceptorImplTest.class); + + + @Test + public void testAddBrokerEntryMetadata() throws Exception { + final int MOCK_BATCH_SIZE = 2; + int numberOfEntries = 10; + final String ledgerAndCursorName = "topicEntryMetadataSequenceId"; + + ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors()); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(2); + config.setManagedLedgerInterceptor(interceptor); + + ManagedLedger ledger = factory.open(ledgerAndCursorName, config); + ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + + for ( int i = 0 ; i < numberOfEntries; i ++) { + ledger.addEntry(("message" + i).getBytes(), MOCK_BATCH_SIZE); + } + + + assertEquals(19, ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset()); + List entryList = cursor.readEntries(numberOfEntries); + for (int i = 0 ; i < numberOfEntries; i ++) { + PulsarApi.BrokerEntryMetadata metadata = + Commands.parseBrokerEntryMetadataIfExist(entryList.get(i).getDataBuffer()); + assertNotNull(metadata); + assertEquals(metadata.getOffset(), (i + 1) * MOCK_BATCH_SIZE - 1); + } + + cursor.close();; + ledger.close(); + factory.shutdown(); + } + + + @Test(timeOut = 20000) + public void testRecoveryOffset() throws Exception { + final int MOCK_BATCH_SIZE = 2; + ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors()); + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setManagedLedgerInterceptor(interceptor); + ManagedLedger ledger = factory.open("my_recovery_offset_test_ledger", config); + + ledger.addEntry("dummy-entry-1".getBytes(Encoding), MOCK_BATCH_SIZE); + + ManagedCursor cursor = ledger.openCursor("c1"); + + ledger.addEntry("dummy-entry-2".getBytes(Encoding), MOCK_BATCH_SIZE); + + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(), MOCK_BATCH_SIZE * 2 - 1); + + ledger.close(); + + log.info("Closing ledger and reopening"); + + // / Reopen the same managed-ledger + ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + ledger = factory2.open("my_recovery_offset_test_ledger", config); + + cursor = ledger.openCursor("c1"); + + assertEquals(ledger.getNumberOfEntries(), 2); + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(), MOCK_BATCH_SIZE * 2 - 1); + + + List entries = cursor.readEntries(100); + assertEquals(entries.size(), 1); + entries.forEach(e -> e.release()); + + cursor.close(); + ledger.close(); + factory2.shutdown(); + } + + @Test + public void testFindPositionByOffset() throws Exception { + final int MOCK_BATCH_SIZE = 2; + final int maxEntriesPerLedger = 5; + int maxSequenceIdPerLedger = MOCK_BATCH_SIZE * maxEntriesPerLedger; + ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors()); + + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setManagedLedgerInterceptor(interceptor); + managedLedgerConfig.setMaxEntriesPerLedger(5); + + ManagedLedger ledger = factory.open("my_ml_broker_entry_metadata_test_ledger", managedLedgerConfig); + ManagedCursor cursor = ledger.openCursor("c1"); + + long firstLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + firstLedgerId = ((PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding), MOCK_BATCH_SIZE)).getLedgerId(); + } + + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(), 9); + + + PositionImpl position = null; + for (int offset = 0 ; offset <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(); offset ++) { + position = ledger.asyncFindPosition(new OffsetSearchPredicate(offset)).get(); + assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); + } + + // roll over ledger + long secondLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + secondLedgerId = ((PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding), MOCK_BATCH_SIZE)).getLedgerId(); + } + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(), 19); + assertNotEquals(firstLedgerId, secondLedgerId); + + for (int offset = 0 ; offset <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(); offset ++) { + position = ledger.asyncFindPosition(new OffsetSearchPredicate(offset)).get(); + assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); + } + + // reopen ledger + ledger.close(); + // / Reopen the same managed-ledger + ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); + ledger = factory2.open("my_ml_broker_entry_metadata_test_ledger", managedLedgerConfig); + + long thirdLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + thirdLedgerId = ((PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding), MOCK_BATCH_SIZE)).getLedgerId(); + } + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(), 29); + assertNotEquals(secondLedgerId, thirdLedgerId); + + for (int offset = 0 ; offset <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(); offset ++) { + position = ledger.asyncFindPosition(new OffsetSearchPredicate(offset)).get(); + assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); + } + cursor.close(); + ledger.close(); + factory2.shutdown(); + } + + public static Set getBrokerEntryMetadataInterceptors() { + Set interceptorNames = new HashSet<>(); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendOffsetMetadataInterceptor"); + return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, + Thread.currentThread().getContextClassLoader()); + } + + class OffsetSearchPredicate implements com.google.common.base.Predicate { + + long offsetToSearch = -1; + public OffsetSearchPredicate(long offsetToSearch) { + this.offsetToSearch = offsetToSearch; + } + + @Override + public String toString() { + final StringBuffer sb = new StringBuffer("OffsetSearchPredicate{"); + sb.append("offsetToSearch=").append(offsetToSearch); + sb.append('}'); + return sb.toString(); + } + + @Override + public boolean apply(@Nullable Entry entry) { + try { + PulsarApi.BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()); + return brokerEntryMetadata.getOffset() < offsetToSearch; + } catch (Exception e) { + log.error("Error deserialize message for message position find", e); + } finally { + entry.release(); + } + return false; + } + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 174080212cfc9..7071d5bfbbddb 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -108,7 +108,7 @@ public static ByteBuf createMessageByteBufWrittenToLedger(String msg) throws Exc public static byte[] appendBrokerTimestamp(ByteBuf headerAndPayloads) throws Exception { ByteBuf msgWithEntryMeta = - Commands.addBrokerEntryMetadata(headerAndPayloads, getBrokerEntryMetadataInterceptors()); + Commands.addBrokerEntryMetadata(headerAndPayloads, getBrokerEntryMetadataInterceptors(), 1); byte[] byteMessage = msgWithEntryMeta.nioBuffer().array(); msgWithEntryMeta.release(); return byteMessage; @@ -321,9 +321,10 @@ void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception } public static Set getBrokerEntryMetadataInterceptors() { - - return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors( - Sets.newHashSet("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"), + Set interceptorNames = new HashSet<>(); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendOffsetMetadataInterceptor"); + return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, Thread.currentThread().getContextClassLoader()); } /** From 3b98e9f5cacd873802ac1df30e0ae2398fd0e8ee Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Thu, 24 Dec 2020 10:09:47 +0800 Subject: [PATCH 11/12] apply comment --- .../bookkeeper/mledger/ManagedLedger.java | 29 ++++----- .../mledger/impl/ManagedLedgerImpl.java | 27 ++++----- .../bookkeeper/mledger/impl/OpAddEntry.java | 16 ++--- .../interceptor/ManagedLedgerInterceptor.java | 26 ++++++++ .../ManagedLedgerInterceptorImpl.java | 34 +++++------ .../pulsar/broker/service/BrokerService.java | 6 +- .../pulsar/broker/service/Producer.java | 5 ++ .../apache/pulsar/broker/service/Topic.java | 4 ++ .../service/persistent/PersistentTopic.java | 3 +- .../MangedLedgerInterceptorImplTest.java | 60 ++++++++----------- .../service/PersistentMessageFinderTest.java | 2 +- .../pulsar/common/api/proto/PulsarApi.java | 52 ++++++++-------- ...va => AppendIndexMetadataInterceptor.java} | 20 +++---- pulsar-common/src/main/proto/PulsarApi.proto | 2 +- .../common/protocol/CommandUtilsTests.java | 6 +- 15 files changed, 158 insertions(+), 134 deletions(-) rename pulsar-common/src/main/java/org/apache/pulsar/common/intercept/{AppendOffsetMetadataInterceptor.java => AppendIndexMetadataInterceptor.java} (72%) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 0c1a71c2f4ee1..6274f977051ee 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -22,8 +22,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.function.BiFunction; -import java.util.function.Predicate; import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; @@ -34,7 +32,6 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; @@ -83,12 +80,12 @@ public interface ManagedLedger { * * @param data * data entry to be persisted - * @param batchSize - * batchSize of entry + * @param numberOfMessages + * numberOfMessages of entry * @return the Position at which the entry has been inserted * @throws ManagedLedgerException */ - Position addEntry(byte[] data, int batchSize) throws InterruptedException, ManagedLedgerException; + Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException; /** * Append a new entry asynchronously. @@ -123,8 +120,8 @@ public interface ManagedLedger { * * @param data * data entry to be persisted - * @param batchSize - * batchSize of entry + * @param numberOfMessages + * numberOfMessages of entry * @param offset * offset in the data array * @param length @@ -132,7 +129,7 @@ public interface ManagedLedger { * @return the Position at which the entry has been inserted * @throws ManagedLedgerException */ - Position addEntry(byte[] data, int batchSize, int offset, int length) throws InterruptedException, ManagedLedgerException; + Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException; /** * Append a new entry asynchronously. @@ -157,8 +154,8 @@ public interface ManagedLedger { * @see #addEntry(byte[]) * @param data * data entry to be persisted - * @param batchSize - * batchSize of entry + * @param numberOfMessages + * numberOfMessages of entry * @param offset * offset in the data array * @param length @@ -168,7 +165,7 @@ public interface ManagedLedger { * @param ctx * opaque context */ - void asyncAddEntry(byte[] data, int batchSize, int offset, int length, AddEntryCallback callback, Object ctx); + void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback, Object ctx); /** @@ -190,14 +187,14 @@ public interface ManagedLedger { * @see #addEntry(byte[]) * @param buffer * buffer with the data entry - * @param batchSize - * batch size for data entry + * @param numberOfMessages + * numberOfMessages for data entry * @param callback * callback object * @param ctx * opaque context */ - void asyncAddEntry(ByteBuf buffer, int batchSize, AddEntryCallback callback, Object ctx); + void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx); /** * Open a ManagedCursor in this ManagedLedger. @@ -591,7 +588,7 @@ void asyncSetProperties(Map properties, final AsyncCallbacks.Upd /** * Find position by sequenceId. * */ - CompletableFuture asyncFindPosition(com.google.common.base.Predicate predicate); + CompletableFuture asyncFindPosition(com.google.common.base.Predicate predicate); /** * Get the ManagedLedgerInterceptor for ManagedLedger. diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 9d7e4144a1555..562a3318f90b3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -567,8 +567,8 @@ public Position addEntry(byte[] data) throws InterruptedException, ManagedLedger } @Override - public Position addEntry(byte[] data, int batchSize) throws InterruptedException, ManagedLedgerException { - return addEntry(data, batchSize,0, data.length); + public Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException { + return addEntry(data, numberOfMessages, 0, data.length); } @Override @@ -607,7 +607,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { } @Override - public Position addEntry(byte[] data, int batchSize, int offset, int length) throws InterruptedException, ManagedLedgerException { + public Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException, ManagedLedgerException { final CountDownLatch counter = new CountDownLatch(1); // Result list will contain the status exception and the resulting // position @@ -617,7 +617,7 @@ class Result { } final Result result = new Result(); - asyncAddEntry(data, batchSize, offset, length, new AddEntryCallback() { + asyncAddEntry(data, numberOfMessages, offset, length, new AddEntryCallback() { @Override public void addComplete(Position position, Object ctx) { result.position = position; @@ -654,10 +654,10 @@ public void asyncAddEntry(final byte[] data, int offset, int length, final AddEn } @Override - public void asyncAddEntry(final byte[] data, int batchSize, int offset, int length, final AddEntryCallback callback, + public void asyncAddEntry(final byte[] data, int numberOfMessages, int offset, int length, final AddEntryCallback callback, final Object ctx) { ByteBuf buffer = Unpooled.wrappedBuffer(data, offset, length); - asyncAddEntry(buffer, batchSize, callback, ctx); + asyncAddEntry(buffer, numberOfMessages, callback, ctx); } @Override @@ -673,12 +673,12 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) } @Override - public void asyncAddEntry(ByteBuf buffer, int batchSize, AddEntryCallback callback, Object ctx) { + public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state); } - OpAddEntry addOperation = OpAddEntry.create(this, buffer, batchSize, callback, ctx); + OpAddEntry addOperation = OpAddEntry.create(this, buffer, numberOfMessages, callback, ctx); // Jump to specific thread to avoid contention from writers writing from different threads executor.executeOrdered(name, safeRun(() -> internalAsyncAddEntry(addOperation))); @@ -760,13 +760,13 @@ private boolean beforeAddEntry(OpAddEntry addOperation) { return true; } try { - managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getBatchSize()); + managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages()); return true; } catch (Exception e) { addOperation.failed( new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed.")); ReferenceCountUtil.release(addOperation.data); - log.error("[{}] Failed to interceptor entry before add to bookie.", name, e); + log.error("[{}] Failed to intercept adding an entry to bookie.", name, e); return false; } } @@ -1451,7 +1451,7 @@ public synchronized void updateLedgersIdsComplete(Stat stat) { // If op is used by another ledger handle, we need to close it and create a new one if (existsOp.ledger != null) { existsOp.close(); - existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getBatchSize(), existsOp.callback, existsOp.ctx); + existsOp = OpAddEntry.create(existsOp.ml, existsOp.data, existsOp.getNumberOfMessages(), existsOp.callback, existsOp.ctx); } existsOp.setLedger(currentLedger); if (beforeAddEntry(existsOp)) { @@ -1567,10 +1567,9 @@ public void closeComplete(int rc, LedgerHandle lh, Object o) { } @Override - public CompletableFuture asyncFindPosition(com.google.common.base.Predicate predicate) { + public CompletableFuture asyncFindPosition(com.google.common.base.Predicate predicate) { - - CompletableFuture future = new CompletableFuture(); + CompletableFuture future = new CompletableFuture(); Long firstLedgerId = ledgers.firstKey(); final PositionImpl startPosition = firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0); if (startPosition == null) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java index 903bf28f274dd..fa5228c5f73f3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java @@ -48,7 +48,7 @@ public class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallba protected ManagedLedgerImpl ml; LedgerHandle ledger; private long entryId; - private int batchSize; + private int numberOfMessages; @SuppressWarnings("unused") private static final AtomicReferenceFieldUpdater callbackUpdater = @@ -96,11 +96,11 @@ public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, AddEntryCall return op; } - public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int batchSize, AddEntryCallback callback, Object ctx) { + public static OpAddEntry create(ManagedLedgerImpl ml, ByteBuf data, int numberOfMessages, AddEntryCallback callback, Object ctx) { OpAddEntry op = RECYCLER.get(); op.ml = ml; op.ledger = null; - op.batchSize = batchSize; + op.numberOfMessages = numberOfMessages; op.data = data.retain(); op.dataLength = data.readableBytes(); op.callback = callback; @@ -299,12 +299,12 @@ public ByteBuf getData() { return data; } - public int getBatchSize() { - return batchSize; + public int getNumberOfMessages() { + return numberOfMessages; } - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; + public void setNumberOfMessages(int numberOfMessages) { + this.numberOfMessages = numberOfMessages; } public void setData(ByteBuf data) { @@ -328,7 +328,7 @@ public void recycle() { ml = null; ledger = null; data = null; - batchSize = 0; + numberOfMessages = 0; dataLength = -1; callback = null; ctx = null; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java index 69706a07a1b2a..f5d9f12d87291 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/interceptor/ManagedLedgerInterceptor.java @@ -19,6 +19,8 @@ package org.apache.bookkeeper.mledger.interceptor; import org.apache.bookkeeper.client.LedgerHandle; +import org.apache.bookkeeper.common.annotation.InterfaceAudience; +import org.apache.bookkeeper.common.annotation.InterfaceStability; import org.apache.bookkeeper.mledger.impl.OpAddEntry; import java.util.Map; @@ -26,9 +28,33 @@ /** * Interceptor for ManagedLedger. * */ +@InterfaceAudience.LimitedPrivate +@InterfaceStability.Stable public interface ManagedLedgerInterceptor { + + /** + * Intercept an OpAddEntry and return an OpAddEntry. + * @param op an OpAddEntry to be intercepted. + * @param batchSize + * @return an OpAddEntry. + */ OpAddEntry beforeAddEntry(OpAddEntry op, int batchSize); + + /** + * Intercept when ManagedLedger is initialized. + * @param propertiesMap map of properties. + */ void onManagedLedgerPropertiesInitialize(Map propertiesMap); + + /** + * Intercept when ManagedLedger is initialized. + * @param name name of ManagedLedger + * @param ledgerHandle a LedgerHandle. + */ void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle ledgerHandle); + + /** + * @param propertiesMap map of properties. + */ void onUpdateManagedLedgerInfo(Map propertiesMap); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java index 4b90ce310832e..0e59d2f3fd7c0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java @@ -26,7 +26,7 @@ import org.apache.bookkeeper.mledger.impl.OpAddEntry; import org.apache.bookkeeper.mledger.interceptor.ManagedLedgerInterceptor; import org.apache.pulsar.common.api.proto.PulsarApi; -import org.apache.pulsar.common.intercept.AppendOffsetMetadataInterceptor; +import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import org.apache.pulsar.common.protocol.Commands; import org.slf4j.Logger; @@ -34,7 +34,7 @@ public class ManagedLedgerInterceptorImpl implements ManagedLedgerInterceptor { private static final Logger log = LoggerFactory.getLogger(ManagedLedgerInterceptorImpl.class); - private static final String OFFSET = "offset"; + private static final String INDEX = "index"; private final Set brokerEntryMetadataInterceptors; @@ -44,14 +44,14 @@ public ManagedLedgerInterceptorImpl(Set brokerEn this.brokerEntryMetadataInterceptors = brokerEntryMetadataInterceptors; } - public long getOffset() { - long offset = -1; + public long getIndex() { + long index = -1; for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { - if (interceptor instanceof AppendOffsetMetadataInterceptor) { - offset = ((AppendOffsetMetadataInterceptor) interceptor).getOffset(); + if (interceptor instanceof AppendIndexMetadataInterceptor) { + index = ((AppendIndexMetadataInterceptor) interceptor).getIndex(); } } - return offset; + return index; } @Override @@ -69,11 +69,11 @@ public void onManagedLedgerPropertiesInitialize(Map propertiesMa return; } - if (propertiesMap.containsKey(OFFSET)) { + if (propertiesMap.containsKey(INDEX)) { for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { - if (interceptor instanceof AppendOffsetMetadataInterceptor) { - ((AppendOffsetMetadataInterceptor) interceptor) - .recoveryOffsetGenerator(Long.parseLong(propertiesMap.get(OFFSET))); + if (interceptor instanceof AppendIndexMetadataInterceptor) { + ((AppendIndexMetadataInterceptor) interceptor) + .recoveryIndexGenerator(Long.parseLong(propertiesMap.get(INDEX))); } } } @@ -83,15 +83,15 @@ public void onManagedLedgerPropertiesInitialize(Map propertiesMa public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) { try { for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { - if (interceptor instanceof AppendOffsetMetadataInterceptor) { + if (interceptor instanceof AppendIndexMetadataInterceptor) { LedgerEntries ledgerEntries = lh.read(lh.getLastAddConfirmed() - 1, lh.getLastAddConfirmed()); for (LedgerEntry entry : ledgerEntries) { PulsarApi.BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getEntryBuffer()); - if (brokerEntryMetadata != null && brokerEntryMetadata.hasOffset()) { - ((AppendOffsetMetadataInterceptor) interceptor) - .recoveryOffsetGenerator(brokerEntryMetadata.getOffset()); + if (brokerEntryMetadata != null && brokerEntryMetadata.hasIndex()) { + ((AppendIndexMetadataInterceptor) interceptor) + .recoveryIndexGenerator(brokerEntryMetadata.getIndex()); } } @@ -105,8 +105,8 @@ public void onManagedLedgerLastLedgerInitialize(String name, LedgerHandle lh) { @Override public void onUpdateManagedLedgerInfo(Map propertiesMap) { for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { - if (interceptor instanceof AppendOffsetMetadataInterceptor) { - propertiesMap.put(OFFSET, String.valueOf(((AppendOffsetMetadataInterceptor) interceptor).getOffset())); + if (interceptor instanceof AppendIndexMetadataInterceptor) { + propertiesMap.put(INDEX, String.valueOf(((AppendIndexMetadataInterceptor) interceptor).getIndex())); } } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 574d70fdb8a78..792c67652dc80 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -128,7 +128,7 @@ import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.configuration.FieldContext; -import org.apache.pulsar.common.intercept.AppendOffsetMetadataInterceptor; +import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -1098,10 +1098,10 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, if (isBrokerEntryMetadataEnabled()) { // init managedLedger interceptor for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { - if (interceptor instanceof AppendOffsetMetadataInterceptor) { + if (interceptor instanceof AppendIndexMetadataInterceptor) { // add individual AppendOffsetMetadataInterceptor for each topic brokerEntryMetadataInterceptors.remove(interceptor); - brokerEntryMetadataInterceptors.add(new AppendOffsetMetadataInterceptor()); + brokerEntryMetadataInterceptors.add(new AppendIndexMetadataInterceptor()); } } ManagedLedgerInterceptor mlInterceptor = diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java index c69f9bade52a5..2bca2320a552d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java @@ -463,6 +463,11 @@ static MessagePublishContext get(Producer producer, long lowestSequenceId, long return callback; } + @Override + public long getNumberOfMessages() { + return batchSize; + } + private final Handle recyclerHandle; private MessagePublishContext(Handle recyclerHandle) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index af88ce8407e07..c940195d9b8dd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -90,6 +90,10 @@ default void setOriginalHighestSequenceId(long originalHighestSequenceId) { default long getOriginalHighestSequenceId() { return -1L; } + + default long getNumberOfMessages() { + return 1L; + } } void publishMessage(ByteBuf headersAndPayload, PublishContext callback); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 3dd2d64e45840..d086cd880d717 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -357,7 +357,8 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont messageDeduplication.isDuplicate(publishContext, headersAndPayload); switch (status) { case NotDup: - ledger.asyncAddEntry(headersAndPayload, this, publishContext); + ledger.asyncAddEntry(headersAndPayload, + (int) publishContext.getNumberOfMessages(), this, publishContext); break; case Dup: // Immediately acknowledge duplicated message diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java index 9b5c2071b1321..2c86b9814a860 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java @@ -69,13 +69,13 @@ public void testAddBrokerEntryMetadata() throws Exception { } - assertEquals(19, ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset()); + assertEquals(19, ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex()); List entryList = cursor.readEntries(numberOfEntries); for (int i = 0 ; i < numberOfEntries; i ++) { PulsarApi.BrokerEntryMetadata metadata = Commands.parseBrokerEntryMetadataIfExist(entryList.get(i).getDataBuffer()); assertNotNull(metadata); - assertEquals(metadata.getOffset(), (i + 1) * MOCK_BATCH_SIZE - 1); + assertEquals(metadata.getIndex(), (i + 1) * MOCK_BATCH_SIZE - 1); } cursor.close();; @@ -85,13 +85,13 @@ public void testAddBrokerEntryMetadata() throws Exception { @Test(timeOut = 20000) - public void testRecoveryOffset() throws Exception { + public void testRecoveryIndex() throws Exception { final int MOCK_BATCH_SIZE = 2; ManagedLedgerInterceptor interceptor = new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors()); ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setManagedLedgerInterceptor(interceptor); - ManagedLedger ledger = factory.open("my_recovery_offset_test_ledger", config); + ManagedLedger ledger = factory.open("my_recovery_index_test_ledger", config); ledger.addEntry("dummy-entry-1".getBytes(Encoding), MOCK_BATCH_SIZE); @@ -99,7 +99,7 @@ public void testRecoveryOffset() throws Exception { ledger.addEntry("dummy-entry-2".getBytes(Encoding), MOCK_BATCH_SIZE); - assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(), MOCK_BATCH_SIZE * 2 - 1); + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), MOCK_BATCH_SIZE * 2 - 1); ledger.close(); @@ -107,12 +107,12 @@ public void testRecoveryOffset() throws Exception { // / Reopen the same managed-ledger ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(bkc, bkc.getZkHandle()); - ledger = factory2.open("my_recovery_offset_test_ledger", config); + ledger = factory2.open("my_recovery_index_test_ledger", config); cursor = ledger.openCursor("c1"); assertEquals(ledger.getNumberOfEntries(), 2); - assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(), MOCK_BATCH_SIZE * 2 - 1); + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), MOCK_BATCH_SIZE * 2 - 1); List entries = cursor.readEntries(100); @@ -125,7 +125,7 @@ public void testRecoveryOffset() throws Exception { } @Test - public void testFindPositionByOffset() throws Exception { + public void testFindPositionByIndex() throws Exception { final int MOCK_BATCH_SIZE = 2; final int maxEntriesPerLedger = 5; int maxSequenceIdPerLedger = MOCK_BATCH_SIZE * maxEntriesPerLedger; @@ -144,13 +144,13 @@ public void testFindPositionByOffset() throws Exception { firstLedgerId = ((PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding), MOCK_BATCH_SIZE)).getLedgerId(); } - assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(), 9); + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), 9); PositionImpl position = null; - for (int offset = 0 ; offset <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(); offset ++) { - position = ledger.asyncFindPosition(new OffsetSearchPredicate(offset)).get(); - assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); + for (int index = 0; index <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(); index ++) { + position = (PositionImpl) ledger.asyncFindPosition(new IndexSearchPredicate(index)).get(); + assertEquals(position.getEntryId(), (index % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); } // roll over ledger @@ -158,12 +158,12 @@ public void testFindPositionByOffset() throws Exception { for (int i = 0; i < maxEntriesPerLedger; i++) { secondLedgerId = ((PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding), MOCK_BATCH_SIZE)).getLedgerId(); } - assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(), 19); + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), 19); assertNotEquals(firstLedgerId, secondLedgerId); - for (int offset = 0 ; offset <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(); offset ++) { - position = ledger.asyncFindPosition(new OffsetSearchPredicate(offset)).get(); - assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); + for (int index = 0; index <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(); index ++) { + position = (PositionImpl) ledger.asyncFindPosition(new IndexSearchPredicate(index)).get(); + assertEquals(position.getEntryId(), (index % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); } // reopen ledger @@ -176,12 +176,12 @@ public void testFindPositionByOffset() throws Exception { for (int i = 0; i < maxEntriesPerLedger; i++) { thirdLedgerId = ((PositionImpl) ledger.addEntry("dummy-entry".getBytes(Encoding), MOCK_BATCH_SIZE)).getLedgerId(); } - assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(), 29); + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), 29); assertNotEquals(secondLedgerId, thirdLedgerId); - for (int offset = 0 ; offset <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getOffset(); offset ++) { - position = ledger.asyncFindPosition(new OffsetSearchPredicate(offset)).get(); - assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); + for (int index = 0; index <= ((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(); index ++) { + position = (PositionImpl) ledger.asyncFindPosition(new IndexSearchPredicate(index)).get(); + assertEquals(position.getEntryId(), (index % maxSequenceIdPerLedger) / MOCK_BATCH_SIZE); } cursor.close(); ledger.close(); @@ -191,31 +191,23 @@ public void testFindPositionByOffset() throws Exception { public static Set getBrokerEntryMetadataInterceptors() { Set interceptorNames = new HashSet<>(); interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"); - interceptorNames.add("org.apache.pulsar.common.intercept.AppendOffsetMetadataInterceptor"); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"); return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, Thread.currentThread().getContextClassLoader()); } - class OffsetSearchPredicate implements com.google.common.base.Predicate { + class IndexSearchPredicate implements com.google.common.base.Predicate { - long offsetToSearch = -1; - public OffsetSearchPredicate(long offsetToSearch) { - this.offsetToSearch = offsetToSearch; - } - - @Override - public String toString() { - final StringBuffer sb = new StringBuffer("OffsetSearchPredicate{"); - sb.append("offsetToSearch=").append(offsetToSearch); - sb.append('}'); - return sb.toString(); + long indexToSearch = -1; + public IndexSearchPredicate(long indexToSearch) { + this.indexToSearch = indexToSearch; } @Override public boolean apply(@Nullable Entry entry) { try { PulsarApi.BrokerEntryMetadata brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(entry.getDataBuffer()); - return brokerEntryMetadata.getOffset() < offsetToSearch; + return brokerEntryMetadata.getIndex() < indexToSearch; } catch (Exception e) { log.error("Error deserialize message for message position find", e); } finally { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index 7071d5bfbbddb..c0c78d2a92857 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -323,7 +323,7 @@ void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception public static Set getBrokerEntryMetadataInterceptors() { Set interceptorNames = new HashSet<>(); interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"); - interceptorNames.add("org.apache.pulsar.common.intercept.AppendOffsetMetadataInterceptor"); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"); return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, Thread.currentThread().getContextClassLoader()); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index e2d5680d48333..f19ba6339f4b7 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -7075,9 +7075,9 @@ public interface BrokerEntryMetadataOrBuilder boolean hasBrokerTimestamp(); long getBrokerTimestamp(); - // optional uint64 offset = 2; - boolean hasOffset(); - long getOffset(); + // optional uint64 index = 2; + boolean hasIndex(); + long getIndex(); } public static final class BrokerEntryMetadata extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -7124,19 +7124,19 @@ public long getBrokerTimestamp() { return brokerTimestamp_; } - // optional uint64 offset = 2; - public static final int OFFSET_FIELD_NUMBER = 2; - private long offset_; - public boolean hasOffset() { + // optional uint64 index = 2; + public static final int INDEX_FIELD_NUMBER = 2; + private long index_; + public boolean hasIndex() { return ((bitField0_ & 0x00000002) == 0x00000002); } - public long getOffset() { - return offset_; + public long getIndex() { + return index_; } private void initFields() { brokerTimestamp_ = 0L; - offset_ = 0L; + index_ = 0L; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -7159,7 +7159,7 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr output.writeUInt64(1, brokerTimestamp_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { - output.writeUInt64(2, offset_); + output.writeUInt64(2, index_); } } @@ -7175,7 +7175,7 @@ public int getSerializedSize() { } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream - .computeUInt64Size(2, offset_); + .computeUInt64Size(2, index_); } memoizedSerializedSize = size; return size; @@ -7292,7 +7292,7 @@ public Builder clear() { super.clear(); brokerTimestamp_ = 0L; bitField0_ = (bitField0_ & ~0x00000001); - offset_ = 0L; + index_ = 0L; bitField0_ = (bitField0_ & ~0x00000002); return this; } @@ -7334,7 +7334,7 @@ public org.apache.pulsar.common.api.proto.PulsarApi.BrokerEntryMetadata buildPar if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } - result.offset_ = offset_; + result.index_ = index_; result.bitField0_ = to_bitField0_; return result; } @@ -7344,8 +7344,8 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.BrokerEntr if (other.hasBrokerTimestamp()) { setBrokerTimestamp(other.getBrokerTimestamp()); } - if (other.hasOffset()) { - setOffset(other.getOffset()); + if (other.hasIndex()) { + setIndex(other.getIndex()); } return this; } @@ -7383,7 +7383,7 @@ public Builder mergeFrom( } case 16: { bitField0_ |= 0x00000002; - offset_ = input.readUInt64(); + index_ = input.readUInt64(); break; } } @@ -7413,23 +7413,23 @@ public Builder clearBrokerTimestamp() { return this; } - // optional uint64 offset = 2; - private long offset_ ; - public boolean hasOffset() { + // optional uint64 index = 2; + private long index_ ; + public boolean hasIndex() { return ((bitField0_ & 0x00000002) == 0x00000002); } - public long getOffset() { - return offset_; + public long getIndex() { + return index_; } - public Builder setOffset(long value) { + public Builder setIndex(long value) { bitField0_ |= 0x00000002; - offset_ = value; + index_ = value; return this; } - public Builder clearOffset() { + public Builder clearIndex() { bitField0_ = (bitField0_ & ~0x00000002); - offset_ = 0L; + index_ = 0L; return this; } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java similarity index 72% rename from pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java rename to pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java index 8db5a7eb5a5e6..dba3d9aec49e3 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendOffsetMetadataInterceptor.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java @@ -22,16 +22,16 @@ import java.util.concurrent.atomic.AtomicLong; -public class AppendOffsetMetadataInterceptor implements BrokerEntryMetadataInterceptor{ - private final AtomicLong offsetGenerator; +public class AppendIndexMetadataInterceptor implements BrokerEntryMetadataInterceptor{ + private final AtomicLong indexGenerator; - public AppendOffsetMetadataInterceptor() { - this.offsetGenerator = new AtomicLong(-1); + public AppendIndexMetadataInterceptor() { + this.indexGenerator = new AtomicLong(-1); } - public void recoveryOffsetGenerator(long offset) { - if (offsetGenerator.get() < offset) { - offsetGenerator.set(offset); + public void recoveryIndexGenerator(long index) { + if (indexGenerator.get() < index) { + indexGenerator.set(index); } } @@ -45,10 +45,10 @@ public PulsarApi.BrokerEntryMetadata.Builder intercept(PulsarApi.BrokerEntryMeta public PulsarApi.BrokerEntryMetadata.Builder interceptWithBatchSize( PulsarApi.BrokerEntryMetadata.Builder brokerMetadata, int batchSize) { - return brokerMetadata.setOffset(offsetGenerator.addAndGet(batchSize)); + return brokerMetadata.setIndex(indexGenerator.addAndGet(batchSize)); } - public long getOffset() { - return offsetGenerator.get(); + public long getIndex() { + return indexGenerator.get(); } } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index 7767e9b173097..ace18c8cba962 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -183,7 +183,7 @@ message SingleMessageMetadata { // metadata added for entry from broker message BrokerEntryMetadata { optional uint64 broker_timestamp = 1; - optional uint64 offset = 2; + optional uint64 index = 2; } enum ServerError { diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java index 9fbce8123b5ab..2bd4df09d0516 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java @@ -154,7 +154,7 @@ public void testAddBrokerEntryMetadata() throws Exception { PulsarApi.BrokerEntryMetadata .newBuilder() .setBrokerTimestamp(System.currentTimeMillis()) - .setOffset(MOCK_BATCH_SIZE - 1) + .setIndex(MOCK_BATCH_SIZE - 1) .build(); ByteBuf dataWithBrokerEntryMetadata = Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE); @@ -194,7 +194,7 @@ public void testParseBrokerEntryMetadata() throws Exception { Commands.parseBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata); assertTrue(brokerMetadata.getBrokerTimestamp() <= System.currentTimeMillis()); - assertEquals(brokerMetadata.getOffset(), MOCK_BATCH_SIZE - 1); + assertEquals(brokerMetadata.getIndex(), MOCK_BATCH_SIZE - 1); assertEquals(data.length(), dataWithBrokerEntryMetadata.readableBytes()); byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()]; @@ -205,7 +205,7 @@ public void testParseBrokerEntryMetadata() throws Exception { public Set getBrokerEntryMetadataInterceptors() { Set interceptorNames = new HashSet<>(); interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"); - interceptorNames.add("org.apache.pulsar.common.intercept.AppendOffsetMetadataInterceptor"); + interceptorNames.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor"); return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(interceptorNames, Thread.currentThread().getContextClassLoader()); } From d4fd5c813bb0b621c70f0877ac6dcf932335ffdd Mon Sep 17 00:00:00 2001 From: aloyszhang Date: Thu, 24 Dec 2020 11:20:38 +0800 Subject: [PATCH 12/12] fix send error --- .../broker/service/persistent/PersistentTopic.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index d086cd880d717..49e9042d960ac 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -357,8 +357,7 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont messageDeduplication.isDuplicate(publishContext, headersAndPayload); switch (status) { case NotDup: - ledger.asyncAddEntry(headersAndPayload, - (int) publishContext.getNumberOfMessages(), this, publishContext); + asyncAddEntry(headersAndPayload, publishContext); break; case Dup: // Immediately acknowledge duplicated message @@ -372,6 +371,15 @@ public void publishMessage(ByteBuf headersAndPayload, PublishContext publishCont } } + private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) { + if (brokerService.isBrokerEntryMetadataEnabled()) { + ledger.asyncAddEntry(headersAndPayload, + (int) publishContext.getNumberOfMessages(), this, publishContext); + } else { + ledger.asyncAddEntry(headersAndPayload, this, publishContext); + } + } + public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) { if (ledger instanceof ManagedLedgerImpl) { ((ManagedLedgerImpl) ledger).asyncReadEntry(position, callback, ctx);