From 250a278212b73ce2433bfb8407e6e4188c2685d9 Mon Sep 17 00:00:00 2001 From: seeday Date: Wed, 22 Jul 2020 08:50:35 -0500 Subject: [PATCH 1/2] Initial (squashed) implementation of message filtering --- .../pulsar/broker/service/Consumer.java | 61 +- .../pulsar/broker/service/ServerCnx.java | 5 +- .../apache/pulsar/broker/service/Topic.java | 6 +- .../service/filtering/BasicAvroFilter.java | 33 + .../service/filtering/BytesPrefixFilter.java | 27 + .../broker/service/filtering/Filter.java | 31 + .../broker/service/filtering/RegexFilter.java | 23 + .../nonpersistent/NonPersistentTopic.java | 8 +- .../service/persistent/PersistentTopic.java | 8 +- ...sistentDispatcherFailoverConsumerTest.java | 19 +- .../PersistentTopicConcurrentTest.java | 8 +- .../broker/service/PersistentTopicTest.java | 64 +- .../client/api/MessageFilteringTest.java | 138 ++++ .../api/SimpleTypedProducerConsumerTest.java | 62 +- .../pulsar/client/api/ConsumerBuilder.java | 7 + .../client/api/MessageFilterPolicy.java | 56 ++ .../client/impl/ConsumerBuilderImpl.java | 24 +- .../pulsar/client/impl/ConsumerImpl.java | 2 +- .../impl/conf/ConsumerConfigurationData.java | 15 +- .../pulsar/common/api/proto/PulsarApi.java | 623 ++++++++++++++++++ .../pulsar/common/protocol/Commands.java | 22 +- pulsar-common/src/main/proto/PulsarApi.proto | 12 +- 22 files changed, 1148 insertions(+), 106 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/BasicAvroFilter.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/BytesPrefixFilter.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/Filter.java create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/RegexFilter.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageFilteringTest.java create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageFilterPolicy.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 2131b18799e9a..9516d59a3b369 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -18,11 +18,8 @@ */ package org.apache.pulsar.broker.service; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; - import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; @@ -30,11 +27,12 @@ import io.netty.channel.ChannelPromise; import java.util.ArrayList; -import java.util.BitSet; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.lang.reflect.InvocationTargetException; +import java.util.*; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; import java.util.stream.Collectors; @@ -46,12 +44,14 @@ import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; +import org.apache.pulsar.broker.service.filtering.Filter; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck; import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; -import org.apache.pulsar.common.api.proto.PulsarApi.IntRange; import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData; import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion; import org.apache.pulsar.common.naming.TopicName; @@ -63,6 +63,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.ExecutionException; + +import static com.google.common.base.Preconditions.checkArgument; + /** * A Consumer is a consumer currently connected and associated with a Subscription */ @@ -116,6 +120,8 @@ public class Consumer { private final PulsarApi.KeySharedMeta keySharedMeta; + private final Filter filter; + /** * It starts keep tracking the average messages per entry. * The initial value is 1000, when new value comes, it will update with @@ -132,7 +138,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo int priorityLevel, String consumerName, int maxUnackedMessages, ServerCnx cnx, String appId, Map metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition, - PulsarApi.KeySharedMeta keySharedMeta) throws BrokerServiceException { + PulsarApi.KeySharedMeta keySharedMeta, PulsarApi.FilterMeta filterMeta) throws BrokerServiceException { this.subscription = subscription; this.subType = subType; @@ -169,12 +175,37 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo stats.setClientVersion(cnx.getClientVersion()); stats.metadata = this.metadata; + // no multi-version support, per message schema, etc + // gotta check how pulsar client consumers themselves do it + if (Subscription.isIndividualAckMode(subType)) { this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1); } else { // We don't need to keep track of pending acks if the subscription is not shared this.pendingAcks = null; } + if (filterMeta != null) { + Filter tempFilter = null; + try { + Properties filterProperties = new Properties(); + filterMeta.getFilterPropertiesList().forEach(kv -> filterProperties.put(kv.getKey(), kv.getValue())); + + Class filterClazz = Class.forName(filterMeta.getFilterClassName()); + tempFilter = (Filter) filterClazz.getConstructor(Properties.class).newInstance(filterProperties); + if (tempFilter.isSchemaAware()) { + Schema genericRecordSchema = Schema.AUTO_CONSUME(); + genericRecordSchema.configureSchemaInfo(topicName, "", cnx.getBrokerService().getPulsar().getSchemaRegistryService() + .getSchema(TopicName.get(topicName).getSchemaName()).get().schema.toSchemaInfo()); + tempFilter.setSchema(genericRecordSchema); + } + } catch (InstantiationException | InvocationTargetException | NoSuchMethodException | IllegalAccessException | ClassNotFoundException | NullPointerException | InterruptedException | ExecutionException e) { + log.warn("Setting up filtered schema on " + topicName + " for consumer " + consumerName + " failed", e); + ctx().writeAndFlush(Commands.newError(0, PulsarApi.ServerError.UnknownError, e.getCause().getMessage())); + } + filter = tempFilter; + } else { + filter = null; + } } public SubType subType() { @@ -263,6 +294,7 @@ public ChannelPromise sendMessages(final List entries, EntryBatchSizes ba chuckedMessageRate.recordMultipleEvents(totalChunkedMessages, 0); ctx.channel().eventLoop().execute(() -> { + ArrayList filteredEntries = new ArrayList<>(); for (int i = 0; i < entries.size(); i++) { Entry entry = entries.get(i); if (entry == null) { @@ -291,6 +323,20 @@ public ChannelPromise sendMessages(final List entries, EntryBatchSizes ba ByteBuf metadataAndPayload = entry.getDataBuffer(); // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release metadataAndPayload.retain(); + + // take a look at message and filter it out if necessary + metadataAndPayload.markReaderIndex(); + Commands.skipMessageMetadata(metadataAndPayload); + metadataAndPayload.skipBytes(metadataAndPayload.readInt()); + if (this.filter != null && !this.filter.matches(metadataAndPayload)) { + filteredEntries.add(entry.getPosition()); + messageId.recycle(); + messageIdBuilder.recycle(); + entry.release(); + continue; + } + metadataAndPayload.resetReaderIndex(); + // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) { Commands.skipChecksumIfPresent(metadataAndPayload); @@ -313,6 +359,9 @@ public ChannelPromise sendMessages(final List entries, EntryBatchSizes ba entry.release(); } + if (filteredEntries.size() > 0) { + subscription.acknowledgeMessage(filteredEntries, AckType.Individual, null); + } // Use an empty write here so that we can just tie the flush with the write promise for last entry ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise); batchSizes.recyle(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 72f86f7a18daa..28269e48bca6f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -765,6 +765,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState(); final boolean forceTopicCreation = subscribe.getForceTopicCreation(); final PulsarApi.KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? subscribe.getKeySharedMeta() : null; + final PulsarApi.FilterMeta filterMeta = subscribe.hasFilterMeta() ? subscribe.getFilterMeta() : null; CompletableFuture isProxyAuthorizedFuture; if (service.isAuthorizationEnabled() && originalPrincipal != null) { @@ -861,12 +862,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { .thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, - readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta)); + readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta, filterMeta)); } else { return topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, - startMessageRollbackDurationSec, isReplicated, keySharedMeta); + startMessageRollbackDurationSec, isReplicated, keySharedMeta, filterMeta); } }) .thenAccept(consumer -> { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index d20e700d1b552..5e52add6cd827 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -103,9 +103,9 @@ default long getOriginalHighestSequenceId() { void recordAddLatency(long latency, TimeUnit unit); CompletableFuture subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType, - int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, - Map metadata, boolean readCompacted, InitialPosition initialPosition, - long startMessageRollbackDurationSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta); + int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, + Map metadata, boolean readCompacted, InitialPosition initialPosition, + long startMessageRollbackDurationSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta, PulsarApi.FilterMeta filterMeta); CompletableFuture createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/BasicAvroFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/BasicAvroFilter.java new file mode 100644 index 0000000000000..0bef1c555869a --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/BasicAvroFilter.java @@ -0,0 +1,33 @@ +package org.apache.pulsar.broker.service.filtering; + +import io.netty.buffer.ByteBuf; +import org.apache.pulsar.client.api.schema.GenericRecord; + +import java.util.Map; +import java.util.Properties; + +public class BasicAvroFilter extends Filter { + + public BasicAvroFilter(Properties props) { + super(props); + } + + @Override + public boolean matches(ByteBuf val) { + byte[] b = new byte[val.readableBytes()]; + val.readBytes(b); + GenericRecord gr = getSchema().decode(b); + + for (Map.Entry entry : properties.entrySet()) { + if (!gr.getField((String) entry.getKey()).equals(entry.getValue())) { + return false; + } + } + return true; + } + + @Override + public boolean isSchemaAware() { + return true; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/BytesPrefixFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/BytesPrefixFilter.java new file mode 100644 index 0000000000000..fe65f9a1f0967 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/BytesPrefixFilter.java @@ -0,0 +1,27 @@ +package org.apache.pulsar.broker.service.filtering; + +import io.netty.buffer.ByteBuf; + +import java.nio.charset.StandardCharsets; +import java.util.Properties; + +public class BytesPrefixFilter extends Filter { + + public static final String BYTES_PREFIX_FILTER_PREFIX = "bytes_prefix_filter_prefix"; + private final byte[] prefix; + + public BytesPrefixFilter(Properties props) { + super(props); + this.prefix = ((String) props.get(BYTES_PREFIX_FILTER_PREFIX)).getBytes(StandardCharsets.UTF_8); + } + + @Override + public boolean matches(ByteBuf val) { + for (int i = 0; i < prefix.length; i++) { + if (val.readByte() != prefix[i]) { + return false; + } + } + return true; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/Filter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/Filter.java new file mode 100644 index 0000000000000..e5ceedeea6e57 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/Filter.java @@ -0,0 +1,31 @@ +package org.apache.pulsar.broker.service.filtering; + +import io.netty.buffer.ByteBuf; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; + +import java.util.Properties; + +public abstract class Filter { + + protected Properties properties; + private Schema schema; + + Filter(Properties props) { + properties = props; + } + + public abstract boolean matches(ByteBuf val); + + public boolean isSchemaAware() { + return false; + } + + final public void setSchema(Schema schema) { + this.schema = schema; + } + + final public Schema getSchema() { + return schema; + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/RegexFilter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/RegexFilter.java new file mode 100644 index 0000000000000..6cce488684d57 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/filtering/RegexFilter.java @@ -0,0 +1,23 @@ +package org.apache.pulsar.broker.service.filtering; + +import io.netty.buffer.ByteBuf; + +import java.nio.charset.StandardCharsets; +import java.util.Properties; +import java.util.regex.Pattern; + +public class RegexFilter extends Filter { + + public static final String REGEX_FILTER_PATTERN_KEY = "regex_filter_pattern_key"; + private final Pattern pat; + + public RegexFilter(Properties props) { + super(props); + pat = Pattern.compile((String) props.get(REGEX_FILTER_PATTERN_KEY)); + } + + @Override + public boolean matches(ByteBuf val) { + return pat.matcher(val.readCharSequence(val.readableBytes(), StandardCharsets.UTF_8)).matches(); + } +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index eb8d9d339e05c..f65171bca647c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -235,9 +235,9 @@ public void handleProducerRemoved(Producer producer) { @Override public CompletableFuture subscribe(final ServerCnx cnx, String subscriptionName, long consumerId, - SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, - Map metadata, boolean readCompacted, InitialPosition initialPosition, - long resetStartMessageBackInSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) { + SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, + Map metadata, boolean readCompacted, InitialPosition initialPosition, + long resetStartMessageBackInSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta, PulsarApi.FilterMeta filterMeta) { final CompletableFuture future = new CompletableFuture<>(); @@ -288,7 +288,7 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri try { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, - cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta); + cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta, filterMeta); subscription.addConsumer(consumer); if (!cnx.isActive()) { consumer.close(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 8983238d2dbed..98f4a02180358 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -508,9 +508,9 @@ protected void handleProducerRemoved(Producer producer) { @Override public CompletableFuture subscribe(final ServerCnx cnx, String subscriptionName, long consumerId, - SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, - Map metadata, boolean readCompacted, InitialPosition initialPosition, - long startMessageRollbackDurationSec, boolean replicatedSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) { + SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId, + Map metadata, boolean readCompacted, InitialPosition initialPosition, + long startMessageRollbackDurationSec, boolean replicatedSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta, PulsarApi.FilterMeta filterMeta) { final CompletableFuture future = new CompletableFuture<>(); @@ -595,7 +595,7 @@ public CompletableFuture subscribe(final ServerCnx cnx, String subscri subscriptionFuture.thenAccept(subscription -> { try { Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, - maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta); + maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta, filterMeta); subscription.addConsumer(consumer); checkBackloggedCursors(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java index 8f31a96e26461..2207be55f2ad0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentDispatcherFailoverConsumerTest.java @@ -47,7 +47,6 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -278,7 +277,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { // 2. Add old consumer Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, - "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "Cons1"/* consumer name */, 50000, serverCnxWithOldVersion, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, null); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -289,7 +288,7 @@ public void testConsumerGroupChangesWithOldNewConsumers() throws Exception { // 3. Add new consumer Consumer consumer2 = new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, - "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, null); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -318,7 +317,7 @@ public void testAddRemoveConsumer() throws Exception { // 2. Add consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, null)); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertSame(consumers.get(0).consumerName(), consumer1.consumerName()); @@ -342,7 +341,7 @@ public void testAddRemoveConsumer() throws Exception { // 5. Add another consumer which does not change active consumer Consumer consumer2 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, null)); pdfc.addConsumer(consumer2); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer1.consumerName()); @@ -356,7 +355,7 @@ public void testAddRemoveConsumer() throws Exception { // 6. Add a consumer which changes active consumer Consumer consumer0 = spy(new Consumer(sub, SubType.Exclusive, topic.getName(), 0 /* consumer id */, 0, "Cons0"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, null)); pdfc.addConsumer(consumer0); consumers = pdfc.getConsumers(); assertSame(pdfc.getActiveConsumer().consumerName(), consumer0.consumerName()); @@ -439,7 +438,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 2. Add a consumer Consumer consumer1 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 1, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, null)); pdfc.addConsumer(consumer1); List consumers = pdfc.getConsumers(); assertEquals(1, consumers.size()); @@ -448,7 +447,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 3. Add a consumer with same priority level and consumer name is smaller in lexicographic order. Consumer consumer2 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 1, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null)); + false /* read compacted */, InitialPosition.Latest, null, null)); pdfc.addConsumer(consumer2); // 4. Verify active consumer doesn't change @@ -461,7 +460,7 @@ public void testAddRemoveConsumerNonPartitionedTopic() throws Exception { // 5. Add another consumer which has higher priority level Consumer consumer3 = spy(new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null)); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, null)); pdfc.addConsumer(consumer3); consumers = pdfc.getConsumers(); assertEquals(3, consumers.size()); @@ -651,7 +650,7 @@ private Consumer getNextConsumer(PersistentDispatcherMultipleConsumers dispatche private Consumer createConsumer(int priority, int permit, boolean blocked, int id) throws Exception { Consumer consumer = new Consumer(null, SubType.Shared, "test-topic", id, priority, ""+id, 5000, - serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + serverCnx, "appId", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, null); try { consumer.flowPermits(permit); } catch (Exception e) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index cfc3937d799d1..e203dd4d9bec4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -122,7 +122,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception { Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/, false /* replicated */, null); + 0 /*avoid reseting cursor*/, false /* replicated */, null, null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -181,7 +181,7 @@ public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception { Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/, false /* replicated */, null); + 0 /*avoid reseting cursor*/, false /* replicated */, null, null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -244,7 +244,7 @@ public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception { Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/, false /* replicated */, null); + 0 /*avoid reseting cursor*/, false /* replicated */, null, null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -303,7 +303,7 @@ public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception { Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/, false /* replicated */, null); + 0 /*avoid reseting cursor*/, false /* replicated */, null, null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index ae0bd4dc9a248..52c28492559a9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -524,7 +524,7 @@ public void testSubscribeFail() throws Exception { Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/, false, null); + 0 /*avoid reseting cursor*/, false, null, null); try { f1.get(); fail("should fail with exception"); @@ -544,13 +544,13 @@ public void testSubscribeUnsubscribe() throws Exception { // 1. simple subscribe Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/,false, null); + 0 /*avoid reseting cursor*/,false, null, null); f1.get(); // 2. duplicate subscribe Future f2 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/,false, null); + 0 /*avoid reseting cursor*/,false, null, null); try { f2.get(); fail("should fail with exception"); @@ -572,7 +572,7 @@ public void testChangeSubscriptionType() throws Exception { PersistentSubscription sub = new PersistentSubscription(topic, "change-sub-type", cursorMock, false); Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, - "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, null); sub.addConsumer(consumer); consumer.close(); @@ -582,7 +582,7 @@ public void testChangeSubscriptionType() throws Exception { Dispatcher previousDispatcher = sub.getDispatcher(); consumer = new Consumer(sub, subType, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, "myrole-1", - Collections.emptyMap(), false, InitialPosition.Latest, null); + Collections.emptyMap(), false, InitialPosition.Latest, null, null); sub.addConsumer(consumer); assertTrue(sub.getDispatcher().isConsumerConnected()); @@ -605,7 +605,7 @@ public void testAddRemoveConsumer() throws Exception { // 1. simple add consumer Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, null); sub.addConsumer(consumer); assertTrue(sub.getDispatcher().isConsumerConnected()); @@ -638,7 +638,7 @@ public void testAddRemoveConsumerDurableCursor() throws Exception { PersistentSubscription sub = new PersistentSubscription(topic, "non-durable-sub", cursorMock, false); Consumer consumer = new Consumer(sub, SubType.Exclusive, topic.getName(), 1, 0, "Cons1", 50000, serverCnx, - "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null); + "myrole-1", Collections.emptyMap(), false, InitialPosition.Latest, null, null); sub.addConsumer(consumer); assertFalse(sub.getDispatcher().isClosed()); @@ -670,14 +670,14 @@ public void testMaxConsumersShared() throws Exception { // 1. add consumer1 Consumer consumer = new Consumer(sub, SubType.Shared, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, null); sub.addConsumer(consumer); assertEquals(sub.getConsumers().size(), 1); // 2. add consumer2 Consumer consumer2 = new Consumer(sub, SubType.Shared, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, null); sub.addConsumer(consumer2); assertEquals(sub.getConsumers().size(), 2); @@ -685,7 +685,7 @@ public void testMaxConsumersShared() throws Exception { try { Consumer consumer3 = new Consumer(sub, SubType.Shared, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, null); sub.addConsumer(consumer3); fail("should have failed"); } catch (BrokerServiceException e) { @@ -698,7 +698,7 @@ public void testMaxConsumersShared() throws Exception { // 4. add consumer4 to sub2 Consumer consumer4 = new Consumer(sub2, SubType.Shared, topic.getName(), 4 /* consumer id */, 0, "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, null); sub2.addConsumer(consumer4); assertEquals(sub2.getConsumers().size(), 1); @@ -709,7 +709,7 @@ public void testMaxConsumersShared() throws Exception { try { Consumer consumer5 = new Consumer(sub2, SubType.Shared, topic.getName(), 5 /* consumer id */, 0, "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, null); sub2.addConsumer(consumer5); fail("should have failed"); } catch (BrokerServiceException e) { @@ -761,14 +761,14 @@ public void testMaxConsumersFailover() throws Exception { // 1. add consumer1 Consumer consumer = new Consumer(sub, SubType.Failover, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, null); sub.addConsumer(consumer); assertEquals(sub.getConsumers().size(), 1); // 2. add consumer2 Consumer consumer2 = new Consumer(sub, SubType.Failover, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, null); sub.addConsumer(consumer2); assertEquals(sub.getConsumers().size(), 2); @@ -776,7 +776,7 @@ public void testMaxConsumersFailover() throws Exception { try { Consumer consumer3 = new Consumer(sub, SubType.Failover, topic.getName(), 3 /* consumer id */, 0, "Cons3"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, null); sub.addConsumer(consumer3); fail("should have failed"); } catch (BrokerServiceException e) { @@ -789,7 +789,7 @@ public void testMaxConsumersFailover() throws Exception { // 4. add consumer4 to sub2 Consumer consumer4 = new Consumer(sub2, SubType.Failover, topic.getName(), 4 /* consumer id */, 0, "Cons4"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, null); sub2.addConsumer(consumer4); assertEquals(sub2.getConsumers().size(), 1); @@ -800,7 +800,7 @@ public void testMaxConsumersFailover() throws Exception { try { Consumer consumer5 = new Consumer(sub2, SubType.Failover, topic.getName(), 5 /* consumer id */, 0, "Cons5"/* consumer name */, 50000, serverCnx, "myrole-1", Collections.emptyMap(), - false /* read compacted */, InitialPosition.Latest, null); + false /* read compacted */, InitialPosition.Latest, null, null); sub2.addConsumer(consumer5); fail("should have failed"); } catch (BrokerServiceException e) { @@ -840,7 +840,7 @@ public void testUbsubscribeRaceConditions() throws Exception { PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); PersistentSubscription sub = new PersistentSubscription(topic, "sub-1", cursorMock, false); Consumer consumer1 = new Consumer(sub, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, null); sub.addConsumer(consumer1); doAnswer(new Answer() { @@ -862,7 +862,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { try { Thread.sleep(10); /* delay to ensure that the ubsubscribe gets executed first */ new Consumer(sub, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, null); } catch (BrokerServiceException e) { assertTrue(e instanceof BrokerServiceException.SubscriptionFencedException); } @@ -893,7 +893,7 @@ public void testDeleteTopic() throws Exception { Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, - 0 /*avoid reseting cursor*/,false /* replicated */, null); + 0 /*avoid reseting cursor*/,false /* replicated */, null, null); f1.get(); assertTrue(topic.delete().isCompletedExceptionally()); @@ -909,7 +909,7 @@ public void testDeleteAndUnsubscribeTopic() throws Exception { Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/,false /* replicated */, null); + 0 /*avoid reseting cursor*/,false /* replicated */, null, null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -964,7 +964,7 @@ public void testConcurrentTopicAndSubscriptionDelete() throws Exception { Future f1 = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/,false /* replicated */, null); + 0 /*avoid reseting cursor*/,false /* replicated */, null, null); f1.get(); final CyclicBarrier barrier = new CyclicBarrier(2); @@ -1052,7 +1052,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { Future f = topic.subscribe(serverCnx, cmd.getSubscription(), cmd.getConsumerId(), cmd.getSubType(), 0, cmd.getConsumerName(), cmd.getDurable(), null, Collections.emptyMap(), cmd.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/,false /* replicated */, null); + 0 /*avoid reseting cursor*/,false /* replicated */, null, null); try { f.get(); fail("should have failed"); @@ -1181,7 +1181,7 @@ public void testFailoverSubscription() throws Exception { Future f1 = topic1.subscribe(serverCnx, cmd1.getSubscription(), cmd1.getConsumerId(), cmd1.getSubType(), 0, cmd1.getConsumerName(), cmd1.getDurable(), null, Collections.emptyMap(), cmd1.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/,false /* replicated */, null); + 0 /*avoid reseting cursor*/,false /* replicated */, null, null); f1.get(); // 2. Subscribe with partition topic @@ -1194,7 +1194,7 @@ public void testFailoverSubscription() throws Exception { Future f2 = topic2.subscribe(serverCnx, cmd2.getSubscription(), cmd2.getConsumerId(), cmd2.getSubType(), 0, cmd2.getConsumerName(), cmd2.getDurable(), null, Collections.emptyMap(), cmd2.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/, false /* replicated */, null); + 0 /*avoid reseting cursor*/, false /* replicated */, null, null); f2.get(); // 3. Subscribe and create second consumer @@ -1205,7 +1205,7 @@ public void testFailoverSubscription() throws Exception { Future f3 = topic2.subscribe(serverCnx, cmd3.getSubscription(), cmd3.getConsumerId(), cmd3.getSubType(), 0, cmd3.getConsumerName(), cmd3.getDurable(), null, Collections.emptyMap(), cmd3.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/, false /* replicated */, null); + 0 /*avoid reseting cursor*/, false /* replicated */, null, null); f3.get(); assertEquals( @@ -1227,7 +1227,7 @@ public void testFailoverSubscription() throws Exception { Future f4 = topic2.subscribe(serverCnx, cmd4.getSubscription(), cmd4.getConsumerId(), cmd4.getSubType(), 0, cmd4.getConsumerName(), cmd4.getDurable(), null, Collections.emptyMap(), cmd4.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/, false /* replicated */, null); + 0 /*avoid reseting cursor*/, false /* replicated */, null, null); f4.get(); assertEquals( @@ -1254,7 +1254,7 @@ public void testFailoverSubscription() throws Exception { Future f5 = topic2.subscribe(serverCnx, cmd5.getSubscription(), cmd5.getConsumerId(), cmd5.getSubType(), 0, cmd5.getConsumerName(), cmd5.getDurable(), null, Collections.emptyMap(), cmd5.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/,false /* replicated */, null); + 0 /*avoid reseting cursor*/,false /* replicated */, null, null); try { f5.get(); fail("should fail with exception"); @@ -1271,7 +1271,7 @@ public void testFailoverSubscription() throws Exception { Future f6 = topic2.subscribe(serverCnx, cmd6.getSubscription(), cmd6.getConsumerId(), cmd6.getSubType(), 0, cmd6.getConsumerName(), cmd6.getDurable(), null, Collections.emptyMap(), cmd6.getReadCompacted(), InitialPosition.Latest, - 0 /*avoid reseting cursor*/, false /* replicated */, null); + 0 /*avoid reseting cursor*/, false /* replicated */, null, null); f6.get(); // 7. unsubscribe exclusive sub @@ -1529,21 +1529,21 @@ public void testBacklogCursor() throws Exception { ManagedCursor cursor1 = ledger.openCursor("c1"); PersistentSubscription sub1 = new PersistentSubscription(topic, "sub-1", cursor1, false); Consumer consumer1 = new Consumer(sub1, SubType.Exclusive, topic.getName(), 1 /* consumer id */, 0, "Cons1"/* consumer name */, - 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-1", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, null); topic.getSubscriptions().put(Codec.decode(cursor1.getName()), sub1); sub1.addConsumer(consumer1); // Open cursor2, add it into activeCursor-container and add it into subscription consumer list ManagedCursor cursor2 = ledger.openCursor("c2"); PersistentSubscription sub2 = new PersistentSubscription(topic, "sub-2", cursor2, false); Consumer consumer2 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 2 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-2", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, null); topic.getSubscriptions().put(Codec.decode(cursor2.getName()), sub2); sub2.addConsumer(consumer2); // Open cursor3, add it into activeCursor-container and do not add it into subscription consumer list ManagedCursor cursor3 = ledger.openCursor("c3"); PersistentSubscription sub3 = new PersistentSubscription(topic, "sub-3", cursor3, false); Consumer consumer3 = new Consumer(sub2, SubType.Exclusive, topic.getName(), 3 /* consumer id */, 0, "Cons2"/* consumer name */, - 50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null); + 50000, serverCnx, "myrole-3", Collections.emptyMap(), false /* read compacted */, InitialPosition.Latest, null, null); topic.getSubscriptions().put(Codec.decode(cursor3.getName()), sub3); // Case1: cursors are active as haven't started deactivateBacklogCursor scan diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageFilteringTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageFilteringTest.java new file mode 100644 index 0000000000000..56bf48cfa18e7 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageFilteringTest.java @@ -0,0 +1,138 @@ +package org.apache.pulsar.client.api; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; + +import static org.testng.Assert.assertNull; + +public class MessageFilteringTest extends ProducerConsumerBase { + private static final Logger log = LoggerFactory.getLogger(MessageFilteringTest.class); + + private static class SampleAvroPojo { + public String aString; + public Double aDouble; + + public SampleAvroPojo(String string, double doubl) { + aString = string; + aDouble = doubl; + } + } + + @BeforeMethod + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @DataProvider + public static Object[][] variationsForExpectedPos() { + return new Object[][]{ + // batching / start-inclusive / num-of-messages + {true, true, 10}, + {true, false, 10}, + {false, true, 10}, + {false, false, 10}, + + {true, true, 100}, + {true, false, 100}, + {false, true, 100}, + {false, false, 100}, + }; + } + + @AfterMethod + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testBytesPrefixFilter() throws Exception { + log.info("-- Starting {} test --", methodName); + + Consumer consumer = pulsarClient.newConsumer() + .topic("persistent://my-property/my-ns/my-topic1") + .messageFilterPolicy(MessageFilterPolicy.bytesPrefixPolicy(new byte[]{'1'})) + .subscriptionName("my-subscriber-name").subscribe(); + + ProducerBuilder producerBuilder = pulsarClient.newProducer() + .topic("persistent://my-property/my-ns/my-topic1"); + + producerBuilder.enableBatching(true); + producerBuilder.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS); + producerBuilder.batchingMaxMessages(5); + + Producer producer = producerBuilder.create(); + for (int i = 0; i < 30; i++) { + String message = i + "-hello-world-" + i; + producer.send(message.getBytes()); + } + + Set messageSet = Sets.newHashSet(); + Message msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "1-hello-world-1"; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + for (int i = 10; i < 20; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + expectedMessage = i + "-hello-world-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + } + + @Test + public void testRegexPrefixFilter() throws Exception { + log.info("-- Starting {} test --", methodName); + + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic1") + .messageFilterPolicy(MessageFilterPolicy.regexFilterPolicy(Pattern.compile("message-[1-5][0-9]"))) + .subscriptionName("my-subscriber-name").subscribe(); + + ProducerBuilder producerBuilder = pulsarClient.newProducer() + .topic("persistent://my-property/my-ns/my-topic1"); + + producerBuilder.enableBatching(true); + producerBuilder.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS); + producerBuilder.batchingMaxMessages(5); + + Producer producer = producerBuilder.create(); + for (int i = 0; i < 100; i++) { + String message = "message-" + i; + producer.send(message.getBytes()); + } + + Set messageSet = Sets.newHashSet(); + Message msg = null; + for (int i = 10; i < 60; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + String receivedMessage = new String(msg.getData()); + log.debug("Received message: [{}]", receivedMessage); + String expectedMessage = "message-" + i; + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + consumer.close(); + log.info("-- Exiting {} test --", methodName); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index 112bbe7cbe25a..66a959cf89bcd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -18,10 +18,6 @@ */ package org.apache.pulsar.client.api; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -53,6 +49,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import static org.testng.Assert.*; + public class SimpleTypedProducerConsumerTest extends ProducerConsumerBase { private static final Logger log = LoggerFactory.getLogger(SimpleTypedProducerConsumerTest.class); @@ -674,4 +672,60 @@ public void testMessageBuilderLoadConf() throws Exception { } } + @Test + public void testFilteredAvroProducerConsumer() throws Exception { + log.info("-- Starting {} test --", methodName); + + AvroSchema avroSchema = + AvroSchema.of(SchemaDefinition.builder(). + withPojo(AvroEncodedPojo.class).build()); + + Consumer consumer = pulsarClient + .newConsumer(avroSchema) + .messageFilterPolicy(new MessageFilterPolicy("org.apache.pulsar.broker.service.filtering.BasicAvroFilter") { + @Override + public Map getProperties() { + HashMap map = new HashMap<>(); + map.put("message", "my-message-3"); + return map; + } + }) + .topic("persistent://my-property/use/my-ns/my-topic1") + .subscriptionName("my-subscriber-name") + .subscribe(); + + Producer producer = pulsarClient + .newProducer(avroSchema) + .topic("persistent://my-property/use/my-ns/my-topic1") + .create(); + + for (int i = 0; i < 10; i++) { + String message = "my-message-" + i; + producer.send(new AvroEncodedPojo(message)); + } + + Message msg = null; + Set messageSet = Sets.newHashSet(); + for (int i = 3; i < 4; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + AvroEncodedPojo receivedMessage = msg.getValue(); + log.debug("Received message: [{}]", receivedMessage); + AvroEncodedPojo expectedMessage = new AvroEncodedPojo("my-message-" + i); + testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage); + } + // Acknowledge the consumption of all messages at once + consumer.acknowledgeCumulative(msg); + assertNull(consumer.receive(1, TimeUnit.SECONDS)); + consumer.close(); + + SchemaRegistry.SchemaAndMetadata storedSchema = pulsar.getSchemaRegistryService() + .getSchema("my-property/my-ns/my-topic1") + .get(); + + Assert.assertEquals(storedSchema.schema.getData(), avroSchema.getSchemaInfo().getSchema()); + + log.info("-- Exiting {} test --", methodName); + + } + } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index def180760b3d6..88d4e9d64086b 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -577,6 +577,13 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder keySharedPolicy(KeySharedPolicy keySharedPolicy); + /** + * Set message filtering policy for consumer. + * @param messageFilterPolicy The {@link MessageFilterPolicy} to use + * @return + */ + ConsumerBuilder messageFilterPolicy(MessageFilterPolicy messageFilterPolicy); + /** * Set the consumer to include the given position of any reset operation like {@link Consumer#seek(long) or * {@link Consumer#seek(MessageId)}}. diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageFilterPolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageFilterPolicy.java new file mode 100644 index 0000000000000..c70049d94aca7 --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/MessageFilterPolicy.java @@ -0,0 +1,56 @@ +package org.apache.pulsar.client.api; + +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * This allows message filtering to be configured and happen on the brokers. + */ +public abstract class MessageFilterPolicy { + + protected final String className; + protected final Map properties; + + protected MessageFilterPolicy(String className) { + this.className = className; + this.properties = new HashMap<>(); + } + + public String getFilterClassName() { + return this.className; + } + + public Map getProperties() { + return properties; + } + + public static BytesPrefixFilterPolicy bytesPrefixPolicy(byte[] prefix) { + return new BytesPrefixFilterPolicy(prefix); + } + + public static RegexFilterPolicy regexFilterPolicy(Pattern pattern) { + return new RegexFilterPolicy(pattern); + } + + /** + * This filter simply matches against the bytes of the message. + */ + public static class BytesPrefixFilterPolicy extends MessageFilterPolicy { + public BytesPrefixFilterPolicy(byte[] prefix) { + super("org.apache.pulsar.broker.service.filtering.BytesPrefixFilter"); + properties.put("bytes_prefix_filter_prefix", new String(prefix, StandardCharsets.UTF_8)); + } + } + + /** + * This filter matches against a string or byte[] using a java regex. + */ + public static class RegexFilterPolicy extends MessageFilterPolicy { + public RegexFilterPolicy(Pattern pattern) { + super("org.apache.pulsar.broker.service.filtering.RegexFilter"); + properties.put("regex_filter_pattern_key", pattern.pattern()); + } + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 2134e1ca6fa2b..b37842cc5777d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -32,24 +32,8 @@ import lombok.AccessLevel; import lombok.Getter; import org.apache.commons.lang3.StringUtils; -import org.apache.pulsar.client.api.BatchReceivePolicy; -import org.apache.pulsar.client.api.Consumer; -import org.apache.pulsar.client.api.ConsumerBuilder; -import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; -import org.apache.pulsar.client.api.ConsumerEventListener; -import org.apache.pulsar.client.api.ConsumerInterceptor; -import org.apache.pulsar.client.api.CryptoKeyReader; -import org.apache.pulsar.client.api.DeadLetterPolicy; -import org.apache.pulsar.client.api.KeySharedPolicy; -import org.apache.pulsar.client.api.MessageCrypto; -import org.apache.pulsar.client.api.MessageListener; -import org.apache.pulsar.client.api.PulsarClientException; -import org.apache.pulsar.client.api.RegexSubscriptionMode; +import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.api.PulsarClientException.InvalidConfigurationException; -import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; @@ -409,6 +393,12 @@ public ConsumerBuilder keySharedPolicy(KeySharedPolicy keySharedPolicy) { return this; } + @Override + public ConsumerBuilder messageFilterPolicy(MessageFilterPolicy messageFilterPolicy) { + conf.setMessageFilterPolicy(messageFilterPolicy); + return this; + } + @Override public ConsumerBuilder enableRetry(boolean retryEnable) { conf.setRetryEnable(retryEnable); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index e0f1bcb0ecb56..2ec90031c4dde 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -773,7 +773,7 @@ public void connectionOpened(final ClientCnx cnx) { ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, conf.isReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()), - startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy()); + startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(), conf.getMessageFilterPolicy()); if (startMessageIdData != null) { startMessageIdData.recycle(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index eb827035e03ec..31fb32e41ad25 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -34,18 +34,7 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; -import org.apache.pulsar.client.api.BatchReceivePolicy; -import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; -import org.apache.pulsar.client.api.ConsumerEventListener; -import org.apache.pulsar.client.api.CryptoKeyReader; -import org.apache.pulsar.client.api.DeadLetterPolicy; -import org.apache.pulsar.client.api.KeySharedPolicy; -import org.apache.pulsar.client.api.MessageCrypto; -import org.apache.pulsar.client.api.MessageListener; -import org.apache.pulsar.client.api.RegexSubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionInitialPosition; -import org.apache.pulsar.client.api.SubscriptionMode; -import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.*; @Data @NoArgsConstructor @@ -125,6 +114,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private KeySharedPolicy keySharedPolicy; + private MessageFilterPolicy messageFilterPolicy; + @JsonIgnore public String getSingleTopic() { checkArgument(topicNames.size() == 1); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java index 2f822e8bf6c8b..7417661d9e18f 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java @@ -224,6 +224,7 @@ public enum ProtocolVersion v13(13, 13), v14(14, 14), v15(15, 15), + v16(16, 16), ; public static final int v0_VALUE = 0; @@ -242,6 +243,7 @@ public enum ProtocolVersion public static final int v13_VALUE = 13; public static final int v14_VALUE = 14; public static final int v15_VALUE = 15; + public static final int v16_VALUE = 16; public final int getNumber() { return value; } @@ -264,6 +266,7 @@ public static ProtocolVersion valueOf(int value) { case 13: return v13; case 14: return v14; case 15: return v15; + case 16: return v16; default: return null; } } @@ -10803,6 +10806,530 @@ public Builder clearAllowOutOfOrderDelivery() { // @@protoc_insertion_point(class_scope:pulsar.proto.KeySharedMeta) } + public interface FilterMetaOrBuilder + extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder { + + // required string filterClassName = 1; + boolean hasFilterClassName(); + String getFilterClassName(); + + // repeated .pulsar.proto.KeyValue filterProperties = 2; + java.util.List + getFilterPropertiesList(); + org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getFilterProperties(int index); + int getFilterPropertiesCount(); + } + public static final class FilterMeta extends + org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite + implements FilterMetaOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage { + // Use FilterMeta.newBuilder() to construct. + private io.netty.util.Recycler.Handle handle; + private FilterMeta(io.netty.util.Recycler.Handle handle) { + this.handle = handle; + } + + private static final io.netty.util.Recycler RECYCLER = new io.netty.util.Recycler() { + protected FilterMeta newObject(Handle handle) { + return new FilterMeta(handle); + } + }; + + public void recycle() { + this.initFields(); + this.memoizedIsInitialized = -1; + this.bitField0_ = 0; + this.memoizedSerializedSize = -1; + if (handle != null) { RECYCLER.recycle(this, handle); } + } + + private FilterMeta(boolean noInit) {} + + private static final FilterMeta defaultInstance; + public static FilterMeta getDefaultInstance() { + return defaultInstance; + } + + public FilterMeta getDefaultInstanceForType() { + return defaultInstance; + } + + private int bitField0_; + // required string filterClassName = 1; + public static final int FILTERCLASSNAME_FIELD_NUMBER = 1; + private java.lang.Object filterClassName_; + public boolean hasFilterClassName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getFilterClassName() { + java.lang.Object ref = filterClassName_; + if (ref instanceof String) { + return (String) ref; + } else { + org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString bs = + (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref; + String s = bs.toStringUtf8(); + if (org.apache.pulsar.shaded.com.google.protobuf.v241.Internal.isValidUtf8(bs)) { + filterClassName_ = s; + } + return s; + } + } + private org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString getFilterClassNameBytes() { + java.lang.Object ref = filterClassName_; + if (ref instanceof String) { + org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString b = + org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.copyFromUtf8((String) ref); + filterClassName_ = b; + return b; + } else { + return (org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref; + } + } + + // repeated .pulsar.proto.KeyValue filterProperties = 2; + public static final int FILTERPROPERTIES_FIELD_NUMBER = 2; + private java.util.List filterProperties_; + public java.util.List getFilterPropertiesList() { + return filterProperties_; + } + public java.util.List + getFilterPropertiesOrBuilderList() { + return filterProperties_; + } + public int getFilterPropertiesCount() { + return filterProperties_.size(); + } + public org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getFilterProperties(int index) { + return filterProperties_.get(index); + } + public org.apache.pulsar.common.api.proto.PulsarApi.KeyValueOrBuilder getFilterPropertiesOrBuilder( + int index) { + return filterProperties_.get(index); + } + + private void initFields() { + filterClassName_ = ""; + filterProperties_ = java.util.Collections.emptyList(); + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasFilterClassName()) { + memoizedIsInitialized = 0; + return false; + } + for (int i = 0; i < getFilterPropertiesCount(); i++) { + if (!getFilterProperties(i).isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream output) + throws java.io.IOException { + throw new RuntimeException("Cannot use CodedOutputStream"); + } + + public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getFilterClassNameBytes()); + } + for (int i = 0; i < filterProperties_.size(); i++) { + output.writeMessage(2, filterProperties_.get(i)); + } + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeBytesSize(1, getFilterClassNameBytes()); + } + for (int i = 0; i < filterProperties_.size(); i++) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeMessageSize(2, filterProperties_.get(i)); + } + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString data, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + throw new RuntimeException("Disabled"); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta parseFrom(byte[] data) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta parseFrom( + byte[] data, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + return newBuilder().mergeFrom(data, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta parseFrom(java.io.InputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta parseFrom( + java.io.InputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta parseDelimitedFrom( + java.io.InputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + Builder builder = newBuilder(); + if (builder.mergeDelimitedFrom(input, extensionRegistry)) { + return builder.buildParsed(); + } else { + return null; + } + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input) + throws java.io.IOException { + return newBuilder().mergeFrom(input).buildParsed(); + } + public static org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta parseFrom( + org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return newBuilder().mergeFrom(input, extensionRegistry) + .buildParsed(); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + public static final class Builder extends + org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite.Builder< + org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta, Builder> + implements org.apache.pulsar.common.api.proto.PulsarApi.FilterMetaOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder { + // Construct using org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.newBuilder() + private final io.netty.util.Recycler.Handle handle; + private Builder(io.netty.util.Recycler.Handle handle) { + this.handle = handle; + maybeForceBuilderInitialization(); + } + private final static io.netty.util.Recycler RECYCLER = new io.netty.util.Recycler() { + protected Builder newObject(io.netty.util.Recycler.Handle handle) { + return new Builder(handle); + } + }; + + public void recycle() { + clear(); + if (handle != null) {RECYCLER.recycle(this, handle);} + } + + private void maybeForceBuilderInitialization() { + } + private static Builder create() { + return RECYCLER.get(); + } + + public Builder clear() { + super.clear(); + filterClassName_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + filterProperties_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta getDefaultInstanceForType() { + return org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.getDefaultInstance(); + } + + public org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta build() { + org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + private org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta buildParsed() + throws org.apache.pulsar.shaded.com.google.protobuf.v241.InvalidProtocolBufferException { + org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException( + result).asInvalidProtocolBufferException(); + } + return result; + } + + public org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta buildPartial() { + org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta result = org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.RECYCLER.get(); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.filterClassName_ = filterClassName_; + if (((bitField0_ & 0x00000002) == 0x00000002)) { + filterProperties_ = java.util.Collections.unmodifiableList(filterProperties_); + bitField0_ = (bitField0_ & ~0x00000002); + } + result.filterProperties_ = filterProperties_; + result.bitField0_ = to_bitField0_; + return result; + } + + public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta other) { + if (other == org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.getDefaultInstance()) return this; + if (other.hasFilterClassName()) { + setFilterClassName(other.getFilterClassName()); + } + if (!other.filterProperties_.isEmpty()) { + if (filterProperties_.isEmpty()) { + filterProperties_ = other.filterProperties_; + bitField0_ = (bitField0_ & ~0x00000002); + } else { + ensureFilterPropertiesIsMutable(); + filterProperties_.addAll(other.filterProperties_); + } + + } + return this; + } + + public final boolean isInitialized() { + if (!hasFilterClassName()) { + + return false; + } + for (int i = 0; i < getFilterPropertiesCount(); i++) { + if (!getFilterProperties(i).isInitialized()) { + + return false; + } + } + return true; + } + + public Builder mergeFrom(org.apache.pulsar.shaded.com.google.protobuf.v241.CodedInputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + throw new java.io.IOException("Merge from CodedInputStream is disabled"); + } + public Builder mergeFrom( + org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input, + org.apache.pulsar.shaded.com.google.protobuf.v241.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + while (true) { + int tag = input.readTag(); + switch (tag) { + case 0: + + return this; + default: { + if (!input.skipField(tag)) { + + return this; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + filterClassName_ = input.readBytes(); + break; + } + case 18: { + org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.newBuilder(); + input.readMessage(subBuilder, extensionRegistry); + addFilterProperties(subBuilder.buildPartial()); + break; + } + } + } + } + + private int bitField0_; + + // required string filterClassName = 1; + private java.lang.Object filterClassName_ = ""; + public boolean hasFilterClassName() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + public String getFilterClassName() { + java.lang.Object ref = filterClassName_; + if (!(ref instanceof String)) { + String s = ((org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString) ref).toStringUtf8(); + filterClassName_ = s; + return s; + } else { + return (String) ref; + } + } + public Builder setFilterClassName(String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + filterClassName_ = value; + + return this; + } + public Builder clearFilterClassName() { + bitField0_ = (bitField0_ & ~0x00000001); + filterClassName_ = getDefaultInstance().getFilterClassName(); + + return this; + } + void setFilterClassName(org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString value) { + bitField0_ |= 0x00000001; + filterClassName_ = value; + + } + + // repeated .pulsar.proto.KeyValue filterProperties = 2; + private java.util.List filterProperties_ = + java.util.Collections.emptyList(); + private void ensureFilterPropertiesIsMutable() { + if (!((bitField0_ & 0x00000002) == 0x00000002)) { + filterProperties_ = new java.util.ArrayList(filterProperties_); + bitField0_ |= 0x00000002; + } + } + + public java.util.List getFilterPropertiesList() { + return java.util.Collections.unmodifiableList(filterProperties_); + } + public int getFilterPropertiesCount() { + return filterProperties_.size(); + } + public org.apache.pulsar.common.api.proto.PulsarApi.KeyValue getFilterProperties(int index) { + return filterProperties_.get(index); + } + public Builder setFilterProperties( + int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue value) { + if (value == null) { + throw new NullPointerException(); + } + ensureFilterPropertiesIsMutable(); + filterProperties_.set(index, value); + + return this; + } + public Builder setFilterProperties( + int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder builderForValue) { + ensureFilterPropertiesIsMutable(); + filterProperties_.set(index, builderForValue.build()); + + return this; + } + public Builder addFilterProperties(org.apache.pulsar.common.api.proto.PulsarApi.KeyValue value) { + if (value == null) { + throw new NullPointerException(); + } + ensureFilterPropertiesIsMutable(); + filterProperties_.add(value); + + return this; + } + public Builder addFilterProperties( + int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue value) { + if (value == null) { + throw new NullPointerException(); + } + ensureFilterPropertiesIsMutable(); + filterProperties_.add(index, value); + + return this; + } + public Builder addFilterProperties( + org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder builderForValue) { + ensureFilterPropertiesIsMutable(); + filterProperties_.add(builderForValue.build()); + + return this; + } + public Builder addFilterProperties( + int index, org.apache.pulsar.common.api.proto.PulsarApi.KeyValue.Builder builderForValue) { + ensureFilterPropertiesIsMutable(); + filterProperties_.add(index, builderForValue.build()); + + return this; + } + public Builder addAllFilterProperties( + java.lang.Iterable values) { + ensureFilterPropertiesIsMutable(); + super.addAll(values, filterProperties_); + + return this; + } + public Builder clearFilterProperties() { + filterProperties_ = java.util.Collections.emptyList(); + bitField0_ = (bitField0_ & ~0x00000002); + + return this; + } + public Builder removeFilterProperties(int index) { + ensureFilterPropertiesIsMutable(); + filterProperties_.remove(index); + + return this; + } + + // @@protoc_insertion_point(builder_scope:pulsar.proto.FilterMeta) + } + + static { + defaultInstance = new FilterMeta(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:pulsar.proto.FilterMeta) + } + public interface CommandSubscribeOrBuilder extends org.apache.pulsar.shaded.com.google.protobuf.v241.MessageLiteOrBuilder { @@ -10875,6 +11402,10 @@ public interface CommandSubscribeOrBuilder // optional .pulsar.proto.KeySharedMeta keySharedMeta = 17; boolean hasKeySharedMeta(); org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta getKeySharedMeta(); + + // optional .pulsar.proto.FilterMeta filterMeta = 18; + boolean hasFilterMeta(); + org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta getFilterMeta(); } public static final class CommandSubscribe extends org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite @@ -11246,6 +11777,16 @@ public org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta getKeySharedMe return keySharedMeta_; } + // optional .pulsar.proto.FilterMeta filterMeta = 18; + public static final int FILTERMETA_FIELD_NUMBER = 18; + private org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta filterMeta_; + public boolean hasFilterMeta() { + return ((bitField0_ & 0x00010000) == 0x00010000); + } + public org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta getFilterMeta() { + return filterMeta_; + } + private void initFields() { topic_ = ""; subscription_ = ""; @@ -11264,6 +11805,7 @@ private void initFields() { forceTopicCreation_ = true; startMessageRollbackDurationSec_ = 0L; keySharedMeta_ = org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta.getDefaultInstance(); + filterMeta_ = org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.getDefaultInstance(); } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { @@ -11314,6 +11856,12 @@ public final boolean isInitialized() { return false; } } + if (hasFilterMeta()) { + if (!getFilterMeta().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + } memoizedIsInitialized = 1; return true; } @@ -11377,6 +11925,9 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr if (((bitField0_ & 0x00008000) == 0x00008000)) { output.writeMessage(17, keySharedMeta_); } + if (((bitField0_ & 0x00010000) == 0x00010000)) { + output.writeMessage(18, filterMeta_); + } } private int memoizedSerializedSize = -1; @@ -11453,6 +12004,10 @@ public int getSerializedSize() { size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream .computeMessageSize(17, keySharedMeta_); } + if (((bitField0_ & 0x00010000) == 0x00010000)) { + size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream + .computeMessageSize(18, filterMeta_); + } memoizedSerializedSize = size; return size; } @@ -11600,6 +12155,8 @@ public Builder clear() { bitField0_ = (bitField0_ & ~0x00008000); keySharedMeta_ = org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta.getDefaultInstance(); bitField0_ = (bitField0_ & ~0x00010000); + filterMeta_ = org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.getDefaultInstance(); + bitField0_ = (bitField0_ & ~0x00020000); return this; } @@ -11702,6 +12259,10 @@ public org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe buildPartia to_bitField0_ |= 0x00008000; } result.keySharedMeta_ = keySharedMeta_; + if (((from_bitField0_ & 0x00020000) == 0x00020000)) { + to_bitField0_ |= 0x00010000; + } + result.filterMeta_ = filterMeta_; result.bitField0_ = to_bitField0_; return result; } @@ -11766,6 +12327,9 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandSub if (other.hasKeySharedMeta()) { mergeKeySharedMeta(other.getKeySharedMeta()); } + if (other.hasFilterMeta()) { + mergeFilterMeta(other.getFilterMeta()); + } return this; } @@ -11814,6 +12378,12 @@ public final boolean isInitialized() { return false; } } + if (hasFilterMeta()) { + if (!getFilterMeta().isInitialized()) { + + return false; + } + } return true; } @@ -11948,6 +12518,16 @@ public Builder mergeFrom( subBuilder.recycle(); break; } + case 146: { + org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.newBuilder(); + if (hasFilterMeta()) { + subBuilder.mergeFrom(getFilterMeta()); + } + input.readMessage(subBuilder, extensionRegistry); + setFilterMeta(subBuilder.buildPartial()); + subBuilder.recycle(); + break; + } } } } @@ -12496,6 +13076,49 @@ public Builder clearKeySharedMeta() { return this; } + // optional .pulsar.proto.FilterMeta filterMeta = 18; + private org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta filterMeta_ = org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.getDefaultInstance(); + public boolean hasFilterMeta() { + return ((bitField0_ & 0x00020000) == 0x00020000); + } + public org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta getFilterMeta() { + return filterMeta_; + } + public Builder setFilterMeta(org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta value) { + if (value == null) { + throw new NullPointerException(); + } + filterMeta_ = value; + + bitField0_ |= 0x00020000; + return this; + } + public Builder setFilterMeta( + org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.Builder builderForValue) { + filterMeta_ = builderForValue.build(); + + bitField0_ |= 0x00020000; + return this; + } + public Builder mergeFilterMeta(org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta value) { + if (((bitField0_ & 0x00020000) == 0x00020000) && + filterMeta_ != org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.getDefaultInstance()) { + filterMeta_ = + org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.newBuilder(filterMeta_).mergeFrom(value).buildPartial(); + } else { + filterMeta_ = value; + } + + bitField0_ |= 0x00020000; + return this; + } + public Builder clearFilterMeta() { + filterMeta_ = org.apache.pulsar.common.api.proto.PulsarApi.FilterMeta.getDefaultInstance(); + + bitField0_ = (bitField0_ & ~0x00020000); + return this; + } + // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe) } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index d8671399b2324..3c5d8ec15aed2 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -42,6 +42,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Triple; import org.apache.pulsar.client.api.KeySharedPolicy; +import org.apache.pulsar.client.api.MessageFilterPolicy; import org.apache.pulsar.client.api.Range; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; import org.apache.pulsar.common.api.AuthData; @@ -546,14 +547,17 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu boolean createTopicIfDoesNotExist) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, isReplicated, subscriptionInitialPosition, - startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null); + startMessageRollbackDurationInSec, schemaInfo, createTopicIfDoesNotExist, null, null); } public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, - SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, - InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, - SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy) { + SubType subType, int priorityLevel, String consumerName, boolean isDurable, + MessageIdData startMessageId, Map metadata, + boolean readCompacted, boolean isReplicated, + InitialPosition subscriptionInitialPosition, + long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, + boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, + MessageFilterPolicy filterPolicy) { CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder(); subscribeBuilder.setTopic(topic); subscribeBuilder.setSubscription(subscription); @@ -586,6 +590,14 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu subscribeBuilder.setKeySharedMeta(keySharedMetaBuilder.build()); } + if (filterPolicy != null) { + PulsarApi.FilterMeta.Builder filterMetaBuilder = PulsarApi.FilterMeta.newBuilder(); + filterMetaBuilder.setFilterClassName(filterPolicy.getFilterClassName()); + filterPolicy.getProperties().forEach((s, s2) -> + filterMetaBuilder.addFilterProperties(KeyValue.newBuilder().setKey(s).setValue(s2).build())); + subscribeBuilder.setFilterMeta(filterMetaBuilder.build()); + } + if (startMessageId != null) { subscribeBuilder.setStartMessageId(startMessageId); } diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto index c4acca242bbb1..c18a3b7ae0e75 100644 --- a/pulsar-common/src/main/proto/PulsarApi.proto +++ b/pulsar-common/src/main/proto/PulsarApi.proto @@ -227,6 +227,7 @@ enum ProtocolVersion { v14 = 14; // Add CommandAuthChallenge and CommandAuthResponse for mutual auth // Added Key_Shared subscription v15 = 15; // Add CommandGetOrCreateSchema and CommandGetOrCreateSchemaResponse + v16 = 16; // Add message filtering info to consumer } message CommandConnect { @@ -294,6 +295,11 @@ message KeySharedMeta { optional bool allowOutOfOrderDelivery = 4 [default = false]; } +message FilterMeta { + required string filterClassName = 1; + repeated KeyValue filterProperties = 2; +} + message CommandSubscribe { enum SubType { Exclusive = 0; @@ -310,7 +316,7 @@ message CommandSubscribe { optional string consumer_name = 6; optional int32 priority_level = 7; - // Signal wether the subscription should be backed by a + // Signal whether the subscription should be backed by a // durable cursor or not optional bool durable = 8 [default = true]; @@ -319,7 +325,7 @@ message CommandSubscribe { // will send messages from that point optional MessageIdData start_message_id = 9; - /// Add optional metadata key=value to this consumer + // Add optional metadata key=value to this consumer repeated KeyValue metadata = 10; optional bool read_compacted = 11; @@ -350,6 +356,8 @@ message CommandSubscribe { optional uint64 start_message_rollback_duration_sec = 16 [default = 0]; optional KeySharedMeta keySharedMeta = 17; + + optional FilterMeta filterMeta = 18; } message CommandPartitionedTopicMetadata { From b3dde857420b6ae437a5cb9faac573e15cbfb413 Mon Sep 17 00:00:00 2001 From: seeday Date: Wed, 22 Jul 2020 14:23:02 -0500 Subject: [PATCH 2/2] Isolate filtering a little better --- .../pulsar/broker/service/Consumer.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 9516d59a3b369..707ef57b36e27 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -315,27 +315,29 @@ public ChannelPromise sendMessages(final List entries, EntryBatchSizes ba MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder(); MessageIdData messageId = messageIdBuilder - .setLedgerId(entry.getLedgerId()) - .setEntryId(entry.getEntryId()) - .setPartition(partitionIdx) - .build(); + .setLedgerId(entry.getLedgerId()) + .setEntryId(entry.getEntryId()) + .setPartition(partitionIdx) + .build(); ByteBuf metadataAndPayload = entry.getDataBuffer(); // increment ref-count of data and release at the end of process: so, we can get chance to call entry.release metadataAndPayload.retain(); // take a look at message and filter it out if necessary - metadataAndPayload.markReaderIndex(); - Commands.skipMessageMetadata(metadataAndPayload); - metadataAndPayload.skipBytes(metadataAndPayload.readInt()); - if (this.filter != null && !this.filter.matches(metadataAndPayload)) { - filteredEntries.add(entry.getPosition()); - messageId.recycle(); - messageIdBuilder.recycle(); - entry.release(); - continue; + if (filter != null) { + metadataAndPayload.markReaderIndex(); + Commands.skipMessageMetadata(metadataAndPayload); + metadataAndPayload.skipBytes(metadataAndPayload.readInt()); + if (!filter.matches(metadataAndPayload)) { + filteredEntries.add(entry.getPosition()); + messageId.recycle(); + messageIdBuilder.recycle(); + entry.release(); + continue; + } + metadataAndPayload.resetReaderIndex(); } - metadataAndPayload.resetReaderIndex(); // skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {