Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,21 @@
*/
package org.apache.pulsar.broker.service;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.lang.reflect.InvocationTargetException;
import java.util.*;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
Expand All @@ -46,12 +44,14 @@
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.service.filtering.Filter;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.PulsarApi.IntRange;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion;
import org.apache.pulsar.common.naming.TopicName;
Expand All @@ -63,6 +63,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutionException;

import static com.google.common.base.Preconditions.checkArgument;

/**
* A Consumer is a consumer currently connected and associated with a Subscription
*/
Expand Down Expand Up @@ -116,6 +120,8 @@ public class Consumer {

private final PulsarApi.KeySharedMeta keySharedMeta;

private final Filter filter;

/**
* It starts keep tracking the average messages per entry.
* The initial value is 1000, when new value comes, it will update with
Expand All @@ -132,7 +138,7 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
int priorityLevel, String consumerName,
int maxUnackedMessages, ServerCnx cnx, String appId,
Map<String, String> metadata, boolean readCompacted, InitialPosition subscriptionInitialPosition,
PulsarApi.KeySharedMeta keySharedMeta) throws BrokerServiceException {
PulsarApi.KeySharedMeta keySharedMeta, PulsarApi.FilterMeta filterMeta) throws BrokerServiceException {

this.subscription = subscription;
this.subType = subType;
Expand Down Expand Up @@ -169,12 +175,37 @@ public Consumer(Subscription subscription, SubType subType, String topicName, lo
stats.setClientVersion(cnx.getClientVersion());
stats.metadata = this.metadata;

// TODO(filtering): schema-aware filters currently assume a single schema
// version per topic — there is no multi-version or per-message schema support.
// Check how Pulsar client consumers handle schema versioning and mirror that here.

if (Subscription.isIndividualAckMode(subType)) {
this.pendingAcks = new ConcurrentLongLongPairHashMap(256, 1);
} else {
// We don't need to keep track of pending acks if the subscription is not shared
this.pendingAcks = null;
}
if (filterMeta != null) {
Filter tempFilter = null;
try {
Properties filterProperties = new Properties();
filterMeta.getFilterPropertiesList().forEach(kv -> filterProperties.put(kv.getKey(), kv.getValue()));

Class filterClazz = Class.forName(filterMeta.getFilterClassName());
tempFilter = (Filter) filterClazz.getConstructor(Properties.class).newInstance(filterProperties);
if (tempFilter.isSchemaAware()) {
Schema<GenericRecord> genericRecordSchema = Schema.AUTO_CONSUME();
genericRecordSchema.configureSchemaInfo(topicName, "", cnx.getBrokerService().getPulsar().getSchemaRegistryService()
.getSchema(TopicName.get(topicName).getSchemaName()).get().schema.toSchemaInfo());
tempFilter.setSchema(genericRecordSchema);
}
} catch (InstantiationException | InvocationTargetException | NoSuchMethodException | IllegalAccessException | ClassNotFoundException | NullPointerException | InterruptedException | ExecutionException e) {
log.warn("Setting up filtered schema on " + topicName + " for consumer " + consumerName + " failed", e);
ctx().writeAndFlush(Commands.newError(0, PulsarApi.ServerError.UnknownError, e.getCause().getMessage()));
}
filter = tempFilter;
} else {
filter = null;
}
}

public SubType subType() {
Expand Down Expand Up @@ -263,6 +294,7 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
chuckedMessageRate.recordMultipleEvents(totalChunkedMessages, 0);

ctx.channel().eventLoop().execute(() -> {
ArrayList<Position> filteredEntries = new ArrayList<>();
for (int i = 0; i < entries.size(); i++) {
Entry entry = entries.get(i);
if (entry == null) {
Expand All @@ -283,14 +315,30 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba

MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
MessageIdData messageId = messageIdBuilder
.setLedgerId(entry.getLedgerId())
.setEntryId(entry.getEntryId())
.setPartition(partitionIdx)
.build();
.setLedgerId(entry.getLedgerId())
.setEntryId(entry.getEntryId())
.setPartition(partitionIdx)
.build();

ByteBuf metadataAndPayload = entry.getDataBuffer();
// Increment the ref-count of the data buffer and release it at the end of processing,
// so that we still get a chance to call entry.release().
metadataAndPayload.retain();

// take a look at message and filter it out if necessary
if (filter != null) {
metadataAndPayload.markReaderIndex();
Commands.skipMessageMetadata(metadataAndPayload);
metadataAndPayload.skipBytes(metadataAndPayload.readInt());
if (!filter.matches(metadataAndPayload)) {
filteredEntries.add(entry.getPosition());
messageId.recycle();
messageIdBuilder.recycle();
entry.release();
continue;
}
metadataAndPayload.resetReaderIndex();
}

// skip checksum by incrementing reader-index if consumer-client doesn't support checksum verification
if (cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v11.getNumber()) {
Commands.skipChecksumIfPresent(metadataAndPayload);
Expand All @@ -313,6 +361,9 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
entry.release();
}

if (filteredEntries.size() > 0) {
subscription.acknowledgeMessage(filteredEntries, AckType.Individual, null);
}
// Use an empty write here so that we can just tie the flush with the write promise for last entry
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER, writePromise);
batchSizes.recyle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();
final boolean forceTopicCreation = subscribe.getForceTopicCreation();
final PulsarApi.KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? subscribe.getKeySharedMeta() : null;
final PulsarApi.FilterMeta filterMeta = subscribe.hasFilterMeta() ? subscribe.getFilterMeta() : null;

CompletableFuture<Boolean> isProxyAuthorizedFuture;
if (service.isAuthorizationEnabled() && originalPrincipal != null) {
Expand Down Expand Up @@ -861,12 +862,12 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {
.thenCompose(v -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata,
readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta));
readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta, filterMeta));
} else {
return topic.subscribe(ServerCnx.this, subscriptionName, consumerId,
subType, priorityLevel, consumerName, isDurable,
startMessageId, metadata, readCompacted, initialPosition,
startMessageRollbackDurationSec, isReplicated, keySharedMeta);
startMessageRollbackDurationSec, isReplicated, keySharedMeta, filterMeta);
}
})
.thenAccept(consumer -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ default long getOriginalHighestSequenceId() {
void recordAddLatency(long latency, TimeUnit unit);

CompletableFuture<Consumer> subscribe(ServerCnx cnx, String subscriptionName, long consumerId, SubType subType,
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long startMessageRollbackDurationSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta);
int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long startMessageRollbackDurationSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta, PulsarApi.FilterMeta filterMeta);

CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package org.apache.pulsar.broker.service.filtering;

import io.netty.buffer.ByteBuf;
import org.apache.pulsar.client.api.schema.GenericRecord;

import java.util.Map;
import java.util.Properties;

/**
 * A schema-aware {@link Filter} that decodes each message payload as an Avro
 * {@link GenericRecord} and accepts the message only when every configured
 * property key/value pair equals the corresponding record field.
 */
public class BasicAvroFilter extends Filter {

    /**
     * @param props filter properties; each key is interpreted as a record field
     *              name and each value as the exact field value required to match
     */
    public BasicAvroFilter(Properties props) {
        super(props);
    }

    /**
     * Decodes the payload with the configured schema and compares every
     * configured property against the corresponding record field.
     *
     * @param val the raw message payload; fully consumed by this call
     * @return {@code true} iff every configured field is present in the record
     *         and equal to the configured value
     */
    @Override
    public boolean matches(ByteBuf val) {
        byte[] payload = new byte[val.readableBytes()];
        val.readBytes(payload);
        GenericRecord record = getSchema().decode(payload);

        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
            Object fieldValue = record.getField((String) entry.getKey());
            // Null-safe comparison: a field that is absent from the record must
            // fail the match rather than throw a NullPointerException.
            if (fieldValue == null || !fieldValue.equals(entry.getValue())) {
                return false;
            }
        }
        return true;
    }

    /** This filter needs the topic schema to decode payloads. */
    @Override
    public boolean isSchemaAware() {
        return true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package org.apache.pulsar.broker.service.filtering;

import io.netty.buffer.ByteBuf;

import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * A {@link Filter} that accepts a message only when its payload starts with a
 * configured byte prefix (the UTF-8 encoding of the
 * {@link #BYTES_PREFIX_FILTER_PREFIX} property value).
 */
public class BytesPrefixFilter extends Filter {

    /** Property key whose value is the required payload prefix (as a UTF-8 string). */
    public static final String BYTES_PREFIX_FILTER_PREFIX = "bytes_prefix_filter_prefix";

    private final byte[] prefix;

    /**
     * @param props filter properties; must contain {@link #BYTES_PREFIX_FILTER_PREFIX}
     */
    public BytesPrefixFilter(Properties props) {
        super(props);
        this.prefix = ((String) props.get(BYTES_PREFIX_FILTER_PREFIX)).getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Checks that the payload begins with the configured prefix.
     *
     * @param val the raw message payload; up to {@code prefix.length} bytes are consumed
     * @return {@code true} iff the payload is at least as long as the prefix and
     *         starts with it
     */
    @Override
    public boolean matches(ByteBuf val) {
        // A payload shorter than the prefix cannot match; without this guard the
        // readByte() loop below would throw IndexOutOfBoundsException.
        if (val.readableBytes() < prefix.length) {
            return false;
        }
        for (byte expected : prefix) {
            if (val.readByte() != expected) {
                return false;
            }
        }
        return true;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.apache.pulsar.broker.service.filtering;

import io.netty.buffer.ByteBuf;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.GenericRecord;

import java.util.Properties;

/**
 * Base class for broker-side message filters. A filter decides, per message
 * payload, whether the message should be dispatched to the consumer.
 *
 * <p>Concrete filters are constructed reflectively from a {@link Properties}
 * instance; filters that report {@link #isSchemaAware()} additionally receive
 * the topic schema via {@link #setSchema(Schema)} before use.
 */
public abstract class Filter {

    // Configuration supplied at construction time; available to subclasses.
    protected Properties properties;

    // Topic schema, populated only for schema-aware filters.
    private Schema<GenericRecord> schema;

    Filter(Properties props) {
        this.properties = props;
    }

    /**
     * Decides whether a message passes this filter.
     *
     * @param val the raw message payload; implementations may consume it
     * @return {@code true} iff the message should be delivered
     */
    public abstract boolean matches(ByteBuf val);

    /**
     * Whether this filter requires the topic schema to operate.
     * Defaults to {@code false}; schema-aware subclasses override this.
     */
    public boolean isSchemaAware() {
        return false;
    }

    /** Injects the topic schema; called by the broker for schema-aware filters. */
    public final void setSchema(Schema<GenericRecord> schema) {
        this.schema = schema;
    }

    /** @return the injected topic schema, or {@code null} if none was set */
    public final Schema<GenericRecord> getSchema() {
        return schema;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.pulsar.broker.service.filtering;

import io.netty.buffer.ByteBuf;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.regex.Pattern;

/**
 * A {@link Filter} that accepts a message only when its payload, decoded as a
 * UTF-8 string, fully matches a configured regular expression.
 */
public class RegexFilter extends Filter {

    /** Property key whose value is the regular expression to match against. */
    public static final String REGEX_FILTER_PATTERN_KEY = "regex_filter_pattern_key";

    // Compiled once at construction; Pattern is thread-safe and reusable.
    private final Pattern pattern;

    /**
     * @param props filter properties; must contain {@link #REGEX_FILTER_PATTERN_KEY}
     */
    public RegexFilter(Properties props) {
        super(props);
        this.pattern = Pattern.compile((String) props.get(REGEX_FILTER_PATTERN_KEY));
    }

    /**
     * Matches the whole payload (decoded as UTF-8) against the pattern.
     *
     * @param val the raw message payload; fully consumed by this call
     * @return {@code true} iff the entire payload matches the pattern
     */
    @Override
    public boolean matches(ByteBuf val) {
        CharSequence text = val.readCharSequence(val.readableBytes(), StandardCharsets.UTF_8);
        return pattern.matcher(text).matches();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,9 @@ public void handleProducerRemoved(Producer producer) {

@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long resetStartMessageBackInSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) {
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long resetStartMessageBackInSec, boolean replicateSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta, PulsarApi.FilterMeta filterMeta) {

final CompletableFuture<Consumer> future = new CompletableFuture<>();

Expand Down Expand Up @@ -288,7 +288,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri

try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0,
cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta, filterMeta);
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
consumer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,9 +508,9 @@ protected void handleProducerRemoved(Producer producer) {

@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long startMessageRollbackDurationSec, boolean replicatedSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) {
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long startMessageRollbackDurationSec, boolean replicatedSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta, PulsarApi.FilterMeta filterMeta) {

final CompletableFuture<Consumer> future = new CompletableFuture<>();

Expand Down Expand Up @@ -595,7 +595,7 @@ public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscri
subscriptionFuture.thenAccept(subscription -> {
try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta);
maxUnackedMessages, cnx, cnx.getRole(), metadata, readCompacted, initialPosition, keySharedMeta, filterMeta);
subscription.addConsumer(consumer);

checkBackloggedCursors();
Expand Down
Loading